aios_metrics: Metrics SDK (Prometheus + Rolling + Optional Redis)
A lightweight Python SDK that exposes Prometheus-native metrics (Counter, Gauge, Histogram), rolling summaries (1m/5m/15m), optional hardware/process hooks, JSON snapshots, and optional Redis publishing (batching + backoff). Designed to work standalone or alongside your in-cluster Prometheus/Grafana stack.
1) Introduction
aios_metrics wraps prometheus_client with a small, ergonomic API:
- First-class Counter/Gauge/Histogram registration & updates
- Labels done right (same calls work for labeled or unlabeled metrics)
- Rolling metrics (thread-safe) for fast 1m/5m/15m averages and counts
- HTTP exporter (Prometheus /metrics) with one call
- Optional Redis publisher for centralized aggregation (batching + retries)
- Graceful shutdown and structured JSON snapshot of all metrics
- Designed to run inside your service, no sidecar required
2) Installation
From the repo root (or sdks/metrics/):
# Option A: editable install (dev)
pip install -e sdks/metrics
# Option B: from the metrics SDK folder
cd sdks/metrics
pip install -e .
Runtime dependencies
pip install prometheus-client redis # redis is optional; install if you enable publishing
3) Quick Start
from aios_metrics.metrics import AIOSMetrics, MetricsConfig
# Configure (env-aware). Redis is optional; set METRICS_REDIS_ENABLED=false to disable.
cfg = MetricsConfig(
block_id="block-1",
instance_id="instance-42",
redis_enabled=False, # turn off if you don't need Redis publishing
)
m = AIOSMetrics(config=cfg)
# Register some metrics (do this once at startup)
m.register_counter("jobs_processed_total", "Total jobs processed", labelnames=("status",))
m.register_gauge("queue_size", "Current queue length")
m.register_histogram("job_latency_seconds", "Job latency", buckets=[0.05,0.1,0.25,0.5,1,2,5])
# Start the /metrics endpoint (non-blocking)
m.start_http_server(port=8000)
# Use them in your code
m.increment_counter("jobs_processed_total", labels={"status": "ok"})
m.set_gauge("queue_size", 17)
m.observe_histogram("job_latency_seconds", 0.123)
# Rolling metrics for fast summaries
m.observe_rolling("queue_size", 17)
m.observe_rolling("job_latency_seconds", 0.123)
# Optional: JSON snapshot (for custom collectors/testing)
payload = m.snapshot_json()
print(payload)
# On shutdown
m.stop()
Prometheus will scrape http://<pod-ip>:8000/metrics (or via a Service), and you can build dashboards in Grafana.
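To sanity-check the exporter locally before wiring up scraping, you can fetch the endpoint directly; a minimal sketch using only the standard library (host and port are illustrative):
import urllib.request
# Fetch the plain-text exposition format that Prometheus would scrape.
with urllib.request.urlopen("http://localhost:8000/metrics", timeout=5) as resp:
    body = resp.read().decode("utf-8")
# Registered metrics show up as HELP/TYPE lines followed by samples.
print("\n".join(body.splitlines()[:10]))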
4) Configuration (env & args)
You can pass a MetricsConfig instance or rely on environment variables.
Defaults are sensible for dev; flip envs for production.
Config field (env var) | Meaning | Default |
---|---|---|
block_id (BLOCK_ID) | Logical block identifier | test-block |
instance_id (INSTANCE_ID) | Unique instance identifier | instance-001 |
http_host (METRICS_HOST) | HTTP bind address for /metrics | 0.0.0.0 |
http_port (METRICS_PORT) | Port for /metrics | 8000 |
redis_enabled (METRICS_REDIS_ENABLED) | Enable Redis publishing | true |
redis_host (METRICS_REDIS_HOST) | Redis host | localhost |
redis_port (METRICS_REDIS_PORT) | Redis port | 6379 |
redis_db (METRICS_REDIS_DB) | Redis DB index | 0 |
redis_password (METRICS_REDIS_PASSWORD) | Redis auth password (optional) | None |
redis_queue_key (METRICS_REDIS_QUEUE) | Redis list key for LPUSH | NODE_METRICS |
redis_push_interval_s (METRICS_PUSH_INTERVAL) | Snapshot push interval (seconds) | 30 |
redis_batch_size (METRICS_BATCH_SIZE) | LPUSH batch size (pipeline) | 1 |
redis_backoff_initial_s (METRICS_REDIS_BACKOFF_INITIAL) | Initial retry backoff (seconds) | 1 |
redis_backoff_max_s (METRICS_REDIS_BACKOFF_MAX) | Maximum retry backoff (seconds) | 30 |
avg_1m (ROLLING_AVG_1M) | 1m rolling window (seconds) | 60 |
avg_5m (ROLLING_AVG_5M) | 5m rolling window (seconds) | 300 |
avg_15m (ROLLING_AVG_15M) | 15m rolling window (seconds) | 900 |
default_hist_buckets (HISTO_BUCKETS) | Default histogram buckets | 0.1,0.2,0.5,1,2,5,10 |
To fully disable Redis writes, set METRICS_REDIS_ENABLED=false or pass MetricsConfig(redis_enabled=False).
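For example, an environment-driven setup might look like this (a minimal sketch; the values are illustrative and would normally come from your pod spec or ConfigMap):
import os
from aios_metrics.metrics import AIOSMetrics
os.environ["BLOCK_ID"] = "block-1"
os.environ["INSTANCE_ID"] = "instance-42"
os.environ["METRICS_PORT"] = "9000"
os.environ["METRICS_REDIS_ENABLED"] = "false"
# With no explicit MetricsConfig, settings are picked up from the environment.
m = AIOSMetrics()
m.start_http_server()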
5) Usage Guide
5.1 Register metrics
m.register_counter("events_total", "Total events", labelnames=("type",))
m.register_gauge("workers", "Active workers")
m.register_histogram("op_seconds", "Operation latency", labelnames=("route",))
Register at startup; name collisions with existing metrics will raise from prometheus_client (keep names stable).
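If your startup path can run more than once (tests, module reloads), a small guard keeps registration idempotent; a sketch, assuming the duplicate-name error surfaces as prometheus_client's ValueError (register_once is not part of the SDK):
def register_once(metrics, name, documentation, labelnames=None):
    # Duplicate names raise from prometheus_client; swallowing the error
    # makes repeated startup paths (tests, reloads) safe.
    try:
        metrics.register_counter(name, documentation, labelnames=labelnames)
    except ValueError:
        pass
register_once(m, "events_total", "Total events", labelnames=("type",))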
5.2 Update metrics (with or without labels)
# Counter
m.increment_counter("events_total", labels={"type": "click"}, amount=2)
# Gauge
m.set_gauge("workers", 8)
# Histogram
m.observe_histogram("op_seconds", 0.42, labels={"route": "/api/items"})
5.3 Rolling metrics
Use rolling metrics for quick 1m/5m/15m summaries without PromQL:
m.observe_rolling("queue_depth", 21) # default 15m window
m.observe_rolling("latency", 0.130)
# Optional custom windows on first observation:
m.observe_rolling("latency_p95_proxy", 0.200, window_seconds=300)
# Inspect summaries (also included in snapshot)
print(m.get_extended_metrics())
# -> {"latency": {"current": 0.13, "count_1m": 12, "average_1m": 0.11, "average_5m": 0.12, "average_15m": 0.10}, ...}
You can also organize category-scoped rollups:
m.observe_custom_rolling("model", "inference_ms", 42.1, window_seconds=300)
5.4 Expose the /metrics HTTP endpoint
m.start_http_server(port=8000, host="0.0.0.0") # non-blocking background thread
Now Prometheus can scrape your service. Combine with a Kubernetes Service/ServiceMonitor if you use kube-prometheus-stack.
5.5 Optional Redis publishing
When enabled, the SDK periodically pushes a JSON snapshot into a Redis list (LPUSH), with batching and exponential backoff on failures.
# Enable via config or METRICS_REDIS_ENABLED=true (default)
cfg = MetricsConfig(redis_enabled=True, redis_queue_key="NODE_METRICS", redis_batch_size=5, redis_push_interval_s=10)
m = AIOSMetrics(config=cfg)
m.start_http_server() # starts Redis writer too (if enabled)
Consumers can then read and process the stream (e.g., global aggregators or dashboards).
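On the consuming side, a minimal reader sketch (assumes the default NODE_METRICS list key, JSON payloads, and redis-py):
import json
import redis
r = redis.Redis(host="localhost", port=6379, db=0)
while True:
    # Block up to 5s for the next snapshot; LPUSH by the SDK + BRPOP here gives FIFO order.
    item = r.brpop("NODE_METRICS", timeout=5)
    if item is None:
        continue
    _key, raw = item
    snapshot = json.loads(raw)
    # Fields mirror the SDK's JSON snapshot (metric samples, hardware, rolling summaries).
    print(sorted(snapshot.keys()))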
5.6 Snapshot the current state
data = m.snapshot() # dict of current metrics + hardware + rolling
payload = m.snapshot_json()
Snapshot includes:
- Your registered metric samples (labels encoded as metric{label=val,...})
- Hardware metrics (if available)
- Rolling metrics summaries
5.7 Graceful shutdown
m.stop() # flushes Redis batch and joins background threads
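One way to wire this into a long-running service (a sketch; signal-handling details depend on your runtime):
import atexit
import signal
import sys
# Flush and join metrics threads on normal interpreter exit.
atexit.register(m.stop)
def _handle_term(signum, frame):
    m.stop()
    sys.exit(0)
# Also handle SIGTERM/SIGINT, e.g. Kubernetes pod termination or Ctrl-C.
signal.signal(signal.SIGTERM, _handle_term)
signal.signal(signal.SIGINT, _handle_term)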
6) Public API Reference (with examples)
class MetricsConfig(dataclass)
Holds configuration for HTTP server, Redis publishing, rolling windows, and default histogram buckets. Create directly or rely on env vars:
cfg = MetricsConfig(redis_enabled=False, http_port=9000)
class AIOSMetrics
__init__(block_id: Optional[str] = None, config: Optional[MetricsConfig] = None, logger: Optional[logging.Logger] = None)
Create an instance. If block_id is None, uses config.block_id or the environment.
Initializes hardware metrics if available; sets up internal locks and threads.
m = AIOSMetrics(config=MetricsConfig(redis_enabled=False))
Registration
register_counter(name, documentation, labelnames: Optional[Iterable[str]] = None) -> None
register_gauge(name, documentation, labelnames: Optional[Iterable[str]] = None) -> None
register_histogram(name, documentation, labelnames: Optional[Iterable[str]] = None, buckets: Optional[Iterable[float]] = None) -> None
m.register_counter("requests_total", "Total requests", labelnames=("method","code"))
m.register_gauge("inflight_requests", "In-flight")
m.register_histogram("request_seconds", "Latency", buckets=[0.05,0.1,0.25,0.5,1,2,5])
Updating
increment_counter(name, labels: Optional[Dict[str,str]] = None, amount: float = 1.0) -> None
set_gauge(name, value: float, labels: Optional[Dict[str,str]] = None) -> None
observe_histogram(name, value: float, labels: Optional[Dict[str,str]] = None) -> None
m.increment_counter("requests_total", {"method":"GET", "code":"200"})
m.set_gauge("inflight_requests", 3)
m.observe_histogram("request_seconds", 0.087, {"method":"GET"})
Rolling metrics
observe_rolling(name: str, value: float, window_seconds: Optional[int] = None) -> None
observe_custom_rolling(category: str, name: str, value: float, window_seconds: Optional[int] = None) -> None
get_extended_metrics() -> Dict[str, Any]
m.observe_rolling("qps", 125.0)
print(m.get_extended_metrics()["qps"])
HTTP & Redis lifecycle
- start_http_server(port: Optional[int] = None, host: Optional[str] = None) -> None
  Starts the HTTP metrics endpoint; if redis_enabled, also starts the Redis writer thread.
- stop(join_timeout: float = 5.0) -> None
  Signals background threads to stop and joins them.
m.start_http_server() # expose /metrics and (optionally) enable Redis stream
# ...
m.stop()
Snapshots
snapshot() -> Dict[str, Any]
snapshot_json() -> str
data = m.snapshot()
print(data["rolling"]["latency"]["average_1m"])
Convenience
- register_standard_block_metrics() -> None
  Registers a small default set: aios_tasks_processed_total, aios_queue_length, aios_latency_seconds.
- record_task_done(status: str, latency_s: Optional[float] = None, queue_len: Optional[int] = None) -> None
  Increments the counter by status, updates the histogram/gauge, and feeds rolling metrics.
m.register_standard_block_metrics()
m.record_task_done(status="ok", latency_s=0.145, queue_len=12)
class RollingMetric
Thread-safe rolling window for numeric observations.
add(value: float) -> None
average(window: int) -> float
current() -> float
count(window: Optional[int] = None) -> int
from aios_metrics.metrics import RollingMetric
rm = RollingMetric(window_seconds=900)
rm.add(1.0); rm.add(2.0)
print(rm.average(60), rm.current(), rm.count())
7) Patterns & Recipes
7.1 Labeled counters that never explode
Keep label cardinality under control (e.g., HTTP status, route template). Avoid raw user IDs or timestamps.
m.register_counter("http_requests_total", "HTTP requests", labelnames=("method","code","route"))
m.increment_counter("http_requests_total", {"method":"GET","code":"200","route":"/api/items"})
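One common tactic is to collapse raw paths into route templates before using them as label values; a hypothetical helper (normalize_route is not part of the SDK):
import re
def normalize_route(path: str) -> str:
    # Replace numeric path segments with a placeholder so cardinality stays bounded,
    # e.g. /api/items/12345 -> /api/items/:id
    return re.sub(r"/\d+", "/:id", path)
m.increment_counter(
    "http_requests_total",
    {"method": "GET", "code": "200", "route": normalize_route("/api/items/12345")},
)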
7.2 Gauges from background threads
If you update gauges periodically from a background thread, don't forget to call m.stop() on shutdown so background work stops cleanly.
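A minimal sketch of a periodic gauge updater (the stop event and the queue-depth callable are illustrative, not part of the SDK):
import threading
stop_event = threading.Event()
def queue_depth_loop(get_queue_depth):
    # Push the current depth into the gauge and the rolling window every 10s.
    while not stop_event.is_set():
        depth = get_queue_depth()
        m.set_gauge("queue_size", depth)
        m.observe_rolling("queue_size", depth)
        stop_event.wait(10)
t = threading.Thread(target=queue_depth_loop, args=(lambda: 0,), daemon=True)
t.start()
# On shutdown: stop your own updater first, then flush the SDK's threads.
stop_event.set()
t.join()
m.stop()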
7.3 Histograms for latency SLOs
Pick buckets that match your SLOs and expected distribution; prefer few well-chosen buckets.
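For example, with a 250 ms latency target you might concentrate buckets around the threshold (illustrative metric name and values):
# Resolution clusters near the 0.25s SLO boundary; the +Inf bucket is added implicitly.
m.register_histogram(
    "checkout_latency_seconds",
    "Checkout request latency",
    buckets=[0.05, 0.1, 0.2, 0.25, 0.3, 0.5, 1, 2.5],
)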
7.4 Redis as a side-channel
If Prometheus scraping is impractical (air-gapped, multi-cluster fan-in), enable Redis publishing and consume the list from a central service.
8) Troubleshooting
Symptom | Likely cause | Fix |
---|---|---|
/metrics not reachable |
Service not exposed or wrong port | m.start_http_server(port=...) , check K8s Service |
Duplicate metric errors | Re-registered same name/type | Register once at startup; avoid module reload conflicts |
High label cardinality | Unbounded labels (user IDs, trace IDs) | Reduce labels; bucketize or remove |
Redis writer errors | Auth/endpoint/SSL mismatch | Verify METRICS_REDIS_* ; check pipeline permissions |
Empty rolling summaries | No observe_rolling() calls |
Feed rolling metrics alongside Prometheus updates |