πŸ“ˆ aios_metrics β€” Metrics SDK (Prometheus + Rolling + Optional Redis)

A lightweight Python SDK that exposes Prometheus-native metrics (Counter, Gauge, Histogram), rolling summaries (1m/5m/15m), optional hardware/process hooks, JSON snapshots, and optional Redis publishing (batching + backoff). Designed to work standalone or alongside your in-cluster Prometheus/Grafana stack.


1) Introduction

aios_metrics wraps prometheus_client with a small, ergonomic API:

  • First-class Counter/Gauge/Histogram registration & updates
  • Labels done right (same calls work for labeled or unlabeled metrics)
  • Rolling metrics (thread-safe) for fast 1m/5m/15m averages and counts
  • HTTP exporter (Prometheus /metrics) with one call
  • Optional Redis publisher for centralized aggregation (batching + retries)
  • Graceful shutdown and structured JSON snapshot of all metrics
  • Designed to run inside your service, no sidecar required

2) Installation

From the repo root (or sdks/metrics/):

# Option A: editable install (dev)
pip install -e sdks/metrics

# Option B: from the metrics SDK folder
cd sdks/metrics
pip install -e .

Runtime dependencies

pip install prometheus-client redis  # redis is optional; install if you enable publishing

3) Quick Start

from aios_metrics.metrics import AIOSMetrics, MetricsConfig

# Configure (env-aware). Redis is optional; set METRICS_REDIS_ENABLED=false to disable.
cfg = MetricsConfig(
    block_id="block-1",
    instance_id="instance-42",
    redis_enabled=False,           # turn off if you don't need Redis publishing
)

m = AIOSMetrics(config=cfg)

# Register some metrics (do this once at startup)
m.register_counter("jobs_processed_total", "Total jobs processed", labelnames=("status",))
m.register_gauge("queue_size", "Current queue length")
m.register_histogram("job_latency_seconds", "Job latency", buckets=[0.05,0.1,0.25,0.5,1,2,5])

# Start the /metrics endpoint (non-blocking)
m.start_http_server(port=8000)

# Use them in your code
m.increment_counter("jobs_processed_total", labels={"status": "ok"})
m.set_gauge("queue_size", 17)
m.observe_histogram("job_latency_seconds", 0.123)

# Rolling metrics for fast summaries
m.observe_rolling("queue_size", 17)
m.observe_rolling("job_latency_seconds", 0.123)

# Optional: JSON snapshot (for custom collectors/testing)
payload = m.snapshot_json()
print(payload)

# On shutdown
m.stop()

Prometheus will scrape http://<pod-ip>:8000/metrics (or via Service), and you can build dashboards in Grafana.


4) Configuration (env & args)

You can pass a MetricsConfig instance or rely on environment variables; each field below lists its env var and default. Defaults are sensible for development; override them via environment variables in production.

  • block_id (BLOCK_ID): Logical block identifier. Default: test-block
  • instance_id (INSTANCE_ID): Unique instance identifier. Default: instance-001
  • http_host (METRICS_HOST): HTTP bind address for /metrics. Default: 0.0.0.0
  • http_port (METRICS_PORT): Port for /metrics. Default: 8000
  • redis_enabled (METRICS_REDIS_ENABLED): Enable Redis publishing. Default: true
  • redis_host (METRICS_REDIS_HOST): Redis host. Default: localhost
  • redis_port (METRICS_REDIS_PORT): Redis port. Default: 6379
  • redis_db (METRICS_REDIS_DB): Redis DB index. Default: 0
  • redis_password (METRICS_REDIS_PASSWORD): Redis auth (optional). Default: None
  • redis_queue_key (METRICS_REDIS_QUEUE): List key for LPUSH. Default: NODE_METRICS
  • redis_push_interval_s (METRICS_PUSH_INTERVAL): Snapshot push interval in seconds. Default: 30
  • redis_batch_size (METRICS_BATCH_SIZE): Batch size for pipelined LPUSH. Default: 1
  • redis_backoff_initial_s (METRICS_REDIS_BACKOFF_INITIAL): Initial retry backoff in seconds. Default: 1
  • redis_backoff_max_s (METRICS_REDIS_BACKOFF_MAX): Maximum retry backoff in seconds. Default: 30
  • avg_1m (ROLLING_AVG_1M): 1m rolling window in seconds. Default: 60
  • avg_5m (ROLLING_AVG_5M): 5m rolling window in seconds. Default: 300
  • avg_15m (ROLLING_AVG_15M): 15m rolling window in seconds. Default: 900
  • default_hist_buckets (HISTO_BUCKETS): Default histogram buckets. Default: 0.1,0.2,0.5,1,2,5,10

To fully disable Redis writes: set METRICS_REDIS_ENABLED=false or pass MetricsConfig(redis_enabled=False).
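
For example, an env-driven setup (a minimal sketch; it assumes MetricsConfig resolves the variables from the table above for any field not passed explicitly):

import os
from aios_metrics.metrics import AIOSMetrics, MetricsConfig

os.environ["METRICS_REDIS_ENABLED"] = "false"   # disable Redis publishing
os.environ["METRICS_PORT"] = "9000"             # serve /metrics on 9000
os.environ["BLOCK_ID"] = "block-1"

cfg = MetricsConfig()          # assumed: unset fields fall back to the env vars above
m = AIOSMetrics(config=cfg)
m.start_http_server()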


5) Usage Guide

5.1 Register metrics

m.register_counter("events_total", "Total events", labelnames=("type",))
m.register_gauge("workers", "Active workers")
m.register_histogram("op_seconds", "Operation latency", labelnames=("route",))

Register at startup; registering a name that already exists raises a ValueError from prometheus_client, so keep names stable.
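
If hot-reload or repeated imports are a risk, a defensive sketch (assuming the ValueError from prometheus_client propagates through the register_* call):

try:
    m.register_counter("events_total", "Total events", labelnames=("type",))
except ValueError:
    pass  # already registered (e.g., module re-import); keep the first definition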

5.2 Update metrics (with or without labels)

# Counter
m.increment_counter("events_total", labels={"type": "click"}, amount=2)

# Gauge
m.set_gauge("workers", 8)

# Histogram
m.observe_histogram("op_seconds", 0.42, labels={"route": "/api/items"})

5.3 Rolling metrics

Use rolling metrics for quick 1m/5m/15m summaries without PromQL:

m.observe_rolling("queue_depth", 21)          # default 15m window
m.observe_rolling("latency", 0.130)

# Optional custom windows on first observation:
m.observe_rolling("latency_p95_proxy", 0.200, window_seconds=300)

# Inspect summaries (also included in snapshot)
print(m.get_extended_metrics())
# -> {"latency": {"current": 0.13, "count_1m": 12, "average_1m": 0.11, "average_5m": 0.12, "average_15m": 0.10}, ...}

You can also organize category-scoped rollups:

m.observe_custom_rolling("model", "inference_ms", 42.1, window_seconds=300)
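
A common pattern is to time a block of work once and feed both the Prometheus histogram and the rolling summary. timed() and do_work() below are illustrative helpers, not part of the SDK:

import time
from contextlib import contextmanager

@contextmanager
def timed(metrics, name):
    # Measure wall-clock time and record it in both metric systems.
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        metrics.observe_histogram(name, elapsed)
        metrics.observe_rolling(name, elapsed)

with timed(m, "job_latency_seconds"):
    do_work()  # placeholder for your workload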

5.4 Expose /metrics HTTP endpoint

m.start_http_server(port=8000, host="0.0.0.0")  # non-blocking background thread

Now Prometheus can scrape your service. Combine with a Kubernetes Service/ServiceMonitor if you use kube-prometheus-stack.
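
A quick smoke test from the same host, using only the standard library (assumes the examples above have registered op_seconds and the server is on port 8000):

import urllib.request

body = urllib.request.urlopen("http://localhost:8000/metrics").read().decode()
assert "op_seconds" in body  # every registered metric name should appear in the exposition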

5.5 Optional Redis publishing

When enabled, the SDK periodically pushes a JSON snapshot into a Redis list (LPUSH), with batching and exponential backoff on failures.

# Enable via config or METRICS_REDIS_ENABLED=true (default)
cfg = MetricsConfig(redis_enabled=True, redis_queue_key="NODE_METRICS", redis_batch_size=5, redis_push_interval_s=10)
m = AIOSMetrics(config=cfg)
m.start_http_server()   # starts Redis writer too (if enabled)

Consumers can then read and process the stream (e.g., global aggregators or dashboards).
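
A minimal consumer sketch using redis-py; the exact snapshot schema is whatever snapshot_json() produces, so treat the "rolling" lookup as illustrative:

import json
import redis

r = redis.Redis(host="localhost", port=6379, db=0)
while True:
    # BRPOP complements the SDK's LPUSH, so items come out in FIFO order.
    _key, raw = r.brpop("NODE_METRICS")
    snapshot = json.loads(raw)
    print(snapshot.get("rolling", {}))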

5.6 Snapshot the current state

data = m.snapshot()       # dict of current metrics + hardware + rolling
payload = m.snapshot_json()

Snapshot includes:

  • Your registered metric samples (labels encoded as metric{label=val,...})
  • Hardware metrics (if available)
  • Rolling metrics summaries
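
For a quick look at the shape (the "rolling" key also appears in the API example in Section 6; other top-level keys depend on what your build emits):

data = m.snapshot()
print(sorted(data.keys()))                                 # top-level sections of the snapshot
print(data.get("rolling", {}).get("job_latency_seconds"))  # one rolling summary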

5.7 Graceful shutdown

m.stop()  # flushes Redis batch and joins background threads
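
In a long-running service, wire this into process shutdown. A sketch using only the standard library:

import atexit
import signal
import sys

atexit.register(m.stop)  # covers normal interpreter exit

def _on_term(signum, frame):
    m.stop()             # flush the Redis batch, join threads
    sys.exit(0)

signal.signal(signal.SIGTERM, _on_term)  # covers Kubernetes pod termination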

6) Public API Reference (with examples)

class MetricsConfig(dataclass)

Holds configuration for HTTP server, Redis publishing, rolling windows, and default histogram buckets. Create directly or rely on env vars:

cfg = MetricsConfig(redis_enabled=False, http_port=9000)

class AIOSMetrics

__init__(block_id: Optional[str] = None, config: Optional[MetricsConfig] = None, logger: Optional[logging.Logger] = None)

Create an instance. If block_id is None, uses config.block_id or env. Initializes hardware metrics if available; sets up internal locks and threads.

m = AIOSMetrics(config=MetricsConfig(redis_enabled=False))

Registration

  • register_counter(name, documentation, labelnames: Optional[Iterable[str]] = None) -> None
  • register_gauge(name, documentation, labelnames: Optional[Iterable[str]] = None) -> None
  • register_histogram(name, documentation, labelnames: Optional[Iterable[str]] = None, buckets: Optional[Iterable[float]] = None) -> None
m.register_counter("requests_total", "Total requests", labelnames=("method","code"))
m.register_gauge("inflight_requests", "In-flight")
m.register_histogram("request_seconds", "Latency", buckets=[0.05,0.1,0.25,0.5,1,2,5])

Updating

  • increment_counter(name, labels: Optional[Dict[str,str]] = None, amount: float = 1.0) -> None
  • set_gauge(name, value: float, labels: Optional[Dict[str,str]] = None) -> None
  • observe_histogram(name, value: float, labels: Optional[Dict[str,str]] = None) -> None
m.increment_counter("requests_total", {"method":"GET", "code":"200"})
m.set_gauge("inflight_requests", 3)
m.observe_histogram("request_seconds", 0.087, {"method":"GET"})

Rolling metrics

  • observe_rolling(name: str, value: float, window_seconds: Optional[int] = None) -> None
  • observe_custom_rolling(category: str, name: str, value: float, window_seconds: Optional[int] = None) -> None
  • get_extended_metrics() -> Dict[str, Any]
m.observe_rolling("qps", 125.0)
print(m.get_extended_metrics()["qps"])

HTTP & Redis lifecycle

  • start_http_server(port: Optional[int] = None, host: Optional[str] = None) -> None: starts the HTTP metrics endpoint; if redis_enabled, also starts the Redis writer thread.

  • stop(join_timeout: float = 5.0) -> None: signals background threads to stop and joins them.

m.start_http_server()  # expose /metrics and (optionally) enable Redis stream
# ...
m.stop()

Snapshots

  • snapshot() -> Dict[str, Any]
  • snapshot_json() -> str
data = m.snapshot()
print(data["rolling"]["latency"]["average_1m"])

Convenience

  • register_standard_block_metrics() -> None: registers a small default set: aios_tasks_processed_total, aios_queue_length, aios_latency_seconds.

  • record_task_done(status: str, latency_s: Optional[float] = None, queue_len: Optional[int] = None) -> None: increments the counter for the given status, updates the histogram/gauge, and feeds the rolling metrics.

m.register_standard_block_metrics()
m.record_task_done(status="ok", latency_s=0.145, queue_len=12)

class RollingMetric

Thread-safe rolling window for numeric observations.

  • add(value: float) -> None
  • average(window: int) -> float
  • current() -> float
  • count(window: Optional[int] = None) -> int
from aios_metrics.metrics import RollingMetric
rm = RollingMetric(window_seconds=900)
rm.add(1.0); rm.add(2.0)
print(rm.average(60), rm.current(), rm.count())

7) Patterns & Recipes

7.1 Labeled counters that never explode

Keep label cardinality under control (e.g., HTTP status, route template). Avoid raw user IDs or timestamps.

m.register_counter("http_requests_total", "HTTP requests", labelnames=("method","code","route"))
m.increment_counter("http_requests_total", {"method":"GET","code":"200","route":"/api/items"})

7.2 Gauges from background threads

If you update gauges from your own background threads, stop those threads at shutdown and call m.stop() so the SDK's background threads exit cleanly as well. A sketch of such a sampler is shown below.
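
Here, get_queue_depth() is a placeholder for your own code:

import threading

def sample_loop(stop_event):
    while not stop_event.is_set():
        depth = get_queue_depth()            # placeholder for your own code
        m.set_gauge("queue_size", depth)
        m.observe_rolling("queue_size", depth)
        stop_event.wait(5)                   # sample every 5 seconds

stop_event = threading.Event()
threading.Thread(target=sample_loop, args=(stop_event,), daemon=True).start()
# On shutdown: stop_event.set(), then m.stop()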

7.3 Histograms for latency SLOs

Pick buckets that match your SLOs and expected distribution; prefer few well-chosen buckets.
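
For example, with a 250 ms latency objective (checkout_seconds is illustrative), put a bucket edge exactly at the objective so the SLI can be read straight off the histogram:

m.register_histogram(
    "checkout_seconds", "Checkout latency",
    buckets=[0.05, 0.1, 0.2, 0.25, 0.3, 0.5, 1, 2],
)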

7.4 Redis as a side-channel

If Prometheus scraping is impractical (air-gapped, multi-cluster fan-in), enable Redis publishing and consume the list from a central service.


8) Troubleshooting

  • /metrics not reachable: service not exposed or wrong port. Fix: m.start_http_server(port=...) and check the Kubernetes Service.
  • Duplicate metric errors: the same name/type was registered twice. Fix: register once at startup; avoid module-reload conflicts.
  • High label cardinality: unbounded labels (user IDs, trace IDs). Fix: reduce labels; bucketize or remove them.
  • Redis writer errors: auth/endpoint/SSL mismatch. Fix: verify the METRICS_REDIS_* settings; check pipeline permissions.
  • Empty rolling summaries: no observe_rolling() calls. Fix: feed rolling metrics alongside Prometheus updates.