
Feat/task-engine implementation#325

Closed
Aureliolo wants to merge 8 commits into main from
feat/task-engine

Conversation

@Aureliolo
Owner

Summary
Typed error hierarchy: Add TaskNotFoundError, TaskEngineQueueFullError, TaskVersionConflictError for precise error classification — API controllers catch these directly instead of parsing error strings
Error sanitization: Internal exception details (SQL paths, stack traces) no longer leak to API responses
Immutable field protection: Model validators on UpdateTaskMutation and TransitionTaskMutation reject writes to id, status, created_by, created_at, etc.
Processing loop hardening: Guard _process_one against unhandled exceptions, thread previous_status through mutation results and snapshots, add _fail_remaining_futures for drain timeout cleanup
Logging coverage: Add event constants and structured logging for engine creation, version conflicts, and loop errors
AgentEngine integration: Split broad except Exception in _report_to_task_engine into TaskMutationError (warning) vs Exception (error with divergence note); extract _TERMINAL_STATUSES constant

Code quality: Extract _not_found_result helper, use Self in model validators, add _check_consistency validator on TaskMutationResult, exhaustive match default branch

Docs: Update tech-stack "State coordination" to Adopted, add TaskEngine to CLAUDE.md engine description and logging events, add TaskEngine architecture subsection to engine design page
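The typed error hierarchy and controller mapping described above can be sketched as follows. The exception names come from this PR, but the base class, constructor signatures, and the status-code mapping are illustrative assumptions, not the actual implementation:

```python
class TaskEngineError(Exception):
    """Assumed common base for all TaskEngine errors."""

class TaskNotFoundError(TaskEngineError):
    def __init__(self, task_id: str) -> None:
        super().__init__(f"task not found: {task_id}")
        self.task_id = task_id

class TaskEngineQueueFullError(TaskEngineError):
    """Raised when the mutation queue is at capacity."""

class TaskVersionConflictError(TaskEngineError):
    def __init__(self, task_id: str, expected: int, actual: int) -> None:
        super().__init__(
            f"version conflict on {task_id}: expected {expected}, got {actual}"
        )

def handle(exc: Exception) -> int:
    """Hypothetical controller-side mapping from error type to HTTP status."""
    if isinstance(exc, TaskNotFoundError):
        return 404
    if isinstance(exc, TaskVersionConflictError):
        return 409
    if isinstance(exc, TaskEngineQueueFullError):
        return 503
    return 500
```

With types in place, controllers branch on `isinstance` checks instead of parsing error strings.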

Test plan
TestAppStateTaskEngine — 6 tests for task_engine property, has_task_engine, set_task_engine
TestReportToTaskEngine — 5 tests: no-op without engine, skip non-terminal, report terminal, swallow TaskMutationError, swallow unexpected errors
TestAppLifecycle — 2 new tests: task engine startup failure cleans up persistence+bus, shutdown task engine failure doesn't propagate
TestVersionConflictOnTransition — version mismatch returns failure
TestCancelNotFound — cancel on non-existent task returns failure
TestPreviousStatus — previous_status populated correctly (create=None, transition=CREATED, cancel=ASSIGNED)
TestImmutableFieldRejection — update/transition reject immutable fields
TestTypedErrors — convenience methods raise TaskNotFoundError
All 6925 tests pass, 94.39% coverage
ruff check + format clean
mypy strict clean

Closes #204

Implement actor-like TaskEngine that owns all task state mutations via
asyncio.Queue. A single background task processes mutations sequentially
using model_copy(update=...), persists results, and publishes snapshots
to the message bus. Reads bypass the queue for direct persistence access.

- Add TaskEngine core with start/stop lifecycle, submit/convenience methods
- Add 5 mutation types (create, update, transition, delete, cancel)
- Add TaskMutationResult response and TaskStateChanged event models
- Add TaskEngineConfig (queue size, drain timeout, snapshot toggle)
- Add 4 error types (TaskEngineError, NotRunning, Mutation, VersionConflict)
- Wire into API controllers, AppState, app lifecycle, and config
- Add optional AgentEngine report-back for terminal task status
- Add 57 unit tests covering all mutations, ordering, versioning, drain
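The actor pattern in the description above can be sketched as a minimal single-writer queue. `SketchEngine` and its internals are illustrative stand-ins under stated assumptions, not the PR's actual API:

```python
import asyncio
from typing import Any

class SketchEngine:
    """Illustrative single-writer engine: one consumer task owns all state."""

    def __init__(self, maxsize: int = 16) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._tasks: dict[str, dict[str, Any]] = {}
        self._worker: asyncio.Task | None = None

    async def start(self) -> None:
        self._worker = asyncio.create_task(self._loop())

    async def stop(self) -> None:
        await self._queue.join()  # drain pending mutations first
        if self._worker is not None:
            self._worker.cancel()

    async def submit(self, mutation: dict[str, Any]) -> dict[str, Any]:
        fut: asyncio.Future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((mutation, fut))  # raises QueueFull on backpressure
        return await fut  # resolved by the worker when the mutation is applied

    async def _loop(self) -> None:
        while True:
            mutation, fut = await self._queue.get()
            try:
                task_id = mutation["task_id"]
                # Copy-on-write update: never mutate the stored snapshot in place.
                state = dict(self._tasks.get(task_id, {"task_id": task_id}))
                state.update(mutation.get("update", {}))
                self._tasks[task_id] = state
                if not fut.done():
                    fut.set_result(state)
            finally:
                self._queue.task_done()
```

Because a single task consumes the queue, mutations apply strictly in submission order and no locking is needed around the state dict.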

Closes #204
Pre-reviewed by 10 agents, 37 findings addressed:

- Add exhaustive match default + typed error hierarchy (TaskNotFoundError,
  TaskEngineQueueFullError, TaskVersionConflictError)
- Sanitize internal exception details from API responses
- Add immutable field rejection validators on UpdateTaskMutation and
  TransitionTaskMutation
- Thread previous_status through TaskMutationResult and snapshots
- Add consistency model_validator to TaskMutationResult
- Guard _processing_loop against unhandled exceptions
- Fix startup cleanup to handle task engine failures
- Replace assert with proper error handling in convenience methods
- Add _fail_remaining_futures for drain timeout cleanup
- Add comprehensive logging coverage (creation, conflicts, loop errors)
- Add _not_found_result helper to reduce duplication
- Extract _TERMINAL_STATUSES module constant
- Use Self return type in model validators
- Split broad except in _report_to_task_engine (TaskMutationError vs Exception)
- Update docs: tech-stack Adopted, CLAUDE.md engine description, engine.md
  TaskEngine architecture subsection
- Add tests: AppState.task_engine, _report_to_task_engine, app lifecycle,
  version conflicts, cancel not-found, previous_status, immutable fields,
  typed errors
- Add error_code discriminator to TaskMutationResult (not_found/version_conflict/validation/internal)
- Fix _raise_typed_error to use error_code match instead of fragile string matching
- Fix _processing_loop outer catch to resolve envelope future (prevents caller deadlock)
- Guard happy-path set_result in _process_one with done() check
- Fix _apply_update to use Task.model_validate() instead of model_copy() (runs validators)
- Clean _IMMUTABLE_TASK_FIELDS: remove 4 non-existent timestamp fields (only id/status/created_by)
- Rename _TERMINAL_STATUSES → _REPORTABLE_STATUSES (FAILED/INTERRUPTED are not strictly terminal)
- Fix assigned_to=None override bug in transition_task controller
- Add from_status to task transition audit log
- Fix failure log to use API_TASK_TRANSITION_FAILED event instead of TASK_STATUS_CHANGED
- Map TaskEngineNotRunningError/TaskEngineQueueFullError → ServiceUnavailableError (503)
- Fix create_task missing error handling
- Fix _on_expire broad exception handler to re-raise MemoryError/RecursionError
- Add MemoryError/RecursionError re-raise to _publish_snapshot
- Add API_TASK_TRANSITION_FAILED event constant
- Fix AppState docstring and set_task_engine docstring
- Add version_conflict test to TestTypedErrors
- Add TestDrainTimeout: verify abandoned futures resolved on _fail_remaining_futures
- Add TestMutationResultConsistency: validate success/error invariants enforced by Pydantic
- Add test_memory_error_propagates to TestReportToTaskEngine
- Fix engine.md: text lang specifier, version conflict description, _IMMUTABLE_TASK_FIELDS, asyncio.wait
…tness

- Add TaskInternalError for internal engine faults (maps to 5xx vs 4xx)
- Thread error_code through all TaskMutationResult failure paths
- Extend _raise_typed_error to cover 'internal' code with -> Never return
- Add TaskInternalError handling in all TaskController endpoints
- Fix bridge leak in _cleanup_on_failure (started_bridge flag was missing)
- Remove phantom 'created_at' from _IMMUTABLE_OVERRIDE_FIELDS (field absent on Task)
- Change transition_task to return tuple[Task, TaskStatus | None] (eliminates extra get_task round-trip in controller)
- Split 1140-line test_task_engine.py into three focused files + helpers
- Move TaskEngine fixtures from helpers to conftest.py (auto-discovery, no F401 hacks)
- Fix all ruff/mypy issues in new test files (TC001, I001, F811, E501, unused-ignore)
Copilot AI review requested due to automatic review settings March 12, 2026 15:49
@github-actions
Contributor

github-actions bot commented Mar 12, 2026

Dependency Review

✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.

Scanned Files

None

@coderabbitai

coderabbitai bot commented Mar 12, 2026

Caution

Review failed

Pull request was closed or merged during review

📝 Walkthrough

Summary by CodeRabbit

  • New Features

    • Centralized Task Engine for single-writer task state, lifecycle, and CRUD mutations; agent run results reported to the engine.
    • Optimistic concurrency with per-task versioning and typed mutation results; engine maps internal errors to API-visible errors.
    • Task snapshots published to the message bus.
  • Configuration

    • New task engine settings: queue size, drain timeout, and snapshot publishing toggle.
  • Observability

    • Added task-engine and API task transition failure event constants.
  • Documentation

    • Design and architecture docs updated to describe the Task Engine.

Walkthrough

Adds a centralized single-writer TaskEngine with queue-based mutation processing, version tracking, snapshot publishing, typed task errors, Agent/API integration, updated lifecycle wiring, config/schema/defaults, observability events, and extensive tests.

Changes

Cohort / File(s) Summary
Core TaskEngine Implementation
src/ai_company/engine/task_engine.py, src/ai_company/engine/task_engine_apply.py, src/ai_company/engine/task_engine_models.py, src/ai_company/engine/task_engine_config.py, src/ai_company/engine/task_engine_version.py
New TaskEngine, mutation models, apply/dispatch logic, in-memory VersionTracker and config; implements single-writer asyncio.Queue processing, lifecycle (start/stop/drain), submit/create/update/transition/delete/cancel APIs, read-through reads, typed mutation results and error translation, and snapshot publishing.
Engine Surface & Errors
src/ai_company/engine/__init__.py, src/ai_company/engine/errors.py
Export surface expanded to expose TaskEngine, TaskEngineConfig, mutation models and new TaskEngine error hierarchy (not running, queue full, mutation errors, not found, version conflict, internal).
Agent Integration
src/ai_company/engine/agent_engine.py
AgentEngine receives optional TaskEngine dependency and reports terminal task statuses via new best-effort _report_to_task_engine call; new reportable-status set added.
API Wiring & App State
src/ai_company/api/app.py, src/ai_company/api/state.py, src/ai_company/config/schema.py, src/ai_company/config/defaults.py
create_app and lifecycle builders accept TaskEngine; startup/shutdown and failure cleanup wired to start/stop task engine; AppState gains task_engine accessor, has_task_engine, and set_task_engine; config/schema/defaults include task_engine config.
API Controller Migration
src/ai_company/api/controllers/tasks.py
TaskController switched to use TaskEngine for CRUD and transitions; added engine error mapping, requester extraction, CreateTaskData usage, and API event emission for transition failures.
Observability Events
src/ai_company/observability/events/task_engine.py, src/ai_company/observability/events/api.py
New task_engine event constants for lifecycle, mutations, snapshots, drains, queue/backpressure and loop errors; added API_TASK_TRANSITION_FAILED event constant.
Docs
CLAUDE.md, docs/architecture/tech-stack.md, docs/design/engine.md
Documentation updated to describe TaskEngine, mark state coordination Adopted, add design details (mutation taxonomy, errors, lifecycle, snapshot publishing, versioning) and adjust agent execution timeout handling in design text.
Tests & Fixtures
tests/unit/engine/*, tests/unit/api/*, tests/unit/observability/test_events.py, tests/unit/config/conftest.py
Extensive new and updated tests and fixtures: helpers/fakes, unit and integration tests for TaskEngine behavior (lifecycle, mutations, ordering, versioning, backpressure, drain/timeouts, snapshot publishing), AppState tests, controller helper tests, and test_client wired to fake_task_engine.
Test Utilities
tests/unit/engine/task_engine_helpers.py
New in-memory fakes for persistence and message bus and helper _make_create_data for tests.

Sequence Diagram(s)

sequenceDiagram
    actor Client
    participant Controller as TaskController
    participant Engine as TaskEngine
    participant Queue as asyncio.Queue
    participant Processor as _processing_loop
    participant Persistence
    participant Bus as MessageBus

    Client->>Controller: create_task(data)
    Controller->>Engine: create_task(data, requested_by)
    Engine->>Engine: build CreateTaskMutation
    Engine->>Queue: submit(mutation)
    Engine-->>Controller: returns pending Task / awaits result

    Note over Processor,Queue: Background processing
    Queue->>Processor: dequeue _MutationEnvelope
    Processor->>Processor: dispatch -> apply_create
    Processor->>Persistence: tasks.save(task)
    Persistence-->>Processor: saved
    Processor->>Engine: bump version / record result
    Processor->>Bus: publish TaskStateChanged
    Bus-->>Processor: acknowledged
    Processor->>Queue: fulfill future with result
    Queue-->>Controller: result delivered
sequenceDiagram
    participant App as Litestar App
    participant Lifecycle as LifecycleBuilder
    participant Engine as TaskEngine

    App->>Lifecycle: create_app(task_engine=instance)
    Lifecycle->>Engine: start()
    Engine->>Engine: create queue & spawn _processing_loop
    Engine-->>Lifecycle: started

    App->>Lifecycle: shutdown()
    Lifecycle->>Engine: stop(timeout)
    Engine->>Engine: drain queue, complete/cancel envelopes
    Engine-->>Lifecycle: stopped
sequenceDiagram
    participant Agent as AgentEngine
    participant Exec as ExecutionLoop
    participant Engine as TaskEngine
    participant Persistence

    Agent->>Exec: run(...)
    Exec-->>Agent: ExecutionResult (COMPLETED)
    Agent->>Agent: _post_execution_pipeline()
    Agent->>Engine: _report_to_task_engine(result, agent_id, task_id)
    alt task_engine configured
        Engine->>Engine: transition_task(... expected_version ...)
        Engine->>Persistence: persist transition
        Persistence-->>Engine: ok
        Engine-->>Agent: ack (best-effort)
    else not configured
        Engine-->>Agent: no-op
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 40.34%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4 passed)
  • Title check — ✅ Passed: the title 'Feat/task-engine implementation' accurately summarizes the primary change: implementing a TaskEngine for centralized state coordination.
  • Description check — ✅ Passed: the description is directly related to the changeset, covering typed errors, error sanitization, immutable-field protection, processing loop hardening, logging, AgentEngine integration, and documentation updates.
  • Linked Issues check — ✅ Passed: the PR fully addresses issue #204 — a TaskEngine with a single-writer asyncio.Queue, sequential mutation application with model validation, snapshot publishing, version tracking for optimistic concurrency, and proper lifecycle management.
  • Out of Scope Changes check — ✅ Passed: all changes are directly related to the TaskEngine implementation or its integration; no out-of-scope modifications are present.


@codecov

codecov bot commented Mar 12, 2026

Codecov Report

❌ Patch coverage is 93.48172% with 41 lines in your changes missing coverage. Please review.
✅ Project coverage is 93.71%. Comparing base (d1203e5) to head (7ec0ab7).
⚠️ Report is 2 commits behind head on main.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
src/ai_company/engine/task_engine.py 87.86% 15 Missing and 10 partials ⚠️
src/ai_company/api/app.py 80.55% 5 Missing and 2 partials ⚠️
src/ai_company/api/controllers/tasks.py 89.28% 6 Missing ⚠️
src/ai_company/engine/agent_engine.py 87.50% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #325      +/-   ##
==========================================
+ Coverage   93.64%   93.71%   +0.06%     
==========================================
  Files         427      433       +6     
  Lines       19177    19746     +569     
  Branches     1846     1904      +58     
==========================================
+ Hits        17959    18504     +545     
- Misses        943      958      +15     
- Partials      275      284       +9     

@gemini-code-assist
Contributor

Summary of Changes


This pull request introduces a centralized, single-writer TaskEngine to manage task state mutations, enhancing the reliability and observability of task processing. It includes improvements to error handling, data validation, logging, and integration with the AgentEngine, ensuring more robust and secure task management.

Highlights

  • Typed Error Hierarchy: Added TaskNotFoundError, TaskEngineQueueFullError, and TaskVersionConflictError for precise error classification, allowing API controllers to catch these directly instead of parsing error strings.
  • Error Sanitization: Internal exception details, such as SQL paths and stack traces, are now sanitized to prevent leakage to API responses.
  • Immutable Field Protection: Model validators on UpdateTaskMutation and TransitionTaskMutation reject writes to immutable fields like id, status, created_by, and created_at.
  • Processing Loop Hardening: The _process_one method is now guarded against unhandled exceptions, previous_status is threaded through mutation results and snapshots, and _fail_remaining_futures is added for drain timeout cleanup.
  • Logging Coverage: Added event constants and structured logging for engine creation, version conflicts, and loop errors to improve observability.
  • AgentEngine Integration: Refined exception handling in _report_to_task_engine to differentiate between TaskMutationError (warning) and general Exception (error with divergence note); extracted _TERMINAL_STATUSES constant.

Contributor

Copilot AI left a comment


Pull request overview

Introduces a centralized, single-writer TaskEngine to serialize task mutations (create/update/transition/delete/cancel), publish task state change snapshots, and integrate task state coordination into the API and agent execution pipeline.

Changes:

  • Added TaskEngine (queue-based mutation processing), request/result/event models, configuration, and typed errors.
  • Switched API task CRUD endpoints to use TaskEngine instead of writing directly to persistence; updated app lifecycle/state to optionally wire/start/stop a task engine.
  • Added extensive unit/integration tests plus observability event constants and documentation updates.

Reviewed changes

Copilot reviewed 28 out of 28 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
tests/unit/observability/test_events.py Registers task_engine as an observability domain module.
tests/unit/engine/test_task_engine_mutations.py CRUD + typed error behavior tests for TaskEngine convenience APIs.
tests/unit/engine/test_task_engine_models.py Validation/immutability/serialization tests for task engine models.
tests/unit/engine/test_task_engine_lifecycle.py Start/stop lifecycle + config model tests.
tests/unit/engine/test_task_engine_integration.py Queue/backpressure, ordering, snapshot publishing, drain/timeout behavior tests.
tests/unit/engine/test_agent_engine.py Adds coverage for AgentEngine best-effort reporting to TaskEngine.
tests/unit/engine/task_engine_helpers.py Shared fakes/helpers for TaskEngine tests.
tests/unit/engine/conftest.py Adds TaskEngine fixtures for unit tests.
tests/unit/config/conftest.py Extends root config factory defaults (adds escalation_paths).
tests/unit/api/test_state.py Tests AppState.task_engine accessors and deferred configuration.
tests/unit/api/test_app.py Updates lifecycle helper signatures and adds task-engine startup/shutdown failure coverage.
tests/unit/api/conftest.py Provides a fake TaskEngine and updates test_client fixture to use context manager.
src/ai_company/observability/events/task_engine.py New TaskEngine-specific observability event constants.
src/ai_company/observability/events/api.py Adds API_TASK_TRANSITION_FAILED event constant.
src/ai_company/engine/task_engine.py Implements queue-based single-writer mutation processing + snapshot publishing.
src/ai_company/engine/task_engine_models.py Adds mutation request/result/event Pydantic models + immutability validators.
src/ai_company/engine/task_engine_config.py Adds TaskEngineConfig model (queue size, drain timeout, snapshot toggle).
src/ai_company/engine/errors.py Adds TaskEngine/TaskMutation typed exception hierarchy.
src/ai_company/engine/agent_engine.py Adds optional task_engine + reports terminal statuses best-effort post-run.
src/ai_company/engine/__init__.py Re-exports TaskEngine APIs and new errors/models/config.
src/ai_company/config/schema.py Adds task_engine: TaskEngineConfig to RootConfig.
src/ai_company/config/defaults.py Adds task_engine defaults block.
src/ai_company/api/state.py Adds task_engine to AppState with accessor/late-binding setter.
src/ai_company/api/controllers/tasks.py Converts task CRUD to go through TaskEngine with typed error mapping.
src/ai_company/api/app.py Wires task engine into lifecycle (startup/shutdown/cleanup), extends create_app() signature.
docs/design/engine.md Documents TaskEngine architecture and updates AgentEngine timeout behavior wording.
docs/architecture/tech-stack.md Marks “State coordination” as Adopted.
CLAUDE.md Updates engine module description and adds TaskEngine event constant example.


Comment on lines +4 to +6
background task consumes mutation requests sequentially, applies
``model_copy(update=...)`` on frozen ``Task`` models, persists the
result, and publishes snapshots to the message bus.

Copilot AI Mar 12, 2026


The module docstring says TaskEngine applies model_copy(update=...), but the implementation primarily does model_dump() + Task.model_validate(...) (and Task.with_transition() also uses model_validate). Please update the docstring to reflect the actual mutation mechanics to avoid misleading future maintainers.

Suggested change
background task consumes mutation requests sequentially, applies
``model_copy(update=...)`` on frozen ``Task`` models, persists the
result, and publishes snapshots to the message bus.
background task consumes mutation requests sequentially, derives a new
``Task`` instance from the current state and the mutation (e.g. via
``Task.model_validate`` / ``Task.with_transition``), persists the result,
and publishes snapshots to the message bus.

Comment on lines +160 to +173
await asyncio.wait_for(
self._processing_task,
timeout=effective_timeout,
)
logger.info(TASK_ENGINE_DRAIN_COMPLETE)
except TimeoutError:
logger.warning(
TASK_ENGINE_DRAIN_TIMEOUT,
remaining=self._queue.qsize(),
)
self._processing_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._processing_task
self._fail_remaining_futures()

Copilot AI Mar 12, 2026


On drain timeout, stop() cancels _processing_task and then only fails futures still sitting in _queue. Any mutation already dequeued and mid-processing when cancellation happens will have its envelope.future left unresolved, so callers awaiting submit() can hang indefinitely. Consider tracking the in-flight envelope and failing its future on cancellation/timeout (or handling CancelledError in the processing loop to resolve the current envelope).

Comment on lines +490 to +538
try:
await self._process_one(envelope)
except Exception:
logger.exception(
TASK_ENGINE_LOOP_ERROR,
error="Unhandled exception in processing loop",
)
if not envelope.future.done():
envelope.future.set_result(
TaskMutationResult(
request_id=envelope.mutation.request_id,
success=False,
error="Internal error in processing loop",
error_code="internal",
),
)

async def _process_one(self, envelope: _MutationEnvelope) -> None:
"""Process a single mutation envelope."""
mutation = envelope.mutation
logger.debug(
TASK_ENGINE_MUTATION_RECEIVED,
mutation_type=mutation.mutation_type,
request_id=mutation.request_id,
)
try:
result = await self._apply_mutation(mutation)
if not envelope.future.done():
envelope.future.set_result(result)
if result.success and self._config.publish_snapshots:
await self._publish_snapshot(mutation, result)
except Exception as exc:
internal_msg = f"{type(exc).__name__}: {exc}"
logger.exception(
TASK_ENGINE_MUTATION_FAILED,
mutation_type=mutation.mutation_type,
request_id=mutation.request_id,
error=internal_msg,
)
if not envelope.future.done():
envelope.future.set_result(
TaskMutationResult(
request_id=mutation.request_id,
success=False,
error="Internal error processing mutation",
error_code="internal",
),
)


Copilot AI Mar 12, 2026


_processing_loop() and _process_one() catch broad Exception and convert it into an internal failure result. This will also swallow MemoryError / RecursionError, which elsewhere in the codebase are treated as non-recoverable and re-raised. Add an explicit except MemoryError, RecursionError: raise (or avoid catching them) so the process can fail fast on these conditions.
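The requested pattern, shown on a standalone helper rather than the PR's actual methods: an explicit narrow clause before the broad one lets process-level faults escape while ordinary failures still become an internal failure result.

```python
from typing import Any, Callable

def guarded_apply(fn: Callable[[], Any]) -> tuple[str, Any]:
    """Run fn, converting ordinary errors to a result but re-raising fatal ones."""
    try:
        return ("ok", fn())
    except (MemoryError, RecursionError):
        raise  # fail fast: non-recoverable faults must propagate
    except Exception as exc:  # everything else becomes an internal-error result
        return ("error", f"{type(exc).__name__}: {exc}")
```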

Comment on lines +672 to +701
async def _apply_transition(
self,
mutation: TransitionTaskMutation,
) -> TaskMutationResult:
"""Perform a task status transition."""
task = await self._persistence.tasks.get(mutation.task_id)
if task is None:
return self._not_found_result(
"transition",
mutation.request_id,
mutation.task_id,
)

try:
self._check_version(mutation.task_id, mutation.expected_version)
except TaskVersionConflictError as exc:
return TaskMutationResult(
request_id=mutation.request_id,
success=False,
error=str(exc),
error_code="version_conflict",
)

previous_status = task.status

try:
updated = task.with_transition(
mutation.target_status,
**mutation.overrides,
)

Copilot AI Mar 12, 2026


TransitionTaskMutation.reason is required and API/AgentEngine populate it, but the TaskEngine never uses it (not in Task.with_transition(), not in logs, and not in TaskStateChanged). Either plumb reason into the emitted snapshot/log fields for auditability, or drop it from the mutation model to avoid carrying unused data.

Comment on lines +148 to +157
"""Reverse cleanup on startup failure (task engine, bridge, bus, persistence)."""
if started_task_engine and task_engine is not None:
try:
await task_engine.stop()
except Exception:
logger.exception(
API_APP_STARTUP,
error="Cleanup: failed to stop task engine",
)
if started_bridge and bridge is not None:

Copilot AI Mar 12, 2026


_cleanup_on_failure() swallows all exceptions when stopping components. In this repo, best-effort exception handling typically re-raises MemoryError / RecursionError rather than swallowing them (you already do this in _on_expire() above). Consider adding except MemoryError, RecursionError: raise before the generic except Exception blocks here as well.

Comment on lines 306 to 323
@@ -263,6 +312,14 @@ async def _safe_shutdown(
API_APP_SHUTDOWN,
error="Failed to stop message bus bridge",
)
if task_engine is not None:
try:
await task_engine.stop()
except Exception:
logger.exception(
API_APP_SHUTDOWN,
error="Failed to stop task engine",
)
if message_bus is not None:

Copilot AI Mar 12, 2026


_safe_shutdown() catches Exception while stopping services, which will also swallow MemoryError / RecursionError. If those occur during shutdown, they should propagate (consistent with other best-effort blocks in this repo). Add an explicit except MemoryError, RecursionError: raise before the broad except Exception clauses.

Comment on lines 346 to 352
cost_tracker: CostTracker | None = None,
approval_store: ApprovalStore | None = None,
auth_service: AuthService | None = None,
task_engine: TaskEngine | None = None,
) -> Litestar:
"""Create and configure the Litestar application.


Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TaskController now requires AppState.task_engine, but create_app() still only warns about missing persistence/message_bus/cost_tracker. Since task_engine is also an optional constructor arg here, it’s easy to accidentally build an app with persistence configured but no TaskEngine and get surprising 503s on /tasks. Consider either (a) including task_engine in the warning, or (b) auto-constructing one from persistence/message_bus using config.task_engine when not provided.
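Option (a) could look something like this sketch; the function and parameter names are hypothetical stand-ins for the real `create_app()` wiring:

```python
import warnings

def warn_missing_services(**services) -> list[str]:
    """Collect and warn about optional services that were not provided."""
    missing = sorted(name for name, value in services.items() if value is None)
    if missing:
        warnings.warn(
            f"create_app() called without: {', '.join(missing)}; "
            "dependent endpoints will return 503",
            stacklevel=2,
        )
    return missing

print(warn_missing_services(
    persistence=object(), message_bus=object(),
    cost_tracker=None, task_engine=None,
))
```

With `task_engine` in the list, the startup log makes the 503s on `/tasks` predictable instead of surprising.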

Comment on lines +186 to +187
- **Immutable updates**: Each mutation calls `model_copy(update=...)` on
frozen `Task` models — the original is never mutated.
Copilot AI Mar 12, 2026

This section says TaskEngine performs immutable updates via model_copy(update=...), but the current TaskEngine implementation uses model_dump() + Task.model_validate(...) (and Task.with_transition() uses model_validate). Please align the doc wording with the actual implementation so readers don’t assume model_copy() semantics.

Suggested change
- **Immutable updates**: Each mutation calls `model_copy(update=...)` on
frozen `Task` models — the original is never mutated.
- **Immutable-style updates**: Each mutation constructs a new `Task` instance
from the previous one (for example via `Task.model_validate({**task.model_dump(), **updates})`
or `Task.with_transition(...)`); the existing instance is never mutated.
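As a pure-stdlib analogy for the "construct a new instance, never mutate" semantics: the real code uses frozen Pydantic models with `model_dump()` + `Task.model_validate()`, and `dataclasses.replace` plays that role here:

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class Task:
    id: str
    status: str
    title: str

old = Task(id="t1", status="created", title="draft")
new = replace(old, status="assigned")  # builds a new instance

print(old.status, new.status)  # the original is untouched
```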

@greptile-apps

greptile-apps bot commented Mar 12, 2026

Greptile Summary

This PR implements the centralized TaskEngine — a single-writer actor that serializes all task state mutations through an asyncio.Queue, replacing scattered direct-persistence calls in controllers and AgentEngine. It introduces a typed error hierarchy, immutable-field validators, optimistic-concurrency version tracking, structured logging event constants, and integrates the engine into the app lifecycle and AgentEngine post-execution reporting.

Key changes:

  • New TaskEngine: background processing loop, graceful drain/shutdown, snapshot publishing to MessageBus, VersionTracker for optimistic concurrency
  • Typed errors: TaskNotFoundError, TaskVersionConflictError, TaskEngineQueueFullError, TaskInternalError replacing string parsing
  • Controller migration: TaskController now routes all CRUD through TaskEngine with _map_task_engine_errors for HTTP error mapping
  • AgentEngine integration: best-effort _report_to_task_engine after terminal execution outcomes
  • _try_stop refactor: DRY lifecycle helper replacing duplicated try/except shutdown blocks
  • Issue: TaskVersionConflictError is a subclass of TaskMutationError and is not explicitly handled in _map_task_engine_errors, causing version conflicts to surface as HTTP 422 (Unprocessable Entity) rather than HTTP 409 (Conflict). A ConflictError class already exists in api/errors.py and should be used here.

Confidence Score: 3/5

  • Safe to merge after fixing the version-conflict HTTP status mapping; all other issues are minor observability and style concerns.
  • The core TaskEngine architecture is well-designed and thoroughly tested (6925 tests pass). One logic issue — TaskVersionConflictError falling through to HTTP 422 instead of 409 — affects API contract correctness for callers implementing retry/conflict-resolution logic, and the fix is trivially small. The non-parenthesized except syntax (previously flagged) is still present in new code paths.
  • Pay close attention to src/ai_company/api/controllers/tasks.py — specifically the _map_task_engine_errors function missing an explicit TaskVersionConflictError branch.

Important Files Changed

- `src/ai_company/engine/task_engine.py`: New centralized single-writer TaskEngine: well-structured actor pattern with asyncio.Queue, graceful drain/shutdown, snapshot publishing, and typed error propagation. Minor: redundant task_id extraction in _publish_snapshot; non-parenthesized multi-exception except form (already flagged in previous review).
- `src/ai_company/api/controllers/tasks.py`: Migrates task CRUD from direct persistence access to TaskEngine. Critical: TaskVersionConflictError (subclass of TaskMutationError) is not explicitly handled in _map_task_engine_errors, causing version conflicts to return HTTP 422 instead of the more correct HTTP 409 (ConflictError already exists in the error hierarchy). Also: API_RESOURCE_NOT_FOUND event is semantically incorrect for the auth-identity fallback.
- `src/ai_company/engine/task_engine_apply.py`: Mutation application logic extracted cleanly into separate module; all five mutation types handled with consistent error reporting via not_found_result. Pydantic validation error strings forwarded to TaskMutationResult.error (previously flagged); no new issues beyond that.
- `src/ai_company/engine/task_engine_models.py`: Well-designed frozen Pydantic models for all mutation types with immutable-field validators, deep-copy defensive hygiene on mutable dicts, and a consistency validator on TaskMutationResult. Clean and correct.
- `src/ai_company/engine/errors.py`: Clean typed error hierarchy addition: TaskEngineError base with TaskEngineNotRunningError, TaskEngineQueueFullError, TaskMutationError (and subtypes TaskNotFoundError, TaskVersionConflictError), and sibling TaskInternalError.
- `src/ai_company/engine/agent_engine.py`: Adds _report_to_task_engine best-effort reporting with split error handling (warning for TaskMutationError, error for TaskEngineError/unexpected). The non-parenthesized except MemoryError, RecursionError: form appears again on line 706 (already flagged in previous thread for this file).
- `src/ai_company/api/app.py`: Refactors lifecycle management into _try_stop helper for consistency; integrates TaskEngine into startup/shutdown order correctly (engine stopped first to drain before bus shuts down).

Sequence Diagram

```mermaid
sequenceDiagram
    participant API as TaskController/AgentEngine
    participant TE as TaskEngine
    participant Q as asyncio.Queue
    participant PL as ProcessingLoop
    participant Apply as task_engine_apply
    participant DB as PersistenceBackend
    participant Bus as MessageBus

    API->>TE: submit(mutation)
    TE->>Q: put_nowait(envelope)
    TE-->>API: await envelope.future

    PL->>Q: get() with timeout
    Q-->>PL: envelope
    PL->>Apply: dispatch(mutation, persistence, versions)
    Apply->>DB: save(task) / delete(task_id)
    DB-->>Apply: ok
    Apply-->>PL: TaskMutationResult
    PL->>PL: envelope.future.set_result(result)
    PL-->>API: result via future
    PL->>Bus: publish(TaskStateChanged)

    Note over TE,PL: stop() sets _running=False, drains queue, cancels, fails remaining futures
```
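The flow in the diagram can be reduced to a runnable sketch of the single-writer actor pattern. Names mirror the PR's TaskEngine, but the implementation here is illustrative only:

```python
import asyncio

class MiniTaskEngine:
    """Single-writer actor: all mutations funnel through one queue."""

    def __init__(self, maxsize: int = 8) -> None:
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._loop_task: asyncio.Task | None = None

    async def start(self) -> None:
        self._loop_task = asyncio.create_task(self._processing_loop())

    async def stop(self) -> None:
        await self._queue.join()  # graceful drain before cancelling
        if self._loop_task is not None:
            self._loop_task.cancel()
            try:
                await self._loop_task
            except asyncio.CancelledError:
                pass

    async def submit(self, mutation: dict) -> dict:
        future: asyncio.Future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((mutation, future))  # QueueFull = backpressure
        return await future

    async def _processing_loop(self) -> None:
        while True:
            mutation, future = await self._queue.get()
            try:
                # All writes happen here, serially: no concurrent mutation.
                result = {"ok": True, "applied": mutation}
                if not future.done():
                    future.set_result(result)
            finally:
                self._queue.task_done()

async def demo() -> dict:
    engine = MiniTaskEngine()
    await engine.start()
    result = await engine.submit({"op": "create", "task_id": "t1"})
    await engine.stop()
    return result

print(asyncio.run(demo()))
```

The bounded `put_nowait` is what gives the real engine its `TaskEngineQueueFullError` backpressure semantics, and `stop()` shows the drain-then-cancel ordering the diagram's closing note describes.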
Prompt To Fix All With AI
This is a comment left during a code review.
Path: src/ai_company/api/controllers/tasks.py
Line: 100-101

Comment:
**`TaskVersionConflictError` silently mapped to HTTP 422 instead of 409**

`TaskVersionConflictError` is a subclass of `TaskMutationError`, so it falls through every earlier check and lands on the `isinstance(exc, TaskMutationError)` branch, which maps it to `ApiValidationError` (HTTP 422 Unprocessable Entity). This is semantically wrong: a version conflict is not a validation error from the caller's perspective — it signals that another writer modified the task between the caller's read and write.

`api/errors.py` already has a `ConflictError` class that maps to HTTP 409 Conflict, which is the correct status for optimistic-concurrency failures. Without an explicit guard here, callers cannot distinguish "invalid input" (422) from "concurrent modification" (409), making correct retry logic impossible.

```suggestion
    if isinstance(exc, TaskVersionConflictError):
        return ConflictError(str(exc))
    if isinstance(exc, TaskMutationError):
        return ApiValidationError(str(exc))
```

This also requires importing `TaskVersionConflictError` and `ConflictError` at the top of the file.

How can I resolve this? If you propose a fix, please make it concise.
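The ordering issue above can be demonstrated self-contained; the exception classes below are local stand-ins for the real ones in `engine/errors.py` and `api/errors.py`:

```python
class TaskMutationError(Exception): ...
class TaskVersionConflictError(TaskMutationError): ...

class ConflictError(Exception):
    status_code = 409

class ApiValidationError(Exception):
    status_code = 422

def map_task_engine_errors(exc: Exception) -> Exception:
    # The subclass check must come BEFORE the parent check,
    # or every version conflict degrades into a 422.
    if isinstance(exc, TaskVersionConflictError):
        return ConflictError(str(exc))
    if isinstance(exc, TaskMutationError):
        return ApiValidationError(str(exc))
    return exc

mapped = map_task_engine_errors(TaskVersionConflictError("stale version"))
print(type(mapped).__name__, mapped.status_code)
```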

---

This is a comment left during a code review.
Path: src/ai_company/api/controllers/tasks.py
Line: 57-61

Comment:
**Misleading log event for missing auth identity**

`API_RESOURCE_NOT_FOUND` is used to log the "no authenticated user" fallback, but that event is intended for missing data resources (tasks, users from the data store, etc.). In structured log queries, a search for `api.resource.not_found` would surface auth-misconfiguration warnings alongside genuine 404-class events, making observability harder.

Consider a dedicated event constant (e.g. `API_AUTH_IDENTITY_MISSING` in `events/api.py`) or re-use an existing auth-scoped constant such as `API_AUTH_FAILED`:
```suggestion
    logger.warning(
        API_AUTH_FAILED,
        resource="authenticated_user",
        note="No authenticated user found, falling back to 'api'",
    )
```

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 1762-1766

Comment:
**Redundant `task_id` extraction in `_publish_snapshot`**

`task_id` is computed inside the `try` block (line 1765 in the diff) and also re-evaluated identically in the `except` block (line 1784 in the diff). Because the assignment on line 1765 runs before the `await` that could raise, the except block will always re-execute the exact same `getattr` call. Moving the extraction before the `try` block would remove the duplication and make the intent clearer:

```python
task_id = getattr(mutation, "task_id", None)
try:
    from ai_company.communication.enums import MessageType  # noqa: PLC0415
    from ai_company.communication.message import Message  # noqa: PLC0415
    msg = Message(...)
    await self._message_bus.publish(msg)
    logger.debug(TASK_ENGINE_SNAPSHOT_PUBLISHED, ..., task_id=task_id)
except MemoryError, RecursionError:
    raise
except Exception:
    logger.warning(TASK_ENGINE_SNAPSHOT_PUBLISH_FAILED, ..., task_id=task_id, exc_info=True)
```

How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 7ec0ab7


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces the TaskEngine, a crucial component for centralized and robust task state management. The implementation uses a single-writer actor model via an asyncio.Queue to prevent race conditions, which is an excellent design choice. The changes include a well-defined error hierarchy, immutable data models with strong validation, and graceful lifecycle management within the application. The integration with the API controllers significantly improves their structure and error handling. The documentation is thorough, and the test coverage is extensive and of high quality, covering unit, integration, and lifecycle aspects of the new engine. Overall, this is an exceptionally well-executed feature implementation that greatly enhances the core architecture of the application. I have no suggestions for improvement at this time.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 14

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/unit/observability/test_events.py (1)

175-223: 🧹 Nitpick | 🔵 Trivial

Add explicit contract assertions for task_engine events.

This only verifies that the module exists. The new TASK_ENGINE_* constants can still drift without failing here, unlike the other domains below that pin exact names/values. Please add a dedicated test_task_engine_events_exist (or a parametrized equivalent) for the new observability surface.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/observability/test_events.py` around lines 175 - 223, The test
only checks that the task_engine module is present but doesn't assert the actual
event constant names/values; add a new unit test (e.g.,
test_task_engine_events_exist) that imports the events.task_engine module and
asserts the expected TASK_ENGINE_* constants (names and/or values) are defined
and equal to their intended strings/values, or use a parametrized
pytest.mark.parametrize over the expected constant names to assert
hasattr(events.task_engine, name) and equality to the expected literal;
reference the module events.task_engine and the specific constant symbols
(TASK_ENGINE_START, TASK_ENGINE_STOP, TASK_ENGINE_ERROR, etc.) to prevent future
drift.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/design/engine.md`:
- Around line 188-194: The doc incorrectly states optimistic concurrency uses
in-memory version counters only; update the design text to specify that the
persisted task version is the source of truth and that any in-memory counters
are merely an optimization and may be invalid after restart. Describe that
TaskEngine must persist and read the task version on writes and that callers’
expected_version is compared against the persisted version to produce a
TaskMutationResult with error_code="version_conflict" (and raise
TaskVersionConflictError for convenience). Also note that get_task() and
list_tasks() may use read-through caching but must validate or refresh versions
from persistence before accepting writes. Ensure references to expected_version,
TaskMutationResult, and TaskVersionConflictError remain in the doc.

In `@src/ai_company/api/app.py`:
- Around line 130-132: The shutdown ordering in _safe_shutdown is reversed: call
TaskEngine.stop() (or task_engine.stop()) before bridge.stop() so the TaskEngine
can drain and publish final snapshots that the bridge will forward; update
_safe_shutdown() to mirror _cleanup_on_failure()’s reverse order (stop
task_engine, then bridge, then message_bus/persistence), and ensure
on_shutdown() and any other shutdown paths use this corrected order.

In `@src/ai_company/api/controllers/tasks.py`:
- Around line 138-150: The repeated exception mapping in create_task,
update_task, transition_task, and delete_task should be centralized: implement a
small helper (e.g., map_task_exceptions or a decorator wrap_task_exceptions)
that catches TaskEngineNotRunningError, TaskEngineQueueFullError, and
TaskInternalError and re-raises ServiceUnavailableError(str(exc)) from exc, and
catches TaskMutationError to re-raise ApiValidationError(str(exc)) from exc;
replace the four duplicated try/except blocks by invoking the helper or applying
the decorator around the task engine calls (referencing create_task,
update_task, transition_task, delete_task and the exception classes
TaskEngineNotRunningError, TaskEngineQueueFullError, TaskInternalError,
TaskMutationError) so behavior remains identical but duplication is removed.
- Around line 180-201: The handler is passing a hardcoded requested_by="api"
into app_state.task_engine.update_task (and similar calls for
transition/delete), which loses who initiated the change; extract the actor
identifier from the request context (e.g., authenticated user id,
request.user.id, request.state.api_key or an injected RequestContext) and pass
that value into update_task's requested_by parameter instead of the literal
"api", defaulting to a safe fallback like "unknown" if no identity is present;
update the other call sites (transition_task/delete_task or methods on
app_state.task_engine) to use the same extracted identifier for consistent
auditing.

In `@src/ai_company/engine/errors.py`:
- Around line 109-116: TaskInternalError should not subclass TaskMutationError;
change its definition so it is a separate sibling (e.g., replace "class
TaskInternalError(TaskMutationError):" with "class
TaskInternalError(Exception):" or another common base used for engine/internal
errors) so that broad "except TaskMutationError" handlers won't catch it; keep
the existing docstring and tests but adjust any places that explicitly relied on
TaskInternalError being a subtype of TaskMutationError if needed.

In `@src/ai_company/engine/task_engine_models.py`:
- Around line 47-50: The model field assigned_to currently allows setting an
assignee while Task defaults to status=CREATED (and CREATED rejects non-None
assignees), so either enforce the create-time invariant here or derive the
initial status from the assignee; update assigned_to on the model to validate
that it must be None when creating a Task with default status=CREATED (reject
input) or change the creation logic so that if assigned_to is provided the
default status becomes ASSIGNED/IN_PROGRESS accordingly — adjust the pydantic
Field/validator for assigned_to or the Task constructor/creation path and ensure
consistency with _apply_create() and the CREATED constant.
- Around line 116-129: The updates/overrides dicts are accepted without
validating keys and are not deep-copied, causing silent no-ops on typos and
potential mutation leaks; update the mutation boundary logic (in
UpdateTaskMutation and TransitionTaskMutation) to deep-copy the incoming dicts
with copy.deepcopy() before any processing, then validate that every key in
updates/overrides is a known Task field (use Task.model_fields.keys() or
equivalent) and raise a clear ValueError listing unknown keys if any; keep the
existing _reject_immutable_fields model_validator (which checks against
_IMMUTABLE_TASK_FIELDS) but ensure validation against Task schema occurs before
calling Task.model_validate() so unknown keys are rejected rather than ignored.

In `@src/ai_company/engine/task_engine.py`:
- Around line 108-109: The in-memory _versions map is lost on restart so
optimistic concurrency breaks; update startup and any task-loading paths (e.g.,
get_task, list_tasks, and the processing loop referencing _check_version and
_processing_task) to seed and maintain _versions from persisted task metadata:
when you read tasks from storage populate _versions[task_id] = task.version (and
update it whenever a task is saved/updated), and modify _check_version to
consult persisted task.version if _versions lacks an entry so versions remain
durable across restarts.
- Around line 1-907: The file mixes multiple concerns in one large TaskEngine
class; split it into smaller modules: keep TaskEngine as the lifecycle/queue
owner (start, stop, submit, _processing_loop, _fail_remaining_futures,
is_running) and move mutation application logic into a TaskProcessor (methods
_apply_mutation,
_apply_create/_apply_update/_apply_transition/_apply_delete/_apply_cancel and
_not_found_result), the snapshot logic into a SnapshotPublisher (method
_publish_snapshot), and version handling into a VersionManager (fields
_versions, methods _bump_version, _check_version); also move _MutationEnvelope
dataclass to a small helpers module. Update TaskEngine to instantiate and
delegate to these collaborators (e.g. self._processor.apply(mutation),
self._publisher.publish(mutation, result), self._version_manager.bump/check)
while preserving public method names (create_task, update_task, transition_task,
delete_task, cancel_task, submit) and the existing TaskMutationResult wiring so
external behavior and tests remain unchanged.
- Around line 590-603: _wrap Task(...) construction in _apply_create and the
Task.model_validate(...) call in _apply_update with try/except blocks that catch
the validation exception type used by your model layer (e.g.,
pydantic.ValidationError or your custom ValidationError), and on catch return a
TaskMutationResult with error_code="validation" and include the validation
details/error message rather than letting the exception escape to _process_one;
mirror the pattern used by _apply_transition and _apply_cancel so the mutation
handlers consistently return typed validation errors instead of
error_code="internal".
- Around line 490-505: The generic exception handlers in _processing_loop and
_process_one are currently swallowing fail-fast exceptions from
_publish_snapshot; update both exception blocks to explicitly catch
(MemoryError, RecursionError) and immediately re-raise them before the generic
except Exception logic so these errors propagate; keep the existing logging
(TASK_ENGINE_LOOP_ERROR) and TaskMutationResult fallback only for non-fail-fast
exceptions and ensure you check envelope.future.done() only after the re-raise
branch.

In `@tests/unit/engine/test_agent_engine.py`:
- Around line 1009-1035: Expand the assertions in test_terminal_status_reported
to validate the full transition_task payload rather than only the status: after
awaiting engine.run, inspect mock_te.transition_task.call_args and assert the
first positional arg matches the Task ID from sample_task_with_criteria (or the
Task object as expected), the second arg equals TaskStatus.COMPLETED, and that
the payload includes the original requested_by (e.g., check kwargs or the third
positional arg depending on how transition_task is called). Use the existing
test names (test_terminal_status_reported, AgentEngine.run) and the mock object
mock_te.transition_task to locate where to add these additional assertions.

In `@tests/unit/engine/test_task_engine_integration.py`:
- Around line 153-182: In test_queue_full_raises, add a brief explanatory
comment above the lines that set eng._running = True and directly put an item
into eng._queue explaining that the test intentionally manipulates internal
state (eng._running and eng._queue) to simulate a full queue/backpressure
scenario because triggering this condition via the public API is difficult;
reference the test name (test_queue_full_raises) and the fields eng._running and
eng._queue in the comment so future readers understand the rationale.

---

Outside diff comments:
In `@tests/unit/observability/test_events.py`:
- Around line 175-223: The test only checks that the task_engine module is
present but doesn't assert the actual event constant names/values; add a new
unit test (e.g., test_task_engine_events_exist) that imports the
events.task_engine module and asserts the expected TASK_ENGINE_* constants
(names and/or values) are defined and equal to their intended strings/values, or
use a parametrized pytest.mark.parametrize over the expected constant names to
assert hasattr(events.task_engine, name) and equality to the expected literal;
reference the module events.task_engine and the specific constant symbols
(TASK_ENGINE_START, TASK_ENGINE_STOP, TASK_ENGINE_ERROR, etc.) to prevent future
drift.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: f0620322-8a27-4dfe-ac33-14cf78528e8a

📥 Commits

Reviewing files that changed from the base of the PR and between d1203e5 and 38006d5.

📒 Files selected for processing (28)
  • CLAUDE.md
  • docs/architecture/tech-stack.md
  • docs/design/engine.md
  • src/ai_company/api/app.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/api/state.py
  • src/ai_company/config/defaults.py
  • src/ai_company/config/schema.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/task_engine_config.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/observability/events/api.py
  • src/ai_company/observability/events/task_engine.py
  • tests/unit/api/conftest.py
  • tests/unit/api/test_app.py
  • tests/unit/api/test_state.py
  • tests/unit/config/conftest.py
  • tests/unit/engine/conftest.py
  • tests/unit/engine/task_engine_helpers.py
  • tests/unit/engine/test_agent_engine.py
  • tests/unit/engine/test_task_engine_integration.py
  • tests/unit/engine/test_task_engine_lifecycle.py
  • tests/unit/engine/test_task_engine_models.py
  • tests/unit/engine/test_task_engine_mutations.py
  • tests/unit/observability/test_events.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Agent
  • GitHub Check: Greptile Review
  • GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649
Use except A, B: syntax (no parentheses) for exception handling — PEP 758 syntax enforced by ruff on Python 3.14
Add type hints to all public functions — enforce mypy strict mode
Use Google-style docstrings — required on public classes and functions, enforced by ruff D rules
For frozen Pydantic models with dict/list fields, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use Pydantic v2 BaseModel, model_validator, computed_field, and ConfigDict — avoid redundant stored fields with @computed_field
Use NotBlankStr (from core.types) for all identifier/name fields, including optional (NotBlankStr | None) and tuple variants, instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code instead of bare create_task
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly — never silently swallow exceptions
Enforce line length of 88 characters via ruff
Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using model_copy(update=...)) for runtime state that evolves
Never mix static config fields with mutable runtime fields in one model

Files:

  • tests/unit/engine/test_task_engine_models.py
  • tests/unit/observability/test_events.py
  • tests/unit/engine/conftest.py
  • src/ai_company/engine/errors.py
  • src/ai_company/observability/events/api.py
  • tests/unit/engine/test_task_engine_mutations.py
  • tests/unit/api/test_state.py
  • src/ai_company/engine/task_engine_config.py
  • tests/unit/engine/test_task_engine_integration.py
  • tests/unit/engine/test_agent_engine.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/config/schema.py
  • tests/unit/engine/test_task_engine_lifecycle.py
  • src/ai_company/api/state.py
  • tests/unit/engine/task_engine_helpers.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/observability/events/task_engine.py
  • tests/unit/api/conftest.py
  • src/ai_company/api/app.py
  • src/ai_company/config/defaults.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/__init__.py
  • tests/unit/api/test_app.py
  • tests/unit/config/conftest.py
  • src/ai_company/engine/task_engine_models.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Use @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow markers for test organization
Prefer @pytest.mark.parametrize for testing similar cases
Never use real vendor names in test files — use test-provider, test-small-001, and generic model names

Files:

  • tests/unit/engine/test_task_engine_models.py
  • tests/unit/observability/test_events.py
  • tests/unit/engine/conftest.py
  • tests/unit/engine/test_task_engine_mutations.py
  • tests/unit/api/test_state.py
  • tests/unit/engine/test_task_engine_integration.py
  • tests/unit/engine/test_agent_engine.py
  • tests/unit/engine/test_task_engine_lifecycle.py
  • tests/unit/engine/task_engine_helpers.py
  • tests/unit/api/conftest.py
  • tests/unit/api/test_app.py
  • tests/unit/config/conftest.py
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic MUST have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging, logging.getLogger(), or print() in application code — use the observability logger instead
Always use logger as the variable name (not _logger or log)
Use event name constants from domain-specific modules under ai_company.observability.events (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget) — import directly
Use structured logging with kwargs: logger.info(EVENT, key=value) — never use string formatting like logger.info("msg %s", val)
Log all error paths at WARNING or ERROR with context before raising
Log all state transitions at INFO level
Use DEBUG level for object creation, internal flow, and entry/exit of key functions

Files:

  • src/ai_company/engine/errors.py
  • src/ai_company/observability/events/api.py
  • src/ai_company/engine/task_engine_config.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/config/schema.py
  • src/ai_company/api/state.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/observability/events/task_engine.py
  • src/ai_company/api/app.py
  • src/ai_company/config/defaults.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/engine/task_engine_models.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases. Tests must use test-provider, test-small-001, etc.

Files:

  • src/ai_company/engine/errors.py
  • src/ai_company/observability/events/api.py
  • src/ai_company/engine/task_engine_config.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/config/schema.py
  • src/ai_company/api/state.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/observability/events/task_engine.py
  • src/ai_company/api/app.py
  • src/ai_company/config/defaults.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/__init__.py
  • src/ai_company/engine/task_engine_models.py
🧠 Learnings (9)
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly

Applied to files:

  • tests/unit/observability/test_events.py
  • tests/unit/engine/conftest.py
  • src/ai_company/observability/events/api.py
  • CLAUDE.md
  • src/ai_company/api/state.py
  • src/ai_company/observability/events/task_engine.py
  • src/ai_company/api/app.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`

Applied to files:

  • CLAUDE.md
  • src/ai_company/api/state.py
  • src/ai_company/api/app.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Never use `import logging`, `logging.getLogger()`, or `print()` in application code — use the observability logger instead

Applied to files:

  • CLAUDE.md
  • src/ai_company/api/state.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Always use `logger` as the variable name (not `_logger` or `log`)

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use structured logging with kwargs: `logger.info(EVENT, key=value)` — never use string formatting like `logger.info("msg %s", val)`

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Log all error paths at WARNING or ERROR with context before raising

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Log all state transitions at INFO level

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use DEBUG level for object creation, internal flow, and entry/exit of key functions

Applied to files:

  • CLAUDE.md
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/providers/**/*.py : Set `RetryConfig` and `RateLimiterConfig` per-provider in `ProviderConfig`

Applied to files:

  • src/ai_company/config/schema.py
🧬 Code graph analysis (14)
tests/unit/engine/test_task_engine_models.py (2)
src/ai_company/core/enums.py (4)
  • Complexity (247-253)
  • Priority (238-244)
  • TaskStatus (198-224)
  • TaskType (227-235)
src/ai_company/engine/task_engine_models.py (8)
  • CancelTaskMutation (201-218)
  • CreateTaskData (20-59)
  • CreateTaskMutation (65-80)
  • DeleteTaskMutation (183-198)
  • TaskMutationResult (234-275)
  • TaskStateChanged (281-318)
  • TransitionTaskMutation (142-180)
  • UpdateTaskMutation (98-129)
tests/unit/engine/conftest.py (3)
src/ai_company/engine/task_engine.py (3)
  • TaskEngine (81-907)
  • start (119-137)
  • stop (139-176)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
tests/unit/engine/task_engine_helpers.py (4)
  • FakeMessageBus (58-76)
  • FakePersistence (47-55)
  • start (65-66)
  • stop (68-69)
tests/unit/engine/test_task_engine_mutations.py (2)
src/ai_company/engine/errors.py (3)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
  • TaskVersionConflictError (105-106)
src/ai_company/engine/task_engine_models.py (4)
  • CancelTaskMutation (201-218)
  • TransitionTaskMutation (142-180)
  • UpdateTaskMutation (98-129)
  • TaskMutationResult (234-275)
src/ai_company/engine/task_engine_config.py (1)
src/ai_company/tools/base.py (1)
  • description (138-140)
tests/unit/engine/test_task_engine_integration.py (5)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/engine/errors.py (1)
  • TaskEngineQueueFullError (93-94)
src/ai_company/engine/task_engine.py (5)
  • TaskEngine (81-907)
  • _MutationEnvelope (68-78)
  • start (119-137)
  • stop (139-176)
  • submit (200-235)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
src/ai_company/engine/task_engine_models.py (4)
  • CreateTaskMutation (65-80)
  • DeleteTaskMutation (183-198)
  • TransitionTaskMutation (142-180)
  • UpdateTaskMutation (98-129)
tests/unit/engine/test_agent_engine.py (3)
src/ai_company/engine/errors.py (1)
  • TaskMutationError (97-98)
src/ai_company/engine/task_engine.py (1)
  • transition_task (311-358)
src/ai_company/memory/errors.py (1)
  • MemoryError (13-14)
src/ai_company/engine/agent_engine.py (3)
src/ai_company/engine/errors.py (1)
  • TaskMutationError (97-98)
src/ai_company/api/state.py (1)
  • task_engine (107-109)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/config/schema.py (3)
tests/unit/engine/conftest.py (1)
  • engine (432-443)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
src/ai_company/api/state.py (1)
  • task_engine (107-109)
src/ai_company/api/state.py (2)
tests/unit/engine/conftest.py (1)
  • engine (432-443)
src/ai_company/engine/task_engine.py (1)
  • TaskEngine (81-907)
src/ai_company/api/controllers/tasks.py (7)
src/ai_company/api/pagination.py (1)
  • paginate (26-53)
src/ai_company/api/state.py (1)
  • task_engine (107-109)
src/ai_company/core/task.py (1)
  • Task (45-261)
src/ai_company/engine/errors.py (5)
  • TaskEngineNotRunningError (89-90)
  • TaskEngineQueueFullError (93-94)
  • TaskInternalError (109-116)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
src/ai_company/engine/task_engine_models.py (1)
  • CreateTaskData (20-59)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/engine/task_engine.py (6)
  • list_tasks (455-476)
  • get_task (444-453)
  • create_task (237-268)
  • update_task (270-309)
  • transition_task (311-358)
  • delete_task (360-389)
tests/unit/api/conftest.py (3)
tests/unit/engine/conftest.py (1)
  • engine (432-443)
src/ai_company/api/state.py (2)
  • task_engine (107-109)
  • persistence (87-89)
src/ai_company/engine/task_engine.py (1)
  • TaskEngine (81-907)
src/ai_company/api/app.py (3)
src/ai_company/api/state.py (2)
  • task_engine (107-109)
  • AppState (22-154)
src/ai_company/memory/errors.py (1)
  • MemoryError (13-14)
src/ai_company/communication/bus_protocol.py (1)
  • MessageBus (20-209)
src/ai_company/engine/task_engine.py (6)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/core/task.py (1)
  • Task (45-261)
src/ai_company/engine/errors.py (6)
  • TaskEngineNotRunningError (89-90)
  • TaskEngineQueueFullError (93-94)
  • TaskInternalError (109-116)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
  • TaskVersionConflictError (105-106)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
src/ai_company/engine/task_engine_models.py (1)
  • CancelTaskMutation (201-218)
src/ai_company/persistence/protocol.py (1)
  • PersistenceBackend (27-167)
src/ai_company/engine/task_engine_models.py (2)
src/ai_company/core/enums.py (4)
  • Complexity (247-253)
  • Priority (238-244)
  • TaskStatus (198-224)
  • TaskType (227-235)
src/ai_company/core/task.py (1)
  • Task (45-261)
🪛 LanguageTool
CLAUDE.md

[style] ~130-~130: A comma is missing here.
Context: ...nder ai_company.observability.events (e.g. PROVIDER_CALL_START from `events.prov...

(EG_NO_COMMA)

docs/architecture/tech-stack.md

[style] ~122-~122: Consider using the typographical ellipsis character here instead.
Context: ... Agents submit requests; engine applies model_copy(update=...) sequentially and publishes snapshots....

(ELLIPSIS)

docs/design/engine.md

[style] ~186-~186: Consider using the typographical ellipsis character here instead.
Context: ...mmutable updates**: Each mutation calls model_copy(update=...) on frozen Task models — the origi...

(ELLIPSIS)

🔇 Additional comments (21)
tests/unit/config/conftest.py (1)

80-80: Factory default looks correct.

Adding escalation_paths = () keeps the test factory aligned with RootConfig while using an immutable default.

src/ai_company/observability/events/api.py (1)

38-38: Good addition to the shared API event surface.

Keeping this as a named constant preserves the existing observability pattern and avoids ad-hoc event strings in controllers.

Based on learnings, use event name constants from domain-specific modules under ai_company.observability.events.

src/ai_company/config/schema.py (1)

24-24: RootConfig wiring looks consistent.

Exposing TaskEngineConfig here keeps the engine’s knobs in the immutable configuration surface instead of leaking runtime state into app wiring.

As per coding guidelines, "Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using model_copy(update=...)) for runtime state that evolves."

Also applies to: 418-529

src/ai_company/api/state.py (1)

38-68: Consistent service-guard pattern.

The new task_engine accessors mirror the existing auth-service behavior and keep the same explicit 503-on-missing contract for callers.

Also applies to: 106-133

tests/unit/engine/test_task_engine_lifecycle.py (1)

14-109: Nice coverage on lifecycle edges.

This hits start/stop idempotency, restart, stopped-submit behavior, and config immutability, which are the main failure modes for the new engine surface.

tests/unit/api/test_state.py (1)

95-136: Good coverage for deferred task_engine binding.

These cases pin the new AppState surface to the same access/set-once semantics as auth_service, which should keep the two code paths aligned.

tests/unit/engine/test_agent_engine.py (1)

940-1007: The failure-mode matrix here is solid.

Covering the no-engine, non-terminal, typed-mutation-error, generic-exception, and MemoryError paths makes _report_to_task_engine() much harder to regress.

Also applies to: 1037-1117

tests/unit/engine/conftest.py (1)

413-460: Useful shared fixture surface for the new engine tests.

Centralizing TaskEngine construction and teardown here should keep lifecycle assumptions consistent across the TaskEngine test modules.

tests/unit/api/conftest.py (2)

620-644: Passing the shared TaskEngine through create_app() is the right test seam.

That keeps API tests on the real startup/shutdown path instead of manually patching app state in each case.


609-617: No issue here—the fixture lifecycle is properly managed.

The fake_task_engine fixture is safe. Verification confirms there are no direct consumers outside test_client, and create_app() explicitly calls task_engine.start() (line 278 of src/ai_company/api/app.py) when the injected engine is provided. The fixture design correctly delegates startup to the app layer.

> Likely an incorrect or invalid review comment.
src/ai_company/engine/task_engine_config.py (1)

1-37: LGTM!

The TaskEngineConfig model is well-designed:

  • Properly frozen for immutable configuration
  • Sensible field constraints with clear defaults
  • Comprehensive Google-style docstring documenting all attributes
tests/unit/api/test_app.py (1)

86-130: LGTM!

Good additions testing TaskEngine lifecycle integration:

  • test_task_engine_failure_cleans_up verifies proper cleanup of persistence and bus when engine fails to start
  • test_shutdown_task_engine_failure_does_not_propagate ensures shutdown errors are swallowed gracefully

The mock setup correctly simulates failure scenarios.

tests/unit/engine/test_task_engine_models.py (1)

1-312: LGTM!

Comprehensive test coverage for task engine models:

  • Validates construction with minimal and full parameters
  • Tests validation constraints (NotBlankStr, non-negative budget, positive version)
  • Verifies frozen model immutability
  • Checks serialization roundtrip for events
  • Tests consistency validators on TaskMutationResult

Well-organized with clear test class separation by model type.

tests/unit/engine/test_task_engine_integration.py (1)

278-326: LGTM!

The drain timeout test thoroughly verifies:

  • Futures in the queue are failed with error_code="internal" when stop times out
  • Proper cleanup of blocked tasks via cancellation
  • The contextlib.suppress correctly handles the cleanup phase
tests/unit/engine/test_task_engine_mutations.py (1)

1-583: LGTM!

Excellent test coverage for TaskEngine mutations:

  • All CRUD operations tested with success and failure scenarios
  • Typed error propagation verified (TaskNotFoundError, TaskVersionConflictError)
  • previous_status tracking validated across mutation types
  • Immutable field rejection tested at the model validation level
  • Error sanitization verified (internal errors don't leak implementation details)
tests/unit/engine/task_engine_helpers.py (2)

90-102: LGTM!

The _make_create_data helper is well-designed with sensible defaults and proper override support. The dynamic import of TaskType inside the function is acceptable for a test helper to avoid circular imports.


8-9: Remove unnecessary TYPE_CHECKING block for Python 3.14 compatibility.

With Python 3.14's PEP 649, TaskStatus can be imported directly instead of using a TYPE_CHECKING guard, since it's only used in type hints (line 30). Move the import to the top-level imports.

Suggested fix
-from typing import TYPE_CHECKING
-
 from ai_company.core.task import Task  # noqa: TC001
+from ai_company.core.enums import TaskStatus
 from ai_company.engine.task_engine_models import CreateTaskData
-
-if TYPE_CHECKING:
-    from ai_company.core.enums import TaskStatus
> Likely an incorrect or invalid review comment.
src/ai_company/api/controllers/tasks.py (1)

253-260: Good error handling for transition failures.

Properly logs the transition failure with context before raising ApiValidationError. The distinction between TaskNotFoundError (404) and general TaskMutationError (validation error) is correct.

src/ai_company/engine/agent_engine.py (3)

102-115: LGTM!

Excellent documentation explaining why FAILED and INTERRUPTED are included in _REPORTABLE_STATUSES despite not being strictly terminal in the task lifecycle. This clarifies the design intent for future maintainers.


666-718: LGTM!

The _report_to_task_engine method follows best practices:

  • Early returns for no-op cases (no engine, no task execution, non-reportable status)
  • Correctly re-raises MemoryError and RecursionError (non-recoverable)
  • Distinguishes TaskMutationError (WARNING - expected rejection) from unexpected exceptions (ERROR with state divergence note)
  • Best-effort semantics align with the docstring

354-378: Good pipeline integration.

The _report_to_task_engine call is correctly placed after local transitions are applied but before recovery, ensuring the centralized TaskEngine receives the final execution status.

Comment on lines +490 to +505
        try:
            await self._process_one(envelope)
        except Exception:
            logger.exception(
                TASK_ENGINE_LOOP_ERROR,
                error="Unhandled exception in processing loop",
            )
            if not envelope.future.done():
                envelope.future.set_result(
                    TaskMutationResult(
                        request_id=envelope.mutation.request_id,
                        success=False,
                        error="Internal error in processing loop",
                        error_code="internal",
                    ),
                )

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's locate the file and check its size
find . -name "task_engine.py" -path "*/engine/*" | head -5

Repository: Aureliolo/synthorg

Length of output: 100


🏁 Script executed:

# Once we have the file, let's examine the specific lines mentioned in the review
wc -l src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 102


🏁 Script executed:

# Read the relevant sections of the file
cat -n src/ai_company/engine/task_engine.py | sed -n '480,510p'

Repository: Aureliolo/synthorg

Length of output: 1535


🏁 Script executed:

# Also check the second range
cat -n src/ai_company/engine/task_engine.py | sed -n '515,545p'

Repository: Aureliolo/synthorg

Length of output: 1516


🏁 Script executed:

# And the third range
cat -n src/ai_company/engine/task_engine.py | sed -n '840,870p'

Repository: Aureliolo/synthorg

Length of output: 1439


The fail-fast MemoryError / RecursionError exceptions are caught and swallowed by enclosing handlers.

_publish_snapshot() correctly re-raises these exceptions (lines 865–866), but both calling functions catch them immediately via generic except Exception handlers:

  • _processing_loop() at line 492 catches and converts to logged internal failure
  • _process_one() at line 521 catches and converts to logged internal failure

Both handlers need an explicit re-raise branch before the generic fallback to allow fail-fast behavior to propagate.

🛑 Proposed fix
         try:
             await self._process_one(envelope)
+        except (MemoryError, RecursionError):
+            raise
         except Exception:
             logger.exception(
                 TASK_ENGINE_LOOP_ERROR,
                 error="Unhandled exception in processing loop",
             )
         try:
             result = await self._apply_mutation(mutation)
             if not envelope.future.done():
                 envelope.future.set_result(result)
             if result.success and self._config.publish_snapshots:
                 await self._publish_snapshot(mutation, result)
+        except (MemoryError, RecursionError):
+            raise
         except Exception as exc:
             internal_msg = f"{type(exc).__name__}: {exc}"
             logger.exception(
                 TASK_ENGINE_MUTATION_FAILED,
                 mutation_type=mutation.mutation_type,
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
        try:
            await self._process_one(envelope)
        except Exception:
            logger.exception(
                TASK_ENGINE_LOOP_ERROR,
                error="Unhandled exception in processing loop",
            )
            if not envelope.future.done():
                envelope.future.set_result(
                    TaskMutationResult(
                        request_id=envelope.mutation.request_id,
                        success=False,
                        error="Internal error in processing loop",
                        error_code="internal",
                    ),
                )
        try:
            await self._process_one(envelope)
        except (MemoryError, RecursionError):
            raise
        except Exception:
            logger.exception(
                TASK_ENGINE_LOOP_ERROR,
                error="Unhandled exception in processing loop",
            )
            if not envelope.future.done():
                envelope.future.set_result(
                    TaskMutationResult(
                        request_id=envelope.mutation.request_id,
                        success=False,
                        error="Internal error in processing loop",
                        error_code="internal",
                    ),
                )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_engine.py` around lines 490 - 505, The generic
exception handlers in _processing_loop and _process_one are currently swallowing
fail-fast exceptions from _publish_snapshot; update both exception blocks to
explicitly catch (MemoryError, RecursionError) and immediately re-raise them
before the generic except Exception logic so these errors propagate; keep the
existing logging (TASK_ENGINE_LOOP_ERROR) and TaskMutationResult fallback only
for non-fail-fast exceptions and ensure you check envelope.future.done() only
after the re-raise branch.
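The re-raise guard this finding asks for can be sketched in isolation (the helper name `run_guarded` is hypothetical; the real loop is async and logs via the observability logger before returning the internal-error result):

```python
def run_guarded(fn):
    """Run fn, letting fail-fast errors propagate while converting
    everything else into an 'internal' failure marker."""
    try:
        return fn()
    except (MemoryError, RecursionError):
        raise  # fail-fast: these must never be swallowed
    except Exception as exc:
        # stand-in for logger.exception(TASK_ENGINE_LOOP_ERROR, ...)
        # followed by the TaskMutationResult(error_code="internal") fallback
        return f"internal: {type(exc).__name__}"
```

With this ordering, `run_guarded(lambda: 1 / 0)` yields the internal-error marker, while a `RecursionError` escapes to the caller as intended.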

Comment on lines +590 to +603
        task = Task(
            id=task_id,
            title=data.title,
            description=data.description,
            type=data.type,
            priority=data.priority,
            project=data.project,
            created_by=data.created_by,
            assigned_to=data.assigned_to,
            estimated_complexity=data.estimated_complexity,
            budget_limit=data.budget_limit,
        )
        await self._persistence.tasks.save(task)
        self._versions[task_id] = 1

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "task_engine.py" | head -20

Repository: Aureliolo/synthorg

Length of output: 153


🏁 Script executed:

wc -l src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 102


🏁 Script executed:

sed -n '580,610p' src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 969


🏁 Script executed:

sed -n '645,660p' src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 583


🏁 Script executed:

rg "_process_one" src/ai_company/engine/task_engine.py -A 20

Repository: Aureliolo/synthorg

Length of output: 1653


🏁 Script executed:

rg "def.*transition|def.*cancel" src/ai_company/engine/task_engine.py -A 15

Repository: Aureliolo/synthorg

Length of output: 2100


🏁 Script executed:

rg "validation.*result|error_code" src/ai_company/engine/task_engine.py | head -20

Repository: Aureliolo/synthorg

Length of output: 455


🏁 Script executed:

rg "error_code=\"validation\"" src/ai_company/engine/task_engine.py -B 5 -A 2

Repository: Aureliolo/synthorg

Length of output: 504


🏁 Script executed:

rg "class.*Result|TaskMutationResult" src/ai_company/engine/task_engine.py | head -20

Repository: Aureliolo/synthorg

Length of output: 903


🏁 Script executed:

sed -n '420,480p' src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 2039


🏁 Script executed:

sed -n '500,560p' src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 2591


🏁 Script executed:

rg "_apply_mutation|_apply_create|_apply_update|_apply_transition|_apply_cancel" src/ai_company/engine/task_engine.py -A 1 | head -40

Repository: Aureliolo/synthorg

Length of output: 869


🏁 Script executed:

sed -n '560,620p' src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 1834


🏁 Script executed:

sed -n '620,680p' src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 2097


🏁 Script executed:

rg "PydanticValidationError|ValidationError|model_validate" src/ai_company/engine/task_engine.py -B 3 -A 3

Repository: Aureliolo/synthorg

Length of output: 291


🏁 Script executed:

sed -n '665,730p' src/ai_company/engine/task_engine.py

Repository: Aureliolo/synthorg

Length of output: 2193


Add validation error handling to create and update mutation methods to maintain consistent error typing.

Task() in _apply_create (lines 590–603) and Task.model_validate() in _apply_update (lines 651–654) can fail on caller-controlled input but don't catch validation errors. Exceptions escape to _process_one(), which returns error_code="internal" instead of the error_code="validation" contract used by _apply_transition and _apply_cancel. Wrap both with try-except to catch validation failures and return typed TaskMutationResult with error_code="validation", consistent with other mutation handlers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_engine.py` around lines 590 - 603, wrap the Task(...)
construction in _apply_create and the Task.model_validate(...) call in
_apply_update with try/except blocks that catch the validation exception type
used by your model layer (e.g., pydantic.ValidationError or your custom
ValidationError), and on catch return a TaskMutationResult with
error_code="validation" and include the validation details/error message rather
than letting the exception escape to _process_one; mirror the pattern used by
_apply_transition and _apply_cancel so the mutation handlers consistently return
typed validation errors instead of error_code="internal".
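A minimal sketch of that wrapping pattern (model and result types are simplified stand-ins; the real code uses `CreateTaskData`/`Task` and `TaskMutationResult`):

```python
from pydantic import BaseModel, Field, ValidationError


class MiniTask(BaseModel):  # simplified stand-in for the frozen Task model
    id: str
    title: str = Field(min_length=1)


def apply_create(task_id: str, title: str) -> dict:
    """Construct the task, converting validation failures into a typed
    result instead of letting them escape as error_code='internal'."""
    try:
        task = MiniTask(id=task_id, title=title)
    except ValidationError as exc:
        return {
            "success": False,
            "error_code": "validation",
            "error": f"Invalid task data: {len(exc.errors())} validation error(s)",
        }
    return {"success": True, "task": task}
```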

…handling, add tests

- Extract VersionTracker into task_engine_version.py (optimistic concurrency)
- Extract mutation apply logic into task_engine_apply.py (dispatch + apply_*)
- Reduce task_engine.py from 976 to ~620 lines (well under 800 limit)
- Make TaskInternalError a sibling of TaskMutationError, not a subtype
- Add MemoryError/RecursionError re-raise guards in all except-Exception blocks
- Add _try_stop helper in app.py to reduce C901 complexity
- Add _extract_requester and _map_task_engine_errors helpers in tasks controller
- Add deep-copy at system boundaries for mutable dict fields in frozen models
- Add unknown-key validation in UpdateTaskMutation and TransitionTaskMutation
- Track in-flight envelope for drain-timeout resolution
- Fix _safe_shutdown argument ordering (task_engine first, mirrors cleanup)
- Fix existing test bug: mock_te passed as bridge instead of task_engine
- Update docs/design/engine.md for immutable-style updates and version tracking
- Add 62 new tests: VersionTracker, apply functions, coverage edge cases,
  controller helpers, _try_stop, in-flight resolution, processing loop resilience
Comment on lines +306 to +312
        mutation = UpdateTaskMutation(
            request_id=uuid4().hex,
            requested_by=requested_by,
            task_id=task_id,
            updates=updates,
            expected_version=expected_version,
        )

pydantic.ValidationError escapes the typed error hierarchy

UpdateTaskMutation(...) runs _reject_immutable_fields at construction time. When that validator fires — e.g. a caller passes updates={"id": "new-id"} — Pydantic raises ValidationError before self.submit(mutation) is reached. This exception is:

  1. Not listed in the update_task docstring's Raises section.
  2. Not caught by the except (TaskEngineNotRunningError, …, TaskMutationError) block in TaskController.update_task — so it would surface as an unhandled 500 at the API layer instead of the expected 422.

The same gap exists in transition_task at the TransitionTaskMutation(...) construction (line 353), where _reject_immutable_overrides can fire.

The intended contract is that all mutation failures surface as TaskMutationError sub-classes. The simplest fix is to wrap mutation construction in the convenience methods:

try:
    mutation = UpdateTaskMutation(
        request_id=uuid4().hex,
        requested_by=requested_by,
        task_id=task_id,
        updates=updates,
        expected_version=expected_version,
    )
except ValidationError as exc:
    raise TaskMutationError(str(exc)) from exc

Apply the same pattern to transition_task and create_task.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/task_engine.py
Line: 306-312

Comment:
**`pydantic.ValidationError` escapes the typed error hierarchy**

`UpdateTaskMutation(...)` runs `_reject_immutable_fields` at construction time. When that validator fires — e.g. a caller passes `updates={"id": "new-id"}` — Pydantic raises `ValidationError` *before* `self.submit(mutation)` is reached. This exception is:

1. Not listed in the `update_task` docstring's `Raises` section.
2. Not caught by the `except (TaskEngineNotRunningError, …, TaskMutationError)` block in `TaskController.update_task` — so it would surface as an unhandled 500 at the API layer instead of the expected 422.

The same gap exists in `transition_task` at the `TransitionTaskMutation(...)` construction (line 353), where `_reject_immutable_overrides` can fire.

The intended contract is that all mutation failures surface as `TaskMutationError` sub-classes. The simplest fix is to wrap mutation construction in the convenience methods:

```python
try:
    mutation = UpdateTaskMutation(
        request_id=uuid4().hex,
        requested_by=requested_by,
        task_id=task_id,
        updates=updates,
        expected_version=expected_version,
    )
except ValidationError as exc:
    raise TaskMutationError(str(exc)) from exc
```

Apply the same pattern to `transition_task` and `create_task`.

How can I resolve this? If you propose a fix, please make it concise.

Comment on lines +119 to +131
    except PydanticValidationError as exc:
        error_msg = f"Invalid task data: {exc}"
        logger.warning(
            TASK_ENGINE_MUTATION_FAILED,
            mutation_type="create",
            request_id=mutation.request_id,
            error=error_msg,
        )
        return TaskMutationResult(
            request_id=mutation.request_id,
            success=False,
            error=error_msg,
            error_code="validation",

Verbose Pydantic error string forwarded to callers

f"Invalid task data: {exc}" converts a PydanticValidationError to its full string representation, which includes field names, constraint details, and the literal input values submitted by the user. This string flows through TaskMutationResult.error → TaskMutationError(result.error) → ApiValidationError(str(exc)) all the way to the HTTP response body.

While the data being echoed is the caller's own submission (so not a traditional information leak), the verbose Pydantic format can expose implementation internals (model field names, constraint types, library-generated URL hints like input_url=https://errors.pydantic.dev/…). The same pattern appears in apply_update at line 185.

Consider extracting only the human-readable portions:

errors = [f"{'.'.join(str(loc) for loc in e['loc'])}: {e['msg']}" for e in exc.errors()]
error_msg = "Invalid task data: " + "; ".join(errors)

or simply error_msg = f"Invalid task data: {len(exc.errors())} validation error(s)" for a leaner message.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/task_engine_apply.py
Line: 119-131

Comment:
**Verbose Pydantic error string forwarded to callers**

`f"Invalid task data: {exc}"` converts a `PydanticValidationError` to its full string representation, which includes field names, constraint details, and the literal input values submitted by the user. This string flows through `TaskMutationResult.error` → `TaskMutationError(result.error)` → `ApiValidationError(str(exc))` all the way to the HTTP response body.

While the data being echoed is the caller's own submission (so not a traditional information leak), the verbose Pydantic format can expose implementation internals (model field names, constraint types, library-generated URL hints like `input_url=https://errors.pydantic.dev/…`). The same pattern appears in `apply_update` at line 185.

Consider extracting only the human-readable portions:
```python
errors = [f"{'.'.join(str(loc) for loc in e['loc'])}: {e['msg']}" for e in exc.errors()]
error_msg = "Invalid task data: " + "; ".join(errors)
```
or simply `error_msg = f"Invalid task data: {len(exc.errors())} validation error(s)"` for a leaner message.

How can I resolve this? If you propose a fix, please make it concise.
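The suggested extraction can be checked in isolation (`MiniModel` is a stand-in for `CreateTaskData`; the key point is that `exc.errors()` yields `loc`/`msg` pairs without the library's URL hint):

```python
from pydantic import BaseModel, Field, ValidationError


class MiniModel(BaseModel):  # stand-in for CreateTaskData
    title: str = Field(min_length=1)
    budget_limit: float = Field(ge=0)


def lean_error_message(exc: ValidationError) -> str:
    """Keep only the human-readable field path and message per error."""
    parts = [
        f"{'.'.join(str(loc) for loc in e['loc'])}: {e['msg']}"
        for e in exc.errors()
    ]
    return "Invalid task data: " + "; ".join(parts)


try:
    MiniModel(title="", budget_limit=-5)
except ValidationError as exc:
    msg = lean_error_message(exc)
```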


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

♻️ Duplicate comments (3)
src/ai_company/engine/task_engine.py (1)

167-176: ⚠️ Potential issue | 🟠 Major

Drain timeout still misses the active envelope.

stop() waits for the canceled loop before cleanup, but _process_one() clears self._in_flight in finally, so _fail_remaining_futures() only covers queued envelopes. The submitter already inside _process_one() can still hang indefinitely — the new integration test has to call blocked_task.cancel() manually for exactly this reason.

Proposed fix
             except TimeoutError:
                 logger.warning(
                     TASK_ENGINE_DRAIN_TIMEOUT,
                     remaining=self._queue.qsize(),
                 )
+                timed_out_in_flight = self._in_flight
                 self._processing_task.cancel()
                 with contextlib.suppress(asyncio.CancelledError):
                     await self._processing_task
-                self._fail_remaining_futures()
+                self._fail_remaining_futures(timed_out_in_flight)
             self._processing_task = None
@@
-    def _fail_remaining_futures(self) -> None:
+    def _fail_remaining_futures(
+        self,
+        timed_out_in_flight: _MutationEnvelope | None = None,
+    ) -> None:
         """Fail in-flight and remaining enqueued futures after drain timeout."""
         shutdown_result_for = self._shutdown_result
-        in_flight = self._in_flight
+        in_flight = timed_out_in_flight or self._in_flight
         if in_flight is not None and not in_flight.future.done():
             in_flight.future.set_result(shutdown_result_for(in_flight))
         self._in_flight = None

Also applies to: 180-186, 557-558

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_engine.py` around lines 167 - 176, stop() cancels
the processing task but _process_one() clears self._in_flight in its finally
block, so any submitter already inside _process_one() can be left hanging;
before cancelling self._processing_task in stop() (and the similar blocks around
lines 180-186 and 557-558) snapshot and clear the current in-flight envelope(s)
(e.g., copy self._in_flight or move its future(s) to a local variable), then
cancel the task and after awaiting its cancellation call
_fail_remaining_futures() with that snapshot (or ensure
_fail_remaining_futures() also handles both queued and in-flight items) so the
submitter futures are always completed even if _process_one() wipes
self._in_flight.
src/ai_company/engine/task_engine_version.py (1)

18-21: ⚠️ Potential issue | 🟠 Major

The restart-durability claim is not implemented.

This tracker never sees persisted version state, so seed() hard-resets unknown tasks to 1 and get() reports 0. After a restart, a task that was already at version 5 can falsely conflict against expected_version=5, or worse, incorrectly accept a stale expected_version=1 write.

Also applies to: 27-30, 43-45, 61-64

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_engine_version.py` around lines 18 - 21, The
tracker currently ignores persisted version state causing seed() to
unconditionally set unknown tasks to 1 and get() to return 0 after a restart;
update the TaskEngineVersionTracker (seed, get, and any initialization logic) to
load persisted versions on construction and, when encountering a task ID present
in persistent storage, initialize/seed the in-memory tracker to the stored
version (not 1), ensure get() returns that loaded version, and adjust seed() to
only set 1 for truly new tasks; reference the seed() and get() methods and the
tracker initialization code so persisted version lookups are performed before
any reset.
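A minimal sketch of the direction this comment suggests: a tracker that treats persisted versions as the source of truth and only caches them in memory. The class, the loader callback, and the ValueError stand-in for the project's TaskVersionConflictError are all illustrative, not the actual API:

```python
import asyncio


class PersistenceSeededVersionTracker:
    """In-memory cache over persisted task versions.

    The persisted version is the source of truth; the cache is only a
    fast path that falls back to the loader on a miss, so versions
    survive a process restart.
    """

    def __init__(self, load_version) -> None:
        # load_version(task_id) -> persisted version, or None if unknown
        self._load_version = load_version
        self._versions: dict[str, int] = {}

    async def get(self, task_id: str) -> int:
        if task_id not in self._versions:
            persisted = await self._load_version(task_id)
            # seed from storage if present; only truly new tasks start at 1
            self._versions[task_id] = persisted if persisted is not None else 1
        return self._versions[task_id]

    async def check(self, task_id: str, expected: int) -> None:
        current = await self.get(task_id)
        if current != expected:
            raise ValueError(
                f"version conflict for {task_id}: "
                f"expected {expected}, have {current}"
            )


async def demo() -> int:
    store = {"task-1": 5}  # simulated persisted state surviving a restart

    async def load(task_id: str):
        return store.get(task_id)

    tracker = PersistenceSeededVersionTracker(load)
    return await tracker.get("task-1")


print(asyncio.run(demo()))  # → 5
```

With this shape, a restarted process sees version 5 for `task-1` rather than hard-resetting it to 1, so a stale `expected_version=1` write is correctly rejected.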
src/ai_company/engine/task_engine_models.py (1)

51-54: ⚠️ Potential issue | 🟠 Major

assigned_to currently advertises a create path that cannot succeed.

CreateTaskData accepts an assignee, but src/ai_company/engine/task_engine_apply.py passes it straight into Task(...) and src/ai_company/core/task.py rejects any non-None assignee while the initial status is still CREATED. So a non-empty assigned_to is guaranteed to fail later during creation; either reject it here or derive the initial status from it before persisting.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_engine_models.py` around lines 51 - 54, The model
currently allows CreateTaskData.assigned_to but Task(...) (in
task_engine_apply.py) and the Task class (src/ai_company/core/task.py) disallow
non-None assignee when status is CREATED; fix by ensuring the initial status
matches presence of an assignee: when constructing Task(...) from
CreateTaskData, check CreateTaskData.assigned_to and set the new Task's status
to ASSIGNED (or the appropriate enum value) if an assignee is provided,
otherwise keep CREATED; update the Task(...) construction site in
task_engine_apply.py to derive status from assigned_to so persistence won't be
rejected.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/api/controllers/tasks.py`:
- Around line 271-284: Combine the two except blocks into a single except that
catches (TaskEngineNotRunningError, TaskEngineQueueFullError, TaskNotFoundError,
TaskInternalError, TaskMutationError) as exc; inside it, if isinstance(exc,
TaskMutationError) call logger.warning(API_TASK_TRANSITION_FAILED,
task_id=task_id, error=str(exc)) to preserve the special log, then raise
_map_task_engine_errors(exc, task_id=task_id) from exc. This preserves logging
behavior for TaskMutationError while removing the duplicated
_map_task_engine_errors call; reference symbols: _map_task_engine_errors,
TaskMutationError, TaskEngineNotRunningError, TaskEngineQueueFullError,
TaskNotFoundError, TaskInternalError, logger.warning,
API_TASK_TRANSITION_FAILED, task_id.

In `@src/ai_company/engine/task_engine.py`:
- Around line 272-278: In create_task(), do not convert engine typed failures
into a plain TaskMutationError; instead, when result.success is false call the
existing _raise_typed_error(result) (or otherwise propagate the typed exception
raised by _raise_typed_error) so errors with error_code like "internal" become
TaskInternalError (or the appropriate typed subclass) rather than being wrapped
as TaskMutationError; keep the subsequent check for result.task is None and
raise TaskMutationError only for the inconsistent-success case.

In `@tests/unit/engine/test_task_engine_apply.py`:
- Around line 140-150: Extract the duplicated async helper into a single
module-level function (e.g., _create_task_helper) that accepts (persistence:
FakePersistence, versions: VersionTracker) and returns TaskMutationResult by
calling the existing CreateTaskMutation + apply_create logic; then replace the
two class-local methods named _create_task in TestApplyUpdate and
TestApplyTransition with calls to this new module-level helper (or convert it to
a module-level pytest fixture if preferred) so both tests reuse the same helper
implementation.

In `@tests/unit/engine/test_task_engine_integration.py`:
- Around line 287-294: The test currently uses time-based sleep to order
operations, which is flaky; instead modify the drain-timeout test to signal when
the patched save handler is entered by setting a second asyncio.Event (e.g.
entered_save, distinct from the existing block event) from inside slow_save
(which wraps original_save), and have the test await that event before queueing
the second envelope; specifically, keep original_save, replace
persistence.tasks.save with slow_save, set entered_save.set() at the start of
slow_save, and in the test wait for entered_save.wait() instead of
asyncio.sleep(...) so the second envelope is only queued after slow_save is
active.

---

Duplicate comments:
In `@src/ai_company/engine/task_engine_models.py`:
- Around line 51-54: The model currently allows CreateTaskData.assigned_to but
Task(...) (in task_engine_apply.py) and the Task class
(src/ai_company/core/task.py) disallow non-None assignee when status is CREATED;
fix by ensuring the initial status matches presence of an assignee: when
constructing Task(...) from CreateTaskData, check CreateTaskData.assigned_to and
set the new Task's status to ASSIGNED (or the appropriate enum value) if an
assignee is provided, otherwise keep CREATED; update the Task(...) construction
site in task_engine_apply.py to derive status from assigned_to so persistence
won't be rejected.

In `@src/ai_company/engine/task_engine_version.py`:
- Around line 18-21: The tracker currently ignores persisted version state
causing seed() to unconditionally set unknown tasks to 1 and get() to return 0
after a restart; update the TaskEngineVersionTracker (seed, get, and any
initialization logic) to load persisted versions on construction and, when
encountering a task ID present in persistent storage, initialize/seed the
in-memory tracker to the stored version (not 1), ensure get() returns that
loaded version, and adjust seed() to only set 1 for truly new tasks; reference
the seed() and get() methods and the tracker initialization code so persisted
version lookups are performed before any reset.

In `@src/ai_company/engine/task_engine.py`:
- Around line 167-176: stop() cancels the processing task but _process_one()
clears self._in_flight in its finally block, so any submitter already inside
_process_one() can be left hanging; before cancelling self._processing_task in
stop() (and the similar blocks around lines 180-186 and 557-558) snapshot and
clear the current in-flight envelope(s) (e.g., copy self._in_flight or move its
future(s) to a local variable), then cancel the task and after awaiting its
cancellation call _fail_remaining_futures() with that snapshot (or ensure
_fail_remaining_futures() also handles both queued and in-flight items) so the
submitter futures are always completed even if _process_one() wipes
self._in_flight.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: d81e6a01-5dd3-465e-a02e-1bbf6c0b2a91

📥 Commits

Reviewing files that changed from the base of the PR and between 38006d5 and e894134.

📒 Files selected for processing (15)
  • docs/design/engine.md
  • src/ai_company/api/app.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/task_engine_apply.py
  • src/ai_company/engine/task_engine_models.py
  • src/ai_company/engine/task_engine_version.py
  • tests/unit/api/controllers/test_task_helpers.py
  • tests/unit/api/test_app.py
  • tests/unit/engine/test_agent_engine.py
  • tests/unit/engine/test_task_engine_apply.py
  • tests/unit/engine/test_task_engine_coverage.py
  • tests/unit/engine/test_task_engine_integration.py
  • tests/unit/engine/test_task_engine_version.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Greptile Review
  • GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649
Use except A, B: syntax (no parentheses) for exception handling — PEP 758 syntax enforced by ruff on Python 3.14
Add type hints to all public functions — enforce mypy strict mode
Use Google-style docstrings — required on public classes and functions, enforced by ruff D rules
For frozen Pydantic models with dict/list fields, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use Pydantic v2 BaseModel, model_validator, computed_field, and ConfigDict — avoid redundant stored fields with @computed_field
Use NotBlankStr (from core.types) for all identifier/name fields, including optional (NotBlankStr | None) and tuple variants, instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code instead of bare create_task
Keep functions under 50 lines and files under 800 lines
Handle errors explicitly — never silently swallow exceptions
Enforce line length of 88 characters via ruff
Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using model_copy(update=...)) for runtime state that evolves
Never mix static config fields with mutable runtime fields in one model

Files:

  • tests/unit/engine/test_task_engine_coverage.py
  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/api/app.py
  • tests/unit/api/controllers/test_task_helpers.py
  • tests/unit/engine/test_task_engine_integration.py
  • tests/unit/engine/test_task_engine_version.py
  • tests/unit/engine/test_task_engine_apply.py
  • tests/unit/api/test_app.py
  • tests/unit/engine/test_agent_engine.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/task_engine_version.py
  • src/ai_company/engine/task_engine_apply.py
  • src/ai_company/engine/task_engine_models.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Use @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow markers for test organization
Prefer @pytest.mark.parametrize for testing similar cases
Never use real vendor names in test files — use test-provider, test-small-001, and generic model names

Files:

  • tests/unit/engine/test_task_engine_coverage.py
  • tests/unit/api/controllers/test_task_helpers.py
  • tests/unit/engine/test_task_engine_integration.py
  • tests/unit/engine/test_task_engine_version.py
  • tests/unit/engine/test_task_engine_apply.py
  • tests/unit/api/test_app.py
  • tests/unit/engine/test_agent_engine.py
src/ai_company/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/ai_company/**/*.py: Every module with business logic MUST have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging, logging.getLogger(), or print() in application code — use the observability logger instead
Always use logger as the variable name (not _logger or log)
Use event name constants from domain-specific modules under ai_company.observability.events (e.g., PROVIDER_CALL_START from events.provider, BUDGET_RECORD_ADDED from events.budget) — import directly
Use structured logging with kwargs: logger.info(EVENT, key=value) — never use string formatting like logger.info("msg %s", val)
Log all error paths at WARNING or ERROR with context before raising
Log all state transitions at INFO level
Use DEBUG level for object creation, internal flow, and entry/exit of key functions

Files:

  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/api/app.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/task_engine_version.py
  • src/ai_company/engine/task_engine_apply.py
  • src/ai_company/engine/task_engine_models.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names: example-provider, example-large-001, example-medium-001, example-small-001, large/medium/small as aliases. Tests must use test-provider, test-small-001, etc.

Files:

  • src/ai_company/api/controllers/tasks.py
  • src/ai_company/api/app.py
  • src/ai_company/engine/errors.py
  • src/ai_company/engine/task_engine.py
  • src/ai_company/engine/task_engine_version.py
  • src/ai_company/engine/task_engine_apply.py
  • src/ai_company/engine/task_engine_models.py
🧠 Learnings (6)
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : Config values should use frozen Pydantic models for immutable config/identity; separate mutable-via-copy models (using `model_copy(update=...)`) for runtime state that evolves

Applied to files:

  • docs/design/engine.md
  • src/ai_company/engine/task_engine_models.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : Handle errors explicitly — never silently swallow exceptions

Applied to files:

  • src/ai_company/api/app.py
  • src/ai_company/engine/task_engine.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Use event name constants from domain-specific modules under `ai_company.observability.events` (e.g., `PROVIDER_CALL_START` from `events.provider`, `BUDGET_RECORD_ADDED` from `events.budget`) — import directly

Applied to files:

  • src/ai_company/api/app.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to src/ai_company/**/*.py : Every module with business logic MUST have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`

Applied to files:

  • src/ai_company/api/app.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : Keep functions under 50 lines and files under 800 lines

Applied to files:

  • src/ai_company/engine/task_engine.py
📚 Learning: 2026-03-12T11:06:11.009Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T11:06:11.009Z
Learning: Applies to **/*.py : For frozen Pydantic models with dict/list fields, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)

Applied to files:

  • src/ai_company/engine/task_engine_models.py
🧬 Code graph analysis (11)
tests/unit/engine/test_task_engine_coverage.py (3)
src/ai_company/engine/task_engine.py (4)
  • TaskEngine (82-620)
  • _MutationEnvelope (69-79)
  • _raise_typed_error (439-450)
  • _shutdown_result (194-201)
src/ai_company/engine/task_engine_models.py (2)
  • CreateTaskMutation (69-84)
  • TaskMutationResult (256-297)
tests/unit/engine/task_engine_helpers.py (2)
  • FakePersistence (47-55)
  • _make_create_data (90-102)
src/ai_company/api/controllers/tasks.py (5)
src/ai_company/api/state.py (2)
  • AppState (22-154)
  • task_engine (107-109)
src/ai_company/core/task.py (1)
  • Task (45-261)
src/ai_company/engine/errors.py (5)
  • TaskEngineNotRunningError (89-90)
  • TaskEngineQueueFullError (93-94)
  • TaskInternalError (109-120)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
src/ai_company/engine/task_engine_models.py (1)
  • CreateTaskData (24-63)
src/ai_company/engine/task_engine.py (6)
  • list_tasks (465-486)
  • get_task (454-463)
  • create_task (247-278)
  • update_task (280-319)
  • transition_task (321-368)
  • delete_task (370-399)
src/ai_company/api/app.py (3)
src/ai_company/api/state.py (4)
  • task_engine (107-109)
  • persistence (87-89)
  • message_bus (92-94)
  • AppState (22-154)
src/ai_company/engine/task_engine.py (3)
  • TaskEngine (82-620)
  • stop (141-178)
  • start (121-139)
src/ai_company/memory/errors.py (1)
  • MemoryError (13-14)
tests/unit/api/controllers/test_task_helpers.py (3)
src/ai_company/api/controllers/tasks.py (2)
  • _extract_requester (46-55)
  • _map_task_engine_errors (58-81)
src/ai_company/api/errors.py (3)
  • ApiValidationError (32-38)
  • NotFoundError (23-29)
  • ServiceUnavailableError (68-74)
src/ai_company/engine/errors.py (5)
  • TaskEngineNotRunningError (89-90)
  • TaskEngineQueueFullError (93-94)
  • TaskInternalError (109-120)
  • TaskMutationError (97-98)
  • TaskNotFoundError (101-102)
tests/unit/engine/test_task_engine_version.py (2)
src/ai_company/engine/errors.py (1)
  • TaskVersionConflictError (105-106)
src/ai_company/engine/task_engine_version.py (7)
  • VersionTracker (15-76)
  • seed (27-30)
  • get (43-45)
  • set_initial (32-34)
  • bump (36-41)
  • remove (47-49)
  • check (51-76)
tests/unit/api/test_app.py (2)
src/ai_company/api/app.py (3)
  • _safe_startup (236-308)
  • _safe_shutdown (311-346)
  • _try_stop (137-153)
src/ai_company/api/state.py (2)
  • persistence (87-89)
  • AppState (22-154)
tests/unit/engine/test_agent_engine.py (2)
src/ai_company/engine/errors.py (1)
  • TaskMutationError (97-98)
src/ai_company/engine/task_engine.py (1)
  • transition_task (321-368)
src/ai_company/engine/task_engine.py (5)
src/ai_company/engine/task_engine_apply.py (1)
  • dispatch (68-91)
src/ai_company/engine/task_engine_config.py (1)
  • TaskEngineConfig (6-37)
src/ai_company/engine/task_engine_version.py (2)
  • VersionTracker (15-76)
  • get (43-45)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/persistence/protocol.py (1)
  • PersistenceBackend (27-167)
src/ai_company/engine/task_engine_version.py (3)
src/ai_company/engine/errors.py (1)
  • TaskVersionConflictError (105-106)
src/ai_company/observability/_logger.py (1)
  • get_logger (8-28)
src/ai_company/api/state.py (1)
  • task_engine (107-109)
src/ai_company/engine/task_engine_apply.py (4)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/engine/errors.py (1)
  • TaskVersionConflictError (105-106)
src/ai_company/engine/task_engine_models.py (6)
  • CancelTaskMutation (223-240)
  • CreateTaskMutation (69-84)
  • DeleteTaskMutation (205-220)
  • TaskMutationResult (256-297)
  • TransitionTaskMutation (155-202)
  • UpdateTaskMutation (102-142)
src/ai_company/engine/task_engine_version.py (6)
  • VersionTracker (15-76)
  • set_initial (32-34)
  • get (43-45)
  • check (51-76)
  • bump (36-41)
  • remove (47-49)
src/ai_company/engine/task_engine_models.py (2)
src/ai_company/core/enums.py (4)
  • Complexity (247-253)
  • Priority (238-244)
  • TaskStatus (198-224)
  • TaskType (227-235)
src/ai_company/core/task.py (1)
  • Task (45-261)
🔇 Additional comments (13)
tests/unit/engine/test_agent_engine.py (2)

1009-1039: LGTM! The assertions now validate the full transition_task() payload.

The test now asserts the task ID (line 1035), target status (line 1036), and requested_by (lines 1037-1039), addressing the previous review feedback about verifying the complete integration boundary.


936-1121: Comprehensive test coverage for TaskEngine integration.

The TestReportToTaskEngine class thoroughly exercises the _report_to_task_engine behavior with well-isolated scenarios covering the happy path, error swallowing semantics, and critical error propagation (MemoryError). The test structure follows good practices with clear docstrings and focused assertions.

tests/unit/engine/test_task_engine_version.py (1)

1-94: LGTM! Comprehensive VersionTracker test coverage.

The tests thoroughly exercise all VersionTracker methods with clear boundary cases:

  • Idempotent seeding behavior
  • Auto-seeding on bump for unknown tasks
  • Version conflict detection with proper error types
  • Seed-on-first-check semantics

The test names are descriptive and each test validates a single behavior.

tests/unit/api/controllers/test_task_helpers.py (1)

1-93: LGTM! Thorough helper function test coverage.

The tests validate both _extract_requester and _map_task_engine_errors with all relevant edge cases:

  • User presence/absence and missing user_id attribute
  • Complete error type mapping coverage including pass-through for unknown errors

The inline FakeUser/FakeState classes are appropriate for testing these simple helpers without heavier mocking.

tests/unit/engine/test_task_engine_coverage.py (2)

67-68: Exception suppression is handled correctly.

contextlib.suppress(Exception, asyncio.CancelledError) is right: asyncio.CancelledError has been a subclass of BaseException (not Exception) since Python 3.8, so it must be listed separately for suppression to apply, and the current code does exactly that.
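The BaseException point can be demonstrated directly (a small sketch, independent of the code under review):

```python
import asyncio
import contextlib

# CancelledError subclasses BaseException, not Exception (Python >= 3.8),
# so suppress(Exception) alone would let a cancellation escape.
print(issubclass(asyncio.CancelledError, Exception))      # → False
print(issubclass(asyncio.CancelledError, BaseException))  # → True

# Listing it explicitly makes the suppression cover it:
with contextlib.suppress(Exception, asyncio.CancelledError):
    raise asyncio.CancelledError()
print("suppressed")  # → suppressed
```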


1-234: LGTM! Excellent edge case coverage for TaskEngine internals.

The tests effectively validate critical edge cases:

  • In-flight envelope resolution during drain timeout
  • Exception handling in _process_one producing internal errors
  • Snapshot publish failures being isolated from mutation results
  • Typed error mapping from result codes
  • Processing loop resilience after failures

The use of try/finally blocks ensures proper cleanup, and the white-box testing of _in_flight is appropriate for verifying internal state machine behavior.

docs/design/engine.md (2)

190-196: LGTM! The versioning documentation now correctly describes the source of truth.

The documentation properly states that "the persisted task version is the source of truth" and that "any in-memory cache is an optimization that is seeded from persistence on task load and may be invalid after a restart." This addresses the previous review feedback about not describing optimistic concurrency as in-memory-only.


169-228: Well-structured TaskEngine documentation.

The new section clearly explains:

  • Single-writer architecture with queue-based mutation processing
  • Immutable-style update semantics via model_copy
  • Optimistic concurrency with persisted versions as source of truth
  • Read-through bypass for safe reads
  • Comprehensive error handling with typed errors

The ASCII diagram effectively illustrates the data flow.
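The single-writer, queue-based pattern the section describes can be sketched roughly as follows; the Envelope shape, the string mutation, and the None shutdown sentinel are illustrative simplifications, not the engine's actual internals:

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Envelope:
    """Pairs a mutation with the future its submitter awaits."""
    mutation: str
    future: asyncio.Future = field(default_factory=asyncio.Future)


async def single_writer_loop(queue: asyncio.Queue) -> None:
    # All mutations are applied by this one task, so no two writes
    # can interleave on shared task state.
    while True:
        envelope = await queue.get()
        if envelope is None:  # shutdown sentinel
            return
        envelope.future.set_result(f"applied:{envelope.mutation}")


async def demo() -> str:
    queue: asyncio.Queue = asyncio.Queue()
    loop_task = asyncio.create_task(single_writer_loop(queue))
    env = Envelope("create")
    await queue.put(env)        # submitter enqueues and awaits the future
    result = await env.future
    await queue.put(None)       # drain and stop the loop
    await loop_task
    return result


print(asyncio.run(demo()))  # → applied:create
```

The drain-timeout discussion elsewhere in this review is exactly about what happens to those awaited futures when the loop is cancelled mid-mutation.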

src/ai_company/engine/errors.py (2)

109-120: LGTM! TaskInternalError is correctly implemented as a sibling of TaskMutationError.

The inheritance (TaskInternalError(TaskEngineError)) ensures that broad except TaskMutationError handlers won't accidentally catch internal engine faults. The docstring clearly documents this deliberate design decision. This addresses the previous review feedback.


83-120: Well-designed error hierarchy.

The TaskEngine error types follow a clear hierarchy:

  • TaskEngineError as the base for all engine errors
  • TaskMutationError and subtypes (TaskNotFoundError, TaskVersionConflictError) for business-rule failures
  • TaskInternalError as a sibling for internal faults (maps to 5xx)
  • Operational errors (TaskEngineNotRunningError, TaskEngineQueueFullError) at the base level

This enables precise error handling at the API layer.
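The hierarchy described above can be sketched as plain Python classes; only the class names come from the PR summary, and the docstrings and the demo are illustrative:

```python
class TaskEngineError(Exception):
    """Base for all TaskEngine errors."""


class TaskEngineNotRunningError(TaskEngineError):
    """Operational: engine not started."""


class TaskEngineQueueFullError(TaskEngineError):
    """Operational: mutation queue at capacity."""


class TaskMutationError(TaskEngineError):
    """Business-rule failure (maps to 4xx at the API layer)."""


class TaskNotFoundError(TaskMutationError):
    """The referenced task does not exist."""


class TaskVersionConflictError(TaskMutationError):
    """Optimistic-concurrency version mismatch."""


class TaskInternalError(TaskEngineError):
    """Internal fault (maps to 5xx); deliberately a sibling of
    TaskMutationError so `except TaskMutationError` never hides it."""


# A broad business-rule handler does not swallow internal faults:
try:
    raise TaskInternalError("db write failed")
except TaskMutationError:
    caught = "mutation"
except TaskInternalError:
    caught = "internal"
print(caught)  # → internal
```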

src/ai_company/api/controllers/tasks.py (2)

46-81: LGTM! Clean helper extraction for requester identity and error mapping.

The _extract_requester function properly extracts the authenticated user identity with a safe fallback, addressing the previous review feedback about audit trail preservation. The _map_task_engine_errors function centralizes the exception mapping pattern that was previously duplicated across operations.


1-326: LGTM! Clean migration to TaskEngine-based operations.

The controller successfully migrates from direct persistence access to TaskEngine-based operations with:

  • Proper error mapping to API-level errors
  • Requester identity extraction for audit trails
  • Structured logging with appropriate event constants
  • Consistent exception handling across all CRUD operations
tests/unit/engine/test_task_engine_apply.py (1)

1-527: LGTM! Excellent test coverage for mutation apply functions.

The test file comprehensively covers all mutation types with:

  • Happy path tests for each operation
  • Not-found error handling
  • Version conflict detection
  • Validation error handling
  • State tracking (previous_status, version increments)
  • Edge cases (empty updates, invalid transitions)

The tests clearly validate the contract between mutation inputs and expected results.

Comment on lines +271 to +284
except (
TaskEngineNotRunningError,
TaskEngineQueueFullError,
TaskNotFoundError,
TaskInternalError,
) as exc:
raise _map_task_engine_errors(exc, task_id=task_id) from exc
except TaskMutationError as exc:
logger.warning(
API_TASK_TRANSITION_FAILED,
task_id=task_id,
error=str(exc),
)
raise _map_task_engine_errors(exc, task_id=task_id) from exc

🧹 Nitpick | 🔵 Trivial

Consider consolidating the transition_task exception handling.

The transition operation handles TaskMutationError separately (lines 278-284) from other errors (lines 271-277) to emit API_TASK_TRANSITION_FAILED. This is intentional for logging purposes, but it duplicates the _map_task_engine_errors call.

A minor improvement could be to emit the warning first and then use a single except block, though the current approach is functionally correct and the intent is clear.

♻️ Optional: Simplify with single except block
         try:
             task, from_status = await app_state.task_engine.transition_task(
                 task_id,
                 data.target_status,
                 **transition_kwargs,  # type: ignore[arg-type]
             )
-        except (
-            TaskEngineNotRunningError,
-            TaskEngineQueueFullError,
-            TaskNotFoundError,
-            TaskInternalError,
-        ) as exc:
-            raise _map_task_engine_errors(exc, task_id=task_id) from exc
-        except TaskMutationError as exc:
+        except (
+            TaskEngineNotRunningError,
+            TaskEngineQueueFullError,
+            TaskNotFoundError,
+            TaskInternalError,
+            TaskMutationError,
+        ) as exc:
+            if isinstance(exc, TaskMutationError):
+                logger.warning(
+                    API_TASK_TRANSITION_FAILED,
+                    task_id=task_id,
+                    error=str(exc),
+                )
-            logger.warning(
-                API_TASK_TRANSITION_FAILED,
-                task_id=task_id,
-                error=str(exc),
-            )
             raise _map_task_engine_errors(exc, task_id=task_id) from exc
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/api/controllers/tasks.py` around lines 271 - 284, Combine the
two except blocks into a single except that catches (TaskEngineNotRunningError,
TaskEngineQueueFullError, TaskNotFoundError, TaskInternalError,
TaskMutationError) as exc; inside it, if isinstance(exc, TaskMutationError) call
logger.warning(API_TASK_TRANSITION_FAILED, task_id=task_id, error=str(exc)) to
preserve the special log, then raise _map_task_engine_errors(exc,
task_id=task_id) from exc. This preserves logging behavior for TaskMutationError
while removing the duplicated _map_task_engine_errors call; reference symbols:
_map_task_engine_errors, TaskMutationError, TaskEngineNotRunningError,
TaskEngineQueueFullError, TaskNotFoundError, TaskInternalError, logger.warning,
API_TASK_TRANSITION_FAILED, task_id.

Comment on lines +140 to +150
async def _create_task(
self,
persistence: FakePersistence,
versions: VersionTracker,
) -> TaskMutationResult:
mutation = CreateTaskMutation(
request_id="req-c",
requested_by="alice",
task_data=_make_create_data(),
)
return await apply_create(mutation, persistence, versions) # type: ignore[arg-type]

🧹 Nitpick | 🔵 Trivial

Consider extracting the shared _create_task helper.

The _create_task helper method is duplicated in TestApplyUpdate (lines 140-150) and TestApplyTransition (lines 264-274). Consider moving this to a module-level fixture or helper function in the test file to reduce duplication.

♻️ Optional: Extract shared helper to module level
async def _create_task_helper(
    persistence: FakePersistence,
    versions: VersionTracker,
) -> TaskMutationResult:
    """Helper to create a task for tests."""
    mutation = CreateTaskMutation(
        request_id="req-c",
        requested_by="alice",
        task_data=_make_create_data(),
    )
    return await apply_create(mutation, persistence, versions)  # type: ignore[arg-type]

Then use _create_task_helper(persistence, versions) in tests instead of duplicated methods.

Also applies to: 264-274

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_task_engine_apply.py` around lines 140 - 150, Extract
the duplicated async helper into a single module-level function (e.g.,
_create_task_helper) that accepts (persistence: FakePersistence, versions:
VersionTracker) and returns TaskMutationResult by calling the existing
CreateTaskMutation + apply_create logic; then replace the two class-local
methods named _create_task in TestApplyUpdate and TestApplyTransition with calls
to this new module-level helper (or convert it to a module-level pytest fixture
if preferred) so both tests reuse the same helper implementation.

Comment on lines +287 to +294
block = asyncio.Event()
original_save = persistence.tasks.save

async def slow_save(task: object) -> None:
    await block.wait()
    await original_save(task)  # type: ignore[arg-type]

persistence.tasks.save = slow_save  # type: ignore[method-assign]

🧹 Nitpick | 🔵 Trivial

Avoid the timing-based wait in the drain-timeout test.

await asyncio.sleep(0.05) makes this setup scheduler-dependent; on a slow runner, the second envelope can still be queued before the first mutation reaches slow_save(). Signal entry into slow_save() with an asyncio.Event instead.

Proposed refactor
         # Block the processing loop with a slow save
         block = asyncio.Event()
+        entered_save = asyncio.Event()
         original_save = persistence.tasks.save

         async def slow_save(task: object) -> None:
+            entered_save.set()
             await block.wait()
             await original_save(task)  # type: ignore[arg-type]
@@
         envelope = _MutationEnvelope(mutation=mutation2)
-        # Give the engine a tick to start processing the first task
-        await asyncio.sleep(0.05)
+        await entered_save.wait()
         eng._queue.put_nowait(envelope)

Also applies to: 311-313
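The event-based ordering the refactor relies on can be sketched in isolation. This is a minimal standalone demo, not code from the test file; the coroutine and variable names are hypothetical:

```python
import asyncio

async def demo() -> list[str]:
    """Order two steps deterministically with Events instead of sleeps."""
    entered = asyncio.Event()   # worker signals it has started processing
    release = asyncio.Event()   # test signals the worker may finish
    log: list[str] = []

    async def slow_worker() -> None:
        entered.set()           # plays the role of entered_save.set()
        await release.wait()    # plays the role of block.wait()
        log.append("worker finished")

    worker = asyncio.create_task(slow_worker())
    await entered.wait()        # guaranteed: worker is inside slow_worker now
    log.append("second envelope queued")
    release.set()
    await worker
    return log

print(asyncio.run(demo()))
```

Because the test awaits `entered.wait()` rather than sleeping, the ordering holds on any scheduler, however slow the runner.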

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_task_engine_integration.py` around lines 287 - 294,
the test currently uses a time-based sleep to order operations, which is flaky;
instead, signal when the patched save handler is entered by introducing a second
asyncio.Event (e.g., entered_save) alongside the existing block Event: keep
original_save, replace persistence.tasks.save with slow_save, have slow_save set
entered_save at its start before awaiting block, and in the test await
entered_save.wait() instead of asyncio.sleep(...) so the second envelope is only
queued after slow_save is active.

…overage

Review fixes:
- create_task now uses _raise_typed_error for proper error dispatch
- Add PydanticValidationError catch in all controller write methods
- Use _extract_requester for create_task audit trail consistency
- Add logging for TaskInternalError and TaskEngineQueueFullError in controller
- Add TaskEngineError catch in agent_engine _report_to_task_engine
- Extract hardcoded values to class constants (_POLL_INTERVAL_SECONDS, etc.)
- Add failed_count tracking and logging in _fail_remaining_futures
- Add reason field to TaskStateChanged event for audit trail
- Add bounds check to VersionTracker.set_initial
- Fix _try_stop parameter type from object to Awaitable[None]
- Fix State Coordination docs (model_copy -> model_validate/with_transition)
- Improve docstrings for _processing_loop, _publish_snapshot, apply_cancel, etc.

New tests (33 total):
- FIFO ordering guarantee
- Default reason generation in transition_task
- Delete snapshot new_status=None verification
- Cancel version bump correctness
- _raise_typed_error dispatch for all error codes
- Snapshot reason propagation (transition, cancel, create, update)
- MemoryError re-raise in processing loop
- _fail_remaining_futures count tracking
- Deep-copy isolation for UpdateTaskMutation and TransitionTaskMutation
- Unknown field rejection in updates and overrides
- VersionTracker.set_initial bounds check (zero and negative)
- TaskStateChanged reason field (populated, default none, cancel)
- create_task typed error dispatch (internal, validation)
- Transition overrides and previous_status via engine
Copilot AI review requested due to automatic review settings March 12, 2026 19:02
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 12, 2026 19:03 — with GitHub Actions Inactive
Comment on lines +100 to +101
if isinstance(exc, TaskMutationError):
    return ApiValidationError(str(exc))

TaskVersionConflictError silently mapped to HTTP 422 instead of 409

TaskVersionConflictError is a subclass of TaskMutationError, so it falls through every earlier check and lands on the isinstance(exc, TaskMutationError) branch, which returns ApiValidationError (HTTP 422 Unprocessable Entity). This is semantically wrong: a version conflict is not a validation error from the caller's perspective — it signals that another writer modified the task between the caller's read and write.

api/errors.py already has a ConflictError class that maps to HTTP 409 Conflict, which is the correct status for optimistic-concurrency failures. Without an explicit guard here, callers cannot distinguish "invalid input" (422) from "concurrent modification" (409), making correct retry logic impossible.

Suggested change
-    if isinstance(exc, TaskMutationError):
-        return ApiValidationError(str(exc))
+    if isinstance(exc, TaskVersionConflictError):
+        return ConflictError(str(exc))
+    if isinstance(exc, TaskMutationError):
+        return ApiValidationError(str(exc))

This also requires importing TaskVersionConflictError and ConflictError at the top of the file.

Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/api/controllers/tasks.py
Line: 100-101

Comment:
**`TaskVersionConflictError` silently mapped to HTTP 422 instead of 409**

`TaskVersionConflictError` is a subclass of `TaskMutationError`, so it falls through every earlier check and lands on the `isinstance(exc, TaskMutationError)` branch, which returns `ApiValidationError` (HTTP 422 Unprocessable Entity). This is semantically wrong: a version conflict is not a validation error from the caller's perspective — it signals that another writer modified the task between the caller's read and write.

`api/errors.py` already has a `ConflictError` class that maps to HTTP 409 Conflict, which is the correct status for optimistic-concurrency failures. Without an explicit guard here, callers cannot distinguish "invalid input" (422) from "concurrent modification" (409), making correct retry logic impossible.

```suggestion
    if isinstance(exc, TaskVersionConflictError):
        return ConflictError(str(exc))
    if isinstance(exc, TaskMutationError):
        return ApiValidationError(str(exc))
```

This also requires importing `TaskVersionConflictError` and `ConflictError` at the top of the file.

How can I resolve this? If you propose a fix, please make it concise.
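The ordering constraint behind this review comment is easy to demonstrate with a stripped-down mapper. The two-level exception hierarchy mirrors the names in the PR, but `to_http_status` and its body are an illustrative stand-in, not the project's controller code:

```python
class TaskMutationError(Exception):
    """Base class for task mutation failures (stand-in)."""

class TaskVersionConflictError(TaskMutationError):
    """Another writer changed the task between read and write (stand-in)."""

def to_http_status(exc: Exception) -> int:
    """Map engine errors to HTTP statuses, most specific subclass first."""
    if isinstance(exc, TaskVersionConflictError):
        return 409  # Conflict: caller should re-read and retry
    if isinstance(exc, TaskMutationError):
        return 422  # Unprocessable Entity: invalid input
    return 500      # anything else is an internal error

# Checking the subclass first keeps 409 and 422 distinguishable:
assert to_http_status(TaskVersionConflictError("stale write")) == 409
assert to_http_status(TaskMutationError("bad field")) == 422
```

Swapping the two `isinstance` checks would silently collapse every version conflict into 422, which is exactly the bug the comment describes.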

@Aureliolo Aureliolo closed this Mar 12, 2026
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 12, 2026 19:07 — with GitHub Actions Inactive
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 35 out of 35 changed files in this pull request and generated 5 comments.



        id=task_id,
    )
    return NotFoundError(str(exc))
if isinstance(exc, TaskEngineNotRunningError | TaskEngineQueueFullError):

Copilot AI Mar 12, 2026


isinstance(exc, TaskEngineNotRunningError | TaskEngineQueueFullError) requires Python 3.10+: isinstance() only accepts PEP 604 union types from 3.10 onward and raises TypeError at runtime on earlier interpreters. If the project still supports Python < 3.10, use a tuple of types instead (e.g., (TaskEngineNotRunningError, TaskEngineQueueFullError)).

Suggested change
-if isinstance(exc, TaskEngineNotRunningError | TaskEngineQueueFullError):
+if isinstance(exc, (TaskEngineNotRunningError, TaskEngineQueueFullError)):
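Both spellings can be checked quickly. The exception classes below are local stand-ins for the engine's errors, defined here only so the snippet runs on its own:

```python
import sys

class TaskEngineNotRunningError(Exception): ...
class TaskEngineQueueFullError(Exception): ...

exc = TaskEngineQueueFullError("queue full")

# Tuple form: accepted by isinstance() on every supported Python version.
assert isinstance(exc, (TaskEngineNotRunningError, TaskEngineQueueFullError))

# PEP 604 union form: isinstance() accepts X | Y only on Python 3.10+.
if sys.version_info >= (3, 10):
    assert isinstance(exc, TaskEngineNotRunningError | TaskEngineQueueFullError)
```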

Copilot uses AI. Check for mistakes.
Comment on lines +74 to +75
Seeds the version from persistence if not yet tracked so that
optimistic concurrency survives engine restarts.

Copilot AI Mar 12, 2026


The docstring says the tracker is “seeded from persistence” / that optimistic concurrency “survives engine restarts”, but the implementation only seeds missing task IDs to version=1 (no persistence lookup). Please update the docstring to reflect the actual behavior (volatile in-memory counter) or implement real seeding from persisted version metadata.

Suggested change
Seeds the version from persistence if not yet tracked so that
optimistic concurrency survives engine restarts.
Seeds an in-memory version for untracked tasks so that
optimistic-concurrency checks work for the lifetime of the
current process only (no persistence across restarts).

Comment on lines +190 to +196
- **Optimistic concurrency**: Per-task version counters. The persisted
task version is the source of truth; any in-memory cache is an
optimization that is seeded from persistence on task load and may be
invalid after a restart. Callers can pass `expected_version` to detect
stale writes; on mismatch the engine returns a failed
`TaskMutationResult` with `error_code="version_conflict"`. Convenience
methods raise `TaskVersionConflictError`.

Copilot AI Mar 12, 2026


This “Optimistic concurrency” section states “The persisted task version is the source of truth” and that the in-memory cache is “seeded from persistence”, but the current VersionTracker implementation is purely in-memory and seeds unknown tasks to version=1. Please align the documentation with the implemented semantics (or add persisted versioning if that’s the intent).

Suggested change
- **Optimistic concurrency**: Per-task version counters. The persisted
task version is the source of truth; any in-memory cache is an
optimization that is seeded from persistence on task load and may be
invalid after a restart. Callers can pass `expected_version` to detect
stale writes; on mismatch the engine returns a failed
`TaskMutationResult` with `error_code="version_conflict"`. Convenience
methods raise `TaskVersionConflictError`.
- **Optimistic concurrency**: Per-task version counters maintained in-memory
(for example via a `VersionTracker`). Versions are not currently
persisted; unknown tasks are initialized with `version=1`, and version
tracking state is lost on process restart. Callers can pass
`expected_version` to detect stale writes; on mismatch the engine returns
a failed `TaskMutationResult` with `error_code="version_conflict"`.
Convenience methods raise `TaskVersionConflictError`.

Comment on lines +24 to +27
restart. After a restart, the first optimistic-concurrency check
for any task will succeed regardless of the true version history
because the tracker seeds the version at 1. Durable version
tracking (persisted alongside the task) is a future enhancement.

Copilot AI Mar 12, 2026


The VersionTracker limitation text claims the first optimistic-concurrency check after restart “will succeed regardless of the true version history”. With the current logic (seed missing to version=1, then strict equality check), callers providing any expected_version != 1 will get a conflict. Please reword this limitation to match the actual behavior/risks of seeding-to-1.

Suggested change
restart. After a restart, the first optimistic-concurrency check
for any task will succeed regardless of the true version history
because the tracker seeds the version at 1. Durable version
tracking (persisted alongside the task) is a future enhancement.
restart. After a restart, previously unseen tasks are treated as
having version 1. The first optimistic-concurrency check will
therefore only succeed when callers expect version 1; callers that
expect a higher version may see a conflict even if persistence holds
a different (newer) version. Durable version tracking (persisted
alongside the task) is a future enhancement.

Comment on lines +159 to +181
task = await persistence.tasks.get(mutation.task_id)
if task is None:
    return not_found_result("update", mutation.request_id, mutation.task_id)

try:
    versions.check(mutation.task_id, mutation.expected_version)
except TaskVersionConflictError as exc:
    return TaskMutationResult(
        request_id=mutation.request_id,
        success=False,
        error=str(exc),
        error_code="version_conflict",
    )

if not mutation.updates:
    version = versions.get(mutation.task_id)
    return TaskMutationResult(
        request_id=mutation.request_id,
        success=True,
        task=task,
        version=version,
        previous_status=task.status,
    )

Copilot AI Mar 12, 2026


In apply_update, the empty-updates no-op path returns versions.get(task_id), but VersionTracker.get() returns 0 for untracked tasks. After an engine restart (or when operating on pre-existing persisted tasks), this would report version=0 for a real task. Consider seeding the task ID before reading the version (or otherwise ensuring an existing persisted task never yields version 0).
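One possible guard, sketched with a plain dict standing in for the real tracker; `seeded_get` is a hypothetical name, not a function from the PR:

```python
versions: dict[str, int] = {}

def seeded_get(task_id: str) -> int:
    """Read a task's version, seeding untracked tasks to 1 first."""
    versions.setdefault(task_id, 1)
    return versions[task_id]

# An existing persisted task never reports version 0 on the no-op path:
assert seeded_get("task-42") == 1
```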


Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

feat: implement centralized single-writer state coordination (TaskEngine)

2 participants