# 🔷 Lattice

Adaptive Multi-Agent Orchestration Framework

Route, plan, verify, and execute multi-agent workflows with learned routing, formal safety proofs, and full observability.




Quick Start • Architecture • Features • Benchmarks • Examples • Contributing



## Overview

Lattice is a Python framework for building multi-agent systems with formal safety guarantees. It combines a contextual bandit router that learns optimal agent assignment from execution feedback, a hybrid ReAct + Plan-and-Solve planner for task decomposition, and a Z3-powered verifier that proves DAG acyclicity, budget feasibility, and capability constraints before any code executes. The result is a production-grade orchestration layer where agents are routed intelligently, plans are verified mathematically, and every step is traced end-to-end via OpenTelemetry.


πŸ—οΈ Architecture

```mermaid
graph LR
    subgraph Input
        T[Task Input]
    end

    subgraph Orchestration
        R[Router<br/><i>Contextual Bandits</i>]
        P[Planner<br/><i>ReAct + Plan-and-Solve</i>]
        V[Verifier<br/><i>Z3 Safety Proofs</i>]
        E[Executor<br/><i>DAG Engine</i>]
    end

    subgraph Agents
        A1[Tool Agent]
        A2[Critic Agent]
        A3[Human Agent]
        A4[Custom Agents]
    end

    subgraph Output
        O[Result + Traces]
    end

    T --> R
    R --> P
    P --> V
    V --> E
    E --> A1 & A2 & A3 & A4
    A1 & A2 & A3 & A4 --> O

    M[(Scoped Memory)] -.-> R & P & E
    OB[Observability<br/><i>OpenTelemetry</i>] -.-> R & P & V & E

    style R fill:#4A90D9,color:#fff
    style P fill:#7B68EE,color:#fff
    style V fill:#E74C3C,color:#fff
    style E fill:#2ECC71,color:#fff
    style M fill:#F39C12,color:#fff
    style OB fill:#95A5A6,color:#fff
```

Pipeline flow: A task enters the Router, which uses a learned contextual bandit policy to select the optimal agent. The Planner decomposes the task into a DAG of sub-goals. The Verifier proves the plan is safe via Z3 constraint solving. The Executor dispatches sub-goals in topological order with parallel execution, retry, and checkpointing. Scoped Memory provides hierarchical state sharing across steps, and OpenTelemetry traces the entire pipeline.
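The topological dispatch step can be sketched with the standard library's `graphlib` (a toy stand-in for the real `Executor`; the plan and step names below are made up for illustration):

```python
import asyncio
from graphlib import TopologicalSorter

# Toy plan: sub-goal -> set of dependencies (hypothetical step names).
plan = {
    "load": set(),
    "parse": {"load"},
    "summarize": {"parse"},
    "chart": {"parse"},
    "report": {"summarize", "chart"},
}

async def run_step(name: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for real agent work
    return name

async def execute(plan: dict[str, set[str]]) -> list[str]:
    ts = TopologicalSorter(plan)
    ts.prepare()  # raises CycleError if the plan is not a DAG
    order = []
    while ts.is_active():
        ready = list(ts.get_ready())  # steps whose dependencies are satisfied
        # Independent steps run concurrently, like the parallel dispatch above.
        done = await asyncio.gather(*(run_step(n) for n in ready))
        order.extend(done)
        ts.done(*ready)
    return order

order = asyncio.run(execute(plan))
```

Here `summarize` and `chart` are dispatched in the same batch because neither depends on the other, while `report` waits for both.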


## ✨ Features

| Feature | Description |
| --- | --- |
| 🎯 Learned Routing | Contextual bandit router (epsilon-greedy, UCB, Thompson sampling) that learns optimal agent assignment from task embeddings and execution feedback via an online-trained MLP |
| 🧠 Adaptive Planning | Hybrid ReAct + Plan-and-Solve planner with Voyager-style skill caching -- auto-selects a strategy based on estimated task complexity |
| 🔒 Formal Verification | Z3-based safety verification checks DAG acyclicity, budget feasibility, capability matching, and custom policies before any execution |
| ⚡ DAG Execution | Topological execution engine with dependency tracking, parallel dispatch, configurable concurrency, retry with exponential backoff, and timeouts |
| 🗄️ Scoped Memory | Hierarchical shared memory with namespace inheritance, TTL expiration, semantic similarity search, LRU eviction, and optional Redis persistence |
| 🌊 Token Streaming | Async token-level streaming with backpressure, multi-consumer fan-out, and real-time throughput statistics |
| 💾 Checkpointing | Execution checkpoints for fault recovery -- resume failed plans from the last successful step |
| 💰 Cost Attribution | Per-agent, per-model cost tracking with budget monitoring and alerts, covering 100+ models through LiteLLM |
| 📡 OpenTelemetry | Full distributed tracing with Lattice-specific span attributes -- export to Jaeger, Honeycomb, or any OTLP collector |
| ⚖️ Constitutional Critic | Evaluation agent that scores outputs against configurable principles (helpfulness, accuracy, safety, coherence, completeness) with weighted scoring |
| 👤 Human-in-the-Loop | Queue-based human approval agent with timeout, pre-loaded responses for testing, and full audit logging |

## 📚 Paper Implementations

Lattice implements ideas from the following research:

| Paper | What Lattice Uses |
| --- | --- |
| Voyager (Wang et al., 2023) | Skill library pattern: successful plans are cached and reused for similar future tasks via the memory system |
| Plan-and-Solve (Wang et al., 2023) | High-level task decomposition into ordered sub-goals with dependency edges, forming a DAG for the executor |
| ReAct (Yao et al., 2023) | Interleaved thought-action-observation loops for tool-using agents and fine-grained execution planning |
| Contextual Bandits (Agarwal et al., 2014) | Learned routing via reward prediction on task embeddings with epsilon-greedy, UCB, and Thompson exploration |
| Constitutional AI (Bai et al., 2022) | Critic agent that evaluates outputs against configurable principles with weighted scoring and revision suggestions |
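The contextual-bandit routing idea can be sketched with a plain epsilon-greedy learner over linear rewards. This is illustrative only: the real router uses an online-trained MLP reward predictor and also supports UCB and Thompson sampling, and every name below is made up.

```python
import numpy as np

rng = np.random.default_rng(0)
dim, n_agents, eps, lr = 8, 3, 0.1, 0.05

true_w = rng.normal(size=(n_agents, dim))  # hidden per-agent reward weights
est_w = np.zeros((n_agents, dim))          # learned reward estimates
counts = np.zeros(n_agents, dtype=int)     # how often each agent was chosen

for _ in range(2000):
    x = rng.normal(size=dim)                       # task embedding
    if rng.random() < eps:
        a = int(rng.integers(n_agents))            # explore: random agent
    else:
        a = int(np.argmax(est_w @ x))              # exploit: best predicted reward
    reward = true_w[a] @ x + 0.1 * rng.normal()    # noisy execution feedback
    counts[a] += 1
    est_w[a] += lr * (reward - est_w[a] @ x) * x   # online least-squares step
```

Only the chosen agent's estimate is updated each round, which is exactly the partial-feedback setting that distinguishes bandit routing from supervised classification.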

## 🛠️ Tech Stack

Pydantic NumPy NetworkX LiteLLM OpenTelemetry Redis OpenAI Anthropic


## 🚀 Quick Start

### Installation

```bash
pip install lattice
```

Or install from source with dev dependencies:

```bash
git clone https://github.com/JiwaniZakir/lattice.git
cd lattice
pip install -e ".[dev]"
```

### Minimal Example

```python
import asyncio
import numpy as np
from lattice import Router, Planner, Verifier, Executor
from lattice.agents.base import AgentResult, BaseAgent

# Define a custom agent
class MyAgent(BaseAgent):
    async def execute(self, task, context=None):
        return AgentResult(
            agent_id=self.agent_id,
            task=task,
            output=f"Completed: {task}",
        )

async def main():
    agent = MyAgent(agent_id="my_agent", name="My Agent")

    # 1. Route -- contextual bandit selects the best agent
    router = Router(agents=[agent], embedding_dim=384)
    embedding = np.random.randn(384).astype(np.float32)
    decision = await router.route(embedding)

    # 2. Plan -- decompose task into sub-goals
    planner = Planner()
    plan = await planner.plan(
        task="Analyze the quarterly report",
        available_agents=[decision.agent_id],
    )

    # 3. Verify -- Z3 proves the plan is safe
    verifier = Verifier()
    check = await verifier.verify_plan(plan)
    assert check.is_safe

    # 4. Execute -- DAG engine runs sub-goals with parallelism
    executor = Executor(agents={"my_agent": agent})
    result = await executor.execute(plan)
    print(result.status, result.total_cost_usd)

asyncio.run(main())
```

## 📂 Project Structure

```text
src/lattice/
├── core/               # Router, Planner, Executor, Verifier, Memory
│   ├── router.py       # Contextual bandit routing with MLP reward predictor
│   ├── planner.py      # Hybrid ReAct + Plan-and-Solve with skill caching
│   ├── verifier.py     # Z3-based safety verification engine
│   ├── executor.py     # DAG execution with concurrency and retry
│   └── memory.py       # Scoped hierarchical memory with TTL and Redis
├── agents/             # Agent implementations
│   ├── base.py         # BaseAgent ABC + AgentResult + streaming protocol
│   ├── tool.py         # ReAct tool-using agent with tool registry
│   ├── critic.py       # Constitutional AI evaluation agent
│   └── human.py        # Human-in-the-loop approval agent
├── routing/            # Routing subsystem
│   ├── classifier.py   # Task type classification
│   ├── embedder.py     # Task embedding generation
│   └── feedback.py     # Reward feedback processing
├── verification/       # Verification subsystem
│   ├── invariants.py   # Safety invariant definitions
│   ├── solver.py       # Z3 solver wrapper
│   └── policies.py     # Custom constraint policies
├── execution/          # Execution subsystem
│   ├── dag.py          # DAG builder and topological sort
│   ├── streaming.py    # Token-level async streaming with fan-out
│   └── checkpointing.py # Checkpoint save/restore for fault recovery
├── observability/      # Observability subsystem
│   ├── tracing.py      # OpenTelemetry span management
│   ├── metrics.py      # Metrics collection
│   └── logging.py      # Structured logging via structlog
└── integrations/       # LLM provider integrations
    ├── openai.py       # OpenAI provider
    ├── anthropic.py    # Anthropic provider
    └── litellm.py      # LiteLLM unified provider (100+ models)
```
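The multi-consumer fan-out pattern behind `execution/streaming.py` can be sketched with bounded `asyncio.Queue`s, whose blocking `put` provides backpressure for free. The function names and shapes here are assumptions for illustration, not the lattice API:

```python
import asyncio

async def fan_out(source, n_consumers: int, maxsize: int = 8):
    """Broadcast tokens from one producer to several consumers.
    Bounded queues block the producer when a consumer falls behind."""
    queues = [asyncio.Queue(maxsize=maxsize) for _ in range(n_consumers)]

    async def produce():
        async for token in source:
            for q in queues:
                await q.put(token)   # backpressure: waits if the queue is full
        for q in queues:
            await q.put(None)        # end-of-stream sentinel

    async def consume(q):
        out = []
        while (token := await q.get()) is not None:
            out.append(token)
        return out

    producer = asyncio.create_task(produce())
    results = await asyncio.gather(*(consume(q) for q in queues))
    await producer
    return results

async def tokens():
    for t in ["Hel", "lo", "!"]:
        yield t  # stand-in for an LLM token stream

results = asyncio.run(fan_out(tokens(), n_consumers=2))
```

Each consumer receives the complete token stream independently, which is what lets, say, a UI renderer and a logger tap the same generation without coordinating.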

## 📊 Benchmarks

Measured on Apple M2 Pro, Python 3.12, single process:

| Benchmark | Value |
| --- | --- |
| Router decision latency (p50) | ~50 µs |
| Router decision latency (p95) | ~120 µs |
| Reward convergence (500 rounds) | >0.85 mean |
| DAG execution (10 parallel steps) | ~15 ms |
| DAG execution (20 sequential steps) | ~45 ms |
| Memory set/get (in-process) | ~5 µs |
| Z3 verification (3-step plan) | ~2 ms |

Run benchmarks yourself:

```bash
python benchmarks/routing_benchmark.py
python benchmarks/throughput_benchmark.py
```

## 📝 Examples

| Example | Description |
| --- | --- |
| `quickstart.py` | Full pipeline walkthrough: agent creation, routing, planning, verification, execution |
| `multi_agent_research.py` | Multi-agent DAG with critic evaluation and cost tracking |
| `verified_workflow.py` | Z3 verification demos: DAG safety, budget proofs, capability matching |

```bash
# Run the quickstart example
python examples/quickstart.py
```

## 🧑‍💻 Development

```bash
# Install dev dependencies
pip install -e ".[dev]"

# Install pre-commit hooks (run once after cloning)
pre-commit install

# Run tests
pytest tests/ -v

# Run linter
ruff check src/ tests/

# Run type checker
mypy src/lattice/

# Run a single test file
pytest tests/test_router.py -v
```

Pre-commit hooks run automatically on `git commit` and enforce `ruff check --fix`, `ruff format`, trailing-whitespace cleanup, end-of-file normalization, YAML validity, and large-file detection.
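A `.pre-commit-config.yaml` enforcing those hooks might look like the following sketch. The hook ids are the standard ones from the ruff and pre-commit-hooks projects, but the `rev` pins are placeholders, and this may differ from the config actually shipped in the repo:

```yaml
repos:
  - repo: https://github.com/astral-sh/ruff-pre-commit
    rev: v0.6.0        # placeholder pin; use the latest release
    hooks:
      - id: ruff
        args: [--fix]
      - id: ruff-format
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.6.0        # placeholder pin
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
```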


## 🤝 Contributing

Contributions are welcome! Here is how to get started:

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/my-feature`)
3. Write tests for your changes
4. Ensure all checks pass (`pytest`, `ruff`, `mypy`)
5. Submit a pull request

Please open an issue first for major changes so we can discuss the approach.


## 📄 License

This project is licensed under the MIT License.



Built with research. Verified with proofs. Orchestrated at scale.

