General Transactions Server
Introduction
The Transactions Server is a centrally deployed service that your agents (clients) use to record and query transactional events. Agents submit transactions via a lightweight SDK (or direct REST calls). The server persists data in PostgreSQL/TimescaleDB and uses a Redis-backed queue for durable, high-throughput ingestion. Writes are asynchronous: API calls enqueue operations and the background worker commits them to the database in efficient batches, either when a batch fills up or when a short timer elapses.
Core components
- REST API: enqueue writes and run rich queries.
- Redis Stream: durable write queue, consumer groups for scale-out.
- Worker: reads from Redis, folds & batches ops, commits to DB, acks on success.
- DB: PostgreSQL/TimescaleDB table `transactions` with JSONB and array fields for flexibility.
Data Model
Table: transactions
All timestamps are epoch seconds (`BIGINT`). JSON is stored as `JSONB`. Arrays are `TEXT[]`.
Field | Type | Description | Examples / Notes |
---|---|---|---|
`tx_id` | `TEXT` PK | Unique transaction id | `"tx_01a2b3"` |
`tx_group_id` | `TEXT` | Logical batch/group identifier | `"import_2025_09_10"` |
`timestamp` | `BIGINT` | Event time (epoch seconds) | `1725945600` |
`tx_status` | `TEXT` | Status/state of the transaction | `"pending"`, `"completed"`, `"failed"` |
`tx_input_data` | `JSONB` | Arbitrary input payload | `{"amount":100,"currency":"INR"}` |
`tx_output_data` | `JSONB` | Arbitrary output/result payload | `{"ok":true,"ref":"ABC123"}` |
`tx_subject_id` | `TEXT` | Subject that owns the transaction (your agent identity, tenant, or user) | `"agent_42"` |
`tx_parent_subject_ids` | `TEXT[]` | Related/parent subjects | `["org_main","project_alpha"]` |
`tx_type` | `TEXT` | High-level type | `"payment"`, `"task"`, `"event"` |
`tx_sub_type` | `TEXT` | Subtype | `"upi"`, `"retry"`, `"webhook"` |
Indexes
- BTREE: `timestamp`, `tx_status`, `tx_group_id`, `tx_subject_id`, `tx_type`, `tx_sub_type`
- GIN: `tx_input_data`, `tx_output_data`, `tx_parent_subject_ids`
Notes
- If you later enable Timescale, you can add a computed column `ts timestamptz GENERATED ALWAYS AS (to_timestamp(timestamp)) STORED` and make `transactions` a hypertable on `ts`.
REST API
Base URL
`http://<transactions-server-host>:8080/`
Auth
If enabled in your deployment, pass `Authorization: Bearer <token>`.
Content type
`Content-Type: application/json`
Error shape
`{"error": "message"}` with an appropriate HTTP status.
Health
`GET /health` → `200 {"ok": true}`
Write APIs (enqueue to Redis, then worker commits to DB)
All return `202 Accepted` with the Redis stream id: `{"queued": true, "id": "<stream-id>"}`.

- `POST /v1/transactions/insert`. Body: full transaction record (must include `tx_id`; the SDK can generate one).
- `POST /v1/transactions/upsert`. Body: full record; on conflict by `tx_id`, fields are updated.
- `PATCH /v1/transactions/{tx_id}/status`. Body: `{"status":"<new_status>"}`
- `PATCH /v1/transactions/{tx_id}`. Body: `{"fields": { ... updatable fields ... }}`
- `DELETE /v1/transactions/{tx_id}`
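The write endpoints follow a small, regular shape. A sketch of how a client might map an operation onto a method, path, and body (`build_write_request` is an illustrative helper, not part of the SDK):

```python
# Hypothetical mapping from a write operation to (method, path, body) for the
# endpoints listed above. build_write_request is illustrative, not SDK code.
BASE = "/v1/transactions"

def build_write_request(op, tx_id=None, payload=None):
    if op in ("insert", "upsert"):
        return ("POST", f"{BASE}/{op}", payload)         # body: full record
    if op == "update_status":
        return ("PATCH", f"{BASE}/{tx_id}/status", {"status": payload["status"]})
    if op == "update_fields":
        return ("PATCH", f"{BASE}/{tx_id}", {"fields": payload})
    if op == "delete":
        return ("DELETE", f"{BASE}/{tx_id}", None)
    raise ValueError(f"unknown op: {op}")
```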
Example
```bash
curl -X POST http://localhost:8080/v1/transactions/upsert \
  -H 'Content-Type: application/json' \
  -d '{
    "tx_id": "tx_001",
    "tx_group_id": "batch_A",
    "timestamp": 1725960000,
    "tx_status": "pending",
    "tx_input_data": {"amount": 500, "currency": "INR"},
    "tx_subject_id": "agent_42",
    "tx_parent_subject_ids": ["org1"],
    "tx_type": "payment",
    "tx_sub_type": "upi"
  }'
```
Read/query APIs (query DB synchronously)
- `GET /v1/transactions/{tx_id}`
- `GET /v1/transactions/list_by_group?tx_group_id=...&limit=...&offset=...&order_by=...`
- `GET /v1/transactions/list_by_status?tx_status=...`
- `GET /v1/transactions/list_by_subject?tx_subject_id=...`
- `GET /v1/transactions/list_by_type?tx_type=...&tx_sub_type=...`
- `GET /v1/transactions/range_by_time?start_ts=...&end_ts=...`
- `POST /v1/transactions/search_input_contains`. Body: `{"subset": { ... }, "limit": 100, "offset": 0, "order_by": "timestamp DESC"}`
- `POST /v1/transactions/search_output_contains`. Body as above.
- `POST /v1/transactions/json_path_equals`. Body: `{"column":"tx_input_data","path":["customer","id"],"value":"CUST-1","limit":100,"offset":0}`
- `GET /v1/stats/status_counts?start_ts=...&end_ts=...`
- `GET /v1/stats/type_counts?include_sub_type=true`
- `GET /v1/histogram?bucket_seconds=3600&start_ts=...&end_ts=...&tx_status=...`
- `POST /v1/transactions/query` (generic, safe query builder). Body:
```json
{
  "where": {
    "and": [
      {"field":"tx_type","op":"IN","value":["payment","refund"]},
      {"field":"timestamp","op":">=","value":1725900000}
    ]
  },
  "limit": 200,
  "offset": 0,
  "order_by": "timestamp DESC",
  "select_cols": ["tx_id","timestamp","tx_status","tx_type"]
}
```
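The endpoint is described as a safe query builder; one common way to make such a builder safe is to whitelist fields and operators and emit parameterized SQL. A sketch under that assumption (the whitelists and `build_where` are illustrative, not the server's actual code):

```python
# Sketch of a whitelist-based WHERE builder that emits parameterized SQL,
# in the spirit of /v1/transactions/query. The whitelists below are
# illustrative; the server's real lists may differ.
ALLOWED_FIELDS = {"tx_id", "tx_group_id", "timestamp", "tx_status",
                  "tx_subject_id", "tx_type", "tx_sub_type"}
ALLOWED_OPS = {"=", "!=", "<", "<=", ">", ">=", "IN"}

def build_where(conditions):
    """AND together {"field","op","value"} conditions into (sql, params)."""
    clauses, params = [], []
    for cond in conditions:
        field, op, value = cond["field"], cond["op"], cond["value"]
        if field not in ALLOWED_FIELDS or op not in ALLOWED_OPS:
            raise ValueError(f"disallowed condition: {field} {op}")
        if op == "IN":
            placeholders = ", ".join(["%s"] * len(value))
            clauses.append(f"{field} IN ({placeholders})")
            params.extend(value)
        else:
            clauses.append(f"{field} {op} %s")
            params.append(value)
    return " AND ".join(clauses), params
```

Because field names and operators never come from user input verbatim and values travel as bind parameters, arbitrary SQL cannot be injected through the query body.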
Example
```bash
curl -X POST http://localhost:8080/v1/transactions/query \
  -H 'Content-Type: application/json' \
  -d '{
    "where": [
      {"field": "tx_status", "op": "=", "value": "completed"},
      {"field": "tx_subject_id", "op": "=", "value": "agent_42"}
    ],
    "limit": 50,
    "order_by": "timestamp DESC"
  }'
```
Transaction Queue & Ingestion
Why a queue?
Agents often produce bursts of events. The queue decouples producers from the database and allows the server to fold & batch operations for high throughput and resilience.
Transport
- Redis Stream: `TX_STREAM` (default `tx_queue`)
- Consumer Group: `TX_GROUP` (default `tx_workers`)
- Messages are appended with `XADD`; the worker reads with `XREADGROUP` and acknowledges with `XACK` after a successful DB commit.
Message formats
All messages have a `data` field containing JSON:
- Insert: `{"op":"insert","record":{...full transaction...}}`
- Upsert: `{"op":"upsert","record":{...full transaction...}}`
- Update status: `{"op":"update_status","tx_id":"tx_001","status":"completed"}`
- Update fields: `{"op":"update_fields","tx_id":"tx_001","fields":{"tx_output_data":{"ok":true}}}`
- Delete: `{"op":"delete","tx_id":"tx_001"}`
Batching & timing
- The worker accumulates messages and flushes when either:
  - `batch_size` items have been collected, or
  - `max_wait_s` seconds have elapsed since the first item was buffered.
- This guarantees bounded latency under light load and high throughput under heavy load.
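The flush condition can be stated as a pure predicate over the buffer state. A minimal sketch (the function name is illustrative; the defaults mirror `TX_BATCH_SIZE` and `TX_MAX_WAIT_S`):

```python
# Pure predicate for the worker's flush decision. Defaults mirror
# TX_BATCH_SIZE (200) and TX_MAX_WAIT_S (1.5) from the settings section.
def should_flush(buffered, first_buffered_at, now, batch_size=200, max_wait_s=1.5):
    """Flush when the batch is full, or the oldest buffered item has waited too long."""
    if buffered == 0 or first_buffered_at is None:
        return False                                    # nothing to flush
    if buffered >= batch_size:
        return True                                     # batch is full
    return (now - first_buffered_at) >= max_wait_s      # timer elapsed
```

Because the timer is anchored to the first buffered item, no enqueued message waits longer than `max_wait_s` before a commit is attempted, regardless of load.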
Folding & idempotency
- The worker folds multiple operations for the same `tx_id` within a batch:
  - A later `upsert` overrides earlier ones in the batch.
  - Field updates are merged into the final upsert for that `tx_id`.
  - A `delete` wins and cancels prior upserts/updates for that `tx_id` within the batch.
- Failed DB writes leave messages pending, so they are retried. Stale pending entries are auto-claimed from crashed consumers.
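The folding rules can be modeled as a pure function over a decoded batch. This is a simplified sketch of the behavior described above, not the worker's actual code (the real worker also tracks stream ids so it can `XACK` every folded message):

```python
import copy

def fold_batch(ops):
    """Collapse a batch into at most one effective operation per tx_id.

    Simplified model of the folding rules: later upserts override earlier
    ones, field/status updates merge into a pending upsert, delete wins.
    """
    folded = {}
    for op in ops:
        tx_id = op.get("tx_id") or op["record"]["tx_id"]
        kind = op["op"]
        prev = folded.get(tx_id)
        if kind == "delete":
            folded[tx_id] = {"op": "delete", "tx_id": tx_id}   # delete wins
        elif kind in ("insert", "upsert"):
            folded[tx_id] = copy.deepcopy(op)                  # later upsert overrides
        elif prev is not None and prev["op"] == "delete":
            continue                                           # updates after a delete are dropped
        elif kind == "update_status":
            if prev is not None and prev["op"] in ("insert", "upsert"):
                prev["record"]["tx_status"] = op["status"]     # merged into the upsert
            else:
                folded[tx_id] = op
        elif kind == "update_fields":
            if prev is not None and prev["op"] in ("insert", "upsert"):
                prev["record"].update(op["fields"])            # merged into the upsert
            else:
                folded[tx_id] = op
    return folded
```

The net effect is that a batch of N messages produces at most one DB operation per `tx_id`, which is what makes the bulk commit cheap.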
Typical client flow (SDK)
Using the queue client (agents):
```python
from agents_txs import GeneralAgentsTransactionManager

mgr = GeneralAgentsTransactionManager(subject_id="agent_42")
tx_id = mgr.new_tx_id()

mgr.upsert({
    "tx_id": tx_id,
    "tx_group_id": "batch_A",
    "timestamp": 1725960000,
    "tx_status": "pending",
    "tx_input_data": {"amount": 500, "currency": "INR"},
    "tx_type": "payment",
    "tx_sub_type": "upi",
})

mgr.update_status(tx_id, "completed")
mgr.update_fields(tx_id, {"tx_output_data": {"ok": True, "ref": "ABC123"}})
```
Operational settings (env)
- `REDIS_URL` (e.g., `redis://redis:6379/0`)
- `TX_STREAM` (default `tx_queue`)
- `TX_GROUP` (default `tx_workers`)
- `TX_BATCH_SIZE` (default `200`)
- `TX_MAX_WAIT_S` (default `1.5`)
- `TX_BLOCK_MS` (default `1000`)
- `TX_CLAIM_STALE_MS` (default `60000`)
- `TX_CLAIM_INTERVAL_S` (default `30`)
- `PG_DSN` (e.g., `postgresql://user:pass@host:5432/db`)
- `API_HOST` / `API_PORT` (default `0.0.0.0:8080`)
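These settings can be read with plain `os.environ` lookups. A sketch (the `load_settings` helper and its key names are illustrative; `REDIS_URL` and `PG_DSN` have no documented defaults, so the fallbacks here are placeholders):

```python
import os

def load_settings(env=os.environ):
    """Read the operational settings above, falling back to their defaults."""
    return {
        "redis_url": env.get("REDIS_URL", "redis://localhost:6379/0"),  # placeholder default
        "tx_stream": env.get("TX_STREAM", "tx_queue"),
        "tx_group": env.get("TX_GROUP", "tx_workers"),
        "batch_size": int(env.get("TX_BATCH_SIZE", "200")),
        "max_wait_s": float(env.get("TX_MAX_WAIT_S", "1.5")),
        "block_ms": int(env.get("TX_BLOCK_MS", "1000")),
        "claim_stale_ms": int(env.get("TX_CLAIM_STALE_MS", "60000")),
        "claim_interval_s": int(env.get("TX_CLAIM_INTERVAL_S", "30")),
        "pg_dsn": env.get("PG_DSN", ""),                                # no default; must be set
        "api_host": env.get("API_HOST", "0.0.0.0"),
        "api_port": int(env.get("API_PORT", "8080")),
    }
```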
Quick Start
- Start the server (runs the API in a thread and the worker in the main loop):
  ```bash
  agents-txs-serve
  ```
- From an agent, send an upsert:
  ```bash
  curl -X POST http://<server>:8080/v1/transactions/upsert \
    -H 'Content-Type: application/json' \
    -d '{"tx_id":"tx_demo","timestamp":1725960000,"tx_status":"pending","tx_subject_id":"agent_42"}'
  ```
- Query it:
  ```bash
  curl -X GET "http://<server>:8080/v1/transactions/list_by_subject?tx_subject_id=agent_42&limit=10"
  ```