General Transactions SDK Documentation

1) Introduction

The SDK lets your agents submit transactions and query them from the centrally deployed Transactions Server.

  • Writes: Agents enqueue operations (insert/upsert/update/delete) to the tx-queue (Redis Streams). The server’s worker batches and commits to TimescaleDB/PostgreSQL.
  • Reads: Agents query the server via REST for low-latency analytics and lookups.
  • Subject scoping: Most apps are multi-tenant per agent. The SDK provides a high-level wrapper that automatically sets tx_subject_id on writes and scopes reads to the same subject by default.

Key SDK classes

  • TxQueueClient: Minimal Redis queue producer (enqueue insert, upsert, update_status, update_fields, delete, and bulk).
  • TransactionsQueryClient (HTTP): REST client for reads (and write-enqueue endpoints if desired).
  • GeneralAgentsTransactionManager: High-level wrapper that combines both, takes subject_id in the constructor, and offers opinionated helper methods.

The server writes to the DB asynchronously. Read-after-write is eventually consistent: your query may not see a just-enqueued write until the worker flushes the batch (size- or time-triggered).


2) Installation

Option A: as a package

pip install agents_txs

Option B: from source

Place the modules in your project and install:

pip install -e .

Python version

  • Python ≥ 3.9

Dependencies (installed automatically)

  • redis>=5.0
  • requests>=2.31
  • urllib3>=2.0

If you also run the server locally, you’ll need server deps too (psycopg, Flask, etc.), but for client-only usage, the above is enough.

Imports

If installed as a package:

from agents_txs import (
    TxQueueClient,
    TransactionsQueryClient,  # alias for TransacationsQueryClient
    GeneralAgentsTransactionManager,
)

If you vendored files locally, import from your module paths:

from tx_queue_client import TxQueueClient
from transactions_query_client import TransactionsQueryClient
from general_agents_transaction_manager import GeneralAgentsTransactionManager

Environment variables (optional defaults)

  • TX_API_BASE_URL – e.g. http://tx-server.tx.svc.cluster.local:8080/
  • TX_API_KEY – bearer token if your deployment uses auth
  • REDIS_URL – e.g. redis://:password@tx-queue.tx.svc.cluster.local:6379/0
  • TX_STREAM – default tx_queue
  • TX_STREAM_MAXLEN – default 100000 (approximate trimming)

You can also pass these explicitly when constructing clients.
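
As a rough illustration of how explicit arguments could take precedence over these environment variables, the sketch below resolves each setting in that order. The resolve_settings helper and its return shape are hypothetical and not part of the SDK; only the variable names and the two documented defaults (tx_queue and 100000) come from the list above.

```python
import os


def resolve_settings(rest_base_url=None, api_key=None, redis_url=None,
                     stream_key=None, stream_maxlen=None, env=None):
    """Hypothetical helper: explicit argument first, then env var, then documented default."""
    env = os.environ if env is None else env
    return {
        "rest_base_url": rest_base_url or env.get("TX_API_BASE_URL"),
        "api_key": api_key or env.get("TX_API_KEY"),
        "redis_url": redis_url or env.get("REDIS_URL"),
        # Only these two settings have defaults stated in this document.
        "stream_key": stream_key or env.get("TX_STREAM", "tx_queue"),
        "stream_maxlen": int(stream_maxlen or env.get("TX_STREAM_MAXLEN", "100000")),
    }


settings = resolve_settings(redis_url="redis://:pass@tx-queue:6379/0")
```

The resulting values could then be passed to the client constructors shown in the usage guide.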


3) Usage Guide

3.1 High-level wrapper: GeneralAgentsTransactionManager

This is the easiest entry point for agents. It takes a subject_id and wires up both a queue client and a REST client.

from agents_txs import GeneralAgentsTransactionManager

mgr = GeneralAgentsTransactionManager(
    subject_id="agent_42",                 # required
    rest_base_url="http://tx-server:8080/",
    redis_url="redis://:pass@tx-queue:6379/0",
)

Create or upsert a transaction

tx_id = mgr.new_tx_id()  # generates a unique tx_id (e.g., "tx_<uuid>")

mgr.upsert({
    "tx_id": tx_id,
    "tx_group_id": "batch_A",
    "timestamp": 1725960000,               # epoch seconds
    "tx_status": "pending",
    "tx_input_data": {"amount": 500, "currency": "INR"},
    "tx_type": "payment",
    "tx_sub_type": "upi"
})

The wrapper automatically sets tx_subject_id="agent_42" if not provided.
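
The defaulting behavior described here can be pictured with a minimal sketch. The with_subject function is a hypothetical stand-in, not the wrapper's actual code; it only illustrates "fill in tx_subject_id when the caller did not supply one".

```python
def with_subject(record, subject_id):
    """Return a copy of record with tx_subject_id defaulted to subject_id."""
    out = dict(record)                          # copy so the caller's dict is not mutated
    out.setdefault("tx_subject_id", subject_id)  # an explicit value is left untouched
    return out


defaulted = with_subject({"tx_id": "tx_1"}, "agent_42")
explicit = with_subject({"tx_id": "tx_2", "tx_subject_id": "agent_7"}, "agent_42")
```

In this sketch a record that already names a subject keeps it, which matches the "if not provided" wording above.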

Update status / fields / delete

mgr.update_status(tx_id, "completed")
mgr.update_fields(tx_id, {"tx_output_data": {"ok": True, "ref": "ABC123"}})
mgr.delete(tx_id)

Bulk operations

mgr.bulk_upsert([
    {"tx_id": mgr.new_tx_id(), "timestamp": 1725960100, "tx_status": "pending", "tx_type": "task", "tx_sub_type": "sync"},
    {"tx_id": mgr.new_tx_id(), "timestamp": 1725960200, "tx_status": "pending", "tx_type": "task", "tx_sub_type": "sync"},
])

mgr.bulk_update_status([
    {"tx_id": tx_id, "status": "completed"},
])

mgr.bulk_update_fields([
    {"tx_id": tx_id, "fields": {"tx_output_data": {"ok": True}}},
])

mgr.bulk_delete([tx_id])

Query (subject-scoped by default)

# Get by id (ensures subject matches)
one = mgr.get(tx_id)

# List recent for this subject
recent = mgr.list_recent(limit=50)

# Filter by status for this subject
completed = mgr.list_by_status("completed", limit=100)

# Time range
between = mgr.range_by_time(1725960000, 1725963600, limit=100)

# JSON search on input/output
payments_in_inr = mgr.search_input_contains({"currency": "INR"}, limit=100)

# Generic query with JSON path
rows = mgr.json_path_equals("tx_input_data", ["customer", "id"], "CUST-1", limit=50)

# Aggregations
status_counts = mgr.status_counts()
type_counts = mgr.type_counts(include_sub_type=True)

To query across all subjects, pass include_all_subjects=True to the wrapper methods, e.g.:

all_recent = mgr.list_recent(limit=50, include_all_subjects=True)

3.2 Queue-only: TxQueueClient

Use when your agent only needs to enqueue and doesn’t require reads.

from agents_txs import TxQueueClient

q = TxQueueClient(redis_url="redis://:pass@tx-queue:6379/0", stream_key="tx_queue")

q.upsert({
    "tx_id": "tx_123",
    "timestamp": 1725960000,
    "tx_status": "pending",
    "tx_subject_id": "agent_42",
    "tx_type": "payment",
    "tx_sub_type": "upi"
})

q.update_status("tx_123", "completed")
q.update_fields("tx_123", {"tx_output_data": {"ok": True}})
q.delete("tx_123")

q.bulk([
    {"op": "upsert", "record": {"tx_id": "tx_abc", "timestamp": 1725960300, "tx_status": "pending", "tx_subject_id": "agent_42"}},
    {"op": "update_status", "tx_id": "tx_abc", "status": "completed"},
])

Tips

  • Always include tx_subject_id with writes if you’re not using the wrapper.
  • Expect eventual consistency on reads (the worker batches). See section 3.4, “Read-after-write & eventual consistency”.

3.3 REST-only: TransactionsQueryClient

Use for queries, dashboards, and admin tools.

from agents_txs import TransactionsQueryClient

api = TransactionsQueryClient(base_url="http://tx-server:8080/")  # add api_key=... if needed

one = api.get_transaction("tx_123")
by_group = api.list_by_group("batch_A", limit=50)
by_status = api.list_by_status("completed", limit=100)
time_range = api.range_by_time(1725960000, 1725963600)
inp_contains = api.search_input_contains({"currency": "INR"})
hist = api.histogram(3600, start_ts=1725960000, end_ts=1725967200, tx_status="completed")

rows = api.query({
  "and": [
    {"field":"tx_type","op":"IN","value":["payment","refund"]},
    {"field":"timestamp","op":">=","value":1725960000}
  ]
}, limit=200)
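
Hand-writing nested filter dictionaries gets error-prone as conditions grow. The sketch below builds the same structure with two small helpers; cond and all_of are hypothetical names introduced for illustration, and only the "and" combinator shown above is assumed to exist.

```python
def cond(field, op, value):
    """Build one condition in the {"field", "op", "value"} shape used by query()."""
    return {"field": field, "op": op, "value": value}


def all_of(*conditions):
    """Combine conditions under the "and" key."""
    return {"and": list(conditions)}


flt = all_of(
    cond("tx_type", "IN", ["payment", "refund"]),
    cond("timestamp", ">=", 1725960000),
)
```

The resulting flt is identical to the literal filter above, so it could be passed as api.query(flt, limit=200).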

The REST client also exposes the same enqueue operations (insert, upsert, update_status, update_fields, delete); these call the server, which places the operation on the queue. In most deployments, agents write to the queue directly for the lowest latency; the REST write endpoints are handy for tooling and scripts.


3.4 Read-after-write & eventual consistency

Writes go to the queue; the worker flushes to DB when:

  • batch is full (TX_BATCH_SIZE), or
  • max wait elapsed (TX_MAX_WAIT_S)
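
The two triggers can be expressed as a single predicate. This is a sketch of the decision only, not the server's implementation; the function name, its parameters, and the choice to measure the wait from the oldest pending message are assumptions.

```python
def should_flush(pending_count, oldest_enqueued_at, now, batch_size, max_wait_s):
    """Return True when either flush trigger has fired.

    batch_size and max_wait_s stand in for TX_BATCH_SIZE and TX_MAX_WAIT_S.
    """
    if pending_count == 0:
        return False                      # nothing to commit
    if pending_count >= batch_size:
        return True                       # size trigger
    return (now - oldest_enqueued_at) >= max_wait_s   # time trigger


# A single pending write flushes once it has waited max_wait_s.
lonely_write_flushes = should_flush(1, oldest_enqueued_at=100.0, now=102.0,
                                    batch_size=500, max_wait_s=2.0)
```

Under these assumptions, a lone write becomes visible roughly TX_MAX_WAIT_S plus commit time after it is enqueued, while writes arriving in a large burst are flushed as soon as the batch fills.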

If you must confirm persistence before proceeding (e.g., a follow-up query depends on the write), poll at a short interval until the row is visible or a timeout expires:

import time

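def wait_until_visible(mgr, tx_id, timeout_s=5.0, interval_s=0.2):
    """Poll mgr.get(tx_id) until the row is visible or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s   # monotonic: unaffected by wall-clock changes
    while time.monotonic() < deadline:
        row = mgr.get(tx_id)      # subject-scoped
        if row:
            return row
        time.sleep(interval_s)
    return None                   # not visible within the timeout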

tx_id = mgr.new_tx_id()
mgr.upsert({"tx_id": tx_id, "timestamp": 1725960000, "tx_status": "pending"})
row = wait_until_visible(mgr, tx_id)

Tune TX_MAX_WAIT_S server-side if your app needs tighter visibility.
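
When many agents poll at once, a growing interval puts less load on the server than a fixed one. The following is a minimal sketch of polling with exponential backoff; poll_with_backoff and all of its parameters are hypothetical, and fetch is any zero-argument callable such as lambda: mgr.get(tx_id).

```python
import time


def poll_with_backoff(fetch, timeout_s=5.0, initial_s=0.05, factor=2.0,
                      max_interval_s=1.0, sleep=time.sleep, clock=time.monotonic):
    """Call fetch() until it returns a truthy value or timeout_s elapses."""
    deadline = clock() + timeout_s
    interval = initial_s
    while True:
        result = fetch()
        if result:
            return result
        remaining = deadline - clock()
        if remaining <= 0:
            return None                       # gave up: not visible in time
        sleep(min(interval, remaining))       # never sleep past the deadline
        interval = min(interval * factor, max_interval_s)


# Stand-in for mgr.get(tx_id): the row becomes visible on the third attempt.
attempts = {"n": 0}

def fake_get():
    attempts["n"] += 1
    return {"tx_id": "tx_1"} if attempts["n"] >= 3 else None

waits = []
row = poll_with_backoff(fake_get, sleep=waits.append)
```

With the real wrapper the call would look like poll_with_backoff(lambda: mgr.get(tx_id)). The sleep and clock parameters exist only so the loop can be exercised without real delays.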


3.5 Data model recap (for writers)

A valid record (for insert/upsert) has:

{
  "tx_id": "tx_001",                  # required (use mgr.new_tx_id() if needed)
  "tx_group_id": "batch_A",
  "timestamp": 1725960000,            # epoch seconds
  "tx_status": "pending",
  "tx_input_data": {"k": "v"},        # JSON-serializable
  "tx_output_data": {"k": "v"},
  "tx_subject_id": "agent_42",        # critical for multi-tenant scoping
  "tx_parent_subject_ids": ["orgA"],  # list[str]
  "tx_type": "payment",
  "tx_sub_type": "upi"
}

For update_fields, pass only the fields to update (same column names).
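
Because writes are asynchronous, a malformed record may only surface as a failure on the server side, so a client-side check before enqueueing can be useful. The sketch below is one possible pre-flight check based on the field comments above. The record_problems helper is hypothetical, and the rule that timestamp must be an integer is an assumption drawn from the "epoch seconds" comment; the server's actual validation may differ.

```python
import json


def record_problems(record):
    """Return a list of problems found; an empty list means the record passed these checks."""
    problems = []
    tx_id = record.get("tx_id")
    if not isinstance(tx_id, str) or not tx_id:
        problems.append("tx_id must be a non-empty string")
    ts = record.get("timestamp")
    # bool is a subclass of int in Python, so reject it explicitly.
    if ts is not None and (isinstance(ts, bool) or not isinstance(ts, int)):
        problems.append("timestamp must be integer epoch seconds")
    for key in ("tx_input_data", "tx_output_data"):
        if key in record:
            try:
                json.dumps(record[key])
            except (TypeError, ValueError):
                problems.append(f"{key} must be JSON-serializable")
    parents = record.get("tx_parent_subject_ids")
    if parents is not None and not (
        isinstance(parents, list) and all(isinstance(p, str) for p in parents)
    ):
        problems.append("tx_parent_subject_ids must be a list of strings")
    return problems


ok = record_problems({"tx_id": "tx_001", "timestamp": 1725960000,
                      "tx_input_data": {"k": "v"}, "tx_parent_subject_ids": ["orgA"]})
bad = record_problems({"timestamp": "yesterday", "tx_input_data": {1, 2}})
```

A caller might enqueue only when the returned list is empty and log the problems otherwise.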


3.6 Error handling & retries

  • TxQueueClient raises TxQueueError on failures (e.g., Redis unreachable). Wrap enqueues with retry/backoff if your agent can tolerate transient failures.
  • TransactionsQueryClient uses requests with built-in retry (5xx) and raises for non-2xx. Catch requests.HTTPError where appropriate.
  • The server worker only acks queue messages after a successful DB transaction. If a batch fails, those messages remain pending and are retried or claimed by another worker.
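
A retry wrapper for enqueues might look like the sketch below, which uses exponential backoff with full jitter. The retry function and its parameters are hypothetical; in real use, retry_on would name TxQueueError from wherever your installation exposes it, while the example uses ConnectionError as a stand-in so it runs without the SDK.

```python
import random
import time


def retry(call, retry_on=(Exception,), attempts=4, base_delay_s=0.1,
          max_delay_s=2.0, sleep=time.sleep):
    """Run call(); on a listed exception, wait with jittered backoff and try again."""
    for attempt in range(attempts):
        try:
            return call()
        except retry_on:
            if attempt == attempts - 1:
                raise                               # out of attempts: surface the last error
            delay = min(base_delay_s * (2 ** attempt), max_delay_s)
            sleep(random.uniform(0, delay))         # full jitter spreads out retries


# Stand-in for an enqueue that fails twice before succeeding.
state = {"calls": 0}

def flaky_enqueue():
    state["calls"] += 1
    if state["calls"] < 3:
        raise ConnectionError("redis unreachable")
    return "queued"

result = retry(flaky_enqueue, retry_on=(ConnectionError,), sleep=lambda s: None)
```

With the queue client this could be written as retry(lambda: q.upsert(record), retry_on=(TxQueueError,)). Note that a call which raised may still have reached Redis, so a retry can enqueue the same operation twice; whether that is harmless depends on the operation.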

3.7 Performance tips

  • Prefer bulk enqueues when sending many items.
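  • If your traffic is bursty but you need faster visibility, lower TX_MAX_WAIT_S. Lowering TX_BATCH_SIZE also makes the size trigger fire sooner, at the cost of more, smaller commits; raising it improves throughput under sustained load but does not speed up visibility.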
  • Keep tx_input_data/tx_output_data concise; store large blobs elsewhere and reference them by ID.
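
For very large lists, one way to apply the bulk tip is to split the records into fixed-size chunks and send each chunk as one bulk call. The chunked helper below is a hypothetical utility, and the chunk size of 500 in the usage note is an arbitrary illustration rather than a documented limit.

```python
def chunked(items, size):
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    for start in range(0, len(items), size):
        yield items[start:start + size]


batches = list(chunked(["a", "b", "c", "d", "e"], 2))
```

With the wrapper, this could be used as: for batch in chunked(records, 500): mgr.bulk_upsert(batch).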

3.8 Security

  • Use a locked-down REDIS_URL (auth enabled; NetworkPolicy restricting who can connect).
  • If REST auth is enabled, set api_key on TransactionsQueryClient or via TX_API_KEY.
  • Avoid sending secrets inside tx_input_data; prefer references.

3.9 Troubleshooting

  • Queue ping fails:
      • Check DNS of the service name.
      • Verify the password, and that requirepass matches your REDIS_URL.
      • Confirm NetworkPolicy allows agent → Redis connectivity.
  • Writes visible too slowly:
      • Reduce the server's TX_MAX_WAIT_S so batches flush sooner; if the worker is falling behind under load, increase TX_BATCH_SIZE so each commit covers more messages.
      • Ensure the server has enough CPU.
  • Missing subject:
      • If not using the wrapper, remember to include tx_subject_id to avoid orphaned data or cross-tenant leaks.


3.10 Quick recipes

Fire-and-forget payment record

import time

mgr.upsert({
  "tx_id": mgr.new_tx_id(),
  "timestamp": int(time.time()),
  "tx_status": "pending",
  "tx_input_data": {"amount": 999, "currency": "INR"},
  "tx_type": "payment", "tx_sub_type": "upi",
})

Mark done with output

mgr.update_status(tx_id, "completed")
mgr.update_fields(tx_id, {"tx_output_data": {"ok": True, "ref": "R-777"}})

Query all completed for this agent in last hour

now = int(time.time())
rows = mgr.query([
  {"field": "tx_subject_id", "op": "=", "value": "agent_42"},
  {"field": "tx_status", "op": "=", "value": "completed"},
  {"field": "timestamp", "op": ">=", "value": now - 3600},
], limit=200)