feat: implement parallel agent execution (#22) #161
Conversation
Add `ParallelExecutor` for concurrent agent execution using `asyncio.TaskGroup` with error isolation, concurrency limits, resource locking, and progress tracking. Follows the `ToolInvoker.invoke_all()` pattern.

New modules:
- `parallel_models`: `AgentAssignment`, `ParallelExecutionGroup`, `AgentOutcome`, `ParallelExecutionResult`, `ParallelProgress` (frozen Pydantic models)
- `resource_lock`: `ResourceLock` protocol + `InMemoryResourceLock`
- `parallel`: `ParallelExecutor` orchestrator
- `events/parallel`: 10 event constants

54 new unit tests (24 models, 13 lock, 17 executor), 96.42% coverage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pre-reviewed by 9 agents, 30 findings addressed.

Source fixes:
- Add `AgentOutcome` model_validator (result XOR error)
- Make `ParallelProgress.pending` a `@computed_field`
- Use `NotBlankStr` for `resource_claims` tuple
- Use `Self` return type in `_validate_assignments`
- Fix import ordering (events before `TYPE_CHECKING`)
- Add fatal error logging (`MemoryError`/`RecursionError`)
- Add shutdown rejection logging
- Raise `except*` log level from DEBUG to INFO
- Remove redundant per-task lock release (group-level only)
- Wrap group-level lock release in try/except
- Return `None` from `_resolve_lock` when no claims
- Add wrong-holder warning in `InMemoryResourceLock.release`
- Use `nullcontext` for semaphore pattern
- Narrow `_execute_assignment` group parameter to `group_id`

Documentation:
- Update DESIGN_SPEC.md §15.3 (new files), §15.5 (conventions), §6.3 and §6.5 (implementation callouts)
- Update CLAUDE.md engine package description

Tests:
- Add shutdown-in-progress rejection test
- Add external lock holder conflict test
- Add auto-created lock tests (success + conflict)
- Add progress callback exception resilience test
- Fix `_make_run_result` helper consistency
- Fix `test_total_cost_usd` assertion
- Add `ParallelProgress` computed `pending` tests
- Add `AgentOutcome` both-none-rejected test
Polyfactory could randomly generate a task ID matching a dependency, causing sporadic self-dependency validation failures. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Dependency Review

✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.

Scanned Files: None
📝 Walkthrough — Summary by CodeRabbit
Walkthrough

Adds a parallel execution subsystem: new `ParallelExecutor`, Pydantic models for assignments/groups/outcomes/progress, a `ResourceLock` protocol with `InMemoryResourceLock`, new parallel-specific errors and observability events, plan parsing doc updates, and comprehensive unit tests for parallel behavior and locking.
Sequence Diagram(s)

sequenceDiagram
participant Client
participant ParallelExecutor
participant ResourceLock
participant TaskGroup as AsyncTaskGroup
participant AgentEngine
participant Progress as ProgressCallback
Client->>ParallelExecutor: execute_group(group)
ParallelExecutor->>ResourceLock: validate/resolve resource claims
ParallelExecutor->>AsyncTaskGroup: create task group
loop per assignment
ParallelExecutor->>Progress: emit progress (pending)
ParallelExecutor->>AsyncTaskGroup: schedule _run_guarded(assignment)
AsyncTaskGroup->>ResourceLock: acquire(resource, holder)
ResourceLock-->>AsyncTaskGroup: acquired? (bool)
alt lock acquired
AsyncTaskGroup->>AgentEngine: run(assignment)
AgentEngine-->>AsyncTaskGroup: result / error
AsyncTaskGroup->>ResourceLock: release(resource, holder)
AsyncTaskGroup->>Progress: emit agent complete
else conflict
AsyncTaskGroup->>Progress: emit lock conflict / agent error
end
end
AsyncTaskGroup->>ParallelExecutor: tasks complete / cancelled
ParallelExecutor->>Progress: emit final progress
ParallelExecutor-->>Client: ParallelExecutionResult
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the system's concurrency capabilities by introducing a dedicated framework for executing multiple agents in parallel. It provides robust orchestration, resource management, and error handling for concurrent agent tasks, allowing for more efficient and scalable multi-agent workflows.

Highlights
Code Review
This pull request introduces a robust and well-designed parallel execution framework for agents, featuring `asyncio.TaskGroup` for structured concurrency, a pluggable `ResourceLock` protocol, and comprehensive error handling. The overall architecture is sound and the contribution is high quality, with well-validated Pydantic models and thorough test coverage. However, two critical issues require attention. First, the resource locking logic has a significant flaw: when no shared lock is provided, `ParallelExecutor` creates a new lock instance for every execution group, which prevents effective coordination of shared resources and can lead to race conditions and data corruption. Second, there is a critical race condition in progress tracking.
```python
@dataclasses.dataclass
class _ProgressState:
    """Mutable progress tracking — internal to ``execute_group()`` scope."""

    group_id: str
    total: int
    completed: int = 0
    in_progress: int = 0
    succeeded: int = 0
    failed: int = 0

    def snapshot(self) -> ParallelProgress:
        """Create a frozen progress snapshot."""
        return ParallelProgress(
            group_id=self.group_id,
            total=self.total,
            completed=self.completed,
            in_progress=self.in_progress,
            succeeded=self.succeeded,
            failed=self.failed,
        )
```
There was a problem hiding this comment.
The `_ProgressState` object is shared among multiple concurrent tasks and its attributes are mutated without any synchronization mechanism. Operations like `progress.in_progress += 1` are not atomic and can lead to race conditions when multiple `_run_guarded` tasks execute them concurrently. This can result in incorrect progress tracking, which could affect any logic that relies on these counts.

To fix this, you should introduce an `asyncio.Lock` to protect all modifications to the `_ProgressState` instance. A good approach would be to create the lock in `execute_group` and pass it down to `_run_guarded` and other helper methods that modify the progress state. All writes to progress attributes should then be performed within the context of this lock.
```python
def _resolve_lock(
    self,
    group: ParallelExecutionGroup,
) -> ResourceLock | None:
    """Return the resource lock to use, or ``None`` if not needed.

    When no assignments declare resource claims, returns ``None``
    (no locking needed). When claims exist, falls back to
    ``InMemoryResourceLock()`` if no lock was injected.
    """
    has_claims = any(a.resource_claims for a in group.assignments)
    if not has_claims:
        return None
    if self._resource_lock is not None:
        return self._resource_lock
    return InMemoryResourceLock()
```
There was a problem hiding this comment.
The ParallelExecutor class fails to share its default resource lock across multiple execution groups. When no resource_lock is provided to the constructor, the _resolve_lock method creates a new instance of InMemoryResourceLock for every call to execute_group. This means that resource claims are only enforced within a single group, and concurrent executions of different groups (or even subsequent executions of the same ParallelExecutor instance) will not respect each other's locks. This defeats the purpose of the resource locking mechanism, which is intended to provide exclusive access to shared resources (like file paths) to prevent data corruption or race conditions. To fix this, initialize a default InMemoryResourceLock in the ParallelExecutor.__init__ method if no lock is provided, and use this shared instance in _resolve_lock.
Suggested change:

```python
def _resolve_lock(
    self,
    group: ParallelExecutionGroup,
) -> ResourceLock | None:
    """Return the resource lock to use, or ``None`` if not needed.

    When no assignments declare resource claims, returns ``None``
    (no locking needed). When claims exist, falls back to
    the shared ``InMemoryResourceLock()`` if no lock was injected.
    """
    has_claims = any(a.resource_claims for a in group.assignments)
    if not has_claims:
        return None
    if self._resource_lock is None:
        self._resource_lock = InMemoryResourceLock()
    return self._resource_lock
```
Greptile Summary

This PR introduces … Key observations:

Remaining findings:

Confidence Score: 4/5

Last reviewed commit: f75ec0b
Pull request overview
Adds first-class parallel agent execution to the engine layer, enabling multiple AgentEngine.run() calls to be coordinated concurrently with structured concurrency, optional resource locking, progress tracking, and new observability events.
Changes:

- Introduces `ParallelExecutor` + supporting Pydantic models (`AgentAssignment`, `ParallelExecutionGroup`, `AgentOutcome`, `ParallelExecutionResult`, `ParallelProgress`) for parallel execution orchestration and reporting.
- Adds pluggable resource locking (`ResourceLock` + `InMemoryResourceLock`) and parallel execution observability events (`events/parallel.py`).
- Expands unit test coverage for parallel execution, models, and locking; updates docs/specs and fixes a Polyfactory `TaskFactory.dependencies` flake.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| `src/ai_company/engine/parallel.py` | New parallel orchestrator using `asyncio.TaskGroup`, semaphore limits, progress callback, shutdown integration, and resource-lock lifecycle. |
| `src/ai_company/engine/parallel_models.py` | Frozen Pydantic models defining assignments, groups, outcomes, results, and progress snapshots. |
| `src/ai_company/engine/resource_lock.py` | Resource locking protocol + in-memory implementation with conflict detection logging. |
| `src/ai_company/observability/events/parallel.py` | Adds structured event constants for parallel execution + lock/progress events. |
| `src/ai_company/engine/errors.py` | Adds `ParallelExecutionError` and `ResourceConflictError`. |
| `src/ai_company/engine/__init__.py` | Re-exports parallel execution APIs and locking types from engine. |
| `tests/unit/engine/test_parallel.py` | Unit tests for executor behavior (success/failure, concurrency, fail-fast, locking, progress, shutdown, fatal errors). |
| `tests/unit/engine/test_parallel_models.py` | Unit tests for model validation + computed fields (success flags, cost aggregation, pending). |
| `tests/unit/engine/test_resource_lock.py` | Unit tests for `ResourceLock` protocol compliance and in-memory lock correctness. |
| `tests/unit/observability/test_events.py` | Registers the new parallel events module for discovery tests. |
| `tests/unit/core/conftest.py` | Pins `TaskFactory.dependencies = ()` to avoid flaky self-dependency collisions. |
| `DESIGN_SPEC.md` | Documents the parallel execution architecture and conventions. |
| `CLAUDE.md` | Updates repo structure overview to mention parallel execution under `engine/`. |
src/ai_company/engine/parallel.py
Outdated
```python
lock = self._resolve_lock(group)
self._validate_resource_claims(group)

if lock is not None:
    await self._acquire_all_locks(group, lock)
```
Resource locks are acquired before the TaskGroup, but release is not in a finally that will run on caller cancellation (e.g., CancelledError). If execute_group() is cancelled after _acquire_all_locks() succeeds, the function can exit before _release_all_locks() runs, leaking locks and causing future groups to deadlock/conflict. Consider wrapping the whole “acquire → run → release” sequence in try/finally, and (optionally) shielding the release so cancellation can’t interrupt lock cleanup.
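One way to realize this suggestion is the following sketch. The method names echo the PR, but the bodies are hypothetical stand-ins; only the acquire → try → finally → shield shape is the point.

```python
# Sketch: guarantee lock release even when execute_group() is cancelled,
# shielding the cleanup so cancellation cannot interrupt it.
import asyncio


class Executor:
    def __init__(self) -> None:
        self.released = False

    async def _acquire_all_locks(self) -> None: ...  # stand-in body

    async def _release_all_locks(self) -> None:
        self.released = True                         # stand-in for real cleanup

    async def execute_group(self) -> None:
        await self._acquire_all_locks()
        try:
            await asyncio.sleep(10)                  # stand-in for the TaskGroup run
        finally:
            # shield so cancellation of execute_group() cannot skip cleanup
            await asyncio.shield(self._release_all_locks())


async def main() -> bool:
    ex = Executor()
    task = asyncio.create_task(ex.execute_group())
    await asyncio.sleep(0)                           # let the task acquire and park
    task.cancel()                                    # simulate caller cancellation
    try:
        await task
    except asyncio.CancelledError:
        pass
    return ex.released


released = asyncio.run(main())
```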
```python
task_id = assignment.task_id
agent_id = assignment.agent_id

if not self._register_with_shutdown(task_id, agent_id, outcomes):
```
When shutdown registration fails, _run_guarded() returns early without updating progress (completed/failed) or emitting a progress snapshot. This can leave progress callbacks reporting fewer completed items than group.total, even though an outcome is recorded. Consider updating progress.failed/progress.completed (and emitting progress) for this early-exit path as well.
Suggested change:

```python
if not self._register_with_shutdown(task_id, agent_id, outcomes):
    # Registration failed: an outcome may have been recorded, but we still need
    # to keep progress accounting consistent with the number of completed items.
    progress.failed += 1
    progress.completed += 1
    self._emit_progress(progress)
```
```python
finally:
    progress.in_progress = max(0, progress.in_progress - 1)
    progress.completed += 1
    self._emit_progress(progress)
```
Progress accounting can become internally inconsistent for cancelled tasks (e.g., siblings cancelled during fail_fast): completed is incremented in finally, but neither succeeded nor failed is incremented for cancellations because asyncio.CancelledError isn’t handled. This means progress snapshots can end with completed == total while succeeded + failed < completed. Consider explicitly handling CancelledError to record an error outcome and increment failed (while still re-raising if you want cancellation to propagate).
DESIGN_SPEC.md
Outdated
| **Shared field groups** | Adopted (M2.5) | Extracted common field sets into base models (e.g. `_SpendingTotals`) | Prevents field duplication across spending summary models. `_SpendingTotals` provides shared aggregation fields; `AgentSpending`, `DepartmentSpending`, `PeriodSpending` extend it. |
| **Event constants** | Adopted (per-domain) | Per-domain submodules under `events/` package (e.g. `events.provider`, `events.budget`). Import directly: `from ai_company.observability.events.<domain> import CONSTANT` | Split by domain for discoverability, co-location with domain logic, and reduced merge conflicts as constants grow. `__init__.py` serves as package marker with usage documentation; no re-exports. |
| **Parallel tool execution** | Adopted (M2.5) | `asyncio.TaskGroup` in `ToolInvoker.invoke_all` with optional `max_concurrency` semaphore | Structured concurrency with proper cancellation semantics. Fatal errors collected via guarded wrapper and re-raised after all tasks complete. |
| **Parallel agent execution** | Adopted (M3) | `ParallelExecutor` coordinates concurrent `AgentEngine.run()` calls via `asyncio.TaskGroup` + optional `Semaphore` concurrency limit + `_run_guarded()` error isolation. `ResourceLock` protocol with `InMemoryResourceLock` for exclusive file-path claims. Progress tracking via `ProgressCallback`. Shutdown-aware via `ShutdownManager` task registration. Fail-fast mode re-raises first failure through `TaskGroup`. | Follows the `ToolInvoker.invoke_all()` pattern (parallel tool execution above). Composition over inheritance — wraps `AgentEngine`. Structured concurrency with proper cancellation. See §6.3 Parallel Execution. |
This table row states “Fail-fast mode re-raises first failure through TaskGroup”, but ParallelExecutor.execute_group() catches the resulting ExceptionGroup and returns a ParallelExecutionResult instead of propagating the exception to the caller. Please clarify the spec wording (e.g., fail-fast cancels siblings early, but errors are still surfaced via outcomes/result) or adjust the implementation to match the spec.
Suggested change:

| **Parallel agent execution** | Adopted (M3) | `ParallelExecutor` coordinates concurrent `AgentEngine.run()` calls via `asyncio.TaskGroup` + optional `Semaphore` concurrency limit + `_run_guarded()` error isolation. `ResourceLock` protocol with `InMemoryResourceLock` for exclusive file-path claims. Progress tracking via `ProgressCallback`. Shutdown-aware via `ShutdownManager` task registration. Fail-fast mode cancels sibling tasks on first failure; all errors are surfaced via `ParallelExecutionResult` outcomes instead of being re-raised. | Follows the `ToolInvoker.invoke_all()` pattern (parallel tool execution above). Composition over inheritance — wraps `AgentEngine`. Structured concurrency with proper cancellation. See §6.3 Parallel Execution. |
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@DESIGN_SPEC.md`:
- Line 2531: Update the "Parallel agent execution" row to describe fail_fast as
sibling cancellation (the first failing task triggers cancellation of sibling
tasks) rather than re-raising the first exception; state that execute_group()
returns a ParallelExecutionResult containing cancellation outcomes for siblings
and only raises when structured concurrency (TaskGroup) itself fails for
unexpected reasons, and reference the relevant symbols: ParallelExecutor,
AgentEngine.run(), _run_guarded(), TaskGroup, execute_group(),
ParallelExecutionResult, and fail_fast so readers/tests (e.g.,
tests/unit/engine/test_parallel.py) understand callers should expect a
cancellation-result path instead of a thrown exception for ordinary fail-fast
failures.
- Line 806: Update the milestone wording in DESIGN_SPEC.md to remove the
contradiction: where M3 is described as single-agent-only (e.g., section §6.8
and other earlier references) reconcile it with the new statement that
ParallelExecutor (engine/parallel.py) and associated models
(engine/parallel_models.py: AgentAssignment, ParallelExecutionGroup,
AgentOutcome, ParallelExecutionResult, ParallelProgress) implement parallel
multi-agent execution; choose and update M3 wording consistently to either: (a)
state M3 shipped single-agent and mark parallel execution as a post-M3
deviation/approved change, or (b) declare M3 as including parallel multi-agent
execution and update all earlier references (including §6.8 and lines around the
existing M3 callout) to reflect that new reality, and ensure the DESIGN_SPEC.md
note about updating the spec after approved deviations is applied.
In `@src/ai_company/engine/parallel_models.py`:
- Around line 143-159: The validator _validate_result_or_error currently only
rejects both-missing but allows both result and error and never verifies that a
present result belongs to this outcome; update _validate_result_or_error to (1)
raise an error if both self.result and self.error are set (mutual exclusivity),
and (2) if self.result is set, verify that self.result.task_id == self.task_id
and self.result.agent_id == self.agent_id and raise ValueError with a clear
message if they differ; keep raising when both are None as before. Use the
existing types AgentRunResult, task_id, agent_id and the method name
_validate_result_or_error to locate where to implement these checks.
- Around line 240-253: The pending property currently silences impossible
states; add explicit validation in the ParallelProgress model to reject
inconsistent snapshots instead of clamping. Implement a pydantic root_validator
(e.g., def validate_counts(cls, values): ...) on ParallelProgress that checks
invariants such as total >= 0, completed >= 0, in_progress >= 0, completed +
in_progress <= total, succeeded + failed <= completed, and raise a ValueError
with a clear message when any invariant fails; keep the pending
computed_field/property as-is but rely on these pre-validated counters.
In `@src/ai_company/engine/parallel.py`:
- Around line 106-222: The execute_group function is too long; extract the
semaphore creation, TaskGroup execution, and result construction into private
helpers to reduce length while preserving behavior. Create a
_create_semaphore(self, group) that returns asyncio.Semaphore or None (used
where semaphore is currently created); a _run_all_assignments(self, group,
outcomes, fatal_errors, progress, semaphore) that contains the async with
asyncio.TaskGroup() block and calls self._run_guarded for each assignment
(ensure it propagates ExceptionGroup handling exactly as current code); and a
_build_result(self, group, outcomes, start) that computes duration and returns
the ParallelExecutionResult (used where result is built and logging of
PARALLEL_GROUP_COMPLETE is done). Ensure these helpers accept and return the
same values used in execute_group and keep existing calls to _acquire_all_locks,
_release_all_locks, logging, and fatal_errors handling unchanged.
In `@tests/unit/engine/test_parallel.py`:
- Around line 189-203: The test only checks aggregate counts and not that each
AgentRunResult is matched to the correct assignment; update the
test_two_agents_both_succeed to assert that the returned run results from
executor.execute_group contain entries whose agent_id and task_id match the
assignments created via _make_assignment (a1.identity/a1.task and
a2.identity/a2.task) and that each corresponding _make_run_result (r1, r2) is
attached to the correct assignment; locate the returned structure from
execute_group (e.g., result.agent_runs or result.items) and add explicit
comparisons of agent_id/task_id pairs for a1 and a2 in addition to the existing
aggregate asserts, and apply the same pairing assertions to the other
multi-agent success/failure tests mentioned (the similar tests around the other
ranges).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 521a635c-aa49-4142-be06-d03660bdd1f9
📒 Files selected for processing (13)
- CLAUDE.md
- DESIGN_SPEC.md
- src/ai_company/engine/__init__.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/parallel.py
- src/ai_company/engine/parallel_models.py
- src/ai_company/engine/resource_lock.py
- src/ai_company/observability/events/parallel.py
- tests/unit/core/conftest.py
- tests/unit/engine/test_parallel.py
- tests/unit/engine/test_parallel_models.py
- tests/unit/engine/test_resource_lock.py
- tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Agent
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (6)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- Always use native lazy annotations (PEP 649) — do not use `from __future__ import annotations`
- Use PEP 758 except syntax: `except A, B:` without parentheses for multiple exception types
- Maintain line length of 88 characters (enforced by ruff)

Files:
- tests/unit/core/conftest.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/resource_lock.py
- tests/unit/engine/test_parallel_models.py
- src/ai_company/engine/__init__.py
- tests/unit/observability/test_events.py
- tests/unit/engine/test_resource_lock.py
- src/ai_company/engine/parallel.py
- src/ai_company/engine/parallel_models.py
- src/ai_company/observability/events/parallel.py
- tests/unit/engine/test_parallel.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- Use pytest markers `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, and `@pytest.mark.slow` to categorize tests
- Prefer `@pytest.mark.parametrize` for testing similar test cases
- Use generic vendor-agnostic names in test fixtures and configuration: `test-provider`, `test-small-001`, `test-medium-001`, `test-large-001`

Files:
- tests/unit/core/conftest.py
- tests/unit/engine/test_parallel_models.py
- tests/unit/observability/test_events.py
- tests/unit/engine/test_resource_lock.py
- tests/unit/engine/test_parallel.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- All public functions and classes must have type hints and Google-style docstrings (enforced by ruff D rules)
- Never use `import logging` or `logging.getLogger()` in application code — use `from ai_company.observability import get_logger` with variable name `logger`
- Use structured logging with constants from `ai_company.observability.events.<domain>` — always log as `logger.info(EVENT_CONSTANT, key=value)`, never with format strings
- For all identifier/name fields in Pydantic models, use `NotBlankStr` type from `core.types` instead of manual whitespace validators (including optional and tuple variants)
- Use `@computed_field` for derived values in Pydantic models instead of storing and validating redundant fields
- Enforce immutability: create new objects instead of mutating. For non-Pydantic internal collections use `copy.deepcopy()` at construction and `MappingProxyType` for read-only enforcement
- For config/identity data use frozen Pydantic models; for runtime state that evolves use separate mutable-via-copy models with `model_copy(update=...)`
- Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code instead of bare `create_task`
- Functions must be under 50 lines, files must be under 800 lines
- Handle errors explicitly — never silently swallow exceptions
- Never use real vendor names (Anthropic, OpenAI, Claude, GPT) in project-owned code, docstrings, comments, or tests — use generic names: `example-provider`, `example-large-001`, `test-provider`, `test-small-001`

Files:
- src/ai_company/engine/errors.py
- src/ai_company/engine/resource_lock.py
- src/ai_company/engine/__init__.py
- src/ai_company/engine/parallel.py
- src/ai_company/engine/parallel_models.py
- src/ai_company/observability/events/parallel.py
src/ai_company/{providers,tools,engine,communication}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- Use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations

Files:
- src/ai_company/engine/errors.py
- src/ai_company/engine/resource_lock.py
- src/ai_company/engine/__init__.py
- src/ai_company/engine/parallel.py
- src/ai_company/engine/parallel_models.py
src/ai_company/engine/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- Catch `RetryExhaustedError` at the engine layer to trigger fallback chains when all provider retries fail

Files:
- src/ai_company/engine/errors.py
- src/ai_company/engine/resource_lock.py
- src/ai_company/engine/__init__.py
- src/ai_company/engine/parallel.py
- src/ai_company/engine/parallel_models.py
DESIGN_SPEC.md

📄 CodeRabbit inference engine (CLAUDE.md)

- Update `DESIGN_SPEC.md` after approved deviations to reflect the new reality

Files:
- DESIGN_SPEC.md
🧠 Learnings (2)
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code instead of bare `create_task`
Applied to files:
src/ai_company/engine/parallel.py
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{providers,tools,engine,communication}/**/*.py : Use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations
Applied to files:
DESIGN_SPEC.md
🧬 Code graph analysis (4)
src/ai_company/engine/resource_lock.py (1)
- src/ai_company/observability/_logger.py (1): `get_logger` (8-28)

src/ai_company/engine/__init__.py (4)
- src/ai_company/engine/errors.py (3): `ParallelExecutionError` (42-43), `PromptBuildError` (8-9), `ResourceConflictError` (46-47)
- src/ai_company/engine/parallel.py (1): `ParallelExecutor` (77-485)
- src/ai_company/engine/parallel_models.py (5): `AgentAssignment` (23-77), `AgentOutcome` (131-167), `ParallelExecutionGroup` (80-128), `ParallelExecutionResult` (170-222), `ParallelProgress` (225-253)
- src/ai_company/engine/resource_lock.py (2): `InMemoryResourceLock` (64-137), `ResourceLock` (24-61)

tests/unit/engine/test_resource_lock.py (1)
- src/ai_company/engine/resource_lock.py (12): `InMemoryResourceLock` (64-137), `ResourceLock` (24-61), `acquire` (31-38), `acquire` (76-96), `release` (40-46), `release` (98-116), `is_locked` (55-57), `is_locked` (131-133), `holder_of` (59-61), `holder_of` (135-137), `release_all` (48-53), `release_all` (118-129)

src/ai_company/engine/parallel_models.py (4)
- src/ai_company/core/agent.py (1): `AgentIdentity` (246-304)
- src/ai_company/core/task.py (1): `Task` (38-212)
- src/ai_company/engine/run_result.py (1): `AgentRunResult` (19-109)
- src/ai_company/providers/models.py (2): `ChatMessage` (138-210), `CompletionConfig` (213-254)
🔇 Additional comments (24)
tests/unit/core/conftest.py (1)
174-174: LGTM!Pinning
dependencies = ()prevents Polyfactory from generating random task dependencies that could inadvertently create self-referential or invalid dependency chains, fixing the flaky test behavior mentioned in the PR.CLAUDE.md (1)
52-52: LGTM!Documentation accurately reflects the new parallel execution capability added to the engine package.
tests/unit/observability/test_events.py (1)
111-111: LGTM!The addition of
"parallel"to the expected modules set correctly reflects the newevents/parallel.pymodule and maintains alphabetical ordering.tests/unit/engine/test_resource_lock.py (2)
1-16: LGTM! Good structural test validating that `InMemoryResourceLock` implements the `ResourceLock` protocol. The protocol compliance check ensures type safety at runtime.

19-111: LGTM! Comprehensive test coverage for `InMemoryResourceLock`:

- Acquire/release semantics, including idempotence and wrong-holder scenarios
- `release_all` with mixed holders
- Concurrent acquisition test using `asyncio.TaskGroup` (per coding guidelines)
- Multiple resource independence

The concurrent test (lines 89-102) properly validates that only one holder wins when multiple tasks race for the same lock.
src/ai_company/engine/errors.py (1)

40-47: LGTM! Clean exception class additions that follow the existing `EngineError` hierarchy pattern. The docstrings are concise and descriptive.

src/ai_company/engine/parallel.py (11)

1-52: LGTM! Well-structured module header with proper imports, logger setup using `get_logger`, and a clear `ProgressCallback` type alias with documentation.

54-75: LGTM! Clean internal progress tracking dataclass with immutable snapshot generation. The `snapshot()` method correctly creates a frozen `ParallelProgress` from mutable state.
77-105: LGTM! Good use of the composition-over-inheritance pattern. The constructor properly accepts optional dependencies with sensible defaults (auto-creating `InMemoryResourceLock` when claims exist).

224-291: LGTM! The `_run_guarded` method correctly implements the error isolation pattern:

- Fatal errors (`MemoryError`, `RecursionError`) are collected but don't crash siblings
- Regular exceptions are recorded and optionally re-raised for fail-fast
- `BaseException` propagates through the TaskGroup
- Progress tracking is properly updated in the finally block
- Shutdown manager registration/unregistration is handled correctly
292-320: LGTM! Clean shutdown integration with proper error handling and outcome recording when shutdown is in progress.

321-364: LGTM! `_execute_assignment` correctly uses the `nullcontext()` pattern for the optional semaphore and properly logs agent lifecycle events.
365-388: LGTM! Clean error outcome recording with structured logging.

389-405: LGTM! Smart auto-creation of `InMemoryResourceLock` only when claims exist, avoiding unnecessary lock overhead for groups without resource claims.

406-431: LGTM! Resource claim validation correctly detects conflicts before execution starts, with proper logging and error propagation.

433-464: LGTM! Lock acquisition includes rollback logic (line 454) for when a lock fails mid-acquisition. `_release_all_locks` correctly uses `release_all` per agent rather than per resource.

466-485: LGTM! Progress emission is resilient: callback exceptions are caught and logged without disrupting execution. Good defensive programming.
tests/unit/engine/test_parallel_models.py (6)
1-112: LGTM!Well-structured test helpers that use vendor-agnostic names (
test-provider,test-small-001) as required by coding guidelines. The_make_run_resulthelper correctly builds completeAgentRunResultobjects for testing outcomes.
114-161: LGTM!Comprehensive
AgentAssignmenttests covering:
- Minimal and full construction with defaults verification
- Immutability enforcement via
ValidationError- Computed properties (
agent_id,task_id)
163-256: LGTM!Excellent validation test coverage for
ParallelExecutionGroup:
- Empty assignments rejection
- Duplicate task_ids and agent_ids rejection
- Invalid
max_concurrencyvalues (0, negative)- Blank
group_idrejection viaNotBlankStrThe regex patterns in
match=arguments are appropriately flexible.
258-309: LGTM!Good coverage of
AgentOutcomestates including the importanttest_both_none_rejectedcase ensuring at least one ofresultorerrormust be present.
311-386: LGTM!
ParallelExecutionResulttests properly validate computed fields (agents_succeeded,agents_failed,all_succeeded,total_cost_usd) and aggregation behavior.
388-441: LGTM!
ParallelProgresstests verify thependingcomputed field including edge case clamping to zero whencompleted + in_progress > total.src/ai_company/observability/events/parallel.py (1)
1-14: LGTM! Clean event constant definitions following the established `domain.subject.qualifier` pattern. All constants are typed as `Final[str]` and are properly used throughout the parallel execution implementation.
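For illustration, constants in that style might look like the following. `PARALLEL_GROUP_COMPLETE` is named elsewhere in this PR; the other names and all string values are assumptions, not the module's actual contents:

```python
from typing import Final

# Hypothetical event constants in the domain.subject.qualifier style.
PARALLEL_GROUP_START: Final[str] = "parallel.group.start"
PARALLEL_GROUP_COMPLETE: Final[str] = "parallel.group.complete"
PARALLEL_AGENT_ERROR: Final[str] = "parallel.agent.error"
```

Per the repo's logging guideline, these would be passed as the first argument to the structured logger, e.g. `logger.info(PARALLEL_GROUP_START, group_id=group_id)`.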
DESIGN_SPEC.md
Outdated
    └──▶ Backend Dev ──┘

> **Current state (M3):** `ParallelExecutor` (in `engine/parallel.py`) implements concurrent agent execution with `asyncio.TaskGroup`, configurable concurrency limits, resource locking for exclusive file access, and error isolation. Models in `engine/parallel_models.py`: `AgentAssignment`, `ParallelExecutionGroup`, `AgentOutcome`, `ParallelExecutionResult`, `ParallelProgress`.
Reconcile the milestone wording.
These new M3 callouts now say parallel multi-agent execution ships in M3, but earlier sections and §6.8 still describe M3 as single-agent-only. Please update the surrounding milestone language so the spec does not claim both states.
As per coding guidelines, DESIGN_SPEC.md: Update DESIGN_SPEC.md after approved deviations to reflect the new reality.
Also applies to: 840-840
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@DESIGN_SPEC.md` at line 806, Update the milestone wording in DESIGN_SPEC.md
to remove the contradiction: where M3 is described as single-agent-only (e.g.,
section §6.8 and other earlier references) reconcile it with the new statement
that ParallelExecutor (engine/parallel.py) and associated models
(engine/parallel_models.py: AgentAssignment, ParallelExecutionGroup,
AgentOutcome, ParallelExecutionResult, ParallelProgress) implement parallel
multi-agent execution; choose and update M3 wording consistently to either: (a)
state M3 shipped single-agent and mark parallel execution as a post-M3
deviation/approved change, or (b) declare M3 as including parallel multi-agent
execution and update all earlier references (including §6.8 and lines around the
existing M3 callout) to reflect that new reality, and ensure the DESIGN_SPEC.md
note about updating the spec after approved deviations is applied.
    task_id: NotBlankStr = Field(description="Task identifier")
    agent_id: NotBlankStr = Field(description="Agent identifier")
    result: AgentRunResult | None = Field(
        default=None,
        description="Present if execution produced a result",
    )
    error: str | None = Field(
        default=None,
        description="Present if agent crashed before producing result",
    )

    @model_validator(mode="after")
    def _validate_result_or_error(self) -> Self:
        if self.result is None and self.error is None:
            msg = "Either result or error must be set"
            raise ValueError(msg)
        return self
Tighten AgentOutcome invariants.
This validator only rejects the “both missing” case. It still allows both result and error, and it never checks that a present result matches task_id/agent_id, so an internally inconsistent outcome can still be marked successful.
Suggested validation hardening:

    @model_validator(mode="after")
    def _validate_result_or_error(self) -> Self:
    -    if self.result is None and self.error is None:
    -        msg = "Either result or error must be set"
    +    if (self.result is None) == (self.error is None):
    +        msg = "Exactly one of result or error must be set"
             raise ValueError(msg)
    +    if self.result is not None:
    +        if self.result.task_id != self.task_id:
    +            msg = "result.task_id must match task_id"
    +            raise ValueError(msg)
    +        if self.result.agent_id != self.agent_id:
    +            msg = "result.agent_id must match agent_id"
    +            raise ValueError(msg)
         return self

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/parallel_models.py` around lines 143 - 159, The
validator _validate_result_or_error currently only rejects both-missing but
allows both result and error and never verifies that a present result belongs to
this outcome; update _validate_result_or_error to (1) raise an error if both
self.result and self.error are set (mutual exclusivity), and (2) if self.result
is set, verify that self.result.task_id == self.task_id and self.result.agent_id
== self.agent_id and raise ValueError with a clear message if they differ; keep
raising when both are None as before. Use the existing types AgentRunResult,
task_id, agent_id and the method name _validate_result_or_error to locate where
to implement these checks.
    async def test_two_agents_both_succeed(self) -> None:
        a1 = _make_assignment("a1", "t1")
        a2 = _make_assignment("a2", "t2")
        r1 = _make_run_result(a1.identity, a1.task)
        r2 = _make_run_result(a2.identity, a2.task)
        engine = _mock_engine(side_effect=[r1, r2])
        executor = ParallelExecutor(engine=engine)
        group = _make_group(a1, a2)

        result = await executor.execute_group(group)

        assert result.agents_succeeded == 2
        assert result.agents_failed == 0
        assert result.all_succeeded is True
        assert engine.run.await_count == 2
🧹 Nitpick | 🔵 Trivial
Assert assignment/result pairing in the multi-agent success tests.
These cases only verify aggregate success counts. If ParallelExecutor accidentally attaches a returned AgentRunResult to the wrong assignment, they still pass—especially with list side_effects consumed under concurrent scheduling. Please also assert the returned agent_id/task_id pairs.
Example assertion pattern:

     result = await executor.execute_group(group)
     assert result.agents_succeeded == 2
     assert result.agents_failed == 0
     assert result.all_succeeded is True
    +assert sorted((o.agent_id, o.task_id) for o in result.outcomes) == sorted(
    +    (str(a.identity.id), a.task.id) for a in (a1, a2)
    +)
     assert engine.run.await_count == 2

Also applies to: 312-335, 398-417, 504-526
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_parallel.py` around lines 189 - 203, The test only
checks aggregate counts and not that each AgentRunResult is matched to the
correct assignment; update the test_two_agents_both_succeed to assert that the
returned run results from executor.execute_group contain entries whose agent_id
and task_id match the assignments created via _make_assignment
(a1.identity/a1.task and a2.identity/a2.task) and that each corresponding
_make_run_result (r1, r2) is attached to the correct assignment; locate the
returned structure from execute_group (e.g., result.agent_runs or result.items)
and add explicit comparisons of agent_id/task_id pairs for a1 and a2 in addition
to the existing aggregate asserts, and apply the same pairing assertions to the
other multi-agent success/failure tests mentioned (the similar tests around the
other ranges).
Address 23 review items from local agents (code-reviewer, type-design, silent-failure-hunter, logging-audit, docs-consistency, issue-resolution) and external reviewers (CodeRabbit, Gemini, Copilot, Greptile):

- Wrap lock release in try/finally for CancelledError safety
- Add CancelledError handler with proper outcome recording
- Track progress for shutdown-rejected tasks
- Extract _run_task_group, _build_result, _record_fatal_outcome methods
- Persist auto-created InMemoryResourceLock on self._resource_lock
- Reorder finally: unregister_task before _emit_progress
- Add ERROR log before fatal ParallelExecutionError raise
- Remove duplicate PARALLEL_GROUP_COMPLETE log from except* block
- Add duplicate resource claims validator on AgentAssignment
- Enforce mutual exclusivity (result XOR error) on AgentOutcome
- Add cross-field count validators on ParallelProgress
- Update DESIGN_SPEC.md: add ParallelProgress, plan_parsing.py, company.py events, fix fail_fast wording, add M3 clarification
- Add parametrized numeric constraint tests
- Add outcome pairing assertions in multi-agent test
- Fix fail_fast test to handle CancelledError outcomes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 5
♻️ Duplicate comments (3)
tests/unit/engine/test_parallel.py (1)
189-235: ⚠️ Potential issue | 🟡 Minor

Assert the embedded `AgentRunResult` pairing in these multi-agent tests. These cases still only verify aggregate counts or the wrapper `AgentOutcome.agent_id`/`task_id`. `ParallelExecutor._execute_assignment()` copies those wrapper IDs from the assignment, so the tests still pass if concurrent scheduling swaps the returned `AgentRunResult` objects between assignments. Assert `o.result.agent_id`/`task_id` (or exact `r1`/`r2` object pairing) instead.

Also applies to: 316-339, 402-421, 508-530
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/engine/test_parallel.py` around lines 189 - 235, Tests currently only check wrapper outcome pairing and aggregate counts, which can miss mismatches between returned AgentRunResult objects and the assignments; update the tests (e.g., in test_two_agents_both_succeed and test_one_fails_one_succeeds) to assert the embedded AgentRunResult pairing produced by ParallelExecutor._execute_assignment by verifying outcome.result.agent_id and outcome.result.task_id (or comparing outcome.result against the exact r1/r2 objects) rather than only AgentOutcome.agent_id/task_id or counts, ensuring each outcome.result corresponds to the correct assignment's AgentRunResult.

src/ai_company/engine/parallel_models.py (1)

167-172: ⚠️ Potential issue | 🟠 Major

Validate that `result` belongs to this outcome. "Exactly one of `result`/`error`" is not sufficient here. A caller can still build `AgentOutcome(task_id="t1", agent_id="a1", result=<run for t2/a2>)`, and the computed success/cost fields will silently describe the wrong execution.

🔧 Suggested invariant check

    @model_validator(mode="after")
    def _validate_result_or_error(self) -> Self:
        if (self.result is None) == (self.error is None):
            msg = "Exactly one of result or error must be set"
            raise ValueError(msg)
    +   if self.result is not None:
    +       if self.result.task_id != self.task_id:
    +           msg = "result.task_id must match task_id"
    +           raise ValueError(msg)
    +       if self.result.agent_id != self.agent_id:
    +           msg = "result.agent_id must match agent_id"
    +           raise ValueError(msg)
        return self

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/parallel_models.py` around lines 167 - 172, The current _validate_result_or_error in AgentOutcome only enforces exclusivity but must also ensure that a provided result or error actually belongs to this outcome; update _validate_result_or_error to keep the existing exclusivity check and then, if self.result is not None, verify self.result.task_id == self.task_id and self.result.agent_id == self.agent_id (raise ValueError with a clear message on mismatch), and similarly if self.error is not None and the error object exposes task_id/agent_id attributes check those match self.task_id and self.agent_id (or skip those attribute checks if the error type does not have them), so stray results/errors for different task/agent IDs cannot be attached to this AgentOutcome.

DESIGN_SPEC.md (1)

806-806: ⚠️ Potential issue | 🟡 Minor

Reconcile the remaining single-agent-only wording. These sections now describe shipped parallel execution in M3, but the workspace-isolation and coordination-topology callouts still say M3 is single-agent-only and that concurrent file work is not applicable. Please make the milestone story consistent across all three sections.

As per coding guidelines, DESIGN_SPEC.md: Update `DESIGN_SPEC.md` after approved deviations to reflect the new reality.

Also applies to: 840-840, 1098-1102, 1157-1160
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@DESIGN_SPEC.md` at line 806, Update the DESIGN_SPEC.md narrative to remove contradictory "single-agent-only" wording and make the milestone story consistent: state that M3 ships ParallelExecutor (engine/parallel.py) with asyncio.TaskGroup-based concurrent execution, concurrency limits, resource locking, error isolation and progress tracking, and reflect that workspace-isolation and coordination-topology sections now apply to concurrent file work; update mentions of models in engine/parallel_models.py (AgentAssignment, ParallelExecutionGroup, AgentOutcome, ParallelExecutionResult, ParallelProgress) accordingly, and apply the same language changes to the other referenced sections (around the other ranges noted) so all three sections consistently describe parallel execution support in M3.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/engine/parallel.py`:
- Around line 134-157: The lock acquisition currently happens before the outer
try, so if _acquire_all_locks() raises or cancellation occurs during
acquisition, the finally that calls _release_all_locks() (which is inside the
try/finally) never runs and locks can leak; fix by moving the await
self._acquire_all_locks(group, lock) call inside the try block immediately after
resolving lock and validating claims, and either rely on _release_all_locks() to
be safe to call when acquisition was partial or set a local flag (e.g., acquired
= True) after a successful await self._acquire_all_locks(...) and check that
flag in the finally before calling await self._release_all_locks(group, lock);
keep the calls to _resolve_lock, _validate_resource_claims, _run_task_group, and
_release_all_locks unchanged otherwise.
- Around line 274-283: The code currently treats every completed AgentRunResult
as a success; change both completion paths (the except block at the shown
location and the similar block at lines 356-376) to inspect
AgentRunResult.is_success: if result.is_success is True then increment
progress.succeeded and log success=True, otherwise call
self._record_error_outcome(result, assignment, group, outcomes, progress) (or
the same signature used there) and, if group.fail_fast, propagate by raising an
exception or otherwise triggering cancellation so siblings are stopped. Use the
symbols AgentRunResult.is_success, progress.succeeded, _record_error_outcome,
and group.fail_fast to locate and implement the branching logic in both places.
- Around line 504-526: The lock holder should be scoped to the execution group
so concurrent groups for the same agent don't share locks; create an
execution-scoped holder (e.g. combine group.group_id and assignment.agent_id
into a single holder string) and pass that holder to ResourceLock.acquire and
later to ResourceLock.release_all (and use it when calling lock.holder_of for
logging) instead of passing raw assignment.agent_id, ensuring
acquire/release_all use the same composite holder per group.
In `@tests/unit/engine/test_parallel_models.py`:
- Around line 288-299: The test_success_outcome fixture creates an AgentOutcome
with hardcoded "a1"/"t1" IDs that contradict the embedded AgentRunResult
produced by _make_run_result; update the test to derive task_id and agent_id for
the AgentOutcome from the AgentRunResult returned by _make_run_result (e.g., use
result.task_id and result.agent_id) so the AgentOutcome wrapper is internally
consistent with the AgentRunResult; apply the same change pattern to the other
fixtures mentioned (lines around test cases at 311-321 and 357-418) that
currently hardcode IDs.
- Around line 73-82: Make _make_assignment positional-only for name and title to
avoid mypy errors when callers unpack dynamic kwargs: change the signature of
_make_assignment so name and title cannot be bound via keywords (e.g., make them
positional-only), keeping return type AgentAssignment and continuing to call
_make_identity and _make_task inside; also fix the AgentOutcome test fixtures
that hardcode ids ("t1","a1","t2","a2") so they match the ids produced by
_make_run_result (or derive the fixture ids from _make_run_result) to ensure
internal consistency between the fixtures and the run result.
---
Duplicate comments:
In `@DESIGN_SPEC.md`:
- Line 806: Update the DESIGN_SPEC.md narrative to remove contradictory
"single-agent-only" wording and make the milestone story consistent: state that
M3 ships ParallelExecutor (engine/parallel.py) with asyncio.TaskGroup-based
concurrent execution, concurrency limits, resource locking, error isolation and
progress tracking, and reflect that workspace-isolation and
coordination-topology sections now apply to concurrent file work; update
mentions of models in engine/parallel_models.py (AgentAssignment,
ParallelExecutionGroup, AgentOutcome, ParallelExecutionResult, ParallelProgress)
accordingly, and apply the same language changes to the other referenced
sections (around the other ranges noted) so all three sections consistently
describe parallel execution support in M3.
In `@src/ai_company/engine/parallel_models.py`:
- Around line 167-172: The current _validate_result_or_error in AgentOutcome
only enforces exclusivity but must also ensure that a provided result or error
actually belongs to this outcome; update _validate_result_or_error to keep the
existing exclusivity check and then, if self.result is not None, verify
self.result.task_id == self.task_id and self.result.agent_id == self.agent_id
(raise ValueError with a clear message on mismatch), and similarly if self.error
is not None and the error object exposes task_id/agent_id attributes check those
match self.task_id and self.agent_id (or skip those attribute checks if the
error type does not have them), so stray results/errors for different task/agent
IDs cannot be attached to this AgentOutcome.
In `@tests/unit/engine/test_parallel.py`:
- Around line 189-235: Tests currently only check wrapper outcome pairing and
aggregate counts, which can miss mismatches between returned AgentRunResult
objects and the assignments; update the tests (e.g., in
test_two_agents_both_succeed and test_one_fails_one_succeeds) to assert the
embedded AgentRunResult pairing produced by ParallelExecutor._execute_assignment
by verifying outcome.result.agent_id and outcome.result.task_id (or comparing
outcome.result against the exact r1/r2 objects) rather than only
AgentOutcome.agent_id/task_id or counts, ensuring each outcome.result
corresponds to the correct assignment's AgentRunResult.
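One inline finding above suggests scoping the lock holder to the execution group so that concurrent groups running the same agent do not share claims. That fix amounts to a small helper; the function name here is hypothetical:

```python
def composite_holder(group_id: str, agent_id: str) -> str:
    """Build a group-scoped lock holder (illustrative helper, not project code).

    The same string must be passed to acquire(), to holder_of() for logging,
    and to release_all(), so one group releases only its own claims even when
    another concurrent group runs the same agent.
    """
    return f"{group_id}:{agent_id}"
```

Because `release_all()` keys off the holder string, two groups using `composite_holder("g1", "a1")` and `composite_holder("g2", "a1")` can no longer release each other's locks.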
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: a1ea3768-46df-4768-9306-77123b8e035d
📒 Files selected for processing (5)
- DESIGN_SPEC.md
- src/ai_company/engine/parallel.py
- src/ai_company/engine/parallel_models.py
- tests/unit/engine/test_parallel.py
- tests/unit/engine/test_parallel_models.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Greptile Review
🧰 Additional context used
📓 Path-based instructions (6)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- Always use native lazy annotations (PEP 649) — do not use `from __future__ import annotations`
- Use PEP 758 except syntax: `except A, B:` without parentheses for multiple exception types
- Maintain line length of 88 characters (enforced by ruff)

Files: tests/unit/engine/test_parallel_models.py, tests/unit/engine/test_parallel.py, src/ai_company/engine/parallel.py, src/ai_company/engine/parallel_models.py

tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- Use pytest markers `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, and `@pytest.mark.slow` to categorize tests
- Prefer `@pytest.mark.parametrize` for testing similar test cases
- Use generic vendor-agnostic names in test fixtures and configuration: `test-provider`, `test-small-001`, `test-medium-001`, `test-large-001`

Files: tests/unit/engine/test_parallel_models.py, tests/unit/engine/test_parallel.py

src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- All public functions and classes must have type hints and Google-style docstrings (enforced by ruff D rules)
- Never use `import logging` or `logging.getLogger()` in application code — use `from ai_company.observability import get_logger` with variable name `logger`
- Use structured logging with constants from `ai_company.observability.events.<domain>` — always log as `logger.info(EVENT_CONSTANT, key=value)`, never with format strings
- For all identifier/name fields in Pydantic models, use the `NotBlankStr` type from `core.types` instead of manual whitespace validators (including optional and tuple variants)
- Use `@computed_field` for derived values in Pydantic models instead of storing and validating redundant fields
- Enforce immutability: create new objects instead of mutating. For non-Pydantic internal collections use `copy.deepcopy()` at construction and `MappingProxyType` for read-only enforcement
- For config/identity data use frozen Pydantic models; for runtime state that evolves use separate mutable-via-copy models with `model_copy(update=...)`
- Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code instead of bare `create_task`
- Functions must be under 50 lines, files must be under 800 lines
- Handle errors explicitly — never silently swallow exceptions
- Never use real vendor names (Anthropic, OpenAI, Claude, GPT) in project-owned code, docstrings, comments, or tests — use generic names: `example-provider`, `example-large-001`, `test-provider`, `test-small-001`

Files: src/ai_company/engine/parallel.py, src/ai_company/engine/parallel_models.py

src/ai_company/{providers,tools,engine,communication}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- Use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations

Files: src/ai_company/engine/parallel.py, src/ai_company/engine/parallel_models.py

src/ai_company/engine/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

- Catch `RetryExhaustedError` at the engine layer to trigger fallback chains when all provider retries fail

Files: src/ai_company/engine/parallel.py, src/ai_company/engine/parallel_models.py

DESIGN_SPEC.md

📄 CodeRabbit inference engine (CLAUDE.md)

- Update `DESIGN_SPEC.md` after approved deviations to reflect the new reality

Files: DESIGN_SPEC.md
🧠 Learnings (6)
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code instead of bare `create_task`
Applied to files:
src/ai_company/engine/parallel.py, DESIGN_SPEC.md
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{api,config,providers}/**/*.py : Validate at system boundaries (user input, external APIs, config files)
Applied to files:
src/ai_company/engine/parallel_models.py
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to DESIGN_SPEC.md : Update `DESIGN_SPEC.md` after approved deviations to reflect the new reality
Applied to files:
DESIGN_SPEC.md
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: If implementation deviates from `DESIGN_SPEC.md`, alert the user and explain why — do not silently diverge. User decides whether to proceed or update the spec
Applied to files:
DESIGN_SPEC.md
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/**/*.py : Use structured logging with constants from `ai_company.observability.events.<domain>` — always log as `logger.info(EVENT_CONSTANT, key=value)` never with format strings
Applied to files:
DESIGN_SPEC.md
📚 Learning: 2026-03-07T17:28:05.391Z
Learnt from: CR
Repo: Aureliolo/ai-company PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-07T17:28:05.391Z
Learning: Applies to src/ai_company/{providers,tools,engine,communication}/**/*.py : Use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization) to prevent unintended mutations
Applied to files:
DESIGN_SPEC.md
🧬 Code graph analysis (3)
tests/unit/engine/test_parallel_models.py (8)
- src/ai_company/core/agent.py (3): AgentIdentity (246-304), ModelConfig (145-174), PersonalityConfig (25-122)
- src/ai_company/core/enums.py (5): Complexity (171-177), Priority (162-168), SeniorityLevel (6-21), TaskStatus (122-148), TaskType (151-159)
- src/ai_company/core/task.py (1): Task (38-212)
- src/ai_company/engine/context.py (2): AgentContext (87-307), from_identity (140-171)
- src/ai_company/engine/prompt.py (1): SystemPrompt (56-82)
- src/ai_company/engine/run_result.py (2): AgentRunResult (19-109), termination_reason (64-66)
- src/ai_company/providers/enums.py (1): MessageRole (6-12)
- src/ai_company/providers/models.py (1): ChatMessage (138-210)

tests/unit/engine/test_parallel.py (6)
- src/ai_company/engine/context.py (2): AgentContext (87-307), from_identity (140-171)
- src/ai_company/engine/errors.py (2): ParallelExecutionError (42-43), ResourceConflictError (46-47)
- src/ai_company/engine/loop_protocol.py (2): ExecutionResult (78-135), TerminationReason (28-35)
- src/ai_company/engine/parallel.py (2): ParallelExecutor (77-547), execute_group (106-192)
- src/ai_company/engine/parallel_models.py (9): AgentAssignment (23-89), ParallelExecutionGroup (92-140), ParallelProgress (238-276), agent_id (79-81), task_id (87-89), all_succeeded (233-235), agents_succeeded (217-219), agents_failed (225-227), pending (274-276)
- src/ai_company/engine/resource_lock.py (7): InMemoryResourceLock (64-137), is_locked (55-57, 131-133), acquire (31-38, 76-96), holder_of (59-61, 135-137)

src/ai_company/engine/parallel.py (4)
- src/ai_company/engine/errors.py (2): ParallelExecutionError (42-43), ResourceConflictError (46-47)
- src/ai_company/engine/parallel_models.py (10): AgentAssignment (23-89), AgentOutcome (143-180), ParallelExecutionGroup (92-140), ParallelExecutionResult (183-235), ParallelProgress (238-276), agents_succeeded (217-219), agents_failed (225-227), task_id (87-89), agent_id (79-81), pending (274-276)
- src/ai_company/engine/resource_lock.py (8): InMemoryResourceLock (64-137), ResourceLock (24-61), acquire (31-38, 76-96), holder_of (59-61, 135-137), release_all (48-53, 118-129)
- src/ai_company/engine/shutdown.py (2): unregister_task (471-479), register_task (443-469)
🪛 GitHub Actions: CI
tests/unit/engine/test_parallel_models.py
[error] 169-169: mypy check failed. Command 'uv run mypy src/ tests/' reported 1 error: Argument 1 to "_make_assignment" has incompatible type "**dict[str, object]"; expected "str".
    lock = self._resolve_lock(group)
    self._validate_resource_claims(group)

    if lock is not None:
        await self._acquire_all_locks(group, lock)

    outcomes: dict[str, AgentOutcome] = {}
    fatal_errors: list[Exception] = []
    progress = _ProgressState(
        group_id=group.group_id,
        total=len(group.assignments),
    )

    try:
        await self._run_task_group(
            group,
            outcomes,
            fatal_errors,
            progress,
        )
    finally:
        if lock is not None:
            try:
                await self._release_all_locks(group, lock)
Move lock acquisition inside the outer try/finally.
If _acquire_all_locks() raises, or the coroutine is cancelled after earlier claims were acquired, the current finally never runs because the try starts later. That leaks locks and can block subsequent groups.
🔧 Suggested restructuring

     lock = self._resolve_lock(group)
     self._validate_resource_claims(group)
    -
    -    if lock is not None:
    -        await self._acquire_all_locks(group, lock)
     outcomes: dict[str, AgentOutcome] = {}
     fatal_errors: list[Exception] = []
     progress = _ProgressState(
         group_id=group.group_id,
         total=len(group.assignments),
     )
     try:
    +    if lock is not None:
    +        await self._acquire_all_locks(group, lock)
         await self._run_task_group(
             group,
             outcomes,
             fatal_errors,
             progress,

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/parallel.py` around lines 134 - 157, The lock
acquisition currently happens before the outer try, so if _acquire_all_locks()
raises or cancellation occurs during acquisition, the finally that calls
_release_all_locks() (which is inside the try/finally) never runs and locks can
leak; fix by moving the await self._acquire_all_locks(group, lock) call inside
the try block immediately after resolving lock and validating claims, and either
rely on _release_all_locks() to be safe to call when acquisition was partial or
set a local flag (e.g., acquired = True) after a successful await
self._acquire_all_locks(...) and check that flag in the finally before calling
await self._release_all_locks(group, lock); keep the calls to _resolve_lock,
_validate_resource_claims, _run_task_group, and _release_all_locks unchanged
otherwise.
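The acquired-flag variant described above can be sketched in isolation. This is a minimal illustration, not the project's actual code: `GroupRunner` and its `acquire_all`/`release_all` methods are stand-ins for `ParallelExecutor` and `_acquire_all_locks`/`_release_all_locks`.

```python
import asyncio


class GroupRunner:
    """Sketch of the acquired-flag pattern: only release locks whose
    acquisition fully completed inside the try block."""

    def __init__(self, fail_acquire: bool) -> None:
        self.fail_acquire = fail_acquire
        self.released = False

    async def acquire_all(self) -> None:  # stand-in for _acquire_all_locks
        if self.fail_acquire:
            raise RuntimeError("lock acquisition failed mid-way")

    async def release_all(self) -> None:  # stand-in for _release_all_locks
        self.released = True

    async def run(self) -> None:
        acquired = False
        try:
            await self.acquire_all()
            acquired = True
            # ... _run_task_group would execute here ...
        finally:
            if acquired:  # skip release if acquisition never completed
                await self.release_all()


ok = GroupRunner(fail_acquire=False)
asyncio.run(ok.run())
assert ok.released is True

bad = GroupRunner(fail_acquire=True)
try:
    asyncio.run(bad.run())
except RuntimeError:
    pass
assert bad.released is False
```

The flag assumes `acquire_all` cleans up its own partial claims on failure; if it does not, `release_all` must instead be safe to call after a partial acquire.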
```python
except Exception as exc:
    self._record_error_outcome(
        exc,
        assignment,
        group,
        outcomes,
        progress,
    )
    if group.fail_fast:
        raise
```
Don’t count every returned AgentRunResult as a success.
AgentRunResult.is_success already distinguishes successful completions from completed-but-failed runs. This code always increments progress.succeeded, logs success=True, and only lets fail_fast react to raised exceptions, so a returned TerminationReason.ERROR/MAX_TURNS/SHUTDOWN result is misreported and won’t cancel siblings.
Also applies to: 356-376
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/parallel.py` around lines 274 - 283, The code currently
treats every completed AgentRunResult as a success; change both completion paths
(the except block at the shown location and the similar block at lines 356-376)
to inspect AgentRunResult.is_success: if result.is_success is True then
increment progress.succeeded and log success=True, otherwise call
self._record_error_outcome(result, assignment, group, outcomes, progress) (or
the same signature used there) and, if group.fail_fast, propagate by raising an
exception or otherwise triggering cancellation so siblings are stopped. Use the
symbols AgentRunResult.is_success, progress.succeeded, _record_error_outcome,
and group.fail_fast to locate and implement the branching logic in both places.
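The distinction the reviewer is drawing can be shown with a toy stand-in for `AgentRunResult` (the real model and its termination reasons live in the engine package; the class below is illustrative only):

```python
from dataclasses import dataclass


@dataclass
class FakeRunResult:
    """Stand-in for AgentRunResult: a run can return without raising
    yet still terminate unsuccessfully (ERROR/MAX_TURNS/SHUTDOWN)."""

    termination_reason: str

    @property
    def is_success(self) -> bool:
        return self.termination_reason == "completed"


def classify(result: FakeRunResult) -> str:
    # Returned results must be inspected, not assumed successful.
    return "succeeded" if result.is_success else "failed"


assert classify(FakeRunResult("completed")) == "succeeded"
assert classify(FakeRunResult("max_turns")) == "failed"
assert classify(FakeRunResult("error")) == "failed"
```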
src/ai_company/engine/parallel.py
Outdated
```python
acquired = await lock.acquire(
    resource,
    assignment.agent_id,
)
if not acquired:
    holder = lock.holder_of(resource)
    msg = f"Failed to acquire lock on {resource!r}: held by {holder!r}"
    logger.warning(
        PARALLEL_VALIDATION_ERROR,
        group_id=group.group_id,
        error=msg,
    )
    await self._release_all_locks(group, lock)
    raise ResourceConflictError(msg)


async def _release_all_locks(
    self,
    group: ParallelExecutionGroup,
    lock: ResourceLock,
) -> None:
    """Release all resource locks for all assignments."""
    for assignment in group.assignments:
        await lock.release_all(assignment.agent_id)
```
Use an execution-scoped lock holder instead of raw agent_id.
InMemoryResourceLock.acquire() treats the same holder as re-entrant, and release_all() releases every resource owned by that holder. Reusing agent_id here lets two concurrent groups for the same agent share a file lock and lets one group release the other group’s locks during cleanup.
🔧 Suggested holder scoping

```diff
 for assignment in group.assignments:
+    holder = f"{group.group_id}:{assignment.task_id}"
     for resource in assignment.resource_claims:
         acquired = await lock.acquire(
             resource,
-            assignment.agent_id,
+            holder,
         )
         if not acquired:
             holder = lock.holder_of(resource)
             msg = f"Failed to acquire lock on {resource!r}: held by {holder!r}"
@@
 for assignment in group.assignments:
-    await lock.release_all(assignment.agent_id)
+    holder = f"{group.group_id}:{assignment.task_id}"
+    await lock.release_all(holder)
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
for assignment in group.assignments:
    holder = f"{group.group_id}:{assignment.task_id}"
    for resource in assignment.resource_claims:
        acquired = await lock.acquire(
            resource,
            holder,
        )
        if not acquired:
            holder = lock.holder_of(resource)
            msg = f"Failed to acquire lock on {resource!r}: held by {holder!r}"
            logger.warning(
                PARALLEL_VALIDATION_ERROR,
                group_id=group.group_id,
                error=msg,
            )
            await self._release_all_locks(group, lock)
            raise ResourceConflictError(msg)


async def _release_all_locks(
    self,
    group: ParallelExecutionGroup,
    lock: ResourceLock,
) -> None:
    """Release all resource locks for all assignments."""
    for assignment in group.assignments:
        holder = f"{group.group_id}:{assignment.task_id}"
        await lock.release_all(holder)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/parallel.py` around lines 504 - 526, The lock holder
should be scoped to the execution group so concurrent groups for the same agent
don't share locks; create an execution-scoped holder (e.g. combine
group.group_id and assignment.agent_id into a single holder string) and pass
that holder to ResourceLock.acquire and later to ResourceLock.release_all (and
use it when calling lock.holder_of for logging) instead of passing raw
assignment.agent_id, ensuring acquire/release_all use the same composite holder
per group.
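The hazard behind this finding is easy to reproduce with a minimal lock that models the described `InMemoryResourceLock` semantics (re-entrant for the same holder); `MiniLock` below is a sketch, not the real class:

```python
class MiniLock:
    """Minimal model of the described InMemoryResourceLock semantics:
    the same holder re-acquires a resource it already owns."""

    def __init__(self) -> None:
        self._holders: dict[str, str] = {}

    def acquire(self, resource: str, holder: str) -> bool:
        current = self._holders.get(resource)
        if current is None or current == holder:
            self._holders[resource] = holder  # re-entrant for same holder
            return True
        return False


# Raw agent_id as holder: two concurrent groups running the same agent
# silently share the lock instead of conflicting.
shared = MiniLock()
assert shared.acquire("file.txt", "agent-1") is True   # group A
assert shared.acquire("file.txt", "agent-1") is True   # group B shares it

# Execution-scoped holders (e.g. group_id:task_id) keep groups apart.
scoped = MiniLock()
assert scoped.acquire("file.txt", "groupA:t1") is True
assert scoped.acquire("file.txt", "groupB:t9") is False
```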
```python
def _make_assignment(
    name: str = "agent",
    title: str = "task",
    **kwargs: object,
) -> AgentAssignment:
    return AgentAssignment(
        identity=_make_identity(name),
        task=_make_task(title),
        **kwargs,  # type: ignore[arg-type]
    )
```
🧩 Analysis chain

🏁 Scripts executed (outputs collapsed in the original review):

```shell
# First, check the file exists and get basic info
wc -l tests/unit/engine/test_parallel_models.py

# Read the _make_assignment function (lines 73-82)
sed -n '70,85p' tests/unit/engine/test_parallel_models.py

# Read the context around line 169
sed -n '150,175p' tests/unit/engine/test_parallel_models.py

# Check for mypy configuration
find . -name "*.py" -o -name "pyproject.toml" -o -name "mypy.ini" | head -20
cat pyproject.toml 2>/dev/null | grep -A 20 "\[tool.mypy\]"

# Search for AgentOutcome fixtures mentioned in the review
rg "AgentOutcome" tests/unit/engine/test_parallel_models.py -B2 -A2 | head -100

# Find the _make_run_result helper and its task_id/agent_id values
sed -n '85,110p' tests/unit/engine/test_parallel_models.py
rg "_make_run_result" tests/unit/engine/test_parallel_models.py -B2 -A5 | head -80

# Check the AgentOutcome model definition
rg "class AgentOutcome" src/ai_company/engine/ -A 15

# Check CI config for mypy / type-check steps
find . -name "*.yml" -o -name "*.yaml" | xargs grep -l "mypy\|type.*check" 2>/dev/null | head -5
cat .github/workflows/ci.yml | head -100

# Check the exact test reported to fail (around line 169)
sed -n '154,175p' tests/unit/engine/test_parallel_models.py
```
Make _make_assignment compatible with the dynamic kwargs test.
The current helper signature causes a CI mypy error: when calling _make_assignment(**{field: value}) with field as a string variable, mypy cannot exclude the possibility that the unpacked key is name or title. Since value is typed object, it cannot guarantee compliance with the str type annotation. Making name and title positional-only prevents keyword binding and resolves the type error.
Additionally, the AgentOutcome test fixtures use hardcoded IDs (t1, a1, t2, a2) that do not match the IDs embedded in the result returned by _make_run_result(). This creates internally inconsistent test state and should be corrected to ensure fixtures reflect valid model invariants.
🔧 Minimal fix for mypy blocker

```diff
 def _make_assignment(
     name: str = "agent",
     title: str = "task",
+    /,
     **kwargs: object,
 ) -> AgentAssignment:
```

📝 Committable suggestion
```python
def _make_assignment(
    name: str = "agent",
    title: str = "task",
    /,
    **kwargs: object,
) -> AgentAssignment:
    return AgentAssignment(
        identity=_make_identity(name),
        task=_make_task(title),
        **kwargs,  # type: ignore[arg-type]
    )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_parallel_models.py` around lines 73 - 82, Make
_make_assignment positional-only for name and title to avoid mypy errors when
callers unpack dynamic kwargs: change the signature of _make_assignment so name
and title cannot be bound via keywords (e.g., make them positional-only),
keeping return type AgentAssignment and continuing to call _make_identity and
_make_task inside; also fix the AgentOutcome test fixtures that hardcode ids
("t1","a1","t2","a2") so they match the ids produced by _make_run_result (or
derive the fixture ids from _make_run_result) to ensure internal consistency
between the fixtures and the run result.
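The type-checking point generalizes beyond this helper. A self-contained sketch (with a hypothetical `make_widget` helper standing in for `_make_assignment`):

```python
def make_widget(name: str = "widget", /, **kwargs: object) -> dict[str, object]:
    """Hypothetical helper mirroring the _make_assignment fix: `name`
    is positional-only, so a dynamic **-unpack can never bind it."""
    return {"name": name, **kwargs}


# With a string *variable* as the key, mypy cannot rule out the key
# being "name"; positional-only parameters remove that possibility.
field = "priority"
widget = make_widget(**{field: 5})
assert widget == {"name": "widget", "priority": 5}
```

Without the `/` (PEP 570 positional-only marker), the `object`-typed value could land on the `str`-annotated parameter, which is exactly the mypy error the CI reported.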
```python
def test_success_outcome(self) -> None:
    result = _make_run_result()
    outcome = AgentOutcome(
        task_id="t1",
        agent_id="a1",
        result=result,
    )

    assert outcome.result is result
    assert outcome.error is None
    assert outcome.is_success is True
```
Keep these AgentOutcome fixtures internally consistent.
These success fixtures wrap _make_run_result() with hardcoded "a1"/"t1"-style IDs even though the embedded AgentRunResult carries different agent_id/task_id values. That bakes an invalid state into the suite and will block tightening the AgentOutcome invariant. Build the wrapper IDs from the AgentRunResult instead.
Also applies to: 311-321, 357-418
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_parallel_models.py` around lines 288 - 299, The
test_success_outcome fixture creates an AgentOutcome with hardcoded "a1"/"t1"
IDs that contradict the embedded AgentRunResult produced by _make_run_result;
update the test to derive task_id and agent_id for the AgentOutcome from the
AgentRunResult returned by _make_run_result (e.g., use result.task_id and
result.agent_id) so the AgentOutcome wrapper is internally consistent with the
AgentRunResult; apply the same change pattern to the other fixtures mentioned
(lines around test cases at 311-321 and 357-418) that currently hardcode IDs.
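The "derive, don't hardcode" fixture pattern in miniature (the `RunResult` dataclass and `make_run_result` helper below are illustrative stand-ins, not the project's types):

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RunResult:
    """Stand-in for AgentRunResult (identity fields only)."""

    task_id: str
    agent_id: str


def make_run_result() -> RunResult:  # hypothetical test helper
    return RunResult(task_id="task-abc", agent_id="agent-xyz")


# Derive the wrapper's IDs from the embedded result instead of
# hardcoding "t1"/"a1", so the fixture cannot drift out of sync.
result = make_run_result()
outcome_kwargs = {"task_id": result.task_id, "agent_id": result.agent_id}
assert outcome_kwargs == {"task_id": "task-abc", "agent_id": "agent-xyz"}
```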
- Move lock acquisition inside try/finally to prevent leak on partial acquire
- Check run_result.is_success before counting as succeeded in progress
- Use execution-scoped lock holder (group_id:task_id) for cross-group safety
- Change lock-release failure event to PARALLEL_VALIDATION_ERROR
- Narrow _record_fatal_outcome exc type to MemoryError | RecursionError
- Scope auto-created lock locally instead of mutating self
- Add result.task_id/agent_id matching in AgentOutcome validator
- Make _make_assignment positional-only (fixes mypy arg-type)
- Fix AgentOutcome test fixture IDs to match _make_run_result

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.
```python
except* Exception:  # noqa: S110
    # TaskGroup wraps exceptions in ExceptionGroup when
    # fail_fast re-raises inside _run_guarded.
    # Outcomes from completed tasks are already collected;
    # individual errors logged in _record_error_outcome.
    pass
```
_run_task_group() unconditionally swallows any ExceptionGroup from TaskGroup via except* Exception: pass. That can mask unexpected exceptions (e.g., bugs occurring outside _run_guarded’s try/except) and make failures silent. Consider only suppressing the ExceptionGroup when group.fail_fast is enabled (and re-raise otherwise), and/or log the exception group before suppressing it.
```python
except* Exception as eg:  # noqa: S110
    # TaskGroup wraps exceptions in ExceptionGroup when
    # fail_fast re-raises inside _run_guarded.
    # Outcomes from completed tasks are already collected;
    # individual errors logged in _record_error_outcome.
    logger.error(
        "Unhandled exception group in _run_task_group: %r",
        eg,
    )
    if not getattr(group, "fail_fast", False):
        # For non-fail-fast groups, propagate the exception group
        # so callers can observe the failure.
        raise
```
```python
    await self._release_all_locks(group, lock)
except Exception:
    logger.exception(
        PARALLEL_VALIDATION_ERROR,
        error="Failed to release resource locks",
        group_id=group.group_id,
    )
```
In the finally block, failures to release resource locks are logged under PARALLEL_VALIDATION_ERROR with message "Failed to release resource locks". This isn’t a validation error, and reusing the validation event makes observability harder to interpret. Consider logging under a lock-specific event (or adding one) so dashboards/alerts can distinguish validation failures from lock lifecycle failures.
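The follow-up fix commit names this event `PARALLEL_LOCK_RELEASE_ERROR`; the convention can be sketched as below (the exact string values are assumptions, following the `execution.parallel.*` dotted style visible elsewhere in the review):

```python
# Hypothetical event constants in the events/parallel style: one
# constant per distinct failure mode, so dashboards can filter them.
PARALLEL_VALIDATION_ERROR = "execution.parallel.validation_error"
PARALLEL_LOCK_RELEASE_ERROR = "execution.parallel.lock_release_error"


def release_failure_event() -> str:
    # Lock lifecycle failures get their own event, distinct from
    # group-validation failures.
    return PARALLEL_LOCK_RELEASE_ERROR


assert release_failure_event() != PARALLEL_VALIDATION_ERROR
```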
```
│ │ ├── plan_parsing.py       # Plan response parsing utilities
│ │ ├── plan_execute_loop.py  # Plan-and-Execute loop implementation
│ │ ├── plan_parsing.py       # Plan extraction from LLM responses (JSON + text fallback)
```
The engine file tree lists plan_parsing.py twice (once as “Plan response parsing utilities” and again as “Plan extraction from LLM responses (JSON + text fallback)”). This looks like an accidental duplicate entry; consider consolidating to a single line with the correct description.
```suggestion
│ │ ├── plan_parsing.py       # Plan response parsing utilities + extraction from LLM responses (JSON + text fallback)
│ │ ├── plan_execute_loop.py  # Plan-and-Execute loop implementation
```
```
│ │ ├── plan_models.py        # Plan step, plan, and plan-execute config models
│ │ ├── plan_parsing.py       # Plan response parsing utilities
│ │ ├── plan_execute_loop.py  # Plan-and-Execute loop implementation
│ │ ├── plan_parsing.py       # Plan extraction from LLM responses (JSON + text fallback)
```
The plan_parsing.py entry appears twice in the engine module file tree — once at line 2377 with description "Plan response parsing utilities" and again at line 2379 with "Plan extraction from LLM responses (JSON + text fallback)". The new entry was added but the original wasn't removed.
```suggestion
│ │ ├── plan_parsing.py       # Plan extraction from LLM responses (JSON + text fallback)
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: DESIGN_SPEC.md
Line: 2379
Comment:
The `plan_parsing.py` entry appears twice in the engine module file tree — once at line 2377 with description "Plan response parsing utilities" and again at line 2379 with "Plan extraction from LLM responses (JSON + text fallback)". The new entry was added but the original wasn't removed.
```suggestion
│ │ ├── plan_parsing.py # Plan extraction from LLM responses (JSON + text fallback)
```
How can I resolve this? If you propose a fix, please make it concise.

```python
except asyncio.CancelledError:
    outcomes[task_id] = AgentOutcome(
        task_id=task_id,
        agent_id=agent_id,
        error="Cancelled",
    )
    progress.failed += 1
    raise
```
When a task is cancelled (e.g., via fail_fast sibling cancellation), the except asyncio.CancelledError handler records the outcome and updates progress but emits no structured log event. Every other error path emits PARALLEL_AGENT_ERROR — _register_with_shutdown (line 320), _record_error_outcome (line 397), and _record_fatal_outcome (line 416) all do. A log consumer monitoring execution.parallel.agent_error events will silently miss cancelled agents; the only signal is the aggregate failed count in the final PARALLEL_GROUP_COMPLETE.
Adding a log event would keep cancellation observable through the same event stream as other failures:
```python
except asyncio.CancelledError:
    outcomes[task_id] = AgentOutcome(
        task_id=task_id,
        agent_id=agent_id,
        error="Cancelled",
    )
    progress.failed += 1
    logger.warning(
        PARALLEL_AGENT_ERROR,
        agent_id=agent_id,
        task_id=task_id,
        error="Cancelled",
    )
    raise
```
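The record-then-re-raise shape of this handler can be exercised standalone. A minimal sketch, where `guarded` stands in for the real guarded task body and the dict stands in for the outcomes map:

```python
import asyncio


async def guarded(outcomes: dict[str, str]) -> None:
    """Sketch: record the cancellation (a structured log call would go
    here too), then ALWAYS re-raise so asyncio sees the task as cancelled."""
    try:
        await asyncio.sleep(10)  # stand-in for the engine run
    except asyncio.CancelledError:
        outcomes["t1"] = "Cancelled"
        raise


async def main() -> dict[str, str]:
    outcomes: dict[str, str] = {}
    task = asyncio.create_task(guarded(outcomes))
    await asyncio.sleep(0)  # let the task start and reach its sleep
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # cancellation propagated as required
    return outcomes


assert asyncio.run(main()) == {"t1": "Cancelled"}
```

Swallowing the `CancelledError` instead of re-raising would break sibling cancellation, which is why logging must happen before the `raise`, never instead of it.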
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/parallel.py
Line: 283-290
Comment:
When a task is cancelled (e.g., via `fail_fast` sibling cancellation), the `except asyncio.CancelledError` handler records the outcome and updates progress but emits no structured log event. Every other error path emits `PARALLEL_AGENT_ERROR` — `_register_with_shutdown` (line 320), `_record_error_outcome` (line 397), and `_record_fatal_outcome` (line 416) all do. A log consumer monitoring `execution.parallel.agent_error` events will silently miss cancelled agents; the only signal is the aggregate `failed` count in the final `PARALLEL_GROUP_COMPLETE`.
Adding a log event would keep cancellation observable through the same event stream as other failures:
```suggestion
except asyncio.CancelledError:
outcomes[task_id] = AgentOutcome(
task_id=task_id,
agent_id=agent_id,
error="Cancelled",
)
progress.failed += 1
logger.warning(
PARALLEL_AGENT_ERROR,
agent_id=agent_id,
task_id=task_id,
error="Cancelled",
)
raise
```
How can I resolve this? If you propose a fix, please make it concise.

```python
try:
    progress.in_progress += 1
    self._emit_progress(progress)
```
progress.in_progress is incremented here before _execute_assignment is called. Inside _execute_assignment, the actual engine run sits behind the semaphore wait (line 354, async with ctx:). With max_concurrency=2 and 5 agents, all 5 coroutines will increment in_progress to 5 immediately, while only 2 are actually held by the semaphore.
The ParallelProgress docstring specifies in_progress as "Number of assignments currently running" and its derived pending field as "Not yet started". A task queued in the semaphore would appear as in_progress rather than pending, which is semantically misleading to progress callback consumers.
Moving the increment to inside _execute_assignment, after the semaphore is acquired (or updating the docstring to explicitly clarify that in_progress includes semaphore-queued tasks), would align semantics with consumer expectations.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/parallel.py
Line: 254-256
Comment:
`progress.in_progress` is incremented here before `_execute_assignment` is called. Inside `_execute_assignment`, the actual engine run sits behind the semaphore wait (line 354, `async with ctx:`). With `max_concurrency=2` and 5 agents, all 5 coroutines will increment `in_progress` to 5 immediately, while only 2 are actually held by the semaphore.
The `ParallelProgress` docstring specifies `in_progress` as "Number of assignments **currently running**" and its derived `pending` field as "Not yet started". A task queued in the semaphore would appear as `in_progress` rather than `pending`, which is semantically misleading to progress callback consumers.
Moving the increment to inside `_execute_assignment`, after the semaphore is acquired (or updating the docstring to explicitly clarify that `in_progress` includes semaphore-queued tasks), would align semantics with consumer expectations.
How can I resolve this? If you propose a fix, please make it concise.

Address 13 findings from post-merge bot reviews on PRs #157-#162:

Communication layer (PR #157):
- Wake blocked receive() callers on unsubscribe via None sentinel
- Distinguish shutdown vs timeout in receive() logging
- Add AgentMessenger.receive() facade method
- Validate MessageHandler.handle() is async at registration
- Await cancelled tasks in _await_with_shutdown to prevent warnings

Observability (PR #158):
- Add log-before-raise to all validators missing it (company.py, schema.py): 14 raise sites across 11 validators

Parallel execution (PR #161):
- Log suppressed ExceptionGroup instead of silent pass
- Add PARALLEL_AGENT_CANCELLED structured event for cancellations
- Fix progress.in_progress semantics (increment after semaphore)
- Use PARALLEL_LOCK_RELEASE_ERROR for lock release failures
- Remove duplicate plan_parsing.py from DESIGN_SPEC file tree

Template inheritance (PR #162):
- Update DESIGN_SPEC merge key docs to include merge_id
- Preserve merge_id in _expand_single_agent (confirmed bug fix)
- Defer deepcopy in _apply_child_agent past _remove early-return

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
🤖 I have created a release *beep* *boop*

---

## [0.1.0](v0.0.0...v0.1.0) (2026-03-11)

### Features

* add autonomy levels and approval timeout policies ([#42](#42), [#126](#126)) ([#197](#197)) ([eecc25a](eecc25a))
* add CFO cost optimization service with anomaly detection, reports, and approval decisions ([#186](#186)) ([a7fa00b](a7fa00b))
* add code quality toolchain (ruff, mypy, pre-commit, dependabot) ([#63](#63)) ([36681a8](36681a8))
* add configurable cost tiers and subscription/quota-aware tracking ([#67](#67)) ([#185](#185)) ([9baedfa](9baedfa))
* add container packaging, Docker Compose, and CI pipeline ([#269](#269)) ([435bdfe](435bdfe)), closes [#267](#267)
* add coordination error taxonomy classification pipeline ([#146](#146)) ([#181](#181)) ([70c7480](70c7480))
* add cost-optimized, hierarchical, and auction assignment strategies ([#175](#175)) ([ce924fa](ce924fa)), closes [#173](#173)
* add design specification, license, and project setup ([8669a09](8669a09))
* add env var substitution and config file auto-discovery ([#77](#77)) ([7f53832](7f53832))
* add FastestStrategy routing + vendor-agnostic cleanup ([#140](#140)) ([09619cb](09619cb)), closes [#139](#139)
* add HR engine and performance tracking ([#45](#45), [#47](#47)) ([#193](#193)) ([2d091ea](2d091ea))
* add issue auto-search and resolution verification to PR review skill ([#119](#119)) ([deecc39](deecc39))
* add mandatory JWT + API key authentication ([#256](#256)) ([c279cfe](c279cfe))
* add memory retrieval, ranking, and context injection pipeline ([#41](#41)) ([873b0aa](873b0aa))
* add pluggable MemoryBackend protocol with models, config, and events ([#180](#180)) ([46cfdd4](46cfdd4))
* add pluggable MemoryBackend protocol with models, config, and events ([#32](#32)) ([46cfdd4](46cfdd4))
* add pluggable output scan response policies ([#263](#263)) ([b9907e8](b9907e8))
* add pluggable PersistenceBackend protocol with SQLite implementation ([#36](#36)) ([f753779](f753779))
* add progressive trust and promotion/demotion subsystems ([#43](#43), [#49](#49)) ([3a87c08](3a87c08))
* add retry handler, rate limiter, and provider resilience ([#100](#100)) ([b890545](b890545))
* add SecOps security agent with rule engine, audit log, and ToolInvoker integration ([#40](#40)) ([83b7b6c](83b7b6c))
* add shared org memory and memory consolidation/archival ([#125](#125), [#48](#48)) ([4a0832b](4a0832b))
* design unified provider interface ([#86](#86)) ([3e23d64](3e23d64))
* expand template presets, rosters, and add inheritance ([#80](#80), [#81](#81), [#84](#84)) ([15a9134](15a9134))
* implement agent runtime state vs immutable config split ([#115](#115)) ([4cb1ca5](4cb1ca5))
* implement AgentEngine core orchestrator ([#11](#11)) ([#143](#143)) ([f2eb73a](f2eb73a))
* implement AuditRepository for security audit log persistence ([#279](#279)) ([94bc29f](94bc29f))
* implement basic tool system (registry, invocation, results) ([#15](#15)) ([c51068b](c51068b))
* implement built-in file system tools ([#18](#18)) ([325ef98](325ef98))
* implement communication foundation — message bus, dispatcher, and messenger ([#157](#157)) ([8e71bfd](8e71bfd))
* implement company template system with 7 built-in presets ([#85](#85)) ([cbf1496](cbf1496))
* implement conflict resolution protocol ([#122](#122)) ([#166](#166)) ([e03f9f2](e03f9f2))
* implement core entity and role system models ([#69](#69)) ([acf9801](acf9801))
* implement crash recovery with fail-and-reassign strategy ([#149](#149)) ([e6e91ed](e6e91ed))
* implement engine extensions — Plan-and-Execute loop and call categorization ([#134](#134), [#135](#135)) ([#159](#159)) ([9b2699f](9b2699f))
* implement enterprise logging system with structlog ([#73](#73)) ([2f787e5](2f787e5))
* implement graceful shutdown with cooperative timeout strategy ([#130](#130)) ([6592515](6592515))
* implement hierarchical delegation and loop prevention ([#12](#12), [#17](#17)) ([6be60b6](6be60b6))
* implement LiteLLM driver and provider registry ([#88](#88)) ([ae3f18b](ae3f18b)), closes [#4](#4)
* implement LLM decomposition strategy and workspace isolation ([#174](#174)) ([aa0eefe](aa0eefe))
* implement meeting protocol system ([#123](#123)) ([ee7caca](ee7caca))
* implement message and communication domain models ([#74](#74)) ([560a5d2](560a5d2))
* implement model routing engine ([#99](#99)) ([d3c250b](d3c250b))
* implement parallel agent execution ([#22](#22)) ([#161](#161)) ([65940b3](65940b3))
* implement per-call cost tracking service ([#7](#7)) ([#102](#102)) ([c4f1f1c](c4f1f1c))
* implement personality injection and system prompt construction ([#105](#105)) ([934dd85](934dd85))
* implement single-task execution lifecycle ([#21](#21)) ([#144](#144)) ([c7e64e4](c7e64e4))
* implement subprocess sandbox for tool execution isolation ([#131](#131)) ([#153](#153)) ([3c8394e](3c8394e))
* implement task assignment subsystem with pluggable strategies ([#172](#172)) ([c7f1b26](c7f1b26)), closes [#26](#26) [#30](#30)
* implement task decomposition and routing engine ([#14](#14)) ([9c7fb52](9c7fb52))
* implement Task, Project, Artifact, Budget, and Cost domain models ([#71](#71)) ([81eabf1](81eabf1))
* implement tool permission checking ([#16](#16)) ([833c190](833c190))
* implement YAML config loader with Pydantic validation ([#59](#59)) ([ff3a2ba](ff3a2ba))
* implement YAML config loader with Pydantic validation ([#75](#75)) ([ff3a2ba](ff3a2ba))
* initialize project with uv, hatchling, and src layout ([39005f9](39005f9))
* initialize project with uv, hatchling, and src layout ([#62](#62)) ([39005f9](39005f9))
* Litestar REST API, WebSocket feed, and approval queue (M6) ([#189](#189)) ([29fcd08](29fcd08))
* make TokenUsage.total_tokens a computed field ([#118](#118)) ([c0bab18](c0bab18)), closes [#109](#109)
* parallel tool execution in ToolInvoker.invoke_all ([#137](#137)) ([58517ee](58517ee))
* testing framework, CI pipeline, and M0 gap fixes ([#64](#64)) ([f581749](f581749))
* wire all modules into observability system ([#97](#97)) ([f7a0617](f7a0617))

### Bug Fixes

* address Greptile post-merge review findings from PRs [#170](https://github.com/Aureliolo/ai-company/issues/170)-[#175](https://github.com/Aureliolo/ai-company/issues/175) ([#176](#176)) ([c5ca929](c5ca929))
* address post-merge review feedback from PRs [#164](https://github.com/Aureliolo/ai-company/issues/164)-[#167](https://github.com/Aureliolo/ai-company/issues/167) ([#170](#170)) ([3bf897a](3bf897a)), closes [#169](#169)
* enforce strict mypy on test files ([#89](#89)) ([aeeff8c](aeeff8c))
* harden Docker sandbox, MCP bridge, and code runner ([#50](#50), [#53](#53)) ([d5e1b6e](d5e1b6e))
* harden git tools security + code quality improvements ([#150](#150)) ([000a325](000a325))
* harden subprocess cleanup, env filtering, and shutdown resilience ([#155](#155)) ([d1fe1fb](d1fe1fb))
* incorporate post-merge feedback + pre-PR review fixes ([#164](#164)) ([c02832a](c02832a))
* pre-PR review fixes for post-merge findings ([#183](#183)) ([26b3108](26b3108))
* resolve circular imports, bump litellm, fix release tag format ([#286](#286)) ([a6659b5](a6659b5))
* strengthen immutability for BaseTool schema and ToolInvoker boundaries ([#117](#117)) ([7e5e861](7e5e861))

### Performance

* harden non-inferable principle implementation ([#195](#195)) ([02b5f4e](02b5f4e)), closes [#188](#188)

### Refactoring

* adopt NotBlankStr across all models ([#108](#108)) ([#120](#120)) ([ef89b90](ef89b90))
* extract _SpendingTotals base class from spending summary models ([#111](#111)) ([2f39c1b](2f39c1b))
* harden BudgetEnforcer with error handling, validation extraction, and review fixes ([#182](#182)) ([c107bf9](c107bf9))
* harden personality profiles, department validation, and template rendering ([#158](#158)) ([10b2299](10b2299))
* pre-PR review improvements for ExecutionLoop + ReAct loop ([#124](#124)) ([8dfb3c0](8dfb3c0))
* split events.py into per-domain event modules ([#136](#136)) ([e9cba89](e9cba89))

### Documentation

* add ADR-001 memory layer evaluation and selection ([#178](#178)) ([db3026f](db3026f)), closes [#39](#39)
* add agent scaling research findings to DESIGN_SPEC ([#145](#145)) ([57e487b](57e487b))
* add CLAUDE.md, contributing guide, and dev documentation ([#65](#65)) ([55c1025](55c1025)), closes [#54](#54)
* add crash recovery, sandboxing, analytics, and testing decisions ([#127](#127)) ([5c11595](5c11595))
* address external review feedback with MVP scope and new protocols ([#128](#128)) ([3b30b9a](3b30b9a))
* expand design spec with pluggable strategy protocols ([#121](#121)) ([6832db6](6832db6))
* finalize 23 design decisions (ADR-002) ([#190](#190)) ([8c39742](8c39742))
* update project docs for M2.5 conventions and add docs-consistency review agent ([#114](#114)) ([99766ee](99766ee))

### Tests

* add e2e single agent integration tests ([#24](#24)) ([#156](#156)) ([f566fb4](f566fb4))
* add provider adapter integration tests ([#90](#90)) ([40a61f4](40a61f4))

### CI/CD

* add Release Please for automated versioning and GitHub Releases ([#278](#278)) ([a488758](a488758))
* bump actions/checkout from 4 to 6 ([#95](#95)) ([1897247](1897247))
* bump actions/upload-artifact from 4 to 7 ([#94](#94)) ([27b1517](27b1517))
* bump anchore/scan-action from 6.5.1 to 7.3.2 ([#271](#271)) ([80a1c15](80a1c15))
* bump docker/build-push-action from 6.19.2 to 7.0.0 ([#273](#273)) ([dd0219e](dd0219e))
* bump docker/login-action from 3.7.0 to 4.0.0 ([#272](#272)) ([33d6238](33d6238))
* bump docker/metadata-action from 5.10.0 to 6.0.0 ([#270](#270)) ([baee04e](baee04e))
* bump docker/setup-buildx-action from 3.12.0 to 4.0.0 ([#274](#274)) ([5fc06f7](5fc06f7))
* bump sigstore/cosign-installer from 3.9.1 to 4.1.0 ([#275](#275)) ([29dd16c](29dd16c))
* harden CI/CD pipeline ([#92](#92)) ([ce4693c](ce4693c))
* split vulnerability scans into critical-fail and high-warn tiers ([#277](#277)) ([aba48af](aba48af))

### Maintenance

* add /worktree skill for parallel worktree management ([#171](#171)) ([951e337](951e337))
* add design spec context loading to research-link skill ([8ef9685](8ef9685))
* add post-merge-cleanup skill ([#70](#70)) ([f913705](f913705))
* add pre-pr-review skill and update CLAUDE.md ([#103](#103)) ([92e9023](92e9023))
* add research-link skill and rename skill files to SKILL.md ([#101](#101)) ([651c577](651c577))
* bump aiosqlite from 0.21.0 to 0.22.1 ([#191](#191)) ([3274a86](3274a86))
* bump pyyaml from 6.0.2 to 6.0.3 in the minor-and-patch group ([#96](#96)) ([0338d0c](0338d0c))
* bump ruff from 0.15.4 to 0.15.5 ([a49ee46](a49ee46))
* fix M0 audit items ([#66](#66)) ([c7724b5](c7724b5))
* **main:** release ai-company 0.1.1 ([#282](#282)) ([2f4703d](2f4703d))
* pin setup-uv action to full SHA ([#281](#281)) ([4448002](4448002))
* post-audit cleanup — PEP 758, loggers, bug fixes, refactoring, tests, hookify rules ([#148](#148)) ([c57a6a9](c57a6a9))

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).

---------

Signed-off-by: Aurelio <19254254+Aureliolo@users.noreply.github.com>
## Summary
- `ParallelExecutor` orchestrates concurrent `AgentEngine.run()` calls using `asyncio.TaskGroup` with error isolation, concurrency limits (`Semaphore`), resource locking, and progress tracking
- `ParallelExecutionGroup` / `AgentAssignment` frozen Pydantic models define execution groups with duplicate-agent/task validation, `NotBlankStr` resource claims, and optional timeout/concurrency settings
- `AgentOutcome` / `ParallelExecutionResult` / `ParallelProgress` capture per-agent results with `@computed_field` for derived values (`is_success`, `pending`, cost aggregation)
- `ResourceLock` protocol + `InMemoryResourceLock` provide pluggable exclusive file-path locking with conflict detection and group-level acquire/release
- New event constants (`events/parallel.py`) for group start/complete, agent start/complete/error, lock operations, and progress updates
- Set `TaskFactory.dependencies = ()` to prevent flaky Polyfactory self-dependency collisions
## Design Decisions

- `ParallelExecutor` wraps `AgentEngine` rather than extending it
- `_run_guarded()` pattern: mirrors `ToolInvoker.invoke_all()` — fatal errors (`MemoryError`, `RecursionError`) collected separately, regular exceptions stored as error outcomes, `fail_fast` re-raises through `TaskGroup`
- Group-level locking: resource locks are acquired before the `TaskGroup` and released after — no per-task lock release, to avoid race conditions
- `InMemoryResourceLock` auto-created only when assignments declare resource claims; no lock overhead when claims are empty
## Test Plan

- `test_parallel.py` — 18 tests covering: single/multi-agent execution, semaphore concurrency, fail-fast, error isolation, fatal error propagation, resource conflicts, progress callbacks, shutdown integration, timeout, memory messages, lock auto-creation
- `test_parallel_models.py` — 24 tests covering: all model construction/validation, frozen enforcement, duplicate rejection, computed fields, cost aggregation, edge cases
- `test_resource_lock.py` — 9 tests covering: acquire/release, re-entrant locking, wrong-holder release, release_all, concurrent access
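The lock semantics those tests exercise (exclusive acquire, re-entrancy for the same holder, wrong-holder release as a logged no-op, `release_all`) could look roughly like this. This is a synchronous sketch with assumed method names; the actual `ResourceLock` protocol in the PR may differ in signatures and is used from async code.

```python
import logging

logger = logging.getLogger(__name__)


class InMemoryResourceLock:
    """Sketch: exclusive per-path locks keyed by holder (group) id."""

    def __init__(self) -> None:
        self._holders: dict[str, str] = {}  # path -> holder_id

    def acquire(self, path: str, holder_id: str) -> bool:
        current = self._holders.get(path)
        if current is None or current == holder_id:  # free, or re-entrant
            self._holders[path] = holder_id
            return True
        return False  # conflict: held by another group

    def release(self, path: str, holder_id: str) -> None:
        if self._holders.get(path) != holder_id:
            # Wrong-holder release warns instead of raising, matching the
            # "wrong-holder warning" fix listed in the review findings.
            logger.warning("release of %s by non-holder %s", path, holder_id)
            return
        del self._holders[path]

    def release_all(self, holder_id: str) -> None:
        # Drop every path this holder owns; used for group-level cleanup.
        for path in [p for p, h in self._holders.items() if h == holder_id]:
            del self._holders[path]
```

Treating a wrong-holder release as a warning rather than an exception keeps group-level cleanup resilient: a bug in one release path cannot abort the teardown of the rest of the group.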
## Review Coverage

Pre-reviewed by 9 agents; all 30 findings were addressed before merge.
Closes #22
🤖 Generated with Claude Code