
feat: incremental AgentEngine → TaskEngine status sync#331

Merged
Aureliolo merged 7 commits into main from
feat/task-engine-status-sync
Mar 13, 2026
Conversation


@Aureliolo Aureliolo commented Mar 12, 2026

Summary

  • Incremental sync: AgentEngine now syncs task status transitions to TaskEngine at each lifecycle point (IN_PROGRESS → IN_REVIEW → COMPLETED), rather than reporting only the final status. This gives real-time visibility into execution progress and improves crash recovery (a crash mid-execution leaves the task at the last-reached stage, not stuck at ASSIGNED).
  • Extracted engine/task_sync.py: Sync and transition logic extracted from AgentEngine (1094→858 lines) into a dedicated 263-line module for cohesion and maintainability.
  • Fatal error sync: Added TaskEngine sync in _handle_fatal_error to prevent tasks being stuck at IN_PROGRESS after fatal errors that bypass the normal post-execution pipeline.
  • Channel fix: TaskEngine._SNAPSHOT_CHANNEL changed from "task_engine" to "tasks" to match CHANNEL_TASKS in api.channels, ensuring WebSocket consumers receive task state events.

Changes

| File | Change |
|------|--------|
| src/ai_company/engine/task_sync.py | NEW — extracted sync_to_task_engine, transition_task_if_needed, apply_post_execution_transitions, and internal transition helpers |
| src/ai_company/engine/agent_engine.py | Removed 5 extracted methods, updated call sites, added fatal error sync, cleaned up imports |
| src/ai_company/engine/task_engine.py | Fixed _SNAPSHOT_CHANNEL to match CHANNEL_TASKS |
| src/ai_company/observability/events/execution.py | Added EXECUTION_ENGINE_TASK_SYNCED and EXECUTION_ENGINE_SYNC_FAILED event constants (moved to ENGINE cluster) |
| docs/design/engine.md | Added incremental sync design section, fixed run() signature, added step 10 cross-reference |
| CLAUDE.md | Updated engine/ package description |
| tests/unit/engine/test_agent_engine.py | Rewrote sync tests for incremental paths (COMPLETED/3, SHUTDOWN/2, ERROR/2, MAX_TURNS/1), added RecursionError propagation test, added channel name invariant test |

Review coverage

Pre-reviewed by 9 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency). 14 findings addressed:

  • Removed dead expected_version parameter and int|None return type (optimistic concurrency was never wired up)
  • Moved uuid4 import to module level
  • Added fatal error sync to prevent state divergence
  • Added _SNAPSHOT_CHANNEL == CHANNEL_TASKS invariant test
  • Added RecursionError propagation test
  • Consolidated duplicate exception handling in sync function
  • Fixed docs inaccuracies and added missing cross-references
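The channel invariant test mentioned above might look like the following sketch. CHANNEL_TASKS and TaskEngine are local stand-ins here, since the real ones live in api.channels and engine/task_engine.py:

```python
CHANNEL_TASKS = "tasks"          # stand-in for api.channels.CHANNEL_TASKS


class TaskEngine:
    """Stand-in for engine/task_engine.py; holds the fixed channel value."""
    _SNAPSHOT_CHANNEL = "tasks"  # the value this PR changed it to


def test_snapshot_channel_matches_channel_tasks():
    # Guards against regressing to the old "task_engine" mismatch,
    # which silently cut WebSocket consumers off from task state events.
    assert TaskEngine._SNAPSHOT_CHANNEL == CHANNEL_TASKS


test_snapshot_channel_matches_channel_tasks()
```

Pinning the invariant in a test means a future rename of either constant fails CI instead of silently breaking event delivery.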

Test plan

  • All 7093 tests pass (2 new: RecursionError propagation, channel invariant)
  • 94.56% coverage (threshold: 80%)
  • ruff lint + format clean
  • mypy strict clean (888 source files)
  • Pre-commit hooks pass

Closes #323

Replace the single final-status report with incremental sync at every
lifecycle transition (IN_PROGRESS, IN_REVIEW, COMPLETED, INTERRUPTED,
FAILED).  This gives real-time visibility into execution progress and
improves crash recovery — a crash mid-execution now leaves the task at
the last-reached stage instead of stuck at ASSIGNED.

- Add _sync_to_task_engine() best-effort helper using submit() directly
  for optimistic concurrency version access
- Fix channel mismatch: TaskEngine._SNAPSHOT_CHANNEL "task_engine" → "tasks"
  so events reach MessageBusBridge and WebSocket consumers
- Remove _REPORTABLE_STATUSES and _report_to_task_engine() (superseded)
- Add EXECUTION_ENGINE_TASK_SYNCED / EXECUTION_ENGINE_SYNC_FAILED events
- Rewrite tests: verify 3 syncs (COMPLETED), 2 (SHUTDOWN/ERROR),
  1 (MAX_TURNS), best-effort isolation, error swallowing
- Document incremental sync pattern in engine design spec

Closes #323
…findings

Pre-reviewed by 9 agents, 14 findings addressed:
- Extract sync/transition methods from AgentEngine (1094→858 lines)
  into engine/task_sync.py (263 lines) for cohesion
- Remove dead expected_version parameter and int|None return from
  sync (optimistic concurrency was never wired up)
- Move uuid4 import to module level (no circular dep risk)
- Add TaskEngine sync in _handle_fatal_error (prevents task stuck
  at IN_PROGRESS after fatal errors)
- Add _SNAPSHOT_CHANNEL == CHANNEL_TASKS invariant test
- Add RecursionError propagation test
- Move event constants to ENGINE cluster in execution.py
- Fix docs: run() signature, concurrency claim, step 10 cross-ref
- Update CLAUDE.md engine/ description
Copilot AI review requested due to automatic review settings March 12, 2026 23:02

github-actions bot commented Mar 12, 2026

Dependency Review

✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.

Scanned Files

None

@gemini-code-assist

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the visibility and resilience of task execution by introducing incremental status synchronization between the AgentEngine and TaskEngine. Instead of a single final status update, the AgentEngine now reports task progress at key lifecycle points, which provides real-time insights and improves crash recovery by ensuring tasks are left in their last-reached valid state. The underlying synchronization logic has been refactored into a new, dedicated module, and robust error handling has been implemented to ensure sync failures do not impede agent execution.

Highlights

  • Incremental Task Status Sync: AgentEngine now syncs task status transitions (IN_PROGRESS → IN_REVIEW → COMPLETED) to TaskEngine at each lifecycle point, providing real-time visibility and improving crash recovery.
  • Extracted Sync Logic: The task status sync and transition logic was extracted from AgentEngine into a new, dedicated engine/task_sync.py module, enhancing code cohesion and maintainability.
  • Fatal Error Synchronization: Added TaskEngine sync within _handle_fatal_error to ensure tasks are not left stuck at IN_PROGRESS after fatal errors that bypass the normal post-execution pipeline.
  • Channel Alignment Fix: The TaskEngine._SNAPSHOT_CHANNEL was corrected from "task_engine" to "tasks" to match CHANNEL_TASKS in api.channels, ensuring proper WebSocket event delivery for task state changes.
Changelog
  • CLAUDE.md
    • Updated the engine/ package description.
  • docs/design/engine.md
    • Added an incremental sync design section, fixed run() signature, and added a step 10 cross-reference.
  • src/ai_company/engine/agent_engine.py
    • Removed 5 extracted methods, updated call sites, added fatal error sync, and cleaned up imports.
  • src/ai_company/engine/task_engine.py
    • Fixed _SNAPSHOT_CHANNEL to match CHANNEL_TASKS.
  • src/ai_company/engine/task_sync.py
    • New module added, containing extracted sync_to_task_engine, transition_task_if_needed, apply_post_execution_transitions, and internal transition helpers.
  • src/ai_company/observability/events/execution.py
    • Added EXECUTION_ENGINE_TASK_SYNCED and EXECUTION_ENGINE_SYNC_FAILED event constants.
  • tests/unit/engine/test_agent_engine.py
    • Rewrote sync tests for incremental paths (COMPLETED/3, SHUTDOWN/2, ERROR/2, MAX_TURNS/1).
    • Added RecursionError propagation test.
    • Added a channel name invariant test.
Activity
  • Pre-reviewed by 9 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency).
  • 14 findings were addressed, including removing a dead parameter, moving an import, adding fatal error sync, adding an invariant test, adding a RecursionError propagation test, consolidating exception handling, and fixing documentation inaccuracies.
  • All 7093 tests pass, with 2 new tests for RecursionError propagation and channel invariant.
  • Achieved 94.56% code coverage (above the 80% threshold).
  • Ruff lint and format checks are clean.
  • Mypy strict clean across 888 source files.
  • Pre-commit hooks passed successfully.

@coderabbitai

coderabbitai bot commented Mar 12, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Adds a best-effort, incremental task-status synchronization path from AgentEngine to TaskEngine via a new task_sync module; refactors AgentEngine to await async context preparation and delegate transitions; updates TaskEngine snapshot channel, observability events, docs, and tests to validate incremental syncing.

Changes

| Cohort / File(s) | Summary |
|------------------|---------|
| Design & Docs: CLAUDE.md, docs/design/engine.md | Document incremental AgentEngine ↔ TaskEngine syncing, per-path transition sequences; add effective_autonomy? to AgentEngine.run docs and introduce PARKED in termination reasons. |
| Task sync module: src/ai_company/engine/task_sync.py | New module implementing sync_to_task_engine, transition_task_if_needed, and apply_post_execution_transitions with best-effort transition sequencing, logging, and exception semantics. |
| Agent core: src/ai_company/engine/agent_engine.py | Refactor: _prepare_context made async; removed internal transition/report helpers; delegate pre/post-execution and fatal-error syncs to task_sync helpers; update run/post-exec flow to await those helpers. |
| Task engine: src/ai_company/engine/task_engine.py | Changed _SNAPSHOT_CHANNEL from "task_engine" to "tasks" (must match CHANNEL_TASKS); minor docstring update. |
| Observability: src/ai_company/observability/events/execution.py | Added EXECUTION_ENGINE_TASK_SYNCED and EXECUTION_ENGINE_SYNC_FAILED event constants for sync observability. |
| Tests — agent engine: tests/unit/engine/test_agent_engine.py | Refactored tests to expect incremental sync via TaskEngine.submit returning TaskMutationResult; updated expected sync counts, sequences, and exception propagation. |
| Tests — task sync: tests/unit/engine/test_task_sync.py | New comprehensive unit tests for task_sync, covering normal transitions, partial failures, exception propagation (MemoryError/RecursionError), critical-flag behavior, and no-op when TaskEngine is None. |

Sequence Diagram(s)

sequenceDiagram
    participant Agent as AgentEngine
    participant Sync as task_sync
    participant TaskEng as TaskEngine

    Agent->>Sync: transition_task_if_needed(ctx, agent_id, task_id, task_engine)
    Sync->>TaskEng: submit(TransitionTaskMutation IN_PROGRESS)
    TaskEng-->>Sync: TaskMutationResult
    Sync-->>Agent: updated ctx

    Agent->>Agent: execute task
    Agent->>Sync: apply_post_execution_transitions(result, agent_id, task_id, task_engine)

    alt Completed path
        Sync->>TaskEng: submit(TransitionTaskMutation IN_PROGRESS→IN_REVIEW)
        TaskEng-->>Sync: TaskMutationResult
        Sync->>TaskEng: submit(TransitionTaskMutation IN_REVIEW→COMPLETED)
        TaskEng-->>Sync: TaskMutationResult
    else Shutdown/Error
        Sync->>TaskEng: submit(TransitionTaskMutation IN_PROGRESS→INTERRUPTED)
        TaskEng-->>Sync: TaskMutationResult
    end

    Sync->>TaskEng: sync_to_task_engine(final_status, reason, critical?)
    TaskEng-->>Sync: TaskMutationResult
    Sync-->>Agent: return final result

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


🚥 Pre-merge checks: ✅ 5 passed
| Check name | Status | Explanation |
|------------|--------|-------------|
| Title check | ✅ Passed | The PR title clearly and concisely describes the primary change: introducing incremental status synchronization between AgentEngine and TaskEngine. |
| Description check | ✅ Passed | The PR description is comprehensive and directly related to the changeset, providing clear summaries of incremental sync, extracted modules, channel fixes, and testing updates. |
| Linked Issues check | ✅ Passed | The PR successfully implements Option A (incremental sync) from issue #323, providing real-time visibility, crash recovery, and proper event emission at lifecycle transitions. All core requirements are addressed. |
| Out of Scope Changes check | ✅ Passed | All changes are directly aligned with the linked issue #323 requirements: incremental sync implementation, module extraction, channel fixes, and comprehensive testing. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 94.44%, which is sufficient. The required threshold is 80.00%. |



@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces incremental task status synchronization between AgentEngine and TaskEngine, significantly improving real-time visibility and crash recovery. The extraction of sync logic into a dedicated task_sync.py module enhances maintainability and cohesion. Documentation and unit tests have been thoroughly updated to reflect these changes, covering various transition paths and error scenarios. The fix for _SNAPSHOT_CHANNEL in TaskEngine ensures correct WebSocket event propagation. Overall, this is a well-structured and impactful feature.

reason=reason,
)
result = await task_engine.submit(mutation)
except MemoryError, RecursionError:

medium

The syntax except MemoryError, RecursionError: is deprecated in Python 3.10+. It should be updated to except (MemoryError, RecursionError): for modern Python compatibility.

except (MemoryError, RecursionError):
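A minimal runnable illustration of why the tuple form matters; the guarded helper below is hypothetical, not from this PR. It mirrors the sync helper's semantics: ordinary exceptions are swallowed while interpreter-critical ones propagate.

```python
def guarded(fn):
    """Swallow ordinary errors, but re-raise interpreter-critical ones."""
    try:
        return fn()
    except (MemoryError, RecursionError):  # Python 3 requires the tuple form
        raise
    except Exception as exc:
        return f"swallowed: {type(exc).__name__}"


print(guarded(lambda: 1 / 0))  # → swallowed: ZeroDivisionError
```

Without the parentheses, Python 3 rejects the clause at parse time, so the whole module would fail to import.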


codecov bot commented Mar 12, 2026

Codecov Report

❌ Patch coverage is 98.80952% with 1 line in your changes missing coverage. Please review.
✅ Project coverage is 93.76%. Comparing base (fa8bf1d) to head (67f463c).
⚠️ Report is 1 commit behind head on main.
✅ All tests successful. No failed tests found.

| Files with missing lines | Patch % | Lines |
|--------------------------|---------|-------|
| src/ai_company/engine/agent_engine.py | 93.33% | 1 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #331      +/-   ##
==========================================
+ Coverage   93.75%   93.76%   +0.01%     
==========================================
  Files         433      434       +1     
  Lines       19831    19850      +19     
  Branches     1911     1915       +4     
==========================================
+ Hits        18592    18613      +21     
+ Misses        957      956       -1     
+ Partials      282      281       -1     



Copilot AI left a comment


Pull request overview

This PR improves real-time task lifecycle visibility by syncing AgentEngine task status transitions to TaskEngine incrementally throughout execution (instead of only syncing a final status), and extracts the sync logic into a dedicated module for maintainability.

Changes:

  • Add engine/task_sync.py with best-effort incremental sync helpers and transition application logic.
  • Update AgentEngine to use incremental sync at start, post-execution, and fatal-error paths.
  • Fix TaskEngine snapshot publication channel to match the API/WebSocket channel ("tasks"), and expand tests/docs accordingly.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.

Show a summary per file
| File | Description |
|------|-------------|
| src/ai_company/engine/task_sync.py | New extracted module implementing best-effort incremental TaskEngine sync and transition helpers. |
| src/ai_company/engine/agent_engine.py | Integrates the new sync module across lifecycle points and adds fatal-error sync. |
| src/ai_company/engine/task_engine.py | Updates snapshot channel constant to "tasks" to align with WebSocket consumers. |
| src/ai_company/observability/events/execution.py | Adds execution event constants for sync success/failure. |
| tests/unit/engine/test_agent_engine.py | Reworks task-engine sync tests for incremental transitions + adds channel invariant test. |
| docs/design/engine.md | Documents the incremental sync design and updates the run() signature reference. |
| CLAUDE.md | Updates package description to mention incremental status sync. |


target_status=error_ctx.task_execution.status,
task_id=task_id,
agent_id=agent_id,
reason=f"Fatal error recovery: {error_msg}",

Copilot AI Mar 12, 2026


The transition reason sent to TaskEngine includes the full exception string (error_msg). That reason is propagated into TaskEngine snapshot events (see TaskEngine._publish_snapshot uses mutation.reason), which can leak internal details to WebSocket consumers (file paths, tokens, etc.). Consider using a sanitized/generic reason (or at most the exception type) and keep full details only in logs/metrics.

Suggested change
reason=f"Fatal error recovery: {error_msg}",
reason=f"Fatal error recovery: {type(exc).__name__}",

Comment on lines +239 to +243
| Path | Synced transitions |
|------|--------------------|
| Happy (COMPLETED) | `IN_PROGRESS` → `IN_REVIEW` → `COMPLETED` |
| Shutdown | `IN_PROGRESS` → `INTERRUPTED` |
| Error | `IN_PROGRESS` → `FAILED` (after recovery) |

Copilot AI Mar 12, 2026


Markdown table formatting here uses double pipes (|| ... | ... |), which will render as an extra empty first column in most Markdown renderers. Use standard table syntax with single leading pipes on each row (e.g., | Path | Synced transitions |).


greptile-apps bot commented Mar 12, 2026

Greptile Summary

This PR introduces incremental AgentEngine → TaskEngine status sync, replacing a single end-of-execution report with stepwise transitions (ASSIGNED → IN_PROGRESS → IN_REVIEW → COMPLETED, SHUTDOWN → INTERRUPTED, ERROR → FAILED). The sync logic is cleanly extracted into a new engine/task_sync.py module, reducing agent_engine.py by ~236 lines. A long-standing channel name mismatch (_SNAPSHOT_CHANNEL = "task_engine" → "tasks") is also corrected, meaning WebSocket consumers will finally receive task state events as intended.

Key findings from this review:

  • Possible SyntaxError in task_sync.py:85: except MemoryError, RecursionError, asyncio.CancelledError: is missing parentheses. The bare-comma form is Python 2 syntax; Python 3 requires except (A, B, C):. The same bare-comma pattern pre-exists in agent_engine.py for two-item clauses, but the three-item variant in the new file is the most exposed instance and worth fixing for correctness and clarity.
  • _handle_fatal_error sync uses critical=False (default) — The new fatal-error sync path at agent_engine.py:835, and the analogous ERROR-recovery sync in _post_execute, both use the default critical=False. Given these are the scenarios most likely to leave a task permanently stuck at IN_PROGRESS, they should arguably log sync failures at ERROR (via critical=True), consistent with the ASSIGNED → IN_PROGRESS sync which already uses critical=True.
  • The TerminationReason.MAX_TURNS and BUDGET_EXHAUSTED paths intentionally leave task state unchanged (no sync), which appears to be by design for re-queuing scenarios, but is worth documenting explicitly.
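The severity split Greptile asks for (ERROR when critical, WARNING otherwise) can be sketched like this; log_sync_failure is an illustrative helper, not the module's actual function, though the event name matches the constant added in this PR:

```python
import logging

logging.basicConfig(level=logging.INFO, format="%(levelname)s %(message)s")
logger = logging.getLogger("task_sync")


def log_sync_failure(task_id: str, error: str, *, critical: bool) -> None:
    # critical=True escalates to ERROR so missed syncs on fatal paths
    # surface in alerting; best-effort paths stay at WARNING.
    log = logger.error if critical else logger.warning
    log("EXECUTION_ENGINE_SYNC_FAILED task_id=%s error=%s", task_id, error)


log_sync_failure("t-1", "TaskEngine unavailable", critical=True)   # ERROR
log_sync_failure("t-2", "transition rejected", critical=False)     # WARNING
```

With that split in place, the fatal-error and ERROR-recovery call sites only need to pass critical=True to get the louder log level.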

Confidence Score: 3/5

  • PR is mostly safe to merge but has a potential Python 3 syntax issue in the new module and a minor observability gap in the fatal error sync path that should be addressed first.
  • The extraction and incremental sync design is sound and well-tested. The _SNAPSHOT_CHANNEL fix is clearly correct. The primary concern is the except MemoryError, RecursionError, asyncio.CancelledError: clause in the entirely new task_sync.py — if Python 3 treats this as a SyntaxError, the module fails to import and the entire sync feature breaks silently. The pre-existing two-item bare-comma clauses in agent_engine.py provide some evidence the interpreter handles it, but the three-item form in new code is higher risk. The critical=False gap in fatal-error syncs is a lower-severity observability concern.
  • src/ai_company/engine/task_sync.py (line 85 exception clause syntax) and src/ai_company/engine/agent_engine.py (lines 835 and ~400 missing critical=True on error sync paths).

Important Files Changed

| Filename | Overview |
|----------|----------|
| src/ai_company/engine/task_sync.py | New 263-line module extracted from AgentEngine; contains a bare multi-exception except clause (Python 2 syntax, line 85) and pre-existing error_code/result.error null-logging gaps (flagged in prior review threads). |
| src/ai_company/engine/agent_engine.py | Reduced from 1094→858 lines by extracting sync logic; adds fatal-error sync in _handle_fatal_error (new) and ERROR-recovery sync in _post_execute (new), both omitting critical=True despite being the highest-severity sync paths. |
| src/ai_company/engine/task_engine.py | One-line channel fix: _SNAPSHOT_CHANNEL changed from "task_engine" to "tasks" to match CHANNEL_TASKS; invariant test added to prevent regression. |
| tests/unit/engine/test_task_sync.py | Comprehensive new test suite covering happy path, rejection, exception swallowing, non-swallowed exceptions, partial-failure semantics, and critical flag; no gaps found beyond the error_code=None / error=None cases noted in prior threads. |
| tests/unit/engine/test_agent_engine.py | Sync tests rewritten for incremental paths; adds RecursionError propagation and channel invariant tests; no coverage of the new critical=True behaviour in _handle_fatal_error sync. |

Sequence Diagram

sequenceDiagram
    participant C as Caller
    participant AE as AgentEngine
    participant TS as task_sync
    participant TE as TaskEngine
    participant WS as WebSocket (CHANNEL_TASKS)

    C->>AE: run(identity, task)
    AE->>TS: transition_task_if_needed(ctx, critical=True)
    TS->>TE: submit(ASSIGNED → IN_PROGRESS)
    TE-->>TS: TaskMutationResult(success=True)
    TS-->>AE: updated ctx (IN_PROGRESS)

    AE->>AE: _run_loop_with_timeout()
    AE-->>AE: ExecutionResult(COMPLETED)

    AE->>TS: apply_post_execution_transitions(COMPLETED)
    TS->>TE: submit(IN_PROGRESS → IN_REVIEW)
    TE->>WS: publish snapshot (tasks channel)
    TE-->>TS: TaskMutationResult(success=True)
    TS->>TE: submit(IN_REVIEW → COMPLETED)
    TE->>WS: publish snapshot (tasks channel)
    TE-->>TS: TaskMutationResult(success=True)
    TS-->>AE: ExecutionResult(ctx=COMPLETED)

    alt TerminationReason.ERROR
        AE->>TS: _apply_recovery → sync_to_task_engine(FAILED)
        TS->>TE: submit(IN_PROGRESS → FAILED)
        TE->>WS: publish snapshot
    end

    alt Fatal error (bypass pipeline)
        AE->>AE: _handle_fatal_error()
        AE->>TS: sync_to_task_engine(FAILED)
        TS->>TE: submit(→ FAILED)
        TE->>WS: publish snapshot
    end

    AE-->>C: AgentRunResult
Prompt To Fix All With AI
This is a comment left during a code review.
Path: src/ai_company/engine/agent_engine.py
Line: 835-841

Comment:
**Fatal-error sync missing `critical=True`**

The `sync_to_task_engine` call here uses the default `critical=False`, so any failure is logged at WARNING rather than ERROR. Fatal errors are arguably the most dangerous scenario for leaving a task permanently stuck at `IN_PROGRESS` — the very problem this feature aims to prevent. By contrast, the `ASSIGNED → IN_PROGRESS` sync in `transition_task_if_needed` already uses `critical=True`.

Passing `critical=True` here would surface missed syncs at ERROR level, matching the severity of the error path that landed here:

```suggestion
                await sync_to_task_engine(
                    self._task_engine,
                    target_status=error_ctx.task_execution.status,
                    task_id=task_id,
                    agent_id=agent_id,
                    reason=f"Fatal error recovery: {type(exc).__name__}",
                    critical=True,
                )
```

The same argument applies to the analogous `sync_to_task_engine` call in `_post_execute` (the `TerminationReason.ERROR` recovery path, lines ~397-403) which also uses the default `critical=False`.

How can I resolve this? If you propose a fix, please make it concise.

---

This is a comment left during a code review.
Path: src/ai_company/engine/task_sync.py
Line: 85-86

Comment:
**Missing parentheses for multi-exception `except` clause**

`except MemoryError, RecursionError, asyncio.CancelledError:` uses the Python 2 bare-comma form. In Python 3 the required form is a parenthesised tuple; the bare-comma variant is a `SyntaxError`. The existing `agent_engine.py` uses the same bare-comma pattern in several pre-existing `except MemoryError, RecursionError:` clauses, but none of those clauses include a third item, which may explain why they haven't been caught. This new three-item clause is the most exposed instance.

```suggestion
    except (MemoryError, RecursionError, asyncio.CancelledError):
```

How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 67f463c

- Add missing EXECUTION_ENGINE_TASK_TRANSITION logs for ERROR/FAILED paths
- Fix partial-completion return to always use model_copy with furthest ctx
- Sanitize fatal error reason to type name only (prevent info leak)
- Capture exception in classification catch block for detailed logging
- Expand sync_to_task_engine docstring with Raises section
- Add rejection error_code and fallback message to sync failure log
- Fix engine.md transition count range and IN_PROGRESS description
- Add PARKED to TerminationReason in engine.md
- Clarify comments for recovery strategy and post-recovery paths
- Add 23 dedicated unit tests for task_sync module
Copilot AI review requested due to automatic review settings March 12, 2026 23:29
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 12, 2026 23:30 — with GitHub Actions Inactive

Copilot AI left a comment


Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.



Comment on lines +73 to +78
result = await task_engine.submit(mutation)
except MemoryError, RecursionError:
raise
except Exception as exc:
error = (
"TaskEngine unavailable"
Comment on lines +170 to +182
try:
ctx = await _transition_to_complete(ctx, agent_id, task_id, task_engine)
except (ValueError, ExecutionStateError) as exc:
logger.exception(
EXECUTION_ENGINE_ERROR,
agent_id=agent_id,
task_id=task_id,
error=f"Post-execution transition failed: {exc}",
)
# Return with whatever context _transition_to_complete reached
# so the caller reflects the furthest-synced state.

return execution_result.model_copy(update={"context": ctx})
Comment on lines +380 to +386
logger.info(
EXECUTION_ENGINE_TASK_TRANSITION,
agent_id=agent_id,
task_id=task_id,
from_status="recovery",
to_status=ctx.task_execution.status.value,
)
Comment on lines +809 to +813
logger.info(
EXECUTION_ENGINE_TASK_TRANSITION,
agent_id=agent_id,
task_id=task_id,
from_status="recovery",

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/design/engine.md`:
- Around line 289-290: Update the post-execution lifecycle docs to describe the
newly exposed enum value PARKED: state explicitly whether a PARKED termination
is a no-op that leaves the task in its current state (same behavior as MAX_TURNS
and BUDGET_EXHAUSTED) or whether it transitions to a specific state;
specifically modify the post-execution section in docs/design/engine.md to
mention PARKED alongside MAX_TURNS and BUDGET_EXHAUSTED and state the exact
semantics (e.g., "PARKED: leaves task in current state" or the alternative) and
keep the note about max_turns defaulting to 20 intact for context.

In `@src/ai_company/engine/agent_engine.py`:
- Around line 376-393: The code unconditionally logs
EXECUTION_ENGINE_TASK_TRANSITION and calls sync_to_task_engine after calling
_apply_recovery even when recovery returned the original ExecutionResult; update
the logic in the recovery handling (use the local variable execution_result and
its .context.task_execution) to compare the pre-recovery status and
post-recovery status (e.g., capture pre_status =
execution_result.context.task_execution.status before calling _apply_recovery
and post_status after) and only emit the EXECUTION_ENGINE_TASK_TRANSITION log
and call sync_to_task_engine when post_status != pre_status; extract this
compare-and-sync behavior into a small helper (used by both the current block
around execution_result handling and the other recovery block that also
logs/syncs) so the two sites share the same implementation and cannot drift,
referencing _apply_recovery, ExecutionResult, ctx.task_execution,
EXECUTION_ENGINE_TASK_TRANSITION, and sync_to_task_engine.

In `@src/ai_company/engine/task_sync.py`:
- Around line 32-110: The sync_to_task_engine function exceeds the 50-line
limit; split its non-success paths into two private helpers: create a helper
_handle_sync_exception(exc, agent_id, task_id, target_status, critical) to
contain the except Exception branch (determine the error message for
TaskEngineError vs other exceptions and call logger.error or logger.warning
based on critical), and another helper _handle_rejected_mutation(result,
agent_id, task_id, target_status, critical) to contain the else:
rejected-mutation logging (log error text, error_code and use correct severity).
Refactor sync_to_task_engine to keep only the happy-path mutation
construction/await and then call these two helpers from the except and rejected
branches, preserving existing args and log keys (EXECUTION_ENGINE_SYNC_FAILED,
EXECUTION_ENGINE_TASK_SYNCED) and propagating MemoryError/RecursionError
unchanged.
- Around line 170-182: The call to _transition_to_complete can partially succeed
(e.g., move IN_PROGRESS -> IN_REVIEW) then raise, but the except block currently
returns the original ctx, losing the furthest-synced state; update the except
handler so that on exception you try to re-load the latest task context from the
engine (using task_engine and agent_id/task_id) and set ctx to that latest
context (falling back to the original ctx only if reload fails), then return
execution_result.model_copy(update={"context": ctx}); keep references to
_transition_to_complete, task_engine, execution_result, ctx, agent_id, and
task_id to locate the change.
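To make the suggested fallback concrete, here is a minimal sketch — not the project's code; `complete_with_reload`, the dict contexts, and the callbacks are illustrative stand-ins for `_transition_to_complete` and a TaskEngine reload:

```python
import asyncio

async def complete_with_reload(ctx, transition, reload_latest):
    """Return the engine's latest task view on partial transition failure.

    `transition` may advance the remote task partway (IN_PROGRESS ->
    IN_REVIEW) before raising; reloading avoids returning a stale context.
    """
    try:
        return await transition(ctx)
    except ValueError:
        try:
            return await reload_latest()
        except Exception:
            return ctx  # reload also failed: fall back to the original context

async def main():
    async def transition(ctx):
        # Simulate: the remote IN_PROGRESS -> IN_REVIEW step succeeded,
        # then the IN_REVIEW -> COMPLETED step was rejected.
        raise ValueError("completion rejected")

    async def reload_latest():
        return {"status": "IN_REVIEW"}  # furthest state the engine reached

    return await complete_with_reload(
        {"status": "IN_PROGRESS"}, transition, reload_latest
    )

result = asyncio.run(main())
```

With this shape, the caller's `model_copy(update={"context": ctx})` reflects the furthest-synced state rather than the pre-transition one.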
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 25b8d0a5-be6b-46e8-93e1-1b2ade5d703e

📥 Commits

Reviewing files that changed from the base of the PR and between b3b0be7 and e073556.

📒 Files selected for processing (4)
  • docs/design/engine.md
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/task_sync.py
  • tests/unit/engine/test_task_sync.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Greptile Review
  • GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (5)
docs/design/*.md

📄 CodeRabbit inference engine (CLAUDE.md)

When approved deviations occur, update the relevant docs/design/ page to reflect the new reality

Files:

  • docs/design/engine.md
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Mark tests with @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow
Prefer @pytest.mark.parametrize for testing similar cases

Files:

  • tests/unit/engine/test_task_sync.py
{src,tests}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like example-provider, example-large-001, test-provider, test-small-001

Files:

  • tests/unit/engine/test_task_sync.py
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/task_sync.py
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649
Use except A, B: (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Type hints required on all public functions; mypy strict mode enforced
Google-style docstrings required on public classes and functions, enforced by ruff D rules
Create new objects, never mutate existing ones. For non-Pydantic internal collections, use copy.deepcopy() at construction and MappingProxyType wrapping for read-only enforcement
For dict/list fields in frozen Pydantic models, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using model_copy(update=...)
Use Pydantic v2 with BaseModel, model_validator, computed_field, ConfigDict
Use @computed_field for derived values instead of storing redundant fields (e.g. TokenUsage.total_tokens)
Use NotBlankStr from core.types for all identifier/name fields (including optional and tuple variants) instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare create_task
Line length must be 88 characters (ruff enforced)
Functions must be less than 50 lines; files must be less than 800 lines
Handle errors explicitly, never silently swallow exceptions
Validate at system boundaries (user input, external APIs, config files)
Every module with business logic must have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code
Use only logger as the variable name (not _logger, not log)
Use domain-specific event name constants from ai_company.observability.events modules (e.g...

Files:

  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/task_sync.py
src/ai_company/{providers,engine}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

RetryExhaustedError signals all retries failed — the engine layer catches this to trigger fallback chains

Files:

  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/task_sync.py
🧠 Learnings (4)
📓 Common learnings
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/ai_company/{providers,engine}/**/*.py : `RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains

Applied to files:

  • src/ai_company/engine/agent_engine.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`

Applied to files:

  • src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this

Applied to files:

  • src/ai_company/engine/task_sync.py
🧬 Code graph analysis (1)
src/ai_company/engine/agent_engine.py (4)
src/ai_company/engine/errors.py (1)
  • ExecutionStateError (12-13)
src/ai_company/engine/task_sync.py (2)
  • sync_to_task_engine (32-109)
  • transition_task_if_needed (112-135)
src/ai_company/engine/run_result.py (1)
  • termination_reason (64-66)
src/ai_company/engine/loop_protocol.py (1)
  • TerminationReason (28-36)

Comment on lines +289 to +290
: Enum: `COMPLETED`, `MAX_TURNS`, `BUDGET_EXHAUSTED`, `SHUTDOWN`, `ERROR`,
`PARKED`. `max_turns` defaults to 20.

⚠️ Potential issue | 🟡 Minor

Document what PARKED does to task state.

The enum now exposes PARKED, but the post-execution section still only documents MAX_TURNS and BUDGET_EXHAUSTED as no-op terminations. Please spell out whether parked runs also leave the task in its current state so the lifecycle description stays complete.

As per coding guidelines, "docs/design/*.md: When approved deviations occur, update the relevant docs/design/ page to reflect the new reality".


- Propagate asyncio.CancelledError in sync_to_task_engine (not swallowed)
- Restructure completion transitions to stepwise loop so partial failure
  returns furthest-reached state (IN_REVIEW, not IN_PROGRESS)
- Replace from_status="recovery" with actual pre-recovery TaskStatus value
  in both ERROR and fatal-error paths for consistent log event schema
- Add test_cancelled_error_propagates test case
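The stepwise restructuring described above can be illustrated with a rough sketch (hypothetical stand-ins: a plain dict for the context and strings for the real completion steps):

```python
import asyncio

STEPS = ["IN_REVIEW", "COMPLETED"]  # stand-in for the real completion steps

async def transition_stepwise(ctx, apply_step):
    """Advance one lifecycle step at a time.

    On failure, return the context as far as it actually got rather than
    the starting one, so callers see the furthest-reached state.
    """
    for step in STEPS:
        try:
            ctx = await apply_step(ctx, step)
        except ValueError:
            break  # keep the furthest-synced context
    return ctx

async def main():
    async def flaky(ctx, step):
        if step == "COMPLETED":
            raise ValueError("review gate rejected completion")
        return {**ctx, "status": step}

    return await transition_stepwise({"status": "IN_PROGRESS"}, flaky)

result = asyncio.run(main())
# result["status"] is "IN_REVIEW", not "IN_PROGRESS"
```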

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/unit/engine/test_task_sync.py`:
- Around line 238-251: The test test_critical_flag_does_not_raise currently only
checks that sync_to_task_engine does not raise when TaskEngineError is raised
with critical=True; update the test to also assert that the expected log
event/level is emitted (e.g., capture logs or use a test logger fixture and
assert an ERROR/CRITICAL log entry was produced) so the behavior "changes log
level" is actually verified; reference sync_to_task_engine, TaskEngineError and
the critical=True invocation to locate where to add the log capture and
assertion.
- Around line 439-481: Replace the two almost-identical tests
test_max_turns_returns_unchanged and test_budget_exhausted_returns_unchanged
with a single parametrized test using pytest.mark.parametrize over the
termination reasons (TerminationReason.MAX_TURNS and
TerminationReason.BUDGET_EXHAUSTED); inside the new test reuse
AgentContext.from_identity, ctx.with_task_transition,
_make_execution_result(reason=reason), and call apply_post_execution_transitions
with the same agent_id/task_id/task_engine arguments, asserting the returned
value is the same object (assert out is result); keep the original test name(s)
descriptive and remove the duplicated functions.
- Around line 180-237: Replace the four near-duplicate tests
(test_task_engine_error_swallowed, test_unexpected_exception_swallowed,
test_memory_error_propagates, test_recursion_error_propagates) with a single
parametrized test using pytest.mark.parametrize that iterates over cases built
with _make_mock_task_engine(..., side_effect=...) and an indicator of whether
the exception should propagate; inside the test call sync_to_task_engine(...)
for each case and use pytest.raises(expected_exception) for the
MemoryError/RecursionError cases and a simple await (no raise) for the swallowed
cases (TaskEngineError and RuntimeError); reference sync_to_task_engine and
_make_mock_task_engine in the new parametrized test to locate and reuse the same
setup.
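A parametrized version along the lines the prompt suggests might look like the following sketch — the `sync_to_task_engine` here is a toy stand-in modeling only the swallow-vs-propagate contract, not the real implementation:

```python
import asyncio

import pytest

class TaskEngineError(Exception):
    """Stand-in for the project's TaskEngineError."""

async def sync_to_task_engine(submit):
    # Toy model of the contract under test: engine/runtime failures are
    # swallowed, interpreter-level failures propagate.
    try:
        await submit()
    except (MemoryError, RecursionError):  # repo style on 3.14: except A, B:
        raise
    except Exception:
        pass

def _make_mock_task_engine(side_effect):
    async def submit():
        raise side_effect
    return submit

@pytest.mark.unit
@pytest.mark.parametrize(
    ("side_effect", "expected"),
    [
        (TaskEngineError("engine down"), None),
        (RuntimeError("connection lost"), None),
        (MemoryError("out of memory"), MemoryError),
        (RecursionError("maximum recursion depth exceeded"), RecursionError),
    ],
)
def test_sync_exception_paths(side_effect, expected):
    submit = _make_mock_task_engine(side_effect)
    if expected is None:
        asyncio.run(sync_to_task_engine(submit))  # swallowed: must not raise
    else:
        with pytest.raises(expected):
            asyncio.run(sync_to_task_engine(submit))
```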
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 90687c2d-b45c-4c20-a529-d594f24d145f

📥 Commits

Reviewing files that changed from the base of the PR and between e073556 and b150ca7.

📒 Files selected for processing (1)
  • tests/unit/engine/test_task_sync.py
📜 Review details
🧰 Additional context used
📓 Path-based instructions (2)
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Mark tests with @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow
Prefer @pytest.mark.parametrize for testing similar cases

Files:

  • tests/unit/engine/test_task_sync.py
{src,tests}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like example-provider, example-large-001, test-provider, test-small-001

Files:

  • tests/unit/engine/test_task_sync.py
🧠 Learnings (1)
📓 Common learnings
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
🧬 Code graph analysis (1)
tests/unit/engine/test_task_sync.py (6)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/engine/errors.py (2)
  • ExecutionStateError (12-13)
  • TaskEngineError (85-86)
src/ai_company/engine/loop_protocol.py (3)
  • ExecutionResult (79-140)
  • TerminationReason (28-36)
  • TurnRecord (39-76)
src/ai_company/engine/task_sync.py (1)
  • sync_to_task_engine (32-109)
src/ai_company/providers/enums.py (1)
  • FinishReason (15-22)
src/ai_company/core/agent.py (1)
  • AgentIdentity (266-342)
🔇 Additional comments (1)
tests/unit/engine/test_task_sync.py (1)

383-412: Good coverage of the COMPLETED transition chain.

This segment correctly verifies both local final state (COMPLETED) and remote sync sequence (IN_REVIEW then COMPLETED), which directly exercises the incremental-sync contract.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/ai_company/engine/agent_engine.py (1)

344-420: 🛠️ Refactor suggestion | 🟠 Major

Refactor _post_execution_pipeline under 50 lines.

Lines 344–420 exceed the function-size policy. Split the pipeline into focused helpers (cost recording, post-execution transition application, recovery + sync, classification) to keep the behavior readable and policy-compliant.

As per coding guidelines, “Functions must be less than 50 lines; files must be less than 800 lines”.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/agent_engine.py` around lines 344 - 420, Split
_post_execution_pipeline into small helpers: move cost recording into a helper
(e.g., _record_execution_costs) that calls record_execution_costs with the same
args and tracker, move transition logic into an _apply_post_transitions helper
that invokes apply_post_execution_transitions, extract the recovery+sync block
into _handle_recovery_and_sync which captures pre_recovery_status, calls
self._apply_recovery, logs EXECUTION_ENGINE_TASK_TRANSITION, and calls
sync_to_task_engine exactly as before, and move the classification block into
_classify_execution_errors which wraps classify_execution_errors in the same
try/except (re-raising MemoryError and RecursionError, catching Exception and
logging EXECUTION_ENGINE_ERROR with exc_info). Then have
_post_execution_pipeline call these helpers in the same order (record ->
transitions -> if error then handle_recovery_and_sync -> classify) and return
the final execution_result unchanged; ensure all original symbols
(record_execution_costs, apply_post_execution_transitions, self._apply_recovery,
sync_to_task_engine, classify_execution_errors, self._error_taxonomy_config,
EXECUTION_ENGINE_TASK_TRANSITION, EXECUTION_ENGINE_ERROR, TerminationReason) and
behavior (pre_recovery_status capture and exact log messages/parameters) are
preserved.
♻️ Duplicate comments (5)
tests/unit/engine/test_task_sync.py (3)

238-251: ⚠️ Potential issue | 🟡 Minor

critical=True test does not verify the claimed behavior.

The docstring at Line 239 says log level changes, but the test only asserts “no raise”. Add a log assertion for severity/event to validate the critical-path contract.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_task_sync.py` around lines 238 - 251, The test
test_critical_flag_does_not_raise currently only ensures no exception when
calling sync_to_task_engine with critical=True but does not assert that logging
severity changed; update the test to capture/log records (e.g., using caplog or
a mock logger) around the call and assert that a log entry at the expected
higher severity (e.g., ERROR/CRITICAL) was emitted by sync_to_task_engine when
the TaskEngineError("unavailable") side_effect occurs on mock_te; reference the
test function test_critical_flag_does_not_raise, the mock task engine mock_te,
and the sync_to_task_engine invocation to locate where to add the log capture
and severity assertion.

180-237: 🧹 Nitpick | 🔵 Trivial

Collapse exception-path sync tests into a single parametrized test.

Lines 180–237 repeat the same structure for swallowed vs. propagated exceptions. Consolidate them with @pytest.mark.parametrize to reduce duplication and improve extensibility.

As per coding guidelines, “tests/**/*.py: Prefer @pytest.mark.parametrize for testing similar cases”.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_task_sync.py` around lines 180 - 237, Collapse the
four nearly identical tests (test_task_engine_error_swallowed,
test_unexpected_exception_swallowed, test_memory_error_propagates,
test_recursion_error_propagates) into one parametrized pytest using
`@pytest.mark.parametrize`: enumerate cases of (side_effect_exception,
should_raise) where side_effects are TaskEngineError("engine down"),
RuntimeError("connection lost"), MemoryError("out of memory"),
RecursionError("maximum recursion depth exceeded") and should_raise is False for
the first two and True for the latter two; within the single test, call
_make_mock_task_engine(side_effect=...) to create mock_te and invoke
sync_to_task_engine(...) and assert either that the call completes (when
should_raise is False) or that it raises the expected exception with
pytest.raises(..., match=...) when should_raise is True, preserving test names
and docstrings as needed for clarity.

456-498: 🧹 Nitpick | 🔵 Trivial

Parametrize unchanged-termination tests.

Lines 456–498 (MAX_TURNS and BUDGET_EXHAUSTED) assert the same behavior and should be a single parametrized test.

As per coding guidelines, “tests/**/*.py: Prefer @pytest.mark.parametrize for testing similar cases”.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/engine/test_task_sync.py` around lines 456 - 498, Two tests
(test_max_turns_returns_unchanged and test_budget_exhausted_returns_unchanged)
duplicate the same logic; refactor them into a single parametrized test using
pytest.mark.parametrize over TerminationReason values
(TerminationReason.MAX_TURNS, TerminationReason.BUDGET_EXHAUSTED) to avoid
duplication. Update the test function to accept a termination_reason parameter,
reuse AgentContext.from_identity, ctx.with_task_transition, and
_make_execution_result(…, reason=termination_reason), then call
apply_post_execution_transitions and assert the result is unchanged; keep
existing fixtures (sample_agent_with_personality, sample_task_with_criteria) and
the same agent_id/task_id arguments.
src/ai_company/engine/agent_engine.py (1)

385-399: ⚠️ Potential issue | 🟠 Major

Only log/sync when recovery actually changes task status.

At Line 385 and Line 819, transition logging/sync runs even when pre_*_status == post_status. This can emit fake transition events and generate no-op/rejected sync attempts. Gate these blocks on a real status change and reuse one helper for both call sites.

🛠️ Proposed fix pattern
-            if ctx.task_execution is not None and pre_recovery_status is not None:
+            if (
+                ctx.task_execution is not None
+                and pre_recovery_status is not None
+                and ctx.task_execution.status != pre_recovery_status
+            ):
                 logger.info(
                     EXECUTION_ENGINE_TASK_TRANSITION,
@@
-            if error_ctx.task_execution is not None and pre_fatal_status is not None:
+            if (
+                error_ctx.task_execution is not None
+                and pre_fatal_status is not None
+                and error_ctx.task_execution.status != pre_fatal_status
+            ):
                 logger.info(
                     EXECUTION_ENGINE_TASK_TRANSITION,

Also applies to: 819-833

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/agent_engine.py` around lines 385 - 399, The transition
logging/sync block should only run when the task status actually changed: add a
check comparing pre_recovery_status (or pre_*_status) against
ctx.task_execution.status (or post status) and skip logging/sync if equal;
extract the duplicated logic into a single helper (e.g.,
_emit_task_transition_and_sync or similar) that accepts agent_id, task_id,
pre_status, post_status and the task_engine, calls
logger.info(EXECUTION_ENGINE_TASK_TRANSITION, ...) and then awaits
sync_to_task_engine(...) so both call sites (the blocks using ctx.task_execution
and the other site referenced around the second occurrence) call this helper
only when pre_status != post_status.
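A minimal sketch of such a helper (illustrative only — this `TaskStatus`, the log callback, and the sync callable stand in for the real enum, structured logger, and `sync_to_task_engine`):

```python
import asyncio
from enum import Enum

class TaskStatus(Enum):  # stand-in for the project's TaskStatus enum
    IN_PROGRESS = "in_progress"
    FAILED = "failed"

async def emit_transition_if_changed(pre_status, post_status, sync, log):
    """Log and sync only when recovery actually changed the task status."""
    if pre_status == post_status:
        return False  # no-op recovery: skip the fake transition event
    log(f"task transition {pre_status.value} -> {post_status.value}")
    await sync(post_status)
    return True

events: list[str] = []

async def fake_sync(status):  # stand-in for sync_to_task_engine
    events.append(f"synced:{status.value}")

async def main():
    # Unchanged status: nothing emitted, nothing synced.
    unchanged = await emit_transition_if_changed(
        TaskStatus.IN_PROGRESS, TaskStatus.IN_PROGRESS, fake_sync, events.append
    )
    # Changed status: one transition log entry plus one sync call.
    changed = await emit_transition_if_changed(
        TaskStatus.IN_PROGRESS, TaskStatus.FAILED, fake_sync, events.append
    )
    return unchanged, changed

unchanged, changed = asyncio.run(main())
```

Both recovery call sites would then delegate to the one helper, so the gating logic cannot drift between them.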
src/ai_company/engine/task_sync.py (1)

43-122: 🛠️ Refactor suggestion | 🟠 Major

Split sync_to_task_engine to comply with the 50-line function limit.

Lines 43–122 exceed the repository's hard function-length limit, making the control flow harder to maintain and reason about. Extract the exception and rejected-mutation branches into private helpers.

As per coding guidelines, “Functions must be less than 50 lines; files must be less than 800 lines”.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_sync.py` around lines 43 - 122,
sync_to_task_engine is too long; split its exception-handling and
rejected-mutation branches into small private helpers to keep the function under
50 lines. Create e.g. _handle_sync_exception(exc, agent_id, task_id,
target_status, critical) that re-raises
MemoryError/RecursionError/asyncio.CancelledError and otherwise logs using
logger.error or logger.warning based on the critical flag (preserving the
EXECUTION_ENGINE_SYNC_FAILED message and exc_info). Create e.g.
_handle_rejected_mutation(result, agent_id, task_id, target_status, critical)
that logs the rejection (preserving result.error/result.error_code and
EXECUTION_ENGINE_SYNC_FAILED) and keep sync_to_task_engine focused on building
the mutation, awaiting task_engine.submit, calling the two helpers from the
except/else branches, and returning on success (preserve the debug log for
EXECUTION_ENGINE_TASK_SYNCED and unchanged semantics).
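The shape of that split might look like the following sketch (hypothetical helper names taken from the review; `_Result` and the list-based log are demo stand-ins for the real mutation result and structured logger):

```python
import asyncio

class _Result:
    """Stand-in for the TaskEngine mutation result."""
    def __init__(self, accepted, error="", error_code=""):
        self.accepted = accepted
        self.error = error
        self.error_code = error_code

LOG: list[tuple[str, str]] = []  # (level, message) captured for the demo

def _log(level, msg):
    LOG.append((level, msg))

def _handle_sync_exception(exc, critical):
    if isinstance(exc, (MemoryError, RecursionError)):
        raise exc  # never swallow interpreter-level failures
    _log("error" if critical else "warning", f"sync failed: {exc}")

def _handle_rejected_mutation(result, critical):
    _log(
        "error" if critical else "warning",
        f"mutation rejected: {result.error} ({result.error_code})",
    )

async def sync_to_task_engine(submit, critical=False):
    """Happy path stays short; failure branches live in the helpers."""
    try:
        result = await submit()
    except Exception as exc:  # CancelledError is BaseException: propagates
        _handle_sync_exception(exc, critical)
        return
    if result.accepted:
        _log("debug", "task synced")
    else:
        _handle_rejected_mutation(result, critical)

async def main():
    async def ok():
        return _Result(accepted=True)

    async def rejected():
        return _Result(accepted=False, error="invalid transition",
                       error_code="E_STATE")

    async def broken():
        raise RuntimeError("connection lost")

    await sync_to_task_engine(ok)
    await sync_to_task_engine(rejected, critical=True)
    await sync_to_task_engine(broken)

asyncio.run(main())
```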
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/ai_company/engine/task_sync.py`:
- Line 76: The current code creates a `log` alias (e.g., `log = logger.error if
critical else logger.warning`) which violates the naming rule; remove any `log =
...` assignments and instead call `logger.error(...)` or `logger.warning(...)`
directly based on the `critical` condition at each call site. Locate the `log =
...` assignment and every subsequent `log(...)` usage (notably the blocks around
the comment ranges 76, 94-101, and 114-121) and replace them with conditional
calls like `if critical: logger.error(...) else: logger.warning(...)` so
`logger` remains the only logger variable name. Ensure no new logger aliases
(`log`, `_logger`, etc.) are introduced elsewhere in the function or module.
- Around line 150-205: The function apply_post_execution_transitions is over 50
lines and should be split into smaller helpers: extract a shutdown handler
(e.g., handle_shutdown) that invokes _transition_to_interrupted with the same
args and returns the resulting ExecutionResult or early return when
ctx.task_execution is None; extract a completed handler (e.g.,
handle_completed_transitions) that receives the current ctx, agent_id, task_id,
task_engine and runs the for loop over _COMPLETION_STEPS calling
_transition_and_sync and logging/exiting on ValueError/ExecutionStateError
exactly as now; keep the final merge/return in apply_post_execution_transitions
to call the new helpers and return
execution_result.model_copy(update={"context": ctx}) while preserving
propagation of MemoryError and RecursionError and all existing logging and error
strings (EXECUTION_ENGINE_ERROR, step_reason, _transition_to_interrupted,
_transition_and_sync).

---

Outside diff comments:
In `@src/ai_company/engine/agent_engine.py`:
- Around line 344-420: Split _post_execution_pipeline into small helpers: move
cost recording into a helper (e.g., _record_execution_costs) that calls
record_execution_costs with the same args and tracker, move transition logic
into an _apply_post_transitions helper that invokes
apply_post_execution_transitions, extract the recovery+sync block into
_handle_recovery_and_sync which captures pre_recovery_status, calls
self._apply_recovery, logs EXECUTION_ENGINE_TASK_TRANSITION, and calls
sync_to_task_engine exactly as before, and move the classification block into
_classify_execution_errors which wraps classify_execution_errors in the same
try/except (re-raising MemoryError and RecursionError, catching Exception and
logging EXECUTION_ENGINE_ERROR with exc_info). Then have
_post_execution_pipeline call these helpers in the same order (record ->
transitions -> if error then handle_recovery_and_sync -> classify) and return
the final execution_result unchanged; ensure all original symbols
(record_execution_costs, apply_post_execution_transitions, self._apply_recovery,
sync_to_task_engine, classify_execution_errors, self._error_taxonomy_config,
EXECUTION_ENGINE_TASK_TRANSITION, EXECUTION_ENGINE_ERROR, TerminationReason) and
behavior (pre_recovery_status capture and exact log messages/parameters) are
preserved.

---

Duplicate comments:
In `@src/ai_company/engine/agent_engine.py`:
- Around line 385-399: The transition logging/sync block should only run when
the task status actually changed: add a check comparing pre_recovery_status (or
pre_*_status) against ctx.task_execution.status (or post status) and skip
logging/sync if equal; extract the duplicated logic into a single helper (e.g.,
_emit_task_transition_and_sync or similar) that accepts agent_id, task_id,
pre_status, post_status and the task_engine, calls
logger.info(EXECUTION_ENGINE_TASK_TRANSITION, ...) and then awaits
sync_to_task_engine(...) so both call sites (the blocks using ctx.task_execution
and the other site referenced around the second occurrence) call this helper
only when pre_status != post_status.

In `@src/ai_company/engine/task_sync.py`:
- Around line 43-122: sync_to_task_engine is too long; split its
exception-handling and rejected-mutation branches into small private helpers to
keep the function under 50 lines. Create e.g. _handle_sync_exception(exc,
agent_id, task_id, target_status, critical) that re-raises
MemoryError/RecursionError/asyncio.CancelledError and otherwise logs using
logger.error or logger.warning based on the critical flag (preserving the
EXECUTION_ENGINE_SYNC_FAILED message and exc_info). Create e.g.
_handle_rejected_mutation(result, agent_id, task_id, target_status, critical)
that logs the rejection (preserving result.error/result.error_code and
EXECUTION_ENGINE_SYNC_FAILED) and keep sync_to_task_engine focused on building
the mutation, awaiting task_engine.submit, calling the two helpers from the
except/else branches, and returning on success (preserve the debug log for
EXECUTION_ENGINE_TASK_SYNCED and unchanged semantics).

In `@tests/unit/engine/test_task_sync.py`:
- Around line 238-251: The test test_critical_flag_does_not_raise currently only
ensures no exception when calling sync_to_task_engine with critical=True but
does not assert that logging severity changed; update the test to capture/log
records (e.g., using caplog or a mock logger) around the call and assert that a
log entry at the expected higher severity (e.g., ERROR/CRITICAL) was emitted by
sync_to_task_engine when the TaskEngineError("unavailable") side_effect occurs
on mock_te; reference the test function test_critical_flag_does_not_raise, the
mock task engine mock_te, and the sync_to_task_engine invocation to locate where
to add the log capture and severity assertion.
- Around line 180-237: Collapse the four nearly identical tests
(test_task_engine_error_swallowed, test_unexpected_exception_swallowed,
test_memory_error_propagates, test_recursion_error_propagates) into one
parametrized pytest using `@pytest.mark.parametrize`: enumerate cases of
(side_effect_exception, should_raise) where side_effects are
TaskEngineError("engine down"), RuntimeError("connection lost"),
MemoryError("out of memory"), RecursionError("maximum recursion depth exceeded")
and should_raise is False for the first two and True for the latter two; within
the single test, call _make_mock_task_engine(side_effect=...) to create mock_te
and invoke sync_to_task_engine(...) and assert either that the call completes
(when should_raise is False) or that it raises the expected exception with
pytest.raises(..., match=...) when should_raise is True, preserving test names
and docstrings as needed for clarity.
- Around line 456-498: Two tests (test_max_turns_returns_unchanged and
test_budget_exhausted_returns_unchanged) duplicate the same logic; refactor them
into a single parametrized test using pytest.mark.parametrize over
TerminationReason values (TerminationReason.MAX_TURNS,
TerminationReason.BUDGET_EXHAUSTED) to avoid duplication. Update the test
function to accept a termination_reason parameter, reuse
AgentContext.from_identity, ctx.with_task_transition, and
_make_execution_result(…, reason=termination_reason), then call
apply_post_execution_transitions and assert the result is unchanged; keep
existing fixtures (sample_agent_with_personality, sample_task_with_criteria) and
the same agent_id/task_id arguments.
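The parametrized collapse requested in the bullets above can be sketched as a single test. Everything below is a self-contained stand-in: `TaskEngineError` and `sync_to_task_engine` are local stubs that mirror the described swallow-vs-propagate semantics, not the real engine module.

```python
import asyncio
from unittest.mock import AsyncMock

import pytest


class TaskEngineError(Exception):
    """Local stand-in for the engine's TaskEngineError."""


async def sync_to_task_engine(task_engine) -> None:
    """Stub mirroring the described semantics: swallow recoverable
    errors, propagate non-recoverable ones."""
    try:
        await task_engine.submit()
    # Repo style on Python 3.14 drops the parentheses (PEP 758);
    # the parenthesized form also runs on older interpreters.
    except (MemoryError, RecursionError):
        raise
    except Exception:
        pass  # best-effort: logged-and-swallowed in the real module


@pytest.mark.parametrize(
    ("side_effect", "should_raise"),
    [
        (TaskEngineError("engine down"), False),
        (RuntimeError("connection lost"), False),
        (MemoryError("out of memory"), True),
        (RecursionError("maximum recursion depth exceeded"), True),
    ],
)
def test_sync_error_handling(side_effect, should_raise):
    mock_te = AsyncMock()
    mock_te.submit.side_effect = side_effect
    if should_raise:
        with pytest.raises(type(side_effect)):
            asyncio.run(sync_to_task_engine(mock_te))
    else:
        # Recoverable failures complete without raising.
        asyncio.run(sync_to_task_engine(mock_te))
```

Driving the loop through `asyncio.run` inside a synchronous test keeps the sketch free of the pytest-asyncio plugin; the real test suite may use its own async fixtures.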

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: dc564689-ff6c-4d3e-b94b-00640e0ab923

📥 Commits

Reviewing files that changed from the base of the PR and between b150ca7 and 0cd9770.

📒 Files selected for processing (3)
  • src/ai_company/engine/agent_engine.py
  • src/ai_company/engine/task_sync.py
  • tests/unit/engine/test_task_sync.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Greptile Review
  • GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (4)
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649
Use except A, B: (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Type hints required on all public functions; mypy strict mode enforced
Google-style docstrings required on public classes and functions, enforced by ruff D rules
Create new objects, never mutate existing ones. For non-Pydantic internal collections, use copy.deepcopy() at construction and MappingProxyType wrapping for read-only enforcement
For dict/list fields in frozen Pydantic models, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using model_copy(update=...)
Use Pydantic v2 with BaseModel, model_validator, computed_field, ConfigDict
Use @computed_field for derived values instead of storing redundant fields (e.g. TokenUsage.total_tokens)
Use NotBlankStr from core.types for all identifier/name fields (including optional and tuple variants) instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare create_task
Line length must be 88 characters (ruff enforced)
Functions must be less than 50 lines; files must be less than 800 lines
Handle errors explicitly, never silently swallow exceptions
Validate at system boundaries (user input, external APIs, config files)
Every module with business logic must have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code
Use only logger as the variable name (not _logger, not log)
Use domain-specific event name constants from ai_company.observability.events modules (e.g...

Files:

  • src/ai_company/engine/task_sync.py
  • src/ai_company/engine/agent_engine.py
src/ai_company/{providers,engine}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

RetryExhaustedError signals all retries failed — the engine layer catches this to trigger fallback chains

Files:

  • src/ai_company/engine/task_sync.py
  • src/ai_company/engine/agent_engine.py
{src,tests}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like example-provider, example-large-001, test-provider, test-small-001

Files:

  • src/ai_company/engine/task_sync.py
  • tests/unit/engine/test_task_sync.py
  • src/ai_company/engine/agent_engine.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Mark tests with @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow
Prefer @pytest.mark.parametrize for testing similar cases

Files:

  • tests/unit/engine/test_task_sync.py
🧠 Learnings (6)
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`

Applied to files:

  • src/ai_company/engine/task_sync.py
  • tests/unit/engine/test_task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this

Applied to files:

  • src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Functions must be less than 50 lines; files must be less than 800 lines

Applied to files:

  • src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: When review agents find valid issues (including pre-existing issues, suggestions, and findings adjacent to the PR's changes), fix them all — no deferring or 'out of scope' skipping

Applied to files:

  • src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to tests/**/*.py : Prefer `pytest.mark.parametrize` for testing similar cases

Applied to files:

  • tests/unit/engine/test_task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/ai_company/{providers,engine}/**/*.py : `RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains

Applied to files:

  • src/ai_company/engine/agent_engine.py
🧬 Code graph analysis (2)
src/ai_company/engine/task_sync.py (5)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/engine/errors.py (2)
  • ExecutionStateError (12-13)
  • TaskEngineError (85-86)
src/ai_company/engine/loop_protocol.py (2)
  • TerminationReason (28-36)
  • ExecutionResult (79-140)
src/ai_company/engine/task_engine_models.py (1)
  • TransitionTaskMutation (185-236)
src/ai_company/engine/context.py (2)
  • AgentContext (87-307)
  • with_task_transition (235-278)
src/ai_company/engine/agent_engine.py (4)
src/ai_company/engine/errors.py (1)
  • ExecutionStateError (12-13)
src/ai_company/engine/task_sync.py (3)
  • apply_post_execution_transitions (150-204)
  • sync_to_task_engine (43-121)
  • transition_task_if_needed (124-147)
src/ai_company/engine/run_result.py (1)
  • termination_reason (64-66)
src/ai_company/engine/loop_protocol.py (1)
  • TerminationReason (28-36)

Comment on lines +150 to +205
```python
async def apply_post_execution_transitions(
    execution_result: ExecutionResult,
    agent_id: str,
    task_id: str,
    task_engine: TaskEngine | None,
) -> ExecutionResult:
    """Apply post-execution task transitions based on termination reason.

    COMPLETED triggers IN_PROGRESS -> IN_REVIEW -> COMPLETED.
    SHUTDOWN triggers current status -> INTERRUPTED.
    Each transition is synced to TaskEngine incrementally.
    Transition failures are logged but never discard the result.
    ``MemoryError`` and ``RecursionError`` propagate unconditionally.

    Returns the original ``execution_result`` unchanged if no
    transitions apply, or a copy with updated context reflecting
    the furthest-reached state on success or partial failure.
    """
    ctx = execution_result.context
    if ctx.task_execution is None:
        return execution_result

    reason = execution_result.termination_reason

    if reason == TerminationReason.SHUTDOWN:
        return await _transition_to_interrupted(
            execution_result, ctx, agent_id, task_id, task_engine
        )

    if reason != TerminationReason.COMPLETED:
        return execution_result

    # Apply IN_PROGRESS -> IN_REVIEW -> COMPLETED stepwise so that
    # ``ctx`` always reflects the furthest-reached state, even when
    # one step raises (partial-completion safety).
    for target, step_reason in _COMPLETION_STEPS:
        try:
            ctx = await _transition_and_sync(
                ctx,
                target_status=target,
                reason=step_reason,
                agent_id=agent_id,
                task_id=task_id,
                task_engine=task_engine,
            )
        except (ValueError, ExecutionStateError) as exc:
            logger.exception(
                EXECUTION_ENGINE_ERROR,
                agent_id=agent_id,
                task_id=task_id,
                error=f"Post-execution transition failed: {exc}",
            )
            break

    return execution_result.model_copy(update={"context": ctx})
```


🛠️ Refactor suggestion | 🟠 Major

apply_post_execution_transitions also exceeds the 50-line limit.

Line 150–Line 205 should be split (e.g., shutdown handler, completed handler, and final context merge) to satisfy policy and reduce branching complexity.

As per coding guidelines, “Functions must be less than 50 lines; files must be less than 800 lines”.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_sync.py` around lines 150 - 205, The function
apply_post_execution_transitions is over 50 lines and should be split into
smaller helpers: extract a shutdown handler (e.g., handle_shutdown) that invokes
_transition_to_interrupted with the same args and returns the resulting
ExecutionResult or early return when ctx.task_execution is None; extract a
completed handler (e.g., handle_completed_transitions) that receives the current
ctx, agent_id, task_id, task_engine and runs the for loop over _COMPLETION_STEPS
calling _transition_and_sync and logging/exiting on
ValueError/ExecutionStateError exactly as now; keep the final merge/return in
apply_post_execution_transitions to call the new helpers and return
execution_result.model_copy(update={"context": ctx}) while preserving
propagation of MemoryError and RecursionError and all existing logging and error
strings (EXECUTION_ENGINE_ERROR, step_reason, _transition_to_interrupted,
_transition_and_sync).

Copilot AI review requested due to automatic review settings March 13, 2026 05:48
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 13, 2026 05:49 — with GitHub Actions Inactive

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
src/ai_company/engine/task_sync.py (1)

171-227: 🧹 Nitpick | 🔵 Trivial

Function size is borderline but acceptable.

At ~42 lines of code (excluding the docstring), apply_post_execution_transitions is close to the 50-line limit. The current structure is reasonable with SHUTDOWN delegated to _transition_to_interrupted and the COMPLETED path using a clear stepwise loop. If future changes push this over the limit, consider extracting the COMPLETED loop into a private helper.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/ai_company/engine/task_sync.py` around lines 171 - 227, The function
apply_post_execution_transitions is approaching the 50-line limit; to keep it
maintainable and avoid exceeding the threshold, extract the COMPLETED-path loop
that iterates over _COMPLETION_STEPS and calls _transition_and_sync into a
private helper (for example _apply_completion_steps) and have
apply_post_execution_transitions call that helper, preserving the existing
SHUTDOWN delegation to _transition_to_interrupted and identical error
handling/logging semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/unit/engine/test_task_sync.py`:
- Around line 489-525: Replace the manual monkeypatching of
AgentContext.with_task_transition (using original_method and assigning
patched_transition) with unittest.mock.patch.object to ensure automatic
restoration; specifically, patch AgentContext.with_task_transition inside a with
patch.object(AgentContext, "with_task_transition", new=patched_transition):
block (or as a decorator) around the invocation of
apply_post_execution_transitions, keeping the same patched_transition behavior
that raises ExecutionStateError on the second call and preserving references to
call_count and original_transition so assertions about
out.context.task_execution and TaskStatus.IN_REVIEW remain valid.

---

Duplicate comments:
In `@src/ai_company/engine/task_sync.py`:
- Around line 171-227: The function apply_post_execution_transitions is
approaching the 50-line limit; to keep it maintainable and avoid exceeding the
threshold, extract the COMPLETED-path loop that iterates over _COMPLETION_STEPS
and calls _transition_and_sync into a private helper (for example
_apply_completion_steps) and have apply_post_execution_transitions call that
helper, preserving the existing SHUTDOWN delegation to
_transition_to_interrupted and identical error handling/logging semantics.
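The `patch.object` refactor requested in the inline comment above can be illustrated with a self-contained sketch. `AgentContext` here is a minimal stand-in, not the real engine model; the patched method raises on the second call, mirroring the test's failure injection.

```python
from unittest.mock import patch


class AgentContext:
    """Stand-in for the engine's AgentContext (only the patched method)."""

    def with_task_transition(self, status: str) -> str:
        return f"transitioned to {status}"


call_count = 0
original_transition = AgentContext.with_task_transition


def patched_transition(self, status):
    # Fail on the second call, as in the test's failure-injection setup.
    global call_count
    call_count += 1
    if call_count == 2:
        raise RuntimeError("injected failure on second transition")
    return original_transition(self, status)


with patch.object(AgentContext, "with_task_transition", new=patched_transition):
    ctx = AgentContext()
    first = ctx.with_task_transition("IN_REVIEW")  # first call succeeds
    try:
        ctx.with_task_transition("COMPLETED")
    except RuntimeError:
        pass  # injected failure on the second transition

# patch.object restores the original automatically -- no manual cleanup,
# even if the body raises.
assert AgentContext.with_task_transition is original_transition
```

The context manager guarantees restoration on any exit path, which is exactly what the manual save-and-reassign pattern fails to provide when an assertion fails mid-test.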

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: d92cdab6-5b49-4744-ad2b-0c3ba3d6da01

📥 Commits

Reviewing files that changed from the base of the PR and between 0cd9770 and 3acb349.

📒 Files selected for processing (3)
  • docs/design/engine.md
  • src/ai_company/engine/task_sync.py
  • tests/unit/engine/test_task_sync.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Agent
  • GitHub Check: Greptile Review
  • GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (5)
src/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

src/**/*.py: No from __future__ import annotations — Python 3.14 has PEP 649
Use except A, B: (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Type hints required on all public functions; mypy strict mode enforced
Google-style docstrings required on public classes and functions, enforced by ruff D rules
Create new objects, never mutate existing ones. For non-Pydantic internal collections, use copy.deepcopy() at construction and MappingProxyType wrapping for read-only enforcement
For dict/list fields in frozen Pydantic models, use copy.deepcopy() at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using model_copy(update=...)
Use Pydantic v2 with BaseModel, model_validator, computed_field, ConfigDict
Use @computed_field for derived values instead of storing redundant fields (e.g. TokenUsage.total_tokens)
Use NotBlankStr from core.types for all identifier/name fields (including optional and tuple variants) instead of manual whitespace validators
Prefer asyncio.TaskGroup for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare create_task
Line length must be 88 characters (ruff enforced)
Functions must be less than 50 lines; files must be less than 800 lines
Handle errors explicitly, never silently swallow exceptions
Validate at system boundaries (user input, external APIs, config files)
Every module with business logic must have: from ai_company.observability import get_logger then logger = get_logger(__name__)
Never use import logging / logging.getLogger() / print() in application code
Use only logger as the variable name (not _logger, not log)
Use domain-specific event name constants from ai_company.observability.events modules (e.g...

Files:

  • src/ai_company/engine/task_sync.py
src/ai_company/{providers,engine}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

RetryExhaustedError signals all retries failed — the engine layer catches this to trigger fallback chains

Files:

  • src/ai_company/engine/task_sync.py
{src,tests}/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like example-provider, example-large-001, test-provider, test-small-001

Files:

  • src/ai_company/engine/task_sync.py
  • tests/unit/engine/test_task_sync.py
tests/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

tests/**/*.py: Mark tests with @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.e2e, and @pytest.mark.slow
Prefer @pytest.mark.parametrize for testing similar cases

Files:

  • tests/unit/engine/test_task_sync.py
docs/design/*.md

📄 CodeRabbit inference engine (CLAUDE.md)

When approved deviations occur, update the relevant docs/design/ page to reflect the new reality

Files:

  • docs/design/engine.md
🧠 Learnings (7)
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`

Applied to files:

  • src/ai_company/engine/task_sync.py
  • tests/unit/engine/test_task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this

Applied to files:

  • src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: When review agents find valid issues (including pre-existing issues, suggestions, and findings adjacent to the PR's changes), fix them all — no deferring or 'out of scope' skipping

Applied to files:

  • src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Functions must be less than 50 lines; files must be less than 800 lines

Applied to files:

  • src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to tests/**/*.py : Prefer `pytest.mark.parametrize` for testing similar cases

Applied to files:

  • tests/unit/engine/test_task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: For trivial/docs-only changes, use `/pre-pr-review quick` to skip agents but still run automated checks

Applied to files:

  • docs/design/engine.md
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to docs/design/*.md : When approved deviations occur, update the relevant `docs/design/` page to reflect the new reality

Applied to files:

  • docs/design/engine.md
🧬 Code graph analysis (2)
src/ai_company/engine/task_sync.py (5)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/engine/errors.py (2)
  • ExecutionStateError (12-13)
  • TaskEngineError (85-86)
src/ai_company/engine/loop_protocol.py (2)
  • TerminationReason (28-36)
  • ExecutionResult (79-140)
src/ai_company/engine/task_engine_models.py (1)
  • TransitionTaskMutation (185-236)
src/ai_company/engine/context.py (2)
  • AgentContext (87-307)
  • with_task_transition (235-278)
tests/unit/engine/test_task_sync.py (6)
src/ai_company/core/enums.py (1)
  • TaskStatus (198-224)
src/ai_company/engine/context.py (3)
  • AgentContext (87-307)
  • from_identity (140-171)
  • with_task_transition (235-278)
src/ai_company/engine/loop_protocol.py (3)
  • ExecutionResult (79-140)
  • TerminationReason (28-36)
  • TurnRecord (39-76)
src/ai_company/engine/task_engine_models.py (1)
  • TaskMutationResult (290-335)
src/ai_company/engine/task_sync.py (3)
  • apply_post_execution_transitions (171-227)
  • sync_to_task_engine (43-121)
  • transition_task_if_needed (145-168)
src/ai_company/core/task.py (1)
  • Task (45-261)
🔇 Additional comments (15)
src/ai_company/engine/task_sync.py (6)

1-31: LGTM!

Module setup follows guidelines correctly: proper logger initialization with get_logger(__name__), appropriate imports, and TYPE_CHECKING guard for type-only imports.


43-121: LGTM!

The sync_to_task_engine function correctly implements best-effort semantics:

  • Non-recoverable exceptions (MemoryError, RecursionError, asyncio.CancelledError) propagate unconditionally
  • Other exceptions are logged and swallowed via the extracted _log_sync_issue helper
  • Uses PEP 758 exception syntax without parentheses as required
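The swallow-vs-propagate contract summarized above can be sketched as follows. The function name, logger usage, and `critical` flag are illustrative stand-ins for the real module, not its actual API.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def best_effort_sync(submit, *, critical: bool = False) -> bool:
    """Attempt a sync; return True on success, False on a swallowed failure."""
    try:
        await submit()
    # On Python 3.14 the repo style drops the parentheses (PEP 758);
    # the parenthesized form below also runs on older interpreters.
    except (MemoryError, RecursionError, asyncio.CancelledError):
        raise  # non-recoverable or cancellation: never swallow
    except Exception as exc:
        # critical=True escalates the swallowed failure to ERROR severity.
        log = logger.error if critical else logger.warning
        log("task sync failed: %s", exc)
        return False
    return True
```

Returning a boolean keeps the caller's control flow simple while the log level carries the severity distinction that the `critical` flag encodes.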

124-142: LGTM!

The _log_sync_issue helper correctly addresses the past review feedback by using logger.error and logger.warning directly instead of a log alias.


145-168: LGTM!

The function is concise and correctly implements the pre-execution transition logic with critical=True for the initial sync.


230-263: LGTM!

The helper correctly implements the local-then-remote transition pattern with appropriate logging at INFO level for state transitions.


266-291: LGTM!

The shutdown transition handler correctly catches transition errors, logs with traceback via logger.exception, and returns the original result on failure.

docs/design/engine.md (4)

229-264: LGTM!

The new "AgentEngine ↔ TaskEngine Incremental Sync" section is comprehensive and well-structured. It clearly documents:

  • Transition sequences per execution path
  • Best-effort semantics with isolated failures
  • Critical vs. non-critical sync logging
  • Direct submit() usage rationale
  • Snapshot channel alignment

288-290: LGTM!

The TerminationReason enum documentation correctly includes the new PARKED value alongside the existing reasons.


414-417: LGTM!

The updated run() signature correctly includes the new effective_autonomy? parameter.


460-465: LGTM!

The post-execution transitions section now properly documents PARKED semantics as requested in past review feedback: tasks are suspended by an approval-timeout policy and remain at their current status until resumed. The cross-reference to the incremental sync section is helpful.

tests/unit/engine/test_task_sync.py (5)

1-100: LGTM!

The test module setup is well-organized with clear helper functions (_make_sync_success, _make_sync_failure, _make_mock_task_engine, _make_execution_result) that reduce boilerplate and improve test readability.


107-254: LGTM!

The TestSyncToTaskEngine class addresses past review feedback:

  • Exception propagation tests are now parametrized (lines 209-235)
  • Critical flag test now asserts log level via mock (lines 237-254)

Good coverage of all sync scenarios including no-op, success, rejection, and various exception types.


262-357: LGTM!

The TestTransitionTaskIfNeeded class provides comprehensive coverage of pre-execution transitions including:

  • ASSIGNED → IN_PROGRESS with sync verification
  • Passthrough for IN_PROGRESS and no task_execution
  • Local-only transition when task_engine is None

442-468: LGTM!

The test correctly uses @pytest.mark.parametrize to combine MAX_TURNS and BUDGET_EXHAUSTED scenarios, addressing the past review feedback about reducing duplication.


527-644: LGTM!

The remaining tests provide excellent coverage of edge cases:

  • Shutdown transition failure returns original result
  • COMPLETED path works with task_engine=None
  • Sync failures (rejections and exceptions) don't block local transitions

These tests validate the core "best-effort sync" design requirement.


Copilot AI left a comment


Pull request overview

Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.



Comment on lines +382 to +399
```python
        # Sync post-recovery status to TaskEngine (typically FAILED,
        # depends on recovery strategy).
        ctx = execution_result.context
        if ctx.task_execution is not None and pre_recovery_status is not None:
            logger.info(
                EXECUTION_ENGINE_TASK_TRANSITION,
                agent_id=agent_id,
                task_id=task_id,
                from_status=pre_recovery_status.value,
                to_status=ctx.task_execution.status.value,
            )
            await sync_to_task_engine(
                self._task_engine,
                target_status=ctx.task_execution.status,
                task_id=task_id,
                agent_id=agent_id,
                reason=f"Post-recovery status: {ctx.task_execution.status.value}",
            )
```
Comment on lines +820 to +833
```python
                logger.info(
                    EXECUTION_ENGINE_TASK_TRANSITION,
                    agent_id=agent_id,
                    task_id=task_id,
                    from_status=pre_fatal_status.value,
                    to_status=error_ctx.task_execution.status.value,
                )
                await sync_to_task_engine(
                    self._task_engine,
                    target_status=error_ctx.task_execution.status,
                    task_id=task_id,
                    agent_id=agent_id,
                    reason=f"Fatal error recovery: {type(exc).__name__}",
                )
```
Comment on lines +114 to +121
```python
    _log_sync_issue(
        critical=critical,
        agent_id=agent_id,
        task_id=task_id,
        target_status=target_status,
        error=result.error or "Mutation rejected (no error detail)",
        error_code=result.error_code,
    )
```

error_code passed without a None fallback

The sibling field result.error correctly applies an or fallback (result.error or "Mutation rejected (no error detail)"), but result.error_code is passed directly to _log_sync_issue without any guard. TaskMutationResult.error_code is typed str | None, so a rejection with no error code (e.g. one produced by an internal mutation path that never sets error_code) will log "error_code": null in structured output. Applying the same treatment keeps the log contract consistent and avoids potential issues in dashboards or alert filters that group by this field.

Suggested change
```diff
     _log_sync_issue(
         critical=critical,
         agent_id=agent_id,
         task_id=task_id,
         target_status=target_status,
         error=result.error or "Mutation rejected (no error detail)",
-        error_code=result.error_code,
+        error_code=result.error_code or "unknown",
     )
```

- Add status-difference check before post-recovery and fatal-error
  transition logs/syncs in agent_engine.py — prevents misleading
  EXECUTION_ENGINE_TASK_TRANSITION events and rejected no-op mutations
  when recovery strategy leaves status unchanged
- Replace manual monkey-patching with patch.object for cleaner
  automatic restoration in test_task_sync.py
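The status-difference guard described in the first bullet can be sketched as below. `TaskStatus` values and the helper name are illustrative stand-ins for the engine's enums.

```python
from enum import Enum


class TaskStatus(Enum):
    """Minimal stand-in for the engine's TaskStatus enum."""

    IN_PROGRESS = "in_progress"
    FAILED = "failed"


def recovery_changed_status(pre, post) -> bool:
    """Only log a transition and sync when recovery actually changed
    the status; no-op mutations would otherwise be rejected downstream."""
    return pre is not None and post is not None and pre is not post
```

Calling the sync only when this guard passes avoids emitting an `EXECUTION_ENGINE_TASK_TRANSITION` event whose `from_status` and `to_status` are identical.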
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 13, 2026 06:07 — with GitHub Actions Inactive
@Aureliolo Aureliolo merged commit 7a68d34 into main Mar 13, 2026
17 checks passed
@Aureliolo Aureliolo deleted the feat/task-engine-status-sync branch March 13, 2026 06:11
@Aureliolo Aureliolo temporarily deployed to cloudflare-preview March 13, 2026 06:11 — with GitHub Actions Inactive
Comment on lines +835 to +841
```python
                await sync_to_task_engine(
                    self._task_engine,
                    target_status=error_ctx.task_execution.status,
                    task_id=task_id,
                    agent_id=agent_id,
                    reason=f"Fatal error recovery: {type(exc).__name__}",
                )
```

Fatal-error sync missing critical=True

The sync_to_task_engine call here uses the default critical=False, so any failure is logged at WARNING rather than ERROR. Fatal errors are arguably the most dangerous scenario for leaving a task permanently stuck at IN_PROGRESS — the very problem this feature aims to prevent. By contrast, the ASSIGNED → IN_PROGRESS sync in transition_task_if_needed already uses critical=True.

Passing critical=True here would surface missed syncs at ERROR level, matching the severity of the error path that landed here:

Suggested change
```diff
                 await sync_to_task_engine(
                     self._task_engine,
                     target_status=error_ctx.task_execution.status,
                     task_id=task_id,
                     agent_id=agent_id,
                     reason=f"Fatal error recovery: {type(exc).__name__}",
+                    critical=True,
                 )
```

The same argument applies to the analogous sync_to_task_engine call in _post_execute (the TerminationReason.ERROR recovery path, lines ~397-403) which also uses the default critical=False.


Aureliolo added a commit that referenced this pull request Mar 13, 2026
🤖 I have created a release *beep* *boop*
---


## [0.1.3](v0.1.2...v0.1.3) (2026-03-13)


### Features

* add Mem0 memory backend adapter
([#345](#345))
([2788db8](2788db8)),
closes [#206](#206)
* centralized single-writer TaskEngine with full CRUD API
([#328](#328))
([9c1a3e1](9c1a3e1))
* incremental AgentEngine → TaskEngine status sync
([#331](#331))
([7a68d34](7a68d34)),
closes [#323](#323)
* web dashboard pages — views, components, tests, and review fixes
([#354](#354))
([b165ec4](b165ec4))
* web dashboard with Vue 3 + PrimeVue + Tailwind CSS
([#347](#347))
([06416b1](06416b1))


### Bug Fixes

* harden coordination pipeline with validators, logging, and fail-fast
([#333](#333))
([2f10d49](2f10d49)),
closes [#205](#205)
* repo-wide security hardening from ZAP, Scorecard, and CodeQL audit
([#357](#357))
([27eb288](27eb288))


### CI/CD

* add pip-audit, hadolint, OSSF Scorecard, ZAP DAST, and pre-push hooks
([#350](#350))
([2802d20](2802d20))
* add workflow_dispatch trigger to PR Preview for Dependabot PRs
([#326](#326))
([4c7b6d9](4c7b6d9))
* bump astral-sh/setup-uv from 7.4.0 to 7.5.0 in the minor-and-patch
group ([#335](#335))
([98dd8ca](98dd8ca))


### Maintenance

* bump the minor-and-patch group across 1 directory with 3 updates
([#352](#352))
([031b1c9](031b1c9))
* **deps:** bump devalue from 5.6.3 to 5.6.4 in /site in the
npm_and_yarn group across 1 directory
([#324](#324))
([9a9c600](9a9c600))
* migrate docs build from MkDocs to Zensical
([#330](#330))
([fa8bf1d](fa8bf1d)),
closes [#329](#329)

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).

Development

Successfully merging this pull request may close these issues.

research: AgentEngine → TaskEngine real-time status sync strategy
