Skip to content

General Transactions Server

Introduction

The Transactions Server is a centrally deployed service that your agents (clients) use to record and query transactional events. Agents submit transactions via a lightweight SDK (or direct REST calls). The server persists data in PostgreSQL/TimescaleDB and uses a Redis-backed queue for durable, high-throughput ingestion. Writes are asynchronous: API calls enqueue operations and the background worker commits them to the database in efficient batches, either when a batch fills up or when a short timer elapses.

Core components

  • REST API: enqueue writes and run rich queries.
  • Redis Stream: durable write queue, consumer groups for scale-out.
  • Worker: reads from Redis, folds & batches ops, commits to DB, acks on success.
  • DB: PostgreSQL/TimescaleDB table transactions with JSONB and array fields for flexibility.

Data Model

Table: transactions

All timestamps are epoch seconds (BIGINT). JSON stored as JSONB. Arrays are TEXT[].

Field Type Description Examples / Notes
tx_id TEXT PK Unique transaction id "tx_01a2b3"
tx_group_id TEXT Logical batch/group identifier "import_2025_09_10"
timestamp BIGINT Event time (epoch seconds) 1725945600
tx_status TEXT Status/state of the transaction "pending", "completed", "failed"
tx_input_data JSONB Arbitrary input payload {"amount":100,"currency":"INR"}
tx_output_data JSONB Arbitrary output/result payload {"ok":true,"ref":"ABC123"}
tx_subject_id TEXT Subject that owns the transaction (your agent identity, tenant, or user) "agent_42"
tx_parent_subject_ids TEXT[] Related/parent subjects ["org_main","project_alpha"]
tx_type TEXT High-level type "payment", "task", "event"
tx_sub_type TEXT Subtype "upi", "retry", "webhook"

Indexes

  • BTREE: timestamp, tx_status, tx_group_id, tx_subject_id, tx_type, tx_sub_type
  • GIN: tx_input_data, tx_output_data, tx_parent_subject_ids

Notes

  • If you later enable Timescale, you can add a computed ts timestamptz GENERATED ALWAYS AS (to_timestamp(timestamp)) STORED and make transactions a hypertable on ts.

REST API

Base URL http://<management-server-host>:8080/

Auth If enabled in your deployment, pass Authorization: Bearer <token>.

Content type Content-Type: application/json

Error shape {"error": "message"} with appropriate HTTP status.

Health

  • GET /health → 200 {"ok": true}

Write APIs (enqueue to Redis, then worker commits to DB)

All return 202 Accepted with the Redis stream id: {"queued": true, "id": "<stream-id>"}

  • POST /v1/transactions/insert Body: full transaction record (must include tx_id; the SDK can generate one)
  • POST /v1/transactions/upsert Body: full record; on conflict by tx_id, fields are updated
  • PATCH /v1/transactions/{tx_id}/status Body: {"status":"<new_status>"}
  • PATCH /v1/transactions/{tx_id} Body: {"fields": { ... updatable fields ... }}
  • DELETE /v1/transactions/{tx_id}

Example

curl -X POST http://localhost:8080/v1/transactions/upsert \
  -H 'Content-Type: application/json' \
  -d '{
    "tx_id": "tx_001",
    "tx_group_id": "batch_A",
    "timestamp": 1725960000,
    "tx_status": "pending",
    "tx_input_data": {"amount": 500, "currency": "INR"},
    "tx_subject_id": "agent_42",
    "tx_parent_subject_ids": ["org1"],
    "tx_type": "payment",
    "tx_sub_type": "upi"
  }'

Read/query APIs (query DB synchronously)

  • GET /v1/transactions/{tx_id}
  • GET /v1/transactions/list_by_group?tx_group_id=...&limit=...&offset=...&order_by=...
  • GET /v1/transactions/list_by_status?tx_status=...
  • GET /v1/transactions/list_by_subject?tx_subject_id=...
  • GET /v1/transactions/list_by_type?tx_type=...&tx_sub_type=...
  • GET /v1/transactions/range_by_time?start_ts=...&end_ts=...
  • POST /v1/transactions/search_input_contains Body: {"subset": { ... }, "limit": 100, "offset": 0, "order_by": "timestamp DESC"}
  • POST /v1/transactions/search_output_contains Body as above
  • POST /v1/transactions/json_path_equals Body: {"column":"tx_input_data","path":["customer","id"],"value":"CUST-1","limit":100,"offset":0}
  • GET /v1/stats/status_counts?start_ts=...&end_ts=...
  • GET /v1/stats/type_counts?include_sub_type=true
  • GET /v1/histogram?bucket_seconds=3600&start_ts=...&end_ts=...&tx_status=...
  • POST /v1/transactions/query (generic, safe query builder) Body:

