
StateGraph API Reference

StateGraph

from synapsekit.graph import StateGraph

Builds a directed graph of async node functions. Call compile() to produce an executable CompiledGraph.

__init__(state_schema=None)

StateGraph(state_schema: type[TypedState] | None = None)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| state_schema | type[TypedState] \| None | None | Optional typed state class for schema validation |

add_node(name, fn)

Register a node function.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | str | required | Unique node identifier |
| fn | async (state: dict) -> dict | required | Async function that reads state and returns partial updates |

async def my_node(state: dict) -> dict:
    return {"answer": "42"}

graph.add_node("my_node", my_node)
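
Because a node returns only partial updates, something has to fold them back into the full state. The sketch below shows one plausible way to do that, a shallow dict merge, using only the standard library. `apply_node` is a hypothetical helper written for illustration, not part of SynapseKit, and the real merge may differ (for example when reducers are configured).

```python
import asyncio


async def my_node(state: dict) -> dict:
    # Return only the keys this node changes.
    return {"answer": "42"}


async def apply_node(fn, state: dict) -> dict:
    """Hypothetical helper: run one node and shallow-merge its partial update."""
    update = await fn(state)
    return {**state, **update}


new_state = asyncio.run(apply_node(my_node, {"question": "What is 6 x 7?"}))
print(new_state)
```

Keys the node does not return are left untouched, which is what makes partial updates safe to compose.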

add_edge(from_node, to_node)

Add a dependency edge: to_node will not execute until from_node completes.

graph.add_edge("ingest", "embed")
graph.add_edge("embed", "retrieve")
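
The ordering constraint these edges describe can be illustrated with the standard library's `graphlib.TopologicalSorter`. This is only a sketch of the dependency semantics, assuming a plain list of `(from_node, to_node)` pairs; it does not show how SynapseKit schedules nodes internally.

```python
from graphlib import TopologicalSorter

edges = [("ingest", "embed"), ("embed", "retrieve")]

# TopologicalSorter expects a mapping of node -> set of predecessors.
predecessors: dict[str, set[str]] = {}
for from_node, to_node in edges:
    predecessors.setdefault(to_node, set()).add(from_node)
    predecessors.setdefault(from_node, set())

# static_order() yields each node only after all of its predecessors.
order = list(TopologicalSorter(predecessors).static_order())
print(order)
```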

add_conditional_edge(source, condition, destinations)

Add a dynamic routing edge. After source completes, condition is called with the current state and must return the name of the next node.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| source | str | required | Node that triggers the routing decision |
| condition | (state: dict) -> str | required | Function that returns the name of the next node |
| destinations | list[str] | required | All possible return values; validated at compile time |

def route(state: dict) -> str:
    return "rag_node" if state.get("has_docs") else "llm_node"

graph.add_conditional_edge(
    source="classify",
    condition=route,
    destinations=["rag_node", "llm_node"],
)
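
One plausible reading of "validated at compile time" is that every name in destinations must be a registered node. The sketch below illustrates that check in plain Python; `missing_destinations` is a hypothetical helper, and the validation SynapseKit actually performs may be stricter or different.

```python
def missing_destinations(registered: set[str], destinations: list[str]) -> list[str]:
    """Hypothetical check: return the destinations that are not registered nodes."""
    return [name for name in destinations if name not in registered]


def route(state: dict) -> str:
    return "rag_node" if state.get("has_docs") else "llm_node"


registered = {"classify", "rag_node", "llm_node"}
destinations = ["rag_node", "llm_node"]

# An empty list means every declared destination exists.
print(missing_destinations(registered, destinations))

# The router should only ever return one of the declared destinations.
print(route({"has_docs": True}) in destinations, route({}) in destinations)
```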

set_entry_point(node_name) / set_finish_point(node_name)

Declare the entry (first) node and the finish (terminal) node. Required when the graph has multiple roots or multiple terminals.


compile(checkpointer=None, allow_cycles=False, max_steps=50)

Validate the graph structure and return a CompiledGraph.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| checkpointer | BaseCheckpointer \| None | None | Persistence backend for state checkpointing and HITL |
| allow_cycles | bool | False | Allow cyclic edges (required for agentic loops) |
| max_steps | int | 50 | Hard limit on total node executions per run |

from synapsekit.graph.checkpointers import RedisCheckpointer

compiled = graph.compile(
    checkpointer=RedisCheckpointer(url="redis://localhost:6379"),
    allow_cycles=True,
    max_steps=20,
)
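
To see why a step limit matters once cycles are allowed, consider the following standalone sketch. It re-runs a single node until the node reports completion or the cap is reached. `run_loop` is hypothetical, and raising `RuntimeError` at the limit is an assumption: this reference does not say what the library does when max_steps is exceeded.

```python
import asyncio


async def run_loop(node, state: dict, max_steps: int) -> dict:
    """Hypothetical runner: re-run `node` until it sets done=True or the cap is hit."""
    steps = 0
    while not state.get("done"):
        if steps >= max_steps:
            # Assumed behavior; the real library may signal this differently.
            raise RuntimeError(f"exceeded max_steps={max_steps}")
        state = {**state, **await node(state)}
        steps += 1
    return state


async def count_up(state: dict) -> dict:
    n = state.get("n", 0) + 1
    return {"n": n, "done": n >= 3}


final = asyncio.run(run_loop(count_up, {}, max_steps=20))
print(final)
```

Without the cap, a node that never sets `done` would loop forever; with it, the run fails fast.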

CompiledGraph

async run(initial_state, run_id=None)

Execute the graph and return the final state.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| initial_state | dict | required | Starting state passed to entry point nodes |
| run_id | str \| None | None | Unique run identifier for checkpointing |

result = await compiled.run({"question": "What is SynapseKit?"})
print(result["answer"])

run_sync(initial_state, run_id=None)

Synchronous wrapper around run(). Blocks until the graph completes.
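
A blocking wrapper of this kind is commonly built on `asyncio.run`. The sketch below uses stand-in `run` and `run_sync` functions rather than the real CompiledGraph methods, and whether SynapseKit implements run_sync this way is an assumption.

```python
import asyncio


async def run(initial_state: dict) -> dict:
    # Stand-in for an async graph run.
    await asyncio.sleep(0)
    return {**initial_state, "answer": "42"}


def run_sync(initial_state: dict) -> dict:
    """Hypothetical blocking wrapper: drive the coroutine to completion."""
    return asyncio.run(run(initial_state))


result = run_sync({"question": "What is SynapseKit?"})
print(result["answer"])
```

Note that `asyncio.run` cannot be called from a thread that already has a running event loop, so inside async code it is safer to await run() directly.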


async stream(initial_state, run_id=None)

Execute the graph and yield partial state updates after each node completes.

async for update in compiled.stream({"question": "..."}):
    print(update)  # e.g. {"retrieved_chunks": [...]}
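
The "one partial update per completed node" behavior can be pictured as an async generator. The following sketch runs two stand-in nodes in sequence and yields what each one returned; the `stream` function here is hypothetical and ignores branching, parallelism, and checkpointing.

```python
import asyncio
from collections.abc import AsyncIterator


async def retrieve(state: dict) -> dict:
    return {"retrieved_chunks": ["chunk-1", "chunk-2"]}


async def generate(state: dict) -> dict:
    return {"answer": f"Used {len(state['retrieved_chunks'])} chunks"}


async def stream(nodes, initial_state: dict) -> AsyncIterator[dict]:
    """Hypothetical streamer: run nodes in order, yielding each partial update."""
    state = dict(initial_state)
    for node in nodes:
        update = await node(state)
        state.update(update)  # later nodes see earlier updates
        yield update          # the consumer sees only what this node changed


async def collect() -> list[dict]:
    return [u async for u in stream([retrieve, generate], {"question": "..."})]


updates = asyncio.run(collect())
print(updates)
```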

async stream_tokens(initial_state, run_id=None)

Execute the graph and yield individual LLM tokens as they are generated.

async for token in compiled.stream_tokens({"question": "..."}):
    print(token, end="", flush=True)

async resume(run_id, update=None)

Resume a paused run. Applies update to the state at the interruption point, then continues.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| run_id | str | required | The run ID of a previously paused run |
| update | dict \| None | None | State updates to apply before resuming |

result = await compiled.resume(
    run_id="my-run-001",
    update={"approved": True},
)

async sse_stream(initial_state, run_id=None)

Execute the graph and yield Server-Sent Events formatted strings. Suitable for HTTP streaming endpoints. Each yielded string has the format data: <json>\n\n.

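from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()  # assumes FastAPI; `compiled` is a CompiledGraph built earlier

@app.get("/stream")
async def stream_endpoint(question: str):
    async def generator():
        async for event in compiled.sse_stream({"question": question}):
            yield event
    return StreamingResponse(generator(), media_type="text/event-stream")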
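
The `data: <json>\n\n` framing described above is easy to reproduce and parse by hand, which helps when writing a test client. The helpers below are illustrative only (`to_sse` and `from_sse` are hypothetical names) and assume each event carries a single JSON object on one `data:` line.

```python
import json


def to_sse(update: dict) -> str:
    """Format one update as an SSE event: 'data: <json>' followed by a blank line."""
    # json.dumps escapes newlines by default, so the payload stays on one line.
    return f"data: {json.dumps(update)}\n\n"


def from_sse(event: str) -> dict:
    """Inverse of to_sse for single-line data events."""
    prefix = "data: "
    if not event.startswith(prefix) or not event.endswith("\n\n"):
        raise ValueError("not a single-line SSE data event")
    return json.loads(event[len(prefix):-2])


event = to_sse({"answer": "42"})
print(repr(event))
print(from_sse(event))
```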

async ws_stream(websocket, initial_state, run_id=None)

Execute the graph and send updates over an open WebSocket connection.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| websocket | WebSocket | required | An open WebSocket instance (FastAPI/Starlette compatible) |
| initial_state | dict | required | Starting state |
| run_id | str \| None | None | Optional run identifier |

get_mermaid()

Return a Mermaid diagram string representing the graph topology.

diagram = compiled.get_mermaid()
# graph TD
# ingest --> chunk
# chunk --> embed
# embed --> retrieve
# retrieve --> generate
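
For readers unfamiliar with Mermaid, the output above is a `graph TD` header followed by one `a --> b` line per edge. The sketch below builds the same kind of string from an edge list; `to_mermaid` is a hypothetical helper and the exact whitespace the library emits may differ.

```python
def to_mermaid(edges: list[tuple[str, str]]) -> str:
    """Hypothetical renderer: one 'a --> b' line per edge under a 'graph TD' header."""
    lines = ["graph TD"]
    lines += [f"    {src} --> {dst}" for src, dst in edges]
    return "\n".join(lines)


diagram = to_mermaid([
    ("ingest", "chunk"),
    ("chunk", "embed"),
    ("embed", "retrieve"),
    ("retrieve", "generate"),
])
print(diagram)
```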

get_mermaid_with_trace(run_id)

Return a Mermaid diagram annotated with the actual execution path from a completed run. Requires a checkpointer.

trace_diagram = compiled.get_mermaid_with_trace("my-run-001")

TypedState

from synapsekit.graph import TypedState, StateField

class PipelineState(TypedState):
    query: str
    documents: list[str] = StateField(default_factory=list)
    answer: str = StateField(default="")
    score: float = StateField(default=0.0, reducer=max)
    tags: list[str] = StateField(default_factory=list, reducer="extend")

StateField

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| default | Any | MISSING | Static default value |
| default_factory | Callable \| None | None | Callable that produces the default |
| reducer | str \| Callable \| None | None | How to merge parallel writes |
| description | str \| None | None | Human-readable description |
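
A reducer decides how two writes to the same field are combined. The sketch below mirrors the behaviors documented for the built-in reducer strings as plain two-argument Python functions; the `REDUCERS` table and `merge` helper are hypothetical and only illustrate the semantics, not SynapseKit's implementation.

```python
from typing import Any, Callable

# Hypothetical lookup table mapping each documented string to a merge function.
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {
    "extend": lambda old, new: old + new,       # concatenate two lists
    "update": lambda old, new: {**old, **new},  # shallow dict merge, new keys win
    "add": lambda old, new: old + new,          # sum two numbers
    "max": max,                                 # keep the larger value
    "min": min,                                 # keep the smaller value
}


def merge(reducer, old, new):
    """Resolve a reducer given as a string or a callable, then apply it."""
    fn = REDUCERS[reducer] if isinstance(reducer, str) else reducer
    return fn(old, new)


print(merge("extend", ["a"], ["b"]))
print(merge("update", {"x": 1}, {"y": 2}))
print(merge(max, 0.4, 0.9))
```

Passing a callable such as `max` and passing the string "max" resolve to the same function in this sketch.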

Built-in reducer strings:

| Reducer | Behavior |
| --- | --- |
| "extend" | Concatenate two lists |
| "update" | Merge two dicts (shallow) |
| "add" | Sum two numeric values |
| "max" | Take the larger value |
| "min" | Take the smaller value |

Checkpointer backends

| Class | Import | Storage |
| --- | --- | --- |
| InMemoryCheckpointer | synapsekit.graph.checkpointers | In-memory (dev only) |
| RedisCheckpointer | synapsekit.graph.checkpointers | Redis |
| SqliteCheckpointer | synapsekit.graph.checkpointers | SQLite file |
| PostgresCheckpointer | synapsekit.graph.checkpointers | PostgreSQL |

from synapsekit.graph.checkpointers import RedisCheckpointer

checkpointer = RedisCheckpointer(url="redis://localhost:6379", ttl_seconds=86400)

See also