Skip to content

feat: implement hierarchical delegation and loop prevention (#12, #17)#160

Merged
Aureliolo merged 4 commits intomainfrom
feat/delegation-loop-prevention
Mar 7, 2026
Merged

feat: implement hierarchical delegation and loop prevention (#12, #17)#160
Aureliolo merged 4 commits intomainfrom
feat/delegation-loop-prevention

Conversation

@Aureliolo
Copy link
Copy Markdown
Owner

Summary

Loop Prevention Mechanisms

Mechanism Purpose
Ancestry Prevents delegation to agents already in the chain
Depth Caps maximum delegation chain length
Dedup Rejects duplicate (delegator, delegatee, task) within a time window
Rate limit Limits delegation frequency per agent pair
Circuit breaker Opens after repeated bounces between a pair, with cooldown

Architecture

  • Pure (stateless) checks run first; stateful checks only if pure checks pass
  • Shared _pair_key utility for canonical agent pair ordering
  • O(n) DFS cycle detection with visited/in-stack colouring
  • Injectable clocks for deterministic testing

Test Plan

  • 33 unit tests for hierarchy resolver (construction, supervisor lookup, direct reports, subordinate checks, ancestors, delegation depth, explicit reporting lines, edge cases, cycle detection)
  • 10 unit tests for loop prevention mechanisms (ancestry, depth, dedup, rate limit, circuit breaker)
  • 9 unit tests for delegation guard (orchestration, short-circuit, recording)
  • 14 unit tests for delegation models and authority validation
  • 12 unit tests for delegation service
  • 8 integration tests for end-to-end delegation flows
  • 3097 total tests passing, 96.50% coverage

Review Coverage

Pre-reviewed by 9 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency). 26 findings addressed — including bug fixes (circuit breaker re-arming, hierarchy duplicate entries), DRY extraction, model validators, observability gaps, and documentation updates.

🤖 Generated with Claude Code

Aureliolo and others added 2 commits March 7, 2026 19:17
Add hierarchical delegation with authority validation, 5-mechanism
loop prevention (ancestry, depth, dedup, rate limit, circuit breaker),
and a DelegationService orchestrator with audit trail.

- Task model: add parent_task_id and delegation_chain fields
- HierarchyResolver: build org graph from Company, cycle detection
- AuthorityValidator: chain-of-command + role permission checks
- DelegationGuard: orchestrates all loop prevention mechanisms
- DelegationService: end-to-end delegation with audit recording
- 9 new error types for delegation and hierarchy failures
- 14 delegation event constants for observability
- Integration tests for multi-hop delegation flows

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…revention

Pre-reviewed by 9 agents, 26 findings addressed:
- Extract shared _pair_key utility (DRY violation)
- Add model validators to DelegationResult, AuthorityCheckResult, GuardCheckOutcome
- Fix circuit breaker re-arming bug in record_bounce
- Add O(n) DFS cycle detection replacing O(n²) approach
- Fix duplicate entries in hierarchy reporting lines
- Freeze hierarchy internal state with MappingProxyType
- Add logging to ancestry and depth check modules
- Add circuit reset event logging
- Prune expired entries in dedup and rate limiter
- Split guard checks into pure/stateful phases
- Rename create_sub_task to _create_sub_task (private)
- Update DESIGN_SPEC.md, CLAUDE.md, README.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings March 7, 2026 18:42
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Mar 7, 2026

Dependency Review

✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.

Scanned Files

None

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 7, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: f2a34c42-1e1e-45b9-a338-c02015cb6de5

📥 Commits

Reviewing files that changed from the base of the PR and between a9a1ba9 and 1d8d010.

📒 Files selected for processing (6)
  • src/ai_company/communication/delegation/authority.py
  • src/ai_company/communication/delegation/hierarchy.py
  • src/ai_company/communication/delegation/models.py
  • src/ai_company/communication/delegation/service.py
  • src/ai_company/observability/events/delegation.py
  • tests/unit/communication/delegation/test_hierarchy.py

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Hierarchical task delegation with authority checks, delegated sub-tasks, audit trail, and new task delegation fields
    • Multi-layer loop-prevention (ancestry, max-depth, deduplication, per-pair rate limits, circuit-breaker) with enforcement and escalation, plus observability events for delegation lifecycle and loop prevention
  • Documentation

    • README and package docs updated: product status -> M4: Multi-Agent; added delegation and loop-prevention guidance
  • Tests

    • Comprehensive unit and integration tests for delegation, hierarchy, and loop-prevention mechanisms

Walkthrough

Implements hierarchical delegation and loop-prevention: adds DelegationService, delegation models, authority checks, HierarchyResolver, a DelegationGuard coordinating five prevention mechanisms (ancestry, depth, dedup, rate limit, circuit breaker), task delegation fields, new errors/events, public exports, and extensive unit/integration tests.

Changes

Cohort / File(s) Summary
Design & Docs
CLAUDE.md, DESIGN_SPEC.md, README.md
Updated design/spec and README to describe hierarchical delegation, loop-prevention mechanisms, and M4 multi-agent status.
Public API Surface
src/ai_company/communication/__init__.py, src/ai_company/communication/delegation/__init__.py, src/ai_company/communication/loop_prevention/__init__.py
Re-exported delegation and loop-prevention types/services; extended __all__ to expose new errors, events, validators, guard, and related models.
Delegation Core
src/ai_company/communication/delegation/service.py, .../delegation/models.py, .../delegation/authority.py
Added DelegationService orchestration, DelegationRequest/Result/Record models, and AuthorityValidator with hierarchy & role checks and observability events.
Hierarchy Resolver
src/ai_company/communication/delegation/hierarchy.py
Added HierarchyResolver: builds supervisor/report maps, detects cycles, exposes ancestor and delegation-depth lookups, and emits hierarchy events.
Loop-prevention package
src/ai_company/communication/loop_prevention/*
New package: utilities and orchestrator for five mechanisms — pair_key, ancestry.py, depth.py, dedup.py, rate_limit.py, circuit_breaker.py, guard.py, and models.py (GuardCheckOutcome).
Errors & Events
src/ai_company/communication/errors.py, src/ai_company/observability/events/delegation.py
Added delegation-specific exception classes and delegation observability event constants.
Core Task Model
src/ai_company/core/task.py
Added Task fields parent_task_id and delegation_chain with validators to prevent self-parenting, duplicates, and conflicts with assigned_to.
Tests — Unit & Integration
tests/unit/..., tests/integration/...
Large suite of new unit tests for hierarchy, authority, models, loop-prevention components, guard orchestration, and an integration test exercising multi-hop delegation, dedup, rate limiting, and circuit-breaker behavior.
Small utilities
src/ai_company/communication/loop_prevention/_pair_key.py
Added canonical pair key utility with input validation used by per-pair mechanisms.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant DelegationService
    participant AuthorityValidator
    participant HierarchyResolver
    participant DelegationGuard
    participant LoopPrevention
    participant TaskManager

    Client->>DelegationService: delegate(request, delegator, delegatee)
    DelegationService->>AuthorityValidator: validate(delegator, delegatee)
    AuthorityValidator->>HierarchyResolver: is_subordinate(delegator, delegatee)
    HierarchyResolver-->>AuthorityValidator: hierarchy result
    AuthorityValidator-->>DelegationService: AuthorityCheckResult

    alt Authority Denied
        DelegationService-->>Client: DelegationResult(success=False)
    else Authority Approved
        DelegationService->>DelegationGuard: check(chain, delegator, delegatee, task)
        DelegationGuard->>LoopPrevention: ancestry / depth / dedup / rate_limit / circuit_breaker
        LoopPrevention-->>DelegationGuard: first failing GuardCheckOutcome or all_passed

        alt Blocked by Loop Prevention
            DelegationGuard-->>DelegationService: GuardCheckOutcome(passed=False)
            DelegationService-->>Client: DelegationResult(success=False, blocked_by=mechanism)
        else Passed
            DelegationService->>TaskManager: _create_sub_task(request)
            TaskManager-->>DelegationService: new Task (delegation_chain updated)
            DelegationService->>DelegationGuard: record_delegation(delegator, delegatee, task)
            DelegationGuard-->>LoopPrevention: record across mechanisms
            DelegationService->>TaskManager: record_audit_trail(DelegationRecord)
            DelegationService-->>Client: DelegationResult(success=True, delegated_task=sub_task)
        end
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 27.98% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and specifically summarizes the main changes: implementing hierarchical delegation and loop prevention with direct references to the closed issues (#12, #17).
Description check ✅ Passed The description is comprehensive and directly related to the changeset, detailing implementation of hierarchical delegation, five loop-prevention mechanisms, task model extensions, observability, and test coverage.
Linked Issues check ✅ Passed The pull request implements all acceptance criteria from both #12 and #17: delegation hierarchy with authority validation [#12], task/result flow tracking with delegation_chain and parent_task_id [#12], all five loop-prevention mechanisms [#17], audit trails, configuration via LoopPreventionConfig, comprehensive tests (96.50% coverage), and observability via delegation events.
Out of Scope Changes check ✅ Passed All changes directly support the stated objectives: delegation system components, loop-prevention mechanisms, Task model extensions for delegation tracking, observability events, and corresponding tests. No unrelated refactoring or scope creep detected.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/delegation-loop-prevention
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch feat/delegation-loop-prevention

Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a fundamental capability for multi-agent systems: hierarchical task delegation. It allows agents to assign tasks to subordinates while ensuring the integrity and stability of the system through a sophisticated five-mechanism loop prevention guard. These changes are crucial for enabling complex, multi-step workflows and maintaining a coherent organizational structure within the AI company, preventing infinite delegation loops and ensuring tasks flow efficiently through the hierarchy.

Highlights

  • Hierarchical Delegation System: Implemented a robust hierarchical delegation system, including an organizational hierarchy resolver, authority validation based on reporting lines and roles, and a dedicated delegation service. This enables agents to delegate tasks down the organizational structure.
  • 5-Mechanism Loop Prevention Guard: Introduced a comprehensive loop prevention system with five distinct mechanisms: ancestry checks to prevent delegation to agents already in the chain, a depth limit for delegation chains, deduplication to reject identical delegations within a time window, rate limiting per agent pair, and a circuit breaker that opens after repeated bounces between agents.
  • Task Model Extensions: The core Task model has been extended with parent_task_id to track the lineage of delegated tasks and delegation_chain to maintain an ordered list of delegator agent IDs, ensuring proper delegation history and enabling ancestry checks.
  • Enhanced Observability and Immutability: Integrated structured logging with domain-specific event constants across all new delegation and loop prevention modules for better monitoring. Key hierarchy states are now wrapped in MappingProxyType and Pydantic models are frozen with validators to enforce consistency invariants, promoting immutability and data integrity.
Changelog
  • CLAUDE.md
    • Updated the description of the communication/ directory to include delegation and loop prevention modules.
  • DESIGN_SPEC.md
    • Updated the 'Current state (M4 in-progress)' section to reflect the implementation of loop prevention and hierarchical delegation.
    • Extended the task model specification to include parent_task_id and delegation_chain fields.
    • Added a new section detailing the 'Delegation & loop prevention' conventions, including HierarchyResolver, AuthorityValidator, DelegationGuard, and the five loop prevention mechanisms.
  • README.md
    • Added 'Hierarchical Delegation' as a key feature of AI Company.
    • Updated the project status from M3: Single Agent to M4: Multi-Agent.
  • src/ai_company/communication/init.py
    • Imported and exposed new classes and functions related to delegation (e.g., AuthorityValidator, DelegationService, HierarchyResolver) and loop prevention (e.g., DelegationGuard, DelegationCircuitBreaker).
    • Imported and exposed new error types for delegation and hierarchy resolution.
  • src/ai_company/communication/delegation/init.py
    • Added a new __init__.py file to define the delegation subsystem and export its public interfaces.
  • src/ai_company/communication/delegation/authority.py
    • Added a new module implementing AuthorityCheckResult and AuthorityValidator for validating delegation authority based on hierarchy and role permissions.
  • src/ai_company/communication/delegation/hierarchy.py
    • Added a new module implementing HierarchyResolver to build and query the organizational hierarchy from the Company structure, including cycle detection.
  • src/ai_company/communication/delegation/models.py
    • Added a new module defining Pydantic models for DelegationRequest, DelegationResult, and DelegationRecord to standardize delegation data structures.
  • src/ai_company/communication/delegation/service.py
    • Added a new module implementing DelegationService to orchestrate the delegation process, including authority checks, loop prevention, sub-task creation, and audit trail recording.
  • src/ai_company/communication/errors.py
    • Added a new hierarchy of exceptions for delegation-related errors, including DelegationError, DelegationAuthorityError, DelegationLoopError (with subclasses for each loop prevention mechanism), and HierarchyResolutionError.
  • src/ai_company/communication/loop_prevention/init.py
    • Added a new __init__.py file to define the loop prevention subsystem and export its public interfaces.
  • src/ai_company/communication/loop_prevention/_pair_key.py
    • Added a new utility module for creating canonical, direction-agnostic keys for agent pairs.
  • src/ai_company/communication/loop_prevention/ancestry.py
    • Added a new module implementing check_ancestry for detecting delegation cycles based on the task's delegation chain.
  • src/ai_company/communication/loop_prevention/circuit_breaker.py
    • Added a new module implementing DelegationCircuitBreaker to prevent repeated delegation bounces between agent pairs after a configurable threshold.
  • src/ai_company/communication/loop_prevention/dedup.py
    • Added a new module implementing DelegationDeduplicator to prevent duplicate delegations of the same task between the same agents within a specified time window.
  • src/ai_company/communication/loop_prevention/depth.py
    • Added a new module implementing check_delegation_depth to enforce a maximum length for delegation chains.
  • src/ai_company/communication/loop_prevention/guard.py
    • Added a new module implementing DelegationGuard to orchestrate and sequence all five loop prevention mechanisms, short-circuiting on the first failure.
  • src/ai_company/communication/loop_prevention/models.py
    • Added a new module defining GuardCheckOutcome to standardize the result of loop prevention checks.
  • src/ai_company/communication/loop_prevention/rate_limit.py
    • Added a new module implementing DelegationRateLimiter to control the frequency of delegations between agent pairs.
  • src/ai_company/core/task.py
    • Extended the Task Pydantic model with parent_task_id and delegation_chain fields.
    • Added model validators to ensure consistency and prevent self-referential parent tasks, duplicate entries in the delegation chain, and assigned agents appearing in the delegation chain.
  • src/ai_company/observability/events/delegation.py
    • Added a new module defining constants for delegation-related structured logging events.
  • tests/integration/communication/test_delegation_integration.py
    • Added a new integration test file covering end-to-end delegation flows, including multi-hop delegation, ancestry checks, deduplication, and circuit breaker functionality.
  • tests/unit/communication/delegation/test_authority.py
    • Added a new unit test file for the AuthorityValidator module, covering hierarchy and role-based permission checks.
  • tests/unit/communication/delegation/test_hierarchy.py
    • Added a new unit test file for the HierarchyResolver module, including tests for construction, supervisor lookup, direct reports, subordination, ancestors, delegation depth, explicit reporting lines, edge cases, and cycle detection.
  • tests/unit/communication/delegation/test_models.py
    • Added a new unit test file for the delegation Pydantic models (DelegationRequest, DelegationResult, DelegationRecord), verifying their structure and validation rules.
  • tests/unit/communication/delegation/test_service.py
    • Added a new unit test file for the DelegationService, covering successful delegations, authority denials, and various loop prevention scenarios.
  • tests/unit/communication/loop_prevention/test_ancestry.py
    • Added a new unit test file for the check_ancestry function, verifying its ability to detect cycles in the delegation chain.
  • tests/unit/communication/loop_prevention/test_circuit_breaker.py
    • Added a new unit test file for the DelegationCircuitBreaker, testing its state transitions, bounce recording, and cooldown period.
  • tests/unit/communication/loop_prevention/test_dedup.py
    • Added a new unit test file for the DelegationDeduplicator, verifying its ability to prevent duplicate delegations within a time window.
  • tests/unit/communication/loop_prevention/test_depth.py
    • Added a new unit test file for the check_delegation_depth function, ensuring it correctly enforces maximum delegation chain length.
  • tests/unit/communication/loop_prevention/test_guard.py
    • Added a new unit test file for the DelegationGuard, testing its orchestration of various loop prevention mechanisms and short-circuiting behavior.
  • tests/unit/communication/loop_prevention/test_rate_limit.py
    • Added a new unit test file for the DelegationRateLimiter, verifying its per-pair rate limiting and burst allowance functionality.
  • tests/unit/communication/test_errors.py
    • Updated error tests to include the new delegation and hierarchy-related error classes, ensuring proper inheritance and context handling.
  • tests/unit/core/test_task.py
    • Updated unit tests for the Task model to cover the new parent_task_id and delegation_chain fields and their associated validation rules.
  • tests/unit/observability/test_events.py
    • Updated event discovery tests to include the new 'delegation' domain module.
Activity
  • 33 unit tests were added for the hierarchy resolver, covering construction, supervisor lookup, direct reports, subordinate checks, ancestors, delegation depth, explicit reporting lines, edge cases, and cycle detection.
  • 10 unit tests were added for the individual loop prevention mechanisms (ancestry, depth, dedup, rate limit, circuit breaker).
  • 9 unit tests were added for the delegation guard, focusing on its orchestration, short-circuiting behavior, and recording.
  • 14 unit tests were added for delegation models and authority validation.
  • 12 unit tests were added for the delegation service.
  • 8 integration tests were added for end-to-end delegation flows, ensuring the system works as expected across components.
  • A total of 3097 tests are passing with 96.50% code coverage.
  • The pull request underwent pre-review by 9 automated agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency), leading to 26 addressed findings, including bug fixes, DRY extraction, model validators, observability improvements, and documentation updates.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Implements a hierarchical delegation subsystem in communication/ with authority validation and a five-layer loop-prevention guard, extending the core Task model to track delegation lineage and adding observability event constants and tests to cover the new behavior.

Changes:

  • Add HierarchyResolver, AuthorityValidator, and DelegationService for chain-of-command delegation flows.
  • Add loop-prevention mechanisms (ancestry, depth, dedup, rate limit, circuit breaker) orchestrated by DelegationGuard.
  • Extend Task with parent_task_id and delegation_chain, and add new delegation-related observability events + extensive unit/integration tests.

Reviewed changes

Copilot reviewed 35 out of 38 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
tests/unit/observability/test_events.py Registers delegation events module in discovery test.
tests/unit/core/test_task.py Adds unit tests for new task delegation fields/validators.
tests/unit/communication/test_errors.py Adds tests for new delegation + hierarchy error classes.
tests/unit/communication/loop_prevention/test_rate_limit.py Unit tests for per-pair delegation rate limiter.
tests/unit/communication/loop_prevention/test_guard.py Unit tests for guard orchestration + ordering.
tests/unit/communication/loop_prevention/test_depth.py Unit tests for max-depth check.
tests/unit/communication/loop_prevention/test_dedup.py Unit tests for time-window deduplication.
tests/unit/communication/loop_prevention/test_circuit_breaker.py Unit tests for circuit breaker open/reset behavior.
tests/unit/communication/loop_prevention/test_ancestry.py Unit tests for ancestry-cycle blocking.
tests/unit/communication/loop_prevention/init.py Package marker for loop prevention tests.
tests/unit/communication/delegation/test_service.py Unit tests for DelegationService end-to-end behavior.
tests/unit/communication/delegation/test_models.py Unit tests for delegation request/result/audit models.
tests/unit/communication/delegation/test_hierarchy.py Unit tests for hierarchy resolution + cycle detection.
tests/unit/communication/delegation/test_authority.py Unit tests for authority validation rules/config.
tests/unit/communication/delegation/init.py Package marker for delegation tests.
tests/integration/communication/test_delegation_integration.py Integration tests for multi-hop delegation + loop prevention.
tests/integration/communication/init.py Package marker for communication integration tests.
src/ai_company/observability/events/delegation.py Adds domain-specific delegation/loop-prevention event constants.
src/ai_company/core/task.py Adds parent_task_id/delegation_chain fields + invariants.
src/ai_company/communication/loop_prevention/rate_limit.py Implements per-pair sliding-window rate limiter.
src/ai_company/communication/loop_prevention/models.py Adds GuardCheckOutcome model for mechanism outcomes.
src/ai_company/communication/loop_prevention/guard.py Orchestrates mechanisms with short-circuiting.
src/ai_company/communication/loop_prevention/depth.py Implements pure max-depth check.
src/ai_company/communication/loop_prevention/dedup.py Implements deduplication within a time window.
src/ai_company/communication/loop_prevention/circuit_breaker.py Implements per-pair circuit breaker with cooldown.
src/ai_company/communication/loop_prevention/ancestry.py Implements pure ancestry check.
src/ai_company/communication/loop_prevention/_pair_key.py Adds canonical undirected agent-pair key helper.
src/ai_company/communication/loop_prevention/init.py Exposes loop prevention public API.
src/ai_company/communication/errors.py Adds delegation + hierarchy error hierarchy.
src/ai_company/communication/delegation/service.py Implements delegation flow: authority → guard → subtask → audit.
src/ai_company/communication/delegation/models.py Adds delegation request/result/audit Pydantic models.
src/ai_company/communication/delegation/hierarchy.py Builds org hierarchy from Company, freezes state, detects cycles.
src/ai_company/communication/delegation/authority.py Implements hierarchy + role-based authority validation.
src/ai_company/communication/delegation/init.py Exposes delegation subsystem public API.
src/ai_company/communication/init.py Re-exports delegation + loop-prevention types from package root.
README.md Updates feature list + roadmap milestone status.
DESIGN_SPEC.md Updates spec to reflect implemented delegation + loop prevention.
CLAUDE.md Updates repo structure description for communication subsystem.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +77 to +78
key = pair_key(delegator_id, delegatee_id)
return self._pairs.setdefault(key, _PairState())
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_state() calls _get_pair(), which uses setdefault and therefore allocates a _PairState for every pair ever checked, even if no bounces are ever recorded. In a long-running system with many unique pairs, _pairs can grow without bound. Consider treating a missing key as CLOSED in get_state()/check() and only creating _PairState in record_bounce() (or implement periodic pruning).

Copilot uses AI. Check for mistakes.
Comment on lines +95 to +96
key = (delegator_id, delegatee_id, task_title)
self._records[key] = self._clock()
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DelegationDeduplicator only deletes expired entries when check() is called for the same (delegator, delegatee, task_title) key. In normal operation with many one-off task titles, _records can accumulate stale entries indefinitely. Consider pruning expired entries opportunistically in record() as well (e.g., prune a bounded number of oldest entries) or switching to a time-ordered structure so old keys are removed without requiring a future check() for the same key.

Copilot uses AI. Check for mistakes.
Comment on lines +57 to +61
delegation_chain: tuple[str, ...],
delegator_id: str,
delegatee_id: str,
task_title: str,
) -> GuardCheckOutcome:
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description/spec calls out deduplication for duplicate (delegator, delegatee, task) within a window, but the guard/deduplicator currently key on task_title. Titles are not stable (and tests already retitle tasks to bypass dedup), so this doesn’t reliably prevent repeated delegations of the same task. Consider switching the dedup key to a stable identifier such as task.id (or a hash of the full request payload) and update task_title plumbing accordingly.

Copilot uses AI. Check for mistakes.
Comment on lines +96 to +100
guard_outcome = self._guard.check(
delegation_chain=request.task.delegation_chain,
delegator_id=request.delegator_id,
delegatee_id=request.delegatee_id,
task_title=request.task.title,
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delegate() uses request.delegator_id / request.delegatee_id when running guard checks and recording state, but authority is validated against the delegator/delegatee AgentIdentity objects. If these ever differ, callers can bypass dedup/rate-limit/circuit-breaker accounting and produce incorrect audit logs. Add a consistency check up front (reject if request IDs don’t match the provided identities) or refactor to derive IDs from AgentIdentity everywhere in this method.

Copilot uses AI. Check for mistakes.
Comment on lines +67 to +70
timestamps = self._timestamps.get(key, [])
recent = [t for t in timestamps if t > cutoff]
# Prune expired entries on read
self._timestamps[key] = recent
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check() always writes back self._timestamps[key] = recent even when the pair has never been recorded (i.e., recent is empty). Because DelegationGuard.check() runs before record(), this will create empty entries for every unique pair ever checked, which can grow unbounded in long-running processes. Consider only updating the dict when the key already exists (or when recent is non-empty), and leave missing keys absent until the first record().

Suggested change
timestamps = self._timestamps.get(key, [])
recent = [t for t in timestamps if t > cutoff]
# Prune expired entries on read
self._timestamps[key] = recent
timestamps = self._timestamps.get(key)
if timestamps is not None:
recent = [t for t in timestamps if t > cutoff]
# Prune expired entries on read for existing keys
self._timestamps[key] = recent
else:
recent = []

Copilot uses AI. Check for mistakes.
@greptile-apps
Copy link
Copy Markdown

greptile-apps bot commented Mar 7, 2026

Greptile Summary

This PR implements a complete hierarchical delegation system and a 5-mechanism loop prevention guard for the communication layer. HierarchyResolver builds an immutable org graph from Company data and detects cycles at construction time. AuthorityValidator enforces chain-of-command and role permissions. DelegationGuard runs ancestry, depth, dedup, rate-limit, and circuit-breaker checks in order (pure checks first, stateful second), with injectable clocks for each stateful component. DelegationService orchestrates all three and creates a properly-chained Task sub-task on success. The Task model was extended with parent_task_id and delegation_chain, guarded by validators that enforce uniqueness and self-reference constraints.

Key observations:

  • The identity mismatch issue (previous thread) is addressed via _validate_identity raising ValueError before any processing.
  • Sub-task creation now copies all relevant Task fields (reviewers, dependencies, artifacts_expected, acceptance_criteria) and appends constraints/refinement to the description.
  • DelegationRecord audit trail omits constraints — the structured constraint tuple is forwarded to the sub-task description but not persisted in the audit entry, making it unrecoverable for compliance or replay.
  • DelegationCircuitBreaker.get_state() deletes the pair entry on cooldown expiry, which is functionally correct and documented, but a side-effectful getter that can surprise test code calling it for observation.
  • HierarchyResolver silently applies first-team-wins when an agent leads teams across multiple departments, which is not documented and can produce unexpected authority-check results for complex org structures.

Confidence Score: 4/5

  • Safe to merge — no critical regressions or security issues; two audit-completeness gaps and one undocumented hierarchy behaviour to address.
  • The core logic (identity validation, authority checks, all five loop-prevention mechanisms, sub-task creation with full field inheritance) is correct and well-tested at 96.50% coverage. The three findings are: (1) an incomplete audit record (missing constraints), (2) a side-effecting getter in the circuit breaker, and (3) undocumented first-wins hierarchy resolution. None of these are correctness bugs in the delegation flow itself — they are API quality and observability gaps.
  • src/ai_company/communication/delegation/models.py (DelegationRecord missing constraints), src/ai_company/communication/loop_prevention/circuit_breaker.py (get_state side effect), src/ai_company/communication/delegation/hierarchy.py (multi-team lead documentation)

Important Files Changed

Filename Overview
src/ai_company/communication/delegation/hierarchy.py HierarchyResolver builds org graph from Company; O(n) cycle detection is correct; minor: first-team-wins behaviour for multi-team leads is undocumented.
src/ai_company/communication/delegation/service.py DelegationService correctly validates identity, runs authority + loop-prevention checks, copies all Task fields into sub-task, and appends constraints/refinement to description; all previous review findings addressed.
src/ai_company/communication/delegation/models.py Model validators are well-structured; DelegationRecord is missing the constraints field, producing an incomplete audit trail for non-empty constraint tuples.
src/ai_company/communication/loop_prevention/guard.py Orchestrates all five mechanisms in the correct order (pure checks first); clock injection still not forwarded to sub-components (pre-existing finding).
src/ai_company/communication/loop_prevention/circuit_breaker.py Re-arming logic is correct post-cooldown; get_state() mutates _pairs on eviction — a side-effectful getter that may surprise callers or tests inspecting state directly.
src/ai_company/communication/errors.py Exception hierarchy is well-structured; loop-prevention subclasses are still exported but never raised (pre-existing finding flagged in prior review).
src/ai_company/core/task.py New parent_task_id and delegation_chain fields are well-validated; uniqueness, self-reference, and assignment-consistency invariants are all enforced.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant DelegationService
    participant AuthorityValidator
    participant HierarchyResolver
    participant DelegationGuard
    participant AncestryCheck
    participant DepthCheck
    participant Deduplicator
    participant RateLimiter
    participant CircuitBreaker

    Caller->>DelegationService: delegate(request, delegator, delegatee)
    DelegationService->>DelegationService: _validate_identity()
    DelegationService->>AuthorityValidator: validate(delegator, delegatee)
    AuthorityValidator->>HierarchyResolver: is_direct_report() / is_subordinate()
    HierarchyResolver-->>AuthorityValidator: bool
    AuthorityValidator-->>DelegationService: AuthorityCheckResult

    alt authority denied
        DelegationService-->>Caller: DelegationResult(success=False, blocked_by="authority")
    end

    DelegationService->>DelegationGuard: check(chain, delegator_id, delegatee_id, task_id)
    DelegationGuard->>AncestryCheck: check_ancestry(chain, delegatee_id)
    AncestryCheck-->>DelegationGuard: GuardCheckOutcome
    DelegationGuard->>DepthCheck: check_delegation_depth(chain, max_depth)
    DepthCheck-->>DelegationGuard: GuardCheckOutcome
    DelegationGuard->>Deduplicator: check(delegator, delegatee, task_id)
    Deduplicator-->>DelegationGuard: GuardCheckOutcome
    DelegationGuard->>RateLimiter: check(delegator, delegatee)
    RateLimiter-->>DelegationGuard: GuardCheckOutcome
    DelegationGuard->>CircuitBreaker: check(delegator, delegatee)
    CircuitBreaker-->>DelegationGuard: GuardCheckOutcome
    DelegationGuard-->>DelegationService: GuardCheckOutcome

    alt loop detected
        DelegationService->>HierarchyResolver: get_supervisor(delegator_id)
        HierarchyResolver-->>DelegationService: supervisor (for escalation log)
        DelegationService-->>Caller: DelegationResult(success=False, blocked_by=mechanism)
    end

    DelegationService->>DelegationService: _create_sub_task(request)
    Note over DelegationService: new_chain = (*chain, delegator_id)<br/>new Task with fresh ID + parent_task_id
    DelegationService->>DelegationGuard: record_delegation(delegator, delegatee, task_id)
    DelegationGuard->>Deduplicator: record()
    DelegationGuard->>RateLimiter: record()
    DelegationGuard->>CircuitBreaker: record_delegation()
    DelegationService->>DelegationService: append DelegationRecord to _audit_trail
    DelegationService-->>Caller: DelegationResult(success=True, delegated_task=sub_task)
Loading

Last reviewed commit: 1d8d010

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive and well-designed hierarchical delegation and loop prevention system, featuring robust implementation with clear separation of concerns among components like HierarchyResolver, AuthorityValidator, and DelegationService. However, security concerns have been identified regarding potential unbounded memory growth in stateful loop prevention components (deduplication and rate limiting), and a lack of consistency checks between provided agent identities and request payloads in the delegation service, which could lead to resource exhaustion or audit trail manipulation. Additionally, a suggestion is made for sub-task creation to ensure consistency in retry behavior.

) -> None:
self._window_seconds = window_seconds
self._clock = clock
self._records: dict[tuple[str, str, str], float] = {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The DelegationDeduplicator uses an in-memory dictionary _records to store delegation timestamps for deduplication. However, there is no mechanism to prune expired entries from this dictionary unless a check is performed for the exact same key (delegator, delegatee, and task title). This leads to unbounded memory growth as new unique tasks are delegated, which can be exploited to cause a Denial of Service (DoS) via memory exhaustion in long-running processes.

self._config = config
self._clock = clock
self._window_seconds = window_seconds
self._timestamps: dict[tuple[str, str], list[float]] = {}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The DelegationRateLimiter stores timestamps for every agent pair in the _timestamps dictionary. Similar to the deduplicator, expired entries are only pruned for the specific pair being checked. If the system handles many unique agent pairs over time, this dictionary will grow indefinitely, leading to memory exhaustion.

Comment on lines +63 to +78
def delegate(
self,
request: DelegationRequest,
delegator: AgentIdentity,
delegatee: AgentIdentity,
) -> DelegationResult:
"""Execute a delegation: authority, loops, sub-task, audit.

Args:
request: The delegation request.
delegator: Identity of the delegating agent.
delegatee: Identity of the target agent.

Returns:
Result indicating success or rejection with reason.
"""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-medium medium

The delegate method accepts both AgentIdentity objects and a DelegationRequest object. While it uses the identities for authority validation, it relies on the IDs provided in the request for loop prevention checks, audit trail records, and sub-task creation. There is no validation to ensure that request.delegator_id matches delegator.name. This allows an authenticated agent to spoof its identity in audit logs and potentially bypass per-agent loop prevention controls (like rate limits) by providing a different delegator_id in the request payload.

    def delegate(
        self,
        request: DelegationRequest,
        delegator: AgentIdentity,
        delegatee: AgentIdentity,
    ) -> DelegationResult:
        """Execute a delegation: authority, loops, sub-task, audit.

        Args:
            request: The delegation request.
            delegator: Identity of the delegating agent.
            delegatee: Identity of the target agent.

        Returns:
            Result indicating success or rejection with reason.
        """
        if request.delegator_id != delegator.name:
            return DelegationResult(
                success=False,
                rejection_reason=f"Request delegator_id {request.delegator_id!r} does not match identity {delegator.name!r}",
                blocked_by="identity_validation",
            )
        if request.delegatee_id != delegatee.name:
            return DelegationResult(
                success=False,
                rejection_reason=f"Request delegatee_id {request.delegatee_id!r} does not match identity {delegatee.name!r}",
                blocked_by="identity_validation",
            )

Comment on lines +163 to +176
return Task(
id=f"del-{uuid4().hex[:12]}",
title=original.title,
description=description,
type=original.type,
priority=original.priority,
project=original.project,
created_by=request.delegator_id,
parent_task_id=original.id,
delegation_chain=new_chain,
estimated_complexity=original.estimated_complexity,
budget_limit=original.budget_limit,
deadline=original.deadline,
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When creating a sub-task, the max_retries field from the original task is not being inherited. The new task will fall back to the default value of 1. This could lead to inconsistent retry behavior, especially if the original task was configured with a different number of retries (e.g., 0 or 3). To ensure consistent behavior for delegated tasks, I suggest inheriting the max_retries value from the original task.

        return Task(
            id=f"del-{uuid4().hex[:12]}",
            title=original.title,
            description=description,
            type=original.type,
            priority=original.priority,
            project=original.project,
            created_by=request.delegator_id,
            parent_task_id=original.id,
            delegation_chain=new_chain,
            estimated_complexity=original.estimated_complexity,
            budget_limit=original.budget_limit,
            deadline=original.deadline,
            max_retries=original.max_retries,
        )

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 14

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/communication/loop_prevention/circuit_breaker.py`:
- Around line 145-174: The method record_bounce is misleading because
DelegationGuard.record_delegation calls it on successful delegations; either
rename record_bounce to record_delegation (and update all call sites such as
DelegationGuard.record_delegation and any tests/mocks) or keep the name but
update the docstring and logging to explicitly state that "bounce" here means a
recorded delegation event that can contribute to back-and-forth loops; if you
rename, update the method signature in class CircuitBreaker, any imports/usages,
and adjust unit tests to the new name.

In `@src/ai_company/communication/loop_prevention/dedup.py`:
- Around line 37-55: The dedup key used in dedup.py's check method is too coarse
(key = (delegator_id, delegatee_id, task_title)) and will collide for different
requests with the same title; change it to use a canonical request fingerprint
that reflects the full request payload (e.g., title plus
refinement/constraints/other identifying fields) instead of the bare title.
Implement or call a deterministic fingerprint/canonicalization helper (e.g., a
new _fingerprint_request or reuse existing canonicalizer) to produce a string or
hash, then replace key = (delegator_id, delegatee_id, task_title) with key =
(delegator_id, delegatee_id, fingerprint) and use that for self._records lookups
and inserts so only truly identical requests dedupe. Ensure the fingerprinting
function is stable and includes all fields that define request identity.
- Around line 55-60: The dedup map (_records) only deletes the single key being
checked and keeps growing because new (delegator, delegatee, title) keys are
continually added; fix by implementing global expiry rather than key-reuse
deletion: maintain an expiry-ordered structure (e.g., a min-heap of
(expiry_timestamp, key) or an OrderedDict keyed by expiry) and on any
check/insert (the code paths that use _records, _clock, and _window_seconds) pop
and delete all entries whose expiry <= _clock(), or run a sweep when inserting
if _records grows past a threshold; update both the lookup branch (where
recorded_at is checked) and the insertion branch (where new entries are added)
to call this purge so expired entries are removed instead of only deleting the
single rechecked key.

In `@src/ai_company/communication/loop_prevention/depth.py`:
- Around line 14-41: The function check_delegation_depth should validate its
max_depth parameter and raise a ValueError for invalid limits instead of
silently blocking delegation; add a guard at the start of check_delegation_depth
that raises ValueError (with a clear message like "max_depth must be a positive
integer") when max_depth <= 0 before any use of delegation_chain, then keep the
existing length check that logs DELEGATION_LOOP_DEPTH_EXCEEDED and returns
GuardCheckOutcome with mechanism _MECHANISM on failure; ensure the raised error
message is descriptive to surface misconfiguration to callers.

In `@src/ai_company/communication/loop_prevention/guard.py`:
- Around line 77-97: The current loops build tuples which evaluate all checks
eagerly (causing unwanted state mutations); instead call each check sequentially
and short-circuit on the first failure: invoke check_ancestry(delegation_chain,
delegatee_id) and if not outcome.passed return self._log_and_return(outcome,
delegator_id, delegatee_id), then call check_delegation_depth(delegation_chain,
self._config.max_delegation_depth) and short-circuit similarly, and only after
those pure checks run the stateful checks by calling
self._deduplicator.check(delegator_id, delegatee_id, task_title), then
self._rate_limiter.check(delegator_id, delegatee_id), then
self._circuit_breaker.check(delegator_id, delegatee_id), short-circuiting
(returning via self._log_and_return) immediately on the first failed outcome so
later checks are not invoked.

In `@src/ai_company/communication/loop_prevention/models.py`:
- Around line 25-38: The post-validation in _validate_passed_message currently
allows whitespace-only failure messages; update the model_validator method
(named _validate_passed_message) to treat message.strip() as the real content:
when self.passed is False ensure that self.message exists and
self.message.strip() is non-empty, raising the existing "message is required
when passed is False" ValueError if it is empty after stripping; keep the
existing check that when self.passed is True the message must be empty.

In `@src/ai_company/communication/loop_prevention/rate_limit.py`:
- Around line 64-70: The check() logic currently assigns an empty list back into
self._timestamps for unseen pair keys which creates permanent no-op buckets;
change the code after computing recent = [t for t in timestamps if t > cutoff]
to remove the key (del self._timestamps[key] or pop) when recent is empty, and
only assign self._timestamps[key] = recent when recent is non-empty; reference
pair_key, _timestamps, _window_seconds and the check() method to locate where to
apply this change.

In `@tests/unit/communication/delegation/test_hierarchy.py`:
- Around line 278-293: The test currently can't observe the "first supervisor
wins" rule because both teams' departments use the same head; update
test_multi_team_leeps_first_supervisor to place the second team under a
different department head so precedence is measurable (e.g., create two
Department instances: dept1 with head="cto" containing Team(name="t1",
lead="shared_lead", ...), and dept2 with head="vp" containing Team(name="t2",
lead="shared_lead", ...), then build company via
_make_company(departments=(dept1, dept2))) and keep assertions against
HierarchyResolver.get_supervisor to ensure shared_lead resolves to "cto" while
dev1/dev2 resolve to "shared_lead".

In `@tests/unit/communication/delegation/test_service.py`:
- Around line 234-272: The test test_ancestry_blocked never reaches the ancestry
guard because DelegationService authorizes (authority check) before loop
prevention; fix by building a service that bypasses authority enforcement (call
_build_service with enforce_chain=False or otherwise make the authority path
permissive) so the delegate call in this test (service.delegate with
DelegationRequest from dev -> ceo) exercises ancestry blocking in
DelegationService.delegate; then tighten the assertion to check r3.success is
False and r3.blocked_by == "ancestry". Apply the same change to the analogous
integration test that uses the same setup.

In `@tests/unit/communication/loop_prevention/test_ancestry.py`:
- Around line 1-43: Consolidate the repetitive failure tests in
TestCheckAncestry by replacing the separate methods
test_delegatee_in_chain_fails, test_delegatee_is_root_fails,
test_delegatee_is_last_in_chain_fails, and test_single_element_chain_match_fails
with a single parametrized test using `@pytest.mark.parametrize` that supplies
(chain, delegatee, expected_passed[, expected_message_substring]) and calls
check_ancestry for each case; ensure you still assert result.passed is False for
failure cases and keep the mechanism == "ancestry" and any message substring
checks (e.g., "'b'") where relevant so the behavior validated by those original
tests is preserved.

In `@tests/unit/communication/loop_prevention/test_dedup.py`:
- Around line 80-92: The test only verifies per-key expiry; add an unrelated
expired record so pruning must remove entries beyond the checked key: in
test_expired_entries_pruned_on_check create a second record (e.g.,
dedup.record("x","y","task-2")) before advancing clock to an expired time, then
call dedup.check on the original key and assert both ("a","b","task-1") and
("x","y","task-2") are not in dedup._records; this ensures
DelegationDeduplicator.check prunes stale entries globally from the _records
store rather than only deleting the checked key.
- Around line 45-54: The dedup currently keys on (delegator, delegatee,
task_title) which is too coarse; update DelegationDeduplicator to compute and
use a stable request fingerprint instead of task_title for identity (introduce a
helper like fingerprint_request(payload) or accept a request_payload argument in
record() and check()), adjust DelegationDeduplicator.record() and .check() to
accept and use that fingerprint when storing and comparing entries, and ensure
existing window_seconds behavior is preserved so refined requests with the same
title but different payloads do not get false-blocked.

In `@tests/unit/communication/loop_prevention/test_guard.py`:
- Around line 88-100: The test incorrectly uses
DelegationGuard.record_delegation to simulate circuit-breaker bounces; replace
the two calls to record_delegation("a","b",...) with calls to
DelegationGuard.record_bounce("a","b",...) (or otherwise invoke the dedicated
bounce API) so the circuit breaker logic defined by CircuitBreakerConfig
(bounce_threshold) is exercised; keep the rest of the assertions (call to
guard.check((), "a", "b", "Task-new-2") and expects passed is False and
mechanism == "circuit_breaker") unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 41e99a3b-8046-4661-9c8d-2dc65e07f0d5

📥 Commits

Reviewing files that changed from the base of the PR and between 10b2299 and d46b455.

📒 Files selected for processing (38)
  • CLAUDE.md
  • DESIGN_SPEC.md
  • README.md
  • src/ai_company/communication/__init__.py
  • src/ai_company/communication/delegation/__init__.py
  • src/ai_company/communication/delegation/authority.py
  • src/ai_company/communication/delegation/hierarchy.py
  • src/ai_company/communication/delegation/models.py
  • src/ai_company/communication/delegation/service.py
  • src/ai_company/communication/errors.py
  • src/ai_company/communication/loop_prevention/__init__.py
  • src/ai_company/communication/loop_prevention/_pair_key.py
  • src/ai_company/communication/loop_prevention/ancestry.py
  • src/ai_company/communication/loop_prevention/circuit_breaker.py
  • src/ai_company/communication/loop_prevention/dedup.py
  • src/ai_company/communication/loop_prevention/depth.py
  • src/ai_company/communication/loop_prevention/guard.py
  • src/ai_company/communication/loop_prevention/models.py
  • src/ai_company/communication/loop_prevention/rate_limit.py
  • src/ai_company/core/task.py
  • src/ai_company/observability/events/delegation.py
  • tests/integration/communication/__init__.py
  • tests/integration/communication/test_delegation_integration.py
  • tests/unit/communication/delegation/__init__.py
  • tests/unit/communication/delegation/test_authority.py
  • tests/unit/communication/delegation/test_hierarchy.py
  • tests/unit/communication/delegation/test_models.py
  • tests/unit/communication/delegation/test_service.py
  • tests/unit/communication/loop_prevention/__init__.py
  • tests/unit/communication/loop_prevention/test_ancestry.py
  • tests/unit/communication/loop_prevention/test_circuit_breaker.py
  • tests/unit/communication/loop_prevention/test_dedup.py
  • tests/unit/communication/loop_prevention/test_depth.py
  • tests/unit/communication/loop_prevention/test_guard.py
  • tests/unit/communication/loop_prevention/test_rate_limit.py
  • tests/unit/communication/test_errors.py
  • tests/unit/core/test_task.py
  • tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Agent
  • GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Always use native lazy annotations (PEP 649) — do not use from __future__ import annotations
Use PEP 758 except syntax: except A, B: without parentheses for multiple exception types
Maintain line length of 88 characters (enforced by ruff)

Files:

  • tests/unit/communication/loop_prevention/test_guard.py
  • tests/unit/core/test_task.py
  • src/ai_company/communication/loop_prevention/__init__.py
  • src/ai_company/communication/loop_prevention/models.py
  • tests/unit/communication/loop_prevention/test_rate_limit.py
  • src/ai_company/communication/delegation/__init__.py
  • src/ai_company/communication/loop_prevention/circuit_breaker.py
  • tests/integration/communication/test_delegation_integration.py
  • tests/unit/communication/test_errors.py
  • src/ai_company/communication/delegation/service.py
  • src/ai_company/core/task.py
  • tests/unit/communication/loop_prevention/test_ancestry.py
  • src/ai_company/communication/loop_prevention/depth.py
  • src/ai_company/communication/errors.py
  • src/ai_company/communication/loop_prevention/_pair_key.py
  • src/ai_company/communication/loop_prevention/guard.py
  • src/ai_company/observability/events/delegation.py
  • src/ai_company/communication/loop_prevention/ancestry.py
  • tests/unit/communication/loop_prevention/test_dedup.py
  • src/ai_company/communication/delegation/models.py
  • tests/unit/communication/loop_prevention/test_depth.py
  • src/ai_company/communication/delegation/hierarchy.py
  • src/ai_company/communication/loop_prevention/dedup.py
  • src/ai_company/communication/loop_prevention/rate_limit.py
  • tests/unit/communication/delegation/test_service.py
  • src/ai_company/communication/delegation/authority.py
  • src/ai_company/communication/__init__.py
  • tests/unit/communication/delegation/test_models.py
  • tests/unit/observability/test_events.py
  • tests/unit/communication/delegation/test_hierarchy.py
  • tests/unit/communication/loop_prevention/test_circuit_breaker.py
  • tests/unit/communication/delegation/test_authority.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Use pytest markers @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow to categorize tests
Prefer @pytest.mark.parametrize for testing similar test cases
Use generic vendor-agnostic names in test fixtures and configuration: test-provider, test-small-001, test-medium-001, test-large-001

Files:

  • tests/unit/communication/loop_prevention/test_guard.py
  • tests/unit/core/test_task.py
  • tests/unit/communication/loop_prevention/test_rate_limit.py
  • tests/integration/communication/test_delegation_integration.py
  • tests/unit/communication/test_errors.py
  • tests/unit/communication/loop_prevention/test_ancestry.py
  • tests/unit/communication/loop_prevention/test_dedup.py
  • tests/unit/communication/loop_prevention/test_depth.py
  • tests/unit/communication/delegation/test_service.py
  • tests/unit/communication/delegation/test_models.py
  • tests/unit/observability/test_events.py
  • tests/unit/communication/delegation/test_hierarchy.py
  • tests/unit/communication/loop_prevention/test_circuit_breaker.py
  • tests/unit/communication/delegation/test_authority.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/**/*.py: All public functions and classes must have type hints and Google-style docstrings (enforced by ruff D rules)
Never use import logging or logging.getLogger() in application code — use from ai_company.observability import get_logger with variable name logger
Use structured logging with constants from ai_company.observability.events.<domain> — always log as logger.info(EVENT_CONSTANT, key=value) never with format strings
For all identifier/name fields in Pydantic models, use NotBlankStr type from core.types instead of manual whitespace validators (including optional and tuple variants)
Use @computed_field for derived values in Pydantic models instead of storing and validating redundant fields
Enforce immutability: create new objects instead of mutating. For non-Pydantic internal collections use copy.deepcopy() at construction and MappingProxyType for read-only enforcement
For config/identity data use frozen Pydantic models; for runtime state that evolves use separate mutable-via-copy models with model_copy(update=...)
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code instead of bare create_task
Functions must be under 50 lines, files must be under 800 lines
Handle errors explicitly — never silently swallow exceptions
Never use real vendor names (Anthropic, OpenAI, Claude, GPT) in project-owned code, docstrings, comments, or tests — use generic names: example-provider, example-large-001, test-provider, test-small-001

Files:

  • src/ai_company/communication/loop_prevention/__init__.py
  • src/ai_company/communication/loop_prevention/models.py
  • src/ai_company/communication/delegation/__init__.py
  • src/ai_company/communication/loop_prevention/circuit_breaker.py
  • src/ai_company/communication/delegation/service.py
  • src/ai_company/core/task.py
  • src/ai_company/communication/loop_prevention/depth.py
  • src/ai_company/communication/errors.py
  • src/ai_company/communication/loop_prevention/_pair_key.py
  • src/ai_company/communication/loop_prevention/guard.py
  • src/ai_company/observability/events/delegation.py
  • src/ai_company/communication/loop_prevention/ancestry.py
  • src/ai_company/communication/delegation/models.py
  • src/ai_company/communication/delegation/hierarchy.py
  • src/ai_company/communication/loop_prevention/dedup.py
  • src/ai_company/communication/loop_prevention/rate_limit.py
  • src/ai_company/communication/delegation/authority.py
  • src/ai_company/communication/__init__.py
src/ai_company/{providers,tools,engine,communication}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations

Files:

  • src/ai_company/communication/loop_prevention/__init__.py
  • src/ai_company/communication/loop_prevention/models.py
  • src/ai_company/communication/delegation/__init__.py
  • src/ai_company/communication/loop_prevention/circuit_breaker.py
  • src/ai_company/communication/delegation/service.py
  • src/ai_company/communication/loop_prevention/depth.py
  • src/ai_company/communication/errors.py
  • src/ai_company/communication/loop_prevention/_pair_key.py
  • src/ai_company/communication/loop_prevention/guard.py
  • src/ai_company/communication/loop_prevention/ancestry.py
  • src/ai_company/communication/delegation/models.py
  • src/ai_company/communication/delegation/hierarchy.py
  • src/ai_company/communication/loop_prevention/dedup.py
  • src/ai_company/communication/loop_prevention/rate_limit.py
  • src/ai_company/communication/delegation/authority.py
  • src/ai_company/communication/__init__.py
DESIGN_SPEC.md

📄 CodeRabbit inference engine (CLAUDE.md)

Update DESIGN_SPEC.md after approved deviations to reflect the new reality

Files:

  • DESIGN_SPEC.md
🧠 Learnings (1)
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{providers,tools,engine,communication}/**/*.py : Use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations

Applied to files:

  • src/ai_company/communication/delegation/hierarchy.py
🧬 Code graph analysis (22)
tests/unit/core/test_task.py (4)
tests/unit/communication/delegation/test_models.py (1)
  • _make_task (19-29)
tests/integration/communication/test_delegation_integration.py (1)
  • _make_task (67-77)
tests/unit/communication/delegation/test_service.py (1)
  • _make_task (66-76)
src/ai_company/core/enums.py (1)
  • TaskStatus (122-148)
src/ai_company/communication/loop_prevention/__init__.py (7)
src/ai_company/communication/loop_prevention/ancestry.py (1)
  • check_ancestry (14-41)
src/ai_company/communication/loop_prevention/circuit_breaker.py (2)
  • CircuitBreakerState (21-30)
  • DelegationCircuitBreaker (48-174)
src/ai_company/communication/loop_prevention/dedup.py (1)
  • DelegationDeduplicator (17-96)
src/ai_company/communication/loop_prevention/depth.py (1)
  • check_delegation_depth (14-41)
src/ai_company/communication/loop_prevention/guard.py (1)
  • DelegationGuard (28-134)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/communication/loop_prevention/rate_limit.py (1)
  • DelegationRateLimiter (20-109)
tests/unit/communication/loop_prevention/test_rate_limit.py (2)
src/ai_company/communication/config.py (1)
  • RateLimitConfig (182-203)
src/ai_company/communication/loop_prevention/rate_limit.py (3)
  • DelegationRateLimiter (20-109)
  • check (48-91)
  • record (93-109)
src/ai_company/communication/delegation/__init__.py (4)
src/ai_company/communication/delegation/authority.py (2)
  • AuthorityCheckResult (21-43)
  • AuthorityValidator (46-157)
src/ai_company/communication/delegation/hierarchy.py (1)
  • HierarchyResolver (16-234)
src/ai_company/communication/delegation/models.py (3)
  • DelegationRecord (86-122)
  • DelegationRequest (11-38)
  • DelegationResult (41-83)
src/ai_company/communication/delegation/service.py (1)
  • DelegationService (31-195)
src/ai_company/communication/loop_prevention/circuit_breaker.py (5)
src/ai_company/communication/config.py (1)
  • CircuitBreakerConfig (206-227)
src/ai_company/communication/loop_prevention/_pair_key.py (1)
  • pair_key (9-22)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/communication/loop_prevention/guard.py (1)
  • check (55-101)
tests/integration/communication/test_delegation_integration.py (12)
src/ai_company/communication/delegation/authority.py (1)
  • AuthorityValidator (46-157)
src/ai_company/communication/delegation/hierarchy.py (1)
  • HierarchyResolver (16-234)
src/ai_company/communication/delegation/models.py (1)
  • DelegationRequest (11-38)
src/ai_company/communication/delegation/service.py (3)
  • DelegationService (31-195)
  • delegate (63-140)
  • get_audit_trail (178-184)
src/ai_company/communication/loop_prevention/guard.py (1)
  • DelegationGuard (28-134)
src/ai_company/core/agent.py (2)
  • AgentIdentity (246-304)
  • ModelConfig (145-174)
src/ai_company/core/company.py (1)
  • Company (387-470)
src/ai_company/core/enums.py (3)
  • SeniorityLevel (6-21)
  • TaskStatus (122-148)
  • TaskType (151-159)
src/ai_company/core/task.py (1)
  • Task (38-242)
tests/unit/communication/delegation/test_authority.py (1)
  • _make_agent (27-44)
tests/unit/communication/delegation/test_models.py (1)
  • _make_task (19-29)
src/ai_company/providers/drivers/litellm_driver.py (1)
  • update (675-697)
tests/unit/communication/test_errors.py (1)
src/ai_company/communication/errors.py (12)
  • DelegationAncestryError (83-84)
  • DelegationCircuitOpenError (91-92)
  • DelegationDepthError (79-80)
  • DelegationDuplicateError (95-96)
  • DelegationError (67-68)
  • DelegationLoopError (75-76)
  • DelegationRateLimitError (87-88)
  • HierarchyResolutionError (99-100)
  • MessageBusAlreadyRunningError (63-64)
  • MessageBusNotRunningError (59-60)
  • NotSubscribedError (55-56)
  • CommunicationError (12-44)
src/ai_company/communication/delegation/service.py (5)
src/ai_company/communication/delegation/authority.py (2)
  • AuthorityValidator (46-157)
  • validate (70-98)
src/ai_company/communication/delegation/hierarchy.py (2)
  • HierarchyResolver (16-234)
  • get_supervisor (131-140)
src/ai_company/communication/delegation/models.py (3)
  • DelegationRecord (86-122)
  • DelegationRequest (11-38)
  • DelegationResult (41-83)
src/ai_company/communication/loop_prevention/guard.py (3)
  • DelegationGuard (28-134)
  • check (55-101)
  • record_delegation (119-134)
src/ai_company/core/task.py (1)
  • Task (38-242)
tests/unit/communication/loop_prevention/test_ancestry.py (2)
src/ai_company/communication/loop_prevention/ancestry.py (1)
  • check_ancestry (14-41)
tests/unit/communication/loop_prevention/test_depth.py (1)
  • test_empty_chain_passes (14-17)
src/ai_company/communication/loop_prevention/guard.py (5)
src/ai_company/communication/config.py (1)
  • LoopPreventionConfig (230-267)
src/ai_company/communication/loop_prevention/ancestry.py (1)
  • check_ancestry (14-41)
src/ai_company/communication/loop_prevention/dedup.py (3)
  • DelegationDeduplicator (17-96)
  • check (37-80)
  • record (82-96)
src/ai_company/communication/loop_prevention/depth.py (1)
  • check_delegation_depth (14-41)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/communication/loop_prevention/ancestry.py (2)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
tests/unit/communication/loop_prevention/test_dedup.py (1)
src/ai_company/communication/loop_prevention/dedup.py (3)
  • DelegationDeduplicator (17-96)
  • check (37-80)
  • record (82-96)
src/ai_company/communication/delegation/models.py (1)
src/ai_company/core/task.py (1)
  • Task (38-242)
tests/unit/communication/loop_prevention/test_depth.py (1)
src/ai_company/communication/loop_prevention/depth.py (1)
  • check_delegation_depth (14-41)
src/ai_company/communication/delegation/hierarchy.py (3)
src/ai_company/communication/errors.py (1)
  • HierarchyResolutionError (99-100)
src/ai_company/core/company.py (1)
  • Company (387-470)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/communication/loop_prevention/dedup.py (1)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/communication/loop_prevention/rate_limit.py (4)
src/ai_company/communication/config.py (1)
  • RateLimitConfig (182-203)
src/ai_company/communication/loop_prevention/_pair_key.py (1)
  • pair_key (9-22)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/communication/loop_prevention/dedup.py (2)
  • check (37-80)
  • record (82-96)
src/ai_company/communication/delegation/authority.py (3)
src/ai_company/communication/config.py (1)
  • HierarchyConfig (160-179)
src/ai_company/communication/delegation/hierarchy.py (3)
  • HierarchyResolver (16-234)
  • is_direct_report (156-170)
  • is_subordinate (172-193)
src/ai_company/core/agent.py (1)
  • AgentIdentity (246-304)
src/ai_company/communication/__init__.py (10)
src/ai_company/communication/delegation/authority.py (2)
  • AuthorityCheckResult (21-43)
  • AuthorityValidator (46-157)
src/ai_company/communication/delegation/service.py (1)
  • DelegationService (31-195)
src/ai_company/communication/delegation/hierarchy.py (1)
  • HierarchyResolver (16-234)
src/ai_company/communication/errors.py (9)
  • DelegationAncestryError (83-84)
  • DelegationAuthorityError (71-72)
  • DelegationCircuitOpenError (91-92)
  • DelegationDepthError (79-80)
  • DelegationDuplicateError (95-96)
  • DelegationError (67-68)
  • DelegationLoopError (75-76)
  • DelegationRateLimitError (87-88)
  • HierarchyResolutionError (99-100)
src/ai_company/communication/loop_prevention/dedup.py (1)
  • DelegationDeduplicator (17-96)
src/ai_company/communication/loop_prevention/guard.py (1)
  • DelegationGuard (28-134)
src/ai_company/communication/loop_prevention/rate_limit.py (1)
  • DelegationRateLimiter (20-109)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/communication/loop_prevention/ancestry.py (1)
  • check_ancestry (14-41)
src/ai_company/communication/loop_prevention/depth.py (1)
  • check_delegation_depth (14-41)
tests/unit/communication/delegation/test_models.py (1)
src/ai_company/communication/delegation/models.py (3)
  • DelegationRecord (86-122)
  • DelegationRequest (11-38)
  • DelegationResult (41-83)
tests/unit/communication/delegation/test_hierarchy.py (1)
src/ai_company/communication/errors.py (1)
  • HierarchyResolutionError (99-100)
tests/unit/communication/loop_prevention/test_circuit_breaker.py (4)
src/ai_company/communication/config.py (1)
  • CircuitBreakerConfig (206-227)
src/ai_company/communication/loop_prevention/dedup.py (1)
  • check (37-80)
src/ai_company/communication/loop_prevention/guard.py (1)
  • check (55-101)
src/ai_company/communication/loop_prevention/rate_limit.py (1)
  • check (48-91)
🪛 LanguageTool
README.md

[typographical] ~17-~17: To join two clauses or introduce examples, consider using an em dash.
Context: ...timization - Hierarchical Delegation - Chain-of-command task delegation with fi...

(DASH_RULE)

🔇 Additional comments (44)
CLAUDE.md (1)

49-49: LGTM!

Documentation accurately reflects the expanded communication package scope with delegation and loop prevention subsystems.

src/ai_company/communication/loop_prevention/_pair_key.py (1)

1-22: LGTM!

Clean utility function with proper type hints and Google-style docstring. The min/max approach correctly produces a canonical key for undirected pairs.

src/ai_company/communication/delegation/__init__.py (1)

1-27: LGTM!

Clean package initialization that properly exposes the delegation subsystem's public API. The __all__ list is alphabetically sorted and matches the imports.

src/ai_company/core/task.py (3)

128-135: LGTM!

New delegation fields use NotBlankStr type as required by coding guidelines for identifier fields. Default values are sensible (None for optional parent, empty tuple for chain).


146-148: LGTM!

Good improvement to chain the original exception with from exc instead of from None, preserving the full traceback for debugging invalid ISO 8601 strings.


167-184: LGTM!

Delegation field validation is comprehensive:

  • Prevents self-referential parent task
  • Detects duplicates in delegation chain using the same Counter pattern as other validators
  • Ensures assigned_to is not in the delegation chain (prevents delegation loops)

Consistent with existing validation patterns in this model.

src/ai_company/communication/delegation/models.py (4)

11-38: LGTM!

DelegationRequest is well-structured with:

  • NotBlankStr for identifier fields (delegator_id, delegatee_id)
  • Sensible defaults for optional fields
  • tuple[NotBlankStr, ...] for constraints ensuring non-blank constraint strings
  • Frozen configuration for immutability

67-83: LGTM!

The validator correctly enforces the success/failure field correlation:

  • Success requires delegated_task and forbids rejection_reason/blocked_by
  • Failure requires rejection_reason

This provides strong invariants for consumers of the result.


86-122: LGTM!

DelegationRecord provides a complete audit trail entry with:

  • All identifier fields using NotBlankStr
  • AwareDatetime for timezone-aware timestamps
  • Frozen configuration for immutability

58-61: 🧹 Nitpick | 🔵 Trivial

Consider using NotBlankStr | None for rejection_reason.

The validator on lines 80-82 ensures rejection_reason is not None when success is False, but doesn't prevent whitespace-only strings. Per coding guidelines, identifier/name fields should use NotBlankStr. While rejection_reason isn't strictly an identifier, ensuring it's non-blank when required would provide stronger invariants.

♻️ Optional: Strengthen rejection_reason validation
-    rejection_reason: str | None = Field(
+    rejection_reason: NotBlankStr | None = Field(
         default=None,
         description="Reason for rejection",
     )
⛔ Skipped due to learnings
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/**/*.py : For all identifier/name fields in Pydantic models, use `NotBlankStr` type from `core.types` instead of manual whitespace validators (including optional and tuple variants)
DESIGN_SPEC.md (4)

581-581: LGTM!

The "Current state" callout accurately reflects the M4 implementation status: communication foundation, loop prevention with DelegationGuard, hierarchical delegation via DelegationService, and the task model extensions.


788-789: LGTM!

Task definition example correctly shows the new parent_task_id and delegation_chain fields with appropriate default values.


2355-2372: LGTM!

Project structure documentation accurately reflects the new delegation and loop_prevention module hierarchies with all submodules properly listed.


2552-2552: LGTM!

Engineering conventions entry properly documents the adopted delegation and loop prevention patterns including synchronous design, five-mechanism guard, injectable clocks, and task model extensions.

src/ai_company/communication/errors.py (1)

67-100: LGTM!

Well-designed error hierarchy:

  • DelegationError as base for all delegation-related errors
  • DelegationLoopError groups loop-prevention rejections, enabling callers to catch all loop issues uniformly
  • Each specific error type maps to a loop-prevention mechanism from the design spec
  • HierarchyResolutionError is correctly separate from DelegationError (hierarchy issues aren't delegation failures)

All errors inherit context handling from CommunicationError following the established pattern.

tests/unit/communication/test_errors.py (3)

23-23: LGTM!

Global timeout marker applied at module level ensures all tests in this file adhere to the 30-second timeout policy.


76-84: LGTM!

Parametrized inheritance tests comprehensively cover:

  • DelegationErrorCommunicationError
  • DelegationAuthorityErrorDelegationError
  • All DelegationLoopError subclasses → DelegationLoopError
  • HierarchyResolutionErrorCommunicationError (correctly separate from delegation)

116-129: LGTM!

Dedicated test class for delegation error hierarchy:

  • test_depth_error_chain validates the full inheritance chain through all base classes
  • test_hierarchy_error_is_communication_error correctly asserts that HierarchyResolutionError is not a DelegationError

Good defensive tests ensuring the error taxonomy remains intact.

tests/unit/core/test_task.py (1)

277-332: LGTM!

The delegation field tests are well-structured and provide comprehensive coverage for:

  • Default values validation
  • Self-referential parent_task_id rejection
  • Delegation chain duplicate detection
  • Cross-field validation (assigned_to not in delegation_chain)
  • Serialization preservation

The test class correctly uses @pytest.mark.unit marker and follows established patterns in the file.

src/ai_company/observability/events/delegation.py (1)

1-24: LGTM!

The delegation event constants are well-organized into logical groups (lifecycle, loop prevention, hierarchy) and follow the established domain.subject[.qualifier] naming pattern. Using Final[str] ensures immutability at the type level.

src/ai_company/communication/delegation/hierarchy.py (3)

37-88: LGTM!

The __init__ method correctly implements the hierarchy resolution with proper priority ordering (explicit lines override team/department relationships). Key strengths:

  • Defensive handling when removing from old supervisor's reports (lines 63-66)
  • Cycle detection before freezing state
  • MappingProxyType for read-only exposure with tuple conversion for nested lists
  • Structured logging with event constants

The logic correctly handles edge cases like team lead being in their own members list (line 50-51).


89-129: LGTM!

The cycle detection uses an efficient O(n) single-pass DFS algorithm with visited/in-stack coloring. Each agent is processed exactly once across all outer loop iterations, and the error message provides clear context about where the cycle was detected.


131-234: LGTM!

The query methods are well-implemented with:

  • Consistent return types (tuples for collections, None for missing data)
  • Proper traversal logic for hierarchy walking (is_subordinate, get_ancestors, get_delegation_depth)
  • Complete type hints and Google-style docstrings

All methods are stateless queries operating on the frozen internal state.

src/ai_company/communication/delegation/authority.py (2)

21-43: LGTM!

The AuthorityCheckResult model correctly enforces the allowed/reason correlation invariant:

  • Success (allowed=True) must have an empty reason
  • Failure (allowed=False) must have a non-empty reason

The frozen model config ensures immutability.


46-157: LGTM!

The AuthorityValidator correctly implements the two-phase validation:

  1. Hierarchy check (when enforce_chain_of_command is enabled): validates direct reports, with optional skip-level subordinate fallback
  2. Role permissions check: validates against can_delegate_to list (empty list permits all roles)

The logging strategy is appropriate—debug level for successful authorization, info level for denials with full context.

tests/unit/observability/test_events.py (1)

101-123: LGTM!

The test correctly adds "delegation" to the expected domain modules set, ensuring the new events/delegation.py module is discovered by the dynamic module scanner.

README.md (2)

17-17: LGTM!

The new feature bullet clearly describes the hierarchical delegation capability with loop prevention. The formatting is consistent with other feature bullets in the list.


27-27: LGTM!

The status update correctly reflects the milestone progression to M4 (Multi-Agent), acknowledging all previous milestones as complete.

tests/unit/communication/delegation/test_models.py (4)

1-29: LGTM!

The test module setup is clean with appropriate imports and a reusable _make_task helper that follows the established pattern across the delegation test suite.


32-86: LGTM!

Comprehensive coverage for DelegationRequest:

  • Minimal construction with default values
  • Full construction with optional fields
  • Frozen immutability enforcement
  • NotBlankStr validation for delegator/delegatee IDs

88-143: LGTM!

Excellent coverage of DelegationResult model validator invariants:

  • Success requires delegated_task, prohibits rejection_reason and blocked_by
  • Failure requires rejection_reason
  • Frozen immutability enforced

All validator error messages are matched appropriately.


145-216: LGTM!

DelegationRecord tests cover essential scenarios including JSON roundtrip, which validates that AwareDatetime serialization/deserialization works correctly for audit trail entries.

src/ai_company/communication/loop_prevention/circuit_breaker.py (4)

1-18: LGTM on module structure and imports.

The module correctly uses get_logger from observability and imports structured event constants. The setup follows coding guidelines.


21-46: LGTM on state definitions.

CircuitBreakerState enum and internal _PairState class are well-structured with appropriate docstrings and __slots__ usage.


62-78: LGTM on initialization and pair lookup.

Constructor properly stores config and clock with type hints. The _get_pair method correctly uses the shared pair_key utility for canonical direction-agnostic keys.


80-143: LGTM on state query and check methods.

The get_state method correctly handles cooldown expiry with proper reset and logging. The check method appropriately returns GuardCheckOutcome based on circuit state.

src/ai_company/communication/delegation/service.py (4)

1-28: LGTM on module structure and imports.

Proper use of get_logger from observability and structured event constants. Imports are well-organized.


31-61: LGTM on class definition.

The DelegationService class uses __slots__ for memory efficiency and has a clean constructor with keyword-only arguments and proper type hints.


63-140: LGTM on delegation flow.

The delegate method follows a correct pattern: validate authority, check loop prevention guards, create sub-task only after all checks pass, then record side effects. Structured logging is properly used.


178-195: LGTM on accessor methods.

get_audit_trail correctly returns a tuple (immutable copy) to prevent external mutation of the internal list. get_supervisor_of cleanly delegates to the hierarchy resolver.

src/ai_company/communication/__init__.py (4)

17-25: LGTM on delegation imports.

New delegation subsystem components are properly imported and will be re-exported through this module's public API.


35-51: LGTM on error imports.

All new delegation and hierarchy error types are properly imported for public API exposure.


58-67: LGTM on loop prevention imports.

Loop prevention components including guards, checkers, and utility functions are properly imported.


72-132: LGTM on __all__ exports.

The __all__ list is alphabetically sorted and includes all newly imported delegation, error, and loop-prevention components. The public API surface is well-defined.

Comment on lines +88 to +100
def test_circuit_breaker_blocked(self) -> None:
config = _make_config(
circuit_breaker=CircuitBreakerConfig(
bounce_threshold=2, cooldown_seconds=300
),
)
guard = DelegationGuard(config)
for i in range(2):
guard.record_delegation("a", "b", f"Task-{i}")
# Circuit should be open after 2 bounces
result = guard.check((), "a", "b", "Task-new-2")
assert result.passed is False
assert result.mechanism == "circuit_breaker"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't treat every delegation as a circuit-breaker bounce.

This test opens the breaker after two same-direction record_delegation("a", "b", ...) calls. The dedicated breaker API is bounce-based (record_bounce), and the PR objective says the circuit trips after repeated bounces. Keeping this expectation means ordinary A→B traffic can open the breaker even when there is no ping-pong loop.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/communication/loop_prevention/test_guard.py` around lines 88 -
100, The test incorrectly uses DelegationGuard.record_delegation to simulate
circuit-breaker bounces; replace the two calls to record_delegation("a","b",...)
with calls to DelegationGuard.record_bounce("a","b",...) (or otherwise invoke
the dedicated bounce API) so the circuit breaker logic defined by
CircuitBreakerConfig (bounce_threshold) is exercised; keep the rest of the
assertions (call to guard.check((), "a", "b", "Task-new-2") and expects passed
is False and mechanism == "circuit_breaker") unchanged.

…l reviewers

Source fixes:
- Add _escalate_loop_detection method with supervisor lookup and logging
- Add result event logging (DELEGATION_RESULT_SENT) on successful delegation
- Change guard/dedup API from task_title to task_id for correct dedup identity
- Change logger.error to logger.exception in sub-task creation error path
- Add identity consistency checks (delegator/delegatee ID vs name mismatch)
- Add self-delegation model validator on DelegationRequest
- Add failure-with-task validator on DelegationResult
- Add whitespace-only message rejection on GuardCheckOutcome
- Add max_depth <= 0 validation on depth check
- Add blank agent ID validation on pair_key utility
- Split circuit breaker _get_pair into _get_pair/_get_or_create_pair
- Circuit breaker evicts stale entries on cooldown expiry
- Rate limiter evicts empty timestamp lists
- Dedup adds global _purge_expired sweep on every check/record
- Circuit breaker record_delegation is no-op when circuit is open
- Constraints appended to sub-task description
- Sub-task inherits max_retries from original task

Documentation fixes:
- Fix DESIGN_SPEC.md: CircuitBreakerState listed in correct file
- Add __init__.py entries for delegation/ and loop_prevention/ in DESIGN_SPEC
- Update docstrings across hierarchy, authority, depth, circuit breaker, guard

Test fixes:
- Fix test_ancestry_blocked to use enforce_chain=False (not allow_skip=True)
- Rename record_bounce -> record_delegation in circuit breaker tests
- Rename test_different_task_title_passes -> test_different_task_id_passes
- Update guard tests from task_title to task_id parameter
- Add identity mismatch, constraints, self-delegation, failure-with-task tests
- Add circuit breaker no-op-when-open test
- Add dedup global pruning test
- Add delegation event value assertions in test_events.py
- Parametrize ancestry failure tests
- Update integration test comments for task_id-based dedup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comment on lines +45 to +53
def __init__(self, config: LoopPreventionConfig) -> None:
self._config = config
self._deduplicator = DelegationDeduplicator(
window_seconds=config.dedup_window_seconds,
)
self._rate_limiter = DelegationRateLimiter(config.rate_limit)
self._circuit_breaker = DelegationCircuitBreaker(
config.circuit_breaker,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DelegationGuard doesn't expose injectable clocks to sub-components

DelegationDeduplicator, DelegationRateLimiter, and DelegationCircuitBreaker each accept a clock: Callable[[], float] parameter for deterministic testing, but DelegationGuard.__init__ does not expose a clock parameter, causing all three components to default to time.monotonic. The PR description specifically calls out "injectable clocks for deterministic testing" as a design goal, yet this integration point doesn't honour it.

Any test that needs to verify time-based behaviour (dedup window expiry, rate-limit window reset, circuit breaker cooldown) through the DelegationGuard API must either monkeypatch time.monotonic globally or bypass the guard and instantiate components directly.

Consider accepting an optional clock parameter in __init__ and passing it through to the three stateful components:

def __init__(
    self,
    config: LoopPreventionConfig,
    *,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    self._config = config
    self._deduplicator = DelegationDeduplicator(
        window_seconds=config.dedup_window_seconds,
        clock=clock,
    )
    self._rate_limiter = DelegationRateLimiter(config.rate_limit, clock=clock)
    self._circuit_breaker = DelegationCircuitBreaker(config.circuit_breaker, clock=clock)
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/loop_prevention/guard.py
Line: 45-53

Comment:
**`DelegationGuard` doesn't expose injectable clocks to sub-components**

`DelegationDeduplicator`, `DelegationRateLimiter`, and `DelegationCircuitBreaker` each accept a `clock: Callable[[], float]` parameter for deterministic testing, but `DelegationGuard.__init__` does not expose a clock parameter, causing all three components to default to `time.monotonic`. The PR description specifically calls out "injectable clocks for deterministic testing" as a design goal, yet this integration point doesn't honour it.

Any test that needs to verify time-based behaviour (dedup window expiry, rate-limit window reset, circuit breaker cooldown) through the `DelegationGuard` API must either monkeypatch `time.monotonic` globally or bypass the guard and instantiate components directly.

Consider accepting an optional `clock` parameter in `__init__` and passing it through to the three stateful components:

```python
def __init__(
    self,
    config: LoopPreventionConfig,
    *,
    clock: Callable[[], float] = time.monotonic,
) -> None:
    self._config = config
    self._deduplicator = DelegationDeduplicator(
        window_seconds=config.dedup_window_seconds,
        clock=clock,
    )
    self._rate_limiter = DelegationRateLimiter(config.rate_limit, clock=clock)
    self._circuit_breaker = DelegationCircuitBreaker(config.circuit_breaker, clock=clock)
```

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +75 to +96
class DelegationLoopError(DelegationError):
"""Base for loop prevention mechanism rejections."""


class DelegationDepthError(DelegationLoopError):
"""Delegation chain exceeds maximum depth."""


class DelegationAncestryError(DelegationLoopError):
"""Delegation would create a cycle in the task ancestry."""


class DelegationRateLimitError(DelegationLoopError):
"""Delegation rate limit exceeded for agent pair."""


class DelegationCircuitOpenError(DelegationLoopError):
"""Circuit breaker is open for agent pair."""


class DelegationDuplicateError(DelegationLoopError):
"""Duplicate delegation detected within dedup window."""
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loop-prevention exception subclasses are exported but never raised

DelegationAncestryError, DelegationDepthError, DelegationRateLimitError, DelegationCircuitOpenError, and DelegationDuplicateError are all declared in the error hierarchy and re-exported through communication/__init__.py, but none of them are ever raised by any code path. The loop-prevention mechanisms return GuardCheckOutcome(passed=False, ...) objects, and DelegationService.delegate() maps those to a DelegationResult(success=False) return value instead.

This creates a misleading public API: callers who examine the exported symbols will reasonably expect to catch these exceptions, but they will never be raised:

try:
    result = service.delegate(request, delegator, delegatee)
except DelegationAncestryError:
    # This block will never execute
    pass

Either raise the appropriate exception from DelegationService.delegate() when a loop-prevention mechanism blocks delegation, or remove these subclasses and document that all rejections surface through the DelegationResult.blocked_by field.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/errors.py
Line: 75-96

Comment:
**Loop-prevention exception subclasses are exported but never raised**

`DelegationAncestryError`, `DelegationDepthError`, `DelegationRateLimitError`, `DelegationCircuitOpenError`, and `DelegationDuplicateError` are all declared in the error hierarchy and re-exported through `communication/__init__.py`, but none of them are ever raised by any code path. The loop-prevention mechanisms return `GuardCheckOutcome(passed=False, ...)` objects, and `DelegationService.delegate()` maps those to a `DelegationResult(success=False)` return value instead.

This creates a misleading public API: callers who examine the exported symbols will reasonably expect to catch these exceptions, but they will never be raised:

```python
try:
    result = service.delegate(request, delegator, delegatee)
except DelegationAncestryError:
    # This block will never execute
    pass
```

Either raise the appropriate exception from `DelegationService.delegate()` when a loop-prevention mechanism blocks delegation, or remove these subclasses and document that all rejections surface through the `DelegationResult.blocked_by` field.

How can I resolve this? If you propose a fix, please make it concise.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

♻️ Duplicate comments (3)
tests/unit/communication/loop_prevention/test_guard.py (1)

88-100: ⚠️ Potential issue | 🟠 Major

Don't open the breaker on ordinary same-direction delegations.

CircuitBreakerConfig.bounce_threshold is defined as a bounce count, but this test trips the breaker with two plain record_delegation("a", "b", ...) calls. That hard-wires DelegationGuard.record_delegation() to false-positive on steady manager→report traffic instead of only on ping-pong loops. Exercise the bounce path here; the dedicated breaker and integration tests should mirror the same contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/communication/loop_prevention/test_guard.py` around lines 88 -
100, The test is triggering the circuit breaker using two same-direction
delegations; change it to exercise the bounce path so bounce_threshold counts
true ping-pong bounces. Update the test to call
DelegationGuard.record_delegation(...) with alternating source/target (e.g.,
record_delegation("a","b",...) then record_delegation("b","a",...)) until
CircuitBreakerConfig.bounce_threshold is reached, then call
DelegationGuard.check(...) and assert result.passed is False and
result.mechanism == "circuit_breaker".
tests/integration/communication/test_delegation_integration.py (1)

188-235: ⚠️ Potential issue | 🟡 Minor

This setup never reaches the ancestry guard.

DelegationService.delegate() validates authority before loop prevention, and _build_three_level_service() hard-codes enforce_chain_of_command=True, so dev -> ceo is rejected as authority before ancestry is evaluated. Make this test’s authority path permissive and assert blocked_by == "ancestry" so it actually covers the ancestry mechanism.

Representative fix
-def _build_three_level_service() -> tuple[
+def _build_three_level_service(
+    *,
+    enforce_chain: bool = True,
+) -> tuple[
     DelegationService,
     HierarchyResolver,
     dict[str, AgentIdentity],
 ]:
...
     hierarchy_config = HierarchyConfig(
-        enforce_chain_of_command=True,
+        enforce_chain_of_command=enforce_chain,
         allow_skip_level=True,
     )
...
-        service, _, agents = _build_three_level_service()
+        service, _, agents = _build_three_level_service(enforce_chain=False)
...
-        # Blocked by either ancestry or authority
-        assert r3.blocked_by is not None
+        assert r3.blocked_by == "ancestry"
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/communication/test_delegation_integration.py` around lines
188 - 235, The test test_ancestry_prevents_back_delegation never exercises the
ancestry guard because DelegationService.delegate() checks authority first and
_build_three_level_service() sets enforce_chain_of_command=True; update the test
to construct a service/agents that allow the dev→ceo authority check to succeed
(e.g., call _build_three_level_service() or otherwise create the service with
enforce_chain_of_command=False or adjust agent permissions so authority passes)
then call DelegationService.delegate() as before and replace the loose
blocked_by assertion with assert r3.blocked_by == "ancestry" (keep r3.success is
False) so the test actually verifies the ancestry prevention path.
src/ai_company/communication/loop_prevention/circuit_breaker.py (1)

155-185: ⚠️ Potential issue | 🟠 Major

Only increment the breaker on real bounce-back patterns.

This increments bounce_count on every successful delegation, so three ordinary A→B handoffs open the circuit even if nothing ever bounced back. The breaker becomes a per-pair quota instead of the repeated-bounce guard described by the feature. Track direction or task lineage and increment only when the interaction actually reverses.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/communication/loop_prevention/circuit_breaker.py` around lines
155 - 185, The current record_delegation in record_delegation increments
pair.bounce_count for every delegation; change it to only increment on a true
bounce by tracking direction/task lineage on the pair (e.g., add a
last_delegator or last_task_id field on the pair returned by
_get_or_create_pair) and: if delegator_id equals pair.last_delegator (same
direction) reset or leave bounce_count unchanged; if delegator_id equals
pair.last_delegatee (i.e., a reversal/bounce) increment bounce_count; update
pair.last_delegator/last_delegatee (or last_task_id) on each call, keep the
existing opened_at and logger.warning behaviour when threshold is reached, and
keep the early return when get_state(...) is OPEN. Ensure this logic is applied
inside record_delegation and persisted on the pair object returned by
_get_or_create_pair.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/communication/delegation/authority.py`:
- Around line 94-98: Replace the debug-level structured log with an info-level
structured log for successful delegations: change the call using logger.debug
with DELEGATION_AUTHORIZED to logger.info(DELEGATION_AUTHORIZED,
delegator=delegator.name, delegatee=delegatee.name) so the event constant
DELEGATION_AUTHORIZED is logged at info level (use the same key names delegator
and delegatee) to match the other delegation observability patterns.
- Around line 34-43: The validator _validate_allowed_reason currently only
rejects empty strings but allows whitespace-only reasons; update its denial
check to treat whitespace-only as empty by using not self.reason.strip() (i.e.,
replace the `if not self.allowed and not self.reason:` condition with one that
checks `not self.allowed and not self.reason.strip()`), so when self.allowed is
False the reason must contain non-whitespace characters; keep the other checks
and return self as before.

In `@src/ai_company/communication/delegation/hierarchy.py`:
- Around line 43-46: When building the team->department head edge, avoid
inserting a self-cycle by skipping the assignment if team.lead equals dept.head;
update the block that sets supervisor_of[team.lead] and appends to
reports_of[dept.head] to first check that team.lead != dept.head so you don't
create a head->head edge that will trigger _detect_cycles() falsely.
- Around line 83-87: Replace debug/warning-level structured logs in this module
with info-level structured events: change
logger.debug(DELEGATION_HIERARCHY_BUILT, ...) to
logger.info(DELEGATION_HIERARCHY_BUILT, agents=len(supervisor_of),
supervisors=len(reports_of)) and do the same for the other event calls around
lines 111-115 (use the corresponding event constants from
ai_company.observability.events.<domain>), ensuring you pass key=value pairs (no
formatted strings) and import the correct EVENT_CONSTANT names used in this
file.

In `@src/ai_company/communication/delegation/models.py`:
- Around line 75-94: The _validate_success_consistency model validator in
DelegationResult should also reject empty or whitespace-only rejection_reason
values when success is False; update the check that currently only tests "if not
self.success and self.rejection_reason is None" to instead verify that
rejection_reason is a non-empty, non-whitespace string (e.g., using str.strip()
or equivalent) and raise a ValueError like "rejection_reason is required when
success is False" when the stripped value is empty; keep all other existing
consistency checks in _validate_success_consistency unchanged.

In `@src/ai_company/communication/delegation/service.py`:
- Around line 199-201: The Task ID is being truncated to 12 hex chars using
uuid4().hex[:12] which reduces entropy and risks collisions; update the Task
creation to use the full UUID hex (e.g., replace uuid4().hex[:12] with
uuid4().hex) so Task id retains full entropy (reference the Task creation that
currently uses uuid4().hex[:12] in service.py) and ensure the uuid4 import is
present/used consistently where Task ids are generated for lineage/dedup keys.
- Around line 190-214: The child Task is built directly from request.task which
can share mutable nested state; fix by deep-copying the source task at the start
(e.g., src_task = copy.deepcopy(request.task)) before reading or modifying
fields like delegation_chain, description, constraints, estimated_complexity,
etc.; update subsequent references in this block to use src_task (and construct
new_chain from a copied chain) and add the necessary import for copy.deepcopy so
the new Task() is assembled from the isolated copy rather than the original
request.task.
- Around line 68-171: The delegate() and _create_sub_task() functions are too
long and mix responsibilities; refactor by extracting small helper methods for
each stage: e.g., _validate_identity(request, delegator, delegatee) to perform
the identity checks and raise ValueError, _check_authority_and_guard(request,
delegator, delegatee) to call self._authority_validator.validate and
self._guard.check and return an outcome/DelegationResult when blocked,
_build_sub_task(request) to construct the sub_task (moved logic from
_create_sub_task), and _record_audit_and_guard(request, sub_task) to call
self._guard.record_delegation, create/append DelegationRecord and centralize
logging; update delegate() to orchestrate these helpers and return early on
failures so each function stays under ~50 lines and responsibilities are
separated (refer to delegate, _create_sub_task, _guard.record_delegation,
_authority_validator.validate, _guard.check, and DelegationRecord).
- Around line 165-170: The delegation event logging is inconsistent and leaks
raw exception text; replace ad-hoc logger.debug calls with structured
logger.info using the domain event constants from
ai_company.observability.events.<domain> (e.g., DELEGATION_RESULT_SENT and
DELEGATION_LOOP_ESCALATED) and ensure all delegation paths (including the
sub-task failure path at the other ranges) emit the proper event constant with
key=value pairs rather than format strings; for failure cases do NOT log
str(exc) directly — instead log only sanitized error metadata (e.g.,
error_type=exc.__class__.__name__,
error_summary=short_truncated_message_or_code, task_id=request.task_id,
delegator=request.delegator_id, delegatee=request.delegatee_id) and update the
occurrences around the DELEGATION_RESULT_SENT (lines ~165-170), the similar
block at ~215-222 and ~253-260 to follow this pattern.

In `@src/ai_company/communication/loop_prevention/guard.py`:
- Around line 55-107: The check() method is too long—split it into two helper
methods (e.g., _run_pure_checks and _run_stateful_checks) to keep the
short-circuit behavior: move the ancestry and delegation-depth logic (calls to
check_ancestry and check_delegation_depth using
self._config.max_delegation_depth) into _run_pure_checks, and move the
deduplicator, rate limiter, and circuit breaker checks
(self._deduplicator.check, self._rate_limiter.check,
self._circuit_breaker.check) into _run_stateful_checks; have check() call the
helpers in order and use existing _log_and_return for failures and return
GuardCheckOutcome(passed=True, mechanism=_SUCCESS_MECHANISM) on success so
behavior and short-circuiting remain identical.

In `@src/ai_company/communication/loop_prevention/rate_limit.py`:
- Around line 65-75: The current two-step logic (check() reading/pruning
_timestamps around pair_key(...) and record() appending later) is racy—make
admission atomic by combining them into a single synchronized method (e.g.,
check_and_record()) or by holding self._lock across both operations; inside that
critical section compute now and cutoff via self._clock(), prune recent
timestamps from self._timestamps[key], compare len(recent) to limit
(self._config.max_per_pair_per_minute + self._config.burst_allowance), return a
failing GuardCheckOutcome if >= limit (ensuring to evict empty keys), otherwise
append now to recent, write back to self._timestamps[key] and return a passing
GuardCheckOutcome; update callers to use the new atomic method and remove the
separate record() / check() pairing.
- Around line 37-47: In the __init__ of the rate limiter, validate the incoming
window_seconds before storing it: if window_seconds is <= 0 raise a ValueError
with a clear message (e.g. "window_seconds must be > 0") so the invalid value is
rejected early; ensure you still assign self._window_seconds only after
validation and keep the rest of initialization (_config, _clock, _timestamps)
unchanged—this prevents check() from silently pruning histories when
window_seconds is non-positive.

---

Duplicate comments:
In `@src/ai_company/communication/loop_prevention/circuit_breaker.py`:
- Around line 155-185: The current record_delegation in record_delegation
increments pair.bounce_count for every delegation; change it to only increment
on a true bounce by tracking direction/task lineage on the pair (e.g., add a
last_delegator or last_task_id field on the pair returned by
_get_or_create_pair) and: if delegator_id equals pair.last_delegator (same
direction) reset or leave bounce_count unchanged; if delegator_id equals
pair.last_delegatee (i.e., a reversal/bounce) increment bounce_count; update
pair.last_delegator/last_delegatee (or last_task_id) on each call, keep the
existing opened_at and logger.warning behaviour when threshold is reached, and
keep the early return when get_state(...) is OPEN. Ensure this logic is applied
inside record_delegation and persisted on the pair object returned by
_get_or_create_pair.

In `@tests/integration/communication/test_delegation_integration.py`:
- Around line 188-235: The test test_ancestry_prevents_back_delegation never
exercises the ancestry guard because DelegationService.delegate() checks
authority first and _build_three_level_service() sets
enforce_chain_of_command=True; update the test to construct a service/agents
that allow the dev→ceo authority check to succeed (e.g., call
_build_three_level_service() or otherwise create the service with
enforce_chain_of_command=False or adjust agent permissions so authority passes)
then call DelegationService.delegate() as before and replace the loose
blocked_by assertion with assert r3.blocked_by == "ancestry" (keep r3.success is
False) so the test actually verifies the ancestry prevention path.

In `@tests/unit/communication/loop_prevention/test_guard.py`:
- Around line 88-100: The test is triggering the circuit breaker using two
same-direction delegations; change it to exercise the bounce path so
bounce_threshold counts true ping-pong bounces. Update the test to call
DelegationGuard.record_delegation(...) with alternating source/target (e.g.,
record_delegation("a","b",...) then record_delegation("b","a",...)) until
CircuitBreakerConfig.bounce_threshold is reached, then call
DelegationGuard.check(...) and assert result.passed is False and
result.mechanism == "circuit_breaker".

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 9dabc300-718d-47ff-a4e0-9d52f332e16b

📥 Commits

Reviewing files that changed from the base of the PR and between d46b455 and a9a1ba9.

📒 Files selected for processing (20)
  • DESIGN_SPEC.md
  • src/ai_company/communication/delegation/authority.py
  • src/ai_company/communication/delegation/hierarchy.py
  • src/ai_company/communication/delegation/models.py
  • src/ai_company/communication/delegation/service.py
  • src/ai_company/communication/loop_prevention/_pair_key.py
  • src/ai_company/communication/loop_prevention/circuit_breaker.py
  • src/ai_company/communication/loop_prevention/dedup.py
  • src/ai_company/communication/loop_prevention/depth.py
  • src/ai_company/communication/loop_prevention/guard.py
  • src/ai_company/communication/loop_prevention/models.py
  • src/ai_company/communication/loop_prevention/rate_limit.py
  • tests/integration/communication/test_delegation_integration.py
  • tests/unit/communication/delegation/test_models.py
  • tests/unit/communication/delegation/test_service.py
  • tests/unit/communication/loop_prevention/test_ancestry.py
  • tests/unit/communication/loop_prevention/test_circuit_breaker.py
  • tests/unit/communication/loop_prevention/test_dedup.py
  • tests/unit/communication/loop_prevention/test_guard.py
  • tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: Always use native lazy annotations (PEP 649) — do not use from __future__ import annotations
Use PEP 758 except syntax: except A, B: without parentheses for multiple exception types
Maintain line length of 88 characters (enforced by ruff)

Files:

  • tests/integration/communication/test_delegation_integration.py
  • src/ai_company/communication/loop_prevention/_pair_key.py
  • src/ai_company/communication/delegation/service.py
  • src/ai_company/communication/loop_prevention/dedup.py
  • tests/unit/communication/loop_prevention/test_circuit_breaker.py
  • src/ai_company/communication/loop_prevention/guard.py
  • tests/unit/communication/loop_prevention/test_dedup.py
  • src/ai_company/communication/loop_prevention/rate_limit.py
  • tests/unit/communication/delegation/test_models.py
  • tests/unit/communication/delegation/test_service.py
  • tests/unit/observability/test_events.py
  • src/ai_company/communication/loop_prevention/circuit_breaker.py
  • tests/unit/communication/loop_prevention/test_ancestry.py
  • tests/unit/communication/loop_prevention/test_guard.py
  • src/ai_company/communication/delegation/models.py
  • src/ai_company/communication/loop_prevention/models.py
  • src/ai_company/communication/delegation/authority.py
  • src/ai_company/communication/delegation/hierarchy.py
  • src/ai_company/communication/loop_prevention/depth.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Use pytest markers @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow to categorize tests
Prefer @pytest.mark.parametrize for testing similar test cases
Use generic vendor-agnostic names in test fixtures and configuration: test-provider, test-small-001, test-medium-001, test-large-001

Files:

  • tests/integration/communication/test_delegation_integration.py
  • tests/unit/communication/loop_prevention/test_circuit_breaker.py
  • tests/unit/communication/loop_prevention/test_dedup.py
  • tests/unit/communication/delegation/test_models.py
  • tests/unit/communication/delegation/test_service.py
  • tests/unit/observability/test_events.py
  • tests/unit/communication/loop_prevention/test_ancestry.py
  • tests/unit/communication/loop_prevention/test_guard.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/**/*.py: All public functions and classes must have type hints and Google-style docstrings (enforced by ruff D rules)
Never use import logging or logging.getLogger() in application code — use from ai_company.observability import get_logger with variable name logger
Use structured logging with constants from ai_company.observability.events.<domain> — always log as logger.info(EVENT_CONSTANT, key=value) never with format strings
For all identifier/name fields in Pydantic models, use NotBlankStr type from core.types instead of manual whitespace validators (including optional and tuple variants)
Use @computed_field for derived values in Pydantic models instead of storing and validating redundant fields
Enforce immutability: create new objects instead of mutating. For non-Pydantic internal collections use copy.deepcopy() at construction and MappingProxyType for read-only enforcement
For config/identity data use frozen Pydantic models; for runtime state that evolves use separate mutable-via-copy models with model_copy(update=...)
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code instead of bare create_task
Functions must be under 50 lines, files must be under 800 lines
Handle errors explicitly — never silently swallow exceptions
Never use real vendor names (Anthropic, OpenAI, Claude, GPT) in project-owned code, docstrings, comments, or tests — use generic names: example-provider, example-large-001, test-provider, test-small-001

Files:

  • src/ai_company/communication/loop_prevention/_pair_key.py
  • src/ai_company/communication/delegation/service.py
  • src/ai_company/communication/loop_prevention/dedup.py
  • src/ai_company/communication/loop_prevention/guard.py
  • src/ai_company/communication/loop_prevention/rate_limit.py
  • src/ai_company/communication/loop_prevention/circuit_breaker.py
  • src/ai_company/communication/delegation/models.py
  • src/ai_company/communication/loop_prevention/models.py
  • src/ai_company/communication/delegation/authority.py
  • src/ai_company/communication/delegation/hierarchy.py
  • src/ai_company/communication/loop_prevention/depth.py
src/ai_company/{providers,tools,engine,communication}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations

Files:

  • src/ai_company/communication/loop_prevention/_pair_key.py
  • src/ai_company/communication/delegation/service.py
  • src/ai_company/communication/loop_prevention/dedup.py
  • src/ai_company/communication/loop_prevention/guard.py
  • src/ai_company/communication/loop_prevention/rate_limit.py
  • src/ai_company/communication/loop_prevention/circuit_breaker.py
  • src/ai_company/communication/delegation/models.py
  • src/ai_company/communication/loop_prevention/models.py
  • src/ai_company/communication/delegation/authority.py
  • src/ai_company/communication/delegation/hierarchy.py
  • src/ai_company/communication/loop_prevention/depth.py
DESIGN_SPEC.md

📄 CodeRabbit inference engine (CLAUDE.md)

Update DESIGN_SPEC.md after approved deviations to reflect the new reality

Files:

  • DESIGN_SPEC.md
🧠 Learnings (4)
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{providers,tools,engine,communication}/**/*.py : Use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations

Applied to files:

  • src/ai_company/communication/loop_prevention/dedup.py
  • src/ai_company/communication/loop_prevention/guard.py
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{providers,config}/**/*.py : Configure `RetryConfig` and `RateLimiterConfig` per-provider in `ProviderConfig`, not in calling code

Applied to files:

  • src/ai_company/communication/loop_prevention/rate_limit.py
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{api,config,providers}/**/*.py : Validate at system boundaries (user input, external APIs, config files)

Applied to files:

  • src/ai_company/communication/loop_prevention/models.py
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/**/*.py : For all identifier/name fields in Pydantic models, use `NotBlankStr` type from `core.types` instead of manual whitespace validators (including optional and tuple variants)

Applied to files:

  • src/ai_company/communication/loop_prevention/models.py
🧬 Code graph analysis (11)
tests/integration/communication/test_delegation_integration.py (7)
src/ai_company/communication/delegation/authority.py (1)
  • AuthorityValidator (46-158)
src/ai_company/communication/delegation/hierarchy.py (1)
  • HierarchyResolver (16-234)
src/ai_company/communication/delegation/models.py (1)
  • DelegationRequest (11-46)
src/ai_company/communication/delegation/service.py (3)
  • DelegationService (36-279)
  • delegate (68-171)
  • get_audit_trail (262-268)
src/ai_company/communication/loop_prevention/guard.py (1)
  • DelegationGuard (28-145)
src/ai_company/core/agent.py (1)
  • AgentIdentity (246-304)
src/ai_company/core/task.py (1)
  • Task (38-242)
src/ai_company/communication/loop_prevention/dedup.py (3)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
tests/unit/communication/loop_prevention/test_dedup.py (8)
  • clock (23-24)
  • clock (36-37)
  • clock (48-49)
  • clock (59-60)
  • clock (71-72)
  • clock (84-85)
  • clock (97-98)
  • clock (112-113)
tests/unit/communication/loop_prevention/test_circuit_breaker.py (2)
src/ai_company/communication/config.py (1)
  • CircuitBreakerConfig (206-227)
src/ai_company/communication/loop_prevention/circuit_breaker.py (1)
  • CircuitBreakerState (21-30)
tests/unit/communication/loop_prevention/test_dedup.py (1)
src/ai_company/communication/loop_prevention/dedup.py (3)
  • DelegationDeduplicator (17-111)
  • check (54-94)
  • record (96-111)
src/ai_company/communication/loop_prevention/rate_limit.py (4)
src/ai_company/communication/config.py (1)
  • RateLimitConfig (182-203)
src/ai_company/communication/loop_prevention/_pair_key.py (1)
  • pair_key (9-31)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
tests/unit/communication/delegation/test_models.py (3)
src/ai_company/communication/delegation/models.py (3)
  • DelegationRecord (97-133)
  • DelegationRequest (11-46)
  • DelegationResult (49-94)
src/ai_company/core/enums.py (1)
  • TaskType (151-159)
src/ai_company/core/task.py (1)
  • Task (38-242)
src/ai_company/communication/loop_prevention/circuit_breaker.py (3)
src/ai_company/communication/config.py (1)
  • CircuitBreakerConfig (206-227)
src/ai_company/communication/loop_prevention/_pair_key.py (1)
  • pair_key (9-31)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
tests/unit/communication/loop_prevention/test_ancestry.py (2)
src/ai_company/communication/loop_prevention/ancestry.py (1)
  • check_ancestry (14-41)
tests/unit/communication/loop_prevention/test_depth.py (1)
  • test_empty_chain_passes (14-17)
src/ai_company/communication/delegation/models.py (2)
src/ai_company/core/task.py (1)
  • Task (38-242)
src/ai_company/tools/sandbox/result.py (1)
  • success (26-28)
src/ai_company/communication/delegation/hierarchy.py (2)
src/ai_company/communication/errors.py (1)
  • HierarchyResolutionError (99-100)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/communication/loop_prevention/depth.py (2)
src/ai_company/communication/loop_prevention/models.py (1)
  • GuardCheckOutcome (10-39)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)

Comment on lines +83 to +87
logger.debug(
DELEGATION_HIERARCHY_BUILT,
agents=len(supervisor_of),
supervisors=len(reports_of),
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Emit delegation events at info level.

These event constants currently go out through debug/warning, which diverges from the repo’s structured-event contract and makes them easy to miss in the normal observability pipeline.

Suggested fix
-        logger.debug(
+        logger.info(
             DELEGATION_HIERARCHY_BUILT,
             agents=len(supervisor_of),
             supervisors=len(reports_of),
         )
...
-                    logger.warning(
+                    logger.info(
                         DELEGATION_HIERARCHY_CYCLE,
                         agent=agent,
                         cycle_at=current,
                     )

As per coding guidelines, Use structured logging with constants from ai_company.observability.events.<domain> — always log as logger.info(EVENT_CONSTANT, key=value) never with format strings.

Also applies to: 111-115

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/communication/delegation/hierarchy.py` around lines 83 - 87,
Replace debug/warning-level structured logs in this module with info-level
structured events: change logger.debug(DELEGATION_HIERARCHY_BUILT, ...) to
logger.info(DELEGATION_HIERARCHY_BUILT, agents=len(supervisor_of),
supervisors=len(reports_of)) and do the same for the other event calls around
lines 111-115 (use the corresponding event constants from
ai_company.observability.events.<domain>), ensuring you pass key=value pairs (no
formatted strings) and import the correct EVENT_CONSTANT names used in this
file.

Comment on lines +190 to +214
original = request.task
new_chain = (*original.delegation_chain, request.delegator_id)
description = original.description
if request.refinement:
description = f"{description}\n\nDelegation context: {request.refinement}"
if request.constraints:
constraints_text = "\n".join(f"- {c}" for c in request.constraints)
description = f"{description}\n\nConstraints:\n{constraints_text}"

try:
return Task(
id=f"del-{uuid4().hex[:12]}",
title=original.title,
description=description,
type=original.type,
priority=original.priority,
project=original.project,
created_by=request.delegator_id,
parent_task_id=original.id,
delegation_chain=new_chain,
estimated_complexity=original.estimated_complexity,
budget_limit=original.budget_limit,
deadline=original.deadline,
max_retries=original.max_retries,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Deep-copy the source task at this delegation boundary.

This child task is assembled from request.task fields directly. If any nested Task field is mutable, the delegator and delegatee will share state across the handoff. Copy the source payload before extracting fields for the new Task.

Based on learnings, Use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/communication/delegation/service.py` around lines 190 - 214,
The child Task is built directly from request.task which can share mutable
nested state; fix by deep-copying the source task at the start (e.g., src_task =
copy.deepcopy(request.task)) before reading or modifying fields like
delegation_chain, description, constraints, estimated_complexity, etc.; update
subsequent references in this block to use src_task (and construct new_chain
from a copied chain) and add the necessary import for copy.deepcopy so the new
Task() is assembled from the isolated copy rather than the original
request.task.

Comment on lines +199 to +201
try:
return Task(
id=f"del-{uuid4().hex[:12]}",
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Use a full-entropy ID for delegated tasks.

uuid4().hex[:12] leaves only 48 bits of entropy. Task.id is later reused for lineage and dedup keys, so a collision here can cross-wire unrelated delegations or falsely block a valid one.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/communication/delegation/service.py` around lines 199 - 201,
The Task ID is being truncated to 12 hex chars using uuid4().hex[:12] which
reduces entropy and risks collisions; update the Task creation to use the full
UUID hex (e.g., replace uuid4().hex[:12] with uuid4().hex) so Task id retains
full entropy (reference the Task creation that currently uses uuid4().hex[:12]
in service.py) and ensure the uuid4 import is present/used consistently where
Task ids are generated for lineage/dedup keys.

Comment on lines +55 to +107
def check(
self,
delegation_chain: tuple[str, ...],
delegator_id: str,
delegatee_id: str,
task_id: str,
) -> GuardCheckOutcome:
"""Run all loop prevention checks.

Returns the first failing outcome, or a success outcome if
all checks pass.

Args:
delegation_chain: Current delegation ancestry.
delegator_id: ID of the delegating agent.
delegatee_id: ID of the proposed delegatee.
task_id: Unique ID of the task being delegated.

Returns:
First failing outcome or an all-passed success.
"""
# Pure (stateless) checks first — sequential to short-circuit
outcome = check_ancestry(delegation_chain, delegatee_id)
if not outcome.passed:
return self._log_and_return(outcome, delegator_id, delegatee_id)

outcome = check_delegation_depth(
delegation_chain,
self._config.max_delegation_depth,
)
if not outcome.passed:
return self._log_and_return(outcome, delegator_id, delegatee_id)

# Stateful checks — only run if pure checks passed
outcome = self._deduplicator.check(
delegator_id,
delegatee_id,
task_id,
)
if not outcome.passed:
return self._log_and_return(outcome, delegator_id, delegatee_id)

outcome = self._rate_limiter.check(delegator_id, delegatee_id)
if not outcome.passed:
return self._log_and_return(outcome, delegator_id, delegatee_id)

outcome = self._circuit_breaker.check(delegator_id, delegatee_id)
if not outcome.passed:
return self._log_and_return(outcome, delegator_id, delegatee_id)
return GuardCheckOutcome(
passed=True,
mechanism=_SUCCESS_MECHANISM,
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Split check() into smaller helpers.

This method now exceeds the repo's 50-line cap. The pure-check and stateful-check blocks are already natural extraction points, and pulling them out keeps the short-circuit flow readable without risking a regression back to eager evaluation.

As per coding guidelines, Functions must be under 50 lines, files must be under 800 lines.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/communication/loop_prevention/guard.py` around lines 55 - 107,
The check() method is too long—split it into two helper methods (e.g.,
_run_pure_checks and _run_stateful_checks) to keep the short-circuit behavior:
move the ancestry and delegation-depth logic (calls to check_ancestry and
check_delegation_depth using self._config.max_delegation_depth) into
_run_pure_checks, and move the deduplicator, rate limiter, and circuit breaker
checks (self._deduplicator.check, self._rate_limiter.check,
self._circuit_breaker.check) into _run_stateful_checks; have check() call the
helpers in order and use existing _log_and_return for failures and return
GuardCheckOutcome(passed=True, mechanism=_SUCCESS_MECHANISM) on success so
behavior and short-circuiting remain identical.

Comment on lines +37 to +47
def __init__(
self,
config: RateLimitConfig,
*,
clock: Callable[[], float] = time.monotonic,
window_seconds: float = _DEFAULT_WINDOW_SECONDS,
) -> None:
self._config = config
self._clock = clock
self._window_seconds = window_seconds
self._timestamps: dict[tuple[str, str], list[float]] = {}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Validate window_seconds before storing it.

Without a guard in Lines 37-47, window_seconds <= 0 makes every later check() prune the pair history to empty, so this limiter never trips. Fail fast here instead of silently accepting a broken window.

🛡️ Proposed fix
     def __init__(
         self,
         config: RateLimitConfig,
         *,
         clock: Callable[[], float] = time.monotonic,
         window_seconds: float = _DEFAULT_WINDOW_SECONDS,
     ) -> None:
+        if window_seconds <= 0:
+            msg = f"window_seconds must be greater than 0, got {window_seconds}"
+            raise ValueError(msg)
         self._config = config
         self._clock = clock
         self._window_seconds = window_seconds
         self._timestamps: dict[tuple[str, str], list[float]] = {}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/communication/loop_prevention/rate_limit.py` around lines 37 -
47, In the __init__ of the rate limiter, validate the incoming window_seconds
before storing it: if window_seconds is <= 0 raise a ValueError with a clear
message (e.g. "window_seconds must be > 0") so the invalid value is rejected
early; ensure you still assign self._window_seconds only after validation and
keep the rest of initialization (_config, _clock, _timestamps) unchanged—this
prevents check() from silently pruning histories when window_seconds is
non-positive.

Comment on lines +65 to +75
key = pair_key(delegator_id, delegatee_id)
now = self._clock()
cutoff = now - self._window_seconds
timestamps = self._timestamps.get(key, [])
recent = [t for t in timestamps if t > cutoff]
# Prune expired entries on read; evict empty keys
if recent:
self._timestamps[key] = recent
else:
self._timestamps.pop(key, None)
limit = self._config.max_per_pair_per_minute + self._config.burst_allowance
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Make rate-limit admission atomic.

Lines 65-75 only read/prune state, and Lines 108-115 append later. Two concurrent delegations for the same pair can both pass check() before either call reaches record(), so the limiter under-enforces exactly when requests arrive together. Fold this into a single synchronized check_and_record() path, or protect both operations with one shared critical section.

🔒 Suggested direction
def check_and_record(
    self,
    delegator_id: str,
    delegatee_id: str,
) -> GuardCheckOutcome:
    key = pair_key(delegator_id, delegatee_id)
    now = self._clock()
    cutoff = now - self._window_seconds
    limit = self._config.max_per_pair_per_minute + self._config.burst_allowance

    with self._lock:
        recent = [t for t in self._timestamps.get(key, []) if t > cutoff]
        if len(recent) >= limit:
            self._timestamps.pop(key, None) if not recent else None
            return GuardCheckOutcome(
                passed=False,
                mechanism=_MECHANISM,
                message=...,
            )
        recent.append(now)
        self._timestamps[key] = recent

    return GuardCheckOutcome(passed=True, mechanism=_MECHANISM)

Also applies to: 108-115

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/communication/loop_prevention/rate_limit.py` around lines 65 -
75, The current two-step logic (check() reading/pruning _timestamps around
pair_key(...) and record() appending later) is racy—make admission atomic by
combining them into a single synchronized method (e.g., check_and_record()) or
by holding self._lock across both operations; inside that critical section
compute now and cutoff via self._clock(), prune recent timestamps from
self._timestamps[key], compare len(recent) to limit
(self._config.max_per_pair_per_minute + self._config.burst_allowance), return a
failing GuardCheckOutcome if >= limit (ensuring to evict empty keys), otherwise
append now to recent, write back to self._timestamps[key] and return a passing
GuardCheckOutcome; update callers to use the new atomic method and remove the
separate record() / check() pairing.

…ptile

- Fix hierarchy self-cycle when dept.head == team.lead (CodeRabbit)
- Reject whitespace-only denial reasons in AuthorityCheckResult (CodeRabbit)
- Reject whitespace-only rejection_reason in DelegationResult (CodeRabbit)
- Promote DELEGATION_AUTHORIZED to logger.info (CodeRabbit)
- Add DELEGATION_SUB_TASK_FAILED event constant, replace bare string (CodeRabbit)
- Copy reviewers/dependencies/artifacts_expected/acceptance_criteria to sub-task (Greptile)
- Extract _validate_identity and _record_delegation helpers from delegate() (CodeRabbit)
- Add test for dept-head-as-team-lead self-cycle prevention

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings March 7, 2026 19:24
@Aureliolo Aureliolo merged commit 6be60b6 into main Mar 7, 2026
9 of 10 checks passed
@Aureliolo Aureliolo deleted the feat/delegation-loop-prevention branch March 7, 2026 19:24
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 35 out of 38 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +64 to +68
self._hierarchy = hierarchy
self._authority_validator = authority_validator
self._guard = guard
self._audit_trail: list[DelegationRecord] = []

Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DelegationService keeps an ever-growing in-memory _audit_trail list. In a long-running process this can become unbounded memory growth; consider adding a retention policy (max records / time window), an injectable persistence sink, or documenting that callers are expected to snapshot/clear it periodically.

Copilot uses AI. Check for mistakes.

try:
return Task(
id=f"del-{uuid4().hex[:12]}",
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sub-task IDs are generated by truncating UUID4 hex to 12 characters (uuid4().hex[:12]), which reduces entropy and increases collision risk if many delegations occur over time. Consider using the full UUID (or a project-wide task ID generator) to keep task IDs effectively unique across the system.

Suggested change
id=f"del-{uuid4().hex[:12]}",
id=f"del-{uuid4().hex}",

Copilot uses AI. Check for mistakes.
Comment on lines +99 to +135
class DelegationRecord(BaseModel):
"""Audit trail entry for a completed delegation.

Attributes:
delegation_id: Unique delegation identifier.
delegator_id: Agent ID of the delegator.
delegatee_id: Agent ID of the delegatee.
original_task_id: ID of the original task.
delegated_task_id: ID of the created sub-task.
timestamp: When the delegation occurred.
refinement: Context provided by the delegator.
"""

model_config = ConfigDict(frozen=True)

delegation_id: NotBlankStr = Field(
description="Unique delegation identifier",
)
delegator_id: NotBlankStr = Field(
description="Delegator agent ID",
)
delegatee_id: NotBlankStr = Field(
description="Delegatee agent ID",
)
original_task_id: NotBlankStr = Field(
description="Original task ID",
)
delegated_task_id: NotBlankStr = Field(
description="Created sub-task ID",
)
timestamp: AwareDatetime = Field(
description="When delegation occurred",
)
refinement: str = Field(
default="",
description="Context provided by delegator",
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DelegationRecord omits constraints from the audit trail

DelegationRequest.constraints is now correctly forwarded into the sub-task description (via _create_sub_task), but DelegationRecord never records the original structured value. Any downstream consumer of the audit trail — compliance tooling, replay logic, or debugging — loses the constraints entirely once the DelegationRequest is discarded.

If constraints is meaningful enough to expose on DelegationRequest and to forward to the delegatee's task description, it should also be preserved in the audit entry. At minimum, DelegationRecord should include:

constraints: tuple[NotBlankStr, ...] = Field(
    default=(),
    description="Extra constraints imposed by the delegator",
)

…and _record_delegation in service.py should pass refinement=request.refinement, constraints=request.constraints.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/delegation/models.py
Line: 99-135

Comment:
**`DelegationRecord` omits `constraints` from the audit trail**

`DelegationRequest.constraints` is now correctly forwarded into the sub-task description (via `_create_sub_task`), but `DelegationRecord` never records the original structured value. Any downstream consumer of the audit trail — compliance tooling, replay logic, or debugging — loses the constraints entirely once the `DelegationRequest` is discarded.

If `constraints` is meaningful enough to expose on `DelegationRequest` and to forward to the delegatee's task description, it should also be preserved in the audit entry. At minimum, `DelegationRecord` should include:

```python
constraints: tuple[NotBlankStr, ...] = Field(
    default=(),
    description="Extra constraints imposed by the delegator",
)
```

…and `_record_delegation` in `service.py` should pass `refinement=request.refinement, constraints=request.constraints`.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +88 to +121
def get_state(
self,
delegator_id: str,
delegatee_id: str,
) -> CircuitBreakerState:
"""Get the circuit breaker state for an agent pair.

If the circuit was previously open and the cooldown has expired,
the pair state is reset before returning ``CLOSED``.

Args:
delegator_id: First agent ID.
delegatee_id: Second agent ID.

Returns:
Current state of the circuit breaker.
"""
pair = self._get_pair(delegator_id, delegatee_id)
if pair is None:
return CircuitBreakerState.CLOSED
if pair.opened_at is not None:
elapsed = self._clock() - pair.opened_at
if elapsed < self._config.cooldown_seconds:
return CircuitBreakerState.OPEN
# Cooldown expired: evict the stale entry
key = pair_key(delegator_id, delegatee_id)
del self._pairs[key]
logger.info(
DELEGATION_LOOP_CIRCUIT_RESET,
delegator=delegator_id,
delegatee=delegatee_id,
cooldown_seconds=self._config.cooldown_seconds,
)
return CircuitBreakerState.CLOSED
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_state() mutates internal state — surprising for a getter

The get_state method deletes the pair entry from self._pairs when cooldown expires. While the docstring mentions this ("the pair state is reset before returning CLOSED"), the name get_state implies a read-only inspection. This side effect causes two problems:

  1. Double eviction under normal flow: check() calls get_state() (may delete the pair), then record_delegation() immediately calls get_state() again. The second call finds no entry and returns CLOSED cleanly, so the behavior is correct — but the state machine only survives by coincidence of call order.

  2. Test fragility: Any test that calls get_state() directly to inspect state after a cooldown interval inadvertently resets the counter, making tests timing-sensitive and brittle. The injectable clock is there precisely to avoid real-time dependency — but get_state()'s side effect means calling it "to check state" modifies what check() and record_delegation() would then see.

Consider separating eviction: keep get_state() as a pure reader and introduce a private _evict_if_expired() helper called at the top of check() and record_delegation().

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/loop_prevention/circuit_breaker.py
Line: 88-121

Comment:
**`get_state()` mutates internal state — surprising for a getter**

The `get_state` method deletes the pair entry from `self._pairs` when cooldown expires. While the docstring mentions this ("the pair state is reset before returning `CLOSED`"), the name `get_state` implies a read-only inspection. This side effect causes two problems:

1. **Double eviction under normal flow**: `check()` calls `get_state()` (may delete the pair), then `record_delegation()` immediately calls `get_state()` again. The second call finds no entry and returns `CLOSED` cleanly, so the behavior is correct — but the state machine only survives by coincidence of call order.

2. **Test fragility**: Any test that calls `get_state()` directly to inspect state after a cooldown interval inadvertently resets the counter, making tests timing-sensitive and brittle. The injectable `clock` is there precisely to avoid real-time dependency — but `get_state()`'s side effect means calling it "to check state" modifies what `check()` and `record_delegation()` would then see.

Consider separating eviction: keep `get_state()` as a pure reader and introduce a private `_evict_if_expired()` helper called at the top of `check()` and `record_delegation()`.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +43 to +46
# Team lead -> department head (lowest priority)
if team.lead != dept.head and team.lead not in supervisor_of:
supervisor_of[team.lead] = dept.head
reports_of.setdefault(dept.head, []).append(team.lead)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Undocumented "first team wins" behaviour for multi-team leads

When an agent leads teams in more than one department, their supervisor is silently assigned as the dept.head of whichever department happens to be iterated first, because the guard team.lead not in supervisor_of prevents any subsequent assignment:

if team.lead != dept.head and team.lead not in supervisor_of:
    supervisor_of[team.lead] = dept.head          # first dept wins
    reports_of.setdefault(dept.head, []).append(team.lead)

The class docstring documents three priority levels (explicit reporting line > team lead > dept head), but says nothing about conflicts within the same priority tier. In practice, iteration order is determined by company.departments ordering — which is a tuple in Company, so positional. A user who adds a department after the fact and doesn't realise their shared lead will silently switch hierarchy parent will get a wrong (and un-debuggable) authority check result.

At minimum, document this behaviour explicitly in the class docstring or in the __init__ inline comment. A logger.debug warning when a lead-to-dept assignment is skipped due to the guard would also help operators trace unexpected authority failures.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/delegation/hierarchy.py
Line: 43-46

Comment:
**Undocumented "first team wins" behaviour for multi-team leads**

When an agent leads teams in more than one department, their supervisor is silently assigned as the `dept.head` of whichever department happens to be iterated first, because the guard `team.lead not in supervisor_of` prevents any subsequent assignment:

```python
if team.lead != dept.head and team.lead not in supervisor_of:
    supervisor_of[team.lead] = dept.head          # first dept wins
    reports_of.setdefault(dept.head, []).append(team.lead)
```

The class docstring documents three priority levels (explicit reporting line > team lead > dept head), but says nothing about conflicts *within* the same priority tier. In practice, iteration order is determined by `company.departments` ordering — which is a `tuple` in `Company`, so positional. A user who adds a department after the fact and doesn't realise their shared lead will silently switch hierarchy parent will get a wrong (and un-debuggable) authority check result.

At minimum, document this behaviour explicitly in the class docstring or in the `__init__` inline comment. A `logger.debug` warning when a lead-to-dept assignment is skipped due to the guard would also help operators trace unexpected authority failures.

How can I resolve this? If you propose a fix, please make it concise.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement loop prevention (depth limits, dedup, circuit breakers, ancestry) Implement hierarchical delegation (task flows down, results flow up)

2 participants