feat: implement hierarchical delegation and loop prevention (#12, #17)#160
feat: implement hierarchical delegation and loop prevention (#12, #17)#160
Conversation
Add hierarchical delegation with authority validation, 5-mechanism loop prevention (ancestry, depth, dedup, rate limit, circuit breaker), and a DelegationService orchestrator with audit trail. - Task model: add parent_task_id and delegation_chain fields - HierarchyResolver: build org graph from Company, cycle detection - AuthorityValidator: chain-of-command + role permission checks - DelegationGuard: orchestrates all loop prevention mechanisms - DelegationService: end-to-end delegation with audit recording - 9 new error types for delegation and hierarchy failures - 14 delegation event constants for observability - Integration tests for multi-hop delegation flows Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…revention Pre-reviewed by 9 agents, 26 findings addressed: - Extract shared _pair_key utility (DRY violation) - Add model validators to DelegationResult, AuthorityCheckResult, GuardCheckOutcome - Fix circuit breaker re-arming bug in record_bounce - Add O(n) DFS cycle detection replacing O(n²) approach - Fix duplicate entries in hierarchy reporting lines - Freeze hierarchy internal state with MappingProxyType - Add logging to ancestry and depth check modules - Add circuit reset event logging - Prune expired entries in dedup and rate limiter - Split guard checks into pure/stateful phases - Rename create_sub_task to _create_sub_task (private) - Update DESIGN_SPEC.md, CLAUDE.md, README.md Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dependency Review✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.Scanned FilesNone |
|
Caution Review failedThe pull request is closed. ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (6)
📝 WalkthroughSummary by CodeRabbit
WalkthroughImplements hierarchical delegation and loop-prevention: adds DelegationService, delegation models, authority checks, HierarchyResolver, a DelegationGuard coordinating five prevention mechanisms (ancestry, depth, dedup, rate limit, circuit breaker), task delegation fields, new errors/events, public exports, and extensive unit/integration tests. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant DelegationService
participant AuthorityValidator
participant HierarchyResolver
participant DelegationGuard
participant LoopPrevention
participant TaskManager
Client->>DelegationService: delegate(request, delegator, delegatee)
DelegationService->>AuthorityValidator: validate(delegator, delegatee)
AuthorityValidator->>HierarchyResolver: is_subordinate(delegator, delegatee)
HierarchyResolver-->>AuthorityValidator: hierarchy result
AuthorityValidator-->>DelegationService: AuthorityCheckResult
alt Authority Denied
DelegationService-->>Client: DelegationResult(success=False)
else Authority Approved
DelegationService->>DelegationGuard: check(chain, delegator, delegatee, task)
DelegationGuard->>LoopPrevention: ancestry / depth / dedup / rate_limit / circuit_breaker
LoopPrevention-->>DelegationGuard: first failing GuardCheckOutcome or all_passed
alt Blocked by Loop Prevention
DelegationGuard-->>DelegationService: GuardCheckOutcome(passed=False)
DelegationService-->>Client: DelegationResult(success=False, blocked_by=mechanism)
else Passed
DelegationService->>TaskManager: _create_sub_task(request)
TaskManager-->>DelegationService: new Task (delegation_chain updated)
DelegationService->>DelegationGuard: record_delegation(delegator, delegatee, task)
DelegationGuard-->>LoopPrevention: record across mechanisms
DelegationService->>TaskManager: record_audit_trail(DelegationRecord)
DelegationService-->>Client: DelegationResult(success=True, delegated_task=sub_task)
end
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches
🧪 Generate unit tests (beta)
✨ Simplify code
Comment |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a fundamental capability for multi-agent systems: hierarchical task delegation. It allows agents to assign tasks to subordinates while ensuring the integrity and stability of the system through a sophisticated five-mechanism loop prevention guard. These changes are crucial for enabling complex, multi-step workflows and maintaining a coherent organizational structure within the AI company, preventing infinite delegation loops and ensuring tasks flow efficiently through the hierarchy. Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Pull request overview
Implements a hierarchical delegation subsystem in communication/ with authority validation and a five-layer loop-prevention guard, extending the core Task model to track delegation lineage and adding observability event constants and tests to cover the new behavior.
Changes:
- Add
HierarchyResolver,AuthorityValidator, andDelegationServicefor chain-of-command delegation flows. - Add loop-prevention mechanisms (ancestry, depth, dedup, rate limit, circuit breaker) orchestrated by
DelegationGuard. - Extend
Taskwithparent_task_idanddelegation_chain, and add new delegation-related observability events + extensive unit/integration tests.
Reviewed changes
Copilot reviewed 35 out of 38 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/observability/test_events.py | Registers delegation events module in discovery test. |
| tests/unit/core/test_task.py | Adds unit tests for new task delegation fields/validators. |
| tests/unit/communication/test_errors.py | Adds tests for new delegation + hierarchy error classes. |
| tests/unit/communication/loop_prevention/test_rate_limit.py | Unit tests for per-pair delegation rate limiter. |
| tests/unit/communication/loop_prevention/test_guard.py | Unit tests for guard orchestration + ordering. |
| tests/unit/communication/loop_prevention/test_depth.py | Unit tests for max-depth check. |
| tests/unit/communication/loop_prevention/test_dedup.py | Unit tests for time-window deduplication. |
| tests/unit/communication/loop_prevention/test_circuit_breaker.py | Unit tests for circuit breaker open/reset behavior. |
| tests/unit/communication/loop_prevention/test_ancestry.py | Unit tests for ancestry-cycle blocking. |
| tests/unit/communication/loop_prevention/init.py | Package marker for loop prevention tests. |
| tests/unit/communication/delegation/test_service.py | Unit tests for DelegationService end-to-end behavior. |
| tests/unit/communication/delegation/test_models.py | Unit tests for delegation request/result/audit models. |
| tests/unit/communication/delegation/test_hierarchy.py | Unit tests for hierarchy resolution + cycle detection. |
| tests/unit/communication/delegation/test_authority.py | Unit tests for authority validation rules/config. |
| tests/unit/communication/delegation/init.py | Package marker for delegation tests. |
| tests/integration/communication/test_delegation_integration.py | Integration tests for multi-hop delegation + loop prevention. |
| tests/integration/communication/init.py | Package marker for communication integration tests. |
| src/ai_company/observability/events/delegation.py | Adds domain-specific delegation/loop-prevention event constants. |
| src/ai_company/core/task.py | Adds parent_task_id/delegation_chain fields + invariants. |
| src/ai_company/communication/loop_prevention/rate_limit.py | Implements per-pair sliding-window rate limiter. |
| src/ai_company/communication/loop_prevention/models.py | Adds GuardCheckOutcome model for mechanism outcomes. |
| src/ai_company/communication/loop_prevention/guard.py | Orchestrates mechanisms with short-circuiting. |
| src/ai_company/communication/loop_prevention/depth.py | Implements pure max-depth check. |
| src/ai_company/communication/loop_prevention/dedup.py | Implements deduplication within a time window. |
| src/ai_company/communication/loop_prevention/circuit_breaker.py | Implements per-pair circuit breaker with cooldown. |
| src/ai_company/communication/loop_prevention/ancestry.py | Implements pure ancestry check. |
| src/ai_company/communication/loop_prevention/_pair_key.py | Adds canonical undirected agent-pair key helper. |
| src/ai_company/communication/loop_prevention/init.py | Exposes loop prevention public API. |
| src/ai_company/communication/errors.py | Adds delegation + hierarchy error hierarchy. |
| src/ai_company/communication/delegation/service.py | Implements delegation flow: authority → guard → subtask → audit. |
| src/ai_company/communication/delegation/models.py | Adds delegation request/result/audit Pydantic models. |
| src/ai_company/communication/delegation/hierarchy.py | Builds org hierarchy from Company, freezes state, detects cycles. |
| src/ai_company/communication/delegation/authority.py | Implements hierarchy + role-based authority validation. |
| src/ai_company/communication/delegation/init.py | Exposes delegation subsystem public API. |
| src/ai_company/communication/init.py | Re-exports delegation + loop-prevention types from package root. |
| README.md | Updates feature list + roadmap milestone status. |
| DESIGN_SPEC.md | Updates spec to reflect implemented delegation + loop prevention. |
| CLAUDE.md | Updates repo structure description for communication subsystem. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| key = pair_key(delegator_id, delegatee_id) | ||
| return self._pairs.setdefault(key, _PairState()) |
There was a problem hiding this comment.
get_state() calls _get_pair(), which uses setdefault and therefore allocates a _PairState for every pair ever checked, even if no bounces are ever recorded. In a long-running system with many unique pairs, _pairs can grow without bound. Consider treating a missing key as CLOSED in get_state()/check() and only creating _PairState in record_bounce() (or implement periodic pruning).
| key = (delegator_id, delegatee_id, task_title) | ||
| self._records[key] = self._clock() |
There was a problem hiding this comment.
DelegationDeduplicator only deletes expired entries when check() is called for the same (delegator, delegatee, task_title) key. In normal operation with many one-off task titles, _records can accumulate stale entries indefinitely. Consider pruning expired entries opportunistically in record() as well (e.g., prune a bounded number of oldest entries) or switching to a time-ordered structure so old keys are removed without requiring a future check() for the same key.
| delegation_chain: tuple[str, ...], | ||
| delegator_id: str, | ||
| delegatee_id: str, | ||
| task_title: str, | ||
| ) -> GuardCheckOutcome: |
There was a problem hiding this comment.
The PR description/spec calls out deduplication for duplicate (delegator, delegatee, task) within a window, but the guard/deduplicator currently key on task_title. Titles are not stable (and tests already retitle tasks to bypass dedup), so this doesn’t reliably prevent repeated delegations of the same task. Consider switching the dedup key to a stable identifier such as task.id (or a hash of the full request payload) and update task_title plumbing accordingly.
| guard_outcome = self._guard.check( | ||
| delegation_chain=request.task.delegation_chain, | ||
| delegator_id=request.delegator_id, | ||
| delegatee_id=request.delegatee_id, | ||
| task_title=request.task.title, |
There was a problem hiding this comment.
delegate() uses request.delegator_id / request.delegatee_id when running guard checks and recording state, but authority is validated against the delegator/delegatee AgentIdentity objects. If these ever differ, callers can bypass dedup/rate-limit/circuit-breaker accounting and produce incorrect audit logs. Add a consistency check up front (reject if request IDs don’t match the provided identities) or refactor to derive IDs from AgentIdentity everywhere in this method.
| timestamps = self._timestamps.get(key, []) | ||
| recent = [t for t in timestamps if t > cutoff] | ||
| # Prune expired entries on read | ||
| self._timestamps[key] = recent |
There was a problem hiding this comment.
check() always writes back self._timestamps[key] = recent even when the pair has never been recorded (i.e., recent is empty). Because DelegationGuard.check() runs before record(), this will create empty entries for every unique pair ever checked, which can grow unbounded in long-running processes. Consider only updating the dict when the key already exists (or when recent is non-empty), and leave missing keys absent until the first record().
| timestamps = self._timestamps.get(key, []) | |
| recent = [t for t in timestamps if t > cutoff] | |
| # Prune expired entries on read | |
| self._timestamps[key] = recent | |
| timestamps = self._timestamps.get(key) | |
| if timestamps is not None: | |
| recent = [t for t in timestamps if t > cutoff] | |
| # Prune expired entries on read for existing keys | |
| self._timestamps[key] = recent | |
| else: | |
| recent = [] |
Greptile SummaryThis PR implements a complete hierarchical delegation system and a 5-mechanism loop prevention guard for the Key observations:
Confidence Score: 4/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Caller
participant DelegationService
participant AuthorityValidator
participant HierarchyResolver
participant DelegationGuard
participant AncestryCheck
participant DepthCheck
participant Deduplicator
participant RateLimiter
participant CircuitBreaker
Caller->>DelegationService: delegate(request, delegator, delegatee)
DelegationService->>DelegationService: _validate_identity()
DelegationService->>AuthorityValidator: validate(delegator, delegatee)
AuthorityValidator->>HierarchyResolver: is_direct_report() / is_subordinate()
HierarchyResolver-->>AuthorityValidator: bool
AuthorityValidator-->>DelegationService: AuthorityCheckResult
alt authority denied
DelegationService-->>Caller: DelegationResult(success=False, blocked_by="authority")
end
DelegationService->>DelegationGuard: check(chain, delegator_id, delegatee_id, task_id)
DelegationGuard->>AncestryCheck: check_ancestry(chain, delegatee_id)
AncestryCheck-->>DelegationGuard: GuardCheckOutcome
DelegationGuard->>DepthCheck: check_delegation_depth(chain, max_depth)
DepthCheck-->>DelegationGuard: GuardCheckOutcome
DelegationGuard->>Deduplicator: check(delegator, delegatee, task_id)
Deduplicator-->>DelegationGuard: GuardCheckOutcome
DelegationGuard->>RateLimiter: check(delegator, delegatee)
RateLimiter-->>DelegationGuard: GuardCheckOutcome
DelegationGuard->>CircuitBreaker: check(delegator, delegatee)
CircuitBreaker-->>DelegationGuard: GuardCheckOutcome
DelegationGuard-->>DelegationService: GuardCheckOutcome
alt loop detected
DelegationService->>HierarchyResolver: get_supervisor(delegator_id)
HierarchyResolver-->>DelegationService: supervisor (for escalation log)
DelegationService-->>Caller: DelegationResult(success=False, blocked_by=mechanism)
end
DelegationService->>DelegationService: _create_sub_task(request)
Note over DelegationService: new_chain = (*chain, delegator_id)<br/>new Task with fresh ID + parent_task_id
DelegationService->>DelegationGuard: record_delegation(delegator, delegatee, task_id)
DelegationGuard->>Deduplicator: record()
DelegationGuard->>RateLimiter: record()
DelegationGuard->>CircuitBreaker: record_delegation()
DelegationService->>DelegationService: append DelegationRecord to _audit_trail
DelegationService-->>Caller: DelegationResult(success=True, delegated_task=sub_task)
Last reviewed commit: 1d8d010 |
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive and well-designed hierarchical delegation and loop prevention system, featuring robust implementation with clear separation of concerns among components like HierarchyResolver, AuthorityValidator, and DelegationService. However, security concerns have been identified regarding potential unbounded memory growth in stateful loop prevention components (deduplication and rate limiting), and a lack of consistency checks between provided agent identities and request payloads in the delegation service, which could lead to resource exhaustion or audit trail manipulation. Additionally, a suggestion is made for sub-task creation to ensure consistency in retry behavior.
| ) -> None: | ||
| self._window_seconds = window_seconds | ||
| self._clock = clock | ||
| self._records: dict[tuple[str, str, str], float] = {} |
There was a problem hiding this comment.
The DelegationDeduplicator uses an in-memory dictionary _records to store delegation timestamps for deduplication. However, there is no mechanism to prune expired entries from this dictionary unless a check is performed for the exact same key (delegator, delegatee, and task title). This leads to unbounded memory growth as new unique tasks are delegated, which can be exploited to cause a Denial of Service (DoS) via memory exhaustion in long-running processes.
| self._config = config | ||
| self._clock = clock | ||
| self._window_seconds = window_seconds | ||
| self._timestamps: dict[tuple[str, str], list[float]] = {} |
There was a problem hiding this comment.
The DelegationRateLimiter stores timestamps for every agent pair in the _timestamps dictionary. Similar to the deduplicator, expired entries are only pruned for the specific pair being checked. If the system handles many unique agent pairs over time, this dictionary will grow indefinitely, leading to memory exhaustion.
| def delegate( | ||
| self, | ||
| request: DelegationRequest, | ||
| delegator: AgentIdentity, | ||
| delegatee: AgentIdentity, | ||
| ) -> DelegationResult: | ||
| """Execute a delegation: authority, loops, sub-task, audit. | ||
|
|
||
| Args: | ||
| request: The delegation request. | ||
| delegator: Identity of the delegating agent. | ||
| delegatee: Identity of the target agent. | ||
|
|
||
| Returns: | ||
| Result indicating success or rejection with reason. | ||
| """ |
There was a problem hiding this comment.
The delegate method accepts both AgentIdentity objects and a DelegationRequest object. While it uses the identities for authority validation, it relies on the IDs provided in the request for loop prevention checks, audit trail records, and sub-task creation. There is no validation to ensure that request.delegator_id matches delegator.name. This allows an authenticated agent to spoof its identity in audit logs and potentially bypass per-agent loop prevention controls (like rate limits) by providing a different delegator_id in the request payload.
def delegate(
self,
request: DelegationRequest,
delegator: AgentIdentity,
delegatee: AgentIdentity,
) -> DelegationResult:
"""Execute a delegation: authority, loops, sub-task, audit.
Args:
request: The delegation request.
delegator: Identity of the delegating agent.
delegatee: Identity of the target agent.
Returns:
Result indicating success or rejection with reason.
"""
if request.delegator_id != delegator.name:
return DelegationResult(
success=False,
rejection_reason=f"Request delegator_id {request.delegator_id!r} does not match identity {delegator.name!r}",
blocked_by="identity_validation",
)
if request.delegatee_id != delegatee.name:
return DelegationResult(
success=False,
rejection_reason=f"Request delegatee_id {request.delegatee_id!r} does not match identity {delegatee.name!r}",
blocked_by="identity_validation",
)| return Task( | ||
| id=f"del-{uuid4().hex[:12]}", | ||
| title=original.title, | ||
| description=description, | ||
| type=original.type, | ||
| priority=original.priority, | ||
| project=original.project, | ||
| created_by=request.delegator_id, | ||
| parent_task_id=original.id, | ||
| delegation_chain=new_chain, | ||
| estimated_complexity=original.estimated_complexity, | ||
| budget_limit=original.budget_limit, | ||
| deadline=original.deadline, | ||
| ) |
There was a problem hiding this comment.
When creating a sub-task, the max_retries field from the original task is not being inherited. The new task will fall back to the default value of 1. This could lead to inconsistent retry behavior, especially if the original task was configured with a different number of retries (e.g., 0 or 3). To ensure consistent behavior for delegated tasks, I suggest inheriting the max_retries value from the original task.
return Task(
id=f"del-{uuid4().hex[:12]}",
title=original.title,
description=description,
type=original.type,
priority=original.priority,
project=original.project,
created_by=request.delegator_id,
parent_task_id=original.id,
delegation_chain=new_chain,
estimated_complexity=original.estimated_complexity,
budget_limit=original.budget_limit,
deadline=original.deadline,
max_retries=original.max_retries,
)There was a problem hiding this comment.
Actionable comments posted: 14
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/communication/loop_prevention/circuit_breaker.py`:
- Around line 145-174: The method record_bounce is misleading because
DelegationGuard.record_delegation calls it on successful delegations; either
rename record_bounce to record_delegation (and update all call sites such as
DelegationGuard.record_delegation and any tests/mocks) or keep the name but
update the docstring and logging to explicitly state that "bounce" here means a
recorded delegation event that can contribute to back-and-forth loops; if you
rename, update the method signature in class CircuitBreaker, any imports/usages,
and adjust unit tests to the new name.
In `@src/ai_company/communication/loop_prevention/dedup.py`:
- Around line 37-55: The dedup key used in dedup.py's check method is too coarse
(key = (delegator_id, delegatee_id, task_title)) and will collide for different
requests with the same title; change it to use a canonical request fingerprint
that reflects the full request payload (e.g., title plus
refinement/constraints/other identifying fields) instead of the bare title.
Implement or call a deterministic fingerprint/canonicalization helper (e.g., a
new _fingerprint_request or reuse existing canonicalizer) to produce a string or
hash, then replace key = (delegator_id, delegatee_id, task_title) with key =
(delegator_id, delegatee_id, fingerprint) and use that for self._records lookups
and inserts so only truly identical requests dedupe. Ensure the fingerprinting
function is stable and includes all fields that define request identity.
- Around line 55-60: The dedup map (_records) only deletes the single key being
checked and keeps growing because new (delegator, delegatee, title) keys are
continually added; fix by implementing global expiry rather than key-reuse
deletion: maintain an expiry-ordered structure (e.g., a min-heap of
(expiry_timestamp, key) or an OrderedDict keyed by expiry) and on any
check/insert (the code paths that use _records, _clock, and _window_seconds) pop
and delete all entries whose expiry <= _clock(), or run a sweep when inserting
if _records grows past a threshold; update both the lookup branch (where
recorded_at is checked) and the insertion branch (where new entries are added)
to call this purge so expired entries are removed instead of only deleting the
single rechecked key.
In `@src/ai_company/communication/loop_prevention/depth.py`:
- Around line 14-41: The function check_delegation_depth should validate its
max_depth parameter and raise a ValueError for invalid limits instead of
silently blocking delegation; add a guard at the start of check_delegation_depth
that raises ValueError (with a clear message like "max_depth must be a positive
integer") when max_depth <= 0 before any use of delegation_chain, then keep the
existing length check that logs DELEGATION_LOOP_DEPTH_EXCEEDED and returns
GuardCheckOutcome with mechanism _MECHANISM on failure; ensure the raised error
message is descriptive to surface misconfiguration to callers.
In `@src/ai_company/communication/loop_prevention/guard.py`:
- Around line 77-97: The current loops build tuples which evaluate all checks
eagerly (causing unwanted state mutations); instead call each check sequentially
and short-circuit on the first failure: invoke check_ancestry(delegation_chain,
delegatee_id) and if not outcome.passed return self._log_and_return(outcome,
delegator_id, delegatee_id), then call check_delegation_depth(delegation_chain,
self._config.max_delegation_depth) and short-circuit similarly, and only after
those pure checks run the stateful checks by calling
self._deduplicator.check(delegator_id, delegatee_id, task_title), then
self._rate_limiter.check(delegator_id, delegatee_id), then
self._circuit_breaker.check(delegator_id, delegatee_id), short-circuiting
(returning via self._log_and_return) immediately on the first failed outcome so
later checks are not invoked.
In `@src/ai_company/communication/loop_prevention/models.py`:
- Around line 25-38: The post-validation in _validate_passed_message currently
allows whitespace-only failure messages; update the model_validator method
(named _validate_passed_message) to treat message.strip() as the real content:
when self.passed is False ensure that self.message exists and
self.message.strip() is non-empty, raising the existing "message is required
when passed is False" ValueError if it is empty after stripping; keep the
existing check that when self.passed is True the message must be empty.
In `@src/ai_company/communication/loop_prevention/rate_limit.py`:
- Around line 64-70: The check() logic currently assigns an empty list back into
self._timestamps for unseen pair keys which creates permanent no-op buckets;
change the code after computing recent = [t for t in timestamps if t > cutoff]
to remove the key (del self._timestamps[key] or pop) when recent is empty, and
only assign self._timestamps[key] = recent when recent is non-empty; reference
pair_key, _timestamps, _window_seconds and the check() method to locate where to
apply this change.
In `@tests/unit/communication/delegation/test_hierarchy.py`:
- Around line 278-293: The test currently can't observe the "first supervisor
wins" rule because both teams' departments use the same head; update
test_multi_team_leeps_first_supervisor to place the second team under a
different department head so precedence is measurable (e.g., create two
Department instances: dept1 with head="cto" containing Team(name="t1",
lead="shared_lead", ...), and dept2 with head="vp" containing Team(name="t2",
lead="shared_lead", ...), then build company via
_make_company(departments=(dept1, dept2))) and keep assertions against
HierarchyResolver.get_supervisor to ensure shared_lead resolves to "cto" while
dev1/dev2 resolve to "shared_lead".
In `@tests/unit/communication/delegation/test_service.py`:
- Around line 234-272: The test test_ancestry_blocked never reaches the ancestry
guard because DelegationService authorizes (authority check) before loop
prevention; fix by building a service that bypasses authority enforcement (call
_build_service with enforce_chain=False or otherwise make the authority path
permissive) so the delegate call in this test (service.delegate with
DelegationRequest from dev -> ceo) exercises ancestry blocking in
DelegationService.delegate; then tighten the assertion to check r3.success is
False and r3.blocked_by == "ancestry". Apply the same change to the analogous
integration test that uses the same setup.
In `@tests/unit/communication/loop_prevention/test_ancestry.py`:
- Around line 1-43: Consolidate the repetitive failure tests in
TestCheckAncestry by replacing the separate methods
test_delegatee_in_chain_fails, test_delegatee_is_root_fails,
test_delegatee_is_last_in_chain_fails, and test_single_element_chain_match_fails
with a single parametrized test using `@pytest.mark.parametrize` that supplies
(chain, delegatee, expected_passed[, expected_message_substring]) and calls
check_ancestry for each case; ensure you still assert result.passed is False for
failure cases and keep the mechanism == "ancestry" and any message substring
checks (e.g., "'b'") where relevant so the behavior validated by those original
tests is preserved.
In `@tests/unit/communication/loop_prevention/test_dedup.py`:
- Around line 80-92: The test only verifies per-key expiry; add an unrelated
expired record so pruning must remove entries beyond the checked key: in
test_expired_entries_pruned_on_check create a second record (e.g.,
dedup.record("x","y","task-2")) before advancing clock to an expired time, then
call dedup.check on the original key and assert both ("a","b","task-1") and
("x","y","task-2") are not in dedup._records; this ensures
DelegationDeduplicator.check prunes stale entries globally from the _records
store rather than only deleting the checked key.
- Around line 45-54: The dedup currently keys on (delegator, delegatee,
task_title) which is too coarse; update DelegationDeduplicator to compute and
use a stable request fingerprint instead of task_title for identity (introduce a
helper like fingerprint_request(payload) or accept a request_payload argument in
record() and check()), adjust DelegationDeduplicator.record() and .check() to
accept and use that fingerprint when storing and comparing entries, and ensure
existing window_seconds behavior is preserved so refined requests with the same
title but different payloads do not get false-blocked.
In `@tests/unit/communication/loop_prevention/test_guard.py`:
- Around line 88-100: The test incorrectly uses
DelegationGuard.record_delegation to simulate circuit-breaker bounces; replace
the two calls to record_delegation("a","b",...) with calls to
DelegationGuard.record_bounce("a","b",...) (or otherwise invoke the dedicated
bounce API) so the circuit breaker logic defined by CircuitBreakerConfig
(bounce_threshold) is exercised; keep the rest of the assertions (call to
guard.check((), "a", "b", "Task-new-2") and expects passed is False and
mechanism == "circuit_breaker") unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 41e99a3b-8046-4661-9c8d-2dc65e07f0d5
📒 Files selected for processing (38)
CLAUDE.mdDESIGN_SPEC.mdREADME.mdsrc/ai_company/communication/__init__.pysrc/ai_company/communication/delegation/__init__.pysrc/ai_company/communication/delegation/authority.pysrc/ai_company/communication/delegation/hierarchy.pysrc/ai_company/communication/delegation/models.pysrc/ai_company/communication/delegation/service.pysrc/ai_company/communication/errors.pysrc/ai_company/communication/loop_prevention/__init__.pysrc/ai_company/communication/loop_prevention/_pair_key.pysrc/ai_company/communication/loop_prevention/ancestry.pysrc/ai_company/communication/loop_prevention/circuit_breaker.pysrc/ai_company/communication/loop_prevention/dedup.pysrc/ai_company/communication/loop_prevention/depth.pysrc/ai_company/communication/loop_prevention/guard.pysrc/ai_company/communication/loop_prevention/models.pysrc/ai_company/communication/loop_prevention/rate_limit.pysrc/ai_company/core/task.pysrc/ai_company/observability/events/delegation.pytests/integration/communication/__init__.pytests/integration/communication/test_delegation_integration.pytests/unit/communication/delegation/__init__.pytests/unit/communication/delegation/test_authority.pytests/unit/communication/delegation/test_hierarchy.pytests/unit/communication/delegation/test_models.pytests/unit/communication/delegation/test_service.pytests/unit/communication/loop_prevention/__init__.pytests/unit/communication/loop_prevention/test_ancestry.pytests/unit/communication/loop_prevention/test_circuit_breaker.pytests/unit/communication/loop_prevention/test_dedup.pytests/unit/communication/loop_prevention/test_depth.pytests/unit/communication/loop_prevention/test_guard.pytests/unit/communication/loop_prevention/test_rate_limit.pytests/unit/communication/test_errors.pytests/unit/core/test_task.pytests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Agent
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Always use native lazy annotations (PEP 649) — do not usefrom __future__ import annotations
Use PEP 758 except syntax:except A, B:without parentheses for multiple exception types
Maintain line length of 88 characters (enforced by ruff)
Files:
tests/unit/communication/loop_prevention/test_guard.pytests/unit/core/test_task.pysrc/ai_company/communication/loop_prevention/__init__.pysrc/ai_company/communication/loop_prevention/models.pytests/unit/communication/loop_prevention/test_rate_limit.pysrc/ai_company/communication/delegation/__init__.pysrc/ai_company/communication/loop_prevention/circuit_breaker.pytests/integration/communication/test_delegation_integration.pytests/unit/communication/test_errors.pysrc/ai_company/communication/delegation/service.pysrc/ai_company/core/task.pytests/unit/communication/loop_prevention/test_ancestry.pysrc/ai_company/communication/loop_prevention/depth.pysrc/ai_company/communication/errors.pysrc/ai_company/communication/loop_prevention/_pair_key.pysrc/ai_company/communication/loop_prevention/guard.pysrc/ai_company/observability/events/delegation.pysrc/ai_company/communication/loop_prevention/ancestry.pytests/unit/communication/loop_prevention/test_dedup.pysrc/ai_company/communication/delegation/models.pytests/unit/communication/loop_prevention/test_depth.pysrc/ai_company/communication/delegation/hierarchy.pysrc/ai_company/communication/loop_prevention/dedup.pysrc/ai_company/communication/loop_prevention/rate_limit.pytests/unit/communication/delegation/test_service.pysrc/ai_company/communication/delegation/authority.pysrc/ai_company/communication/__init__.pytests/unit/communication/delegation/test_models.pytests/unit/observability/test_events.pytests/unit/communication/delegation/test_hierarchy.pytests/unit/communication/loop_prevention/test_circuit_breaker.pytests/unit/communication/delegation/test_authority.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Use pytest markers@pytest.mark.unit,@pytest.mark.integration,@pytest.mark.e2e, and@pytest.mark.slowto categorize tests
Prefer@pytest.mark.parametrizefor testing similar test cases
Use generic vendor-agnostic names in test fixtures and configuration:test-provider,test-small-001,test-medium-001,test-large-001
Files:
tests/unit/communication/loop_prevention/test_guard.pytests/unit/core/test_task.pytests/unit/communication/loop_prevention/test_rate_limit.pytests/integration/communication/test_delegation_integration.pytests/unit/communication/test_errors.pytests/unit/communication/loop_prevention/test_ancestry.pytests/unit/communication/loop_prevention/test_dedup.pytests/unit/communication/loop_prevention/test_depth.pytests/unit/communication/delegation/test_service.pytests/unit/communication/delegation/test_models.pytests/unit/observability/test_events.pytests/unit/communication/delegation/test_hierarchy.pytests/unit/communication/loop_prevention/test_circuit_breaker.pytests/unit/communication/delegation/test_authority.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/**/*.py: All public functions and classes must have type hints and Google-style docstrings (enforced by ruff D rules)
Never useimport loggingorlogging.getLogger()in application code — usefrom ai_company.observability import get_loggerwith variable namelogger
Use structured logging with constants fromai_company.observability.events.<domain>— always log aslogger.info(EVENT_CONSTANT, key=value)never with format strings
For all identifier/name fields in Pydantic models, useNotBlankStrtype fromcore.typesinstead of manual whitespace validators (including optional and tuple variants)
Use@computed_fieldfor derived values in Pydantic models instead of storing and validating redundant fields
Enforce immutability: create new objects instead of mutating. For non-Pydantic internal collections usecopy.deepcopy()at construction andMappingProxyTypefor read-only enforcement
For config/identity data use frozen Pydantic models; for runtime state that evolves use separate mutable-via-copy models withmodel_copy(update=...)
Preferasyncio.TaskGroupfor fan-out/fan-in parallel operations in new code instead of barecreate_task
Functions must be under 50 lines, files must be under 800 lines
Handle errors explicitly — never silently swallow exceptions
Never use real vendor names (Anthropic, OpenAI, Claude, GPT) in project-owned code, docstrings, comments, or tests — use generic names:example-provider,example-large-001,test-provider,test-small-001
Files:
src/ai_company/communication/loop_prevention/__init__.pysrc/ai_company/communication/loop_prevention/models.pysrc/ai_company/communication/delegation/__init__.pysrc/ai_company/communication/loop_prevention/circuit_breaker.pysrc/ai_company/communication/delegation/service.pysrc/ai_company/core/task.pysrc/ai_company/communication/loop_prevention/depth.pysrc/ai_company/communication/errors.pysrc/ai_company/communication/loop_prevention/_pair_key.pysrc/ai_company/communication/loop_prevention/guard.pysrc/ai_company/observability/events/delegation.pysrc/ai_company/communication/loop_prevention/ancestry.pysrc/ai_company/communication/delegation/models.pysrc/ai_company/communication/delegation/hierarchy.pysrc/ai_company/communication/loop_prevention/dedup.pysrc/ai_company/communication/loop_prevention/rate_limit.pysrc/ai_company/communication/delegation/authority.pysrc/ai_company/communication/__init__.py
src/ai_company/{providers,tools,engine,communication}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Use
copy.deepcopy()at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations
Files:
src/ai_company/communication/loop_prevention/__init__.pysrc/ai_company/communication/loop_prevention/models.pysrc/ai_company/communication/delegation/__init__.pysrc/ai_company/communication/loop_prevention/circuit_breaker.pysrc/ai_company/communication/delegation/service.pysrc/ai_company/communication/loop_prevention/depth.pysrc/ai_company/communication/errors.pysrc/ai_company/communication/loop_prevention/_pair_key.pysrc/ai_company/communication/loop_prevention/guard.pysrc/ai_company/communication/loop_prevention/ancestry.pysrc/ai_company/communication/delegation/models.pysrc/ai_company/communication/delegation/hierarchy.pysrc/ai_company/communication/loop_prevention/dedup.pysrc/ai_company/communication/loop_prevention/rate_limit.pysrc/ai_company/communication/delegation/authority.pysrc/ai_company/communication/__init__.py
DESIGN_SPEC.md
📄 CodeRabbit inference engine (CLAUDE.md)
Update
DESIGN_SPEC.mdafter approved deviations to reflect the new reality
Files:
DESIGN_SPEC.md
🧠 Learnings (1)
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{providers,tools,engine,communication}/**/*.py : Use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations
Applied to files:
src/ai_company/communication/delegation/hierarchy.py
🧬 Code graph analysis (22)
tests/unit/core/test_task.py (4)
tests/unit/communication/delegation/test_models.py (1)
_make_task(19-29)tests/integration/communication/test_delegation_integration.py (1)
_make_task(67-77)tests/unit/communication/delegation/test_service.py (1)
_make_task(66-76)src/ai_company/core/enums.py (1)
TaskStatus(122-148)
src/ai_company/communication/loop_prevention/__init__.py (7)
src/ai_company/communication/loop_prevention/ancestry.py (1)
check_ancestry(14-41)src/ai_company/communication/loop_prevention/circuit_breaker.py (2)
CircuitBreakerState(21-30)DelegationCircuitBreaker(48-174)src/ai_company/communication/loop_prevention/dedup.py (1)
DelegationDeduplicator(17-96)src/ai_company/communication/loop_prevention/depth.py (1)
check_delegation_depth(14-41)src/ai_company/communication/loop_prevention/guard.py (1)
DelegationGuard(28-134)src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)src/ai_company/communication/loop_prevention/rate_limit.py (1)
DelegationRateLimiter(20-109)
tests/unit/communication/loop_prevention/test_rate_limit.py (2)
src/ai_company/communication/config.py (1)
RateLimitConfig(182-203)src/ai_company/communication/loop_prevention/rate_limit.py (3)
DelegationRateLimiter(20-109)check(48-91)record(93-109)
src/ai_company/communication/delegation/__init__.py (4)
src/ai_company/communication/delegation/authority.py (2)
AuthorityCheckResult(21-43)AuthorityValidator(46-157)src/ai_company/communication/delegation/hierarchy.py (1)
HierarchyResolver(16-234)src/ai_company/communication/delegation/models.py (3)
DelegationRecord(86-122)DelegationRequest(11-38)DelegationResult(41-83)src/ai_company/communication/delegation/service.py (1)
DelegationService(31-195)
src/ai_company/communication/loop_prevention/circuit_breaker.py (5)
src/ai_company/communication/config.py (1)
CircuitBreakerConfig(206-227)src/ai_company/communication/loop_prevention/_pair_key.py (1)
pair_key(9-22)src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)src/ai_company/observability/_logger.py (1)
get_logger(8-28)src/ai_company/communication/loop_prevention/guard.py (1)
check(55-101)
tests/integration/communication/test_delegation_integration.py (12)
src/ai_company/communication/delegation/authority.py (1)
AuthorityValidator(46-157)src/ai_company/communication/delegation/hierarchy.py (1)
HierarchyResolver(16-234)src/ai_company/communication/delegation/models.py (1)
DelegationRequest(11-38)src/ai_company/communication/delegation/service.py (3)
DelegationService(31-195)delegate(63-140)get_audit_trail(178-184)src/ai_company/communication/loop_prevention/guard.py (1)
DelegationGuard(28-134)src/ai_company/core/agent.py (2)
AgentIdentity(246-304)ModelConfig(145-174)src/ai_company/core/company.py (1)
Company(387-470)src/ai_company/core/enums.py (3)
SeniorityLevel(6-21)TaskStatus(122-148)TaskType(151-159)src/ai_company/core/task.py (1)
Task(38-242)tests/unit/communication/delegation/test_authority.py (1)
_make_agent(27-44)tests/unit/communication/delegation/test_models.py (1)
_make_task(19-29)src/ai_company/providers/drivers/litellm_driver.py (1)
update(675-697)
tests/unit/communication/test_errors.py (1)
src/ai_company/communication/errors.py (12)
DelegationAncestryError(83-84)DelegationCircuitOpenError(91-92)DelegationDepthError(79-80)DelegationDuplicateError(95-96)DelegationError(67-68)DelegationLoopError(75-76)DelegationRateLimitError(87-88)HierarchyResolutionError(99-100)MessageBusAlreadyRunningError(63-64)MessageBusNotRunningError(59-60)NotSubscribedError(55-56)CommunicationError(12-44)
src/ai_company/communication/delegation/service.py (5)
src/ai_company/communication/delegation/authority.py (2)
AuthorityValidator(46-157)validate(70-98)src/ai_company/communication/delegation/hierarchy.py (2)
HierarchyResolver(16-234)get_supervisor(131-140)src/ai_company/communication/delegation/models.py (3)
DelegationRecord(86-122)DelegationRequest(11-38)DelegationResult(41-83)src/ai_company/communication/loop_prevention/guard.py (3)
DelegationGuard(28-134)check(55-101)record_delegation(119-134)src/ai_company/core/task.py (1)
Task(38-242)
tests/unit/communication/loop_prevention/test_ancestry.py (2)
src/ai_company/communication/loop_prevention/ancestry.py (1)
check_ancestry(14-41)tests/unit/communication/loop_prevention/test_depth.py (1)
test_empty_chain_passes(14-17)
src/ai_company/communication/loop_prevention/guard.py (5)
src/ai_company/communication/config.py (1)
LoopPreventionConfig(230-267)src/ai_company/communication/loop_prevention/ancestry.py (1)
check_ancestry(14-41)src/ai_company/communication/loop_prevention/dedup.py (3)
DelegationDeduplicator(17-96)check(37-80)record(82-96)src/ai_company/communication/loop_prevention/depth.py (1)
check_delegation_depth(14-41)src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)
src/ai_company/communication/loop_prevention/ancestry.py (2)
src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)src/ai_company/observability/_logger.py (1)
get_logger(8-28)
tests/unit/communication/loop_prevention/test_dedup.py (1)
src/ai_company/communication/loop_prevention/dedup.py (3)
DelegationDeduplicator(17-96)check(37-80)record(82-96)
src/ai_company/communication/delegation/models.py (1)
src/ai_company/core/task.py (1)
Task(38-242)
tests/unit/communication/loop_prevention/test_depth.py (1)
src/ai_company/communication/loop_prevention/depth.py (1)
check_delegation_depth(14-41)
src/ai_company/communication/delegation/hierarchy.py (3)
src/ai_company/communication/errors.py (1)
HierarchyResolutionError(99-100)src/ai_company/core/company.py (1)
Company(387-470)src/ai_company/observability/_logger.py (1)
get_logger(8-28)
src/ai_company/communication/loop_prevention/dedup.py (1)
src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)
src/ai_company/communication/loop_prevention/rate_limit.py (4)
src/ai_company/communication/config.py (1)
RateLimitConfig(182-203)src/ai_company/communication/loop_prevention/_pair_key.py (1)
pair_key(9-22)src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)src/ai_company/communication/loop_prevention/dedup.py (2)
check(37-80)record(82-96)
src/ai_company/communication/delegation/authority.py (3)
src/ai_company/communication/config.py (1)
HierarchyConfig(160-179)src/ai_company/communication/delegation/hierarchy.py (3)
HierarchyResolver(16-234)is_direct_report(156-170)is_subordinate(172-193)src/ai_company/core/agent.py (1)
AgentIdentity(246-304)
src/ai_company/communication/__init__.py (10)
src/ai_company/communication/delegation/authority.py (2)
AuthorityCheckResult(21-43)AuthorityValidator(46-157)src/ai_company/communication/delegation/service.py (1)
DelegationService(31-195)src/ai_company/communication/delegation/hierarchy.py (1)
HierarchyResolver(16-234)src/ai_company/communication/errors.py (9)
DelegationAncestryError(83-84)DelegationAuthorityError(71-72)DelegationCircuitOpenError(91-92)DelegationDepthError(79-80)DelegationDuplicateError(95-96)DelegationError(67-68)DelegationLoopError(75-76)DelegationRateLimitError(87-88)HierarchyResolutionError(99-100)src/ai_company/communication/loop_prevention/dedup.py (1)
DelegationDeduplicator(17-96)src/ai_company/communication/loop_prevention/guard.py (1)
DelegationGuard(28-134)src/ai_company/communication/loop_prevention/rate_limit.py (1)
DelegationRateLimiter(20-109)src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)src/ai_company/communication/loop_prevention/ancestry.py (1)
check_ancestry(14-41)src/ai_company/communication/loop_prevention/depth.py (1)
check_delegation_depth(14-41)
tests/unit/communication/delegation/test_models.py (1)
src/ai_company/communication/delegation/models.py (3)
DelegationRecord(86-122)DelegationRequest(11-38)DelegationResult(41-83)
tests/unit/communication/delegation/test_hierarchy.py (1)
src/ai_company/communication/errors.py (1)
HierarchyResolutionError(99-100)
tests/unit/communication/loop_prevention/test_circuit_breaker.py (4)
src/ai_company/communication/config.py (1)
CircuitBreakerConfig(206-227)src/ai_company/communication/loop_prevention/dedup.py (1)
check(37-80)src/ai_company/communication/loop_prevention/guard.py (1)
check(55-101)src/ai_company/communication/loop_prevention/rate_limit.py (1)
check(48-91)
🪛 LanguageTool
README.md
[typographical] ~17-~17: To join two clauses or introduce examples, consider using an em dash.
Context: ...timization - Hierarchical Delegation - Chain-of-command task delegation with fi...
(DASH_RULE)
🔇 Additional comments (44)
CLAUDE.md (1)
49-49: LGTM!Documentation accurately reflects the expanded communication package scope with delegation and loop prevention subsystems.
src/ai_company/communication/loop_prevention/_pair_key.py (1)
1-22: LGTM!Clean utility function with proper type hints and Google-style docstring. The
min/maxapproach correctly produces a canonical key for undirected pairs.src/ai_company/communication/delegation/__init__.py (1)
1-27: LGTM!Clean package initialization that properly exposes the delegation subsystem's public API. The
__all__list is alphabetically sorted and matches the imports.src/ai_company/core/task.py (3)
128-135: LGTM!New delegation fields use
NotBlankStrtype as required by coding guidelines for identifier fields. Default values are sensible (Nonefor optional parent, empty tuple for chain).
146-148: LGTM!Good improvement to chain the original exception with
from excinstead offrom None, preserving the full traceback for debugging invalid ISO 8601 strings.
167-184: LGTM!Delegation field validation is comprehensive:
- Prevents self-referential parent task
- Detects duplicates in delegation chain using the same
Counterpattern as other validators- Ensures
assigned_tois not in the delegation chain (prevents delegation loops)Consistent with existing validation patterns in this model.
src/ai_company/communication/delegation/models.py (4)
11-38: LGTM!
DelegationRequestis well-structured with:
NotBlankStrfor identifier fields (delegator_id,delegatee_id)- Sensible defaults for optional fields
tuple[NotBlankStr, ...]for constraints ensuring non-blank constraint strings- Frozen configuration for immutability
67-83: LGTM!The validator correctly enforces the success/failure field correlation:
- Success requires
delegated_taskand forbidsrejection_reason/blocked_by- Failure requires
rejection_reasonThis provides strong invariants for consumers of the result.
86-122: LGTM!
DelegationRecordprovides a complete audit trail entry with:
- All identifier fields using
NotBlankStrAwareDatetimefor timezone-aware timestamps- Frozen configuration for immutability
58-61: 🧹 Nitpick | 🔵 TrivialConsider using
NotBlankStr | Noneforrejection_reason.The validator on lines 80-82 ensures
rejection_reason is not Nonewhensuccess is False, but doesn't prevent whitespace-only strings. Per coding guidelines, identifier/name fields should useNotBlankStr. Whilerejection_reasonisn't strictly an identifier, ensuring it's non-blank when required would provide stronger invariants.♻️ Optional: Strengthen rejection_reason validation
- rejection_reason: str | None = Field( + rejection_reason: NotBlankStr | None = Field( default=None, description="Reason for rejection", )⛔ Skipped due to learnings
Learnt from: CR Repo: Aureliolo/ai-company PR: 0 File: CLAUDE.md:0-0 Timestamp: 2026-03-07T17:28:05.391Z Learning: Applies to src/**/*.py : For all identifier/name fields in Pydantic models, use `NotBlankStr` type from `core.types` instead of manual whitespace validators (including optional and tuple variants)DESIGN_SPEC.md (4)
581-581: LGTM!The "Current state" callout accurately reflects the M4 implementation status: communication foundation, loop prevention with
DelegationGuard, hierarchical delegation viaDelegationService, and the task model extensions.
788-789: LGTM!Task definition example correctly shows the new
parent_task_idanddelegation_chainfields with appropriate default values.
2355-2372: LGTM!Project structure documentation accurately reflects the new delegation and loop_prevention module hierarchies with all submodules properly listed.
2552-2552: LGTM!Engineering conventions entry properly documents the adopted delegation and loop prevention patterns including synchronous design, five-mechanism guard, injectable clocks, and task model extensions.
src/ai_company/communication/errors.py (1)
67-100: LGTM!Well-designed error hierarchy:
DelegationErroras base for all delegation-related errorsDelegationLoopErrorgroups loop-prevention rejections, enabling callers to catch all loop issues uniformly- Each specific error type maps to a loop-prevention mechanism from the design spec
HierarchyResolutionErroris correctly separate fromDelegationError(hierarchy issues aren't delegation failures)All errors inherit context handling from
CommunicationErrorfollowing the established pattern.tests/unit/communication/test_errors.py (3)
23-23: LGTM!Global timeout marker applied at module level ensures all tests in this file adhere to the 30-second timeout policy.
76-84: LGTM!Parametrized inheritance tests comprehensively cover:
DelegationError→CommunicationErrorDelegationAuthorityError→DelegationError- All
DelegationLoopErrorsubclasses →DelegationLoopErrorHierarchyResolutionError→CommunicationError(correctly separate from delegation)
116-129: LGTM!Dedicated test class for delegation error hierarchy:
test_depth_error_chainvalidates the full inheritance chain through all base classestest_hierarchy_error_is_communication_errorcorrectly asserts thatHierarchyResolutionErroris not aDelegationErrorGood defensive tests ensuring the error taxonomy remains intact.
tests/unit/core/test_task.py (1)
277-332: LGTM!The delegation field tests are well-structured and provide comprehensive coverage for:
- Default values validation
- Self-referential parent_task_id rejection
- Delegation chain duplicate detection
- Cross-field validation (assigned_to not in delegation_chain)
- Serialization preservation
The test class correctly uses
@pytest.mark.unitmarker and follows established patterns in the file.src/ai_company/observability/events/delegation.py (1)
1-24: LGTM!The delegation event constants are well-organized into logical groups (lifecycle, loop prevention, hierarchy) and follow the established
domain.subject[.qualifier]naming pattern. UsingFinal[str]ensures immutability at the type level.src/ai_company/communication/delegation/hierarchy.py (3)
37-88: LGTM!The
__init__method correctly implements the hierarchy resolution with proper priority ordering (explicit lines override team/department relationships). Key strengths:
- Defensive handling when removing from old supervisor's reports (lines 63-66)
- Cycle detection before freezing state
- MappingProxyType for read-only exposure with tuple conversion for nested lists
- Structured logging with event constants
The logic correctly handles edge cases like team lead being in their own members list (line 50-51).
89-129: LGTM!The cycle detection uses an efficient O(n) single-pass DFS algorithm with visited/in-stack coloring. Each agent is processed exactly once across all outer loop iterations, and the error message provides clear context about where the cycle was detected.
131-234: LGTM!The query methods are well-implemented with:
- Consistent return types (tuples for collections,
Nonefor missing data)- Proper traversal logic for hierarchy walking (
is_subordinate,get_ancestors,get_delegation_depth)- Complete type hints and Google-style docstrings
All methods are stateless queries operating on the frozen internal state.
src/ai_company/communication/delegation/authority.py (2)
21-43: LGTM!The
AuthorityCheckResultmodel correctly enforces the allowed/reason correlation invariant:
- Success (
allowed=True) must have an empty reason- Failure (
allowed=False) must have a non-empty reasonThe frozen model config ensures immutability.
46-157: LGTM!The
AuthorityValidatorcorrectly implements the two-phase validation:
- Hierarchy check (when
enforce_chain_of_commandis enabled): validates direct reports, with optional skip-level subordinate fallback- Role permissions check: validates against
can_delegate_tolist (empty list permits all roles)The logging strategy is appropriate—
debuglevel for successful authorization,infolevel for denials with full context.tests/unit/observability/test_events.py (1)
101-123: LGTM!The test correctly adds "delegation" to the expected domain modules set, ensuring the new
events/delegation.pymodule is discovered by the dynamic module scanner.README.md (2)
17-17: LGTM!The new feature bullet clearly describes the hierarchical delegation capability with loop prevention. The formatting is consistent with other feature bullets in the list.
27-27: LGTM!The status update correctly reflects the milestone progression to M4 (Multi-Agent), acknowledging all previous milestones as complete.
tests/unit/communication/delegation/test_models.py (4)
1-29: LGTM!The test module setup is clean with appropriate imports and a reusable
_make_taskhelper that follows the established pattern across the delegation test suite.
32-86: LGTM!Comprehensive coverage for
DelegationRequest:
- Minimal construction with default values
- Full construction with optional fields
- Frozen immutability enforcement
- NotBlankStr validation for delegator/delegatee IDs
88-143: LGTM!Excellent coverage of
DelegationResultmodel validator invariants:
- Success requires
delegated_task, prohibitsrejection_reasonandblocked_by- Failure requires
rejection_reason- Frozen immutability enforced
All validator error messages are matched appropriately.
145-216: LGTM!
DelegationRecordtests cover essential scenarios including JSON roundtrip, which validates thatAwareDatetimeserialization/deserialization works correctly for audit trail entries.src/ai_company/communication/loop_prevention/circuit_breaker.py (4)
1-18: LGTM on module structure and imports.The module correctly uses
get_loggerfrom observability and imports structured event constants. The setup follows coding guidelines.
21-46: LGTM on state definitions.
CircuitBreakerStateenum and internal_PairStateclass are well-structured with appropriate docstrings and__slots__usage.
62-78: LGTM on initialization and pair lookup.Constructor properly stores config and clock with type hints. The
_get_pairmethod correctly uses the sharedpair_keyutility for canonical direction-agnostic keys.
80-143: LGTM on state query and check methods.The
get_statemethod correctly handles cooldown expiry with proper reset and logging. Thecheckmethod appropriately returnsGuardCheckOutcomebased on circuit state.src/ai_company/communication/delegation/service.py (4)
1-28: LGTM on module structure and imports.Proper use of
get_loggerfrom observability and structured event constants. Imports are well-organized.
31-61: LGTM on class definition.The
DelegationServiceclass uses__slots__for memory efficiency and has a clean constructor with keyword-only arguments and proper type hints.
63-140: LGTM on delegation flow.The
delegatemethod follows a correct pattern: validate authority, check loop prevention guards, create sub-task only after all checks pass, then record side effects. Structured logging is properly used.
178-195: LGTM on accessor methods.
get_audit_trailcorrectly returns a tuple (immutable copy) to prevent external mutation of the internal list.get_supervisor_ofcleanly delegates to the hierarchy resolver.src/ai_company/communication/__init__.py (4)
17-25: LGTM on delegation imports.New delegation subsystem components are properly imported and will be re-exported through this module's public API.
35-51: LGTM on error imports.All new delegation and hierarchy error types are properly imported for public API exposure.
58-67: LGTM on loop prevention imports.Loop prevention components including guards, checkers, and utility functions are properly imported.
72-132: LGTM on__all__exports.The
__all__list is alphabetically sorted and includes all newly imported delegation, error, and loop-prevention components. The public API surface is well-defined.
src/ai_company/communication/loop_prevention/circuit_breaker.py
Outdated
Show resolved
Hide resolved
| def test_circuit_breaker_blocked(self) -> None: | ||
| config = _make_config( | ||
| circuit_breaker=CircuitBreakerConfig( | ||
| bounce_threshold=2, cooldown_seconds=300 | ||
| ), | ||
| ) | ||
| guard = DelegationGuard(config) | ||
| for i in range(2): | ||
| guard.record_delegation("a", "b", f"Task-{i}") | ||
| # Circuit should be open after 2 bounces | ||
| result = guard.check((), "a", "b", "Task-new-2") | ||
| assert result.passed is False | ||
| assert result.mechanism == "circuit_breaker" |
There was a problem hiding this comment.
Don't treat every delegation as a circuit-breaker bounce.
This test opens the breaker after two same-direction record_delegation("a", "b", ...) calls. The dedicated breaker API is bounce-based (record_bounce), and the PR objective says the circuit trips after repeated bounces. Keeping this expectation means ordinary A→B traffic can open the breaker even when there is no ping-pong loop.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/communication/loop_prevention/test_guard.py` around lines 88 -
100, The test incorrectly uses DelegationGuard.record_delegation to simulate
circuit-breaker bounces; replace the two calls to record_delegation("a","b",...)
with calls to DelegationGuard.record_bounce("a","b",...) (or otherwise invoke
the dedicated bounce API) so the circuit breaker logic defined by
CircuitBreakerConfig (bounce_threshold) is exercised; keep the rest of the
assertions (call to guard.check((), "a", "b", "Task-new-2") and expects passed
is False and mechanism == "circuit_breaker") unchanged.
…l reviewers Source fixes: - Add _escalate_loop_detection method with supervisor lookup and logging - Add result event logging (DELEGATION_RESULT_SENT) on successful delegation - Change guard/dedup API from task_title to task_id for correct dedup identity - Change logger.error to logger.exception in sub-task creation error path - Add identity consistency checks (delegator/delegatee ID vs name mismatch) - Add self-delegation model validator on DelegationRequest - Add failure-with-task validator on DelegationResult - Add whitespace-only message rejection on GuardCheckOutcome - Add max_depth <= 0 validation on depth check - Add blank agent ID validation on pair_key utility - Split circuit breaker _get_pair into _get_pair/_get_or_create_pair - Circuit breaker evicts stale entries on cooldown expiry - Rate limiter evicts empty timestamp lists - Dedup adds global _purge_expired sweep on every check/record - Circuit breaker record_delegation is no-op when circuit is open - Constraints appended to sub-task description - Sub-task inherits max_retries from original task Documentation fixes: - Fix DESIGN_SPEC.md: CircuitBreakerState listed in correct file - Add __init__.py entries for delegation/ and loop_prevention/ in DESIGN_SPEC - Update docstrings across hierarchy, authority, depth, circuit breaker, guard Test fixes: - Fix test_ancestry_blocked to use enforce_chain=False (not allow_skip=True) - Rename record_bounce -> record_delegation in circuit breaker tests - Rename test_different_task_title_passes -> test_different_task_id_passes - Update guard tests from task_title to task_id parameter - Add identity mismatch, constraints, self-delegation, failure-with-task tests - Add circuit breaker no-op-when-open test - Add dedup global pruning test - Add delegation event value assertions in test_events.py - Parametrize ancestry failure tests - Update integration test comments for task_id-based dedup Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| def __init__(self, config: LoopPreventionConfig) -> None: | ||
| self._config = config | ||
| self._deduplicator = DelegationDeduplicator( | ||
| window_seconds=config.dedup_window_seconds, | ||
| ) | ||
| self._rate_limiter = DelegationRateLimiter(config.rate_limit) | ||
| self._circuit_breaker = DelegationCircuitBreaker( | ||
| config.circuit_breaker, | ||
| ) |
There was a problem hiding this comment.
DelegationGuard doesn't expose injectable clocks to sub-components
DelegationDeduplicator, DelegationRateLimiter, and DelegationCircuitBreaker each accept a clock: Callable[[], float] parameter for deterministic testing, but DelegationGuard.__init__ does not expose a clock parameter, causing all three components to default to time.monotonic. The PR description specifically calls out "injectable clocks for deterministic testing" as a design goal, yet this integration point doesn't honour it.
Any test that needs to verify time-based behaviour (dedup window expiry, rate-limit window reset, circuit breaker cooldown) through the DelegationGuard API must either monkeypatch time.monotonic globally or bypass the guard and instantiate components directly.
Consider accepting an optional clock parameter in __init__ and passing it through to the three stateful components:
def __init__(
self,
config: LoopPreventionConfig,
*,
clock: Callable[[], float] = time.monotonic,
) -> None:
self._config = config
self._deduplicator = DelegationDeduplicator(
window_seconds=config.dedup_window_seconds,
clock=clock,
)
self._rate_limiter = DelegationRateLimiter(config.rate_limit, clock=clock)
self._circuit_breaker = DelegationCircuitBreaker(config.circuit_breaker, clock=clock)Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/loop_prevention/guard.py
Line: 45-53
Comment:
**`DelegationGuard` doesn't expose injectable clocks to sub-components**
`DelegationDeduplicator`, `DelegationRateLimiter`, and `DelegationCircuitBreaker` each accept a `clock: Callable[[], float]` parameter for deterministic testing, but `DelegationGuard.__init__` does not expose a clock parameter, causing all three components to default to `time.monotonic`. The PR description specifically calls out "injectable clocks for deterministic testing" as a design goal, yet this integration point doesn't honour it.
Any test that needs to verify time-based behaviour (dedup window expiry, rate-limit window reset, circuit breaker cooldown) through the `DelegationGuard` API must either monkeypatch `time.monotonic` globally or bypass the guard and instantiate components directly.
Consider accepting an optional `clock` parameter in `__init__` and passing it through to the three stateful components:
```python
def __init__(
self,
config: LoopPreventionConfig,
*,
clock: Callable[[], float] = time.monotonic,
) -> None:
self._config = config
self._deduplicator = DelegationDeduplicator(
window_seconds=config.dedup_window_seconds,
clock=clock,
)
self._rate_limiter = DelegationRateLimiter(config.rate_limit, clock=clock)
self._circuit_breaker = DelegationCircuitBreaker(config.circuit_breaker, clock=clock)
```
How can I resolve this? If you propose a fix, please make it concise.| class DelegationLoopError(DelegationError): | ||
| """Base for loop prevention mechanism rejections.""" | ||
|
|
||
|
|
||
| class DelegationDepthError(DelegationLoopError): | ||
| """Delegation chain exceeds maximum depth.""" | ||
|
|
||
|
|
||
| class DelegationAncestryError(DelegationLoopError): | ||
| """Delegation would create a cycle in the task ancestry.""" | ||
|
|
||
|
|
||
| class DelegationRateLimitError(DelegationLoopError): | ||
| """Delegation rate limit exceeded for agent pair.""" | ||
|
|
||
|
|
||
| class DelegationCircuitOpenError(DelegationLoopError): | ||
| """Circuit breaker is open for agent pair.""" | ||
|
|
||
|
|
||
| class DelegationDuplicateError(DelegationLoopError): | ||
| """Duplicate delegation detected within dedup window.""" |
There was a problem hiding this comment.
Loop-prevention exception subclasses are exported but never raised
DelegationAncestryError, DelegationDepthError, DelegationRateLimitError, DelegationCircuitOpenError, and DelegationDuplicateError are all declared in the error hierarchy and re-exported through communication/__init__.py, but none of them are ever raised by any code path. The loop-prevention mechanisms return GuardCheckOutcome(passed=False, ...) objects, and DelegationService.delegate() maps those to a DelegationResult(success=False) return value instead.
This creates a misleading public API: callers who examine the exported symbols will reasonably expect to catch these exceptions, but they will never be raised:
try:
result = service.delegate(request, delegator, delegatee)
except DelegationAncestryError:
# This block will never execute
passEither raise the appropriate exception from DelegationService.delegate() when a loop-prevention mechanism blocks delegation, or remove these subclasses and document that all rejections surface through the DelegationResult.blocked_by field.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/errors.py
Line: 75-96
Comment:
**Loop-prevention exception subclasses are exported but never raised**
`DelegationAncestryError`, `DelegationDepthError`, `DelegationRateLimitError`, `DelegationCircuitOpenError`, and `DelegationDuplicateError` are all declared in the error hierarchy and re-exported through `communication/__init__.py`, but none of them are ever raised by any code path. The loop-prevention mechanisms return `GuardCheckOutcome(passed=False, ...)` objects, and `DelegationService.delegate()` maps those to a `DelegationResult(success=False)` return value instead.
This creates a misleading public API: callers who examine the exported symbols will reasonably expect to catch these exceptions, but they will never be raised:
```python
try:
result = service.delegate(request, delegator, delegatee)
except DelegationAncestryError:
# This block will never execute
pass
```
Either raise the appropriate exception from `DelegationService.delegate()` when a loop-prevention mechanism blocks delegation, or remove these subclasses and document that all rejections surface through the `DelegationResult.blocked_by` field.
How can I resolve this? If you propose a fix, please make it concise.There was a problem hiding this comment.
Actionable comments posted: 12
♻️ Duplicate comments (3)
tests/unit/communication/loop_prevention/test_guard.py (1)
88-100:⚠️ Potential issue | 🟠 MajorDon't open the breaker on ordinary same-direction delegations.
CircuitBreakerConfig.bounce_thresholdis defined as a bounce count, but this test trips the breaker with two plainrecord_delegation("a", "b", ...)calls. That hard-wiresDelegationGuard.record_delegation()to false-positive on steady manager→report traffic instead of only on ping-pong loops. Exercise the bounce path here; the dedicated breaker and integration tests should mirror the same contract.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/communication/loop_prevention/test_guard.py` around lines 88 - 100, The test is triggering the circuit breaker using two same-direction delegations; change it to exercise the bounce path so bounce_threshold counts true ping-pong bounces. Update the test to call DelegationGuard.record_delegation(...) with alternating source/target (e.g., record_delegation("a","b",...) then record_delegation("b","a",...)) until CircuitBreakerConfig.bounce_threshold is reached, then call DelegationGuard.check(...) and assert result.passed is False and result.mechanism == "circuit_breaker".tests/integration/communication/test_delegation_integration.py (1)
188-235:⚠️ Potential issue | 🟡 MinorThis setup never reaches the ancestry guard.
DelegationService.delegate()validates authority before loop prevention, and_build_three_level_service()hard-codesenforce_chain_of_command=True, sodev -> ceois rejected asauthoritybefore ancestry is evaluated. Make this test’s authority path permissive and assertblocked_by == "ancestry"so it actually covers the ancestry mechanism.Representative fix
-def _build_three_level_service() -> tuple[ +def _build_three_level_service( + *, + enforce_chain: bool = True, +) -> tuple[ DelegationService, HierarchyResolver, dict[str, AgentIdentity], ]: ... hierarchy_config = HierarchyConfig( - enforce_chain_of_command=True, + enforce_chain_of_command=enforce_chain, allow_skip_level=True, ) ... - service, _, agents = _build_three_level_service() + service, _, agents = _build_three_level_service(enforce_chain=False) ... - # Blocked by either ancestry or authority - assert r3.blocked_by is not None + assert r3.blocked_by == "ancestry"🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/integration/communication/test_delegation_integration.py` around lines 188 - 235, The test test_ancestry_prevents_back_delegation never exercises the ancestry guard because DelegationService.delegate() checks authority first and _build_three_level_service() sets enforce_chain_of_command=True; update the test to construct a service/agents that allow the dev→ceo authority check to succeed (e.g., call _build_three_level_service() or otherwise create the service with enforce_chain_of_command=False or adjust agent permissions so authority passes) then call DelegationService.delegate() as before and replace the loose blocked_by assertion with assert r3.blocked_by == "ancestry" (keep r3.success is False) so the test actually verifies the ancestry prevention path.src/ai_company/communication/loop_prevention/circuit_breaker.py (1)
155-185:⚠️ Potential issue | 🟠 MajorOnly increment the breaker on real bounce-back patterns.
This increments
bounce_counton every successful delegation, so three ordinaryA→Bhandoffs open the circuit even if nothing ever bounced back. The breaker becomes a per-pair quota instead of the repeated-bounce guard described by the feature. Track direction or task lineage and increment only when the interaction actually reverses.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/communication/loop_prevention/circuit_breaker.py` around lines 155 - 185, The current record_delegation in record_delegation increments pair.bounce_count for every delegation; change it to only increment on a true bounce by tracking direction/task lineage on the pair (e.g., add a last_delegator or last_task_id field on the pair returned by _get_or_create_pair) and: if delegator_id equals pair.last_delegator (same direction) reset or leave bounce_count unchanged; if delegator_id equals pair.last_delegatee (i.e., a reversal/bounce) increment bounce_count; update pair.last_delegator/last_delegatee (or last_task_id) on each call, keep the existing opened_at and logger.warning behaviour when threshold is reached, and keep the early return when get_state(...) is OPEN. Ensure this logic is applied inside record_delegation and persisted on the pair object returned by _get_or_create_pair.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/communication/delegation/authority.py`:
- Around line 94-98: Replace the debug-level structured log with an info-level
structured log for successful delegations: change the call using logger.debug
with DELEGATION_AUTHORIZED to logger.info(DELEGATION_AUTHORIZED,
delegator=delegator.name, delegatee=delegatee.name) so the event constant
DELEGATION_AUTHORIZED is logged at info level (use the same key names delegator
and delegatee) to match the other delegation observability patterns.
- Around line 34-43: The validator _validate_allowed_reason currently only
rejects empty strings but allows whitespace-only reasons; update its denial
check to treat whitespace-only as empty by using not self.reason.strip() (i.e.,
replace the `if not self.allowed and not self.reason:` condition with one that
checks `not self.allowed and not self.reason.strip()`), so when self.allowed is
False the reason must contain non-whitespace characters; keep the other checks
and return self as before.
In `@src/ai_company/communication/delegation/hierarchy.py`:
- Around line 43-46: When building the team->department head edge, avoid
inserting a self-cycle by skipping the assignment if team.lead equals dept.head;
update the block that sets supervisor_of[team.lead] and appends to
reports_of[dept.head] to first check that team.lead != dept.head so you don't
create a head->head edge that will trigger _detect_cycles() falsely.
- Around line 83-87: Replace debug/warning-level structured logs in this module
with info-level structured events: change
logger.debug(DELEGATION_HIERARCHY_BUILT, ...) to
logger.info(DELEGATION_HIERARCHY_BUILT, agents=len(supervisor_of),
supervisors=len(reports_of)) and do the same for the other event calls around
lines 111-115 (use the corresponding event constants from
ai_company.observability.events.<domain>), ensuring you pass key=value pairs (no
formatted strings) and import the correct EVENT_CONSTANT names used in this
file.
In `@src/ai_company/communication/delegation/models.py`:
- Around line 75-94: The _validate_success_consistency model validator in
DelegationResult should also reject empty or whitespace-only rejection_reason
values when success is False; update the check that currently only tests "if not
self.success and self.rejection_reason is None" to instead verify that
rejection_reason is a non-empty, non-whitespace string (e.g., using str.strip()
or equivalent) and raise a ValueError like "rejection_reason is required when
success is False" when the stripped value is empty; keep all other existing
consistency checks in _validate_success_consistency unchanged.
In `@src/ai_company/communication/delegation/service.py`:
- Around line 199-201: The Task ID is being truncated to 12 hex chars using
uuid4().hex[:12] which reduces entropy and risks collisions; update the Task
creation to use the full UUID hex (e.g., replace uuid4().hex[:12] with
uuid4().hex) so Task id retains full entropy (reference the Task creation that
currently uses uuid4().hex[:12] in service.py) and ensure the uuid4 import is
present/used consistently where Task ids are generated for lineage/dedup keys.
- Around line 190-214: The child Task is built directly from request.task which
can share mutable nested state; fix by deep-copying the source task at the start
(e.g., src_task = copy.deepcopy(request.task)) before reading or modifying
fields like delegation_chain, description, constraints, estimated_complexity,
etc.; update subsequent references in this block to use src_task (and construct
new_chain from a copied chain) and add the necessary import for copy.deepcopy so
the new Task() is assembled from the isolated copy rather than the original
request.task.
- Around line 68-171: The delegate() and _create_sub_task() functions are too
long and mix responsibilities; refactor by extracting small helper methods for
each stage: e.g., _validate_identity(request, delegator, delegatee) to perform
the identity checks and raise ValueError, _check_authority_and_guard(request,
delegator, delegatee) to call self._authority_validator.validate and
self._guard.check and return an outcome/DelegationResult when blocked,
_build_sub_task(request) to construct the sub_task (moved logic from
_create_sub_task), and _record_audit_and_guard(request, sub_task) to call
self._guard.record_delegation, create/append DelegationRecord and centralize
logging; update delegate() to orchestrate these helpers and return early on
failures so each function stays under ~50 lines and responsibilities are
separated (refer to delegate, _create_sub_task, _guard.record_delegation,
_authority_validator.validate, _guard.check, and DelegationRecord).
- Around line 165-170: The delegation event logging is inconsistent and leaks
raw exception text; replace ad-hoc logger.debug calls with structured
logger.info using the domain event constants from
ai_company.observability.events.<domain> (e.g., DELEGATION_RESULT_SENT and
DELEGATION_LOOP_ESCALATED) and ensure all delegation paths (including the
sub-task failure path at the other ranges) emit the proper event constant with
key=value pairs rather than format strings; for failure cases do NOT log
str(exc) directly — instead log only sanitized error metadata (e.g.,
error_type=exc.__class__.__name__,
error_summary=short_truncated_message_or_code, task_id=request.task_id,
delegator=request.delegator_id, delegatee=request.delegatee_id) and update the
occurrences around the DELEGATION_RESULT_SENT (lines ~165-170), the similar
block at ~215-222 and ~253-260 to follow this pattern.
In `@src/ai_company/communication/loop_prevention/guard.py`:
- Around line 55-107: The check() method is too long—split it into two helper
methods (e.g., _run_pure_checks and _run_stateful_checks) to keep the
short-circuit behavior: move the ancestry and delegation-depth logic (calls to
check_ancestry and check_delegation_depth using
self._config.max_delegation_depth) into _run_pure_checks, and move the
deduplicator, rate limiter, and circuit breaker checks
(self._deduplicator.check, self._rate_limiter.check,
self._circuit_breaker.check) into _run_stateful_checks; have check() call the
helpers in order and use existing _log_and_return for failures and return
GuardCheckOutcome(passed=True, mechanism=_SUCCESS_MECHANISM) on success so
behavior and short-circuiting remain identical.
In `@src/ai_company/communication/loop_prevention/rate_limit.py`:
- Around line 65-75: The current two-step logic (check() reading/pruning
_timestamps around pair_key(...) and record() appending later) is racy—make
admission atomic by combining them into a single synchronized method (e.g.,
check_and_record()) or by holding self._lock across both operations; inside that
critical section compute now and cutoff via self._clock(), prune recent
timestamps from self._timestamps[key], compare len(recent) to limit
(self._config.max_per_pair_per_minute + self._config.burst_allowance), return a
failing GuardCheckOutcome if >= limit (ensuring to evict empty keys), otherwise
append now to recent, write back to self._timestamps[key] and return a passing
GuardCheckOutcome; update callers to use the new atomic method and remove the
separate record() / check() pairing.
- Around line 37-47: In the __init__ of the rate limiter, validate the incoming
window_seconds before storing it: if window_seconds is <= 0 raise a ValueError
with a clear message (e.g. "window_seconds must be > 0") so the invalid value is
rejected early; ensure you still assign self._window_seconds only after
validation and keep the rest of initialization (_config, _clock, _timestamps)
unchanged—this prevents check() from silently pruning histories when
window_seconds is non-positive.
---
Duplicate comments:
In `@src/ai_company/communication/loop_prevention/circuit_breaker.py`:
- Around line 155-185: The current record_delegation in record_delegation
increments pair.bounce_count for every delegation; change it to only increment
on a true bounce by tracking direction/task lineage on the pair (e.g., add a
last_delegator or last_task_id field on the pair returned by
_get_or_create_pair) and: if delegator_id equals pair.last_delegator (same
direction) reset or leave bounce_count unchanged; if delegator_id equals
pair.last_delegatee (i.e., a reversal/bounce) increment bounce_count; update
pair.last_delegator/last_delegatee (or last_task_id) on each call, keep the
existing opened_at and logger.warning behaviour when threshold is reached, and
keep the early return when get_state(...) is OPEN. Ensure this logic is applied
inside record_delegation and persisted on the pair object returned by
_get_or_create_pair.
In `@tests/integration/communication/test_delegation_integration.py`:
- Around line 188-235: The test test_ancestry_prevents_back_delegation never
exercises the ancestry guard because DelegationService.delegate() checks
authority first and _build_three_level_service() sets
enforce_chain_of_command=True; update the test to construct a service/agents
that allow the dev→ceo authority check to succeed (e.g., call
_build_three_level_service() or otherwise create the service with
enforce_chain_of_command=False or adjust agent permissions so authority passes)
then call DelegationService.delegate() as before and replace the loose
blocked_by assertion with assert r3.blocked_by == "ancestry" (keep r3.success is
False) so the test actually verifies the ancestry prevention path.
In `@tests/unit/communication/loop_prevention/test_guard.py`:
- Around line 88-100: The test is triggering the circuit breaker using two
same-direction delegations; change it to exercise the bounce path so
bounce_threshold counts true ping-pong bounces. Update the test to call
DelegationGuard.record_delegation(...) with alternating source/target (e.g.,
record_delegation("a","b",...) then record_delegation("b","a",...)) until
CircuitBreakerConfig.bounce_threshold is reached, then call
DelegationGuard.check(...) and assert result.passed is False and
result.mechanism == "circuit_breaker".
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 9dabc300-718d-47ff-a4e0-9d52f332e16b
📒 Files selected for processing (20)
DESIGN_SPEC.mdsrc/ai_company/communication/delegation/authority.pysrc/ai_company/communication/delegation/hierarchy.pysrc/ai_company/communication/delegation/models.pysrc/ai_company/communication/delegation/service.pysrc/ai_company/communication/loop_prevention/_pair_key.pysrc/ai_company/communication/loop_prevention/circuit_breaker.pysrc/ai_company/communication/loop_prevention/dedup.pysrc/ai_company/communication/loop_prevention/depth.pysrc/ai_company/communication/loop_prevention/guard.pysrc/ai_company/communication/loop_prevention/models.pysrc/ai_company/communication/loop_prevention/rate_limit.pytests/integration/communication/test_delegation_integration.pytests/unit/communication/delegation/test_models.pytests/unit/communication/delegation/test_service.pytests/unit/communication/loop_prevention/test_ancestry.pytests/unit/communication/loop_prevention/test_circuit_breaker.pytests/unit/communication/loop_prevention/test_dedup.pytests/unit/communication/loop_prevention/test_guard.pytests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: Always use native lazy annotations (PEP 649) — do not usefrom __future__ import annotations
Use PEP 758 except syntax:except A, B:without parentheses for multiple exception types
Maintain line length of 88 characters (enforced by ruff)
Files:
tests/integration/communication/test_delegation_integration.pysrc/ai_company/communication/loop_prevention/_pair_key.pysrc/ai_company/communication/delegation/service.pysrc/ai_company/communication/loop_prevention/dedup.pytests/unit/communication/loop_prevention/test_circuit_breaker.pysrc/ai_company/communication/loop_prevention/guard.pytests/unit/communication/loop_prevention/test_dedup.pysrc/ai_company/communication/loop_prevention/rate_limit.pytests/unit/communication/delegation/test_models.pytests/unit/communication/delegation/test_service.pytests/unit/observability/test_events.pysrc/ai_company/communication/loop_prevention/circuit_breaker.pytests/unit/communication/loop_prevention/test_ancestry.pytests/unit/communication/loop_prevention/test_guard.pysrc/ai_company/communication/delegation/models.pysrc/ai_company/communication/loop_prevention/models.pysrc/ai_company/communication/delegation/authority.pysrc/ai_company/communication/delegation/hierarchy.pysrc/ai_company/communication/loop_prevention/depth.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Use pytest markers@pytest.mark.unit,@pytest.mark.integration,@pytest.mark.e2e, and@pytest.mark.slowto categorize tests
Prefer@pytest.mark.parametrizefor testing similar test cases
Use generic vendor-agnostic names in test fixtures and configuration:test-provider,test-small-001,test-medium-001,test-large-001
Files:
tests/integration/communication/test_delegation_integration.pytests/unit/communication/loop_prevention/test_circuit_breaker.pytests/unit/communication/loop_prevention/test_dedup.pytests/unit/communication/delegation/test_models.pytests/unit/communication/delegation/test_service.pytests/unit/observability/test_events.pytests/unit/communication/loop_prevention/test_ancestry.pytests/unit/communication/loop_prevention/test_guard.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/**/*.py: All public functions and classes must have type hints and Google-style docstrings (enforced by ruff D rules)
Never useimport loggingorlogging.getLogger()in application code — usefrom ai_company.observability import get_loggerwith variable namelogger
Use structured logging with constants fromai_company.observability.events.<domain>— always log aslogger.info(EVENT_CONSTANT, key=value)never with format strings
For all identifier/name fields in Pydantic models, useNotBlankStrtype fromcore.typesinstead of manual whitespace validators (including optional and tuple variants)
Use@computed_fieldfor derived values in Pydantic models instead of storing and validating redundant fields
Enforce immutability: create new objects instead of mutating. For non-Pydantic internal collections usecopy.deepcopy()at construction andMappingProxyTypefor read-only enforcement
For config/identity data use frozen Pydantic models; for runtime state that evolves use separate mutable-via-copy models withmodel_copy(update=...)
Preferasyncio.TaskGroupfor fan-out/fan-in parallel operations in new code instead of barecreate_task
Functions must be under 50 lines, files must be under 800 lines
Handle errors explicitly — never silently swallow exceptions
Never use real vendor names (Anthropic, OpenAI, Claude, GPT) in project-owned code, docstrings, comments, or tests — use generic names:example-provider,example-large-001,test-provider,test-small-001
Files:
src/ai_company/communication/loop_prevention/_pair_key.pysrc/ai_company/communication/delegation/service.pysrc/ai_company/communication/loop_prevention/dedup.pysrc/ai_company/communication/loop_prevention/guard.pysrc/ai_company/communication/loop_prevention/rate_limit.pysrc/ai_company/communication/loop_prevention/circuit_breaker.pysrc/ai_company/communication/delegation/models.pysrc/ai_company/communication/loop_prevention/models.pysrc/ai_company/communication/delegation/authority.pysrc/ai_company/communication/delegation/hierarchy.pysrc/ai_company/communication/loop_prevention/depth.py
src/ai_company/{providers,tools,engine,communication}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Use
copy.deepcopy()at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations
Files:
src/ai_company/communication/loop_prevention/_pair_key.pysrc/ai_company/communication/delegation/service.pysrc/ai_company/communication/loop_prevention/dedup.pysrc/ai_company/communication/loop_prevention/guard.pysrc/ai_company/communication/loop_prevention/rate_limit.pysrc/ai_company/communication/loop_prevention/circuit_breaker.pysrc/ai_company/communication/delegation/models.pysrc/ai_company/communication/loop_prevention/models.pysrc/ai_company/communication/delegation/authority.pysrc/ai_company/communication/delegation/hierarchy.pysrc/ai_company/communication/loop_prevention/depth.py
DESIGN_SPEC.md
📄 CodeRabbit inference engine (CLAUDE.md)
Update
DESIGN_SPEC.mdafter approved deviations to reflect the new reality
Files:
DESIGN_SPEC.md
🧠 Learnings (4)
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{providers,tools,engine,communication}/**/*.py : Use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations
Applied to files:
src/ai_company/communication/loop_prevention/dedup.pysrc/ai_company/communication/loop_prevention/guard.py
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{providers,config}/**/*.py : Configure `RetryConfig` and `RateLimiterConfig` per-provider in `ProviderConfig`, not in calling code
Applied to files:
src/ai_company/communication/loop_prevention/rate_limit.py
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{api,config,providers}/**/*.py : Validate at system boundaries (user input, external APIs, config files)
Applied to files:
src/ai_company/communication/loop_prevention/models.py
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/**/*.py : For all identifier/name fields in Pydantic models, use `NotBlankStr` type from `core.types` instead of manual whitespace validators (including optional and tuple variants)
Applied to files:
src/ai_company/communication/loop_prevention/models.py
🧬 Code graph analysis (11)
tests/integration/communication/test_delegation_integration.py (7)
src/ai_company/communication/delegation/authority.py (1)
AuthorityValidator(46-158)src/ai_company/communication/delegation/hierarchy.py (1)
HierarchyResolver(16-234)src/ai_company/communication/delegation/models.py (1)
DelegationRequest(11-46)src/ai_company/communication/delegation/service.py (3)
DelegationService(36-279)delegate(68-171)get_audit_trail(262-268)src/ai_company/communication/loop_prevention/guard.py (1)
DelegationGuard(28-145)src/ai_company/core/agent.py (1)
AgentIdentity(246-304)src/ai_company/core/task.py (1)
Task(38-242)
src/ai_company/communication/loop_prevention/dedup.py (3)
src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)src/ai_company/observability/_logger.py (1)
get_logger(8-28)tests/unit/communication/loop_prevention/test_dedup.py (8)
clock(23-24)clock(36-37)clock(48-49)clock(59-60)clock(71-72)clock(84-85)clock(97-98)clock(112-113)
tests/unit/communication/loop_prevention/test_circuit_breaker.py (2)
src/ai_company/communication/config.py (1)
CircuitBreakerConfig(206-227)src/ai_company/communication/loop_prevention/circuit_breaker.py (1)
CircuitBreakerState(21-30)
tests/unit/communication/loop_prevention/test_dedup.py (1)
src/ai_company/communication/loop_prevention/dedup.py (3)
DelegationDeduplicator(17-111)check(54-94)record(96-111)
src/ai_company/communication/loop_prevention/rate_limit.py (4)
src/ai_company/communication/config.py (1)
RateLimitConfig(182-203)src/ai_company/communication/loop_prevention/_pair_key.py (1)
pair_key(9-31)src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)src/ai_company/observability/_logger.py (1)
get_logger(8-28)
tests/unit/communication/delegation/test_models.py (3)
src/ai_company/communication/delegation/models.py (3)
DelegationRecord(97-133)DelegationRequest(11-46)DelegationResult(49-94)src/ai_company/core/enums.py (1)
TaskType(151-159)src/ai_company/core/task.py (1)
Task(38-242)
src/ai_company/communication/loop_prevention/circuit_breaker.py (3)
src/ai_company/communication/config.py (1)
CircuitBreakerConfig(206-227)src/ai_company/communication/loop_prevention/_pair_key.py (1)
pair_key(9-31)src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)
tests/unit/communication/loop_prevention/test_ancestry.py (2)
src/ai_company/communication/loop_prevention/ancestry.py (1)
check_ancestry(14-41)tests/unit/communication/loop_prevention/test_depth.py (1)
test_empty_chain_passes(14-17)
src/ai_company/communication/delegation/models.py (2)
src/ai_company/core/task.py (1)
Task(38-242)src/ai_company/tools/sandbox/result.py (1)
success(26-28)
src/ai_company/communication/delegation/hierarchy.py (2)
src/ai_company/communication/errors.py (1)
HierarchyResolutionError(99-100)src/ai_company/observability/_logger.py (1)
get_logger(8-28)
src/ai_company/communication/loop_prevention/depth.py (2)
src/ai_company/communication/loop_prevention/models.py (1)
GuardCheckOutcome(10-39)src/ai_company/observability/_logger.py (1)
get_logger(8-28)
| logger.debug( | ||
| DELEGATION_HIERARCHY_BUILT, | ||
| agents=len(supervisor_of), | ||
| supervisors=len(reports_of), | ||
| ) |
There was a problem hiding this comment.
Emit delegation events at info level.
These event constants currently go out through debug/warning, which diverges from the repo’s structured-event contract and makes them easy to miss in the normal observability pipeline.
Suggested fix
- logger.debug(
+ logger.info(
DELEGATION_HIERARCHY_BUILT,
agents=len(supervisor_of),
supervisors=len(reports_of),
)
...
- logger.warning(
+ logger.info(
DELEGATION_HIERARCHY_CYCLE,
agent=agent,
cycle_at=current,
)As per coding guidelines, Use structured logging with constants from ai_company.observability.events.<domain> — always log as logger.info(EVENT_CONSTANT, key=value) never with format strings.
Also applies to: 111-115
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/communication/delegation/hierarchy.py` around lines 83 - 87,
Replace debug/warning-level structured logs in this module with info-level
structured events: change logger.debug(DELEGATION_HIERARCHY_BUILT, ...) to
logger.info(DELEGATION_HIERARCHY_BUILT, agents=len(supervisor_of),
supervisors=len(reports_of)) and do the same for the other event calls around
lines 111-115 (use the corresponding event constants from
ai_company.observability.events.<domain>), ensuring you pass key=value pairs (no
formatted strings) and import the correct EVENT_CONSTANT names used in this
file.
| original = request.task | ||
| new_chain = (*original.delegation_chain, request.delegator_id) | ||
| description = original.description | ||
| if request.refinement: | ||
| description = f"{description}\n\nDelegation context: {request.refinement}" | ||
| if request.constraints: | ||
| constraints_text = "\n".join(f"- {c}" for c in request.constraints) | ||
| description = f"{description}\n\nConstraints:\n{constraints_text}" | ||
|
|
||
| try: | ||
| return Task( | ||
| id=f"del-{uuid4().hex[:12]}", | ||
| title=original.title, | ||
| description=description, | ||
| type=original.type, | ||
| priority=original.priority, | ||
| project=original.project, | ||
| created_by=request.delegator_id, | ||
| parent_task_id=original.id, | ||
| delegation_chain=new_chain, | ||
| estimated_complexity=original.estimated_complexity, | ||
| budget_limit=original.budget_limit, | ||
| deadline=original.deadline, | ||
| max_retries=original.max_retries, | ||
| ) |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Deep-copy the source task at this delegation boundary.
This child task is assembled from request.task fields directly. If any nested Task field is mutable, the delegator and delegatee will share state across the handoff. Copy the source payload before extracting fields for the new Task.
Based on learnings, Use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/communication/delegation/service.py` around lines 190 - 214,
The child Task is built directly from request.task which can share mutable
nested state; fix by deep-copying the source task at the start (e.g., src_task =
copy.deepcopy(request.task)) before reading or modifying fields like
delegation_chain, description, constraints, estimated_complexity, etc.; update
subsequent references in this block to use src_task (and construct new_chain
from a copied chain) and add the necessary import for copy.deepcopy so the new
Task() is assembled from the isolated copy rather than the original
request.task.
| try: | ||
| return Task( | ||
| id=f"del-{uuid4().hex[:12]}", |
There was a problem hiding this comment.
Use a full-entropy ID for delegated tasks.
uuid4().hex[:12] leaves only 48 bits of entropy. Task.id is later reused for lineage and dedup keys, so a collision here can cross-wire unrelated delegations or falsely block a valid one.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/communication/delegation/service.py` around lines 199 - 201,
The Task ID is being truncated to 12 hex chars using uuid4().hex[:12] which
reduces entropy and risks collisions; update the Task creation to use the full
UUID hex (e.g., replace uuid4().hex[:12] with uuid4().hex) so Task id retains
full entropy (reference the Task creation that currently uses uuid4().hex[:12]
in service.py) and ensure the uuid4 import is present/used consistently where
Task ids are generated for lineage/dedup keys.
| def check( | ||
| self, | ||
| delegation_chain: tuple[str, ...], | ||
| delegator_id: str, | ||
| delegatee_id: str, | ||
| task_id: str, | ||
| ) -> GuardCheckOutcome: | ||
| """Run all loop prevention checks. | ||
|
|
||
| Returns the first failing outcome, or a success outcome if | ||
| all checks pass. | ||
|
|
||
| Args: | ||
| delegation_chain: Current delegation ancestry. | ||
| delegator_id: ID of the delegating agent. | ||
| delegatee_id: ID of the proposed delegatee. | ||
| task_id: Unique ID of the task being delegated. | ||
|
|
||
| Returns: | ||
| First failing outcome or an all-passed success. | ||
| """ | ||
| # Pure (stateless) checks first — sequential to short-circuit | ||
| outcome = check_ancestry(delegation_chain, delegatee_id) | ||
| if not outcome.passed: | ||
| return self._log_and_return(outcome, delegator_id, delegatee_id) | ||
|
|
||
| outcome = check_delegation_depth( | ||
| delegation_chain, | ||
| self._config.max_delegation_depth, | ||
| ) | ||
| if not outcome.passed: | ||
| return self._log_and_return(outcome, delegator_id, delegatee_id) | ||
|
|
||
| # Stateful checks — only run if pure checks passed | ||
| outcome = self._deduplicator.check( | ||
| delegator_id, | ||
| delegatee_id, | ||
| task_id, | ||
| ) | ||
| if not outcome.passed: | ||
| return self._log_and_return(outcome, delegator_id, delegatee_id) | ||
|
|
||
| outcome = self._rate_limiter.check(delegator_id, delegatee_id) | ||
| if not outcome.passed: | ||
| return self._log_and_return(outcome, delegator_id, delegatee_id) | ||
|
|
||
| outcome = self._circuit_breaker.check(delegator_id, delegatee_id) | ||
| if not outcome.passed: | ||
| return self._log_and_return(outcome, delegator_id, delegatee_id) | ||
| return GuardCheckOutcome( | ||
| passed=True, | ||
| mechanism=_SUCCESS_MECHANISM, | ||
| ) |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major
Split check() into smaller helpers.
This method now exceeds the repo's 50-line cap. The pure-check and stateful-check blocks are already natural extraction points, and pulling them out keeps the short-circuit flow readable without risking a regression back to eager evaluation.
As per coding guidelines, Functions must be under 50 lines, files must be under 800 lines.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/communication/loop_prevention/guard.py` around lines 55 - 107,
The check() method is too long—split it into two helper methods (e.g.,
_run_pure_checks and _run_stateful_checks) to keep the short-circuit behavior:
move the ancestry and delegation-depth logic (calls to check_ancestry and
check_delegation_depth using self._config.max_delegation_depth) into
_run_pure_checks, and move the deduplicator, rate limiter, and circuit breaker
checks (self._deduplicator.check, self._rate_limiter.check,
self._circuit_breaker.check) into _run_stateful_checks; have check() call the
helpers in order and use existing _log_and_return for failures and return
GuardCheckOutcome(passed=True, mechanism=_SUCCESS_MECHANISM) on success so
behavior and short-circuiting remain identical.
| def __init__( | ||
| self, | ||
| config: RateLimitConfig, | ||
| *, | ||
| clock: Callable[[], float] = time.monotonic, | ||
| window_seconds: float = _DEFAULT_WINDOW_SECONDS, | ||
| ) -> None: | ||
| self._config = config | ||
| self._clock = clock | ||
| self._window_seconds = window_seconds | ||
| self._timestamps: dict[tuple[str, str], list[float]] = {} |
There was a problem hiding this comment.
Validate window_seconds before storing it.
Without a guard in Lines 37-47, window_seconds <= 0 makes every later check() prune the pair history to empty, so this limiter never trips. Fail fast here instead of silently accepting a broken window.
🛡️ Proposed fix
def __init__(
self,
config: RateLimitConfig,
*,
clock: Callable[[], float] = time.monotonic,
window_seconds: float = _DEFAULT_WINDOW_SECONDS,
) -> None:
+ if window_seconds <= 0:
+ msg = f"window_seconds must be greater than 0, got {window_seconds}"
+ raise ValueError(msg)
self._config = config
self._clock = clock
self._window_seconds = window_seconds
self._timestamps: dict[tuple[str, str], list[float]] = {}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/communication/loop_prevention/rate_limit.py` around lines 37 -
47, In the __init__ of the rate limiter, validate the incoming window_seconds
before storing it: if window_seconds is <= 0 raise a ValueError with a clear
message (e.g. "window_seconds must be > 0") so the invalid value is rejected
early; ensure you still assign self._window_seconds only after validation and
keep the rest of initialization (_config, _clock, _timestamps) unchanged—this
prevents check() from silently pruning histories when window_seconds is
non-positive.
| key = pair_key(delegator_id, delegatee_id) | ||
| now = self._clock() | ||
| cutoff = now - self._window_seconds | ||
| timestamps = self._timestamps.get(key, []) | ||
| recent = [t for t in timestamps if t > cutoff] | ||
| # Prune expired entries on read; evict empty keys | ||
| if recent: | ||
| self._timestamps[key] = recent | ||
| else: | ||
| self._timestamps.pop(key, None) | ||
| limit = self._config.max_per_pair_per_minute + self._config.burst_allowance |
There was a problem hiding this comment.
Make rate-limit admission atomic.
Lines 65-75 only read/prune state, and Lines 108-115 append later. Two concurrent delegations for the same pair can both pass check() before either call reaches record(), so the limiter under-enforces exactly when requests arrive together. Fold this into a single synchronized check_and_record() path, or protect both operations with one shared critical section.
🔒 Suggested direction
def check_and_record(
self,
delegator_id: str,
delegatee_id: str,
) -> GuardCheckOutcome:
key = pair_key(delegator_id, delegatee_id)
now = self._clock()
cutoff = now - self._window_seconds
limit = self._config.max_per_pair_per_minute + self._config.burst_allowance
with self._lock:
recent = [t for t in self._timestamps.get(key, []) if t > cutoff]
if len(recent) >= limit:
self._timestamps.pop(key, None) if not recent else None
return GuardCheckOutcome(
passed=False,
mechanism=_MECHANISM,
message=...,
)
recent.append(now)
self._timestamps[key] = recent
return GuardCheckOutcome(passed=True, mechanism=_MECHANISM)Also applies to: 108-115
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/communication/loop_prevention/rate_limit.py` around lines 65 -
75, The current two-step logic (check() reading/pruning _timestamps around
pair_key(...) and record() appending later) is racy—make admission atomic by
combining them into a single synchronized method (e.g., check_and_record()) or
by holding self._lock across both operations; inside that critical section
compute now and cutoff via self._clock(), prune recent timestamps from
self._timestamps[key], compare len(recent) to limit
(self._config.max_per_pair_per_minute + self._config.burst_allowance), return a
failing GuardCheckOutcome if >= limit (ensuring to evict empty keys), otherwise
append now to recent, write back to self._timestamps[key] and return a passing
GuardCheckOutcome; update callers to use the new atomic method and remove the
separate record() / check() pairing.
…ptile - Fix hierarchy self-cycle when dept.head == team.lead (CodeRabbit) - Reject whitespace-only denial reasons in AuthorityCheckResult (CodeRabbit) - Reject whitespace-only rejection_reason in DelegationResult (CodeRabbit) - Promote DELEGATION_AUTHORIZED to logger.info (CodeRabbit) - Add DELEGATION_SUB_TASK_FAILED event constant, replace bare string (CodeRabbit) - Copy reviewers/dependencies/artifacts_expected/acceptance_criteria to sub-task (Greptile) - Extract _validate_identity and _record_delegation helpers from delegate() (CodeRabbit) - Add test for dept-head-as-team-lead self-cycle prevention Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 35 out of 38 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| self._hierarchy = hierarchy | ||
| self._authority_validator = authority_validator | ||
| self._guard = guard | ||
| self._audit_trail: list[DelegationRecord] = [] | ||
|
|
There was a problem hiding this comment.
DelegationService keeps an ever-growing in-memory _audit_trail list. In a long-running process this can become unbounded memory growth; consider adding a retention policy (max records / time window), an injectable persistence sink, or documenting that callers are expected to snapshot/clear it periodically.
|
|
||
| try: | ||
| return Task( | ||
| id=f"del-{uuid4().hex[:12]}", |
There was a problem hiding this comment.
Sub-task IDs are generated by truncating UUID4 hex to 12 characters (uuid4().hex[:12]), which reduces entropy and increases collision risk if many delegations occur over time. Consider using the full UUID (or a project-wide task ID generator) to keep task IDs effectively unique across the system.
| id=f"del-{uuid4().hex[:12]}", | |
| id=f"del-{uuid4().hex}", |
| class DelegationRecord(BaseModel): | ||
| """Audit trail entry for a completed delegation. | ||
|
|
||
| Attributes: | ||
| delegation_id: Unique delegation identifier. | ||
| delegator_id: Agent ID of the delegator. | ||
| delegatee_id: Agent ID of the delegatee. | ||
| original_task_id: ID of the original task. | ||
| delegated_task_id: ID of the created sub-task. | ||
| timestamp: When the delegation occurred. | ||
| refinement: Context provided by the delegator. | ||
| """ | ||
|
|
||
| model_config = ConfigDict(frozen=True) | ||
|
|
||
| delegation_id: NotBlankStr = Field( | ||
| description="Unique delegation identifier", | ||
| ) | ||
| delegator_id: NotBlankStr = Field( | ||
| description="Delegator agent ID", | ||
| ) | ||
| delegatee_id: NotBlankStr = Field( | ||
| description="Delegatee agent ID", | ||
| ) | ||
| original_task_id: NotBlankStr = Field( | ||
| description="Original task ID", | ||
| ) | ||
| delegated_task_id: NotBlankStr = Field( | ||
| description="Created sub-task ID", | ||
| ) | ||
| timestamp: AwareDatetime = Field( | ||
| description="When delegation occurred", | ||
| ) | ||
| refinement: str = Field( | ||
| default="", | ||
| description="Context provided by delegator", | ||
| ) |
There was a problem hiding this comment.
DelegationRecord omits constraints from the audit trail
DelegationRequest.constraints is now correctly forwarded into the sub-task description (via _create_sub_task), but DelegationRecord never records the original structured value. Any downstream consumer of the audit trail — compliance tooling, replay logic, or debugging — loses the constraints entirely once the DelegationRequest is discarded.
If constraints is meaningful enough to expose on DelegationRequest and to forward to the delegatee's task description, it should also be preserved in the audit entry. At minimum, DelegationRecord should include:
constraints: tuple[NotBlankStr, ...] = Field(
default=(),
description="Extra constraints imposed by the delegator",
)…and _record_delegation in service.py should pass refinement=request.refinement, constraints=request.constraints.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/delegation/models.py
Line: 99-135
Comment:
**`DelegationRecord` omits `constraints` from the audit trail**
`DelegationRequest.constraints` is now correctly forwarded into the sub-task description (via `_create_sub_task`), but `DelegationRecord` never records the original structured value. Any downstream consumer of the audit trail — compliance tooling, replay logic, or debugging — loses the constraints entirely once the `DelegationRequest` is discarded.
If `constraints` is meaningful enough to expose on `DelegationRequest` and to forward to the delegatee's task description, it should also be preserved in the audit entry. At minimum, `DelegationRecord` should include:
```python
constraints: tuple[NotBlankStr, ...] = Field(
default=(),
description="Extra constraints imposed by the delegator",
)
```
…and `_record_delegation` in `service.py` should pass `refinement=request.refinement, constraints=request.constraints`.
How can I resolve this? If you propose a fix, please make it concise.| def get_state( | ||
| self, | ||
| delegator_id: str, | ||
| delegatee_id: str, | ||
| ) -> CircuitBreakerState: | ||
| """Get the circuit breaker state for an agent pair. | ||
|
|
||
| If the circuit was previously open and the cooldown has expired, | ||
| the pair state is reset before returning ``CLOSED``. | ||
|
|
||
| Args: | ||
| delegator_id: First agent ID. | ||
| delegatee_id: Second agent ID. | ||
|
|
||
| Returns: | ||
| Current state of the circuit breaker. | ||
| """ | ||
| pair = self._get_pair(delegator_id, delegatee_id) | ||
| if pair is None: | ||
| return CircuitBreakerState.CLOSED | ||
| if pair.opened_at is not None: | ||
| elapsed = self._clock() - pair.opened_at | ||
| if elapsed < self._config.cooldown_seconds: | ||
| return CircuitBreakerState.OPEN | ||
| # Cooldown expired: evict the stale entry | ||
| key = pair_key(delegator_id, delegatee_id) | ||
| del self._pairs[key] | ||
| logger.info( | ||
| DELEGATION_LOOP_CIRCUIT_RESET, | ||
| delegator=delegator_id, | ||
| delegatee=delegatee_id, | ||
| cooldown_seconds=self._config.cooldown_seconds, | ||
| ) | ||
| return CircuitBreakerState.CLOSED |
There was a problem hiding this comment.
get_state() mutates internal state — surprising for a getter
The get_state method deletes the pair entry from self._pairs when cooldown expires. While the docstring mentions this ("the pair state is reset before returning CLOSED"), the name get_state implies a read-only inspection. This side effect causes two problems:
-
Double eviction under normal flow:
check()callsget_state()(may delete the pair), thenrecord_delegation()immediately callsget_state()again. The second call finds no entry and returnsCLOSEDcleanly, so the behavior is correct — but the state machine only survives by coincidence of call order. -
Test fragility: Any test that calls
get_state()directly to inspect state after a cooldown interval inadvertently resets the counter, making tests timing-sensitive and brittle. The injectableclockis there precisely to avoid real-time dependency — butget_state()'s side effect means calling it "to check state" modifies whatcheck()andrecord_delegation()would then see.
Consider separating eviction: keep get_state() as a pure reader and introduce a private _evict_if_expired() helper called at the top of check() and record_delegation().
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/loop_prevention/circuit_breaker.py
Line: 88-121
Comment:
**`get_state()` mutates internal state — surprising for a getter**
The `get_state` method deletes the pair entry from `self._pairs` when cooldown expires. While the docstring mentions this ("the pair state is reset before returning `CLOSED`"), the name `get_state` implies a read-only inspection. This side effect causes two problems:
1. **Double eviction under normal flow**: `check()` calls `get_state()` (may delete the pair), then `record_delegation()` immediately calls `get_state()` again. The second call finds no entry and returns `CLOSED` cleanly, so the behavior is correct — but the state machine only survives by coincidence of call order.
2. **Test fragility**: Any test that calls `get_state()` directly to inspect state after a cooldown interval inadvertently resets the counter, making tests timing-sensitive and brittle. The injectable `clock` is there precisely to avoid real-time dependency — but `get_state()`'s side effect means calling it "to check state" modifies what `check()` and `record_delegation()` would then see.
Consider separating eviction: keep `get_state()` as a pure reader and introduce a private `_evict_if_expired()` helper called at the top of `check()` and `record_delegation()`.
How can I resolve this? If you propose a fix, please make it concise.| # Team lead -> department head (lowest priority) | ||
| if team.lead != dept.head and team.lead not in supervisor_of: | ||
| supervisor_of[team.lead] = dept.head | ||
| reports_of.setdefault(dept.head, []).append(team.lead) |
There was a problem hiding this comment.
Undocumented "first team wins" behaviour for multi-team leads
When an agent leads teams in more than one department, their supervisor is silently assigned as the dept.head of whichever department happens to be iterated first, because the guard team.lead not in supervisor_of prevents any subsequent assignment:
if team.lead != dept.head and team.lead not in supervisor_of:
supervisor_of[team.lead] = dept.head # first dept wins
reports_of.setdefault(dept.head, []).append(team.lead)The class docstring documents three priority levels (explicit reporting line > team lead > dept head), but says nothing about conflicts within the same priority tier. In practice, iteration order is determined by company.departments ordering — which is a tuple in Company, so positional. A user who adds a department after the fact and doesn't realise their shared lead will silently switch hierarchy parent will get a wrong (and un-debuggable) authority check result.
At minimum, document this behaviour explicitly in the class docstring or in the __init__ inline comment. A logger.debug warning when a lead-to-dept assignment is skipped due to the guard would also help operators trace unexpected authority failures.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/communication/delegation/hierarchy.py
Line: 43-46
Comment:
**Undocumented "first team wins" behaviour for multi-team leads**
When an agent leads teams in more than one department, their supervisor is silently assigned as the `dept.head` of whichever department happens to be iterated first, because the guard `team.lead not in supervisor_of` prevents any subsequent assignment:
```python
if team.lead != dept.head and team.lead not in supervisor_of:
supervisor_of[team.lead] = dept.head # first dept wins
reports_of.setdefault(dept.head, []).append(team.lead)
```
The class docstring documents three priority levels (explicit reporting line > team lead > dept head), but says nothing about conflicts *within* the same priority tier. In practice, iteration order is determined by `company.departments` ordering — which is a `tuple` in `Company`, so positional. A user who adds a department after the fact and doesn't realise their shared lead will silently switch hierarchy parent will get a wrong (and un-debuggable) authority check result.
At minimum, document this behaviour explicitly in the class docstring or in the `__init__` inline comment. A `logger.debug` warning when a lead-to-dept assignment is skipped due to the guard would also help operators trace unexpected authority failures.
How can I resolve this? If you propose a fix, please make it concise.
Summary
parent_task_idanddelegation_chainfields for tracking delegation lineageMappingProxyType-wrapped hierarchy state, frozen Pydantic models with model validators enforcing consistency invariantsLoop Prevention Mechanisms
Architecture
_pair_keyutility for canonical agent pair orderingTest Plan
Review Coverage
Pre-reviewed by 9 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency). 26 findings addressed — including bug fixes (circuit breaker re-arming, hierarchy duplicate entries), DRY extraction, model validators, observability gaps, and documentation updates.
🤖 Generated with Claude Code