Observability API Reference
SynapseKit provides token tracing, cost tracking, budget enforcement, OpenTelemetry export, and distributed tracing.
TokenTracer
Tracks token usage across all LLM calls in a session.
from synapsekit.observability import TokenTracer
tracer = TokenTracer(llm: BaseLLM)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `llm` | `BaseLLM` | required | The LLM to wrap with token tracking |
Properties and methods
- `tokens_used` — `{"input": int, "output": int, "total": int}`
- `reset()` — reset token counters to zero
- `async generate(prompt, **kwargs)` — proxies `llm.generate()` and records usage
- `async stream(prompt, **kwargs)` — proxies `llm.stream()` and records usage
tracer = TokenTracer(llm=my_llm)
response = await tracer.generate("Explain RAG in one sentence.")
print(tracer.tokens_used) # {"input": 12, "output": 38, "total": 50}
CostTracker
Tracks estimated cost of LLM calls based on published pricing tables.
from synapsekit.observability import CostTracker
tracker = CostTracker(llm: BaseLLM, currency: str = "USD")
| Parameter | Type | Default | Description |
|---|---|---|---|
| `llm` | `BaseLLM` | required | The LLM to wrap |
| `currency` | `str` | `"USD"` | Display currency |
- `cost_so_far` — estimated cost in USD (property)
- `records` — list of `CostRecord` instances (property)
- `reset()` — reset cost to zero and clear records
- `async generate(...)` / `async stream(...)` — proxy methods that record a `CostRecord`
tracker = CostTracker(llm=openai_llm)
for _ in range(10):
    await tracker.generate("Short prompt")
print(f"Total cost: ${tracker.cost_so_far:.4f}")
CostRecord
@dataclass
class CostRecord:
    model: str
    provider: str
    input_tokens: int
    output_tokens: int
    input_cost_usd: float
    output_cost_usd: float
    total_cost_usd: float
    timestamp: datetime
    prompt_preview: str  # first 100 chars
BudgetGuard
Enforces a cost budget, raising BudgetExceeded when the limit is hit.
from synapsekit.observability import BudgetGuard, BudgetLimit
guard = BudgetGuard(llm: BaseLLM, limit: BudgetLimit, on_exceeded: str = "raise")
| Parameter | Type | Default | Description |
|---|---|---|---|
| `llm` | `BaseLLM` | required | The LLM to wrap |
| `limit` | `BudgetLimit` | required | Budget configuration |
| `on_exceeded` | `str` | `"raise"` | Action: `"raise"` or `"warn"` |
guard = BudgetGuard(
llm=llm,
limit=BudgetLimit(max_cost_usd=1.00, window="day"),
)
try:
    result = await guard.generate("My prompt")
except BudgetExceeded as e:
    print(f"Budget exceeded: {e.spent:.4f} / {e.limit:.4f}")
BudgetLimit
@dataclass
class BudgetLimit:
    max_cost_usd: float
    window: str = "session"  # "session", "hour", "day", "month"
    max_tokens: int | None = None
| Field | Type | Default | Description |
|---|---|---|---|
| `max_cost_usd` | `float` | required | Maximum allowed cost in USD |
| `window` | `str` | `"session"` | Time window for budget reset |
| `max_tokens` | `int \| None` | `None` | Optional hard token limit |
CircuitState
Enum representing the state of a circuit breaker used by BudgetGuard.
class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"  # Failing, rejecting calls
    HALF_OPEN = "half_open"  # Testing recovery
