feat: implement task decomposition and routing engine (#14) #165
Add task decomposition engine that breaks complex tasks into subtasks with dependency tracking (DAG), structure classification, and status rollup. Add task routing engine that scores agents against subtask requirements and selects coordination topologies.

New packages:
- `engine/decomposition/`: models, DAG, classifier, rollup, manual strategy, service, and protocol
- `engine/routing/`: models, scorer, topology selector, and service

Also adds `TaskStructure` and `CoordinationTopology` enums, new error types (`DecompositionError`, `TaskRoutingError`), and event constants.
Pre-reviewed by 9 agents, 40 findings addressed:
- Fix classifier bias: separate language vs structural scoring, MIXED only on language signals
- Replace recursive DFS with iterative + heapq for O(n log n) topo sort
- Use `Complexity` enum instead of raw strings in `SubtaskDefinition`
- Freeze DAG adjacency with `MappingProxyType` + tuple values
- Remove redundant validation (cycle check in plan, depth check in context)
- Add cross-field validators to `DecompositionResult` and `RoutingDecision`
- Add AUTO topology rejection validator to `AutoTopologyConfig`
- Rename `parallel_tool_threshold` to `parallel_artifact_threshold`
- Add failure logging (`logger.exception`) to decomposition/routing services
- Fix wrong error type for missing DAG references (`DecompositionError`, not `CycleError`)
- Update DESIGN_SPEC §15.3 project structure with decomposition/routing modules
- Expand test coverage with parametrized and boundary tests
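The "iterative + heapq" topological sort mentioned above can be sketched generically as Kahn's algorithm with a min-heap, which yields deterministic ordering without recursion. This is an illustrative sketch, not the PR's actual implementation; names and the `deps` shape are assumptions.

```python
import heapq


def topo_sort(nodes: list[str], deps: dict[str, set[str]]) -> list[str]:
    """Kahn's algorithm with a min-heap: iterative, deterministic ordering.

    deps[n] is the set of node ids that must complete before n.
    Raises ValueError if the graph contains a dependency cycle.
    """
    indegree = {n: len(deps.get(n, set())) for n in nodes}
    dependents: dict[str, list[str]] = {n: [] for n in nodes}
    for node, parents in deps.items():
        for parent in parents:
            dependents[parent].append(node)
    ready = [n for n, d in indegree.items() if d == 0]
    heapq.heapify(ready)
    order: list[str] = []
    while ready:
        node = heapq.heappop(ready)  # smallest id among currently-ready nodes
        order.append(node)
        for child in dependents[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                heapq.heappush(ready, child)
    if len(order) != len(nodes):
        raise ValueError("dependency cycle detected")
    return order
```

The heap pop dominates the cost, giving the O(n log n) behavior the PR description cites, and the min-heap makes tie-breaking among independent subtasks deterministic.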
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found. Scanned files: none.

⚠️ Review failed: the pull request is closed.
📝 Walkthrough (Summary by CodeRabbit)

Adds decomposition and routing subsystems to the engine: new domain models, protocols, services, errors, and observability events for DAG-based subtask decomposition, structure classification, topology selection, agent scoring, and routing; extends the core Task schema with `task_structure` and `coordination_topology`.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant DecompositionService
    participant TaskStructureClassifier
    participant DecompositionStrategy
    participant DependencyGraph
    participant StatusRollup
    Client->>DecompositionService: decompose_task(task, context)
    DecompositionService->>TaskStructureClassifier: classify(task)
    TaskStructureClassifier-->>DecompositionService: TaskStructure
    DecompositionService->>DecompositionStrategy: decompose(task, context)
    DecompositionStrategy-->>DecompositionService: DecompositionPlan
    DecompositionService->>DependencyGraph: validate(subtasks)
    DependencyGraph-->>DecompositionService: validated / raises
    DecompositionService->>DecompositionService: create Task objects & edges
    DecompositionService->>StatusRollup: rollup_status(parent_id, subtask_statuses)
    StatusRollup-->>DecompositionService: SubtaskStatusRollup
    DecompositionService-->>Client: DecompositionResult(plan, tasks, edges)
```

```mermaid
sequenceDiagram
    participant Client
    participant TaskRoutingService
    participant TopologySelector
    participant AgentTaskScorer
    participant DecompositionResult
    Client->>TaskRoutingService: route(decomposition_result, available_agents, parent_task)
    TaskRoutingService->>TopologySelector: select(parent_task, plan)
    TopologySelector-->>TaskRoutingService: CoordinationTopology
    loop per subtask
        TaskRoutingService->>AgentTaskScorer: score(agent, subtask)
        AgentTaskScorer-->>TaskRoutingService: RoutingCandidate(score,...)
        TaskRoutingService->>TaskRoutingService: filter & select best + alternatives
        TaskRoutingService-->>Client: per-subtask routing event
    end
    TaskRoutingService-->>Client: RoutingResult(decisions, unroutable)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ Passed checks (5 passed)
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the AI Company's core capabilities by introducing sophisticated task decomposition and routing mechanisms. It enables the system to break down complex tasks into manageable subtasks, intelligently assign them to suitable AI agents based on their skills and roles, and dynamically determine the most effective coordination topology for their execution. This foundational work is crucial for scaling the system's ability to handle intricate projects and improve overall operational efficiency and autonomy.
Code Review
This pull request introduces a comprehensive and well-designed task decomposition and routing engine, a major feature for the project. The implementation is impressive, with a clear separation of concerns into decomposition and routing services, robust data models with strong validation, and a full suite of unit tests ensuring correctness. The use of design patterns like protocols for strategies, immutable data structures, and dedicated service classes for orchestration is excellent. The code adheres closely to the design specification. My review found only one minor issue: a misleading docstring in the status rollup component, for which I've provided a suggestion to improve clarity.
```python
"""Compute a status rollup from subtask statuses.

Rules (applied in order, first match wins):
- All COMPLETED -> COMPLETED
- All CANCELLED -> CANCELLED
- Any FAILED -> FAILED
- Any IN_PROGRESS -> IN_PROGRESS
- Any BLOCKED (none IN_PROGRESS) -> BLOCKED
- Otherwise -> IN_PROGRESS (work still pending)

Args:
    parent_task_id: The parent task identifier.
    subtask_statuses: Statuses of all subtasks.

Returns:
    Aggregated status rollup.
"""
```
The docstring for compute describes the derivation logic for the parent task's status. However, this logic is actually implemented in the SubtaskStatusRollup.derived_parent_status computed field. This method's primary responsibility is to count the statuses and instantiate the SubtaskStatusRollup object.
To improve clarity and prevent future confusion, the docstring should be updated to describe what this method does and refer to SubtaskStatusRollup for the status derivation logic.
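For illustration, the split of responsibilities being described could look roughly like this sketch. It is not the PR's code: a plain `@property` stands in for Pydantic's `@computed_field`, the status names come from the quoted rules, and the `compute` signature is assumed.

```python
from dataclasses import dataclass
from enum import Enum


class TaskStatus(Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    BLOCKED = "blocked"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


@dataclass(frozen=True)
class SubtaskStatusRollup:
    parent_task_id: str
    statuses: tuple[TaskStatus, ...]

    @property
    def derived_parent_status(self) -> TaskStatus:
        """First-match-wins derivation of the parent status."""
        s = set(self.statuses)
        if s == {TaskStatus.COMPLETED}:
            return TaskStatus.COMPLETED
        if s == {TaskStatus.CANCELLED}:
            return TaskStatus.CANCELLED
        if TaskStatus.FAILED in s:
            return TaskStatus.FAILED
        if TaskStatus.IN_PROGRESS in s:
            return TaskStatus.IN_PROGRESS
        if TaskStatus.BLOCKED in s:
            return TaskStatus.BLOCKED
        return TaskStatus.IN_PROGRESS  # work still pending


def compute(parent_task_id: str, subtask_statuses: list[TaskStatus]) -> SubtaskStatusRollup:
    """Collect subtask statuses; the derivation lives on the rollup object."""
    return SubtaskStatusRollup(parent_task_id, tuple(subtask_statuses))
```

This keeps `compute` a pure aggregator, matching the docstring fix the reviewer proposes: the method builds the rollup, and the rollup object owns the derivation rules.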
Suggested docstring:

```python
"""Compute a status rollup from a collection of subtask statuses.

This method aggregates the statuses of all subtasks for a given parent
task into a ``SubtaskStatusRollup`` object. It counts the occurrences
of key statuses (e.g., COMPLETED, FAILED).

The returned ``SubtaskStatusRollup`` object contains a
``derived_parent_status`` computed field that determines the overall
parent task status based on the aggregated counts.

Args:
    parent_task_id: The parent task identifier.
    subtask_statuses: Statuses of all subtasks.

Returns:
    An aggregated status rollup object.
"""
```
Pull request overview
Implements the core orchestration pieces for decomposing a parent task into a validated subtask DAG and routing those subtasks to agents with an auto-selected coordination topology, plus observability events and comprehensive unit tests.
Changes:
- Added a task decomposition subsystem (models, manual strategy, structure classifier, DAG validation/topo utilities, status rollup, orchestration service).
- Added a task routing subsystem (scoring, routing decisions/results models, topology selector, routing service) with structured logging events.
- Extended core domain enums/task model to include `TaskStructure` and `CoordinationTopology`, and updated docs + event discovery tests.
Reviewed changes
Copilot reviewed 34 out of 34 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/observability/test_events.py | Updates event-module discovery expectations for new domains. |
| tests/unit/engine/test_routing_topology.py | Unit tests for topology auto-selection and overrides. |
| tests/unit/engine/test_routing_service.py | Unit tests for routing behavior (best agent, unroutable, topology applied, filtering). |
| tests/unit/engine/test_routing_scorer.py | Unit tests for scoring weights/status handling and boundaries. |
| tests/unit/engine/test_routing_models.py | Unit tests for routing Pydantic models and validators. |
| tests/unit/engine/test_decomposition_service.py | Unit tests for service orchestration (task creation, edges, structure classification, cycles, rollup). |
| tests/unit/engine/test_decomposition_rollup.py | Unit tests for parent-status rollup rules and counters. |
| tests/unit/engine/test_decomposition_models.py | Unit tests for decomposition models’ invariants/validators. |
| tests/unit/engine/test_decomposition_manual.py | Unit tests for manual decomposition strategy constraints/validation. |
| tests/unit/engine/test_decomposition_dag.py | Unit tests for DAG validation, topo sort, and parallel grouping. |
| tests/unit/engine/test_decomposition_classifier.py | Unit tests for sequential/parallel/mixed heuristics and patterns. |
| src/ai_company/observability/events/task_routing.py | Adds structured event constants for routing lifecycle. |
| src/ai_company/observability/events/decomposition.py | Adds structured event constants for decomposition lifecycle. |
| src/ai_company/engine/routing/topology_selector.py | Implements topology selection logic (explicit override vs AUTO heuristic). |
| src/ai_company/engine/routing/service.py | Implements routing service (score agents, choose candidate(s), apply topology, logging). |
| src/ai_company/engine/routing/scorer.py | Implements agent↔subtask scoring heuristic and scoring logs. |
| src/ai_company/engine/routing/models.py | Adds frozen Pydantic routing models + validators and auto-topology config. |
| src/ai_company/engine/routing/\_\_init\_\_.py | Exposes routing subsystem public API. |
| src/ai_company/engine/errors.py | Adds decomposition/routing-specific exception types. |
| src/ai_company/engine/decomposition/service.py | Orchestrates classify → strategy → DAG validate → task creation → result. |
| src/ai_company/engine/decomposition/rollup.py | Implements subtask-status aggregation into derived parent status + logging. |
| src/ai_company/engine/decomposition/protocol.py | Defines decomposition strategy protocol. |
| src/ai_company/engine/decomposition/models.py | Adds frozen decomposition models + validators and derived rollup status field. |
| src/ai_company/engine/decomposition/manual.py | Adds manual decomposition strategy enforcing context limits. |
| src/ai_company/engine/decomposition/dag.py | Implements DAG validation, cycle detection, topo sort, parallel grouping, dependency lookups. |
| src/ai_company/engine/decomposition/classifier.py | Implements structure classification heuristics and emits classification events. |
| src/ai_company/engine/decomposition/\_\_init\_\_.py | Exposes decomposition subsystem public API. |
| src/ai_company/engine/\_\_init\_\_.py | Re-exports new engine APIs and error types at package level. |
| src/ai_company/core/task.py | Adds task_structure and coordination_topology fields to Task model. |
| src/ai_company/core/enums.py | Introduces TaskStructure and CoordinationTopology enums. |
| src/ai_company/core/\_\_init\_\_.py | Exports new enums from ai_company.core. |
| README.md | Updates product feature list to include decomposition & routing. |
| DESIGN_SPEC.md | Updates architecture tree/docs to reflect new subsystems and events. |
| CLAUDE.md | Updates repository layout description to mention decomposition/routing in engine. |
```python
all_matched: list[str] = []
reasons: list[str] = []

# Primary skill overlap (weight: 0.4)
required = set(subtask.required_skills)
primary = set(agent.skills.primary)
primary_matched = required & primary
if required:
    primary_ratio = len(primary_matched) / max(len(required), 1)
    primary_contrib = primary_ratio * 0.4
    total_score += primary_contrib
    all_matched.extend(sorted(primary_matched))
    if primary_matched:
        reasons.append(f"primary skills: {sorted(primary_matched)}")
else:
    reasons.append("no skills required, skill matching skipped")

# Secondary skill overlap (weight: 0.2)
secondary = set(agent.skills.secondary)
secondary_matched = required & secondary
if required:
    secondary_ratio = len(secondary_matched) / max(len(required), 1)
    secondary_contrib = secondary_ratio * 0.2
    total_score += secondary_contrib
    all_matched.extend(sorted(secondary_matched))
    if secondary_matched:
        reasons.append(f"secondary skills: {sorted(secondary_matched)}")
```
matched_skills can include duplicates because primary and secondary skills aren’t validated to be disjoint (and you append both primary_matched and secondary_matched). That can make routing explanations noisy/confusing and complicate any downstream consumers that treat matched_skills as a set. Consider de-duplicating while preserving deterministic ordering (e.g., accumulate in an ordered set / dict, or build a set and sort once).
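One way to implement the suggested de-duplication (a hypothetical helper, not the PR's code):

```python
def merge_matched_skills(
    primary_matched: set[str], secondary_matched: set[str]
) -> list[str]:
    """Merge matched skills without duplicates, in deterministic order.

    dict.fromkeys preserves first-seen insertion order and drops repeats,
    so a skill that is both primary and secondary appears exactly once.
    """
    ordered = [*sorted(primary_matched), *sorted(secondary_matched)]
    return list(dict.fromkeys(ordered))
```

This keeps the ordering stable (sorted primary matches first, then sorted secondary matches) so routing explanations stay reproducible across runs.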
```python
if not available_agents:
    logger.warning(
        TASK_ROUTING_NO_AGENTS,
        parent_task_id=plan.parent_task_id,
        subtask_count=len(plan.subtasks),
    )

try:
    return self._do_route(decomposition_result, available_agents, parent_task)
except Exception:
```
When available_agents is empty you emit TASK_ROUTING_NO_AGENTS, but still proceed into _do_route(), which will then log TASK_ROUTING_SUBTASK_UNROUTABLE once per subtask. For decompositions with many subtasks this creates avoidable log noise and extra work. Consider short-circuiting: return a RoutingResult with unroutable set to all plan.subtasks and skip per-subtask routing/logging when there are no agents.
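A sketch of that short-circuit, under assumed names (the `RoutingResult` shape here is hypothetical; real code would also emit the single `TASK_ROUTING_NO_AGENTS` warning before returning):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RoutingResult:
    decisions: tuple = ()
    unroutable: tuple[str, ...] = ()


def route(subtask_ids: tuple[str, ...], available_agents: list[str]) -> RoutingResult:
    """Return early when there are no agents instead of scoring each subtask."""
    if not available_agents:
        # One no-agents warning would be logged here, then short-circuit:
        # every subtask is unroutable, so skip per-subtask scoring and logs.
        return RoutingResult(unroutable=subtask_ids)
    # Normal path: score agents per subtask (elided in this sketch).
    return RoutingResult(decisions=tuple(subtask_ids))
```

The early return avoids one `TASK_ROUTING_SUBTASK_UNROUTABLE` log line per subtask when the outcome is already known.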
```python
from typing import TYPE_CHECKING

import pytest

from ai_company.core.enums import CoordinationTopology

if TYPE_CHECKING:
    from ai_company.core.agent import AgentIdentity
```
AgentIdentity is only imported under TYPE_CHECKING, but it’s referenced in runtime type annotations (e.g. sample_agent_with_personality: AgentIdentity). In this repo there are multiple places noting that test-time tooling/pytest can evaluate annotations, so this pattern can lead to runtime NameError when annotations are resolved. Import AgentIdentity at runtime here as well (and suppress the type-checking lint with # noqa: TC003/TC001 as used elsewhere in tests).
Suggested change:

```python
from typing import TYPE_CHECKING  # noqa: TC001

import pytest

from ai_company.core.enums import CoordinationTopology
from ai_company.core.agent import AgentIdentity  # noqa: TC003
```
Greptile Summary

This PR introduces a complete task decomposition and routing engine for the engine package. Two new issues were identified.

Confidence Score: 4/5
Sequence Diagram

```mermaid
sequenceDiagram
    participant Caller
    participant DecompositionService
    participant TaskStructureClassifier
    participant DecompositionStrategy
    participant DependencyGraph
    participant TaskRoutingService
    participant AgentTaskScorer
    participant TopologySelector
    Caller->>DecompositionService: decompose_task(task, context)
    DecompositionService->>TaskStructureClassifier: classify(task)
    TaskStructureClassifier-->>DecompositionService: TaskStructure
    DecompositionService->>DecompositionStrategy: decompose(task, context)
    DecompositionStrategy-->>DecompositionService: DecompositionPlan
    Note over DecompositionService: Override plan.task_structure if classifier differs
    DecompositionService->>DependencyGraph: __init__(plan.subtasks)
    DecompositionService->>DependencyGraph: validate()
    Note over DependencyGraph: _check_missing_references() + iterative DFS cycles
    DependencyGraph-->>DecompositionService: (valid or raises)
    DecompositionService->>DecompositionService: create Task objects from subtask defs
    DecompositionService-->>Caller: DecompositionResult
    Caller->>TaskRoutingService: route(decomposition_result, agents, parent_task)
    TaskRoutingService->>TopologySelector: select(parent_task, plan)
    TopologySelector-->>TaskRoutingService: CoordinationTopology
    loop for each SubtaskDefinition
        TaskRoutingService->>AgentTaskScorer: score(agent, subtask_def) × N
        AgentTaskScorer-->>TaskRoutingService: RoutingCandidate[]
        Note over TaskRoutingService: Filter by min_score, sort desc, pick best
    end
    TaskRoutingService-->>Caller: RoutingResult
```
Last reviewed commit: 5e539d2
Actionable comments posted: 11
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/core/task.py`:
- Around line 143-150: The Task class docstring is missing documentation for the
two new fields; update the Attributes section of the Task class docstring to
include brief descriptions for task_structure (TaskStructure | None —
classification of subtask relationships; None means not classified) and
coordination_topology (CoordinationTopology — coordination topology for
multi-agent execution, default AUTO). Edit the Task class docstring near the
existing attribute list so the entries match the style/format of the other
attributes and reference the exact field names task_structure and
coordination_topology.
In `@src/ai_company/engine/decomposition/models.py`:
- Around line 1-20: Import get_logger and create logger = get_logger(__name__),
then update each Pydantic validator (the methods decorated with `@model_validator`
and any computed_field validators in the file) to catch validation exceptions,
log a warning/error with contextual details (model/class name, incoming field
values or kwargs, and the exception message) and re-raise the original
exception; apply this pattern to all validator methods referenced in the file
(the validators around lines ~57-63, ~98-122, ~145-165, and ~198-211) so
failures are logged before being raised.
- Around line 145-165: The validator must ensure created_tasks exactly match the
plan's subtasks and that dependency_edges are neither missing nor extraneous:
replace the current count-only check with a set equality check comparing {t.id
for t in self.created_tasks} to {s.id for s in self.plan.subtasks} and raise
ValueError if they differ; then strengthen edge validation by (a) if self.plan
exposes an expected dependency set (e.g., self.plan.dependency_edges) compare
the set(self.dependency_edges) to that expected set and raise on mismatch, and
(b) otherwise enforce no self-loops, no duplicate edges, and that every edge
endpoint is in the subtask id set (use symbols _validate_plan_task_consistency,
created_tasks, plan.subtasks, dependency_edges to locate code).
In `@src/ai_company/engine/decomposition/service.py`:
- Around line 104-113: The code is wrongly overwriting the plan's declared
task_structure with the parent-only classifier result; instead, do not replace
plan.task_structure returned from self._strategy.decompose(task, context).
Update the logic in the method containing calls to
self._classifier.classify(task) and self._strategy.decompose(...) so that you
only use the classifier result if the returned plan has no structure (e.g.,
plan.task_structure is None or falsy); otherwise leave plan.task_structure and
plan.subtasks intact (or if you must detect mismatches, emit a warning/error
referencing plan.task_structure and plan.subtasks rather than silently calling
plan.model_copy(...)). Ensure you remove the unconditional model_copy update
that sets task_structure from the classifier.
- Around line 121-147: The created child Task instances currently omit subtask
dependencies, so copy each SubtaskDefinition's dependencies into the Task when
constructing it in the decomposition service (the Task construction block that
creates child_task from subtask_def); set Task.dependencies =
subtask_def.dependencies (converting to the appropriate type if Task expects a
tuple/list) so the per-Task dependency info matches the DAG edges built later
(the edges list comprehension over plan.subtasks).
In `@src/ai_company/engine/routing/models.py`:
- Around line 65-75: The validator _validate_selected_not_in_alternatives on the
model uses agent_identity.name to detect duplicates; change it to compare the
unique identifier agent_identity.id (UUID) instead: compute selected_id =
self.selected_candidate.agent_identity.id and compare against
alt.agent_identity.id for each alt in self.alternatives, and raise the same
ValueError when ids match; keep the error message but optionally include the
selected_id for clarity.
- Around line 99-111: The validator _validate_unique_subtask_ids currently
silences duplicates by building sets; instead explicitly detect and reject
duplicate subtask_id values inside both decisions and unroutable before checking
cross-collection overlap: scan self.decisions and self.unroutable to collect
counts (or use a seen set) to find any subtask_ids occurring more than once and
raise a ValueError listing those duplicates (referencing symbols decisions,
unroutable, and the validator _validate_unique_subtask_ids), then continue to
compute decision_ids and unroutable_set and raise the existing overlap error if
any IDs appear in both.
- Around line 7-13: Import get_logger and the observability event constants,
create logger = get_logger(__name__), then add structured logging (WARNING or
ERROR) with the appropriate event constant and contextual fields immediately
before each raise in the module's validator functions (the pydantic
`@model_validator` blocks and any validation logic around
CoordinationTopology/AgentIdentity/NotBlankStr). Specifically, update the three
raise paths referenced (around lines 65-75, 99-111, 145-157) to call
logger.warning or logger.error with a descriptive message, include the event
from ai_company.observability.events and relevant context (payload, field names,
invalid values), and only then re-raise the validation exception so failures are
logged consistently.
In `@src/ai_company/engine/routing/topology_selector.py`:
- Around line 57-79: The selector currently ignores a concrete topology set on
the decomposition plan when task.coordination_topology is AUTO; update the
selection logic so that after checking the task-level explicit override
(task.coordination_topology) you check plan.coordination_topology and if it's
not CoordinationTopology.AUTO, log using TASK_ROUTING_TOPOLOGY_SELECTED with
source="plan" and return plan.coordination_topology (instead of continuing to
heuristic auto-selection); keep the existing heuristics
(_config.sequential_override, _config.parallel_default, _config.mixed_default
and parallel_artifact_threshold) for true AUTO cases.
In `@tests/unit/engine/test_decomposition_models.py`:
- Around line 16-17: Remove the unused TYPE_CHECKING guard that imports Task:
delete the "if TYPE_CHECKING: from ai_company.core.task import Task" block (the
test uses the runtime import "from ai_company.core.task import Task" at line
~180), so eliminate the misleading conditional import and any reference to
TYPE_CHECKING in this file to keep imports clear and avoid dead code.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: e20b1f17-e5d6-4abd-8192-62a862f8bd4a
📒 Files selected for processing (34)

- CLAUDE.md
- DESIGN_SPEC.md
- README.md
- src/ai_company/core/\_\_init\_\_.py
- src/ai_company/core/enums.py
- src/ai_company/core/task.py
- src/ai_company/engine/\_\_init\_\_.py
- src/ai_company/engine/decomposition/\_\_init\_\_.py
- src/ai_company/engine/decomposition/classifier.py
- src/ai_company/engine/decomposition/dag.py
- src/ai_company/engine/decomposition/manual.py
- src/ai_company/engine/decomposition/models.py
- src/ai_company/engine/decomposition/protocol.py
- src/ai_company/engine/decomposition/rollup.py
- src/ai_company/engine/decomposition/service.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/routing/\_\_init\_\_.py
- src/ai_company/engine/routing/models.py
- src/ai_company/engine/routing/scorer.py
- src/ai_company/engine/routing/service.py
- src/ai_company/engine/routing/topology_selector.py
- src/ai_company/observability/events/decomposition.py
- src/ai_company/observability/events/task_routing.py
- tests/unit/engine/test_decomposition_classifier.py
- tests/unit/engine/test_decomposition_dag.py
- tests/unit/engine/test_decomposition_manual.py
- tests/unit/engine/test_decomposition_models.py
- tests/unit/engine/test_decomposition_rollup.py
- tests/unit/engine/test_decomposition_service.py
- tests/unit/engine/test_routing_models.py
- tests/unit/engine/test_routing_scorer.py
- tests/unit/engine/test_routing_service.py
- tests/unit/engine/test_routing_topology.py
- tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Agent
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`**/*.py`:
- Do not use `from __future__ import annotations` in Python 3.14+ code — Python 3.14 has native PEP 649 lazy annotations
- Use `except A, B:` (no parentheses) for exception syntax in Python 3.14 — PEP 758 except syntax enforced by ruff
- All public functions and classes must have type hints; mypy strict mode is enforced
- All public classes and functions must have Google-style docstrings; ruff D rules enforce this
- Create new objects instead of mutating existing ones; use `copy.deepcopy()` at construction and `MappingProxyType` wrapping for read-only enforcement of non-Pydantic internal collections (registries, BaseTool)
- For non-Pydantic internal collections and dict/list fields in frozen Pydantic models, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, serializing for persistence)
- Use frozen Pydantic models for config/identity; use separate mutable-via-copy models (`model_copy(update=...)`) for runtime state that evolves; never mix static config fields with mutable runtime fields
- Use Pydantic v2 (`BaseModel`, `model_validator`, `computed_field`, `ConfigDict`); use `@computed_field` for derived values instead of storing redundant fields
- Use `NotBlankStr` (from `core.types`) for all identifier/name fields — including optional (`NotBlankStr | None`) and tuple (`tuple[NotBlankStr, ...]`) variants — instead of manual whitespace validators
- Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls); prefer structured concurrency over bare `create_task`
- Keep functions under 50 lines and files under 800 lines
- Handle errors explicitly; never silently swallow errors
- Validate at system boundaries (user input, external APIs, config files)
- Line length must be 88 characters (enforced by ruff)
Files:
- src/ai_company/observability/events/decomposition.py
- tests/unit/engine/test_routing_topology.py
- tests/unit/engine/test_decomposition_rollup.py
- src/ai_company/core/\_\_init\_\_.py
- src/ai_company/engine/decomposition/\_\_init\_\_.py
- src/ai_company/engine/decomposition/models.py
- src/ai_company/observability/events/task_routing.py
- tests/unit/engine/test_decomposition_dag.py
- src/ai_company/engine/decomposition/rollup.py
- src/ai_company/engine/routing/topology_selector.py
- tests/unit/engine/test_routing_models.py
- src/ai_company/engine/decomposition/manual.py
- tests/unit/observability/test_events.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/decomposition/classifier.py
- src/ai_company/engine/decomposition/dag.py
- tests/unit/engine/test_routing_service.py
- src/ai_company/engine/routing/service.py
- tests/unit/engine/test_routing_scorer.py
- src/ai_company/engine/decomposition/protocol.py
- src/ai_company/engine/routing/scorer.py
- src/ai_company/engine/decomposition/service.py
- tests/unit/engine/test_decomposition_service.py
- src/ai_company/engine/routing/\_\_init\_\_.py
- src/ai_company/core/enums.py
- tests/unit/engine/test_decomposition_manual.py
- tests/unit/engine/test_decomposition_models.py
- src/ai_company/engine/routing/models.py
- tests/unit/engine/test_decomposition_classifier.py
- src/ai_company/engine/\_\_init\_\_.py
- src/ai_company/core/task.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`src/ai_company/**/*.py`:
- Every module with business logic must import `from ai_company.observability import get_logger` and create `logger = get_logger(__name__)`
- Never use `import logging` / `logging.getLogger()` / `print()` in application code
- Always use variable name `logger` (not `_logger`, not `log`) for the logger instance
- Always use constants from domain-specific modules under `ai_company.observability.events` for event names (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`); import directly
- Always use structured logging with kwargs: `logger.info(EVENT, key=value)` — never use formatted strings like `logger.info("msg %s", val)`
- All error paths must log at WARNING or ERROR with context before raising
- All state transitions must log at INFO level
- DEBUG logging is for object creation, internal flow, and entry/exit of key functions
Files:
- src/ai_company/observability/events/decomposition.py
- src/ai_company/core/\_\_init\_\_.py
- src/ai_company/engine/decomposition/\_\_init\_\_.py
- src/ai_company/engine/decomposition/models.py
- src/ai_company/observability/events/task_routing.py
- src/ai_company/engine/decomposition/rollup.py
- src/ai_company/engine/routing/topology_selector.py
- src/ai_company/engine/decomposition/manual.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/decomposition/classifier.py
- src/ai_company/engine/decomposition/dag.py
- src/ai_company/engine/routing/service.py
- src/ai_company/engine/decomposition/protocol.py
- src/ai_company/engine/routing/scorer.py
- src/ai_company/engine/decomposition/service.py
- src/ai_company/engine/routing/\_\_init\_\_.py
- src/ai_company/core/enums.py
- src/ai_company/engine/routing/models.py
- src/ai_company/engine/\_\_init\_\_.py
- src/ai_company/core/task.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples; use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases
Files:
src/ai_company/observability/events/decomposition.py, src/ai_company/core/__init__.py, src/ai_company/engine/decomposition/__init__.py, src/ai_company/engine/decomposition/models.py, src/ai_company/observability/events/task_routing.py, src/ai_company/engine/decomposition/rollup.py, src/ai_company/engine/routing/topology_selector.py, src/ai_company/engine/decomposition/manual.py, src/ai_company/engine/errors.py, src/ai_company/engine/decomposition/classifier.py, src/ai_company/engine/decomposition/dag.py, src/ai_company/engine/routing/service.py, src/ai_company/engine/decomposition/protocol.py, src/ai_company/engine/routing/scorer.py, src/ai_company/engine/decomposition/service.py, src/ai_company/engine/routing/__init__.py, src/ai_company/core/enums.py, src/ai_company/engine/routing/models.py, src/ai_company/engine/__init__.py, src/ai_company/core/task.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Mark tests with `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, or `@pytest.mark.slow` as appropriate
Prefer `@pytest.mark.parametrize` for testing similar cases
Use vendor-agnostic test names: test-provider, test-small-001, etc. instead of real vendor names
Files:
tests/unit/engine/test_routing_topology.py, tests/unit/engine/test_decomposition_rollup.py, tests/unit/engine/test_decomposition_dag.py, tests/unit/engine/test_routing_models.py, tests/unit/observability/test_events.py, tests/unit/engine/test_routing_service.py, tests/unit/engine/test_routing_scorer.py, tests/unit/engine/test_decomposition_service.py, tests/unit/engine/test_decomposition_manual.py, tests/unit/engine/test_decomposition_models.py, tests/unit/engine/test_decomposition_classifier.py
src/ai_company/engine/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
RetryExhaustedError signals that all retries failed — the engine layer catches this to trigger fallback chains
Files:
src/ai_company/engine/decomposition/__init__.py, src/ai_company/engine/decomposition/models.py, src/ai_company/engine/decomposition/rollup.py, src/ai_company/engine/routing/topology_selector.py, src/ai_company/engine/decomposition/manual.py, src/ai_company/engine/errors.py, src/ai_company/engine/decomposition/classifier.py, src/ai_company/engine/decomposition/dag.py, src/ai_company/engine/routing/service.py, src/ai_company/engine/decomposition/protocol.py, src/ai_company/engine/routing/scorer.py, src/ai_company/engine/decomposition/service.py, src/ai_company/engine/routing/__init__.py, src/ai_company/engine/routing/models.py, src/ai_company/engine/__init__.py
🧠 Learnings (2)
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Always use constants from domain-specific modules under ai_company.observability.events for event names (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget); import directly
Applied to files:
src/ai_company/observability/events/decomposition.py, src/ai_company/observability/events/task_routing.py, tests/unit/observability/test_events.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/engine/**/*.py : RetryExhaustedError signals that all retries failed — the engine layer catches this to trigger fallback chains
Applied to files:
src/ai_company/engine/errors.py
🧬 Code graph analysis (23)
tests/unit/engine/test_routing_topology.py (2)
- src/ai_company/engine/routing/models.py (1): AutoTopologyConfig (114-157)
- src/ai_company/engine/routing/topology_selector.py (3): TopologySelector (24-98), select (43-98), config (39-41)

tests/unit/engine/test_decomposition_rollup.py (3)
- src/ai_company/core/enums.py (1): TaskStatus (122-148)
- src/ai_company/engine/decomposition/rollup.py (2): StatusRollup (21-79), compute (25-79)
- src/ai_company/engine/decomposition/models.py (1): derived_parent_status (217-237)

src/ai_company/core/__init__.py (1)
- src/ai_company/core/enums.py (2): CoordinationTopology (282-293), TaskStructure (270-279)

src/ai_company/engine/decomposition/models.py (2)
- src/ai_company/core/enums.py (4): Complexity (171-177), CoordinationTopology (282-293), TaskStatus (122-148), TaskStructure (270-279)
- src/ai_company/core/task.py (1): Task (45-257)

tests/unit/engine/test_decomposition_dag.py (1)
- src/ai_company/engine/decomposition/dag.py (6): DependencyGraph (25-236), validate (62-78), topological_sort (132-171), parallel_groups (173-212), get_dependents (214-224), get_dependencies (226-236)

src/ai_company/engine/decomposition/rollup.py (3)
- src/ai_company/core/enums.py (1): TaskStatus (122-148)
- src/ai_company/engine/decomposition/models.py (2): SubtaskStatusRollup (168-237), derived_parent_status (217-237)
- src/ai_company/observability/_logger.py (1): get_logger (8-28)

src/ai_company/engine/routing/topology_selector.py (4)
- src/ai_company/core/enums.py (2): CoordinationTopology (282-293), TaskStructure (270-279)
- src/ai_company/engine/routing/models.py (1): AutoTopologyConfig (114-157)
- src/ai_company/core/task.py (1): Task (45-257)
- src/ai_company/engine/decomposition/models.py (1): DecompositionPlan (66-122)

tests/unit/engine/test_routing_models.py (5)
- src/ai_company/core/enums.py (1): CoordinationTopology (282-293)
- src/ai_company/engine/routing/models.py (4): AutoTopologyConfig (114-157), RoutingCandidate (16-38), RoutingDecision (41-75), RoutingResult (78-111)
- tests/unit/engine/conftest.py (1): sample_agent_with_personality (52-79)
- tests/unit/engine/test_decomposition_models.py (1): test_frozen (77-81)
- src/ai_company/engine/routing/topology_selector.py (1): config (39-41)

src/ai_company/engine/decomposition/manual.py (4)
- src/ai_company/engine/errors.py (2): DecompositionDepthError (58-59), DecompositionError (50-51)
- src/ai_company/observability/_logger.py (1): get_logger (8-28)
- src/ai_company/engine/decomposition/models.py (2): DecompositionContext (240-265), DecompositionPlan (66-122)
- src/ai_company/engine/decomposition/protocol.py (2): decompose (22-36), get_strategy_name (38-40)

src/ai_company/engine/decomposition/classifier.py (2)
- src/ai_company/core/enums.py (1): TaskStructure (270-279)
- src/ai_company/core/task.py (1): Task (45-257)

src/ai_company/engine/decomposition/dag.py (3)
- src/ai_company/engine/errors.py (2): DecompositionCycleError (54-55), DecompositionError (50-51)
- src/ai_company/observability/_logger.py (1): get_logger (8-28)
- src/ai_company/engine/decomposition/models.py (1): SubtaskDefinition (22-63)

tests/unit/engine/test_routing_service.py (7)
- src/ai_company/core/agent.py (3): AgentIdentity (246-304), ModelConfig (145-174), SkillSet (125-142)
- src/ai_company/core/enums.py (5): AgentStatus (24-29), Complexity (171-177), Priority (162-168), SeniorityLevel (6-21), TaskType (151-159)
- src/ai_company/core/task.py (1): Task (45-257)
- src/ai_company/engine/decomposition/models.py (3): DecompositionPlan (66-122), DecompositionResult (125-165), SubtaskDefinition (22-63)
- src/ai_company/engine/routing/scorer.py (2): AgentTaskScorer (35-149), min_score (57-59)
- src/ai_company/engine/routing/service.py (2): TaskRoutingService (32-166), route (50-88)
- src/ai_company/engine/routing/topology_selector.py (1): TopologySelector (24-98)

src/ai_company/engine/routing/service.py (6)
- src/ai_company/engine/routing/models.py (2): RoutingDecision (41-75), RoutingResult (78-111)
- src/ai_company/observability/_logger.py (1): get_logger (8-28)
- src/ai_company/core/task.py (1): Task (45-257)
- src/ai_company/engine/decomposition/models.py (1): DecompositionResult (125-165)
- src/ai_company/engine/routing/scorer.py (3): AgentTaskScorer (35-149), score (61-149), min_score (57-59)
- src/ai_company/engine/routing/topology_selector.py (2): TopologySelector (24-98), select (43-98)

tests/unit/engine/test_routing_scorer.py (4)
- src/ai_company/core/agent.py (3): AgentIdentity (246-304), ModelConfig (145-174), SkillSet (125-142)
- src/ai_company/core/enums.py (3): AgentStatus (24-29), Complexity (171-177), SeniorityLevel (6-21)
- src/ai_company/engine/decomposition/models.py (1): SubtaskDefinition (22-63)
- src/ai_company/engine/routing/scorer.py (3): AgentTaskScorer (35-149), score (61-149), min_score (57-59)

src/ai_company/engine/decomposition/protocol.py (3)
- src/ai_company/core/task.py (1): Task (45-257)
- src/ai_company/engine/decomposition/models.py (2): DecompositionContext (240-265), DecompositionPlan (66-122)
- src/ai_company/engine/decomposition/manual.py (2): decompose (37-81), get_strategy_name (83-85)

src/ai_company/engine/routing/scorer.py (5)
- src/ai_company/core/enums.py (3): AgentStatus (24-29), Complexity (171-177), SeniorityLevel (6-21)
- src/ai_company/engine/routing/models.py (1): RoutingCandidate (16-38)
- src/ai_company/observability/_logger.py (1): get_logger (8-28)
- src/ai_company/core/agent.py (1): AgentIdentity (246-304)
- src/ai_company/engine/decomposition/models.py (1): SubtaskDefinition (22-63)

src/ai_company/engine/routing/__init__.py (4)
- src/ai_company/engine/routing/models.py (4): AutoTopologyConfig (114-157), RoutingCandidate (16-38), RoutingDecision (41-75), RoutingResult (78-111)
- src/ai_company/engine/routing/scorer.py (1): AgentTaskScorer (35-149)
- src/ai_company/engine/routing/service.py (1): TaskRoutingService (32-166)
- src/ai_company/engine/routing/topology_selector.py (1): TopologySelector (24-98)

tests/unit/engine/test_decomposition_manual.py (2)
- src/ai_company/engine/decomposition/protocol.py (3): DecompositionStrategy (14-40), decompose (22-36), get_strategy_name (38-40)
- src/ai_company/engine/errors.py (2): DecompositionDepthError (58-59), DecompositionError (50-51)

tests/unit/engine/test_decomposition_models.py (1)
- src/ai_company/engine/decomposition/models.py (6): DecompositionContext (240-265), DecompositionPlan (66-122), DecompositionResult (125-165), SubtaskDefinition (22-63), SubtaskStatusRollup (168-237), derived_parent_status (217-237)

src/ai_company/engine/routing/models.py (2)
- src/ai_company/core/agent.py (1): AgentIdentity (246-304)
- src/ai_company/core/enums.py (1): CoordinationTopology (282-293)

tests/unit/engine/test_decomposition_classifier.py (2)
- src/ai_company/core/enums.py (1): TaskStructure (270-279)
- src/ai_company/engine/decomposition/classifier.py (2): TaskStructureClassifier (46-192), classify (59-104)

src/ai_company/engine/__init__.py (4)
- src/ai_company/engine/decomposition/models.py (4): DecompositionContext (240-265), DecompositionPlan (66-122), DecompositionResult (125-165), SubtaskDefinition (22-63)
- src/ai_company/engine/decomposition/dag.py (1): DependencyGraph (25-236)
- src/ai_company/engine/errors.py (2): EngineError (4-5), TaskRoutingError (62-63)
- src/ai_company/engine/routing/models.py (4): AutoTopologyConfig (114-157), RoutingCandidate (16-38), RoutingDecision (41-75), RoutingResult (78-111)

src/ai_company/core/task.py (1)
- src/ai_company/core/enums.py (3): CoordinationTopology (282-293), TaskStructure (270-279), TaskType (151-159)
🪛 LanguageTool
README.md
[typographical] ~18-~18: To join two clauses or introduce examples, consider using an em dash.
Context: ...ntion - Task Decomposition & Routing - DAG-based subtask decomposition, structu...
(DASH_RULE)
🔇 Additional comments (37)
src/ai_company/observability/events/decomposition.py (1)

1-14: LGTM! Event constants are well-structured, follow established naming conventions (domain.entity.action), and align with other observability event modules. The `Final[str]` typing ensures immutability at the type-checker level.

src/ai_company/engine/errors.py (1)

48-63: LGTM! The exception hierarchy is well-designed with proper inheritance: `DecompositionError` as the base for decomposition failures with specific subclasses for cycle detection and depth violations, and `TaskRoutingError` as a separate concern for routing failures. All classes have appropriate docstrings.

src/ai_company/observability/events/task_routing.py (1)

1-12: LGTM! Event constants comprehensively cover the task routing lifecycle (scoring, routing decisions, topology selection, completion/failure states) and follow established naming conventions.

src/ai_company/engine/routing/scorer.py (1)

1-149: LGTM! The scoring implementation is well-designed with clear weight distribution (0.4 primary skills, 0.2 secondary, 0.2 role, 0.2 seniority-complexity), proper handling of inactive agents, and comprehensive reason generation. The seniority-to-complexity mapping appropriately pairs agent levels with suitable task complexities.
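The weight distribution described above can be sketched as a simple weighted sum. This is an illustrative stand-in, not the actual `AgentTaskScorer` code; the sub-score names and their 0-1 normalization are assumptions.

```python
def score_agent(
    primary_overlap: float,    # assumed: fraction of required skills in the agent's primary skills
    secondary_overlap: float,  # assumed: fraction covered by secondary skills
    role_match: float,         # assumed: 1.0 when the agent's role fits the subtask, else 0.0
    seniority_fit: float,      # assumed: 1.0 when seniority aligns with task complexity
    active: bool,
) -> float:
    """Combine sub-scores with the 0.4/0.2/0.2/0.2 weights noted in the review."""
    if not active:
        return 0.0  # inactive agents never become viable candidates
    return (
        0.4 * primary_overlap
        + 0.2 * secondary_overlap
        + 0.2 * role_match
        + 0.2 * seniority_fit
    )
```

Because the weights sum to 1.0 and each sub-score is clamped to [0, 1], the combined score also stays in [0, 1], which matches the score-bounds validation exercised in the routing model tests.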
src/ai_company/engine/decomposition/classifier.py (1)

1-192: LGTM! The classifier implementation cleanly separates language-based signals (regex patterns) from structural signals (artifact count, dependencies). The decision logic correctly prioritizes MIXED detection (both language categories present), then compares total scores with a SEQUENTIAL fallback per the research guidance in §6.9. The heuristics are well-documented and align with DESIGN_SPEC.md.

DESIGN_SPEC.md (1)

2388-2402: LGTM! The project structure documentation accurately reflects the new decomposition and routing subsystems, including all modules (classifier, dag, manual, models, protocol, rollup, service for decomposition; models, scorer, service, topology_selector for routing) and their corresponding observability event constants.

Also applies to: 2454-2455, 2465-2465
README.md (1)

18-18: LGTM! The new feature bullet accurately summarizes the decomposition and routing capabilities and maintains consistency with the existing list formatting. The static analysis suggestion about em dashes is noted but would create inconsistency with other bullets — the current hyphen-minus style matches the rest of the document.

CLAUDE.md (1)

52-52: LGTM! The engine directory description is appropriately updated to include the new decomposition and routing subsystems, maintaining consistency with the actual package structure additions.

tests/unit/observability/test_events.py (1)

118-130: LGTM! The expected domain modules set is correctly updated to include `decomposition` and `task_routing`, aligning with the new observability event modules introduced in this PR. The alphabetical ordering is maintained.

src/ai_company/core/__init__.py (1)

34-47: LGTM! The new `CoordinationTopology` and `TaskStructure` enums are correctly imported and added to `__all__` in alphabetical order, following the existing module conventions.

Also applies to: 89-120
tests/unit/engine/test_decomposition_dag.py (2)

1-21: LGTM! Clean helper function and well-organized imports. The `_sub()` helper provides a minimal factory for creating `SubtaskDefinition` instances, reducing boilerplate across tests.

23-213: LGTM! Comprehensive test coverage for `DependencyGraph` including:

- Validation (acyclic, cycles, missing references)
- Topological sort (linear, diamond, cycle detection)
- Parallel groups (independent, sequential, diamond patterns)
- Boundary cases (single node, unknown IDs)

The tests effectively validate the iterative DFS cycle detection and heapq-based topological sort mentioned in the PR objectives.
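A heapq-based topological sort of the kind these tests exercise can be sketched as a Kahn-style algorithm that always pops the smallest ready node, making tie order deterministic. This is an illustrative sketch, not the project's `dag.py` implementation; the edge representation (node → dependents) is an assumption.

```python
import heapq


def topological_sort(edges: dict[str, list[str]]) -> list[str]:
    """Kahn's algorithm with a min-heap frontier for deterministic ordering.

    `edges` maps each node to the nodes that depend on it.
    Raises ValueError when a cycle prevents a complete ordering.
    """
    indegree = {node: 0 for node in edges}
    for dependents in edges.values():
        for dep in dependents:
            indegree[dep] = indegree.get(dep, 0) + 1

    ready = [node for node, deg in indegree.items() if deg == 0]
    heapq.heapify(ready)  # smallest ID pops first among equally-ready nodes

    order: list[str] = []
    while ready:
        node = heapq.heappop(ready)
        order.append(node)
        for dep in edges.get(node, []):
            indegree[dep] -= 1
            if indegree[dep] == 0:
                heapq.heappush(ready, dep)

    if len(order) != len(indegree):
        raise ValueError("cycle detected: no valid topological order")
    return order
```

Each node is pushed and popped once, and each heap operation costs O(log n), giving the O(n log n) behavior mentioned in the PR description.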
tests/unit/engine/test_decomposition_rollup.py (1)

1-93: LGTM! Thorough test coverage for `StatusRollup.compute()` covering the documented priority rules:

- All COMPLETED → COMPLETED
- All CANCELLED → CANCELLED
- Any FAILED → FAILED
- Any IN_PROGRESS → IN_PROGRESS
- BLOCKED without IN_PROGRESS → BLOCKED
- Empty → CREATED
- Mixed pending work → IN_PROGRESS
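The priority rules above can be sketched as a simple decision ladder. This is an assumed mirror of `StatusRollup.compute`, using plain strings in place of the project's `TaskStatus` enum.

```python
def derive_parent_status(statuses: list[str]) -> str:
    """Roll up child statuses into one parent status, checked in priority order."""
    if not statuses:
        return "CREATED"  # empty -> CREATED
    if all(s == "COMPLETED" for s in statuses):
        return "COMPLETED"
    if all(s == "CANCELLED" for s in statuses):
        return "CANCELLED"
    if "FAILED" in statuses:
        return "FAILED"
    if "IN_PROGRESS" in statuses:
        return "IN_PROGRESS"
    if "BLOCKED" in statuses:
        return "BLOCKED"  # BLOCKED only wins when nothing is IN_PROGRESS
    return "IN_PROGRESS"  # mixed pending work defaults to IN_PROGRESS
```

The ordering of the checks is what encodes the priority: a single FAILED child outranks any amount of in-progress work, and BLOCKED is only reported when no sibling is actively progressing.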
The `test_rollup_counts` test validates all aggregate count fields are correctly populated.

tests/unit/engine/test_routing_models.py (2)
9-10: `TYPE_CHECKING` import is appropriate here. The `AgentIdentity` is only used as a type annotation for fixture parameters and never instantiated directly in this module, making the `TYPE_CHECKING` guard correct for avoiding circular imports.

19-169: LGTM! Comprehensive validation tests for routing models:

- `RoutingCandidate`: score bounds (0.0-1.0), immutability
- `RoutingDecision`: selected candidate not in alternatives validator
- `RoutingResult`: no overlap between decisions and unroutable
- `AutoTopologyConfig`: defaults and AUTO rejection to prevent infinite resolution

All cross-field validators from the model definitions are properly exercised.
tests/unit/engine/test_decomposition_classifier.py (3)

14-34: LGTM! Clean helper function with sensible defaults. The keyword-only arguments after `description` enforce explicit parameter passing for optional fields, improving readability.

117-170: Good use of parametrization. The parametrized tests for individual sequential and parallel patterns provide excellent coverage of the classifier's language pattern recognition, with descriptive test IDs for clear failure reporting.

36-115: LGTM! Tests comprehensively cover the classifier logic:

- Explicit `task_structure` bypass
- Sequential/parallel language signal detection
- Mixed signals triggering MIXED classification
- Default SEQUENTIAL fallback with dependencies
- Acceptance criteria contributing to scoring

This aligns well with the classifier implementation that requires both sequential and parallel language signals for MIXED classification.
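That decision rule — MIXED only when both language categories fire, SEQUENTIAL as the tie/no-signal fallback — can be sketched with integer signal counts standing in for the real regex scoring in `classifier.py` (an illustration, not the actual implementation):

```python
def classify_structure(
    sequential_signals: int,  # e.g. matches for "then", "after", "first ... next"
    parallel_signals: int,    # e.g. matches for "independently", "in parallel"
) -> str:
    """MIXED requires BOTH signal categories; SEQUENTIAL is the fallback."""
    if sequential_signals > 0 and parallel_signals > 0:
        return "MIXED"
    if parallel_signals > sequential_signals:
        return "PARALLEL"
    return "SEQUENTIAL"  # ties and no-signal cases fall back to SEQUENTIAL
```

Requiring both categories for MIXED is what fixes the classifier bias called out in the PR description: structural signals alone (artifact counts, dependencies) can no longer push a task into MIXED without supporting language evidence.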
tests/unit/engine/test_routing_service.py (2)

27-47: LGTM! The `_make_agent` helper with `noqa: PLR0913` is acceptable for test fixtures where explicit parameters improve test readability. The vendor-agnostic naming (test-provider, test-model-001) follows coding guidelines.

113-294: LGTM! Thorough test coverage for `TaskRoutingService`:

- Routing to best-scoring agent based on skills/role matching
- Unroutable subtasks when no viable candidates
- Alternatives populated with secondary candidates
- Topology applied to all decisions
- Empty agent pool handling
- Inactive agent filtering (TERMINATED status → score 0)
- Parent task ID propagation in results

The tests effectively exercise the scorer weights (skill 0.4/0.2, role 0.2, seniority 0.2) and topology integration.
tests/unit/engine/test_routing_topology.py (3)

22-56: LGTM! Clean helper functions for creating test fixtures. The `_make_task` helper with configurable `artifact_count` enables thorough threshold testing, and `_make_plan` allows structure variations.

131-149: Good boundary testing. Tests correctly verify the threshold behavior:

- At threshold (4 artifacts) → CENTRALIZED
- Above threshold (5 artifacts) → DECENTRALIZED

This validates the `artifact_count > parallel_artifact_threshold` condition in `TopologySelector.select()`.
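The boundary behavior follows directly from the strict greater-than comparison; a minimal sketch (the default threshold of 4 is taken from these tests, and the function name is illustrative):

```python
def pick_parallel_topology(artifact_count: int, parallel_artifact_threshold: int = 4) -> str:
    """CENTRALIZED at or below the threshold, DECENTRALIZED strictly above it."""
    if artifact_count > parallel_artifact_threshold:
        return "DECENTRALIZED"
    return "CENTRALIZED"
```

Because the comparison is strict, a task with exactly `parallel_artifact_threshold` artifacts stays CENTRALIZED — the off-by-one case the boundary tests pin down.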
59-156: LGTM! Comprehensive test coverage for `TopologySelector`:

- Explicit topology override takes precedence
- Structure-based auto-selection (SEQUENTIAL→SAS, PARALLEL→CENTRALIZED/DECENTRALIZED, MIXED→CONTEXT_DEPENDENT)
- Custom `AutoTopologyConfig` overrides defaults
- Artifact count threshold boundary behavior
- Config property accessor

The tests align with DESIGN_SPEC Section 6.9 topology selection rules.
src/ai_company/engine/routing/__init__.py (1)

1-25: LGTM! Clean public API surface for the routing subsystem. The `__all__` is alphabetically sorted and matches the imports.

src/ai_company/core/task.py (1)

10-17: LGTM! The import block correctly includes the new `CoordinationTopology` and `TaskStructure` enums needed for the new Task fields.

src/ai_company/engine/decomposition/protocol.py (1)

1-40: LGTM! Clean protocol definition with proper `TYPE_CHECKING` guards, `@runtime_checkable` decorator, and method signatures that align with the concrete `ManualDecompositionStrategy` implementation.

tests/unit/engine/test_routing_scorer.py (2)

184-211: LGTM! Good use of `@pytest.mark.parametrize` to cover multiple seniority-complexity alignment combinations. The test IDs are descriptive and aid debugging.
23-33: > Likely an incorrect or invalid review comment.

tests/unit/engine/test_decomposition_models.py (2)

250-364: LGTM! Comprehensive test coverage for `SubtaskStatusRollup` covering all status derivation paths including edge cases (zero total, counts exceeding total, pending work defaults).

196-211: The `sample_task_with_criteria` fixture is correctly defined in `tests/unit/engine/conftest.py` and is properly accessible to the test.

src/ai_company/engine/routing/service.py (3)
1-48: LGTM! Clean service implementation with proper observability setup. The `__slots__` usage and `TYPE_CHECKING` guard for heavy imports follow best practices.

50-88: LGTM! The `route()` method properly handles the empty-agents case with a warning log while allowing the flow to continue (all subtasks become unroutable). The try/except with `logger.exception()` ensures failures are logged with full traceback before re-raising.

90-166: LGTM! The `_do_route()` implementation correctly:

- Computes topology once per plan (shared across all decisions)
- Scores and filters candidates by `min_score` threshold
- Sorts viable candidates descending by score
- Logs at appropriate levels (DEBUG for per-subtask, INFO for completion)
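Those steps can be sketched as a small routing loop. This is an illustrative reduction, not the real `_do_route()`: the helper names, string IDs, and the dict/list return shape are assumptions, and the real service builds `RoutingDecision`/`RoutingResult` models instead.

```python
from typing import Callable


def route_plan(
    subtasks: list[str],
    agents: list[str],
    score: Callable[[str, str], float],  # (agent_id, subtask_id) -> score in [0, 1]
    min_score: float = 0.5,
) -> tuple[dict[str, str], list[str]]:
    """Assign each subtask its best-scoring viable agent; collect unroutable ones."""
    decisions: dict[str, str] = {}
    unroutable: list[str] = []
    for subtask in subtasks:
        # Score every agent, then drop candidates below the viability threshold.
        viable = [(score(a, subtask), a) for a in agents]
        viable = [(s, a) for s, a in viable if s >= min_score]
        if not viable:
            unroutable.append(subtask)  # nobody cleared min_score
            continue
        # Sort descending by score; the head becomes the decision,
        # the tail would populate the alternatives list.
        viable.sort(key=lambda pair: pair[0], reverse=True)
        decisions[subtask] = viable[0][1]
    return decisions, unroutable
```

Note that topology is intentionally absent from the loop: as the review observes, it is computed once per plan and then stamped onto every decision, so it never varies per subtask.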
tests/unit/engine/test_decomposition_service.py (3)

1-66: LGTM! Clean test helpers that create minimal Task and DecompositionPlan instances with sensible defaults. The plan helper creates a sequential dependency chain suitable for testing edge construction.

67-214: LGTM! Comprehensive async test coverage for `DecompositionService.decompose_task()`:

- Task creation with correct parent linkage
- Dependency edge construction
- Delegation chain inheritance
- Structure classification and override behavior
- Cycle detection raising `DecompositionCycleError`

215-266: LGTM! Good coverage for subtask complexity propagation and rollup delegation. The `test_rollup_status_delegates` test correctly verifies synchronous rollup behavior.

src/ai_company/engine/decomposition/__init__.py (1)

1-33: LGTM! Clean public API surface for the decomposition subsystem. The `__all__` is alphabetically sorted and all imports are correctly mapped to exports.
```python
task_structure: TaskStructure | None = Field(
    default=None,
    description="Classification of subtask relationships (None = not classified)",
)
coordination_topology: CoordinationTopology = Field(
    default=CoordinationTopology.AUTO,
    description="Coordination topology for multi-agent execution",
)
```
🧹 Nitpick | 🔵 Trivial
Update class docstring to document new fields.
The Task class docstring (lines 52-73) lists all attributes but doesn't include the newly added task_structure and coordination_topology fields. Add them for consistency.
📝 Proposed docstring addition

Add these entries to the Attributes section in the class docstring:

```diff
     delegation_chain: Ordered agent IDs of delegators (root first).
+    task_structure: Classification of subtask relationships
+        (``None`` if not classified).
+    coordination_topology: Coordination topology for multi-agent
+        execution (default ``AUTO``).
     """
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/core/task.py` around lines 143 - 150, The Task class docstring
is missing documentation for the two new fields; update the Attributes section
of the Task class docstring to include brief descriptions for task_structure
(TaskStructure | None — classification of subtask relationships; None means not
classified) and coordination_topology (CoordinationTopology — coordination
topology for multi-agent execution, default AUTO). Edit the Task class docstring
near the existing attribute list so the entries match the style/format of the
other attributes and reference the exact field names task_structure and
coordination_topology.
```python
"""Decomposition domain models.

Frozen Pydantic models for subtask definitions, decomposition plans,
results, status rollups, and decomposition context.
"""

from collections import Counter
from typing import Self

from pydantic import BaseModel, ConfigDict, Field, computed_field, model_validator

from ai_company.core.enums import (
    Complexity,
    CoordinationTopology,
    TaskStatus,
    TaskStructure,
)
from ai_company.core.task import Task  # noqa: TC001
from ai_company.core.types import NotBlankStr  # noqa: TC001
```
Log validator failures before raising from this module.
These validators currently raise ValueError silently. In a business-logic module, that leaves decomposition validation failures without the required warning/error log context.
As per coding guidelines, src/ai_company/**/*.py: Every module with business logic must import from ai_company.observability import get_logger and create logger = get_logger(__name__); all error paths must log at WARNING or ERROR with context before raising.
Also applies to: 57-63, 98-122, 145-165, 198-211
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/decomposition/models.py` around lines 1 - 20, Import
get_logger and create logger = get_logger(__name__), then update each Pydantic
validator (the methods decorated with `@model_validator` and any computed_field
validators in the file) to catch validation exceptions, log a warning/error with
contextual details (model/class name, incoming field values or kwargs, and the
exception message) and re-raise the original exception; apply this pattern to
all validator methods referenced in the file (the validators around lines
~57-63, ~98-122, ~145-165, and ~198-211) so failures are logged before being
raised.
```python
@model_validator(mode="after")
def _validate_plan_task_consistency(self) -> Self:
    """Ensure created_tasks align with plan subtasks."""
    if len(self.created_tasks) != len(self.plan.subtasks):
        msg = (
            f"created_tasks count ({len(self.created_tasks)}) "
            f"does not match plan subtask count ({len(self.plan.subtasks)})"
        )
        raise ValueError(msg)

    task_ids = {t.id for t in self.created_tasks}
    edge_ids = {eid for edge in self.dependency_edges for eid in edge}
    unknown_edge_ids = edge_ids - task_ids
    if unknown_edge_ids:
        msg = (
            f"dependency_edges reference unknown task IDs: "
            f"{sorted(unknown_edge_ids)}"
        )
        raise ValueError(msg)

    return self
```
Strengthen DecompositionResult consistency checks.
This validator only compares counts and verifies that edge endpoints exist. It still accepts a result where the created task IDs differ from plan.subtasks, and it also accepts missing or extra dependency_edges.
Suggested fix
```diff
 @model_validator(mode="after")
 def _validate_plan_task_consistency(self) -> Self:
     """Ensure created_tasks align with plan subtasks."""
     if len(self.created_tasks) != len(self.plan.subtasks):
         msg = (
             f"created_tasks count ({len(self.created_tasks)}) "
             f"does not match plan subtask count ({len(self.plan.subtasks)})"
         )
         raise ValueError(msg)
-    task_ids = {t.id for t in self.created_tasks}
+    expected_task_ids = {subtask.id for subtask in self.plan.subtasks}
+    task_ids = {t.id for t in self.created_tasks}
+    if task_ids != expected_task_ids:
+        msg = (
+            f"created_tasks IDs {sorted(task_ids)} do not match "
+            f"plan subtask IDs {sorted(expected_task_ids)}"
+        )
+        raise ValueError(msg)
+
+    expected_edges = {
+        (dep_id, subtask.id)
+        for subtask in self.plan.subtasks
+        for dep_id in subtask.dependencies
+    }
     edge_ids = {eid for edge in self.dependency_edges for eid in edge}
     unknown_edge_ids = edge_ids - task_ids
     if unknown_edge_ids:
         msg = (
             f"dependency_edges reference unknown task IDs: "
             f"{sorted(unknown_edge_ids)}"
         )
         raise ValueError(msg)
+
+    if set(self.dependency_edges) != expected_edges:
+        msg = "dependency_edges do not match plan subtask dependencies"
+        raise ValueError(msg)
     return self
```
return self🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/decomposition/models.py` around lines 145 - 165, The
validator must ensure created_tasks exactly match the plan's subtasks and that
dependency_edges are neither missing nor extraneous: replace the current
count-only check with a set equality check comparing {t.id for t in
self.created_tasks} to {s.id for s in self.plan.subtasks} and raise ValueError
if they differ; then strengthen edge validation by (a) if self.plan exposes an
expected dependency set (e.g., self.plan.dependency_edges) compare the
set(self.dependency_edges) to that expected set and raise on mismatch, and (b)
otherwise enforce no self-loops, no duplicate edges, and that every edge
endpoint is in the subtask id set (use symbols _validate_plan_task_consistency,
created_tasks, plan.subtasks, dependency_edges to locate code).
```python
# 1. Classify structure
structure = self._classifier.classify(task)

# 2. Decompose via strategy
plan = await self._strategy.decompose(task, context)

# Override structure if classifier found something different
if plan.task_structure != structure:
    plan = plan.model_copy(update={"task_structure": structure})
```
Don’t replace the plan’s declared structure with a parent-task heuristic.
classify(task) only sees the parent task inputs, while plan.task_structure is attached to the actual subtask graph. Silently rewriting the plan here can make task_structure disagree with plan.subtasks and with any topology already chosen for that plan.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/decomposition/service.py` around lines 104 - 113, The
code is wrongly overwriting the plan's declared task_structure with the
parent-only classifier result; instead, do not replace plan.task_structure
returned from self._strategy.decompose(task, context). Update the logic in the
method containing calls to self._classifier.classify(task) and
self._strategy.decompose(...) so that you only use the classifier result if the
returned plan has no structure (e.g., plan.task_structure is None or falsy);
otherwise leave plan.task_structure and plan.subtasks intact (or if you must
detect mismatches, emit a warning/error referencing plan.task_structure and
plan.subtasks rather than silently calling plan.model_copy(...)). Ensure you
remove the unconditional model_copy update that sets task_structure from the
classifier.
```python
from typing import Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

from ai_company.core.agent import AgentIdentity  # noqa: TC001
from ai_company.core.enums import CoordinationTopology
from ai_company.core.types import NotBlankStr  # noqa: TC001
```
Initialize logger and log these validator failures.
This business-logic module raises three validation errors but never creates logger, so invalid routing payloads are only visible if the caller preserves the exception. Please add structured WARNING/ERROR logging on each raise path.
As per coding guidelines, "Every module with business logic must import from ai_company.observability import get_logger and create logger = get_logger(__name__)", "Always use constants from domain-specific modules under ai_company.observability.events for event names", and "All error paths must log at WARNING or ERROR with context before raising".
Also applies to: 65-75, 99-111, 145-157
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/routing/models.py` around lines 7 - 13, Import
get_logger and the observability event constants, create logger =
get_logger(__name__), then add structured logging (WARNING or ERROR) with the
appropriate event constant and contextual fields immediately before each raise
in the module's validator functions (the pydantic `@model_validator` blocks and
any validation logic around CoordinationTopology/AgentIdentity/NotBlankStr).
Specifically, update the three raise paths referenced (around lines 65-75,
99-111, 145-157) to call logger.warning or logger.error with a descriptive
message, include the event from ai_company.observability.events and relevant
context (payload, field names, invalid values), and only then re-raise the
validation exception so failures are logged consistently.
```python
@model_validator(mode="after")
def _validate_unique_subtask_ids(self) -> Self:
    """Ensure no subtask appears in both decisions and unroutable."""
    decision_ids = {d.subtask_id for d in self.decisions}
    unroutable_set = set(self.unroutable)
    overlap = decision_ids & unroutable_set
    if overlap:
        msg = (
            f"Subtask IDs appear in both decisions and "
            f"unroutable: {sorted(overlap)}"
        )
        raise ValueError(msg)
    return self
```
Reject duplicate subtask IDs inside each collection.
Line 102 converts decisions to a set immediately, so duplicate subtask_id values are silently discarded instead of being rejected. Repeated IDs in unroutable also pass today, which can leave one RoutingResult representing the same subtask more than once.
🔧 Proposed fix

```diff
 @model_validator(mode="after")
 def _validate_unique_subtask_ids(self) -> Self:
-    """Ensure no subtask appears in both decisions and unroutable."""
-    decision_ids = {d.subtask_id for d in self.decisions}
-    unroutable_set = set(self.unroutable)
-    overlap = decision_ids & unroutable_set
+    """Ensure each subtask appears at most once in the result."""
+    decision_ids = tuple(d.subtask_id for d in self.decisions)
+    if len(decision_ids) != len(set(decision_ids)):
+        raise ValueError("decisions contains duplicate subtask IDs")
+
+    if len(self.unroutable) != len(set(self.unroutable)):
+        raise ValueError("unroutable contains duplicate subtask IDs")
+
+    overlap = set(decision_ids) & set(self.unroutable)
     if overlap:
         msg = (
             f"Subtask IDs appear in both decisions and "
             f"unroutable: {sorted(overlap)}"
         )
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
@model_validator(mode="after")
def _validate_unique_subtask_ids(self) -> Self:
    """Ensure each subtask appears at most once in the result."""
    decision_ids = tuple(d.subtask_id for d in self.decisions)
    if len(decision_ids) != len(set(decision_ids)):
        raise ValueError("decisions contains duplicate subtask IDs")
    if len(self.unroutable) != len(set(self.unroutable)):
        raise ValueError("unroutable contains duplicate subtask IDs")
    overlap = set(decision_ids) & set(self.unroutable)
    if overlap:
        msg = (
            f"Subtask IDs appear in both decisions and "
            f"unroutable: {sorted(overlap)}"
        )
        raise ValueError(msg)
    return self
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/routing/models.py` around lines 99 - 111, The validator
_validate_unique_subtask_ids currently silences duplicates by building sets;
instead explicitly detect and reject duplicate subtask_id values inside both
decisions and unroutable before checking cross-collection overlap: scan
self.decisions and self.unroutable to collect counts (or use a seen set) to find
any subtask_ids occurring more than once and raise a ValueError listing those
duplicates (referencing symbols decisions, unroutable, and the validator
_validate_unique_subtask_ids), then continue to compute decision_ids and
unroutable_set and raise the existing overlap error if any IDs appear in both.
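The counting approach the prompt mentions can be sketched independently of pydantic. `find_duplicates` and `validate_subtask_ids` are illustrative helpers that mirror the `RoutingResult` fields, not the project's actual validator:

```python
from collections import Counter


def find_duplicates(ids: list[str]) -> list[str]:
    """Return the IDs that occur more than once, sorted."""
    return sorted(k for k, n in Counter(ids).items() if n > 1)


def validate_subtask_ids(decision_ids: list[str], unroutable: list[str]) -> None:
    """Reject duplicates within each collection, then cross-collection overlap."""
    dup_decisions = find_duplicates(decision_ids)
    if dup_decisions:
        raise ValueError(f"decisions contains duplicate subtask IDs: {dup_decisions}")
    dup_unroutable = find_duplicates(unroutable)
    if dup_unroutable:
        raise ValueError(f"unroutable contains duplicate subtask IDs: {dup_unroutable}")
    overlap = set(decision_ids) & set(unroutable)
    if overlap:
        raise ValueError(
            f"Subtask IDs appear in both decisions and unroutable: {sorted(overlap)}"
        )
```

Using `Counter` (rather than a bare `len(ids) != len(set(ids))` check) lets the error message name the offending IDs, which is useful when a routing payload carries dozens of subtasks.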
```python
# Explicit override takes precedence
if task.coordination_topology != CoordinationTopology.AUTO:
    logger.debug(
        TASK_ROUTING_TOPOLOGY_SELECTED,
        task_id=task.id,
        topology=task.coordination_topology.value,
        source="explicit",
    )
    return task.coordination_topology

# Auto-select based on structure
structure = plan.task_structure
artifact_count = len(task.artifacts_expected)

if structure == TaskStructure.SEQUENTIAL:
    topology = self._config.sequential_override
elif structure == TaskStructure.PARALLEL:
    if artifact_count > self._config.parallel_artifact_threshold:
        topology = CoordinationTopology.DECENTRALIZED
    else:
        topology = self._config.parallel_default
elif structure == TaskStructure.MIXED:
    topology = self._config.mixed_default
```
Respect an explicit topology already set on the decomposition plan.
If task.coordination_topology is auto but plan.coordination_topology is concrete, this code still falls through to heuristic auto-selection and discards the plan’s explicit choice.
Suggested fix

```diff
 if task.coordination_topology != CoordinationTopology.AUTO:
     logger.debug(
         TASK_ROUTING_TOPOLOGY_SELECTED,
         task_id=task.id,
         topology=task.coordination_topology.value,
         source="explicit",
     )
     return task.coordination_topology
+
+if plan.coordination_topology != CoordinationTopology.AUTO:
+    logger.debug(
+        TASK_ROUTING_TOPOLOGY_SELECTED,
+        task_id=task.id,
+        topology=plan.coordination_topology.value,
+        source="plan",
+    )
+    return plan.coordination_topology
 # Auto-select based on structure
 structure = plan.task_structure
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/routing/topology_selector.py` around lines 57 - 79, The
selector currently ignores a concrete topology set on the decomposition plan
when task.coordination_topology is AUTO; update the selection logic so that
after checking the task-level explicit override (task.coordination_topology) you
check plan.coordination_topology and if it's not CoordinationTopology.AUTO, log
using TASK_ROUTING_TOPOLOGY_SELECTED with source="plan" and return
plan.coordination_topology (instead of continuing to heuristic auto-selection);
keep the existing heuristics (_config.sequential_override,
_config.parallel_default, _config.mixed_default and parallel_artifact_threshold)
for true AUTO cases.
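The precedence order described above (explicit task override, then a concrete plan topology, then structure-based heuristics) can be sketched as a plain function. The enum members, the hardcoded fallbacks, and the threshold default stand in for the project's actual config fields (`sequential_override`, `parallel_default`, `mixed_default`):

```python
from enum import Enum


class Topology(Enum):
    AUTO = "auto"
    CENTRALIZED = "centralized"
    DECENTRALIZED = "decentralized"


class Structure(Enum):
    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    MIXED = "mixed"


def select_topology(
    task_topology: Topology,
    plan_topology: Topology,
    structure: Structure,
    artifact_count: int,
    parallel_artifact_threshold: int = 3,
) -> Topology:
    """Resolve a coordination topology with explicit > plan > heuristic order."""
    if task_topology is not Topology.AUTO:
        return task_topology  # 1. explicit task-level override wins
    if plan_topology is not Topology.AUTO:
        return plan_topology  # 2. concrete choice declared by the plan
    # 3. heuristics only for true AUTO; config defaults hardcoded here.
    if structure is Structure.PARALLEL:
        if artifact_count > parallel_artifact_threshold:
            return Topology.DECENTRALIZED
        return Topology.CENTRALIZED
    return Topology.CENTRALIZED  # sequential / mixed fallback
```

The point of the ordering is that the heuristic branch is unreachable whenever either the task or the plan has already committed to a topology, which is exactly the invariant the review comment says the current code violates.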
```python
if TYPE_CHECKING:
    from ai_company.core.task import Task
```
🧹 Nitpick | 🔵 Trivial
Remove unused TYPE_CHECKING guard.
The TYPE_CHECKING block imports Task, but it's not used for type hints in this file. The runtime import at line 180 (from ai_company.core.task import Task) handles the actual usage. This guard is misleading.
♻️ Proposed fix

```diff
-from typing import TYPE_CHECKING
-
 import pytest
 from ai_company.core.enums import (
     ...
 )
-if TYPE_CHECKING:
-    from ai_company.core.task import Task
 from ai_company.engine.decomposition.models import (
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_decomposition_models.py` around lines 16 - 17, Remove
the unused TYPE_CHECKING guard that imports Task: delete the "if TYPE_CHECKING:
from ai_company.core.task import Task" block (the test uses the runtime import
"from ai_company.core.task import Task" at line ~180), so eliminate the
misleading conditional import and any reference to TYPE_CHECKING in this file to
keep imports clear and avoid dead code.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
♻️ Duplicate comments (5)
src/ai_company/engine/decomposition/dag.py (1)
140-179: ⚠️ Potential issue | 🟠 Major

Check missing references before cycle-based algorithms.

`topological_sort()` and `parallel_groups()` are public entry points, but both skip `_check_missing_references()`. A graph like `("a" depends on "missing")` currently increments `in_degree["a"]`, produces no ready nodes, and raises `DecompositionCycleError` even though the actual problem is an unknown dependency.

Suggested fix

```diff
 def topological_sort(self) -> tuple[str, ...]:
     """Return subtask IDs in topological execution order.
     @@
     Raises:
         DecompositionCycleError: If a cycle prevents sorting.
     """
+    self._check_missing_references()
     in_degree: dict[str, int] = dict.fromkeys(self._subtask_ids, 0)
 @@
 def parallel_groups(self) -> tuple[tuple[str, ...], ...]:
     """Compute groups of subtasks that can execute in parallel.
     @@
     Raises:
         DecompositionCycleError: If a cycle prevents grouping.
     """
+    self._check_missing_references()
     in_degree: dict[str, int] = dict.fromkeys(self._subtask_ids, 0)
```

Also applies to: 181-220
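The failure mode and the fix can be exercised standalone with a minimal sketch of Kahn's algorithm over plain dicts. The up-front reference check raises a distinct error before the cycle check can misfire; the exception types here are plain stdlib stand-ins for the project's `DecompositionError` hierarchy:

```python
from collections import deque


def topological_sort(deps: dict[str, set[str]]) -> list[str]:
    """Sort nodes so every node appears after its dependencies.

    `deps` maps each node ID to the set of node IDs it depends on.
    """
    # 1. Missing-reference check up front: an unknown dependency must not
    #    be misreported as a cycle by the algorithm below.
    known = set(deps)
    missing = {d for ds in deps.values() for d in ds if d not in known}
    if missing:
        raise KeyError(f"unknown dependencies: {sorted(missing)}")

    # 2. Kahn's algorithm: repeatedly emit nodes with in-degree zero.
    in_degree = {n: len(ds) for n, ds in deps.items()}
    dependents: dict[str, list[str]] = {n: [] for n in deps}
    for node, ds in deps.items():
        for dep in ds:
            dependents[dep].append(node)

    ready = deque(sorted(n for n, deg in in_degree.items() if deg == 0))
    order: list[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for nxt in dependents[node]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                ready.append(nxt)

    if len(order) != len(deps):
        raise ValueError("cycle detected")  # only genuine cycles reach here
    return order
```

With the check removed, `{"a": {"missing"}}` would leave `a` at in-degree 1 forever and fall through to the cycle error, which is the misdiagnosis the review flags.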
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/decomposition/dag.py` around lines 140 - 179, topological_sort() (and likewise parallel_groups()) currently proceed without validating that all dependency IDs exist, causing missing references to be misreported as cycles; call the existing _check_missing_references() at the start of topological_sort and at the start of parallel_groups to detect and raise the correct missing-reference error before running Kahn's algorithm or group computation, ensuring these public entry points validate dependencies up-front (refer to methods topological_sort, parallel_groups and helper _check_missing_references).

src/ai_company/engine/decomposition/service.py (2)
121-133: ⚠️ Potential issue | 🟠 Major

Copy subtask dependencies onto each created `Task`.

The DAG survives in `dependency_edges`, but every child `Task` is still created with the `Task.dependencies` default of `()`. Any downstream flow that persists or schedules only `created_tasks` loses the dependency information.

Suggested fix

```diff
 child_task = Task(
     id=subtask_def.id,
     title=subtask_def.title,
     description=subtask_def.description,
+    dependencies=subtask_def.dependencies,
     type=task.type,
     priority=task.priority,
     project=task.project,
     created_by=task.created_by,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/decomposition/service.py` around lines 121 - 133, The created child Task currently ignores the subtask's dependencies (leaving Task.dependencies as the default), so update the Task construction for child_task in create/subtask handling to copy the dependency data from the subtask definition (e.g., use subtask_def.dependencies or the equivalent dependency field used in the diff) into the Task.dependencies field so downstream consumers of created_tasks retain the DAG; ensure you convert the dependency collection to the same type Task.dependencies expects (tuple/list) and preserve IDs referenced in dependency_edges.
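The one-line fix above is easy to demonstrate in isolation. These frozen dataclasses are stand-ins for the project's `SubtaskDefinition` and `Task` models; the point is simply that the child constructor must forward the dependency tuple rather than accept its default:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SubtaskDef:
    """Stand-in for the project's SubtaskDefinition."""

    id: str
    title: str
    dependencies: tuple[str, ...] = ()


@dataclass(frozen=True)
class ChildTask:
    """Stand-in for the project's Task; dependencies default to ()."""

    id: str
    title: str
    dependencies: tuple[str, ...] = ()


def build_child_tasks(subtasks: list[SubtaskDef]) -> list[ChildTask]:
    """Create one child task per subtask, preserving DAG edges."""
    return [
        ChildTask(
            id=s.id,
            title=s.title,
            dependencies=s.dependencies,  # the fix: copy the edges across
        )
        for s in subtasks
    ]
```

Without the `dependencies=` line, every child would silently carry an empty tuple, which is exactly the data loss the review describes for consumers that only see `created_tasks`.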
110-113: ⚠️ Potential issue | 🟠 Major

Don't replace the plan's declared structure with a parent-task heuristic.

`classify(task)` only sees the parent task inputs. `plan.task_structure` is attached to the actual subtask graph returned by the strategy, so overwriting it here can make `plan.task_structure`, `plan.subtasks`, and any downstream topology derived from the plan disagree.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/decomposition/service.py` around lines 110 - 113, the code overwrites the plan's declared subtask topology with a parent-only heuristic (classify(task)) by assigning plan = plan.model_copy(update={"task_structure": structure}); instead, avoid replacing plan.task_structure with the classifier result — either remove this assignment entirely or only apply it when plan.task_structure is empty/None (e.g., check if plan.task_structure is falsy before calling model_copy). Reference the symbols plan, task_structure, structure and classify(task) when making the change so plan.subtasks and downstream topology remain consistent with the strategy-produced graph.

src/ai_company/engine/routing/models.py (2)
100-111: ⚠️ Potential issue | 🟠 Major

Reject duplicate subtask IDs inside `decisions` and `unroutable`.

Building sets here silently drops repeated `subtask_id` values, so `RoutingResult` can still represent the same subtask multiple times as long as it does not overlap across the two collections.

Suggested fix

```diff
 @model_validator(mode="after")
 def _validate_unique_subtask_ids(self) -> Self:
-    """Ensure no subtask appears in both decisions and unroutable."""
-    decision_ids = {d.subtask_id for d in self.decisions}
-    unroutable_set = set(self.unroutable)
-    overlap = decision_ids & unroutable_set
+    """Ensure each subtask appears at most once in the result."""
+    decision_ids = tuple(d.subtask_id for d in self.decisions)
+    if len(decision_ids) != len(set(decision_ids)):
+        msg = "decisions contains duplicate subtask IDs"
+        raise ValueError(msg)
+
+    if len(self.unroutable) != len(set(self.unroutable)):
+        msg = "unroutable contains duplicate subtask IDs"
+        raise ValueError(msg)
+
+    overlap = set(decision_ids) & set(self.unroutable)
     if overlap:
         msg = (
             f"Subtask IDs appear in both decisions and "
             f"unroutable: {sorted(overlap)}"
         )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/routing/models.py` around lines 100 - 111, The current _validate_unique_subtask_ids validator only checks for overlap between decisions and unroutable but lets duplicate subtask_id entries within decisions or within unroutable pass silently; update _validate_unique_subtask_ids to also detect duplicate subtask_id values inside self.decisions and inside self.unroutable (e.g. by counting subtask_id occurrences or comparing list vs set lengths), and raise a ValueError listing which subtask_ids are duplicated and whether they are duplicated in decisions or unroutable; keep the existing overlap check for cross-collection duplicates as well.
7-13: ⚠️ Potential issue | 🟠 Major

Initialize `logger` and log these validator failures.

This business-logic module raises validation errors from three model validators without a `logger`, so invalid routing payloads are only visible if the caller preserves the exception. Log each failure at WARNING/ERROR with structured context before raising.

As per coding guidelines, "Every module with business logic must import `from ai_company.observability import get_logger` and create `logger = get_logger(__name__)`" and "All error paths must log at WARNING or ERROR with context before raising".

Also applies to: 65-75, 100-111, 146-157
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/routing/models.py` around lines 7 - 13, This module lacks a logger and fails to record validation failures; import get_logger with "from ai_company.observability import get_logger" and create "logger = get_logger(__name__)" at top, then in each model validator (the three `@model_validator` methods referenced around lines 65-75, 100-111, and 146-157) catch the validation failure paths and log before re-raising: call logger.warning or logger.error with structured context (include the model/class name, the incoming payload/values and any relevant field names, and the exception or reason) inside those validator functions (refer to each model_validator-decorated function by its name) so the failure is recorded before raising the original validation error.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/engine/routing/scorer.py`:
- Around line 57-61: The constructor __init__ in scorer.py validates min_score
and raises ValueError but doesn't log; before raising, emit a warning or error
log with context (include the invalid min_score value and the accepted range)
using the module logger (e.g., logger.warning or logger.error) so callers see
why construction failed; then raise the ValueError as before. Ensure you update
the __init__ method that checks min_score to log the message immediately prior
to raising.
---
Duplicate comments:
In `@src/ai_company/engine/decomposition/dag.py`:
- Around line 140-179: topological_sort() (and likewise parallel_groups())
currently proceed without validating that all dependency IDs exist, causing
missing references to be misreported as cycles; call the existing
_check_missing_references() at the start of topological_sort and at the start of
parallel_groups to detect and raise the correct missing-reference error before
running Kahn's algorithm or group computation, ensuring these public entry
points validate dependencies up-front (refer to methods topological_sort,
parallel_groups and helper _check_missing_references).
In `@src/ai_company/engine/decomposition/service.py`:
- Around line 121-133: The created child Task currently ignores the subtask's
dependencies (leaving Task.dependencies as the default), so update the Task
construction for child_task in create/subtask handling to copy the dependency
data from the subtask definition (e.g., use subtask_def.dependencies or the
equivalent dependency field used in the diff) into the Task.dependencies field
so downstream consumers of created_tasks retain the DAG; ensure you convert the
dependency collection to the same type Task.dependencies expects (tuple/list)
and preserve IDs referenced in dependency_edges.
- Around line 110-113: The code overwrites the plan's declared subtask topology
with a parent-only heuristic (classify(task)) by assigning plan =
plan.model_copy(update={"task_structure": structure}); instead, avoid replacing
plan.task_structure with the classifier result — either remove this assignment
entirely or only apply it when plan.task_structure is empty/None (e.g., check if
plan.task_structure is falsy before calling model_copy). Reference the symbols
plan, task_structure, structure and classify(task) when making the change so
plan.subtasks and downstream topology remain consistent with the
strategy-produced graph.
In `@src/ai_company/engine/routing/models.py`:
- Around line 100-111: The current _validate_unique_subtask_ids validator only
checks for overlap between decisions and unroutable but lets duplicate
subtask_id entries within decisions or within unroutable pass silently; update
_validate_unique_subtask_ids to also detect duplicate subtask_id values inside
self.decisions and inside self.unroutable (e.g. by counting subtask_id
occurrences or comparing list vs set lengths), and raise a ValueError listing
which subtask_ids are duplicated and whether they are duplicated in decisions or
unroutable; keep the existing overlap check for cross-collection duplicates as
well.
- Around line 7-13: This module lacks a logger and fails to record validation
failures; import get_logger with "from ai_company.observability import
get_logger" and create "logger = get_logger(__name__)" at top, then in each
model validator (the three `@model_validator` methods referenced around lines
65-75, 100-111, and 146-157) catch the validation failure paths and log before
re-raising: call logger.warning or logger.error with structured context (include
the model/class name, the incoming payload/values and any relevant field names,
and the exception or reason) inside those validator functions (refer to each
model_validator-decorated function by its name) so the failure is recorded
before raising the original validation error.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 1d16dcd3-813d-4c10-a8e9-e964a0a34532
📒 Files selected for processing (19)
DESIGN_SPEC.md
src/ai_company/engine/decomposition/classifier.py
src/ai_company/engine/decomposition/dag.py
src/ai_company/engine/decomposition/manual.py
src/ai_company/engine/decomposition/models.py
src/ai_company/engine/decomposition/rollup.py
src/ai_company/engine/decomposition/service.py
src/ai_company/engine/routing/models.py
src/ai_company/engine/routing/scorer.py
src/ai_company/engine/routing/service.py
src/ai_company/observability/events/decomposition.py
src/ai_company/observability/events/task_routing.py
tests/unit/engine/test_decomposition_classifier.py
tests/unit/engine/test_decomposition_dag.py
tests/unit/engine/test_decomposition_models.py
tests/unit/engine/test_decomposition_service.py
tests/unit/engine/test_routing_models.py
tests/unit/engine/test_routing_scorer.py
tests/unit/engine/test_routing_service.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Do not use `from __future__ import annotations` in Python 3.14+ code — Python 3.14 has native PEP 649 lazy annotations
Use `except A, B:` (no parentheses) for exception syntax in Python 3.14 — PEP 758 except syntax enforced by ruff
All public functions and classes must have type hints; mypy strict mode is enforced
All public classes and functions must have Google-style docstrings; ruff D rules enforce this
Create new objects instead of mutating existing ones; use `copy.deepcopy()` at construction and `MappingProxyType` wrapping for read-only enforcement of non-Pydantic internal collections (registries, BaseTool)
For non-Pydantic internal collections and dict/list fields in frozen Pydantic models, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, serializing for persistence)
Use frozen Pydantic models for config/identity; use separate mutable-via-copy models (`model_copy(update=...)`) for runtime state that evolves; never mix static config fields with mutable runtime fields
Use Pydantic v2 (`BaseModel`, `model_validator`, `computed_field`, `ConfigDict`); use `@computed_field` for derived values instead of storing redundant fields
Use `NotBlankStr` (from core.types) for all identifier/name fields — including optional (`NotBlankStr | None`) and tuple (`tuple[NotBlankStr, ...]`) variants — instead of manual whitespace validators
Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls); prefer structured concurrency over bare `create_task`
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly; never silently swallow errors
Validate at system boundaries (user input, external APIs, config files)
Line length must be 88 characters (enforced by ruff)

Files:

tests/unit/engine/test_decomposition_models.py
tests/unit/engine/test_routing_models.py
tests/unit/engine/test_decomposition_classifier.py
src/ai_company/engine/decomposition/rollup.py
tests/unit/engine/test_decomposition_dag.py
src/ai_company/engine/decomposition/models.py
tests/unit/engine/test_decomposition_service.py
src/ai_company/engine/routing/models.py
src/ai_company/engine/routing/service.py
tests/unit/engine/test_routing_service.py
src/ai_company/engine/decomposition/classifier.py
src/ai_company/engine/routing/scorer.py
src/ai_company/engine/decomposition/dag.py
src/ai_company/engine/decomposition/service.py
tests/unit/engine/test_routing_scorer.py
src/ai_company/engine/decomposition/manual.py
src/ai_company/observability/events/task_routing.py
src/ai_company/observability/events/decomposition.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Mark tests with `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, or `@pytest.mark.slow` as appropriate
Prefer `@pytest.mark.parametrize` for testing similar cases
Use vendor-agnostic test names: test-provider, test-small-001, etc. instead of real vendor names

Files:

tests/unit/engine/test_decomposition_models.py
tests/unit/engine/test_routing_models.py
tests/unit/engine/test_decomposition_classifier.py
tests/unit/engine/test_decomposition_dag.py
tests/unit/engine/test_decomposition_service.py
tests/unit/engine/test_routing_service.py
tests/unit/engine/test_routing_scorer.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/ai_company/**/*.py: Every module with business logic must import `from ai_company.observability import get_logger` and create `logger = get_logger(__name__)`
Never use `import logging` / `logging.getLogger()` / `print()` in application code
Always use variable name `logger` (not `_logger`, not `log`) for the logger instance
Always use constants from domain-specific modules under ai_company.observability.events for event names (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget); import directly
Always use structured logging with kwargs: `logger.info(EVENT, key=value)` — never use formatted strings like `logger.info("msg %s", val)`
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO level
DEBUG logging is for object creation, internal flow, and entry/exit of key functions

Files:

src/ai_company/engine/decomposition/rollup.py
src/ai_company/engine/decomposition/models.py
src/ai_company/engine/routing/models.py
src/ai_company/engine/routing/service.py
src/ai_company/engine/decomposition/classifier.py
src/ai_company/engine/routing/scorer.py
src/ai_company/engine/decomposition/dag.py
src/ai_company/engine/decomposition/service.py
src/ai_company/engine/decomposition/manual.py
src/ai_company/observability/events/task_routing.py
src/ai_company/observability/events/decomposition.py
src/ai_company/engine/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
RetryExhaustedError signals that all retries failed — the engine layer catches this to trigger fallback chains
Files:
src/ai_company/engine/decomposition/rollup.py
src/ai_company/engine/decomposition/models.py
src/ai_company/engine/routing/models.py
src/ai_company/engine/routing/service.py
src/ai_company/engine/decomposition/classifier.py
src/ai_company/engine/routing/scorer.py
src/ai_company/engine/decomposition/dag.py
src/ai_company/engine/decomposition/service.py
src/ai_company/engine/decomposition/manual.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples; use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases
Files:
src/ai_company/engine/decomposition/rollup.py
src/ai_company/engine/decomposition/models.py
src/ai_company/engine/routing/models.py
src/ai_company/engine/routing/service.py
src/ai_company/engine/decomposition/classifier.py
src/ai_company/engine/routing/scorer.py
src/ai_company/engine/decomposition/dag.py
src/ai_company/engine/decomposition/service.py
src/ai_company/engine/decomposition/manual.py
src/ai_company/observability/events/task_routing.py
src/ai_company/observability/events/decomposition.py
🧠 Learnings (11)
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic must import `from ai_company.observability import get_logger` and create `logger = get_logger(__name__)`
Applied to files:
src/ai_company/engine/decomposition/models.py
src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising
Applied to files:
src/ai_company/engine/decomposition/models.py
src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Always use variable name `logger` (not `_logger`, not `log`) for the logger instance
Applied to files:
src/ai_company/engine/decomposition/models.py
src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : DEBUG logging is for object creation, internal flow, and entry/exit of key functions
Applied to files:
src/ai_company/engine/decomposition/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : All state transitions must log at INFO level
Applied to files:
src/ai_company/engine/decomposition/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Never use `import logging` / `logging.getLogger()` / `print()` in application code
Applied to files:
src/ai_company/engine/decomposition/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to **/*.py : Validate at system boundaries (user input, external APIs, config files)
Applied to files:
src/ai_company/engine/decomposition/models.py
src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to **/*.py : Use frozen Pydantic models for config/identity; use separate mutable-via-copy models (model_copy(update=...)) for runtime state that evolves; never mix static config fields with mutable runtime fields
Applied to files:
src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Always use structured logging with kwargs: `logger.info(EVENT, key=value)` — never use formatted strings like `logger.info("msg %s", val)`
Applied to files:
src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Always use constants from domain-specific modules under ai_company.observability.events for event names (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget); import directly
Applied to files:
src/ai_company/observability/events/task_routing.py
src/ai_company/observability/events/decomposition.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to **/*.py : Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls); prefer structured concurrency over bare create_task
Applied to files:
DESIGN_SPEC.md
🧬 Code graph analysis (12)
tests/unit/engine/test_decomposition_models.py (1)
src/ai_company/engine/decomposition/models.py (6)
DecompositionContext (244-269)
DecompositionPlan (66-122)
DecompositionResult (125-165)
SubtaskDefinition (22-63)
SubtaskStatusRollup (168-241)
derived_parent_status (217-241)
tests/unit/engine/test_decomposition_classifier.py (4)
src/ai_company/core/artifact.py (1)
ExpectedArtifact (13-28)
src/ai_company/core/enums.py (4)
ArtifactType (180-185)
Priority (162-168)
TaskStructure (270-279)
TaskType (151-159)
src/ai_company/core/task.py (2)
AcceptanceCriterion (26-42)
Task (45-257)
src/ai_company/tools/base.py (1)
description (115-117)
src/ai_company/engine/decomposition/rollup.py (3)
src/ai_company/core/enums.py (1)
TaskStatus (122-148)
src/ai_company/engine/decomposition/models.py (2)
SubtaskStatusRollup (168-241)
derived_parent_status (217-241)
src/ai_company/observability/_logger.py (1)
get_logger(8-28)
tests/unit/engine/test_decomposition_dag.py (3)
src/ai_company/engine/decomposition/dag.py (6)
DependencyGraph(26-244)validate(70-86)topological_sort(140-179)parallel_groups(181-220)get_dependents(222-232)get_dependencies(234-244)src/ai_company/engine/decomposition/models.py (1)
SubtaskDefinition(22-63)src/ai_company/engine/errors.py (2)
DecompositionCycleError(54-55)DecompositionError(50-51)
src/ai_company/engine/decomposition/models.py (2)
src/ai_company/core/enums.py (4)
Complexity(171-177)CoordinationTopology(282-293)TaskStatus(122-148)TaskStructure(270-279)src/ai_company/core/task.py (1)
Task(45-257)
tests/unit/engine/test_decomposition_service.py (3)
src/ai_company/core/enums.py (4)
Priority(162-168)TaskStatus(122-148)TaskStructure(270-279)Complexity(171-177)src/ai_company/engine/decomposition/service.py (3)
DecompositionService(34-179)decompose_task(51-88)rollup_status(165-179)src/ai_company/engine/errors.py (1)
DecompositionCycleError(54-55)
src/ai_company/engine/routing/models.py (3)
src/ai_company/core/agent.py (1)
AgentIdentity(246-304)src/ai_company/core/enums.py (1)
CoordinationTopology(282-293)src/ai_company/engine/routing/scorer.py (1)
score(68-156)
src/ai_company/engine/routing/service.py (5)
src/ai_company/engine/routing/models.py (2)
RoutingDecision(41-76)RoutingResult(79-112)src/ai_company/observability/_logger.py (1)
get_logger(8-28)src/ai_company/core/task.py (1)
Task(45-257)src/ai_company/engine/routing/scorer.py (3)
AgentTaskScorer(35-156)score(68-156)min_score(64-66)src/ai_company/engine/routing/topology_selector.py (2)
TopologySelector(24-98)select(43-98)
src/ai_company/engine/decomposition/classifier.py (3)
src/ai_company/core/enums.py (1)
TaskStructure(270-279)src/ai_company/observability/_logger.py (1)
get_logger(8-28)src/ai_company/core/task.py (1)
Task(45-257)
src/ai_company/engine/routing/scorer.py (5)
src/ai_company/core/enums.py (3)
AgentStatus(24-29)Complexity(171-177)SeniorityLevel(6-21)src/ai_company/engine/routing/models.py (1)
RoutingCandidate(16-38)src/ai_company/observability/_logger.py (1)
get_logger(8-28)src/ai_company/core/agent.py (1)
AgentIdentity(246-304)src/ai_company/engine/decomposition/models.py (1)
SubtaskDefinition(22-63)
src/ai_company/engine/decomposition/dag.py (3)
src/ai_company/engine/errors.py (2)
DecompositionCycleError(54-55)DecompositionError(50-51)src/ai_company/observability/_logger.py (1)
get_logger(8-28)src/ai_company/engine/decomposition/models.py (1)
SubtaskDefinition(22-63)
src/ai_company/engine/decomposition/manual.py (3)
src/ai_company/engine/errors.py (2)
DecompositionDepthError(58-59)DecompositionError(50-51)src/ai_company/core/task.py (1)
Task(45-257)src/ai_company/engine/decomposition/models.py (2)
DecompositionContext(244-269)DecompositionPlan(66-122)
🔇 Additional comments (25)
tests/unit/engine/test_decomposition_models.py (2)
3-17: Remove unused `TYPE_CHECKING` guard. The `TYPE_CHECKING` block imports `Task` (line 17), but it's never used for type hints in this file. The actual `Task` usage is a runtime import inside the helper function at line 180. This guard is dead code.

♻️ Proposed fix
```diff
-from typing import TYPE_CHECKING
-
 import pytest

 from ai_company.core.enums import (
     Complexity,
     CoordinationTopology,
     Priority,
     TaskStatus,
     TaskStructure,
     TaskType,
 )

-if TYPE_CHECKING:
-    from ai_company.core.task import Task

 from ai_company.engine.decomposition.models import (
```
196-212: LGTM! The test correctly validates `DecompositionResult` construction and uses the helper function `_make_result_task` appropriately. The fixture `sample_task_with_criteria` is expected to be provided by conftest.

src/ai_company/engine/decomposition/rollup.py (1)
22-76: LGTM! The `StatusRollup.compute` method is well-structured with:
- Proper warning log for empty subtask statuses (lines 45-51)
- Correct debug log after computation (lines 69-74)
- Clean status counting logic delegating derived status to the model's computed field
src/ai_company/engine/routing/scorer.py (1)
94-117: LGTM! The skill overlap scoring logic correctly handles:
- Primary skill matching with 0.4 weight
- Secondary skill matching (excluding already-matched primary skills) with 0.2 weight
- Proper handling when no skills are required (skipping skill-based scoring)
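Under the weights quoted above (0.4 primary, 0.2 secondary), the overlap calculation can be illustrated with a small standalone sketch — the function name and signature below are illustrative, not the actual `AgentTaskScorer` API:

```python
def skill_overlap_score(
    required: set[str], primary: set[str], secondary: set[str]
) -> float:
    """Weighted skill overlap: 0.4 weight for primary matches, 0.2 for secondary."""
    if not required:
        return 0.0  # no skills required → skill-based scoring is skipped
    primary_hits = required & primary
    # Secondary matches exclude skills already matched as primary.
    secondary_hits = (required & secondary) - primary_hits
    return (
        0.4 * len(primary_hits) / len(required)
        + 0.2 * len(secondary_hits) / len(required)
    )
```

An agent covering every required skill as primary tops out at 0.4 from this component, leaving room for the role and seniority components of the total score.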
src/ai_company/observability/events/task_routing.py (1)
1-13: LGTM! Event constants are well-structured with consistent naming conventions. The `Final[str]` typing ensures immutability and type safety. Based on learnings: "Always use constants from domain-specific modules under ai_company.observability.events for event names" — this module properly provides those constants for task routing.
1-20: Add logger and log validator failures before raising. This module contains multiple validators that raise `ValueError` without logging. Per coding guidelines, all error paths must log at WARNING or ERROR with context before raising.

🔧 Proposed fix (partial - add logger)
```diff
 from ai_company.core.enums import (
     Complexity,
     CoordinationTopology,
     TaskStatus,
     TaskStructure,
 )
 from ai_company.core.task import Task  # noqa: TC001
 from ai_company.core.types import NotBlankStr  # noqa: TC001
+from ai_company.observability import get_logger
+
+logger = get_logger(__name__)
```
_validate_no_self_dependency,_validate_subtasks,_validate_plan_task_consistency,_validate_counts).As per coding guidelines: "Every module with business logic must import
from ai_company.observability import get_loggerand createlogger = get_logger(__name__)" and "All error paths must log at WARNING or ERROR with context before raising."
145-165: Strengthen `DecompositionResult` consistency checks. The validator only compares task counts and checks edge endpoints. It doesn't verify that `created_tasks` IDs match `plan.subtasks` IDs, allowing mismatched task sets to pass validation.

🔧 Proposed fix
```diff
 @model_validator(mode="after")
 def _validate_plan_task_consistency(self) -> Self:
     """Ensure created_tasks align with plan subtasks."""
     if len(self.created_tasks) != len(self.plan.subtasks):
         msg = (
             f"created_tasks count ({len(self.created_tasks)}) "
             f"does not match plan subtask count ({len(self.plan.subtasks)})"
         )
         raise ValueError(msg)
+    expected_ids = {s.id for s in self.plan.subtasks}
     task_ids = {t.id for t in self.created_tasks}
+    if task_ids != expected_ids:
+        msg = (
+            f"created_tasks IDs {sorted(task_ids)} do not match "
+            f"plan subtask IDs {sorted(expected_ids)}"
+        )
+        raise ValueError(msg)
+
     edge_ids = {eid for edge in self.dependency_edges for eid in edge}
     unknown_edge_ids = edge_ids - task_ids
```
213-242: LGTM! The `derived_parent_status` computed field correctly implements the status derivation logic with proper priority ordering:
- Empty total → CREATED
- All completed → COMPLETED
- All cancelled → CANCELLED
- Any failed → FAILED
- Any in progress → IN_PROGRESS
- Any blocked → BLOCKED
- Mixed terminal (completed + cancelled = total) → COMPLETED
- Default fallback → IN_PROGRESS
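The priority ordering above can be expressed as a standalone sketch. The real implementation is a Pydantic `computed_field` on `SubtaskStatusRollup`; the enum below is a minimal stand-in for the project's `TaskStatus`:

```python
from enum import Enum


class TaskStatus(str, Enum):  # minimal stand-in for ai_company.core.enums.TaskStatus
    CREATED = "created"
    IN_PROGRESS = "in_progress"
    BLOCKED = "blocked"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


def derived_parent_status(statuses: list[TaskStatus]) -> TaskStatus:
    """Apply the rollup priority ordering described above, top rule first."""
    total = len(statuses)
    if total == 0:
        return TaskStatus.CREATED
    completed = statuses.count(TaskStatus.COMPLETED)
    cancelled = statuses.count(TaskStatus.CANCELLED)
    if completed == total:
        return TaskStatus.COMPLETED
    if cancelled == total:
        return TaskStatus.CANCELLED
    if TaskStatus.FAILED in statuses:
        return TaskStatus.FAILED
    if TaskStatus.IN_PROGRESS in statuses:
        return TaskStatus.IN_PROGRESS
    if TaskStatus.BLOCKED in statuses:
        return TaskStatus.BLOCKED
    if completed + cancelled == total:  # mixed terminal states
        return TaskStatus.COMPLETED
    return TaskStatus.IN_PROGRESS  # default fallback
```

Because the checks run top-down, a single FAILED subtask dominates any mix of non-terminal states, while a fully terminal mix of completed and cancelled still rolls up to COMPLETED.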
src/ai_company/engine/decomposition/manual.py (1)
26-92: LGTM! The `ManualDecompositionStrategy` implementation is well-structured:
- Proper validation with warning logs before raising errors (lines 63, 71, 79)
- Clear error messages with context
- Debug log on successful completion
- Clean separation of validation concerns (task ID mismatch, depth limit, subtask count)
src/ai_company/engine/decomposition/classifier.py (2)
46-104: LGTM! The `TaskStructureClassifier.classify` method implements a sensible heuristic approach:
- Short-circuits when `task_structure` is explicitly set (lines 68-75)
- MIXED classification triggered only by conflicting language signals (lines 86-87)
- Defaults to SEQUENTIAL as the safest fallback (line 94)
- Comprehensive debug logging with all scoring details (lines 96-103)
106-150: LGTM! The language scoring methods properly scan all relevant text sources (title, description, acceptance criteria) and use pre-compiled regex patterns for efficiency.
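The classification shape described above can be sketched as follows. The keyword patterns here are illustrative assumptions — the actual pattern lists in the classifier are not shown in this excerpt:

```python
import re
from enum import Enum


class TaskStructure(str, Enum):  # minimal stand-in for the project's enum
    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    MIXED = "mixed"


# Illustrative pre-compiled patterns; the real classifier's keyword lists differ.
_SEQUENTIAL_RE = re.compile(r"\b(then|after|before|first|finally|step \d+)\b", re.I)
_PARALLEL_RE = re.compile(r"\b(in parallel|concurrently|independently)\b", re.I)


def classify(text: str) -> TaskStructure:
    seq = len(_SEQUENTIAL_RE.findall(text))
    par = len(_PARALLEL_RE.findall(text))
    if seq and par:
        return TaskStructure.MIXED  # only conflicting language signals yield MIXED
    if par:
        return TaskStructure.PARALLEL
    return TaskStructure.SEQUENTIAL  # safest default when signals are absent
```

Defaulting to SEQUENTIAL keeps the scheduler conservative: an under-parallelized plan is merely slow, while a wrongly parallelized one can violate real dependencies.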
src/ai_company/observability/events/decomposition.py (1)
1-15: LGTM! Event constants are comprehensive and cover the full decomposition lifecycle (started, completed, failed) plus specific events (subtask creation, validation errors, graph operations, rollup computation). The `Final[str]` typing ensures immutability. Based on learnings: "Always use constants from domain-specific modules under ai_company.observability.events for event names."
1-31: LGTM! Module structure follows conventions correctly: proper docstring, TYPE_CHECKING imports for avoiding circular dependencies, and logger instantiated via `get_logger(__name__)` as required. Event constants are imported directly from the domain-specific module.
33-49: LGTM! Clean class design with dependency injection for `AgentTaskScorer` and `TopologySelector`. The `__slots__` definition is good for memory efficiency. Google-style docstring present.
51-100: LGTM! The `route()` method follows proper patterns:
- Complete Google-style docstring with Args/Returns.
- Early exit for empty agents case with appropriate WARNING log.
- Exception handling logs with `logger.exception()` (includes traceback) before re-raising.
- All logging uses structured kwargs as required by guidelines.
102-178: LGTM! The `_do_route()` implementation is well-structured:
- Topology is correctly computed once per parent task and applied uniformly to all routing decisions (per §6.9 design).
- Mutable lists used during iteration are properly converted to tuples for the final `RoutingResult`.
- Logging follows the structured kwargs pattern at appropriate levels (WARNING for unroutable, DEBUG for each routed subtask, INFO for completion).
- Clean separation between scoring, filtering, and decision creation.
tests/unit/engine/test_routing_service.py (6)
1-25: LGTM! Imports are well-organized and the test file follows the vendor-agnostic naming convention (`test-provider`, `test-model-001`). All necessary domain types are imported for testing.
27-110: LGTM! Helper functions are well-designed test factories with sensible defaults. The `_make_decomposition_result` helper creates a realistic scenario with two subtasks where `sub-2` depends on `sub-1`, enabling meaningful routing tests.
116-154: LGTM! Comprehensive happy-path test validating skill-based routing. Correctly verifies that `sub-1` (requiring Python/SQL) routes to Backend Dev and `sub-2` (requiring JavaScript/React) routes to Frontend Dev.
156-178: LGTM! Good negative test case validating the `min_score` filtering. An agent with unrelated skills ("cooking") and a mismatched role ("chef") correctly fails to meet the routing threshold for developer subtasks.
180-224: LGTM! Good test validating that multiple viable candidates result in the best being selected with the others captured as alternatives. The test correctly creates a scenario where both agents can route the subtask.
226-317: LGTM! Good coverage of edge cases and error handling:
- `test_topology_applied`: validates topology propagation to all decisions.
- `test_empty_agents`: validates the early-exit path when no agents are available.
- `test_inactive_agents_filtered`: ensures terminated agents are excluded (score=0).
- `test_parent_task_id_in_result`: ensures the result carries the correct parent task ID.
- `test_exception_propagates`: uses `MagicMock` to simulate scorer failure and validates exception re-raising behavior.

DESIGN_SPEC.md (3)
783-784: LGTM! Task definition schema correctly updated with `task_structure` and `coordination_topology` fields, matching the implemented domain models.
1162-1207: LGTM! Documentation correctly updated to reflect the implementation:
- Current state accurately lists implemented components (`TaskStructureClassifier`, `DependencyGraph`, `ManualDecompositionStrategy`, etc.).
- Terminology consistently changed from `tool_count` to `artifact_count` throughout §6.9.
- LLM-based decomposition correctly noted as deferred (M4+).
2368-2466: LGTM! Project structure documentation accurately reflects the new packages:
- `engine/decomposition/` with classifier, DAG, manual strategy, models, protocol, rollup, and service modules.
- `engine/routing/` with models, scorer, service, and topology_selector modules.
- `observability/events/` updated with `decomposition.py` and `task_routing.py` event constants.
```python
if context.current_depth >= context.max_depth:
    msg = (
        f"Decomposition depth {context.current_depth} "
        f"exceeds max depth {context.max_depth}"
    )
    logger.warning(DECOMPOSITION_VALIDATION_ERROR, error=msg)
    raise DecompositionDepthError(msg)
```
The guard condition uses >= (meets or exceeds), but the error message only says "exceeds". When current_depth == max_depth exactly, the message is factually incorrect.
```diff
 if context.current_depth >= context.max_depth:
     msg = (
         f"Decomposition depth {context.current_depth} "
-        f"exceeds max depth {context.max_depth}"
+        f"meets or exceeds max depth {context.max_depth}"
     )
```
…ef checks) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 34 out of 34 changed files in this pull request and generated 3 comments.
```python
if not 0.0 <= min_score <= 1.0:
    msg = f"min_score must be between 0.0 and 1.0, got {min_score}"
    logger.warning(
        "agent_task_scorer.invalid_min_score",
```
The logger warning for an invalid min_score uses a string literal event name ("agent_task_scorer.invalid_min_score") that is not defined in ai_company.observability.events.*. Elsewhere the codebase centralizes event names in those modules, and tests/unit/observability/test_events.py enforces discovery/duplication rules only for that package. Consider adding a TASK_ROUTING_* constant (or a routing/scorer-specific event constant) under observability/events/task_routing.py and using it here so event names remain centralized and discoverable.
```diff
-        "agent_task_scorer.invalid_min_score",
+        TASK_ROUTING_AGENT_SCORED,
```
```python
plan = decomposition_result.plan

logger.info(
    TASK_ROUTING_STARTED,
    parent_task_id=plan.parent_task_id,
    subtask_count=len(plan.subtasks),
    agent_count=len(available_agents),
)
```
TaskRoutingService.route() uses decomposition_result.plan.parent_task_id for logging/result fields but uses the passed parent_task object for topology selection. There’s currently no validation that parent_task.id matches plan.parent_task_id, so a caller mistake could silently produce routing decisions using the wrong task’s topology override and misleading logs/results. Consider validating this early (and raising TaskRoutingError/ValueError) to keep inputs consistent.
```python
@model_validator(mode="after")
def _validate_plan_task_consistency(self) -> Self:
    """Ensure created_tasks align with plan subtasks."""
    if len(self.created_tasks) != len(self.plan.subtasks):
        msg = (
            f"created_tasks count ({len(self.created_tasks)}) "
            f"does not match plan subtask count ({len(self.plan.subtasks)})"
        )
        raise ValueError(msg)

    task_ids = {t.id for t in self.created_tasks}
    edge_ids = {eid for edge in self.dependency_edges for eid in edge}
    unknown_edge_ids = edge_ids - task_ids
    if unknown_edge_ids:
        msg = (
            f"dependency_edges reference unknown task IDs: "
            f"{sorted(unknown_edge_ids)}"
        )
        raise ValueError(msg)
```
DecompositionResult._validate_plan_task_consistency() checks that the number of created_tasks matches the number of plan.subtasks, but it doesn’t verify that the task IDs actually correspond to the plan’s subtask IDs (only that dependency_edges refer to some created task IDs). This can allow constructing an inconsistent DecompositionResult where the plan and created tasks disagree but validation passes. Consider validating that {t.id for t in created_tasks} equals {s.id for s in plan.subtasks} (and optionally that edges reference only plan subtask IDs).
```python
@model_validator(mode="after")
def _validate_plan_task_consistency(self) -> Self:
    """Ensure created_tasks align with plan subtasks."""
    if len(self.created_tasks) != len(self.plan.subtasks):
        msg = (
            f"created_tasks count ({len(self.created_tasks)}) "
            f"does not match plan subtask count ({len(self.plan.subtasks)})"
        )
        raise ValueError(msg)

    task_ids = {t.id for t in self.created_tasks}
    edge_ids = {eid for edge in self.dependency_edges for eid in edge}
    unknown_edge_ids = edge_ids - task_ids
    if unknown_edge_ids:
        msg = (
            f"dependency_edges reference unknown task IDs: "
            f"{sorted(unknown_edge_ids)}"
        )
        raise ValueError(msg)

    return self
```
DecompositionResult validator doesn't cross-check created-task IDs against plan subtask IDs
_validate_plan_task_consistency verifies that len(created_tasks) == len(plan.subtasks) and that all edge IDs are in the created-task set — but it never checks that the actual IDs of the created tasks match the IDs declared in plan.subtasks. This means the following is silently accepted by Pydantic:
```python
plan = DecompositionPlan(
    parent_task_id="task-1",
    subtasks=(
        SubtaskDefinition(id="sub-1", title="A", description="A"),
        SubtaskDefinition(id="sub-2", title="B", description="B"),
    ),
)
DecompositionResult(
    plan=plan,
    created_tasks=(task_with_id_sub_99, task_with_id_sub_98),  # completely different IDs
    dependency_edges=(),  # no edges → edge check also passes
)
```

The count check passes (2 == 2) and the edge check passes (no edges). The result looks valid but there is no semantic alignment between the plan's subtask definitions and the created tasks, which would silently break any consumer that correlates `plan.subtasks[i].id` with `created_tasks[i].id` (e.g. routing, status rollup, DAG edge lookups).
The `DecompositionService` always creates tasks with `id=subtask_def.id`, so the production path is correct. However, the model-level invariant should be enforced explicitly. Adding a set-equality check is sufficient:

```python
task_ids = {t.id for t in self.created_tasks}
subtask_ids = {s.id for s in self.plan.subtasks}
if task_ids != subtask_ids:
    msg = (
        f"created_tasks IDs {sorted(task_ids)} do not match "
        f"plan subtask IDs {sorted(subtask_ids)}"
    )
    raise ValueError(msg)
```

There is no existing test that exercises this scenario (a same-count but mismatched-ID construction), confirming the gap is currently undetected.
```python
total = len(subtask_statuses)

if total == 0:
    logger.warning(
        DECOMPOSITION_ROLLUP_COMPUTED,
        parent_task_id=parent_task_id,
        total=0,
        reason="rollup computed with no subtask statuses",
    )

completed = subtask_statuses.count(TaskStatus.COMPLETED)
failed = subtask_statuses.count(TaskStatus.FAILED)
in_progress = subtask_statuses.count(TaskStatus.IN_PROGRESS)
blocked = subtask_statuses.count(TaskStatus.BLOCKED)
cancelled = subtask_statuses.count(TaskStatus.CANCELLED)

rollup = SubtaskStatusRollup(
    parent_task_id=parent_task_id,
    total=total,
    completed=completed,
    failed=failed,
    in_progress=in_progress,
    blocked=blocked,
    cancelled=cancelled,
)

logger.debug(
    DECOMPOSITION_ROLLUP_COMPUTED,
    parent_task_id=parent_task_id,
    total=total,
    derived_status=rollup.derived_parent_status.value,
)

return rollup
```
Double-logging DECOMPOSITION_ROLLUP_COMPUTED event when total == 0
When subtask_statuses is empty, the method fires the same event constant (DECOMPOSITION_ROLLUP_COMPUTED) twice in the same call — once as logger.warning (line 47) and then again unconditionally as logger.debug (line 70). Any log aggregator or metric counter keyed on this event string will record two entries for a single compute() invocation, and the two entries carry different structured fields (reason vs derived_status).
The CLAUDE.md convention is one logical event per operation. The cleanest fix is to introduce a dedicated event constant for the anomalous case, e.g. DECOMPOSITION_ROLLUP_EMPTY in events/decomposition.py, and only emit DECOMPOSITION_ROLLUP_COMPUTED once (at DEBUG) unconditionally:
```python
if total == 0:
    logger.warning(
        DECOMPOSITION_ROLLUP_EMPTY,  # ← new constant
        parent_task_id=parent_task_id,
        reason="rollup computed with no subtask statuses",
    )
```

This keeps one canonical success event (`DECOMPOSITION_ROLLUP_COMPUTED`) per operation while the anomaly has its own observable event name.
Summary
- `DecompositionService` orchestrates strategy → classifier → DAG validation → task creation. Includes `ManualDecompositionStrategy`, `TaskStructureClassifier` (sequential/parallel/mixed heuristics), `DependencyGraph` (iterative cycle detection, heapq topo sort, parallel groups), and `StatusRollup` for subtask status aggregation.
- `TaskRoutingService` scores agents against subtasks (skill overlap 0.4/0.2, role 0.2, seniority-complexity 0.2), selects best candidates, and applies `TopologySelector` for auto coordination topology (SAS/CENTRALIZED/DECENTRALIZED/CONTEXT_DEPENDENT).
- New models: `SubtaskDefinition` (with `Complexity` enum), `DecompositionPlan`, `DecompositionResult` (cross-field validators), `SubtaskStatusRollup` (`computed_field`), `DecompositionContext`, `RoutingCandidate`, `RoutingDecision`, `RoutingResult`, `AutoTopologyConfig` (AUTO rejection validator).
- Pre-review hardening: `MappingProxyType` immutability, `Complexity` enum type safety, redundant validation removal, failure logging, artifact terminology rename, boundary/parametrized tests.

Closes #14
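The iterative heapq-based topological sort named above can be sketched as Kahn's algorithm over a dependency edge list. This is a standalone sketch, not the actual `DependencyGraph` API:

```python
import heapq


def topological_sort(nodes: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Iterative Kahn's algorithm; the heap makes tie-breaking deterministic."""
    indegree = {n: 0 for n in nodes}
    adjacency: dict[str, list[str]] = {n: [] for n in nodes}
    for src, dst in edges:  # src must complete before dst
        adjacency[src].append(dst)
        indegree[dst] += 1
    ready = [n for n, d in indegree.items() if d == 0]
    heapq.heapify(ready)
    order: list[str] = []
    while ready:
        node = heapq.heappop(ready)
        order.append(node)
        for dep in adjacency[node]:
            indegree[dep] -= 1
            if indegree[dep] == 0:
                heapq.heappush(ready, dep)
    if len(order) != len(nodes):
        # some nodes never reached indegree 0 → a cycle exists
        raise ValueError("cycle detected in dependency graph")
    return order
```

Popping from a heap instead of a FIFO queue costs O(n log n) but yields a stable, reproducible ordering regardless of insertion order, which keeps scheduling and tests deterministic.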
Test plan
- `uv run pytest tests/ -n auto` passes
- `ruff check` + `ruff format` clean
- `mypy src/ tests/` clean (strict mode)

Review coverage
Pre-reviewed by 9 specialized agents:
40 findings identified, all implemented.
🤖 Generated with Claude Code