
feat: implement task decomposition and routing engine (#14)#165

Merged
Aureliolo merged 4 commits into main from feat/task-decomposition on Mar 8, 2026
Conversation

@Aureliolo (Owner)

Summary

  • Task decomposition engine: DecompositionService orchestrates strategy → classifier → DAG validation → task creation. Includes ManualDecompositionStrategy, TaskStructureClassifier (sequential/parallel/mixed heuristics), DependencyGraph (iterative cycle detection, heapq topo sort, parallel groups), and StatusRollup for subtask status aggregation.
  • Task routing engine: TaskRoutingService scores agents against subtasks (skill overlap 0.4/0.2, role 0.2, seniority-complexity 0.2), selects best candidates, and applies TopologySelector for auto coordination topology (SAS/CENTRALIZED/DECENTRALIZED/CONTEXT_DEPENDENT).
  • Domain models: SubtaskDefinition (with Complexity enum), DecompositionPlan, DecompositionResult (cross-field validators), SubtaskStatusRollup (computed_field), DecompositionContext, RoutingCandidate, RoutingDecision, RoutingResult, AutoTopologyConfig (AUTO rejection validator).
  • Pre-PR review fixes (9 agents, 40 findings): classifier bias fix, iterative DFS + heapq, MappingProxyType immutability, Complexity enum type safety, redundant validation removal, failure logging, artifact terminology rename, boundary/parametrized tests.
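The weighted scoring described above (skill overlap 0.4/0.2, role 0.2, seniority-complexity 0.2) can be sketched roughly as follows. This is an illustrative stand-in, not the actual AgentTaskScorer API; the parameter names are assumptions.

```python
# Hypothetical sketch of the weighted agent-subtask scoring summarized above.
# Parameter names and the boolean role/seniority inputs are assumptions,
# not the real AgentTaskScorer interface.

def score_agent(
    required_skills: set[str],
    primary_skills: set[str],
    secondary_skills: set[str],
    role_matches: bool,
    seniority_fits: bool,
) -> float:
    score = 0.0
    if required_skills:
        # Primary skill overlap carries weight 0.4, secondary 0.2.
        score += 0.4 * len(required_skills & primary_skills) / len(required_skills)
        score += 0.2 * len(required_skills & secondary_skills) / len(required_skills)
    if role_matches:
        score += 0.2  # role alignment weight
    if seniority_fits:
        score += 0.2  # seniority-complexity alignment weight
    return score
```

With two required skills, one matched as primary and one as secondary, plus role and seniority fits, this yields 0.2 + 0.1 + 0.2 + 0.2 = 0.7.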

Closes #14

Test plan

  • All 3369 tests pass (uv run pytest tests/ -n auto)
  • 96.43% coverage (well above 80% threshold)
  • ruff check + ruff format clean
  • mypy src/ tests/ clean (strict mode)
  • Pre-commit hooks pass (trailing whitespace, gitleaks, commitizen)
  • Decomposition: DAG cycle detection, missing references, topo sort, parallel groups, depth limits, structure classification (8 sequential patterns, 5 parallel patterns)
  • Routing: skill/role/seniority scoring, inactive/ON_LEAVE agents, unroutable subtasks, topology auto-selection with boundary tests
  • Models: frozen immutability, cross-field validators, self-dependency rejection, count/edge consistency
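Boundary tests like those listed above are typically written with pytest's parametrize; the function and threshold value below are illustrative stand-ins, not the project's actual API.

```python
# Illustrative boundary-test pattern (pytest). The predicate and threshold
# are hypothetical stand-ins, not the project's real code.
import pytest

PARALLEL_ARTIFACT_THRESHOLD = 3  # assumed example threshold


def needs_parallel_coordination(artifact_count: int) -> bool:
    return artifact_count >= PARALLEL_ARTIFACT_THRESHOLD


@pytest.mark.parametrize(
    ("artifact_count", "expected"),
    [
        (2, False),  # just below the boundary
        (3, True),   # exactly at the boundary
        (4, True),   # above the boundary
    ],
)
def test_threshold_boundary(artifact_count: int, expected: bool) -> None:
    assert needs_parallel_coordination(artifact_count) is expected
```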

Review coverage

Pre-reviewed by 9 specialized agents:

  • code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter
  • comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency

40 findings identified; all addressed.

🤖 Generated with Claude Code

Add task decomposition engine that breaks complex tasks into subtasks
with dependency tracking (DAG), structure classification, and status
rollup. Add task routing engine that scores agents against subtask
requirements and selects coordination topologies.

New packages:
- engine/decomposition/: models, DAG, classifier, rollup, manual
  strategy, service, and protocol
- engine/routing/: models, scorer, topology selector, and service

Also adds TaskStructure and CoordinationTopology enums, new error
types (DecompositionError, TaskRoutingError), and event constants.
Pre-reviewed by 9 agents, 40 findings addressed:
- Fix classifier bias: separate language vs structural scoring, MIXED only on language signals
- Replace recursive DFS with iterative + heapq for O(n log n) topo sort
- Use Complexity enum instead of raw strings in SubtaskDefinition
- Freeze DAG adjacency with MappingProxyType + tuple values
- Remove redundant validation (cycle check in plan, depth check in context)
- Add cross-field validators to DecompositionResult and RoutingDecision
- Add AUTO topology rejection validator to AutoTopologyConfig
- Rename parallel_tool_threshold to parallel_artifact_threshold
- Add failure logging (logger.exception) to decomposition/routing services
- Fix wrong error type for missing DAG references (DecompositionError, not CycleError)
- Update DESIGN_SPEC §15.3 project structure with decomposition/routing modules
- Expand test coverage with parametrized and boundary tests
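The iterative heapq-based topological sort and the MappingProxyType-frozen adjacency mentioned above can be sketched together as follows. This is a simplified stand-in for DependencyGraph, not the actual implementation; it assumes every subtask id appears as a key in the adjacency map.

```python
# Simplified sketch of a dependency graph with a read-only adjacency map
# and a deterministic Kahn-style topological sort using heapq. The class
# and its interface are illustrative; the real DependencyGraph differs.
import heapq
from types import MappingProxyType


class DepGraph:
    def __init__(self, edges: dict[str, list[str]]) -> None:
        # edges maps each subtask id to the ids that depend on it
        # (downstream edges). Freeze adjacency: the outer mapping is
        # read-only via MappingProxyType, the values are tuples.
        self._adj = MappingProxyType({k: tuple(v) for k, v in edges.items()})

    def topo_sort(self) -> list[str]:
        indegree = {node: 0 for node in self._adj}
        for dependents in self._adj.values():
            for dep in dependents:
                indegree[dep] += 1
        # A min-heap keeps ordering deterministic among ready nodes,
        # giving an O(n log n) sort overall.
        ready = [n for n, d in indegree.items() if d == 0]
        heapq.heapify(ready)
        order: list[str] = []
        while ready:
            node = heapq.heappop(ready)
            order.append(node)
            for dep in self._adj[node]:
                indegree[dep] -= 1
                if indegree[dep] == 0:
                    heapq.heappush(ready, dep)
        if len(order) != len(self._adj):
            raise ValueError("dependency cycle detected")
        return order
```

Because the sort drains a min-heap of ready nodes, nodes with no remaining dependencies are always emitted in lexicographic order, and an unemptied graph signals a cycle.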
Copilot AI review requested due to automatic review settings March 8, 2026 08:58

github-actions bot commented Mar 8, 2026

Dependency Review

✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.

Scanned Files

None


coderabbitai bot commented Mar 8, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: b99498b6-ac5c-41e4-8e81-7c8e2d4835e0

📥 Commits

Reviewing files that changed from the base of the PR and between 93f602e and 5e539d2.

📒 Files selected for processing (3)
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/routing/scorer.py
  • tests/unit/engine/test_decomposition_dag.py

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Adds task decomposition and routing: DAG-based subtask plans, structure classification (sequential/parallel/mixed), manual/pluggable strategies, agent scoring, routing decisions, and topology auto-selection.
    • Adds task metadata for structure and coordination topology plus public enums and defaults.
    • New observability events for decomposition and routing.
  • Tests

    • Extensive unit tests covering decomposition, DAGs, strategies, rollups, scorer, routing, and topology selector.
  • Documentation

    • Design spec and README updated to describe decomposition & routing.

Walkthrough

Adds decomposition and routing subsystems to the engine: new domain models, protocols, services, errors, and observability events for DAG-based subtask decomposition, structure classification, topology selection, agent scoring, and routing; extends core Task schema with task_structure and coordination_topology.

Changes

Cohort / File(s) Summary
Core enums & Task model
src/ai_company/core/enums.py, src/ai_company/core/__init__.py, src/ai_company/core/task.py
Add TaskStructure and CoordinationTopology enums, export them, and add task_structure and coordination_topology fields to Task.
Engine public exports
src/ai_company/engine/__init__.py, src/ai_company/engine/decomposition/__init__.py, src/ai_company/engine/routing/__init__.py
Expose decomposition and routing public symbols (models, services, protocols, errors) via package inits.
Decomposition domain models
src/ai_company/engine/decomposition/models.py
Add frozen Pydantic models: SubtaskDefinition, DecompositionPlan, DecompositionResult, SubtaskStatusRollup, DecompositionContext with validation and derived status logic.
Decomposition algorithms & helpers
src/ai_company/engine/decomposition/dag.py, src/ai_company/engine/decomposition/rollup.py
Add DependencyGraph (validation, cycle detection, topo sort, parallel groups) and StatusRollup.compute for status aggregation.
Decomposition strategies & service
src/ai_company/engine/decomposition/classifier.py, src/ai_company/engine/decomposition/manual.py, src/ai_company/engine/decomposition/protocol.py, src/ai_company/engine/decomposition/service.py
Implement TaskStructureClassifier (heuristics), ManualDecompositionStrategy, DecompositionStrategy protocol, and DecompositionService orchestrator (classification → decomposition → DAG validation → subtask creation → result).
Routing domain models
src/ai_company/engine/routing/models.py
Add frozen Pydantic models: RoutingCandidate, RoutingDecision, RoutingResult, AutoTopologyConfig with cross-field validators.
Routing scorer, selector & service
src/ai_company/engine/routing/scorer.py, src/ai_company/engine/routing/topology_selector.py, src/ai_company/engine/routing/service.py
Add AgentTaskScorer (weighted scoring by skills/role/seniority), TopologySelector (auto topology resolution using structure/artifact thresholds and config), and TaskRoutingService (score/filter/select candidates, build RoutingResult).
Engine errors
src/ai_company/engine/errors.py
Add DecompositionError, DecompositionCycleError, DecompositionDepthError, and TaskRoutingError.
Observability events
src/ai_company/observability/events/decomposition.py, src/ai_company/observability/events/task_routing.py
Add decomposition and task routing event constants for observability.
Docs & top-level
DESIGN_SPEC.md, README.md, CLAUDE.md
Document decomposition & routing design, public API surface, and topology selection; add README bullet.
Tests — decomposition
tests/unit/engine/test_decomposition_*.py
Comprehensive unit tests covering classifier heuristics, DAG algorithms, models, manual strategy, rollup logic, and DecompositionService orchestration.
Tests — routing & events
tests/unit/engine/test_routing_*.py, tests/unit/observability/test_events.py
Unit tests for routing models, scorer behavior, TaskRoutingService logic, TopologySelector mapping, and event discovery updates.
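The frozen Pydantic models with cross-field validators that recur throughout the walkthrough can be illustrated with a minimal Pydantic v2 sketch. The model and field names here are assumptions for illustration, not the project's actual DecompositionResult or RoutingDecision.

```python
# Minimal illustration of a frozen Pydantic v2 model with a cross-field
# validator, in the spirit of the models above. Names are hypothetical.
from pydantic import BaseModel, ConfigDict, model_validator


class ExampleResult(BaseModel):
    model_config = ConfigDict(frozen=True)

    subtask_ids: tuple[str, ...]
    subtask_count: int

    @model_validator(mode="after")
    def check_count_matches(self) -> "ExampleResult":
        # Cross-field check: the declared count must match the ids given.
        if self.subtask_count != len(self.subtask_ids):
            raise ValueError("subtask_count must equal len(subtask_ids)")
        return self
```

Freezing the model means any attempt to assign to a field after construction raises, which matches the immutability tests mentioned in the test plan.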

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant DecompositionService
    participant TaskStructureClassifier
    participant DecompositionStrategy
    participant DependencyGraph
    participant StatusRollup

    Client->>DecompositionService: decompose_task(task, context)
    DecompositionService->>TaskStructureClassifier: classify(task)
    TaskStructureClassifier-->>DecompositionService: TaskStructure
    DecompositionService->>DecompositionStrategy: decompose(task, context)
    DecompositionStrategy-->>DecompositionService: DecompositionPlan
    DecompositionService->>DependencyGraph: validate(subtasks)
    DependencyGraph-->>DecompositionService: validated / raises
    DecompositionService->>DecompositionService: create Task objects & edges
    DecompositionService->>StatusRollup: rollup_status(parent_id, subtask_statuses)
    StatusRollup-->>DecompositionService: SubtaskStatusRollup
    DecompositionService-->>Client: DecompositionResult(plan, tasks, edges)
sequenceDiagram
    participant Client
    participant TaskRoutingService
    participant TopologySelector
    participant AgentTaskScorer
    participant DecompositionResult

    Client->>TaskRoutingService: route(decomposition_result, available_agents, parent_task)
    TaskRoutingService->>TopologySelector: select(parent_task, plan)
    TopologySelector-->>TaskRoutingService: CoordinationTopology
    loop per subtask
        TaskRoutingService->>AgentTaskScorer: score(agent, subtask)
        AgentTaskScorer-->>TaskRoutingService: RoutingCandidate(score,...)
        TaskRoutingService->>TaskRoutingService: filter & select best + alternatives
        TaskRoutingService-->>Client: per-subtask routing event
    end
    TaskRoutingService-->>Client: RoutingResult(decisions, unroutable)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks | ✅ 5 of 5 passed
  • Title check: ✅ Passed. The title 'feat: implement task decomposition and routing engine (#14)' clearly and specifically describes the main change: the implementation of two core engine components for task decomposition and routing.
  • Description check: ✅ Passed. The description thoroughly explains the summary, test plan, and review coverage, directly relating to the changeset's scope and objectives.
  • Linked Issues check: ✅ Passed. The PR implements all acceptance criteria for issue #14: task decomposition with manual mode [✓], status rollup [✓], task structure classification [✓], coordination topology selection [✓], auto topology selector [✓], and agent-based routing [✓].
  • Out of Scope Changes check: ✅ Passed. All changes are scoped to decomposition, routing, and related domain models; documentation updates (CLAUDE.md, DESIGN_SPEC.md, README.md) and event constants are in-scope enhancements; no unrelated modifications detected.
  • Docstring Coverage: ✅ Passed. Docstring coverage is 95.51%, above the required 80.00% threshold.



Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the AI Company's core capabilities by introducing sophisticated task decomposition and routing mechanisms. It enables the system to break down complex tasks into manageable subtasks, intelligently assign them to suitable AI agents based on their skills and roles, and dynamically determine the most effective coordination topology for their execution. This foundational work is crucial for scaling the system's ability to handle intricate projects and improve overall operational efficiency and autonomy.

Highlights

  • Task Decomposition Engine: Implemented a robust task decomposition engine, DecompositionService, which orchestrates strategies, classifies task structures (sequential, parallel, mixed), validates Directed Acyclic Graphs (DAGs) for subtask dependencies, and handles subtask creation. Key components include ManualDecompositionStrategy, TaskStructureClassifier, DependencyGraph (with iterative cycle detection and topological sorting), and StatusRollup for aggregating subtask statuses.
  • Task Routing Engine: Introduced a new task routing engine, TaskRoutingService, responsible for intelligently assigning decomposed subtasks to agents. It scores agents based on skill overlap, role matching, and seniority-complexity alignment, then selects the best candidates. A TopologySelector automatically determines the optimal coordination topology (SAS, Centralized, Decentralized, Context-Dependent) for subtask execution.
  • New Domain Models: Added several new Pydantic domain models to support decomposition and routing, including SubtaskDefinition (with Complexity enum), DecompositionPlan, DecompositionResult, SubtaskStatusRollup (with a computed field for parent status), DecompositionContext, RoutingCandidate, RoutingDecision, RoutingResult, and AutoTopologyConfig (with a validator to prevent 'AUTO' defaults).
  • Enhanced Error Handling: Introduced specific error classes for decomposition and routing failures, such as DecompositionError, DecompositionCycleError, DecompositionDepthError, and TaskRoutingError, improving the clarity and granularity of error reporting.
  • Pre-PR Review and Quality Assurance: The changes underwent a pre-PR review by 9 specialized AI agents, leading to 40 identified findings which were all addressed. This includes fixes for classifier bias, improved DFS and heapq implementations, MappingProxyType for immutability, enum type safety, redundant validation removal, failure logging, terminology renaming, and comprehensive boundary/parametrized tests.
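The automatic topology resolution summarized in the highlights above could look roughly like the following sketch. The mapping rules and threshold default are illustrative assumptions, not the actual TopologySelector logic.

```python
# Hypothetical sketch of AUTO coordination-topology resolution. The
# heuristic rules and the threshold default are assumptions for
# illustration, not the real TopologySelector.
from enum import Enum


class TaskStructure(Enum):
    SEQUENTIAL = "sequential"
    PARALLEL = "parallel"
    MIXED = "mixed"


class CoordinationTopology(Enum):
    SAS = "sas"  # single-agent system
    CENTRALIZED = "centralized"
    DECENTRALIZED = "decentralized"
    CONTEXT_DEPENDENT = "context_dependent"


def resolve_auto(
    structure: TaskStructure,
    subtask_count: int,
    artifact_count: int,
    parallel_artifact_threshold: int = 3,  # assumed default
) -> CoordinationTopology:
    if subtask_count <= 1:
        return CoordinationTopology.SAS
    if structure is TaskStructure.SEQUENTIAL:
        return CoordinationTopology.CENTRALIZED
    if structure is TaskStructure.PARALLEL:
        if artifact_count >= parallel_artifact_threshold:
            return CoordinationTopology.DECENTRALIZED
        return CoordinationTopology.CENTRALIZED
    return CoordinationTopology.CONTEXT_DEPENDENT  # MIXED
```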
Changelog
  • CLAUDE.md
    • Updated the description for the engine/ directory to reflect the inclusion of task decomposition and routing functionalities.
  • DESIGN_SPEC.md
    • Added new directory structures for decomposition/ and routing/ within the engine/ module.
    • Introduced new constant files for decomposition.py and task_routing.py.
  • README.md
    • Added a new feature bullet point for 'Task Decomposition & Routing' highlighting DAG-based subtask decomposition, structure classification, and agent-task scoring.
  • src/ai_company/core/__init__.py
    • Imported new enums CoordinationTopology and TaskStructure.
    • Added CoordinationTopology and TaskStructure to the module's __all__ export list.
  • src/ai_company/core/enums.py
    • Added a new TaskStructure enum to classify subtask relationships (sequential, parallel, mixed).
    • Added a new CoordinationTopology enum to define multi-agent task execution coordination methods (SAS, centralized, decentralized, context-dependent, auto).
  • src/ai_company/core/task.py
    • Imported CoordinationTopology and TaskStructure enums.
    • Added task_structure and coordination_topology fields to the Task BaseModel.
  • src/ai_company/engine/__init__.py
    • Imported all public components from the new decomposition and routing sub-packages.
    • Imported new error types: DecompositionCycleError, DecompositionDepthError, DecompositionError, and TaskRoutingError.
    • Added all newly imported components and error types to the module's __all__ export list.
  • src/ai_company/engine/decomposition/__init__.py
    • Added new module for task decomposition engine.
    • Exported all public classes and models related to decomposition.
  • src/ai_company/engine/decomposition/classifier.py
    • Added TaskStructureClassifier to infer task structure (sequential, parallel, mixed) based on heuristic analysis of task properties.
  • src/ai_company/engine/decomposition/dag.py
    • Added DependencyGraph class for building, validating, and analyzing subtask dependency graphs, including cycle detection and topological sorting.
  • src/ai_company/engine/decomposition/manual.py
    • Added ManualDecompositionStrategy for handling pre-built decomposition plans and validating them against context limits.
  • src/ai_company/engine/decomposition/models.py
    • Added Pydantic models for SubtaskDefinition, DecompositionPlan, DecompositionResult, SubtaskStatusRollup, and DecompositionContext.
  • src/ai_company/engine/decomposition/protocol.py
    • Added DecompositionStrategy Protocol for defining the interface of task decomposition strategies.
  • src/ai_company/engine/decomposition/rollup.py
    • Added StatusRollup class to compute aggregated status from subtask statuses for a parent task.
  • src/ai_company/engine/decomposition/service.py
    • Added DecompositionService to orchestrate task decomposition, including strategy execution, structure classification, DAG validation, and subtask creation.
  • src/ai_company/engine/errors.py
    • Added DecompositionError as a base exception for decomposition failures.
    • Added DecompositionCycleError for dependency cycle detection.
    • Added DecompositionDepthError for exceeding maximum nesting depth.
    • Added TaskRoutingError for failures in task routing.
  • src/ai_company/engine/routing/__init__.py
    • Added new module for task routing engine.
    • Exported all public classes and models related to routing.
  • src/ai_company/engine/routing/models.py
    • Added Pydantic models for RoutingCandidate, RoutingDecision, RoutingResult, and AutoTopologyConfig.
  • src/ai_company/engine/routing/scorer.py
    • Added AgentTaskScorer to evaluate agent-subtask compatibility based on skills, role, and seniority-complexity alignment.
  • src/ai_company/engine/routing/service.py
    • Added TaskRoutingService to route decomposed subtasks to agents based on scoring and coordination topology selection.
  • src/ai_company/engine/routing/topology_selector.py
    • Added TopologySelector to select coordination topology for decomposed tasks based on task structure and configuration.
  • src/ai_company/observability/events/decomposition.py
    • Added new constants for decomposition-related observability events.
  • src/ai_company/observability/events/task_routing.py
    • Added new constants for task routing-related observability events.
  • tests/unit/engine/test_decomposition_classifier.py
    • Added unit tests for the TaskStructureClassifier.
  • tests/unit/engine/test_decomposition_dag.py
    • Added unit tests for the DependencyGraph utilities.
  • tests/unit/engine/test_decomposition_manual.py
    • Added unit tests for the ManualDecompositionStrategy.
  • tests/unit/engine/test_decomposition_models.py
    • Added unit tests for the decomposition domain models.
  • tests/unit/engine/test_decomposition_rollup.py
    • Added unit tests for the StatusRollup computation.
  • tests/unit/engine/test_decomposition_service.py
    • Added unit tests for the DecompositionService.
  • tests/unit/engine/test_routing_models.py
    • Added unit tests for the routing domain models.
  • tests/unit/engine/test_routing_scorer.py
    • Added unit tests for the AgentTaskScorer.
  • tests/unit/engine/test_routing_service.py
    • Added unit tests for the TaskRoutingService.
  • tests/unit/engine/test_routing_topology.py
    • Added unit tests for the TopologySelector.
  • tests/unit/observability/test_events.py
    • Updated test_all_domain_modules_discovered to include new 'decomposition' and 'task_routing' event modules.
Activity
  • The pull request includes a comprehensive summary of the new task decomposition and routing engines, detailing their components and functionalities.
  • The author performed a pre-PR review using 9 specialized AI agents, identifying and implementing 40 findings to refine the code.
  • All 3369 unit tests passed successfully, ensuring the stability and correctness of the new features.
  • Code coverage stands at 96.43%, well above the 80% threshold, indicating thorough testing.
  • ruff check and ruff format reported clean, ensuring code style and quality.
  • mypy in strict mode passed for all source and test files, confirming type safety.
  • Pre-commit hooks (trailing whitespace, gitleaks, commitizen) passed, maintaining repository standards.
  • Specific test coverage was highlighted for decomposition (DAG cycle detection, topo sort, parallel groups, classification patterns) and routing (skill/role/seniority scoring, agent states, topology auto-selection).
  • Model immutability, cross-field validators, and consistency checks were also verified through testing.

gemini-code-assist bot left a comment

Code Review

This pull request introduces a comprehensive and well-designed task decomposition and routing engine, a major feature for the project. The implementation is impressive, with a clear separation of concerns into decomposition and routing services, robust data models with strong validation, and a full suite of unit tests ensuring correctness. The use of design patterns like protocols for strategies, immutable data structures, and dedicated service classes for orchestration is excellent. The code adheres closely to the design specification. My review found only one minor issue: a misleading docstring in the status rollup component, for which I've provided a suggestion to improve clarity.

Comment on lines +29 to +45
"""Compute a status rollup from subtask statuses.

Rules (applied in order, first match wins):
- All COMPLETED -> COMPLETED
- All CANCELLED -> CANCELLED
- Any FAILED -> FAILED
- Any IN_PROGRESS -> IN_PROGRESS
- Any BLOCKED (none IN_PROGRESS) -> BLOCKED
- Otherwise -> IN_PROGRESS (work still pending)

Args:
    parent_task_id: The parent task identifier.
    subtask_statuses: Statuses of all subtasks.

Returns:
    Aggregated status rollup.
"""
medium

The docstring for compute describes the derivation logic for the parent task's status. However, this logic is actually implemented in the SubtaskStatusRollup.derived_parent_status computed field. This method's primary responsibility is to count the statuses and instantiate the SubtaskStatusRollup object.

To improve clarity and prevent future confusion, the docstring should be updated to describe what this method does and refer to SubtaskStatusRollup for the status derivation logic.

Suggested change

Before:

"""Compute a status rollup from subtask statuses.

Rules (applied in order, first match wins):
- All COMPLETED -> COMPLETED
- All CANCELLED -> CANCELLED
- Any FAILED -> FAILED
- Any IN_PROGRESS -> IN_PROGRESS
- Any BLOCKED (none IN_PROGRESS) -> BLOCKED
- Otherwise -> IN_PROGRESS (work still pending)

Args:
    parent_task_id: The parent task identifier.
    subtask_statuses: Statuses of all subtasks.

Returns:
    Aggregated status rollup.
"""

After:

"""Compute a status rollup from a collection of subtask statuses.

This method aggregates the statuses of all subtasks for a given parent
task into a ``SubtaskStatusRollup`` object. It counts the occurrences
of key statuses (e.g., COMPLETED, FAILED).

The returned ``SubtaskStatusRollup`` object contains a
``derived_parent_status`` computed field that determines the overall
parent task status based on the aggregated counts.

Args:
    parent_task_id: The parent task identifier.
    subtask_statuses: Statuses of all subtasks.

Returns:
    An aggregated status rollup object.
"""
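The first-match-wins rollup rules quoted in the original docstring translate directly into code. This standalone sketch uses plain string statuses rather than the project's TaskStatus enum.

```python
# Direct translation of the rollup rules from the docstring above, using
# plain strings instead of the project's TaskStatus enum.
def derive_parent_status(subtask_statuses: list[str]) -> str:
    statuses = set(subtask_statuses)
    if statuses == {"COMPLETED"}:
        return "COMPLETED"
    if statuses == {"CANCELLED"}:
        return "CANCELLED"
    if "FAILED" in statuses:
        return "FAILED"
    if "IN_PROGRESS" in statuses:
        return "IN_PROGRESS"
    if "BLOCKED" in statuses:
        return "BLOCKED"
    return "IN_PROGRESS"  # work still pending
```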

Copilot AI left a comment

Pull request overview

Implements the core orchestration pieces for decomposing a parent task into a validated subtask DAG and routing those subtasks to agents with an auto-selected coordination topology, plus observability events and comprehensive unit tests.

Changes:

  • Added a task decomposition subsystem (models, manual strategy, structure classifier, DAG validation/topo utilities, status rollup, orchestration service).
  • Added a task routing subsystem (scoring, routing decisions/results models, topology selector, routing service) with structured logging events.
  • Extended core domain enums/task model to include TaskStructure and CoordinationTopology, and updated docs + event discovery tests.

Reviewed changes

Copilot reviewed 34 out of 34 changed files in this pull request and generated 3 comments.

File Description
tests/unit/observability/test_events.py Updates event-module discovery expectations for new domains.
tests/unit/engine/test_routing_topology.py Unit tests for topology auto-selection and overrides.
tests/unit/engine/test_routing_service.py Unit tests for routing behavior (best agent, unroutable, topology applied, filtering).
tests/unit/engine/test_routing_scorer.py Unit tests for scoring weights/status handling and boundaries.
tests/unit/engine/test_routing_models.py Unit tests for routing Pydantic models and validators.
tests/unit/engine/test_decomposition_service.py Unit tests for service orchestration (task creation, edges, structure classification, cycles, rollup).
tests/unit/engine/test_decomposition_rollup.py Unit tests for parent-status rollup rules and counters.
tests/unit/engine/test_decomposition_models.py Unit tests for decomposition models’ invariants/validators.
tests/unit/engine/test_decomposition_manual.py Unit tests for manual decomposition strategy constraints/validation.
tests/unit/engine/test_decomposition_dag.py Unit tests for DAG validation, topo sort, and parallel grouping.
tests/unit/engine/test_decomposition_classifier.py Unit tests for sequential/parallel/mixed heuristics and patterns.
src/ai_company/observability/events/task_routing.py Adds structured event constants for routing lifecycle.
src/ai_company/observability/events/decomposition.py Adds structured event constants for decomposition lifecycle.
src/ai_company/engine/routing/topology_selector.py Implements topology selection logic (explicit override vs AUTO heuristic).
src/ai_company/engine/routing/service.py Implements routing service (score agents, choose candidate(s), apply topology, logging).
src/ai_company/engine/routing/scorer.py Implements agent↔subtask scoring heuristic and scoring logs.
src/ai_company/engine/routing/models.py Adds frozen Pydantic routing models + validators and auto-topology config.
src/ai_company/engine/routing/__init__.py Exposes routing subsystem public API.
src/ai_company/engine/errors.py Adds decomposition/routing-specific exception types.
src/ai_company/engine/decomposition/service.py Orchestrates classify → strategy → DAG validate → task creation → result.
src/ai_company/engine/decomposition/rollup.py Implements subtask-status aggregation into derived parent status + logging.
src/ai_company/engine/decomposition/protocol.py Defines decomposition strategy protocol.
src/ai_company/engine/decomposition/models.py Adds frozen decomposition models + validators and derived rollup status field.
src/ai_company/engine/decomposition/manual.py Adds manual decomposition strategy enforcing context limits.
src/ai_company/engine/decomposition/dag.py Implements DAG validation, cycle detection, topo sort, parallel grouping, dependency lookups.
src/ai_company/engine/decomposition/classifier.py Implements structure classification heuristics and emits classification events.
src/ai_company/engine/decomposition/__init__.py Exposes decomposition subsystem public API.
src/ai_company/engine/__init__.py Re-exports new engine APIs and error types at package level.
src/ai_company/core/task.py Adds task_structure and coordination_topology fields to Task model.
src/ai_company/core/enums.py Introduces TaskStructure and CoordinationTopology enums.
src/ai_company/core/__init__.py Exports new enums from ai_company.core.
README.md Updates product feature list to include decomposition & routing.
DESIGN_SPEC.md Updates architecture tree/docs to reflect new subsystems and events.
CLAUDE.md Updates repository layout description to mention decomposition/routing in engine.


Comment on lines +84 to +110
all_matched: list[str] = []
reasons: list[str] = []

# Primary skill overlap (weight: 0.4)
required = set(subtask.required_skills)
primary = set(agent.skills.primary)
primary_matched = required & primary
if required:
    primary_ratio = len(primary_matched) / max(len(required), 1)
    primary_contrib = primary_ratio * 0.4
    total_score += primary_contrib
    all_matched.extend(sorted(primary_matched))
    if primary_matched:
        reasons.append(f"primary skills: {sorted(primary_matched)}")
else:
    reasons.append("no skills required, skill matching skipped")

# Secondary skill overlap (weight: 0.2)
secondary = set(agent.skills.secondary)
secondary_matched = required & secondary
if required:
    secondary_ratio = len(secondary_matched) / max(len(required), 1)
    secondary_contrib = secondary_ratio * 0.2
    total_score += secondary_contrib
    all_matched.extend(sorted(secondary_matched))
    if secondary_matched:
        reasons.append(f"secondary skills: {sorted(secondary_matched)}")
Copilot AI Mar 8, 2026

matched_skills can include duplicates because primary and secondary skills aren’t validated to be disjoint (and you append both primary_matched and secondary_matched). That can make routing explanations noisy/confusing and complicate any downstream consumers that treat matched_skills as a set. Consider de-duplicating while preserving deterministic ordering (e.g., accumulate in an ordered set / dict, or build a set and sort once).
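The de-duplication the comment suggests can be done with an insertion-ordered dict; this is a minimal stand-alone sketch (the helper name is hypothetical, not code from the PR):

```python
def dedupe_matched_skills(primary: list[str], secondary: list[str]) -> list[str]:
    # dict.fromkeys preserves first-seen insertion order (Python 3.7+),
    # so duplicates across primary/secondary collapse deterministically.
    return list(dict.fromkeys([*primary, *secondary]))
    # Alternative: sorted(set(primary) | set(secondary)) — sort once at the end.
```

With `primary=["python", "testing"]` and `secondary=["testing", "docs"]`, this returns `["python", "testing", "docs"]` — "testing" appears once.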

Comment on lines +74 to +83
if not available_agents:
logger.warning(
TASK_ROUTING_NO_AGENTS,
parent_task_id=plan.parent_task_id,
subtask_count=len(plan.subtasks),
)

try:
return self._do_route(decomposition_result, available_agents, parent_task)
except Exception:
Copilot AI Mar 8, 2026

When available_agents is empty you emit TASK_ROUTING_NO_AGENTS, but still proceed into _do_route(), which will then log TASK_ROUTING_SUBTASK_UNROUTABLE once per subtask. For decompositions with many subtasks this creates avoidable log noise and extra work. Consider short-circuiting: return a RoutingResult with unroutable set to all plan.subtasks and skip per-subtask routing/logging when there are no agents.
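The suggested short-circuit might look like the following simplified sketch; `RoutingResult` is reduced to a stand-in dataclass here, and the field names mirror the discussion above rather than the actual model:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RoutingResult:  # simplified stand-in for the real frozen Pydantic model
    decisions: tuple = ()
    unroutable: tuple[str, ...] = ()


def route(subtask_ids: list[str], available_agents: list[str]) -> RoutingResult:
    if not available_agents:
        # One TASK_ROUTING_NO_AGENTS warning here (logging elided), then
        # return immediately: every subtask is unroutable, so skip the
        # per-subtask scoring loop and its per-subtask warnings entirely.
        return RoutingResult(unroutable=tuple(subtask_ids))
    raise NotImplementedError("per-subtask scoring elided in this sketch")
```

This turns N per-subtask log lines into a single warning for the empty-agent case.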

Comment on lines +3 to +10
from typing import TYPE_CHECKING

import pytest

from ai_company.core.enums import CoordinationTopology

if TYPE_CHECKING:
from ai_company.core.agent import AgentIdentity
Copilot AI Mar 8, 2026

AgentIdentity is only imported under TYPE_CHECKING, but it’s referenced in runtime type annotations (e.g. sample_agent_with_personality: AgentIdentity). In this repo there are multiple places noting that test-time tooling/pytest can evaluate annotations, so this pattern can lead to runtime NameError when annotations are resolved. Import AgentIdentity at runtime here as well (and suppress the type-checking lint with # noqa: TC003/TC001 as used elsewhere in tests).

Suggested change

Before:

    from typing import TYPE_CHECKING

    import pytest

    from ai_company.core.enums import CoordinationTopology

    if TYPE_CHECKING:
        from ai_company.core.agent import AgentIdentity

After:

    from typing import TYPE_CHECKING  # noqa: TC001

    import pytest

    from ai_company.core.enums import CoordinationTopology
    from ai_company.core.agent import AgentIdentity  # noqa: TC003


greptile-apps bot commented Mar 8, 2026

Greptile Summary

This PR introduces a complete task decomposition and routing engine for the ai_company framework, comprising ~1,400 lines of production code and ~1,700 lines of tests across 34 files. The implementation covers DecompositionService (orchestrates strategy → classifier → DAG validation → task creation), DependencyGraph (iterative DFS cycle detection + Kahn's topo-sort with min-heap + parallel groups), TaskStructureClassifier (language and structural heuristics), ManualDecompositionStrategy, StatusRollup, TaskRoutingService, AgentTaskScorer (weighted skill/role/seniority scoring), and TopologySelector (auto-topology heuristics). All core components are frozen Pydantic models with cross-field validators, use MappingProxyType for internal collections, and follow the project's structured-logging conventions. The 40 findings from the pre-PR review (including the previously flagged skill double-counting and the COMPLETED+CANCELLED mixed-terminal rollup) are fully addressed.

Two new issues were identified:

  • DecompositionResult._validate_plan_task_consistency has an ID-alignment gap — it validates that len(created_tasks) == len(plan.subtasks) and that all edge IDs exist in the task set, but does not check that {t.id for t in created_tasks} == {s.id for s in plan.subtasks}. A caller could construct a semantically inconsistent result (correct count, wrong IDs) that passes model validation; routing and status-rollup consumers that correlate by ID would then silently operate on wrong data.
  • StatusRollup.compute double-logs DECOMPOSITION_ROLLUP_COMPUTED when total == 0 — both a logger.warning and the unconditional logger.debug at the end of the method fire the same event key, causing log aggregators to count two entries for one logical operation.
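The missing ID-alignment check described in the first issue could be sketched as a stand-alone function (a hedged illustration; the real fix would live inside `DecompositionResult._validate_plan_task_consistency`):

```python
def validate_id_alignment(created_task_ids: set[str], plan_subtask_ids: set[str]) -> None:
    # Equal counts are not enough: the two ID sets must match exactly,
    # otherwise routing/rollup consumers correlating by ID get wrong data.
    if created_task_ids != plan_subtask_ids:
        missing = sorted(plan_subtask_ids - created_task_ids)
        extra = sorted(created_task_ids - plan_subtask_ids)
        raise ValueError(
            f"created task IDs do not match plan subtask IDs: "
            f"missing={missing}, extra={extra}"
        )
```

Note that the same-count/different-IDs case — the exact gap identified — fails this check while passing a length comparison.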

Confidence Score: 4/5

  • Safe to merge with the ID-alignment validator fix applied; the routing and rollup layers are otherwise correct.
  • The overall implementation is well-structured with comprehensive test coverage (96.43%) and all 40 pre-review findings addressed. The one logic gap — DecompositionResult not checking that created-task IDs match plan subtask IDs — won't trigger in the production service path but leaves the model open to silently invalid states if constructed externally (e.g. in future strategies or tests). The double-logging issue is a monitoring hygiene concern with no correctness impact. Both issues are localized and easy to fix.
  • src/ai_company/engine/decomposition/models.py (ID-alignment gap in DecompositionResult) and src/ai_company/engine/decomposition/rollup.py (double-logging on empty input).

Important Files Changed

Filename Overview
src/ai_company/engine/decomposition/dag.py Iterative DFS cycle detection using reverse-adjacency and Kahn's topo-sort/parallel-groups are all correct; MappingProxyType immutability enforced at construction.
src/ai_company/engine/decomposition/models.py Most validators are solid; DecompositionResult._validate_plan_task_consistency has a gap — it checks count and edge IDs but not that created_task IDs align with plan subtask IDs, allowing a semantically inconsistent result to pass validation.
src/ai_company/engine/decomposition/rollup.py Correct aggregation logic; fires DECOMPOSITION_ROLLUP_COMPUTED twice (WARNING + DEBUG) when total == 0, which is a logging hygiene issue.
src/ai_company/engine/decomposition/service.py Clean orchestration: classify → decompose → DAG validate → task creation → edges. Exception logging via logger.exception and re-raise is correct.
src/ai_company/engine/decomposition/classifier.py Heuristic classification is correct; MIXED only triggered by dual language signals; structural signals act as tiebreakers; safe sequential default when uncertain.
src/ai_company/engine/routing/scorer.py Primary/secondary skill double-counting fix (secondary_matched excludes primary_matched) is correctly implemented; inactive agent short-circuit is sound.
src/ai_company/engine/routing/service.py Topology selected once per plan and applied uniformly; empty-agent guard and exception-logging patterns are correct.
src/ai_company/engine/routing/topology_selector.py Explicit override takes correct precedence; AutoTopologyConfig validator prevents AUTO infinite-loop; defensive fallback correctly unreachable.
src/ai_company/engine/routing/models.py Frozen models with appropriate cross-field validators; AUTO topology rejection and selected-not-in-alternatives checks are both correct.
tests/unit/engine/test_decomposition_models.py Good parametric coverage of models; missing a test for the ID-mismatch case in DecompositionResult (same count, different IDs), which is the gap identified in the models review.
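The rollup double-logging issue flagged above can be avoided with an early return on empty input, so only one log entry fires per logical operation. The aggregation rules below are illustrative assumptions for the sketch, not the project's actual policy (logging calls are shown as comments):

```python
def derive_parent_status(statuses: tuple[str, ...]) -> str:
    if not statuses:
        # logger.warning(DECOMPOSITION_ROLLUP_COMPUTED, total=0)
        return "PENDING"  # early return: the trailing debug log never fires
    unique = set(statuses)
    if "FAILED" in unique:
        return "FAILED"
    if unique <= {"COMPLETED", "CANCELLED"}:
        # Assumed mixed-terminal rule: cancelled subtasks don't block completion
        return "COMPLETED"
    # logger.debug(DECOMPOSITION_ROLLUP_COMPUTED, total=len(statuses))
    return "IN_PROGRESS"
```

The structural point is the early return; the status strings and precedence are placeholders.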

Sequence Diagram

sequenceDiagram
    participant Caller
    participant DecompositionService
    participant TaskStructureClassifier
    participant DecompositionStrategy
    participant DependencyGraph
    participant TaskRoutingService
    participant AgentTaskScorer
    participant TopologySelector

    Caller->>DecompositionService: decompose_task(task, context)
    DecompositionService->>TaskStructureClassifier: classify(task)
    TaskStructureClassifier-->>DecompositionService: TaskStructure
    DecompositionService->>DecompositionStrategy: decompose(task, context)
    DecompositionStrategy-->>DecompositionService: DecompositionPlan
    Note over DecompositionService: Override plan.task_structure if classifier differs
    DecompositionService->>DependencyGraph: __init__(plan.subtasks)
    DecompositionService->>DependencyGraph: validate()
    Note over DependencyGraph: _check_missing_references() + iterative DFS cycles
    DependencyGraph-->>DecompositionService: (valid or raises)
    DecompositionService->>DecompositionService: create Task objects from subtask defs
    DecompositionService-->>Caller: DecompositionResult

    Caller->>TaskRoutingService: route(decomposition_result, agents, parent_task)
    TaskRoutingService->>TopologySelector: select(parent_task, plan)
    TopologySelector-->>TaskRoutingService: CoordinationTopology
    loop for each SubtaskDefinition
        TaskRoutingService->>AgentTaskScorer: score(agent, subtask_def) × N
        AgentTaskScorer-->>TaskRoutingService: RoutingCandidate[]
        Note over TaskRoutingService: Filter by min_score, sort desc, pick best
    end
    TaskRoutingService-->>Caller: RoutingResult

Last reviewed commit: 5e539d2


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 11

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/core/task.py`:
- Around line 143-150: The Task class docstring is missing documentation for the
two new fields; update the Attributes section of the Task class docstring to
include brief descriptions for task_structure (TaskStructure | None —
classification of subtask relationships; None means not classified) and
coordination_topology (CoordinationTopology — coordination topology for
multi-agent execution, default AUTO). Edit the Task class docstring near the
existing attribute list so the entries match the style/format of the other
attributes and reference the exact field names task_structure and
coordination_topology.

In `@src/ai_company/engine/decomposition/models.py`:
- Around line 1-20: Import get_logger and create logger = get_logger(__name__),
then update each Pydantic validator (the methods decorated with `@model_validator`
and any computed_field validators in the file) to catch validation exceptions,
log a warning/error with contextual details (model/class name, incoming field
values or kwargs, and the exception message) and re-raise the original
exception; apply this pattern to all validator methods referenced in the file
(the validators around lines ~57-63, ~98-122, ~145-165, and ~198-211) so
failures are logged before being raised.
- Around line 145-165: The validator must ensure created_tasks exactly match the
plan's subtasks and that dependency_edges are neither missing nor extraneous:
replace the current count-only check with a set equality check comparing {t.id
for t in self.created_tasks} to {s.id for s in self.plan.subtasks} and raise
ValueError if they differ; then strengthen edge validation by (a) if self.plan
exposes an expected dependency set (e.g., self.plan.dependency_edges) compare
the set(self.dependency_edges) to that expected set and raise on mismatch, and
(b) otherwise enforce no self-loops, no duplicate edges, and that every edge
endpoint is in the subtask id set (use symbols _validate_plan_task_consistency,
created_tasks, plan.subtasks, dependency_edges to locate code).

In `@src/ai_company/engine/decomposition/service.py`:
- Around line 104-113: The code is wrongly overwriting the plan's declared
task_structure with the parent-only classifier result; instead, do not replace
plan.task_structure returned from self._strategy.decompose(task, context).
Update the logic in the method containing calls to
self._classifier.classify(task) and self._strategy.decompose(...) so that you
only use the classifier result if the returned plan has no structure (e.g.,
plan.task_structure is None or falsy); otherwise leave plan.task_structure and
plan.subtasks intact (or if you must detect mismatches, emit a warning/error
referencing plan.task_structure and plan.subtasks rather than silently calling
plan.model_copy(...)). Ensure you remove the unconditional model_copy update
that sets task_structure from the classifier.
- Around line 121-147: The created child Task instances currently omit subtask
dependencies, so copy each SubtaskDefinition's dependencies into the Task when
constructing it in the decomposition service (the Task construction block that
creates child_task from subtask_def); set Task.dependencies =
subtask_def.dependencies (converting to the appropriate type if Task expects a
tuple/list) so the per-Task dependency info matches the DAG edges built later
(the edges list comprehension over plan.subtasks).

In `@src/ai_company/engine/routing/models.py`:
- Around line 65-75: The validator _validate_selected_not_in_alternatives on the
model uses agent_identity.name to detect duplicates; change it to compare the
unique identifier agent_identity.id (UUID) instead: compute selected_id =
self.selected_candidate.agent_identity.id and compare against
alt.agent_identity.id for each alt in self.alternatives, and raise the same
ValueError when ids match; keep the error message but optionally include the
selected_id for clarity.
- Around line 99-111: The validator _validate_unique_subtask_ids currently
silences duplicates by building sets; instead explicitly detect and reject
duplicate subtask_id values inside both decisions and unroutable before checking
cross-collection overlap: scan self.decisions and self.unroutable to collect
counts (or use a seen set) to find any subtask_ids occurring more than once and
raise a ValueError listing those duplicates (referencing symbols decisions,
unroutable, and the validator _validate_unique_subtask_ids), then continue to
compute decision_ids and unroutable_set and raise the existing overlap error if
any IDs appear in both.
- Around line 7-13: Import get_logger and the observability event constants,
create logger = get_logger(__name__), then add structured logging (WARNING or
ERROR) with the appropriate event constant and contextual fields immediately
before each raise in the module's validator functions (the pydantic
`@model_validator` blocks and any validation logic around
CoordinationTopology/AgentIdentity/NotBlankStr). Specifically, update the three
raise paths referenced (around lines 65-75, 99-111, 145-157) to call
logger.warning or logger.error with a descriptive message, include the event
from ai_company.observability.events and relevant context (payload, field names,
invalid values), and only then re-raise the validation exception so failures are
logged consistently.

In `@src/ai_company/engine/routing/topology_selector.py`:
- Around line 57-79: The selector currently ignores a concrete topology set on
the decomposition plan when task.coordination_topology is AUTO; update the
selection logic so that after checking the task-level explicit override
(task.coordination_topology) you check plan.coordination_topology and if it's
not CoordinationTopology.AUTO, log using TASK_ROUTING_TOPOLOGY_SELECTED with
source="plan" and return plan.coordination_topology (instead of continuing to
heuristic auto-selection); keep the existing heuristics
(_config.sequential_override, _config.parallel_default, _config.mixed_default
and parallel_artifact_threshold) for true AUTO cases.

In `@tests/unit/engine/test_decomposition_models.py`:
- Around line 16-17: Remove the unused TYPE_CHECKING guard that imports Task:
delete the "if TYPE_CHECKING: from ai_company.core.task import Task" block (the
test uses the runtime import "from ai_company.core.task import Task" at line
~180), so eliminate the misleading conditional import and any reference to
TYPE_CHECKING in this file to keep imports clear and avoid dead code.
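The duplicate-subtask-ID check requested for routing/models.py (around lines 99-111) could be sketched as follows — a hedged stand-alone version operating on plain ID lists rather than the real `decisions`/`unroutable` model fields:

```python
from collections import Counter


def validate_unique_subtask_ids(decision_ids: list[str], unroutable_ids: list[str]) -> None:
    # First: explicitly reject duplicates WITHIN each collection, which a
    # set-based check would silently collapse.
    for name, ids in (("decisions", decision_ids), ("unroutable", unroutable_ids)):
        dupes = sorted(sid for sid, count in Counter(ids).items() if count > 1)
        if dupes:
            raise ValueError(f"duplicate subtask_ids in {name}: {dupes}")
    # Then: the existing cross-collection overlap check.
    overlap = set(decision_ids) & set(unroutable_ids)
    if overlap:
        raise ValueError(
            f"subtask_ids in both decisions and unroutable: {sorted(overlap)}"
        )
```

An ID repeated inside `decisions` now raises instead of vanishing into a set before the overlap comparison.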

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: e20b1f17-e5d6-4abd-8192-62a862f8bd4a

📥 Commits

Reviewing files that changed from the base of the PR and between c02832a and dfc99f8.

📒 Files selected for processing (34)
  • CLAUDE.md
  • DESIGN_SPEC.md
  • README.md
  • src/ai_company/core/__init__.py
  • src/ai_company/core/enums.py
  • src/ai_company/core/task.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/engine/decomposition/__init__.py
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/decomposition/manual.py
  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/decomposition/protocol.py
  • src/ai_company/engine/decomposition/rollup.py
  • src/ai_company/engine/decomposition/service.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/routing/__init__.py
  • src/ai_company/engine/routing/models.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/routing/service.py
  • src/ai_company/engine/routing/topology_selector.py
  • src/ai_company/observability/events/decomposition.py
  • src/ai_company/observability/events/task_routing.py
  • tests/unit/engine/test_decomposition_classifier.py
  • tests/unit/engine/test_decomposition_dag.py
  • tests/unit/engine/test_decomposition_manual.py
  • tests/unit/engine/test_decomposition_models.py
  • tests/unit/engine/test_decomposition_rollup.py
  • tests/unit/engine/test_decomposition_service.py
  • tests/unit/engine/test_routing_models.py
  • tests/unit/engine/test_routing_scorer.py
  • tests/unit/engine/test_routing_service.py
  • tests/unit/engine/test_routing_topology.py
  • tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Agent
  • GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Do not use from __future__ import annotations in Python 3.14+ code — Python 3.14 has native PEP 649 lazy annotations
Use except A, B: (no parentheses) for exception syntax in Python 3.14 — PEP 758 except syntax enforced by ruff
All public functions and classes must have type hints; mypy strict mode is enforced
All public classes and functions must have Google-style docstrings; ruff D rules enforce this
Create new objects instead of mutating existing ones; use copy.deepcopy() at construction and MappingProxyType wrapping for read-only enforcement of non-Pydantic internal collections (registries, BaseTool)
For non-Pydantic internal collections and dict/list fields in frozen Pydantic models, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, serializing for persistence)
Use frozen Pydantic models for config/identity; use separate mutable-via-copy models (model_copy(update=...)) for runtime state that evolves; never mix static config fields with mutable runtime fields
Use Pydantic v2 (BaseModel, model_validator, computed_field, ConfigDict); use @computed_field for derived values instead of storing redundant fields
Use NotBlankStr (from core.types) for all identifier/name fields — including optional (NotBlankStr | None) and tuple (tuple[NotBlankStr, ...]) variants — instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls); prefer structured concurrency over bare create_task
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly; never silently swallow errors
Validate at system boundaries (user input, external APIs, config files)
Line length must be 88 characters (enforced by ruff)

Files:

  • src/ai_company/observability/events/decomposition.py
  • tests/unit/engine/test_routing_topology.py
  • tests/unit/engine/test_decomposition_rollup.py
  • src/ai_company/core/__init__.py
  • src/ai_company/engine/decomposition/__init__.py
  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/observability/events/task_routing.py
  • tests/unit/engine/test_decomposition_dag.py
  • src/ai_company/engine/decomposition/rollup.py
  • src/ai_company/engine/routing/topology_selector.py
  • tests/unit/engine/test_routing_models.py
  • src/ai_company/engine/decomposition/manual.py
  • tests/unit/observability/test_events.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/decomposition/dag.py
  • tests/unit/engine/test_routing_service.py
  • src/ai_company/engine/routing/service.py
  • tests/unit/engine/test_routing_scorer.py
  • src/ai_company/engine/decomposition/protocol.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/decomposition/service.py
  • tests/unit/engine/test_decomposition_service.py
  • src/ai_company/engine/routing/__init__.py
  • src/ai_company/core/enums.py
  • tests/unit/engine/test_decomposition_manual.py
  • tests/unit/engine/test_decomposition_models.py
  • src/ai_company/engine/routing/models.py
  • tests/unit/engine/test_decomposition_classifier.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/core/task.py
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic must import from ai_company.observability import get_logger and create logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code
Always use variable name logger (not _logger, not log) for the logger instance
Always use constants from domain-specific modules under ai_company.observability.events for event names (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget); import directly
Always use structured logging with kwargs: logger.info(EVENT, key=value) — never use formatted strings like logger.info("msg %s", val)
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO level
DEBUG logging is for object creation, internal flow, and entry/exit of key functions

Files:

  • src/ai_company/observability/events/decomposition.py
  • src/ai_company/core/__init__.py
  • src/ai_company/engine/decomposition/__init__.py
  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/observability/events/task_routing.py
  • src/ai_company/engine/decomposition/rollup.py
  • src/ai_company/engine/routing/topology_selector.py
  • src/ai_company/engine/decomposition/manual.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/routing/service.py
  • src/ai_company/engine/decomposition/protocol.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/decomposition/service.py
  • src/ai_company/engine/routing/__init__.py
  • src/ai_company/core/enums.py
  • src/ai_company/engine/routing/models.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/core/task.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples; use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases

Files:

  • src/ai_company/observability/events/decomposition.py
  • src/ai_company/core/__init__.py
  • src/ai_company/engine/decomposition/__init__.py
  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/observability/events/task_routing.py
  • src/ai_company/engine/decomposition/rollup.py
  • src/ai_company/engine/routing/topology_selector.py
  • src/ai_company/engine/decomposition/manual.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/routing/service.py
  • src/ai_company/engine/decomposition/protocol.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/decomposition/service.py
  • src/ai_company/engine/routing/__init__.py
  • src/ai_company/core/enums.py
  • src/ai_company/engine/routing/models.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/core/task.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Mark tests with @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, or @pytest.mark.slow as appropriate
Prefer @pytest.mark.parametrize for testing similar cases
Use vendor-agnostic test names: test-provider, test-small-001, etc. instead of real vendor names

Files:

  • tests/unit/engine/test_routing_topology.py
  • tests/unit/engine/test_decomposition_rollup.py
  • tests/unit/engine/test_decomposition_dag.py
  • tests/unit/engine/test_routing_models.py
  • tests/unit/observability/test_events.py
  • tests/unit/engine/test_routing_service.py
  • tests/unit/engine/test_routing_scorer.py
  • tests/unit/engine/test_decomposition_service.py
  • tests/unit/engine/test_decomposition_manual.py
  • tests/unit/engine/test_decomposition_models.py
  • tests/unit/engine/test_decomposition_classifier.py
src/ai_company/engine/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

RetryExhaustedError signals that all retries failed — the engine layer catches this to trigger fallback chains

Files:

  • src/ai_company/engine/decomposition/__init__.py
  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/decomposition/rollup.py
  • src/ai_company/engine/routing/topology_selector.py
  • src/ai_company/engine/decomposition/manual.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/routing/service.py
  • src/ai_company/engine/decomposition/protocol.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/decomposition/service.py
  • src/ai_company/engine/routing/__init__.py
  • src/ai_company/engine/routing/models.py
  • src/ai_company/engine/__init__.py
🧠 Learnings (2)
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Always use constants from domain-specific modules under ai_company.observability.events for event names (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget); import directly

Applied to files:

  • src/ai_company/observability/events/decomposition.py
  • src/ai_company/observability/events/task_routing.py
  • tests/unit/observability/test_events.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/engine/**/*.py : RetryExhaustedError signals that all retries failed — the engine layer catches this to trigger fallback chains

Applied to files:

  • src/ai_company/engine/errors.py
🧬 Code graph analysis (23)
tests/unit/engine/test_routing_topology.py (2)
src/ai_company/engine/routing/models.py (1)
  • AutoTopologyConfig (114-157)
src/ai_company/engine/routing/topology_selector.py (3)
  • TopologySelector (24-98)
  • select (43-98)
  • config (39-41)
tests/unit/engine/test_decomposition_rollup.py (3)
src/ai_company/core/enums.py (1)
  • TaskStatus (122-148)
src/ai_company/engine/decomposition/rollup.py (2)
  • StatusRollup (21-79)
  • compute (25-79)
src/ai_company/engine/decomposition/models.py (1)
  • derived_parent_status (217-237)
src/ai_company/core/__init__.py (1)
src/ai_company/core/enums.py (2)
  • CoordinationTopology (282-293)
  • TaskStructure (270-279)
src/ai_company/engine/decomposition/models.py (2)
src/ai_company/core/enums.py (4)
  • Complexity (171-177)
  • CoordinationTopology (282-293)
  • TaskStatus (122-148)
  • TaskStructure (270-279)
src/ai_company/core/task.py (1)
  • Task (45-257)
tests/unit/engine/test_decomposition_dag.py (1)
src/ai_company/engine/decomposition/dag.py (6)
  • DependencyGraph (25-236)
  • validate (62-78)
  • topological_sort (132-171)
  • parallel_groups (173-212)
  • get_dependents (214-224)
  • get_dependencies (226-236)
src/ai_company/engine/decomposition/rollup.py (3)
src/ai_company/core/enums.py (1)
  • TaskStatus (122-148)
src/ai_company/engine/decomposition/models.py (2)
  • SubtaskStatusRollup (168-237)
  • derived_parent_status (217-237)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/engine/routing/topology_selector.py (4)
src/ai_company/core/enums.py (2)
  • CoordinationTopology (282-293)
  • TaskStructure (270-279)
src/ai_company/engine/routing/models.py (1)
  • AutoTopologyConfig (114-157)
src/ai_company/core/task.py (1)
  • Task (45-257)
src/ai_company/engine/decomposition/models.py (1)
  • DecompositionPlan (66-122)
tests/unit/engine/test_routing_models.py (5)
src/ai_company/core/enums.py (1)
  • CoordinationTopology (282-293)
src/ai_company/engine/routing/models.py (4)
  • AutoTopologyConfig (114-157)
  • RoutingCandidate (16-38)
  • RoutingDecision (41-75)
  • RoutingResult (78-111)
tests/unit/engine/conftest.py (1)
  • sample_agent_with_personality (52-79)
tests/unit/engine/test_decomposition_models.py (1)
  • test_frozen (77-81)
src/ai_company/engine/routing/topology_selector.py (1)
  • config (39-41)
src/ai_company/engine/decomposition/manual.py (4)
src/ai_company/engine/errors.py (2)
  • DecompositionDepthError (58-59)
  • DecompositionError (50-51)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/engine/decomposition/models.py (2)
  • DecompositionContext (240-265)
  • DecompositionPlan (66-122)
src/ai_company/engine/decomposition/protocol.py (2)
  • decompose (22-36)
  • get_strategy_name (38-40)
src/ai_company/engine/decomposition/classifier.py (2)
src/ai_company/core/enums.py (1)
  • TaskStructure (270-279)
src/ai_company/core/task.py (1)
  • Task (45-257)
src/ai_company/engine/decomposition/dag.py (3)
src/ai_company/engine/errors.py (2)
  • DecompositionCycleError (54-55)
  • DecompositionError (50-51)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/engine/decomposition/models.py (1)
  • SubtaskDefinition (22-63)
tests/unit/engine/test_routing_service.py (7)
src/ai_company/core/agent.py (3)
  • AgentIdentity (246-304)
  • ModelConfig (145-174)
  • SkillSet (125-142)
src/ai_company/core/enums.py (5)
  • AgentStatus (24-29)
  • Complexity (171-177)
  • Priority (162-168)
  • SeniorityLevel (6-21)
  • TaskType (151-159)
src/ai_company/core/task.py (1)
  • Task (45-257)
src/ai_company/engine/decomposition/models.py (3)
  • DecompositionPlan (66-122)
  • DecompositionResult (125-165)
  • SubtaskDefinition (22-63)
src/ai_company/engine/routing/scorer.py (2)
  • AgentTaskScorer (35-149)
  • min_score (57-59)
src/ai_company/engine/routing/service.py (2)
  • TaskRoutingService (32-166)
  • route (50-88)
src/ai_company/engine/routing/topology_selector.py (1)
  • TopologySelector (24-98)
src/ai_company/engine/routing/service.py (6)
src/ai_company/engine/routing/models.py (2)
  • RoutingDecision (41-75)
  • RoutingResult (78-111)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/core/task.py (1)
  • Task (45-257)
src/ai_company/engine/decomposition/models.py (1)
  • DecompositionResult (125-165)
src/ai_company/engine/routing/scorer.py (3)
  • AgentTaskScorer (35-149)
  • score (61-149)
  • min_score (57-59)
src/ai_company/engine/routing/topology_selector.py (2)
  • TopologySelector (24-98)
  • select (43-98)
tests/unit/engine/test_routing_scorer.py (4)
src/ai_company/core/agent.py (3)
  • AgentIdentity (246-304)
  • ModelConfig (145-174)
  • SkillSet (125-142)
src/ai_company/core/enums.py (3)
  • AgentStatus (24-29)
  • Complexity (171-177)
  • SeniorityLevel (6-21)
src/ai_company/engine/decomposition/models.py (1)
  • SubtaskDefinition (22-63)
src/ai_company/engine/routing/scorer.py (3)
  • AgentTaskScorer (35-149)
  • score (61-149)
  • min_score (57-59)
src/ai_company/engine/decomposition/protocol.py (3)
src/ai_company/core/task.py (1)
  • Task (45-257)
src/ai_company/engine/decomposition/models.py (2)
  • DecompositionContext (240-265)
  • DecompositionPlan (66-122)
src/ai_company/engine/decomposition/manual.py (2)
  • decompose (37-81)
  • get_strategy_name (83-85)
src/ai_company/engine/routing/scorer.py (5)
src/ai_company/core/enums.py (3)
  • AgentStatus (24-29)
  • Complexity (171-177)
  • SeniorityLevel (6-21)
src/ai_company/engine/routing/models.py (1)
  • RoutingCandidate (16-38)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/core/agent.py (1)
  • AgentIdentity (246-304)
src/ai_company/engine/decomposition/models.py (1)
  • SubtaskDefinition (22-63)
src/ai_company/engine/routing/__init__.py (4)
src/ai_company/engine/routing/models.py (4)
  • AutoTopologyConfig (114-157)
  • RoutingCandidate (16-38)
  • RoutingDecision (41-75)
  • RoutingResult (78-111)
src/ai_company/engine/routing/scorer.py (1)
  • AgentTaskScorer (35-149)
src/ai_company/engine/routing/service.py (1)
  • TaskRoutingService (32-166)
src/ai_company/engine/routing/topology_selector.py (1)
  • TopologySelector (24-98)
tests/unit/engine/test_decomposition_manual.py (2)
src/ai_company/engine/decomposition/protocol.py (3)
  • DecompositionStrategy (14-40)
  • decompose (22-36)
  • get_strategy_name (38-40)
src/ai_company/engine/errors.py (2)
  • DecompositionDepthError (58-59)
  • DecompositionError (50-51)
tests/unit/engine/test_decomposition_models.py (1)
src/ai_company/engine/decomposition/models.py (6)
  • DecompositionContext (240-265)
  • DecompositionPlan (66-122)
  • DecompositionResult (125-165)
  • SubtaskDefinition (22-63)
  • SubtaskStatusRollup (168-237)
  • derived_parent_status (217-237)
src/ai_company/engine/routing/models.py (2)
src/ai_company/core/agent.py (1)
  • AgentIdentity (246-304)
src/ai_company/core/enums.py (1)
  • CoordinationTopology (282-293)
tests/unit/engine/test_decomposition_classifier.py (2)
src/ai_company/core/enums.py (1)
  • TaskStructure (270-279)
src/ai_company/engine/decomposition/classifier.py (2)
  • TaskStructureClassifier (46-192)
  • classify (59-104)
src/ai_company/engine/__init__.py (4)
src/ai_company/engine/decomposition/models.py (4)
  • DecompositionContext (240-265)
  • DecompositionPlan (66-122)
  • DecompositionResult (125-165)
  • SubtaskDefinition (22-63)
src/ai_company/engine/decomposition/dag.py (1)
  • DependencyGraph (25-236)
src/ai_company/engine/errors.py (2)
  • EngineError (4-5)
  • TaskRoutingError (62-63)
src/ai_company/engine/routing/models.py (4)
  • AutoTopologyConfig (114-157)
  • RoutingCandidate (16-38)
  • RoutingDecision (41-75)
  • RoutingResult (78-111)
src/ai_company/core/task.py (1)
src/ai_company/core/enums.py (3)
  • CoordinationTopology (282-293)
  • TaskStructure (270-279)
  • TaskType (151-159)
🪛 LanguageTool
README.md

[typographical] ~18-~18: To join two clauses or introduce examples, consider using an em dash.
Context: ...ntion - Task Decomposition & Routing - DAG-based subtask decomposition, structu...

(DASH_RULE)

🔇 Additional comments (37)
src/ai_company/observability/events/decomposition.py (1)

1-14: LGTM!

Event constants are well-structured, follow established naming conventions (domain.entity.action), and align with other observability event modules. The Final[str] typing ensures immutability at the type-checker level.

src/ai_company/engine/errors.py (1)

48-63: LGTM!

The exception hierarchy is well-designed with proper inheritance: DecompositionError as the base for decomposition failures with specific subclasses for cycle detection and depth violations, and TaskRoutingError as a separate concern for routing failures. All classes have appropriate docstrings.
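
A minimal sketch of that hierarchy, assuming (per the engine `__init__.py` exports above) that both branches derive from an `EngineError` base; docstrings here are illustrative, not the module's actual text:

```python
class EngineError(Exception):
    """Base class for all engine failures."""


class DecompositionError(EngineError):
    """Base class for task decomposition failures."""


class DecompositionCycleError(DecompositionError):
    """Raised when the subtask dependency graph contains a cycle."""


class DecompositionDepthError(DecompositionError):
    """Raised when decomposition exceeds the configured depth limit."""


class TaskRoutingError(EngineError):
    """Raised when subtasks cannot be routed to agents."""
```

Keeping routing errors outside the decomposition branch lets callers catch `DecompositionError` without accidentally swallowing routing failures.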

src/ai_company/observability/events/task_routing.py (1)

1-12: LGTM!

Event constants comprehensively cover the task routing lifecycle (scoring, routing decisions, topology selection, completion/failure states) and follow established naming conventions.

src/ai_company/engine/routing/scorer.py (1)

1-149: LGTM!

The scoring implementation is well-designed with clear weight distribution (0.4 primary skills, 0.2 secondary, 0.2 role, 0.2 seniority-complexity), proper handling of inactive agents, and comprehensive reason generation. The seniority-to-complexity mapping appropriately pairs agent levels with suitable task complexities.
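
The weight distribution can be sketched as a simple weighted sum. This is illustrative only; the real `AgentTaskScorer.score()` takes `AgentIdentity` and `SubtaskDefinition` objects, and the parameter names here are hypothetical:

```python
def score_agent(
    primary_overlap: float,    # fraction of required skills in agent's primary set
    secondary_overlap: float,  # fraction covered by secondary skills
    role_match: bool,          # agent role matches the subtask's expected role
    seniority_fit: float,      # 0.0-1.0 seniority-to-complexity alignment
    *,
    active: bool = True,
) -> float:
    """Weighted score: 0.4 primary + 0.2 secondary + 0.2 role + 0.2 seniority."""
    if not active:  # inactive / ON_LEAVE agents are never viable
        return 0.0
    return (
        0.4 * primary_overlap
        + 0.2 * secondary_overlap
        + 0.2 * (1.0 if role_match else 0.0)
        + 0.2 * seniority_fit
    )
```

Because the weights sum to 1.0, the score stays in the 0.0-1.0 range that `RoutingCandidate` validates.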

src/ai_company/engine/decomposition/classifier.py (1)

1-192: LGTM!

The classifier implementation cleanly separates language-based signals (regex patterns) from structural signals (artifact count, dependencies). The decision logic correctly prioritizes MIXED detection (both language categories present), then compares total scores with a SEQUENTIAL fallback per the research guidance in §6.9. The heuristics are well-documented and align with DESIGN_SPEC.md.
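
The decision order can be sketched as follows. The pattern lists are hypothetical stand-ins (the actual classifier ships 8 sequential and 5 parallel patterns, per the PR description), but the control flow mirrors the logic described above:

```python
import re

# Illustrative patterns only; the real TaskStructureClassifier defines its own.
SEQUENTIAL_PATTERNS = [r"\bthen\b", r"\bafter\b", r"\bstep \d+\b"]
PARALLEL_PATTERNS = [r"\bin parallel\b", r"\bindependently\b", r"\bconcurrently\b"]


def classify(description: str) -> str:
    """MIXED requires both signal categories; SEQUENTIAL is the fallback."""
    seq = sum(bool(re.search(p, description, re.I)) for p in SEQUENTIAL_PATTERNS)
    par = sum(bool(re.search(p, description, re.I)) for p in PARALLEL_PATTERNS)
    if seq and par:       # both language categories present
        return "MIXED"
    if par > seq:
        return "PARALLEL"
    return "SEQUENTIAL"   # fallback per the research guidance in §6.9
```

Requiring both categories for MIXED (rather than a close score) avoids the classifier bias that the pre-PR review fixes addressed.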

DESIGN_SPEC.md (1)

2388-2402: LGTM!

The project structure documentation accurately reflects the new decomposition and routing subsystems, including all modules (classifier, dag, manual, models, protocol, rollup, service for decomposition; models, scorer, service, topology_selector for routing) and their corresponding observability event constants.

Also applies to: 2454-2455, 2465-2465

README.md (1)

18-18: LGTM!

The new feature bullet accurately summarizes the decomposition and routing capabilities and maintains consistency with the existing list formatting. The static analysis suggestion about em dashes is noted but would create inconsistency with other bullets — the current hyphen-minus style matches the rest of the document.

CLAUDE.md (1)

52-52: LGTM!

The engine directory description is appropriately updated to include the new decomposition and routing subsystems, maintaining consistency with the actual package structure additions.

tests/unit/observability/test_events.py (1)

118-130: LGTM!

The expected domain modules set is correctly updated to include decomposition and task_routing, aligning with the new observability event modules introduced in this PR. The alphabetical ordering is maintained.

src/ai_company/core/__init__.py (1)

34-47: LGTM!

The new CoordinationTopology and TaskStructure enums are correctly imported and added to __all__ in alphabetical order, following the existing module conventions.

Also applies to: 89-120

tests/unit/engine/test_decomposition_dag.py (2)

1-21: LGTM!

Clean helper function and well-organized imports. The _sub() helper provides a minimal factory for creating SubtaskDefinition instances, reducing boilerplate across tests.


23-213: LGTM!

Comprehensive test coverage for DependencyGraph including:

  • Validation (acyclic, cycles, missing references)
  • Topological sort (linear, diamond, cycle detection)
  • Parallel groups (independent, sequential, diamond patterns)
  • Boundary cases (single node, unknown IDs)

The tests effectively validate the iterative DFS cycle detection and heapq-based topological sort mentioned in the PR objectives.
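
As a compact stand-in for both behaviors, the sketch below uses Kahn's algorithm with a heap: it produces the deterministic ordering that the heapq-based sort provides and detects cycles as a side effect. The real `DependencyGraph` operates on `SubtaskDefinition` objects and uses iterative DFS for cycle detection; bare string IDs here are a simplification:

```python
import heapq
from collections import defaultdict


def topo_sort(nodes: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Deterministic topological sort; raises ValueError on a cycle."""
    indegree = {n: 0 for n in nodes}
    out: defaultdict[str, list[str]] = defaultdict(list)
    for src, dst in edges:
        out[src].append(dst)
        indegree[dst] += 1
    ready = [n for n, d in indegree.items() if d == 0]
    heapq.heapify(ready)  # heap keeps same-level nodes in sorted order
    order: list[str] = []
    while ready:
        node = heapq.heappop(ready)
        order.append(node)
        for nxt in out[node]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                heapq.heappush(ready, nxt)
    if len(order) != len(nodes):  # leftover nodes imply a cycle
        raise ValueError("dependency graph contains a cycle")
    return order
```

The diamond case from the tests (a → b, a → c, b → d, c → d) sorts to `[a, b, c, d]` regardless of insertion order, which is why deterministic ordering matters for reproducible parallel groups.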

tests/unit/engine/test_decomposition_rollup.py (1)

1-93: LGTM!

Thorough test coverage for StatusRollup.compute() covering the documented priority rules:

  • All COMPLETED → COMPLETED
  • All CANCELLED → CANCELLED
  • Any FAILED → FAILED
  • Any IN_PROGRESS → IN_PROGRESS
  • BLOCKED without IN_PROGRESS → BLOCKED
  • Empty → CREATED
  • Mixed pending work → IN_PROGRESS

The test_rollup_counts test validates all aggregate count fields are correctly populated.
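
The priority rules above can be sketched as an ordered cascade. This is a simplified stand-in: the real `StatusRollup.compute()` works with `TaskStatus` enum members and also returns the aggregate counts:

```python
def derive_parent_status(statuses: list[str]) -> str:
    """Apply the rollup priority rules in the documented order."""
    if not statuses:
        return "CREATED"
    if all(s == "COMPLETED" for s in statuses):
        return "COMPLETED"
    if all(s == "CANCELLED" for s in statuses):
        return "CANCELLED"
    if "FAILED" in statuses:
        return "FAILED"
    if "IN_PROGRESS" in statuses:
        return "IN_PROGRESS"
    if "BLOCKED" in statuses:  # BLOCKED only wins when nothing is in progress
        return "BLOCKED"
    return "IN_PROGRESS"       # mixed pending work
```

The ordering is load-bearing: checking FAILED before IN_PROGRESS means one failed subtask marks the parent failed even while siblings are still running.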

tests/unit/engine/test_routing_models.py (2)

9-10: TYPE_CHECKING import is appropriate here.

The AgentIdentity is only used as a type annotation for fixture parameters and never instantiated directly in this module, making the TYPE_CHECKING guard correct for avoiding circular imports.


19-169: LGTM!

Comprehensive validation tests for routing models:

  • RoutingCandidate: score bounds (0.0-1.0), immutability
  • RoutingDecision: selected candidate not in alternatives validator
  • RoutingResult: no overlap between decisions and unroutable
  • AutoTopologyConfig: defaults and AUTO rejection to prevent infinite resolution

All cross-field validators from the model definitions are properly exercised.
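
The AUTO-rejection idea can be sketched with a plain dataclass. The real `AutoTopologyConfig` is a Pydantic model with additional threshold fields, so names and defaults here are assumptions:

```python
from dataclasses import dataclass, fields


@dataclass(frozen=True)
class AutoTopologyConfigSketch:
    sequential_override: str = "SAS"
    parallel_default: str = "CENTRALIZED"
    mixed_default: str = "CONTEXT_DEPENDENT"

    def __post_init__(self) -> None:
        # AUTO in any field would make auto-selection resolve to AUTO again,
        # i.e. infinite resolution — reject it at construction time.
        for f in fields(self):
            if getattr(self, f.name) == "AUTO":
                raise ValueError(f"{f.name} must not be AUTO")
```

This is the invariant the `AutoTopologyConfig` AUTO-rejection validator enforces: every configured fallback must be a concrete topology.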

tests/unit/engine/test_decomposition_classifier.py (3)

14-34: LGTM!

Clean helper function with sensible defaults. The keyword-only arguments after description enforce explicit parameter passing for optional fields, improving readability.


117-170: Good use of parametrization.

The parametrized tests for individual sequential and parallel patterns provide excellent coverage of the classifier's language pattern recognition, with descriptive test IDs for clear failure reporting.


36-115: LGTM!

Tests comprehensively cover the classifier logic:

  • Explicit task_structure bypass
  • Sequential/parallel language signal detection
  • Mixed signals triggering MIXED classification
  • Default SEQUENTIAL fallback with dependencies
  • Acceptance criteria contributing to scoring

This aligns well with the classifier implementation that requires both sequential and parallel language signals for MIXED classification.

tests/unit/engine/test_routing_service.py (2)

27-47: LGTM!

The _make_agent helper with noqa: PLR0913 is acceptable for test fixtures where explicit parameters improve test readability. The vendor-agnostic naming (test-provider, test-model-001) follows coding guidelines.


113-294: LGTM!

Thorough test coverage for TaskRoutingService:

  • Routing to best-scoring agent based on skills/role matching
  • Unroutable subtasks when no viable candidates
  • Alternatives populated with secondary candidates
  • Topology applied to all decisions
  • Empty agent pool handling
  • Inactive agent filtering (TERMINATED status → score 0)
  • Parent task ID propagation in results

The tests effectively exercise the scorer weights (skill 0.4/0.2, role 0.2, seniority 0.2) and topology integration.

tests/unit/engine/test_routing_topology.py (3)

22-56: LGTM!

Clean helper functions for creating test fixtures. The _make_task helper with configurable artifact_count enables thorough threshold testing, and _make_plan allows structure variations.


131-149: Good boundary testing.

Tests correctly verify the threshold behavior:

  • At threshold (4 artifacts) → CENTRALIZED
  • Above threshold (5 artifacts) → DECENTRALIZED

This validates the artifact_count > parallel_artifact_threshold condition in TopologySelector.select().


59-156: LGTM!

Comprehensive test coverage for TopologySelector:

  • Explicit topology override takes precedence
  • Structure-based auto-selection (SEQUENTIAL→SAS, PARALLEL→CENTRALIZED/DECENTRALIZED, MIXED→CONTEXT_DEPENDENT)
  • Custom AutoTopologyConfig overrides defaults
  • Artifact count threshold boundary behavior
  • Config property accessor

The tests align with DESIGN_SPEC Section 6.9 topology selection rules.

src/ai_company/engine/routing/__init__.py (1)

1-25: LGTM!

Clean public API surface for the routing subsystem. The __all__ is alphabetically sorted and matches the imports.

src/ai_company/core/task.py (1)

10-17: LGTM!

The import block correctly includes the new CoordinationTopology and TaskStructure enums needed for the new Task fields.

src/ai_company/engine/decomposition/protocol.py (1)

1-40: LGTM!

Clean protocol definition with proper TYPE_CHECKING guards, @runtime_checkable decorator, and method signatures that align with the concrete ManualDecompositionStrategy implementation.

tests/unit/engine/test_routing_scorer.py (2)

184-211: LGTM!

Good use of @pytest.mark.parametrize to cover multiple seniority-complexity alignment combinations. The test IDs are descriptive and aid debugging.


23-33: Likely an incorrect or invalid review comment.

tests/unit/engine/test_decomposition_models.py (2)

250-364: LGTM!

Comprehensive test coverage for SubtaskStatusRollup covering all status derivation paths including edge cases (zero total, counts exceeding total, pending work defaults).


196-211: The sample_task_with_criteria fixture is correctly defined in tests/unit/engine/conftest.py and is properly accessible to the test.

src/ai_company/engine/routing/service.py (3)

1-48: LGTM!

Clean service implementation with proper observability setup. The __slots__ usage and TYPE_CHECKING guard for heavy imports follow best practices.


50-88: LGTM!

The route() method properly handles the empty-agents case with a warning log while allowing the flow to continue (all subtasks become unroutable). The try/except with logger.exception() ensures failures are logged with full traceback before re-raising.


90-166: LGTM!

The _do_route() implementation correctly:

  • Computes topology once per plan (shared across all decisions)
  • Scores and filters candidates by min_score threshold
  • Sorts viable candidates descending by score
  • Logs at appropriate levels (DEBUG for per-subtask, INFO for completion)
tests/unit/engine/test_decomposition_service.py (3)

1-66: LGTM!

Clean test helpers that create minimal Task and DecompositionPlan instances with sensible defaults. The plan helper creates a sequential dependency chain suitable for testing edge construction.


67-214: LGTM!

Comprehensive async test coverage for DecompositionService.decompose_task():

  • Task creation with correct parent linkage
  • Dependency edge construction
  • Delegation chain inheritance
  • Structure classification and override behavior
  • Cycle detection raising DecompositionCycleError

215-266: LGTM!

Good coverage for subtask complexity propagation and rollup delegation. The test_rollup_status_delegates correctly verifies synchronous rollup behavior.

src/ai_company/engine/decomposition/__init__.py (1)

1-33: LGTM!

Clean public API surface for the decomposition subsystem. The __all__ is alphabetically sorted and all imports are correctly mapped to exports.

Comment on lines +143 to +150
task_structure: TaskStructure | None = Field(
default=None,
description="Classification of subtask relationships (None = not classified)",
)
coordination_topology: CoordinationTopology = Field(
default=CoordinationTopology.AUTO,
description="Coordination topology for multi-agent execution",
)

🧹 Nitpick | 🔵 Trivial

Update class docstring to document new fields.

The Task class docstring (lines 52-73) lists all attributes but doesn't include the newly added task_structure and coordination_topology fields. Add them for consistency.

📝 Proposed docstring addition

Add these entries to the Attributes section in the class docstring:

         delegation_chain: Ordered agent IDs of delegators (root first).
+        task_structure: Classification of subtask relationships
+            (``None`` if not classified).
+        coordination_topology: Coordination topology for multi-agent
+            execution (default ``AUTO``).
     """
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/core/task.py` around lines 143 - 150, The Task class docstring
is missing documentation for the two new fields; update the Attributes section
of the Task class docstring to include brief descriptions for task_structure
(TaskStructure | None — classification of subtask relationships; None means not
classified) and coordination_topology (CoordinationTopology — coordination
topology for multi-agent execution, default AUTO). Edit the Task class docstring
near the existing attribute list so the entries match the style/format of the
other attributes and reference the exact field names task_structure and
coordination_topology.

Comment on lines +1 to +20
"""Decomposition domain models.

Frozen Pydantic models for subtask definitions, decomposition plans,
results, status rollups, and decomposition context.
"""

from collections import Counter
from typing import Self

from pydantic import BaseModel, ConfigDict, Field, computed_field, model_validator

from ai_company.core.enums import (
Complexity,
CoordinationTopology,
TaskStatus,
TaskStructure,
)
from ai_company.core.task import Task # noqa: TC001
from ai_company.core.types import NotBlankStr # noqa: TC001


⚠️ Potential issue | 🟠 Major

Log validator failures before raising from this module.

These validators currently raise ValueError silently. In a business-logic module, that leaves decomposition validation failures without the required warning/error log context.

As per coding guidelines, src/ai_company/**/*.py: Every module with business logic must import from ai_company.observability import get_logger and create logger = get_logger(__name__); all error paths must log at WARNING or ERROR with context before raising.

Also applies to: 57-63, 98-122, 145-165, 198-211

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/decomposition/models.py` around lines 1 - 20, Import
get_logger and create logger = get_logger(__name__), then update each Pydantic
validator (the methods decorated with `@model_validator` and any computed_field
validators in the file) to catch validation exceptions, log a warning/error with
contextual details (model/class name, incoming field values or kwargs, and the
exception message) and re-raise the original exception; apply this pattern to
all validator methods referenced in the file (the validators around lines
~57-63, ~98-122, ~145-165, and ~198-211) so failures are logged before being
raised.

Comment on lines +145 to +165
@model_validator(mode="after")
def _validate_plan_task_consistency(self) -> Self:
"""Ensure created_tasks align with plan subtasks."""
if len(self.created_tasks) != len(self.plan.subtasks):
msg = (
f"created_tasks count ({len(self.created_tasks)}) "
f"does not match plan subtask count ({len(self.plan.subtasks)})"
)
raise ValueError(msg)

task_ids = {t.id for t in self.created_tasks}
edge_ids = {eid for edge in self.dependency_edges for eid in edge}
unknown_edge_ids = edge_ids - task_ids
if unknown_edge_ids:
msg = (
f"dependency_edges reference unknown task IDs: "
f"{sorted(unknown_edge_ids)}"
)
raise ValueError(msg)

return self

⚠️ Potential issue | 🟠 Major

Strengthen DecompositionResult consistency checks.

This validator only compares counts and verifies that edge endpoints exist. It still accepts a result where the created task IDs differ from plan.subtasks, and it also accepts missing or extra dependency_edges.

Suggested fix
     @model_validator(mode="after")
     def _validate_plan_task_consistency(self) -> Self:
         """Ensure created_tasks align with plan subtasks."""
         if len(self.created_tasks) != len(self.plan.subtasks):
             msg = (
                 f"created_tasks count ({len(self.created_tasks)}) "
                 f"does not match plan subtask count ({len(self.plan.subtasks)})"
             )
             raise ValueError(msg)
 
-        task_ids = {t.id for t in self.created_tasks}
+        expected_task_ids = {subtask.id for subtask in self.plan.subtasks}
+        task_ids = {t.id for t in self.created_tasks}
+        if task_ids != expected_task_ids:
+            msg = (
+                f"created_tasks IDs {sorted(task_ids)} do not match "
+                f"plan subtask IDs {sorted(expected_task_ids)}"
+            )
+            raise ValueError(msg)
+
+        expected_edges = {
+            (dep_id, subtask.id)
+            for subtask in self.plan.subtasks
+            for dep_id in subtask.dependencies
+        }
         edge_ids = {eid for edge in self.dependency_edges for eid in edge}
         unknown_edge_ids = edge_ids - task_ids
         if unknown_edge_ids:
             msg = (
                 f"dependency_edges reference unknown task IDs: "
                 f"{sorted(unknown_edge_ids)}"
             )
             raise ValueError(msg)
+
+        if set(self.dependency_edges) != expected_edges:
+            msg = "dependency_edges do not match plan subtask dependencies"
+            raise ValueError(msg)
 
         return self
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/decomposition/models.py` around lines 145 - 165, The
validator must ensure created_tasks exactly match the plan's subtasks and that
dependency_edges are neither missing nor extraneous: replace the current
count-only check with a set equality check comparing {t.id for t in
self.created_tasks} to {s.id for s in self.plan.subtasks} and raise ValueError
if they differ; then strengthen edge validation by (a) if self.plan exposes an
expected dependency set (e.g., self.plan.dependency_edges) compare the
set(self.dependency_edges) to that expected set and raise on mismatch, and (b)
otherwise enforce no self-loops, no duplicate edges, and that every edge
endpoint is in the subtask id set (use symbols _validate_plan_task_consistency,
created_tasks, plan.subtasks, dependency_edges to locate code).

Comment on lines +104 to +113
# 1. Classify structure
structure = self._classifier.classify(task)

# 2. Decompose via strategy
plan = await self._strategy.decompose(task, context)

# Override structure if classifier found something different
if plan.task_structure != structure:
plan = plan.model_copy(update={"task_structure": structure})


⚠️ Potential issue | 🟠 Major

Don’t replace the plan’s declared structure with a parent-task heuristic.

classify(task) only sees the parent task inputs, while plan.task_structure is attached to the actual subtask graph. Silently rewriting the plan here can make task_structure disagree with plan.subtasks and with any topology already chosen for that plan.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/decomposition/service.py` around lines 104 - 113, The
code is wrongly overwriting the plan's declared task_structure with the
parent-only classifier result; instead, do not replace plan.task_structure
returned from self._strategy.decompose(task, context). Update the logic in the
method containing calls to self._classifier.classify(task) and
self._strategy.decompose(...) so that you only use the classifier result if the
returned plan has no structure (e.g., plan.task_structure is None or falsy);
otherwise leave plan.task_structure and plan.subtasks intact (or if you must
detect mismatches, emit a warning/error referencing plan.task_structure and
plan.subtasks rather than silently calling plan.model_copy(...)). Ensure you
remove the unconditional model_copy update that sets task_structure from the
classifier.

Comment on lines +7 to +13
from typing import Self

from pydantic import BaseModel, ConfigDict, Field, model_validator

from ai_company.core.agent import AgentIdentity # noqa: TC001
from ai_company.core.enums import CoordinationTopology
from ai_company.core.types import NotBlankStr # noqa: TC001

⚠️ Potential issue | 🟠 Major

Initialize logger and log these validator failures.

This business-logic module raises three validation errors but never creates logger, so invalid routing payloads are only visible if the caller preserves the exception. Please add structured WARNING/ERROR logging on each raise path.

As per coding guidelines, "Every module with business logic must import from ai_company.observability import get_logger and create logger = get_logger(__name__)", "Always use constants from domain-specific modules under ai_company.observability.events for event names", and "All error paths must log at WARNING or ERROR with context before raising".

Also applies to: 65-75, 99-111, 145-157

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/routing/models.py` around lines 7 - 13, Import
get_logger and the observability event constants, create logger =
get_logger(__name__), then add structured logging (WARNING or ERROR) with the
appropriate event constant and contextual fields immediately before each raise
in the module's validator functions (the pydantic `@model_validator` blocks and
any validation logic around CoordinationTopology/AgentIdentity/NotBlankStr).
Specifically, update the three raise paths referenced (around lines 65-75,
99-111, 145-157) to call logger.warning or logger.error with a descriptive
message, include the event from ai_company.observability.events and relevant
context (payload, field names, invalid values), and only then re-raise the
validation exception so failures are logged consistently.

Comment on lines +99 to +111
@model_validator(mode="after")
def _validate_unique_subtask_ids(self) -> Self:
"""Ensure no subtask appears in both decisions and unroutable."""
decision_ids = {d.subtask_id for d in self.decisions}
unroutable_set = set(self.unroutable)
overlap = decision_ids & unroutable_set
if overlap:
msg = (
f"Subtask IDs appear in both decisions and "
f"unroutable: {sorted(overlap)}"
)
raise ValueError(msg)
return self

⚠️ Potential issue | 🟠 Major

Reject duplicate subtask IDs inside each collection.

Line 102 converts decisions to a set immediately, so duplicate subtask_id values are silently discarded instead of being rejected. Repeated IDs in unroutable also pass today, which can leave one RoutingResult representing the same subtask more than once.

🔧 Proposed fix
     @model_validator(mode="after")
     def _validate_unique_subtask_ids(self) -> Self:
-        """Ensure no subtask appears in both decisions and unroutable."""
-        decision_ids = {d.subtask_id for d in self.decisions}
-        unroutable_set = set(self.unroutable)
-        overlap = decision_ids & unroutable_set
+        """Ensure each subtask appears at most once in the result."""
+        decision_ids = tuple(d.subtask_id for d in self.decisions)
+        if len(decision_ids) != len(set(decision_ids)):
+            raise ValueError("decisions contains duplicate subtask IDs")
+
+        if len(self.unroutable) != len(set(self.unroutable)):
+            raise ValueError("unroutable contains duplicate subtask IDs")
+
+        overlap = set(decision_ids) & set(self.unroutable)
         if overlap:
             msg = (
                 f"Subtask IDs appear in both decisions and "
                 f"unroutable: {sorted(overlap)}"
             )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@model_validator(mode="after")
def _validate_unique_subtask_ids(self) -> Self:
"""Ensure no subtask appears in both decisions and unroutable."""
decision_ids = {d.subtask_id for d in self.decisions}
unroutable_set = set(self.unroutable)
overlap = decision_ids & unroutable_set
if overlap:
msg = (
f"Subtask IDs appear in both decisions and "
f"unroutable: {sorted(overlap)}"
)
raise ValueError(msg)
return self
@model_validator(mode="after")
def _validate_unique_subtask_ids(self) -> Self:
"""Ensure each subtask appears at most once in the result."""
decision_ids = tuple(d.subtask_id for d in self.decisions)
if len(decision_ids) != len(set(decision_ids)):
raise ValueError("decisions contains duplicate subtask IDs")
if len(self.unroutable) != len(set(self.unroutable)):
raise ValueError("unroutable contains duplicate subtask IDs")
overlap = set(decision_ids) & set(self.unroutable)
if overlap:
msg = (
f"Subtask IDs appear in both decisions and "
f"unroutable: {sorted(overlap)}"
)
raise ValueError(msg)
return self
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/routing/models.py` around lines 99 - 111, The validator
_validate_unique_subtask_ids currently silences duplicates by building sets;
instead explicitly detect and reject duplicate subtask_id values inside both
decisions and unroutable before checking cross-collection overlap: scan
self.decisions and self.unroutable to collect counts (or use a seen set) to find
any subtask_ids occurring more than once and raise a ValueError listing those
duplicates (referencing symbols decisions, unroutable, and the validator
_validate_unique_subtask_ids), then continue to compute decision_ids and
unroutable_set and raise the existing overlap error if any IDs appear in both.

Comment on lines +57 to +79
# Explicit override takes precedence
if task.coordination_topology != CoordinationTopology.AUTO:
logger.debug(
TASK_ROUTING_TOPOLOGY_SELECTED,
task_id=task.id,
topology=task.coordination_topology.value,
source="explicit",
)
return task.coordination_topology

# Auto-select based on structure
structure = plan.task_structure
artifact_count = len(task.artifacts_expected)

if structure == TaskStructure.SEQUENTIAL:
topology = self._config.sequential_override
elif structure == TaskStructure.PARALLEL:
if artifact_count > self._config.parallel_artifact_threshold:
topology = CoordinationTopology.DECENTRALIZED
else:
topology = self._config.parallel_default
elif structure == TaskStructure.MIXED:
topology = self._config.mixed_default

⚠️ Potential issue | 🟠 Major

Respect an explicit topology already set on the decomposition plan.

If task.coordination_topology is auto but plan.coordination_topology is concrete, this code still falls through to heuristic auto-selection and discards the plan’s explicit choice.

Suggested fix
         if task.coordination_topology != CoordinationTopology.AUTO:
             logger.debug(
                 TASK_ROUTING_TOPOLOGY_SELECTED,
                 task_id=task.id,
                 topology=task.coordination_topology.value,
                 source="explicit",
             )
             return task.coordination_topology
+
+        if plan.coordination_topology != CoordinationTopology.AUTO:
+            logger.debug(
+                TASK_ROUTING_TOPOLOGY_SELECTED,
+                task_id=task.id,
+                topology=plan.coordination_topology.value,
+                source="plan",
+            )
+            return plan.coordination_topology
 
         # Auto-select based on structure
         structure = plan.task_structure
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/routing/topology_selector.py` around lines 57 - 79, The
selector currently ignores a concrete topology set on the decomposition plan
when task.coordination_topology is AUTO; update the selection logic so that
after checking the task-level explicit override (task.coordination_topology) you
check plan.coordination_topology and if it's not CoordinationTopology.AUTO, log
using TASK_ROUTING_TOPOLOGY_SELECTED with source="plan" and return
plan.coordination_topology (instead of continuing to heuristic auto-selection);
keep the existing heuristics (_config.sequential_override,
_config.parallel_default, _config.mixed_default and parallel_artifact_threshold)
for true AUTO cases.
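
The precedence the fix establishes can be condensed to a small sketch (the enum values and function shape here are illustrative stand-ins, not the project's actual API):

```python
from enum import Enum


class CoordinationTopology(Enum):
    """Minimal stand-in for the project's enum (assumed values)."""

    AUTO = "auto"
    CENTRALIZED = "centralized"
    DECENTRALIZED = "decentralized"


def select_topology(
    task_topology: CoordinationTopology,
    plan_topology: CoordinationTopology,
    heuristic_choice: CoordinationTopology,
) -> CoordinationTopology:
    """Resolve topology with task > plan > heuristic precedence."""
    if task_topology != CoordinationTopology.AUTO:
        return task_topology  # explicit task-level override
    if plan_topology != CoordinationTopology.AUTO:
        return plan_topology  # concrete plan-level choice
    return heuristic_choice  # fall through to structure heuristics
```

Only when both the task and the plan say AUTO do the structure heuristics run, which is the behavior the suggested fix restores.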

Comment on lines +16 to +17
if TYPE_CHECKING:
from ai_company.core.task import Task

🧹 Nitpick | 🔵 Trivial

Remove unused TYPE_CHECKING guard.

The TYPE_CHECKING block imports Task, but it's not used for type hints in this file. The runtime import at line 180 (from ai_company.core.task import Task) handles the actual usage. This guard is misleading.

♻️ Proposed fix
-from typing import TYPE_CHECKING
-
 import pytest
 
 from ai_company.core.enums import (
...
 )
 
-if TYPE_CHECKING:
-    from ai_company.core.task import Task
 from ai_company.engine.decomposition.models import (
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_decomposition_models.py` around lines 16 - 17, Remove
the unused TYPE_CHECKING guard that imports Task: delete the "if TYPE_CHECKING:
from ai_company.core.task import Task" block (the test uses the runtime import
"from ai_company.core.task import Task" at line ~180), so eliminate the
misleading conditional import and any reference to TYPE_CHECKING in this file to
keep imports clear and avoid dead code.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (5)
src/ai_company/engine/decomposition/dag.py (1)

140-179: ⚠️ Potential issue | 🟠 Major

Check missing references before cycle-based algorithms.

topological_sort() and parallel_groups() are public entry points, but both skip _check_missing_references(). A graph like ("a" depends on "missing") currently increments in_degree["a"], produces no ready nodes, and raises DecompositionCycleError even though the actual problem is an unknown dependency.

Suggested fix
     def topological_sort(self) -> tuple[str, ...]:
         """Return subtask IDs in topological execution order.
@@
         Raises:
             DecompositionCycleError: If a cycle prevents sorting.
         """
+        self._check_missing_references()
         in_degree: dict[str, int] = dict.fromkeys(self._subtask_ids, 0)
@@
     def parallel_groups(self) -> tuple[tuple[str, ...], ...]:
         """Compute groups of subtasks that can execute in parallel.
@@
         Raises:
             DecompositionCycleError: If a cycle prevents grouping.
         """
+        self._check_missing_references()
         in_degree: dict[str, int] = dict.fromkeys(self._subtask_ids, 0)

Also applies to: 181-220

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/decomposition/dag.py` around lines 140 - 179,
topological_sort() (and likewise parallel_groups()) currently proceed without
validating that all dependency IDs exist, causing missing references to be
misreported as cycles; call the existing _check_missing_references() at the
start of topological_sort and at the start of parallel_groups to detect and
raise the correct missing-reference error before running Kahn's algorithm or
group computation, ensuring these public entry points validate dependencies
up-front (refer to methods topological_sort, parallel_groups and helper
_check_missing_references).
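
A self-contained sketch of the intended ordering, assuming Kahn's algorithm with a heapq-backed ready queue as the PR summary describes (the function name and error messages here are illustrative):

```python
import heapq


def topo_sort(deps: dict[str, tuple[str, ...]]) -> tuple[str, ...]:
    """Kahn's algorithm with a missing-reference pre-check.

    deps maps each subtask ID to the IDs it depends on. Without the
    pre-check, an unknown dependency leaves its dependent at nonzero
    in-degree and gets misreported as a cycle.
    """
    missing = {d for ds in deps.values() for d in ds} - deps.keys()
    if missing:
        raise ValueError(f"unknown dependencies: {sorted(missing)}")

    in_degree = {node: len(ds) for node, ds in deps.items()}
    dependents: dict[str, list[str]] = {node: [] for node in deps}
    for node, ds in deps.items():
        for d in ds:
            dependents[d].append(node)

    ready = [node for node, deg in in_degree.items() if deg == 0]
    heapq.heapify(ready)  # heap keeps output order deterministic
    order: list[str] = []
    while ready:
        node = heapq.heappop(ready)
        order.append(node)
        for dep in dependents[node]:
            in_degree[dep] -= 1
            if in_degree[dep] == 0:
                heapq.heappush(ready, dep)

    if len(order) != len(deps):
        raise ValueError("cycle detected")
    return tuple(order)
```

With the pre-check, `{"a": ("missing",)}` fails as a missing reference instead of a bogus cycle.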
src/ai_company/engine/decomposition/service.py (2)

121-133: ⚠️ Potential issue | 🟠 Major

Copy subtask dependencies onto each created Task.

The DAG survives in dependency_edges, but every child Task is still created with the Task.dependencies default of (). Any downstream flow that persists or schedules only created_tasks loses the dependency information.

Suggested fix
             child_task = Task(
                 id=subtask_def.id,
                 title=subtask_def.title,
                 description=subtask_def.description,
+                dependencies=subtask_def.dependencies,
                 type=task.type,
                 priority=task.priority,
                 project=task.project,
                 created_by=task.created_by,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/decomposition/service.py` around lines 121 - 133, The
created child Task currently ignores the subtask's dependencies (leaving
Task.dependencies as the default), so update the Task construction for
child_task in create/subtask handling to copy the dependency data from the
subtask definition (e.g., use subtask_def.dependencies or the equivalent
dependency field used in the diff) into the Task.dependencies field so
downstream consumers of created_tasks retain the DAG; ensure you convert the
dependency collection to the same type Task.dependencies expects (tuple/list)
and preserve IDs referenced in dependency_edges.

110-113: ⚠️ Potential issue | 🟠 Major

Don't replace the plan's declared structure with a parent-task heuristic.

classify(task) only sees the parent task inputs. plan.task_structure is attached to the actual subtask graph returned by the strategy, so overwriting it here can make plan.task_structure, plan.subtasks, and any downstream topology derived from the plan disagree.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/decomposition/service.py` around lines 110 - 113, The
code overwrites the plan's declared subtask topology with a parent-only
heuristic (classify(task)) by assigning plan =
plan.model_copy(update={"task_structure": structure}); instead, avoid replacing
plan.task_structure with the classifier result — either remove this assignment
entirely or only apply it when plan.task_structure is empty/None (e.g., check if
plan.task_structure is falsy before calling model_copy). Reference the symbols
plan, task_structure, structure and classify(task) when making the change so
plan.subtasks and downstream topology remain consistent with the
strategy-produced graph.
src/ai_company/engine/routing/models.py (2)

100-111: ⚠️ Potential issue | 🟠 Major

Reject duplicate subtask IDs inside decisions and unroutable.

Building sets here silently drops repeated subtask_id values, so RoutingResult can still represent the same subtask multiple times as long as the duplicates stay within one collection rather than overlapping across the two.

Suggested fix
     `@model_validator`(mode="after")
     def _validate_unique_subtask_ids(self) -> Self:
-        """Ensure no subtask appears in both decisions and unroutable."""
-        decision_ids = {d.subtask_id for d in self.decisions}
-        unroutable_set = set(self.unroutable)
-        overlap = decision_ids & unroutable_set
+        """Ensure each subtask appears at most once in the result."""
+        decision_ids = tuple(d.subtask_id for d in self.decisions)
+        if len(decision_ids) != len(set(decision_ids)):
+            msg = "decisions contains duplicate subtask IDs"
+            raise ValueError(msg)
+
+        if len(self.unroutable) != len(set(self.unroutable)):
+            msg = "unroutable contains duplicate subtask IDs"
+            raise ValueError(msg)
+
+        overlap = set(decision_ids) & set(self.unroutable)
         if overlap:
             msg = (
                 f"Subtask IDs appear in both decisions and "
                 f"unroutable: {sorted(overlap)}"
             )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/routing/models.py` around lines 100 - 111, The current
_validate_unique_subtask_ids validator only checks for overlap between decisions
and unroutable but lets duplicate subtask_id entries within decisions or within
unroutable pass silently; update _validate_unique_subtask_ids to also detect
duplicate subtask_id values inside self.decisions and inside self.unroutable
(e.g. by counting subtask_id occurrences or comparing list vs set lengths), and
raise a ValueError listing which subtask_ids are duplicated and whether they are
duplicated in decisions or unroutable; keep the existing overlap check for
cross-collection duplicates as well.

7-13: ⚠️ Potential issue | 🟠 Major

Initialize logger and log these validator failures.

This business-logic module raises validation errors from three model validators without a logger, so invalid routing payloads are only visible if the caller preserves the exception. Log each failure at WARNING/ERROR with structured context before raising.

As per coding guidelines, "Every module with business logic must import from ai_company.observability import get_logger and create logger = get_logger(__name__)" and "All error paths must log at WARNING or ERROR with context before raising".

Also applies to: 65-75, 100-111, 146-157

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/routing/models.py` around lines 7 - 13, This module
lacks a logger and fails to record validation failures; import get_logger with
"from ai_company.observability import get_logger" and create "logger =
get_logger(__name__)" at top, then in each model validator (the three
`@model_validator` methods referenced around lines 65-75, 100-111, and 146-157)
catch the validation failure paths and log before re-raising: call
logger.warning or logger.error with structured context (include the model/class
name, the incoming payload/values and any relevant field names, and the
exception or reason) inside those validator functions (refer to each
model_validator-decorated function by its name) so the failure is recorded
before raising the original validation error.
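
The pattern the guideline asks for, sketched with a stand-in for the project's structured logger (the event name and the recorder function here are hypothetical, not the real ai_company.observability API):

```python
events: list[dict[str, object]] = []


def warning(event: str, **context: object) -> None:
    """Stand-in for logger.warning from ai_company.observability (assumed API)."""
    events.append({"event": event, **context})


def validate_no_overlap(decision_ids: set[str], unroutable: set[str]) -> None:
    """Log structured context at WARNING before raising, per the guideline."""
    overlap = decision_ids & unroutable
    if overlap:
        warning(
            "task_routing.result_validation_failed",  # hypothetical event name
            model="RoutingResult",
            overlap=sorted(overlap),
        )
        raise ValueError(
            f"Subtask IDs appear in both decisions and unroutable: {sorted(overlap)}"
        )
```

The point is the ordering: the structured WARNING record exists even if a caller swallows the ValueError.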
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/engine/routing/scorer.py`:
- Around line 57-61: The constructor __init__ in scorer.py validates min_score
and raises ValueError but doesn't log; before raising, emit a warning or error
log with context (include the invalid min_score value and the accepted range)
using the module logger (e.g., logger.warning or logger.error) so callers see
why construction failed; then raise the ValueError as before. Ensure you update
the __init__ method that checks min_score to log the message immediately prior
to raising.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 1d16dcd3-813d-4c10-a8e9-e964a0a34532

📥 Commits

Reviewing files that changed from the base of the PR and between dfc99f8 and 93f602e.

📒 Files selected for processing (19)
  • DESIGN_SPEC.md
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/decomposition/manual.py
  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/decomposition/rollup.py
  • src/ai_company/engine/decomposition/service.py
  • src/ai_company/engine/routing/models.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/routing/service.py
  • src/ai_company/observability/events/decomposition.py
  • src/ai_company/observability/events/task_routing.py
  • tests/unit/engine/test_decomposition_classifier.py
  • tests/unit/engine/test_decomposition_dag.py
  • tests/unit/engine/test_decomposition_models.py
  • tests/unit/engine/test_decomposition_service.py
  • tests/unit/engine/test_routing_models.py
  • tests/unit/engine/test_routing_scorer.py
  • tests/unit/engine/test_routing_service.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Do not use from __future__ import annotations in Python 3.14+ code — Python 3.14 has native PEP 649 lazy annotations
Use except A, B: (no parentheses) for exception syntax in Python 3.14 — PEP 758 except syntax enforced by ruff
All public functions and classes must have type hints; mypy strict mode is enforced
All public classes and functions must have Google-style docstrings; ruff D rules enforce this
Create new objects instead of mutating existing ones; use copy.deepcopy() at construction and MappingProxyType wrapping for read-only enforcement of non-Pydantic internal collections (registries, BaseTool)
For non-Pydantic internal collections and dict/list fields in frozen Pydantic models, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, serializing for persistence)
Use frozen Pydantic models for config/identity; use separate mutable-via-copy models (model_copy(update=...)) for runtime state that evolves; never mix static config fields with mutable runtime fields
Use Pydantic v2 (BaseModel, model_validator, computed_field, ConfigDict); use @computed_field for derived values instead of storing redundant fields
Use NotBlankStr (from core.types) for all identifier/name fields — including optional (NotBlankStr | None) and tuple (tuple[NotBlankStr, ...]) variants — instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls); prefer structured concurrency over bare create_task
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly; never silently swallow errors
Validate at system boundaries (user input, external APIs, config files)
Line length must be 88 characters (enforced by ruff)

Files:

  • tests/unit/engine/test_decomposition_models.py
  • tests/unit/engine/test_routing_models.py
  • tests/unit/engine/test_decomposition_classifier.py
  • src/ai_company/engine/decomposition/rollup.py
  • tests/unit/engine/test_decomposition_dag.py
  • src/ai_company/engine/decomposition/models.py
  • tests/unit/engine/test_decomposition_service.py
  • src/ai_company/engine/routing/models.py
  • src/ai_company/engine/routing/service.py
  • tests/unit/engine/test_routing_service.py
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/decomposition/service.py
  • tests/unit/engine/test_routing_scorer.py
  • src/ai_company/engine/decomposition/manual.py
  • src/ai_company/observability/events/task_routing.py
  • src/ai_company/observability/events/decomposition.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Mark tests with @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, or @pytest.mark.slow as appropriate
Prefer @pytest.mark.parametrize for testing similar cases
Use vendor-agnostic test names: test-provider, test-small-001, etc. instead of real vendor names

Files:

  • tests/unit/engine/test_decomposition_models.py
  • tests/unit/engine/test_routing_models.py
  • tests/unit/engine/test_decomposition_classifier.py
  • tests/unit/engine/test_decomposition_dag.py
  • tests/unit/engine/test_decomposition_service.py
  • tests/unit/engine/test_routing_service.py
  • tests/unit/engine/test_routing_scorer.py
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic must import from ai_company.observability import get_logger and create logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code
Always use variable name logger (not _logger, not log) for the logger instance
Always use constants from domain-specific modules under ai_company.observability.events for event names (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget); import directly
Always use structured logging with kwargs: logger.info(EVENT, key=value) — never use formatted strings like logger.info("msg %s", val)
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO level
DEBUG logging is for object creation, internal flow, and entry/exit of key functions

Files:

  • src/ai_company/engine/decomposition/rollup.py
  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/routing/models.py
  • src/ai_company/engine/routing/service.py
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/decomposition/service.py
  • src/ai_company/engine/decomposition/manual.py
  • src/ai_company/observability/events/task_routing.py
  • src/ai_company/observability/events/decomposition.py
src/ai_company/engine/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

RetryExhaustedError signals that all retries failed — the engine layer catches this to trigger fallback chains

Files:

  • src/ai_company/engine/decomposition/rollup.py
  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/routing/models.py
  • src/ai_company/engine/routing/service.py
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/decomposition/service.py
  • src/ai_company/engine/decomposition/manual.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples; use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases

Files:

  • src/ai_company/engine/decomposition/rollup.py
  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/routing/models.py
  • src/ai_company/engine/routing/service.py
  • src/ai_company/engine/decomposition/classifier.py
  • src/ai_company/engine/routing/scorer.py
  • src/ai_company/engine/decomposition/dag.py
  • src/ai_company/engine/decomposition/service.py
  • src/ai_company/engine/decomposition/manual.py
  • src/ai_company/observability/events/task_routing.py
  • src/ai_company/observability/events/decomposition.py
🧠 Learnings (11)
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic must import `from ai_company.observability import get_logger` and create `logger = get_logger(__name__)`

Applied to files:

  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising

Applied to files:

  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Always use variable name `logger` (not `_logger`, not `log`) for the logger instance

Applied to files:

  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : DEBUG logging is for object creation, internal flow, and entry/exit of key functions

Applied to files:

  • src/ai_company/engine/decomposition/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : All state transitions must log at INFO level

Applied to files:

  • src/ai_company/engine/decomposition/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Never use `import logging` / `logging.getLogger()` / `print()` in application code

Applied to files:

  • src/ai_company/engine/decomposition/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to **/*.py : Validate at system boundaries (user input, external APIs, config files)

Applied to files:

  • src/ai_company/engine/decomposition/models.py
  • src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to **/*.py : Use frozen Pydantic models for config/identity; use separate mutable-via-copy models (model_copy(update=...)) for runtime state that evolves; never mix static config fields with mutable runtime fields

Applied to files:

  • src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Always use structured logging with kwargs: `logger.info(EVENT, key=value)` — never use formatted strings like `logger.info("msg %s", val)`

Applied to files:

  • src/ai_company/engine/routing/models.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to src/ai_company/**/*.py : Always use constants from domain-specific modules under ai_company.observability.events for event names (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget); import directly

Applied to files:

  • src/ai_company/observability/events/task_routing.py
  • src/ai_company/observability/events/decomposition.py
📚 Learning: 2026-03-07T19:34:16.191Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T19:34:16.191Z
Learning: Applies to **/*.py : Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (e.g. multiple tool invocations, parallel agent calls); prefer structured concurrency over bare create_task

Applied to files:

  • DESIGN_SPEC.md
🧬 Code graph analysis (12)
tests/unit/engine/test_decomposition_models.py (1)
src/ai_company/engine/decomposition/models.py (6)
  • DecompositionContext (244-269)
  • DecompositionPlan (66-122)
  • DecompositionResult (125-165)
  • SubtaskDefinition (22-63)
  • SubtaskStatusRollup (168-241)
  • derived_parent_status (217-241)
tests/unit/engine/test_decomposition_classifier.py (4)
src/ai_company/core/artifact.py (1)
  • ExpectedArtifact (13-28)
src/ai_company/core/enums.py (4)
  • ArtifactType (180-185)
  • Priority (162-168)
  • TaskStructure (270-279)
  • TaskType (151-159)
src/ai_company/core/task.py (2)
  • AcceptanceCriterion (26-42)
  • Task (45-257)
src/ai_company/tools/base.py (1)
  • description (115-117)
src/ai_company/engine/decomposition/rollup.py (3)
src/ai_company/core/enums.py (1)
  • TaskStatus (122-148)
src/ai_company/engine/decomposition/models.py (2)
  • SubtaskStatusRollup (168-241)
  • derived_parent_status (217-241)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
tests/unit/engine/test_decomposition_dag.py (3)
src/ai_company/engine/decomposition/dag.py (6)
  • DependencyGraph (26-244)
  • validate (70-86)
  • topological_sort (140-179)
  • parallel_groups (181-220)
  • get_dependents (222-232)
  • get_dependencies (234-244)
src/ai_company/engine/decomposition/models.py (1)
  • SubtaskDefinition (22-63)
src/ai_company/engine/errors.py (2)
  • DecompositionCycleError (54-55)
  • DecompositionError (50-51)
src/ai_company/engine/decomposition/models.py (2)
src/ai_company/core/enums.py (4)
  • Complexity (171-177)
  • CoordinationTopology (282-293)
  • TaskStatus (122-148)
  • TaskStructure (270-279)
src/ai_company/core/task.py (1)
  • Task (45-257)
tests/unit/engine/test_decomposition_service.py (3)
src/ai_company/core/enums.py (4)
  • Priority (162-168)
  • TaskStatus (122-148)
  • TaskStructure (270-279)
  • Complexity (171-177)
src/ai_company/engine/decomposition/service.py (3)
  • DecompositionService (34-179)
  • decompose_task (51-88)
  • rollup_status (165-179)
src/ai_company/engine/errors.py (1)
  • DecompositionCycleError (54-55)
src/ai_company/engine/routing/models.py (3)
src/ai_company/core/agent.py (1)
  • AgentIdentity (246-304)
src/ai_company/core/enums.py (1)
  • CoordinationTopology (282-293)
src/ai_company/engine/routing/scorer.py (1)
  • score (68-156)
src/ai_company/engine/routing/service.py (5)
src/ai_company/engine/routing/models.py (2)
  • RoutingDecision (41-76)
  • RoutingResult (79-112)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/core/task.py (1)
  • Task (45-257)
src/ai_company/engine/routing/scorer.py (3)
  • AgentTaskScorer (35-156)
  • score (68-156)
  • min_score (64-66)
src/ai_company/engine/routing/topology_selector.py (2)
  • TopologySelector (24-98)
  • select (43-98)
src/ai_company/engine/decomposition/classifier.py (3)
src/ai_company/core/enums.py (1)
  • TaskStructure (270-279)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/core/task.py (1)
  • Task (45-257)
src/ai_company/engine/routing/scorer.py (5)
src/ai_company/core/enums.py (3)
  • AgentStatus (24-29)
  • Complexity (171-177)
  • SeniorityLevel (6-21)
src/ai_company/engine/routing/models.py (1)
  • RoutingCandidate (16-38)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/core/agent.py (1)
  • AgentIdentity (246-304)
src/ai_company/engine/decomposition/models.py (1)
  • SubtaskDefinition (22-63)
src/ai_company/engine/decomposition/dag.py (3)
src/ai_company/engine/errors.py (2)
  • DecompositionCycleError (54-55)
  • DecompositionError (50-51)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/engine/decomposition/models.py (1)
  • SubtaskDefinition (22-63)
src/ai_company/engine/decomposition/manual.py (3)
src/ai_company/engine/errors.py (2)
  • DecompositionDepthError (58-59)
  • DecompositionError (50-51)
src/ai_company/core/task.py (1)
  • Task (45-257)
src/ai_company/engine/decomposition/models.py (2)
  • DecompositionContext (244-269)
  • DecompositionPlan (66-122)
🔇 Additional comments (25)
tests/unit/engine/test_decomposition_models.py (2)

3-17: Remove unused TYPE_CHECKING guard.

The TYPE_CHECKING block imports Task (line 17), but it's never used for type hints in this file. The actual Task usage is a runtime import inside the helper function at line 180. This guard is dead code.

♻️ Proposed fix
-from typing import TYPE_CHECKING
-
 import pytest
 
 from ai_company.core.enums import (
     Complexity,
     CoordinationTopology,
     Priority,
     TaskStatus,
     TaskStructure,
     TaskType,
 )
 
-if TYPE_CHECKING:
-    from ai_company.core.task import Task
 from ai_company.engine.decomposition.models import (

196-212: LGTM!

The test correctly validates DecompositionResult construction and uses the helper function _make_result_task appropriately. The fixture sample_task_with_criteria is expected to be provided by conftest.

src/ai_company/engine/decomposition/rollup.py (1)

22-76: LGTM!

The StatusRollup.compute method is well-structured with:

  • Proper warning log for empty subtask statuses (lines 45-51)
  • Correct debug log after computation (lines 69-74)
  • Clean status counting logic delegating derived status to the model's computed field
src/ai_company/engine/routing/scorer.py (1)

94-117: LGTM!

The skill overlap scoring logic correctly handles:

  • Primary skill matching with 0.4 weight
  • Secondary skill matching (excluding already-matched primary skills) with 0.2 weight
  • Proper handling when no skills are required (skipping skill-based scoring)
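A minimal sketch of the weighting scheme described above — the function name, signature, and normalization are illustrative, not the actual `AgentTaskScorer` API:

```python
def skill_overlap_score(
    required: set[str],
    primary: set[str],
    secondary: set[str],
) -> float:
    """Illustrative skill scoring: primary matches weigh 0.4, secondary 0.2."""
    if not required:
        return 0.0  # no skills required: skip skill-based scoring entirely
    primary_hits = required & primary
    # Secondary skills only count where a primary match did not already cover them.
    secondary_hits = (required & secondary) - primary_hits
    score = 0.4 * (len(primary_hits) / len(required))
    score += 0.2 * (len(secondary_hits) / len(required))
    return score
```

Under this sketch the skill component tops out at 0.6, leaving 0.2 for role and 0.2 for seniority-complexity fit to reach a maximum total of 1.0.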
src/ai_company/observability/events/task_routing.py (1)

1-13: LGTM!

Event constants are well-structured with consistent naming conventions. The Final[str] typing ensures immutability and type safety. Based on learnings: "Always use constants from domain-specific modules under ai_company.observability.events for event names" — this module properly provides those constants for task routing.

src/ai_company/engine/decomposition/models.py (3)

1-20: Add logger and log validator failures before raising.

This module contains multiple validators that raise ValueError without logging. Per coding guidelines, all error paths must log at WARNING or ERROR with context before raising.

🔧 Proposed fix (partial - add logger)
 from ai_company.core.enums import (
     Complexity,
     CoordinationTopology,
     TaskStatus,
     TaskStructure,
 )
 from ai_company.core.task import Task  # noqa: TC001
 from ai_company.core.types import NotBlankStr  # noqa: TC001
+from ai_company.observability import get_logger
+
+logger = get_logger(__name__)

Then update each validator to log before raising (e.g., in _validate_no_self_dependency, _validate_subtasks, _validate_plan_task_consistency, _validate_counts).

As per coding guidelines: "Every module with business logic must import from ai_company.observability import get_logger and create logger = get_logger(__name__)" and "All error paths must log at WARNING or ERROR with context before raising."


145-165: Strengthen DecompositionResult consistency checks.

The validator only compares task counts and checks edge endpoints. It doesn't verify that created_tasks IDs match plan.subtasks IDs, allowing mismatched task sets to pass validation.

🔧 Proposed fix
    @model_validator(mode="after")
     def _validate_plan_task_consistency(self) -> Self:
         """Ensure created_tasks align with plan subtasks."""
         if len(self.created_tasks) != len(self.plan.subtasks):
             msg = (
                 f"created_tasks count ({len(self.created_tasks)}) "
                 f"does not match plan subtask count ({len(self.plan.subtasks)})"
             )
             raise ValueError(msg)
 
+        expected_ids = {s.id for s in self.plan.subtasks}
         task_ids = {t.id for t in self.created_tasks}
+        if task_ids != expected_ids:
+            msg = (
+                f"created_tasks IDs {sorted(task_ids)} do not match "
+                f"plan subtask IDs {sorted(expected_ids)}"
+            )
+            raise ValueError(msg)
+
         edge_ids = {eid for edge in self.dependency_edges for eid in edge}
         unknown_edge_ids = edge_ids - task_ids

213-242: LGTM!

The derived_parent_status computed field correctly implements the status derivation logic with proper priority ordering:

  1. Empty total → CREATED
  2. All completed → COMPLETED
  3. All cancelled → CANCELLED
  4. Any failed → FAILED
  5. Any in progress → IN_PROGRESS
  6. Any blocked → BLOCKED
  7. Mixed terminal (completed + cancelled = total) → COMPLETED
  8. Default fallback → IN_PROGRESS
src/ai_company/engine/decomposition/manual.py (1)

26-92: LGTM!

The ManualDecompositionStrategy implementation is well-structured:

  • Proper validation with warning logs before raising errors (lines 63, 71, 79)
  • Clear error messages with context
  • Debug log on successful completion
  • Clean separation of validation concerns (task ID mismatch, depth limit, subtask count)
src/ai_company/engine/decomposition/classifier.py (2)

46-104: LGTM!

The TaskStructureClassifier.classify method implements a sensible heuristic approach:

  • Short-circuits when task_structure is explicitly set (lines 68-75)
  • MIXED classification triggered only by conflicting language signals (lines 86-87)
  • Defaults to SEQUENTIAL as the safest fallback (line 94)
  • Comprehensive debug logging with all scoring details (lines 96-103)

106-150: LGTM!

The language scoring methods properly scan all relevant text sources (title, description, acceptance criteria) and use pre-compiled regex patterns for efficiency.
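A rough sketch of this heuristic — the pattern lists here are hypothetical placeholders, not the classifier's actual 8 sequential / 5 parallel pattern sets:

```python
import re

# Hypothetical signal patterns; the real classifier's sets differ.
_SEQ_PATTERNS = [re.compile(p, re.IGNORECASE)
                 for p in (r"\bthen\b", r"\bafter\b", r"\bstep \d+\b")]
_PAR_PATTERNS = [re.compile(p, re.IGNORECASE)
                 for p in (r"\bin parallel\b", r"\bindependently\b", r"\bconcurrently\b")]

def score_text(text: str) -> tuple[int, int]:
    """Count sequential vs parallel language signals in one text source."""
    seq = sum(1 for p in _SEQ_PATTERNS if p.search(text))
    par = sum(1 for p in _PAR_PATTERNS if p.search(text))
    return seq, par

def classify(title: str, description: str, criteria: list[str]) -> str:
    """Aggregate signals across title, description, and acceptance criteria."""
    seq = par = 0
    for text in (title, description, *criteria):
        s, p = score_text(text)
        seq += s
        par += p
    if seq and par:
        return "mixed"        # conflicting signals
    if par:
        return "parallel"
    return "sequential"       # safest fallback
```

Pre-compiling the patterns at module scope, as above, is what makes repeated scans across three text sources cheap.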

src/ai_company/observability/events/decomposition.py (1)

1-15: LGTM!

Event constants are comprehensive and cover the full decomposition lifecycle (started, completed, failed) plus specific events (subtask creation, validation errors, graph operations, rollup computation). The Final[str] typing ensures immutability. Based on learnings: "Always use constants from domain-specific modules under ai_company.observability.events for event names."

src/ai_company/engine/routing/service.py (4)

1-31: LGTM!

Module structure follows conventions correctly: proper docstring, TYPE_CHECKING imports for avoiding circular dependencies, and logger instantiated via get_logger(__name__) as required. Event constants are imported directly from the domain-specific module.


33-49: LGTM!

Clean class design with dependency injection for AgentTaskScorer and TopologySelector. The __slots__ definition is good for memory efficiency. Google-style docstring present.


51-100: LGTM!

The route() method follows proper patterns:

  • Complete Google-style docstring with Args/Returns.
  • Early exit for empty agents case with appropriate WARNING log.
  • Exception handling logs with logger.exception() (includes traceback) before re-raising.
  • All logging uses structured kwargs as required by guidelines.

102-178: LGTM!

The _do_route() implementation is well-structured:

  • Topology is correctly computed once per parent task and applied uniformly to all routing decisions (per §6.9 design).
  • Mutable lists used during iteration are properly converted to tuples for the final RoutingResult.
  • Logging follows the structured kwargs pattern at appropriate levels (WARNING for unroutable, DEBUG for each routed subtask, INFO for completion).
  • Clean separation between scoring, filtering, and decision creation.
tests/unit/engine/test_routing_service.py (6)

1-25: LGTM!

Imports are well-organized and test file follows the vendor-agnostic naming convention (test-provider, test-model-001). All necessary domain types are imported for testing.


27-110: LGTM!

Helper functions are well-designed test factories with sensible defaults. The _make_decomposition_result creates a realistic scenario with two subtasks where sub-2 depends on sub-1, enabling meaningful routing tests.


116-154: LGTM!

Comprehensive happy-path test validating skill-based routing. Correctly verifies that sub-1 (requiring Python/SQL) routes to Backend Dev and sub-2 (requiring JavaScript/React) routes to Frontend Dev.


156-178: LGTM!

Good negative test case validating the min_score filtering. An agent with unrelated skills ("cooking") and mismatched role ("chef") correctly fails to meet the routing threshold for developer subtasks.


180-224: LGTM!

Good test validating that multiple viable candidates result in the best being selected with others captured as alternatives. The test correctly creates a scenario where both agents can route the subtask.


226-317: LGTM!

Good coverage of edge cases and error handling:

  • test_topology_applied: Validates topology propagation to all decisions.
  • test_empty_agents: Validates the early-exit path when no agents are available.
  • test_inactive_agents_filtered: Ensures terminated agents are excluded (score=0).
  • test_parent_task_id_in_result: Ensures result carries the correct parent task ID.
  • test_exception_propagates: Uses MagicMock to simulate scorer failure and validates exception re-raising behavior.
DESIGN_SPEC.md (3)

783-784: LGTM!

Task definition schema correctly updated with task_structure and coordination_topology fields, matching the implemented domain models.


1162-1207: LGTM!

Documentation correctly updated to reflect the implementation:

  • Current state accurately lists implemented components (TaskStructureClassifier, DependencyGraph, ManualDecompositionStrategy, etc.).
  • Terminology consistently changed from tool_count to artifact_count throughout §6.9.
  • LLM-based decomposition correctly noted as deferred (M4+).

2368-2466: LGTM!

Project structure documentation accurately reflects the new packages:

  • engine/decomposition/ with classifier, DAG, manual strategy, models, protocol, rollup, and service modules.
  • engine/routing/ with models, scorer, service, and topology_selector modules.
  • observability/events/ updated with decomposition.py and task_routing.py event constants.

Comment on lines +66 to +72
if context.current_depth >= context.max_depth:
msg = (
f"Decomposition depth {context.current_depth} "
f"exceeds max depth {context.max_depth}"
)
logger.warning(DECOMPOSITION_VALIDATION_ERROR, error=msg)
raise DecompositionDepthError(msg)

The guard condition uses >= (meets or exceeds), but the error message only says "exceeds". When current_depth == max_depth exactly, the message is factually incorrect.

Suggested change
if context.current_depth >= context.max_depth:
msg = (
f"Decomposition depth {context.current_depth} "
f"exceeds max depth {context.max_depth}"
)
logger.warning(DECOMPOSITION_VALIDATION_ERROR, error=msg)
raise DecompositionDepthError(msg)
if context.current_depth >= context.max_depth:
msg = (
f"Decomposition depth {context.current_depth} "
f"meets or exceeds max depth {context.max_depth}"
)

…ef checks)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings March 8, 2026 09:35
@Aureliolo Aureliolo merged commit 9c7fb52 into main Mar 8, 2026
10 of 11 checks passed
@Aureliolo Aureliolo deleted the feat/task-decomposition branch March 8, 2026 09:35

Copilot AI left a comment


Pull request overview

Copilot reviewed 34 out of 34 changed files in this pull request and generated 3 comments.



if not 0.0 <= min_score <= 1.0:
msg = f"min_score must be between 0.0 and 1.0, got {min_score}"
logger.warning(
"agent_task_scorer.invalid_min_score",

Copilot AI Mar 8, 2026


The logger warning for an invalid min_score uses a string literal event name ("agent_task_scorer.invalid_min_score") that is not defined in ai_company.observability.events.*. Elsewhere the codebase centralizes event names in those modules, and tests/unit/observability/test_events.py enforces discovery/duplication rules only for that package. Consider adding a TASK_ROUTING_* constant (or a routing/scorer-specific event constant) under observability/events/task_routing.py and using it here so event names remain centralized and discoverable.

Suggested change
"agent_task_scorer.invalid_min_score",
TASK_ROUTING_AGENT_SCORED,

Comment on lines +73 to +80
plan = decomposition_result.plan

logger.info(
TASK_ROUTING_STARTED,
parent_task_id=plan.parent_task_id,
subtask_count=len(plan.subtasks),
agent_count=len(available_agents),
)

Copilot AI Mar 8, 2026


TaskRoutingService.route() uses decomposition_result.plan.parent_task_id for logging/result fields but uses the passed parent_task object for topology selection. There’s currently no validation that parent_task.id matches plan.parent_task_id, so a caller mistake could silently produce routing decisions using the wrong task’s topology override and misleading logs/results. Consider validating this early (and raising TaskRoutingError/ValueError) to keep inputs consistent.

Comment on lines +145 to +164
@model_validator(mode="after")
def _validate_plan_task_consistency(self) -> Self:
"""Ensure created_tasks align with plan subtasks."""
if len(self.created_tasks) != len(self.plan.subtasks):
msg = (
f"created_tasks count ({len(self.created_tasks)}) "
f"does not match plan subtask count ({len(self.plan.subtasks)})"
)
raise ValueError(msg)

task_ids = {t.id for t in self.created_tasks}
edge_ids = {eid for edge in self.dependency_edges for eid in edge}
unknown_edge_ids = edge_ids - task_ids
if unknown_edge_ids:
msg = (
f"dependency_edges reference unknown task IDs: "
f"{sorted(unknown_edge_ids)}"
)
raise ValueError(msg)


Copilot AI Mar 8, 2026


DecompositionResult._validate_plan_task_consistency() checks that the number of created_tasks matches the number of plan.subtasks, but it doesn’t verify that the task IDs actually correspond to the plan’s subtask IDs (only that dependency_edges refer to some created task IDs). This can allow constructing an inconsistent DecompositionResult where the plan and created tasks disagree but validation passes. Consider validating that {t.id for t in created_tasks} equals {s.id for s in plan.subtasks} (and optionally that edges reference only plan subtask IDs).

Comment on lines +145 to +165
@model_validator(mode="after")
def _validate_plan_task_consistency(self) -> Self:
"""Ensure created_tasks align with plan subtasks."""
if len(self.created_tasks) != len(self.plan.subtasks):
msg = (
f"created_tasks count ({len(self.created_tasks)}) "
f"does not match plan subtask count ({len(self.plan.subtasks)})"
)
raise ValueError(msg)

task_ids = {t.id for t in self.created_tasks}
edge_ids = {eid for edge in self.dependency_edges for eid in edge}
unknown_edge_ids = edge_ids - task_ids
if unknown_edge_ids:
msg = (
f"dependency_edges reference unknown task IDs: "
f"{sorted(unknown_edge_ids)}"
)
raise ValueError(msg)

return self


DecompositionResult validator doesn't cross-check created-task IDs against plan subtask IDs

_validate_plan_task_consistency verifies that len(created_tasks) == len(plan.subtasks) and that all edge IDs are in the created-task set — but it never checks that the actual IDs of the created tasks match the IDs declared in plan.subtasks. This means the following is silently accepted by Pydantic:

plan = DecompositionPlan(
    parent_task_id="task-1",
    subtasks=(
        SubtaskDefinition(id="sub-1", title="A", description="A"),
        SubtaskDefinition(id="sub-2", title="B", description="B"),
    ),
)
DecompositionResult(
    plan=plan,
    created_tasks=(task_with_id_sub_99, task_with_id_sub_98),  # completely different IDs
    dependency_edges=(),  # no edges → edge check also passes
)

The count check passes (2 == 2) and the edge check passes (no edges). The result looks valid but there is no semantic alignment between the plan's subtask definitions and the created tasks, which would silently break any consumer that correlates plan.subtasks[i].id with created_tasks[i].id (e.g. routing, status rollup, DAG edge lookups).

The DecompositionService always creates tasks with id=subtask_def.id, so the production path is correct. However, the model-level invariant should be enforced explicitly. Adding a set-equality check is sufficient:

task_ids = {t.id for t in self.created_tasks}
subtask_ids = {s.id for s in self.plan.subtasks}
if task_ids != subtask_ids:
    msg = (
        f"created_tasks IDs {sorted(task_ids)} do not match "
        f"plan subtask IDs {sorted(subtask_ids)}"
    )
    raise ValueError(msg)

There is no existing test that exercises this scenario (a same-count but mismatched-ID construction), confirming the gap is currently undetected.


Comment on lines +43 to +76
total = len(subtask_statuses)

if total == 0:
logger.warning(
DECOMPOSITION_ROLLUP_COMPUTED,
parent_task_id=parent_task_id,
total=0,
reason="rollup computed with no subtask statuses",
)

completed = subtask_statuses.count(TaskStatus.COMPLETED)
failed = subtask_statuses.count(TaskStatus.FAILED)
in_progress = subtask_statuses.count(TaskStatus.IN_PROGRESS)
blocked = subtask_statuses.count(TaskStatus.BLOCKED)
cancelled = subtask_statuses.count(TaskStatus.CANCELLED)

rollup = SubtaskStatusRollup(
parent_task_id=parent_task_id,
total=total,
completed=completed,
failed=failed,
in_progress=in_progress,
blocked=blocked,
cancelled=cancelled,
)

logger.debug(
DECOMPOSITION_ROLLUP_COMPUTED,
parent_task_id=parent_task_id,
total=total,
derived_status=rollup.derived_parent_status.value,
)

return rollup


Double-logging DECOMPOSITION_ROLLUP_COMPUTED event when total == 0

When subtask_statuses is empty, the method fires the same event constant (DECOMPOSITION_ROLLUP_COMPUTED) twice in the same call — once as logger.warning (line 47) and then again unconditionally as logger.debug (line 70). Any log aggregator or metric counter keyed on this event string will record two entries for a single compute() invocation, and the two entries carry different structured fields (reason vs derived_status).

The CLAUDE.md convention is one logical event per operation. The cleanest fix is to introduce a dedicated event constant for the anomalous case, e.g. DECOMPOSITION_ROLLUP_EMPTY in events/decomposition.py, and only emit DECOMPOSITION_ROLLUP_COMPUTED once (at DEBUG) unconditionally:

if total == 0:
    logger.warning(
        DECOMPOSITION_ROLLUP_EMPTY,          # ← new constant
        parent_task_id=parent_task_id,
        reason="rollup computed with no subtask statuses",
    )

This keeps one canonical success event (DECOMPOSITION_ROLLUP_COMPUTED) per operation while the anomaly has its own observable event name.




Development

Successfully merging this pull request may close these issues.

Implement task decomposition and routing engine