json { "where": { "and": [ {"field":"tx_type","op":"IN","value":["payment","refund"]}, {"field":"timestamp","op":">=","value":1725900000} ] }, "limit": 200, "offset": 0, "order_by": "timestamp DESC", "select_cols": ["tx_id","timestamp","tx_status","tx_type"] }

Example

curl -X POST http://localhost:8080/v1/transactions/query \
  -H 'Content-Type: application/json' \
  -d '{
    "where": [
      {"field": "tx_status", "op": "=", "value": "completed"},
      {"field": "tx_subject_id", "op": "=", "value": "agent_42"}
    ],
    "limit": 50,
    "order_by": "timestamp DESC"
  }'

Transaction Queue & Ingestion

Why a queue?

Agents often produce bursts of events. The queue decouples producers from the database and allows the server to fold & batch operations for high throughput and resilience.

Transport

  • Redis Stream: TX_STREAM (default tx_queue)
  • Consumer Group: TX_GROUP (default tx_workers)
  • Messages are appended with XADD; the worker reads with XREADGROUP and acknowledges with XACK after a successful DB commit.

Message formats

All messages have a data field containing JSON:

  • Insert {"op":"insert","record":{...full transaction...}}
  • Upsert {"op":"upsert","record":{...full transaction...}}
  • Update status {"op":"update_status","tx_id":"tx_001","status":"completed"}
  • Update fields {"op":"update_fields","tx_id":"tx_001","fields":{"tx_output_data":{"ok":true}}}
  • Delete {"op":"delete","tx_id":"tx_001"}

Batching & timing

  • The worker accumulates messages and flushes when:

  • batch_size items have been collected, or

  • max_wait_s seconds have elapsed since the first item was buffered.
  • This guarantees bounded latency under light load and high throughput under heavy load.

Folding & idempotency

  • The worker folds multiple operations for the same tx_id within a batch:

  • Later upsert overrides earlier ones in the batch.

  • Field updates are merged into the final upsert for that tx_id.
  • delete wins and cancels prior upserts/updates for that tx_id within the batch.
  • Failed DB writes leave messages pending; they are retried. Stale pending entries are auto-claimed from crashed consumers.

Typical client flow (SDK)

Using the queue client (agents):

from agents_txs import GeneralAgentsTransactionManager

mgr = GeneralAgentsTransactionManager(subject_id="agent_42")

tx_id = mgr.new_tx_id()
mgr.upsert({
    "tx_id": tx_id,
    "tx_group_id": "batch_A",
    "timestamp": 1725960000,
    "tx_status": "pending",
    "tx_input_data": {"amount": 500, "currency": "INR"},
    "tx_type": "payment",
    "tx_sub_type": "upi"
})

mgr.update_status(tx_id, "completed")
mgr.update_fields(tx_id, {"tx_output_data": {"ok": True, "ref": "ABC123"}})

Operational settings (env)

  • REDIS_URL (e.g., redis://redis:6379/0)
  • TX_STREAM (default tx_queue)
  • TX_GROUP (default tx_workers)
  • TX_BATCH_SIZE (default 200)
  • TX_MAX_WAIT_S (default 1.5)
  • TX_BLOCK_MS (default 1000)
  • TX_CLAIM_STALE_MS (default 60000)
  • TX_CLAIM_INTERVAL_S (default 30)
  • PG_DSN (e.g., postgresql://user:pass@host:5432/db)
  • API_HOST / API_PORT (default 0.0.0.0:8080)

Quick Start

  1. Start the server (runs API in a thread, worker in the main loop):
agents-txs-serve
  1. From an agent, send an upsert:
curl -X POST http://<server>:8080/v1/transactions/upsert \
  -H 'Content-Type: application/json' \
  -d '{"tx_id":"tx_demo","timestamp":1725960000,"tx_status":"pending","tx_subject_id":"agent_42"}'
  1. Query it:
curl -X GET "http://<server>:8080/v1/transactions/list_by_subject?tx_subject_id=agent_42&limit=10"