CompiledGraph

CompiledGraph is the runnable object returned by StateGraph.compile(). It exposes five execution methods (run, stream, run_sync, resume, and stream_tokens) and a Mermaid diagram helper, get_mermaid().

run(state, checkpointer=None, graph_id=None) — async

Run the graph to completion and return the final state dict.

result = await graph.run({"input": "hello"})
print(result["output"])

The state dict you pass is not mutated — the graph works on an internal copy.

With checkpointing:

from synapsekit import InMemoryCheckpointer

cp = InMemoryCheckpointer()
result = await graph.run({"input": "hello"}, checkpointer=cp, graph_id="run-1")

stream(state) — async generator

Yield an event after each node completes. Useful for progress reporting and debugging.

async for event in graph.stream({"input": "hello"}):
    print(event["node"], event["state"])

Each event is a dict:

| Key | Type | Description |
| --- | --- | --- |
| "node" | str | Name of the node that just completed |
| "state" | dict | Snapshot of the full state after that node |

State accumulates across events — each snapshot contains all keys written so far.

async for event in graph.stream({"x": 3}):
    node = event["node"]
    keys = list(event["state"].keys())
    print(f"{node}: {keys}")
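To make the accumulation concrete, here is a minimal stand-in that does not use SynapseKit at all. `fake_stream` is a hypothetical async generator that mimics only the event shape described above, and the two nodes (`double`, `describe`) are made up for illustration. Treat it as a sketch of what a consumer sees, not as the library's implementation.

```python
import asyncio


async def fake_stream(state):
    """Hypothetical stand-in for graph.stream(); mimics only the event shape."""
    current = dict(state)  # work on a copy, leave the caller's dict alone
    # Two made-up nodes, each returning a partial state update.
    nodes = [
        ("double", lambda s: {"doubled": s["x"] * 2}),
        ("describe", lambda s: {"text": f"x doubled is {s['doubled']}"}),
    ]
    for name, fn in nodes:
        current.update(fn(current))
        yield {"node": name, "state": dict(current)}  # one snapshot per event


async def collect_keys(state):
    """Return (node, sorted state keys) for every event, in order."""
    seen = []
    async for event in fake_stream(state):
        seen.append((event["node"], sorted(event["state"])))
    return seen


if __name__ == "__main__":
    for node, keys in asyncio.run(collect_keys({"x": 3})):
        print(f"{node}: {keys}")
```

In this sketch the second event still contains "x" and "doubled" alongside the newly written "text", which is the behavior the paragraph above describes.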

run_sync(state) — sync

Blocking wrapper around run(). Works both inside and outside a running event loop: when a loop is already running (for example, in Jupyter), the coroutine is executed in a thread pool.

result = graph.run_sync({"input": "hello"})
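One common way to build a wrapper with this behavior is sketched below, using only the standard library. `run_blocking` and `fake_run` are hypothetical names introduced for illustration; how run_sync is actually implemented inside SynapseKit is an assumption here, not something this page confirms.

```python
import asyncio
import concurrent.futures


def run_blocking(coro_factory):
    """Run a coroutine to completion from synchronous code.

    coro_factory is a zero-argument callable that returns a fresh coroutine.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro_factory())
    # A loop is already running (e.g. a notebook cell): run the coroutine in
    # a worker thread with its own event loop and block until it finishes.
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        return pool.submit(lambda: asyncio.run(coro_factory())).result()


async def fake_run(state):
    """Hypothetical stand-in for graph.run()."""
    await asyncio.sleep(0)
    return {**state, "output": state["input"].upper()}


async def call_from_inside_loop():
    # Called while a loop is running, so the thread-pool branch is taken.
    return run_blocking(lambda: fake_run({"input": "hello"}))


if __name__ == "__main__":
    print(run_blocking(lambda: fake_run({"input": "hello"})))
    print(asyncio.run(call_from_inside_loop()))
```

Note that in this sketch the calling thread blocks until the worker finishes, so a wrapper of this kind pauses the outer event loop for the duration of the run.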

get_mermaid() — diagram

Returns a Mermaid flowchart string for the graph structure.

print(graph.get_mermaid())

See the Mermaid export page for details and rendering options.

Error handling

| Exception | When |
| --- | --- |
| GraphRuntimeError | A node returns a non-dict, an unknown node is referenced at runtime, or _MAX_STEPS is exceeded |
| GraphConfigError | Raised at compile() for invalid structure |

from synapsekit import GraphRuntimeError, GraphConfigError

try:
    result = await graph.run(state)
except GraphRuntimeError as e:
    print(f"Runtime failure: {e}")

resume(graph_id, checkpointer, updates=None) — async

Resume execution from a previously checkpointed state. Optionally apply human edits before resuming:

# Simple resume
result = await graph.resume("run-1", cp)

# Resume with human edits (Human-in-the-Loop)
result = await graph.resume("run-1", cp, updates={"approved": True, "feedback": "Looks good"})

The updates dict is merged into the checkpointed state before re-execution begins. This is the key mechanism for Human-in-the-Loop workflows.

Raises GraphRuntimeError if no checkpoint exists for the given graph_id. See Checkpointing for details.
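The merge step can be pictured with plain dicts. The sketch below assumes a shallow merge in which keys from updates override checkpointed values; the page does not spell out the merge semantics, so that is an assumption. `DictCheckpointer` and `state_for_resume` are hypothetical helpers, and LookupError stands in for GraphRuntimeError.

```python
class DictCheckpointer:
    """Hypothetical in-memory store: graph_id -> saved state dict."""

    def __init__(self):
        self._store = {}

    def save(self, graph_id, state):
        self._store[graph_id] = dict(state)

    def load(self, graph_id):
        return self._store.get(graph_id)


def state_for_resume(checkpointer, graph_id, updates=None):
    """Return the state a resumed run would start from in this sketch."""
    saved = checkpointer.load(graph_id)
    if saved is None:
        # The documented API raises GraphRuntimeError; LookupError is a stand-in.
        raise LookupError(f"no checkpoint for graph_id={graph_id!r}")
    # Assumed shallow merge: keys in updates win over checkpointed values.
    return {**saved, **(updates or {})}


if __name__ == "__main__":
    cp = DictCheckpointer()
    cp.save("run-1", {"draft": "v1", "approved": False})
    print(state_for_resume(cp, "run-1", {"approved": True, "feedback": "Looks good"}))
```

Building a new dict rather than updating the saved one keeps the stored checkpoint untouched, so the same checkpoint can be resumed again with different edits.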

stream_tokens(state) — async generator

Yield token-level events from LLM nodes that have stream=True. Non-streaming nodes emit a node_complete event instead.

async for event in graph.stream_tokens({"input": "Explain RAG"}):
    if event["type"] == "token":
        print(event["token"], end="", flush=True)
    elif event["type"] == "node_complete":
        print(f"\n[{event['node']} finished]")

| Event key | Type | Description |
| --- | --- | --- |
| "type" | str | "token" or "node_complete" |
| "node" | str | Name of the node |
| "token" | str | Token text (only for "token" events) |
| "state" | dict | State snapshot (only for "node_complete" events) |

See Nodes — Token streaming for how to create streaming LLM nodes.
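If you need the assembled text rather than a live printout, the two event types can be folded into a result as sketched below. `fake_stream_tokens` is a hypothetical generator that only imitates the event keys from the table above (a streaming node named "llm" and a non-streaming node named "format", both made up); `collect` would work the same way on any async iterator yielding events of that shape.

```python
import asyncio


async def fake_stream_tokens():
    """Hypothetical stand-in for graph.stream_tokens(); event shapes only."""
    # A streaming node emits one event per token.
    for tok in ["Retrieval", "-", "augmented"]:
        yield {"type": "token", "node": "llm", "token": tok}
    # A non-streaming node emits a single node_complete event.
    yield {
        "type": "node_complete",
        "node": "format",
        "state": {"output": "Retrieval-augmented", "done": True},
    }


async def collect(events):
    """Join token text per node and keep the last state snapshot seen."""
    tokens_by_node = {}
    last_state = None
    async for event in events:
        if event["type"] == "token":
            tokens_by_node.setdefault(event["node"], []).append(event["token"])
        elif event["type"] == "node_complete":
            last_state = event["state"]
    text_by_node = {node: "".join(toks) for node, toks in tokens_by_node.items()}
    return text_by_node, last_state


if __name__ == "__main__":
    text, state = asyncio.run(collect(fake_stream_tokens()))
    print(text, state)
```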

_MAX_STEPS guard

The engine tracks the number of execution waves. The default limit is _MAX_STEPS = 100. Override it at compile time:

compiled = graph.compile(max_steps=500)

If the limit is exceeded, a GraphRuntimeError is raised. This guards against infinite loops created by conditional edges that always route back to a previous node. See Cycles for more.
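The idea behind the guard can be shown with a plain loop counter. This is a simplified sketch, not the engine's code: it treats one loop iteration as one wave, `StepLimitExceeded` stands in for GraphRuntimeError, and `run_with_guard`, `retry_until_three`, and `always_loop` are hypothetical names.

```python
class StepLimitExceeded(RuntimeError):
    """Stand-in for GraphRuntimeError in this sketch."""


def run_with_guard(state, route, max_steps=100):
    """Call route(state) -> (next_state, done) until done or the limit is hit."""
    steps = 0
    while True:
        if steps >= max_steps:
            raise StepLimitExceeded(f"exceeded max_steps={max_steps}")
        state, done = route(state)
        steps += 1
        if done:
            return state, steps


def retry_until_three(state):
    """A cycle that terminates: loops back until three attempts are made."""
    attempts = state["attempts"] + 1
    return {"attempts": attempts}, attempts >= 3


def always_loop(state):
    """A cycle that never terminates: always routes back."""
    return state, False


if __name__ == "__main__":
    print(run_with_guard({"attempts": 0}, retry_until_three))
    try:
        run_with_guard({}, always_loop, max_steps=5)
    except StepLimitExceeded as exc:
        print(f"stopped: {exc}")
```

A bounded cycle such as `retry_until_three` finishes well under the limit, while `always_loop` is cut off after max_steps iterations instead of running forever.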