🧠aios_tracing — Distributed Tracing SDK
A lightweight Python SDK that wraps OpenTelemetry to make tracing dead-simple for services and agents. It ships sensible defaults, decorators for sync/async spans, context propagation helpers, structured events, exception recording, logging correlation, and easy auto-instrumentation for common libraries. Works out-of-the-box with your OTel Collector → Tempo deployment.
1) Introduction
aios_tracing provides a small, ergonomic class, TracingSDK, that hides most OpenTelemetry boilerplate:
- Export over OTLP gRPC or HTTP (to your OTel Collector)
- W3C propagation by default; B3 optional
- Simple decorators for functions (@sdk.trace, @sdk.trace_async)
- Context manager for manual spans (with sdk.span(...))
- Add events, set attributes, record exceptions
- Inject/extract headers for RPC/HTTP boundaries
- Logging correlation (trace/span IDs in your logs)
- One-line auto-instrumentation for requests, SQLAlchemy, Redis, gRPC, Flask/FastAPI, etc.
2) Installation
From the repo root (or the sdks/tracing/ directory):
# Option A: editable install (dev)
pip install -e sdks/tracing
# Option B: from the tracing SDK folder
cd sdks/tracing
pip install -e .
Runtime dependencies
The SDK expects OpenTelemetry libraries:
pip install \
opentelemetry-api \
opentelemetry-sdk \
opentelemetry-exporter-otlp \
opentelemetry-proto \
opentelemetry-instrumentation \
opentelemetry-instrumentation-requests \
opentelemetry-instrumentation-logging
# Optional extras (install as needed)
pip install \
opentelemetry-instrumentation-flask \
opentelemetry-instrumentation-fastapi \
opentelemetry-instrumentation-sqlalchemy \
opentelemetry-instrumentation-psycopg2 \
opentelemetry-instrumentation-redis \
opentelemetry-instrumentation-grpc \
opentelemetry-instrumentation-aiohttp-client
Prefer gRPC for in-cluster traffic: it’s efficient and widely supported. Use HTTP when gRPC isn’t viable.
3) Quick Start
from aios_tracing.tracing import TracingSDK
from opentelemetry.trace import SpanKind
# Point to your OTel Collector service (K8s service or localhost)
sdk = TracingSDK(
    service_name="orders-api",
    deployment_env="prod",
    otlp_protocol="grpc",              # "grpc" or "http"
    otlp_endpoint="otel-collector.tracing.svc:4317",
    otlp_insecure=True,                # OK inside cluster
    sampling_ratio=1.0,                # 0.0..1.0
    use_b3_propagation=False,          # set True if your mesh uses B3
    resource_attributes={"service.version": "1.4.0"},
)
# Optional: turn on popular instrumentations
sdk.enable_instrumentations(
    requests=True, sqlalchemy=True, psycopg2=True,
    redis=True, grpc_client=True, flask=False, fastapi=False
)
# Decorator (sync)
@sdk.trace("fetch_user", attrs={"db.table": "users"})
def fetch_user(uid: str):
    return {"id": uid, "name": "Jane"}
# Context manager with events/attributes
with sdk.span("checkout", kind=SpanKind.SERVER, attributes={"cart.items": 3}) as span:
    user = fetch_user("u_123")
    sdk.set_attributes(span, {"user.id": user["id"]})
    sdk.add_event(span, "payment.started", {"amount": 1499, "currency": "INR"})
# Graceful flush (also happens at process exit)
sdk.shutdown()
4) Configuration (env & args)
You can configure via constructor args or environment variables.
Parameter / Env | Meaning | Default |
---|---|---|
service_name | Logical service name | required |
deployment_env (DEPLOYMENT_ENV) | Environment tag (dev, staging, prod) | dev |
otlp_protocol (OTLP_PROTOCOL) | grpc or http | grpc |
otlp_endpoint (OTLP_ENDPOINT) | Collector endpoint (host:port or URL) | http://localhost:4317 |
otlp_insecure | Disable TLS for gRPC exporter | True |
otlp_headers | Dict of headers to pass (auth, tenant) | None |
sampling_ratio (OTEL_TRACES_SAMPLER_ARG) | Probability 0.0–1.0 | 1.0 |
use_batch | Batch vs Simple processor | True |
enable_console_exporter | Print spans to stdout | False |
use_b3_propagation | Use B3 instead of W3C | False |
resource_attributes | Extra resource attrs (service.version, region, etc.) | {} |
enable_logging_correlation | Put trace/span IDs in logs | True |
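The table lists both constructor arguments and environment variables, but does not say which one wins when both are set. The sketch below assumes the common order of explicit argument, then environment variable, then default. `resolve_setting` is a hypothetical helper written for illustration and is not part of the SDK.

```python
import os
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def resolve_setting(
    explicit: Optional[T],
    env_var: str,
    default: T,
    cast: Callable[[str], T] = str,
) -> T:
    """Hypothetical helper: explicit argument > environment variable > default."""
    if explicit is not None:
        return explicit
    raw = os.environ.get(env_var)
    if raw is None or raw == "":
        return default
    return cast(raw)


# Environment variable names and defaults come from the table above.
os.environ["OTLP_PROTOCOL"] = "http"
os.environ["OTEL_TRACES_SAMPLER_ARG"] = "0.25"
os.environ.pop("DEPLOYMENT_ENV", None)

protocol = resolve_setting(None, "OTLP_PROTOCOL", "grpc")
ratio = resolve_setting(None, "OTEL_TRACES_SAMPLER_ARG", 1.0, cast=float)
env = resolve_setting(None, "DEPLOYMENT_ENV", "dev")
explicit_wins = resolve_setting("grpc", "OTLP_PROTOCOL", "grpc")
```

If the SDK resolves settings in a different order, only the first `if` in `resolve_setting` would need to move.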
Notes:
- gRPC exporter accepts host:port (strip scheme); HTTP exporter needs a full URL and appends /v1/traces automatically.
- Sampler used: ParentBased(TraceIdRatioBased(sampling_ratio)).
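The two notes above can be sketched in plain Python. `normalize_endpoint` and `should_sample` are hypothetical names, not SDK functions. The sampling function is a simplified model of a parent-based ratio sampler (follow the parent's decision if there is one, otherwise compare the low 64 bits of the trace ID against a bound derived from the ratio), not the SDK's actual code.

```python
from typing import Optional
from urllib.parse import urlsplit

TRACE_ID_LIMIT = (1 << 64) - 1  # the ratio check looks at the low 64 bits only


def normalize_endpoint(protocol: str, endpoint: str) -> str:
    """Hypothetical helper mirroring the first note."""
    if protocol == "grpc":
        # Drop a leading scheme such as "http://" and keep host:port.
        parts = urlsplit(endpoint if "://" in endpoint else "//" + endpoint)
        return parts.netloc
    if protocol == "http":
        if "://" not in endpoint:
            raise ValueError("HTTP exporter needs a full URL")
        base = endpoint.rstrip("/")
        return base if base.endswith("/v1/traces") else base + "/v1/traces"
    raise ValueError(f"unknown protocol: {protocol!r}")


def should_sample(
    trace_id: int, ratio: float, parent_sampled: Optional[bool] = None
) -> bool:
    """Toy model of ParentBased(TraceIdRatioBased(ratio))."""
    if parent_sampled is not None:
        return parent_sampled  # child spans follow the parent's decision
    bound = round(ratio * (TRACE_ID_LIMIT + 1))
    return (trace_id & TRACE_ID_LIMIT) < bound


grpc_target = normalize_endpoint("grpc", "http://localhost:4317")
http_target = normalize_endpoint("http", "http://localhost:4318")
```

Because the decision depends only on the trace ID, every service using the same ratio reaches the same verdict for a given trace, and the parent check keeps a trace from being sampled in one hop and dropped in the next.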
5) Usage Guide
5.1 Create spans
Context manager (manual control)
from opentelemetry.trace import SpanKind
with sdk.span("db.query", kind=SpanKind.CLIENT, attributes={"db.system": "postgres"}) as span:
    # do work...
    pass
Decorator (sync)
@sdk.trace("user.lookup", attrs={"component": "user-service"})
def get_user(uid: str): ...
Decorator (async)
@sdk.trace_async("render.page", kind=SpanKind.SERVER)
async def render_page(request): ...
Imperative start/finish
span = sdk.start_span("work", attributes={"step": 1})
try:
    # ...
    pass
finally:
    span.end()
5.2 Add events, attributes, exceptions
with sdk.span("process") as span:
    sdk.add_event(span, "parse.start", {"items": 42})
    try:
        # ...
        pass
    except Exception as e:
        sdk.record_exception(span, e, stage="parse")
        raise
5.3 Propagate context across services
Client side (inject headers)
headers = sdk.inject_headers({})
# requests.get("http://payments.svc/charge", headers=headers)
Server side (extract & attach)
from opentelemetry import context
def handler(req):
    ctx = sdk.extract_context(dict(req.headers))
    token = context.attach(ctx)
    try:
        with sdk.span("handle.request"): ...
    finally:
        context.detach(token)
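With the default W3C propagation, the value that travels between services is a `traceparent` header of the form `00-<trace id, 32 hex>-<span id, 16 hex>-<flags, 2 hex>`. The sketch below shows that wire format with the standard library only. `inject_traceparent` and `extract_traceparent` are hypothetical stand-ins for what the SDK's helpers delegate to OpenTelemetry, and only version `00` is handled.

```python
import re
from typing import Dict, Optional, Tuple

TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def inject_traceparent(
    carrier: Dict[str, str], trace_id: str, span_id: str, sampled: bool = True
) -> Dict[str, str]:
    """Write the trace identity into an outbound headers dict."""
    flags = "01" if sampled else "00"
    carrier["traceparent"] = f"00-{trace_id}-{span_id}-{flags}"
    return carrier


def extract_traceparent(carrier: Dict[str, str]) -> Optional[Tuple[str, str, bool]]:
    """Return (trace_id, span_id, sampled), or None if the header is absent or invalid."""
    # Header names are case-insensitive, so normalize keys first.
    lowered = {k.lower(): v for k, v in carrier.items()}
    match = TRACEPARENT_RE.match(lowered.get("traceparent", "").strip())
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    if set(trace_id) == {"0"} or set(span_id) == {"0"}:
        return None  # all-zero IDs are invalid
    return trace_id, span_id, bool(int(flags, 16) & 0x01)


# Example IDs only; real IDs are generated per trace and per span.
headers = inject_traceparent({}, "4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7")
parsed = extract_traceparent({"Traceparent": headers["traceparent"]})
```

A missing or malformed header yields `None`, which corresponds to the server starting a new trace instead of continuing the caller's.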
Link to the current span
link = sdk.link_current_span() # returns an otel.Link or None
child = sdk.start_span("followup", links=[link] if link else None)
child.end()
5.4 Logging correlation
If enable_logging_correlation=True (default), your logs carry trace_id and span_id.
- If opentelemetry-instrumentation-logging is available, it wires into the stdlib logger.
- Otherwise, the SDK adds a lightweight logging filter with a format like:
2025-09-08 10:00:00 INFO [trace=5c... span=8d...] mylogger: message
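A fallback filter of this kind can be sketched with the standard `logging` module. This is an illustration, not the SDK's implementation: `TraceContextFilter` and the `current_ids` context variable are hypothetical, and a real filter would read the IDs from the active OpenTelemetry span.

```python
import contextvars
import io
import logging

# Hypothetical stand-in for the active span's (trace_id, span_id).
current_ids = contextvars.ContextVar("current_ids", default=("0" * 32, "0" * 16))


class TraceContextFilter(logging.Filter):
    """Copy the current trace and span IDs onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id, record.span_id = current_ids.get()
        return True  # never drop the record, only enrich it


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.addFilter(TraceContextFilter())
handler.setFormatter(logging.Formatter(
    "%(asctime)s %(levelname)s [trace=%(trace_id)s span=%(span_id)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
))

logger = logging.getLogger("mylogger")
logger.setLevel(logging.INFO)
logger.propagate = False
logger.addHandler(handler)

current_ids.set(("5c" * 16, "8d" * 8))  # pretend a span is active
logger.info("message")
line = stream.getvalue().strip()
```

Attaching the filter to the handler means every record that handler formats has the two fields, so the format string cannot fail on a missing attribute.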
5.5 Auto-instrumentation helpers
sdk.enable_instrumentations(
    requests=True,
    aiohttp_client=False,
    grpc_client=True,
    sqlalchemy=True,
    psycopg2=True,
    redis=True,
    flask=False,
    fastapi=False,
    logging_correlation=True
)
Missing packages do not raise: each unavailable instrumentation is skipped and reported with an info-level log message.
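The skip-if-missing behavior can be approximated with a guarded import. The sketch below is an assumption about the general pattern, not the SDK's code; `enable_optional` and the flag names are hypothetical, and standard-library modules stand in for instrumentation packages so the example runs anywhere.

```python
import importlib
import logging
from typing import Dict, List

log = logging.getLogger("tracing.instrumentation")


def enable_optional(flags: Dict[str, bool], modules: Dict[str, str]) -> List[str]:
    """Hypothetical helper: import each requested module, skipping absent ones."""
    enabled = []
    for name, wanted in flags.items():
        if not wanted:
            continue
        try:
            importlib.import_module(modules[name])
        except ImportError:
            log.info("instrumentation %r not installed; skipping", name)
            continue
        # A real SDK would activate the instrumentation here.
        enabled.append(name)
    return enabled


enabled = enable_optional(
    {"json_demo": True, "missing_demo": True, "disabled_demo": False},
    {
        "json_demo": "json",  # stdlib module standing in for an installed package
        "missing_demo": "no_such_instrumentation_pkg_xyz",  # deliberately absent
        "disabled_demo": "csv",
    },
)
```

Catching only `ImportError` keeps genuine bugs inside an installed instrumentation visible instead of hiding them behind the same skip path.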
6) Public API Reference (with examples)
Below are the public methods exposed by TracingSDK and common helpers.
TracingSDK.__init__(...)
Create and configure the tracer provider, exporter, processors, propagators, and logging correlation.
Example
sdk = TracingSDK(
    service_name="billing",
    otlp_protocol="grpc",
    otlp_endpoint="otel-collector.tracing.svc:4317",
    sampling_ratio=0.5,
    use_b3_propagation=False,
    resource_attributes={"service.version": "2.0.0", "region": "ap-south-1"},
)
tracer() -> opentelemetry.trace.Tracer
Return the underlying OpenTelemetry Tracer.
Example
tr = sdk.tracer()
span = tr.start_span("raw.span")
span.end()
span(name, attributes=None, kind=SpanKind.INTERNAL, links=None, record_exceptions=True)
Context manager that creates a span and ends it on exit. If an exception occurs and record_exceptions=True, it is recorded and the span status set to ERROR.
Example
with sdk.span("io.read", attributes={"file": "/tmp/data"}, kind=SpanKind.CLIENT) as span:
    ...
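The exit behavior described for span(...) can be modeled with `contextlib.contextmanager`. This is a toy sketch: `ToySpan` and `toy_span` are hypothetical and only mimic the described semantics. It also assumes the exception is re-raised after being recorded, which the text above does not state explicitly.

```python
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Optional


@dataclass
class ToySpan:
    name: str
    attributes: Dict[str, Any] = field(default_factory=dict)
    events: List[str] = field(default_factory=list)
    status: str = "UNSET"
    ended: bool = False


@contextmanager
def toy_span(
    name: str,
    attributes: Optional[Dict[str, Any]] = None,
    record_exceptions: bool = True,
) -> Iterator[ToySpan]:
    span = ToySpan(name, dict(attributes or {}))
    try:
        yield span
    except Exception as exc:
        if record_exceptions:
            span.events.append(f"exception: {type(exc).__name__}")
            span.status = "ERROR"
        raise  # assumed: the caller still sees the original exception
    finally:
        span.ended = True  # the span ends on success and on failure


captured = None
try:
    with toy_span("io.read", {"file": "/tmp/data"}) as s:
        captured = s
        raise OSError("disk unavailable")
except OSError:
    pass
```

The `finally` clause is the important part: the span is closed on every exit path, which is what makes the context manager safer than a bare start_span call.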
start_span(name, attributes=None, kind=SpanKind.INTERNAL, links=None) -> Span
Imperatively start a span. You must call span.end() yourself.
Example
span = sdk.start_span("pipeline.step", {"step": "B"})
# ...
span.end()
add_event(span, name, attributes=None) -> None
Attach a structured event to a span.
Example
sdk.add_event(span, "cache.miss", {"key": "user:123"})
set_attributes(span, attributes: Dict[str, Any]) -> None
Set multiple attributes on a span.
sdk.set_attributes(span, {"http.method": "GET", "http.status_code": 200})
record_exception(span, exc: BaseException, **attrs) -> None
Record an exception and set span status to ERROR.
try:
    1/0
except Exception as e:
    sdk.record_exception(span, e, op="divide")
inject_headers(carrier: Dict[str, str], ctx: Optional[Context] = None) -> Dict[str, str]
Inject the current (or provided) context into a headers-like carrier for outbound requests.
headers = sdk.inject_headers({})
extract_context(carrier: Dict[str, str]) -> Context
Extract a context from inbound headers.
from opentelemetry import context
ctx = sdk.extract_context(dict(request.headers))
token = context.attach(ctx)
# ... handle request ...
context.detach(token)
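Attach and detach work as a pair: attach returns a token that remembers the previous context, and detach restores it. The sketch below models that pairing with the standard `contextvars` module to show why the detach call is best placed in a `finally` block. The `attach`, `detach`, and `handle` functions here are hypothetical stand-ins, not the OpenTelemetry functions.

```python
import contextvars

# Hypothetical stand-in for the "current trace context" slot.
_current = contextvars.ContextVar("current_trace", default=None)


def attach(ctx):
    """Make ctx current and return a token that remembers the previous value."""
    return _current.set(ctx)


def detach(token):
    """Restore whatever was current before the matching attach()."""
    _current.reset(token)


def handle(ctx, fail: bool = False):
    token = attach(ctx)
    try:
        if fail:
            raise RuntimeError("handler failed")
        return _current.get()
    finally:
        detach(token)  # runs on success and on error


seen = handle({"trace_id": "abc"})
try:
    handle({"trace_id": "def"}, fail=True)
except RuntimeError:
    pass
after = _current.get()
```

Without the `finally`, a failing handler would leave its request's context attached, and later work on the same thread or task could be attributed to the wrong trace.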
link_current_span() -> Optional[Link]
Create a link to the current span’s context (useful for fan-in/fan-out).
link = sdk.link_current_span()
child = sdk.start_span("aggregate", links=[link] if link else None)
child.end()
trace(name: Optional[str] = None, *, kind=SpanKind.INTERNAL, attrs=None)
Decorator for sync functions; auto-creates a span, records exceptions, sets attributes, and ends the span.
@sdk.trace("db.insert", attrs={"table": "orders"})
def insert_order(o): ...
trace_async(name: Optional[str] = None, *, kind=SpanKind.INTERNAL, attrs=None)
Decorator for async functions with the same behavior as trace.
@sdk.trace_async("render.page", kind=SpanKind.SERVER)
async def render(request): ...
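The sync and async decorators need separate wrappers because an async function must be awaited inside the span for the span to cover its full duration. The sketch below is a toy model using only the standard library. The `finished` list stands in for an exporter, and falling back to the function name when `name` is None is an assumption, since the signatures above only show that `name` is optional.

```python
import asyncio
import functools
from typing import Any, Callable, Dict, List, Optional

finished: List[Dict[str, Any]] = []  # toy "exporter": one entry per ended span


def _record(name: str, attrs: Optional[Dict[str, Any]], error: Optional[str]) -> None:
    finished.append({"name": name, "attrs": dict(attrs or {}), "error": error})


def trace(name: Optional[str] = None, *, attrs: Optional[Dict[str, Any]] = None) -> Callable:
    def decorator(fn: Callable) -> Callable:
        span_name = name or fn.__name__  # assumed default

        @functools.wraps(fn)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            error = None
            try:
                return fn(*args, **kwargs)
            except Exception as exc:
                error = type(exc).__name__
                raise
            finally:
                _record(span_name, attrs, error)

        return wrapper

    return decorator


def trace_async(name: Optional[str] = None, *, attrs: Optional[Dict[str, Any]] = None) -> Callable:
    def decorator(fn: Callable) -> Callable:
        span_name = name or fn.__name__  # assumed default

        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            error = None
            try:
                return await fn(*args, **kwargs)  # span stays open across the await
            except Exception as exc:
                error = type(exc).__name__
                raise
            finally:
                _record(span_name, attrs, error)

        return wrapper

    return decorator


@trace("db.insert", attrs={"table": "orders"})
def insert_order(order_id: int) -> int:
    return order_id


@trace_async()
async def render(page: str) -> str:
    await asyncio.sleep(0)
    return page.upper()


result_sync = insert_order(7)
result_async = asyncio.run(render("home"))
```

Applying the sync wrapper to a coroutine function would end the span as soon as the coroutine object is created, before any of its body runs, which is why the two decorators are kept apart.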
enable_instrumentations(...flags...) -> None
Enable common OpenTelemetry instrumentations; missing ones are skipped gracefully.
sdk.enable_instrumentations(requests=True, sqlalchemy=True, grpc_client=True)
shutdown() -> None
Flush and shut down the tracer provider (also called via atexit).
sdk.shutdown()
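Since shutdown() can be called explicitly and is also registered with atexit, it may run twice in one process. The sketch below shows one way to make that safe. `ToyProvider` is hypothetical, and whether the SDK guards against repeated calls in this way is an assumption.

```python
import atexit
import threading
from typing import List


class ToyProvider:
    """Hypothetical provider that flushes pending spans exactly once."""

    def __init__(self) -> None:
        self.pending: List[str] = []
        self.exported: List[str] = []
        self._lock = threading.Lock()
        self._closed = False
        atexit.register(self.shutdown)  # safety net at interpreter exit

    def shutdown(self) -> None:
        with self._lock:
            if self._closed:
                return  # a second call (for example from atexit) is a no-op
            self._closed = True
            self.exported.extend(self.pending)  # stand-in for the final flush
            self.pending.clear()


provider = ToyProvider()
provider.pending.extend(["span-a", "span-b"])
provider.shutdown()
provider.shutdown()  # harmless repeat
```

Calling shutdown explicitly is still worthwhile in short-lived jobs and tests, because it makes the flush point deterministic instead of leaving it to interpreter exit.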
7) Framework Integration Snippets
Flask (server-side extraction)
from flask import Flask, request
from opentelemetry import context
from aios_tracing.tracing import TracingSDK

app = Flask(__name__)
sdk = TracingSDK(service_name="flask-api", otlp_endpoint="otel-collector.tracing.svc:4317")

@app.before_request
def attach_ctx():
    ctx = sdk.extract_context(dict(request.headers))
    request._otel_token = context.attach(ctx)

@app.after_request
def detach_ctx(resp):
    token = getattr(request, "_otel_token", None)
    if token:
        context.detach(token)
    return resp

@app.route("/hello")
def hello():
    with sdk.span("hello.handler"):
        return "ok"
FastAPI
from fastapi import FastAPI, Request
from opentelemetry import context
from aios_tracing.tracing import TracingSDK

app = FastAPI()
sdk = TracingSDK(service_name="fastapi-api", otlp_endpoint="otel-collector.tracing.svc:4317")

@app.middleware("http")
async def tracing_mw(request: Request, call_next):
    ctx = sdk.extract_context(dict(request.headers))
    token = context.attach(ctx)
    try:
        with sdk.span("http.request"):
            resp = await call_next(request)
            return resp
    finally:
        context.detach(token)