Conversation
Implement actor-like TaskEngine that owns all task state mutations via asyncio.Queue. A single background task processes mutations sequentially using model_copy(update=...), persists results, and publishes snapshots to the message bus. Reads bypass the queue for direct persistence access.

- Add TaskEngine core with start/stop lifecycle, submit/convenience methods
- Add 5 mutation types (create, update, transition, delete, cancel)
- Add TaskMutationResult response and TaskStateChanged event models
- Add TaskEngineConfig (queue size, drain timeout, snapshot toggle)
- Add 4 error types (TaskEngineError, NotRunning, Mutation, VersionConflict)
- Wire into API controllers, AppState, app lifecycle, and config
- Add optional AgentEngine report-back for terminal task status
- Add 57 unit tests covering all mutations, ordering, versioning, drain

Closes #204
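The single-writer pattern the description outlines can be sketched as follows. This is a minimal illustration only; the `Engine` class, its dict-based state, and the tuple envelope are assumptions for the sketch, not the PR's actual TaskEngine API:

```python
import asyncio


class Engine:
    """Minimal single-writer actor: all state mutations flow through one queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._state: dict[str, dict] = {}  # task_id -> latest task snapshot
        self._worker: asyncio.Task | None = None

    async def start(self) -> None:
        self._worker = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        await self._queue.join()  # drain pending mutations before shutdown
        self._worker.cancel()

    async def submit(self, task_id: str, updates: dict) -> dict:
        # Callers await a future that the background worker resolves.
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((task_id, updates, fut))
        return await fut

    async def _loop(self) -> None:
        while True:
            task_id, updates, fut = await self._queue.get()
            try:
                # Immutable-style update: build a new snapshot, never mutate in place.
                new = {**self._state.get(task_id, {"id": task_id}), **updates}
                self._state[task_id] = new
                fut.set_result(new)
            finally:
                self._queue.task_done()


async def main() -> dict:
    engine = Engine()
    await engine.start()
    result = await engine.submit("t1", {"status": "created"})
    await engine.stop()
    return result
```

Because every write funnels through one consumer task, mutations are serialized without locks; reads can still hit the state store directly, as the description says.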
Pre-reviewed by 10 agents, 37 findings addressed:

- Add exhaustive match default + typed error hierarchy (TaskNotFoundError, TaskEngineQueueFullError, TaskVersionConflictError)
- Sanitize internal exception details from API responses
- Add immutable field rejection validators on UpdateTaskMutation and TransitionTaskMutation
- Thread previous_status through TaskMutationResult and snapshots
- Add consistency model_validator to TaskMutationResult
- Guard _processing_loop against unhandled exceptions
- Fix startup cleanup to handle task engine failures
- Replace assert with proper error handling in convenience methods
- Add _fail_remaining_futures for drain timeout cleanup
- Add comprehensive logging coverage (creation, conflicts, loop errors)
- Add _not_found_result helper to reduce duplication
- Extract _TERMINAL_STATUSES module constant
- Use Self return type in model validators
- Split broad except in _report_to_task_engine (TaskMutationError vs Exception)
- Update docs: tech-stack Adopted, CLAUDE.md engine description, engine.md TaskEngine architecture subsection
- Add tests: AppState.task_engine, _report_to_task_engine, app lifecycle, version conflicts, cancel not-found, previous_status, immutable fields, typed errors
- Add error_code discriminator to TaskMutationResult (not_found/version_conflict/validation/internal)
- Fix _raise_typed_error to use error_code match instead of fragile string matching
- Fix _processing_loop outer catch to resolve envelope future (prevents caller deadlock)
- Guard happy-path set_result in _process_one with done() check
- Fix _apply_update to use Task.model_validate() instead of model_copy() (runs validators)
- Clean _IMMUTABLE_TASK_FIELDS: remove 4 non-existent timestamp fields (only id/status/created_by)
- Rename _TERMINAL_STATUSES → _REPORTABLE_STATUSES (FAILED/INTERRUPTED are not strictly terminal)
- Fix assigned_to=None override bug in transition_task controller
- Add from_status to task transition audit log
- Fix failure log to use API_TASK_TRANSITION_FAILED event instead of TASK_STATUS_CHANGED
- Map TaskEngineNotRunningError/TaskEngineQueueFullError → ServiceUnavailableError (503)
- Fix create_task missing error handling
- Fix _on_expire broad exception handler to re-raise MemoryError/RecursionError
- Add MemoryError/RecursionError re-raise to _publish_snapshot
- Add API_TASK_TRANSITION_FAILED event constant
- Fix AppState docstring and set_task_engine docstring
- Add version_conflict test to TestTypedErrors
- Add TestDrainTimeout: verify abandoned futures resolved on _fail_remaining_futures
- Add TestMutationResultConsistency: validate success/error invariants enforced by Pydantic
- Add test_memory_error_propagates to TestReportToTaskEngine
- Fix engine.md: text lang specifier, version conflict description, _IMMUTABLE_TASK_FIELDS, asyncio.wait
…tness

- Add TaskInternalError for internal engine faults (maps to 5xx vs 4xx)
- Thread error_code through all TaskMutationResult failure paths
- Extend _raise_typed_error to cover 'internal' code with -> Never return
- Add TaskInternalError handling in all TaskController endpoints
- Fix bridge leak in _cleanup_on_failure (started_bridge flag was missing)
- Remove phantom 'created_at' from _IMMUTABLE_OVERRIDE_FIELDS (field absent on Task)
- Change transition_task to return tuple[Task, TaskStatus | None] (eliminates extra get_task round-trip in controller)
- Split 1140-line test_task_engine.py into three focused files + helpers
- Move TaskEngine fixtures from helpers to conftest.py (auto-discovery, no F401 hacks)
- Fix all ruff/mypy issues in new test files (TC001, I001, F811, E501, unused-ignore)
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found. Scanned files: none.
Caution: Review failed. Pull request was closed or merged during review.

📝 Walkthrough (Summary by CodeRabbit)

Adds a centralized single-writer TaskEngine with queue-based mutation processing, version tracking, snapshot publishing, typed task errors, Agent/API integration, updated lifecycle wiring, config/schema/defaults, observability events, and extensive tests.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor Client
    participant Controller as TaskController
    participant Engine as TaskEngine
    participant Queue as asyncio.Queue
    participant Processor as _processing_loop
    participant Persistence
    participant Bus as MessageBus
    Client->>Controller: create_task(data)
    Controller->>Engine: create_task(data, requested_by)
    Engine->>Engine: build CreateTaskMutation
    Engine->>Queue: submit(mutation)
    Engine-->>Controller: returns pending Task / awaits result
    Note over Processor,Queue: Background processing
    Queue->>Processor: dequeue _MutationEnvelope
    Processor->>Processor: dispatch -> apply_create
    Processor->>Persistence: tasks.save(task)
    Persistence-->>Processor: saved
    Processor->>Engine: bump version / record result
    Processor->>Bus: publish TaskStateChanged
    Bus-->>Processor: acknowledged
    Processor->>Queue: fulfill future with result
    Queue-->>Controller: result delivered
```

```mermaid
sequenceDiagram
    participant App as Litestar App
    participant Lifecycle as LifecycleBuilder
    participant Engine as TaskEngine
    App->>Lifecycle: create_app(task_engine=instance)
    Lifecycle->>Engine: start()
    Engine->>Engine: create queue & spawn _processing_loop
    Engine-->>Lifecycle: started
    App->>Lifecycle: shutdown()
    Lifecycle->>Engine: stop(timeout)
    Engine->>Engine: drain queue, complete/cancel envelopes
    Engine-->>Lifecycle: stopped
```

```mermaid
sequenceDiagram
    participant Agent as AgentEngine
    participant Exec as ExecutionLoop
    participant Engine as TaskEngine
    participant Persistence
    Agent->>Exec: run(...)
    Exec-->>Agent: ExecutionResult (COMPLETED)
    Agent->>Agent: _post_execution_pipeline()
    Agent->>Engine: _report_to_task_engine(result, agent_id, task_id)
    alt task_engine configured
        Engine->>Engine: transition_task(... expected_version ...)
        Engine->>Persistence: persist transition
        Persistence-->>Engine: ok
        Engine-->>Agent: ack (best-effort)
    else not configured
        Engine-->>Agent: no-op
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 4 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Codecov Report

❌ Patch coverage is …

Additional details and impacted files:

```text
@@ Coverage Diff @@
##             main     #325      +/-   ##
==========================================
+ Coverage   93.64%   93.71%   +0.06%
==========================================
  Files         427      433       +6
  Lines       19177    19746     +569
  Branches     1846     1904      +58
==========================================
+ Hits        17959    18504     +545
- Misses        943      958      +15
- Partials      275      284       +9
```

☔ View full report in Codecov by Sentry.
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request introduces a centralized, single-writer TaskEngine to manage task state mutations, enhancing the reliability and observability of task processing. It includes improvements to error handling, data validation, logging, and integration with the AgentEngine, ensuring more robust and secure task management.

Highlights
Pull request overview
Introduces a centralized, single-writer TaskEngine to serialize task mutations (create/update/transition/delete/cancel), publish task state change snapshots, and integrate task state coordination into the API and agent execution pipeline.
Changes:
- Added TaskEngine (queue-based mutation processing), request/result/event models, configuration, and typed errors.
- Switched API task CRUD endpoints to use TaskEngine instead of writing directly to persistence; updated app lifecycle/state to optionally wire/start/stop a task engine.
- Added extensive unit/integration tests plus observability event constants and documentation updates.
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/observability/test_events.py | Registers task_engine as an observability domain module. |
| tests/unit/engine/test_task_engine_mutations.py | CRUD + typed error behavior tests for TaskEngine convenience APIs. |
| tests/unit/engine/test_task_engine_models.py | Validation/immutability/serialization tests for task engine models. |
| tests/unit/engine/test_task_engine_lifecycle.py | Start/stop lifecycle + config model tests. |
| tests/unit/engine/test_task_engine_integration.py | Queue/backpressure, ordering, snapshot publishing, drain/timeout behavior tests. |
| tests/unit/engine/test_agent_engine.py | Adds coverage for AgentEngine best-effort reporting to TaskEngine. |
| tests/unit/engine/task_engine_helpers.py | Shared fakes/helpers for TaskEngine tests. |
| tests/unit/engine/conftest.py | Adds TaskEngine fixtures for unit tests. |
| tests/unit/config/conftest.py | Extends root config factory defaults (adds escalation_paths). |
| tests/unit/api/test_state.py | Tests AppState.task_engine accessors and deferred configuration. |
| tests/unit/api/test_app.py | Updates lifecycle helper signatures and adds task-engine startup/shutdown failure coverage. |
| tests/unit/api/conftest.py | Provides a fake TaskEngine and updates test_client fixture to use context manager. |
| src/ai_company/observability/events/task_engine.py | New TaskEngine-specific observability event constants. |
| src/ai_company/observability/events/api.py | Adds API_TASK_TRANSITION_FAILED event constant. |
| src/ai_company/engine/task_engine.py | Implements queue-based single-writer mutation processing + snapshot publishing. |
| src/ai_company/engine/task_engine_models.py | Adds mutation request/result/event Pydantic models + immutability validators. |
| src/ai_company/engine/task_engine_config.py | Adds TaskEngineConfig model (queue size, drain timeout, snapshot toggle). |
| src/ai_company/engine/errors.py | Adds TaskEngine/TaskMutation typed exception hierarchy. |
| src/ai_company/engine/agent_engine.py | Adds optional task_engine + reports terminal statuses best-effort post-run. |
| src/ai_company/engine/__init__.py | Re-exports TaskEngine APIs and new errors/models/config. |
| src/ai_company/config/schema.py | Adds task_engine: TaskEngineConfig to RootConfig. |
| src/ai_company/config/defaults.py | Adds task_engine defaults block. |
| src/ai_company/api/state.py | Adds task_engine to AppState with accessor/late-binding setter. |
| src/ai_company/api/controllers/tasks.py | Converts task CRUD to go through TaskEngine with typed error mapping. |
| src/ai_company/api/app.py | Wires task engine into lifecycle (startup/shutdown/cleanup), extends create_app() signature. |
| docs/design/engine.md | Documents TaskEngine architecture and updates AgentEngine timeout behavior wording. |
| docs/architecture/tech-stack.md | Marks “State coordination” as Adopted. |
| CLAUDE.md | Updates engine module description and adds TaskEngine event constant example. |
src/ai_company/engine/task_engine.py (outdated)

```
background task consumes mutation requests sequentially, applies
``model_copy(update=...)`` on frozen ``Task`` models, persists the
result, and publishes snapshots to the message bus.
```
The module docstring says TaskEngine applies model_copy(update=...), but the implementation primarily does model_dump() + Task.model_validate(...) (and Task.with_transition() also uses model_validate). Please update the docstring to reflect the actual mutation mechanics to avoid misleading future maintainers.

```suggestion
background task consumes mutation requests sequentially, derives a new
``Task`` instance from the current state and the mutation (e.g. via
``Task.model_validate`` / ``Task.with_transition``), persists the result,
and publishes snapshots to the message bus.
```
```python
        try:
            await asyncio.wait_for(
                self._processing_task,
                timeout=effective_timeout,
            )
            logger.info(TASK_ENGINE_DRAIN_COMPLETE)
        except TimeoutError:
            logger.warning(
                TASK_ENGINE_DRAIN_TIMEOUT,
                remaining=self._queue.qsize(),
            )
            self._processing_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._processing_task
            self._fail_remaining_futures()
```
On drain timeout, stop() cancels _processing_task and then only fails futures still sitting in _queue. Any mutation already dequeued and mid-processing when cancellation happens will have its envelope.future left unresolved, so callers awaiting submit() can hang indefinitely. Consider tracking the in-flight envelope and failing its future on cancellation/timeout (or handling CancelledError in the processing loop to resolve the current envelope).
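One way to avoid the dangling in-flight future this comment describes is to resolve it from a `CancelledError` handler inside the worker before re-raising. A hedged sketch only; the envelope shape and error message here are illustrative assumptions, not the PR's code:

```python
import asyncio


async def processing_loop(queue: asyncio.Queue) -> None:
    """Worker that never abandons the envelope it is currently processing."""
    while True:
        fut, work = await queue.get()
        try:
            fut.set_result(await work())
        except asyncio.CancelledError:
            # Cancelled mid-mutation: resolve the in-flight future so the
            # caller awaiting submit() does not hang forever, then re-raise.
            if not fut.done():
                fut.set_exception(RuntimeError("engine stopped during processing"))
            raise
        finally:
            queue.task_done()


async def demo() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    loop_task = asyncio.create_task(processing_loop(queue))
    fut: asyncio.Future = asyncio.get_running_loop().create_future()

    async def slow_mutation() -> str:
        await asyncio.sleep(10)  # simulates a mutation stuck past the drain timeout
        return "done"

    queue.put_nowait((fut, slow_mutation))
    await asyncio.sleep(0)  # let the worker dequeue and start the mutation
    loop_task.cancel()      # drain timeout path: cancel mid-processing
    try:
        await fut
    except RuntimeError as exc:
        return str(exc)
    return "unexpected"
```

The caller observes a clean error instead of hanging on an unresolved future.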
src/ai_company/engine/task_engine.py (outdated)
```python
            try:
                await self._process_one(envelope)
            except Exception:
                logger.exception(
                    TASK_ENGINE_LOOP_ERROR,
                    error="Unhandled exception in processing loop",
                )
                if not envelope.future.done():
                    envelope.future.set_result(
                        TaskMutationResult(
                            request_id=envelope.mutation.request_id,
                            success=False,
                            error="Internal error in processing loop",
                            error_code="internal",
                        ),
                    )

    async def _process_one(self, envelope: _MutationEnvelope) -> None:
        """Process a single mutation envelope."""
        mutation = envelope.mutation
        logger.debug(
            TASK_ENGINE_MUTATION_RECEIVED,
            mutation_type=mutation.mutation_type,
            request_id=mutation.request_id,
        )
        try:
            result = await self._apply_mutation(mutation)
            if not envelope.future.done():
                envelope.future.set_result(result)
            if result.success and self._config.publish_snapshots:
                await self._publish_snapshot(mutation, result)
        except Exception as exc:
            internal_msg = f"{type(exc).__name__}: {exc}"
            logger.exception(
                TASK_ENGINE_MUTATION_FAILED,
                mutation_type=mutation.mutation_type,
                request_id=mutation.request_id,
                error=internal_msg,
            )
            if not envelope.future.done():
                envelope.future.set_result(
                    TaskMutationResult(
                        request_id=mutation.request_id,
                        success=False,
                        error="Internal error processing mutation",
                        error_code="internal",
                    ),
                )
```
_processing_loop() and _process_one() catch broad Exception and convert it into an internal failure result. This will also swallow MemoryError / RecursionError, which elsewhere in the codebase are treated as non-recoverable and re-raised. Add an explicit `except (MemoryError, RecursionError): raise` (or avoid catching them) so the process can fail fast on these conditions.
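The re-raise guard this comment asks for can be written as a small wrapper; note that in Python 3 the two exception types must be grouped in a parenthesized tuple. A minimal sketch (the function name and return convention are assumptions, not the PR's code):

```python
def run_best_effort(operation):
    """Swallow recoverable errors, but let non-recoverable ones escape."""
    try:
        return operation()
    except (MemoryError, RecursionError):
        # Non-recoverable: fail fast rather than converting these into
        # a synthetic "internal error" result.
        raise
    except Exception:
        # Recoverable: degrade to a best-effort "no result" outcome.
        return None
```

The order matters: the narrow non-recoverable clause must come before the broad `except Exception`, otherwise it is unreachable.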
src/ai_company/engine/task_engine.py (outdated)
```python
    async def _apply_transition(
        self,
        mutation: TransitionTaskMutation,
    ) -> TaskMutationResult:
        """Perform a task status transition."""
        task = await self._persistence.tasks.get(mutation.task_id)
        if task is None:
            return self._not_found_result(
                "transition",
                mutation.request_id,
                mutation.task_id,
            )

        try:
            self._check_version(mutation.task_id, mutation.expected_version)
        except TaskVersionConflictError as exc:
            return TaskMutationResult(
                request_id=mutation.request_id,
                success=False,
                error=str(exc),
                error_code="version_conflict",
            )

        previous_status = task.status

        try:
            updated = task.with_transition(
                mutation.target_status,
                **mutation.overrides,
            )
```
TransitionTaskMutation.reason is required and API/AgentEngine populate it, but the TaskEngine never uses it (not in Task.with_transition(), not in logs, and not in TaskStateChanged). Either plumb reason into the emitted snapshot/log fields for auditability, or drop it from the mutation model to avoid carrying unused data.
| """Reverse cleanup on startup failure (task engine, bridge, bus, persistence).""" | ||
| if started_task_engine and task_engine is not None: | ||
| try: | ||
| await task_engine.stop() | ||
| except Exception: | ||
| logger.exception( | ||
| API_APP_STARTUP, | ||
| error="Cleanup: failed to stop task engine", | ||
| ) | ||
| if started_bridge and bridge is not None: |
_cleanup_on_failure() swallows all exceptions when stopping components. In this repo, best-effort exception handling typically re-raises MemoryError / RecursionError rather than swallowing them (you already do this in _on_expire() above). Consider adding `except (MemoryError, RecursionError): raise` before the generic `except Exception` blocks here as well.
src/ai_company/api/app.py (outdated)

```
@@ -263,6 +312,14 @@ async def _safe_shutdown(
            API_APP_SHUTDOWN,
            error="Failed to stop message bus bridge",
        )
    if task_engine is not None:
        try:
            await task_engine.stop()
        except Exception:
            logger.exception(
                API_APP_SHUTDOWN,
                error="Failed to stop task engine",
            )
    if message_bus is not None:
```
_safe_shutdown() catches Exception while stopping services, which will also swallow MemoryError / RecursionError. If those occur during shutdown, they should propagate (consistent with other best-effort blocks in this repo). Add an explicit `except (MemoryError, RecursionError): raise` before the broad `except Exception` clauses.
```python
    cost_tracker: CostTracker | None = None,
    approval_store: ApprovalStore | None = None,
    auth_service: AuthService | None = None,
    task_engine: TaskEngine | None = None,
) -> Litestar:
    """Create and configure the Litestar application.
```
TaskController now requires AppState.task_engine, but create_app() still only warns about missing persistence/message_bus/cost_tracker. Since task_engine is also an optional constructor arg here, it’s easy to accidentally build an app with persistence configured but no TaskEngine and get surprising 503s on /tasks. Consider either (a) including task_engine in the warning, or (b) auto-constructing one from persistence/message_bus using config.task_engine when not provided.
docs/design/engine.md (outdated)

```
- **Immutable updates**: Each mutation calls `model_copy(update=...)` on
  frozen `Task` models — the original is never mutated.
```
This section says TaskEngine performs immutable updates via model_copy(update=...), but the current TaskEngine implementation uses model_dump() + Task.model_validate(...) (and Task.with_transition() uses model_validate). Please align the doc wording with the actual implementation so readers don't assume model_copy() semantics.

```suggestion
- **Immutable-style updates**: Each mutation constructs a new `Task` instance
  from the previous one (for example via `Task.model_validate({**task.model_dump(), **updates})`
  or `Task.with_transition(...)`); the existing instance is never mutated.
```
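The rebuild-and-validate distinction can be illustrated without Pydantic. The frozen dataclass below is an illustrative stand-in whose `__post_init__` plays the role of Pydantic validators: reconstructing the object from a full dump (as `Task.model_validate` would) re-runs validation on the combined state, which a field-level copy may skip. All names here are assumptions for the sketch:

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class Task:
    """Stand-in for a frozen Pydantic Task; validation runs in __post_init__."""

    id: str
    status: str

    def __post_init__(self) -> None:
        if self.status not in {"created", "in_progress", "completed"}:
            raise ValueError(f"invalid status: {self.status!r}")


def apply_update(task: Task, updates: dict) -> Task:
    # Rebuild from a full dump so validation sees the *combined* state,
    # mirroring Task.model_validate({**task.model_dump(), **updates}).
    # The original frozen instance is left untouched.
    return Task(**{**asdict(task), **updates})
```

An invalid update is rejected at construction time instead of producing a silently inconsistent snapshot.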
Greptile Summary

This PR implements the centralized TaskEngine. Key changes:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant API as TaskController/AgentEngine
    participant TE as TaskEngine
    participant Q as asyncio.Queue
    participant PL as ProcessingLoop
    participant Apply as task_engine_apply
    participant DB as PersistenceBackend
    participant Bus as MessageBus
    API->>TE: submit(mutation)
    TE->>Q: put_nowait(envelope)
    TE-->>API: await envelope.future
    PL->>Q: get() with timeout
    Q-->>PL: envelope
    PL->>Apply: dispatch(mutation, persistence, versions)
    Apply->>DB: save(task) / delete(task_id)
    DB-->>Apply: ok
    Apply-->>PL: TaskMutationResult
    PL->>PL: envelope.future.set_result(result)
    PL-->>API: result via future
    PL->>Bus: publish(TaskStateChanged)
    Note over TE,PL: stop() sets _running=False, drains queue, cancels, fails remaining futures
```
Prompt To Fix All With AI

This is a comment left during a code review.
Path: src/ai_company/api/controllers/tasks.py
Line: 100-101
Comment:
**`TaskVersionConflictError` silently mapped to HTTP 422 instead of 409**
`TaskVersionConflictError` is a subclass of `TaskMutationError`, so it falls through every earlier check and lands on `isinstance(exc, TaskMutationError)` → `ApiValidationError` (HTTP 422 Unprocessable Entity). This is semantically wrong: a version conflict is not a validation error from the caller's perspective — it signals that another writer modified the task between the caller's read and write.
`api/errors.py` already has a `ConflictError` class that maps to HTTP 409 Conflict, which is the correct status for optimistic-concurrency failures. Without an explicit guard here, callers cannot distinguish "invalid input" (422) from "concurrent modification" (409), making correct retry logic impossible.
```suggestion
if isinstance(exc, TaskVersionConflictError):
return ConflictError(str(exc))
if isinstance(exc, TaskMutationError):
return ApiValidationError(str(exc))
```
This also requires importing `TaskVersionConflictError` and `ConflictError` at the top of the file.
How can I resolve this? If you propose a fix, please make it concise.
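The ordering point in this comment is easy to get wrong: the subclass check must precede the base-class check. A minimal sketch of the subclass-before-base mapping (the class and function names here are simplified stand-ins for the PR's actual error types and controller helper):

```python
class TaskMutationError(Exception):
    """Base for caller-correctable mutation failures (maps to HTTP 422)."""


class TaskVersionConflictError(TaskMutationError):
    """Optimistic-concurrency failure: another writer won (maps to HTTP 409)."""


def status_for(exc: Exception) -> int:
    # Check the subclass *before* the base class; otherwise every version
    # conflict falls through to the generic 422 validation mapping and
    # callers cannot implement correct retry logic.
    if isinstance(exc, TaskVersionConflictError):
        return 409
    if isinstance(exc, TaskMutationError):
        return 422
    return 500
```

With the checks reversed, `isinstance(conflict, TaskMutationError)` would match first and the 409 branch would be dead code.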
---
This is a comment left during a code review.
Path: src/ai_company/api/controllers/tasks.py
Line: 57-61
Comment:
**Misleading log event for missing auth identity**
`API_RESOURCE_NOT_FOUND` is used to log the "no authenticated user" fallback, but that event is intended for missing data resources (tasks, users from the data store, etc.). In structured log queries, a search for `api.resource.not_found` would surface auth-misconfiguration warnings alongside genuine 404-class events, making observability harder.
Consider a dedicated event constant (e.g. `API_AUTH_IDENTITY_MISSING` in `events/api.py`) or re-use an existing auth-scoped constant such as `API_AUTH_FAILED`:
```suggestion
logger.warning(
API_AUTH_FAILED,
resource="authenticated_user",
note="No authenticated user found, falling back to 'api'",
)
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 1762-1766
Comment:
**Redundant `task_id` extraction in `_publish_snapshot`**
`task_id` is computed inside the `try` block (line 1765 in the diff) and also re-evaluated identically in the `except` block (line 1784 in the diff). Because the assignment on line 1765 runs before the `await` that could raise, the except block will always re-execute the exact same `getattr` call. Moving the extraction before the `try` block would remove the duplication and make the intent clearer:
```python
task_id = getattr(mutation, "task_id", None)
try:
    from ai_company.communication.enums import MessageType  # noqa: PLC0415
    from ai_company.communication.message import Message  # noqa: PLC0415

    msg = Message(...)
    await self._message_bus.publish(msg)
    logger.debug(TASK_ENGINE_SNAPSHOT_PUBLISHED, ..., task_id=task_id)
except (MemoryError, RecursionError):
    raise
except Exception:
    logger.warning(TASK_ENGINE_SNAPSHOT_PUBLISH_FAILED, ..., task_id=task_id, exc_info=True)
```
How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 7ec0ab7
Code Review
This pull request introduces the TaskEngine, a crucial component for centralized and robust task state management. The implementation uses a single-writer actor model via an asyncio.Queue to prevent race conditions, which is an excellent design choice. The changes include a well-defined error hierarchy, immutable data models with strong validation, and graceful lifecycle management within the application. The integration with the API controllers significantly improves their structure and error handling. The documentation is thorough, and the test coverage is extensive and of high quality, covering unit, integration, and lifecycle aspects of the new engine. Overall, this is an exceptionally well-executed feature implementation that greatly enhances the core architecture of the application. I have no suggestions for improvement at this time.
Actionable comments posted: 14
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/unit/observability/test_events.py (1)
175-223: 🧹 Nitpick | 🔵 Trivial

Add explicit contract assertions for task_engine events. This only verifies that the module exists. The new TASK_ENGINE_* constants can still drift without failing here, unlike the other domains below that pin exact names/values. Please add a dedicated test_task_engine_events_exist (or a parametrized equivalent) for the new observability surface.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/observability/test_events.py` around lines 175 - 223, The test only checks that the task_engine module is present but doesn't assert the actual event constant names/values; add a new unit test (e.g., test_task_engine_events_exist) that imports the events.task_engine module and asserts the expected TASK_ENGINE_* constants (names and/or values) are defined and equal to their intended strings/values, or use a parametrized pytest.mark.parametrize over the expected constant names to assert hasattr(events.task_engine, name) and equality to the expected literal; reference the module events.task_engine and the specific constant symbols (TASK_ENGINE_START, TASK_ENGINE_STOP, TASK_ENGINE_ERROR, etc.) to prevent future drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/design/engine.md`:
- Around line 188-194: The doc incorrectly states optimistic concurrency uses
in-memory version counters only; update the design text to specify that the
persisted task version is the source of truth and that any in-memory counters
are merely an optimization and may be invalid after restart. Describe that
TaskEngine must persist and read the task version on writes and that callers’
expected_version is compared against the persisted version to produce a
TaskMutationResult with error_code="version_conflict" (and raise
TaskVersionConflictError for convenience). Also note that get_task() and
list_tasks() may use read-through caching but must validate or refresh versions
from persistence before accepting writes. Ensure references to expected_version,
TaskMutationResult, and TaskVersionConflictError remain in the doc.
In `@src/ai_company/api/app.py`:
- Around line 130-132: The shutdown ordering in _safe_shutdown is reversed: call
TaskEngine.stop() (or task_engine.stop()) before bridge.stop() so the TaskEngine
can drain and publish final snapshots that the bridge will forward; update
_safe_shutdown() to mirror _cleanup_on_failure()’s reverse order (stop
task_engine, then bridge, then message_bus/persistence), and ensure
on_shutdown() and any other shutdown paths use this corrected order.
In `@src/ai_company/api/controllers/tasks.py`:
- Around line 138-150: The repeated exception mapping in create_task,
update_task, transition_task, and delete_task should be centralized: implement a
small helper (e.g., map_task_exceptions or a decorator wrap_task_exceptions)
that catches TaskEngineNotRunningError, TaskEngineQueueFullError, and
TaskInternalError and re-raises ServiceUnavailableError(str(exc)) from exc, and
catches TaskMutationError to re-raise ApiValidationError(str(exc)) from exc;
replace the four duplicated try/except blocks by invoking the helper or applying
the decorator around the task engine calls (referencing create_task,
update_task, transition_task, delete_task and the exception classes
TaskEngineNotRunningError, TaskEngineQueueFullError, TaskInternalError,
TaskMutationError) so behavior remains identical but duplication is removed.
- Around line 180-201: The handler is passing a hardcoded requested_by="api"
into app_state.task_engine.update_task (and similar calls for
transition/delete), which loses who initiated the change; extract the actor
identifier from the request context (e.g., authenticated user id,
request.user.id, request.state.api_key or an injected RequestContext) and pass
that value into update_task's requested_by parameter instead of the literal
"api", defaulting to a safe fallback like "unknown" if no identity is present;
update the other call sites (transition_task/delete_task or methods on
app_state.task_engine) to use the same extracted identifier for consistent
auditing.
In `@src/ai_company/engine/errors.py`:
- Around line 109-116: TaskInternalError should not subclass TaskMutationError;
change its definition so it is a separate sibling (e.g., replace "class
TaskInternalError(TaskMutationError):" with "class
TaskInternalError(Exception):" or another common base used for engine/internal
errors) so that broad "except TaskMutationError" handlers won't catch it; keep
the existing docstring and tests but adjust any places that explicitly relied on
TaskInternalError being a subtype of TaskMutationError if needed.
In `@src/ai_company/engine/task_engine_models.py`:
- Around line 47-50: The model field assigned_to currently allows setting an
assignee while Task defaults to status=CREATED (and CREATED rejects non-None
assignees), so either enforce the create-time invariant here or derive the
initial status from the assignee; update assigned_to on the model to validate
that it must be None when creating a Task with default status=CREATED (reject
input) or change the creation logic so that if assigned_to is provided the
default status becomes ASSIGNED/IN_PROGRESS accordingly — adjust the pydantic
Field/validator for assigned_to or the Task constructor/creation path and ensure
consistency with _apply_create() and the CREATED constant.
- Around line 116-129: The updates/overrides dicts are accepted without
validating keys and are not deep-copied, causing silent no-ops on typos and
potential mutation leaks; update the mutation boundary logic (in
UpdateTaskMutation and TransitionTaskMutation) to deep-copy the incoming dicts
with copy.deepcopy() before any processing, then validate that every key in
updates/overrides is a known Task field (use Task.model_fields.keys() or
equivalent) and raise a clear ValueError listing unknown keys if any; keep the
existing _reject_immutable_fields model_validator (which checks against
_IMMUTABLE_TASK_FIELDS) but ensure validation against Task schema occurs before
calling Task.model_validate() so unknown keys are rejected rather than ignored.
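The boundary check can be sketched as a plain function; `_KNOWN_TASK_FIELDS` below is a stand-in for `Task.model_fields.keys()`, and the real code would run this inside the pydantic model_validator:

```python
import copy

# Stand-in for Task.model_fields.keys(); the real set comes from the Task model.
_KNOWN_TASK_FIELDS = {"title", "description", "priority", "assigned_to"}


def validate_updates(updates: dict[str, object]) -> dict[str, object]:
    """Deep-copy the incoming dict and reject unknown Task field names."""
    # Deep copy first so caller-held references cannot mutate engine state.
    updates = copy.deepcopy(updates)
    unknown = sorted(set(updates) - _KNOWN_TASK_FIELDS)
    if unknown:
        # Fail loudly instead of silently no-opping on typos.
        raise ValueError(f"Unknown Task fields in updates: {unknown}")
    return updates
```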
In `@src/ai_company/engine/task_engine.py`:
- Around line 108-109: The in-memory _versions map is lost on restart so
optimistic concurrency breaks; update startup and any task-loading paths (e.g.,
get_task, list_tasks, and the processing loop referencing _check_version and
_processing_task) to seed and maintain _versions from persisted task metadata:
when you read tasks from storage populate _versions[task_id] = task.version (and
update it whenever a task is saved/updated), and modify _check_version to
consult persisted task.version if _versions lacks an entry so versions remain
durable across restarts.
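The lazy-seeding fallback can be sketched like this; `VersionCache` is an illustrative name, and the `persisted_versions` dict stands in for reads from the persistence backend:

```python
class VersionCache:
    """In-memory version map seeded lazily from persisted task versions."""

    def __init__(self, persisted_versions: dict[str, int]) -> None:
        self._persisted = persisted_versions  # stand-in for storage reads
        self._versions: dict[str, int] = {}

    def check(self, task_id: str, expected: int) -> bool:
        """Compare against the in-memory version, falling back to durable state."""
        current = self._versions.get(task_id)
        if current is None:
            # Seed from persistence so restarts don't break optimistic concurrency.
            current = self._persisted.get(task_id, 0)
            self._versions[task_id] = current
        return current == expected

    def bump(self, task_id: str) -> int:
        """Increment and record the version after a successful save."""
        new = self._versions.get(task_id, self._persisted.get(task_id, 0)) + 1
        self._versions[task_id] = new
        return new
```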
- Around line 1-907: The file mixes multiple concerns in one large TaskEngine
class; split it into smaller modules: keep TaskEngine as the lifecycle/queue
owner (start, stop, submit, _processing_loop, _fail_remaining_futures,
is_running) and move mutation application logic into a TaskProcessor (methods
_apply_mutation,
_apply_create/_apply_update/_apply_transition/_apply_delete/_apply_cancel and
_not_found_result), the snapshot logic into a SnapshotPublisher (method
_publish_snapshot), and version handling into a VersionManager (fields
_versions, methods _bump_version, _check_version); also move _MutationEnvelope
dataclass to a small helpers module. Update TaskEngine to instantiate and
delegate to these collaborators (e.g. self._processor.apply(mutation),
self._publisher.publish(mutation, result), self._version_manager.bump/check)
while preserving public method names (create_task, update_task, transition_task,
delete_task, cancel_task, submit) and the existing TaskMutationResult wiring so
external behavior and tests remain unchanged.
- Around line 590-603: _wrap Task(...) construction in _apply_create and the
Task.model_validate(...) call in _apply_update with try/except blocks that catch
the validation exception type used by your model layer (e.g.,
pydantic.ValidationError or your custom ValidationError), and on catch return a
TaskMutationResult with error_code="validation" and include the validation
details/error message rather than letting the exception escape to _process_one;
mirror the pattern used by _apply_transition and _apply_cancel so the mutation
handlers consistently return typed validation errors instead of
error_code="internal".
- Around line 490-505: The generic exception handlers in _processing_loop and
_process_one are currently swallowing fail-fast exceptions from
_publish_snapshot; update both exception blocks to explicitly catch
(MemoryError, RecursionError) and immediately re-raise them before the generic
except Exception logic so these errors propagate; keep the existing logging
(TASK_ENGINE_LOOP_ERROR) and TaskMutationResult fallback only for non-fail-fast
exceptions and ensure you check envelope.future.done() only after the re-raise
branch.
In `@tests/unit/engine/test_agent_engine.py`:
- Around line 1009-1035: Expand the assertions in test_terminal_status_reported
to validate the full transition_task payload rather than only the status: after
awaiting engine.run, inspect mock_te.transition_task.call_args and assert the
first positional arg matches the Task ID from sample_task_with_criteria (or the
Task object as expected), the second arg equals TaskStatus.COMPLETED, and that
the payload includes the original requested_by (e.g., check kwargs or the third
positional arg depending on how transition_task is called). Use the existing
test names (test_terminal_status_reported, AgentEngine.run) and the mock object
mock_te.transition_task to locate where to add these additional assertions.
In `@tests/unit/engine/test_task_engine_integration.py`:
- Around line 153-182: In test_queue_full_raises, add a brief explanatory
comment above the lines that set eng._running = True and directly put an item
into eng._queue explaining that the test intentionally manipulates internal
state (eng._running and eng._queue) to simulate a full queue/backpressure
scenario because triggering this condition via the public API is difficult;
reference the test name (test_queue_full_raises) and the fields eng._running and
eng._queue in the comment so future readers understand the rationale.
---
Outside diff comments:
In `@tests/unit/observability/test_events.py`:
- Around line 175-223: The test only checks that the task_engine module is
present but doesn't assert the actual event constant names/values; add a new
unit test (e.g., test_task_engine_events_exist) that imports the
events.task_engine module and asserts the expected TASK_ENGINE_* constants
(names and/or values) are defined and equal to their intended strings/values, or
use a parametrized pytest.mark.parametrize over the expected constant names to
assert hasattr(events.task_engine, name) and equality to the expected literal;
reference the module events.task_engine and the specific constant symbols
(TASK_ENGINE_START, TASK_ENGINE_STOP, TASK_ENGINE_ERROR, etc.) to prevent future
drift.
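The drift check can be sketched as below; the constant values shown are assumptions based on the naming convention, and `types.SimpleNamespace` stands in for the real `events.task_engine` module:

```python
import types

# Stand-in for the events.task_engine module; real values may differ.
task_engine_events = types.SimpleNamespace(
    TASK_ENGINE_START="task_engine.start",
    TASK_ENGINE_STOP="task_engine.stop",
    TASK_ENGINE_ERROR="task_engine.error",
)

# Expected constants; in the real test this table is what pins the values.
EXPECTED = {
    "TASK_ENGINE_START": "task_engine.start",
    "TASK_ENGINE_STOP": "task_engine.stop",
    "TASK_ENGINE_ERROR": "task_engine.error",
}


def check_event_constants(module: object) -> None:
    """Assert every expected constant exists and matches its literal value."""
    for name, value in EXPECTED.items():
        assert hasattr(module, name), f"missing constant {name}"
        assert getattr(module, name) == value, f"drifted constant {name}"
```

In the real suite the same table would feed `pytest.mark.parametrize` so each constant fails independently.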
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: f0620322-8a27-4dfe-ac33-14cf78528e8a
📒 Files selected for processing (28)
- CLAUDE.md
- docs/architecture/tech-stack.md
- docs/design/engine.md
- src/ai_company/api/app.py
- src/ai_company/api/controllers/tasks.py
- src/ai_company/api/state.py
- src/ai_company/config/defaults.py
- src/ai_company/config/schema.py
- src/ai_company/engine/__init__.py
- src/ai_company/engine/agent_engine.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/task_engine.py
- src/ai_company/engine/task_engine_config.py
- src/ai_company/engine/task_engine_models.py
- src/ai_company/observability/events/api.py
- src/ai_company/observability/events/task_engine.py
- tests/unit/api/conftest.py
- tests/unit/api/test_app.py
- tests/unit/api/test_state.py
- tests/unit/config/conftest.py
- tests/unit/engine/conftest.py
- tests/unit/engine/task_engine_helpers.py
- tests/unit/engine/test_agent_engine.py
- tests/unit/engine/test_task_engine_integration.py
- tests/unit/engine/test_task_engine_lifecycle.py
- tests/unit/engine/test_task_engine_models.py
- tests/unit/engine/test_task_engine_mutations.py
- tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Agent
- GitHub Check: Greptile Review
- GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`**/*.py`: No `from __future__ import annotations` — Python 3.14 has PEP 649
Use `except A, B:` syntax (no parentheses) for exception handling — PEP 758 syntax enforced by ruff on Python 3.14
Add type hints to all public functions — enforce mypy strict mode
Use Google-style docstrings — required on public classes and functions, enforced by ruff D rules
For frozen Pydantic models with dict/list fields, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use Pydantic v2 `BaseModel`, `model_validator`, `computed_field`, and `ConfigDict` — avoid redundant stored fields with `@computed_field`
Use `NotBlankStr` (from `core.types`) for all identifier/name fields, including optional (`NotBlankStr | None`) and tuple variants, instead of manual whitespace validators
Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code instead of bare `create_task`
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly — never silently swallow exceptions
Enforce line length of 88 characters via ruff
Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves
Never mix static config fields with mutable runtime fields in one model
Files:
tests/unit/engine/test_task_engine_models.py, tests/unit/observability/test_events.py, tests/unit/engine/conftest.py, src/ai_company/engine/errors.py, src/ai_company/observability/events/api.py, tests/unit/engine/test_task_engine_mutations.py, tests/unit/api/test_state.py, src/ai_company/engine/task_engine_config.py, tests/unit/engine/test_task_engine_integration.py, tests/unit/engine/test_agent_engine.py, src/ai_company/engine/agent_engine.py, src/ai_company/config/schema.py, tests/unit/engine/test_task_engine_lifecycle.py, src/ai_company/api/state.py, tests/unit/engine/task_engine_helpers.py, src/ai_company/api/controllers/tasks.py, src/ai_company/observability/events/task_engine.py, tests/unit/api/conftest.py, src/ai_company/api/app.py, src/ai_company/config/defaults.py, src/ai_company/engine/task_engine.py, src/ai_company/engine/__init__.py, tests/unit/api/test_app.py, tests/unit/config/conftest.py, src/ai_company/engine/task_engine_models.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`tests/**/*.py`: Use `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, and `@pytest.mark.slow` markers for test organization
Prefer `@pytest.mark.parametrize` for testing similar cases
Never use real vendor names in test files — use `test-provider`, `test-small-001`, and generic model names
Files:
tests/unit/engine/test_task_engine_models.py, tests/unit/observability/test_events.py, tests/unit/engine/conftest.py, tests/unit/engine/test_task_engine_mutations.py, tests/unit/api/test_state.py, tests/unit/engine/test_task_engine_integration.py, tests/unit/engine/test_agent_engine.py, tests/unit/engine/test_task_engine_lifecycle.py, tests/unit/engine/task_engine_helpers.py, tests/unit/api/conftest.py, tests/unit/api/test_app.py, tests/unit/config/conftest.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`src/ai_company/**/*.py`: Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Never use `import logging`, `logging.getLogger()`, or `print()` in application code — use the observability logger instead
Always use `logger` as the variable name (not `_logger` or `log`)
Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly
Use structured logging with kwargs: `logger.info(EVENT, key=value)` — never use string formatting like `logger.info("msg %s", val)`
Log all error paths at WARNING or ERROR with context before raising
Log all state transitions at INFO level
Use DEBUG level for object creation, internal flow, and entry/exit of key functions
Files:
src/ai_company/engine/errors.py, src/ai_company/observability/events/api.py, src/ai_company/engine/task_engine_config.py, src/ai_company/engine/agent_engine.py, src/ai_company/config/schema.py, src/ai_company/api/state.py, src/ai_company/api/controllers/tasks.py, src/ai_company/observability/events/task_engine.py, src/ai_company/api/app.py, src/ai_company/config/defaults.py, src/ai_company/engine/task_engine.py, src/ai_company/engine/__init__.py, src/ai_company/engine/task_engine_models.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names: `example-provider`, `example-large-001`, `example-medium-001`, `example-small-001`, `large`/`medium`/`small` as aliases. Tests must use `test-provider`, `test-small-001`, etc.
Files:
src/ai_company/engine/errors.py, src/ai_company/observability/events/api.py, src/ai_company/engine/task_engine_config.py, src/ai_company/engine/agent_engine.py, src/ai_company/config/schema.py, src/ai_company/api/state.py, src/ai_company/api/controllers/tasks.py, src/ai_company/observability/events/task_engine.py, src/ai_company/api/app.py, src/ai_company/config/defaults.py, src/ai_company/engine/task_engine.py, src/ai_company/engine/__init__.py, src/ai_company/engine/task_engine_models.py
🧠 Learnings (9)
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly
Applied to files:
tests/unit/observability/test_events.py, tests/unit/engine/conftest.py, src/ai_company/observability/events/api.py, CLAUDE.md, src/ai_company/api/state.py, src/ai_company/observability/events/task_engine.py, src/ai_company/api/app.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Applied to files:
CLAUDE.md, src/ai_company/api/state.py, src/ai_company/api/app.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Never use `import logging`, `logging.getLogger()`, or `print()` in application code — use the observability logger instead
Applied to files:
CLAUDE.md, src/ai_company/api/state.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Always use `logger` as the variable name (not `_logger` or `log`)
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use structured logging with kwargs: `logger.info(EVENT, key=value)` — never use string formatting like `logger.info("msg %s", val)`
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Log all error paths at WARNING or ERROR with context before raising
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Log all state transitions at INFO level
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use DEBUG level for object creation, internal flow, and entry/exit of key functions
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/providers/**/*.py : Set `RetryConfig` and `RateLimiterConfig` per-provider in `ProviderConfig`
Applied to files:
src/ai_company/config/schema.py
🧬 Code graph analysis (14)
- tests/unit/engine/test_task_engine_models.py (2)
  - src/ai_company/core/enums.py (4): Complexity (247-253), Priority (238-244), TaskStatus (198-224), TaskType (227-235)
  - src/ai_company/engine/task_engine_models.py (8): CancelTaskMutation (201-218), CreateTaskData (20-59), CreateTaskMutation (65-80), DeleteTaskMutation (183-198), TaskMutationResult (234-275), TaskStateChanged (281-318), TransitionTaskMutation (142-180), UpdateTaskMutation (98-129)
- tests/unit/engine/conftest.py (3)
  - src/ai_company/engine/task_engine.py (3): TaskEngine (81-907), start (119-137), stop (139-176)
  - src/ai_company/engine/task_engine_config.py (1): TaskEngineConfig (6-37)
  - tests/unit/engine/task_engine_helpers.py (4): FakeMessageBus (58-76), FakePersistence (47-55), start (65-66), stop (68-69)
- tests/unit/engine/test_task_engine_mutations.py (2)
  - src/ai_company/engine/errors.py (3): TaskMutationError (97-98), TaskNotFoundError (101-102), TaskVersionConflictError (105-106)
  - src/ai_company/engine/task_engine_models.py (4): CancelTaskMutation (201-218), TransitionTaskMutation (142-180), UpdateTaskMutation (98-129), TaskMutationResult (234-275)
- src/ai_company/engine/task_engine_config.py (1)
  - src/ai_company/tools/base.py (1): description (138-140)
- tests/unit/engine/test_task_engine_integration.py (5)
  - src/ai_company/core/enums.py (1): TaskStatus (198-224)
  - src/ai_company/engine/errors.py (1): TaskEngineQueueFullError (93-94)
  - src/ai_company/engine/task_engine.py (5): TaskEngine (81-907), _MutationEnvelope (68-78), start (119-137), stop (139-176), submit (200-235)
  - src/ai_company/engine/task_engine_config.py (1): TaskEngineConfig (6-37)
  - src/ai_company/engine/task_engine_models.py (4): CreateTaskMutation (65-80), DeleteTaskMutation (183-198), TransitionTaskMutation (142-180), UpdateTaskMutation (98-129)
- tests/unit/engine/test_agent_engine.py (3)
  - src/ai_company/engine/errors.py (1): TaskMutationError (97-98)
  - src/ai_company/engine/task_engine.py (1): transition_task (311-358)
  - src/ai_company/memory/errors.py (1): MemoryError (13-14)
- src/ai_company/engine/agent_engine.py (3)
  - src/ai_company/engine/errors.py (1): TaskMutationError (97-98)
  - src/ai_company/api/state.py (1): task_engine (107-109)
  - src/ai_company/core/enums.py (1): TaskStatus (198-224)
- src/ai_company/config/schema.py (3)
  - tests/unit/engine/conftest.py (1): engine (432-443)
  - src/ai_company/engine/task_engine_config.py (1): TaskEngineConfig (6-37)
  - src/ai_company/api/state.py (1): task_engine (107-109)
- src/ai_company/api/state.py (2)
  - tests/unit/engine/conftest.py (1): engine (432-443)
  - src/ai_company/engine/task_engine.py (1): TaskEngine (81-907)
- src/ai_company/api/controllers/tasks.py (7)
  - src/ai_company/api/pagination.py (1): paginate (26-53)
  - src/ai_company/api/state.py (1): task_engine (107-109)
  - src/ai_company/core/task.py (1): Task (45-261)
  - src/ai_company/engine/errors.py (5): TaskEngineNotRunningError (89-90), TaskEngineQueueFullError (93-94), TaskInternalError (109-116), TaskMutationError (97-98), TaskNotFoundError (101-102)
  - src/ai_company/engine/task_engine_models.py (1): CreateTaskData (20-59)
  - src/ai_company/observability/_logger.py (1): get_logger (8-28)
  - src/ai_company/engine/task_engine.py (6): list_tasks (455-476), get_task (444-453), create_task (237-268), update_task (270-309), transition_task (311-358), delete_task (360-389)
- tests/unit/api/conftest.py (3)
  - tests/unit/engine/conftest.py (1): engine (432-443)
  - src/ai_company/api/state.py (2): task_engine (107-109), persistence (87-89)
  - src/ai_company/engine/task_engine.py (1): TaskEngine (81-907)
- src/ai_company/api/app.py (3)
  - src/ai_company/api/state.py (2): task_engine (107-109), AppState (22-154)
  - src/ai_company/memory/errors.py (1): MemoryError (13-14)
  - src/ai_company/communication/bus_protocol.py (1): MessageBus (20-209)
- src/ai_company/engine/task_engine.py (6)
  - src/ai_company/core/enums.py (1): TaskStatus (198-224)
  - src/ai_company/core/task.py (1): Task (45-261)
  - src/ai_company/engine/errors.py (6): TaskEngineNotRunningError (89-90), TaskEngineQueueFullError (93-94), TaskInternalError (109-116), TaskMutationError (97-98), TaskNotFoundError (101-102), TaskVersionConflictError (105-106)
  - src/ai_company/engine/task_engine_config.py (1): TaskEngineConfig (6-37)
  - src/ai_company/engine/task_engine_models.py (1): CancelTaskMutation (201-218)
  - src/ai_company/persistence/protocol.py (1): PersistenceBackend (27-167)
- src/ai_company/engine/task_engine_models.py (2)
  - src/ai_company/core/enums.py (4): Complexity (247-253), Priority (238-244), TaskStatus (198-224), TaskType (227-235)
  - src/ai_company/core/task.py (1): Task (45-261)
🪛 LanguageTool
CLAUDE.md
[style] ~130-~130: A comma is missing here.
Context: ...nder ai_company.observability.events (e.g. PROVIDER_CALL_START from `events.prov...
(EG_NO_COMMA)
docs/architecture/tech-stack.md
[style] ~122-~122: Consider using the typographical ellipsis character here instead.
Context: ... Agents submit requests; engine applies model_copy(update=...) sequentially and publishes snapshots....
(ELLIPSIS)
docs/design/engine.md
[style] ~186-~186: Consider using the typographical ellipsis character here instead.
Context: ...mmutable updates**: Each mutation calls model_copy(update=...) on frozen Task models — the origi...
(ELLIPSIS)
🔇 Additional comments (21)
tests/unit/config/conftest.py (1)
- 80-80: Factory default looks correct. Adding `escalation_paths = ()` keeps the test factory aligned with `RootConfig` while using an immutable default.

src/ai_company/observability/events/api.py (1)
- 38-38: Good addition to the shared API event surface. Keeping this as a named constant preserves the existing observability pattern and avoids ad-hoc event strings in controllers. Based on learnings, use event name constants from domain-specific modules under `ai_company.observability.events`.

src/ai_company/config/schema.py (1)
- 24-24: `RootConfig` wiring looks consistent. Exposing `TaskEngineConfig` here keeps the engine's knobs in the immutable configuration surface instead of leaking runtime state into app wiring. As per coding guidelines, "Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves." Also applies to: 418-529

src/ai_company/api/state.py (1)
- 38-68: Consistent service-guard pattern. The new `task_engine` accessors mirror the existing auth-service behavior and keep the same explicit 503-on-missing contract for callers. Also applies to: 106-133
tests/unit/engine/test_task_engine_lifecycle.py (1)
14-109: Nice coverage on lifecycle edges.This hits start/stop idempotency, restart, stopped-submit behavior, and config immutability, which are the main failure modes for the new engine surface.
tests/unit/api/test_state.py (1)
95-136: Good coverage for deferredtask_enginebinding.These cases pin the new AppState surface to the same access/set-once semantics as
auth_service, which should keep the two code paths aligned.tests/unit/engine/test_agent_engine.py (1)
940-1007: The failure-mode matrix here is solid.Covering the no-engine, non-terminal, typed-mutation-error, generic-exception, and
MemoryErrorpaths makes_report_to_task_engine()much harder to regress.Also applies to: 1037-1117
tests/unit/engine/conftest.py (1)
413-460: Useful shared fixture surface for the new engine tests.Centralizing TaskEngine construction and teardown here should keep lifecycle assumptions consistent across the TaskEngine test modules.
tests/unit/api/conftest.py (2)
620-644: Passing the shared TaskEngine throughcreate_app()is the right test seam.That keeps API tests on the real startup/shutdown path instead of manually patching app state in each case.
609-617: No issue here—the fixture lifecycle is properly managed.The
fake_task_enginefixture is safe. Verification confirms there are no direct consumers outsidetest_client, andcreate_app()explicitly callstask_engine.start()(line 278 ofsrc/ai_company/api/app.py) when the injected engine is provided. The fixture design correctly delegates startup to the app layer.> Likely an incorrect or invalid review comment.src/ai_company/engine/task_engine_config.py (1)
- 1-37: LGTM! The `TaskEngineConfig` model is well-designed:
  - Properly frozen for immutable configuration
  - Sensible field constraints with clear defaults
  - Comprehensive Google-style docstring documenting all attributes

tests/unit/api/test_app.py (1)
- 86-130: LGTM! Good additions testing TaskEngine lifecycle integration:
  - `test_task_engine_failure_cleans_up` verifies proper cleanup of persistence and bus when engine fails to start
  - `test_shutdown_task_engine_failure_does_not_propagate` ensures shutdown errors are swallowed gracefully
  The mock setup correctly simulates failure scenarios.

tests/unit/engine/test_task_engine_models.py (1)
- 1-312: LGTM! Comprehensive test coverage for task engine models:
  - Validates construction with minimal and full parameters
  - Tests validation constraints (NotBlankStr, non-negative budget, positive version)
  - Verifies frozen model immutability
  - Checks serialization roundtrip for events
  - Tests consistency validators on `TaskMutationResult`
  Well-organized with clear test class separation by model type.

tests/unit/engine/test_task_engine_integration.py (1)
- 278-326: LGTM! The drain timeout test thoroughly verifies:
  - Futures in the queue are failed with `error_code="internal"` when stop times out
  - Proper cleanup of blocked tasks via cancellation
  - The `contextlib.suppress` correctly handles the cleanup phase

tests/unit/engine/test_task_engine_mutations.py (1)
- 1-583: LGTM! Excellent test coverage for TaskEngine mutations:
  - All CRUD operations tested with success and failure scenarios
  - Typed error propagation verified (`TaskNotFoundError`, `TaskVersionConflictError`)
  - `previous_status` tracking validated across mutation types
  - Immutable field rejection tested at the model validation level
  - Error sanitization verified (internal errors don't leak implementation details)

tests/unit/engine/task_engine_helpers.py (2)
- 90-102: LGTM! The `_make_create_data` helper is well-designed with sensible defaults and proper override support. The dynamic import of `TaskType` inside the function is acceptable for a test helper to avoid circular imports.
- 8-9: Remove unnecessary `TYPE_CHECKING` block for Python 3.14 compatibility. With Python 3.14's PEP 649, `TaskStatus` can be imported directly instead of using a `TYPE_CHECKING` guard, since it's only used in type hints (line 30). Move the import to the top-level imports.
  Suggested fix:
  ```diff
  -from typing import TYPE_CHECKING
  -
   from ai_company.core.task import Task  # noqa: TC001
  +from ai_company.core.enums import TaskStatus
   from ai_company.engine.task_engine_models import CreateTaskData
  -
  -if TYPE_CHECKING:
  -    from ai_company.core.enums import TaskStatus
  ```
  > Likely an incorrect or invalid review comment.

src/ai_company/api/controllers/tasks.py (1)
- 253-260: Good error handling for transition failures. Properly logs the transition failure with context before raising `ApiValidationError`. The distinction between `TaskNotFoundError` (404) and general `TaskMutationError` (validation error) is correct.

src/ai_company/engine/agent_engine.py (3)
- 102-115: LGTM! Excellent documentation explaining why `FAILED` and `INTERRUPTED` are included in `_REPORTABLE_STATUSES` despite not being strictly terminal in the task lifecycle. This clarifies the design intent for future maintainers.
- 666-718: LGTM! The `_report_to_task_engine` method follows best practices:
  - Early returns for no-op cases (no engine, no task execution, non-reportable status)
  - Correctly re-raises `MemoryError` and `RecursionError` (non-recoverable)
  - Distinguishes `TaskMutationError` (WARNING - expected rejection) from unexpected exceptions (ERROR with state divergence note)
  - Best-effort semantics align with the docstring
- 354-378: Good pipeline integration. The `_report_to_task_engine` call is correctly placed after local transitions are applied but before recovery, ensuring the centralized TaskEngine receives the final execution status.
```python
try:
    await self._process_one(envelope)
except Exception:
    logger.exception(
        TASK_ENGINE_LOOP_ERROR,
        error="Unhandled exception in processing loop",
    )
    if not envelope.future.done():
        envelope.future.set_result(
            TaskMutationResult(
                request_id=envelope.mutation.request_id,
                success=False,
                error="Internal error in processing loop",
                error_code="internal",
            ),
        )
```
🧩 Analysis chain
🏁 Script executed:
```shell
# First, let's locate the file and check its size
find . -name "task_engine.py" -path "*/engine/*" | head -5
```
Repository: Aureliolo/synthorg
Length of output: 100

🏁 Script executed:
```shell
# Once we have the file, let's examine the specific lines mentioned in the review
wc -l src/ai_company/engine/task_engine.py
```
Repository: Aureliolo/synthorg
Length of output: 102

🏁 Script executed:
```shell
# Read the relevant sections of the file
cat -n src/ai_company/engine/task_engine.py | sed -n '480,510p'
```
Repository: Aureliolo/synthorg
Length of output: 1535

🏁 Script executed:
```shell
# Also check the second range
cat -n src/ai_company/engine/task_engine.py | sed -n '515,545p'
```
Repository: Aureliolo/synthorg
Length of output: 1516

🏁 Script executed:
```shell
# And the third range
cat -n src/ai_company/engine/task_engine.py | sed -n '840,870p'
```
Repository: Aureliolo/synthorg
Length of output: 1439
The fail-fast MemoryError / RecursionError exceptions are caught and swallowed by enclosing handlers.
`_publish_snapshot()` correctly re-raises these exceptions (lines 865-866), but both calling functions catch them immediately via generic `except Exception` handlers:
- `_processing_loop()` at line 492 catches and converts them to a logged internal failure
- `_process_one()` at line 521 catches and converts them to a logged internal failure
Both handlers need an explicit re-raise branch before the generic fallback to allow fail-fast behavior to propagate.
🛑 Proposed fix
```diff
     try:
         await self._process_one(envelope)
+    except MemoryError, RecursionError:
+        raise
     except Exception:
         logger.exception(
             TASK_ENGINE_LOOP_ERROR,
             error="Unhandled exception in processing loop",
         )
```

```diff
     try:
         result = await self._apply_mutation(mutation)
         if not envelope.future.done():
             envelope.future.set_result(result)
         if result.success and self._config.publish_snapshots:
             await self._publish_snapshot(mutation, result)
+    except MemoryError, RecursionError:
+        raise
     except Exception as exc:
         internal_msg = f"{type(exc).__name__}: {exc}"
         logger.exception(
             TASK_ENGINE_MUTATION_FAILED,
             mutation_type=mutation.mutation_type,
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
try:
    await self._process_one(envelope)
except MemoryError, RecursionError:
    raise
except Exception:
    logger.exception(
        TASK_ENGINE_LOOP_ERROR,
        error="Unhandled exception in processing loop",
    )
    if not envelope.future.done():
        envelope.future.set_result(
            TaskMutationResult(
                request_id=envelope.mutation.request_id,
                success=False,
                error="Internal error in processing loop",
                error_code="internal",
            ),
        )
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/task_engine.py` around lines 490 - 505, The generic
exception handlers in _processing_loop and _process_one are currently swallowing
fail-fast exceptions from _publish_snapshot; update both exception blocks to
explicitly catch (MemoryError, RecursionError) and immediately re-raise them
before the generic except Exception logic so these errors propagate; keep the
existing logging (TASK_ENGINE_LOOP_ERROR) and TaskMutationResult fallback only
for non-fail-fast exceptions and ensure you check envelope.future.done() only
after the re-raise branch.
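The re-raise-before-generic-handler pattern described above can be sketched in isolation. All names here are illustrative stand-ins, not the project's actual code, and the repo's PEP 758 `except A, B:` spelling is written with parentheses so the sketch runs on current Python versions:

```python
import logging

logger = logging.getLogger(__name__)


def process_safely(handler) -> bool:
    """Run one envelope's handler: swallow ordinary errors, let fail-fast ones escape."""
    try:
        handler()
        return True
    except (MemoryError, RecursionError):
        # Fail-fast exceptions must propagate: the process state is no longer
        # trustworthy. (Repo convention spells this `except MemoryError, RecursionError:`
        # on Python 3.14.)
        raise
    except Exception:
        # Everything else is logged and turned into a failure result, so the
        # processing loop keeps draining the queue.
        logger.exception("Unhandled exception in processing loop")
        return False
```

Ordering matters: the narrow fail-fast clause must come before `except Exception`, otherwise the broad clause wins and the process limps on in a corrupted state.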
src/ai_company/engine/task_engine.py (Outdated)
```python
task = Task(
    id=task_id,
    title=data.title,
    description=data.description,
    type=data.type,
    priority=data.priority,
    project=data.project,
    created_by=data.created_by,
    assigned_to=data.assigned_to,
    estimated_complexity=data.estimated_complexity,
    budget_limit=data.budget_limit,
)
await self._persistence.tasks.save(task)
self._versions[task_id] = 1
```
🧩 Analysis chain

🏁 Scripts executed (Repository: Aureliolo/synthorg):

```shell
find . -type f -name "task_engine.py" | head -20
wc -l src/ai_company/engine/task_engine.py
sed -n '580,610p' src/ai_company/engine/task_engine.py
sed -n '645,660p' src/ai_company/engine/task_engine.py
rg "_process_one" src/ai_company/engine/task_engine.py -A 20
rg "def.*transition|def.*cancel" src/ai_company/engine/task_engine.py -A 15
rg "validation.*result|error_code" src/ai_company/engine/task_engine.py | head -20
rg "error_code=\"validation\"" src/ai_company/engine/task_engine.py -B 5 -A 2
rg "class.*Result|TaskMutationResult" src/ai_company/engine/task_engine.py | head -20
sed -n '420,480p' src/ai_company/engine/task_engine.py
sed -n '500,560p' src/ai_company/engine/task_engine.py
rg "_apply_mutation|_apply_create|_apply_update|_apply_transition|_apply_cancel" src/ai_company/engine/task_engine.py -A 1 | head -40
sed -n '560,620p' src/ai_company/engine/task_engine.py
sed -n '620,680p' src/ai_company/engine/task_engine.py
rg "PydanticValidationError|ValidationError|model_validate" src/ai_company/engine/task_engine.py -B 3 -A 3
sed -n '665,730p' src/ai_company/engine/task_engine.py
```
Add validation error handling to the create and update mutation methods to maintain consistent error typing.

`Task()` in `_apply_create` (lines 590–603) and `Task.model_validate()` in `_apply_update` (lines 651–654) can fail on caller-controlled input but don't catch validation errors. Exceptions escape to `_process_one()`, which returns `error_code="internal"` instead of honoring the `error_code="validation"` contract used by `_apply_transition` and `_apply_cancel`. Wrap both with try-except to catch validation failures and return a typed `TaskMutationResult` with `error_code="validation"`, consistent with the other mutation handlers.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/task_engine.py` around lines 590 - 603, wrap Task(...)
construction in _apply_create and the Task.model_validate(...) call in
_apply_update with try/except blocks that catch the validation exception type
used by your model layer (e.g., pydantic.ValidationError or your custom
ValidationError), and on catch return a TaskMutationResult with
error_code="validation" and include the validation details/error message rather
than letting the exception escape to _process_one; mirror the pattern used by
_apply_transition and _apply_cancel so the mutation handlers consistently return
typed validation errors instead of error_code="internal".
…handling, add tests

- Extract VersionTracker into task_engine_version.py (optimistic concurrency)
- Extract mutation apply logic into task_engine_apply.py (dispatch + apply_*)
- Reduce task_engine.py from 976 to ~620 lines (well under 800 limit)
- Make TaskInternalError a sibling of TaskMutationError, not a subtype
- Add MemoryError/RecursionError re-raise guards in all except-Exception blocks
- Add _try_stop helper in app.py to reduce C901 complexity
- Add _extract_requester and _map_task_engine_errors helpers in tasks controller
- Add deep-copy at system boundaries for mutable dict fields in frozen models
- Add unknown-key validation in UpdateTaskMutation and TransitionTaskMutation
- Track in-flight envelope for drain-timeout resolution
- Fix _safe_shutdown argument ordering (task_engine first, mirrors cleanup)
- Fix existing test bug: mock_te passed as bridge instead of task_engine
- Update docs/design/engine.md for immutable-style updates and version tracking
- Add 62 new tests: VersionTracker, apply functions, coverage edge cases, controller helpers, _try_stop, in-flight resolution, processing loop resilience
```python
mutation = UpdateTaskMutation(
    request_id=uuid4().hex,
    requested_by=requested_by,
    task_id=task_id,
    updates=updates,
    expected_version=expected_version,
)
```
**`pydantic.ValidationError` escapes the typed error hierarchy**

`UpdateTaskMutation(...)` runs `_reject_immutable_fields` at construction time. When that validator fires — e.g. a caller passes `updates={"id": "new-id"}` — Pydantic raises `ValidationError` before `self.submit(mutation)` is reached. This exception is:

1. Not listed in the `update_task` docstring's `Raises` section.
2. Not caught by the `except (TaskEngineNotRunningError, …, TaskMutationError)` block in `TaskController.update_task` — so it would surface as an unhandled 500 at the API layer instead of the expected 422.

The same gap exists in `transition_task` at the `TransitionTaskMutation(...)` construction (line 353), where `_reject_immutable_overrides` can fire.

The intended contract is that all mutation failures surface as `TaskMutationError` sub-classes. The simplest fix is to wrap mutation construction in the convenience methods:

```python
try:
    mutation = UpdateTaskMutation(
        request_id=uuid4().hex,
        requested_by=requested_by,
        task_id=task_id,
        updates=updates,
        expected_version=expected_version,
    )
except ValidationError as exc:
    raise TaskMutationError(str(exc)) from exc
```

Apply the same pattern to `transition_task` and `create_task`.
Prompt To Fix With AI
Path: src/ai_company/engine/task_engine.py
Line: 306-312
```python
except PydanticValidationError as exc:
    error_msg = f"Invalid task data: {exc}"
    logger.warning(
        TASK_ENGINE_MUTATION_FAILED,
        mutation_type="create",
        request_id=mutation.request_id,
        error=error_msg,
    )
    return TaskMutationResult(
        request_id=mutation.request_id,
        success=False,
        error=error_msg,
        error_code="validation",
```
**Verbose Pydantic error string forwarded to callers**

`f"Invalid task data: {exc}"` converts a `PydanticValidationError` to its full string representation, which includes field names, constraint details, and the literal input values submitted by the user. This string flows through `TaskMutationResult.error` → `TaskMutationError(result.error)` → `ApiValidationError(str(exc))` all the way to the HTTP response body.

While the data being echoed is the caller's own submission (so not a traditional information leak), the verbose Pydantic format can expose implementation internals (model field names, constraint types, library-generated URL hints like `input_url=https://errors.pydantic.dev/…`). The same pattern appears in `apply_update` at line 185.

Consider extracting only the human-readable portions:

```python
errors = [f"{'.'.join(str(loc) for loc in e['loc'])}: {e['msg']}" for e in exc.errors()]
error_msg = "Invalid task data: " + "; ".join(errors)
```

or simply `error_msg = f"Invalid task data: {len(exc.errors())} validation error(s)"` for a leaner message.
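The extraction can be exercised on a hand-built list shaped like pydantic v2's `ValidationError.errors()` output. The sample entries below are fabricated for illustration; real entries also carry `type`, `input`, and `url` keys, which this summary deliberately drops:

```python
def summarize_validation_errors(errors: list[dict]) -> str:
    """Condense pydantic-v2-style error dicts into terse 'path: message' pairs."""
    parts = [
        f"{'.'.join(str(loc) for loc in err['loc'])}: {err['msg']}"
        for err in errors
    ]
    return "Invalid task data: " + "; ".join(parts)


# Fabricated sample shaped like ValidationError.errors() entries.
sample = [
    {"loc": ("updates", "title"), "msg": "String should have at least 1 character"},
    {"loc": ("expected_version",), "msg": "Input should be greater than 0"},
]
summary = summarize_validation_errors(sample)
```

The joined `loc` path keeps enough context to act on ("updates.title") while leaving out the constraint internals and echoed input values.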
Prompt To Fix With AI
Path: src/ai_company/engine/task_engine_apply.py
Line: 119-131
Actionable comments posted: 4
♻️ Duplicate comments (3)
src/ai_company/engine/task_engine.py (1)
167-176: ⚠️ Potential issue | 🟠 Major — Drain timeout still misses the active envelope.

`stop()` waits for the canceled loop before cleanup, but `_process_one()` clears `self._in_flight` in `finally`, so `_fail_remaining_futures()` only covers queued envelopes. The submitter already inside `_process_one()` can still hang indefinitely — the new integration test has to `blocked_task.cancel()` manually for exactly this reason.

Proposed fix

```diff
         except TimeoutError:
             logger.warning(
                 TASK_ENGINE_DRAIN_TIMEOUT,
                 remaining=self._queue.qsize(),
             )
+            timed_out_in_flight = self._in_flight
             self._processing_task.cancel()
             with contextlib.suppress(asyncio.CancelledError):
                 await self._processing_task
-        self._fail_remaining_futures()
+        self._fail_remaining_futures(timed_out_in_flight)
         self._processing_task = None
@@
-    def _fail_remaining_futures(self) -> None:
+    def _fail_remaining_futures(
+        self,
+        timed_out_in_flight: _MutationEnvelope | None = None,
+    ) -> None:
         """Fail in-flight and remaining enqueued futures after drain timeout."""
         shutdown_result_for = self._shutdown_result
-        in_flight = self._in_flight
+        in_flight = timed_out_in_flight or self._in_flight
         if in_flight is not None and not in_flight.future.done():
             in_flight.future.set_result(shutdown_result_for(in_flight))
         self._in_flight = None
```

Also applies to: 180-186, 557-558
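The snapshot-then-fail idea above can be reduced to plain futures. This is a sketch with hypothetical names, not the engine's real API:

```python
import asyncio


def fail_futures(in_flight, queued, message="engine stopped"):
    """Resolve the snapshotted in-flight future first, then all queued futures."""
    if in_flight is not None and not in_flight.done():
        in_flight.set_result(message)
    for fut in queued:
        if not fut.done():
            fut.set_result(message)


async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    in_flight = loop.create_future()  # mutation currently being processed
    queued = [loop.create_future(), loop.create_future()]
    # Snapshot BEFORE cancelling the worker: the worker's finally block may
    # clear its own in-flight reference, which is the gap the review describes.
    snapshot = in_flight
    fail_futures(snapshot, queued)
    # Every submitter's await now completes instead of hanging.
    return [await f for f in (in_flight, *queued)]


results = asyncio.run(main())
```

The key property is that the submitter blocked on the in-flight future is resolved even though the worker no longer holds a reference to it.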
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_engine.py` around lines 167-176: stop() cancels the processing task but _process_one() clears self._in_flight in its finally block, so any submitter already inside _process_one() can be left hanging; before cancelling self._processing_task in stop() (and the similar blocks around lines 180-186 and 557-558) snapshot and clear the current in-flight envelope(s) (e.g., copy self._in_flight or move its future(s) to a local variable), then cancel the task and after awaiting its cancellation call _fail_remaining_futures() with that snapshot (or ensure _fail_remaining_futures() also handles both queued and in-flight items) so the submitter futures are always completed even if _process_one() wipes self._in_flight.

src/ai_company/engine/task_engine_version.py (1)
18-21: ⚠️ Potential issue | 🟠 Major — The restart-durability claim is not implemented.

This tracker never sees persisted version state, so `seed()` hard-resets unknown tasks to `1` and `get()` reports `0`. After a restart, a task that was already at version `5` can falsely conflict against `expected_version=5`, or worse, incorrectly accept a stale `expected_version=1` write.

Also applies to: 27-30, 43-45, 61-64
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_engine_version.py` around lines 18-21: the tracker currently ignores persisted version state, causing seed() to unconditionally set unknown tasks to 1 and get() to return 0 after a restart; update the TaskEngineVersionTracker (seed, get, and any initialization logic) to load persisted versions on construction and, when encountering a task ID present in persistent storage, initialize/seed the in-memory tracker to the stored version (not 1), ensure get() returns that loaded version, and adjust seed() to only set 1 for truly new tasks; reference the seed() and get() methods and the tracker initialization code so persisted version lookups are performed before any reset.

src/ai_company/engine/task_engine_models.py (1)
51-54: ⚠️ Potential issue | 🟠 Major — `assigned_to` currently advertises a create path that cannot succeed.

`CreateTaskData` accepts an assignee, but `src/ai_company/engine/task_engine_apply.py` passes it straight into `Task(...)` and `src/ai_company/core/task.py` rejects any non-`None` assignee while the initial status is still `CREATED`. So a non-empty `assigned_to` is guaranteed to fail later during creation; either reject it here or derive the initial status from it before persisting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_engine_models.py` around lines 51 - 54, The model currently allows CreateTaskData.assigned_to but Task(...) (in task_engine_apply.py) and the Task class (src/ai_company/core/task.py) disallow non-None assignee when status is CREATED; fix by ensuring the initial status matches presence of an assignee: when constructing Task(...) from CreateTaskData, check CreateTaskData.assigned_to and set the new Task's status to ASSIGNED (or the appropriate enum value) if an assignee is provided, otherwise keep CREATED; update the Task(...) construction site in task_engine_apply.py to derive status from assigned_to so persistence won't be rejected.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/api/controllers/tasks.py`:
- Around line 271-284: Combine the two except blocks into a single except that
catches (TaskEngineNotRunningError, TaskEngineQueueFullError, TaskNotFoundError,
TaskInternalError, TaskMutationError) as exc; inside it, if isinstance(exc,
TaskMutationError) call logger.warning(API_TASK_TRANSITION_FAILED,
task_id=task_id, error=str(exc)) to preserve the special log, then raise
_map_task_engine_errors(exc, task_id=task_id) from exc. This preserves logging
behavior for TaskMutationError while removing the duplicated
_map_task_engine_errors call; reference symbols: _map_task_engine_errors,
TaskMutationError, TaskEngineNotRunningError, TaskEngineQueueFullError,
TaskNotFoundError, TaskInternalError, logger.warning,
API_TASK_TRANSITION_FAILED, task_id.
In `@src/ai_company/engine/task_engine.py`:
- Around line 272-278: In create_task(), do not convert engine typed failures
into a plain TaskMutationError; instead, when result.success is false call the
existing _raise_typed_error(result) (or otherwise propagate the typed exception
raised by _raise_typed_error) so errors with error_code like "internal" become
TaskInternalError (or the appropriate typed subclass) rather than being wrapped
as TaskMutationError; keep the subsequent check for result.task is None and
raise TaskMutationError only for the inconsistent-success case.
In `@tests/unit/engine/test_task_engine_apply.py`:
- Around line 140-150: Extract the duplicated async helper into a single
module-level function (e.g., _create_task_helper) that accepts (persistence:
FakePersistence, versions: VersionTracker) and returns TaskMutationResult by
calling the existing CreateTaskMutation + apply_create logic; then replace the
two class-local methods named _create_task in TestApplyUpdate and
TestApplyTransition with calls to this new module-level helper (or convert it to
a module-level pytest fixture if preferred) so both tests reuse the same helper
implementation.
In `@tests/unit/engine/test_task_engine_integration.py`:
- Around line 287-294: The test currently uses time-based sleep to order
operations which is flaky; instead modify the drain-timeout test to signal when
the patched save handler is entered by setting an asyncio.Event (the existing
block Event) from inside slow_save (which wraps original_save) and have the test
await that event before sending/queueing the second envelope; specifically, keep
original_save, replace persistence.tasks.save with slow_save, set block.set() at
the start of slow_save, and in the test wait for block.wait() instead of
asyncio.sleep(...) so the second envelope is only queued after slow_save is
active.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: d81e6a01-5dd3-465e-a02e-1bbf6c0b2a91
📒 Files selected for processing (15)

- docs/design/engine.md
- src/ai_company/api/app.py
- src/ai_company/api/controllers/tasks.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/task_engine.py
- src/ai_company/engine/task_engine_apply.py
- src/ai_company/engine/task_engine_models.py
- src/ai_company/engine/task_engine_version.py
- tests/unit/api/controllers/test_task_helpers.py
- tests/unit/api/test_app.py
- tests/unit/engine/test_agent_engine.py
- tests/unit/engine/test_task_engine_apply.py
- tests/unit/engine/test_task_engine_coverage.py
- tests/unit/engine/test_task_engine_integration.py
- tests/unit/engine/test_task_engine_version.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Greptile Review
- GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
- No `from __future__ import annotations` — Python 3.14 has PEP 649
- Use `except A, B:` syntax (no parentheses) for exception handling — PEP 758 syntax enforced by ruff on Python 3.14
- Add type hints to all public functions — enforce mypy strict mode
- Use Google-style docstrings — required on public classes and functions, enforced by ruff D rules
- For frozen Pydantic models with dict/list fields, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
- Use Pydantic v2 `BaseModel`, `model_validator`, `computed_field`, and `ConfigDict` — avoid redundant stored fields with `@computed_field`
- Use `NotBlankStr` (from `core.types`) for all identifier/name fields, including optional (`NotBlankStr | None`) and tuple variants, instead of manual whitespace validators
- Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code instead of bare `create_task`
- Keep functions under 50 lines and files under 800 lines
- Handle errors explicitly — never silently swallow exceptions
- Enforce line length of 88 characters via ruff
- Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves
- Never mix static config fields with mutable runtime fields in one model

Files:

- tests/unit/engine/test_task_engine_coverage.py
- src/ai_company/api/controllers/tasks.py
- src/ai_company/api/app.py
- tests/unit/api/controllers/test_task_helpers.py
- tests/unit/engine/test_task_engine_integration.py
- tests/unit/engine/test_task_engine_version.py
- tests/unit/engine/test_task_engine_apply.py
- tests/unit/api/test_app.py
- tests/unit/engine/test_agent_engine.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/task_engine.py
- src/ai_company/engine/task_engine_version.py
- src/ai_company/engine/task_engine_apply.py
- src/ai_company/engine/task_engine_models.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
- Use `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, and `@pytest.mark.slow` markers for test organization
- Prefer `@pytest.mark.parametrize` for testing similar cases
- Never use real vendor names in test files — use `test-provider`, `test-small-001`, and generic model names

Files:

- tests/unit/engine/test_task_engine_coverage.py
- tests/unit/api/controllers/test_task_helpers.py
- tests/unit/engine/test_task_engine_integration.py
- tests/unit/engine/test_task_engine_version.py
- tests/unit/engine/test_task_engine_apply.py
- tests/unit/api/test_app.py
- tests/unit/engine/test_agent_engine.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
- Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
- Never use `import logging`, `logging.getLogger()`, or `print()` in application code — use the observability logger instead
- Always use `logger` as the variable name (not `_logger` or `log`)
- Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly
- Use structured logging with kwargs: `logger.info(EVENT, key=value)` — never use string formatting like `logger.info("msg %s", val)`
- Log all error paths at WARNING or ERROR with context before raising
- Log all state transitions at INFO level
- Use DEBUG level for object creation, internal flow, and entry/exit of key functions

Files:

- src/ai_company/api/controllers/tasks.py
- src/ai_company/api/app.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/task_engine.py
- src/ai_company/engine/task_engine_version.py
- src/ai_company/engine/task_engine_apply.py
- src/ai_company/engine/task_engine_models.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names: `example-provider`, `example-large-001`, `example-medium-001`, `example-small-001`, `large`/`medium`/`small` as aliases. Tests must use `test-provider`, `test-small-001`, etc.

Files:

- src/ai_company/api/controllers/tasks.py
- src/ai_company/api/app.py
- src/ai_company/engine/errors.py
- src/ai_company/engine/task_engine.py
- src/ai_company/engine/task_engine_version.py
- src/ai_company/engine/task_engine_apply.py
- src/ai_company/engine/task_engine_models.py
🧠 Learnings (6)
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves
Applied to files:
- docs/design/engine.md
- src/ai_company/engine/task_engine_models.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : Handle errors explicitly — never silently swallow exceptions
Applied to files:
- src/ai_company/api/app.py
- src/ai_company/engine/task_engine.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly
Applied to files:
src/ai_company/api/app.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Applied to files:
src/ai_company/api/app.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : Keep functions under 50 lines and files under 800 lines
Applied to files:
src/ai_company/engine/task_engine.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : For frozen Pydantic models with dict/list fields, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Applied to files:
src/ai_company/engine/task_engine_models.py
🧬 Code graph analysis (11)
tests/unit/engine/test_task_engine_coverage.py (3)

- src/ai_company/engine/task_engine.py (4): TaskEngine (82-620), _MutationEnvelope (69-79), _raise_typed_error (439-450), _shutdown_result (194-201)
- src/ai_company/engine/task_engine_models.py (2): CreateTaskMutation (69-84), TaskMutationResult (256-297)
- tests/unit/engine/task_engine_helpers.py (2): FakePersistence (47-55), _make_create_data (90-102)

src/ai_company/api/controllers/tasks.py (5)

- src/ai_company/api/state.py (2): AppState (22-154), task_engine (107-109)
- src/ai_company/core/task.py (1): Task (45-261)
- src/ai_company/engine/errors.py (5): TaskEngineNotRunningError (89-90), TaskEngineQueueFullError (93-94), TaskInternalError (109-120), TaskMutationError (97-98), TaskNotFoundError (101-102)
- src/ai_company/engine/task_engine_models.py (1): CreateTaskData (24-63)
- src/ai_company/engine/task_engine.py (6): list_tasks (465-486), get_task (454-463), create_task (247-278), update_task (280-319), transition_task (321-368), delete_task (370-399)

src/ai_company/api/app.py (3)

- src/ai_company/api/state.py (4): task_engine (107-109), persistence (87-89), message_bus (92-94), AppState (22-154)
- src/ai_company/engine/task_engine.py (3): TaskEngine (82-620), stop (141-178), start (121-139)
- src/ai_company/memory/errors.py (1): MemoryError (13-14)

tests/unit/api/controllers/test_task_helpers.py (3)

- src/ai_company/api/controllers/tasks.py (2): _extract_requester (46-55), _map_task_engine_errors (58-81)
- src/ai_company/api/errors.py (3): ApiValidationError (32-38), NotFoundError (23-29), ServiceUnavailableError (68-74)
- src/ai_company/engine/errors.py (5): TaskEngineNotRunningError (89-90), TaskEngineQueueFullError (93-94), TaskInternalError (109-120), TaskMutationError (97-98), TaskNotFoundError (101-102)

tests/unit/engine/test_task_engine_version.py (2)

- src/ai_company/engine/errors.py (1): TaskVersionConflictError (105-106)
- src/ai_company/engine/task_engine_version.py (7): VersionTracker (15-76), seed (27-30), get (43-45), set_initial (32-34), bump (36-41), remove (47-49), check (51-76)

tests/unit/api/test_app.py (2)

- src/ai_company/api/app.py (3): _safe_startup (236-308), _safe_shutdown (311-346), _try_stop (137-153)
- src/ai_company/api/state.py (2): persistence (87-89), AppState (22-154)

tests/unit/engine/test_agent_engine.py (2)

- src/ai_company/engine/errors.py (1): TaskMutationError (97-98)
- src/ai_company/engine/task_engine.py (1): transition_task (321-368)

src/ai_company/engine/task_engine.py (5)

- src/ai_company/engine/task_engine_apply.py (1): dispatch (68-91)
- src/ai_company/engine/task_engine_config.py (1): TaskEngineConfig (6-37)
- src/ai_company/engine/task_engine_version.py (2): VersionTracker (15-76), get (43-45)
- src/ai_company/core/enums.py (1): TaskStatus (198-224)
- src/ai_company/persistence/protocol.py (1): PersistenceBackend (27-167)

src/ai_company/engine/task_engine_version.py (3)

- src/ai_company/engine/errors.py (1): TaskVersionConflictError (105-106)
- src/ai_company/observability/_logger.py (1): get_logger (8-28)
- src/ai_company/api/state.py (1): task_engine (107-109)

src/ai_company/engine/task_engine_apply.py (4)

- src/ai_company/core/enums.py (1): TaskStatus (198-224)
- src/ai_company/engine/errors.py (1): TaskVersionConflictError (105-106)
- src/ai_company/engine/task_engine_models.py (6): CancelTaskMutation (223-240), CreateTaskMutation (69-84), DeleteTaskMutation (205-220), TaskMutationResult (256-297), TransitionTaskMutation (155-202), UpdateTaskMutation (102-142)
- src/ai_company/engine/task_engine_version.py (6): VersionTracker (15-76), set_initial (32-34), get (43-45), check (51-76), bump (36-41), remove (47-49)

src/ai_company/engine/task_engine_models.py (2)

- src/ai_company/core/enums.py (4): Complexity (247-253), Priority (238-244), TaskStatus (198-224), TaskType (227-235)
- src/ai_company/core/task.py (1): Task (45-261)
🔇 Additional comments (13)
tests/unit/engine/test_agent_engine.py (2)
1009-1039: LGTM! The assertions now validate the full `transition_task()` payload. The test now asserts the task ID (line 1035), target status (line 1036), and `requested_by` (lines 1037-1039), addressing the previous review feedback about verifying the complete integration boundary.

936-1121: Comprehensive test coverage for TaskEngine integration. The `TestReportToTaskEngine` class thoroughly exercises the `_report_to_task_engine` behavior with well-isolated scenarios covering the happy path, error-swallowing semantics, and critical error propagation (`MemoryError`). The test structure follows good practices, with clear docstrings and focused assertions.

tests/unit/engine/test_task_engine_version.py (1)
1-94: LGTM! Comprehensive VersionTracker test coverage. The tests thoroughly exercise all VersionTracker methods with clear boundary cases:
- Idempotent seeding behavior
- Auto-seeding on bump for unknown tasks
- Version conflict detection with proper error types
- Seed-on-first-check semantics
The test names are descriptive and each test validates a single behavior.
tests/unit/api/controllers/test_task_helpers.py (1)
1-93: LGTM! Thorough helper function test coverage. The tests validate both `_extract_requester` and `_map_task_engine_errors` with all relevant edge cases:

- User presence/absence and a missing `user_id` attribute
- Complete error-type mapping coverage, including pass-through for unknown errors

The inline `FakeUser`/`FakeState` classes are appropriate for testing these simple helpers without heavier mocking.

tests/unit/engine/test_task_engine_coverage.py (2)
67-68: Exception-handling syntax verified. `asyncio.CancelledError` has been a subclass of `BaseException` (not `Exception`) since Python 3.8, so it must be listed separately in `contextlib.suppress(Exception, asyncio.CancelledError)` for proper suppression. The current code handles this correctly.
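The distinction can be demonstrated in isolation; this sketch is independent of the codebase under review and only illustrates why `asyncio.CancelledError` must be listed alongside `Exception`:

```python
import asyncio
import contextlib

# CancelledError derives from BaseException (not Exception) on Python 3.8+,
# so suppress(Exception) alone would let a cancellation propagate.
cancelled_is_exception = issubclass(asyncio.CancelledError, Exception)

# Listing both suppresses ordinary errors and cancellations alike.
suppressed = False
with contextlib.suppress(Exception, asyncio.CancelledError):
    raise asyncio.CancelledError()
suppressed = True  # reached only because CancelledError was suppressed

with contextlib.suppress(Exception, asyncio.CancelledError):
    raise ValueError("ordinary error")
```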
1-234: LGTM! Excellent edge-case coverage for TaskEngine internals. The tests effectively validate critical edge cases:

- In-flight envelope resolution during drain timeout
- Exception handling in `_process_one` producing internal errors
- Snapshot publish failures being isolated from mutation results
- Typed error mapping from result codes
- Processing-loop resilience after failures

The use of `try`/`finally` blocks ensures proper cleanup, and the white-box testing of `_in_flight` is appropriate for verifying internal state-machine behavior.

docs/design/engine.md (2)
190-196: LGTM! The versioning documentation now correctly describes the source of truth. It properly states that "the persisted task version is the source of truth" and that "any in-memory cache is an optimization that is seeded from persistence on task load and may be invalid after a restart." This addresses the previous review feedback about not describing optimistic concurrency as in-memory-only.
169-228: Well-structured TaskEngine documentation. The new section clearly explains:

- Single-writer architecture with queue-based mutation processing
- Immutable-style update semantics via `model_copy`
- Optimistic concurrency with persisted versions as the source of truth
- Read-through bypass for safe reads
- Comprehensive error handling with typed errors

The ASCII diagram effectively illustrates the data flow.
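As a hedged illustration of the single-writer pattern described here — not the project's actual `TaskEngine` API; `MiniEngine`, `submit`, and the dict-based state are invented for this sketch — all mutations funnel through one queue and one loop applies them in order:

```python
import asyncio


class MiniEngine:
    """Toy single-writer engine: one queue, one loop, sequential state updates."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=16)
        self._state: dict[str, dict] = {}  # stands in for persistence
        self._task: asyncio.Task | None = None

    async def start(self) -> None:
        self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        await self._queue.join()  # drain: wait until every mutation is applied
        if self._task is not None:
            self._task.cancel()

    async def submit(self, task_id: str, update: dict) -> dict:
        # Caller gets a future resolved by the loop once the mutation is applied.
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((task_id, update, fut))
        return await fut

    async def _loop(self) -> None:
        while True:
            task_id, update, fut = await self._queue.get()
            try:
                # Immutable-style update: build a new snapshot, then swap it in.
                new = {**self._state.get(task_id, {}), **update}
                self._state[task_id] = new
                fut.set_result(new)
            finally:
                self._queue.task_done()


async def demo() -> dict:
    eng = MiniEngine()
    await eng.start()
    await eng.submit("t1", {"status": "created"})
    snap = await eng.submit("t1", {"status": "in_progress"})
    await eng.stop()
    return snap


snapshot = asyncio.run(demo())
```

Because a single loop owns every write, no mutation can interleave with another, which is what makes the `model_copy`-then-persist sequence safe without locks.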
src/ai_company/engine/errors.py (2)
109-120: LGTM! `TaskInternalError` is correctly implemented as a sibling of `TaskMutationError`. The inheritance (`TaskInternalError(TaskEngineError)`) ensures that broad `except TaskMutationError` handlers won't accidentally catch internal engine faults. The docstring clearly documents this deliberate design decision. This addresses the previous review feedback.
83-120: Well-designed error hierarchy. The TaskEngine error types follow a clear hierarchy:

- `TaskEngineError` as the base for all engine errors
- `TaskMutationError` and subtypes (`TaskNotFoundError`, `TaskVersionConflictError`) for business-rule failures
- `TaskInternalError` as a sibling for internal faults (maps to 5xx)
- Operational errors (`TaskEngineNotRunningError`, `TaskEngineQueueFullError`) at the base level

This enables precise error handling at the API layer.
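The sibling property — a broad `except TaskMutationError` never swallowing internal faults — can be sketched with a stand-in hierarchy (class bodies here are illustrative, not the real definitions):

```python
class TaskEngineError(Exception):
    """Base for all engine errors."""

class TaskMutationError(TaskEngineError):
    """Business-rule failure; safe to surface as 4xx."""

class TaskNotFoundError(TaskMutationError): ...
class TaskVersionConflictError(TaskMutationError): ...

class TaskInternalError(TaskEngineError):
    """Internal fault; a sibling of TaskMutationError, so 4xx handlers miss it."""


def classify(exc: TaskEngineError) -> str:
    # Mirrors an API-layer handler: business failures map to 4xx,
    # anything else under the engine base falls through to 5xx.
    try:
        raise exc
    except TaskMutationError:
        return "4xx"
    except TaskEngineError:
        return "5xx"


conflict_class = classify(TaskVersionConflictError())
internal_class = classify(TaskInternalError())
```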
src/ai_company/api/controllers/tasks.py (2)
46-81: LGTM! Clean helper extraction for requester identity and error mapping. The `_extract_requester` function properly extracts the authenticated user identity with a safe fallback, addressing the previous review feedback about audit-trail preservation. The `_map_task_engine_errors` function centralizes the exception-mapping pattern that was previously duplicated across operations.
1-326: LGTM! Clean migration to TaskEngine-based operations. The controller successfully migrates from direct persistence access to TaskEngine-based operations with:
- Proper error mapping to API-level errors
- Requester identity extraction for audit trails
- Structured logging with appropriate event constants
- Consistent exception handling across all CRUD operations
tests/unit/engine/test_task_engine_apply.py (1)
1-527: LGTM! Excellent test coverage for mutation apply functions. The test file comprehensively covers all mutation types with:
- Happy path tests for each operation
- Not-found error handling
- Version conflict detection
- Validation error handling
- State tracking (previous_status, version increments)
- Edge cases (empty updates, invalid transitions)
The tests clearly validate the contract between mutation inputs and expected results.
```
except (
    TaskEngineNotRunningError,
    TaskEngineQueueFullError,
    TaskNotFoundError,
    TaskInternalError,
) as exc:
    raise _map_task_engine_errors(exc, task_id=task_id) from exc
except TaskMutationError as exc:
    logger.warning(
        TASK_STATUS_CHANGED,
        API_TASK_TRANSITION_FAILED,
        task_id=task_id,
        error=str(exc),
    )
    raise ApiValidationError(str(exc)) from exc
await app_state.persistence.tasks.save(new_task)
raise _map_task_engine_errors(exc, task_id=task_id) from exc
```
🧹 Nitpick | 🔵 Trivial
Consider consolidating the transition_task exception handling.
The transition operation handles TaskMutationError separately (lines 278-284) from other errors (lines 271-277) to emit API_TASK_TRANSITION_FAILED. This is intentional for logging purposes, but it duplicates the _map_task_engine_errors call.
A minor improvement could be to emit the warning first and then use a single except block, though the current approach is functionally correct and the intent is clear.
♻️ Optional: Simplify with single except block
```diff
 try:
     task, from_status = await app_state.task_engine.transition_task(
         task_id,
         data.target_status,
         **transition_kwargs,  # type: ignore[arg-type]
     )
-except (
-    TaskEngineNotRunningError,
-    TaskEngineQueueFullError,
-    TaskNotFoundError,
-    TaskInternalError,
-) as exc:
-    raise _map_task_engine_errors(exc, task_id=task_id) from exc
-except TaskMutationError as exc:
+except (
+    TaskEngineNotRunningError,
+    TaskEngineQueueFullError,
+    TaskNotFoundError,
+    TaskInternalError,
+    TaskMutationError,
+) as exc:
+    if isinstance(exc, TaskMutationError):
+        logger.warning(
+            API_TASK_TRANSITION_FAILED,
+            task_id=task_id,
+            error=str(exc),
+        )
-    logger.warning(
-        API_TASK_TRANSITION_FAILED,
-        task_id=task_id,
-        error=str(exc),
-    )
     raise _map_task_engine_errors(exc, task_id=task_id) from exc
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/api/controllers/tasks.py` around lines 271 - 284, Combine the
two except blocks into a single except that catches (TaskEngineNotRunningError,
TaskEngineQueueFullError, TaskNotFoundError, TaskInternalError,
TaskMutationError) as exc; inside it, if isinstance(exc, TaskMutationError) call
logger.warning(API_TASK_TRANSITION_FAILED, task_id=task_id, error=str(exc)) to
preserve the special log, then raise _map_task_engine_errors(exc,
task_id=task_id) from exc. This preserves logging behavior for TaskMutationError
while removing the duplicated _map_task_engine_errors call; reference symbols:
_map_task_engine_errors, TaskMutationError, TaskEngineNotRunningError,
TaskEngineQueueFullError, TaskNotFoundError, TaskInternalError, logger.warning,
API_TASK_TRANSITION_FAILED, task_id.
```python
async def _create_task(
    self,
    persistence: FakePersistence,
    versions: VersionTracker,
) -> TaskMutationResult:
    mutation = CreateTaskMutation(
        request_id="req-c",
        requested_by="alice",
        task_data=_make_create_data(),
    )
    return await apply_create(mutation, persistence, versions)  # type: ignore[arg-type]
```
🧹 Nitpick | 🔵 Trivial
Consider extracting the shared _create_task helper.
The _create_task helper method is duplicated in TestApplyUpdate (lines 140-150) and TestApplyTransition (lines 264-274). Consider moving this to a module-level fixture or helper function in the test file to reduce duplication.
♻️ Optional: Extract shared helper to module level
```python
async def _create_task_helper(
    persistence: FakePersistence,
    versions: VersionTracker,
) -> TaskMutationResult:
    """Helper to create a task for tests."""
    mutation = CreateTaskMutation(
        request_id="req-c",
        requested_by="alice",
        task_data=_make_create_data(),
    )
    return await apply_create(mutation, persistence, versions)  # type: ignore[arg-type]
```

Then use `_create_task_helper(persistence, versions)` in tests instead of the duplicated methods.
Also applies to: 264-274
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_task_engine_apply.py` around lines 140 - 150, Extract
the duplicated async helper into a single module-level function (e.g.,
_create_task_helper) that accepts (persistence: FakePersistence, versions:
VersionTracker) and returns TaskMutationResult by calling the existing
CreateTaskMutation + apply_create logic; then replace the two class-local
methods named _create_task in TestApplyUpdate and TestApplyTransition with calls
to this new module-level helper (or convert it to a module-level pytest fixture
if preferred) so both tests reuse the same helper implementation.
```python
block = asyncio.Event()
original_save = persistence.tasks.save

async def slow_save(task: object) -> None:
    await block.wait()
    await original_save(task)  # type: ignore[arg-type]

persistence.tasks.save = slow_save  # type: ignore[method-assign]
```
🧹 Nitpick | 🔵 Trivial
Avoid the timing-based wait in the drain-timeout test.
await asyncio.sleep(0.05) makes this setup scheduler-dependent; on a slow runner, the second envelope can still be queued before the first mutation reaches slow_save(). Signal entry into slow_save() with an asyncio.Event instead.
Proposed refactor
```diff
 # Block the processing loop with a slow save
 block = asyncio.Event()
+entered_save = asyncio.Event()
 original_save = persistence.tasks.save

 async def slow_save(task: object) -> None:
+    entered_save.set()
     await block.wait()
     await original_save(task)  # type: ignore[arg-type]
@@
 envelope = _MutationEnvelope(mutation=mutation2)
-# Give the engine a tick to start processing the first task
-await asyncio.sleep(0.05)
+await entered_save.wait()
 eng._queue.put_nowait(envelope)
```

Also applies to: 311-313
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_task_engine_integration.py` around lines 287 - 294,
the test currently uses a time-based sleep to order operations, which is flaky.
Instead, signal when the patched save handler is entered: add a new
asyncio.Event (e.g., entered_save), set it at the start of slow_save (which
wraps original_save), and have the test await that event instead of
asyncio.sleep(...) before queueing the second envelope. Keep original_save and
the existing block Event that slow_save awaits, so the second envelope is only
queued after slow_save is active.
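The deterministic handshake this suggests can be shown in standalone form (`slow_op` and the surrounding names are invented for the sketch): one event signals entry into the slow path, another releases it, so no scheduler-dependent sleep is needed.

```python
import asyncio


async def main() -> str:
    entered = asyncio.Event()  # set by the worker once it is inside the slow path
    release = asyncio.Event()  # set by the test to let the worker finish

    async def slow_op() -> str:
        entered.set()          # deterministic "I have started" signal
        await release.wait()   # blocked until the test releases it
        return "done"

    worker = asyncio.create_task(slow_op())
    await entered.wait()       # instead of asyncio.sleep(...): wait for the signal
    # At this point the worker is provably inside slow_op;
    # follow-up work (e.g., enqueueing a second envelope) is race-free here.
    release.set()
    return await worker


result = asyncio.run(main())
```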
…overage

Review fixes:
- create_task now uses _raise_typed_error for proper error dispatch
- Add PydanticValidationError catch in all controller write methods
- Use _extract_requester for create_task audit trail consistency
- Add logging for TaskInternalError and TaskEngineQueueFullError in controller
- Add TaskEngineError catch in agent_engine _report_to_task_engine
- Extract hardcoded values to class constants (_POLL_INTERVAL_SECONDS, etc.)
- Add failed_count tracking and logging in _fail_remaining_futures
- Add reason field to TaskStateChanged event for audit trail
- Add bounds check to VersionTracker.set_initial
- Fix _try_stop parameter type from object to Awaitable[None]
- Fix State Coordination docs (model_copy -> model_validate/with_transition)
- Improve docstrings for _processing_loop, _publish_snapshot, apply_cancel, etc.

New tests (33 total):
- FIFO ordering guarantee
- Default reason generation in transition_task
- Delete snapshot new_status=None verification
- Cancel version bump correctness
- _raise_typed_error dispatch for all error codes
- Snapshot reason propagation (transition, cancel, create, update)
- MemoryError re-raise in processing loop
- _fail_remaining_futures count tracking
- Deep-copy isolation for UpdateTaskMutation and TransitionTaskMutation
- Unknown field rejection in updates and overrides
- VersionTracker.set_initial bounds check (zero and negative)
- TaskStateChanged reason field (populated, default none, cancel)
- create_task typed error dispatch (internal, validation)
- Transition overrides and previous_status via engine
```python
if isinstance(exc, TaskMutationError):
    return ApiValidationError(str(exc))
```
TaskVersionConflictError silently mapped to HTTP 422 instead of 409
TaskVersionConflictError is a subclass of TaskMutationError, so it falls through every earlier check and lands on isinstance(exc, TaskMutationError) → ApiValidationError (HTTP 422 Unprocessable Entity). This is semantically wrong: a version conflict is not a validation error from the caller's perspective — it signals that another writer modified the task between the caller's read and write.
api/errors.py already has a ConflictError class that maps to HTTP 409 Conflict, which is the correct status for optimistic-concurrency failures. Without an explicit guard here, callers cannot distinguish "invalid input" (422) from "concurrent modification" (409), making correct retry logic impossible.
```suggestion
if isinstance(exc, TaskVersionConflictError):
    return ConflictError(str(exc))
if isinstance(exc, TaskMutationError):
    return ApiValidationError(str(exc))
```
This also requires importing TaskVersionConflictError and ConflictError at the top of the file.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/api/controllers/tasks.py
Line: 100-101
Comment:
**`TaskVersionConflictError` silently mapped to HTTP 422 instead of 409**
`TaskVersionConflictError` is a subclass of `TaskMutationError`, so it falls through every earlier check and lands on `isinstance(exc, TaskMutationError)` → `ApiValidationError` (HTTP 422 Unprocessable Entity). This is semantically wrong: a version conflict is not a validation error from the caller's perspective — it signals that another writer modified the task between the caller's read and write.
`api/errors.py` already has a `ConflictError` class that maps to HTTP 409 Conflict, which is the correct status for optimistic-concurrency failures. Without an explicit guard here, callers cannot distinguish "invalid input" (422) from "concurrent modification" (409), making correct retry logic impossible.
```suggestion
if isinstance(exc, TaskVersionConflictError):
return ConflictError(str(exc))
if isinstance(exc, TaskMutationError):
return ApiValidationError(str(exc))
```
This also requires importing `TaskVersionConflictError` and `ConflictError` at the top of the file.
How can I resolve this? If you propose a fix, please make it concise.
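The ordering requirement behind this finding can be checked in isolation; the classes and status codes below are stand-ins for the real error and `ConflictError` types:

```python
class TaskMutationError(Exception): ...
class TaskVersionConflictError(TaskMutationError): ...


def to_status(exc: Exception) -> int:
    # Most-derived classes must be tested first: a version conflict IS a
    # TaskMutationError, so the 422 branch would shadow it otherwise.
    if isinstance(exc, TaskVersionConflictError):
        return 409  # Conflict: concurrent modification; re-read and retry
    if isinstance(exc, TaskMutationError):
        return 422  # Unprocessable Entity: invalid input; retrying won't help
    return 500


conflict_status = to_status(TaskVersionConflictError())
mutation_status = to_status(TaskMutationError())
```

Distinguishing 409 from 422 is what makes client retry logic possible: only the 409 case is worth an automatic re-read-and-retry.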
Pull request overview
Copilot reviewed 35 out of 35 changed files in this pull request and generated 5 comments.
```python
        id=task_id,
    )
    return NotFoundError(str(exc))
if isinstance(exc, TaskEngineNotRunningError | TaskEngineQueueFullError):
```
`isinstance(exc, TaskEngineNotRunningError | TaskEngineQueueFullError)` raises `TypeError` at runtime on Python versions before 3.10, where `isinstance()` does not accept PEP 604 union types (support was added in 3.10). Use a tuple of types instead (e.g., `(TaskEngineNotRunningError, TaskEngineQueueFullError)`) to stay compatible across versions.
```suggestion
if isinstance(exc, (TaskEngineNotRunningError, TaskEngineQueueFullError)):
```
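A quick, self-contained check of both spellings (toy classes, not the project's):

```python
import sys


class NotRunning(Exception): ...
class QueueFull(Exception): ...


exc = QueueFull()

# The tuple form works on every supported Python version.
tuple_ok = isinstance(exc, (NotRunning, QueueFull))

# The PEP 604 union form is only accepted by isinstance() on Python 3.10+,
# where X | Y produces a types.UnionType that isinstance() understands.
union_ok = None
if sys.version_info >= (3, 10):
    union_ok = isinstance(exc, NotRunning | QueueFull)
```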
```
Seeds the version from persistence if not yet tracked so that
optimistic concurrency survives engine restarts.
```
The docstring says the tracker is “seeded from persistence” / that optimistic concurrency “survives engine restarts”, but the implementation only seeds missing task IDs to version=1 (no persistence lookup). Please update the docstring to reflect the actual behavior (volatile in-memory counter) or implement real seeding from persisted version metadata.
```suggestion
Seeds an in-memory version for untracked tasks so that
optimistic-concurrency checks work for the lifetime of the
current process only (no persistence across restarts).
```
```
- **Optimistic concurrency**: Per-task version counters. The persisted
  task version is the source of truth; any in-memory cache is an
  optimization that is seeded from persistence on task load and may be
  invalid after a restart. Callers can pass `expected_version` to detect
  stale writes; on mismatch the engine returns a failed
  `TaskMutationResult` with `error_code="version_conflict"`. Convenience
  methods raise `TaskVersionConflictError`.
```
This “Optimistic concurrency” section states “The persisted task version is the source of truth” and that the in-memory cache is “seeded from persistence”, but the current VersionTracker implementation is purely in-memory and seeds unknown tasks to version=1. Please align the documentation with the implemented semantics (or add persisted versioning if that’s the intent).
```suggestion
- **Optimistic concurrency**: Per-task version counters maintained in-memory
  (for example via a `VersionTracker`). Versions are not currently
  persisted; unknown tasks are initialized with `version=1`, and version
  tracking state is lost on process restart. Callers can pass
  `expected_version` to detect stale writes; on mismatch the engine returns
  a failed `TaskMutationResult` with `error_code="version_conflict"`.
  Convenience methods raise `TaskVersionConflictError`.
```
```
restart. After a restart, the first optimistic-concurrency check
for any task will succeed regardless of the true version history
because the tracker seeds the version at 1. Durable version
tracking (persisted alongside the task) is a future enhancement.
```
The VersionTracker limitation text claims the first optimistic-concurrency check after restart “will succeed regardless of the true version history”. With the current logic (seed missing to version=1, then strict equality check), callers providing any expected_version != 1 will get a conflict. Please reword this limitation to match the actual behavior/risks of seeding-to-1.
```suggestion
restart. After a restart, previously unseen tasks are treated as
having version 1. The first optimistic-concurrency check will
therefore only succeed when callers expect version 1; callers that
expect a higher version may see a conflict even if persistence holds
a different (newer) version. Durable version tracking (persisted
alongside the task) is a future enhancement.
```
```python
task = await persistence.tasks.get(mutation.task_id)
if task is None:
    return not_found_result("update", mutation.request_id, mutation.task_id)

try:
    versions.check(mutation.task_id, mutation.expected_version)
except TaskVersionConflictError as exc:
    return TaskMutationResult(
        request_id=mutation.request_id,
        success=False,
        error=str(exc),
        error_code="version_conflict",
    )

if not mutation.updates:
    version = versions.get(mutation.task_id)
    return TaskMutationResult(
        request_id=mutation.request_id,
        success=True,
        task=task,
        version=version,
        previous_status=task.status,
    )
```
In apply_update, the empty-updates no-op path returns versions.get(task_id), but VersionTracker.get() returns 0 for untracked tasks. After an engine restart (or when operating on pre-existing persisted tasks), this would report version=0 for a real task. Consider seeding the task ID before reading the version (or otherwise ensuring an existing persisted task never yields version 0).
Summary
Typed error hierarchy: Add TaskNotFoundError, TaskEngineQueueFullError, TaskVersionConflictError for precise error classification — API controllers catch these directly instead of parsing error strings
Error sanitization: Internal exception details (SQL paths, stack traces) no longer leak to API responses
Immutable field protection: Model validators on UpdateTaskMutation and TransitionTaskMutation reject writes to id, status, created_by, created_at, etc.
Processing loop hardening: Guard _process_one against unhandled exceptions, thread previous_status through mutation results and snapshots, add _fail_remaining_futures for drain timeout cleanup
Logging coverage: Add event constants and structured logging for engine creation, version conflicts, and loop errors
AgentEngine integration: Split broad except Exception in _report_to_task_engine into TaskMutationError (warning) vs Exception (error with divergence note); extract _TERMINAL_STATUSES constant
Code quality: Extract _not_found_result helper, use Self in model validators, add _check_consistency validator on TaskMutationResult, exhaustive match default branch
Docs: Update tech-stack "State coordination" to Adopted, add TaskEngine to CLAUDE.md engine description and logging events, add TaskEngine architecture subsection to engine design page
Test plan
TestAppStateTaskEngine — 6 tests for task_engine property, has_task_engine, set_task_engine
TestReportToTaskEngine — 5 tests: no-op without engine, skip non-terminal, report terminal, swallow TaskMutationError, swallow unexpected errors
TestAppLifecycle — 2 new tests: task engine startup failure cleans up persistence+bus, shutdown task engine failure doesn't propagate
TestVersionConflictOnTransition — version mismatch returns failure
TestCancelNotFound — cancel on non-existent task returns failure
TestPreviousStatus — previous_status populated correctly (create=None, transition=CREATED, cancel=ASSIGNED)
TestImmutableFieldRejection — update/transition reject immutable fields
TestTypedErrors — convenience methods raise TaskNotFoundError
All 6925 tests pass, 94.39% coverage
ruff check + format clean
mypy strict clean
Closes #204