feat: centralized single-writer TaskEngine with full CRUD API (#328)
Implement an actor-like TaskEngine that owns all task state mutations via an asyncio.Queue. A single background task processes mutations sequentially using model_copy(update=...), persists results, and publishes snapshots to the message bus. Reads bypass the queue for direct persistence access.

- Add TaskEngine core with start/stop lifecycle, submit/convenience methods
- Add 5 mutation types (create, update, transition, delete, cancel)
- Add TaskMutationResult response and TaskStateChanged event models
- Add TaskEngineConfig (queue size, drain timeout, snapshot toggle)
- Add 4 error types (TaskEngineError, NotRunning, Mutation, VersionConflict)
- Wire into API controllers, AppState, app lifecycle, and config
- Add optional AgentEngine report-back for terminal task status
- Add 57 unit tests covering all mutations, ordering, versioning, drain

Closes #204
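The single-writer pattern described above can be sketched minimally. This is an illustrative toy, not the PR's actual code: all names (`MiniTaskEngine`, the dict-based mutations) are assumptions, and persistence, snapshot publishing, and typed errors are omitted.

```python
import asyncio


class MiniTaskEngine:
    """Single writer: every mutation flows through one queue and one loop."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._state: dict[str, dict] = {}  # task_id -> latest task snapshot
        self._running = False
        self._loop_task: asyncio.Task | None = None

    async def start(self) -> None:
        self._running = True
        self._loop_task = asyncio.create_task(self._processing_loop())

    async def stop(self) -> None:
        self._running = False
        await self._queue.join()  # drain pending mutations
        if self._loop_task:
            self._loop_task.cancel()

    async def submit(self, mutation: dict) -> dict:
        if not self._running:
            raise RuntimeError("engine not running")
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((mutation, future))
        return await future  # caller suspends until the loop resolves it

    async def _processing_loop(self) -> None:
        while True:
            mutation, future = await self._queue.get()
            try:
                result = self._apply(mutation)  # sequential: no races
                if not future.done():
                    future.set_result(result)
            except Exception as exc:
                if not future.done():
                    future.set_exception(exc)
            finally:
                self._queue.task_done()

    def _apply(self, mutation: dict) -> dict:
        task_id = mutation["task_id"]
        current = self._state.get(task_id, {"task_id": task_id, "version": 0})
        # Build a new object instead of mutating, mirroring model_copy(update=...)
        updated = {
            **current,
            **mutation.get("updates", {}),
            "version": current["version"] + 1,
        }
        self._state[task_id] = updated
        return updated
```

Because one loop applies mutations in FIFO order, version bumps never race even when many callers submit concurrently.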
Pre-reviewed by 10 agents, 37 findings addressed:

- Add exhaustive match default + typed error hierarchy (TaskNotFoundError, TaskEngineQueueFullError, TaskVersionConflictError)
- Sanitize internal exception details from API responses
- Add immutable field rejection validators on UpdateTaskMutation and TransitionTaskMutation
- Thread previous_status through TaskMutationResult and snapshots
- Add consistency model_validator to TaskMutationResult
- Guard _processing_loop against unhandled exceptions
- Fix startup cleanup to handle task engine failures
- Replace assert with proper error handling in convenience methods
- Add _fail_remaining_futures for drain timeout cleanup
- Add comprehensive logging coverage (creation, conflicts, loop errors)
- Add _not_found_result helper to reduce duplication
- Extract _TERMINAL_STATUSES module constant
- Use Self return type in model validators
- Split broad except in _report_to_task_engine (TaskMutationError vs Exception)
- Update docs: tech-stack Adopted, CLAUDE.md engine description, engine.md TaskEngine architecture subsection
- Add tests: AppState.task_engine, _report_to_task_engine, app lifecycle, version conflicts, cancel not-found, previous_status, immutable fields, typed errors
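The immutable-field rejection mentioned above can be approximated with a plain helper. This is a hedged sketch, not the PR's validator: the field set (id/status/created_by) is taken from a later commit in this thread, and the real code runs inside a Pydantic model_validator rather than a free function.

```python
# Fields only the engine itself may change; callers' updates must not touch them.
_IMMUTABLE_TASK_FIELDS = frozenset({"id", "status", "created_by"})


def reject_immutable_fields(updates: dict[str, object]) -> dict[str, object]:
    """Raise ValueError if an update tries to write an engine-owned field."""
    touched = _IMMUTABLE_TASK_FIELDS & updates.keys()
    if touched:
        raise ValueError(f"immutable fields cannot be updated: {sorted(touched)}")
    return updates
```

In the PR this check rejects the mutation at construction time, so a bad update never reaches the processing loop.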
- Add error_code discriminator to TaskMutationResult (not_found/version_conflict/validation/internal)
- Fix _raise_typed_error to use error_code match instead of fragile string matching
- Fix _processing_loop outer catch to resolve envelope future (prevents caller deadlock)
- Guard happy-path set_result in _process_one with done() check
- Fix _apply_update to use Task.model_validate() instead of model_copy() (runs validators)
- Clean _IMMUTABLE_TASK_FIELDS: remove 4 non-existent timestamp fields (only id/status/created_by)
- Rename _TERMINAL_STATUSES → _REPORTABLE_STATUSES (FAILED/INTERRUPTED are not strictly terminal)
- Fix assigned_to=None override bug in transition_task controller
- Add from_status to task transition audit log
- Fix failure log to use API_TASK_TRANSITION_FAILED event instead of TASK_STATUS_CHANGED
- Map TaskEngineNotRunningError/TaskEngineQueueFullError → ServiceUnavailableError (503)
- Fix create_task missing error handling
- Fix _on_expire broad exception handler to re-raise MemoryError/RecursionError
- Add MemoryError/RecursionError re-raise to _publish_snapshot
- Add API_TASK_TRANSITION_FAILED event constant
- Fix AppState docstring and set_task_engine docstring
- Add version_conflict test to TestTypedErrors
- Add TestDrainTimeout: verify abandoned futures resolved on _fail_remaining_futures
- Add TestMutationResultConsistency: validate success/error invariants enforced by Pydantic
- Add test_memory_error_propagates to TestReportToTaskEngine
- Fix engine.md: text lang specifier, version conflict description, _IMMUTABLE_TASK_FIELDS, asyncio.wait
…tness

- Add TaskInternalError for internal engine faults (maps to 5xx vs 4xx)
- Thread error_code through all TaskMutationResult failure paths
- Extend _raise_typed_error to cover 'internal' code with -> Never return
- Add TaskInternalError handling in all TaskController endpoints
- Fix bridge leak in _cleanup_on_failure (started_bridge flag was missing)
- Remove phantom 'created_at' from _IMMUTABLE_OVERRIDE_FIELDS (field absent on Task)
- Change transition_task to return tuple[Task, TaskStatus | None] (eliminates extra get_task round-trip in controller)
- Split 1140-line test_task_engine.py into three focused files + helpers
- Move TaskEngine fixtures from helpers to conftest.py (auto-discovery, no F401 hacks)
- Fix all ruff/mypy issues in new test files (TC001, I001, F811, E501, unused-ignore)
…handling, add tests

- Extract VersionTracker into task_engine_version.py (optimistic concurrency)
- Extract mutation apply logic into task_engine_apply.py (dispatch + apply_*)
- Reduce task_engine.py from 976 to ~620 lines (well under 800 limit)
- Make TaskInternalError a sibling of TaskMutationError, not a subtype
- Add MemoryError/RecursionError re-raise guards in all except-Exception blocks
- Add _try_stop helper in app.py to reduce C901 complexity
- Add _extract_requester and _map_task_engine_errors helpers in tasks controller
- Add deep-copy at system boundaries for mutable dict fields in frozen models
- Add unknown-key validation in UpdateTaskMutation and TransitionTaskMutation
- Track in-flight envelope for drain-timeout resolution
- Fix _safe_shutdown argument ordering (task_engine first, mirrors cleanup)
- Fix existing test bug: mock_te passed as bridge instead of task_engine
- Update docs/design/engine.md for immutable-style updates and version tracking
- Add 62 new tests: VersionTracker, apply functions, coverage edge cases, controller helpers, _try_stop, in-flight resolution, processing loop resilience
…overage

Review fixes:
- create_task now uses _raise_typed_error for proper error dispatch
- Add PydanticValidationError catch in all controller write methods
- Use _extract_requester for create_task audit trail consistency
- Add logging for TaskInternalError and TaskEngineQueueFullError in controller
- Add TaskEngineError catch in agent_engine _report_to_task_engine
- Extract hardcoded values to class constants (_POLL_INTERVAL_SECONDS, etc.)
- Add failed_count tracking and logging in _fail_remaining_futures
- Add reason field to TaskStateChanged event for audit trail
- Add bounds check to VersionTracker.set_initial
- Fix _try_stop parameter type from object to Awaitable[None]
- Fix State Coordination docs (model_copy -> model_validate/with_transition)
- Improve docstrings for _processing_loop, _publish_snapshot, apply_cancel, etc.

New tests (33 total):
- FIFO ordering guarantee
- Default reason generation in transition_task
- Delete snapshot new_status=None verification
- Cancel version bump correctness
- _raise_typed_error dispatch for all error codes
- Snapshot reason propagation (transition, cancel, create, update)
- MemoryError re-raise in processing loop
- _fail_remaining_futures count tracking
- Deep-copy isolation for UpdateTaskMutation and TransitionTaskMutation
- Unknown field rejection in updates and overrides
- VersionTracker.set_initial bounds check (zero and negative)
- TaskStateChanged reason field (populated, default none, cancel)
- create_task typed error dispatch (internal, validation)
- Transition overrides and previous_status via engine
- Add TaskVersionConflictError → ConflictError(409) mapping in _map_task_engine_errors (was silently falling through to 422)
- Wrap PydanticValidationError in update_task/transition_task convenience methods so it raises TaskMutationError instead of escaping
- Sanitize Pydantic error messages via _format_validation_error helper
- Fix VersionTracker docstrings: clarify version is seeded at 1 heuristically, not loaded from persistence
- Update engine design doc optimistic concurrency section to match reality
- Replace timing-based asyncio.sleep with Event synchronization in drain test
- Add _snapshot_content helper for type-safe snapshot access in tests
- Add 15 new tests: version conflict via convenience methods, PydanticValidation wrapping, cancel/delete not-found, start/stop lifecycle, is_running property, 409 mapping (6913 total, up from 6898)
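The typed-error-to-HTTP mapping referenced here can be illustrated with a standalone sketch. Status pairings follow the notes in this thread (404/409/503/500/422); the class bodies, the plain-tuple return, and the helper name are simplifications, not the real litestar-based helper.

```python
class TaskEngineError(Exception): ...
class TaskMutationError(TaskEngineError): ...
class TaskNotFoundError(TaskMutationError): ...
class TaskVersionConflictError(TaskMutationError): ...
class TaskEngineNotRunningError(TaskEngineError): ...
class TaskEngineQueueFullError(TaskEngineError): ...
class TaskInternalError(TaskEngineError): ...


def map_task_engine_error(exc: Exception) -> tuple[int, str]:
    """Return (status, client_message); 5xx messages are sanitized."""
    if isinstance(exc, TaskNotFoundError):
        return 404, str(exc)
    if isinstance(exc, TaskVersionConflictError):
        return 409, str(exc)  # the mapping this commit adds (was 422)
    if isinstance(exc, (TaskEngineNotRunningError, TaskEngineQueueFullError)):
        return 503, "Service temporarily unavailable"  # no internal details
    if isinstance(exc, TaskInternalError):
        return 500, "Internal error"  # sanitized
    return 422, str(exc)  # remaining mutation/validation failures
```

Ordering matters: the more specific subclasses are checked before the broad 422 fallback, which is exactly the bug the 409 fix addresses.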
…ert #10

Pre-reviewed by 10 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, security-reviewer, docs-consistency), 34 findings addressed. Key fixes:
- CRITICAL: drain timeout in-flight future now resolved (saved before cancel)
- Fix TaskInternalError for invariant violations (was TaskMutationError → wrong HTTP status)
- Sanitize 5xx error messages (no internal details leaked to API clients)
- Map TaskVersionConflictError to 409 Conflict
- Add MutationType/TaskErrorCode type aliases, length constraints on CreateTaskData
- Add logging in _raise_typed_error, read-through error wrapping, safety cap on list_tasks
- Fix code scanning alert #10: remove explicit PR head SHA checkout in pages-preview
- Replace Any with concrete types in API test fake repositories
- Add TaskEngineConfig boundary validation, RecursionError re-raise, TaskEngineError branch tests
- Split test_task_engine_extended.py (was 870 lines → 540 + 335)
- Add apply_cancel docstring with Args/Returns
Caution: Review failed. The pull request was closed or merged during review.

Note: Reviews paused. This branch is under active development, so CodeRabbit automatically paused this review to avoid an influx of comments from new commits.
📝 Summary by CodeRabbit
Walkthrough: Adds a centralized single-writer TaskEngine and integrates it across the codebase: new engine modules (models, apply logic, version tracker, config), typed task-engine errors/events, AppState and lifecycle wiring, API controller usage and DTOs, AgentEngine reporting, documentation and CI comment, and extensive tests.
Sequence Diagram(s):

```mermaid
sequenceDiagram
    participant Client as Client
    participant API as API Controller
    participant Engine as TaskEngine
    participant Persist as Persistence
    participant Bus as MessageBus
    Client->>API: POST /tasks (CreateTaskData)
    API->>API: build CreateTaskMutation (request_id, requested_by)
    API->>Engine: submit(CreateTaskMutation)
    activate Engine
    Note right of Engine: enqueue mutation, return Future
    Engine->>Engine: _processing_loop processes envelope
    Engine->>Persist: persist created Task
    Persist-->>Engine: saved Task
    alt publish_snapshots enabled
        Engine->>Bus: publish TaskStateChanged (snapshot)
        Bus-->>Engine: ack / error (logged)
    end
    Engine-->>API: Future resolves (TaskMutationResult)
    deactivate Engine
    API->>Client: 201 Created (Task)
```
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Keep main's refactored workflow (workflow_dispatch support, PR metadata validation step). Add zizmor ignore comment for untrusted-checkout — the checkout is intentional for preview builds and mitigated (no secrets, read-only perms, persist-credentials: false, deploy uses artifacts).
Summary of Changes (Gemini Code Assist): This pull request introduces a robust, centralized task management system that significantly improves how the application handles task state mutations. By consolidating all task operations through a single asynchronous engine, it eliminates potential race conditions and provides a consistent, version-controlled API for task lifecycle management. The changes also improve error handling, ensure graceful service operation, and integrate seamlessly with existing agent execution flows.
Pull request overview
Introduces a centralized, single-writer TaskEngine (actor pattern over an asyncio.Queue) and wires it into the API/app lifecycle so all task mutations are serialized, version-tracked, and can publish TaskStateChanged snapshots.
Changes:
- Added `TaskEngine` core modules (engine, config, mutation models, apply/dispatch, optimistic version tracker) plus new observability event constants.
- Switched the `/tasks` API controller and app startup/shutdown to use `TaskEngine` with typed error mapping.
- Added extensive unit/integration test coverage for lifecycle, mutations, snapshot publishing, drain-timeout behavior, and AgentEngine reporting.
Reviewed changes
Copilot reviewed 37 out of 37 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/unit/observability/test_events.py | Includes task_engine in domain-module discovery assertions. |
| tests/unit/engine/test_task_engine_version.py | Unit tests for VersionTracker behavior. |
| tests/unit/engine/test_task_engine_mutations.py | CRUD + typed-error + consistency tests for TaskEngine convenience methods. |
| tests/unit/engine/test_task_engine_models.py | Tests for mutation/result/event Pydantic models and validators. |
| tests/unit/engine/test_task_engine_lifecycle.py | Start/stop/idempotency/config tests for TaskEngine. |
| tests/unit/engine/test_task_engine_integration.py | Integration tests for publishing, ordering, queue backpressure, versioning, drain. |
| tests/unit/engine/test_task_engine_extended.py | Extra edge-case coverage (FIFO, reasons, snapshots, OOM propagation, shutdown failures). |
| tests/unit/engine/test_task_engine_coverage.py | Additional coverage for drain/in-flight cleanup, snapshot publish failures, loop resilience. |
| tests/unit/engine/test_task_engine_convenience.py | Focused tests for typed error mapping + convenience method behaviors. |
| tests/unit/engine/test_task_engine_apply.py | Tests for mutation dispatch + apply_* functions and persistence/version interactions. |
| tests/unit/engine/test_agent_engine.py | Adds tests for AgentEngine → TaskEngine reporting behavior and error handling. |
| tests/unit/engine/task_engine_helpers.py | Adds shared fake persistence/message bus helpers for engine tests. |
| tests/unit/engine/conftest.py | Adds fixtures for TaskEngine and supporting fakes. |
| tests/unit/config/conftest.py | Updates RootConfigFactory defaults to include escalation_paths. |
| tests/unit/api/test_state.py | Tests new AppState.task_engine accessors and setters. |
| tests/unit/api/test_app.py | Updates startup/shutdown helpers to manage TaskEngine and tests cleanup behavior. |
| tests/unit/api/controllers/test_task_helpers.py | Tests requester extraction and TaskEngine→API error mapping helpers. |
| tests/unit/api/conftest.py | Adds TaskEngine to API test app wiring; improves typing of fake repos. |
| src/ai_company/observability/events/task_engine.py | New event constants for TaskEngine lifecycle/mutations/drain/publishing. |
| src/ai_company/observability/events/api.py | Adds API event constants for auth fallback and task transition failures. |
| src/ai_company/engine/task_engine_version.py | Implements in-memory optimistic version tracking (VersionTracker). |
| src/ai_company/engine/task_engine_models.py | Adds mutation request/result models + TaskStateChanged event model. |
| src/ai_company/engine/task_engine_config.py | Adds config model for queue sizing, drain timeout, snapshot publishing. |
| src/ai_company/engine/task_engine_apply.py | Implements mutation application logic + dispatch with typed error results. |
| src/ai_company/engine/task_engine.py | Implements the single-writer engine loop, CRUD convenience API, draining shutdown, snapshot publishing. |
| src/ai_company/engine/errors.py | Adds typed TaskEngine error hierarchy for API + engine callers. |
| src/ai_company/engine/agent_engine.py | Adds best-effort reporting of final run status back to centralized TaskEngine. |
| src/ai_company/engine/\_\_init\_\_.py | Re-exports TaskEngine APIs and new error types from ai_company.engine. |
| src/ai_company/config/schema.py | Adds task_engine: TaskEngineConfig to RootConfig. |
| src/ai_company/config/defaults.py | Seeds default config dict with "task_engine": {}. |
| src/ai_company/api/state.py | Adds AppState storage/access for task_engine with 503 behavior when unset. |
| src/ai_company/api/controllers/tasks.py | Migrates task CRUD endpoints from direct repo writes to TaskEngine + error mapping. |
| src/ai_company/api/app.py | Starts/stops TaskEngine during app lifecycle; adds safe shutdown helper. |
| docs/design/engine.md | Documents TaskEngine architecture/behavior and updates loop timeout description. |
| docs/architecture/tech-stack.md | Marks state coordination via TaskEngine as “Adopted” and updates mechanism description. |
| CLAUDE.md | Updates architecture notes and event-name examples to include TaskEngine. |
| .github/workflows/pages-preview.yml | Removes explicit PR head SHA checkout for security hardening. |
Comments suppressed due to low confidence (1)
src/ai_company/api/controllers/tasks.py:136
- The controller claims “full CRUD via TaskEngine”, but there is no cancel endpoint and no API support for `expected_version`/optimistic concurrency (`UpdateTaskRequest`/`TransitionTaskRequest` don’t expose it and the controller doesn’t accept it). This diverges from the PR description; either add the missing endpoints/parameters or adjust the description/scope to match the actual API surface.
```python
class TaskController(Controller):
    """Full CRUD for tasks via ``TaskEngine``."""

    path = "/tasks"
    tags = ("tasks",)
    guards = [require_read_access]  # noqa: RUF012
```
```python
assert eng._in_flight is None

# Release the block and clean up
block.set()  # type: ignore[unreachable]
```

`# type: ignore[unreachable]` on `block.set()` looks incorrect/unnecessary here: the line is reachable, and there is no obvious mypy unreachable warning to suppress. Consider removing the ignore to avoid masking future static-analysis issues.

Suggested change:

```python
block.set()
```
```python
    id=task_id,
)
return NotFoundError(str(exc))
if isinstance(exc, TaskEngineNotRunningError | TaskEngineQueueFullError):
```

`isinstance()` accepts a PEP 604 union (`A | B`) as its second argument only on Python 3.10+; on earlier versions this raises `TypeError` at runtime (and would break the helper and its tests). A tuple of exception classes, e.g. `(TaskEngineNotRunningError, TaskEngineQueueFullError)`, works on all versions.

Suggested change:

```python
if isinstance(exc, (TaskEngineNotRunningError, TaskEngineQueueFullError)):
```
```python
if data.created_by != requester:
    logger.info(
        API_TASK_UPDATED,
        note="created_by differs from authenticated requester",
        created_by=data.created_by,
        requester=requester,
    )
```

This audit log for `created_by` vs authenticated requester uses the `API_TASK_UPDATED` event constant even though this is part of task creation. That makes downstream event analytics/alerts misleading; consider introducing a dedicated event (or reusing the existing task-created event) for this mismatch audit log.
```python
    logger.error(
        API_TASK_TRANSITION_FAILED,
        resource="task",
        task_id=task_id,
        error=str(exc),
        error_type=type(exc).__name__,
    )
    return ServiceUnavailableError("Service temporarily unavailable")
if isinstance(exc, TaskInternalError):
    logger.error(
        API_TASK_TRANSITION_FAILED,
        resource="task",
        task_id=task_id,
```

`API_TASK_TRANSITION_FAILED` is emitted here for non-transition failures too (create/update/delete can all flow through `_map_task_engine_errors`). This makes logs/events ambiguous. Consider renaming/adding an event like `API_TASK_MUTATION_FAILED` and include `mutation_type`/endpoint context in the log fields.
Greptile Summary: This PR introduces a centralized single-writer `TaskEngine`. Key findings:
Confidence Score: 3/5
Sequence Diagram:

```mermaid
sequenceDiagram
    participant Caller
    participant TE as TaskEngine
    participant Q as asyncio.Queue
    participant BG as _processing_loop
    participant Apply as dispatch
    participant DB as Persistence
    participant MB as MessageBus
    Caller->>TE: submit(mutation)
    Note over TE: acquire _lifecycle_lock
    TE->>Q: put_nowait(envelope)
    Note over TE: release _lifecycle_lock
    TE-->>Caller: await envelope.future (suspended)
    BG->>Q: get() with poll timeout
    Q-->>BG: envelope
    BG->>Apply: dispatch(mutation, persistence, versions)
    Apply->>DB: get/save/delete task
    DB-->>Apply: Task or None
    Apply-->>BG: TaskMutationResult
    alt mutation succeeded
        BG->>MB: publish TaskStateChanged
    end
    BG->>Caller: future.set_result(result) — caller resumes
    Note over TE: stop() called
    TE->>BG: set _running=False
    BG->>Q: drain pending envelopes
    alt drain timeout
        TE->>BG: cancel()
        TE->>Caller: fail remaining futures
    end
```
Prompt To Fix All With AI

This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 628-631
Comment:
**Dead loop leaves `_running=True`, causing hung submissions**
When `MemoryError`/`RecursionError` propagates out of `_processing_loop`, the background task dies but `self._running` is never set to `False`. Any subsequent `submit()` call passes the `if not self._running` guard, enqueues its envelope, and then `await envelope.future` hangs indefinitely — the queue has no consumer.
This window exists until `stop()` is called. `RecursionError` is survivable (e.g., deeply nested task data hitting Python's recursion limit), so the engine can remain in this zombie state for an extended period while the application continues to issue mutations that silently stall all callers.
The fix is to set `self._running = False` before re-raising, so `submit()` correctly raises `TaskEngineNotRunningError` instead of hanging:
```python
except (MemoryError, RecursionError) as exc:
    if not envelope.future.done():
        envelope.future.set_exception(exc)
    self._running = False  # prevent new submissions from hanging
    raise
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 186-192
Comment:
**`stop()` re-raises background task exceptions, aborting the shutdown sequence**
When `_processing_loop` previously died with `MemoryError`/`RecursionError`, `self._processing_task` is already a done task with that exception stored. When `stop()` is subsequently called:
1. `asyncio.shield(self._processing_task)` creates a future that immediately adopts the stored exception.
2. `asyncio.wait_for(...)` raises the exception immediately (no timeout).
3. The `except BaseException: ...; raise` block correctly fails remaining futures but then **re-raises** `MemoryError`/`RecursionError`.
This propagates through `_try_stop` in `_safe_shutdown` (which also re-raises these errors), meaning the bridge, message bus, and persistence backend are **never shut down cleanly** when a catastrophic engine fault precedes shutdown.
`RecursionError` in particular is process-survivable, so clean shutdown matters. Consider distinguishing between an exception that originated *from the background task's prior run* (should be logged-and-absorbed during shutdown) versus one that originates from the drain logic itself (should propagate):
```python
except BaseException as exc:
    self._fail_remaining_futures(self._in_flight)
    # Only re-raise if this is a new error, not a stale task exception
    if self._processing_task is not None and self._processing_task.done():
        task_exc = (
            self._processing_task.exception()
            if not self._processing_task.cancelled()
            else None
        )
        if task_exc is exc:
            logger.error(TASK_ENGINE_LOOP_ERROR, error=f"Processing task died: {exc!r}")
            return  # don't abort shutdown for a pre-existing task failure
    raise
```
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: src/ai_company/engine/task_engine_apply.py
Line: 393-394
Comment:
**`apply_cancel` silently discards `mutation.reason`**
The cancellation reason from `CancelTaskMutation.reason` is logged at line 420 but is **not passed** into the task transition. Compare with `apply_transition`, where `mutation.overrides` (including any reason-related overrides) are forwarded to `task.with_transition()`.
If `Task.with_transition()` accepts a `reason` keyword argument, or if a `cancellation_reason` / `reason` field exists on `Task`, the cancellation reason will not be persisted on the task object. Callers retrieving the task via `get_task()` after a cancellation will see no record of why it was cancelled.
If `with_transition` does not accept a `reason` parameter at all, this is only a logging concern. Either way, it's worth verifying the intent and documenting it explicitly in the `apply_cancel` docstring.
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 128-146
Comment:
**`start()` uses `TASK_ENGINE_STARTED` for both warning and info**
On line 136, the `logger.warning(TASK_ENGINE_STARTED, error=msg)` call fires `TASK_ENGINE_STARTED` for the double-start error path. Using the same event constant for a `WARNING` (error case) and an `INFO` (success case on line 143) makes log filtering and alerting ambiguous. Consider introducing a dedicated `TASK_ENGINE_ALREADY_RUNNING` constant for the warning path so operators can distinguish a misconfigured double-start from a normal start event.
How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: d23952a
Local agents (6 agents) + external reviewers (Copilot, Gemini, Greptile, CodeRabbit) identified 14 findings. All implemented:

- Add error handling to list_tasks/get_task read endpoints (CRITICAL)
- Add error handling to get_task read endpoint (CRITICAL)
- Add dedicated event constants for reads, list cap, and futures failure
- Merge duplicate except blocks in transition_task
- Replace generic API_TASK_TRANSITION_FAILED with API_TASK_MUTATION_FAILED
- Add created_by mismatch audit event
- Narrow mutation_type to specific Literal per mutation class
- Add error_code consistency checks to TaskMutationResult validator
- Expand _map_task_engine_errors docstring with mapping table
- Replace TASK_ENGINE_DRAIN_TIMEOUT with TASK_ENGINE_FUTURES_FAILED
- Add read-through error wrapping tests (4 tests)
- Add list_tasks safety cap test
- Remove misleading computed_field reference in docstring
- Add CreateTaskData max-length boundary tests (4 tests)

CodeRabbit follow-up (3 additional items):
- Assert error_code on version conflict integration tests
- Assert sanitized 503 messages don't leak internal errors
- PEP 758 except syntax: not applicable with `as` clause (ruff limitation)
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found. Scanned files: none.
Codecov Report: ❌ Patch coverage is … Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main     #328      +/-   ##
==========================================
+ Coverage   93.64%   93.75%   +0.10%
==========================================
  Files         427      433       +6
  Lines       19177    19831     +654
  Branches     1846     1911      +65
==========================================
+ Hits        17959    18592     +633
- Misses        943      957      +14
- Partials      275      282       +7
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/ai_company/api/controllers/tasks.py (1)
316-404: ⚠️ Potential issue | 🟠 Major: Add the missing cancel route.
The controller exposes list/get/create/update/transition/delete, but nothing maps to `TaskEngine.cancel_task()`. That leaves the dedicated cancel mutation path unreachable from `/tasks`, despite it being part of this feature's API surface.

Suggested endpoint shape:

```python
@post("/{task_id:str}/cancel", guards=[require_write_access])
async def cancel_task(
    self,
    state: State,
    task_id: str,
    data: CancelTaskRequest,
) -> ApiResponse[Task]:
    app_state: AppState = state.app_state
    task = await app_state.task_engine.cancel_task(
        task_id,
        requested_by=_extract_requester(state),
        reason=data.reason,
    )
    return ApiResponse(data=task)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/api/controllers/tasks.py` around lines 316 - 404, The controller is missing an endpoint to expose TaskEngine.cancel_task, so add a new POST route method named cancel_task in the same controller with decorators `@post`("/{task_id:str}/cancel", guards=[require_write_access]) that accepts (state: State, task_id: str, data: CancelTaskRequest) and calls app_state.task_engine.cancel_task(task_id, requested_by=_extract_requester(state), reason=data.reason), returns ApiResponse(data=task) and wrap TaskEngine exceptions with _map_task_engine_errors like the other handlers; include a logger.info call (e.g. API_TASK_CANCELED, task_id=task_id) consistent with existing logging patterns.
♻️ Duplicate comments (8)
src/ai_company/engine/task_engine_models.py (2)
332-374: ⚠️ Potential issue | 🟠 Major: Add a stable `task_id` to `TaskStateChanged`.

`task` is explicitly `None` on delete, and `_publish_snapshot()` currently has no other identifier to serialize. Consumers cannot tell which task was removed, so delete snapshots are not actionable.

Suggested model change:

```diff
 class TaskStateChanged(BaseModel):
@@
+    task_id: NotBlankStr = Field(description="Affected task identifier")
     task: Task | None = Field(
         default=None,
         description="Task snapshot after mutation",
     )
```
result.task.idwhen present, or frommutation.task_idfor deletes.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_engine_models.py` around lines 332 - 374, TaskStateChanged needs a stable task identifier so delete events remain actionable: add a new field task_id: NotBlankStr | None (or appropriate type) to the TaskStateChanged model and ensure _publish_snapshot() populates it from result.task.id when result.task exists, otherwise use mutation.task_id for delete events; update any constructors/creation sites that build TaskStateChanged to set task_id accordingly while leaving task (snapshot) None on deletes.
148-170: ⚠️ Potential issue | 🟠 Major
Freeze `updates` and `overrides` after validation.

The deep copy only severs aliasing with caller input. Both fields remain directly mutable on the frozen model, so code can still modify a queued mutation after validation and before the engine consumes it.

Suggested fix

```diff
+from collections.abc import Mapping
+from types import MappingProxyType
@@
-    updates: dict[str, object] = Field(description="Field-value pairs to apply")
+    updates: Mapping[str, object] = Field(description="Field-value pairs to apply")
@@
-        object.__setattr__(self, "updates", copy.deepcopy(self.updates))
+        object.__setattr__(
+            self,
+            "updates",
+            MappingProxyType(copy.deepcopy(dict(self.updates))),
+        )
@@
-    overrides: dict[str, object] = Field(
+    overrides: Mapping[str, object] = Field(
         default_factory=dict,
         description="Additional field overrides",
     )
@@
-        object.__setattr__(self, "overrides", copy.deepcopy(self.overrides))
+        object.__setattr__(
+            self,
+            "overrides",
+            MappingProxyType(copy.deepcopy(dict(self.overrides))),
+        )
```

As per coding guidelines: "Use immutability by creating new objects, never mutating existing ones. For non-Pydantic internal collections, use `copy.deepcopy()` at construction + `MappingProxyType` wrapping."

Also applies to: 202-227
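A standalone illustration of why the proxy matters (generic dicts rather than the actual mutation models): the deep copy severs caller aliasing, and `MappingProxyType` additionally rejects in-place writes:

```python
import copy
from types import MappingProxyType

caller_input = {"status": "pending"}

# Deep copy severs aliasing with the caller; the proxy makes it read-only.
frozen = MappingProxyType(copy.deepcopy(caller_input))

caller_input["status"] = "done"      # caller-side edits no longer leak in
assert frozen["status"] == "pending"

rejected = False
try:
    frozen["status"] = "done"        # mappingproxy raises TypeError on writes
except TypeError:
    rejected = True
assert rejected
```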
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_engine_models.py` around lines 148 - 170, The updates (and similarly overrides) dicts are deep-copied but left mutable on the model; after validation freeze them by wrapping the deepcopy in types.MappingProxyType and assign via object.__setattr__ so the model exposes an immutable mapping; ensure MappingProxyType is imported and apply the same pattern for both the "updates" field handling in __init__ and the analogous code for "overrides" (referencing the _reject_immutable_fields validator, the "updates" attribute, and the overrides handling block around lines ~202-227).

src/ai_company/engine/task_engine.py (3)
622-631: 🛠️ Refactor suggestion | 🟠 Major
Log successful mutation application at INFO.

This is the single authoritative writer, but `_process_one()` only emits a DEBUG receipt and optional snapshot logs. The committed mutation itself should produce an INFO event with task/version/status context.

Suggested fix

```diff
+# also import TASK_ENGINE_MUTATION_APPLIED from
+# ai_company.observability.events.task_engine
@@
         if not envelope.future.done():
             envelope.future.set_result(result)
+        if result.success:
+            logger.info(
+                TASK_ENGINE_MUTATION_APPLIED,
+                mutation_type=mutation.mutation_type,
+                request_id=mutation.request_id,
+                task_id=(
+                    result.task.id
+                    if result.task is not None
+                    else getattr(mutation, "task_id", None)
+                ),
+                previous_status=(
+                    result.previous_status.value
+                    if result.previous_status is not None
+                    else None
+                ),
+                new_status=(
+                    result.task.status.value
+                    if result.task is not None
+                    else None
+                ),
+                version=result.version,
+            )
         if result.success and self._config.publish_snapshots:
             await self._publish_snapshot(mutation, result)
```

Based on learnings: "All state transitions must log at INFO level" and "Use DEBUG logging for object creation, internal flow, entry/exit of key functions."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_engine.py` around lines 622 - 631, Add an INFO-level log when a mutation is successfully applied inside _process_one after awaiting _dispatch_mutation (the block handling result and envelope.future); specifically, when result is returned and before calling _publish_snapshot, emit an info log that includes the mutation id/context (from mutation), task id or name (if available on mutation), the resulting version (from self._versions or result), and the result.status/success to provide authoritative state-transition telemetry; keep the existing DEBUG receipt and snapshot logs intact and only add this single INFO log entry adjacent to the existing envelope.future.set_result and _publish_snapshot calls.
166-183: ⚠️ Potential issue | 🔴 Critical
Don't let `wait_for()` clear `_in_flight` before the timeout handler runs.

`asyncio.wait_for(self._processing_task, ...)` cancels the worker on timeout before the `except TimeoutError` block executes. By the time `saved_in_flight` is read, `_process_one()` has already hit its `finally` block and cleared it to `None`, so the blocked caller's future is never resolved.

Suggested fix

```diff
         try:
             await asyncio.wait_for(
-                self._processing_task,
+                asyncio.shield(self._processing_task),
                 timeout=effective_timeout,
             )
             logger.info(TASK_ENGINE_DRAIN_COMPLETE)
         except TimeoutError:
             logger.warning(
                 TASK_ENGINE_DRAIN_TIMEOUT,
                 remaining=self._queue.qsize(),
             )
-            # Capture in-flight ref before cancel — the finally block
-            # in _process_one clears self._in_flight on CancelledError.
+            # Shield keeps the task alive long enough for us to capture
+            # the in-flight envelope before explicit cancellation.
             saved_in_flight = self._in_flight
             self._processing_task.cancel()
             with contextlib.suppress(asyncio.CancelledError):
                 await self._processing_task
             self._fail_remaining_futures(saved_in_flight)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_engine.py` around lines 166 - 183, The worker task is being cancelled by asyncio.wait_for on timeout which lets _process_one reach its finally block and clear self._in_flight before the except handler can capture it; change the wait_for call to await asyncio.wait_for(asyncio.shield(self._processing_task), timeout=effective_timeout) so the underlying _processing_task is protected from automatic cancellation, then in the TimeoutError handler capture saved_in_flight = self._in_flight, cancel self._processing_task explicitly, await it inside contextlib.suppress(asyncio.CancelledError), and call _fail_remaining_futures(saved_in_flight) as you already do.
101-122: ⚠️ Potential issue | 🔴 Critical
Guard shutdown and enqueue with the same lifecycle lock.

A submitter can pass the running check, then `stop()` can drain and tear down the worker, and only afterward the envelope gets queued. That future then has no consumer and can hang forever.

Suggested fix

```diff
     def __init__(
         self,
         *,
         persistence: PersistenceBackend,
         message_bus: MessageBus | None = None,
         config: TaskEngineConfig | None = None,
     ) -> None:
+        self._lifecycle_lock = asyncio.Lock()
         self._persistence = persistence
         self._message_bus = message_bus
@@
-        if not self._running:
-            return
-        self._running = False
+        async with self._lifecycle_lock:
+            if not self._running or self._processing_task is None:
+                return
+            self._running = False
+            processing_task = self._processing_task
@@
-            await asyncio.wait_for(
-                self._processing_task,
-                timeout=effective_timeout,
-            )
+            await asyncio.wait_for(processing_task, timeout=effective_timeout)
@@
-        if not self._running:
-            logger.warning(
-                TASK_ENGINE_NOT_RUNNING,
-                mutation_type=mutation.mutation_type,
-                request_id=mutation.request_id,
-            )
-            msg = "TaskEngine is not running"
-            raise TaskEngineNotRunningError(msg)
-
-        envelope = _MutationEnvelope(mutation=mutation)
-        try:
-            self._queue.put_nowait(envelope)
-        except asyncio.QueueFull:
-            logger.warning(
-                TASK_ENGINE_QUEUE_FULL,
-                mutation_type=mutation.mutation_type,
-                request_id=mutation.request_id,
-                queue_size=self._queue.qsize(),
-            )
-            msg = "TaskEngine queue is full"
-            raise TaskEngineQueueFullError(msg) from None
+        async with self._lifecycle_lock:
+            if not self._running or self._processing_task is None:
+                logger.warning(
+                    TASK_ENGINE_NOT_RUNNING,
+                    mutation_type=mutation.mutation_type,
+                    request_id=mutation.request_id,
+                )
+                msg = "TaskEngine is not running"
+                raise TaskEngineNotRunningError(msg)
+
+            envelope = _MutationEnvelope(mutation=mutation)
+            try:
+                self._queue.put_nowait(envelope)
+            except asyncio.QueueFull:
+                logger.warning(
+                    TASK_ENGINE_QUEUE_FULL,
+                    mutation_type=mutation.mutation_type,
+                    request_id=mutation.request_id,
+                    queue_size=self._queue.qsize(),
+                )
+                msg = "TaskEngine queue is full"
+                raise TaskEngineQueueFullError(msg) from None
```

Also applies to: 146-184, 237-272
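The atomicity argument can be checked with a stripped-down toy (hypothetical `MiniEngine`, not the real class): one `asyncio.Lock` held across both the running check plus enqueue and the stop-side drain closes the window:

```python
import asyncio


class MiniEngine:
    """Toy single-writer engine: check-and-enqueue is atomic with stop()."""

    def __init__(self) -> None:
        self._lifecycle_lock = asyncio.Lock()
        self._running = True
        self._queue: asyncio.Queue[str] = asyncio.Queue()

    async def submit(self, item: str) -> None:
        async with self._lifecycle_lock:
            if not self._running:
                raise RuntimeError("engine is not running")
            self._queue.put_nowait(item)   # the consumer cannot vanish mid-call

    async def stop(self) -> list[str]:
        async with self._lifecycle_lock:
            self._running = False
            drained = []
            while not self._queue.empty():
                drained.append(self._queue.get_nowait())
            return drained                 # nothing can be enqueued after this


async def demo() -> tuple[list[str], bool]:
    eng = MiniEngine()
    await eng.submit("m1")
    drained = await eng.stop()
    rejected = False
    try:
        await eng.submit("m2")             # raced past stop(): now rejected
    except RuntimeError:
        rejected = True
    return drained, rejected


drained, rejected = asyncio.run(demo())
```

A submitter that loses the race to `stop()` now gets a clean error instead of an envelope that hangs forever with no consumer.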
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_engine.py` around lines 101 - 122, The submit/enqueue race occurs because the running check and queue.put are not atomic with stop(); add a lifecycle asyncio.Lock (e.g., self._lifecycle_lock = asyncio.Lock()) in __init__ and use it to guard the check-and-enqueue path in the submit/enqueue functions and the shutdown path in stop() so that stop() can drain/teardown only after acquiring the same lock; ensure you hold the lock while checking self._running and performing self._queue.put_nowait/_queue.put and while stop() flips self._running, cancels/awaits self._processing_task, and drains the queue to avoid enqueuing envelopes with no consumer (also apply same locking to other enqueue/submit sites referenced in the ranges 146-184 and 237-272).

tests/unit/engine/test_task_engine_integration.py (1)
99-115: 🧹 Nitpick | 🔵 Trivial
Prefer `TaskGroup` for this concurrent fan-out.

If one create fails before the final join, the remaining tasks keep running until `gather()`. `TaskGroup` gives the same coverage with structured cancellation and cleaner failure handling.

Suggested refactor

```diff
-    create_tasks = [
-        asyncio.create_task(
-            eng.create_task(make_create_data(), requested_by="alice")
-        )
-        for _ in range(5)
-    ]
-
-    # Yield to the event loop so the tasks can enqueue their mutations before stop()
-    await asyncio.sleep(0)
-
-    # Stop while tasks may still be in flight — drain timeout is generous
-    await eng.stop(timeout=5.0)
-
-    # All futures resolved (drained during stop or completed before stop)
-    results = await asyncio.gather(*create_tasks)
+    async with asyncio.TaskGroup() as tg:
+        create_tasks = [
+            tg.create_task(
+                eng.create_task(make_create_data(), requested_by="alice")
+            )
+            for _ in range(5)
+        ]
+
+        # Yield to the event loop so the tasks can enqueue their mutations before stop()
+        await asyncio.sleep(0)
+
+        # Stop while tasks may still be in flight — drain timeout is generous
+        await eng.stop(timeout=5.0)
+
+    # All futures resolved (drained during stop or completed before stop)
+    results = [task.result() for task in create_tasks]
```

As per coding guidelines: "Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/engine/test_task_engine_integration.py` around lines 99 - 115, Replace the ad-hoc create_task/gather pattern with structured concurrency using asyncio.TaskGroup: instead of building create_tasks with asyncio.create_task and later awaiting asyncio.gather, open an asyncio.TaskGroup, spawn tasks calling eng.create_task(make_create_data(), requested_by="alice") inside it, and rely on the TaskGroup to propagate failures and cancel siblings; keep the yield to the event loop (await asyncio.sleep(0)) if needed before calling await eng.stop(timeout=5.0), and remove the final asyncio.gather by letting the TaskGroup scope ensure all tasks are resolved before asserting the number of completed tasks results from the group.

src/ai_company/api/controllers/tasks.py (1)
185-186: 🧹 Nitpick | 🔵 Trivial
Use the repository's Python 3.14 multi-except form consistently.

These handlers still use tuple-style `except (...) as exc:` blocks instead of the `except A, B as exc:` form required by the project rule.

As per coding guidelines: "Use `except A, B:` syntax (no parentheses) for multiple exception types — PEP 758 on Python 3.14."

Also applies to: 211-212, 261-267, 305-312, 354-362, 395-402
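For reference, the two spellings are semantically identical; this sketch (stand-in exception types, not the controller's real ones) uses the parenthesized form so it also runs on pre-3.14 interpreters, with the PEP 758 form shown in a comment:

```python
class TaskInternalError(Exception):
    """Stand-in for the controller's engine error type."""


class TaskEngineNotRunningError(Exception):
    """Stand-in for the engine lifecycle error type."""


def classify(exc_to_raise: Exception) -> str:
    try:
        raise exc_to_raise
    # On Python 3.14 (PEP 758) the repo rule prefers the bare form:
    #     except TaskInternalError, TaskEngineNotRunningError as exc:
    except (TaskInternalError, TaskEngineNotRunningError) as exc:
        return type(exc).__name__


name = classify(TaskEngineNotRunningError("down"))
```

Either way the same exception object is bound to `exc`, so the surrounding `raise _map_task_engine_errors(exc) from exc` pattern is unaffected.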
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/api/controllers/tasks.py` around lines 185 - 186, Replace tuple-style multi-except blocks with the Python 3.14 multi-except form used across the repo: change occurrences like the except (TaskInternalError, TaskEngineNotRunningError) as exc: to the form except TaskInternalError, TaskEngineNotRunningError as exc: so the same exception object is captured; apply the same change to the other listed handlers in this file (the blocks around lines referenced: the handlers at 211-212, 261-267, 305-312, 354-362, 395-402) and ensure each uses the unparenthesized `except A, B as exc:` syntax and still re-raises via raise _map_task_engine_errors(exc) from exc.

tests/unit/api/controllers/test_task_helpers.py (1)
72-88: ⚠️ Potential issue | 🟡 Minor
Assert the exact sanitized 503 strings.

These cases only prove the raw engine text is hidden. They'll still pass if the mapping regresses to some other 503 message, which weakens the API contract the controller is deliberately sanitizing.

Suggested assertions

```diff
     def test_not_running_maps_to_service_unavailable(self) -> None:
         exc = TaskEngineNotRunningError("not running")
         result = _map_task_engine_errors(exc)
         assert isinstance(result, ServiceUnavailableError)
-        assert "not running" not in str(result)
+        assert str(result) == "Service temporarily unavailable"

     def test_queue_full_maps_to_service_unavailable(self) -> None:
         exc = TaskEngineQueueFullError("queue full")
         result = _map_task_engine_errors(exc)
         assert isinstance(result, ServiceUnavailableError)
-        assert "queue full" not in str(result)
+        assert str(result) == "Service temporarily unavailable"

     def test_internal_error_maps_to_service_unavailable(self) -> None:
         exc = TaskInternalError("internal fault")
         result = _map_task_engine_errors(exc)
         assert isinstance(result, ServiceUnavailableError)
-        assert "internal fault" not in str(result)
+        assert str(result) == "Internal server error"
+        assert "internal fault" not in str(result)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/api/controllers/test_task_helpers.py` around lines 72 - 88, The tests only check that the original engine text is removed; change each test to assert the exact sanitized 503 text produced by the mapping function so the API contract is enforced. Locate the sanitized message constant (the string used inside _map_task_engine_errors, e.g. SANITIZED_SERVICE_UNAVAILABLE_MESSAGE or similar) and replace the current negative assertions with equality checks like assert str(result) == SANITIZED_SERVICE_UNAVAILABLE_MESSAGE for TaskEngineNotRunningError, TaskEngineQueueFullError, and TaskInternalError to ensure the mapping returns the exact expected ServiceUnavailableError message.
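A sketch of the mapper these assertions pin down (hypothetical `map_engine_errors` and stand-in exception types; the real `_map_task_engine_errors` lives in the tasks controller and returns Litestar error types):

```python
class ServiceUnavailableError(Exception):
    """Stand-in for the framework's 503 error type."""


class TaskEngineNotRunningError(Exception): ...
class TaskEngineQueueFullError(Exception): ...
class TaskInternalError(Exception): ...


def map_engine_errors(exc: Exception) -> Exception:
    # The internal detail in str(exc) never reaches the API response text.
    if isinstance(exc, (TaskEngineNotRunningError, TaskEngineQueueFullError)):
        return ServiceUnavailableError("Service temporarily unavailable")
    if isinstance(exc, TaskInternalError):
        return ServiceUnavailableError("Internal server error")
    return exc


mapped = map_engine_errors(TaskEngineQueueFullError("queue full at 512"))
```

Asserting the exact sanitized string (rather than only the absence of the raw text) is what turns the sanitization into an enforced API contract.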
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/api/controllers/tasks.py`:
- Around line 296-312: The updates dict produced by
data.model_dump(exclude_none=True) drops expected_version from the call even
though TaskEngine.update_task and transition_task accept optimistic concurrency;
change the controller to extract expected_version from the incoming DTO (e.g.,
data.expected_version or data.model_dump()["expected_version"]) and pass it
through to the engine call (add an expected_version=expected_version argument to
the TaskEngine.update_task and transition_task invocations) while keeping the
existing updates payload and requester (see updates variable, update_task call,
transition_task usage and _extract_requester); also ensure
UpdateTaskRequest/TransitionTaskRequest expose expected_version if missing.
In `@src/ai_company/engine/task_engine.py`:
- Around line 294-299: Create_task(), delete_task(), and cancel_task() currently
allow raw PydanticValidationError to escape when constructing
CreateTaskMutation/DeleteTaskMutation/CancelTaskMutation, breaking the engine
contract; wrap the mutation construction in a try/except that catches
pydantic.ValidationError (or PydanticValidationError) and re-raises as
TaskMutationError with a clear message and the original error as context,
mirroring how update_task() and transition_task() handle model-construction
failures; apply the same fix to the other occurrences where mutations are built
(the other Create/Delete/Cancel mutation sites referenced) so all mutation
construction paths normalize to TaskMutationError before calling self.submit().
In `@tests/unit/engine/test_task_engine_convenience.py`:
- Around line 72-90: The tests for typed-error fallbacks are not exercising the
intended branches; update test_none_code_falls_through to set
TaskMutationResult.error_code=None (so TaskEngine._raise_typed_error will fall
through the typed-error path) and update test_missing_error_uses_default_message
to omit or set an empty/None error on TaskMutationResult (so the fallback
default message is used) and assert TaskMutationError matches that default
message; reference the TaskMutationResult construction in each test and the
TaskEngine._raise_typed_error / TaskMutationError expectations when making the
changes.
---
Outside diff comments:
In `@src/ai_company/api/controllers/tasks.py`:
- Around line 316-404: The controller is missing an endpoint to expose
TaskEngine.cancel_task, so add a new POST route method named cancel_task in the
same controller with decorators `@post`("/{task_id:str}/cancel",
guards=[require_write_access]) that accepts (state: State, task_id: str, data:
CancelTaskRequest) and calls app_state.task_engine.cancel_task(task_id,
requested_by=_extract_requester(state), reason=data.reason), returns
ApiResponse(data=task) and wrap TaskEngine exceptions with
_map_task_engine_errors like the other handlers; include a logger.info call
(e.g. API_TASK_CANCELED, task_id=task_id) consistent with existing logging
patterns.
---
Duplicate comments:
In `@src/ai_company/api/controllers/tasks.py`:
- Around line 185-186: Replace tuple-style multi-except blocks with the Python
3.14 multi-except form used across the repo: change occurrences like the except
(TaskInternalError, TaskEngineNotRunningError) as exc: to the form except
TaskInternalError, TaskEngineNotRunningError as exc: so the same exception
object is captured; apply the same change to the other listed handlers in this
file (the blocks around lines referenced: the handlers at 211-212, 261-267,
305-312, 354-362, 395-402) and ensure each uses the unparenthesized `except A, B
as exc:` syntax and still re-raises via raise _map_task_engine_errors(exc) from
exc.
In `@src/ai_company/engine/task_engine_models.py`:
- Around line 332-374: TaskStateChanged needs a stable task identifier so delete
events remain actionable: add a new field task_id: NotBlankStr | None (or
appropriate type) to the TaskStateChanged model and ensure _publish_snapshot()
populates it from result.task.id when result.task exists, otherwise use
mutation.task_id for delete events; update any constructors/creation sites that
build TaskStateChanged to set task_id accordingly while leaving task (snapshot)
None on deletes.
- Around line 148-170: The updates (and similarly overrides) dicts are
deep-copied but left mutable on the model; after validation freeze them by
wrapping the deepcopy in types.MappingProxyType and assign via
object.__setattr__ so the model exposes an immutable mapping; ensure
MappingProxyType is imported and apply the same pattern for both the "updates"
field handling in __init__ and the analogous code for "overrides" (referencing
the _reject_immutable_fields validator, the "updates" attribute, and the
overrides handling block around lines ~202-227).
In `@src/ai_company/engine/task_engine.py`:
- Around line 622-631: Add an INFO-level log when a mutation is successfully
applied inside _process_one after awaiting _dispatch_mutation (the block
handling result and envelope.future); specifically, when result is returned and
before calling _publish_snapshot, emit an info log that includes the mutation
id/context (from mutation), task id or name (if available on mutation), the
resulting version (from self._versions or result), and the result.status/success
to provide authoritative state-transition telemetry; keep the existing DEBUG
receipt and snapshot logs intact and only add this single INFO log entry
adjacent to the existing envelope.future.set_result and _publish_snapshot calls.
- Around line 166-183: The worker task is being cancelled by asyncio.wait_for on
timeout which lets _process_one reach its finally block and clear
self._in_flight before the except handler can capture it; change the wait_for
call to await asyncio.wait_for(asyncio.shield(self._processing_task),
timeout=effective_timeout) so the underlying _processing_task is protected from
automatic cancellation, then in the TimeoutError handler capture saved_in_flight
= self._in_flight, cancel self._processing_task explicitly, await it inside
contextlib.suppress(asyncio.CancelledError), and call
_fail_remaining_futures(saved_in_flight) as you already do.
- Around line 101-122: The submit/enqueue race occurs because the running check
and queue.put are not atomic with stop(); add a lifecycle asyncio.Lock (e.g.,
self._lifecycle_lock = asyncio.Lock()) in __init__ and use it to guard the
check-and-enqueue path in the submit/enqueue functions and the shutdown path in
stop() so that stop() can drain/teardown only after acquiring the same lock;
ensure you hold the lock while checking self._running and performing
self._queue.put_nowait/_queue.put and while stop() flips self._running,
cancels/awaits self._processing_task, and drains the queue to avoid enqueuing
envelopes with no consumer (also apply same locking to other enqueue/submit
sites referenced in the ranges 146-184 and 237-272).
In `@tests/unit/api/controllers/test_task_helpers.py`:
- Around line 72-88: The tests only check that the original engine text is
removed; change each test to assert the exact sanitized 503 text produced by the
mapping function so the API contract is enforced. Locate the sanitized message
constant (the string used inside _map_task_engine_errors, e.g.
SANITIZED_SERVICE_UNAVAILABLE_MESSAGE or similar) and replace the current
negative assertions with equality checks like assert str(result) ==
SANITIZED_SERVICE_UNAVAILABLE_MESSAGE for TaskEngineNotRunningError,
TaskEngineQueueFullError, and TaskInternalError to ensure the mapping returns
the exact expected ServiceUnavailableError message.
In `@tests/unit/engine/test_task_engine_integration.py`:
- Around line 99-115: Replace the ad-hoc create_task/gather pattern with
structured concurrency using asyncio.TaskGroup: instead of building create_tasks
with asyncio.create_task and later awaiting asyncio.gather, open an
asyncio.TaskGroup, spawn tasks calling eng.create_task(make_create_data(),
requested_by="alice") inside it, and rely on the TaskGroup to propagate failures
and cancel siblings; keep the yield to the event loop (await asyncio.sleep(0))
if needed before calling await eng.stop(timeout=5.0), and remove the final
asyncio.gather by letting the TaskGroup scope ensure all tasks are resolved
before asserting the number of completed tasks results from the group.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 3ca1a4ac-36f4-45e4-b992-a019195b7ad3
📒 Files selected for processing (12)
- .github/workflows/pages-preview.yml
- CLAUDE.md
- src/ai_company/api/controllers/tasks.py
- src/ai_company/engine/task_engine.py
- src/ai_company/engine/task_engine_models.py
- src/ai_company/observability/events/api.py
- src/ai_company/observability/events/task_engine.py
- tests/unit/api/controllers/test_task_helpers.py
- tests/unit/engine/test_task_engine_convenience.py
- tests/unit/engine/test_task_engine_integration.py
- tests/unit/engine/test_task_engine_models.py
- tests/unit/engine/test_task_engine_mutations.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Greptile Review
- GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (5)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
**/*.py: No `from __future__ import annotations` — Python 3.14 has PEP 649
Use `except A, B:` syntax (no parentheses) for multiple exception types — PEP 758 on Python 3.14
All public functions require type hints; mypy strict mode enforced
Docstrings must use Google style and are required on public classes and functions — enforced by ruff D rules
Use immutability by creating new objects, never mutating existing ones. For non-Pydantic internal collections, use `copy.deepcopy()` at construction + `MappingProxyType` wrapping. For Pydantic frozen models, use `model_copy(update=...)` for runtime state evolution.
Use frozen Pydantic models for config/identity; use mutable-via-copy models for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.
Adopt Pydantic v2 conventions: use `@computed_field` for derived values instead of storing redundant fields; use `NotBlankStr` for all identifier/name fields (including optional and tuple variants) instead of manual whitespace validators
Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`.
Line length must be 88 characters — enforced by ruff
Functions must be less than 50 lines; files must be less than 800 lines
Handle errors explicitly, never silently swallow errors
Validate at system boundaries (user input, external APIs, config files)
Files:
- src/ai_company/observability/events/task_engine.py
- src/ai_company/api/controllers/tasks.py
- tests/unit/engine/test_task_engine_integration.py
- src/ai_company/observability/events/api.py
- tests/unit/engine/test_task_engine_convenience.py
- src/ai_company/engine/task_engine_models.py
- src/ai_company/engine/task_engine.py
- tests/unit/api/controllers/test_task_helpers.py
- tests/unit/engine/test_task_engine_models.py
- tests/unit/engine/test_task_engine_mutations.py
src/ai_company/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/ai_company/**/*.py: Every module with business logic MUST import `from ai_company.observability import get_logger` and create `logger = get_logger(__name__)`
Never use `import logging` / `logging.getLogger()` / `print()` in application code — only use the structured logger from `ai_company.observability`
Logger variable must always be named `logger` (not `_logger`, not `log`)
Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
Always use structured logging: `logger.info(EVENT, key=value)` — never `logger.info('msg %s', val)`
All error paths must log at WARNING or ERROR with context before raising
All state transitions must log at INFO level
Use DEBUG logging for object creation, internal flow, entry/exit of key functions
Pure data models, enums, and re-exports do NOT need logging
Files:
- src/ai_company/observability/events/task_engine.py
- src/ai_company/api/controllers/tasks.py
- src/ai_company/observability/events/api.py
- src/ai_company/engine/task_engine_models.py
- src/ai_company/engine/task_engine.py
**/{src,tests}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
NEVER use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples. Use generic names: `example-provider`, `example-large-001`, `example-medium-001`, `example-small-001`. Tests must use `test-provider`, `test-small-001`, etc. Vendor names only in: (1) `docs/design/operations.md`, (2) `.claude/` files, (3) third-party import paths.
Files:
- src/ai_company/observability/events/task_engine.py
- src/ai_company/api/controllers/tasks.py
- tests/unit/engine/test_task_engine_integration.py
- src/ai_company/observability/events/api.py
- tests/unit/engine/test_task_engine_convenience.py
- src/ai_company/engine/task_engine_models.py
- src/ai_company/engine/task_engine.py
- tests/unit/api/controllers/test_task_helpers.py
- tests/unit/engine/test_task_engine_models.py
- tests/unit/engine/test_task_engine_mutations.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Markers for tests must be: `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, `@pytest.mark.slow`
Prefer `@pytest.mark.parametrize` for testing similar cases
Files:
- tests/unit/engine/test_task_engine_integration.py
- tests/unit/engine/test_task_engine_convenience.py
- tests/unit/api/controllers/test_task_helpers.py
- tests/unit/engine/test_task_engine_models.py
- tests/unit/engine/test_task_engine_mutations.py
src/ai_company/{providers,engine}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
All provider calls go through `BaseCompletionProvider` which applies retry + rate limiting automatically. Never implement retry logic in driver subclasses or calling code.
Files:
- src/ai_company/engine/task_engine_models.py
- src/ai_company/engine/task_engine.py
🧠 Learnings (13)
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic MUST import `from ai_company.observability import get_logger` and create `logger = get_logger(__name__)`
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to src/ai_company/**/*.py : Never use `import logging` / `logging.getLogger()` / `print()` in application code — only use the structured logger from `ai_company.observability`
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to src/ai_company/**/*.py : Always use structured logging: `logger.info(EVENT, key=value)` — never `logger.info('msg %s', val)`
Applied to files:
CLAUDE.md, src/ai_company/engine/task_engine.py
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to src/ai_company/**/*.py : Logger variable must always be named `logger` (not `_logger`, not `log`)
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to src/ai_company/**/*.py : Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`). Import directly: `from ai_company.observability.events.<domain> import EVENT_CONSTANT`
Applied to files:
CLAUDE.md, src/ai_company/observability/events/task_engine.py, src/ai_company/observability/events/api.py, src/ai_company/engine/task_engine.py
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to src/ai_company/**/*.py : All error paths must log at WARNING or ERROR with context before raising
Applied to files:
CLAUDE.md, src/ai_company/api/controllers/tasks.py, src/ai_company/engine/task_engine.py
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to src/ai_company/**/*.py : All state transitions must log at INFO level
Applied to files:
CLAUDE.md, src/ai_company/engine/task_engine.py
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to src/ai_company/**/*.py : Use DEBUG logging for object creation, internal flow, entry/exit of key functions
Applied to files:
CLAUDE.md, src/ai_company/engine/task_engine.py
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to src/ai_company/**/*.py : Pure data models, enums, and re-exports do NOT need logging
Applied to files:
CLAUDE.md
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to **/*.py : Use `except A, B:` syntax (no parentheses) for multiple exception types — PEP 758 on Python 3.14
Applied to files:
src/ai_company/api/controllers/tasks.py
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to **/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls). Prefer structured concurrency over bare `create_task`.
Applied to files:
src/ai_company/api/controllers/tasks.py, tests/unit/engine/test_task_engine_integration.py
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to **/*.py : Use immutability by creating new objects, never mutating existing ones. For non-Pydantic internal collections, use `copy.deepcopy()` at construction + `MappingProxyType` wrapping. For Pydantic frozen models, use `model_copy(update=...)` for runtime state evolution.
Applied to files:
src/ai_company/engine/task_engine_models.py
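A minimal sketch of the deep-copy-plus-`MappingProxyType` pattern this learning describes for non-Pydantic internal collections; `TaskIndex` is a hypothetical holder, not project code:

```python
import copy
from types import MappingProxyType


class TaskIndex:
    """Hypothetical holder for a non-Pydantic internal collection."""

    def __init__(self, entries: dict[str, str]) -> None:
        # Deep-copy at construction so later caller-side mutations can't
        # leak in, then wrap in MappingProxyType for a read-only view.
        self._entries = MappingProxyType(copy.deepcopy(entries))

    @property
    def entries(self) -> MappingProxyType:
        return self._entries


src = {"t1": "open"}
idx = TaskIndex(src)
src["t1"] = "mutated"      # caller-side change does not affect the index
print(idx.entries["t1"])   # -> open
```

Attempting `idx.entries["t2"] = "x"` raises `TypeError`, since the proxy rejects item assignment.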
📚 Learning: 2026-03-12T18:09:14.648Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T18:09:14.648Z
Learning: Applies to **/*.py : Use frozen Pydantic models for config/identity; use mutable-via-copy models for runtime state that evolves. Never mix static config fields with mutable runtime fields in one model.
Applied to files:
src/ai_company/engine/task_engine_models.py
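Sketched with stdlib `dataclasses` as an analogy (the project itself uses frozen Pydantic models with `model_copy(update=...)`; `EngineConfig` and `TaskState` here are illustrative names only):

```python
from dataclasses import dataclass, replace


# Frozen config/identity model: set once at construction, never mutated.
@dataclass(frozen=True)
class EngineConfig:
    queue_size: int
    drain_timeout: float


# Runtime state also frozen, but it evolves by producing a new object
# (the dataclasses.replace analogue of Pydantic's model_copy(update=...)).
@dataclass(frozen=True)
class TaskState:
    status: str
    version: int


config = EngineConfig(queue_size=1000, drain_timeout=5.0)
state = TaskState(status="queued", version=1)
next_state = replace(state, status="in_progress", version=2)
print(state.status, next_state.status)  # -> queued in_progress
```

Keeping config and runtime state in separate models, as the learning says, means no object ever mixes fields that must never change with fields that evolve per mutation.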
🧬 Code graph analysis (5)
src/ai_company/api/controllers/tasks.py (4)
- src/ai_company/api/errors.py (4): ApiValidationError (32-38), ConflictError (41-47), NotFoundError (23-29), ServiceUnavailableError (68-74)
- src/ai_company/api/state.py (1): task_engine (107-109)
- src/ai_company/engine/errors.py (6): TaskEngineNotRunningError (89-90), TaskEngineQueueFullError (93-94), TaskInternalError (109-120), TaskMutationError (97-98), TaskNotFoundError (101-102), TaskVersionConflictError (105-106)
- src/ai_company/engine/task_engine_models.py (1): CreateTaskData (40-89)

tests/unit/engine/test_task_engine_integration.py (6)
- src/ai_company/engine/errors.py (1): TaskEngineQueueFullError (93-94)
- src/ai_company/engine/task_engine.py (3): TaskEngine (87-721), _MutationEnvelope (74-84), stop (146-186)
- src/ai_company/engine/task_engine_config.py (1): TaskEngineConfig (6-37)
- src/ai_company/engine/task_engine_models.py (4): CreateTaskMutation (95-110), DeleteTaskMutation (230-245), TransitionTaskMutation (180-227), UpdateTaskMutation (130-170)
- src/ai_company/engine/task_engine_version.py (1): get (65-67)
- src/ai_company/engine/parallel_models.py (1): task_id (87-89)

tests/unit/engine/test_task_engine_convenience.py (3)
- src/ai_company/engine/errors.py (4): TaskInternalError (109-120), TaskMutationError (97-98), TaskNotFoundError (101-102), TaskVersionConflictError (105-106)
- src/ai_company/engine/task_engine.py (6): TaskEngine (87-721), _raise_typed_error (473-490), create_task (274-305), start (126-144), stop (146-186), is_running (231-233)
- src/ai_company/engine/task_engine_models.py (2): CreateTaskMutation (95-110), TaskMutationResult (281-326)

src/ai_company/engine/task_engine_models.py (2)
- src/ai_company/core/enums.py (4): Complexity (247-253), Priority (238-244), TaskStatus (198-224), TaskType (227-235)
- src/ai_company/core/task.py (1): Task (45-261)

tests/unit/api/controllers/test_task_helpers.py (3)
- src/ai_company/api/controllers/tasks.py (2): _extract_requester (51-65), _map_task_engine_errors (68-145)
- src/ai_company/api/errors.py (4): ApiValidationError (32-38), ConflictError (41-47), NotFoundError (23-29), ServiceUnavailableError (68-74)
- src/ai_company/engine/errors.py (6): TaskEngineNotRunningError (89-90), TaskEngineQueueFullError (93-94), TaskInternalError (109-120), TaskMutationError (97-98), TaskNotFoundError (101-102), TaskVersionConflictError (105-106)
🪛 LanguageTool
CLAUDE.md
[style] ~130-~130: A comma is missing here.
Context: ...nder ai_company.observability.events (e.g. PROVIDER_CALL_START from `events.prov...
(EG_NO_COMMA)
🔇 Additional comments (3)
.github/workflows/pages-preview.yml (1)
91-93: Approved — security documentation and zizmor syntax are correct. The comment accurately documents the intentional checkout of untrusted PR code with appropriate mitigations. The `zizmor: ignore[untrusted-checkout]` directive syntax is correct per zizmor's documentation format, and the trailing explanation is properly formatted. All stated mitigations (no secrets in build job, limited permissions, `persist-credentials: false`, deploy uses artifacts) are present in the workflow.
tests/unit/engine/test_task_engine_models.py (1)
19-471: Strong model-test coverage with good immutability and validation checks. This test module is well-structured and exercises key boundaries (frozen behavior, unknown fields, nested deep-copy isolation, and event serialization) comprehensively.
tests/unit/engine/test_task_engine_mutations.py (1)
33-694: Comprehensive mutation-path and error-propagation coverage looks solid. The suite covers successful flows, typed failures, read-through error wrapping, and consistency validators in a balanced way.
```python
def test_none_code_falls_through(self) -> None:
    result = TaskMutationResult(
        request_id="r",
        success=False,
        error="generic",
        error_code="validation",
    )
    with pytest.raises(TaskMutationError, match="generic"):
        TaskEngine._raise_typed_error(result)


def test_missing_error_uses_default_message(self) -> None:
    result = TaskMutationResult(
        request_id="r",
        success=False,
        error="Mutation failed",
        error_code="validation",
    )
    with pytest.raises(TaskMutationError, match="Mutation failed"):
        TaskEngine._raise_typed_error(result)
```
Fix branch-coverage mismatch in typed-error fallback tests.
Lines 72 and 82 don't currently test the branches their names describe:
- Line 72: `test_none_code_falls_through` sets `error_code="validation"` instead of `None`.
- Line 82: `test_missing_error_uses_default_message` provides `error`, so the fallback-message logic is not actually exercised.
Proposed fix

```diff
@@
     def test_none_code_falls_through(self) -> None:
         result = TaskMutationResult(
             request_id="r",
             success=False,
             error="generic",
-            error_code="validation",
+            error_code=None,
         )
         with pytest.raises(TaskMutationError, match="generic"):
             TaskEngine._raise_typed_error(result)

     def test_missing_error_uses_default_message(self) -> None:
-        result = TaskMutationResult(
+        base = TaskMutationResult(
             request_id="r",
             success=False,
-            error="Mutation failed",
-            error_code="validation",
+            error="placeholder",
+            error_code=None,
         )
+        result = base.model_copy(update={"error": None})
         with pytest.raises(TaskMutationError, match="Mutation failed"):
             TaskEngine._raise_typed_error(result)
```

📝 Committable suggestion

‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
def test_none_code_falls_through(self) -> None:
    result = TaskMutationResult(
        request_id="r",
        success=False,
        error="generic",
        error_code=None,
    )
    with pytest.raises(TaskMutationError, match="generic"):
        TaskEngine._raise_typed_error(result)


def test_missing_error_uses_default_message(self) -> None:
    base = TaskMutationResult(
        request_id="r",
        success=False,
        error="placeholder",
        error_code=None,
    )
    result = base.model_copy(update={"error": None})
    with pytest.raises(TaskMutationError, match="Mutation failed"):
        TaskEngine._raise_typed_error(result)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/unit/engine/test_task_engine_convenience.py` around lines 72-90, the
tests for typed-error fallbacks are not exercising the intended branches; update
test_none_code_falls_through to set TaskMutationResult.error_code=None (so
TaskEngine._raise_typed_error will fall through the typed-error path) and update
test_missing_error_uses_default_message to omit or set an empty/None error on
TaskMutationResult (so the fallback default message is used) and assert
TaskMutationError matches that default message; reference the TaskMutationResult
construction in each test and the TaskEngine._raise_typed_error /
TaskMutationError expectations when making the changes.
- Fix lifecycle race condition: add asyncio.Lock coordinating submit/stop
- Add asyncio.shield to protect processing task from wait_for cancellation
- Wrap MappingProxyType on UpdateTaskMutation.updates and
TransitionTaskMutation.overrides for true immutability
- Add task_id field to TaskStateChanged event model
- Wrap PydanticValidationError in create_task, delete_task, cancel_task
- Add cancel endpoint (POST /tasks/{id}/cancel) with CancelTaskRequest DTO
- Forward expected_version from UpdateTaskRequest/TransitionTaskRequest DTOs
- Remove dead TaskEngineNotRunningError catch from list/get read-throughs
- Sanitize 503 error messages to prevent leaking internals
- Log created_by mismatch at WARNING instead of INFO
- Add INFO log for successful mutations (TASK_ENGINE_MUTATION_APPLIED)
- Deep-copy tasks in FakeTaskRepository for test isolation
- Replace structlog.configure with capture_logs for test stability
- Replace timing-dependent sleep(0.01) with sleep(0) in race test
- Add 15 new tests covering defensive guards, processing loop recovery,
snapshot publishing, cancel lifecycle, and validation wrapping
…rs, restore mutation log
- list_tasks now returns (tasks, total) so pagination.total reflects true count even when safety cap truncates results
- paginate() accepts optional total override for capped result sets
- Remove dead except PydanticValidationError handlers in create_task, transition_task, cancel_task controllers (engine wraps as TaskMutationError)
- Restore TASK_ENGINE_MUTATION_APPLIED INFO log in _process_one for state transition visibility (version, previous/new status)
- Resolve futures before re-raising MemoryError/RecursionError in _processing_loop to prevent hung callers
- Drain remaining futures on abnormal processing task exit in stop()
Pull request overview
Copilot reviewed 41 out of 41 changed files in this pull request and generated 2 comments.
```python
if result.success:
    task_id = getattr(mutation, "task_id", None)
    logger.info(
        TASK_ENGINE_MUTATION_APPLIED,
        mutation_type=mutation.mutation_type,
        request_id=mutation.request_id,
        task_id=task_id or (result.task.id if result.task else None),
        version=result.version,
        previous_status=(
            result.previous_status.value if result.previous_status else None
        ),
        new_status=(result.task.status.value if result.task else None),
    )
```
There was a problem hiding this comment.
TASK_ENGINE_MUTATION_APPLIED is logged inside the per-mutation apply_* functions (in task_engine_apply.py) and then logged again here in _process_one() for every successful mutation. This will duplicate events/metrics and increase log volume. Consider emitting the "mutation applied" event in exactly one place (either in apply_* or in _process_one) and keeping the other layer focused on returning structured results.
```diff
     effective_total = total if total is not None else len(items)
     offset = max(0, min(offset, len(items)))
     limit = max(1, min(limit, MAX_LIMIT))
     page = items[offset : offset + limit]
     meta = PaginationMeta(
-        total=len(items),
+        total=effective_total,
         offset=offset,
```
When total is provided (meaning items may already be truncated upstream), clamping offset against len(items) can make the returned pagination.offset inconsistent with the true total. Example: total=20000, len(items)=10000, request offset=15000 gets clamped to 10000, so the response reports a different offset than requested even though total indicates there are more items. Consider clamping against effective_total for the metadata (and only clamp the slice index against len(items) if needed), or otherwise explicitly handle the "offset beyond safety cap" case so metadata remains coherent.
```python
except (MemoryError, RecursionError) as exc:
    if not envelope.future.done():
        envelope.future.set_exception(exc)
    raise
```
Dead loop leaves _running=True, causing hung submissions
When MemoryError/RecursionError propagates out of _processing_loop, the background task dies but self._running is never set to False. Any subsequent submit() call passes the if not self._running guard, enqueues its envelope, and then await envelope.future hangs indefinitely — the queue has no consumer.
This window exists until stop() is called. RecursionError is survivable (e.g., deeply nested task data hitting Python's recursion limit), so the engine can remain in this zombie state for an extended period while the application continues to issue mutations that silently stall all callers.
The fix is to set `self._running = False` before re-raising, so `submit()` correctly raises `TaskEngineNotRunningError` instead of hanging:

```python
except (MemoryError, RecursionError) as exc:
    if not envelope.future.done():
        envelope.future.set_exception(exc)
    self._running = False  # prevent new submissions from hanging
    raise
```

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 628-631
Comment:
**Dead loop leaves `_running=True`, causing hung submissions**
When `MemoryError`/`RecursionError` propagates out of `_processing_loop`, the background task dies but `self._running` is never set to `False`. Any subsequent `submit()` call passes the `if not self._running` guard, enqueues its envelope, and then `await envelope.future` hangs indefinitely — the queue has no consumer.
This window exists until `stop()` is called. `RecursionError` is survivable (e.g., deeply nested task data hitting Python's recursion limit), so the engine can remain in this zombie state for an extended period while the application continues to issue mutations that silently stall all callers.
The fix is to set `self._running = False` before re-raising, so `submit()` correctly raises `TaskEngineNotRunningError` instead of hanging:
```python
except (MemoryError, RecursionError) as exc:
if not envelope.future.done():
envelope.future.set_exception(exc)
self._running = False # prevent new submissions from hanging
raise
```
How can I resolve this? If you propose a fix, please make it concise.

```python
self._processing_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
    await self._processing_task
self._fail_remaining_futures(saved_in_flight)
except BaseException:
    self._fail_remaining_futures(self._in_flight)
    raise
```
stop() re-raises background task exceptions, aborting the shutdown sequence
When `_processing_loop` previously died with `MemoryError`/`RecursionError`, `self._processing_task` is already a done task with that exception stored. When `stop()` is subsequently called:
1. `asyncio.shield(self._processing_task)` creates a future that immediately adopts the stored exception.
2. `asyncio.wait_for(...)` raises the exception immediately (no timeout).
3. The `except BaseException: ...; raise` block correctly fails remaining futures but then re-raises `MemoryError`/`RecursionError`.
This propagates through _try_stop in _safe_shutdown (which also re-raises these errors), meaning the bridge, message bus, and persistence backend are never shut down cleanly when a catastrophic engine fault precedes shutdown.
RecursionError in particular is process-survivable, so clean shutdown matters. Consider distinguishing between an exception that originated from the background task's prior run (should be logged-and-absorbed during shutdown) versus one that originates from the drain logic itself (should propagate):
```python
except BaseException as exc:
    self._fail_remaining_futures(self._in_flight)
    # Only re-raise if this is a new error, not a stale task exception
    if self._processing_task is not None and self._processing_task.done():
        task_exc = self._processing_task.exception() if not self._processing_task.cancelled() else None
        if task_exc is exc:
            logger.error(TASK_ENGINE_LOOP_ERROR, error=f"Processing task died: {exc!r}")
            return  # don't abort shutdown for a pre-existing task failure
    raise
```

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 186-192
Comment:
**`stop()` re-raises background task exceptions, aborting the shutdown sequence**
When `_processing_loop` previously died with `MemoryError`/`RecursionError`, `self._processing_task` is already a done task with that exception stored. When `stop()` is subsequently called:
1. `asyncio.shield(self._processing_task)` creates a future that immediately adopts the stored exception.
2. `asyncio.wait_for(...)` raises the exception immediately (no timeout).
3. The `except BaseException: ...; raise` block correctly fails remaining futures but then **re-raises** `MemoryError`/`RecursionError`.
This propagates through `_try_stop` in `_safe_shutdown` (which also re-raises these errors), meaning the bridge, message bus, and persistence backend are **never shut down cleanly** when a catastrophic engine fault precedes shutdown.
`RecursionError` in particular is process-survivable, so clean shutdown matters. Consider distinguishing between an exception that originated *from the background task's prior run* (should be logged-and-absorbed during shutdown) versus one that originates from the drain logic itself (should propagate):
```python
except BaseException as exc:
self._fail_remaining_futures(self._in_flight)
# Only re-raise if this is a new error, not a stale task exception
if self._processing_task is not None and self._processing_task.done():
task_exc = self._processing_task.exception() if not self._processing_task.cancelled() else None
if task_exc is exc:
logger.error(TASK_ENGINE_LOOP_ERROR, error=f"Processing task died: {exc!r}")
return # don't abort shutdown for a pre-existing task failure
raise
```
How can I resolve this? If you propose a fix, please make it concise.

🤖 I have created a release *beep* *boop*

## [0.1.3](v0.1.2...v0.1.3) (2026-03-13)

### Features
* add Mem0 memory backend adapter ([#345](#345)) ([2788db8](2788db8)), closes [#206](#206)
* centralized single-writer TaskEngine with full CRUD API ([#328](#328)) ([9c1a3e1](9c1a3e1))
* incremental AgentEngine → TaskEngine status sync ([#331](#331)) ([7a68d34](7a68d34)), closes [#323](#323)
* web dashboard pages — views, components, tests, and review fixes ([#354](#354)) ([b165ec4](b165ec4))
* web dashboard with Vue 3 + PrimeVue + Tailwind CSS ([#347](#347)) ([06416b1](06416b1))

### Bug Fixes
* harden coordination pipeline with validators, logging, and fail-fast ([#333](#333)) ([2f10d49](2f10d49)), closes [#205](#205)
* repo-wide security hardening from ZAP, Scorecard, and CodeQL audit ([#357](#357)) ([27eb288](27eb288))

### CI/CD
* add pip-audit, hadolint, OSSF Scorecard, ZAP DAST, and pre-push hooks ([#350](#350)) ([2802d20](2802d20))
* add workflow_dispatch trigger to PR Preview for Dependabot PRs ([#326](#326)) ([4c7b6d9](4c7b6d9))
* bump astral-sh/setup-uv from 7.4.0 to 7.5.0 in the minor-and-patch group ([#335](#335)) ([98dd8ca](98dd8ca))

### Maintenance
* bump the minor-and-patch group across 1 directory with 3 updates ([#352](#352)) ([031b1c9](031b1c9))
* **deps:** bump devalue from 5.6.3 to 5.6.4 in /site in the npm_and_yarn group across 1 directory ([#324](#324)) ([9a9c600](9a9c600))
* migrate docs build from MkDocs to Zensical ([#330](#330)) ([fa8bf1d](fa8bf1d)), closes [#329](#329)

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Summary
- Actor-like single-writer engine over `asyncio.Queue` — all task state mutations flow through a single background processing loop, eliminating race conditions
- Full CRUD API (`/tasks`) with create, read, update, delete, transition, and cancel endpoints
- Optimistic concurrency: `VersionTracker` with `expected_version` support on mutations
- `TaskStateChanged` events published to message bus after each mutation (configurable via `TaskEngineConfig.publish_snapshots`)
- Typed errors (`TaskNotFoundError`, `TaskVersionConflictError`, `TaskInternalError`) mapped to proper HTTP status codes (404, 409, 503)
- Graceful drain in `stop()`, in-flight futures resolved on timeout
- WARNING log on `created_by` mismatch, safety cap on `list_tasks` results
- zizmor ignore comment in `pages-preview.yml` (untrusted checkout)

Test plan
- Unit tests run with `pytest -n auto`
- `ruff check` — all checks passed
- `ruff format` — all files formatted
- `mypy` — no new errors (all pre-existing)

Review coverage
Pre-reviewed by 10 specialized agents, 34 findings addressed including:
🤖 Generated with Claude Code
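For orientation, the single-writer design described in the summary (one consumer loop owning all mutations over an `asyncio.Queue`, callers awaiting per-mutation futures) can be sketched in miniature; `MiniEngine` and every name below are illustrative, not the actual `TaskEngine` API:

```python
import asyncio
import contextlib


class MiniEngine:
    """Toy single-writer engine: one consumer coroutine owns all state mutations."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._state: dict[str, str] = {}
        self._running = False

    async def start(self) -> None:
        self._running = True
        self._task = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        self._running = False
        await self._queue.join()  # drain in-flight mutations before shutdown
        self._task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._task

    async def _loop(self) -> None:
        while True:
            task_id, status, fut = await self._queue.get()
            self._state[task_id] = status  # only this coroutine mutates state
            fut.set_result(status)
            self._queue.task_done()

    async def submit(self, task_id: str, status: str) -> str:
        if not self._running:
            raise RuntimeError("engine not running")
        fut = asyncio.get_running_loop().create_future()
        await self._queue.put((task_id, status, fut))
        return await fut  # caller awaits the sequentially applied result


async def demo() -> str:
    eng = MiniEngine()
    await eng.start()
    result = await eng.submit("t1", "done")
    await eng.stop()
    return result


print(asyncio.run(demo()))  # -> done
```

Because every mutation is applied by the single consumer in queue order, no lock is needed around the state dict, which is the race-elimination property the summary claims for the real engine.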