Skip to content

darshjme/duta

दूत — duta by Darshankumar Joshi

📨 दूत · duta

Concurrent task dispatcher for parallel agent execution. Run dozens of LLM calls simultaneously — zero dependencies, pure Python threads.

PyPI Version Python Tests Zero Dependencies License Vedic Arsenal

Part of the Vedic Arsenal — 100 production-grade Python libraries for LLM agents.


Why duta Exists

LLM API calls are slow — typically 2–15 seconds each. Sequential execution of N calls takes N × latency seconds. For agentic pipelines that must call an LLM to summarize 50 documents, or fan out to 20 tools simultaneously, sequential execution is a non-starter.

duta provides a clean, minimal ThreadPoolExecutor-backed dispatcher that runs tasks concurrently, collects results in order, enforces per-task timeouts, and handles failures gracefully — without requiring asyncio, Celery, Ray, or any external dependency. Install it, dispatch tasks, get results back as a list.

Result: A 50-task pipeline that would take 250 seconds sequentially completes in ~10 seconds with max_workers=16.


Installation

pip install agent-dispatcher

Or from source:

git clone https://github.com/darshjme/duta.git
cd duta && pip install -e .

Quick Start

from agent_dispatcher import Task, Dispatcher
import uuid

# Define your workload
def summarize(doc: str) -> str:
    return call_llm(f"Summarize: {doc}")

docs = ["Doc A content...", "Doc B content...", "Doc C content..."]

tasks = [
    Task(id=str(uuid.uuid4()), func=summarize, args=(doc,))
    for doc in docs
]

# Dispatch all tasks in parallel, wait for results
with Dispatcher(max_workers=8, timeout_seconds=30) as d:
    results = d.dispatch(tasks)

for r in results:
    if r.success:
        print(f"[{r.task_id[:8]}] ({r.duration_ms:.0f}ms) {r.result}")
    else:
        print(f"[{r.task_id[:8]}] FAILED: {r.error}")

One-Shot Async Submit

from agent_dispatcher import Task, Dispatcher
import uuid, time

dispatcher = Dispatcher(max_workers=4)

# Fire and forget (non-blocking)
task_id = dispatcher.submit(Task(id="t1", func=long_running_job, args=("input",)))
print(f"Submitted: {task_id}")

# Do other work...
time.sleep(1)

# Retrieve result
result = dispatcher.get_result("t1")
print(result.success, result.result)

dispatcher.shutdown()

Decorator Pattern

from agent_dispatcher import dispatch_parallel

@dispatch_parallel(max_workers=12, timeout_seconds=60)
def call_llm(prompt: str) -> str:
    return openai_client.chat(prompt)

# Pass a list of arg-tuples — each gets dispatched as a parallel task
prompts = [
    ("Summarize the Q1 report",),
    ("List key risks from the filing",),
    ("Extract all financial metrics",),
    ("Identify management tone and sentiment",),
]

responses = call_llm(prompts)  # all 4 run in parallel
for resp in responses:
    print(resp)

API Reference

Task

class Task:
    """A dispatchable unit of work.

    Args:
        id:       Unique identifier (use uuid.uuid4() for production).
        func:     Callable to execute.
        args:     Positional arguments for func. Default: ().
        kwargs:   Keyword arguments for func. Default: {}.
        priority: Reserved for future priority queuing. Default: 0.
    """
    id:          str
    func:        Callable
    args:        tuple
    kwargs:      dict
    priority:    int
    status:      str           # "pending" | "running" | "done" | "failed" | "timeout"
    result:      Any           # Set after execution
    error:       str | None    # Set on failure
    duration_ms: float | None  # Wall-clock execution time

Dispatcher

class Dispatcher:
    """ThreadPoolExecutor-backed concurrent task dispatcher.

    Args:
        max_workers:      Thread pool size. Default: 4.
        timeout_seconds:  Per-task timeout. Tasks exceeding this are cancelled. Default: 30.0.
    """

    def dispatch(self, tasks: list[Task]) -> list[DispatchResult]:
        """Run all tasks concurrently. Returns results in input order.
        Blocks until all tasks complete or timeout."""

    def submit(self, task: Task) -> str:
        """Non-blocking submit. Returns task_id for later retrieval."""

    def get_result(self, task_id: str) -> DispatchResult | None:
        """Retrieve result of a previously submitted task, or None if not done."""

    def shutdown(self, wait: bool = True) -> None:
        """Gracefully shut down the thread pool."""

