
Streaming

SynapseKit supports streaming at every layer: token-level LLM output, RAG pipeline responses, graph node updates, Server-Sent Events (SSE), and WebSockets. This guide covers every pattern with complete runnable examples.

Prerequisites

pip install synapsekit[openai] fastapi uvicorn

Set your API key:

export OPENAI_API_KEY="sk-..."

1. Token-level LLM streaming

The simplest streaming pattern: call .stream() on any LLM and iterate the async generator. Each yielded value is a string token (or small chunk) as it arrives from the API.

import asyncio
from synapsekit.llms.openai import OpenAILLM

llm = OpenAILLM(model="gpt-4o-mini")

async def main():
    async for token in llm.stream("Tell me about RAG in three sentences"):
        print(token, end="", flush=True)
    print()  # newline after stream ends

asyncio.run(main())

# Expected output (streamed token-by-token):
# Retrieval-Augmented Generation (RAG) is a technique that combines a
# retrieval system with a language model to answer questions using
# external knowledge. It works by fetching relevant documents from a
# vector store and injecting them into the prompt context. RAG reduces
# hallucinations and keeps answers grounded in real data.
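
If you want to experiment with this consumption pattern offline, a tiny stand-in generator reproduces the contract. Note that fake_stream below is purely illustrative and not part of SynapseKit; it just yields chunks the way an LLM stream would:

```python
import asyncio
from typing import AsyncIterator

async def fake_stream(text: str) -> AsyncIterator[str]:
    # Illustrative stand-in for llm.stream(): yields small chunks,
    # with a cooperative yield point where a network read would be
    for word in text.split():
        await asyncio.sleep(0)
        yield word + " "

async def consume() -> str:
    chunks = []
    async for token in fake_stream("streams arrive chunk by chunk"):
        print(token, end="", flush=True)
        chunks.append(token)
    print()
    return "".join(chunks)

result = asyncio.run(consume())
# result == "streams arrive chunk by chunk "
```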

You can also collect the full response while streaming:

async def stream_and_collect(prompt: str) -> str:
    """Stream tokens to stdout and return the full response."""
    tokens = []
    async for token in llm.stream(prompt):
        print(token, end="", flush=True)
        tokens.append(token)
    print()
    return "".join(tokens)

asyncio.run(stream_and_collect("What is a vector database?"))

Streaming with system prompt

async def main():
    async for token in llm.stream(
        "Summarize quantum computing",
        system="You are a science communicator. Be concise.",
    ):
        print(token, end="", flush=True)

asyncio.run(main())
# Expected output:
# Quantum computing uses quantum bits (qubits) that can exist in
# superposition, enabling parallel computation on certain problems...

2. RAG pipeline streaming

Call astream() on a RAG instance to stream the generated answer token-by-token; the retrieval step completes before generation begins.

import asyncio
from synapsekit import RAG
from synapsekit.llms.openai import OpenAILLM

async def main():
    llm = OpenAILLM(model="gpt-4o-mini")
    rag = RAG(llm=llm)

    # Index some documents
    await rag.aadd([
        "SynapseKit supports streaming by default across all LLM providers.",
        "The RAG pipeline retrieves context before generating an answer.",
        "Streaming reduces time-to-first-token, improving perceived latency.",
    ])

    print("Answer: ", end="", flush=True)
    async for token in rag.astream("What does SynapseKit support?"):
        print(token, end="", flush=True)
    print()

asyncio.run(main())

# Expected output:
# Answer: SynapseKit supports streaming by default across all LLM
# providers, which reduces time-to-first-token and improves the
# perceived latency of your application.

3. Graph node-level streaming

compiled_graph.stream() yields one update dict per node execution. This lets you show incremental progress in multi-step workflows.

import asyncio
from synapsekit.graph import StateGraph

async def fetch_data(state: dict) -> dict:
    # Simulate data fetching
    return {"data": "fetched_results", "step": "fetch"}

async def process_data(state: dict) -> dict:
    # Simulate processing
    return {"result": f"processed: {state['data']}", "step": "process"}

async def format_output(state: dict) -> dict:
    return {"output": state["result"].upper(), "step": "format"}

graph = StateGraph()
graph.add_node("fetch", fetch_data)
graph.add_node("process", process_data)
graph.add_node("format", format_output)
graph.add_edge("fetch", "process")
graph.add_edge("process", "format")
graph.set_entry_point("fetch")
compiled = graph.compile()

async def main():
    async for update in compiled.stream({"input": "hello"}):
        node_name = list(update.keys())[0]
        print(f"[{node_name}] -> {update[node_name]}")

asyncio.run(main())

# Expected output:
# [fetch] -> {'data': 'fetched_results', 'step': 'fetch'}
# [process] -> {'result': 'processed: fetched_results', 'step': 'process'}
# [format] -> {'output': 'PROCESSED: FETCHED_RESULTS', 'step': 'format'}
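
Because each yielded update is a plain dict keyed by node name, driving a progress indicator is a small transformation. A sketch with hard-coded update dicts (no graph required; progress_events is a hypothetical helper, not a SynapseKit API):

```python
def progress_events(updates, total_nodes: int):
    # Turn per-node update dicts (shaped like compiled.stream() output)
    # into (percent, step) tuples for a progress bar
    for i, update in enumerate(updates, start=1):
        node_name = next(iter(update))
        yield (round(100 * i / total_nodes), update[node_name]["step"])

updates = [
    {"fetch": {"data": "fetched_results", "step": "fetch"}},
    {"process": {"result": "processed: fetched_results", "step": "process"}},
    {"format": {"output": "PROCESSED: FETCHED_RESULTS", "step": "format"}},
]
for pct, step in progress_events(updates, total_nodes=3):
    print(f"{pct}% - {step}")
# 33% - fetch
# 67% - process
# 100% - format
```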

4. Token-level streaming from graph LLM nodes

Combine graph execution with token streaming using stream_tokens. This lets each LLM node in a graph stream its output token-by-token.

import asyncio
from synapsekit.graph import StateGraph
from synapsekit.graph.nodes import llm_node, stream_tokens
from synapsekit.llms.openai import OpenAILLM

llm = OpenAILLM(model="gpt-4o-mini")

graph = StateGraph()
graph.add_node("generate", llm_node(llm, prompt_key="input"))
graph.set_entry_point("generate")
compiled = graph.compile()

async def main():
    print("Streaming tokens from graph: ", end="", flush=True)
    async for token in stream_tokens(compiled, {"input": "Explain RAG in one sentence"}):
        print(token, end="", flush=True)
    print()

asyncio.run(main())

# Expected output:
# Streaming tokens from graph: RAG (Retrieval-Augmented Generation)
# enhances LLM responses by first retrieving relevant documents from
# a knowledge base and injecting them into the prompt.

Multi-node graph with token streaming

graph2 = StateGraph()
graph2.add_node("research", llm_node(llm, prompt_key="topic", output_key="research"))
graph2.add_node("summarize", llm_node(llm, prompt_key="research", output_key="summary"))
graph2.add_edge("research", "summarize")
graph2.set_entry_point("research")
compiled2 = graph2.compile()

async def main():
    # stream_tokens emits tokens from the final LLM node by default
    async for token in stream_tokens(compiled2, {"topic": "vector databases"}):
        print(token, end="", flush=True)
    print()

asyncio.run(main())

5. SSE streaming with FastAPI

Server-Sent Events let browsers consume streaming responses over a standard HTTP connection. Use sse_stream to wrap any compiled graph.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from synapsekit.graph import StateGraph, sse_stream

app = FastAPI()

# Build and compile your graph once at startup
graph = StateGraph()
# ... add nodes and edges ...
compiled_graph = graph.compile()

@app.get("/stream")
async def stream_endpoint(query: str):
"""Stream graph output as SSE events."""
async def event_generator():
async for event in sse_stream(compiled_graph, {"input": query}):
# SSE format: each message must be "data: ...\n\n"
yield f"data: {event}\n\n"
# Signal end of stream
yield "data: [DONE]\n\n"

return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)

Run with:

uvicorn app:app --reload
# curl "http://localhost:8000/stream?query=What+is+RAG"
# Expected output (SSE format):
# data: Retrieval
# data: -Aug
# data: mented
# data: Generation
# data: [DONE]
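
On the wire, each SSE event is a `data:` field terminated by a blank line, and lines beginning with `:` are comments that clients ignore. A minimal Python parser (parse_sse is illustrative, not a SynapseKit API) shows how a client reassembles tokens from that framing:

```python
def parse_sse(raw: str) -> list[str]:
    # Split the raw stream into events (blank-line delimited),
    # then extract the data field from each event
    events = []
    for block in raw.split("\n\n"):
        for line in block.split("\n"):
            if line.startswith("data: "):
                events.append(line[len("data: "):])
    return events

raw = "data: Retrieval\n\ndata: -Aug\n\n: keep-alive\n\ndata: [DONE]\n\n"
print(parse_sse(raw))
# ['Retrieval', '-Aug', '[DONE]']
```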

SSE with connection keep-alive

import asyncio

@app.get("/stream/keepalive")
async def stream_keepalive(query: str):
    async def event_generator():
        async for event in sse_stream(compiled_graph, {"input": query}):
            yield f"data: {event}\n\n"
        yield "data: [DONE]\n\n"

    # Wrap in a keep-alive generator that emits an SSE comment every 15s
    # while waiting for the next event, so idle proxies don't drop the connection
    async def with_keepalive(gen, interval: float = 15.0):
        it = gen.__aiter__()
        while True:
            try:
                chunk = await asyncio.wait_for(it.__anext__(), timeout=interval)
            except asyncio.TimeoutError:
                yield ": keep-alive\n\n"  # SSE comment, ignored by clients
                continue
            except StopAsyncIteration:
                break
            yield chunk

    return StreamingResponse(
        with_keepalive(event_generator()),
        media_type="text/event-stream",
    )

6. WebSocket streaming

WebSockets provide full-duplex communication. Use ws_stream for token-by-token delivery over a WebSocket connection.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from synapsekit.graph import ws_stream

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
query = await websocket.receive_text()
async for token in ws_stream(compiled_graph, {"input": query}):
await websocket.send_text(token)
# Signal end of response
await websocket.send_text("[DONE]")
except WebSocketDisconnect:
pass # Client disconnected cleanly

WebSocket with structured messages

import json

@app.websocket("/ws/structured")
async def websocket_structured(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = json.loads(await websocket.receive_text())
query = data["query"]
session_id = data.get("session_id", "default")

async for token in ws_stream(compiled_graph, {"input": query}):
await websocket.send_text(json.dumps({
"type": "token",
"content": token,
"session_id": session_id,
}))

await websocket.send_text(json.dumps({
"type": "done",
"session_id": session_id,
}))
except WebSocketDisconnect:
pass
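
The structured protocol above is just JSON envelopes, so both sides can be exercised with plain functions. In this sketch, token_msg, done_msg, and handle_msg are hypothetical helpers (not SynapseKit APIs) that mirror the server messages and the client-side handling:

```python
import json

def token_msg(token: str, session_id: str = "default") -> str:
    # Mirrors the server's per-token envelope
    return json.dumps({"type": "token", "content": token, "session_id": session_id})

def done_msg(session_id: str = "default") -> str:
    # Mirrors the server's end-of-stream envelope
    return json.dumps({"type": "done", "session_id": session_id})

def handle_msg(raw: str, buffer: list) -> bool:
    """Append token content to buffer; return True once the stream is done."""
    msg = json.loads(raw)
    if msg["type"] == "token":
        buffer.append(msg["content"])
        return False
    return msg["type"] == "done"

buf = []
for raw in [token_msg("Hello"), token_msg(" world"), done_msg()]:
    if handle_msg(raw, buf):
        break
print("".join(buf))
# Hello world
```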

7. Consuming SSE in JavaScript

// Plain JavaScript EventSource
const source = new EventSource('/stream?query=What+is+RAG');
const output = document.getElementById('output');

source.onmessage = (event) => {
  if (event.data === '[DONE]') {
    source.close();
    return;
  }
  output.textContent += event.data;
};

source.onerror = (err) => {
  console.error('SSE error:', err);
  source.close();
};

Using the Fetch API for POST requests (more flexible)

async function streamPost(query) {
  const response = await fetch('/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // stream: true keeps multi-byte characters split across chunks intact
    const chunk = decoder.decode(value, { stream: true });
    const lines = chunk.split('\n');

    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const token = line.slice(6);
        if (token !== '[DONE]') {
          document.getElementById('output').textContent += token;
        }
      }
    }
  }
}

streamPost("Explain vector databases");

WebSocket client

const ws = new WebSocket('ws://localhost:8000/ws');

ws.onopen = () => {
  ws.send('What is RAG?');
};

ws.onmessage = (event) => {
  if (event.data === '[DONE]') {
    console.log('Stream complete');
    return;
  }
  document.getElementById('output').textContent += event.data;
};

ws.onerror = (err) => console.error('WebSocket error:', err);

8. Streaming with progress callbacks

import asyncio
from synapsekit import RAG
from synapsekit.llms.openai import OpenAILLM

async def main():
    llm = OpenAILLM(model="gpt-4o-mini")
    rag = RAG(llm=llm)
    await rag.aadd(["SynapseKit is a Python library for building LLM applications."])

    token_count = 0

    async def on_token(token: str):
        nonlocal token_count
        token_count += 1
        print(token, end="", flush=True)

    async for token in rag.astream("What is SynapseKit?"):
        await on_token(token)

    print(f"\n\nTotal tokens streamed: {token_count}")

asyncio.run(main())

# Expected output:
# SynapseKit is a Python library for building LLM applications.
#
# Total tokens streamed: 12

9. Gotchas and common mistakes

Do not use asyncio.run() inside a running event loop

# WRONG — raises RuntimeError when a loop is already running
# (Jupyter cells and FastAPI handlers both run inside a loop)
import asyncio
asyncio.run(main())

# CORRECT — use await directly
async def my_handler():
    async for token in llm.stream("hello"):
        print(token, end="", flush=True)
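
You can reproduce the error without any LLM: calling asyncio.run() from inside a coroutine that a running loop is already executing raises RuntimeError.

```python
import asyncio

async def inner():
    return "ok"

async def handler():
    coro = inner()
    try:
        asyncio.run(coro)  # WRONG: a loop is already running here
    except RuntimeError as e:
        coro.close()  # suppress the "never awaited" warning
        return f"caught: {e}"

result = asyncio.run(handler())
print(result)
# caught: asyncio.run() cannot be called from a running event loop
```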

Sync wrappers do not support streaming

# WRONG — sync wrappers buffer the full response
result = llm.generate_sync("Tell me about RAG") # No streaming

# CORRECT — use async API
async for token in llm.stream("Tell me about RAG"):
print(token, end="", flush=True)

Always flush stdout when printing tokens

# WRONG — tokens may appear in batches due to buffering
print(token, end="")

# CORRECT — force immediate flush
print(token, end="", flush=True)

Handle stream interruptions gracefully

async def safe_stream(llm, prompt: str) -> str:
    tokens = []
    try:
        async for token in llm.stream(prompt):
            print(token, end="", flush=True)
            tokens.append(token)
    except Exception as e:
        print(f"\nStream interrupted: {e}")
    # Return partial result
    return "".join(tokens)

Summary

Pattern                 Method              Use case
Token streaming         llm.stream()        CLI tools, notebooks
RAG streaming           rag.astream()       Chat interfaces
Graph node updates      compiled.stream()   Progress indicators
Graph token streaming   stream_tokens()     Fine-grained UI updates
SSE                     sse_stream()        Web browsers via HTTP
WebSocket               ws_stream()         Real-time bidirectional