
fix: harden coordination pipeline with validators, logging, and fail-fast#333

Merged
Aureliolo merged 6 commits into main from feat/runtime-wiring on Mar 13, 2026

Conversation

@Aureliolo (Owner) commented Mar 12, 2026

Summary

  • Model validators: Add cross-field validation to CoordinationPhaseResult (success/error consistency), CoordinationWave (non-empty subtask_ids), and CoordinationResult (min 1 phase)
  • Fix fail-fast bug: ContextDependentDispatcher now correctly stops wave execution when fail_fast=True and a wave fails — previously waves continued regardless
  • Mandatory workspace validation: DecentralizedDispatcher raises CoordinationError if workspace service is unavailable or isolation is disabled
  • Missing logging: Add WARNING logging on 7+ error paths across dispatchers and service (setup failures, wave errors, rollup errors, parent update failures)
  • Event constants: Add COORDINATION_CLEANUP_FAILED (was misleadingly using CLEANUP_COMPLETED for failures) and COORDINATION_WAVE_BUILT (separate planning from execution semantics)
  • Type tightening: partial_phases uses CoordinationPhaseResult instead of object, rollup parameter uses SubtaskStatusRollup instead of object
  • Code simplification: Remove redundant guards in DecentralizedDispatcher, extract inline ternaries, consolidate _phase_update_parent branches
  • Docs: Add Multi-Agent Coordination Pipeline section to docs/design/engine.md, update CLAUDE.md package structure and logging events

Test plan

  • Fix existing tests broken by new model validators (empty phases, success/error consistency)
  • Add test_success_with_error_rejected and test_failed_without_error_rejected for CoordinationPhaseResult
  • Add test_empty_subtask_ids_rejected for CoordinationWave
  • Add test_empty_phases_rejected for CoordinationResult
  • Add test_no_workspace_service_raises and test_isolation_disabled_raises for DecentralizedDispatcher
  • Add test_fail_fast_stops_on_wave_failure, test_setup_failure_skips_wave, test_fail_fast_stops_on_setup_failure for ContextDependentDispatcher
  • Add test_workspace_setup_failure_returns_early for CentralizedDispatcher
  • Add test_auto_topology_resolves_to_centralized for AUTO fallback
  • Add test_update_parent_submit_fails and test_update_parent_exception_captured for parent update error paths
  • Add test_rollup_error_captured for rollup failure path
  • Add test_total_cost_aggregated for cost computation
  • Add test_coordination_events_exist for all 13 coordination event constants
  • Fix test_phase_error_carries_partial_phases to use proper CoordinationPhaseResult objects
  • All 7172 tests pass, 94.61% coverage
  • ruff, mypy clean

Review coverage

Pre-reviewed by 8 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, type-design-analyzer, logging-audit, resilience-audit, docs-consistency). 32 findings addressed.

🤖 Generated with Claude Code

Closes #205

Copilot AI review requested due to automatic review settings March 12, 2026 23:32
coderabbitai bot commented Mar 12, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 97a47852-10dd-4522-a3f3-df78a5ce429a

📥 Commits

Reviewing files that changed from the base of the PR and between 4a5b6cf and e581af2.

📒 Files selected for processing (3)
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/service.py

📝 Walkthrough

Summary by CodeRabbit

  • New Features
    • Multi-agent coordination pipeline: decomposition, routing, topology resolution, wave-based parallel execution, workspace isolation options, configurable concurrency/fail-fast, and multiple dispatcher strategies; new coordination public APIs.
  • Documentation
    • Added design docs describing the multi-agent pipeline, phases, dispatchers, and wave execution.
  • Observability
    • New coordination lifecycle and phase event identifiers for logging/metrics.
  • Bug Fixes / Error Handling
    • New coordination-specific error types carrying phase/context.
  • Tests
    • Extensive unit tests covering configs, dispatchers, models, group builder, service, and events.

Walkthrough

Adds a multi‑agent coordination subsystem: new coordination package (models, config, dispatchers, group builder), a MultiAgentCoordinator service implementing a multi‑phase pipeline, workspace lifecycle helpers, coordination errors and observability events, package exports, docs, and extensive unit tests.

Changes

Cohort / File(s) / Summary

  • Coordination models & config (src/ai_company/engine/coordination/config.py, src/ai_company/engine/coordination/models.py): Add frozen Pydantic types: CoordinationConfig, CoordinationContext, CoordinationPhaseResult, CoordinationWave, CoordinationResult with validators and computed properties.
  • Dispatchers & helpers (src/ai_company/engine/coordination/dispatchers.py): Introduce topology-driven dispatch framework: TopologyDispatcher Protocol, DispatchResult, workspace helper functions, dispatcher implementations (SasDispatcher, CentralizedDispatcher, DecentralizedDispatcher, ContextDependentDispatcher), _execute_waves, and select_dispatcher. Extensive phase-level logging and error handling.
  • Group builder (src/ai_company/engine/coordination/group_builder.py): Add build_execution_waves to convert decomposition + routing into wave-ordered ParallelExecutionGroups, handling unroutable subtasks, resource_claim mapping, and per-wave config.
  • Coordinator service (src/ai_company/engine/coordination/service.py): Add MultiAgentCoordinator orchestrating seven phases (decompose, route, resolve topology, validate, dispatch, rollup, update parent), timing/logging per phase, and error propagation via CoordinationPhaseError.
  • Errors & observability (src/ai_company/engine/errors.py, src/ai_company/observability/events/coordination.py): Add CoordinationError and CoordinationPhaseError (stores phase and partial_phases). Add COORDINATION_* event constants covering lifecycle, phases, waves, topology, and cleanup.
  • Package exports (src/ai_company/engine/__init__.py, src/ai_company/engine/coordination/__init__.py): Re-export coordination public API and surface coordination errors at the engine package level.
  • Tests & test utilities (tests/unit/engine/conftest.py, tests/unit/engine/test_coordination_*.py, tests/unit/observability/test_events.py): Add comprehensive unit tests and factories covering config, models, dispatchers, group builder, service behavior, errors, and event constants.
  • Docs & notes (docs/design/engine.md, CLAUDE.md): Documentation expanded to describe multi-agent coordination pipeline, dispatcher types, wave execution model, topology resolution, and updated logging/event conventions.
sequenceDiagram
    participant Client
    participant Coordinator as MultiAgentCoordinator
    participant Decomp as DecompositionService
    participant Router as TaskRoutingService
    participant Dispatcher as TopologyDispatcher
    participant Executor as ParallelExecutor
    participant Workspace as WorkspaceIsolationService
    participant Engine as TaskEngine

    Client->>Coordinator: coordinate(context)
    Coordinator->>Decomp: decompose(task)
    Decomp-->>Coordinator: DecompositionResult
    Coordinator->>Router: route(decomposition)
    Router-->>Coordinator: RoutingResult
    Coordinator->>Coordinator: resolve_topology(routing_result)
    Coordinator->>Dispatcher: dispatch(decomposition, routing, executor, config, workspace_service?)
    Dispatcher->>Workspace: setup_workspaces(...)
    Workspace-->>Dispatcher: workspaces
    Dispatcher->>Executor: execute(waves)
    Executor-->>Dispatcher: ParallelExecutionResults
    Dispatcher->>Workspace: merge_workspaces(workspaces)
    Workspace-->>Dispatcher: WorkspaceGroupResult
    Dispatcher-->>Coordinator: DispatchResult
    Coordinator->>Coordinator: rollup(execution_results)
    Coordinator->>Engine: update_parent_task(rollup) (optional)
    Engine-->>Coordinator: update_response
    Coordinator-->>Client: CoordinationResult
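
The wave-ordering step in the flow above can be sketched roughly as follows (function and variable names are illustrative, not the project's actual build_execution_waves API):

```python
def build_waves(levels: list[list[str]], routed: set[str]) -> list[list[str]]:
    """Group subtasks into execution waves, one per DAG level.

    `levels` is a topological layering of the decomposition DAG: subtasks in
    the same level have no dependencies on each other and may run in parallel,
    while later levels must wait for earlier ones.
    """
    waves: list[list[str]] = []
    for level in levels:
        unroutable = [sid for sid in level if sid not in routed]
        if unroutable:
            # Mirrors the described behavior: unroutable subtasks are an error.
            raise ValueError(f"unroutable subtasks: {unroutable}")
        waves.append(list(level))
    return waves
```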

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The PR title directly addresses the main changes: hardening the coordination pipeline through validators, logging improvements, and fail-fast bug fixes.
Description check ✅ Passed The description comprehensively covers the changeset, detailing model validators, fail-fast fixes, logging additions, event constants, type improvements, and documentation updates related to the multi-agent coordination pipeline.
Linked Issues check ✅ Passed All core objectives from #205 are addressed: runtime wiring of coordination components, topology selection with AUTO fallback, workspace lifecycle management, parallel execution orchestration, and integration with TaskEngine for state coordination.
Out of Scope Changes check ✅ Passed All changes are within scope of wiring multi-agent coordination. The PR adds coordination infrastructure, validators, logging, and documentation—no unrelated modifications detected.
Docstring Coverage ✅ Passed Docstring coverage is 96.40% which is sufficient. The required threshold is 80.00%.


github-actions bot (Contributor) commented Mar 12, 2026

Dependency Review

✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.

Scanned Files

None

gemini-code-assist bot (Contributor) commented
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly hardens the multi-agent coordination pipeline by introducing robust data validation, improving error handling with a functional fail-fast mechanism, and ensuring critical workspace isolation requirements are met. It also enhances system observability through comprehensive logging and refines internal type consistency and code structure, leading to a more stable and predictable execution environment for complex multi-agent tasks.

Highlights

  • Model Validation: Added cross-field validation to CoordinationPhaseResult (success/error consistency), CoordinationWave (non-empty subtask_ids), and CoordinationResult (minimum 1 phase) to ensure data integrity.
  • Fail-Fast Bug Fix: Corrected a bug in ContextDependentDispatcher to ensure it now properly stops wave execution when fail_fast=True and a wave encounters a failure, preventing unnecessary continued processing.
  • Mandatory Workspace Validation: Implemented mandatory workspace validation in DecentralizedDispatcher, which now raises a CoordinationError if the workspace service is unavailable or isolation is disabled, enhancing reliability for decentralized topologies.
  • Enhanced Logging: Introduced WARNING level logging across more than 7 error paths within dispatchers and the coordination service, improving observability for setup failures, wave errors, rollup errors, and parent update failures.
  • New Event Constants: Added new event constants COORDINATION_CLEANUP_FAILED (to accurately reflect cleanup failures) and COORDINATION_WAVE_BUILT (to distinguish planning from execution semantics).
  • Type Tightening & Code Simplification: Refined type hints for partial_phases and rollup parameters, and simplified code by removing redundant guards, extracting inline ternaries, and consolidating _phase_update_parent branches.
  • Documentation Updates: Expanded documentation with a new 'Multi-Agent Coordination Pipeline' section in docs/design/engine.md and updated CLAUDE.md to reflect new package structure and logging events.
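
The fail-fast fix in the highlights reduces to the loop shape below (a hedged sketch; run_wave and the boolean result are placeholders for the real ParallelExecutor call and its result type):

```python
from typing import Callable


def execute_waves(
    waves: list[str],
    run_wave: Callable[[str], bool],
    fail_fast: bool,
) -> list[bool]:
    """Run waves in order; with fail_fast=True, stop at the first failed wave."""
    results: list[bool] = []
    for wave in waves:
        ok = run_wave(wave)
        results.append(ok)
        if fail_fast and not ok:
            break  # the reported bug: this early exit was previously missing
    return results
```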
Changelog
  • CLAUDE.md
    • Updated the description for the engine/ module to include details about the multi-agent coordination pipeline.
    • Added COORDINATION_STARTED to the list of recommended event constants for logging.
  • docs/design/engine.md
    • Added a new section titled 'Multi-Agent Coordination Pipeline' detailing its phases and various topology dispatchers.
  • src/ai_company/engine/__init__.py
    • Imported and exposed new coordination-related classes and functions, including various dispatchers, coordination models, and error types.
    • Added CoordinationError and CoordinationPhaseError to the engine's public API.
  • src/ai_company/engine/coordination/__init__.py
    • Added a new __init__.py file to define the public API for the coordination package.
  • src/ai_company/engine/coordination/config.py
    • Added a new file defining the CoordinationConfig Pydantic model for multi-agent coordination settings.
  • src/ai_company/engine/coordination/dispatchers.py
    • Added a new file implementing various TopologyDispatcher strategies (SasDispatcher, CentralizedDispatcher, DecentralizedDispatcher, ContextDependentDispatcher).
    • Implemented logic for workspace setup, merge, and teardown within dispatchers.
    • Incorporated fail-fast logic into wave execution.
  • src/ai_company/engine/coordination/group_builder.py
    • Added a new file implementing build_execution_waves to convert DAG parallel groups and routing decisions into execution groups.
  • src/ai_company/engine/coordination/models.py
    • Added a new file defining Pydantic models for CoordinationContext, CoordinationPhaseResult, CoordinationWave, and CoordinationResult.
    • Implemented cross-field validation for CoordinationPhaseResult and CoordinationWave, and min_length validation for CoordinationResult phases.
  • src/ai_company/engine/coordination/service.py
    • Added a new file implementing MultiAgentCoordinator to orchestrate the end-to-end multi-agent coordination pipeline.
    • Integrated decomposition, routing, topology resolution, dispatch, rollup, and parent task updates.
    • Incorporated error handling and logging for each coordination phase.
  • src/ai_company/engine/errors.py
    • Added CoordinationError and CoordinationPhaseError classes to the engine's error hierarchy.
  • src/ai_company/observability/events/coordination.py
    • Added a new file defining new event constants for coordination-related activities.
  • tests/unit/engine/test_coordination_config.py
    • Added unit tests for CoordinationConfig, covering defaults, custom values, immutability, and validation rules.
  • tests/unit/engine/test_coordination_dispatchers.py
    • Added unit tests for the select_dispatcher factory and all TopologyDispatcher implementations.
    • Verified execution logic, workspace handling, and fail-fast behavior for dispatchers.
  • tests/unit/engine/test_coordination_errors.py
    • Added unit tests for CoordinationError and CoordinationPhaseError, verifying inheritance and attribute handling.
  • tests/unit/engine/test_coordination_group_builder.py
    • Added unit tests for build_execution_waves, covering various DAG structures, config propagation, workspace mapping, and handling of unroutable tasks.
  • tests/unit/engine/test_coordination_models.py
    • Added unit tests for CoordinationContext, CoordinationPhaseResult, CoordinationWave, and CoordinationResult, covering their validation rules and properties.
  • tests/unit/engine/test_coordination_service.py
    • Added unit tests for MultiAgentCoordinator, covering the full pipeline, SAS topology, failure handling, fail-fast behavior, TaskEngine integration, status rollup, workspace lifecycle, and cost aggregation.
  • tests/unit/observability/test_events.py
    • Updated test_all_domain_modules_discovered to include the 'coordination' module.
    • Added test_coordination_events_exist to verify the new coordination event constants.
Activity
  • Pre-reviewed by 8 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, type-design-analyzer, logging-audit, resilience-audit, docs-consistency).
  • 32 findings from pre-review agents were addressed.
  • Existing tests broken by new model validators were fixed.
  • New tests were added for CoordinationPhaseResult (success/error consistency), CoordinationWave (empty subtask IDs), and CoordinationResult (empty phases).
  • Tests were added for DecentralizedDispatcher to cover cases where workspace service is unavailable or isolation is disabled.
  • Tests were added for ContextDependentDispatcher to verify fail-fast behavior and setup failure handling.
  • Tests were added for CentralizedDispatcher to ensure early return on workspace setup failure.
  • A test was added for AUTO topology fallback to CENTRALIZED.
  • Tests were added for parent update error paths and rollup failure paths.
  • A test was added to verify total cost aggregation.
  • A test was added to confirm the existence of all 13 coordination event constants.
  • The test_phase_error_carries_partial_phases test was fixed to use proper CoordinationPhaseResult objects.
  • All 7172 tests passed with 94.61% coverage.
  • Ruff and Mypy checks passed cleanly.

@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 12, 2026 23:33 — with GitHub Actions Inactive
codecov bot commented Mar 12, 2026

Codecov Report

❌ Patch coverage is 96.69211% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.82%. Comparing base (7a68d34) to head (e581af2).
⚠️ Report is 1 commit behind head on main.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
src/ai_company/engine/coordination/service.py 93.97% 8 Missing and 2 partials ⚠️
src/ai_company/engine/coordination/dispatchers.py 97.47% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #333      +/-   ##
==========================================
+ Coverage   93.76%   93.82%   +0.05%     
==========================================
  Files         434      441       +7     
  Lines       19850    20243     +393     
  Branches     1915     1950      +35     
==========================================
+ Hits        18613    18993     +380     
- Misses        956      966      +10     
- Partials      281      284       +3     

☔ View full report in Codecov by Sentry.

gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces a comprehensive multi-agent coordination pipeline, complete with new services, dispatchers, and data models. The changes significantly enhance the system's capabilities by adding robust validation, improved logging, and fixing a critical fail-fast bug. The addition of extensive tests and documentation is commendable. My review identified a high-severity issue where AgentAssignment objects are created without all necessary fields, potentially leading to incorrect agent behavior. I also found a couple of medium-severity opportunities for improvement: one to enhance logging consistency and another to refactor object creation for better maintainability. Overall, this is a strong contribution that hardens the coordination engine.

Comment on lines +103 to +109
assignments.append(
AgentAssignment(
identity=candidate.agent_identity,
task=task,
resource_claims=resource_claims,
)
)
Severity: high

The AgentAssignment is being created with only a subset of its fields (identity, task, resource_claims). Other important fields like max_turns, timeout_seconds, memory_messages, and completion_config are omitted, causing them to fall back to default values. This is likely incorrect as it could lead to unexpected agent behavior by losing execution context (e.g., custom timeouts or turn limits). These parameters should be propagated from the parent task or another configuration source to ensure agents execute with the correct settings.

Comment on lines +326 to +345
new_assignments = tuple(
AgentAssignment(
identity=a.identity,
task=a.task,
resource_claims=(ws_lookup[a.task.id],)
if a.task.id in ws_lookup
else a.resource_claims,
max_turns=a.max_turns,
timeout_seconds=a.timeout_seconds,
memory_messages=a.memory_messages,
completion_config=a.completion_config,
)
for a in group.assignments
)
return ParallelExecutionGroup(
group_id=group.group_id,
assignments=new_assignments,
max_concurrency=group.max_concurrency,
fail_fast=group.fail_fast,
)
Severity: medium

Instead of manually reconstructing AgentAssignment and ParallelExecutionGroup objects, consider using Pydantic's model_copy(update=...) method. This approach is more concise and robust against future changes to the models. If new fields are added to AgentAssignment or ParallelExecutionGroup, model_copy will handle them automatically, whereas the current implementation would require manual updates.

    new_assignments = tuple(
        a.model_copy(update={"resource_claims": (ws_lookup[a.task.id],)}) if a.task.id in ws_lookup else a
        for a in group.assignments
    )
    return group.model_copy(update={"assignments": new_assignments})

Comment on lines +686 to +691
logger.info(
COORDINATION_WAVE_COMPLETED,
wave_index=wave_idx,
succeeded=exec_result.agents_succeeded,
failed=exec_result.agents_failed,
)
Severity: medium

The logger.info call for COORDINATION_WAVE_COMPLETED is missing the duration_seconds field. This is inconsistent with the logging in the _execute_waves helper function and reduces observability. The elapsed time is available in this scope and should be included in the log for better performance monitoring.

            logger.info(
                COORDINATION_WAVE_COMPLETED,
                wave_index=wave_idx,
                succeeded=exec_result.agents_succeeded,
                failed=exec_result.agents_failed,
                duration_seconds=elapsed,
            )

greptile-apps bot commented Mar 12, 2026

Greptile Summary

This PR introduces the complete multi-agent coordination pipeline (MultiAgentCoordinator + 4 topology dispatchers), hardens it with cross-field model validators, fixes the ContextDependentDispatcher fail-fast bug, makes workspace isolation mandatory for DecentralizedDispatcher, adds missing WARNING logs across 7+ error paths, introduces semantically correct event constants, and tightens types throughout. It is a large, well-structured addition that addresses 32 pre-review findings.

Key findings:

  • _resolve_topology drops accumulated phase context on failure — when inconsistent topologies are detected, CoordinationPhaseError is raised with partial_phases=() (the default), silently discarding the already-completed "decompose" and "route" phase results. Compare with _validate_routing which correctly threads the phases list through and passes partial_phases=tuple(phases).
  • _phase_dispatch violates CLAUDE.md logging policy — it logs COORDINATION_PHASE_STARTED on entry but never logs COORDINATION_PHASE_COMPLETED on success, and never appends a success CoordinationPhaseResult to phases. Every other pipeline phase method (_phase_decompose, _phase_route, _phase_rollup, _phase_update_parent) records both a completion log and a success phase result; _phase_dispatch is the sole exception.
  • Rollup may misclassify crashed agents as BLOCKED: _phase_rollup counts wave.execution_result.outcomes and fills the remainder of expected_count as BLOCKED. If ParallelExecutor.execute_group ever produces fewer outcomes than assigned subtasks (e.g., mid-execution exception), a crashed agent's subtask would be labelled BLOCKED rather than FAILED, which can affect derived_parent_status propagated to the parent task.
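
The counting concern in the last finding can be shown with a small sketch (hypothetical names; the real logic lives in _phase_rollup):

```python
def rollup_counts(outcomes: list[str], expected_count: int) -> dict[str, int]:
    """Tally per-status counts, filling any shortfall as BLOCKED."""
    counts = {"COMPLETED": 0, "FAILED": 0, "BLOCKED": 0}
    for status in outcomes:
        counts[status] = counts.get(status, 0) + 1
    # If the executor returned fewer outcomes than subtasks (e.g. it crashed
    # mid-wave), the missing subtasks are classified BLOCKED, not FAILED.
    counts["BLOCKED"] += max(0, expected_count - len(outcomes))
    return counts
```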

Confidence Score: 3/5

  • Merge after addressing the partial_phases loss in _resolve_topology and the missing COORDINATION_PHASE_COMPLETED log in _phase_dispatch.
  • The PR is a substantial, well-tested new pipeline (7172 tests, 94.61% coverage, ruff/mypy clean, 32 pre-review findings addressed). The dispatcher logic, model validators, event constants, and fail-fast fix are all correct. The deductions come from: (1) _resolve_topology losing accumulated phase context in an error path — callers relying on CoordinationPhaseError.partial_phases for recovery get () instead of the real partial results; (2) _phase_dispatch missing the completion log and phase result on the happy path, a CLAUDE.md policy violation; (3) a potential rollup misclassification if ParallelExecutor produces fewer outcomes than assignments. None of these are crashes in the common path, but (1) and (2) are observable correctness gaps in a hardening PR.
  • Pay close attention to src/ai_company/engine/coordination/service.py — specifically _resolve_topology, _phase_dispatch, and _phase_rollup.

Important Files Changed

Filename / Overview

  • src/ai_company/engine/coordination/service.py: Core orchestration service with 3 issues: _resolve_topology raises CoordinationPhaseError with empty partial_phases (dropping accumulated phase context); _phase_dispatch never logs COORDINATION_PHASE_COMPLETED on success; rollup may misclassify crashed agents as BLOCKED rather than FAILED.
  • src/ai_company/engine/coordination/dispatchers.py: 864-line new file implementing 4 topology dispatchers; fail-fast bug is correctly fixed, workspace lifecycle is sound, per-wave merge naming is addressed. Previously flagged issues (duplicate "merge" names, missing log before raise) appear resolved.
  • src/ai_company/engine/coordination/models.py: Well-designed frozen Pydantic models; cross-field validators for success/error consistency and non-empty collections are correct and well-tested.
  • src/ai_company/engine/coordination/group_builder.py: DAG-to-wave converter is clean; previously flagged unguarded KeyError on task_lookup[subtask_id] is now fixed with a .get() guard and a proper CoordinationError with logging.
  • tests/unit/engine/test_coordination_service.py: Good coverage of service-level paths; missing test for _resolve_topology raising with inconsistent topologies (the partial_phases loss bug identified in this review).

Sequence Diagram

sequenceDiagram
    participant C as MultiAgentCoordinator
    participant D as DecompositionService
    participant R as TaskRoutingService
    participant SD as select_dispatcher
    participant TD as TopologyDispatcher
    participant PE as ParallelExecutor
    participant WS as WorkspaceIsolationService

    C->>D: decompose_task(task, context)
    D-->>C: DecompositionResult

    C->>R: route(decomp_result, agents, task)
    R-->>C: RoutingResult

    C->>C: _resolve_topology(routing_result)
    C->>C: _validate_routing(routing_result)

    C->>SD: select_dispatcher(topology)
    SD-->>C: TopologyDispatcher instance

    C->>TD: dispatch(decomp, routing, executor, ws_service, config)

    opt workspace isolation enabled
        TD->>WS: setup_group(requests)
        WS-->>TD: tuple[Workspace]
    end

    loop per DAG wave
        TD->>PE: execute_group(ParallelExecutionGroup)
        PE-->>TD: ParallelExecutionResult
        opt per-wave merge (ContextDependent)
            TD->>WS: merge_group(wave_workspaces)
            WS-->>TD: WorkspaceGroupResult
        end
    end

    opt post-all-waves merge (Centralized/Decentralized)
        TD->>WS: merge_group(all_workspaces)
        WS-->>TD: WorkspaceGroupResult
    end

    opt cleanup
        TD->>WS: teardown_group(workspaces)
    end

    TD-->>C: DispatchResult

    C->>C: _phase_rollup(dispatch_result, decomp_result)
    C->>C: _phase_update_parent(rollup)
    C-->>C: CoordinationResult

Comments Outside Diff (3)

  1. src/ai_company/engine/coordination/service.py, line 1851-1854 (link)

    CoordinationPhaseError raised with empty partial_phases

    When _resolve_topology detects inconsistent topologies across routing decisions, it raises CoordinationPhaseError without a partial_phases argument, which defaults to (). By this point in the pipeline, both the "decompose" and "route" phases have already been appended to the phases list in coordinate() — but because _resolve_topology doesn't receive that list, those accumulated results are silently dropped from the exception.

    Compare with _validate_routing which correctly threads phases through and raises with partial_phases=tuple(phases). Any caller catching CoordinationPhaseError here and inspecting .partial_phases for post-mortem reporting will see an empty tuple instead of the two completed phase results.

    The method signature would need phases added to thread the accumulated results through:

```python
def _resolve_topology(
    self,
    routing_result: RoutingResult,
    phases: list[CoordinationPhaseResult],  # add this
) -> CoordinationTopology:
    ...
    raise CoordinationPhaseError(
        msg,
        phase="resolve_topology",
        partial_phases=tuple(phases),  # pass accumulated phases
    )
```

    And in coordinate():

```python
topology = self._resolve_topology(routing_result, phases)
```
  2. src/ai_company/engine/coordination/service.py, line 1906-1949 (link)

    Missing COORDINATION_PHASE_COMPLETED log and success phase result

    _phase_dispatch is the only pipeline phase method that does not log COORDINATION_PHASE_COMPLETED or add a CoordinationPhaseResult with success=True on the happy path. Every other phase method (_phase_decompose, _phase_route, _phase_rollup, _phase_update_parent) logs COORDINATION_PHASE_COMPLETED and appends a success result to phases after the call completes.

    _phase_dispatch only logs COORDINATION_PHASE_STARTED and then (on failure) COORDINATION_PHASE_FAILED, leaving the "dispatch" state transition unlogged on success. This violates CLAUDE.md: "All state transitions must log at INFO".

    The root cause is that on success, _phase_dispatch returns the DispatchResult immediately without any post-return bookkeeping. A consistent fix:

```python
result = await dispatcher.dispatch(...)
elapsed = time.monotonic() - start
logger.info(
    COORDINATION_PHASE_COMPLETED,
    phase=phase_name,
    wave_count=len(result.waves),
    duration_seconds=elapsed,
)
return result
```

    Note: the fine-grained sub-phases from dispatch_result.phases are still added in coordinate() via phases.extend(dispatch_result.phases), so adding a top-level "dispatch" success phase is purely additive and does not change any existing behavior.

  3. src/ai_company/engine/coordination/service.py, line 1984-1987 (link)

    Rollup counts outcomes, not subtask IDs — silent over-count on exception mid-wave

    _phase_rollup iterates wave.execution_result.outcomes to collect statuses, then fills any gap against expected_count as BLOCKED. This is correct for subtasks that never reached a wave (unroutable, blocked by prerequisites, fail-fast skipped), but it silently misclassifies subtasks whose wave ran but whose agent produced no outcome (e.g., execute_group raised mid-execution and only partially populated outcomes).

    In that scenario the missing subtask would be counted as BLOCKED (suggesting it was never scheduled) rather than FAILED (suggesting its agent crashed). This can affect derived_parent_status — making the parent appear blocked when it should be failed.

    If ParallelExecutor.execute_group guarantees exactly one outcome per assignment even on partial failure, this is harmless. If not, consider iterating wave.subtask_ids as the source of truth and matching outcomes by task_id:

```python
outcome_by_task = {o.task_id: o for o in wave.execution_result.outcomes}
for sid in wave.subtask_ids:
    outcome = outcome_by_task.get(sid)
    if outcome is None:
        statuses.append(TaskStatus.FAILED)  # agent crashed, not merely skipped
    elif outcome.is_success:
        statuses.append(TaskStatus.COMPLETED)
    else:
        statuses.append(TaskStatus.FAILED)
```

Last reviewed commit: e581af2


Copilot AI left a comment


Pull request overview

Introduces a multi-agent coordination pipeline (models, dispatchers, orchestration service) with stricter validation, improved observability event coverage, and extensive unit tests to harden fail-fast and workspace-isolation behavior.

Changes:

  • Added coordination domain package (engine/coordination) including config, models/validators, DAG-to-wave builder, topology dispatchers, and MultiAgentCoordinator orchestrator.
  • Added coordination observability event constants and corresponding event tests.
  • Added comprehensive unit tests for coordinator, dispatchers, group builder, models, config, and new coordination errors; updated docs to describe the pipeline.

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| tests/unit/observability/test_events.py | Adds coordination events existence assertions. |
| tests/unit/engine/test_coordination_service.py | New tests for end-to-end coordination service behavior and error paths. |
| tests/unit/engine/test_coordination_models.py | New tests for model validators and computed fields. |
| tests/unit/engine/test_coordination_group_builder.py | New tests for DAG→wave group building and workspace resource claim mapping. |
| tests/unit/engine/test_coordination_errors.py | New tests for coordination error hierarchy. |
| tests/unit/engine/test_coordination_dispatchers.py | New tests for dispatcher selection and per-topology dispatch behavior (including fail-fast). |
| tests/unit/engine/test_coordination_config.py | New tests for coordination config validation and immutability. |
| src/ai_company/observability/events/coordination.py | Adds coordination event name constants. |
| src/ai_company/engine/errors.py | Adds CoordinationError and CoordinationPhaseError. |
| src/ai_company/engine/coordination/service.py | Implements MultiAgentCoordinator orchestration pipeline with phase tracking and logging. |
| src/ai_company/engine/coordination/models.py | Adds frozen Pydantic models + cross-field validation for coordination results. |
| src/ai_company/engine/coordination/group_builder.py | Adds wave construction from dependency DAG + routing decisions. |
| src/ai_company/engine/coordination/dispatchers.py | Adds topology dispatchers, workspace lifecycle helpers, and fail-fast handling. |
| src/ai_company/engine/coordination/config.py | Adds coordination config model (frozen, extra forbidden). |
| src/ai_company/engine/coordination/__init__.py | Exposes coordination public API. |
| src/ai_company/engine/__init__.py | Re-exports coordination types/services from engine package. |
| docs/design/engine.md | Documents the multi-agent coordination pipeline and dispatcher behaviors. |
| CLAUDE.md | Updates package structure description and logging/event guidance. |



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 13

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/design/engine.md`:
- Around line 762-764: The fenced code block containing the pipeline diagram
"decompose → route → resolve topology → dispatch → rollup → update parent" lacks
a language tag; update that fenced block to include the `text` language
specifier (i.e., change the opening triple backticks to "```text") so the
diagram is treated as plain text and satisfies linting.

In `@src/ai_company/engine/coordination/dispatchers.py`:
- Around line 267-291: The log call for wave completion currently always uses
logger.info with COORDINATION_WAVE_COMPLETED; change it to emit a warning when
the wave did not fully succeed by checking the local success variable (derived
from exec_result.all_succeeded) and calling logger.warning (or logger.error if
you prefer) with the same structured fields (wave_index=wave_idx,
succeeded=exec_result.agents_succeeded, failed=exec_result.agents_failed,
duration_seconds=elapsed, and include error or context) while keeping
logger.info for successful waves; apply the same change to the analogous block
around the symbols mentioned at lines 670-691 so failed waves are emitted at
warning level.
- Around line 426-431: Avoid merging workspaces that failed during execution:
update the logic around calls to _merge_workspaces so it only runs for
workspaces that succeeded (or when execute_group() returned no failures) instead
of passing the full workspaces list; filter the workspaces variable to a list of
successful_workspace objects (e.g., by checking a success flag or absence of
errors reported by execute_group()) before calling
_merge_workspaces(workspace_service, workspaces) and only append merge_phase to
all_phases when the merge is attempted; apply the same filtering/gating pattern
to the other call sites you noted (the blocks around lines where
_merge_workspaces is called, e.g., the occurrences near execute_group() results
and the later blocks at the other two locations).
- Around line 768-789: The COORDINATION_TOPOLOGY_RESOLVED event is emitted
prematurely; modify the dispatcher selection (the match on CoordinationTopology
in the dispatcher factory function that returns
SasDispatcher/CentralizedDispatcher/DecentralizedDispatcher/ContextDependentDispatcher)
so that COORDINATION_TOPOLOGY_RESOLVED is emitted only after a successful match
and not in the error branch (AUTO/unresolved). Move the logger call out of the
top of the function and call logger.info(COORDINATION_TOPOLOGY_RESOLVED,
topology=topology.value) after the matched dispatcher is created (or immediately
before returning the dispatcher) and do not emit it in the default (_) case
where you currently build the msg and
logger.warning(COORDINATION_PHASE_FAILED,...).
- Around line 462-467: Before raising CoordinationError in the decentralized
precondition guard, log the failure with context: emit a logger.warning or
logger.error that includes the values of workspace_service (e.g., whether it's
None) and config.enable_workspace_isolation so telemetry captures the cause,
then raise CoordinationError as before; use the existing module/class logger (or
create one if missing) and include the same descriptive message currently
assigned to msg.
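The first dispatchers.py finding above (failed waves should be emitted at WARNING, successful ones at INFO) can be sketched roughly as follows. `StubLogger`, `events`, and `log_wave_completed` are hypothetical stand-ins for the project's structured logger; only the event constant and field names mirror those quoted in the comment:

```python
# Hypothetical sketch: pick the log level based on wave success.
COORDINATION_WAVE_COMPLETED = "coordination.wave.completed"

events: list[tuple[str, str, dict]] = []


class StubLogger:
    """Stand-in for the project's structured logger."""

    def info(self, event: str, **fields: object) -> None:
        events.append(("info", event, dict(fields)))

    def warning(self, event: str, **fields: object) -> None:
        events.append(("warning", event, dict(fields)))


logger = StubLogger()


def log_wave_completed(
    wave_idx: int, succeeded: int, failed: int, elapsed: float
) -> None:
    # Successful waves stay at INFO; any failure escalates to WARNING,
    # keeping the same structured fields in both branches.
    log = logger.info if failed == 0 else logger.warning
    log(
        COORDINATION_WAVE_COMPLETED,
        wave_index=wave_idx,
        succeeded=succeeded,
        failed=failed,
        duration_seconds=elapsed,
    )


log_wave_completed(0, succeeded=3, failed=0, elapsed=1.2)
log_wave_completed(1, succeeded=1, failed=2, elapsed=0.8)
assert events[0][0] == "info"
assert events[1][0] == "warning"
```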

In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 42-138: The build_execution_waves function is doing too many
things; extract the per-wave assignment materialization into a new helper (e.g.
_materialize_wave_assignments or _build_wave_assignments) that accepts
(wave_idx, subtask_ids, routing_lookup, task_lookup, workspace_lookup, config)
and returns the tuple/list of AgentAssignment plus any per-wave logging
metadata; keep DAG reconstruction and lookup creation
(DependencyGraph(plan.subtasks), parallel_groups, routing_lookup,
workspace_lookup, task_lookup) in build_execution_waves, call the new helper for
each parallel_groups entry, and move the logic that checks for missing
decisions, builds resource_claims, appends AgentAssignment, and produces
group_id/logging into that helper so build_execution_waves only assembles
ParallelExecutionGroup from the helper result and returns the groups. Ensure
tests and existing logger calls (COORDINATION_WAVE_BUILT) are preserved by
invoking logging from the new helper where appropriate.
- Around line 73-109: The code currently indexes task_lookup[subtask_id]
directly which raises a bare KeyError for routed subtasks missing from
decomposition_result.created_tasks; instead, in the loop in group_builder.py
before using task_lookup and creating AgentAssignment, check membership (if
subtask_id not in task_lookup) and log a WARNING or ERROR via logger including
wave_idx and subtask_id (use COORDINATION_WAVE_BUILT or a similar contextual
log) and then raise an explicit coordination failure (e.g., a CoordinationError
/ CoordinationException or a clear RuntimeError) so callers get a documented
failure path rather than a raw KeyError; update references to task_lookup,
decomposition_result.created_tasks, routing_lookup, wave_idx, and
AgentAssignment accordingly.
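The explicit missing-task guard described above could look like this minimal sketch. `CoordinationError`, `resolve_wave_tasks`, and the lookup shapes are simplified stand-ins for the real group_builder.py types:

```python
# Hypothetical sketch of replacing a bare KeyError with a documented
# coordination failure when a routed subtask is missing from decomposition.
class CoordinationError(Exception):
    pass


def resolve_wave_tasks(
    wave_idx: int,
    subtask_ids: tuple[str, ...],
    task_lookup: dict[str, object],
) -> list[object]:
    tasks: list[object] = []
    for subtask_id in subtask_ids:
        if subtask_id not in task_lookup:
            # In the real code, log a WARNING with wave_idx/subtask_id here
            # before raising, per the error-path logging convention.
            msg = (
                f"routed subtask {subtask_id!r} in wave {wave_idx} "
                "is missing from decomposition_result.created_tasks"
            )
            raise CoordinationError(msg)
        tasks.append(task_lookup[subtask_id])
    return tasks


assert resolve_wave_tasks(0, ("a",), {"a": "task-a"}) == ["task-a"]
```

Callers then get a typed, contextual failure instead of a raw `KeyError` from dict indexing.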

In `@src/ai_company/engine/coordination/service.py`:
- Around line 211-225: Before re-raising the exception in the decomposition
except block, log a WARNING with contextual data: call the module/class logger
(e.g., logger.warning or self.logger.warning) including phase_name, elapsed
(time.monotonic() - start), the exception exc, and current partial phases
(phases) so the failure is recorded before raising CoordinationPhaseError; keep
the existing creation of CoordinationPhaseResult and raising of
CoordinationPhaseError unchanged but add the warning log immediately before the
raise.
- Around line 260-274: Insert a WARNING-level log right before raising
CoordinationPhaseError in the except block that handles routing failures: log a
message containing phase_name and the exception details and include the current
partial_phases (the tuple(phases)) so the failure is recorded before raising
CoordinationPhaseError; place the log call immediately before the raise and use
the module/class logger's warning method (include exception info for traceback)
to mirror the behavior used in _phase_decompose.
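Both service.py findings above ask for the same pattern: a WARNING with context immediately before the raise in an except block. A minimal sketch, in which the logger, event constant, and error type are stand-ins for the project's real symbols:

```python
# Hypothetical sketch of log-before-raise in a phase's except block.
import time
from collections.abc import Callable

COORDINATION_PHASE_FAILED = "coordination.phase.failed"
warnings: list[dict] = []  # stand-in for structured WARNING output


class CoordinationPhaseError(Exception):
    def __init__(self, msg: str, phase: str, partial_phases: tuple = ()):
        super().__init__(msg)
        self.phase = phase
        self.partial_phases = partial_phases


def run_phase(phase_name: str, phases: list, work: Callable[[], object]) -> object:
    start = time.monotonic()
    try:
        return work()
    except Exception as exc:
        elapsed = time.monotonic() - start
        # WARNING with context *before* raising, per the convention cited above.
        warnings.append({
            "event": COORDINATION_PHASE_FAILED,
            "phase": phase_name,
            "error": str(exc),
            "duration_seconds": elapsed,
            "partial_phase_count": len(phases),
        })
        raise CoordinationPhaseError(
            str(exc), phase=phase_name, partial_phases=tuple(phases)
        ) from exc


try:
    run_phase("route", ["decompose-result"], lambda: 1 / 0)
except CoordinationPhaseError as err:
    assert err.partial_phases == ("decompose-result",)
```

The existing `CoordinationPhaseResult` construction and raise stay unchanged; only the logging line is added.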

In `@tests/unit/engine/test_coordination_dispatchers.py`:
- Around line 89-111: The helper _make_routing currently hard-codes
RoutingDecision.topology=CoordinationTopology.CENTRALIZED; change _make_routing
to accept a topology parameter (e.g., topology: CoordinationTopology =
CoordinationTopology.CENTRALIZED) and use that value when constructing each
RoutingDecision, then update the test call sites that expect non-centralized
behavior to pass the appropriate CoordinationTopology (e.g., DECENTRALIZED, SAS,
or CONTEXT_DEPENDENT) so routing metadata matches the dispatcher under test;
adjust the return type still as RoutingResult with decisions tuple and keep
parent_task_id default as before.

In `@tests/unit/engine/test_coordination_models.py`:
- Around line 267-310: Add a cross-field validator to the CoordinationResult
Pydantic model to ensure any nested parent_task_id fields belong to the same
aggregate: check decomposition_result.plan.parent_task_id,
routing_result.parent_task_id, and status_rollup.parent_task_id (if those nested
objects are present) and raise a validation error if any of them do not equal
CoordinationResult.parent_task_id; implement this as a root validator on
CoordinationResult in src/ai_company/engine/coordination/models.py and add a
unit test that supplies a mismatched parent_task_id for one of those nested
fields to assert the model rejects it.

In `@tests/unit/engine/test_coordination_service.py`:
- Around line 382-428: The test test_partial_execution_fail_fast_off should not
run a dependent subtask after its prerequisite fails: either make the downstream
subtask independent or assert it is skipped/blocked; update the decomposition or
assertions accordingly. Specifically, in the test body that constructs decomp
via _make_decomposition and the subtasks sub_a/sub_b, either remove the
dependency tuple ("sub-a",) from sub_b so sub_b is independent (then keep the
current expect-two-waves assertions), or keep the dependency and change the
expectations on coordinator.coordinate(ctx) to assert that the second wave was
not executed/was skipped/blocked (e.g., len(result.waves) reflects only executed
waves and status_rollup.failed/completed counts reflect the skipped dependent
wave). Locate the constructs _make_subtask("sub-b", dependencies=(...)),
_make_decomposition(...), the coordinator created via _make_coordinator(...),
and the CoordinationConfig(fail_fast=False) to apply the chosen fix.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: a15572d6-b798-4487-afc0-d11dbdd4871a

📥 Commits

Reviewing files that changed from the base of the PR and between fa8bf1d and 2cc14d4.

📒 Files selected for processing (18)
  • CLAUDE.md
  • docs/design/engine.md
  • src/ai_company/engine/__init__.py
  • src/ai_company/engine/coordination/__init__.py
  • src/ai_company/engine/coordination/config.py
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/models.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/errors.py
  • src/ai_company/observability/events/coordination.py
  • tests/unit/engine/test_coordination_config.py
  • tests/unit/engine/test_coordination_dispatchers.py
  • tests/unit/engine/test_coordination_errors.py
  • tests/unit/engine/test_coordination_group_builder.py
  • tests/unit/engine/test_coordination_models.py
  • tests/unit/engine/test_coordination_service.py
  • tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Agent
  • GitHub Check: Greptile Review
  • GitHub Check: Test (Python 3.14)
  • GitHub Check: Analyze (python)
🧰 Additional context used
📓 Path-based instructions (5)
docs/design/*.md

📄 CodeRabbit inference engine (CLAUDE.md)

When approved deviations occur, update the relevant docs/design/ page to reflect the new reality

Files:

  • docs/design/engine.md
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Mark tests with @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow
Prefer @pytest.mark.parametrize for testing similar cases

Files:

  • tests/unit/engine/test_coordination_config.py
  • tests/unit/engine/test_coordination_models.py
  • tests/unit/engine/test_coordination_group_builder.py
  • tests/unit/engine/test_coordination_dispatchers.py
  • tests/unit/engine/test_coordination_service.py
  • tests/unit/engine/test_coordination_errors.py
  • tests/unit/observability/test_events.py
{src,tests}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like example-provider, example-large-001, test-provider, test-small-001

Files:

  • tests/unit/engine/test_coordination_config.py
  • tests/unit/engine/test_coordination_models.py
  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/service.py
  • tests/unit/engine/test_coordination_group_builder.py
  • src/ai_company/engine/coordination/__init__.py
  • src/ai_company/observability/events/coordination.py
  • tests/unit/engine/test_coordination_dispatchers.py
  • tests/unit/engine/test_coordination_service.py
  • tests/unit/engine/test_coordination_errors.py
  • src/ai_company/engine/coordination/config.py
  • src/ai_company/engine/coordination/models.py
  • tests/unit/observability/test_events.py
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/__init__.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649
Use except A, B: (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Type hints required on all public functions; mypy strict mode enforced
Google-style docstrings required on public classes and functions, enforced by ruff D rules
Create new objects, never mutate existing ones. For non-Pydantic internal collections, use copy.deepcopy() at construction and MappingProxyType wrapping for read-only enforcement
For dict/list fields in frozen Pydantic models, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using model_copy(update=...)
Use Pydantic v2 with BaseModel, model_validator, computed_field, ConfigDict
Use @computed_field for derived values instead of storing redundant fields (e.g. TokenUsage.total_tokens)
Use NotBlankStr from core.types for all identifier/name fields (including optional and tuple variants) instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare create_task
Line length must be 88 characters (ruff enforced)
Functions must be less than 50 lines; files must be less than 800 lines
Handle errors explicitly, never silently swallow exceptions
Validate at system boundaries (user input, external APIs, config files)
Every module with business logic must have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code
Use only logger as the variable name (not _logger, not log)
Use domain-specific event name constants from ai_company.observability.events modules (e.g...

Files:

  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/__init__.py
  • src/ai_company/observability/events/coordination.py
  • src/ai_company/engine/coordination/config.py
  • src/ai_company/engine/coordination/models.py
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/__init__.py
src/ai_company/{providers,engine}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

RetryExhaustedError signals all retries failed — the engine layer catches this to trigger fallback chains

Files:

  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/__init__.py
  • src/ai_company/engine/coordination/config.py
  • src/ai_company/engine/coordination/models.py
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/__init__.py
🧠 Learnings (9)

📚 All nine learnings below share the same source: Learnt from CR, Repo: Aureliolo/synthorg PR: 0, File: CLAUDE.md:0-0, Timestamp: 2026-03-12T22:12:29.867Z.

  • Applies to src/**/*.py: Every module with business logic must have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`. (Applied to: CLAUDE.md)
  • Applies to src/**/*.py: Use domain-specific event name constants from `ai_company.observability.events` modules (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly and never use string literals for event names. (Applied to: CLAUDE.md, src/ai_company/observability/events/coordination.py, tests/unit/observability/test_events.py)
  • Applies to src/**/*.py: Never use `import logging` / `logging.getLogger()` / `print()` in application code. (Applied to: CLAUDE.md)
  • Applies to src/**/*.py: Use structured logging with `logger.info(EVENT, key=value)` — never use `logger.info("msg %s", val)` formatting. (Applied to: CLAUDE.md)
  • Applies to src/**/*.py: Use only `logger` as the variable name (not `_logger`, not `log`). (Applied to: CLAUDE.md)
  • Applies to src/**/*.py: All error paths must log at WARNING or ERROR with context before raising an exception. (Applied to: CLAUDE.md)
  • Applies to src/**/*.py: All state transitions must log at INFO level. (Applied to: CLAUDE.md)
  • Applies to src/**/*.py: Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using `model_copy(update=...)`. (Applied to: src/ai_company/engine/coordination/config.py, src/ai_company/engine/coordination/models.py)
  • Applies to src/ai_company/{providers,engine}/**/*.py: `RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains. (Applied to: src/ai_company/engine/errors.py)
🧬 Code graph analysis (5)
tests/unit/engine/test_coordination_config.py (2)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
tests/unit/engine/test_coordination_models.py (1)
  • test_frozen (69-76)
tests/unit/engine/test_coordination_models.py (6)
src/ai_company/core/enums.py (2)
  • CoordinationTopology (358-369)
  • TaskStatus (198-224)
tests/unit/engine/conftest.py (3)
  • engine (432-443)
  • make_assignment_agent (371-393)
  • make_assignment_task (396-407)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
src/ai_company/engine/coordination/models.py (5)
  • CoordinationContext (30-61)
  • CoordinationPhaseResult (64-96)
  • CoordinationResult (128-190)
  • CoordinationWave (99-125)
  • is_success (188-190)
src/ai_company/engine/decomposition/models.py (2)
  • SubtaskStatusRollup (179-259)
  • derived_parent_status (232-259)
src/ai_company/engine/parallel_models.py (4)
  • AgentOutcome (143-193)
  • ParallelExecutionResult (196-248)
  • task_id (87-89)
  • agent_id (79-81)
tests/unit/engine/test_coordination_group_builder.py (5)
src/ai_company/core/enums.py (2)
  • CoordinationTopology (358-369)
  • TaskStructure (346-355)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
src/ai_company/engine/coordination/group_builder.py (1)
  • build_execution_waves (42-138)
src/ai_company/engine/decomposition/models.py (3)
  • DecompositionPlan (66-122)
  • DecompositionResult (125-176)
  • SubtaskDefinition (22-63)
src/ai_company/engine/routing/models.py (2)
  • RoutingCandidate (23-45)
  • RoutingResult (91-157)
src/ai_company/engine/coordination/models.py (7)
src/ai_company/core/agent.py (1)
  • AgentIdentity (266-342)
src/ai_company/core/enums.py (1)
  • CoordinationTopology (358-369)
src/ai_company/core/task.py (1)
  • Task (45-261)
src/ai_company/engine/decomposition/models.py (2)
  • DecompositionContext (262-287)
  • DecompositionResult (125-176)
src/ai_company/engine/parallel_models.py (1)
  • ParallelExecutionResult (196-248)
src/ai_company/engine/routing/models.py (1)
  • RoutingResult (91-157)
src/ai_company/engine/workspace/models.py (1)
  • WorkspaceGroupResult (144-181)
src/ai_company/engine/errors.py (1)
src/ai_company/engine/coordination/models.py (1)
  • CoordinationPhaseResult (64-96)
🪛 LanguageTool
CLAUDE.md

[style] ~130-~130: A comma is missing here.
Context: ...nder ai_company.observability.events (e.g. PROVIDER_CALL_START from `events.prov...

(EG_NO_COMMA)

🪛 markdownlint-cli2 (0.21.0)
docs/design/engine.md

[warning] 762-762: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (15)
CLAUDE.md (2)

95-95: LGTM!

The engine description accurately reflects the new coordination package with TopologyDispatcher protocol, dispatcher implementations, wave execution, and workspace lifecycle integration.


130-130: LGTM!

The event naming guidance is updated consistently with other domain events, including the new COORDINATION_STARTED from events.coordination.

src/ai_company/engine/coordination/config.py (1)

1-38: LGTM!

The CoordinationConfig model follows project conventions:

  • Frozen Pydantic model for configuration immutability
  • Uses NotBlankStr for the base_branch identifier field
  • Sensible defaults and appropriate field constraints (ge=1 for concurrency)
  • Google-style docstring with attribute documentation
src/ai_company/engine/coordination/service.py (2)

50-91: LGTM!

The MultiAgentCoordinator class follows the project's dependency injection pattern with clear separation of concerns. The __slots__ declaration is appropriate for memory efficiency, and the docstring comprehensively documents the purpose and constructor parameters.


92-195: LGTM!

The coordinate() method implements a well-structured pipeline with:

  • Proper phase timing using time.monotonic()
  • Comprehensive event logging at start and completion
  • Correct exception handling that re-raises CoordinationPhaseError and logs unexpected exceptions
  • Clean aggregation of phase results and cost calculation
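The timing and error-capture pattern described here can be sketched as a simplified, synchronous stand-in for the real `coordinate()` (names and result shape are illustrative):

```python
import time

def run_phase(name: str, fn) -> dict:
    """Time one pipeline phase with a monotonic clock and capture its outcome."""
    start = time.monotonic()
    try:
        fn()
        return {"phase": name, "success": True, "error": None,
                "duration_seconds": time.monotonic() - start}
    except Exception as exc:
        return {"phase": name, "success": False, "error": str(exc),
                "duration_seconds": time.monotonic() - start}

def fail() -> None:
    raise RuntimeError("no agents available")

phases = [run_phase("decompose", lambda: None), run_phase("route", fail)]
total = sum(p["duration_seconds"] for p in phases)
```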
src/ai_company/observability/events/coordination.py (1)

1-17: LGTM!

Event constants follow the established naming convention (domain.subject.qualifier) and are properly typed with Final[str]. The coverage is comprehensive for the coordination lifecycle: start/complete/fail, phases, waves, topology resolution, and cleanup.
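A hedged sketch of the constant style — the string values below are assumptions for illustration, not the real ones from `events/coordination.py`:

```python
from typing import Final

# Hypothetical subset following the domain.subject.qualifier convention.
COORDINATION_STARTED: Final[str] = "coordination.run.started"
COORDINATION_WAVE_BUILT: Final[str] = "coordination.wave.built"
COORDINATION_CLEANUP_FAILED: Final[str] = "coordination.cleanup.failed"
```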

tests/unit/observability/test_events.py (2)

189-189: LGTM!

Correctly adds "coordination" to the expected domain modules set for discovery validation.


634-663: LGTM!

The test_coordination_events_exist method follows the established pattern for event existence tests, verifying all 13 coordination event constants with their expected string values. The inline imports and assertions are consistent with other similar tests in this file.

src/ai_company/engine/coordination/models.py (4)

30-61: LGTM!

CoordinationContext is well-designed with:

  • Frozen configuration for immutability
  • Sensible defaults for decomposition_context and config
  • Proper validation ensuring at least one agent is available

64-96: LGTM!

CoordinationPhaseResult includes excellent defensive validation in _validate_success_error_consistency ensuring that:

  • Successful phases cannot have errors
  • Failed phases must have an error description

This prevents inconsistent state representation.


99-125: LGTM!

CoordinationWave correctly models an execution wave with proper constraints (wave_index >= 0, non-empty subtask_ids) and optional execution_result for waves that haven't executed yet.


128-190: LGTM!

CoordinationResult provides a comprehensive aggregation of the coordination run:

  • Enforces at least one phase via min_length=1
  • Uses @computed_field for is_success as per guidelines
  • Properly types optional fields (decomposition_result, routing_result, status_rollup, workspace_merge)
src/ai_company/engine/errors.py (2)

3-6: LGTM!

Correctly uses TYPE_CHECKING guard to import CoordinationPhaseResult for type annotations only, avoiding potential circular import issues between errors.py and coordination/models.py.
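The pattern in miniature — at runtime `TYPE_CHECKING` is `False`, so the guarded import never executes (no circular dependency), and the annotation must stay a string so it is never evaluated:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers such as mypy; never at runtime.
    from ai_company.engine.coordination.models import CoordinationPhaseResult

def describe(result: "CoordinationPhaseResult") -> str:
    """String annotation keeps the runtime free of the guarded import."""
    return f"{result.phase}: {'ok' if result.success else result.error}"
```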


128-152: LGTM!

The coordination error hierarchy is well-designed:

  • CoordinationError provides a clear base for coordination failures
  • CoordinationPhaseError carries diagnostic context (phase, partial_phases) enabling partial-result inspection
  • Keyword-only arguments with sensible defaults follow best practices
  • Google-style docstring properly documents the attributes
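A minimal sketch of the hierarchy as described — class and attribute names come from the review; defaults and message text are illustrative:

```python
class CoordinationError(Exception):
    """Base class for coordination failures."""

class CoordinationPhaseError(CoordinationError):
    """Phase failure carrying diagnostic context for partial-result inspection."""

    def __init__(
        self,
        message: str,
        *,
        phase: str,
        partial_phases: tuple = (),
    ) -> None:
        super().__init__(message)
        self.phase = phase
        self.partial_phases = partial_phases
```

Catching `CoordinationError` covers both, so callers can handle phase failures generically while still inspecting `partial_phases` when present.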
src/ai_company/engine/__init__.py (1)

42-57: Public coordination exports stay consistent.

The new coordination imports are mirrored in __all__, so the package-level API remains explicit and avoids half-exported symbols.

Also applies to: 73-75, 210-225, 237-258, 282-335

Comment on lines +89 to +111
def _make_routing(
subtask_agent_pairs: list[tuple[str, str]],
*,
parent_task_id: str = "parent-1",
) -> RoutingResult:
decisions: list[RoutingDecision] = []
for subtask_id, agent_name in subtask_agent_pairs:
agent = make_assignment_agent(agent_name)
decisions.append(
RoutingDecision(
subtask_id=subtask_id,
selected_candidate=RoutingCandidate(
agent_identity=agent,
score=0.9,
reason="Good match",
),
topology=CoordinationTopology.CENTRALIZED,
)
)
return RoutingResult(
parent_task_id=parent_task_id,
decisions=tuple(decisions),
)

⚠️ Potential issue | 🟡 Minor

Make the shared routing fixture topology-aware.

_make_routing() hard-codes RoutingDecision.topology=CoordinationTopology.CENTRALIZED, so the SAS, decentralized, and context-dependent tests all run with inconsistent routing metadata. That weakens those cases and can hide mismatches between the selected dispatcher and the routing result.

Suggested change
 def _make_routing(
     subtask_agent_pairs: list[tuple[str, str]],
     *,
     parent_task_id: str = "parent-1",
+    topology: CoordinationTopology = CoordinationTopology.CENTRALIZED,
 ) -> RoutingResult:
@@
                 selected_candidate=RoutingCandidate(
                     agent_identity=agent,
                     score=0.9,
                     reason="Good match",
                 ),
-                topology=CoordinationTopology.CENTRALIZED,
+                topology=topology,
             )
         )

Then pass the dispatcher-specific topology at each non-centralized call site.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_coordination_dispatchers.py` around lines 89 - 111,
The helper _make_routing currently hard-codes
RoutingDecision.topology=CoordinationTopology.CENTRALIZED; change _make_routing
to accept a topology parameter (e.g., topology: CoordinationTopology =
CoordinationTopology.CENTRALIZED) and use that value when constructing each
RoutingDecision, then update the test call sites that expect non-centralized
behavior to pass the appropriate CoordinationTopology (e.g., DECENTRALIZED, SAS,
or CONTEXT_DEPENDENT) so routing metadata matches the dispatcher under test;
adjust the return type still as RoutingResult with decisions tuple and keep
parent_task_id default as before.

Comment on lines +267 to +310
def test_optional_fields_default_none(self) -> None:
"""Optional fields default to None."""
result = CoordinationResult(
parent_task_id="task-1",
topology=CoordinationTopology.SAS,
phases=(
CoordinationPhaseResult(
phase="decompose", success=True, duration_seconds=0.0
),
),
total_duration_seconds=0.0,
)
assert result.decomposition_result is None
assert result.routing_result is None
assert result.status_rollup is None
assert result.workspace_merge is None
assert result.waves == ()
assert result.total_cost_usd == 0.0

@pytest.mark.unit
def test_with_status_rollup(self) -> None:
"""Result can carry a status rollup."""
rollup = SubtaskStatusRollup(
parent_task_id="task-1",
total=2,
completed=2,
failed=0,
in_progress=0,
blocked=0,
cancelled=0,
)
result = CoordinationResult(
parent_task_id="task-1",
topology=CoordinationTopology.CENTRALIZED,
phases=(
CoordinationPhaseResult(
phase="rollup", success=True, duration_seconds=0.01
),
),
status_rollup=rollup,
total_duration_seconds=5.0,
)
assert result.status_rollup is not None
assert result.status_rollup.derived_parent_status == TaskStatus.COMPLETED

⚠️ Potential issue | 🟠 Major

Add a mismatched-parent regression for CoordinationResult.

In src/ai_company/engine/coordination/models.py, CoordinationResult currently has no cross-field validator tying decomposition_result.plan.parent_task_id, routing_result.parent_task_id, or status_rollup.parent_task_id back to parent_task_id. That means another task’s routing or rollup can be attached to the wrong aggregate result without being rejected. Please cover that case here and validate it in the model. As per coding guidelines, "Validate at system boundaries (user input, external APIs, config files)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_coordination_models.py` around lines 267 - 310, Add a
cross-field validator to the CoordinationResult Pydantic model to ensure any
nested parent_task_id fields belong to the same aggregate: check
decomposition_result.plan.parent_task_id, routing_result.parent_task_id, and
status_rollup.parent_task_id (if those nested objects are present) and raise a
validation error if any of them do not equal CoordinationResult.parent_task_id;
implement this as a root validator on CoordinationResult in
src/ai_company/engine/coordination/models.py and add a unit test that supplies a
mismatched parent_task_id for one of those nested fields to assert the model
rejects it.

Connect existing engine subsystems (DecompositionService, TaskRoutingService,
ParallelExecutor, WorkspaceIsolationService, TaskEngine) into an end-to-end
multi-agent coordination pipeline with topology-driven dispatch.

New engine/coordination/ package:
- CoordinationConfig, CoordinationContext, CoordinationResult models
- TopologyDispatcher protocol with 4 implementations:
  SAS (sequential), Centralized (parallel + shared workspace),
  Decentralized (all-parallel + mandatory workspace),
  ContextDependent (conditional per-wave isolation)
- MultiAgentCoordinator service: decompose → route → resolve topology →
  dispatch (workspace setup → execute waves → merge) → rollup → update parent
- DAG-based wave grouping via build_execution_waves()
- CoordinationError/CoordinationPhaseError with partial phase tracking
- 11 structured observability event constants
- 68 tests across 6 test files (config, models, errors, group_builder,
  dispatchers, service)
…fast

Pre-reviewed by 8 agents, 32 findings addressed:

- Add model validators for CoordinationPhaseResult (success/error
  consistency), CoordinationWave (non-empty subtask_ids), and
  CoordinationResult (min 1 phase)
- Fix ContextDependentDispatcher fail_fast bug (waves continued after
  failure regardless of config)
- Add mandatory workspace validation to DecentralizedDispatcher
- Add missing WARNING/ERROR logging on 7+ error paths across
  dispatchers and service
- Fix misleading CLEANUP_COMPLETED event for failures → CLEANUP_FAILED
- Separate WAVE_BUILT event from WAVE_STARTED for planning vs execution
- Tighten types: partial_phases uses CoordinationPhaseResult, rollup
  uses SubtaskStatusRollup
- Add coordination pipeline section to docs/design/engine.md
- Update CLAUDE.md package structure and logging event docs
- Add comprehensive test coverage: fail_fast, AUTO fallback, update
  parent failures, rollup errors, workspace setup failures, cost
  aggregation, model validators, event constants
…pilot

- Add input validators (non-empty waves, subtask ID presence, topology resolution)
- Add structured logging across dispatchers, service, and group_builder
- Implement fail-fast wave execution and merge gating (skip merge on failure)
- Guard against missing created_task in group_builder with CoordinationError
- Extract ~340 lines of shared test helpers into conftest.py
- Add 7 new test cases (fail-fast, merge gating, workspace failure, models)
- Fix TC001 lint compliance for type-only imports
- Update engine design doc for merge gating behavior
Copilot AI review requested due to automatic review settings March 13, 2026 06:14
@Aureliolo Aureliolo force-pushed the feat/runtime-wiring branch from 0602c90 to b5f359c Compare March 13, 2026 06:14
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 13, 2026 06:16 — with GitHub Actions Inactive
- Add explicit guard for workspace_service narrowing after needs_isolation check
- Add explicit TopologyDispatcher type annotation in select_dispatcher

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

♻️ Duplicate comments (4)
src/ai_company/engine/coordination/models.py (1)

146-190: ⚠️ Potential issue | 🟠 Major

Reject nested results from a different parent task.

CoordinationResult only validates topology today, so a decomposition_result, routing_result, or status_rollup from another parent task is accepted and then reported under this aggregate. That can corrupt rollups and parent updates for the wrong task. The new acceptance test in tests/unit/engine/test_coordination_models.py should flip once this validator exists.

Suggested fix
     @model_validator(mode="after")
     def _validate_topology_resolved(self) -> Self:
         """Ensure topology is resolved (not AUTO) in final result."""
         if self.topology == CoordinationTopology.AUTO:
             msg = "CoordinationResult topology must be resolved, not AUTO"
             raise ValueError(msg)
         return self
+
+    @model_validator(mode="after")
+    def _validate_parent_task_ids(self) -> Self:
+        """Ensure nested results belong to the same parent task."""
+        if (
+            self.decomposition_result is not None
+            and self.decomposition_result.plan.parent_task_id != self.parent_task_id
+        ):
+            msg = "decomposition_result parent_task_id must match parent_task_id"
+            raise ValueError(msg)
+        if (
+            self.routing_result is not None
+            and self.routing_result.parent_task_id != self.parent_task_id
+        ):
+            msg = "routing_result parent_task_id must match parent_task_id"
+            raise ValueError(msg)
+        if (
+            self.status_rollup is not None
+            and self.status_rollup.parent_task_id != self.parent_task_id
+        ):
+            msg = "status_rollup parent_task_id must match parent_task_id"
+            raise ValueError(msg)
+        return self

As per coding guidelines, "Validate: at system boundaries (user input, external APIs, config files)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/coordination/models.py` around lines 146 - 190, Add
validation to CoordinationResult to reject nested results that reference a
different parent task: in the existing model_validator (or a new after-mode
validator) for CoordinationResult (method _validate_topology_resolved), check
any present decomposition_result.parent_task_id, routing_result.parent_task_id,
and status_rollup.parent_task_id (and workspace_merge.parent_task_id if
applicable) and raise ValueError if any of them do not equal
self.parent_task_id; keep the existing topology AUTO check and return self when
all validations pass.
tests/unit/engine/test_coordination_dispatchers.py (1)

129-134: ⚠️ Potential issue | 🟡 Minor

Pass the dispatcher topology into make_routing().

These SAS/decentralized/context-dependent cases still rely on make_routing()'s default CENTRALIZED metadata. That weakens coverage and can hide mismatches between the selected dispatcher and the routing result. Apply the same fix to the other non-centralized call sites in this file.

Representative fix
         routing = make_routing(
             [
                 ("sub-a", "alice"),
                 ("sub-b", "alice"),
-            ]
+            ],
+            topology=CoordinationTopology.SAS,
         )

Also applies to: 394-399, 489-494

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_coordination_dispatchers.py` around lines 129 - 134,
The tests call make_routing(...) without passing the dispatcher topology, so
they default to CENTRALIZED and miss coverage for non-centralized cases; update
each call (e.g., the routing assignment in test_coordination_dispatchers.py) to
pass the appropriate topology argument (use the dispatcher topology constant
instead of default CENTRALIZED) so make_routing(subscriptions,
topology=YOUR_TOPOLOGY) matches the dispatcher under test; apply the same change
to the other non-centralized call sites in this file (the other make_routing
invocations that should not use CENTRALIZED).
tests/unit/engine/test_coordination_service.py (1)

250-296: ⚠️ Potential issue | 🟠 Major

Don't execute a dependent wave after its prerequisite fails.

sub_b depends on sub-a, but this test still expects wave 1 to run after wave 0 failed. That turns dependency edges into ordering-only hints and can produce invalid downstream work. If the goal is to cover non-fail_fast continuation, make the subtasks independent; otherwise assert the dependent wave is skipped/blocked.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_coordination_service.py` around lines 250 - 296, The
test currently defines sub_b with a dependency on sub_a (make_subtask("sub-b",
dependencies=("sub-a",))) but then expects wave 1 to run after wave 0 fails;
either make the tasks independent or change assertions. Fix by making sub_b
independent (remove the dependencies argument) and update the decomposition to
use an independent structure (e.g., TaskStructure.PARALLEL in the
make_decomposition call) so the test validates non-fail_fast continuation; keep
the rest of the setup (routing, exec_results,
CoordinationConfig(fail_fast=False), and assertions) unchanged.
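Either policy (blocking or failing fast) needs a way to find the subtasks transitively blocked by a failed prerequisite; a small sketch of that computation, with data shapes assumed for illustration:

```python
def blocked_by_failure(
    waves: list[list[str]],
    deps: dict[str, tuple[str, ...]],
    failed: set[str],
) -> set[str]:
    """IDs that must not run because a prerequisite failed or is itself blocked."""
    blocked: set[str] = set()
    for wave in waves:  # waves are already in dependency order
        for sid in wave:
            if any(d in failed or d in blocked for d in deps.get(sid, ())):
                blocked.add(sid)
    return blocked

waves = [["sub-a", "sub-x"], ["sub-b"], ["sub-c"]]
deps = {"sub-b": ("sub-a",), "sub-c": ("sub-b",)}
blocked = blocked_by_failure(waves, deps, {"sub-a"})
```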
src/ai_company/engine/coordination/group_builder.py (1)

96-102: ⚠️ Potential issue | 🟠 Major

Log the routing/decomposition mismatch before raising.

This still raises CoordinationError without a WARNING, so stale routing output is invisible in coordination telemetry. Log wave_index and subtask_id before raising.

Suggested fix
 from ai_company.observability.events.coordination import (
+    COORDINATION_PHASE_FAILED,
     COORDINATION_WAVE_BUILT,
 )
@@
             if task is None:
                 msg = (
                     f"Subtask {subtask_id!r} has a routing decision "
                     "but no corresponding created task in decomposition"
                 )
+                logger.warning(
+                    COORDINATION_PHASE_FAILED,
+                    phase="build_execution_waves",
+                    wave_index=wave_idx,
+                    subtask_id=subtask_id,
+                    error=msg,
+                )
                 raise CoordinationError(msg)

As per coding guidelines, "All error paths must log at WARNING or ERROR with context before raising" and "Validate: at system boundaries (user input, external APIs, config files)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/coordination/group_builder.py` around lines 96 - 102,
The code in group_builder.py raises CoordinationError when a routing decision
references a missing task but does not log context; before raising in the block
that checks task_lookup.get(subtask_id) == None, add a warning log (e.g., using
the module logger or the existing coordination logger) that includes wave_index
and subtask_id and any other relevant context (like the routing entry or
decomposition id) so stale routing output is visible in telemetry, then raise
CoordinationError as before; update the logging call adjacent to the check in
the function/method that builds groups (the block that constructs msg and raises
CoordinationError) to emit logger.warning(...) with wave_index and subtask_id.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/design/engine.md`:
- Around line 762-764: The pipeline diagram currently reads "decompose → route →
resolve topology → dispatch → rollup → update parent" but the implementation and
numbered list treat validation as its own phase; update the diagram to insert
the validate phase between "resolve topology" and "dispatch" (i.e., "decompose →
route → resolve topology → validate → dispatch → rollup → update parent") so it
matches the implementation and clarifies where unroutable-only runs fail.

In `@src/ai_company/engine/coordination/dispatchers.py`:
- Around line 396-410: Validate that every routed subtask ID in routing_result
exists in decomposition_result.created_tasks before calling _setup_workspaces or
performing any workspace side effects; implement a cheap preflight helper (e.g.,
validate_routed_task_ids(routing_result, decomposition_result)) and call it at
the start of the dispatcher flow where workspace_service and
config.enable_workspace_isolation are checked, returning a failed DispatchResult
if validation fails. Ensure the helper references
decomposition_result.created_tasks and the routed IDs from routing_result so
build_execution_waves and _setup_workspaces are only invoked when IDs are
consistent; apply the same change to the other dispatcher branch that also calls
_setup_workspaces/build_execution_waves.

In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 84-94: The loop in parallel_groups/building waves treats a missing
routing decision (routing_lookup.get(subtask_id) is None) as a simple continue,
which lets routed descendants run later and breaks dependency semantics; update
the handling so that when decision is None you (1) locate all downstream
descendants of subtask_id using the same dependency/graph structure used by
parallel_groups (the subtask dependency map / graph used to compute waves) and
mark those descendants as blocked/unroutable (e.g., set their routing_lookup
entry or a blocked set) so they cannot be scheduled in later waves, or (2) if
your policy is to fail fast, raise a coordination error when a skipped subtask
has any dependents; also update the COORDINATION_WAVE_BUILT logging to reflect
that the subtask blocked its descendants (include wave_idx, subtask_id,
skipped=True, blocked_descendants count/list) so the behavior is visible during
debugging.

In `@src/ai_company/engine/coordination/service.py`:
- Around line 139-148: Wrap the dispatcher selection and dispatch call into a
new helper _phase_dispatch(...) that measures duration, calls
select_dispatcher(topology) and await dispatcher.dispatch(...), and on any
exception appends a CoordinationPhaseResult (or appropriate phase result)
marking the dispatch phase as failed with partial_phases included, then
re-raises a CoordinationPhaseError containing the partial phases; replace the
inline block around select_dispatcher and dispatcher.dispatch in both places
(the current dispatch block and the similar block at lines 176-184) with calls
to _phase_dispatch so callers always receive a dispatch-phase result even on
errors and the function stays under 50 lines. Ensure the helper accepts the same
inputs used now (decomp_result, routing_result,
parallel_executor/self._parallel_executor,
workspace_service/self._workspace_service, context.config) and returns the
dispatch result or raises CoordinationPhaseError after appending the
failed-phase result to phases.
- Line 151: The rollup currently only iterates dispatch_result.waves via
_phase_rollup(), causing unrouted or skipped subtasks to disappear and making
rollup_status() undercount work; update the call sites (around where rollup =
self._phase_rollup(context, dispatch_result, phases) and the similar block at
lines ~361-389) to pass the full expected subtask ID set (from
build_execution_waves() or dispatch_result.expected_subtask_ids) into
_phase_rollup(), then modify _phase_rollup() to accept that ID set and, before
computing rollup_status(), insert any missing IDs into the statuses map with an
appropriate terminal state (BLOCKED or FAILED) so the rollup includes those
subtasks in totals; ensure you reference CoordinationWave entries as before but
compute rollup against the union of emitted wave results plus the filled-in
missing IDs so parents aren't marked complete prematurely.

In `@tests/unit/engine/test_coordination_service.py`:
- Around line 514-526: The test requires that MemoryError not be normalized into
CoordinationPhaseError; update MultiAgentCoordinator.coordinate (the phase
wrapper that currently catches exceptions and raises CoordinationPhaseError) to
special-case catastrophic exceptions by re-raising MemoryError (and similar
process-level exceptions if desired) before wrapping other exceptions.
Concretely, in the exception handling block (the try/except that currently
converts all errors into CoordinationPhaseError), add an early check like: if
isinstance(exc, MemoryError): raise, then proceed to wrap non-catastrophic
exceptions into CoordinationPhaseError so MemoryError propagates unchanged.
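The suggested special-casing in miniature — `guard_phase` and `PhaseError` are illustrative stand-ins for the real coordinator wrapper and `CoordinationPhaseError`:

```python
class PhaseError(Exception):
    """Stand-in for CoordinationPhaseError."""

    def __init__(self, message: str, *, phase: str) -> None:
        super().__init__(message)
        self.phase = phase

def guard_phase(phase: str, fn):
    """Wrap ordinary failures; let catastrophic exceptions propagate unchanged."""
    try:
        return fn()
    except MemoryError:
        raise  # process-level failure: never normalize into PhaseError
    except Exception as exc:
        raise PhaseError(str(exc), phase=phase) from exc
```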

---

Duplicate comments:
In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 96-102: The code in group_builder.py raises CoordinationError when
a routing decision references a missing task but does not log context; before
raising in the block that checks task_lookup.get(subtask_id) == None, add a
warning log (e.g., using the module logger or the existing coordination logger)
that includes wave_index and subtask_id and any other relevant context (like the
routing entry or decomposition id) so stale routing output is visible in
telemetry, then raise CoordinationError as before; update the logging call
adjacent to the check in the function/method that builds groups (the block that
constructs msg and raises CoordinationError) to emit logger.warning(...) with
wave_index and subtask_id.

In `@src/ai_company/engine/coordination/models.py`:
- Around line 146-190: Add validation to CoordinationResult to reject nested
results that reference a different parent task: in the existing model_validator
(or a new after-mode validator) for CoordinationResult (method
_validate_topology_resolved), check any present
decomposition_result.parent_task_id, routing_result.parent_task_id, and
status_rollup.parent_task_id (and workspace_merge.parent_task_id if applicable)
and raise ValueError if any of them do not equal self.parent_task_id; keep the
existing topology AUTO check and return self when all validations pass.

In `@tests/unit/engine/test_coordination_dispatchers.py`:
- Around line 129-134: The tests call make_routing(...) without passing the
dispatcher topology, so they default to CENTRALIZED and miss coverage for
non-centralized cases; update each call (e.g., the routing assignment in
test_coordination_dispatchers.py) to pass the appropriate topology argument (use
the dispatcher topology constant instead of default CENTRALIZED) so
make_routing(subscriptions, topology=YOUR_TOPOLOGY) matches the dispatcher under
test; apply the same change to the other non-centralized call sites in this file
(the other make_routing invocations that should not use CENTRALIZED).

In `@tests/unit/engine/test_coordination_service.py`:
- Around line 250-296: The test currently defines sub_b with a dependency on
sub_a (make_subtask("sub-b", dependencies=("sub-a",))) but then expects wave 1
to run after wave 0 fails; either make the tasks independent or change
assertions. Fix by making sub_b independent (remove the dependencies argument)
and update the decomposition to use an independent structure (e.g.,
TaskStructure.PARALLEL in the make_decomposition call) so the test validates
non-fail_fast continuation; keep the rest of the setup (routing, exec_results,
CoordinationConfig(fail_fast=False), and assertions) unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 1e0871ab-a201-43f1-8bea-9e9da98ebed6

📥 Commits

Reviewing files that changed from the base of the PR and between 2cc14d4 and 0602c90.

📒 Files selected for processing (11)
  • docs/design/engine.md
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/models.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/errors.py
  • tests/unit/engine/conftest.py
  • tests/unit/engine/test_coordination_dispatchers.py
  • tests/unit/engine/test_coordination_group_builder.py
  • tests/unit/engine/test_coordination_models.py
  • tests/unit/engine/test_coordination_service.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Greptile Review
  • GitHub Check: Analyze (python)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649 native lazy annotations
Use except A, B: syntax (no parentheses) — ruff enforces PEP 758 except syntax on Python 3.14
Type hints: all public functions, mypy strict mode
Docstrings: Google style, required on public classes/functions (enforced by ruff D rules)
Create new objects, never mutate existing ones. For non-Pydantic internal collections (registries, BaseTool), use copy.deepcopy() at construction + MappingProxyType wrapping for read-only enforcement.
For dict/list fields in frozen Pydantic models, rely on frozen=True for field reassignment prevention and copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, serializing for persistence).
Config vs runtime state: use frozen Pydantic models for config/identity; separate mutable-via-copy models (using model_copy(update=...)) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.
Models: Pydantic v2 (BaseModel, model_validator, computed_field, ConfigDict). Use @computed_field for derived values instead of storing + validating redundant fields. Use NotBlankStr for all identifier/name fields — including optional and tuple variants — instead of manual whitespace validators.
Async concurrency: prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare create_task. Existing code is being migrated incrementally.
Line length: 88 characters (ruff)
Functions: < 50 lines, files < 800 lines
Errors: handle explicitly, never silently swallow
Validate: at system boundaries (user input, external APIs, config files)

Files:

  • src/ai_company/engine/coordination/service.py
  • tests/unit/engine/test_coordination_service.py
  • src/ai_company/engine/coordination/group_builder.py
  • tests/unit/engine/conftest.py
  • src/ai_company/engine/coordination/dispatchers.py
  • tests/unit/engine/test_coordination_group_builder.py
  • src/ai_company/engine/errors.py
  • tests/unit/engine/test_coordination_dispatchers.py
  • tests/unit/engine/test_coordination_models.py
  • src/ai_company/engine/coordination/models.py
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic MUST have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code
Variable name: always logger (not _logger, not log)
Event names: always use constants from the domain-specific module under ai_company.observability.events (e.g. PROVIDER_CALL_START from events.provider). Import directly: from ai_company.observability.events.<domain> import EVENT_CONSTANT
Structured kwargs in logging: always logger.info(EVENT, key=value) — never logger.info("msg %s", val)
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO
DEBUG for object creation, internal flow, entry/exit of key functions
Vendor-agnostic everywhere: NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases. Tests must use test-provider, test-small-001, etc.

Files:

  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/coordination/models.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Tests: markers @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, @pytest.mark.slow
Coverage: 80% minimum (enforced in CI)
Async: asyncio_mode = "auto" — no manual @pytest.mark.asyncio needed
Timeout: 30 seconds per test
Parallelism: pytest-xdist via -n auto — ALWAYS include -n auto when running pytest, never run tests sequentially
Prefer @pytest.mark.parametrize for testing similar cases
Vendor-agnostic everywhere: Tests must use test-provider, test-small-001, etc. (Vendor names may only appear in operations design page, .claude/ skill/agent files, or third-party import paths)

Files:

  • tests/unit/engine/test_coordination_service.py
  • tests/unit/engine/conftest.py
  • tests/unit/engine/test_coordination_group_builder.py
  • tests/unit/engine/test_coordination_dispatchers.py
  • tests/unit/engine/test_coordination_models.py
🧠 Learnings (7)
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/providers/**/*.py : Retryable errors (`is_retryable=True`): `RateLimitError`, `ProviderTimeoutError`, `ProviderConnectionError`, `ProviderInternalError`. Non-retryable errors raise immediately without retry. `RetryExhaustedError` signals that all retries failed — the engine layer catches this to trigger fallback chains.

Applied to files:

  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/errors.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Errors: handle explicitly, never silently swallow

Applied to files:

  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising

Applied to files:

  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Async concurrency: prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`. Existing code is being migrated incrementally.

Applied to files:

  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: When review agents find valid issues (including pre-existing issues in surrounding code, suggestions, and findings adjacent to the PR's changes), fix them all. No deferring, no 'out of scope' skipping.

Applied to files:

  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All state transitions must log at INFO

Applied to files:

  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Config vs runtime state: use frozen Pydantic models for config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.

Applied to files:

  • src/ai_company/engine/coordination/models.py
🧬 Code graph analysis (4)
src/ai_company/engine/coordination/service.py (6)
src/ai_company/core/enums.py (2)
  • CoordinationTopology (358-369)
  • TaskStatus (198-224)
src/ai_company/engine/coordination/dispatchers.py (7)
  • DispatchResult (49-76)
  • select_dispatcher (791-826)
  • dispatch (83-104)
  • dispatch (349-372)
  • dispatch (382-442)
  • dispatch (453-523)
  • dispatch (534-578)
src/ai_company/engine/coordination/models.py (3)
  • CoordinationPhaseResult (64-96)
  • CoordinationResult (128-198)
  • is_success (196-198)
src/ai_company/engine/decomposition/models.py (2)
  • DecompositionResult (125-176)
  • SubtaskStatusRollup (179-259)
src/ai_company/engine/decomposition/service.py (2)
  • DecompositionService (34-180)
  • rollup_status (166-180)
src/ai_company/engine/routing/models.py (1)
  • RoutingResult (91-157)
src/ai_company/engine/coordination/group_builder.py (6)
src/ai_company/engine/decomposition/dag.py (2)
  • DependencyGraph (26-252)
  • parallel_groups (185-228)
src/ai_company/engine/parallel_models.py (2)
  • AgentAssignment (23-89)
  • task_id (87-89)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
src/ai_company/engine/decomposition/models.py (1)
  • DecompositionResult (125-176)
src/ai_company/engine/routing/models.py (1)
  • RoutingResult (91-157)
src/ai_company/engine/workspace/models.py (1)
  • Workspace (36-65)
src/ai_company/engine/errors.py (1)
src/ai_company/engine/coordination/models.py (1)
  • CoordinationPhaseResult (64-96)
tests/unit/engine/test_coordination_models.py (5)
src/ai_company/core/enums.py (2)
  • CoordinationTopology (358-369)
  • TaskStatus (198-224)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
src/ai_company/engine/coordination/models.py (5)
  • CoordinationContext (30-61)
  • CoordinationPhaseResult (64-96)
  • CoordinationResult (128-198)
  • CoordinationWave (99-125)
  • is_success (196-198)
src/ai_company/engine/decomposition/models.py (2)
  • SubtaskStatusRollup (179-259)
  • derived_parent_status (232-259)
src/ai_company/engine/parallel_models.py (4)
  • AgentOutcome (143-193)
  • ParallelExecutionResult (196-248)
  • task_id (87-89)
  • agent_id (79-81)

Comment on lines +514 to +526
async def test_memory_error_propagated(self) -> None:
    """MemoryError from decomposition is not swallowed."""
    coordinator = _make_coordinator(
        decompose_error=MemoryError("out of memory"),
    )

    ctx = CoordinationContext(
        task=make_assignment_task(id="parent-1"),
        available_agents=(make_assignment_agent("alice"),),
    )

    with pytest.raises(CoordinationPhaseError):
        await coordinator.coordinate(ctx)

⚠️ Potential issue | 🟠 Major

Do not normalize MemoryError into CoordinationPhaseError.

This test locks in wrapping process-level resource exhaustion as a recoverable phase failure. MemoryError should propagate unchanged so the engine can fail fast instead of continuing in a compromised process. MultiAgentCoordinator.coordinate() should special-case catastrophic exceptions before its generic phase wrapper.

Expected assertion after special-casing catastrophic exceptions
-        with pytest.raises(CoordinationPhaseError):
+        with pytest.raises(MemoryError):
             await coordinator.coordinate(ctx)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_coordination_service.py` around lines 514 - 526, The
test requires that MemoryError not be normalized into CoordinationPhaseError;
update MultiAgentCoordinator.coordinate (the phase wrapper that currently
catches exceptions and raises CoordinationPhaseError) to special-case
catastrophic exceptions by re-raising MemoryError (and similar process-level
exceptions if desired) before wrapping other exceptions. Concretely, in the
exception handling block (the try/except that currently converts all errors into
CoordinationPhaseError), add an early check like: if isinstance(exc,
MemoryError): raise, then proceed to wrap non-catastrophic exceptions into
CoordinationPhaseError so MemoryError propagates unchanged.
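A minimal sketch of the guard this comment asks for, under stated assumptions: `run_phase` is a hypothetical stand-in for the generic phase wrapper inside `MultiAgentCoordinator.coordinate()`, and the local `CoordinationPhaseError` class stands in for the project's error type.

```python
class CoordinationPhaseError(Exception):
    """Stand-in for the project's recoverable phase-failure error."""


# Process-level failures that must propagate unchanged so the engine
# can fail fast instead of continuing in a compromised process.
CATASTROPHIC = (MemoryError, SystemExit, KeyboardInterrupt)


def run_phase(phase_name: str, fn):
    """Run one phase, re-raising catastrophic errors before wrapping the rest."""
    try:
        return fn()
    except CATASTROPHIC:
        raise  # bypass the phase-failure path entirely
    except Exception as exc:
        raise CoordinationPhaseError(f"{phase_name} failed: {exc}") from exc
```

With this ordering, ordinary errors still become phase failures while `MemoryError` reaches the caller untouched, which is what the flipped test assertion expects.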

Contributor

Copilot AI left a comment

Pull request overview

Adds a new multi-agent “coordination” subsystem to the engine, wiring decomposition → routing → topology dispatch → rollup → optional parent update, with stronger validation and expanded observability event coverage to support Issue #205’s runtime coordination wiring.

Changes:

  • Introduces coordination models/config, dispatcher implementations, and the MultiAgentCoordinator orchestration service.
  • Adds coordination observability events and updates docs to describe the coordination pipeline and logging/event conventions.
  • Adds extensive unit test coverage for coordination behavior (validators, workspace preconditions, fail-fast behavior, rollup, and error paths).

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
tests/unit/observability/test_events.py Adds coordination event constants coverage and module discovery entry.
tests/unit/engine/test_coordination_service.py End-to-end service tests for the new coordination pipeline (success + failure paths).
tests/unit/engine/test_coordination_models.py Validator and invariants tests for coordination domain models.
tests/unit/engine/test_coordination_group_builder.py Tests DAG→wave conversion and defensive behavior in wave building.
tests/unit/engine/test_coordination_errors.py Tests new coordination error hierarchy behavior.
tests/unit/engine/test_coordination_dispatchers.py Tests dispatcher behaviors across topologies, workspace lifecycle, and fail-fast.
tests/unit/engine/test_coordination_config.py Tests the new coordination config model defaults and validation.
tests/unit/engine/conftest.py Adds shared coordination test helpers (subtasks, decomposition, routing, exec results).
src/ai_company/observability/events/coordination.py Defines coordination event name constants.
src/ai_company/engine/errors.py Adds CoordinationError and CoordinationPhaseError.
src/ai_company/engine/coordination/service.py Implements MultiAgentCoordinator pipeline orchestration.
src/ai_company/engine/coordination/models.py Adds frozen Pydantic models for context, phase results, waves, and overall result.
src/ai_company/engine/coordination/group_builder.py Converts dependency DAG + routing decisions into ParallelExecutionGroups.
src/ai_company/engine/coordination/dispatchers.py Adds topology dispatchers + workspace lifecycle + shared wave execution helper.
src/ai_company/engine/coordination/config.py Adds CoordinationConfig and its validation constraints.
src/ai_company/engine/coordination/__init__.py Exposes coordination public API surface.
src/ai_company/engine/__init__.py Re-exports coordination types/errors at the engine package level.
docs/design/engine.md Documents the multi-agent coordination pipeline design and dispatcher semantics.
CLAUDE.md Updates package structure description and adds coordination to event-constant examples.


Comment on lines +131 to +136
logger.info(COORDINATION_PHASE_STARTED, phase=phase_name)
try:
    requests = _build_workspace_requests(routing_result, config)
    workspaces = await workspace_service.setup_group(requests=requests)
except Exception as exc:
    elapsed = time.monotonic() - start
Comment on lines +612 to +631
        base_branch=config.base_branch,
    )
    for a in group.assignments
)
ws_start = time.monotonic()
try:
    wave_workspaces = await workspace_service.setup_group(
        requests=wave_requests,
    )
except Exception as exc:
    ws_elapsed = time.monotonic() - ws_start
    logger.warning(
        COORDINATION_PHASE_FAILED,
        phase=f"workspace_setup_wave_{wave_idx}",
        error=str(exc),
    )
    all_phases.append(
        CoordinationPhaseResult(
            phase=f"workspace_setup_wave_{wave_idx}",
            success=False,
Comment on lines +717 to +725
        succeeded=exec_result.agents_succeeded,
        failed=exec_result.agents_failed,
        duration_seconds=elapsed,
    )

except Exception as exc:
    elapsed = time.monotonic() - start
    wave_failed = True
    logger.warning(
msg = (
    f"Subtask {subtask_id!r} has a routing decision "
    "but no corresponding created task in decomposition"
)
Comment on lines +372 to +389
try:
    # Collect statuses from wave outcomes
    statuses: list[TaskStatus] = []
    for wave in dispatch_result.waves:
        if wave.execution_result is None:
            statuses.extend(TaskStatus.BLOCKED for _ in wave.subtask_ids)
            continue

        for outcome in wave.execution_result.outcomes:
            if outcome.is_success:
                statuses.append(TaskStatus.COMPLETED)
            else:
                statuses.append(TaskStatus.FAILED)

    rollup = self._decomposition_service.rollup_status(
        context.task.id,
        tuple(statuses),
    )
Comment on lines +174 to +180
logger.info(COORDINATION_PHASE_STARTED, phase=phase_name)
try:
    merge_result = await workspace_service.merge_group(
        workspaces=workspaces,
    )
except Exception as exc:
    elapsed = time.monotonic() - start
Comment on lines +207 to +213
try:
    result = await self._decomposition_service.decompose_task(
        context.task, context.decomposition_context
    )
except Exception as exc:
    elapsed = time.monotonic() - start
    logger.warning(
Comment on lines +259 to +266
try:
    result = self._routing_service.route(
        decomp_result,
        context.available_agents,
        context.task,
    )
except Exception as exc:
    elapsed = time.monotonic() - start
Comment on lines +471 to +472
except Exception as exc:
    elapsed = time.monotonic() - start
Comment on lines +176 to +184
except CoordinationPhaseError:
    raise
except Exception as exc:
    logger.exception(
        COORDINATION_FAILED,
        parent_task_id=task.id,
        error=str(exc),
    )
    raise
Comment on lines +464 to +471
            CoordinationPhaseResult(
                phase=phase_name,
                success=result.success,
                duration_seconds=elapsed,
                error=result.error,
            )
        )
    except Exception as exc:

New validator + unconditional error=result.error creates a false-failure path

The CoordinationPhaseResult validator added in this very PR rejects success=True with a non-None error:

if self.success and self.error is not None:
    raise ValueError("successful phase must not have an error")

error=result.error is passed unconditionally here. If TaskMutationResult.error is ever non-None on a successful submission (e.g., a deprecation warning or advisory message from the engine), the CoordinationPhaseResult constructor raises ValueError. That exception is caught by the outer except Exception as exc handler, which logs it as COORDINATION_PHASE_FAILED and records a failed update_parent phase — even though the actual task transition succeeded. This silently masks a successful parent update and can make result.is_success return False for a fully-successful run.

Suggested change
        phases.append(
            CoordinationPhaseResult(
                phase=phase_name,
                success=result.success,
                duration_seconds=elapsed,
-               error=result.error,
+               error=result.error if not result.success else None,
            )
        )
    except Exception as exc:
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (7)
tests/unit/engine/test_coordination_dispatchers.py (1)

709-742: ⚠️ Potential issue | 🟠 Major

Don't lock in downstream execution after a prerequisite wave failed.

sub-b explicitly depends on sub-a, but this test asserts that wave 1 still runs after wave 0 raised. That turns dependency edges into ordering-only hints and will mask the real fix if blocked descendants are enforced later. If the goal is to cover non-fail_fast continuation, make the second subtask independent; otherwise assert that the dependent wave is skipped/blocked.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_coordination_dispatchers.py` around lines 709 - 742,
The test test_exception_with_fail_fast_off_continues currently makes sub_b
depend on sub_a (via make_subtask("sub-b", dependencies=("sub-a",))) but then
asserts wave 1 still ran after wave 0 failed, which is inconsistent with real
dependency enforcement; either make the second subtask independent (remove
dependencies from make_subtask for "sub-b") to test non-fail_fast continuation,
or keep the dependency and change the assertions to expect the dependent wave to
be skipped/blocked (e.g., result.waves[1].execution_result is None and that
execute_group was not invoked for blocked wave); update the setup using
make_subtask/make_decomposition/make_routing accordingly and adjust assertions
for SasDispatcher dispatch with CoordinationConfig(fail_fast=False).
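The fail-fast semantics the PR fixes, and the continuation behavior this test covers, reduce to one loop shape. A hedged sketch, with `execute_waves` and `run_wave` as hypothetical simplifications of the dispatcher's wave loop:

```python
def execute_waves(waves, fail_fast: bool, run_wave):
    """Run waves in order; stop at the first failed wave when fail_fast is set."""
    results: list[bool] = []
    for wave in waves:
        ok = run_wave(wave)
        results.append(ok)
        if fail_fast and not ok:
            break  # remaining waves are skipped, not executed
    return results
```

With `fail_fast=True` a failing wave halts the loop (the bug was that it did not); with `fail_fast=False` later waves still run, which is only sound when they do not depend on the failed one.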
docs/design/engine.md (1)

802-804: ⚠️ Potential issue | 🟡 Minor

Add validate to the pipeline diagram.

The numbered list and implementation both have a dedicated validation step, but the diagram jumps straight from topology resolution to dispatch.

Suggested diff
-decompose → route → resolve topology → dispatch → rollup → update parent
+decompose → route → resolve topology → validate → dispatch → rollup → update parent
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/design/engine.md` around lines 802 - 804, The pipeline diagram string
"decompose → route → resolve topology → dispatch → rollup → update parent" is
missing the dedicated validation step; update that diagram to insert "validate"
after "resolve topology" so it reads "decompose → route → resolve topology →
validate → dispatch → rollup → update parent" to match the numbered list and
implementation (look for the pipeline text in the docs/design/engine.md
content).
tests/unit/engine/test_coordination_models.py (1)

375-406: ⚠️ Potential issue | 🟠 Major

Don't codify foreign parent IDs as valid.

This test blesses attaching a status_rollup from "other-task" to CoordinationResult(parent_task_id="task-1"). That lets another task's nested coordination data be associated with the wrong aggregate result. Please invert this into a ValidationError case once CoordinationResult validates nested parent_task_ids. As per coding guidelines, "Validate: at system boundaries (user input, external APIs, config files)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_coordination_models.py` around lines 375 - 406, The
test test_mismatched_parent_task_id_accepted currently asserts that a
CoordinationResult can accept a SubtaskStatusRollup with a different
parent_task_id; change it to assert that providing a rollup whose parent_task_id
doesn't match CoordinationResult.parent_task_id raises a ValidationError once
CoordinationResult enforces cross-validation. Update the test to construct the
same SubtaskStatusRollup and CoordinationResult inputs but wrap the
creation/validation of CoordinationResult in a pytest.raises(ValidationError)
(or the project's equivalent validation exception) and assert the exception is
raised; reference CoordinationResult and SubtaskStatusRollup to locate the code
under test.
src/ai_company/engine/coordination/service.py (3)

139-148: ⚠️ Potential issue | 🟠 Major

Wrap dispatch failures as a real coordination phase error.

select_dispatcher() and dispatcher.dispatch() can still raise here — for example, decentralized precondition failures currently bubble as raw exceptions. That means callers lose a dispatch phase entry and partial_phases on the pipeline's most failure-prone step. Please funnel this block through a dedicated dispatch-phase helper that appends a failed CoordinationPhaseResult before re-raising CoordinationPhaseError.

Also applies to: 176-184

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/coordination/service.py` around lines 139 - 148, The
dispatch block should be wrapped in a helper that runs select_dispatcher(...)
and await dispatcher.dispatch(...), catching any Exception, creating and
appending a failed CoordinationPhaseResult for the "dispatch" phase (including
partial_phases/state from the attempted dispatch when available) to the phases
list, and then re-raising a CoordinationPhaseError that carries that failed
phase result; implement this by extracting the current dispatch calls into a
helper (e.g., _run_dispatch_phase or similar) that returns a dispatch result on
success and on exception appends a failed CoordinationPhaseResult and raises
CoordinationPhaseError. Apply the same pattern to the other dispatch site (the
similar block around lines 176-184) so both select_dispatcher and
dispatcher.dispatch failures produce a proper dispatch-phase entry and raise
CoordinationPhaseError.
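A minimal sketch of the helper shape suggested above, under stated assumptions: `run_dispatch_phase` is hypothetical, the local `CoordinationPhaseError` stands in for the project's error type, and phase records are simplified to tuples rather than `CoordinationPhaseResult` instances.

```python
class CoordinationPhaseError(Exception):
    """Stand-in for the project's phase-failure error."""


def run_dispatch_phase(phases: list, dispatch_fn):
    """Run the dispatch step, recording a failed phase entry before re-raising."""
    try:
        return dispatch_fn()
    except Exception as exc:
        # Callers keep a dispatch entry in partial_phases even on failure.
        phases.append(("dispatch", False, str(exc)))
        raise CoordinationPhaseError(str(exc)) from exc
```

The point is that both `select_dispatcher()` and `dispatcher.dispatch()` failures leave a visible dispatch-phase record instead of bubbling as raw exceptions with no phase history.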

372-389: ⚠️ Potential issue | 🔴 Critical

Roll up against the full planned subtask set.

This only counts statuses from dispatch_result.waves. build_execution_waves() drops unrouted/empty waves, and fail-fast or setup failures can skip later waves entirely, so missing subtasks disappear from the rollup and the parent can be marked complete with unfinished work. Seed the rollup from the full decomposition plan and synthesize BLOCKED/FAILED for any subtask with no recorded outcome.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/coordination/service.py` around lines 372 - 389, The
current rollup uses only statuses collected from dispatch_result.waves, which
omits subtasks that were never routed; instead fetch the full decomposition plan
(via the decomposition service used to build_execution_waves) for
context.task.id, iterate the plan's ordered subtask ids, build a map of outcomes
from dispatch_result.waves (by subtask id), and for each planned subtask append
TaskStatus.COMPLETED if outcome.is_success, TaskStatus.FAILED if outcome exists
but not success, or TaskStatus.BLOCKED if there is no recorded outcome; pass
that complete tuple to
self._decomposition_service.rollup_status(context.task.id, tuple(statuses)) so
missing subtasks are treated as BLOCKED/FAILED rather than disappearing from the
rollup.
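A hedged sketch of the seeding logic described above: `seed_statuses` is a hypothetical helper, and the local `TaskStatus` enum is a minimal stand-in for the project's enum. Every planned subtask gets a status; anything with no recorded outcome is synthesized as BLOCKED instead of vanishing from the rollup.

```python
from enum import Enum


class TaskStatus(Enum):
    """Minimal stand-in for the project's TaskStatus enum."""

    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


def seed_statuses(planned_ids, outcomes_by_id):
    """One status per planned subtask; missing outcomes become BLOCKED."""
    statuses = []
    for sid in planned_ids:
        outcome = outcomes_by_id.get(sid)
        if outcome is None:
            statuses.append(TaskStatus.BLOCKED)  # never routed or skipped wave
        elif outcome:
            statuses.append(TaskStatus.COMPLETED)
        else:
            statuses.append(TaskStatus.FAILED)
    return tuple(statuses)
```

Rolling up this full tuple prevents the parent from being marked complete while unrouted or skipped subtasks still have unfinished work.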

211-230: ⚠️ Potential issue | 🟠 Major

Don't normalize catastrophic exceptions into ordinary phase failures.

These generic except Exception handlers will wrap or swallow MemoryError, which lets the coordinator keep running after a process-fatal failure. At minimum, catastrophic exceptions should bypass the phase-failure path and propagate unchanged; that also means the new test_memory_error_propagated expectation needs to flip to MemoryError. Based on learnings, "Errors: handle explicitly, never silently swallow".

Also applies to: 265-284, 390-405, 471-485

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/coordination/service.py` around lines 211 - 230, The
generic except Exception blocks (the handler that logs
COORDINATION_PHASE_FAILED, creates CoordinationPhaseResult, appends to phases
and raises CoordinationPhaseError) must not normalize catastrophic
exceptions—explicitly detect and re-raise MemoryError (and optionally other
fatal exceptions like SystemExit/KeyboardInterrupt) before converting to a phase
failure; locate the try/except that uses phase_name, start, logger.warning(...,
phase=phase_name, error=str(exc)), creates CoordinationPhaseResult and raises
CoordinationPhaseError and add a guard such as if isinstance(exc, MemoryError):
raise (and likewise for SystemExit/KeyboardInterrupt) so those exceptions
propagate unchanged; apply the same change to the other identical handlers
referenced (the blocks around the other ranges) so test_memory_error_propagated
will see a raw MemoryError.
src/ai_company/engine/coordination/group_builder.py (1)

84-94: ⚠️ Potential issue | 🔴 Critical

Don't let an unroutable prerequisite disappear from the DAG.

When decision is None, this code just skips the subtask. DependencyGraph.parallel_groups() only gives you topological order, so any routed descendants can still show up in later waves and run without their prerequisite ever being assigned. Please either fail coordination here or mark downstream dependents blocked before building subsequent waves. As per coding guidelines, "Validate: at system boundaries (user input, external APIs, config files)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/coordination/group_builder.py` around lines 84 - 94,
The loop in group_builder.py that iterates over subtask_ids currently skips
subtasks when routing_lookup.get(subtask_id) returns None, which lets downstream
nodes run without their prerequisite; instead, when decision is None you must
fail coordination or explicitly mark downstream dependents blocked: modify the
loop (the branch checking "if decision is None") to either raise a clear
exception (e.g., CoordinationError or RuntimeError) with context (subtask_id,
wave_idx) so coordination aborts, or call the dependency-graph API to mark the
entire descendant subtree of subtask_id as blocked before continuing (use the
module/class that provides dependency traversal from
DependencyGraph.parallel_groups(), e.g.,
dependency_graph.get_descendants(subtask_id) /
dependency_graph.block_subtree(subtask_id)), and replace the current
logger.debug-only behavior (COORDINATION_WAVE_BUILT, wave_idx, subtask_id,
skipped=True) with the chosen failure/blocking action so unroutable
prerequisites cannot disappear from the DAG.
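The blocking alternative suggested above needs a transitive-dependents walk. A self-contained sketch, assuming a simple edge map of task to its prerequisites (the function name and representation are illustrative, not the project's `DependencyGraph` API):

```python
def blocked_descendants(
    dependencies: dict[str, tuple[str, ...]], root: str
) -> set[str]:
    """All subtasks that transitively depend on `root`.

    `dependencies` maps each task id to the ids of its prerequisites.
    """
    blocked: set[str] = set()
    frontier = {root}
    while frontier:
        # Tasks whose prerequisites touch the current frontier.
        nxt = {
            task
            for task, prereqs in dependencies.items()
            if any(p in frontier for p in prereqs) and task not in blocked
        }
        blocked |= nxt
        frontier = nxt
    return blocked
```

When a subtask has no routing decision, marking `blocked_descendants(deps, subtask_id)` as BLOCKED before building later waves keeps dependents from running without their prerequisite; raising a `CoordinationError` instead is the stricter option.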
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/engine/coordination/service.py`:
- Around line 303-329: _resolve_topology currently trusts
routing_result.decisions[0].topology; instead, iterate all entries in
routing_result.decisions (RoutingResult) and ensure every decision.topology
equals the resolved topology (CoordinationTopology); if any differ, fail fast by
logging COORDINATION_PHASE_FAILED with a clear message about mixed topologies
and raise a suitable exception (eg. ValueError or a domain-specific error)
rather than proceeding, otherwise continue and log
COORDINATION_TOPOLOGY_RESOLVED as before; reference _resolve_topology,
RoutingResult.decisions, CoordinationTopology, COORDINATION_PHASE_FAILED, and
COORDINATION_TOPOLOGY_RESOLVED when making the change.
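The mixed-topology check described above can be sketched as a small pure function (the name `resolve_topology` and the plain-string topologies are illustrative stand-ins for `_resolve_topology` and `CoordinationTopology`):

```python
def resolve_topology(topologies: tuple[str, ...]) -> str:
    """Resolve a single topology from routing decisions, failing fast on mixes."""
    unique = set(topologies)
    if not unique:
        raise ValueError("no routing decisions to resolve topology from")
    if len(unique) > 1:
        # Fail fast rather than silently trusting decisions[0].
        raise ValueError(f"mixed topologies in routing decisions: {sorted(unique)}")
    return topologies[0]
```

Checking every decision, not just the first, means a planner bug that emits inconsistent topologies surfaces as a clear error instead of a dispatcher chosen arbitrarily.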

---

Duplicate comments:
In `@docs/design/engine.md`:
- Around line 802-804: The pipeline diagram string "decompose → route → resolve
topology → dispatch → rollup → update parent" is missing the dedicated
validation step; update that diagram to insert "validate" after "resolve
topology" so it reads "decompose → route → resolve topology → validate →
dispatch → rollup → update parent" to match the numbered list and implementation
(look for the pipeline text in the docs/design/engine.md content).

In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 84-94: The loop in group_builder.py that iterates over subtask_ids
currently skips subtasks when routing_lookup.get(subtask_id) returns None, which
lets downstream nodes run without their prerequisite. Instead, when decision is
None you must fail coordination or explicitly mark downstream dependents
blocked. Modify the loop (the branch checking "if decision is None") to either
raise a clear exception (e.g. CoordinationError or RuntimeError) with context
(subtask_id, wave_idx) so coordination aborts, or call the dependency-graph API
to mark the entire descendant subtree of subtask_id as blocked before continuing
(use the module/class that provides dependency traversal from
DependencyGraph.parallel_groups(), e.g.
dependency_graph.get_descendants(subtask_id) /
dependency_graph.block_subtree(subtask_id)). Replace the current
logger.debug-only behavior (COORDINATION_WAVE_BUILT, wave_idx, subtask_id,
skipped=True) with the chosen failure/blocking action so unroutable
prerequisites cannot disappear from the DAG.
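The blocking alternative described above can be sketched with a small
breadth-first traversal. The names here (descendants, block_unroutable) are
illustrative stand-ins for the suggested get_descendants/block_subtree API, not
the project's actual methods:

```python
from collections import deque


def descendants(edges: dict[str, set[str]], root: str) -> set[str]:
    """Return all transitive dependents of root.

    edges maps a subtask id to the ids that directly depend on it.
    """
    seen: set[str] = set()
    queue = deque([root])
    while queue:
        node = queue.popleft()
        for child in edges.get(node, set()):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


def block_unroutable(
    subtask_id: str, edges: dict[str, set[str]], blocked: set[str]
) -> None:
    """Mark an unroutable subtask and its whole descendant subtree as blocked."""
    blocked.add(subtask_id)
    blocked.update(descendants(edges, subtask_id))
```

Either way, the key property is that an unroutable prerequisite leaves a visible
trace (an exception or a blocked subtree) instead of silently vanishing.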

In `@src/ai_company/engine/coordination/service.py`:
- Around line 139-148: The dispatch block should be wrapped in a helper that
runs select_dispatcher(...) and await dispatcher.dispatch(...), catching any
Exception, creating and appending a failed CoordinationPhaseResult for the
"dispatch" phase (including partial_phases/state from the attempted dispatch
when available) to the phases list, and then re-raising a CoordinationPhaseError
that carries that failed phase result. Implement this by extracting the current
dispatch calls into a helper (e.g. _run_dispatch_phase or similar) that returns
a dispatch result on success and, on exception, appends a failed
CoordinationPhaseResult and raises CoordinationPhaseError. Apply the same
pattern to the other dispatch site (the similar block around lines 176-184) so
both select_dispatcher and dispatcher.dispatch failures produce a proper
dispatch-phase entry and raise CoordinationPhaseError.
- Around line 372-389: The current rollup uses only statuses collected from
dispatch_result.waves, which omits subtasks that were never routed. Instead,
fetch the full decomposition plan (via the decomposition service used to
build_execution_waves) for context.task.id, iterate the plan's ordered subtask
ids, and build a map of outcomes from dispatch_result.waves (by subtask id).
For each planned subtask append TaskStatus.COMPLETED if outcome.is_success,
TaskStatus.FAILED if an outcome exists but is not successful, or
TaskStatus.BLOCKED if there is no recorded outcome. Pass that complete tuple to
self._decomposition_service.rollup_status(context.task.id, tuple(statuses)) so
missing subtasks are treated as BLOCKED/FAILED rather than disappearing from
the rollup.
- Around line 211-230: The generic except Exception blocks (the handler that
logs COORDINATION_PHASE_FAILED, creates CoordinationPhaseResult, appends to
phases, and raises CoordinationPhaseError) must not normalize catastrophic
exceptions. Explicitly detect and re-raise MemoryError (and optionally other
fatal exceptions like SystemExit/KeyboardInterrupt) before converting to a
phase failure. Locate the try/except that uses phase_name, start,
logger.warning(..., phase=phase_name, error=str(exc)), creates
CoordinationPhaseResult, and raises CoordinationPhaseError, and add a guard
such as `if isinstance(exc, MemoryError): raise` (and likewise for
SystemExit/KeyboardInterrupt) so those exceptions propagate unchanged. Apply
the same change to the other identical handlers referenced (the blocks around
the other ranges) so test_memory_error_propagated will see a raw MemoryError.
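The rollup-completeness fix suggested for lines 372-389 amounts to the
following mapping. This is a sketch: the TaskStatus values and the
outcomes-as-booleans shape are simplified stand-ins for the project's models:

```python
from enum import Enum


class TaskStatus(Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


def collect_statuses(
    planned_ids: tuple[str, ...], outcomes: dict[str, bool]
) -> tuple[TaskStatus, ...]:
    """Map every planned subtask to a status; unrouted subtasks become BLOCKED.

    outcomes maps subtask id -> is_success for subtasks that actually ran.
    """
    statuses = []
    for subtask_id in planned_ids:
        if subtask_id not in outcomes:
            statuses.append(TaskStatus.BLOCKED)  # never routed or executed
        elif outcomes[subtask_id]:
            statuses.append(TaskStatus.COMPLETED)
        else:
            statuses.append(TaskStatus.FAILED)
    return tuple(statuses)
```

Iterating the plan's ids (rather than the waves) is what guarantees the tuple
length always matches the decomposition, so nothing disappears from the rollup.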
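The fatal-exception guard for the lines 211-230 handlers reduces to this shape
(run_phase and the dict result are illustrative, not the service's actual
code). Note that SystemExit and KeyboardInterrupt derive from BaseException,
so `except Exception` already lets them through; the explicit guard matters
for MemoryError and RecursionError, which are Exception subclasses:

```python
FATAL = (MemoryError, RecursionError)


def run_phase(phase_name: str, fn):
    """Run one phase, normalizing ordinary failures but re-raising fatal ones."""
    try:
        return {"phase": phase_name, "success": True, "result": fn()}
    except Exception as exc:
        if isinstance(exc, FATAL):
            raise  # propagate unchanged so callers and tests see the raw error
        return {"phase": phase_name, "success": False, "error": str(exc)}
```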

In `@tests/unit/engine/test_coordination_dispatchers.py`:
- Around line 709-742: The test test_exception_with_fail_fast_off_continues
currently makes sub_b depend on sub_a (via make_subtask("sub-b",
dependencies=("sub-a",))) but then asserts wave 1 still ran after wave 0
failed, which is inconsistent with real dependency enforcement. Either make the
second subtask independent (remove dependencies from make_subtask for "sub-b")
to test non-fail_fast continuation, or keep the dependency and change the
assertions to expect the dependent wave to be skipped/blocked (e.g.
result.waves[1].execution_result is None and execute_group not invoked for the
blocked wave). Update the setup using make_subtask/make_decomposition/
make_routing accordingly and adjust assertions for SasDispatcher dispatch with
CoordinationConfig(fail_fast=False).
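The fail_fast semantics this test exercises (and that this PR fixes in
ContextDependentDispatcher) can be reduced to the following loop shape. This is
a schematic only; the real waves are async, dependency-aware, and carry rich
results rather than booleans:

```python
from typing import Callable


def run_waves(waves: list[Callable[[], bool]], fail_fast: bool) -> list[bool]:
    """Execute waves in order; with fail_fast, stop after the first failure."""
    results: list[bool] = []
    for wave in waves:
        ok = wave()  # each wave returns True on success (hypothetical shape)
        results.append(ok)
        if fail_fast and not ok:
            break  # remaining waves are never executed
    return results
```

The pre-fix bug was equivalent to omitting the `break`: waves continued
regardless of fail_fast.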

In `@tests/unit/engine/test_coordination_models.py`:
- Around line 375-406: The test test_mismatched_parent_task_id_accepted
currently asserts that a CoordinationResult can accept a SubtaskStatusRollup
with a different parent_task_id. Change it to assert that providing a rollup
whose parent_task_id doesn't match CoordinationResult.parent_task_id raises a
ValidationError once CoordinationResult enforces cross-validation. Update the
test to construct the same SubtaskStatusRollup and CoordinationResult inputs
but wrap the creation/validation of CoordinationResult in
pytest.raises(ValidationError) (or the project's equivalent validation
exception) and assert the exception is raised. Reference CoordinationResult
and SubtaskStatusRollup to locate the code under test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: c19e23b7-8803-4b47-a557-89672d08bbe6

📥 Commits

Reviewing files that changed from the base of the PR and between 0602c90 and bfc9e2b.

📒 Files selected for processing (19)
  • CLAUDE.md
  • docs/design/engine.md
  • src/ai_company/engine/__init__.py
  • src/ai_company/engine/coordination/__init__.py
  • src/ai_company/engine/coordination/config.py
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/models.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/errors.py
  • src/ai_company/observability/events/coordination.py
  • tests/unit/engine/conftest.py
  • tests/unit/engine/test_coordination_config.py
  • tests/unit/engine/test_coordination_dispatchers.py
  • tests/unit/engine/test_coordination_errors.py
  • tests/unit/engine/test_coordination_group_builder.py
  • tests/unit/engine/test_coordination_models.py
  • tests/unit/engine/test_coordination_service.py
  • tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649 native lazy annotations
Use except A, B: syntax (no parentheses) — ruff enforces PEP 758 except syntax on Python 3.14
Type hints: all public functions, mypy strict mode
Docstrings: Google style, required on public classes/functions (enforced by ruff D rules)
Create new objects, never mutate existing ones. For non-Pydantic internal collections (registries, BaseTool), use copy.deepcopy() at construction + MappingProxyType wrapping for read-only enforcement.
For dict/list fields in frozen Pydantic models, rely on frozen=True for field reassignment prevention and copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, serializing for persistence).
Config vs runtime state: use frozen Pydantic models for config/identity; separate mutable-via-copy models (using model_copy(update=...)) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.
Models: Pydantic v2 (BaseModel, model_validator, computed_field, ConfigDict). Use @computed_field for derived values instead of storing + validating redundant fields. Use NotBlankStr for all identifier/name fields — including optional and tuple variants — instead of manual whitespace validators.
Async concurrency: prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare create_task. Existing code is being migrated incrementally.
Line length: 88 characters (ruff)
Functions: < 50 lines, files < 800 lines
Errors: handle explicitly, never silently swallow
Validate: at system boundaries (user input, external APIs, config files)

Files:

  • src/ai_company/engine/coordination/config.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
  • tests/unit/engine/test_coordination_config.py
  • tests/unit/engine/test_coordination_service.py
  • tests/unit/engine/test_coordination_dispatchers.py
  • src/ai_company/engine/coordination/group_builder.py
  • tests/unit/engine/conftest.py
  • tests/unit/observability/test_events.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/observability/events/coordination.py
  • tests/unit/engine/test_coordination_errors.py
  • src/ai_company/engine/errors.py
  • tests/unit/engine/test_coordination_group_builder.py
  • src/ai_company/engine/coordination/__init__.py
  • tests/unit/engine/test_coordination_models.py
  • src/ai_company/engine/coordination/models.py
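The deepcopy-at-construction plus MappingProxyType rule for non-Pydantic
collections looks like this in practice. ToolRegistry here is an illustrative
name, not the project's actual class:

```python
import copy
from types import MappingProxyType


class ToolRegistry:
    """Registry that never shares mutable state with its callers.

    Deep-copies input at construction and exposes a read-only mapping view,
    per the immutability guideline above.
    """

    def __init__(self, tools: dict[str, dict]) -> None:
        self._tools = copy.deepcopy(tools)  # defensive copy at construction
        self.tools = MappingProxyType(self._tools)  # read-only for callers
```

Later mutation of the caller's dict cannot leak in, and any attempt to assign
through the proxy raises TypeError.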
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic MUST have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code
Variable name: always logger (not _logger, not log)
Event names: always use constants from the domain-specific module under ai_company.observability.events (e.g. PROVIDER_CALL_START from events.provider). Import directly: from ai_company.observability.events.<domain> import EVENT_CONSTANT
Structured kwargs in logging: always logger.info(EVENT, key=value) — never logger.info("msg %s", val)
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO
DEBUG for object creation, internal flow, entry/exit of key functions
Vendor-agnostic everywhere: NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases. Tests must use test-provider, test-small-001, etc.

Files:

  • src/ai_company/engine/coordination/config.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/observability/events/coordination.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/coordination/__init__.py
  • src/ai_company/engine/coordination/models.py
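The logging rules above (structured kwargs, event constants, WARNING on error
paths) combine as in this sketch. The StructuredLogger shim and the event
string value are assumptions for illustration; real code imports get_logger
from ai_company.observability and the constant from events.coordination:

```python
class StructuredLogger:
    """Minimal stand-in for the project's structured logger."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.records: list[tuple[str, dict]] = []

    def warning(self, event: str, **kwargs: object) -> None:
        # Structured kwargs: logger.warning(EVENT, key=value), never %-format.
        self.records.append((event, dict(kwargs)))


def get_logger(name: str) -> StructuredLogger:
    return StructuredLogger(name)


# Event constant as it would come from events.coordination (value assumed).
COORDINATION_PHASE_FAILED = "coordination.phase_failed"

logger = get_logger(__name__)
logger.warning(COORDINATION_PHASE_FAILED, phase="dispatch", error="boom")
```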
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Tests: markers @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, @pytest.mark.slow
Coverage: 80% minimum (enforced in CI)
Async: asyncio_mode = "auto" — no manual @pytest.mark.asyncio needed
Timeout: 30 seconds per test
Parallelism: pytest-xdist via -n auto — ALWAYS include -n auto when running pytest, never run tests sequentially
Prefer @pytest.mark.parametrize for testing similar cases
Vendor-agnostic everywhere: Tests must use test-provider, test-small-001, etc. (Vendor names may only appear in operations design page, .claude/ skill/agent files, or third-party import paths)

Files:

  • tests/unit/engine/test_coordination_config.py
  • tests/unit/engine/test_coordination_service.py
  • tests/unit/engine/test_coordination_dispatchers.py
  • tests/unit/engine/conftest.py
  • tests/unit/observability/test_events.py
  • tests/unit/engine/test_coordination_errors.py
  • tests/unit/engine/test_coordination_group_builder.py
  • tests/unit/engine/test_coordination_models.py
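A parametrized test following these conventions (unit marker, parametrize for
similar cases, vendor-agnostic test-provider names) might look like this
sketch; the test name and assertion are illustrative:

```python
import pytest


@pytest.mark.unit
@pytest.mark.parametrize(
    ("provider", "model"),
    [
        ("test-provider", "test-small-001"),
        ("test-provider", "test-medium-001"),
    ],
)
def test_model_naming_is_vendor_agnostic(provider: str, model: str) -> None:
    # Each (provider, model) pair runs as its own test case.
    assert provider.startswith("test-")
    assert model.startswith("test-")
```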
🧠 Learnings (13)
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Event names: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`

Applied to files:

  • CLAUDE.md
  • tests/unit/observability/test_events.py
  • src/ai_company/observability/events/coordination.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Structured kwargs in logging: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)`

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Never use `import logging` / `logging.getLogger()` / `print()` in application code

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Variable name: always `logger` (not `_logger`, not `log`)

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising

Applied to files:

  • CLAUDE.md
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All state transitions must log at INFO

Applied to files:

  • CLAUDE.md
  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : DEBUG for object creation, internal flow, entry/exit of key functions

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/providers/**/*.py : Retryable errors (`is_retryable=True`): `RateLimitError`, `ProviderTimeoutError`, `ProviderConnectionError`, `ProviderInternalError`. Non-retryable errors raise immediately without retry. `RetryExhaustedError` signals that all retries failed — the engine layer catches this to trigger fallback chains.

Applied to files:

  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/errors.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Errors: handle explicitly, never silently swallow

Applied to files:

  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Async concurrency: prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`. Existing code is being migrated incrementally.

Applied to files:

  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: When review agents find valid issues (including pre-existing issues in surrounding code, suggestions, and findings adjacent to the PR's changes), fix them all. No deferring, no 'out of scope' skipping.

Applied to files:

  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Config vs runtime state: use frozen Pydantic models for config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.

Applied to files:

  • src/ai_company/engine/coordination/models.py
🧬 Code graph analysis (6)
src/ai_company/engine/coordination/service.py (9)
src/ai_company/core/enums.py (1)
  • CoordinationTopology (358-369)
src/ai_company/engine/coordination/dispatchers.py (7)
  • DispatchResult (49-76)
  • select_dispatcher (796-832)
  • dispatch (83-104)
  • dispatch (349-372)
  • dispatch (382-442)
  • dispatch (453-523)
  • dispatch (534-578)
src/ai_company/engine/coordination/models.py (3)
  • CoordinationPhaseResult (64-96)
  • CoordinationResult (128-198)
  • is_success (196-198)
src/ai_company/engine/decomposition/models.py (3)
  • DecompositionResult (125-176)
  • SubtaskStatusRollup (179-259)
  • derived_parent_status (232-259)
src/ai_company/engine/decomposition/service.py (3)
  • DecompositionService (34-180)
  • decompose_task (51-88)
  • rollup_status (166-180)
src/ai_company/engine/parallel.py (1)
  • ParallelExecutor (81-591)
src/ai_company/engine/routing/models.py (1)
  • RoutingResult (91-157)
src/ai_company/engine/task_engine.py (2)
  • TaskEngine (88-776)
  • submit (247-287)
src/ai_company/engine/workspace/service.py (1)
  • WorkspaceIsolationService (43-215)
src/ai_company/engine/coordination/dispatchers.py (9)
src/ai_company/core/enums.py (1)
  • CoordinationTopology (358-369)
src/ai_company/engine/coordination/group_builder.py (1)
  • build_execution_waves (43-145)
src/ai_company/engine/coordination/models.py (2)
  • CoordinationPhaseResult (64-96)
  • CoordinationWave (99-125)
src/ai_company/engine/workspace/models.py (3)
  • Workspace (36-65)
  • WorkspaceGroupResult (144-181)
  • WorkspaceRequest (12-33)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
src/ai_company/engine/decomposition/models.py (1)
  • DecompositionResult (125-176)
src/ai_company/engine/parallel_models.py (3)
  • ParallelExecutionGroup (92-140)
  • task_id (87-89)
  • agent_id (79-81)
src/ai_company/engine/routing/models.py (1)
  • RoutingResult (91-157)
src/ai_company/engine/workspace/service.py (4)
  • WorkspaceIsolationService (43-215)
  • setup_group (72-117)
  • merge_group (142-168)
  • teardown_group (170-215)
tests/unit/engine/test_coordination_config.py (1)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
src/ai_company/engine/coordination/group_builder.py (7)
src/ai_company/engine/decomposition/dag.py (2)
  • DependencyGraph (26-252)
  • parallel_groups (185-228)
src/ai_company/engine/errors.py (1)
  • CoordinationError (128-129)
src/ai_company/engine/parallel_models.py (3)
  • AgentAssignment (23-89)
  • ParallelExecutionGroup (92-140)
  • task_id (87-89)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
src/ai_company/engine/decomposition/models.py (1)
  • DecompositionResult (125-176)
src/ai_company/engine/routing/models.py (1)
  • RoutingResult (91-157)
src/ai_company/engine/workspace/models.py (1)
  • Workspace (36-65)
tests/unit/engine/test_coordination_errors.py (2)
src/ai_company/engine/errors.py (3)
  • CoordinationError (128-129)
  • CoordinationPhaseError (132-153)
  • EngineError (9-10)
src/ai_company/engine/coordination/models.py (1)
  • CoordinationPhaseResult (64-96)
src/ai_company/engine/errors.py (1)
src/ai_company/engine/coordination/models.py (1)
  • CoordinationPhaseResult (64-96)
🪛 LanguageTool
CLAUDE.md

[style] ~130-~130: A comma is missing here.
Context: ...nder ai_company.observability.events (e.g. PROVIDER_CALL_START from `events.prov...

(EG_NO_COMMA)

🔇 Additional comments (29)
src/ai_company/engine/errors.py (2)

3-6: LGTM!

The TYPE_CHECKING guard correctly avoids circular imports at runtime while preserving type information for static analysis.


126-153: LGTM!

The coordination error hierarchy is well-designed:

  • CoordinationError as base exception for multi-agent coordination failures
  • CoordinationPhaseError correctly uses keyword-only args with sensible defaults
  • Docstrings follow Google style with clear attribute documentation
src/ai_company/engine/coordination/config.py (1)

1-38: LGTM!

The CoordinationConfig model is well-structured:

  • Frozen Pydantic model for immutable configuration (per coding guidelines)
  • NotBlankStr correctly used for base_branch identifier field
  • Sensible defaults and clear field descriptions
  • extra="forbid" prevents configuration errors from typos
src/ai_company/observability/events/coordination.py (1)

1-17: LGTM!

Event constants are well-organized:

  • Consistent COORDINATION_ prefix
  • Values follow the established dot-delimited pattern
  • Comprehensive coverage of coordination lifecycle events
  • All typed as Final[str] for immutability
tests/unit/observability/test_events.py (2)

189-189: LGTM!

Adding "coordination" to the expected domain modules ensures the new event module is discovered by pkgutil iteration.


634-663: LGTM!

The test follows the established pattern for event existence validation:

  • Imports all 13 coordination event constants
  • Asserts each constant matches its expected string value
  • Consistent with other test_*_events_exist methods in the class
CLAUDE.md (2)

95-95: LGTM!

The package structure documentation accurately reflects the new coordination subsystem with its key components: TopologyDispatcher protocol, 4 dispatchers, wave execution, and workspace lifecycle integration.


130-130: LGTM!

The event naming documentation is correctly updated with COORDINATION_STARTED from events.coordination, following the established pattern for domain-specific event imports.

tests/unit/engine/test_coordination_errors.py (1)

1-58: LGTM!

Comprehensive test coverage for coordination error classes:

  • Verifies inheritance hierarchy (EngineError → CoordinationError → CoordinationPhaseError)
  • Tests attribute storage (phase, partial_phases)
  • Tests default values and string representation
  • Each test is properly marked with @pytest.mark.unit
tests/unit/engine/conftest.py (2)

19-46: LGTM!

New imports cleanly organized for coordination domain types: enums, decomposition models, parallel execution models, and routing models.


480-625: LGTM!

Excellent set of test helpers for the coordination domain:

  • make_subtask(): Clean builder for SubtaskDefinition
  • make_decomposition(): Correctly constructs plan, created tasks, and dependency edges
  • make_routing(): Properly maps subtask-agent pairs to RoutingDecision objects
  • build_run_result(): Creates minimal AgentRunResult for testing
  • make_exec_result(): Handles both success and failure outcomes

These helpers will significantly reduce boilerplate across coordination tests.

tests/unit/engine/test_coordination_group_builder.py (5)

21-35: LGTM!

Clean local helper function that avoids code duplication across tests. Using CoordinationTopology.CENTRALIZED as default is sensible for test simplicity.


38-141: LGTM!

Excellent test coverage for wave building scenarios:

  • Single/multiple independent subtasks → parallel execution
  • Sequential chain respects dependencies
  • Diamond DAG correctly partitions into waves [A,B] then [C]

Assertions verify both wave count and task assignments within each wave.


143-177: LGTM!

Config propagation tests verify that max_concurrency_per_wave and fail_fast settings correctly flow through to the generated execution groups.


179-261: LGTM!

Good coverage of edge cases:

  • Workspace worktree paths correctly mapped to resource claims
  • Unroutable subtasks gracefully skipped
  • All-unroutable scenario produces empty waves
  • No workspace defaults to empty resource claims

263-316: LGTM!

Smart use of model_construct to bypass validation and test the defensive guard in build_execution_waves. The test verifies that a routing decision referencing a non-existent created task correctly raises CoordinationError.

src/ai_company/engine/coordination/dispatchers.py (7)

49-105: LGTM!

The DispatchResult model and TopologyDispatcher protocol are well-structured. Frozen model ensures immutability, and the runtime-checkable protocol allows for proper type checking.


232-324: LGTM!

The _execute_waves function properly:

  • Logs wave start at INFO and completion at WARNING when failed
  • Records phase results with accurate success/error fields
  • Respects fail_fast configuration
  • Handles exceptions gracefully with proper phase recording

341-373: LGTM!

SasDispatcher correctly implements the single-agent-step pattern without workspace isolation, delegating to the shared wave execution logic.


396-411: Validate routed task IDs before creating workspaces.

The current flow creates workspaces at lines 398-400 before build_execution_waves (lines 406-411) validates that every routed subtask_id exists in decomposition_result.created_tasks. A stale routing result could create git branches/worktrees for nonexistent subtasks, only to tear them down when validation fails.

While the finally block ensures cleanup, consider adding a preflight validation step before workspace side effects to avoid wasteful operations.


463-473: LGTM!

The decentralized precondition guard now properly logs at WARNING before raising CoordinationError, ensuring telemetry captures workspace service availability issues.


745-766: LGTM!

The per-wave merge/teardown logic in the finally block correctly:

  • Only merges when wave_failed is False
  • Logs a warning when merge is skipped due to wave failure
  • Always tears down workspaces regardless of outcome

796-832: LGTM!

The select_dispatcher function correctly:

  • Maps each resolved topology to its dispatcher implementation
  • Rejects AUTO topology with a clear error message and WARNING log
  • Emits COORDINATION_TOPOLOGY_RESOLVED only after successful dispatcher creation
src/ai_company/engine/coordination/models.py (4)

30-62: LGTM!

CoordinationContext properly uses a frozen model with tuple fields for immutability and validates that at least one agent is available.


87-97: LGTM!

The _validate_success_error_consistency validator properly enforces that successful phases have no error and failed phases must have an error description, preventing inconsistent phase results.


99-126: LGTM!

CoordinationWave correctly models a wave with non-empty subtask validation and optional execution result for tracking pre/post execution states.


184-198: LGTM!

The validators and computed field are well-designed:

  • _validate_topology_resolved ensures AUTO is not stored in final results
  • is_success computed field derives status from phases, avoiding redundant storage per the @computed_field guideline
src/ai_company/engine/__init__.py (2)

42-57: LGTM!

The coordination module re-exports are properly organized, exposing the full public API surface for dispatchers, models, and utilities.


210-221: LGTM!

The __all__ list is correctly updated with new coordination types in alphabetical order, maintaining consistency with the existing export structure.

- Block descendants of unroutable subtasks in group_builder (critical)
- Include missing subtasks as BLOCKED in rollup (critical)
- Validate routed task IDs before workspace creation (major)
- Wrap dispatch failures as CoordinationPhaseError with partial phases (major)
- Re-raise MemoryError/RecursionError instead of wrapping (major)
- Add validate step to pipeline diagram in engine.md (minor)
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 13, 2026 06:32 — with GitHub Actions Inactive

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (1)
src/ai_company/engine/coordination/service.py (1)

306-332: ⚠️ Potential issue | 🟠 Major

Validate that all routing decisions have consistent topologies.

_resolve_topology reads only decisions[0].topology, ignoring the rest. A malformed RoutingResult with mixed topologies will dispatch through the wrong dispatcher. Validate that all decisions agree, or fail fast if they differ. As per coding guidelines: "Validate: at system boundaries (user input, external APIs, config files)".

🛡️ Proposed fix
     def _resolve_topology(
         self,
         routing_result: RoutingResult,
     ) -> CoordinationTopology:
         """Resolve the coordination topology from routing decisions."""
         if routing_result.decisions:
             topology = routing_result.decisions[0].topology
+            # Validate all decisions have consistent topology
+            for decision in routing_result.decisions[1:]:
+                if decision.topology != topology:
+                    msg = (
+                        f"Mixed topologies in routing decisions: "
+                        f"{topology.value} vs {decision.topology.value}"
+                    )
+                    logger.warning(
+                        COORDINATION_PHASE_FAILED,
+                        phase="resolve_topology",
+                        error=msg,
+                    )
+                    raise CoordinationPhaseError(
+                        msg,
+                        phase="resolve_topology",
+                        partial_phases=(),
+                    )
         else:
             topology = CoordinationTopology.SAS
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/coordination/service.py` around lines 306-332: in
_resolve_topology, after receiving routing_result (RoutingResult), validate
that all entries in routing_result.decisions have the same topology as
decisions[0].topology. If any decision differs, log COORDINATION_PHASE_FAILED
with phase="resolve_topology" and a clear error message, then fail fast (raise
a ValueError or a domain-specific exception) instead of continuing; otherwise
proceed to use that consistent topology (handling AUTO the same as before) and
emit COORDINATION_TOPOLOGY_RESOLVED. Reference the _resolve_topology method,
RoutingResult.decisions, and CoordinationTopology to locate the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/engine/coordination/dispatchers.py`:
- Around line 677-792: The _execute_wave function is too long; extract the
final merge/teardown block into a new async helper named _finalize_wave to
reduce complexity. Move the logic currently under finally (the conditional
that checks wave_workspaces and workspace_service, calls _merge_workspaces,
appends merge_phase/merge_result, logs skipped merges, and calls
_teardown_workspaces) into async def _finalize_wave(workspace_service,
wave_workspaces, wave_idx, wave_failed, all_phases, merge_results) and have
_execute_wave simply await _finalize_wave(...) in the finally clause. Preserve
the same logging messages, phase naming (merge_wave_{wave_idx}), and
error-handling semantics, and ensure type hints and imports mirror existing
signatures so behavior and side effects (appending to
all_phases/all_waves/merge_results) remain unchanged.
- Around line 136-141: In _validate_routing_against_decomposition, log a warning
with context before raising CoordinationError: when decision.subtask_id not in
created_ids, emit a warning (use self.logger.warning(...) if this is a method on
a class with a logger, otherwise use the module-level logger.warning(...))
including the same message and the decision.subtask_id/created_ids for
telemetry, then raise CoordinationError(msg).

In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 114-120: Add a warning log immediately before raising
CoordinationError in the group_builder.py guard that checks task =
task_lookup.get(subtask_id): emit a logger.warning (or self.logger.warning
depending on surrounding code) with the same contextual message (including
subtask_id and that there is a routing decision but no created task) so the
failure is recorded in telemetry, then raise CoordinationError(msg) as before;
ensure you reference the same msg string used for the exception and use the
module/class logger already in scope.
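Both prompts ask for the same convention: emit a WARNING with structured context before raising. A minimal sketch, using the stdlib logger as a stand-in for the project's `get_logger` (which application code must use instead) and a placeholder `CoordinationError`:

```python
import logging

logger = logging.getLogger(__name__)  # project code would use get_logger(__name__)


class CoordinationError(Exception):
    """Stand-in for the project's CoordinationError."""


def validate_routing_against_decomposition(
    decision_subtask_id: str, created_ids: set[str]
) -> None:
    """Reject routing decisions that reference subtasks that were never created."""
    if decision_subtask_id not in created_ids:
        msg = f"routing decision references unknown subtask {decision_subtask_id!r}"
        # Log first so the failure reaches telemetry even if a caller
        # swallows or rewraps the exception.
        logger.warning(
            "coordination.validation_failed",
            extra={
                "subtask_id": decision_subtask_id,
                "created_ids": sorted(created_ids),
            },
        )
        raise CoordinationError(msg)
```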

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 60ed2780-d52c-4075-9e21-8f72161a02bf

📥 Commits

Reviewing files that changed from the base of the PR and between bfc9e2b and 4a5b6cf.

📒 Files selected for processing (6)
  • docs/design/engine.md
  • src/ai_company/engine/coordination/dispatchers.py
  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/service.py
  • tests/unit/engine/test_coordination_group_builder.py
  • tests/unit/engine/test_coordination_service.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Greptile Review
  • GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649 native lazy annotations
Use except A, B: syntax (no parentheses) — ruff enforces PEP 758 except syntax on Python 3.14
Type hints: all public functions, mypy strict mode
Docstrings: Google style, required on public classes/functions (enforced by ruff D rules)
Create new objects, never mutate existing ones. For non-Pydantic internal collections (registries, BaseTool), use copy.deepcopy() at construction + MappingProxyType wrapping for read-only enforcement.
For dict/list fields in frozen Pydantic models, rely on frozen=True for field reassignment prevention and copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, serializing for persistence).
Config vs runtime state: use frozen Pydantic models for config/identity; separate mutable-via-copy models (using model_copy(update=...)) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.
Models: Pydantic v2 (BaseModel, model_validator, computed_field, ConfigDict). Use @computed_field for derived values instead of storing + validating redundant fields. Use NotBlankStr for all identifier/name fields — including optional and tuple variants — instead of manual whitespace validators.
Async concurrency: prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare create_task. Existing code is being migrated incrementally.
Line length: 88 characters (ruff)
Functions: < 50 lines, files < 800 lines
Errors: handle explicitly, never silently swallow
Validate: at system boundaries (user input, external APIs, config files)
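The deepcopy-at-construction plus `MappingProxyType` rule above can be sketched as follows; `ToolRegistry` is a hypothetical example, not a class from this codebase:

```python
import copy
from types import MappingProxyType


class ToolRegistry:
    """Hypothetical read-only registry following the guideline above."""

    def __init__(self, tools: dict[str, dict]) -> None:
        # Deep-copy at construction so later mutation of the caller's
        # dict cannot leak into the registry; wrap in MappingProxyType
        # so readers cannot assign new keys.
        self._tools = MappingProxyType(copy.deepcopy(tools))

    @property
    def tools(self) -> MappingProxyType:
        return self._tools


source = {"search": {"timeout": 5}}
registry = ToolRegistry(source)
source["search"]["timeout"] = 99  # caller mutates after construction
```

The registry still sees the original value, and attempting item assignment on `registry.tools` raises `TypeError`.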

Files:

  • tests/unit/engine/test_coordination_service.py
  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
  • tests/unit/engine/test_coordination_group_builder.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Tests: markers @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, @pytest.mark.slow
Coverage: 80% minimum (enforced in CI)
Async: asyncio_mode = "auto" — no manual @pytest.mark.asyncio needed
Timeout: 30 seconds per test
Parallelism: pytest-xdist via -n auto — ALWAYS include -n auto when running pytest, never run tests sequentially
Prefer @pytest.mark.parametrize for testing similar cases
Vendor-agnostic everywhere: Tests must use test-provider, test-small-001, etc. (Vendor names may only appear in operations design page, .claude/ skill/agent files, or third-party import paths)

Files:

  • tests/unit/engine/test_coordination_service.py
  • tests/unit/engine/test_coordination_group_builder.py
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic MUST have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code
Variable name: always logger (not _logger, not log)
Event names: always use constants from the domain-specific module under ai_company.observability.events (e.g. PROVIDER_CALL_START from events.provider). Import directly: from ai_company.observability.events.<domain> import EVENT_CONSTANT
Structured kwargs in logging: always logger.info(EVENT, key=value) — never logger.info("msg %s", val)
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO
DEBUG for object creation, internal flow, entry/exit of key functions
Vendor-agnostic everywhere: NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases. Tests must use test-provider, test-small-001, etc.

Files:

  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
🧠 Learnings (6)
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising

Applied to files:

  • src/ai_company/engine/coordination/group_builder.py
  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/providers/**/*.py : Retryable errors (`is_retryable=True`): `RateLimitError`, `ProviderTimeoutError`, `ProviderConnectionError`, `ProviderInternalError`. Non-retryable errors raise immediately without retry. `RetryExhaustedError` signals that all retries failed — the engine layer catches this to trigger fallback chains.

Applied to files:

  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Errors: handle explicitly, never silently swallow

Applied to files:

  • src/ai_company/engine/coordination/service.py
  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Async concurrency: prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`. Existing code is being migrated incrementally.

Applied to files:

  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: When review agents find valid issues (including pre-existing issues in surrounding code, suggestions, and findings adjacent to the PR's changes), fix them all. No deferring, no 'out of scope' skipping.

Applied to files:

  • src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All state transitions must log at INFO

Applied to files:

  • src/ai_company/engine/coordination/dispatchers.py
🧬 Code graph analysis (2)
tests/unit/engine/test_coordination_service.py (8)
src/ai_company/engine/decomposition/models.py (3)
  • DecompositionResult (125-176)
  • derived_parent_status (232-259)
  • DecompositionPlan (66-122)
src/ai_company/core/enums.py (2)
  • CoordinationTopology (358-369)
  • TaskStatus (198-224)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
src/ai_company/engine/coordination/models.py (2)
  • CoordinationContext (30-61)
  • is_success (196-198)
src/ai_company/engine/coordination/service.py (2)
  • MultiAgentCoordinator (50-546)
  • coordinate (92-194)
src/ai_company/engine/parallel_models.py (3)
  • AgentOutcome (143-193)
  • agent_id (79-81)
  • task_id (87-89)
src/ai_company/engine/workspace/models.py (2)
  • MergeResult (94-141)
  • Workspace (36-65)
src/ai_company/engine/decomposition/rollup.py (2)
  • StatusRollup (21-84)
  • compute (25-84)
src/ai_company/engine/coordination/dispatchers.py (10)
src/ai_company/core/enums.py (1)
  • CoordinationTopology (358-369)
src/ai_company/engine/coordination/group_builder.py (1)
  • build_execution_waves (43-163)
src/ai_company/engine/coordination/models.py (2)
  • CoordinationPhaseResult (64-96)
  • CoordinationWave (99-125)
src/ai_company/engine/errors.py (1)
  • CoordinationError (128-129)
src/ai_company/engine/workspace/models.py (3)
  • Workspace (36-65)
  • WorkspaceGroupResult (144-181)
  • WorkspaceRequest (12-33)
src/ai_company/engine/coordination/config.py (1)
  • CoordinationConfig (8-38)
src/ai_company/engine/decomposition/models.py (1)
  • DecompositionResult (125-176)
src/ai_company/engine/parallel.py (2)
  • ParallelExecutor (81-591)
  • execute_group (110-215)
src/ai_company/engine/routing/models.py (1)
  • RoutingResult (91-157)
src/ai_company/engine/workspace/service.py (4)
  • WorkspaceIsolationService (43-215)
  • setup_group (72-117)
  • merge_group (142-168)
  • teardown_group (170-215)
🔇 Additional comments (8)
src/ai_company/engine/coordination/group_builder.py (1)

43-163: Function exceeds 50-line limit.

build_execution_waves is ~93 lines. Extract the per-wave assignment loop (lines 86-134) into a helper to stay under the limit. As per coding guidelines: "Functions: < 50 lines".
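One way to picture the per-wave planning this comment wants extracted: a Kahn-style layering that groups subtasks whose dependencies are all satisfied into the same wave. This is a generic sketch of the technique, not the project's `build_execution_waves`:

```python
def layer_into_waves(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into waves where each wave depends only on earlier waves.

    deps maps a task id to the ids it depends on. Raises ValueError on cycles.
    """
    remaining = {task: set(d) for task, d in deps.items()}
    done: set[str] = set()
    waves: list[list[str]] = []
    while remaining:
        # Everything whose dependencies are already done is ready now.
        ready = sorted(t for t, d in remaining.items() if d <= done)
        if not ready:
            raise ValueError(f"dependency cycle among {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for t in ready:
            del remaining[t]
    return waves
```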

src/ai_company/engine/coordination/service.py (1)

210-211: LGTM on PEP 758 except syntax.

The except MemoryError, RecursionError: syntax correctly uses Python 3.14's PEP 758 (no parentheses), and catastrophic errors are re-raised immediately before generic handling.

Also applies to: 266-267

tests/unit/engine/test_coordination_group_builder.py (1)

1-370: LGTM!

Comprehensive test coverage for build_execution_waves: DAG structures, config propagation, workspace mapping, unroutable/blocked handling, and error paths. Tests are well-organized with descriptive names and proper @pytest.mark.unit markers.

docs/design/engine.md (1)

796-840: LGTM!

Clear and accurate documentation of the multi-agent coordination pipeline. The diagram includes all phases (decompose → route → resolve topology → validate → dispatch → rollup → update parent), and the dispatcher table correctly describes workspace isolation behavior per topology.

tests/unit/engine/test_coordination_service.py (3)

513-526: LGTM on MemoryError propagation test.

The test correctly verifies that MemoryError propagates unchanged rather than being wrapped in CoordinationPhaseError. This aligns with the implementation's special-casing of catastrophic exceptions.


1-876: Comprehensive test suite for MultiAgentCoordinator.

Excellent coverage of coordination pipeline phases, error handling, workspace lifecycle, cost aggregation, rollup semantics, and fail-fast behavior. Tests properly use @pytest.mark.unit markers and avoid vendor names.


249-296: This test behavior is correct and reflects the intended design.

With fail_fast=False, dependent waves execute even if earlier waves fail. This is intentional: dependencies organize subtasks into sequential waves at build time (via DAG structure), but execution-time failures don't block downstream waves. The fail_fast flag controls only whether the coordination loop stops after a wave failure—it doesn't enforce dependency constraints at runtime. Wave 0 fails but wave 1 executes because the dispatcher loop continues (no break), allowing all pre-built waves to run regardless of prior results. This is the correct behavior for fail_fast=False.
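The semantics described above reduce to whether the wave loop breaks after a failure. A minimal sketch, where `run_wave` is any callable returning True on success:

```python
from typing import Callable


def dispatch_waves(
    waves: list[list[str]],
    run_wave: Callable[[list[str]], bool],
    fail_fast: bool,
) -> list[bool]:
    """Run pre-built waves in order; with fail_fast, stop after the first
    failed wave, otherwise keep executing the remaining waves."""
    outcomes: list[bool] = []
    for wave in waves:
        ok = run_wave(wave)
        outcomes.append(ok)
        if fail_fast and not ok:
            break  # without this break, every pre-built wave runs regardless
    return outcomes
```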

src/ai_company/engine/coordination/dispatchers.py (1)

1-858: 🧹 Nitpick | 🔵 Trivial

File exceeds 800-line limit.

At 858 lines, this file is above the 800-line threshold. Consider splitting topology dispatchers into separate modules (e.g., dispatchers/sas.py, dispatchers/centralized.py) or extracting shared helpers into a dispatchers_helpers.py module. As per coding guidelines: "files < 800 lines".

⛔ Skipped due to learnings
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Functions: < 50 lines, files < 800 lines
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Line length: 88 characters (ruff)

Comment on lines +677 to +792
async def _execute_wave( # noqa: PLR0913
self,
wave_idx: int,
group: ParallelExecutionGroup,
parallel_executor: ParallelExecutor,
all_waves: list[CoordinationWave],
all_phases: list[CoordinationPhaseResult],
wave_workspaces: tuple[Workspace, ...],
workspace_service: WorkspaceIsolationService | None,
merge_results: list[WorkspaceGroupResult],
) -> bool:
"""Execute a single wave and handle per-wave merge/teardown.

Returns:
True if the wave failed, False if it succeeded.
"""
start = time.monotonic()
subtask_ids = tuple(a.task.id for a in group.assignments)
wave_failed = False

logger.info(
COORDINATION_WAVE_STARTED,
wave_index=wave_idx,
subtask_count=len(subtask_ids),
)

try:
exec_result = await parallel_executor.execute_group(group)
elapsed = time.monotonic() - start

all_waves.append(
CoordinationWave(
wave_index=wave_idx,
subtask_ids=subtask_ids,
execution_result=exec_result,
)
)

success = exec_result.all_succeeded
wave_failed = not success
error_msg = (
None
if success
else f"Wave {wave_idx}: {exec_result.agents_failed} agent(s) failed"
)
all_phases.append(
CoordinationPhaseResult(
phase=f"execute_wave_{wave_idx}",
success=success,
duration_seconds=elapsed,
error=error_msg,
)
)

if success:
logger.info(
COORDINATION_WAVE_COMPLETED,
wave_index=wave_idx,
succeeded=exec_result.agents_succeeded,
failed=exec_result.agents_failed,
duration_seconds=elapsed,
)
else:
logger.warning(
COORDINATION_WAVE_COMPLETED,
wave_index=wave_idx,
succeeded=exec_result.agents_succeeded,
failed=exec_result.agents_failed,
duration_seconds=elapsed,
)

except Exception as exc:
elapsed = time.monotonic() - start
wave_failed = True
logger.warning(
COORDINATION_PHASE_FAILED,
phase=f"execute_wave_{wave_idx}",
wave_index=wave_idx,
error=str(exc),
)
all_waves.append(
CoordinationWave(
wave_index=wave_idx,
subtask_ids=subtask_ids,
)
)
all_phases.append(
CoordinationPhaseResult(
phase=f"execute_wave_{wave_idx}",
success=False,
duration_seconds=elapsed,
error=str(exc),
)
)
finally:
if wave_workspaces and workspace_service is not None:
# Only merge if the wave succeeded
if not wave_failed:
merge_phase_name = f"merge_wave_{wave_idx}"
merge_result, merge_phase = await _merge_workspaces(
workspace_service,
wave_workspaces,
phase_name=merge_phase_name,
)
all_phases.append(merge_phase)
if merge_result is not None:
merge_results.append(merge_result)
else:
logger.warning(
COORDINATION_PHASE_FAILED,
phase=f"merge_wave_{wave_idx}",
error="Skipped merge: wave failed",
)
await _teardown_workspaces(workspace_service, wave_workspaces)

return wave_failed

🧹 Nitpick | 🔵 Trivial

Split _execute_wave to reduce complexity.

At ~116 lines, this method handles execution, result construction, logging, merge, and teardown. Consider extracting the merge/teardown block (lines 771-790) into a separate _finalize_wave helper to improve readability and stay closer to the 50-line guideline. As per coding guidelines: "Functions: < 50 lines".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/coordination/dispatchers.py` around lines 677 - 792,
The _execute_wave function is too long; extract the final merge/teardown block
into a new async helper named _finalize_wave to reduce complexity. Move the
logic currently under finally (the conditional that checks wave_workspaces and
workspace_service, calls _merge_workspaces, appends merge_phase/merge_result,
logs skipped merges, and calls _teardown_workspaces) into async def
_finalize_wave(workspace_service, wave_workspaces, wave_idx, wave_failed,
all_phases, merge_results) and have _execute_wave simply await
_finalize_wave(...) in the finally clause; preserve the same logging messages,
phase naming (merge_wave_{wave_idx}), and error handling semantics, and ensure
type hints and imports mirror existing signatures so behavior and side-effects
(appending to all_phases/all_waves/merge_results) remain unchanged.

Comment on lines +210 to +211
except MemoryError, RecursionError:
raise

Unparenthesised except syntax: Python 2 legacy, re-allowed by PEP 758 on 3.14

except MemoryError, RecursionError: was Python 2 syntax, where the comma bound the caught MemoryError to the name RecursionError; Python 3.0 through 3.13 reject it with a SyntaxError. PEP 758 (Python 3.14) re-allows the unparenthesised form when no as clause is present, so the line is valid on this project's >=3.14 target, and the CLAUDE.md guideline (ruff-enforced) mandates exactly this spelling. The parenthesised tuple remains the portable form for code that must also run on earlier interpreters.

The same pattern appears again on line 266 in _phase_route.

Suggested change
except MemoryError, RecursionError:
raise
except (MemoryError, RecursionError):
raise

Comment on lines +266 to +267
except MemoryError, RecursionError:
raise

Duplicate unparenthesised except syntax

Same pattern as line 210: except MemoryError, RecursionError: was a SyntaxError on Python 3.0 through 3.13, but PEP 758 makes it valid on Python 3.14 (when no as clause is used), so it does not fail at import time on this project's target; the parenthesised tuple is only needed for portability to older interpreters.

Suggested change
except MemoryError, RecursionError:
raise
except (MemoryError, RecursionError):
raise

- Validate topology consistency across routing decisions
- Log WARNING before raising in validation guard and group_builder
Copilot AI review requested due to automatic review settings March 13, 2026 06:51
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 13, 2026 06:52 — with GitHub Actions Inactive
@Aureliolo Aureliolo merged commit 2f10d49 into main Mar 13, 2026
15 of 16 checks passed
@Aureliolo Aureliolo deleted the feat/runtime-wiring branch March 13, 2026 06:53
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 13, 2026 06:53 — with GitHub Actions Inactive
Contributor

Copilot AI left a comment


Pull request overview

Adds a new multi-agent coordination subsystem to the engine, wiring decomposition + routing + topology-based dispatch (with optional workspace isolation) into a single orchestration pipeline with structured observability and stronger Pydantic validation.

Changes:

  • Introduces MultiAgentCoordinator plus coordination models/config and DAG→wave planning (build_execution_waves).
  • Adds topology dispatchers (SAS / centralized / decentralized / context-dependent) integrating workspace setup/merge/teardown and fail-fast behavior.
  • Adds coordination observability event constants + extensive unit test coverage and documentation updates.

Reviewed changes

Copilot reviewed 19 out of 19 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
tests/unit/observability/test_events.py Ensures coordination event constants are present and domain module is discoverable.
tests/unit/engine/test_coordination_service.py End-to-end coordinator pipeline tests (happy path, failures, rollups, parent update, costs, fail-fast).
tests/unit/engine/test_coordination_models.py Validation tests for coordination models (cross-field invariants, non-empty collections).
tests/unit/engine/test_coordination_group_builder.py Tests for DAG→execution wave conversion and unroutable/blocked handling.
tests/unit/engine/test_coordination_errors.py Tests for new coordination error types.
tests/unit/engine/test_coordination_dispatchers.py Dispatcher behavior tests (workspace lifecycle, fail-fast, merge gating, setup failures).
tests/unit/engine/test_coordination_config.py Tests for coordination config defaults and validation.
tests/unit/engine/conftest.py Adds shared coordination test helpers (subtasks, decomposition, routing, exec results).
src/ai_company/observability/events/coordination.py Adds coordination event name constants.
src/ai_company/engine/errors.py Adds CoordinationError / CoordinationPhaseError.
src/ai_company/engine/coordination/service.py Implements the coordination pipeline and phase tracking/logging.
src/ai_company/engine/coordination/models.py Defines frozen Pydantic models for coordination context/results/waves/phases.
src/ai_company/engine/coordination/group_builder.py Implements wave planning from decomposition + routing, with blocked/unroutable handling.
src/ai_company/engine/coordination/dispatchers.py Implements topology dispatchers and shared workspace/wave execution helpers.
src/ai_company/engine/coordination/config.py Adds CoordinationConfig model.
src/ai_company/engine/coordination/__init__.py Re-exports coordination public API.
src/ai_company/engine/__init__.py Re-exports coordination types from the engine top-level package.
docs/design/engine.md Documents the coordination pipeline and dispatcher behaviors.
CLAUDE.md Updates package-structure and logging-event guidance to include coordination.


Comment on lines +175 to +183
except CoordinationPhaseError:
raise
except Exception as exc:
logger.exception(
COORDINATION_FAILED,
parent_task_id=task.id,
error=str(exc),
)
raise
Comment on lines +206 to +213
try:
result = await self._decomposition_service.decompose_task(
context.task, context.decomposition_context
)
except MemoryError, RecursionError:
raise
except Exception as exc:
elapsed = time.monotonic() - start
Comment on lines +159 to +176
logger.info(COORDINATION_PHASE_STARTED, phase=phase_name)
try:
requests = _build_workspace_requests(routing_result, config)
workspaces = await workspace_service.setup_group(requests=requests)
except Exception as exc:
elapsed = time.monotonic() - start
phase = CoordinationPhaseResult(
phase=phase_name,
success=False,
duration_seconds=elapsed,
error=str(exc),
)
logger.warning(
COORDINATION_PHASE_FAILED,
phase=phase_name,
error=str(exc),
)
return (), phase
Comment on lines +327 to +335
except Exception as exc:
elapsed = time.monotonic() - start
logger.warning(
COORDINATION_PHASE_FAILED,
phase=phase_name,
wave_index=wave_idx,
error=str(exc),
)
wave = CoordinationWave(
Comment on lines +649 to +668
try:
wave_workspaces = await workspace_service.setup_group(
requests=wave_requests,
)
except Exception as exc:
ws_elapsed = time.monotonic() - ws_start
logger.warning(
COORDINATION_PHASE_FAILED,
phase=f"workspace_setup_wave_{wave_idx}",
error=str(exc),
)
all_phases.append(
CoordinationPhaseResult(
phase=f"workspace_setup_wave_{wave_idx}",
success=False,
duration_seconds=ws_elapsed,
error=str(exc),
)
)
return (), None
Comment on lines +754 to +776
except Exception as exc:
elapsed = time.monotonic() - start
wave_failed = True
logger.warning(
COORDINATION_PHASE_FAILED,
phase=f"execute_wave_{wave_idx}",
wave_index=wave_idx,
error=str(exc),
)
all_waves.append(
CoordinationWave(
wave_index=wave_idx,
subtask_ids=subtask_ids,
)
)
all_phases.append(
CoordinationPhaseResult(
phase=f"execute_wave_{wave_idx}",
success=False,
duration_seconds=elapsed,
error=str(exc),
)
)
Comment on lines +398 to +404
return await dispatcher.dispatch(
decomposition_result=decomp_result,
routing_result=routing_result,
parallel_executor=self._parallel_executor,
workspace_service=self._workspace_service,
config=context.config,
)
Aureliolo added a commit that referenced this pull request Mar 13, 2026
🤖 I have created a release *beep* *boop*
---


## [0.1.3](v0.1.2...v0.1.3) (2026-03-13)


### Features

* add Mem0 memory backend adapter
([#345](#345))
([2788db8](2788db8)),
closes [#206](#206)
* centralized single-writer TaskEngine with full CRUD API
([#328](#328))
([9c1a3e1](9c1a3e1))
* incremental AgentEngine → TaskEngine status sync
([#331](#331))
([7a68d34](7a68d34)),
closes [#323](#323)
* web dashboard pages — views, components, tests, and review fixes
([#354](#354))
([b165ec4](b165ec4))
* web dashboard with Vue 3 + PrimeVue + Tailwind CSS
([#347](#347))
([06416b1](06416b1))


### Bug Fixes

* harden coordination pipeline with validators, logging, and fail-fast
([#333](#333))
([2f10d49](2f10d49)),
closes [#205](#205)
* repo-wide security hardening from ZAP, Scorecard, and CodeQL audit
([#357](#357))
([27eb288](27eb288))


### CI/CD

* add pip-audit, hadolint, OSSF Scorecard, ZAP DAST, and pre-push hooks
([#350](#350))
([2802d20](2802d20))
* add workflow_dispatch trigger to PR Preview for Dependabot PRs
([#326](#326))
([4c7b6d9](4c7b6d9))
* bump astral-sh/setup-uv from 7.4.0 to 7.5.0 in the minor-and-patch
group ([#335](#335))
([98dd8ca](98dd8ca))


### Maintenance

* bump the minor-and-patch group across 1 directory with 3 updates
([#352](#352))
([031b1c9](031b1c9))
* **deps:** bump devalue from 5.6.3 to 5.6.4 in /site in the
npm_and_yarn group across 1 directory
([#324](#324))
([9a9c600](9a9c600))
* migrate docs build from MkDocs to Zensical
([#330](#330))
([fa8bf1d](fa8bf1d)),
closes [#329](#329)

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).


Development

Successfully merging this pull request may close these issues.

feat: wire runtime multi-agent coordination
