fix: harden coordination pipeline with validators, logging, and fail-fast#333
Conversation
Caution: Review failed — the pull request is closed.
ℹ️ Recent review info — Configuration used: Organization UI · Review profile: ASSERTIVE · Plan: Pro · 📒 Files selected for processing (3)
📝 Walkthrough — Summary by CodeRabbit

Walkthrough
Adds a multi‑agent coordination subsystem: new coordination package (models, config, dispatchers, group builder), a MultiAgentCoordinator service implementing a multi‑phase pipeline, workspace lifecycle helpers, coordination errors and observability events, package exports, docs, and extensive unit tests.

Changes
sequenceDiagram
participant Client
participant Coordinator as MultiAgentCoordinator
participant Decomp as DecompositionService
participant Router as TaskRoutingService
participant Dispatcher as TopologyDispatcher
participant Executor as ParallelExecutor
participant Workspace as WorkspaceIsolationService
participant Engine as TaskEngine
Client->>Coordinator: coordinate(context)
Coordinator->>Decomp: decompose(task)
Decomp-->>Coordinator: DecompositionResult
Coordinator->>Router: route(decomposition)
Router-->>Coordinator: RoutingResult
Coordinator->>Coordinator: resolve_topology(routing_result)
Coordinator->>Dispatcher: dispatch(decomposition, routing, executor, config, workspace_service?)
Dispatcher->>Workspace: setup_workspaces(...)
Workspace-->>Dispatcher: workspaces
Dispatcher->>Executor: execute(waves)
Executor-->>Dispatcher: ParallelExecutionResults
Dispatcher->>Workspace: merge_workspaces(workspaces)
Workspace-->>Dispatcher: WorkspaceGroupResult
Dispatcher-->>Coordinator: DispatchResult
Coordinator->>Coordinator: rollup(execution_results)
Coordinator->>Engine: update_parent_task(rollup) (optional)
Engine-->>Coordinator: update_response
Coordinator-->>Client: CoordinationResult
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 5 passed
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found. Scanned files: none.
Summary of Changes (Gemini Code Assist)
This pull request significantly hardens the multi-agent coordination pipeline by introducing robust data validation, improving error handling with a functional fail-fast mechanism, and ensuring critical workspace isolation requirements are met. It also enhances system observability through comprehensive logging and refines internal type consistency and code structure, leading to a more stable and predictable execution environment for complex multi-agent tasks.

Highlights
Codecov Report
❌ Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
## main #333 +/- ##
==========================================
+ Coverage 93.76% 93.82% +0.05%
==========================================
Files 434 441 +7
Lines 19850 20243 +393
Branches 1915 1950 +35
==========================================
+ Hits 18613 18993 +380
- Misses 956 966 +10
- Partials 281 284 +3

☔ View full report in Codecov by Sentry.
Code Review
This pull request introduces a comprehensive multi-agent coordination pipeline, complete with new services, dispatchers, and data models. The changes significantly enhance the system's capabilities by adding robust validation, improved logging, and fixing a critical fail-fast bug. The addition of extensive tests and documentation is commendable. My review identified a high-severity issue where AgentAssignment objects are created without all necessary fields, potentially leading to incorrect agent behavior. I also found a couple of medium-severity opportunities for improvement: one to enhance logging consistency and another to refactor object creation for better maintainability. Overall, this is a strong contribution that hardens the coordination engine.
assignments.append(
    AgentAssignment(
        identity=candidate.agent_identity,
        task=task,
        resource_claims=resource_claims,
    )
)
The AgentAssignment is being created with only a subset of its fields (identity, task, resource_claims). Other important fields like max_turns, timeout_seconds, memory_messages, and completion_config are omitted, causing them to fall back to default values. This is likely incorrect as it could lead to unexpected agent behavior by losing execution context (e.g., custom timeouts or turn limits). These parameters should be propagated from the parent task or another configuration source to ensure agents execute with the correct settings.
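A minimal sketch of the suggested propagation, using plain dataclasses as stand-ins (the field defaults, the `Task` shape, and the `make_assignment` helper are hypothetical illustrations; the real project uses frozen Pydantic models):

```python
from dataclasses import dataclass

# Illustrative stand-ins for the project's models — defaults invented here.
@dataclass(frozen=True)
class Task:
    id: str
    max_turns: int = 20
    timeout_seconds: float = 900.0

@dataclass(frozen=True)
class AgentAssignment:
    identity: str
    task: Task
    resource_claims: tuple = ()
    max_turns: int = 5             # model default that silently wins today
    timeout_seconds: float = 60.0  # model default that silently wins today

def make_assignment(identity: str, task: Task, resource_claims: tuple) -> AgentAssignment:
    # Propagate execution context from the task rather than letting the
    # model defaults apply.
    return AgentAssignment(
        identity=identity,
        task=task,
        resource_claims=resource_claims,
        max_turns=task.max_turns,
        timeout_seconds=task.timeout_seconds,
    )

task = Task(id="sub-1", max_turns=12, timeout_seconds=300.0)
a = make_assignment("agent-1", task, ())
print(a.max_turns, a.timeout_seconds)  # 12 300.0
```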
new_assignments = tuple(
    AgentAssignment(
        identity=a.identity,
        task=a.task,
        resource_claims=(ws_lookup[a.task.id],)
        if a.task.id in ws_lookup
        else a.resource_claims,
        max_turns=a.max_turns,
        timeout_seconds=a.timeout_seconds,
        memory_messages=a.memory_messages,
        completion_config=a.completion_config,
    )
    for a in group.assignments
)
return ParallelExecutionGroup(
    group_id=group.group_id,
    assignments=new_assignments,
    max_concurrency=group.max_concurrency,
    fail_fast=group.fail_fast,
)
Instead of manually reconstructing AgentAssignment and ParallelExecutionGroup objects, consider using Pydantic's model_copy(update=...) method. This approach is more concise and robust against future changes to the models. If new fields are added to AgentAssignment or ParallelExecutionGroup, model_copy will handle them automatically, whereas the current implementation would require manual updates.
new_assignments = tuple(
a.model_copy(update={"resource_claims": (ws_lookup[a.task.id],)}) if a.task.id in ws_lookup else a
for a in group.assignments
)
return group.model_copy(update={"assignments": new_assignments})

logger.info(
    COORDINATION_WAVE_COMPLETED,
    wave_index=wave_idx,
    succeeded=exec_result.agents_succeeded,
    failed=exec_result.agents_failed,
)
The logger.info call for COORDINATION_WAVE_COMPLETED is missing the duration_seconds field. This is inconsistent with the logging in the _execute_waves helper function and reduces observability. The elapsed time is available in this scope and should be included in the log for better performance monitoring.
logger.info(
COORDINATION_WAVE_COMPLETED,
wave_index=wave_idx,
succeeded=exec_result.agents_succeeded,
failed=exec_result.agents_failed,
duration_seconds=elapsed,
)
Greptile Summary
This PR introduces the complete multi-agent coordination pipeline.

Key findings:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram
sequenceDiagram
participant C as MultiAgentCoordinator
participant D as DecompositionService
participant R as TaskRoutingService
participant SD as select_dispatcher
participant TD as TopologyDispatcher
participant PE as ParallelExecutor
participant WS as WorkspaceIsolationService
C->>D: decompose_task(task, context)
D-->>C: DecompositionResult
C->>R: route(decomp_result, agents, task)
R-->>C: RoutingResult
C->>C: _resolve_topology(routing_result)
C->>C: _validate_routing(routing_result)
C->>SD: select_dispatcher(topology)
SD-->>C: TopologyDispatcher instance
C->>TD: dispatch(decomp, routing, executor, ws_service, config)
opt workspace isolation enabled
TD->>WS: setup_group(requests)
WS-->>TD: tuple[Workspace]
end
loop per DAG wave
TD->>PE: execute_group(ParallelExecutionGroup)
PE-->>TD: ParallelExecutionResult
opt per-wave merge (ContextDependent)
TD->>WS: merge_group(wave_workspaces)
WS-->>TD: WorkspaceGroupResult
end
end
opt post-all-waves merge (Centralized/Decentralized)
TD->>WS: merge_group(all_workspaces)
WS-->>TD: WorkspaceGroupResult
end
opt cleanup
TD->>WS: teardown_group(workspaces)
end
TD-->>C: DispatchResult
C->>C: _phase_rollup(dispatch_result, decomp_result)
C->>C: _phase_update_parent(rollup)
C-->>C: CoordinationResult
Pull request overview
Introduces a multi-agent coordination pipeline (models, dispatchers, orchestration service) with stricter validation, improved observability event coverage, and extensive unit tests to harden fail-fast and workspace-isolation behavior.
Changes:
- Added coordination domain package (`engine/coordination`) including config, models/validators, DAG-to-wave builder, topology dispatchers, and `MultiAgentCoordinator` orchestrator.
- Added coordination observability event constants and corresponding event tests.
- Added comprehensive unit tests for coordinator, dispatchers, group builder, models, config, and new coordination errors; updated docs to describe the pipeline.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/observability/test_events.py | Adds coordination events existence assertions. |
| tests/unit/engine/test_coordination_service.py | New tests for end-to-end coordination service behavior and error paths. |
| tests/unit/engine/test_coordination_models.py | New tests for model validators and computed fields. |
| tests/unit/engine/test_coordination_group_builder.py | New tests for DAG→wave group building and workspace resource claim mapping. |
| tests/unit/engine/test_coordination_errors.py | New tests for coordination error hierarchy. |
| tests/unit/engine/test_coordination_dispatchers.py | New tests for dispatcher selection and per-topology dispatch behavior (including fail-fast). |
| tests/unit/engine/test_coordination_config.py | New tests for coordination config validation and immutability. |
| src/ai_company/observability/events/coordination.py | Adds coordination event name constants. |
| src/ai_company/engine/errors.py | Adds CoordinationError and CoordinationPhaseError. |
| src/ai_company/engine/coordination/service.py | Implements MultiAgentCoordinator orchestration pipeline with phase tracking and logging. |
| src/ai_company/engine/coordination/models.py | Adds frozen Pydantic models + cross-field validation for coordination results. |
| src/ai_company/engine/coordination/group_builder.py | Adds wave construction from dependency DAG + routing decisions. |
| src/ai_company/engine/coordination/dispatchers.py | Adds topology dispatchers, workspace lifecycle helpers, and fail-fast handling. |
| src/ai_company/engine/coordination/config.py | Adds coordination config model (frozen, extra forbidden). |
| `src/ai_company/engine/coordination/__init__.py` | Exposes coordination public API. |
| `src/ai_company/engine/__init__.py` | Re-exports coordination types/services from engine package. |
| docs/design/engine.md | Documents the multi-agent coordination pipeline and dispatcher behaviors. |
| CLAUDE.md | Updates package structure description and logging/event guidance. |
"""Decentralized dispatcher.

Single wave with all subtasks in parallel. Mandatory workspace
isolation — raises ``CoordinationError`` if workspace service
is unavailable or isolation is disabled.
"""

for wave_idx, group in enumerate(groups):
    start = time.monotonic()
    phase_name = f"execute_wave_{wave_idx}"
    subtask_ids = tuple(NotBlankStr(a.task.id) for a in group.assignments)

    True if the wave failed, False if it succeeded.
    """
    start = time.monotonic()
    subtask_ids = tuple(NotBlankStr(a.task.id) for a in group.assignments)

except Exception as exc:
    elapsed = time.monotonic() - start
    phase = CoordinationPhaseResult(
        phase=phase_name,
        success=False,

except Exception as exc:
    elapsed = time.monotonic() - start
    phase = CoordinationPhaseResult(
        phase=phase_name,
        success=False,

except Exception as exc:
    elapsed = time.monotonic() - start
    logger.warning(
        COORDINATION_PHASE_FAILED,
        phase=phase_name,
        error=str(exc),
    )

except Exception as exc:
    elapsed = time.monotonic() - start
    logger.warning(
        COORDINATION_PHASE_FAILED,
        phase=phase_name,
        error=str(exc),
    )

except Exception as exc:
    elapsed = time.monotonic() - start
    logger.warning(
        COORDINATION_PHASE_FAILED,
        phase=phase_name,
        wave_index=wave_idx,
        error=str(exc),
    )
Actionable comments posted: 13
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/design/engine.md`:
- Around line 762-764: The fenced code block containing the pipeline diagram
"decompose → route → resolve topology → dispatch → rollup → update parent" lacks
a language tag; update that fenced block to include the `text` language
specifier (i.e., change the opening triple backticks to "```text") so the
diagram is treated as plain text and satisfies linting.
In `@src/ai_company/engine/coordination/dispatchers.py`:
- Around line 267-291: The log call for wave completion currently always uses
logger.info with COORDINATION_WAVE_COMPLETED; change it to emit a warning when
the wave did not fully succeed by checking the local success variable (derived
from exec_result.all_succeeded) and calling logger.warning (or logger.error if
you prefer) with the same structured fields (wave_index=wave_idx,
succeeded=exec_result.agents_succeeded, failed=exec_result.agents_failed,
duration_seconds=elapsed, and include error or context) while keeping
logger.info for successful waves; apply the same change to the analogous block
around the symbols mentioned at lines 670-691 so failed waves are emitted at
warning level.
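A hedged sketch of the level escalation this comment asks for — stdlib `logging` stands in for the project's structured logger, and the event constant and field names follow the review comment:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("coordination")

COORDINATION_WAVE_COMPLETED = "coordination_wave_completed"

def log_wave_completed(wave_idx: int, succeeded: int, failed: int, elapsed: float) -> str:
    # Escalate to WARNING when any agent in the wave failed; keep INFO
    # for fully successful waves. Returns the chosen level for clarity.
    emit = logger.info if failed == 0 else logger.warning
    emit(
        "%s wave_index=%d succeeded=%d failed=%d duration_seconds=%.2f",
        COORDINATION_WAVE_COMPLETED, wave_idx, succeeded, failed, elapsed,
    )
    return "info" if failed == 0 else "warning"

print(log_wave_completed(0, 3, 0, 1.25))  # info
print(log_wave_completed(1, 2, 1, 0.80))  # warning
```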
- Around line 426-431: Avoid merging workspaces that failed during execution:
update the logic around calls to _merge_workspaces so it only runs for
workspaces that succeeded (or when execute_group() returned no failures) instead
of passing the full workspaces list; filter the workspaces variable to a list of
successful_workspace objects (e.g., by checking a success flag or absence of
errors reported by execute_group()) before calling
_merge_workspaces(workspace_service, workspaces) and only append merge_phase to
all_phases when the merge is attempted; apply the same filtering/gating pattern
to the other call sites you noted (the blocks around lines where
_merge_workspaces is called, e.g., the occurrences near execute_group() results
and the later blocks at the other two locations).
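The gating this comment describes could look like the following sketch — the dict-based workspace shape and the `failed_task_ids` set are assumptions for illustration, not the project's real types:

```python
def select_mergeable(workspaces: list[dict], failed_task_ids: set[str]) -> list[dict]:
    """Return only workspaces whose task succeeded, so a failed agent's
    partial writes are never merged back into the shared workspace."""
    return [ws for ws in workspaces if ws["task_id"] not in failed_task_ids]

workspaces = [{"task_id": "sub-a"}, {"task_id": "sub-b"}, {"task_id": "sub-c"}]
mergeable = select_mergeable(workspaces, failed_task_ids={"sub-b"})
print([ws["task_id"] for ws in mergeable])  # ['sub-a', 'sub-c']
# The merge phase is then only attempted (and recorded) when `mergeable`
# is non-empty.
```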
- Around line 768-789: The COORDINATION_TOPOLOGY_RESOLVED event is emitted
prematurely; modify the dispatcher selection (the match on CoordinationTopology
in the dispatcher factory function that returns
SasDispatcher/CentralizedDispatcher/DecentralizedDispatcher/ContextDependentDispatcher)
so that COORDINATION_TOPOLOGY_RESOLVED is emitted only after a successful match
and not in the error branch (AUTO/unresolved). Move the logger call out of the
top of the function and call logger.info(COORDINATION_TOPOLOGY_RESOLVED,
topology=topology.value) after the matched dispatcher is created (or immediately
before returning the dispatcher) and do not emit it in the default (_) case
where you currently build the msg and
logger.warning(COORDINATION_PHASE_FAILED,...).
- Around line 462-467: Before raising CoordinationError in the decentralized
precondition guard, log the failure with context: emit a logger.warning or
logger.error that includes the values of workspace_service (e.g., whether it's
None) and config.enable_workspace_isolation so telemetry captures the cause,
then raise CoordinationError as before; use the existing module/class logger (or
create one if missing) and include the same descriptive message currently
assigned to msg.
In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 42-138: The build_execution_waves function is doing too many
things; extract the per-wave assignment materialization into a new helper (e.g.
_materialize_wave_assignments or _build_wave_assignments) that accepts
(wave_idx, subtask_ids, routing_lookup, task_lookup, workspace_lookup, config)
and returns the tuple/list of AgentAssignment plus any per-wave logging
metadata; keep DAG reconstruction and lookup creation
(DependencyGraph(plan.subtasks), parallel_groups, routing_lookup,
workspace_lookup, task_lookup) in build_execution_waves, call the new helper for
each parallel_groups entry, and move the logic that checks for missing
decisions, builds resource_claims, appends AgentAssignment, and produces
group_id/logging into that helper so build_execution_waves only assembles
ParallelExecutionGroup from the helper result and returns the groups. Ensure
tests and existing logger calls (COORDINATION_WAVE_BUILT) are preserved by
invoking logging from the new helper where appropriate.
- Around line 73-109: The code currently indexes task_lookup[subtask_id]
directly which raises a bare KeyError for routed subtasks missing from
decomposition_result.created_tasks; instead, in the loop in group_builder.py
before using task_lookup and creating AgentAssignment, check membership (if
subtask_id not in task_lookup) and log a WARNING or ERROR via logger including
wave_idx and subtask_id (use COORDINATION_WAVE_BUILT or a similar contextual
log) and then raise an explicit coordination failure (e.g., a CoordinationError
/ CoordinationException or a clear RuntimeError) so callers get a documented
failure path rather than a raw KeyError; update references to task_lookup,
decomposition_result.created_tasks, routing_lookup, wave_idx, and
AgentAssignment accordingly.
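The membership guard could be sketched as follows — the `CoordinationError` stand-in and the string-valued lookup are illustrative shapes, not the project's real types:

```python
class CoordinationError(RuntimeError):
    """Stand-in for the project's coordination error hierarchy."""

def resolve_tasks(subtask_ids: tuple[str, ...], task_lookup: dict[str, str]) -> list[str]:
    tasks = []
    for sid in subtask_ids:
        if sid not in task_lookup:
            # Documented failure path instead of a bare KeyError; the real
            # code would also log a WARNING with wave_idx and subtask_id.
            raise CoordinationError(
                f"routed subtask {sid!r} missing from decomposition created_tasks"
            )
        tasks.append(task_lookup[sid])
    return tasks

print(resolve_tasks(("a",), {"a": "task-a"}))  # ['task-a']
```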
In `@src/ai_company/engine/coordination/service.py`:
- Around line 211-225: Before re-raising the exception in the decomposition
except block, log a WARNING with contextual data: call the module/class logger
(e.g., logger.warning or self.logger.warning) including phase_name, elapsed
(time.monotonic() - start), the exception exc, and current partial phases
(phases) so the failure is recorded before raising CoordinationPhaseError; keep
the existing creation of CoordinationPhaseResult and raising of
CoordinationPhaseError unchanged but add the warning log immediately before the
raise.
- Around line 260-274: Insert a WARNING-level log right before raising
CoordinationPhaseError in the except block that handles routing failures: log a
message containing phase_name and the exception details and include the current
partial_phases (the tuple(phases)) so the failure is recorded before raising
CoordinationPhaseError; place the log call immediately before the raise and use
the module/class logger's warning method (include exception info for traceback)
to mirror the behavior used in _phase_decompose.
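Both service-phase comments reduce to the same pattern — log a WARNING with context, then re-raise — which can be sketched generically (the `run_phase` wrapper and its names are invented for illustration; stdlib `logging` stands in for the structured logger):

```python
import logging
import time

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("coordination")

def run_phase(phase_name: str, fn):
    """Run a pipeline phase, logging a WARNING with context before
    the exception propagates."""
    start = time.monotonic()
    try:
        return fn()
    except Exception as exc:
        elapsed = time.monotonic() - start
        # WARNING *before* re-raising, per the review guidance.
        logger.warning(
            "coordination_phase_failed phase=%s error=%s duration_seconds=%.2f",
            phase_name, exc, elapsed,
        )
        raise

def failing_route():
    raise ValueError("no routing candidates")

print(run_phase("decompose", lambda: "ok"))  # ok
try:
    run_phase("route", failing_route)
except ValueError:
    print("re-raised after logging")
```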
In `@tests/unit/engine/test_coordination_dispatchers.py`:
- Around line 89-111: The helper _make_routing currently hard-codes
RoutingDecision.topology=CoordinationTopology.CENTRALIZED; change _make_routing
to accept a topology parameter (e.g., topology: CoordinationTopology =
CoordinationTopology.CENTRALIZED) and use that value when constructing each
RoutingDecision, then update the test call sites that expect non-centralized
behavior to pass the appropriate CoordinationTopology (e.g., DECENTRALIZED, SAS,
or CONTEXT_DEPENDENT) so routing metadata matches the dispatcher under test;
adjust the return type still as RoutingResult with decisions tuple and keep
parent_task_id default as before.
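A sketch of the parameterized helper — plain dicts stand in for `RoutingResult`/`RoutingDecision`, and the default preserves the old hard-coded value so existing call sites are unchanged:

```python
def make_routing(decisions: tuple[str, ...], topology: str = "centralized") -> dict:
    # Topology is now a keyword parameter; the default keeps the previous
    # hard-coded behavior for existing tests.
    return {
        "parent_task_id": "parent-1",  # illustrative default
        "decisions": tuple(
            {"task_id": d, "topology": topology} for d in decisions
        ),
    }

centralized = make_routing(("sub-a",))
decentralized = make_routing(("sub-a",), topology="decentralized")
print(decentralized["decisions"][0]["topology"])  # decentralized
```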
In `@tests/unit/engine/test_coordination_models.py`:
- Around line 267-310: Add a cross-field validator to the CoordinationResult
Pydantic model to ensure any nested parent_task_id fields belong to the same
aggregate: check decomposition_result.plan.parent_task_id,
routing_result.parent_task_id, and status_rollup.parent_task_id (if those nested
objects are present) and raise a validation error if any of them do not equal
CoordinationResult.parent_task_id; implement this as a root validator on
CoordinationResult in src/ai_company/engine/coordination/models.py and add a
unit test that supplies a mismatched parent_task_id for one of those nested
fields to assert the model rejects it.
In `@tests/unit/engine/test_coordination_service.py`:
- Around line 382-428: The test test_partial_execution_fail_fast_off should not
run a dependent subtask after its prerequisite fails: either make the downstream
subtask independent or assert it is skipped/blocked; update the decomposition or
assertions accordingly. Specifically, in the test body that constructs decomp
via _make_decomposition and the subtasks sub_a/sub_b, either remove the
dependency tuple ("sub-a",) from sub_b so sub_b is independent (then keep the
current expect-two-waves assertions), or keep the dependency and change the
expectations on coordinator.coordinate(ctx) to assert that the second wave was
not executed/was skipped/blocked (e.g., len(result.waves) reflects only executed
waves and status_rollup.failed/completed counts reflect the skipped dependent
wave). Locate the constructs _make_subtask("sub-b", dependencies=(...)),
_make_decomposition(...), the coordinator created via _make_coordinator(...),
and the CoordinationConfig(fail_fast=False) to apply the chosen fix.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a15572d6-b798-4487-afc0-d11dbdd4871a
📒 Files selected for processing (18)
CLAUDE.md
docs/design/engine.md
src/ai_company/engine/__init__.py
src/ai_company/engine/coordination/__init__.py
src/ai_company/engine/coordination/config.py
src/ai_company/engine/coordination/dispatchers.py
src/ai_company/engine/coordination/group_builder.py
src/ai_company/engine/coordination/models.py
src/ai_company/engine/coordination/service.py
src/ai_company/engine/errors.py
src/ai_company/observability/events/coordination.py
tests/unit/engine/test_coordination_config.py
tests/unit/engine/test_coordination_dispatchers.py
tests/unit/engine/test_coordination_errors.py
tests/unit/engine/test_coordination_group_builder.py
tests/unit/engine/test_coordination_models.py
tests/unit/engine/test_coordination_service.py
tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Agent
- GitHub Check: Greptile Review
- GitHub Check: Test (Python 3.14)
- GitHub Check: Analyze (python)
🧰 Additional context used
📓 Path-based instructions (5)
docs/design/*.md
📄 CodeRabbit inference engine (CLAUDE.md)
When approved deviations occur, update the relevant docs/design/ page to reflect the new reality
Files:
docs/design/engine.md
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Mark tests with `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, and `@pytest.mark.slow`
Prefer `@pytest.mark.parametrize` for testing similar cases
Files:
tests/unit/engine/test_coordination_config.py
tests/unit/engine/test_coordination_models.py
tests/unit/engine/test_coordination_group_builder.py
tests/unit/engine/test_coordination_dispatchers.py
tests/unit/engine/test_coordination_service.py
tests/unit/engine/test_coordination_errors.py
tests/unit/observability/test_events.py
{src,tests}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like `example-provider`, `example-large-001`, `test-provider`, `test-small-001`
Files:
tests/unit/engine/test_coordination_config.py
tests/unit/engine/test_coordination_models.py
src/ai_company/engine/coordination/group_builder.py
src/ai_company/engine/coordination/service.py
tests/unit/engine/test_coordination_group_builder.py
src/ai_company/engine/coordination/__init__.py
src/ai_company/observability/events/coordination.py
tests/unit/engine/test_coordination_dispatchers.py
tests/unit/engine/test_coordination_service.py
tests/unit/engine/test_coordination_errors.py
src/ai_company/engine/coordination/config.py
src/ai_company/engine/coordination/models.py
tests/unit/observability/test_events.py
src/ai_company/engine/coordination/dispatchers.py
src/ai_company/engine/errors.py
src/ai_company/engine/__init__.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/**/*.py: No `from __future__ import annotations` — Python 3.14 has PEP 649
Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Type hints required on all public functions; mypy strict mode enforced
Google-style docstrings required on public classes and functions, enforced by ruff D rules
Create new objects, never mutate existing ones. For non-Pydantic internal collections, use `copy.deepcopy()` at construction and `MappingProxyType` wrapping for read-only enforcement
For `dict`/`list` fields in frozen Pydantic models, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using `model_copy(update=...)`
Use Pydantic v2 with `BaseModel`, `model_validator`, `computed_field`, `ConfigDict`
Use `@computed_field` for derived values instead of storing redundant fields (e.g. `TokenUsage.total_tokens`)
Use `NotBlankStr` from `core.types` for all identifier/name fields (including optional and tuple variants) instead of manual whitespace validators
Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
Line length must be 88 characters (ruff enforced)
Functions must be less than 50 lines; files must be less than 800 lines
Handle errors explicitly, never silently swallow exceptions
Validate at system boundaries (user input, external APIs, config files)
Every module with business logic must have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Never use `import logging` / `logging.getLogger()` / `print()` in application code
Use only `logger` as the variable name (not `_logger`, not `log`)
Use domain-specific event name constants from `ai_company.observability.events` modules (e.g...
Files:
src/ai_company/engine/coordination/group_builder.py
src/ai_company/engine/coordination/service.py
src/ai_company/engine/coordination/__init__.py
src/ai_company/observability/events/coordination.py
src/ai_company/engine/coordination/config.py
src/ai_company/engine/coordination/models.py
src/ai_company/engine/coordination/dispatchers.py
src/ai_company/engine/errors.py
src/ai_company/engine/__init__.py
src/ai_company/{providers,engine}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains
Files:
src/ai_company/engine/coordination/group_builder.py
src/ai_company/engine/coordination/service.py
src/ai_company/engine/coordination/__init__.py
src/ai_company/engine/coordination/config.py
src/ai_company/engine/coordination/models.py
src/ai_company/engine/coordination/dispatchers.py
src/ai_company/engine/errors.py
src/ai_company/engine/__init__.py
🧠 Learnings (9)
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Every module with business logic must have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use domain-specific event name constants from `ai_company.observability.events` modules (e.g. `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly and never use string literals for event names
Applied to files:
CLAUDE.md
src/ai_company/observability/events/coordination.py
tests/unit/observability/test_events.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Never use `import logging` / `logging.getLogger()` / `print()` in application code
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use structured logging with `logger.info(EVENT, key=value)` — never use `logger.info("msg %s", val)` formatting
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use only `logger` as the variable name (not `_logger`, not `log`)
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : All error paths must log at WARNING or ERROR with context before raising an exception
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : All state transitions must log at INFO level
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using `model_copy(update=...)`
Applied to files:
src/ai_company/engine/coordination/config.py
src/ai_company/engine/coordination/models.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/ai_company/{providers,engine}/**/*.py : `RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains
Applied to files:
src/ai_company/engine/errors.py
🧬 Code graph analysis (5)
tests/unit/engine/test_coordination_config.py (2)
src/ai_company/engine/coordination/config.py (1)
- CoordinationConfig (8-38)
tests/unit/engine/test_coordination_models.py (1)
- test_frozen (69-76)
tests/unit/engine/test_coordination_models.py (6)
src/ai_company/core/enums.py (2)
CoordinationTopology(358-369)TaskStatus(198-224)tests/unit/engine/conftest.py (3)
engine(432-443)make_assignment_agent(371-393)make_assignment_task(396-407)src/ai_company/engine/coordination/config.py (1)
CoordinationConfig(8-38)src/ai_company/engine/coordination/models.py (5)
CoordinationContext(30-61)CoordinationPhaseResult(64-96)CoordinationResult(128-190)CoordinationWave(99-125)is_success(188-190)src/ai_company/engine/decomposition/models.py (2)
SubtaskStatusRollup(179-259)derived_parent_status(232-259)src/ai_company/engine/parallel_models.py (4)
AgentOutcome(143-193)ParallelExecutionResult(196-248)task_id(87-89)agent_id(79-81)
tests/unit/engine/test_coordination_group_builder.py (5)
src/ai_company/core/enums.py (2)
CoordinationTopology(358-369)TaskStructure(346-355)src/ai_company/engine/coordination/config.py (1)
CoordinationConfig(8-38)src/ai_company/engine/coordination/group_builder.py (1)
build_execution_waves(42-138)src/ai_company/engine/decomposition/models.py (3)
DecompositionPlan(66-122)DecompositionResult(125-176)SubtaskDefinition(22-63)src/ai_company/engine/routing/models.py (2)
RoutingCandidate(23-45)RoutingResult(91-157)
src/ai_company/engine/coordination/models.py (7)
src/ai_company/core/agent.py (1)
AgentIdentity(266-342)src/ai_company/core/enums.py (1)
CoordinationTopology(358-369)src/ai_company/core/task.py (1)
Task(45-261)src/ai_company/engine/decomposition/models.py (2)
DecompositionContext(262-287)DecompositionResult(125-176)src/ai_company/engine/parallel_models.py (1)
ParallelExecutionResult(196-248)src/ai_company/engine/routing/models.py (1)
RoutingResult(91-157)src/ai_company/engine/workspace/models.py (1)
WorkspaceGroupResult(144-181)
src/ai_company/engine/errors.py (1)
src/ai_company/engine/coordination/models.py (1)
CoordinationPhaseResult(64-96)
🪛 LanguageTool
CLAUDE.md
[style] ~130-~130: A comma is missing here.
Context: ...nder ai_company.observability.events (e.g. PROVIDER_CALL_START from `events.prov...
(EG_NO_COMMA)
🪛 markdownlint-cli2 (0.21.0)
docs/design/engine.md
[warning] 762-762: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🔇 Additional comments (15)
CLAUDE.md (2)
95-95: LGTM! The engine description accurately reflects the new coordination package with TopologyDispatcher protocol, dispatcher implementations, wave execution, and workspace lifecycle integration.
130-130: LGTM! The event naming guidance is updated consistently with other domain events, including the new COORDINATION_STARTED from events.coordination.
src/ai_company/engine/coordination/config.py (1)
1-38: LGTM! The CoordinationConfig model follows project conventions:
- Frozen Pydantic model for configuration immutability
- Uses NotBlankStr for the base_branch identifier field
- Sensible defaults and appropriate field constraints (ge=1 for concurrency)
- Google-style docstring with attribute documentation
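The frozen-config pattern praised above can be sketched with a stdlib stand-in. The real model uses Pydantic (`frozen=True`, `NotBlankStr`, `Field(ge=1)`); field names and defaults here are illustrative, taken loosely from the review:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CoordinationConfig:
    """Stdlib stand-in for the frozen Pydantic coordination config."""

    base_branch: str = "main"   # identifier field; must not be blank
    max_concurrency: int = 4    # mirrors the ge=1 constraint
    fail_fast: bool = True

    def __post_init__(self) -> None:
        # Pydantic would enforce these via NotBlankStr and Field(ge=1).
        if not self.base_branch.strip():
            raise ValueError("base_branch must not be blank")
        if self.max_concurrency < 1:
            raise ValueError("max_concurrency must be >= 1")
```

Frozen instances reject attribute assignment, so callers must construct a new config (or use `model_copy` in Pydantic) to change a value.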
src/ai_company/engine/coordination/service.py (2)
50-91: LGTM! The MultiAgentCoordinator class follows the project's dependency injection pattern with clear separation of concerns. The __slots__ declaration is appropriate for memory efficiency, and the docstring comprehensively documents the purpose and constructor parameters.
92-195: LGTM! The coordinate() method implements a well-structured pipeline with:
- Proper phase timing using time.monotonic()
- Comprehensive event logging at start and completion
- Correct exception handling that re-raises CoordinationPhaseError and logs unexpected exceptions
- Clean aggregation of phase results and cost calculation
src/ai_company/observability/events/coordination.py (1)
1-17: LGTM! Event constants follow the established naming convention (domain.subject.qualifier) and are properly typed with Final[str]. The coverage is comprehensive for the coordination lifecycle: start/complete/fail, phases, waves, topology resolution, and cleanup.
tests/unit/observability/test_events.py (2)
189-189: LGTM! Correctly adds "coordination" to the expected domain modules set for discovery validation.
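The constant-plus-discovery pattern these comments describe can be sketched like this; the event string values are hypothetical, since the review does not list them:

```python
from typing import Final

# Hypothetical values following the domain.subject.qualifier convention.
COORDINATION_STARTED: Final[str] = "coordination.run.started"
COORDINATION_COMPLETED: Final[str] = "coordination.run.completed"
COORDINATION_FAILED: Final[str] = "coordination.run.failed"


def discover_events(namespace: dict) -> dict[str, str]:
    """Collect UPPER_CASE string constants, as a discovery test might."""
    return {
        name: value
        for name, value in namespace.items()
        if name.isupper() and isinstance(value, str)
    }
```

A discovery test can run this over each domain module's namespace and compare the result against the expected constant set.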
634-663: LGTM! The test_coordination_events_exist method follows the established pattern for event existence tests, verifying all 13 coordination event constants with their expected string values. The inline imports and assertions are consistent with other similar tests in this file.
src/ai_company/engine/coordination/models.py (4)
30-61: LGTM! CoordinationContext is well-designed with:
- Frozen configuration for immutability
- Sensible defaults for decomposition_context and config
- Proper validation ensuring at least one agent is available
64-96: LGTM! CoordinationPhaseResult includes excellent defensive validation in _validate_success_error_consistency ensuring that:
- Successful phases cannot have errors
- Failed phases must have an error description
This prevents inconsistent state representation.
99-125: LGTM! CoordinationWave correctly models an execution wave with proper constraints (wave_index >= 0, non-empty subtask_ids) and optional execution_result for waves that haven't executed yet.
128-190: LGTM! CoordinationResult provides a comprehensive aggregation of the coordination run:
- Enforces at least one phase via min_length=1
- Uses @computed_field for is_success as per guidelines
- Properly types optional fields (decomposition_result, routing_result, status_rollup, workspace_merge)
src/ai_company/engine/errors.py (2)
3-6: LGTM! Correctly uses TYPE_CHECKING guard to import CoordinationPhaseResult for type annotations only, avoiding potential circular import issues between errors.py and coordination/models.py.
128-152: LGTM! The coordination error hierarchy is well-designed:
- CoordinationError provides a clear base for coordination failures
- CoordinationPhaseError carries diagnostic context (phase, partial_phases) enabling partial-result inspection
- Keyword-only arguments with sensible defaults follow best practices
- Google-style docstring properly documents the attributes
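The hierarchy these comments approve can be sketched as below; the TYPE_CHECKING import is shown as a comment because the target module is project-internal, and the field names mirror the review:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Type-only import: resolved by checkers, never executed at runtime,
    # which is what breaks the errors <-> models import cycle.
    # from ai_company.engine.coordination.models import CoordinationPhaseResult
    pass


class CoordinationError(Exception):
    """Base class for coordination failures."""


class CoordinationPhaseError(CoordinationError):
    """Phase failure carrying diagnostic context via keyword-only args."""

    def __init__(
        self,
        message: str,
        *,
        phase: str,
        partial_phases: tuple = (),
    ) -> None:
        super().__init__(message)
        self.phase = phase
        self.partial_phases = partial_phases
```

Keyword-only diagnostics keep call sites explicit: a caller cannot accidentally swap the phase name and the partial-phase tuple.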
src/ai_company/engine/__init__.py (1)
42-57: Public coordination exports stay consistent. The new coordination imports are mirrored in __all__, so the package-level API remains explicit and avoids half-exported symbols.
Also applies to: 73-75, 210-225, 237-258, 282-335
def _make_routing(
    subtask_agent_pairs: list[tuple[str, str]],
    *,
    parent_task_id: str = "parent-1",
) -> RoutingResult:
    decisions: list[RoutingDecision] = []
    for subtask_id, agent_name in subtask_agent_pairs:
        agent = make_assignment_agent(agent_name)
        decisions.append(
            RoutingDecision(
                subtask_id=subtask_id,
                selected_candidate=RoutingCandidate(
                    agent_identity=agent,
                    score=0.9,
                    reason="Good match",
                ),
                topology=CoordinationTopology.CENTRALIZED,
            )
        )
    return RoutingResult(
        parent_task_id=parent_task_id,
        decisions=tuple(decisions),
    )
Make the shared routing fixture topology-aware.
_make_routing() hard-codes RoutingDecision.topology=CoordinationTopology.CENTRALIZED, so the SAS, decentralized, and context-dependent tests all run with inconsistent routing metadata. That weakens those cases and can hide mismatches between the selected dispatcher and the routing result.
Suggested change
def _make_routing(
subtask_agent_pairs: list[tuple[str, str]],
*,
parent_task_id: str = "parent-1",
+ topology: CoordinationTopology = CoordinationTopology.CENTRALIZED,
) -> RoutingResult:
@@
selected_candidate=RoutingCandidate(
agent_identity=agent,
score=0.9,
reason="Good match",
),
- topology=CoordinationTopology.CENTRALIZED,
+ topology=topology,
)
)Then pass the dispatcher-specific topology at each non-centralized call site.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_coordination_dispatchers.py` around lines 89 - 111,
The helper _make_routing currently hard-codes
RoutingDecision.topology=CoordinationTopology.CENTRALIZED; change _make_routing
to accept a topology parameter (e.g., topology: CoordinationTopology =
CoordinationTopology.CENTRALIZED) and use that value when constructing each
RoutingDecision, then update the test call sites that expect non-centralized
behavior to pass the appropriate CoordinationTopology (e.g., DECENTRALIZED, SAS,
or CONTEXT_DEPENDENT) so routing metadata matches the dispatcher under test;
adjust the return type still as RoutingResult with decisions tuple and keep
parent_task_id default as before.
    def test_optional_fields_default_none(self) -> None:
        """Optional fields default to None."""
        result = CoordinationResult(
            parent_task_id="task-1",
            topology=CoordinationTopology.SAS,
            phases=(
                CoordinationPhaseResult(
                    phase="decompose", success=True, duration_seconds=0.0
                ),
            ),
            total_duration_seconds=0.0,
        )
        assert result.decomposition_result is None
        assert result.routing_result is None
        assert result.status_rollup is None
        assert result.workspace_merge is None
        assert result.waves == ()
        assert result.total_cost_usd == 0.0

    @pytest.mark.unit
    def test_with_status_rollup(self) -> None:
        """Result can carry a status rollup."""
        rollup = SubtaskStatusRollup(
            parent_task_id="task-1",
            total=2,
            completed=2,
            failed=0,
            in_progress=0,
            blocked=0,
            cancelled=0,
        )
        result = CoordinationResult(
            parent_task_id="task-1",
            topology=CoordinationTopology.CENTRALIZED,
            phases=(
                CoordinationPhaseResult(
                    phase="rollup", success=True, duration_seconds=0.01
                ),
            ),
            status_rollup=rollup,
            total_duration_seconds=5.0,
        )
        assert result.status_rollup is not None
        assert result.status_rollup.derived_parent_status == TaskStatus.COMPLETED
Add a mismatched-parent regression for CoordinationResult.
In src/ai_company/engine/coordination/models.py, CoordinationResult currently has no cross-field validator tying decomposition_result.plan.parent_task_id, routing_result.parent_task_id, or status_rollup.parent_task_id back to parent_task_id. That means another task’s routing or rollup can be attached to the wrong aggregate result without being rejected. Please cover that case here and validate it in the model. As per coding guidelines, "Validate at system boundaries (user input, external APIs, config files)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_coordination_models.py` around lines 267 - 310, Add a
cross-field validator to the CoordinationResult Pydantic model to ensure any
nested parent_task_id fields belong to the same aggregate: check
decomposition_result.plan.parent_task_id, routing_result.parent_task_id, and
status_rollup.parent_task_id (if those nested objects are present) and raise a
validation error if any of them do not equal CoordinationResult.parent_task_id;
implement this as a root validator on CoordinationResult in
src/ai_company/engine/coordination/models.py and add a unit test that supplies a
mismatched parent_task_id for one of those nested fields to assert the model
rejects it.
Connect existing engine subsystems (DecompositionService, TaskRoutingService, ParallelExecutor, WorkspaceIsolationService, TaskEngine) into an end-to-end multi-agent coordination pipeline with topology-driven dispatch. New engine/coordination/ package: - CoordinationConfig, CoordinationContext, CoordinationResult models - TopologyDispatcher protocol with 4 implementations: SAS (sequential), Centralized (parallel + shared workspace), Decentralized (all-parallel + mandatory workspace), ContextDependent (conditional per-wave isolation) - MultiAgentCoordinator service: decompose → route → resolve topology → dispatch (workspace setup → execute waves → merge) → rollup → update parent - DAG-based wave grouping via build_execution_waves() - CoordinationError/CoordinationPhaseError with partial phase tracking - 11 structured observability event constants - 68 tests across 6 test files (config, models, errors, group_builder, dispatchers, service)
…fast Pre-reviewed by 8 agents, 32 findings addressed: - Add model validators for CoordinationPhaseResult (success/error consistency), CoordinationWave (non-empty subtask_ids), and CoordinationResult (min 1 phase) - Fix ContextDependentDispatcher fail_fast bug (waves continued after failure regardless of config) - Add mandatory workspace validation to DecentralizedDispatcher - Add missing WARNING/ERROR logging on 7+ error paths across dispatchers and service - Fix misleading CLEANUP_COMPLETED event for failures → CLEANUP_FAILED - Separate WAVE_BUILT event from WAVE_STARTED for planning vs execution - Tighten types: partial_phases uses CoordinationPhaseResult, rollup uses SubtaskStatusRollup - Add coordination pipeline section to docs/design/engine.md - Update CLAUDE.md package structure and logging event docs - Add comprehensive test coverage: fail_fast, AUTO fallback, update parent failures, rollup errors, workspace setup failures, cost aggregation, model validators, event constants
…pilot - Add input validators (non-empty waves, subtask ID presence, topology resolution) - Add structured logging across dispatchers, service, and group_builder - Implement fail-fast wave execution and merge gating (skip merge on failure) - Guard against missing created_task in group_builder with CoordinationError - Extract ~340 lines of shared test helpers into conftest.py - Add 7 new test cases (fail-fast, merge gating, workspace failure, models) - Fix TC001 lint compliance for type-only imports - Update engine design doc for merge gating behavior
Force-pushed from 0602c90 to b5f359c
- Add explicit guard for workspace_service narrowing after needs_isolation check - Add explicit TopologyDispatcher type annotation in select_dispatcher
Actionable comments posted: 6
♻️ Duplicate comments (4)
src/ai_company/engine/coordination/models.py (1)
146-190: ⚠️ Potential issue | 🟠 Major
Reject nested results from a different parent task.
CoordinationResult only validates topology today, so a decomposition_result, routing_result, or status_rollup from another parent task is accepted and then reported under this aggregate. That can corrupt rollups and parent updates for the wrong task. The new acceptance test in tests/unit/engine/test_coordination_models.py should flip once this validator exists.
Suggested fix
  @model_validator(mode="after")
  def _validate_topology_resolved(self) -> Self:
      """Ensure topology is resolved (not AUTO) in final result."""
      if self.topology == CoordinationTopology.AUTO:
          msg = "CoordinationResult topology must be resolved, not AUTO"
          raise ValueError(msg)
      return self
+
+ @model_validator(mode="after")
+ def _validate_parent_task_ids(self) -> Self:
+     """Ensure nested results belong to the same parent task."""
+     if (
+         self.decomposition_result is not None
+         and self.decomposition_result.plan.parent_task_id != self.parent_task_id
+     ):
+         msg = "decomposition_result parent_task_id must match parent_task_id"
+         raise ValueError(msg)
+     if (
+         self.routing_result is not None
+         and self.routing_result.parent_task_id != self.parent_task_id
+     ):
+         msg = "routing_result parent_task_id must match parent_task_id"
+         raise ValueError(msg)
+     if (
+         self.status_rollup is not None
+         and self.status_rollup.parent_task_id != self.parent_task_id
+     ):
+         msg = "status_rollup parent_task_id must match parent_task_id"
+         raise ValueError(msg)
+     return self
As per coding guidelines, "Validate: at system boundaries (user input, external APIs, config files)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/coordination/models.py` around lines 146 - 190: Add validation to CoordinationResult to reject nested results that reference a different parent task: in the existing model_validator (or a new after-mode validator) for CoordinationResult (method _validate_topology_resolved), check any present decomposition_result.parent_task_id, routing_result.parent_task_id, and status_rollup.parent_task_id (and workspace_merge.parent_task_id if applicable) and raise ValueError if any of them do not equal self.parent_task_id; keep the existing topology AUTO check and return self when all validations pass.
tests/unit/engine/test_coordination_dispatchers.py (1)
129-134: ⚠️ Potential issue | 🟡 Minor
Pass the dispatcher topology into make_routing().
These SAS/decentralized/context-dependent cases still rely on make_routing()'s default CENTRALIZED metadata. That weakens coverage and can hide mismatches between the selected dispatcher and the routing result. Apply the same fix to the other non-centralized call sites in this file.
Representative fix
  routing = make_routing(
      [
          ("sub-a", "alice"),
          ("sub-b", "alice"),
-     ]
+     ],
+     topology=CoordinationTopology.SAS,
  )
Also applies to: 394-399, 489-494
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/engine/test_coordination_dispatchers.py` around lines 129 - 134: The tests call make_routing(...) without passing the dispatcher topology, so they default to CENTRALIZED and miss coverage for non-centralized cases; update each call (e.g., the routing assignment in test_coordination_dispatchers.py) to pass the appropriate topology argument (use the dispatcher topology constant instead of default CENTRALIZED) so make_routing(subscriptions, topology=YOUR_TOPOLOGY) matches the dispatcher under test; apply the same change to the other non-centralized call sites in this file (the other make_routing invocations that should not use CENTRALIZED).
tests/unit/engine/test_coordination_service.py (1)
250-296: ⚠️ Potential issue | 🟠 Major
Don't execute a dependent wave after its prerequisite fails.
sub_b depends on sub-a, but this test still expects wave 1 to run after wave 0 failed. That turns dependency edges into ordering-only hints and can produce invalid downstream work. If the goal is to cover non-fail_fast continuation, make the subtasks independent; otherwise assert the dependent wave is skipped/blocked.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/engine/test_coordination_service.py` around lines 250 - 296: The test currently defines sub_b with a dependency on sub_a (make_subtask("sub-b", dependencies=("sub-a",))) but then expects wave 1 to run after wave 0 fails; either make the tasks independent or change assertions. Fix by making sub_b independent (remove the dependencies argument) and update the decomposition to use an independent structure (e.g., TaskStructure.PARALLEL in the make_decomposition call) so the test validates non-fail_fast continuation; keep the rest of the setup (routing, exec_results, CoordinationConfig(fail_fast=False), and assertions) unchanged.
src/ai_company/engine/coordination/group_builder.py (1)
96-102: ⚠️ Potential issue | 🟠 Major
Log the routing/decomposition mismatch before raising.
This still raises CoordinationError without a WARNING, so stale routing output is invisible in coordination telemetry. Log wave_index and subtask_id before raising.
Suggested fix
  from ai_company.observability.events.coordination import (
+     COORDINATION_PHASE_FAILED,
      COORDINATION_WAVE_BUILT,
  )
@@
  if task is None:
      msg = (
          f"Subtask {subtask_id!r} has a routing decision "
          "but no corresponding created task in decomposition"
      )
+     logger.warning(
+         COORDINATION_PHASE_FAILED,
+         phase="build_execution_waves",
+         wave_index=wave_idx,
+         subtask_id=subtask_id,
+         error=msg,
+     )
      raise CoordinationError(msg)
As per coding guidelines, "All error paths must log at WARNING or ERROR with context before raising" and "Validate: at system boundaries (user input, external APIs, config files)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/coordination/group_builder.py` around lines 96 - 102, The code in group_builder.py raises CoordinationError when a routing decision references a missing task but does not log context; before raising in the block that checks task_lookup.get(subtask_id) == None, add a warning log (e.g., using the module logger or the existing coordination logger) that includes wave_index and subtask_id and any other relevant context (like the routing entry or decomposition id) so stale routing output is visible in telemetry, then raise CoordinationError as before; update the logging call adjacent to the check in the function/method that builds groups (the block that constructs msg and raises CoordinationError) to emit logger.warning(...) with wave_index and subtask_id.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/design/engine.md`:
- Around line 762-764: The pipeline diagram currently reads "decompose → route →
resolve topology → dispatch → rollup → update parent" but the implementation and
numbered list treat validation as its own phase; update the diagram to insert
the validate phase between "resolve topology" and "dispatch" (i.e., "decompose →
route → resolve topology → validate → dispatch → rollup → update parent") so it
matches the implementation and clarifies where unroutable-only runs fail.
In `@src/ai_company/engine/coordination/dispatchers.py`:
- Around line 396-410: Validate that every routed subtask ID in routing_result
exists in decomposition_result.created_tasks before calling _setup_workspaces or
performing any workspace side effects; implement a cheap preflight helper (e.g.,
validate_routed_task_ids(routing_result, decomposition_result)) and call it at
the start of the dispatcher flow where workspace_service and
config.enable_workspace_isolation are checked, returning a failed DispatchResult
if validation fails. Ensure the helper references
decomposition_result.created_tasks and the routed IDs from routing_result so
build_execution_waves and _setup_workspaces are only invoked when IDs are
consistent; apply the same change to the other dispatcher branch that also calls
_setup_workspaces/build_execution_waves.
In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 84-94: The loop in parallel_groups/building waves treats a missing
routing decision (routing_lookup.get(subtask_id) is None) as a simple continue,
which lets routed descendants run later and breaks dependency semantics; update
the handling so that when decision is None you (1) locate all downstream
descendants of subtask_id using the same dependency/graph structure used by
parallel_groups (the subtask dependency map / graph used to compute waves) and
mark those descendants as blocked/unroutable (e.g., set their routing_lookup
entry or a blocked set) so they cannot be scheduled in later waves, or (2) if
your policy is to fail fast, raise a coordination error when a skipped subtask
has any dependents; also update the COORDINATION_WAVE_BUILT logging to reflect
that the subtask blocked its descendants (include wave_idx, subtask_id,
skipped=True, blocked_descendants count/list) so the behavior is visible during
debugging.
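Option (1) above, blocking everything downstream of an unroutable subtask, amounts to a reachability walk over the inverted dependency map. A hypothetical helper might look like this (the name and signature are assumptions, not the project's API):

```python
from collections import deque


def blocked_descendants(
    dependencies: dict[str, tuple[str, ...]], root: str
) -> set[str]:
    """All subtasks that transitively depend on `root`."""
    # Invert the dependency map: prerequisite -> direct dependents.
    dependents: dict[str, set[str]] = {}
    for task, deps in dependencies.items():
        for dep in deps:
            dependents.setdefault(dep, set()).add(task)
    blocked: set[str] = set()
    queue = deque([root])
    while queue:
        current = queue.popleft()
        for child in dependents.get(current, ()):  # BFS over dependents
            if child not in blocked:
                blocked.add(child)
                queue.append(child)
    return blocked
```

The wave builder could call this for each skipped subtask and exclude the returned set from every later wave (or fail fast if the set is non-empty).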
In `@src/ai_company/engine/coordination/service.py`:
- Around line 139-148: Wrap the dispatcher selection and dispatch call into a
new helper _phase_dispatch(...) that measures duration, calls
select_dispatcher(topology) and await dispatcher.dispatch(...), and on any
exception appends a CoordinationPhaseResult (or appropriate phase result)
marking the dispatch phase as failed with partial_phases included, then
re-raises a CoordinationPhaseError containing the partial phases; replace the
inline block around select_dispatcher and dispatcher.dispatch in both places
(the current dispatch block and the similar block at lines 176-184) with calls
to _phase_dispatch so callers always receive a dispatch-phase result even on
errors and the function stays under 50 lines. Ensure the helper accepts the same
inputs used now (decomp_result, routing_result,
parallel_executor/self._parallel_executor,
workspace_service/self._workspace_service, context.config) and returns the
dispatch result or raises CoordinationPhaseError after appending the
failed-phase result to phases.
- Line 151: The rollup currently only iterates dispatch_result.waves via
_phase_rollup(), causing unrouted or skipped subtasks to disappear and making
rollup_status() undercount work; update the call sites (around where rollup =
self._phase_rollup(context, dispatch_result, phases) and the similar block at
lines ~361-389) to pass the full expected subtask ID set (from
build_execution_waves() or dispatch_result.expected_subtask_ids) into
_phase_rollup(), then modify _phase_rollup() to accept that ID set and, before
computing rollup_status(), insert any missing IDs into the statuses map with an
appropriate terminal state (BLOCKED or FAILED) so the rollup includes those
subtasks in totals; ensure you reference CoordinationWave entries as before but
compute rollup against the union of emitted wave results plus the filled-in
missing IDs so parents aren't marked complete prematurely.
In `@tests/unit/engine/test_coordination_service.py`:
- Around line 514-526: The test requires that MemoryError not be normalized into
CoordinationPhaseError; update MultiAgentCoordinator.coordinate (the phase
wrapper that currently catches exceptions and raises CoordinationPhaseError) to
special-case catastrophic exceptions by re-raising MemoryError (and similar
process-level exceptions if desired) before wrapping other exceptions.
Concretely, in the exception handling block (the try/except that currently
converts all errors into CoordinationPhaseError), add an early check like: if
isinstance(exc, MemoryError): raise, then proceed to wrap non-catastrophic
exceptions into CoordinationPhaseError so MemoryError propagates unchanged.
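The re-raise-before-wrap shape this prompt describes is a small change to the phase wrapper, sketched here with hypothetical names. Note that MemoryError subclasses Exception, so a blanket `except Exception` would otherwise swallow it:

```python
class CoordinationPhaseError(Exception):
    """Sketch: phase failure wrapper carrying the phase name."""

    def __init__(self, message: str, *, phase: str) -> None:
        super().__init__(message)
        self.phase = phase


def run_phase(phase: str, fn):
    """Wrap ordinary failures; let process-level errors escape untouched."""
    try:
        return fn()
    except MemoryError:
        raise  # catastrophic: propagate unchanged, never normalize
    except Exception as exc:
        raise CoordinationPhaseError(f"{phase} failed: {exc}", phase=phase) from exc
```

KeyboardInterrupt and SystemExit need no special case since they do not inherit from Exception.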
---
Duplicate comments:
In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 96-102: The code in group_builder.py raises CoordinationError when
a routing decision references a missing task but does not log context; before
raising in the block that checks task_lookup.get(subtask_id) == None, add a
warning log (e.g., using the module logger or the existing coordination logger)
that includes wave_index and subtask_id and any other relevant context (like the
routing entry or decomposition id) so stale routing output is visible in
telemetry, then raise CoordinationError as before; update the logging call
adjacent to the check in the function/method that builds groups (the block that
constructs msg and raises CoordinationError) to emit logger.warning(...) with
wave_index and subtask_id.
In `@src/ai_company/engine/coordination/models.py`:
- Around line 146-190: Add validation to CoordinationResult to reject nested
results that reference a different parent task: in the existing model_validator
(or a new after-mode validator) for CoordinationResult (method
_validate_topology_resolved), check any present
decomposition_result.parent_task_id, routing_result.parent_task_id, and
status_rollup.parent_task_id (and workspace_merge.parent_task_id if applicable)
and raise ValueError if any of them do not equal self.parent_task_id; keep the
existing topology AUTO check and return self when all validations pass.
In `@tests/unit/engine/test_coordination_dispatchers.py`:
- Around line 129-134: The tests call make_routing(...) without passing the
dispatcher topology, so they default to CENTRALIZED and miss coverage for
non-centralized cases; update each call (e.g., the routing assignment in
test_coordination_dispatchers.py) to pass the appropriate topology argument (use
the dispatcher topology constant instead of default CENTRALIZED) so
make_routing(subscriptions, topology=YOUR_TOPOLOGY) matches the dispatcher under
test; apply the same change to the other non-centralized call sites in this file
(the other make_routing invocations that should not use CENTRALIZED).
In `@tests/unit/engine/test_coordination_service.py`:
- Around line 250-296: The test currently defines sub_b with a dependency on
sub_a (make_subtask("sub-b", dependencies=("sub-a",))) but then expects wave 1
to run after wave 0 fails; either make the tasks independent or change
assertions. Fix by making sub_b independent (remove the dependencies argument)
and update the decomposition to use an independent structure (e.g.,
TaskStructure.PARALLEL in the make_decomposition call) so the test validates
non-fail_fast continuation; keep the rest of the setup (routing, exec_results,
CoordinationConfig(fail_fast=False), and assertions) unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 1e0871ab-a201-43f1-8bea-9e9da98ebed6
📒 Files selected for processing (11)
docs/design/engine.md
src/ai_company/engine/coordination/dispatchers.py
src/ai_company/engine/coordination/group_builder.py
src/ai_company/engine/coordination/models.py
src/ai_company/engine/coordination/service.py
src/ai_company/engine/errors.py
tests/unit/engine/conftest.py
tests/unit/engine/test_coordination_dispatchers.py
tests/unit/engine/test_coordination_group_builder.py
tests/unit/engine/test_coordination_models.py
tests/unit/engine/test_coordination_service.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Greptile Review
- GitHub Check: Analyze (python)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649 native lazy annotations
Use except A, B: syntax (no parentheses) — ruff enforces PEP 758 except syntax on Python 3.14
Type hints: all public functions, mypy strict mode
Docstrings: Google style, required on public classes/functions (enforced by ruff D rules)
Create new objects, never mutate existing ones. For non-Pydantic internal collections (registries, BaseTool), use copy.deepcopy() at construction + MappingProxyType wrapping for read-only enforcement.
For dict/list fields in frozen Pydantic models, rely on frozen=True for field reassignment prevention and copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, serializing for persistence).
Config vs runtime state: use frozen Pydantic models for config/identity; separate mutable-via-copy models (using model_copy(update=...)) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.
Models: Pydantic v2 (BaseModel, model_validator, computed_field, ConfigDict). Use @computed_field for derived values instead of storing + validating redundant fields. Use NotBlankStr for all identifier/name fields — including optional and tuple variants — instead of manual whitespace validators.
Async concurrency: prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare create_task. Existing code is being migrated incrementally.
Line length: 88 characters (ruff)
Functions: < 50 lines, files < 800 lines
Errors: handle explicitly, never silently swallow
Validate: at system boundaries (user input, external APIs, config files)
Files:
src/ai_company/engine/coordination/service.py
tests/unit/engine/test_coordination_service.py
src/ai_company/engine/coordination/group_builder.py
tests/unit/engine/conftest.py
src/ai_company/engine/coordination/dispatchers.py
tests/unit/engine/test_coordination_group_builder.py
src/ai_company/engine/errors.py
tests/unit/engine/test_coordination_dispatchers.py
tests/unit/engine/test_coordination_models.py
src/ai_company/engine/coordination/models.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/ai_company/**/*.py: Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Never use `import logging`/`logging.getLogger()`/`print()` in application code
Variable name: always `logger` (not `_logger`, not `log`)
Event names: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
Structured kwargs in logging: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)`
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO
DEBUG for object creation, internal flow, entry/exit of key functions
Vendor-agnostic everywhere: NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: `example-provider`, `example-large-001`, `example-medium-001`, `example-small-001`, `large`/`medium`/`small` as aliases. Tests must use `test-provider`, `test-small-001`, etc.
Files:
src/ai_company/engine/coordination/service.py
src/ai_company/engine/coordination/group_builder.py
src/ai_company/engine/coordination/dispatchers.py
src/ai_company/engine/errors.py
src/ai_company/engine/coordination/models.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Tests: markers `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, `@pytest.mark.slow`
Coverage: 80% minimum (enforced in CI)
Async: `asyncio_mode = "auto"` — no manual `@pytest.mark.asyncio` needed
Timeout: 30 seconds per test
Parallelism: `pytest-xdist` via `-n auto` — ALWAYS include `-n auto` when running pytest, never run tests sequentially
Prefer `@pytest.mark.parametrize` for testing similar cases
Vendor-agnostic everywhere: Tests must use `test-provider`, `test-small-001`, etc. (Vendor names may only appear in operations design page, .claude/ skill/agent files, or third-party import paths)
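A minimal sketch of these conventions together — the `unit` marker, `parametrize` for similar cases, and vendor-agnostic identifiers. The test name and parameters are illustrative:

```python
import pytest


@pytest.mark.unit
@pytest.mark.parametrize(
    ("provider", "model"),
    [
        ("test-provider", "test-small-001"),
        ("test-provider", "test-medium-001"),
    ],
)
def test_identifiers_are_vendor_agnostic(provider: str, model: str) -> None:
    # Each parametrized case runs as its own test under pytest-xdist.
    assert provider.startswith("test-")
    assert model.startswith("test-")
```

With `asyncio_mode = "auto"`, an `async def` variant of this test would need no extra marker.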
Files:
tests/unit/engine/test_coordination_service.py
tests/unit/engine/conftest.py
tests/unit/engine/test_coordination_group_builder.py
tests/unit/engine/test_coordination_dispatchers.py
tests/unit/engine/test_coordination_models.py
🧠 Learnings (7)
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/providers/**/*.py : Retryable errors (`is_retryable=True`): `RateLimitError`, `ProviderTimeoutError`, `ProviderConnectionError`, `ProviderInternalError`. Non-retryable errors raise immediately without retry. `RetryExhaustedError` signals that all retries failed — the engine layer catches this to trigger fallback chains.
Applied to files:
src/ai_company/engine/coordination/service.py
src/ai_company/engine/errors.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Errors: handle explicitly, never silently swallow
Applied to files:
src/ai_company/engine/coordination/service.py
src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising
Applied to files:
src/ai_company/engine/coordination/service.py
src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Async concurrency: prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`. Existing code is being migrated incrementally.
Applied to files:
src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: When review agents find valid issues (including pre-existing issues in surrounding code, suggestions, and findings adjacent to the PR's changes), fix them all. No deferring, no 'out of scope' skipping.
Applied to files:
src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All state transitions must log at INFO
Applied to files:
src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Config vs runtime state: use frozen Pydantic models for config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.
Applied to files:
src/ai_company/engine/coordination/models.py
🧬 Code graph analysis (4)
src/ai_company/engine/coordination/service.py (6)
- src/ai_company/core/enums.py (2): `CoordinationTopology` (358-369), `TaskStatus` (198-224)
- src/ai_company/engine/coordination/dispatchers.py (7): `DispatchResult` (49-76), `select_dispatcher` (791-826), `dispatch` (83-104, 349-372, 382-442, 453-523, 534-578)
- src/ai_company/engine/coordination/models.py (3): `CoordinationPhaseResult` (64-96), `CoordinationResult` (128-198), `is_success` (196-198)
- src/ai_company/engine/decomposition/models.py (2): `DecompositionResult` (125-176), `SubtaskStatusRollup` (179-259)
- src/ai_company/engine/decomposition/service.py (2): `DecompositionService` (34-180), `rollup_status` (166-180)
- src/ai_company/engine/routing/models.py (1): `RoutingResult` (91-157)

src/ai_company/engine/coordination/group_builder.py (6)
- src/ai_company/engine/decomposition/dag.py (2): `DependencyGraph` (26-252), `parallel_groups` (185-228)
- src/ai_company/engine/parallel_models.py (2): `AgentAssignment` (23-89), `task_id` (87-89)
- src/ai_company/engine/coordination/config.py (1): `CoordinationConfig` (8-38)
- src/ai_company/engine/decomposition/models.py (1): `DecompositionResult` (125-176)
- src/ai_company/engine/routing/models.py (1): `RoutingResult` (91-157)
- src/ai_company/engine/workspace/models.py (1): `Workspace` (36-65)

src/ai_company/engine/errors.py (1)
- src/ai_company/engine/coordination/models.py (1): `CoordinationPhaseResult` (64-96)

tests/unit/engine/test_coordination_models.py (5)
- src/ai_company/core/enums.py (2): `CoordinationTopology` (358-369), `TaskStatus` (198-224)
- src/ai_company/engine/coordination/config.py (1): `CoordinationConfig` (8-38)
- src/ai_company/engine/coordination/models.py (5): `CoordinationContext` (30-61), `CoordinationPhaseResult` (64-96), `CoordinationResult` (128-198), `CoordinationWave` (99-125), `is_success` (196-198)
- src/ai_company/engine/decomposition/models.py (2): `SubtaskStatusRollup` (179-259), `derived_parent_status` (232-259)
- src/ai_company/engine/parallel_models.py (4): `AgentOutcome` (143-193), `ParallelExecutionResult` (196-248), `task_id` (87-89), `agent_id` (79-81)
```python
async def test_memory_error_propagated(self) -> None:
    """MemoryError from decomposition is not swallowed."""
    coordinator = _make_coordinator(
        decompose_error=MemoryError("out of memory"),
    )

    ctx = CoordinationContext(
        task=make_assignment_task(id="parent-1"),
        available_agents=(make_assignment_agent("alice"),),
    )

    with pytest.raises(CoordinationPhaseError):
        await coordinator.coordinate(ctx)
```
Do not normalize MemoryError into CoordinationPhaseError.
This test locks in wrapping process-level resource exhaustion as a recoverable phase failure. MemoryError should propagate unchanged so the engine can fail fast instead of continuing in a compromised process. MultiAgentCoordinator.coordinate() should special-case catastrophic exceptions before its generic phase wrapper.
Expected assertion after special-casing catastrophic exceptions
```diff
- with pytest.raises(CoordinationPhaseError):
+ with pytest.raises(MemoryError):
      await coordinator.coordinate(ctx)
```

🤖 Prompt for AI Agents
await coordinator.coordinate(ctx)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_coordination_service.py` around lines 514 - 526, The
test requires that MemoryError not be normalized into CoordinationPhaseError;
update MultiAgentCoordinator.coordinate (the phase wrapper that currently
catches exceptions and raises CoordinationPhaseError) to special-case
catastrophic exceptions by re-raising MemoryError (and similar process-level
exceptions if desired) before wrapping other exceptions. Concretely, in the
exception handling block (the try/except that currently converts all errors into
CoordinationPhaseError), add an early check like: if isinstance(exc,
MemoryError): raise, then proceed to wrap non-catastrophic exceptions into
CoordinationPhaseError so MemoryError propagates unchanged.
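A minimal runnable sketch of the special-casing this prompt describes; `run_phase` and the error class are illustrative stand-ins for the coordinator's real phase wrapper:

```python
import asyncio
from collections.abc import Coroutine


class CoordinationPhaseError(Exception):
    """Stand-in for the PR's coordination phase error."""


async def run_phase(phase: str, coro: Coroutine) -> object:
    try:
        return await coro
    except MemoryError:
        # Process-level exhaustion: propagate unchanged so the engine fails
        # fast. (SystemExit/KeyboardInterrupt derive from BaseException and
        # already bypass the `except Exception` below.)
        raise
    except Exception as exc:
        raise CoordinationPhaseError(f"phase {phase!r} failed: {exc}") from exc


async def _boom() -> None:
    raise MemoryError("out of memory")


try:
    asyncio.run(run_phase("decompose", _boom()))
except MemoryError:
    outcome = "propagated"

assert outcome == "propagated"
```

The ordering matters: the catastrophic branch must come before the generic wrapper, otherwise `MemoryError` (a subclass of `Exception`) would be normalized like any other failure.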
Pull request overview
Adds a new multi-agent “coordination” subsystem to the engine, wiring decomposition → routing → topology dispatch → rollup → optional parent update, with stronger validation and expanded observability event coverage to support Issue #205’s runtime coordination wiring.
Changes:
- Introduces coordination models/config, dispatcher implementations, and the `MultiAgentCoordinator` orchestration service.
- Adds coordination observability events and updates docs to describe the coordination pipeline and logging/event conventions.
- Adds extensive unit test coverage for coordination behavior (validators, workspace preconditions, fail-fast behavior, rollup, and error paths).
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/observability/test_events.py | Adds coordination event constants coverage and module discovery entry. |
| tests/unit/engine/test_coordination_service.py | End-to-end service tests for the new coordination pipeline (success + failure paths). |
| tests/unit/engine/test_coordination_models.py | Validator and invariants tests for coordination domain models. |
| tests/unit/engine/test_coordination_group_builder.py | Tests DAG→wave conversion and defensive behavior in wave building. |
| tests/unit/engine/test_coordination_errors.py | Tests new coordination error hierarchy behavior. |
| tests/unit/engine/test_coordination_dispatchers.py | Tests dispatcher behaviors across topologies, workspace lifecycle, and fail-fast. |
| tests/unit/engine/test_coordination_config.py | Tests the new coordination config model defaults and validation. |
| tests/unit/engine/conftest.py | Adds shared coordination test helpers (subtasks, decomposition, routing, exec results). |
| src/ai_company/observability/events/coordination.py | Defines coordination event name constants. |
| src/ai_company/engine/errors.py | Adds CoordinationError and CoordinationPhaseError. |
| src/ai_company/engine/coordination/service.py | Implements MultiAgentCoordinator pipeline orchestration. |
| src/ai_company/engine/coordination/models.py | Adds frozen Pydantic models for context, phase results, waves, and overall result. |
| src/ai_company/engine/coordination/group_builder.py | Converts dependency DAG + routing decisions into ParallelExecutionGroups. |
| src/ai_company/engine/coordination/dispatchers.py | Adds topology dispatchers + workspace lifecycle + shared wave execution helper. |
| src/ai_company/engine/coordination/config.py | Adds CoordinationConfig and its validation constraints. |
| src/ai_company/engine/coordination/init.py | Exposes coordination public API surface. |
| src/ai_company/engine/init.py | Re-exports coordination types/errors at the engine package level. |
| docs/design/engine.md | Documents the multi-agent coordination pipeline design and dispatcher semantics. |
| CLAUDE.md | Updates package structure description and adds coordination to event-constant examples. |
```python
logger.info(COORDINATION_PHASE_STARTED, phase=phase_name)
try:
    requests = _build_workspace_requests(routing_result, config)
    workspaces = await workspace_service.setup_group(requests=requests)
except Exception as exc:
    elapsed = time.monotonic() - start
```
```python
        base_branch=config.base_branch,
    )
    for a in group.assignments
)
ws_start = time.monotonic()
try:
    wave_workspaces = await workspace_service.setup_group(
        requests=wave_requests,
    )
except Exception as exc:
    ws_elapsed = time.monotonic() - ws_start
    logger.warning(
        COORDINATION_PHASE_FAILED,
        phase=f"workspace_setup_wave_{wave_idx}",
        error=str(exc),
    )
    all_phases.append(
        CoordinationPhaseResult(
            phase=f"workspace_setup_wave_{wave_idx}",
            success=False,
```
```python
            succeeded=exec_result.agents_succeeded,
            failed=exec_result.agents_failed,
            duration_seconds=elapsed,
        )

    except Exception as exc:
        elapsed = time.monotonic() - start
        wave_failed = True
        logger.warning(
```
```python
msg = (
    f"Subtask {subtask_id!r} has a routing decision "
    "but no corresponding created task in decomposition"
)
```
```python
try:
    # Collect statuses from wave outcomes
    statuses: list[TaskStatus] = []
    for wave in dispatch_result.waves:
        if wave.execution_result is None:
            statuses.extend(TaskStatus.BLOCKED for _ in wave.subtask_ids)
            continue

        for outcome in wave.execution_result.outcomes:
            if outcome.is_success:
                statuses.append(TaskStatus.COMPLETED)
            else:
                statuses.append(TaskStatus.FAILED)

    rollup = self._decomposition_service.rollup_status(
        context.task.id,
        tuple(statuses),
    )
```
```python
logger.info(COORDINATION_PHASE_STARTED, phase=phase_name)
try:
    merge_result = await workspace_service.merge_group(
        workspaces=workspaces,
    )
except Exception as exc:
    elapsed = time.monotonic() - start
```
```python
try:
    result = await self._decomposition_service.decompose_task(
        context.task, context.decomposition_context
    )
except Exception as exc:
    elapsed = time.monotonic() - start
    logger.warning(
```
```python
try:
    result = self._routing_service.route(
        decomp_result,
        context.available_agents,
        context.task,
    )
except Exception as exc:
    elapsed = time.monotonic() - start
```
```python
except Exception as exc:
    elapsed = time.monotonic() - start
```
```python
except CoordinationPhaseError:
    raise
except Exception as exc:
    logger.exception(
        COORDINATION_FAILED,
        parent_task_id=task.id,
        error=str(exc),
    )
    raise
```
```python
    CoordinationPhaseResult(
        phase=phase_name,
        success=result.success,
        duration_seconds=elapsed,
        error=result.error,
    )
)
except Exception as exc:
```
New validator + unconditional error=result.error creates a false-failure path
The CoordinationPhaseResult validator added in this very PR rejects success=True with a non-None error:
```python
if self.success and self.error is not None:
    raise ValueError("successful phase must not have an error")
```

`error=result.error` is passed unconditionally here. If `TaskMutationResult.error` is ever non-`None` on a successful submission (e.g., a deprecation warning or advisory message from the engine), the `CoordinationPhaseResult` constructor raises `ValueError`. That exception is caught by the outer `except Exception as exc` handler, which logs it as `COORDINATION_PHASE_FAILED` and records a failed `update_parent` phase — even though the actual task transition succeeded. This silently masks a successful parent update and can make `result.is_success` return `False` for a fully-successful run.
Current:

```python
    CoordinationPhaseResult(
        phase=phase_name,
        success=result.success,
        duration_seconds=elapsed,
        error=result.error,
    )
)
except Exception as exc:
```

Suggested:

```python
phases.append(
    CoordinationPhaseResult(
        phase=phase_name,
        success=result.success,
        duration_seconds=elapsed,
        error=result.error if not result.success else None,
    )
)
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/coordination/service.py
Line: 464-471
Comment:
**New validator + unconditional `error=result.error` creates a false-failure path**
The `CoordinationPhaseResult` validator added in this very PR rejects `success=True` with a non-`None` `error`:
```python
if self.success and self.error is not None:
raise ValueError("successful phase must not have an error")
```
`error=result.error` is passed unconditionally here. If `TaskMutationResult.error` is ever non-`None` on a successful submission (e.g., a deprecation warning or advisory message from the engine), the `CoordinationPhaseResult` constructor raises `ValueError`. That exception is caught by the outer `except Exception as exc` handler, which logs it as `COORDINATION_PHASE_FAILED` and records a failed `update_parent` phase — **even though the actual task transition succeeded**. This silently masks a successful parent update and can make `result.is_success` return `False` for a fully-successful run.
```suggestion
phases.append(
CoordinationPhaseResult(
phase=phase_name,
success=result.success,
duration_seconds=elapsed,
error=result.error if not result.success else None,
)
)
```
How can I resolve this? If you propose a fix, please make it concise.
Actionable comments posted: 1
♻️ Duplicate comments (7)
tests/unit/engine/test_coordination_dispatchers.py (1)
709-742: ⚠️ Potential issue | 🟠 Major

Don't lock in downstream execution after a prerequisite wave failed.

`sub_b` explicitly depends on `sub-a`, but this test asserts that wave 1 still runs after wave 0 raised. That turns dependency edges into ordering-only hints and will mask the real fix if blocked descendants are enforced later. If the goal is to cover non-`fail_fast` continuation, make the second subtask independent; otherwise assert that the dependent wave is skipped/blocked.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/engine/test_coordination_dispatchers.py` around lines 709 - 742, The test test_exception_with_fail_fast_off_continues currently makes sub_b depend on sub_a (via make_subtask("sub-b", dependencies=("sub-a",))) but then asserts wave 1 still ran after wave 0 failed, which is inconsistent with real dependency enforcement; either make the second subtask independent (remove dependencies from make_subtask for "sub-b") to test non-fail_fast continuation, or keep the dependency and change the assertions to expect the dependent wave to be skipped/blocked (e.g., result.waves[1].execution_result is None and that execute_group was not invoked for blocked wave); update the setup using make_subtask/make_decomposition/make_routing accordingly and adjust assertions for SasDispatcher dispatch with CoordinationConfig(fail_fast=False).

docs/design/engine.md (1)
802-804: ⚠️ Potential issue | 🟡 Minor

Add `validate` to the pipeline diagram.

The numbered list and implementation both have a dedicated validation step, but the diagram jumps straight from topology resolution to dispatch.
Suggested diff
```diff
-decompose → route → resolve topology → dispatch → rollup → update parent
+decompose → route → resolve topology → validate → dispatch → rollup → update parent
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docs/design/engine.md` around lines 802 - 804, The pipeline diagram string "decompose → route → resolve topology → dispatch → rollup → update parent" is missing the dedicated validation step; update that diagram to insert "validate" after "resolve topology" so it reads "decompose → route → resolve topology → validate → dispatch → rollup → update parent" to match the numbered list and implementation (look for the pipeline text in the docs/design/engine.md content).

tests/unit/engine/test_coordination_models.py (1)
375-406: ⚠️ Potential issue | 🟠 Major

Don't codify foreign parent IDs as valid.

This test blesses attaching a `status_rollup` from `"other-task"` to `CoordinationResult(parent_task_id="task-1")`. That lets another task's nested coordination data be associated with the wrong aggregate result. Please invert this into a `ValidationError` case once `CoordinationResult` validates nested `parent_task_id`s. As per coding guidelines, "Validate: at system boundaries (user input, external APIs, config files)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/engine/test_coordination_models.py` around lines 375 - 406, The test test_mismatched_parent_task_id_accepted currently asserts that a CoordinationResult can accept a SubtaskStatusRollup with a different parent_task_id; change it to assert that providing a rollup whose parent_task_id doesn't match CoordinationResult.parent_task_id raises a ValidationError once CoordinationResult enforces cross-validation. Update the test to construct the same SubtaskStatusRollup and CoordinationResult inputs but wrap the creation/validation of CoordinationResult in a pytest.raises(ValidationError) (or the project's equivalent validation exception) and assert the exception is raised; reference CoordinationResult and SubtaskStatusRollup to locate the code under test.

src/ai_company/engine/coordination/service.py (3)
139-148: ⚠️ Potential issue | 🟠 Major

Wrap dispatch failures as a real coordination phase error.

`select_dispatcher()` and `dispatcher.dispatch()` can still raise here — for example, decentralized precondition failures currently bubble as raw exceptions. That means callers lose a `dispatch` phase entry and `partial_phases` on the pipeline's most failure-prone step. Please funnel this block through a dedicated dispatch-phase helper that appends a failed `CoordinationPhaseResult` before re-raising `CoordinationPhaseError`.

Also applies to: 176-184
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/coordination/service.py` around lines 139 - 148, The dispatch block should be wrapped in a helper that runs select_dispatcher(...) and await dispatcher.dispatch(...), catching any Exception, creating and appending a failed CoordinationPhaseResult for the "dispatch" phase (including partial_phases/state from the attempted dispatch when available) to the phases list, and then re-raising a CoordinationPhaseError that carries that failed phase result; implement this by extracting the current dispatch calls into a helper (e.g., _run_dispatch_phase or similar) that returns a dispatch result on success and on exception appends a failed CoordinationPhaseResult and raises CoordinationPhaseError. Apply the same pattern to the other dispatch site (the similar block around lines 176-184) so both select_dispatcher and dispatcher.dispatch failures produce a proper dispatch-phase entry and raise CoordinationPhaseError.
372-389: ⚠️ Potential issue | 🔴 Critical

Roll up against the full planned subtask set.

This only counts statuses from `dispatch_result.waves`. `build_execution_waves()` drops unrouted/empty waves, and fail-fast or setup failures can skip later waves entirely, so missing subtasks disappear from the rollup and the parent can be marked complete with unfinished work. Seed the rollup from the full decomposition plan and synthesize `BLOCKED`/`FAILED` for any subtask with no recorded outcome.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/coordination/service.py` around lines 372 - 389, The current rollup uses only statuses collected from dispatch_result.waves, which omits subtasks that were never routed; instead fetch the full decomposition plan (via the decomposition service used to build_execution_waves) for context.task.id, iterate the plan's ordered subtask ids, build a map of outcomes from dispatch_result.waves (by subtask id), and for each planned subtask append TaskStatus.COMPLETED if outcome.is_success, TaskStatus.FAILED if outcome exists but not success, or TaskStatus.BLOCKED if there is no recorded outcome; pass that complete tuple to self._decomposition_service.rollup_status(context.task.id, tuple(statuses)) so missing subtasks are treated as BLOCKED/FAILED rather than disappearing from the rollup.
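The seeding strategy this prompt describes can be sketched as a small pure function; the `TaskStatus` values mirror the review excerpt, while the plan/outcome shapes are illustrative stand-ins:

```python
from enum import Enum


class TaskStatus(Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


def rollup_statuses(
    planned_ids: tuple[str, ...],
    outcomes: dict[str, bool],
) -> tuple[TaskStatus, ...]:
    """Seed from the full plan; synthesize BLOCKED for missing outcomes."""
    statuses: list[TaskStatus] = []
    for subtask_id in planned_ids:
        if subtask_id not in outcomes:
            statuses.append(TaskStatus.BLOCKED)  # never ran: don't drop it
        elif outcomes[subtask_id]:
            statuses.append(TaskStatus.COMPLETED)
        else:
            statuses.append(TaskStatus.FAILED)
    return tuple(statuses)


# "sub-c" was skipped by fail-fast, so it surfaces as BLOCKED, not missing.
result = rollup_statuses(
    ("sub-a", "sub-b", "sub-c"),
    {"sub-a": True, "sub-b": False},
)
assert result == (TaskStatus.COMPLETED, TaskStatus.FAILED, TaskStatus.BLOCKED)
```

Because the iteration is over `planned_ids` rather than recorded waves, the status tuple length always matches the decomposition plan, and the parent can never be rolled up as complete with unfinished work.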
211-230: ⚠️ Potential issue | 🟠 Major

Don't normalize catastrophic exceptions into ordinary phase failures.

These generic `except Exception` handlers will wrap or swallow `MemoryError`, which lets the coordinator keep running after a process-fatal failure. At minimum, catastrophic exceptions should bypass the phase-failure path and propagate unchanged; that also means the new `test_memory_error_propagated` expectation needs to flip to `MemoryError`. Based on learnings, "Errors: handle explicitly, never silently swallow".

Also applies to: 265-284, 390-405, 471-485
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/coordination/service.py` around lines 211 - 230, The generic except Exception blocks (the handler that logs COORDINATION_PHASE_FAILED, creates CoordinationPhaseResult, appends to phases and raises CoordinationPhaseError) must not normalize catastrophic exceptions—explicitly detect and re-raise MemoryError (and optionally other fatal exceptions like SystemExit/KeyboardInterrupt) before converting to a phase failure; locate the try/except that uses phase_name, start, logger.warning(..., phase=phase_name, error=str(exc)), creates CoordinationPhaseResult and raises CoordinationPhaseError and add a guard such as if isinstance(exc, MemoryError): raise (and likewise for SystemExit/KeyboardInterrupt) so those exceptions propagate unchanged; apply the same change to the other identical handlers referenced (the blocks around the other ranges) so test_memory_error_propagated will see a raw MemoryError.

src/ai_company/engine/coordination/group_builder.py (1)
84-94: ⚠️ Potential issue | 🔴 Critical

Don't let an unroutable prerequisite disappear from the DAG.

When `decision is None`, this code just skips the subtask. `DependencyGraph.parallel_groups()` only gives you topological order, so any routed descendants can still show up in later waves and run without their prerequisite ever being assigned. Please either fail coordination here or mark downstream dependents blocked before building subsequent waves. As per coding guidelines, "Validate: at system boundaries (user input, external APIs, config files)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/coordination/group_builder.py` around lines 84 - 94, The loop in group_builder.py that iterates over subtask_ids currently skips subtasks when routing_lookup.get(subtask_id) returns None, which lets downstream nodes run without their prerequisite; instead, when decision is None you must fail coordination or explicitly mark downstream dependents blocked: modify the loop (the branch checking "if decision is None") to either raise a clear exception (e.g., CoordinationError or RuntimeError) with context (subtask_id, wave_idx) so coordination aborts, or call the dependency-graph API to mark the entire descendant subtree of subtask_id as blocked before continuing (use the module/class that provides dependency traversal from DependencyGraph.parallel_groups(), e.g., dependency_graph.get_descendants(subtask_id) / dependency_graph.block_subtree(subtask_id)), and replace the current logger.debug-only behavior (COORDINATION_WAVE_BUILT, wave_idx, subtask_id, skipped=True) with the chosen failure/blocking action so unroutable prerequisites cannot disappear from the DAG.
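The "fail coordination" option from this prompt can be sketched as follows; the error class, wave-building function, and data shapes are illustrative, not the PR's actual code:

```python
class CoordinationError(Exception):
    """Stand-in for the PR's coordination error hierarchy."""


def build_wave(
    subtask_ids: tuple[str, ...],
    routing_lookup: dict[str, str],
    wave_idx: int,
) -> list[tuple[str, str]]:
    """Raise instead of silently skipping an unrouted subtask."""
    assignments: list[tuple[str, str]] = []
    for subtask_id in subtask_ids:
        decision = routing_lookup.get(subtask_id)
        if decision is None:
            # Abort so routed descendants cannot run without this node.
            raise CoordinationError(
                f"subtask {subtask_id!r} in wave {wave_idx} "
                "has no routing decision"
            )
        assignments.append((subtask_id, decision))
    return assignments


try:
    build_wave(("sub-a", "sub-b"), {"sub-a": "alice"}, wave_idx=0)
except CoordinationError:
    outcome = "aborted"

assert outcome == "aborted"
```

The alternative in the prompt — blocking the descendant subtree instead of raising — trades fail-fast for partial progress, but either way the missing prerequisite must surface in the result rather than vanish.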
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/engine/coordination/service.py`:
- Around line 303-329: _resolve_topology currently trusts
routing_result.decisions[0].topology; instead, iterate all entries in
routing_result.decisions (RoutingResult) and ensure every decision.topology
equals the resolved topology (CoordinationTopology); if any differ, fail fast by
logging COORDINATION_PHASE_FAILED with a clear message about mixed topologies
and raise a suitable exception (eg. ValueError or a domain-specific error)
rather than proceeding, otherwise continue and log
COORDINATION_TOPOLOGY_RESOLVED as before; reference _resolve_topology,
RoutingResult.decisions, CoordinationTopology, COORDINATION_PHASE_FAILED, and
COORDINATION_TOPOLOGY_RESOLVED when making the change.
---
Duplicate comments:
In `@docs/design/engine.md`:
- Around line 802-804: The pipeline diagram string "decompose → route → resolve
topology → dispatch → rollup → update parent" is missing the dedicated
validation step; update that diagram to insert "validate" after "resolve
topology" so it reads "decompose → route → resolve topology → validate →
dispatch → rollup → update parent" to match the numbered list and implementation
(look for the pipeline text in the docs/design/engine.md content).
In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 84-94: The loop in group_builder.py that iterates over subtask_ids
currently skips subtasks when routing_lookup.get(subtask_id) returns None, which
lets downstream nodes run without their prerequisite; instead, when decision is
None you must fail coordination or explicitly mark downstream dependents
blocked: modify the loop (the branch checking "if decision is None") to either
raise a clear exception (e.g., CoordinationError or RuntimeError) with context
(subtask_id, wave_idx) so coordination aborts, or call the dependency-graph API
to mark the entire descendant subtree of subtask_id as blocked before continuing
(use the module/class that provides dependency traversal from
DependencyGraph.parallel_groups(), e.g.,
dependency_graph.get_descendants(subtask_id) /
dependency_graph.block_subtree(subtask_id)), and replace the current
logger.debug-only behavior (COORDINATION_WAVE_BUILT, wave_idx, subtask_id,
skipped=True) with the chosen failure/blocking action so unroutable
prerequisites cannot disappear from the DAG.
In `@src/ai_company/engine/coordination/service.py`:
- Around line 139-148: The dispatch block should be wrapped in a helper that
runs select_dispatcher(...) and await dispatcher.dispatch(...), catching any
Exception, creating and appending a failed CoordinationPhaseResult for the
"dispatch" phase (including partial_phases/state from the attempted dispatch
when available) to the phases list, and then re-raising a CoordinationPhaseError
that carries that failed phase result; implement this by extracting the current
dispatch calls into a helper (e.g., _run_dispatch_phase or similar) that returns
a dispatch result on success and on exception appends a failed
CoordinationPhaseResult and raises CoordinationPhaseError. Apply the same
pattern to the other dispatch site (the similar block around lines 176-184) so
both select_dispatcher and dispatcher.dispatch failures produce a proper
dispatch-phase entry and raise CoordinationPhaseError.
- Around line 372-389: The current rollup uses only statuses collected from
dispatch_result.waves, which omits subtasks that were never routed; instead
fetch the full decomposition plan (via the decomposition service used to
build_execution_waves) for context.task.id, iterate the plan's ordered subtask
ids, build a map of outcomes from dispatch_result.waves (by subtask id), and for
each planned subtask append TaskStatus.COMPLETED if outcome.is_success,
TaskStatus.FAILED if outcome exists but not success, or TaskStatus.BLOCKED if
there is no recorded outcome; pass that complete tuple to
self._decomposition_service.rollup_status(context.task.id, tuple(statuses)) so
missing subtasks are treated as BLOCKED/FAILED rather than disappearing from the
rollup.
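The rollup mapping described in this prompt can be sketched as follows. The enum and the outcome shape are simplified stand-ins; the real code would pull planned subtask ids from the decomposition plan and outcomes from `dispatch_result.waves`.

```python
from enum import Enum


class TaskStatus(Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"


def rollup_statuses(planned_ids, outcomes):
    """Map every planned subtask id to a terminal status.

    Subtasks with no recorded outcome become BLOCKED instead of
    disappearing from the rollup.
    """
    statuses = []
    for subtask_id in planned_ids:
        outcome = outcomes.get(subtask_id)
        if outcome is None:
            statuses.append(TaskStatus.BLOCKED)  # never routed/executed
        elif outcome.get("is_success"):
            statuses.append(TaskStatus.COMPLETED)
        else:
            statuses.append(TaskStatus.FAILED)
    return tuple(statuses)
```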
- Around line 211-230: The generic except Exception blocks (the handler that
logs COORDINATION_PHASE_FAILED, creates CoordinationPhaseResult, appends to
phases and raises CoordinationPhaseError) must not normalize catastrophic
exceptions—explicitly detect and re-raise MemoryError (and optionally other
fatal exceptions like SystemExit/KeyboardInterrupt) before converting to a phase
failure; locate the try/except that uses phase_name, start, logger.warning(...,
phase=phase_name, error=str(exc)), creates CoordinationPhaseResult and raises
CoordinationPhaseError and add a guard such as if isinstance(exc, MemoryError):
raise (and likewise for SystemExit/KeyboardInterrupt) so those exceptions
propagate unchanged; apply the same change to the other identical handlers
referenced (the blocks around the other ranges) so test_memory_error_propagated
will see a raw MemoryError.
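The guard described in this prompt boils down to ordering the except clauses so fatal exceptions are re-raised before the generic handler runs. A minimal sketch, with a plain tuple and `RuntimeError` standing in for the real phase machinery:

```python
FATAL = (MemoryError, SystemExit, KeyboardInterrupt)


def run_phase(phase_name, fn, phases):
    """Run a phase, wrapping only recoverable exceptions."""
    try:
        return fn()
    except FATAL:
        raise  # never normalize catastrophic failures into phase results
    except Exception as exc:
        phases.append((phase_name, str(exc)))
        raise RuntimeError(f"phase {phase_name} failed: {exc}") from exc
```

Note that `MemoryError` inherits from `Exception`, so without the explicit `except FATAL` clause first it would be swallowed by the generic handler (`SystemExit` and `KeyboardInterrupt` derive from `BaseException` and would escape anyway, but listing them makes the intent explicit).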
In `@tests/unit/engine/test_coordination_dispatchers.py`:
- Around line 709-742: The test test_exception_with_fail_fast_off_continues
currently makes sub_b depend on sub_a (via make_subtask("sub-b",
dependencies=("sub-a",))) but then asserts wave 1 still ran after wave 0 failed,
which is inconsistent with real dependency enforcement; either make the second
subtask independent (remove dependencies from make_subtask for "sub-b") to test
non-fail_fast continuation, or keep the dependency and change the assertions to
expect the dependent wave to be skipped/blocked (e.g.,
result.waves[1].execution_result is None and that execute_group was not invoked
for blocked wave); update the setup using
make_subtask/make_decomposition/make_routing accordingly and adjust assertions
for SasDispatcher dispatch with CoordinationConfig(fail_fast=False).
In `@tests/unit/engine/test_coordination_models.py`:
- Around line 375-406: The test test_mismatched_parent_task_id_accepted
currently asserts that a CoordinationResult can accept a SubtaskStatusRollup
with a different parent_task_id; change it to assert that providing a rollup
whose parent_task_id doesn't match CoordinationResult.parent_task_id raises a
ValidationError once CoordinationResult enforces cross-validation. Update the
test to construct the same SubtaskStatusRollup and CoordinationResult inputs but
wrap the creation/validation of CoordinationResult in a
pytest.raises(ValidationError) (or the project's equivalent validation
exception) and assert the exception is raised; reference CoordinationResult and
SubtaskStatusRollup to locate the code under test.
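The cross-validation the updated test should expect can be sketched with plain dataclasses; `ValueError` stands in for Pydantic's `ValidationError`, and both classes below are simplified stand-ins for the real models.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SubtaskStatusRollup:
    parent_task_id: str


@dataclass(frozen=True)
class CoordinationResult:
    parent_task_id: str
    rollup: SubtaskStatusRollup

    def __post_init__(self):
        # Cross-field validation: the rollup must describe this parent.
        if self.rollup.parent_task_id != self.parent_task_id:
            raise ValueError(
                "rollup.parent_task_id does not match parent_task_id"
            )
```

In the real model this would be a Pydantic `model_validator(mode="after")`, and the test would wrap construction in `pytest.raises(ValidationError)`.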
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: c19e23b7-8803-4b47-a557-89672d08bbe6
📒 Files selected for processing (19)
- CLAUDE.md
- docs/design/engine.md
- src/ai_company/engine/__init__.py
- src/ai_company/engine/coordination/__init__.py
- src/ai_company/engine/coordination/config.py
- src/ai_company/engine/coordination/dispatchers.py
- src/ai_company/engine/coordination/group_builder.py
- src/ai_company/engine/coordination/models.py
- src/ai_company/engine/coordination/service.py
- src/ai_company/engine/errors.py
- src/ai_company/observability/events/coordination.py
- tests/unit/engine/conftest.py
- tests/unit/engine/test_coordination_config.py
- tests/unit/engine/test_coordination_dispatchers.py
- tests/unit/engine/test_coordination_errors.py
- tests/unit/engine/test_coordination_group_builder.py
- tests/unit/engine/test_coordination_models.py
- tests/unit/engine/test_coordination_service.py
- tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py:
- No `from __future__ import annotations` — Python 3.14 has PEP 649 native lazy annotations
- Use `except A, B:` syntax (no parentheses) — ruff enforces PEP 758 except syntax on Python 3.14
- Type hints: all public functions, mypy strict mode
- Docstrings: Google style, required on public classes/functions (enforced by ruff D rules)
- Create new objects, never mutate existing ones. For non-Pydantic internal collections (registries, `BaseTool`), use `copy.deepcopy()` at construction + `MappingProxyType` wrapping for read-only enforcement.
- For `dict`/`list` fields in frozen Pydantic models, rely on `frozen=True` for field reassignment prevention and `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, serializing for persistence).
- Config vs runtime state: use frozen Pydantic models for config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.
- Models: Pydantic v2 (`BaseModel`, `model_validator`, `computed_field`, `ConfigDict`). Use `@computed_field` for derived values instead of storing + validating redundant fields. Use `NotBlankStr` for all identifier/name fields — including optional and tuple variants — instead of manual whitespace validators.
- Async concurrency: prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`. Existing code is being migrated incrementally.
- Line length: 88 characters (ruff)
- Functions: < 50 lines, files < 800 lines
- Errors: handle explicitly, never silently swallow
- Validate: at system boundaries (user input, external APIs, config files)
Files:
src/ai_company/engine/coordination/config.py, src/ai_company/engine/coordination/service.py, src/ai_company/engine/coordination/dispatchers.py, tests/unit/engine/test_coordination_config.py, tests/unit/engine/test_coordination_service.py, tests/unit/engine/test_coordination_dispatchers.py, src/ai_company/engine/coordination/group_builder.py, tests/unit/engine/conftest.py, tests/unit/observability/test_events.py, src/ai_company/engine/__init__.py, src/ai_company/observability/events/coordination.py, tests/unit/engine/test_coordination_errors.py, src/ai_company/engine/errors.py, tests/unit/engine/test_coordination_group_builder.py, src/ai_company/engine/coordination/__init__.py, tests/unit/engine/test_coordination_models.py, src/ai_company/engine/coordination/models.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/ai_company/**/*.py:
- Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
- Never use `import logging` / `logging.getLogger()` / `print()` in application code
- Variable name: always `logger` (not `_logger`, not `log`)
- Event names: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
- Structured kwargs in logging: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)`
- All error paths must log at WARNING or ERROR with context before raising
- All state transitions must log at INFO
- DEBUG for object creation, internal flow, entry/exit of key functions
- Vendor-agnostic everywhere: NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: `example-provider`, `example-large-001`, `example-medium-001`, `example-small-001`, `large`/`medium`/`small` as aliases. Tests must use `test-provider`, `test-small-001`, etc.
Files:
src/ai_company/engine/coordination/config.py, src/ai_company/engine/coordination/service.py, src/ai_company/engine/coordination/dispatchers.py, src/ai_company/engine/coordination/group_builder.py, src/ai_company/engine/__init__.py, src/ai_company/observability/events/coordination.py, src/ai_company/engine/errors.py, src/ai_company/engine/coordination/__init__.py, src/ai_company/engine/coordination/models.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py:
- Tests: markers `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, `@pytest.mark.slow`
- Coverage: 80% minimum (enforced in CI)
- Async: `asyncio_mode = "auto"` — no manual `@pytest.mark.asyncio` needed
- Timeout: 30 seconds per test
- Parallelism: `pytest-xdist` via `-n auto` — ALWAYS include `-n auto` when running pytest, never run tests sequentially
- Prefer `@pytest.mark.parametrize` for testing similar cases
- Vendor-agnostic everywhere: Tests must use `test-provider`, `test-small-001`, etc. (Vendor names may only appear in operations design page, .claude/ skill/agent files, or third-party import paths)
Files:
tests/unit/engine/test_coordination_config.py, tests/unit/engine/test_coordination_service.py, tests/unit/engine/test_coordination_dispatchers.py, tests/unit/engine/conftest.py, tests/unit/observability/test_events.py, tests/unit/engine/test_coordination_errors.py, tests/unit/engine/test_coordination_group_builder.py, tests/unit/engine/test_coordination_models.py
🧠 Learnings (13)
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Event names: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
Applied to files:
CLAUDE.md, tests/unit/observability/test_events.py, src/ai_company/observability/events/coordination.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Structured kwargs in logging: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)`
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Never use `import logging` / `logging.getLogger()` / `print()` in application code
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : Variable name: always `logger` (not `_logger`, not `log`)
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising
Applied to files:
CLAUDE.md, src/ai_company/engine/coordination/service.py, src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All state transitions must log at INFO
Applied to files:
CLAUDE.md, src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : DEBUG for object creation, internal flow, entry/exit of key functions
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/providers/**/*.py : Retryable errors (`is_retryable=True`): `RateLimitError`, `ProviderTimeoutError`, `ProviderConnectionError`, `ProviderInternalError`. Non-retryable errors raise immediately without retry. `RetryExhaustedError` signals that all retries failed — the engine layer catches this to trigger fallback chains.
Applied to files:
src/ai_company/engine/coordination/service.py, src/ai_company/engine/errors.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Errors: handle explicitly, never silently swallow
Applied to files:
src/ai_company/engine/coordination/service.py, src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Async concurrency: prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`. Existing code is being migrated incrementally.
Applied to files:
src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: When review agents find valid issues (including pre-existing issues in surrounding code, suggestions, and findings adjacent to the PR's changes), fix them all. No deferring, no 'out of scope' skipping.
Applied to files:
src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Config vs runtime state: use frozen Pydantic models for config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.
Applied to files:
src/ai_company/engine/coordination/models.py
🧬 Code graph analysis (6)
src/ai_company/engine/coordination/service.py (9)
- src/ai_company/core/enums.py (1): `CoordinationTopology` (358-369)
- src/ai_company/engine/coordination/dispatchers.py (7): `DispatchResult` (49-76), `select_dispatcher` (796-832), `dispatch` (83-104), `dispatch` (349-372), `dispatch` (382-442), `dispatch` (453-523), `dispatch` (534-578)
- src/ai_company/engine/coordination/models.py (3): `CoordinationPhaseResult` (64-96), `CoordinationResult` (128-198), `is_success` (196-198)
- src/ai_company/engine/decomposition/models.py (3): `DecompositionResult` (125-176), `SubtaskStatusRollup` (179-259), `derived_parent_status` (232-259)
- src/ai_company/engine/decomposition/service.py (3): `DecompositionService` (34-180), `decompose_task` (51-88), `rollup_status` (166-180)
- src/ai_company/engine/parallel.py (1): `ParallelExecutor` (81-591)
- src/ai_company/engine/routing/models.py (1): `RoutingResult` (91-157)
- src/ai_company/engine/task_engine.py (2): `TaskEngine` (88-776), `submit` (247-287)
- src/ai_company/engine/workspace/service.py (1): `WorkspaceIsolationService` (43-215)

src/ai_company/engine/coordination/dispatchers.py (9)
- src/ai_company/core/enums.py (1): `CoordinationTopology` (358-369)
- src/ai_company/engine/coordination/group_builder.py (1): `build_execution_waves` (43-145)
- src/ai_company/engine/coordination/models.py (2): `CoordinationPhaseResult` (64-96), `CoordinationWave` (99-125)
- src/ai_company/engine/workspace/models.py (3): `Workspace` (36-65), `WorkspaceGroupResult` (144-181), `WorkspaceRequest` (12-33)
- src/ai_company/engine/coordination/config.py (1): `CoordinationConfig` (8-38)
- src/ai_company/engine/decomposition/models.py (1): `DecompositionResult` (125-176)
- src/ai_company/engine/parallel_models.py (3): `ParallelExecutionGroup` (92-140), `task_id` (87-89), `agent_id` (79-81)
- src/ai_company/engine/routing/models.py (1): `RoutingResult` (91-157)
- src/ai_company/engine/workspace/service.py (4): `WorkspaceIsolationService` (43-215), `setup_group` (72-117), `merge_group` (142-168), `teardown_group` (170-215)

tests/unit/engine/test_coordination_config.py (1)
- src/ai_company/engine/coordination/config.py (1): `CoordinationConfig` (8-38)

src/ai_company/engine/coordination/group_builder.py (7)
- src/ai_company/engine/decomposition/dag.py (2): `DependencyGraph` (26-252), `parallel_groups` (185-228)
- src/ai_company/engine/errors.py (1): `CoordinationError` (128-129)
- src/ai_company/engine/parallel_models.py (3): `AgentAssignment` (23-89), `ParallelExecutionGroup` (92-140), `task_id` (87-89)
- src/ai_company/engine/coordination/config.py (1): `CoordinationConfig` (8-38)
- src/ai_company/engine/decomposition/models.py (1): `DecompositionResult` (125-176)
- src/ai_company/engine/routing/models.py (1): `RoutingResult` (91-157)
- src/ai_company/engine/workspace/models.py (1): `Workspace` (36-65)

tests/unit/engine/test_coordination_errors.py (2)
- src/ai_company/engine/errors.py (3): `CoordinationError` (128-129), `CoordinationPhaseError` (132-153), `EngineError` (9-10)
- src/ai_company/engine/coordination/models.py (1): `CoordinationPhaseResult` (64-96)

src/ai_company/engine/errors.py (1)
- src/ai_company/engine/coordination/models.py (1): `CoordinationPhaseResult` (64-96)
🪛 LanguageTool
CLAUDE.md
[style] ~130-~130: A comma is missing here.
Context: ...nder ai_company.observability.events (e.g. PROVIDER_CALL_START from `events.prov...
(EG_NO_COMMA)
🔇 Additional comments (29)

src/ai_company/engine/errors.py (2)

3-6: LGTM! The `TYPE_CHECKING` guard correctly avoids circular imports at runtime while preserving type information for static analysis.

126-153: LGTM! The coordination error hierarchy is well-designed:
- `CoordinationError` as base exception for multi-agent coordination failures
- `CoordinationPhaseError` correctly uses keyword-only args with sensible defaults
- Docstrings follow Google style with clear attribute documentation

src/ai_company/engine/coordination/config.py (1)

1-38: LGTM! The `CoordinationConfig` model is well-structured:
- Frozen Pydantic model for immutable configuration (per coding guidelines)
- `NotBlankStr` correctly used for the `base_branch` identifier field
- Sensible defaults and clear field descriptions
- `extra="forbid"` prevents configuration errors from typos

src/ai_company/observability/events/coordination.py (1)

1-17: LGTM! Event constants are well-organized:
- Consistent `COORDINATION_` prefix
- Values follow the established dot-delimited pattern
- Comprehensive coverage of coordination lifecycle events
- All typed as `Final[str]` for immutability

tests/unit/observability/test_events.py (2)

189-189: LGTM! Adding `"coordination"` to the expected domain modules ensures the new event module is discovered by `pkgutil` iteration.

634-663: LGTM! The test follows the established pattern for event existence validation:
- Imports all 13 coordination event constants
- Asserts each constant matches its expected string value
- Consistent with other `test_*_events_exist` methods in the class

CLAUDE.md (2)

95-95: LGTM! The package structure documentation accurately reflects the new coordination subsystem with its key components: TopologyDispatcher protocol, 4 dispatchers, wave execution, and workspace lifecycle integration.

130-130: LGTM! The event naming documentation is correctly updated with `COORDINATION_STARTED` from `events.coordination`, following the established pattern for domain-specific event imports.

tests/unit/engine/test_coordination_errors.py (1)

1-58: LGTM! Comprehensive test coverage for coordination error classes:
- Verifies inheritance hierarchy (EngineError → CoordinationError → CoordinationPhaseError)
- Tests attribute storage (`phase`, `partial_phases`)
- Tests default values and string representation
- Each test is properly marked with `@pytest.mark.unit`

tests/unit/engine/conftest.py (2)

19-46: LGTM! New imports cleanly organized for coordination domain types: enums, decomposition models, parallel execution models, and routing models.

480-625: LGTM! Excellent set of test helpers for the coordination domain:
- `make_subtask()`: Clean builder for `SubtaskDefinition`
- `make_decomposition()`: Correctly constructs plan, created tasks, and dependency edges
- `make_routing()`: Properly maps subtask-agent pairs to `RoutingDecision` objects
- `build_run_result()`: Creates minimal `AgentRunResult` for testing
- `make_exec_result()`: Handles both success and failure outcomes

These helpers will significantly reduce boilerplate across coordination tests.

tests/unit/engine/test_coordination_group_builder.py (5)

21-35: LGTM! Clean local helper function that avoids code duplication across tests. Using `CoordinationTopology.CENTRALIZED` as default is sensible for test simplicity.

38-141: LGTM! Excellent test coverage for wave building scenarios:
- Single/multiple independent subtasks → parallel execution
- Sequential chain respects dependencies
- Diamond DAG correctly partitions into waves: `[A, B]` then `[C]`

Assertions verify both wave count and task assignments within each wave.

143-177: LGTM! Config propagation tests verify that `max_concurrency_per_wave` and `fail_fast` settings correctly flow through to the generated execution groups.

179-261: LGTM! Good coverage of edge cases:
- Workspace worktree paths correctly mapped to resource claims
- Unroutable subtasks gracefully skipped
- All-unroutable scenario produces empty waves
- No workspace defaults to empty resource claims

263-316: LGTM! Smart use of `model_construct` to bypass validation and test the defensive guard in `build_execution_waves`. The test verifies that a routing decision referencing a non-existent created task correctly raises `CoordinationError`.

src/ai_company/engine/coordination/dispatchers.py (7)

49-105: LGTM! The `DispatchResult` model and `TopologyDispatcher` protocol are well-structured. The frozen model ensures immutability, and the runtime-checkable protocol allows for proper type checking.

232-324: LGTM! The `_execute_waves` function properly:
- Logs wave start at INFO and completion at WARNING when failed
- Records phase results with accurate success/error fields
- Respects the `fail_fast` configuration
- Handles exceptions gracefully with proper phase recording

341-373: LGTM! SasDispatcher correctly implements the single-agent-step pattern without workspace isolation, delegating to the shared wave execution logic.

396-411: Validate routed task IDs before creating workspaces. The current flow creates workspaces at lines 398-400 before `build_execution_waves` (lines 406-411) validates that every routed `subtask_id` exists in `decomposition_result.created_tasks`. A stale routing result could create git branches/worktrees for nonexistent subtasks, only to tear them down when validation fails.

While the `finally` block ensures cleanup, consider adding a preflight validation step before workspace side effects to avoid wasteful operations.

463-473: LGTM! The decentralized precondition guard now properly logs at WARNING before raising `CoordinationError`, ensuring telemetry captures workspace service availability issues.

745-766: LGTM! The per-wave merge/teardown logic in the `finally` block correctly:
- Only merges when `wave_failed` is False
- Logs a warning when merge is skipped due to wave failure
- Always tears down workspaces regardless of outcome

796-832: LGTM! The `select_dispatcher` function correctly:
- Maps each resolved topology to its dispatcher implementation
- Rejects AUTO topology with a clear error message and WARNING log
- Emits `COORDINATION_TOPOLOGY_RESOLVED` only after successful dispatcher creation

src/ai_company/engine/coordination/models.py (4)

30-62: LGTM! `CoordinationContext` properly uses a frozen model with tuple fields for immutability and validates that at least one agent is available.

87-97: LGTM! The `_validate_success_error_consistency` validator properly enforces that successful phases have no error and failed phases must have an error description, preventing inconsistent phase results.

99-126: LGTM! `CoordinationWave` correctly models a wave with non-empty subtask validation and an optional execution result for tracking pre/post execution states.

184-198: LGTM! The validators and computed field are well-designed:
- `_validate_topology_resolved` ensures AUTO is not stored in final results
- The `is_success` computed field derives status from phases, avoiding redundant storage per the `@computed_field` guideline

src/ai_company/engine/__init__.py (2)

42-57: LGTM! The coordination module re-exports are properly organized, exposing the full public API surface for dispatchers, models, and utilities.

210-221: LGTM! The `__all__` list is correctly updated with new coordination types in alphabetical order, maintaining consistency with the existing export structure.
- Block descendants of unroutable subtasks in group_builder (critical)
- Include missing subtasks as BLOCKED in rollup (critical)
- Validate routed task IDs before workspace creation (major)
- Wrap dispatch failures as CoordinationPhaseError with partial phases (major)
- Re-raise MemoryError/RecursionError instead of wrapping (major)
- Add validate step to pipeline diagram in engine.md (minor)
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/ai_company/engine/coordination/service.py (1)
306-332: ⚠️ Potential issue | 🟠 Major

Validate that all routing decisions have consistent topologies.

`_resolve_topology` reads only `decisions[0].topology`, ignoring the rest. A malformed `RoutingResult` with mixed topologies will dispatch through the wrong dispatcher. Validate that all decisions agree, or fail fast if they differ. As per coding guidelines: "Validate: at system boundaries (user input, external APIs, config files)".

🛡️ Proposed fix
```diff
     def _resolve_topology(
         self,
         routing_result: RoutingResult,
     ) -> CoordinationTopology:
         """Resolve the coordination topology from routing decisions."""
         if routing_result.decisions:
             topology = routing_result.decisions[0].topology
+            # Validate all decisions have consistent topology
+            for decision in routing_result.decisions[1:]:
+                if decision.topology != topology:
+                    msg = (
+                        f"Mixed topologies in routing decisions: "
+                        f"{topology.value} vs {decision.topology.value}"
+                    )
+                    logger.warning(
+                        COORDINATION_PHASE_FAILED,
+                        phase="resolve_topology",
+                        error=msg,
+                    )
+                    raise CoordinationPhaseError(
+                        msg,
+                        phase="resolve_topology",
+                        partial_phases=(),
+                    )
         else:
             topology = CoordinationTopology.SAS
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/coordination/service.py` around lines 306 - 332, In _resolve_topology, after receiving routing_result (RoutingResult) validate that all entries in routing_result.decisions have the same topology as decisions[0].topology; if any decision differs, log a COORDINATION_PHASE_FAILED with phase="resolve_topology" and a clear error message, then fail fast (raise a ValueError or a domain-specific exception) instead of continuing, otherwise proceed to use that consistent topology (handling AUTO same as before) and emit COORDINATION_TOPOLOGY_RESOLVED; reference the _resolve_topology method, RoutingResult.decisions, and CoordinationTopology for locating the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/engine/coordination/dispatchers.py`:
- Around line 677-792: The _execute_wave function is too long; extract the final
merge/teardown block into a new async helper named _finalize_wave to reduce
complexity. Move the logic currently under finally (the conditional that checks
wave_workspaces and workspace_service, calls _merge_workspaces, appends
merge_phase/merge_result, logs skipped merges, and calls _teardown_workspaces)
into async def _finalize_wave(workspace_service, wave_workspaces, wave_idx,
wave_failed, all_phases, merge_results) and have _execute_wave simply await
_finalize_wave(...) in the finally clause; preserve the same logging messages,
phase naming (merge_wave_{wave_idx}), and error handling semantics, and ensure
type hints and imports mirror existing signatures so behavior and side-effects
(appending to all_phases/all_waves/merge_results) remain unchanged.
- Around line 136-141: In _validate_routing_against_decomposition, log a warning
with context before raising CoordinationError: when decision.subtask_id not in
created_ids, emit a warning (use self.logger.warning(...) if this is a method on
a class with a logger, otherwise use the module-level logger.warning(...))
including the same message and the decision.subtask_id/created_ids for
telemetry, then raise CoordinationError(msg).
In `@src/ai_company/engine/coordination/group_builder.py`:
- Around line 114-120: Add a warning log immediately before raising
CoordinationError in the group_builder.py guard that checks task =
task_lookup.get(subtask_id): emit a logger.warning (or self.logger.warning
depending on surrounding code) with the same contextual message (including
subtask_id and that there is a routing decision but no created task) so the
failure is recorded in telemetry, then raise CoordinationError(msg) as before;
ensure you reference the same msg string used for the exception and use the
module/class logger already in scope.
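The warn-then-raise pattern described in this prompt can be sketched like this. The stdlib logger below is a stand-in for the project's structured `get_logger` (which the guidelines require in application code), and `require_created_task` is a hypothetical name for the guard.

```python
import logging

logger = logging.getLogger(__name__)


class CoordinationError(RuntimeError):
    """Simplified stand-in for the engine's CoordinationError."""


def require_created_task(subtask_id, task_lookup):
    """Resolve a routed subtask, warning before raising on a miss."""
    task = task_lookup.get(subtask_id)
    if task is None:
        msg = (
            f"routing decision references subtask {subtask_id!r} "
            f"with no created task"
        )
        logger.warning(msg)  # record in telemetry before aborting
        raise CoordinationError(msg)
    return task
```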
---
Duplicate comments:
In `@src/ai_company/engine/coordination/service.py`:
- Around line 306-332: In _resolve_topology, after receiving routing_result
(RoutingResult) validate that all entries in routing_result.decisions have the
same topology as decisions[0].topology; if any decision differs, log a
COORDINATION_PHASE_FAILED with phase="resolve_topology" and a clear error
message, then fail fast (raise a ValueError or a domain-specific exception)
instead of continuing, otherwise proceed to use that consistent topology
(handling AUTO same as before) and emit COORDINATION_TOPOLOGY_RESOLVED;
reference the _resolve_topology method, RoutingResult.decisions, and
CoordinationTopology for locating the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 60ed2780-d52c-4075-9e21-8f72161a02bf
📒 Files selected for processing (6)
- docs/design/engine.md
- src/ai_company/engine/coordination/dispatchers.py
- src/ai_company/engine/coordination/group_builder.py
- src/ai_company/engine/coordination/service.py
- tests/unit/engine/test_coordination_group_builder.py
- tests/unit/engine/test_coordination_service.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Greptile Review
- GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (3)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Files:
- tests/unit/engine/test_coordination_service.py
- src/ai_company/engine/coordination/group_builder.py
- src/ai_company/engine/coordination/service.py
- src/ai_company/engine/coordination/dispatchers.py
- tests/unit/engine/test_coordination_group_builder.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py:
- Tests: markers `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, `@pytest.mark.slow`
- Coverage: 80% minimum (enforced in CI)
- Async: `asyncio_mode = "auto"` — no manual `@pytest.mark.asyncio` needed
- Timeout: 30 seconds per test
- Parallelism: `pytest-xdist` via `-n auto` — ALWAYS include `-n auto` when running pytest, never run tests sequentially
- Prefer `@pytest.mark.parametrize` for testing similar cases
- Vendor-agnostic everywhere: Tests must use `test-provider`, `test-small-001`, etc. (Vendor names may only appear in operations design page, .claude/ skill/agent files, or third-party import paths)
Files:
- tests/unit/engine/test_coordination_service.py
- tests/unit/engine/test_coordination_group_builder.py
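Taken together, those test conventions look roughly like the following hypothetical test (generic `test-*` identifiers, not real vendor names; the test body is illustrative only):

```python
import pytest


@pytest.mark.unit
@pytest.mark.parametrize(
    ("provider", "model"),
    [
        ("test-provider", "test-small-001"),
        ("test-provider", "test-medium-001"),
    ],
)
def test_model_id_is_vendor_agnostic(provider, model):
    # Generic identifiers only: no real vendor names in project-owned tests.
    assert model.startswith("test-")
    assert provider == "test-provider"
```

Run under `asyncio_mode = "auto"` and `-n auto`, no extra decorators are needed for async variants.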
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/ai_company/**/*.py:
- Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
- Never use `import logging`/`logging.getLogger()`/`print()` in application code
- Variable name: always `logger` (not `_logger`, not `log`)
- Event names: always use constants from the domain-specific module under `ai_company.observability.events` (e.g. `PROVIDER_CALL_START` from `events.provider`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
- Structured kwargs in logging: always `logger.info(EVENT, key=value)` — never `logger.info("msg %s", val)`
- All error paths must log at WARNING or ERROR with context before raising
- All state transitions must log at INFO
- DEBUG for object creation, internal flow, entry/exit of key functions
- Vendor-agnostic everywhere: NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: `example-provider`, `example-large-001`, `example-medium-001`, `example-small-001`, `large`/`medium`/`small` as aliases. Tests must use `test-provider`, `test-small-001`, etc.
Files:
- src/ai_company/engine/coordination/group_builder.py
- src/ai_company/engine/coordination/service.py
- src/ai_company/engine/coordination/dispatchers.py
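The logging conventions above (event-name constants plus structured kwargs) can be illustrated with a minimal stand-in. The real `get_logger` and event modules live in `ai_company.observability` and are not shown in this page, so this sketch fakes them with stdlib `logging` purely for demonstration; nothing here is the project's actual implementation:

```python
import logging

# Stand-in for an event constant from ai_company.observability.events.coordination
COORDINATION_PHASE_STARTED = "coordination.phase.started"


def get_logger(name):
    """Simplified stand-in: wraps a stdlib logger with a kwargs-style info()."""
    base = logging.getLogger(name)

    class _Structured:
        def info(self, event, **kwargs):
            # Structured kwargs, never printf-style interpolation.
            fields = " ".join(f"{k}={v}" for k, v in sorted(kwargs.items()))
            base.info("%s %s", event, fields)
            return event, kwargs  # returned only so this sketch is testable

    return _Structured()


logger = get_logger(__name__)
event, fields = logger.info(
    COORDINATION_PHASE_STARTED, phase="decompose", wave_index=0
)
```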
🧠 Learnings (6)
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising
Applied to files:
- src/ai_company/engine/coordination/group_builder.py
- src/ai_company/engine/coordination/service.py
- src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/providers/**/*.py : Retryable errors (`is_retryable=True`): `RateLimitError`, `ProviderTimeoutError`, `ProviderConnectionError`, `ProviderInternalError`. Non-retryable errors raise immediately without retry. `RetryExhaustedError` signals that all retries failed — the engine layer catches this to trigger fallback chains.
Applied to files:
- src/ai_company/engine/coordination/service.py
- src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Errors: handle explicitly, never silently swallow
Applied to files:
- src/ai_company/engine/coordination/service.py
- src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to **/*.py : Async concurrency: prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`. Existing code is being migrated incrementally.
Applied to files:
src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: When review agents find valid issues (including pre-existing issues in surrounding code, suggestions, and findings adjacent to the PR's changes), fix them all. No deferring, no 'out of scope' skipping.
Applied to files:
src/ai_company/engine/coordination/dispatchers.py
📚 Learning: 2026-03-13T06:12:19.113Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-13T06:12:19.113Z
Learning: Applies to src/ai_company/**/*.py : All state transitions must log at INFO
Applied to files:
src/ai_company/engine/coordination/dispatchers.py
🧬 Code graph analysis (2)
tests/unit/engine/test_coordination_service.py (8)
- src/ai_company/engine/decomposition/models.py (3): DecompositionResult (125-176), derived_parent_status (232-259), DecompositionPlan (66-122)
- src/ai_company/core/enums.py (2): CoordinationTopology (358-369), TaskStatus (198-224)
- src/ai_company/engine/coordination/config.py (1): CoordinationConfig (8-38)
- src/ai_company/engine/coordination/models.py (2): CoordinationContext (30-61), is_success (196-198)
- src/ai_company/engine/coordination/service.py (2): MultiAgentCoordinator (50-546), coordinate (92-194)
- src/ai_company/engine/parallel_models.py (3): AgentOutcome (143-193), agent_id (79-81), task_id (87-89)
- src/ai_company/engine/workspace/models.py (2): MergeResult (94-141), Workspace (36-65)
- src/ai_company/engine/decomposition/rollup.py (2): StatusRollup (21-84), compute (25-84)

src/ai_company/engine/coordination/dispatchers.py (10)
- src/ai_company/core/enums.py (1): CoordinationTopology (358-369)
- src/ai_company/engine/coordination/group_builder.py (1): build_execution_waves (43-163)
- src/ai_company/engine/coordination/models.py (2): CoordinationPhaseResult (64-96), CoordinationWave (99-125)
- src/ai_company/engine/errors.py (1): CoordinationError (128-129)
- src/ai_company/engine/workspace/models.py (3): Workspace (36-65), WorkspaceGroupResult (144-181), WorkspaceRequest (12-33)
- src/ai_company/engine/coordination/config.py (1): CoordinationConfig (8-38)
- src/ai_company/engine/decomposition/models.py (1): DecompositionResult (125-176)
- src/ai_company/engine/parallel.py (2): ParallelExecutor (81-591), execute_group (110-215)
- src/ai_company/engine/routing/models.py (1): RoutingResult (91-157)
- src/ai_company/engine/workspace/service.py (4): WorkspaceIsolationService (43-215), setup_group (72-117), merge_group (142-168), teardown_group (170-215)
🔇 Additional comments (8)

src/ai_company/engine/coordination/group_builder.py (1)

43-163: Function exceeds 50-line limit.
`build_execution_waves` is ~93 lines. Extract the per-wave assignment loop (lines 86-134) into a helper to stay under the limit. As per coding guidelines: "Functions: < 50 lines".

src/ai_company/engine/coordination/service.py (1)

210-211: LGTM on PEP 758 except syntax.
The `except MemoryError, RecursionError:` syntax correctly uses Python 3.14's PEP 758 (no parentheses), and catastrophic errors are re-raised immediately before generic handling. Also applies to: 266-267.

tests/unit/engine/test_coordination_group_builder.py (1)

1-370: LGTM!
Comprehensive test coverage for `build_execution_waves`: DAG structures, config propagation, workspace mapping, unroutable/blocked handling, and error paths. Tests are well-organized with descriptive names and proper `@pytest.mark.unit` markers.

docs/design/engine.md (1)

796-840: LGTM!
Clear and accurate documentation of the multi-agent coordination pipeline. The diagram includes all phases (decompose → route → resolve topology → validate → dispatch → rollup → update parent), and the dispatcher table correctly describes workspace isolation behavior per topology.
tests/unit/engine/test_coordination_service.py (3)

513-526: LGTM on MemoryError propagation test.
The test correctly verifies that `MemoryError` propagates unchanged rather than being wrapped in `CoordinationPhaseError`. This aligns with the implementation's special-casing of catastrophic exceptions.

1-876: Comprehensive test suite for MultiAgentCoordinator.
Excellent coverage of coordination pipeline phases, error handling, workspace lifecycle, cost aggregation, rollup semantics, and fail-fast behavior. Tests properly use `@pytest.mark.unit` markers and avoid vendor names.

249-296: This test behavior is correct and reflects the intended design.
With `fail_fast=False`, dependent waves execute even if earlier waves fail. This is intentional: dependencies organize subtasks into sequential waves at build time (via the DAG structure), but execution-time failures don't block downstream waves. The `fail_fast` flag controls only whether the coordination loop stops after a wave failure—it doesn't enforce dependency constraints at runtime. Wave 0 fails but wave 1 executes because the dispatcher loop continues (no break), allowing all pre-built waves to run regardless of prior results. This is the correct behavior for `fail_fast=False`.

src/ai_company/engine/coordination/dispatchers.py (1)

1-858: 🧹 Nitpick | 🔵 Trivial
File exceeds 800-line limit.
At 858 lines, this file is above the 800-line threshold. Consider splitting topology dispatchers into separate modules (e.g., `dispatchers/sas.py`, `dispatchers/centralized.py`) or extracting shared helpers into a `dispatchers_helpers.py` module. As per coding guidelines: "files < 800 lines".

⛔ Skipped due to learnings

Learnt from: CR. Repo: Aureliolo/synthorg PR: 0. File: CLAUDE.md:0-0. Timestamp: 2026-03-13T06:12:19.113Z. Learning: Applies to **/*.py : Functions: < 50 lines, files < 800 lines

Learnt from: CR. Repo: Aureliolo/synthorg PR: 0. File: CLAUDE.md:0-0. Timestamp: 2026-03-13T06:12:19.113Z. Learning: Applies to **/*.py : Line length: 88 characters (ruff)
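The fail-fast wave-loop semantics discussed in the review comments above reduce to a simple loop-control pattern: pre-built waves run in order, and `fail_fast` only decides whether the loop breaks after a failed wave. A toy sketch (names and structure are illustrative, not the project's dispatcher):

```python
def run_waves(waves, fail_fast):
    """Execute pre-built waves in order; stop early only when fail_fast is set."""
    results = []
    for idx, wave in enumerate(waves):
        # Each task callable returns True on success; a wave succeeds only
        # if every task in it succeeds.
        ok = all(task() for task in wave)
        results.append((idx, ok))
        if fail_fast and not ok:
            break  # fail-fast: skip all remaining waves
    return results


waves = [
    [lambda: True, lambda: False],  # wave 0: one agent fails
    [lambda: True],                 # wave 1: would normally run next
]
lenient = run_waves(waves, fail_fast=False)  # wave 1 still executes
strict = run_waves(waves, fail_fast=True)    # loop stops after wave 0
```

With `fail_fast=False`, wave 1 runs even though wave 0 failed, matching the behavior the test review calls intentional.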
```python
async def _execute_wave(  # noqa: PLR0913
    self,
    wave_idx: int,
    group: ParallelExecutionGroup,
    parallel_executor: ParallelExecutor,
    all_waves: list[CoordinationWave],
    all_phases: list[CoordinationPhaseResult],
    wave_workspaces: tuple[Workspace, ...],
    workspace_service: WorkspaceIsolationService | None,
    merge_results: list[WorkspaceGroupResult],
) -> bool:
    """Execute a single wave and handle per-wave merge/teardown.

    Returns:
        True if the wave failed, False if it succeeded.
    """
    start = time.monotonic()
    subtask_ids = tuple(a.task.id for a in group.assignments)
    wave_failed = False

    logger.info(
        COORDINATION_WAVE_STARTED,
        wave_index=wave_idx,
        subtask_count=len(subtask_ids),
    )

    try:
        exec_result = await parallel_executor.execute_group(group)
        elapsed = time.monotonic() - start

        all_waves.append(
            CoordinationWave(
                wave_index=wave_idx,
                subtask_ids=subtask_ids,
                execution_result=exec_result,
            )
        )

        success = exec_result.all_succeeded
        wave_failed = not success
        error_msg = (
            None
            if success
            else f"Wave {wave_idx}: {exec_result.agents_failed} agent(s) failed"
        )
        all_phases.append(
            CoordinationPhaseResult(
                phase=f"execute_wave_{wave_idx}",
                success=success,
                duration_seconds=elapsed,
                error=error_msg,
            )
        )

        if success:
            logger.info(
                COORDINATION_WAVE_COMPLETED,
                wave_index=wave_idx,
                succeeded=exec_result.agents_succeeded,
                failed=exec_result.agents_failed,
                duration_seconds=elapsed,
            )
        else:
            logger.warning(
                COORDINATION_WAVE_COMPLETED,
                wave_index=wave_idx,
                succeeded=exec_result.agents_succeeded,
                failed=exec_result.agents_failed,
                duration_seconds=elapsed,
            )

    except Exception as exc:
        elapsed = time.monotonic() - start
        wave_failed = True
        logger.warning(
            COORDINATION_PHASE_FAILED,
            phase=f"execute_wave_{wave_idx}",
            wave_index=wave_idx,
            error=str(exc),
        )
        all_waves.append(
            CoordinationWave(
                wave_index=wave_idx,
                subtask_ids=subtask_ids,
            )
        )
        all_phases.append(
            CoordinationPhaseResult(
                phase=f"execute_wave_{wave_idx}",
                success=False,
                duration_seconds=elapsed,
                error=str(exc),
            )
        )
    finally:
        if wave_workspaces and workspace_service is not None:
            # Only merge if the wave succeeded
            if not wave_failed:
                merge_phase_name = f"merge_wave_{wave_idx}"
                merge_result, merge_phase = await _merge_workspaces(
                    workspace_service,
                    wave_workspaces,
                    phase_name=merge_phase_name,
                )
                all_phases.append(merge_phase)
                if merge_result is not None:
                    merge_results.append(merge_result)
            else:
                logger.warning(
                    COORDINATION_PHASE_FAILED,
                    phase=f"merge_wave_{wave_idx}",
                    error="Skipped merge: wave failed",
                )
            await _teardown_workspaces(workspace_service, wave_workspaces)

    return wave_failed
```
🧹 Nitpick | 🔵 Trivial
Split _execute_wave to reduce complexity.
At ~116 lines, this method handles execution, result construction, logging, merge, and teardown. Consider extracting the merge/teardown block (lines 771-790) into a separate _finalize_wave helper to improve readability and stay closer to the 50-line guideline. As per coding guidelines: "Functions: < 50 lines".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/coordination/dispatchers.py` around lines 677 - 792,
The _execute_wave function is too long; extract the final merge/teardown block
into a new async helper named _finalize_wave to reduce complexity. Move the
logic currently under finally (the conditional that checks wave_workspaces and
workspace_service, calls _merge_workspaces, appends merge_phase/merge_result,
logs skipped merges, and calls _teardown_workspaces) into async def
_finalize_wave(workspace_service, wave_workspaces, wave_idx, wave_failed,
all_phases, merge_results) and have _execute_wave simply await
_finalize_wave(...) in the finally clause; preserve the same logging messages,
phase naming (merge_wave_{wave_idx}), and error handling semantics, and ensure
type hints and imports mirror existing signatures so behavior and side-effects
(appending to all_phases/all_waves/merge_results) remain unchanged.
```python
        except MemoryError, RecursionError:
            raise
```
Python 2 except syntax — SyntaxError in Python 3
except MemoryError, RecursionError: is Python 2 syntax. In Python 3 (including the >=3.14 requirement for this project) the , separator in an except clause means "catch MemoryError and bind it to the name RecursionError" — which triggers a SyntaxError. The correct Python 3 syntax for catching multiple exception types is to use a parenthesised tuple.
The same issue appears again on line 266 in _phase_route.
```diff
-        except MemoryError, RecursionError:
-            raise
+        except (MemoryError, RecursionError):
+            raise
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/coordination/service.py
Line: 210-211
Comment:
**Python 2 `except` syntax — SyntaxError in Python 3**
`except MemoryError, RecursionError:` is Python 2 syntax. In Python 3 (including the `>=3.14` requirement for this project) the `,` separator in an except clause means *"catch `MemoryError` and bind it to the name `RecursionError`"* — which triggers a `SyntaxError`. The correct Python 3 syntax for catching multiple exception types is to use a parenthesised tuple.
The same issue appears again on line 266 in `_phase_route`.
```suggestion
except (MemoryError, RecursionError):
raise
```
How can I resolve this? If you propose a fix, please make it concise.

```python
        except MemoryError, RecursionError:
            raise
```
Duplicate Python 2 except syntax — SyntaxError in Python 3
Same issue as line 210: except MemoryError, RecursionError: is not valid Python 3 syntax and will raise a SyntaxError at import time on Python 3.14.
```diff
-        except MemoryError, RecursionError:
-            raise
+        except (MemoryError, RecursionError):
+            raise
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/coordination/service.py
Line: 266-267
Comment:
**Duplicate Python 2 `except` syntax — SyntaxError in Python 3**
Same issue as line 210: `except MemoryError, RecursionError:` is not valid Python 3 syntax and will raise a `SyntaxError` at import time on Python 3.14.
```suggestion
except (MemoryError, RecursionError):
raise
```
How can I resolve this? If you propose a fix, please make it concise.

- Validate topology consistency across routing decisions
- Log WARNING before raising in validation guard and group_builder
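Setting aside which `except` syntax Python 3.14's PEP 758 accepts, the parenthesised-tuple form runs on every Python 3 version. A self-contained sketch of the catch-and-reraise pattern under discussion (`guarded` and its error messages are illustrative, not project code):

```python
def boom_recursion():
    raise RecursionError("too deep")


def guarded(fn):
    """Re-raise catastrophic errors untouched; wrap everything else."""
    try:
        return fn()
    except (MemoryError, RecursionError):
        # Catastrophic errors propagate unchanged so callers can fail fast.
        raise
    except Exception as exc:
        # Any other failure is wrapped with phase context.
        raise RuntimeError(f"phase failed: {exc}") from exc


try:
    guarded(boom_recursion)
except RecursionError as err:
    caught = type(err).__name__

try:
    guarded(lambda: 1 / 0)
except RuntimeError as err:
    wrapped = type(err.__cause__).__name__
```

Ordering matters: the catastrophic clause must come before the generic `except Exception` clause, otherwise the broad handler would wrap `MemoryError` subclasses of `Exception`-adjacent hierarchies it should never touch.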
Pull request overview
Adds a new multi-agent coordination subsystem to the engine, wiring decomposition + routing + topology-based dispatch (with optional workspace isolation) into a single orchestration pipeline with structured observability and stronger Pydantic validation.
Changes:
- Introduces `MultiAgentCoordinator` plus coordination models/config and DAG→wave planning (`build_execution_waves`).
- Adds topology dispatchers (SAS / centralized / decentralized / context-dependent) integrating workspace setup/merge/teardown and fail-fast behavior.
- Adds coordination observability event constants + extensive unit test coverage and documentation updates.
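The DAG→wave planning named above can be illustrated with a generic topological-layering sketch. This is not the project's `build_execution_waves` (which also handles routing, config, and blocked/unroutable subtasks), just the underlying idea:

```python
def layer_waves(deps):
    """Group task ids into waves; each wave depends only on earlier waves.

    deps maps task id -> set of prerequisite task ids.
    """
    remaining = {task: set(prereqs) for task, prereqs in deps.items()}
    waves = []
    while remaining:
        # Tasks whose prerequisites are all already scheduled form the next wave.
        ready = sorted(task for task, prereqs in remaining.items() if not prereqs)
        if not ready:
            raise ValueError("dependency cycle detected")
        waves.append(ready)
        for task in ready:
            del remaining[task]
        for prereqs in remaining.values():
            prereqs.difference_update(ready)
    return waves


waves = layer_waves({
    "design": set(),
    "impl-a": {"design"},
    "impl-b": {"design"},
    "integrate": {"impl-a", "impl-b"},
})
```

Each returned wave can then be executed in parallel, with the dispatcher deciding (via `fail_fast`) whether later waves still run after a failure.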
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/observability/test_events.py | Ensures coordination event constants are present and domain module is discoverable. |
| tests/unit/engine/test_coordination_service.py | End-to-end coordinator pipeline tests (happy path, failures, rollups, parent update, costs, fail-fast). |
| tests/unit/engine/test_coordination_models.py | Validation tests for coordination models (cross-field invariants, non-empty collections). |
| tests/unit/engine/test_coordination_group_builder.py | Tests for DAG→execution wave conversion and unroutable/blocked handling. |
| tests/unit/engine/test_coordination_errors.py | Tests for new coordination error types. |
| tests/unit/engine/test_coordination_dispatchers.py | Dispatcher behavior tests (workspace lifecycle, fail-fast, merge gating, setup failures). |
| tests/unit/engine/test_coordination_config.py | Tests for coordination config defaults and validation. |
| tests/unit/engine/conftest.py | Adds shared coordination test helpers (subtasks, decomposition, routing, exec results). |
| src/ai_company/observability/events/coordination.py | Adds coordination event name constants. |
| src/ai_company/engine/errors.py | Adds CoordinationError / CoordinationPhaseError. |
| src/ai_company/engine/coordination/service.py | Implements the coordination pipeline and phase tracking/logging. |
| src/ai_company/engine/coordination/models.py | Defines frozen Pydantic models for coordination context/results/waves/phases. |
| src/ai_company/engine/coordination/group_builder.py | Implements wave planning from decomposition + routing, with blocked/unroutable handling. |
| src/ai_company/engine/coordination/dispatchers.py | Implements topology dispatchers and shared workspace/wave execution helpers. |
| src/ai_company/engine/coordination/config.py | Adds CoordinationConfig model. |
| src/ai_company/engine/coordination/__init__.py | Re-exports coordination public API. |
| src/ai_company/engine/__init__.py | Re-exports coordination types from the engine top-level package. |
| docs/design/engine.md | Documents the coordination pipeline and dispatcher behaviors. |
| CLAUDE.md | Updates package-structure and logging-event guidance to include coordination. |
```python
        except CoordinationPhaseError:
            raise
        except Exception as exc:
            logger.exception(
                COORDINATION_FAILED,
                parent_task_id=task.id,
                error=str(exc),
            )
            raise
```
```python
        try:
            result = await self._decomposition_service.decompose_task(
                context.task, context.decomposition_context
            )
        except MemoryError, RecursionError:
            raise
        except Exception as exc:
            elapsed = time.monotonic() - start
```
```python
    logger.info(COORDINATION_PHASE_STARTED, phase=phase_name)
    try:
        requests = _build_workspace_requests(routing_result, config)
        workspaces = await workspace_service.setup_group(requests=requests)
    except Exception as exc:
        elapsed = time.monotonic() - start
        phase = CoordinationPhaseResult(
            phase=phase_name,
            success=False,
            duration_seconds=elapsed,
            error=str(exc),
        )
        logger.warning(
            COORDINATION_PHASE_FAILED,
            phase=phase_name,
            error=str(exc),
        )
        return (), phase
```
```python
    except Exception as exc:
        elapsed = time.monotonic() - start
        logger.warning(
            COORDINATION_PHASE_FAILED,
            phase=phase_name,
            wave_index=wave_idx,
            error=str(exc),
        )
        wave = CoordinationWave(
```
```python
    try:
        wave_workspaces = await workspace_service.setup_group(
            requests=wave_requests,
        )
    except Exception as exc:
        ws_elapsed = time.monotonic() - ws_start
        logger.warning(
            COORDINATION_PHASE_FAILED,
            phase=f"workspace_setup_wave_{wave_idx}",
            error=str(exc),
        )
        all_phases.append(
            CoordinationPhaseResult(
                phase=f"workspace_setup_wave_{wave_idx}",
                success=False,
                duration_seconds=ws_elapsed,
                error=str(exc),
            )
        )
        return (), None
```
```python
    except Exception as exc:
        elapsed = time.monotonic() - start
        wave_failed = True
        logger.warning(
            COORDINATION_PHASE_FAILED,
            phase=f"execute_wave_{wave_idx}",
            wave_index=wave_idx,
            error=str(exc),
        )
        all_waves.append(
            CoordinationWave(
                wave_index=wave_idx,
                subtask_ids=subtask_ids,
            )
        )
        all_phases.append(
            CoordinationPhaseResult(
                phase=f"execute_wave_{wave_idx}",
                success=False,
                duration_seconds=elapsed,
                error=str(exc),
            )
        )
```
```python
        return await dispatcher.dispatch(
            decomposition_result=decomp_result,
            routing_result=routing_result,
            parallel_executor=self._parallel_executor,
            workspace_service=self._workspace_service,
            config=context.config,
        )
```
🤖 I have created a release *beep* *boop*

---

## [0.1.3](v0.1.2...v0.1.3) (2026-03-13)

### Features

* add Mem0 memory backend adapter ([#345](#345)) ([2788db8](2788db8)), closes [#206](#206)
* centralized single-writer TaskEngine with full CRUD API ([#328](#328)) ([9c1a3e1](9c1a3e1))
* incremental AgentEngine → TaskEngine status sync ([#331](#331)) ([7a68d34](7a68d34)), closes [#323](#323)
* web dashboard pages — views, components, tests, and review fixes ([#354](#354)) ([b165ec4](b165ec4))
* web dashboard with Vue 3 + PrimeVue + Tailwind CSS ([#347](#347)) ([06416b1](06416b1))

### Bug Fixes

* harden coordination pipeline with validators, logging, and fail-fast ([#333](#333)) ([2f10d49](2f10d49)), closes [#205](#205)
* repo-wide security hardening from ZAP, Scorecard, and CodeQL audit ([#357](#357)) ([27eb288](27eb288))

### CI/CD

* add pip-audit, hadolint, OSSF Scorecard, ZAP DAST, and pre-push hooks ([#350](#350)) ([2802d20](2802d20))
* add workflow_dispatch trigger to PR Preview for Dependabot PRs ([#326](#326)) ([4c7b6d9](4c7b6d9))
* bump astral-sh/setup-uv from 7.4.0 to 7.5.0 in the minor-and-patch group ([#335](#335)) ([98dd8ca](98dd8ca))

### Maintenance

* bump the minor-and-patch group across 1 directory with 3 updates ([#352](#352)) ([031b1c9](031b1c9))
* **deps:** bump devalue from 5.6.3 to 5.6.4 in /site in the npm_and_yarn group across 1 directory ([#324](#324)) ([9a9c600](9a9c600))
* migrate docs build from MkDocs to Zensical ([#330](#330)) ([fa8bf1d](fa8bf1d)), closes [#329](#329)

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Summary

- Cross-field validators for `CoordinationPhaseResult` (success/error consistency), `CoordinationWave` (non-empty `subtask_ids`), and `CoordinationResult` (min 1 phase)
- `ContextDependentDispatcher` now correctly stops wave execution when `fail_fast=True` and a wave fails — previously waves continued regardless
- `DecentralizedDispatcher` raises `CoordinationError` if workspace service is unavailable or isolation is disabled
- `WARNING` logging on 7+ error paths across dispatchers and service (setup failures, wave errors, rollup errors, parent update failures)
- `COORDINATION_CLEANUP_FAILED` (was misleadingly using `CLEANUP_COMPLETED` for failures) and `COORDINATION_WAVE_BUILT` (separate planning from execution semantics)
- `partial_phases` uses `CoordinationPhaseResult` instead of `object`; rollup parameter uses `SubtaskStatusRollup` instead of `object`
- Refactor `DecentralizedDispatcher`, extract inline ternaries, consolidate `_phase_update_parent` branches
- Document the pipeline in `docs/design/engine.md`; update `CLAUDE.md` package structure and logging events

Test plan

- `test_success_with_error_rejected` and `test_failed_without_error_rejected` for `CoordinationPhaseResult`
- `test_empty_subtask_ids_rejected` for `CoordinationWave`
- `test_empty_phases_rejected` for `CoordinationResult`
- `test_no_workspace_service_raises` and `test_isolation_disabled_raises` for `DecentralizedDispatcher`
- `test_fail_fast_stops_on_wave_failure`, `test_setup_failure_skips_wave`, `test_fail_fast_stops_on_setup_failure` for `ContextDependentDispatcher`
- `test_workspace_setup_failure_returns_early` for `CentralizedDispatcher`
- `test_auto_topology_resolves_to_centralized` for AUTO fallback
- `test_update_parent_submit_fails` and `test_update_parent_exception_captured` for parent update error paths
- `test_rollup_error_captured` for rollup failure path
- `test_total_cost_aggregated` for cost computation
- `test_coordination_events_exist` for all 13 coordination event constants
- `test_phase_error_carries_partial_phases` to use proper `CoordinationPhaseResult` objects

Review coverage

Pre-reviewed by 8 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, type-design-analyzer, logging-audit, resilience-audit, docs-consistency). 32 findings addressed.
🤖 Generated with Claude Code
Closes #205