Access via guard.circuit_state.
OTelExporter
Exports traces to any OpenTelemetry-compatible backend.
from synapsekit.observability import OTelExporter
exporter = OTelExporter(
endpoint: str,
service_name: str = "synapsekit",
headers: dict | None = None,
insecure: bool = False,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `endpoint` | `str` | required | OTLP gRPC or HTTP endpoint |
| `service_name` | `str` | `"synapsekit"` | Service name in traces |
| `headers` | `dict \| None` | `None` | Auth headers |
| `insecure` | `bool` | `False` | Allow plaintext OTLP (no TLS) |
Dependency: pip install synapsekit[otel]
OTelExporter(endpoint="http://localhost:4317", insecure=True).install()
Span
@dataclass
class Span:
    span_id: str
    trace_id: str
    parent_span_id: str | None
    name: str
    start_time: datetime
    end_time: datetime | None
    duration_ms: float | None
    attributes: dict
    events: list[dict]
    status: str  # "ok", "error", "unset"
    error: str | None
TracingMiddleware
Wraps an LLM or RAG pipeline to automatically create spans for every call.
from synapsekit.observability import TracingMiddleware
middleware = TracingMiddleware(
component: BaseLLM | RAGPipeline,
tracer_name: str = "synapsekit",
record_inputs: bool = True,
record_outputs: bool = True,
max_input_length: int = 500,
max_output_length: int = 500,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `component` | `BaseLLM \| RAGPipeline` | required | The component to trace |
| `tracer_name` | `str` | `"synapsekit"` | OTLP tracer name |
| `record_inputs` | `bool` | `True` | Include prompt/query in span attributes |
| `record_outputs` | `bool` | `True` | Include response in span attributes |
| `max_input_length` | `int` | `500` | Truncate input attribute at N characters |
| `max_output_length` | `int` | `500` | Truncate output attribute at N characters |
OTelExporter(endpoint="http://localhost:4317", insecure=True).install()
traced_rag = TracingMiddleware(rag_pipeline)
answer = await traced_rag.aquery("What is SynapseKit?")
DistributedTracer
Propagates trace context across service boundaries.
from synapsekit.observability import DistributedTracer
tracer = DistributedTracer(
service_name: str,
propagation_format: str = "w3c",
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `service_name` | `str` | required | Name of this service in the trace |
| `propagation_format` | `str` | `"w3c"` | Trace context format: `"w3c"` or `"b3"` |
Methods
- `start_span(name, parent_context=None) -> TraceSpan`
- `inject_context(headers: dict) -> dict` — inject trace context into outgoing headers
- `extract_context(headers: dict) -> dict | None` — extract from incoming headers
tracer = DistributedTracer(service_name="rag-service")
parent = tracer.extract_context(request.headers)
with tracer.start_span("rag.query", parent_context=parent) as span:
    answer = await rag.aquery(question)
    span.set_attribute("answer_length", len(answer))
TraceSpan
Context manager returned by DistributedTracer.start_span().
class TraceSpan:
    span_id: str
    trace_id: str
    def set_attribute(self, key: str, value: Any) -> None: ...
    def add_event(self, name: str, attributes: dict | None = None) -> None: ...
    def record_exception(self, exc: Exception) -> None: ...
    def set_status(self, status: str) -> None: ...  # "ok" or "error"
    def end(self) -> None: ...
PrometheusMetrics
Exports LLM cost, token, and latency metrics to Prometheus. Metrics are labelled by model and provider.
Metrics exported:
| Metric | Type | Description |
|---|---|---|
| `synapsekit_cost_usd_total` | Counter | Cumulative LLM cost in USD |
| `synapsekit_tokens_total` | Counter | Cumulative LLM tokens |
| `synapsekit_latency_seconds` | Histogram | LLM latency in seconds (standard buckets up to 10s) |
Install: pip install synapsekit[observe] (includes prometheus-client>=0.20)
A Helm chart for a full Prometheus + Grafana stack is available at assets/helm/synapsekit-observability/.
from synapsekit.observability import PrometheusMetrics
metrics = PrometheusMetrics(
*,
enabled: bool = True,
namespace: str = "synapsekit",
registry: Any | None = None,
start_server: bool = False,
host: str = "0.0.0.0",
port: int = 8000,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| `enabled` | `bool` | `True` | Set to `False` to disable all metric recording (no-op) |
| `namespace` | `str` | `"synapsekit"` | Prometheus metric name prefix |
| `registry` | `Any \| None` | `None` | Custom `CollectorRegistry`; defaults to a new private registry |
| `start_server` | `bool` | `False` | Start an HTTP metrics server on construction |
| `host` | `str` | `"0.0.0.0"` | Bind address for the HTTP server |
| `port` | `int` | `8000` | Port for the HTTP server |
Methods
- `start_http_server(*, host: str = "0.0.0.0", port: int = 8000) -> None` — start the Prometheus HTTP scrape endpoint; no-op if already started
- `record_llm(*, model: str, provider: str, cost_usd: float | None, total_tokens: int | None, latency_ms: float | None) -> None` — record a single LLM call; `None` values are silently skipped
- `record_span(span: Span) -> None` — extract metrics from a `Span`; only processes spans with `name="llm.generate"`
import asyncio
from synapsekit import OpenAILLM, LLMConfig
from synapsekit.observability import PrometheusMetrics, CostTracker
async def main():
    llm = OpenAILLM(LLMConfig(model="gpt-4o-mini", api_key="sk-..."))
    # Start a Prometheus HTTP server on port 8000
    prom = PrometheusMetrics(start_server=True, port=8000)
    # Wrap with CostTracker to get cost data
    tracker = CostTracker(llm)
    questions = ["What is RAG?", "Explain embeddings.", "What is an LLM?"]
    for question in questions:
        response = await tracker.generate(question)
        # Record metrics after each call
        last_record = tracker.records[-1]
        prom.record_llm(
            model=last_record.model,
            provider=last_record.provider,
            cost_usd=last_record.total_cost_usd,
            total_tokens=last_record.input_tokens + last_record.output_tokens,
            latency_ms=None,  # CostTracker does not record latency; use TracingMiddleware if needed
        )
    print(f"Total cost: ${tracker.cost_so_far:.4f}")
    # Prometheus scrape endpoint is now live at http://0.0.0.0:8000
asyncio.run(main())
Full observability setup example
import asyncio
from synapsekit import RAG, RAGConfig, OpenAILLM, InMemoryVectorStore, SynapsekitEmbeddings, LLMConfig
from synapsekit.observability import OTelExporter, TracingMiddleware, CostTracker, BudgetGuard, BudgetLimit
async def main():
    llm = OpenAILLM(LLMConfig(model="gpt-4o-mini", api_key="sk-..."))
    guarded_llm = BudgetGuard(
        llm=CostTracker(llm),
        limit=BudgetLimit(max_cost_usd=5.00, window="day"),
    )
    OTelExporter(endpoint="http://localhost:4317", insecure=True).install()
    store = InMemoryVectorStore(SynapsekitEmbeddings())
    config = RAGConfig(llm=guarded_llm, vector_store=store)
    rag = RAG(config)
    traced_rag = TracingMiddleware(rag)
    await traced_rag.aadd(["SynapseKit is an async-first Python library."])
    answer = await traced_rag.aquery("What is SynapseKit?")
    print(answer)
asyncio.run(main())