Concurrent task dispatcher for parallel agent execution. Run dozens of LLM calls simultaneously — zero dependencies, pure Python threads.
Part of the Vedic Arsenal — 100 production-grade Python libraries for LLM agents.
LLM API calls are slow — typically 2–15 seconds each. Sequential execution of N calls takes N × latency seconds. For agentic pipelines that must call an LLM to summarize 50 documents, or fan out to 20 tools simultaneously, sequential execution is a non-starter.
duta provides a clean, minimal ThreadPoolExecutor-backed dispatcher that runs tasks concurrently, collects results in order, enforces per-task timeouts, and handles failures gracefully — without requiring asyncio, Celery, Ray, or any external dependency. Install it, dispatch tasks, get results back as a list.
Result: a 50-task pipeline of 5-second calls that takes 250 seconds sequentially completes in roughly 20 seconds with max_workers=16 (four waves of up to 16 concurrent calls).
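The arithmetic behind that claim can be demonstrated with nothing but the standard library. The sketch below simulates I/O-bound "LLM calls" with `time.sleep` (a stand-in, not a real API) and compares sequential against threaded wall time:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_llm_call(prompt: str) -> str:
    # Stand-in for a network-bound LLM call: blocks for 0.1s, like real I/O
    time.sleep(0.1)
    return f"summary of {prompt}"

prompts = [f"doc-{i}" for i in range(16)]

# Sequential: 16 tasks x 0.1s each, roughly 1.6s total
start = time.perf_counter()
sequential = [fake_llm_call(p) for p in prompts]
seq_time = time.perf_counter() - start

# Threaded: 16 tasks / 8 workers = 2 waves x 0.1s, roughly 0.2s total
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    parallel = list(pool.map(fake_llm_call, prompts))
par_time = time.perf_counter() - start

assert parallel == sequential  # same results, same order
print(f"sequential: {seq_time:.2f}s, threaded: {par_time:.2f}s")
```

Because the work is I/O-bound, the GIL is released during the blocking calls and plain threads deliver near-linear speedup.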
## Installation

```bash
pip install agent-dispatcher
```

Or from source:

```bash
git clone https://github.com/darshjme/duta.git
cd duta && pip install -e .
```

## Quick start

```python
from agent_dispatcher import Task, Dispatcher
import uuid

# Define your workload
def summarize(doc: str) -> str:
    return call_llm(f"Summarize: {doc}")

docs = ["Doc A content...", "Doc B content...", "Doc C content..."]
tasks = [
    Task(id=str(uuid.uuid4()), func=summarize, args=(doc,))
    for doc in docs
]

# Dispatch all tasks in parallel, wait for results
with Dispatcher(max_workers=8, timeout_seconds=30) as d:
    results = d.dispatch(tasks)

for r in results:
    if r.success:
        print(f"[{r.task_id[:8]}] ({r.duration_ms:.0f}ms) {r.result}")
    else:
        print(f"[{r.task_id[:8]}] FAILED: {r.error}")
```
## Fire-and-forget submission

```python
from agent_dispatcher import Task, Dispatcher
import time

dispatcher = Dispatcher(max_workers=4)

# Fire and forget (non-blocking)
task_id = dispatcher.submit(Task(id="t1", func=long_running_job, args=("input",)))
print(f"Submitted: {task_id}")

# Do other work...
time.sleep(1)

# Retrieve result
result = dispatcher.get_result("t1")
print(result.success, result.result)

dispatcher.shutdown()
```
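The submit/get_result pairing mirrors the standard future pattern from `concurrent.futures`. A stdlib-only sketch of the same poll-until-done flow (illustrative, not duta's internals):

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor

def long_running_job(payload: str) -> str:
    time.sleep(0.2)  # simulate slow I/O
    return f"processed {payload}"

pool = ThreadPoolExecutor(max_workers=4)
future: Future = pool.submit(long_running_job, "input")  # non-blocking

# Poll: future.done() is the stdlib analogue of get_result(...) returning None
while not future.done():
    time.sleep(0.05)  # do other work here instead of sleeping

result = future.result()  # already finished, so this returns immediately
pool.shutdown(wait=True)
print(result)
```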
## Decorator API

```python
from agent_dispatcher import dispatch_parallel

@dispatch_parallel(max_workers=12, timeout_seconds=60)
def call_llm(prompt: str) -> str:
    return openai_client.chat(prompt)

# Pass a list of arg-tuples — each gets dispatched as a parallel task
prompts = [
    ("Summarize the Q1 report",),
    ("List key risks from the filing",),
    ("Extract all financial metrics",),
    ("Identify management tone and sentiment",),
]
responses = call_llm(prompts)  # all 4 run in parallel
for resp in responses:
    print(resp)
```
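The decorator-factory shape used above can be sketched with stdlib tools alone. This is a toy version, not duta's code; as documented, error handling reduces to "raise on first failure" because `Future.result()` re-raises the task's exception:

```python
import functools
from concurrent.futures import ThreadPoolExecutor

def dispatch_parallel(max_workers: int = 4, timeout_seconds: float = 30.0):
    """Toy decorator factory: the wrapped function takes a list of arg-tuples."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(arg_tuples):
            with ThreadPoolExecutor(max_workers=max_workers) as pool:
                futures = [pool.submit(func, *args) for args in arg_tuples]
                # .result() re-raises the task's exception -> fail on first error
                return [f.result(timeout=timeout_seconds) for f in futures]
        return wrapper
    return decorator

@dispatch_parallel(max_workers=4)
def shout(text: str) -> str:
    return text.upper()

print(shout([("a",), ("b",), ("c",)]))  # ['A', 'B', 'C']
```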
"""A dispatchable unit of work.
Args:
id: Unique identifier (use uuid.uuid4() for production).
func: Callable to execute.
args: Positional arguments for func. Default: ().
kwargs: Keyword arguments for func. Default: {}.
priority: Reserved for future priority queuing. Default: 0.
"""
id: str
func: Callable
args: tuple
kwargs: dict
priority: int
status: str # "pending" | "running" | "done" | "failed" | "timeout"
result: Any # Set after execution
error: str | None # Set on failure
duration_ms: float | None # Wall-clock execution timeclass Dispatcher:
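The field list above maps naturally onto a dataclass. A hypothetical sketch of how such a record could be declared (the defaults shown follow the docstring; everything else is an assumption, not duta's actual source):

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

@dataclass
class Task:
    id: str
    func: Callable
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)  # avoid a shared mutable default
    priority: int = 0
    status: str = "pending"  # -> "running" | "done" | "failed" | "timeout"
    result: Any = None
    error: Optional[str] = None
    duration_ms: Optional[float] = None

t = Task(id="t1", func=print, args=("hello",))
print(t.status)  # a freshly created task starts out "pending"
```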
"""ThreadPoolExecutor-backed concurrent task dispatcher.
Args:
max_workers: Thread pool size. Default: 4.
timeout_seconds: Per-task timeout. Tasks exceeding this are cancelled. Default: 30.0.
"""
def dispatch(self, tasks: list[Task]) -> list[DispatchResult]:
"""Run all tasks concurrently. Returns results in input order.
Blocks until all tasks complete or timeout."""
def submit(self, task: Task) -> str:
"""Non-blocking submit. Returns task_id for later retrieval."""
def get_result(self, task_id: str) -> DispatchResult | None:
"""Retrieve result of a previously submitted task, or None if not done."""
def shutdown(self, wait: bool = True) -> None:
"""Gracefully shut down the thread pool."""Also works as a context manager (with Dispatcher(...) as d: ...).
### `DispatchResult`

```python
class DispatchResult:
    task_id: str        # Matches the Task.id
    success: bool       # False if an exception was raised or the task timed out
    result: Any         # Return value of func (None on failure)
    error: str | None   # Exception message on failure
    duration_ms: float  # Actual wall-clock time in milliseconds
```
"""Decorator factory.
Wraps a function so that calling it with a list of arg-tuples dispatches
each tuple as an independent parallel task.
Raises RuntimeError on first task failure.
Returns results in input order.
"""Multi-stage agentic pipeline processing a batch of research papers:
```python
import uuid
from agent_dispatcher import Task, Dispatcher

# Stage 1: Extract metadata from 20 papers in parallel
def extract_metadata(paper_path: str) -> dict:
    text = read_pdf(paper_path)
    return call_llm_structured(f"Extract title, authors, year, abstract: {text[:2000]}")

papers = load_paper_paths("./papers/")  # 20 PDFs
with Dispatcher(max_workers=10, timeout_seconds=45) as d:
    extraction_tasks = [
        Task(id=str(uuid.uuid4()), func=extract_metadata, args=(p,))
        for p in papers
    ]
    extraction_results = d.dispatch(extraction_tasks)

# Collect successes, log failures
metadata = []
for r in extraction_results:
    if r.success:
        metadata.append(r.result)
    else:
        print(f"⚠ Extraction failed after {r.duration_ms:.0f}ms: {r.error}")
print(f"Extracted {len(metadata)}/{len(papers)} papers in parallel")

# Stage 2: Cross-compare findings (fan-out)
def compare_papers(meta_a: dict, meta_b: dict) -> str:
    return call_llm(f"Compare: {meta_a['title']} vs {meta_b['title']}")

comparisons = [
    Task(id=str(uuid.uuid4()), func=compare_papers, args=(metadata[i], metadata[i+1]))
    for i in range(0, len(metadata) - 1, 2)
]
with Dispatcher(max_workers=8, timeout_seconds=60) as d:
    comparison_results = d.dispatch(comparisons)
print(f"Completed {len(comparison_results)} cross-comparisons in parallel")
```
## Benchmarks

| Scenario | Sequential | duta (8 workers) | Speedup |
|---|---|---|---|
| 10 × 5s LLM calls | 50s | ~7s | 7× |
| 50 × 3s LLM calls | 150s | ~20s | 7.5× |
| 20 × 10s tool calls | 200s | ~28s | 7× |

Actual speedup depends on API rate limits and I/O saturation.
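A back-of-envelope model for numbers like these: with roughly uniform task latency, wall time is about ceil(N / workers) waves times the per-task latency; real runs drift above or below this depending on latency variance and rate limiting. A quick sketch of the estimate:

```python
import math

def estimated_wall_time(n_tasks: int, workers: int, latency_s: float) -> float:
    # Tasks run in "waves" of at most `workers` concurrent calls
    return math.ceil(n_tasks / workers) * latency_s

# 50 tasks x 3s each on 8 workers -> 7 waves x 3s = 21s (vs 150s sequential)
print(estimated_wall_time(50, 8, 3.0))
```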
दूत — the Divine Messenger. In the Ramayana, Hanuman (the supreme duta) did not travel to Lanka in sequence — he leapt across the ocean in one bound, fulfilled many missions simultaneously, and returned with perfect results. Delay was dharma-breaking; the mission demanded speed.
duta brings this principle to software. Don't make your agents wait in line. Dispatch them all at once.
duta is one of 100 libraries in darshjme/arsenal:
| Library | Source | Purpose |
|---|---|---|
| duta | Ramayana — Sundarakanda | Task dispatch |
| niti | Chanakya / Nitishastra | Policy enforcement |
| smriti | Vedic Smriti tradition | LLM caching |
| kala | Mahabharata BG 11.32 | Timeout management |
| raksha | Ramayana — Sundarakanda | Agent security |
## Contributing

- Fork the repo
- Create a feature branch (`git checkout -b fix/your-fix`)
- Add tests — zero external dependencies only
- Submit a PR
MIT © Darshankumar Joshi
📨 Built by Darshankumar Joshi · @thedarshanjoshi
"कर्मण्येवाधिकारस्ते मा फलेषु कदाचन" Your right is to action alone, never to its fruits. — Bhagavad Gita 2.47