Human-in-the-Loop Workflows
Some decisions require human judgment. GraphInterrupt pauses the graph at any node, serializes the current state to the checkpointer, and surfaces that state to a reviewer. When the reviewer responds, resume() restarts execution from the exact interrupt point with their input merged into the state.
What you'll build: A content moderation pipeline that auto-approves safe content, auto-rejects clearly harmful content, and pauses for human review on borderline cases — surviving process restarts between the pause and the resume. Time: ~25 min. Difficulty: Intermediate
Prerequisites
pip install synapsekit[openai,graph]
What you'll learn
- Design a graph with conditional routing to three decision paths
- Raise
GraphInterruptinside a node to pause execution - Handle
GraphInterruptEventin the caller and display state to a reviewer - Resume a paused graph with
resume(updates={...}) - Use the
approval_node()shortcut for common approve/reject patterns - Visualize the graph with
get_mermaid()
Step 1: Define the graph state
# human_in_the_loop.py
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from typing import Optional
from synapsekit.graph import StateGraph, GraphInterrupt, CompiledGraph
from synapsekit.graph.checkpointing import SQLiteCheckpointer
from synapsekit.graph.visualizer import GraphVisualizer
from synapsekit.llms.openai import OpenAILLM
from synapsekit import LLMConfig
# Every field is serializable so it can be checkpointed.
@dataclass
class ModerationState:
# Input
content: str = ""
content_id: str = ""
# Set by the classify_content node
score: float = 0.0 # 0.0 (harmful) to 1.0 (safe)
classification: str = "" # "safe" | "borderline" | "unsafe"
classification_reason: str = ""
# Set by a human reviewer via resume(updates={...})
human_approved: Optional[bool] = None
reviewer_id: str = ""
reviewer_notes: str = ""
# Set by the audit node
final_decision: str = "" # "approved" | "rejected" | "escalated"
audit_log: list[str] = field(default_factory=list)
Step 2: Implement the nodes
llm = OpenAILLM(
model="gpt-4o-mini",
config=LLMConfig(temperature=0.0), # Deterministic scoring
)
async def classify_content(state: ModerationState) -> ModerationState:
"""Ask the LLM to score content on a 0–1 safety scale."""
prompt = f"""Rate the following content on a safety scale from 0.0 to 1.0.
0.0 = clearly harmful (violence, hate speech, explicit content, PII exposure)
0.5 = borderline (ambiguous, context-dependent)
1.0 = clearly safe (constructive, informative, on-topic)
Respond in JSON: {{"score": <float>, "classification": "<safe|borderline|unsafe>", "reason": "<brief>"}}
Content: {state.content}"""
response = await llm.agenerate(prompt)
import json
data = json.loads(response.text)
state.score = float(data["score"])
state.classification = data["classification"]
state.classification_reason = data["reason"]
print(f"[classify_content] score={state.score:.2f} class={state.classification}")
print(f" reason: {state.classification_reason}")
return state
def auto_approve(state: ModerationState) -> ModerationState:
"""Automatically approve high-confidence safe content."""
state.final_decision = "approved"
state.audit_log.append(f"Auto-approved (score={state.score:.2f})")
print(f"[auto_approve] Content {state.content_id!r} approved automatically.")
return state
def auto_reject(state: ModerationState) -> ModerationState:
"""Automatically reject high-confidence unsafe content."""
state.final_decision = "rejected"
state.audit_log.append(
f"Auto-rejected (score={state.score:.2f}, reason={state.classification_reason})"
)
print(f"[auto_reject] Content {state.content_id!r} rejected automatically.")
return state
def flag_for_review(state: ModerationState) -> ModerationState:
"""Pause the graph and wait for a human reviewer.
Raising GraphInterrupt causes the graph engine to serialize state to the
checkpointer and suspend. The caller receives a GraphInterruptEvent with the
current state. When they call graph.resume(), execution continues from here.
"""
print(f"[flag_for_review] Flagging {state.content_id!r} for human review.")
print(f" Score: {state.score:.2f} Reason: {state.classification_reason}")
print(" Graph pausing — waiting for reviewer input...")
raise GraphInterrupt(
state=state,
message=(
f"Content requires human review. "
f"Score: {state.score:.2f}. Reason: {state.classification_reason}"
),
interrupt_type="human_review",
)
def audit(state: ModerationState) -> ModerationState:
"""Log the final decision and close the moderation record."""
entry = (
f"decision={state.final_decision} "
f"score={state.score:.2f} "
f"reviewer={state.reviewer_id or 'auto'} "
f"notes={state.reviewer_notes or 'n/a'}"
)
state.audit_log.append(entry)
print(f"[audit] Final decision for {state.content_id!r}: {state.final_decision}")
print(f" {entry}")
return state
Step 3: Define routing logic
def route_by_score(state: ModerationState) -> str:
"""Return the name of the next node based on the classification score."""
if state.score >= 0.9:
return "auto_approve" # Very safe — no human review needed
elif state.score < 0.4:
return "auto_reject" # Very unsafe — no human review needed
else:
return "flag_for_review" # Borderline — escalate to human
Step 4: Build the graph with checkpointing
def build_graph() -> CompiledGraph:
# SQLiteCheckpointer persists state after every node.
# If the process crashes mid-run, resume() reads from this database.
checkpointer = SQLiteCheckpointer(db_path="./moderation_checkpoints.db")
graph = StateGraph(ModerationState, checkpointer=checkpointer)
graph.add_node("classify_content", classify_content)
graph.add_node("auto_approve", auto_approve)
graph.add_node("auto_reject", auto_reject)
graph.add_node("flag_for_review", flag_for_review)
graph.add_node("audit", audit)
graph.set_entry_point("classify_content")
# After classify_content, call route_by_score to choose the next node
graph.add_conditional_edges(
"classify_content",
route_by_score,
{
"auto_approve": "auto_approve",
"auto_reject": "auto_reject",
"flag_for_review": "flag_for_review",
}
)
# All three branches converge on audit
graph.add_edge("auto_approve", "audit")
graph.add_edge("auto_reject", "audit")
graph.add_edge("flag_for_review", "audit") # Resumes here after GraphInterrupt
return graph.compile()
Step 5: Run the graph and handle interrupts
from synapsekit.graph import GraphInterruptEvent
async def moderate_content(content: str, content_id: str) -> ModerationState:
"""Run a piece of content through the moderation graph."""
graph = build_graph()
initial_state = ModerationState(content=content, content_id=content_id)
try:
# arun() executes to completion or until a GraphInterrupt
final_state = await graph.arun(initial_state, run_id=content_id)
print(f"\nCompleted without human review. Decision: {final_state.final_decision}")
return final_state
except GraphInterruptEvent as evt:
# The graph is paused — display state to the reviewer
print(f"\nGraph interrupted for human review.")
print(f" Content: {evt.state.content[:100]}")
print(f" Score: {evt.state.score:.2f}")
print(f" Reason: {evt.state.classification_reason}")
print(f" Run ID: {content_id} (use this to resume)")
# In production, send this to a review dashboard and return.
# Here we simulate an immediate human decision.
return await simulate_human_review(graph, content_id)
async def simulate_human_review(graph: CompiledGraph, run_id: str) -> ModerationState:
"""Simulate a reviewer approving the flagged content."""
print("\n[Simulating reviewer decision: APPROVED]")
# resume() restarts the graph from the interrupt point.
# updates= merges new values into the state before execution continues.
final_state = await graph.resume(
run_id=run_id,
updates={
"human_approved": True,
"final_decision": "approved",
"reviewer_id": "mod-007",
"reviewer_notes": "Context is educational; not harmful.",
}
)
print(f"\nResumed. Final decision: {final_state.final_decision}")
return final_state
Step 6: Use the approval_node() shortcut
For simple approve/reject workflows, SynapseKit provides a convenience node builder that wraps GraphInterrupt for you:
from synapsekit.graph import approval_node
# approval_node() creates a pre-built GraphInterrupt node.
# It surfaces the named field to the reviewer and waits for {"approved": True/False}.
review_node = approval_node(
prompt_field="classification_reason", # Display this field to the reviewer
approved_field="human_approved", # Write the decision here in state
on_approve="audit", # Next node if approved
on_reject="auto_reject", # Next node if rejected
)
# Drop-in replacement for the custom flag_for_review node:
# graph.add_node("flag_for_review", review_node)
Complete working example
SAMPLES = [
("How to bake sourdough bread at home — step by step guide.", "post-safe-001"),
("My experience getting help for mental health issues — sharing my story.", "post-borderline-042"),
("Step-by-step instructions for bypassing security systems.", "post-unsafe-007"),
]
async def main():
for content, cid in SAMPLES:
print(f"\n{'='*70}")
print(f"Moderating: {cid}")
result = await moderate_content(content, cid)
print(f"\nAudit log:")
for entry in result.audit_log:
print(f" - {entry}")
asyncio.run(main())
Expected output
======================================================================
Moderating: post-safe-001
[classify_content] score=0.97 class=safe
[auto_approve] Content 'post-safe-001' approved automatically.
[audit] Final decision for 'post-safe-001': approved
Audit log:
- Auto-approved (score=0.97)
- decision=approved score=0.97 reviewer=auto
======================================================================
Moderating: post-borderline-042
[classify_content] score=0.61 class=borderline
[flag_for_review] Flagging 'post-borderline-042' for human review.
Graph pausing — waiting for reviewer input...
Graph interrupted for human review.
[Simulating reviewer decision: APPROVED]
[audit] Final decision for 'post-borderline-042': approved
======================================================================
Moderating: post-unsafe-007
[classify_content] score=0.08 class=unsafe
[auto_reject] Content 'post-unsafe-007' rejected automatically.
[audit] Final decision for 'post-unsafe-007': rejected
How it works
When flag_for_review raises GraphInterrupt, the graph engine:
- Catches the exception before it reaches the caller.
- Saves the current state to the checkpointer under
(run_id, "flag_for_review"). - Re-raises it as a
GraphInterruptEventso the caller'sexceptblock can handle it.
When resume(run_id, updates) is called:
- The graph loads the checkpointed state.
- Merges
updatesinto the state (overwriting any fields listed). - Continues execution from
flag_for_review's successor node (auditin this case).
The updates dict is the mechanism by which human decisions re-enter the workflow.
Variations
Restart resilience — survive a full process restart
# Process 1: run until interrupt, then exit
try:
await graph.arun(ModerationState(...), run_id="post-resume-demo")
except GraphInterruptEvent:
print("Paused. Exiting process.")
# Process 2: pick up after restart
graph2 = build_graph() # Reloads all checkpoints from SQLite
final = await graph2.resume(
run_id="post-resume-demo",
updates={"human_approved": True, "final_decision": "approved", "reviewer_id": "mod-001"}
)
Multiple interrupt points in one graph
A graph can have more than one node that raises GraphInterrupt. Each pause/resume cycle uses the same run_id; the graph advances past each interrupt point in order.
Timeout paused reviews
Use a scheduler (e.g., a cron job) to query checkpointer.list_runs() for runs that have been paused for more than N hours and auto-resolve them as escalated.
Troubleshooting
GraphInterruptEvent is never raised
Verify that flag_for_review is reachable by the routing function. Print state.score in route_by_score to confirm the threshold is being triggered.
State not persisting after restart
Confirm db_path points to a writable file and that the same run_id is used on both the original run and the resume() call.
resume() raises "run not found"
The run ID must exist in the checkpointer database. If you are using a different db_path than the original run, the checkpoint will not be found.
Mermaid diagram does not show conditional labels
Pass include_conditions=True to get_mermaid() to annotate conditional edges with the routing function's return values.
Next steps
- Checkpointing and Resumable Workflows — deeper dive into the checkpointer API
- Graph Error Recovery — handle node failures gracefully
- Mermaid Visualization — visualize this graph with
get_mermaid()