Also works as a context manager (with Dispatcher(...) as d: ...).

DispatchResult

class DispatchResult:
    task_id:     str           # Matches the Task.id
    success:     bool          # False if exception was raised or task timed out
    result:      Any           # Return value of func (None on failure)
    error:       str | None    # Exception message on failure
    duration_ms: float         # Actual wall-clock time in milliseconds

@dispatch_parallel

def dispatch_parallel(max_workers: int = 4, timeout_seconds: float = 30.0):
    """Decorator factory.

    Wraps a function so that calling it with a list of arg-tuples dispatches
    each tuple as an independent parallel task.

    Raises RuntimeError on first task failure.
    Returns results in input order.
    """

Real-World Example

Multi-stage agentic pipeline processing a batch of research papers:

import uuid
from agent_dispatcher import Task, Dispatcher

# Stage 1: Extract metadata from 20 papers in parallel
def extract_metadata(paper_path: str) -> dict:
    text = read_pdf(paper_path)
    return call_llm_structured(f"Extract title, authors, year, abstract: {text[:2000]}")

papers = load_paper_paths("./papers/")  # 20 PDFs

with Dispatcher(max_workers=10, timeout_seconds=45) as d:
    extraction_tasks = [
        Task(id=str(uuid.uuid4()), func=extract_metadata, args=(p,))
        for p in papers
    ]
    extraction_results = d.dispatch(extraction_tasks)

# Collect successes, log failures
metadata = []
for r in extraction_results:
    if r.success:
        metadata.append(r.result)
    else:
        print(f"⚠ Extraction failed after {r.duration_ms:.0f}ms: {r.error}")

print(f"Extracted {len(metadata)}/{len(papers)} papers in parallel")

# Stage 2: Cross-compare findings (fan-out)
def compare_papers(meta_a: dict, meta_b: dict) -> str:
    return call_llm(f"Compare: {meta_a['title']} vs {meta_b['title']}")

comparisons = [
    Task(id=str(uuid.uuid4()), func=compare_papers, args=(metadata[i], metadata[i+1]))
    for i in range(0, len(metadata) - 1, 2)
]

with Dispatcher(max_workers=8, timeout_seconds=60) as d:
    comparison_results = d.dispatch(comparisons)

print(f"Completed {len(comparison_results)} cross-comparisons in parallel")

Performance Notes

Scenario Sequential duta (8 workers) Speedup
10 × 5s LLM calls 50s ~7s
50 × 3s LLM calls 150s ~20s 7.5×
20 × 10s tool calls 200s ~28s

Actual speedup depends on API rate limits and I/O saturation.


The Vedic Principle

दूत — the Divine Messenger. In the Ramayana, Hanuman (the supreme duta) did not travel to Lanka in sequence — he leapt across the ocean in one bound, fulfilled many missions simultaneously, and returned with perfect results. Delay was dharma-breaking; the mission demanded speed.

duta brings this principle to software. Don't make your agents wait in line. Dispatch them all at once.


The Vedic Arsenal

duta is one of 100 libraries in darshjme/arsenal:

Library Source Purpose
duta Ramayana — Sundarakanda Task dispatch
niti Chanakya / Nitishastra Policy enforcement
smriti Vedic Smriti tradition LLM caching
kala Mahabharata BG 11.32 Timeout management
raksha Ramayana — Sundarakanda Agent security

Contributing

  1. Fork the repo
  2. Create a feature branch (git checkout -b fix/your-fix)
  3. Add tests — zero external dependencies only
  4. Submit a PR

License

MIT © Darshankumar Joshi


📨 Built by Darshankumar Joshi · @thedarshanjoshi

"कर्मण्येवाधिकारस्ते मा फलेषु कदाचन" Your right is to action alone, never to its fruits. — Bhagavad Gita 2.47

Vedic Arsenal · GitHub · Twitter

About

Concurrent task dispatcher for parallel agent execution — ThreadPoolExecutor, @dispatch_parallel. Zero dependencies.

Topics

Resources

License

Code of conduct

Contributing

Security policy

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages