Streaming RAG Responses

Open In Colab

Non-streaming RAG makes the user stare at a blank screen until the full answer is ready, which can be several seconds for long responses. Streaming displays each token as it is generated, turning a frustrating wait into a fast-feeling experience. SynapseKit's astream() makes this a one-line change from aquery().

What you'll build: a streaming RAG pipeline that prints tokens in real time, plus a FastAPI endpoint that streams answers via Server-Sent Events.

Time: ~10 min. Difficulty: Beginner
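To see that one-line change concretely, here is a minimal sketch that reuses the rag pipeline built in Step 1 below (the question text is illustrative):

# Non-streaming: aquery() returns the complete answer as one string.
answer = await rag.aquery("How does SynapseKit handle streaming?")
print(answer)

# Streaming: astream() yields tokens one at a time as they are generated.
async for token in rag.astream("How does SynapseKit handle streaming?"):
    print(token, end="", flush=True)
print()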

Prerequisites

pip install synapsekit fastapi uvicorn
export OPENAI_API_KEY="sk-..."

What you'll learn

  • How astream() differs from aquery() and when to use each
  • How to print tokens to the terminal in real time
  • How to wire streaming RAG into a FastAPI SSE endpoint
  • Why retrieval still happens once before generation in streaming mode

Step 1: Build a pipeline with sample documents

import asyncio
from synapsekit import RAGPipeline
from synapsekit.llms.openai import OpenAILLM
from synapsekit.embeddings.openai import OpenAIEmbeddings
from synapsekit.vectorstores.memory import InMemoryVectorStore

rag = RAGPipeline(
    llm=OpenAILLM(model="gpt-4o-mini"),
    embeddings=OpenAIEmbeddings(model="text-embedding-3-small"),
    vectorstore=InMemoryVectorStore(),
)

docs = [
    "SynapseKit supports streaming via the astream() async generator method.",
    "Streaming lets the UI render partial output as soon as the first token arrives.",
    "The retrieval step happens once before generation; it is not streamed.",
    "FastAPI's StreamingResponse combined with SSE is the standard way to stream to browsers.",
]

# Top-level await works in a notebook; in a plain script, wrap this call in an
# async function and run it with asyncio.run() (see the complete example below).
await rag.aadd(docs)

Step 2: Stream to the terminal

# astream() returns an async generator that yields one token string at a time.
# flush=True makes each token appear immediately instead of sitting in stdout's buffer.
print("Answer: ", end="")
async for token in rag.astream("How does SynapseKit handle streaming?"):
    print(token, end="", flush=True)
print()  # newline when the stream finishes

astream() takes the same arguments as aquery(), including k for the number of retrieved chunks. The only difference is the return type: aquery() returns the complete answer as a string, while astream() returns an async generator of tokens.
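For example, k can be passed to astream() exactly as it would be to aquery() (the value 2 here is arbitrary):

# Retrieve only the top 2 chunks before streaming the answer.
async for token in rag.astream("How does SynapseKit handle streaming?", k=2):
    print(token, end="", flush=True)
print()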

Step 3: Measure time-to-first-token

import time

# Time-to-first-token (TTFT) is the user-perceived latency before anything
# appears on screen. Without streaming, the user waits for retrieval plus the
# entire generation; with streaming, they wait only for retrieval plus the
# first token, which is typically several times shorter for long answers.
start = time.perf_counter()
first_token_time = None

async for token in rag.astream("What is time-to-first-token?"):
if first_token_time is None:
first_token_time = time.perf_counter() - start
print(f"[TTFT: {first_token_time:.2f}s] ", end="")
print(token, end="", flush=True)

print(f"\n[Total: {time.perf_counter() - start:.2f}s]")

Step 4: Collect the full response while streaming

# Sometimes you need both the streamed output for UX and the complete string
# for logging or post-processing. Accumulate tokens into a list and join.
tokens = []
async for token in rag.astream("What does astream() return?"):
tokens.append(token)
print(token, end="", flush=True)
print()

full_response = "".join(tokens)
print(f"\nFull response ({len(full_response)} chars): {full_response[:100]}...")

Step 5: FastAPI SSE endpoint

# Server-Sent Events (SSE) is the standard browser protocol for streaming
# text from a server. Each message is a "data: ..." line followed by a blank
# line. StreamingResponse with media_type="text/event-stream" handles the HTTP framing.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/rag/stream")
async def stream_rag(question: str):
    async def event_generator():
        async for token in rag.astream(question):
            # SSE format: each message starts with "data: " and ends with "\n\n"
            yield f"data: {token}\n\n"
        # Signal the client that the stream has ended.
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

Run with uvicorn app:app --reload, then open http://localhost:8000/rag/stream?question=How+does+streaming+work in your browser.
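You can also test the stream from Python. The sketch below assumes the httpx package is installed (pip install httpx); any HTTP client that can iterate over response lines works the same way:

import asyncio
import httpx

async def consume_sse():
    url = "http://localhost:8000/rag/stream"
    params = {"question": "How does streaming work"}
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("GET", url, params=params) as response:
            async for line in response.aiter_lines():
                # SSE payload lines look like "data: <token>"; skip anything else.
                if not line.startswith("data: "):
                    continue
                data = line[len("data: "):]
                if data == "[DONE]":
                    break
                print(data, end="", flush=True)
    print()

asyncio.run(consume_sse())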

Step 6: JavaScript client for the SSE endpoint

<!-- Paste into a browser console or an HTML file to test the SSE endpoint -->
<script>
const source = new EventSource(
"http://localhost:8000/rag/stream?question=How+does+streaming+work"
);

source.onmessage = (event) => {
if (event.data === "[DONE]") {
source.close();
return;
}
// Append each token to the DOM as it arrives.
document.getElementById("answer").textContent += event.data;
};
</script>
<div id="answer"></div>

Complete working example

import asyncio
from synapsekit import RAGPipeline
from synapsekit.llms.openai import OpenAILLM
from synapsekit.embeddings.openai import OpenAIEmbeddings
from synapsekit.vectorstores.memory import InMemoryVectorStore

rag = RAGPipeline(
    llm=OpenAILLM(model="gpt-4o-mini"),
    embeddings=OpenAIEmbeddings(model="text-embedding-3-small"),
    vectorstore=InMemoryVectorStore(),
)

async def main():
    await rag.aadd([
        "SynapseKit supports streaming via the astream() async generator method.",
        "Streaming lets the UI render partial output as soon as the first token arrives.",
        "The retrieval step happens once before generation; it is not streamed.",
        "FastAPI's StreamingResponse combined with SSE is the standard way to stream to browsers.",
    ])

    print("Streaming response:")
    print("-" * 40)
    tokens = []
    async for token in rag.astream("How does SynapseKit handle streaming?"):
        tokens.append(token)
        print(token, end="", flush=True)
    print()
    print("-" * 40)
    print(f"Total tokens received: {len(tokens)}")

asyncio.run(main())

Expected output

Streaming response:
----------------------------------------
SynapseKit handles streaming through the astream() async generator method, which
yields one token at a time as soon as the language model produces it. The
retrieval step still happens once before generation begins, but each token of
the generated answer is yielded immediately rather than waiting for the complete
response.
----------------------------------------
Total tokens received: 47

How it works

When astream() is called, RAGPipeline first performs the full retrieval pass up front, before any tokens are yielded: it embeds the query, searches the vector store, and builds the prompt. It then calls the LLM's streaming interface (OpenAILLM uses AsyncOpenAI's stream=True parameter) and yields each chunk as it arrives from the API. Because astream() is an async generator, the caller can process each token immediately without buffering the full response in memory, which matters for long answers.
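The sketch below illustrates that flow conceptually. It is not SynapseKit's internal code; the embed_query, search, and llm.astream method names are illustrative assumptions:

async def astream_sketch(pipeline, question: str, k: int = 4):
    # 1. Retrieval happens exactly once, before any tokens are yielded.
    query_vector = await pipeline.embeddings.embed_query(question)  # assumed method name
    chunks = await pipeline.vectorstore.search(query_vector, k=k)   # assumed method name

    # 2. Build the augmented prompt from the retrieved chunks.
    context = "\n\n".join(chunks)
    prompt = f"Answer using the context below.\n\nContext:\n{context}\n\nQuestion: {question}"

    # 3. Generation is streamed: each chunk from the LLM is yielded as soon as it arrives.
    async for token in pipeline.llm.astream(prompt):                # assumed method name
        yield token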

Variations

  • Stream with source attribution: yield the sources as a final SSE event after [DONE]
  • Cancel mid-stream: call generator.aclose() inside an asyncio.CancelledError handler
  • Stream to WebSocket: replace StreamingResponse with FastAPI's WebSocket.send_text() (see the sketch after this list)
  • Add per-token latency logging: record time.perf_counter() inside the async for loop
  • Use a different streaming LLM: any SynapseKit LLM that supports streaming works identically
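As an illustration of the WebSocket variation, here is a minimal sketch. It assumes the rag pipeline from Step 1; the /rag/ws path and the [DONE] sentinel are arbitrary choices:

from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/rag/ws")
async def rag_ws(websocket: WebSocket):
    await websocket.accept()
    question = await websocket.receive_text()
    # Send each token as its own text frame.
    async for token in rag.astream(question):
        await websocket.send_text(token)
    # Reuse the [DONE] sentinel so the client knows the stream has ended.
    await websocket.send_text("[DONE]")
    await websocket.close()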

Troubleshooting

Tokens arrive all at once instead of one-by-one
Your terminal or HTTP layer is buffering output. In the terminal, ensure flush=True is passed to print(). In FastAPI, verify media_type="text/event-stream" is set on StreamingResponse.

TypeError: 'async_generator' object is not iterable
You used a regular for loop instead of async for. astream() returns an async generator, which requires async for inside an async function.

The stream stops mid-sentence
The LLM hit its max_tokens limit. Pass max_tokens=1024 (or higher) to OpenAILLM() to increase the generation budget.
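For example, reusing the constructor from Step 1 (max_tokens is the keyword named in the tip above):

rag = RAGPipeline(
    llm=OpenAILLM(model="gpt-4o-mini", max_tokens=1024),  # raise the generation budget
    embeddings=OpenAIEmbeddings(model="text-embedding-3-small"),
    vectorstore=InMemoryVectorStore(),
)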

Next steps