darshjme/a2a-reliability-starter

A2A Reliability Starter Kit

A production-ready boilerplate for building reliable agents on top of Google's Agent-to-Agent (A2A) protocol. It wraps every outbound A2A call with a circuit breaker, exponential-backoff retry, sliding-window rate limiter, and distributed tracing — all wired together out of the box. Drop this kit into your project on day one and ship a resilient multi-agent system without reinventing the reliability stack.


Quick Start

# 1. Install dependencies
pip install -r requirements.txt

# 2. Run the live demo (no real A2A server needed — uses mocks)
python3 demo.py

# 3. Run the test suite
python3 -m pytest tests/ -v

Architecture

┌─────────────────────────────────────────────────────────┐
│                   Your Application                      │
└─────────────────┬───────────────────────────────────────┘
                  │ send_task(A2ATask)
                  ▼
┌─────────────────────────────────────────────────────────┐
│              A2ARateLimiter  (rate_limiter.py)          │
│   Sliding-window guard — blocks calls over quota        │
└─────────────────┬───────────────────────────────────────┘
                  │ enforce(agent_url)
                  ▼
┌─────────────────────────────────────────────────────────┐
│           ReliableA2AClient  (reliable_a2a_client.py)  │
│                                                         │
│  ┌───────────────────────────────────────────────────┐  │
│  │  RetryExecutor  (punarjanma pattern)               │  │
│  │  • Exponential backoff + full jitter               │  │
│  │  • Retries on 5xx / network errors                 │  │
│  │  • MaxRetriesExceeded after N attempts             │  │
│  └──────────────────┬────────────────────────────────┘  │
│                     │                                    │
│  ┌──────────────────▼────────────────────────────────┐  │
│  │  CircuitBreaker  (kavacha pattern)                 │  │
│  │  • CLOSED → OPEN after N failures                  │  │
│  │  • HALF-OPEN probe after recovery_timeout          │  │
│  │  • OPEN → CLOSED after M successes                 │  │
│  └──────────────────┬────────────────────────────────┘  │
│                     │                                    │
│  ┌──────────────────▼────────────────────────────────┐  │
│  │  Tracer  (anusarana pattern)                       │  │
│  │  • Root span per send_task / get_task call         │  │
│  │  • Child HTTP span with URL + duration             │  │
│  │  • Error span capture on failure                   │  │
│  └──────────────────┬────────────────────────────────┘  │
└─────────────────────┼───────────────────────────────────┘
                      │ POST /tasks/send
                      ▼
         ┌────────────────────────┐
         │  Remote A2A Agent      │
         │  (HTTP JSON-RPC 2.0)   │
         └────────────────────────┘

┌─────────────────────────────────────────────────────────┐
│           A2ASessionManager  (session_manager.py)       │
│   Per-user conversation history + TTL eviction          │
│   (sanga pattern — Message / Session / SessionStore)    │
└─────────────────────────────────────────────────────────┘
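
The breaker transitions shown in the diagram can be sketched as a small state machine. This is a minimal illustration, not the kit's CircuitBreaker class: the name SketchCircuitBreaker, the success_threshold parameter (standing in for "M"), the injectable clock, and the choice to count consecutive failures are all assumptions.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class SketchCircuitBreaker:
    """Illustrative three-state breaker; hypothetical, not the kit's API."""

    def __init__(self, failure_threshold=5, recovery_timeout=30.0,
                 success_threshold=2, clock=time.monotonic):
        self.failure_threshold = failure_threshold  # "N" in the diagram
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold  # "M" (assumed name)
        self._clock = clock
        self.state = State.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def allow_request(self):
        """Return True if a call may proceed right now."""
        if self.state is State.OPEN:
            if self._clock() - self._opened_at >= self.recovery_timeout:
                # Recovery timeout elapsed: let probe calls through.
                self.state = State.HALF_OPEN
                self._successes = 0
                return True
            return False
        return True

    def record_success(self):
        if self.state is State.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = State.CLOSED
                self._failures = 0
        else:
            # A success while CLOSED resets the consecutive-failure count.
            self._failures = 0

    def record_failure(self):
        if self.state is State.HALF_OPEN:
            self._trip()  # a failed probe re-opens the circuit
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self):
        self.state = State.OPEN
        self._opened_at = self._clock()
        self._failures = 0
```

A caller would check allow_request() before each outbound call and report the outcome with record_success() or record_failure(). Passing the clock in as a parameter keeps the timing logic testable without real sleeps.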

File Reference

File                      Purpose
config.py                 Central AgentConfig dataclass: all tuneable thresholds
reliable_a2a_client.py    ReliableA2AClient: circuit breaker + retry + tracing
session_manager.py        A2ASessionManager: per-session state (sanga pattern)
rate_limiter.py           A2ARateLimiter: sliding-window quota guard
demo.py                   Five runnable demos; no real server required
tests/                    20+ pytest tests covering all components

Configuration

All defaults are overridable via environment variables or by passing an AgentConfig instance:

from config import AgentConfig, CircuitBreakerConfig, RetryConfig

config = AgentConfig(
    agent_urls=["http://my-agent:8080"],
    request_timeout=15.0,
    circuit_breaker=CircuitBreakerConfig(
        failure_threshold=5,
        recovery_timeout=30.0,
    ),
    retry=RetryConfig(
        max_attempts=4,
        base_delay=0.5,
        max_delay=10.0,
    ),
)

Env Var                 Default                  Description
A2A_AGENT_URL           http://localhost:8080    Default agent endpoint
A2A_REQUEST_TIMEOUT     10.0                     Per-call HTTP timeout (seconds)
CB_FAILURE_THRESHOLD    5                        Failures before circuit opens
CB_RECOVERY_TIMEOUT     30.0                     Seconds before HALF-OPEN probe
RETRY_MAX_ATTEMPTS      3                        Total call attempts
RETRY_BASE_DELAY        0.5                      Initial backoff delay (seconds)
RATE_MAX_REQUESTS       30                       Max outbound calls per window
RATE_WINDOW_SECONDS     60.0                     Sliding window length (seconds)
SESSION_TTL_SECONDS     3600.0                   Session TTL before eviction
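
One way the environment variables above could be read with their documented defaults is sketched below. The EnvSettings dataclass and load_env_settings function are hypothetical names for illustration; the kit's own config.py may organise these values differently (for example, nested under AgentConfig).

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class EnvSettings:
    """Hypothetical flat container; not the kit's AgentConfig."""
    agent_url: str
    request_timeout: float
    cb_failure_threshold: int
    cb_recovery_timeout: float
    retry_max_attempts: int
    retry_base_delay: float
    rate_max_requests: int
    rate_window_seconds: float
    session_ttl_seconds: float


def load_env_settings(env=None):
    """Read the documented variables from `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    return EnvSettings(
        agent_url=env.get("A2A_AGENT_URL", "http://localhost:8080"),
        request_timeout=float(env.get("A2A_REQUEST_TIMEOUT", "10.0")),
        cb_failure_threshold=int(env.get("CB_FAILURE_THRESHOLD", "5")),
        cb_recovery_timeout=float(env.get("CB_RECOVERY_TIMEOUT", "30.0")),
        retry_max_attempts=int(env.get("RETRY_MAX_ATTEMPTS", "3")),
        retry_base_delay=float(env.get("RETRY_BASE_DELAY", "0.5")),
        rate_max_requests=int(env.get("RATE_MAX_REQUESTS", "30")),
        rate_window_seconds=float(env.get("RATE_WINDOW_SECONDS", "60.0")),
        session_ttl_seconds=float(env.get("SESSION_TTL_SECONDS", "3600.0")),
    )
```

Accepting a plain mapping instead of always reading os.environ makes the loader easy to exercise in tests, e.g. load_env_settings({"RETRY_MAX_ATTEMPTS": "5"}).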

Usage Example

from config import AgentConfig
from rate_limiter import A2ARateLimiter, RateLimitExceeded
from reliable_a2a_client import A2ATask, ReliableA2AClient, CircuitOpenError, MaxRetriesExceeded
from session_manager import A2ASessionManager

config = AgentConfig()
limiter = A2ARateLimiter()
session_mgr = A2ASessionManager()

with ReliableA2AClient(agent_url="http://my-agent:8080", config=config) as client:
    session = session_mgr.get_or_create("user-123")
    task = A2ATask(message="Summarise the attached report.")
    session.add("user", task.message)

    try:
        limiter.enforce("http://my-agent:8080")
        response = client.send_task(task)
        session.add("agent", response.text)
        print(response.text)
        print(client.trace_summary())
    except RateLimitExceeded as e:
        print(f"Rate limited — retry in {e.retry_after:.0f}s")
    except CircuitOpenError as e:
        print(f"Agent is down — {e}")
    except MaxRetriesExceeded as e:
        print(f"All retries failed — {e}")
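
The limiter.enforce(...) call and the retry_after attribute used above suggest a sliding-window guard along the following lines. This is a sketch under assumptions, not the contents of rate_limiter.py: the class names, the per-URL deque of timestamps, and the injectable clock are illustrative.

```python
import time
from collections import deque


class SketchRateLimitExceeded(Exception):
    """Stand-in for RateLimitExceeded; carries seconds until a slot frees."""

    def __init__(self, retry_after):
        super().__init__(f"rate limit exceeded; retry in {retry_after:.1f}s")
        self.retry_after = retry_after


class SketchSlidingWindowLimiter:
    """Illustrative sliding-window limiter keyed by agent URL."""

    def __init__(self, max_requests=30, window_seconds=60.0,
                 clock=time.monotonic):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._calls = {}  # key -> deque of call timestamps, oldest first

    def enforce(self, key):
        """Record a call for `key`, or raise if the window is full."""
        now = self._clock()
        calls = self._calls.setdefault(key, deque())
        # Drop timestamps that have slid out of the window.
        while calls and now - calls[0] >= self.window_seconds:
            calls.popleft()
        if len(calls) >= self.max_requests:
            # The oldest call leaves the window after this many seconds.
            retry_after = self.window_seconds - (now - calls[0])
            raise SketchRateLimitExceeded(retry_after)
        calls.append(now)
```

Because each key keeps its own deque, a busy agent does not consume quota that belongs to another endpoint.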

Multi-Provider Fallback

When building resilient A2A agents, configure a prioritised provider list so that the client can automatically fall back to the next available model when a circuit opens:

# config.py — recommended provider priority (March 2026)
PROVIDERS = [
    "anthropic/claude-opus-4-6",
    "anthropic/claude-sonnet-4-6",
    "nvidia/moonshotai/kimi-k2.5",   # MiMo-V2-Pro — free, 1M ctx
    "google/gemini-3.1-pro",          # Gemini 3.1 Pro — March 2026
    "google/gemini-3.1-flash",        # Fast fallback
]

Pass the endpoint URLs for these providers to AgentConfig.agent_urls (one URL per provider endpoint, in priority order); ReliableA2AClient then moves on to the next URL as each circuit opens.
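
The fallback behaviour described above can be pictured as a loop over the URLs in priority order. The sketch below is an assumption about the general shape, not the client's actual logic: send_with_fallback, SketchCircuitOpen, and AllEndpointsUnavailable are hypothetical names, and the caller-supplied send callable stands in for the real request path.

```python
class SketchCircuitOpen(Exception):
    """Stand-in for the kit's CircuitOpenError."""


class AllEndpointsUnavailable(Exception):
    """Hypothetical error raised when every endpoint's circuit is open."""


def send_with_fallback(agent_urls, send):
    """Try each URL in priority order, skipping those whose circuit is open.

    `send(url)` is a caller-supplied callable that performs the request and
    raises SketchCircuitOpen when that endpoint's breaker is open.
    Returns a (url, response) pair for the first endpoint that answers.
    """
    skipped = []
    for url in agent_urls:
        try:
            return url, send(url)
        except SketchCircuitOpen:
            skipped.append(url)  # remember it and move to the next URL
    raise AllEndpointsUnavailable(f"all circuits open: {skipped}")
```

Returning the URL alongside the response lets the caller log which tier actually served the request.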

March 2026 note: Gemini 3.1 Pro and Xiaomi MiMo-V2-Pro are recommended as free/low-cost fallback tiers for production A2A workloads.


Arsenal Integration

This starter kit mirrors the API surface of Arsenal — a collection of production-grade Python libraries for AI agents:

Arsenal Module    Pattern Used           Inline Class
kavacha           Circuit Breaker        CircuitBreaker in reliable_a2a_client.py
punarjanma        Retry + Backoff        RetryPolicy / RetryExecutor
anusarana         Distributed Tracing    Tracer / Span
maryada           Rate Limiting          RequestRateLimiter in rate_limiter.py
sanga             Session Management     Session / SessionStore in session_manager.py

Once Arsenal is published to PyPI, replace the inline implementations by uncommenting the relevant lines in requirements.txt and updating the imports.


Running Tests

python3 -m pytest tests/ -v

The test suite covers:

  • ✅ Successful task submission
  • ✅ Retry on 5xx / network errors
  • ✅ No retry on 4xx client errors
  • ✅ Circuit breaker state transitions (CLOSED → OPEN → HALF-OPEN → CLOSED)
  • ✅ Rate limiter allow / block / reset behaviour
  • ✅ Session creation, message history, TTL eviction
  • ✅ Trace span emission per call
  • ✅ Error span capture on failure
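
The retry behaviours in this list (retry on 5xx and network errors, no retry on 4xx, giving up after a fixed number of attempts) can be sketched with exponential backoff and full jitter as below. The names SketchHTTPError, is_retryable, backoff_delay, and call_with_retry are illustrative assumptions, not the kit's RetryExecutor; the sleep and rng parameters are injectable so a test can run without waiting.

```python
import random
import time


class SketchHTTPError(Exception):
    """Hypothetical HTTP error carrying a status code."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


class SketchMaxRetriesExceeded(Exception):
    """Stand-in for the kit's MaxRetriesExceeded."""


def is_retryable(exc):
    """Retry 5xx responses and network-level errors; never retry 4xx."""
    if isinstance(exc, SketchHTTPError):
        return 500 <= exc.status < 600
    return isinstance(exc, (ConnectionError, TimeoutError))


def backoff_delay(attempt, base_delay=0.5, max_delay=10.0, rng=random.random):
    """Full jitter: uniform in [0, min(max_delay, base_delay * 2**attempt))."""
    cap = min(max_delay, base_delay * (2 ** attempt))
    return rng() * cap


def call_with_retry(fn, max_attempts=3, base_delay=0.5, max_delay=10.0,
                    sleep=time.sleep, rng=random.random):
    """Call fn() up to max_attempts times, sleeping between failed attempts."""
    last_exc = None
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            if not is_retryable(exc):
                raise  # e.g. a 4xx client error: fail immediately
            last_exc = exc
            if attempt < max_attempts - 1:
                sleep(backoff_delay(attempt, base_delay, max_delay, rng))
    raise SketchMaxRetriesExceeded(
        f"gave up after {max_attempts} attempts") from last_exc
```

A test in this style would pass sleep=recorded.append and a fixed rng to assert on the exact delays instead of sleeping.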

License

MIT


Built by Darshankumar Joshi

About

Production-ready reliability layer for Google A2A Protocol agents — circuit-breaker, retry, tracing, rate-limiting, session management. Zero external dependencies. 20 tests.
