Nodes

A node is any callable that takes the current graph state and returns a partial state to be merged in.

NodeFn type

NodeFn = Callable[[dict], dict | Awaitable[dict]]

Both sync and async functions are accepted. The returned dict is shallow-merged into the shared state via state.update(partial).

Writing a node

# Async (preferred)
async def my_node(state: dict) -> dict:
    result = await some_api_call(state["input"])
    return {"output": result}

# Sync also works
def my_sync_node(state: dict) -> dict:
    return {"doubled": state["x"] * 2}

Return only the keys you want to add or update — untouched keys are preserved.
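This merge behavior can be sketched in plain Python. The sketch below assumes dict.update is the merge primitive, as the NodeFn description above states; it is an illustration, not synapsekit code:

```python
# A node returns only new/changed keys; the engine shallow-merges them
# into the shared state, leaving all other keys untouched.
state = {"x": 2, "input": "hi"}

def my_sync_node(state: dict) -> dict:
    return {"doubled": state["x"] * 2}

state.update(my_sync_node(state))  # shallow merge of the partial result
print(state)  # {'x': 2, 'input': 'hi', 'doubled': 4}
```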

Built-in helpers

agent_node(executor, input_key, output_key)

Wrap an AgentExecutor as a node.

from synapsekit import StateGraph, AgentExecutor, AgentConfig, CalculatorTool, agent_node
from synapsekit.llm.openai import OpenAILLM
from synapsekit.llm.base import LLMConfig

llm = OpenAILLM(LLMConfig(model="gpt-4o-mini", api_key="sk-..."))
executor = AgentExecutor(AgentConfig(llm=llm, tools=[CalculatorTool()]))

node_fn = agent_node(executor, input_key="question", output_key="answer")

graph = (
    StateGraph()
    .add_node("agent", node_fn)
    .set_entry_point("agent")
    .set_finish_point("agent")
    .compile()
)

result = await graph.run({"question": "What is 12 factorial?"})
print(result["answer"])
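For intuition, here is a hypothetical sketch of what a thin wrapper like agent_node might look like. The executor's run method and the stand-in EchoExecutor are assumptions so the sketch runs without synapsekit; this is not the library's actual source:

```python
import asyncio

def agent_node_sketch(executor, input_key="input", output_key="output"):
    # Read one state key, run the executor, write one state key.
    async def node(state: dict) -> dict:
        answer = await executor.run(state[input_key])  # assumed executor API
        return {output_key: answer}
    return node

# Stand-in executor so the sketch is self-contained
class EchoExecutor:
    async def run(self, question: str) -> str:
        return f"echo: {question}"

node_fn = agent_node_sketch(EchoExecutor(), input_key="question", output_key="answer")
result = asyncio.run(node_fn({"question": "What is 12 factorial?"}))
print(result)  # {'answer': 'echo: What is 12 factorial?'}
```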

rag_node(pipeline, input_key, output_key)

Wrap a RAGPipeline as a node.

from synapsekit import StateGraph, RAGPipeline, RAGConfig, rag_node

pipeline = RAGPipeline(RAGConfig(llm=llm, retriever=retriever))
node_fn = rag_node(pipeline, input_key="query", output_key="answer")

graph = (
    StateGraph()
    .add_node("rag", node_fn)
    .set_entry_point("rag")
    .set_finish_point("rag")
    .compile()
)

Parameters

Helper      Parameter   Default    Description
agent_node  input_key   "input"    State key to read the question from
agent_node  output_key  "output"   State key to write the answer to
rag_node    input_key   "input"    State key to read the query from
rag_node    output_key  "output"   State key to write the answer to

llm_node(llm, input_key, output_key, stream)

Wrap a BaseLLM as a node, with optional token-level streaming support.

from synapsekit import StateGraph, llm_node
from synapsekit.llm.openai import OpenAILLM
from synapsekit.llm.base import LLMConfig

llm = OpenAILLM(LLMConfig(model="gpt-4o-mini", api_key="sk-..."))

graph = (
    StateGraph()
    .add_node("llm", llm_node(llm, input_key="prompt", output_key="response"))
    .set_entry_point("llm")
    .set_finish_point("llm")
    .compile()
)

result = await graph.run({"prompt": "Explain RAG in one sentence"})
print(result["response"])

With stream=True, the node emits token-level streaming events. See Token Streaming and CompiledGraph.stream_tokens() for details.

node_fn = llm_node(llm, stream=True)

subgraph_node(compiled_graph, input_mapping, output_mapping)

Wrap a CompiledGraph as a node for nesting graphs. This lets you compose complex workflows from smaller, independently testable graphs.

from synapsekit import StateGraph, subgraph_node

# Build a subgraph
async def process(state):
    return {"output": state["input"].upper()}

sub = (
    StateGraph()
    .add_node("process", process)
    .set_entry_point("process")
    .set_finish_point("process")
    .compile()
)

# Nest it in a parent graph
parent = (
    StateGraph()
    .add_node("sub", subgraph_node(
        sub,
        input_mapping={"query": "input"},
        output_mapping={"output": "sub_result"},
    ))
    .set_entry_point("sub")
    .set_finish_point("sub")
    .compile()
)

result = await parent.run({"query": "hello"})
print(result["sub_result"]) # "HELLO"

Key points:

  • input_mapping maps parent state keys to subgraph state keys. If omitted, the full parent state is passed through.
  • output_mapping maps subgraph output keys to parent state keys. If omitted, the subgraph result is returned as-is.
  • The subgraph runs with its own internal state, fully isolated from the parent.
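The mapping rules above can be sketched in a few lines. FakeCompiledGraph and the mapping logic below are assumptions derived from the bullet points, not the real subgraph_node implementation:

```python
import asyncio

def subgraph_node_sketch(compiled_graph, input_mapping=None, output_mapping=None):
    async def node(state: dict) -> dict:
        # Map parent keys -> subgraph keys, or pass the full state through
        sub_input = ({v: state[k] for k, v in input_mapping.items()}
                     if input_mapping else dict(state))
        sub_result = await compiled_graph.run(sub_input)
        # Map subgraph output keys -> parent keys, or return the result as-is
        return ({v: sub_result[k] for k, v in output_mapping.items()}
                if output_mapping else sub_result)
    return node

# Stand-in for a CompiledGraph so the sketch is self-contained
class FakeCompiledGraph:
    async def run(self, state):
        return {"output": state["input"].upper()}

node = subgraph_node_sketch(
    FakeCompiledGraph(),
    input_mapping={"query": "input"},
    output_mapping={"output": "sub_result"},
)
result = asyncio.run(node({"query": "hello"}))
print(result)  # {'sub_result': 'HELLO'}
```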

Parameters

Helper         Parameter       Default   Description
agent_node     input_key       "input"   State key to read the question from
agent_node     output_key      "output"  State key to write the answer to
rag_node       input_key       "input"   State key to read the query from
rag_node       output_key      "output"  State key to write the answer to
llm_node       input_key       "input"   State key to read the prompt from
llm_node       output_key      "output"  State key to write the response to
llm_node       stream          False     Enable token-level streaming
subgraph_node  input_mapping   None      Map parent keys to subgraph keys
subgraph_node  output_mapping  None      Map subgraph output keys to parent keys

Token streaming

When an LLM node has stream=True, use CompiledGraph.stream_tokens() to receive token-by-token events:

graph = (
    StateGraph()
    .add_node("llm", llm_node(llm, stream=True))
    .set_entry_point("llm")
    .set_finish_point("llm")
    .compile()
)

async for event in graph.stream_tokens({"input": "Tell me about RAG"}):
    if event["type"] == "token":
        print(event["token"], end="", flush=True)
    elif event["type"] == "node_complete":
        print(f"\n[{event['node']} finished]")

Each event is a dict:

Key      Type  Description
"type"   str   Either "token" or "node_complete"
"node"   str   Name of the node emitting the event
"token"  str   The token text (only for "token" events)
"state"  dict  State snapshot (only for "node_complete" events)
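The consumer loop can be exercised against a stand-in event stream. This self-contained sketch assumes the event shapes documented above; fake_stream_tokens is a stand-in, not synapsekit's streaming implementation:

```python
import asyncio

async def fake_stream_tokens(prompt: str):
    # Stand-in for a streaming LLM node: one event per token,
    # then a node_complete event carrying the final state snapshot.
    text = ""
    for tok in ["RAG ", "grounds ", "answers."]:
        text += tok
        yield {"type": "token", "node": "llm", "token": tok}
    yield {"type": "node_complete", "node": "llm", "state": {"response": text}}

async def main():
    final_state = None
    async for event in fake_stream_tokens("Explain RAG"):
        if event["type"] == "token":
            print(event["token"], end="", flush=True)
        elif event["type"] == "node_complete":
            final_state = event["state"]
    return final_state

state = asyncio.run(main())
print()
print(state)  # {'response': 'RAG grounds answers.'}
```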

Parallel nodes

Nodes that are reachable in the same wave (no dependency between them) run concurrently via asyncio.gather. No extra configuration needed — just add edges from a common predecessor to multiple nodes.

async def fetch_weather(state):
    return {"weather": "sunny"}

async def fetch_news(state):
    return {"news": "all good"}

async def merge(state):
    return {"report": f"{state['weather']} / {state['news']}"}

async def start(state):
    return {}

graph = (
    StateGraph()
    .add_node("start", start)
    .add_node("weather", fetch_weather)
    .add_node("news", fetch_news)
    .add_node("merge", merge)
    .add_edge("start", "weather")
    .add_edge("start", "news")
    .add_edge("weather", "merge")
    .add_edge("news", "merge")
    .set_entry_point("start")
    .set_finish_point("merge")
    .compile()
)
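A single wave of this scheduling model can be sketched with asyncio.gather. This is a simplified illustration of the behavior described above (run independent nodes concurrently, then shallow-merge their partial results), not the actual engine:

```python
import asyncio

async def fetch_weather(state):
    return {"weather": "sunny"}

async def fetch_news(state):
    return {"news": "all good"}

async def run_wave(state: dict, nodes) -> dict:
    # Run every node in the wave concurrently against the same snapshot...
    partials = await asyncio.gather(*(node(state) for node in nodes))
    # ...then shallow-merge each partial result back into the shared state.
    for partial in partials:
        state.update(partial)
    return state

state = asyncio.run(run_wave({}, [fetch_weather, fetch_news]))
print(state)  # {'weather': 'sunny', 'news': 'all good'}
```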