General Transactions SDK Documentation
1) Introduction
The SDK lets your agents submit transactions and query them from the centrally deployed Transactions Server.
- Writes: Agents enqueue operations (insert/upsert/update/delete) to the tx-queue (Redis Streams). The server’s worker batches and commits to TimescaleDB/PostgreSQL.
- Reads: Agents query the server via REST for low-latency analytics and lookups.
- Subject scoping: Most apps are multi-tenant per agent. The SDK provides a high-level wrapper that automatically sets `tx_subject_id` on writes and scopes reads to the same subject by default.
Key SDK classes
- `TxQueueClient`: Minimal Redis queue producer that enqueues `insert`, `upsert`, `update_status`, `update_fields`, `delete`, and `bulk` operations.
- `TransactionsQueryClient` (HTTP): REST client for reads (and write-enqueue endpoints if desired).
- `GeneralAgentsTransactionManager`: High-level wrapper that combines both, takes `subject_id` in the constructor, and offers opinionated helper methods.
The server writes to the DB asynchronously. Read-after-write is eventually consistent: your query may not see a just-enqueued write until the worker flushes the batch (size- or time-triggered).
2) Installation
Option A — from PyPI (recommended)
pip install agents_txs
Option B — from source
Place the modules in your project and install:
pip install -e .
Python version
- Python ≥ 3.9
Dependencies (installed automatically)
redis>=5.0
requests>=2.31
urllib3>=2.0
If you also run the server locally, you’ll need server deps too (`psycopg`, `Flask`, etc.), but for client-only usage, the above is enough.
Imports
If installed as a package:
from agents_txs import (
    TxQueueClient,
    TransactionsQueryClient,  # alias for TransacationsQueryClient
    GeneralAgentsTransactionManager,
)
If you vendored files locally, import from your module paths:
from tx_queue_client import TxQueueClient
from transactions_query_client import TransactionsQueryClient
from general_agents_transaction_manager import GeneralAgentsTransactionManager
Environment variables (optional defaults)
- `TX_API_BASE_URL` – e.g. `http://tx-server.tx.svc.cluster.local:8080/`
- `TX_API_KEY` – bearer token if your deployment uses auth
- `REDIS_URL` – e.g. `redis://:password@tx-queue.tx.svc.cluster.local:6379/0`
- `TX_STREAM` – default `tx_queue`
- `TX_STREAM_MAXLEN` – default `100000` (approximate trimming)
You can also pass these explicitly when constructing clients.
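If you rely on the environment variables, it can help to resolve them in one place before constructing clients. A minimal sketch, assuming the variable names and defaults listed above; `load_tx_config` and the localhost fallbacks are illustrative, not part of the SDK:

```python
import os

def load_tx_config(env=None):
    """Collect client settings from environment variables (hypothetical helper).

    Variable names and the tx_queue/100000 defaults come from the docs above;
    the localhost fallbacks for the URLs are assumptions for local development.
    """
    env = os.environ if env is None else env
    return {
        "rest_base_url": env.get("TX_API_BASE_URL", "http://localhost:8080/"),
        "api_key": env.get("TX_API_KEY"),  # None means "no auth header"
        "redis_url": env.get("REDIS_URL", "redis://localhost:6379/0"),
        "stream_key": env.get("TX_STREAM", "tx_queue"),
        "stream_maxlen": int(env.get("TX_STREAM_MAXLEN", "100000")),
    }
```

The resulting dict can then be unpacked into whichever client you construct.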
3) Usage Guide
3.1 High-level wrapper: GeneralAgentsTransactionManager
This is the easiest entry point for agents. It takes a `subject_id` and wires up both a queue client and a REST client.
from agents_txs import GeneralAgentsTransactionManager
mgr = GeneralAgentsTransactionManager(
    subject_id="agent_42",                     # required
    rest_base_url="http://tx-server:8080/",
    redis_url="redis://:pass@tx-queue:6379/0",
)
Create or upsert a transaction
tx_id = mgr.new_tx_id() # generates a unique tx_id (e.g., "tx_<uuid>")
mgr.upsert({
    "tx_id": tx_id,
    "tx_group_id": "batch_A",
    "timestamp": 1725960000,  # epoch seconds
    "tx_status": "pending",
    "tx_input_data": {"amount": 500, "currency": "INR"},
    "tx_type": "payment",
    "tx_sub_type": "upi"
})
The wrapper automatically sets `tx_subject_id="agent_42"` if not provided.
Update status / fields / delete
mgr.update_status(tx_id, "completed")
mgr.update_fields(tx_id, {"tx_output_data": {"ok": True, "ref": "ABC123"}})
mgr.delete(tx_id)
Bulk operations
mgr.bulk_upsert([
    {"tx_id": mgr.new_tx_id(), "timestamp": 1725960100, "tx_status": "pending", "tx_type": "task", "tx_sub_type": "sync"},
    {"tx_id": mgr.new_tx_id(), "timestamp": 1725960200, "tx_status": "pending", "tx_type": "task", "tx_sub_type": "sync"},
])
mgr.bulk_update_status([
    {"tx_id": tx_id, "status": "completed"},
])
mgr.bulk_update_fields([
    {"tx_id": tx_id, "fields": {"tx_output_data": {"ok": True}}},
])
mgr.bulk_delete([tx_id])
Query (subject-scoped by default)
# Get by id (ensures subject matches)
one = mgr.get(tx_id)
# List recent for this subject
recent = mgr.list_recent(limit=50)
# Filter by status for this subject
completed = mgr.list_by_status("completed", limit=100)
# Time range
between = mgr.range_by_time(1725960000, 1725963600, limit=100)
# JSON search on input/output
payments_in_inr = mgr.search_input_contains({"currency": "INR"}, limit=100)
# Generic query with JSON path
rows = mgr.json_path_equals("tx_input_data", ["customer", "id"], "CUST-1", limit=50)
# Aggregations
status_counts = mgr.status_counts()
type_counts = mgr.type_counts(include_sub_type=True)
To query across all subjects, pass `include_all_subjects=True` to the wrapper methods, e.g.:
all_recent = mgr.list_recent(limit=50, include_all_subjects=True)
3.2 Queue-only: TxQueueClient
Use when your agent only needs to enqueue and doesn’t require reads.
from agents_txs import TxQueueClient
q = TxQueueClient(redis_url="redis://:pass@tx-queue:6379/0", stream_key="tx_queue")
q.upsert({
    "tx_id": "tx_123",
    "timestamp": 1725960000,
    "tx_status": "pending",
    "tx_subject_id": "agent_42",
    "tx_type": "payment",
    "tx_sub_type": "upi"
})
q.update_status("tx_123", "completed")
q.update_fields("tx_123", {"tx_output_data": {"ok": True}})
q.delete("tx_123")
q.bulk([
    {"op": "upsert", "record": {"tx_id": "tx_abc", "timestamp": 1725960300, "tx_status": "pending", "tx_subject_id": "agent_42"}},
    {"op": "update_status", "tx_id": "tx_abc", "status": "completed"},
])
Tips
- Always include `tx_subject_id` with writes if you’re not using the wrapper.
- Expect eventual consistency on reads (the worker batches). See “Read-after-write” below.
3.3 REST-only: TransactionsQueryClient
Use for queries, dashboards, and admin tools.
from agents_txs import TransactionsQueryClient
api = TransactionsQueryClient(base_url="http://tx-server:8080/") # add api_key=... if needed
one = api.get_transaction("tx_123")
by_group = api.list_by_group("batch_A", limit=50)
by_status = api.list_by_status("completed", limit=100)
time_range = api.range_by_time(1725960000, 1725963600)
inp_contains = api.search_input_contains({"currency": "INR"})
hist = api.histogram(3600, start_ts=1725960000, end_ts=1725967200, tx_status="completed")
rows = api.query({
    "and": [
        {"field": "tx_type", "op": "IN", "value": ["payment", "refund"]},
        {"field": "timestamp", "op": ">=", "value": 1725960000}
    ]
}, limit=200)
The REST client also exposes the same enqueue endpoints (`insert`, `upsert`, `update_status`, `update_fields`, `delete`), which forward to the server. In most deployments, agents call the queue directly for lowest latency; the REST write endpoints are handy for tooling and scripts.
3.4 Read-after-write & eventual consistency
Writes go to the queue; the worker flushes to DB when:
- the batch is full (`TX_BATCH_SIZE`), or
- the max wait has elapsed (`TX_MAX_WAIT_S`)
If you must confirm persistence before proceeding (e.g., a follow-up query depends on the write), poll with a short backoff:
import time
def wait_until_visible(mgr, tx_id, timeout_s=5.0, interval_s=0.2):
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        row = mgr.get(tx_id)  # subject-scoped
        if row:
            return row
        time.sleep(interval_s)
    return None
tx_id = mgr.new_tx_id()
mgr.upsert({"tx_id": tx_id, "timestamp": 1725960000, "tx_status": "pending"})
row = wait_until_visible(mgr, tx_id)
Tune `TX_MAX_WAIT_S` server-side if your app needs tighter visibility.
3.5 Data model recap (for writers)
A valid record (for `insert`/`upsert`) has:
{
    "tx_id": "tx_001",                  # required (use mgr.new_tx_id() if needed)
    "tx_group_id": "batch_A",
    "timestamp": 1725960000,            # epoch seconds
    "tx_status": "pending",
    "tx_input_data": {"k": "v"},        # JSON-serializable
    "tx_output_data": {"k": "v"},
    "tx_subject_id": "agent_42",        # critical for multi-tenant scoping
    "tx_parent_subject_ids": ["orgA"],  # list[str]
    "tx_type": "payment",
    "tx_sub_type": "upi"
}
For `update_fields`, pass only the fields to update (same column names).
3.6 Error handling & retries
- `TxQueueClient` raises `TxQueueError` on failures (e.g., Redis unreachable). Wrap enqueues with retry/backoff if your agent can tolerate transient failures.
- `TransactionsQueryClient` uses `requests` with built-in retry (5xx) and raises for non-2xx. Catch `requests.HTTPError` where appropriate.
- The server worker only acks queue messages after a successful DB transaction. If a batch fails, those messages remain pending and are retried or claimed by another worker.
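A retry/backoff wrapper for transient enqueue failures can be sketched as follows. `enqueue_with_retry` is illustrative, not an SDK function; it accepts any zero-argument callable and any exception type, so you would pass `TxQueueError` in practice:

```python
import random
import time

def enqueue_with_retry(enqueue, *, retries=3, base_delay=0.1, exc=Exception):
    """Call `enqueue()`; on failure, retry with exponential backoff and jitter.

    Hypothetical helper: pass exc=TxQueueError when wrapping TxQueueClient calls.
    Re-raises the last exception once `retries` attempts are exhausted.
    """
    for attempt in range(retries + 1):
        try:
            return enqueue()
        except exc:
            if attempt == retries:
                raise
            # Exponential backoff with jitter to avoid synchronized retries.
            time.sleep(base_delay * (2 ** attempt) * (1 + random.random()))

# Usage with a TxQueueClient `q` (sketch):
# enqueue_with_retry(lambda: q.upsert(record), exc=TxQueueError)
```

Only retry when your agent can tolerate duplicate delivery: an enqueue may succeed on the server side even if the client saw an error.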
3.7 Performance tips
- Prefer bulk enqueues when sending many items.
- If your traffic is bursty but you need faster visibility, lower `TX_MAX_WAIT_S` or raise `TX_BATCH_SIZE` (trade-offs apply).
- Keep `tx_input_data`/`tx_output_data` concise; store large blobs elsewhere and reference them by ID.
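When a single batch is very large, splitting it into fixed-size chunks keeps each `bulk` enqueue payload small. A minimal sketch; `bulk_in_chunks` and the chunk size of 500 are illustrative, not part of the SDK:

```python
def bulk_in_chunks(bulk, ops, chunk_size=500):
    """Send `ops` through `bulk` (e.g. q.bulk) in fixed-size chunks.

    Hypothetical helper: `bulk` is any callable taking a list of op dicts,
    such as TxQueueClient.bulk; 500 is an arbitrary example size.
    """
    for i in range(0, len(ops), chunk_size):
        bulk(ops[i:i + chunk_size])

# Usage with a TxQueueClient `q` (sketch):
# bulk_in_chunks(q.bulk, all_ops)
```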
3.8 Security
- Use a locked-down `REDIS_URL` (auth enabled; NetworkPolicy restricting who can connect).
- If REST auth is enabled, set `api_key` on `TransactionsQueryClient` or via `TX_API_KEY`.
- Avoid sending secrets inside `tx_input_data`; prefer references.
3.9 Troubleshooting
- Queue ping fails:
  - Check DNS of the service name.
  - Verify password and that `requirepass` matches your `REDIS_URL`.
  - Confirm NetworkPolicy allows agent → Redis connectivity.
- Writes visible too slowly:
  - Reduce server `TX_MAX_WAIT_S` or increase `TX_BATCH_SIZE` to better match your load.
  - Ensure server has enough CPU.
- Missing subject:
  - If not using the wrapper, remember to include `tx_subject_id` to avoid orphaned data or cross-tenant leaks.
3.10 Quick recipes
Fire-and-forget payment record
mgr.upsert({
    "tx_id": mgr.new_tx_id(),
    "timestamp": int(time.time()),
    "tx_status": "pending",
    "tx_input_data": {"amount": 999, "currency": "INR"},
    "tx_type": "payment", "tx_sub_type": "upi",
})
Mark done with output
mgr.update_status(tx_id, "completed")
mgr.update_fields(tx_id, {"tx_output_data": {"ok": True, "ref": "R-777"}})
Query all completed for this agent in last hour
now = int(time.time())
rows = mgr.query([
    {"field": "tx_subject_id", "op": "=", "value": "agent_42"},
    {"field": "tx_status", "op": "=", "value": "completed"},
    {"field": "timestamp", "op": ">=", "value": now - 3600},
], limit=200)