feat: incremental AgentEngine → TaskEngine status sync #331
Conversation
Replace the single final-status report with incremental sync at every lifecycle transition (IN_PROGRESS, IN_REVIEW, COMPLETED, INTERRUPTED, FAILED). This gives real-time visibility into execution progress and improves crash recovery — a crash mid-execution now leaves the task at the last-reached stage instead of stuck at ASSIGNED.

- Add _sync_to_task_engine() best-effort helper using submit() directly for optimistic concurrency version access
- Fix channel mismatch: TaskEngine._SNAPSHOT_CHANNEL "task_engine" → "tasks" so events reach MessageBusBridge and WebSocket consumers
- Remove _REPORTABLE_STATUSES and _report_to_task_engine() (superseded)
- Add EXECUTION_ENGINE_TASK_SYNCED / EXECUTION_ENGINE_SYNC_FAILED events
- Rewrite tests: verify 3 syncs (COMPLETED), 2 (SHUTDOWN/ERROR), 1 (MAX_TURNS), best-effort isolation, error swallowing
- Document incremental sync pattern in engine design spec

Closes #323
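The best-effort contract described above can be sketched as follows. This is a sketch based only on the PR description: the mutation shape, the `submit()` signature, and the event values are assumptions, and real project code would use the repo's `get_logger` helper rather than stdlib `logging`.

```python
import logging

logger = logging.getLogger(__name__)  # project code would use get_logger(__name__)

# Event-name constants mirror those named in the PR description;
# the exact string values are assumptions.
EXECUTION_ENGINE_TASK_SYNCED = "execution_engine.task_synced"
EXECUTION_ENGINE_SYNC_FAILED = "execution_engine.sync_failed"


async def sync_to_task_engine(task_engine, target_status, task_id, agent_id,
                              reason, critical=False):
    """Best-effort status sync: a failure must never break agent execution."""
    try:
        # `submit()` and the mutation shape are assumed from the PR text.
        result = await task_engine.submit(
            {"task_id": task_id, "to": target_status, "reason": reason}
        )
    except (MemoryError, RecursionError):
        raise  # Fatal conditions always propagate.
    except Exception as exc:
        # Critical call sites (e.g. ASSIGNED -> IN_PROGRESS) escalate severity.
        log = logger.error if critical else logger.warning
        log("%s task=%s: %s", EXECUTION_ENGINE_SYNC_FAILED, task_id, exc)
        return None
    logger.info("%s task=%s -> %s",
                EXECUTION_ENGINE_TASK_SYNCED, task_id, target_status)
    return result
```

A rejected or failed sync returns `None` so callers can continue; only resource-exhaustion errors escape.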
Pre-reviewed by 9 agents, 14 findings addressed:

- Extract sync/transition methods from AgentEngine (1094→858 lines) into engine/task_sync.py (263 lines) for cohesion
- Remove dead expected_version parameter and int|None return from sync (optimistic concurrency was never wired up)
- Move uuid4 import to module level (no circular dep risk)
- Add TaskEngine sync in _handle_fatal_error (prevents task stuck at IN_PROGRESS after fatal errors)
- Add _SNAPSHOT_CHANNEL == CHANNEL_TASKS invariant test
- Add RecursionError propagation test
- Move event constants to ENGINE cluster in execution.py
- Fix docs: run() signature, concurrency claim, step 10 cross-ref
- Update CLAUDE.md engine/ description
Dependency Review: ✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found. Scanned files: none.
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the visibility and resilience of task execution by introducing incremental status synchronization between the AgentEngine and TaskEngine. Instead of a single final status update, the AgentEngine now reports task progress at key lifecycle points, which provides real-time insights and improves crash recovery by ensuring tasks are left in their last-reached valid state. The underlying synchronization logic has been refactored into a new, dedicated module, and robust error handling has been implemented to ensure sync failures do not impede agent execution.
Note: Reviews paused. This branch is under active development, so CodeRabbit automatically paused this review to avoid overwhelming you with comments during an influx of new commits. This behavior is configurable in the CodeRabbit settings.
📝 Walkthrough

Adds a best-effort, incremental task-status synchronization path from AgentEngine to TaskEngine via a new task_sync module; refactors AgentEngine to await async context preparation and delegate transitions; updates TaskEngine snapshot channel, observability events, docs, and tests to validate incremental syncing.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Agent as AgentEngine
    participant Sync as task_sync
    participant TaskEng as TaskEngine
    Agent->>Sync: transition_task_if_needed(ctx, agent_id, task_id, task_engine)
    Sync->>TaskEng: submit(TransitionTaskMutation IN_PROGRESS)
    TaskEng-->>Sync: TaskMutationResult
    Sync-->>Agent: updated ctx
    Agent->>Agent: execute task
    Agent->>Sync: apply_post_execution_transitions(result, agent_id, task_id, task_engine)
    alt Completed path
        Sync->>TaskEng: submit(TransitionTaskMutation IN_PROGRESS→IN_REVIEW)
        TaskEng-->>Sync: TaskMutationResult
        Sync->>TaskEng: submit(TransitionTaskMutation IN_REVIEW→COMPLETED)
        TaskEng-->>Sync: TaskMutationResult
    else Shutdown/Error
        Sync->>TaskEng: submit(TransitionTaskMutation IN_PROGRESS→INTERRUPTED)
        TaskEng-->>Sync: TaskMutationResult
    end
    Sync->>TaskEng: sync_to_task_engine(final_status, reason, critical?)
    TaskEng-->>Sync: TaskMutationResult
    Sync-->>Agent: return final result
```
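The completed path in the diagram walks a fixed chain (IN_PROGRESS → IN_REVIEW → COMPLETED) one step at a time. A minimal sketch of that stepwise loop, with `transition` as a hypothetical stand-in for a TaskEngine mutation, shows how a partial failure leaves the furthest-reached status:

```python
COMPLETION_CHAIN = ["IN_PROGRESS", "IN_REVIEW", "COMPLETED"]


def advance_to_completed(current, transition):
    """Walk the chain one step at a time; on failure, keep the furthest state.

    `transition(frm, to)` is a hypothetical callable standing in for a
    TaskEngine mutation; it returns True on success, False on rejection.
    """
    idx = COMPLETION_CHAIN.index(current)
    for nxt in COMPLETION_CHAIN[idx + 1:]:
        if not transition(current, nxt):
            break  # Partial success: caller sees the furthest-synced status.
        current = nxt
    return current
```

If the second hop is rejected, the caller is left at `IN_REVIEW` rather than rolled back to `IN_PROGRESS`, matching the "furthest-reached state" behavior described later in the thread.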
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 5 passed
Code Review
This pull request introduces incremental task status synchronization between AgentEngine and TaskEngine, significantly improving real-time visibility and crash recovery. The extraction of sync logic into a dedicated task_sync.py module enhances maintainability and cohesion. Documentation and unit tests have been thoroughly updated to reflect these changes, covering various transition paths and error scenarios. The fix for _SNAPSHOT_CHANNEL in TaskEngine ensures correct WebSocket event propagation. Overall, this is a well-structured and impactful feature.
src/ai_company/engine/task_sync.py
Outdated
| reason=reason, | ||
| ) | ||
| result = await task_engine.submit(mutation) | ||
| except MemoryError, RecursionError: |
Codecov Report

❌ Patch coverage is
Additional details and impacted files

```
@@ Coverage Diff @@
##             main     #331      +/-   ##
==========================================
+ Coverage   93.75%   93.76%   +0.01%
==========================================
  Files         433      434       +1
  Lines       19831    19850      +19
  Branches     1911     1915       +4
==========================================
+ Hits        18592    18613      +21
+ Misses        957      956       -1
+ Partials      282      281       -1
```

☔ View full report in Codecov by Sentry.
Pull request overview
This PR improves real-time task lifecycle visibility by syncing AgentEngine task status transitions to TaskEngine incrementally throughout execution (instead of only syncing a final status), and extracts the sync logic into a dedicated module for maintainability.
Changes:

- Add `engine/task_sync.py` with best-effort incremental sync helpers and transition application logic.
- Update `AgentEngine` to use incremental sync at start, post-execution, and fatal-error paths.
- Fix `TaskEngine` snapshot publication channel to match the API/WebSocket channel ("tasks"), and expand tests/docs accordingly.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| `src/ai_company/engine/task_sync.py` | New extracted module implementing best-effort incremental TaskEngine sync and transition helpers. |
| `src/ai_company/engine/agent_engine.py` | Integrates the new sync module across lifecycle points and adds fatal-error sync. |
| `src/ai_company/engine/task_engine.py` | Updates snapshot channel constant to "tasks" to align with WebSocket consumers. |
| `src/ai_company/observability/events/execution.py` | Adds execution event constants for sync success/failure. |
| `tests/unit/engine/test_agent_engine.py` | Reworks task-engine sync tests for incremental transitions and adds channel invariant test. |
| `docs/design/engine.md` | Documents the incremental sync design and updates the run() signature reference. |
| `CLAUDE.md` | Updates package description to mention incremental status sync. |
| target_status=error_ctx.task_execution.status, | ||
| task_id=task_id, | ||
| agent_id=agent_id, | ||
| reason=f"Fatal error recovery: {error_msg}", |
The transition reason sent to TaskEngine includes the full exception string (error_msg). That reason is propagated into TaskEngine snapshot events (see TaskEngine._publish_snapshot uses mutation.reason), which can leak internal details to WebSocket consumers (file paths, tokens, etc.). Consider using a sanitized/generic reason (or at most the exception type) and keep full details only in logs/metrics.
```diff
- reason=f"Fatal error recovery: {error_msg}",
+ reason=f"Fatal error recovery: {type(exc).__name__}",
```
| | Path | Synced transitions | | ||
| |------|--------------------| | ||
| | Happy (COMPLETED) | `IN_PROGRESS` → `IN_REVIEW` → `COMPLETED` | | ||
| | Shutdown | `IN_PROGRESS` → `INTERRUPTED` | | ||
| | Error | `IN_PROGRESS` → `FAILED` (after recovery) | |
Markdown table formatting here uses double pipes (|| ... | ... |), which will render as an extra empty first column in most Markdown renderers. Use standard table syntax with single leading pipes on each row (e.g., | Path | Synced transitions |).
Greptile Summary

This PR introduces incremental

Key findings from this review:
Confidence Score: 3/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant C as Caller
    participant AE as AgentEngine
    participant TS as task_sync
    participant TE as TaskEngine
    participant WS as WebSocket (CHANNEL_TASKS)
    C->>AE: run(identity, task)
    AE->>TS: transition_task_if_needed(ctx, critical=True)
    TS->>TE: submit(ASSIGNED → IN_PROGRESS)
    TE-->>TS: TaskMutationResult(success=True)
    TS-->>AE: updated ctx (IN_PROGRESS)
    AE->>AE: _run_loop_with_timeout()
    AE-->>AE: ExecutionResult(COMPLETED)
    AE->>TS: apply_post_execution_transitions(COMPLETED)
    TS->>TE: submit(IN_PROGRESS → IN_REVIEW)
    TE->>WS: publish snapshot (tasks channel)
    TE-->>TS: TaskMutationResult(success=True)
    TS->>TE: submit(IN_REVIEW → COMPLETED)
    TE->>WS: publish snapshot (tasks channel)
    TE-->>TS: TaskMutationResult(success=True)
    TS-->>AE: ExecutionResult(ctx=COMPLETED)
    alt TerminationReason.ERROR
        AE->>TS: _apply_recovery → sync_to_task_engine(FAILED)
        TS->>TE: submit(IN_PROGRESS → FAILED)
        TE->>WS: publish snapshot
    end
    alt Fatal error (bypass pipeline)
        AE->>AE: _handle_fatal_error()
        AE->>TS: sync_to_task_engine(FAILED)
        TS->>TE: submit(→ FAILED)
        TE->>WS: publish snapshot
    end
    AE-->>C: AgentRunResult
```
Prompt To Fix All With AI

This is a comment left during a code review.
Path: src/ai_company/engine/agent_engine.py
Line: 835-841
Comment:
**Fatal-error sync missing `critical=True`**
The `sync_to_task_engine` call here uses the default `critical=False`, so any failure is logged at WARNING rather than ERROR. Fatal errors are arguably the most dangerous scenario for leaving a task permanently stuck at `IN_PROGRESS` — the very problem this feature aims to prevent. By contrast, the `ASSIGNED → IN_PROGRESS` sync in `transition_task_if_needed` already uses `critical=True`.
Passing `critical=True` here would surface missed syncs at ERROR level, matching the severity of the error path that landed here:
```suggestion
await sync_to_task_engine(
self._task_engine,
target_status=error_ctx.task_execution.status,
task_id=task_id,
agent_id=agent_id,
reason=f"Fatal error recovery: {type(exc).__name__}",
critical=True,
)
```
The same argument applies to the analogous `sync_to_task_engine` call in `_post_execute` (the `TerminationReason.ERROR` recovery path, lines ~397-403) which also uses the default `critical=False`.
How can I resolve this? If you propose a fix, please make it concise.
---
This is a comment left during a code review.
Path: src/ai_company/engine/task_sync.py
Line: 85-86
Comment:
**Missing parentheses for multi-exception `except` clause**
`except MemoryError, RecursionError, asyncio.CancelledError:` uses the Python 2 bare-comma form. In Python 3 the required form is a parenthesised tuple; the bare-comma variant is a `SyntaxError`. The existing `agent_engine.py` uses the same bare-comma pattern in several pre-existing `except MemoryError, RecursionError:` clauses, but none of those clauses include a third item, which may explain why they haven't been caught. This new three-item clause is the most exposed instance.
```suggestion
except (MemoryError, RecursionError, asyncio.CancelledError):
```
How can I resolve this? If you propose a fix, please make it concise.

Last reviewed commit: 67f463c
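One portability note on the syntax finding above: the parenthesised tuple form parses on every Python 3 release, while PEP 758's bare-comma form requires Python 3.14 or later. A minimal sketch of the portable form, with a hypothetical helper name:

```python
def call_best_effort(fn):
    """Run fn, swallowing ordinary errors but propagating fatal ones."""
    try:
        return fn()
    except (MemoryError, RecursionError):  # Parenthesised: valid on all Python 3.
        raise
    except Exception:
        return None  # Best-effort: ordinary failures are swallowed.
```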
- Add missing EXECUTION_ENGINE_TASK_TRANSITION logs for ERROR/FAILED paths
- Fix partial-completion return to always use model_copy with furthest ctx
- Sanitize fatal error reason to type name only (prevent info leak)
- Capture exception in classification catch block for detailed logging
- Expand sync_to_task_engine docstring with Raises section
- Add rejection error_code and fallback message to sync failure log
- Fix engine.md transition count range and IN_PROGRESS description
- Add PARKED to TerminationReason in engine.md
- Clarify comments for recovery strategy and post-recovery paths
- Add 23 dedicated unit tests for task_sync module
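The "sanitize fatal error reason to type name only" item above can be sketched as follows; the helper name is hypothetical:

```python
def sanitize_reason(exc: BaseException) -> str:
    # Expose only the exception type; full details stay in logs and metrics,
    # not in snapshot events visible to WebSocket consumers.
    return f"Fatal error recovery: {type(exc).__name__}"
```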
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
src/ai_company/engine/task_sync.py
Outdated
| result = await task_engine.submit(mutation) | ||
| except MemoryError, RecursionError: | ||
| raise | ||
| except Exception as exc: | ||
| error = ( | ||
| "TaskEngine unavailable" |
src/ai_company/engine/task_sync.py
Outdated
| try: | ||
| ctx = await _transition_to_complete(ctx, agent_id, task_id, task_engine) | ||
| except (ValueError, ExecutionStateError) as exc: | ||
| logger.exception( | ||
| EXECUTION_ENGINE_ERROR, | ||
| agent_id=agent_id, | ||
| task_id=task_id, | ||
| error=f"Post-execution transition failed: {exc}", | ||
| ) | ||
| # Return with whatever context _transition_to_complete reached | ||
| # so the caller reflects the furthest-synced state. | ||
| return execution_result.model_copy(update={"context": ctx}) |
| logger.info( | ||
| EXECUTION_ENGINE_TASK_TRANSITION, | ||
| agent_id=agent_id, | ||
| task_id=task_id, | ||
| from_status="recovery", | ||
| to_status=ctx.task_execution.status.value, | ||
| ) |
| logger.info( | ||
| EXECUTION_ENGINE_TASK_TRANSITION, | ||
| agent_id=agent_id, | ||
| task_id=task_id, | ||
| from_status="recovery", |
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/design/engine.md`:
- Around line 289-290: Update the post-execution lifecycle docs to describe the
newly exposed enum value PARKED: state explicitly whether a PARKED termination
is a no-op that leaves the task in its current state (same behavior as MAX_TURNS
and BUDGET_EXHAUSTED) or whether it transitions to a specific state;
specifically modify the post-execution section in docs/design/engine.md to
mention PARKED alongside MAX_TURNS and BUDGET_EXHAUSTED and state the exact
semantics (e.g., "PARKED: leaves task in current state" or the alternative) and
keep the note about max_turns defaulting to 20 intact for context.
In `@src/ai_company/engine/agent_engine.py`:
- Around line 376-393: The code unconditionally logs
EXECUTION_ENGINE_TASK_TRANSITION and calls sync_to_task_engine after calling
_apply_recovery even when recovery returned the original ExecutionResult; update
the logic in the recovery handling (use the local variable execution_result and
its .context.task_execution) to compare the pre-recovery status and
post-recovery status (e.g., capture pre_status =
execution_result.context.task_execution.status before calling _apply_recovery
and post_status after) and only emit the EXECUTION_ENGINE_TASK_TRANSITION log
and call sync_to_task_engine when post_status != pre_status; extract this
compare-and-sync behavior into a small helper (used by both the current block
around execution_result handling and the other recovery block that also
logs/syncs) so the two sites share the same implementation and cannot drift,
referencing _apply_recovery, ExecutionResult, ctx.task_execution,
EXECUTION_ENGINE_TASK_TRANSITION, and sync_to_task_engine.
In `@src/ai_company/engine/task_sync.py`:
- Around line 32-110: The sync_to_task_engine function exceeds the 50-line
limit; split its non-success paths into two private helpers: create a helper
_handle_sync_exception(exc, agent_id, task_id, target_status, critical) to
contain the except Exception branch (determine the error message for
TaskEngineError vs other exceptions and call logger.error or logger.warning
based on critical), and another helper _handle_rejected_mutation(result,
agent_id, task_id, target_status, critical) to contain the else:
rejected-mutation logging (log error text, error_code and use correct severity).
Refactor sync_to_task_engine to keep only the happy-path mutation
construction/await and then call these two helpers from the except and rejected
branches, preserving existing args and log keys (EXECUTION_ENGINE_SYNC_FAILED,
EXECUTION_ENGINE_TASK_SYNCED) and propagating MemoryError/RecursionError
unchanged.
- Around line 170-182: The call to _transition_to_complete can partially succeed
(e.g., move IN_PROGRESS -> IN_REVIEW) then raise, but the except block currently
returns the original ctx, losing the furthest-synced state; update the except
handler so that on exception you try to re-load the latest task context from the
engine (using task_engine and agent_id/task_id) and set ctx to that latest
context (falling back to the original ctx only if reload fails), then return
execution_result.model_copy(update={"context": ctx}); keep references to
_transition_to_complete, task_engine, execution_result, ctx, agent_id, and
task_id to locate the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 25b8d0a5-be6b-46e8-93e1-1b2ade5d703e
📒 Files selected for processing (4)
- docs/design/engine.md
- src/ai_company/engine/agent_engine.py
- src/ai_company/engine/task_sync.py
- tests/unit/engine/test_task_sync.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Greptile Review
- GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (5)
docs/design/*.md
📄 CodeRabbit inference engine (CLAUDE.md)
When approved deviations occur, update the relevant
docs/design/page to reflect the new reality
Files:
docs/design/engine.md
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Mark tests with `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, and `@pytest.mark.slow`.
Prefer `@pytest.mark.parametrize` for testing similar cases.
Files:
tests/unit/engine/test_task_sync.py
{src,tests}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like `example-provider`, `example-large-001`, `test-provider`, `test-small-001`.
Files:
tests/unit/engine/test_task_sync.py
src/ai_company/engine/agent_engine.py
src/ai_company/engine/task_sync.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/**/*.py:

- No `from __future__ import annotations` — Python 3.14 has PEP 649
- Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
- Type hints required on all public functions; mypy strict mode enforced
- Google-style docstrings required on public classes and functions, enforced by ruff D rules
- Create new objects, never mutate existing ones. For non-Pydantic internal collections, use `copy.deepcopy()` at construction and `MappingProxyType` wrapping for read-only enforcement
- For `dict`/`list` fields in frozen Pydantic models, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
- Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using `model_copy(update=...)`
- Use Pydantic v2 with `BaseModel`, `model_validator`, `computed_field`, `ConfigDict`
- Use `@computed_field` for derived values instead of storing redundant fields (e.g. `TokenUsage.total_tokens`)
- Use `NotBlankStr` from `core.types` for all identifier/name fields (including optional and tuple variants) instead of manual whitespace validators
- Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
- Line length must be 88 characters (ruff enforced)
- Functions must be less than 50 lines; files must be less than 800 lines
- Handle errors explicitly, never silently swallow exceptions
- Validate at system boundaries (user input, external APIs, config files)
- Every module with business logic must have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
- Never use `import logging`/`logging.getLogger()`/`print()` in application code
- Use only `logger` as the variable name (not `_logger`, not `log`)
- Use domain-specific event name constants from `ai_company.observability.events` modules (e.g. …
Files:
src/ai_company/engine/agent_engine.py
src/ai_company/engine/task_sync.py
src/ai_company/{providers,engine}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains
Files:
src/ai_company/engine/agent_engine.py
src/ai_company/engine/task_sync.py
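The frozen-model guideline above ("create new objects, never mutate") can be illustrated with stdlib dataclasses; pydantic's `model_copy(update=...)` plays the role of `dataclasses.replace` here, and the model is a hypothetical stand-in:

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class TaskExecution:
    """Hypothetical frozen runtime-state model."""
    task_id: str
    status: str


def with_status(te: TaskExecution, status: str) -> TaskExecution:
    # Never mutate: build a new object carrying the updated status.
    # In pydantic v2 this would be te.model_copy(update={"status": status}).
    return replace(te, status=status)
```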
🧠 Learnings (4)
📓 Common learnings
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/ai_company/{providers,engine}/**/*.py : `RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains
Applied to files:
src/ai_company/engine/agent_engine.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
Applied to files:
src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Applied to files:
src/ai_company/engine/task_sync.py
🧬 Code graph analysis (1)
src/ai_company/engine/agent_engine.py (4)

- src/ai_company/engine/errors.py (1): `ExecutionStateError` (12-13)
- src/ai_company/engine/task_sync.py (2): `sync_to_task_engine` (32-109), `transition_task_if_needed` (112-135)
- src/ai_company/engine/run_result.py (1): `termination_reason` (64-66)
- src/ai_company/engine/loop_protocol.py (1): `TerminationReason` (28-36)
| : Enum: `COMPLETED`, `MAX_TURNS`, `BUDGET_EXHAUSTED`, `SHUTDOWN`, `ERROR`, | ||
| `PARKED`. `max_turns` defaults to 20. |
Document what PARKED does to task state.
The enum now exposes PARKED, but the post-execution section still only documents MAX_TURNS and BUDGET_EXHAUSTED as no-op terminations. Please spell out whether parked runs also leave the task in its current state so the lifecycle description stays complete.
As per coding guidelines, "docs/design/*.md: When approved deviations occur, update the relevant docs/design/ page to reflect the new reality".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/design/engine.md` around lines 289 - 290, Update the post-execution
lifecycle docs to describe the newly exposed enum value PARKED: state explicitly
whether a PARKED termination is a no-op that leaves the task in its current
state (same behavior as MAX_TURNS and BUDGET_EXHAUSTED) or whether it
transitions to a specific state; specifically modify the post-execution section
in docs/design/engine.md to mention PARKED alongside MAX_TURNS and
BUDGET_EXHAUSTED and state the exact semantics (e.g., "PARKED: leaves task in
current state" or the alternative) and keep the note about max_turns defaulting
to 20 intact for context.
- Propagate asyncio.CancelledError in sync_to_task_engine (not swallowed)
- Restructure completion transitions to stepwise loop so partial failure returns furthest-reached state (IN_REVIEW, not IN_PROGRESS)
- Replace from_status="recovery" with actual pre-recovery TaskStatus value in both ERROR and fatal-error paths for consistent log event schema
- Add test_cancelled_error_propagates test case
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/unit/engine/test_task_sync.py`:
- Around line 238-251: The test test_critical_flag_does_not_raise currently only
checks that sync_to_task_engine does not raise when TaskEngineError is raised
with critical=True; update the test to also assert that the expected log
event/level is emitted (e.g., capture logs or use a test logger fixture and
assert an ERROR/CRITICAL log entry was produced) so the behavior "changes log
level" is actually verified; reference sync_to_task_engine, TaskEngineError and
the critical=True invocation to locate where to add the log capture and
assertion.
- Around line 439-481: Replace the two almost-identical tests
test_max_turns_returns_unchanged and test_budget_exhausted_returns_unchanged
with a single parametrized test using pytest.mark.parametrize over the
termination reasons (TerminationReason.MAX_TURNS and
TerminationReason.BUDGET_EXHAUSTED); inside the new test reuse
AgentContext.from_identity, ctx.with_task_transition,
_make_execution_result(reason=reason), and call apply_post_execution_transitions
with the same agent_id/task_id/task_engine arguments, asserting the returned
value is the same object (assert out is result); keep the original test name(s)
descriptive and remove the duplicated functions.
- Around line 180-237: Replace the four near-duplicate tests
(test_task_engine_error_swallowed, test_unexpected_exception_swallowed,
test_memory_error_propagates, test_recursion_error_propagates) with a single
parametrized test using pytest.mark.parametrize that iterates over cases built
with _make_mock_task_engine(..., side_effect=...) and an indicator of whether
the exception should propagate; inside the test call sync_to_task_engine(...)
for each case and use pytest.raises(expected_exception) for the
MemoryError/RecursionError cases and a simple await (no raise) for the swallowed
cases (TaskEngineError and RuntimeError); reference sync_to_task_engine and
_make_mock_task_engine in the new parametrized test to locate and reuse the same
setup.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 90687c2d-b45c-4c20-a529-d594f24d145f
📒 Files selected for processing (1)
tests/unit/engine/test_task_sync.py
📜 Review details
🧰 Additional context used
📓 Path-based instructions (2)
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Mark tests with `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, and `@pytest.mark.slow`.
Prefer `@pytest.mark.parametrize` for testing similar cases.
Files:
tests/unit/engine/test_task_sync.py
{src,tests}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like `example-provider`, `example-large-001`, `test-provider`, `test-small-001`.
Files:
tests/unit/engine/test_task_sync.py
🧠 Learnings (1)
📓 Common learnings
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
🧬 Code graph analysis (1)
tests/unit/engine/test_task_sync.py (6)
- src/ai_company/core/enums.py (1): `TaskStatus` (198-224)
- src/ai_company/engine/errors.py (2): `ExecutionStateError` (12-13), `TaskEngineError` (85-86)
- src/ai_company/engine/loop_protocol.py (3): `ExecutionResult` (79-140), `TerminationReason` (28-36), `TurnRecord` (39-76)
- src/ai_company/engine/task_sync.py (1): `sync_to_task_engine` (32-109)
- src/ai_company/providers/enums.py (1): `FinishReason` (15-22)
- src/ai_company/core/agent.py (1): `AgentIdentity` (266-342)
🔇 Additional comments (1)
tests/unit/engine/test_task_sync.py (1)
383-412: Good coverage of the COMPLETED transition chain.

This segment correctly verifies both local final state (`COMPLETED`) and remote sync sequence (`IN_REVIEW` then `COMPLETED`), which directly exercises the incremental-sync contract.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/ai_company/engine/agent_engine.py (1)
344-420: 🛠️ Refactor suggestion | 🟠 Major

Refactor `_post_execution_pipeline` to under 50 lines.

Lines 344–420 exceed the function-size policy. Split into focused helpers (cost recording, post-execution transition application, recovery+sync, classification) to keep behavior readable and policy-compliant.
_post_execution_pipelineunder 50 lines.Line 344–Line 420 exceeds the function-size policy. Split into focused helpers (cost recording, post-execution transition application, recovery+sync, classification) to keep behavior readable and policy-compliant.
As per coding guidelines, “Functions must be less than 50 lines; files must be less than 800 lines”.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/agent_engine.py` around lines 344 - 420, Split _post_execution_pipeline into small helpers: move cost recording into a helper (e.g., _record_execution_costs) that calls record_execution_costs with the same args and tracker, move transition logic into an _apply_post_transitions helper that invokes apply_post_execution_transitions, extract the recovery+sync block into _handle_recovery_and_sync which captures pre_recovery_status, calls self._apply_recovery, logs EXECUTION_ENGINE_TASK_TRANSITION, and calls sync_to_task_engine exactly as before, and move the classification block into _classify_execution_errors which wraps classify_execution_errors in the same try/except (re-raising MemoryError and RecursionError, catching Exception and logging EXECUTION_ENGINE_ERROR with exc_info). Then have _post_execution_pipeline call these helpers in the same order (record -> transitions -> if error then handle_recovery_and_sync -> classify) and return the final execution_result unchanged; ensure all original symbols (record_execution_costs, apply_post_execution_transitions, self._apply_recovery, sync_to_task_engine, classify_execution_errors, self._error_taxonomy_config, EXECUTION_ENGINE_TASK_TRANSITION, EXECUTION_ENGINE_ERROR, TerminationReason) and behavior (pre_recovery_status capture and exact log messages/parameters) are preserved.
♻️ Duplicate comments (5)
tests/unit/engine/test_task_sync.py (3)
238-251: ⚠️ Potential issue | 🟡 Minor
The `critical=True` test does not verify the claimed behavior. The docstring at line 239 says the log level changes, but the test only asserts "no raise". Add a log assertion for severity/event to validate the critical-path contract.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/engine/test_task_sync.py` around lines 238 - 251, The test test_critical_flag_does_not_raise currently only ensures no exception when calling sync_to_task_engine with critical=True but does not assert that logging severity changed; update the test to capture/log records (e.g., using caplog or a mock logger) around the call and assert that a log entry at the expected higher severity (e.g., ERROR/CRITICAL) was emitted by sync_to_task_engine when the TaskEngineError("unavailable") side_effect occurs on mock_te; reference the test function test_critical_flag_does_not_raise, the mock task engine mock_te, and the sync_to_task_engine invocation to locate where to add the log capture and severity assertion.
180-237: 🧹 Nitpick | 🔵 Trivial
Collapse exception-path sync tests into a single parametrized test.
Lines 180–237 repeat the same structure for swallowed vs. propagated exceptions. Please consolidate with `@pytest.mark.parametrize` to reduce duplication and improve extensibility.
As per coding guidelines, "tests/**/*.py: Prefer `@pytest.mark.parametrize` for testing similar cases".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/engine/test_task_sync.py` around lines 180 - 237, Collapse the four nearly identical tests (test_task_engine_error_swallowed, test_unexpected_exception_swallowed, test_memory_error_propagates, test_recursion_error_propagates) into one parametrized pytest using `@pytest.mark.parametrize`: enumerate cases of (side_effect_exception, should_raise) where side_effects are TaskEngineError("engine down"), RuntimeError("connection lost"), MemoryError("out of memory"), RecursionError("maximum recursion depth exceeded") and should_raise is False for the first two and True for the latter two; within the single test, call _make_mock_task_engine(side_effect=...) to create mock_te and invoke sync_to_task_engine(...) and assert either that the call completes (when should_raise is False) or that it raises the expected exception with pytest.raises(..., match=...) when should_raise is True, preserving test names and docstrings as needed for clarity.
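The consolidation described above can be sketched as follows. Note this is a hedged, self-contained illustration: `TaskEngineError` and `sync_to_task_engine` here are simplified stand-ins, not the project's real symbols, and the exact exception messages are taken from the prompt.

```python
import asyncio

import pytest


class TaskEngineError(Exception):
    """Stand-in for the engine's TaskEngineError."""


async def sync_to_task_engine(submit) -> None:
    """Stand-in sync: swallows expected errors, propagates fatal ones."""
    try:
        await submit()
    except (MemoryError, RecursionError):
        raise  # fatal errors must never be swallowed
    except Exception:
        pass  # best-effort: engine/runtime failures are logged, not raised


@pytest.mark.parametrize(
    ("side_effect", "should_raise"),
    [
        (TaskEngineError("engine down"), False),
        (RuntimeError("connection lost"), False),
        (MemoryError("out of memory"), True),
        (RecursionError("maximum recursion depth exceeded"), True),
    ],
)
def test_sync_exception_handling(side_effect, should_raise):
    async def failing_submit():
        raise side_effect

    if should_raise:
        with pytest.raises(type(side_effect)):
            asyncio.run(sync_to_task_engine(failing_submit))
    else:
        # Best-effort path: the call must complete without raising.
        asyncio.run(sync_to_task_engine(failing_submit))
```

One parametrized test keeps the four cases in a single table, so adding a new exception class is a one-line change.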
456-498: 🧹 Nitpick | 🔵 Trivial
Parametrize the unchanged-termination tests.
Lines 456–498 (`MAX_TURNS` and `BUDGET_EXHAUSTED`) assert the same behavior and should be one parametrized test.
As per coding guidelines, "tests/**/*.py: Prefer `@pytest.mark.parametrize` for testing similar cases".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/engine/test_task_sync.py` around lines 456 - 498, Two tests (test_max_turns_returns_unchanged and test_budget_exhausted_returns_unchanged) duplicate the same logic; refactor them into a single parametrized test using pytest.mark.parametrize over TerminationReason values (TerminationReason.MAX_TURNS, TerminationReason.BUDGET_EXHAUSTED) to avoid duplication. Update the test function to accept a termination_reason parameter, reuse AgentContext.from_identity, ctx.with_task_transition, and _make_execution_result(…, reason=termination_reason), then call apply_post_execution_transitions and assert the result is unchanged; keep existing fixtures (sample_agent_with_personality, sample_task_with_criteria) and the same agent_id/task_id arguments.
src/ai_company/engine/agent_engine.py (1)
385-399: ⚠️ Potential issue | 🟠 Major
Only log/sync when recovery actually changes task status.
At lines 385 and 819, transition logging/sync runs even when `pre_*_status == post_status`. This can emit spurious transition events and generate no-op/rejected sync attempts. Gate these blocks on a real status change and reuse one helper for both call sites.
🛠️ Proposed fix pattern
```diff
- if ctx.task_execution is not None and pre_recovery_status is not None:
+ if (
+     ctx.task_execution is not None
+     and pre_recovery_status is not None
+     and ctx.task_execution.status != pre_recovery_status
+ ):
      logger.info(
          EXECUTION_ENGINE_TASK_TRANSITION,
@@
- if error_ctx.task_execution is not None and pre_fatal_status is not None:
+ if (
+     error_ctx.task_execution is not None
+     and pre_fatal_status is not None
+     and error_ctx.task_execution.status != pre_fatal_status
+ ):
      logger.info(
          EXECUTION_ENGINE_TASK_TRANSITION,
```

Also applies to: 819-833
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/agent_engine.py` around lines 385 - 399, The transition logging/sync block should only run when the task status actually changed: add a check comparing pre_recovery_status (or pre_*_status) against ctx.task_execution.status (or post status) and skip logging/sync if equal; extract the duplicated logic into a single helper (e.g., _emit_task_transition_and_sync or similar) that accepts agent_id, task_id, pre_status, post_status and the task_engine, calls logger.info(EXECUTION_ENGINE_TASK_TRANSITION, ...) and then awaits sync_to_task_engine(...) so both call sites (the blocks using ctx.task_execution and the other site referenced around the second occurrence) call this helper only when pre_status != post_status.
src/ai_company/engine/task_sync.py (1)
43-122: 🛠️ Refactor suggestion | 🟠 Major
Split `sync_to_task_engine` to comply with the 50-line function limit.
Lines 43–122 exceed the repository's hard function-length limit, which makes this control flow harder to maintain and reason about. Please extract the exception/rejected-mutation branches into private helpers.
As per coding guidelines, “Functions must be less than 50 lines; files must be less than 800 lines”.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_sync.py` around lines 43 - 122, sync_to_task_engine is too long; split its exception-handling and rejected-mutation branches into small private helpers to keep the function under 50 lines. Create e.g. _handle_sync_exception(exc, agent_id, task_id, target_status, critical) that re-raises MemoryError/RecursionError/asyncio.CancelledError and otherwise logs using logger.error or logger.warning based on the critical flag (preserving the EXECUTION_ENGINE_SYNC_FAILED message and exc_info). Create e.g. _handle_rejected_mutation(result, agent_id, task_id, target_status, critical) that logs the rejection (preserving result.error/result.error_code and EXECUTION_ENGINE_SYNC_FAILED) and keep sync_to_task_engine focused on building the mutation, awaiting task_engine.submit, calling the two helpers from the except/else branches, and returning on success (preserve the debug log for EXECUTION_ENGINE_TASK_SYNCED and unchanged semantics).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/ai_company/engine/task_sync.py`:
- Line 76: The current code creates a `log` alias (e.g., `log = logger.error if
critical else logger.warning`) which violates the naming rule; remove any `log =
...` assignments and instead call `logger.error(...)` or `logger.warning(...)`
directly based on the `critical` condition at each call site. Locate the `log =
...` assignment and every subsequent `log(...)` usage (notably the blocks around
the comment ranges 76, 94-101, and 114-121) and replace them with conditional
calls like `if critical: logger.error(...) else: logger.warning(...)` so
`logger` remains the only logger variable name. Ensure no new logger aliases
(`log`, `_logger`, etc.) are introduced elsewhere in the function or module.
- Around line 150-205: The function apply_post_execution_transitions is over 50
lines and should be split into smaller helpers: extract a shutdown handler
(e.g., handle_shutdown) that invokes _transition_to_interrupted with the same
args and returns the resulting ExecutionResult or early return when
ctx.task_execution is None; extract a completed handler (e.g.,
handle_completed_transitions) that receives the current ctx, agent_id, task_id,
task_engine and runs the for loop over _COMPLETION_STEPS calling
_transition_and_sync and logging/exiting on ValueError/ExecutionStateError
exactly as now; keep the final merge/return in apply_post_execution_transitions
to call the new helpers and return
execution_result.model_copy(update={"context": ctx}) while preserving
propagation of MemoryError and RecursionError and all existing logging and error
strings (EXECUTION_ENGINE_ERROR, step_reason, _transition_to_interrupted,
_transition_and_sync).
---
Outside diff comments:
In `@src/ai_company/engine/agent_engine.py`:
- Around line 344-420: Split _post_execution_pipeline into small helpers: move
cost recording into a helper (e.g., _record_execution_costs) that calls
record_execution_costs with the same args and tracker, move transition logic
into an _apply_post_transitions helper that invokes
apply_post_execution_transitions, extract the recovery+sync block into
_handle_recovery_and_sync which captures pre_recovery_status, calls
self._apply_recovery, logs EXECUTION_ENGINE_TASK_TRANSITION, and calls
sync_to_task_engine exactly as before, and move the classification block into
_classify_execution_errors which wraps classify_execution_errors in the same
try/except (re-raising MemoryError and RecursionError, catching Exception and
logging EXECUTION_ENGINE_ERROR with exc_info). Then have
_post_execution_pipeline call these helpers in the same order (record ->
transitions -> if error then handle_recovery_and_sync -> classify) and return
the final execution_result unchanged; ensure all original symbols
(record_execution_costs, apply_post_execution_transitions, self._apply_recovery,
sync_to_task_engine, classify_execution_errors, self._error_taxonomy_config,
EXECUTION_ENGINE_TASK_TRANSITION, EXECUTION_ENGINE_ERROR, TerminationReason) and
behavior (pre_recovery_status capture and exact log messages/parameters) are
preserved.
---
Duplicate comments:
In `@src/ai_company/engine/agent_engine.py`:
- Around line 385-399: The transition logging/sync block should only run when
the task status actually changed: add a check comparing pre_recovery_status (or
pre_*_status) against ctx.task_execution.status (or post status) and skip
logging/sync if equal; extract the duplicated logic into a single helper (e.g.,
_emit_task_transition_and_sync or similar) that accepts agent_id, task_id,
pre_status, post_status and the task_engine, calls
logger.info(EXECUTION_ENGINE_TASK_TRANSITION, ...) and then awaits
sync_to_task_engine(...) so both call sites (the blocks using ctx.task_execution
and the other site referenced around the second occurrence) call this helper
only when pre_status != post_status.
In `@src/ai_company/engine/task_sync.py`:
- Around line 43-122: sync_to_task_engine is too long; split its
exception-handling and rejected-mutation branches into small private helpers to
keep the function under 50 lines. Create e.g. _handle_sync_exception(exc,
agent_id, task_id, target_status, critical) that re-raises
MemoryError/RecursionError/asyncio.CancelledError and otherwise logs using
logger.error or logger.warning based on the critical flag (preserving the
EXECUTION_ENGINE_SYNC_FAILED message and exc_info). Create e.g.
_handle_rejected_mutation(result, agent_id, task_id, target_status, critical)
that logs the rejection (preserving result.error/result.error_code and
EXECUTION_ENGINE_SYNC_FAILED) and keep sync_to_task_engine focused on building
the mutation, awaiting task_engine.submit, calling the two helpers from the
except/else branches, and returning on success (preserve the debug log for
EXECUTION_ENGINE_TASK_SYNCED and unchanged semantics).
In `@tests/unit/engine/test_task_sync.py`:
- Around line 238-251: The test test_critical_flag_does_not_raise currently only
ensures no exception when calling sync_to_task_engine with critical=True but
does not assert that logging severity changed; update the test to capture/log
records (e.g., using caplog or a mock logger) around the call and assert that a
log entry at the expected higher severity (e.g., ERROR/CRITICAL) was emitted by
sync_to_task_engine when the TaskEngineError("unavailable") side_effect occurs
on mock_te; reference the test function test_critical_flag_does_not_raise, the
mock task engine mock_te, and the sync_to_task_engine invocation to locate where
to add the log capture and severity assertion.
- Around line 180-237: Collapse the four nearly identical tests
(test_task_engine_error_swallowed, test_unexpected_exception_swallowed,
test_memory_error_propagates, test_recursion_error_propagates) into one
parametrized pytest using `@pytest.mark.parametrize`: enumerate cases of
(side_effect_exception, should_raise) where side_effects are
TaskEngineError("engine down"), RuntimeError("connection lost"),
MemoryError("out of memory"), RecursionError("maximum recursion depth exceeded")
and should_raise is False for the first two and True for the latter two; within
the single test, call _make_mock_task_engine(side_effect=...) to create mock_te
and invoke sync_to_task_engine(...) and assert either that the call completes
(when should_raise is False) or that it raises the expected exception with
pytest.raises(..., match=...) when should_raise is True, preserving test names
and docstrings as needed for clarity.
- Around line 456-498: Two tests (test_max_turns_returns_unchanged and
test_budget_exhausted_returns_unchanged) duplicate the same logic; refactor them
into a single parametrized test using pytest.mark.parametrize over
TerminationReason values (TerminationReason.MAX_TURNS,
TerminationReason.BUDGET_EXHAUSTED) to avoid duplication. Update the test
function to accept a termination_reason parameter, reuse
AgentContext.from_identity, ctx.with_task_transition, and
_make_execution_result(…, reason=termination_reason), then call
apply_post_execution_transitions and assert the result is unchanged; keep
existing fixtures (sample_agent_with_personality, sample_task_with_criteria) and
the same agent_id/task_id arguments.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: dc564689-ff6c-4d3e-b94b-00640e0ab923
📒 Files selected for processing (3)
src/ai_company/engine/agent_engine.py
src/ai_company/engine/task_sync.py
tests/unit/engine/test_task_sync.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Greptile Review
- GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (4)
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/**/*.py: No `from __future__ import annotations` — Python 3.14 has PEP 649
Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Type hints required on all public functions; mypy strict mode enforced
Google-style docstrings required on public classes and functions, enforced by ruff D rules
Create new objects, never mutate existing ones. For non-Pydantic internal collections, use `copy.deepcopy()` at construction and `MappingProxyType` wrapping for read-only enforcement
For `dict`/`list` fields in frozen Pydantic models, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using `model_copy(update=...)`
Use Pydantic v2 with `BaseModel`, `model_validator`, `computed_field`, `ConfigDict`
Use `@computed_field` for derived values instead of storing redundant fields (e.g. `TokenUsage.total_tokens`)
Use `NotBlankStr` from `core.types` for all identifier/name fields (including optional and tuple variants) instead of manual whitespace validators
Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
Line length must be 88 characters (ruff enforced)
Functions must be less than 50 lines; files must be less than 800 lines
Handle errors explicitly, never silently swallow exceptions
Validate at system boundaries (user input, external APIs, config files)
Every module with business logic must have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Never use `import logging` / `logging.getLogger()` / `print()` in application code
Use only `logger` as the variable name (not `_logger`, not `log`)
Use domain-specific event name constants from `ai_company.observability.events` modules (e.g...
Files:
src/ai_company/engine/task_sync.py
src/ai_company/engine/agent_engine.py
src/ai_company/{providers,engine}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains
Files:
src/ai_company/engine/task_sync.py
src/ai_company/engine/agent_engine.py
{src,tests}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like `example-provider`, `example-large-001`, `test-provider`, `test-small-001`
Files:
src/ai_company/engine/task_sync.py
tests/unit/engine/test_task_sync.py
src/ai_company/engine/agent_engine.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Mark tests with `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, and `@pytest.mark.slow`
Prefer `@pytest.mark.parametrize` for testing similar cases
Files:
tests/unit/engine/test_task_sync.py
🧠 Learnings (6)
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
Applied to files:
src/ai_company/engine/task_sync.py
tests/unit/engine/test_task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Applied to files:
src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Functions must be less than 50 lines; files must be less than 800 lines
Applied to files:
src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: When review agents find valid issues (including pre-existing issues, suggestions, and findings adjacent to the PR's changes), fix them all — no deferring or 'out of scope' skipping
Applied to files:
src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to tests/**/*.py : Prefer `pytest.mark.parametrize` for testing similar cases
Applied to files:
tests/unit/engine/test_task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/ai_company/{providers,engine}/**/*.py : `RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains
Applied to files:
src/ai_company/engine/agent_engine.py
🧬 Code graph analysis (2)
src/ai_company/engine/task_sync.py (5)
src/ai_company/core/enums.py (1)
`TaskStatus` (198-224)
src/ai_company/engine/errors.py (2)
`ExecutionStateError` (12-13), `TaskEngineError` (85-86)
src/ai_company/engine/loop_protocol.py (2)
`TerminationReason` (28-36), `ExecutionResult` (79-140)
src/ai_company/engine/task_engine_models.py (1)
`TransitionTaskMutation` (185-236)
src/ai_company/engine/context.py (2)
`AgentContext` (87-307), `with_task_transition` (235-278)
src/ai_company/engine/agent_engine.py (4)
src/ai_company/engine/errors.py (1)
`ExecutionStateError` (12-13)
src/ai_company/engine/task_sync.py (3)
`apply_post_execution_transitions` (150-204), `sync_to_task_engine` (43-121), `transition_task_if_needed` (124-147)
src/ai_company/engine/run_result.py (1)
`termination_reason` (64-66)
src/ai_company/engine/loop_protocol.py (1)
`TerminationReason` (28-36)
```python
async def apply_post_execution_transitions(
    execution_result: ExecutionResult,
    agent_id: str,
    task_id: str,
    task_engine: TaskEngine | None,
) -> ExecutionResult:
    """Apply post-execution task transitions based on termination reason.

    COMPLETED triggers IN_PROGRESS -> IN_REVIEW -> COMPLETED.
    SHUTDOWN triggers current status -> INTERRUPTED.
    Each transition is synced to TaskEngine incrementally.
    Transition failures are logged but never discard the result.
    ``MemoryError`` and ``RecursionError`` propagate unconditionally.

    Returns the original ``execution_result`` unchanged if no
    transitions apply, or a copy with updated context reflecting
    the furthest-reached state on success or partial failure.
    """
    ctx = execution_result.context
    if ctx.task_execution is None:
        return execution_result

    reason = execution_result.termination_reason

    if reason == TerminationReason.SHUTDOWN:
        return await _transition_to_interrupted(
            execution_result, ctx, agent_id, task_id, task_engine
        )

    if reason != TerminationReason.COMPLETED:
        return execution_result

    # Apply IN_PROGRESS -> IN_REVIEW -> COMPLETED stepwise so that
    # ``ctx`` always reflects the furthest-reached state, even when
    # one step raises (partial-completion safety).
    for target, step_reason in _COMPLETION_STEPS:
        try:
            ctx = await _transition_and_sync(
                ctx,
                target_status=target,
                reason=step_reason,
                agent_id=agent_id,
                task_id=task_id,
                task_engine=task_engine,
            )
        except (ValueError, ExecutionStateError) as exc:
            logger.exception(
                EXECUTION_ENGINE_ERROR,
                agent_id=agent_id,
                task_id=task_id,
                error=f"Post-execution transition failed: {exc}",
            )
            break

    return execution_result.model_copy(update={"context": ctx})
```
🛠️ Refactor suggestion | 🟠 Major
`apply_post_execution_transitions` also exceeds the 50-line limit.
Line 150–Line 205 should be split (e.g., shutdown handler, completed handler, and final context merge) to satisfy policy and reduce branching complexity.
As per coding guidelines, “Functions must be less than 50 lines; files must be less than 800 lines”.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/ai_company/engine/task_sync.py` around lines 150 - 205, The function
apply_post_execution_transitions is over 50 lines and should be split into
smaller helpers: extract a shutdown handler (e.g., handle_shutdown) that invokes
_transition_to_interrupted with the same args and returns the resulting
ExecutionResult or early return when ctx.task_execution is None; extract a
completed handler (e.g., handle_completed_transitions) that receives the current
ctx, agent_id, task_id, task_engine and runs the for loop over _COMPLETION_STEPS
calling _transition_and_sync and logging/exiting on
ValueError/ExecutionStateError exactly as now; keep the final merge/return in
apply_post_execution_transitions to call the new helpers and return
execution_result.model_copy(update={"context": ctx}) while preserving
propagation of MemoryError and RecursionError and all existing logging and error
strings (EXECUTION_ENGINE_ERROR, step_reason, _transition_to_interrupted,
_transition_and_sync).
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/ai_company/engine/task_sync.py (1)
171-227: 🧹 Nitpick | 🔵 Trivial
Function size is borderline but acceptable.
At ~42 lines of code (excluding the docstring), `apply_post_execution_transitions` is close to the 50-line limit. The current structure is reasonable, with SHUTDOWN delegated to `_transition_to_interrupted` and the COMPLETED path using a clear stepwise loop. If future changes push this over the limit, consider extracting the COMPLETED loop into a private helper.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/ai_company/engine/task_sync.py` around lines 171 - 227, The function apply_post_execution_transitions is approaching the 50-line limit; to keep it maintainable and avoid exceeding the threshold, extract the COMPLETED-path loop that iterates over _COMPLETION_STEPS and calls _transition_and_sync into a private helper (for example _apply_completion_steps) and have apply_post_execution_transitions call that helper, preserving the existing SHUTDOWN delegation to _transition_to_interrupted and identical error handling/logging semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tests/unit/engine/test_task_sync.py`:
- Around line 489-525: Replace the manual monkeypatching of
AgentContext.with_task_transition (using original_method and assigning
patched_transition) with unittest.mock.patch.object to ensure automatic
restoration; specifically, patch AgentContext.with_task_transition inside a with
patch.object(AgentContext, "with_task_transition", new=patched_transition):
block (or as a decorator) around the invocation of
apply_post_execution_transitions, keeping the same patched_transition behavior
that raises ExecutionStateError on the second call and preserving references to
call_count and original_transition so assertions about
out.context.task_execution and TaskStatus.IN_REVIEW remain valid.
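The `patch.object` pattern recommended above guarantees restoration even if the test body raises. A minimal stand-alone sketch (here `Target.transition` stands in for `AgentContext.with_task_transition`):

```python
from unittest.mock import patch


class Target:
    def transition(self) -> str:
        return "original"


def _patched(self) -> str:
    return "patched"


# patch.object swaps the method for the duration of the block only;
# no manual save/restore of the original attribute is needed.
with patch.object(Target, "transition", new=_patched):
    inside = Target().transition()

after = Target().transition()  # automatically restored on exit
```

Because restoration happens in the context manager's `__exit__`, a failing assertion inside the block cannot leak the patched method into later tests.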
---
Duplicate comments:
In `@src/ai_company/engine/task_sync.py`:
- Around line 171-227: The function apply_post_execution_transitions is
approaching the 50-line limit; to keep it maintainable and avoid exceeding the
threshold, extract the COMPLETED-path loop that iterates over _COMPLETION_STEPS
and calls _transition_and_sync into a private helper (for example
_apply_completion_steps) and have apply_post_execution_transitions call that
helper, preserving the existing SHUTDOWN delegation to
_transition_to_interrupted and identical error handling/logging semantics.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: d92cdab6-5b49-4744-ad2b-0c3ba3d6da01
📒 Files selected for processing (3)
docs/design/engine.md
src/ai_company/engine/task_sync.py
tests/unit/engine/test_task_sync.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Agent
- GitHub Check: Greptile Review
- GitHub Check: Test (Python 3.14)
🧰 Additional context used
📓 Path-based instructions (5)
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/**/*.py: No `from __future__ import annotations` — Python 3.14 has PEP 649
Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Type hints required on all public functions; mypy strict mode enforced
Google-style docstrings required on public classes and functions, enforced by ruff D rules
Create new objects, never mutate existing ones. For non-Pydantic internal collections, use `copy.deepcopy()` at construction and `MappingProxyType` wrapping for read-only enforcement
For `dict`/`list` fields in frozen Pydantic models, use `copy.deepcopy()` at system boundaries (tool execution, LLM provider serialization, inter-agent delegation, persistence serialization)
Use frozen Pydantic models for config/identity; separate mutable-via-copy models for runtime state using `model_copy(update=...)`
Use Pydantic v2 with `BaseModel`, `model_validator`, `computed_field`, `ConfigDict`
Use `@computed_field` for derived values instead of storing redundant fields (e.g. `TokenUsage.total_tokens`)
Use `NotBlankStr` from `core.types` for all identifier/name fields (including optional and tuple variants) instead of manual whitespace validators
Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
Line length must be 88 characters (ruff enforced)
Functions must be less than 50 lines; files must be less than 800 lines
Handle errors explicitly, never silently swallow exceptions
Validate at system boundaries (user input, external APIs, config files)
Every module with business logic must have: `from ai_company.observability import get_logger` then `logger = get_logger(__name__)`
Never use `import logging` / `logging.getLogger()` / `print()` in application code
Use only `logger` as the variable name (not `_logger`, not `log`)
Use domain-specific event name constants from `ai_company.observability.events` modules (e.g...
Files:
src/ai_company/engine/task_sync.py
src/ai_company/{providers,engine}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`RetryExhaustedError` signals all retries failed — the engine layer catches this to trigger fallback chains
Files:
src/ai_company/engine/task_sync.py
{src,tests}/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Never use real vendor names (Anthropic, OpenAI, Claude, GPT, etc.) in project-owned code, docstrings, comments, tests, or config examples — use generic names like `example-provider`, `example-large-001`, `test-provider`, `test-small-001`
Files:
src/ai_company/engine/task_sync.py
tests/unit/engine/test_task_sync.py
tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
tests/**/*.py: Mark tests with `@pytest.mark.unit`, `@pytest.mark.integration`, `@pytest.mark.e2e`, and `@pytest.mark.slow`
Prefer `@pytest.mark.parametrize` for testing similar cases
Files:
tests/unit/engine/test_task_sync.py
docs/design/*.md
📄 CodeRabbit inference engine (CLAUDE.md)
When approved deviations occur, update the relevant `docs/design/` page to reflect the new reality
Files:
docs/design/engine.md
🧠 Learnings (7)
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Prefer `asyncio.TaskGroup` for fan-out/fan-in parallel operations in new code (multiple tool invocations, parallel agent calls) — use structured concurrency over bare `create_task`
Applied to files:
src/ai_company/engine/task_sync.py
tests/unit/engine/test_task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Use `except A, B:` (no parentheses) for exception syntax on Python 3.14 per PEP 758 — ruff enforces this
Applied to files:
src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: When review agents find valid issues (including pre-existing issues, suggestions, and findings adjacent to the PR's changes), fix them all — no deferring or 'out of scope' skipping
Applied to files:
src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to src/**/*.py : Functions must be less than 50 lines; files must be less than 800 lines
Applied to files:
src/ai_company/engine/task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to tests/**/*.py : Prefer `pytest.mark.parametrize` for testing similar cases
Applied to files:
tests/unit/engine/test_task_sync.py
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: For trivial/docs-only changes, use `/pre-pr-review quick` to skip agents but still run automated checks
Applied to files:
docs/design/engine.md
📚 Learning: 2026-03-12T22:12:29.867Z
Learnt from: CR
Repo: Aureliolo/synthorg PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T22:12:29.867Z
Learning: Applies to docs/design/*.md : When approved deviations occur, update the relevant `docs/design/` page to reflect the new reality
Applied to files:
docs/design/engine.md
🧬 Code graph analysis (2)
src/ai_company/engine/task_sync.py (5)
src/ai_company/core/enums.py (1)
TaskStatus (198-224)
src/ai_company/engine/errors.py (2)
ExecutionStateError (12-13)
TaskEngineError (85-86)
src/ai_company/engine/loop_protocol.py (2)
TerminationReason (28-36)
ExecutionResult (79-140)
src/ai_company/engine/task_engine_models.py (1)
TransitionTaskMutation (185-236)
src/ai_company/engine/context.py (2)
AgentContext (87-307)
with_task_transition (235-278)
tests/unit/engine/test_task_sync.py (6)
src/ai_company/core/enums.py (1)
TaskStatus (198-224)
src/ai_company/engine/context.py (3)
AgentContext (87-307)
from_identity (140-171)
with_task_transition (235-278)
src/ai_company/engine/loop_protocol.py (3)
ExecutionResult (79-140)
TerminationReason (28-36)
TurnRecord (39-76)
src/ai_company/engine/task_engine_models.py (1)
TaskMutationResult (290-335)
src/ai_company/engine/task_sync.py (3)
apply_post_execution_transitions (171-227)
sync_to_task_engine (43-121)
transition_task_if_needed (145-168)
src/ai_company/core/task.py (1)
Task (45-261)
🔇 Additional comments (15)
src/ai_company/engine/task_sync.py (6)
1-31: LGTM!
Module setup follows guidelines correctly: proper logger initialization with `get_logger(__name__)`, appropriate imports, and TYPE_CHECKING guard for type-only imports.
43-121: LGTM!
The `sync_to_task_engine` function correctly implements best-effort semantics:
- Non-recoverable exceptions (`MemoryError`, `RecursionError`, `asyncio.CancelledError`) propagate unconditionally
- Other exceptions are logged and swallowed via the extracted `_log_sync_issue` helper
- Uses PEP 758 exception syntax without parentheses as required
124-142: LGTM!
The `_log_sync_issue` helper correctly addresses the past review feedback by using `logger.error` and `logger.warning` directly instead of a `log` alias.
145-168: LGTM!
The function is concise and correctly implements the pre-execution transition logic with `critical=True` for the initial sync.
230-263: LGTM!
The helper correctly implements the local-then-remote transition pattern with appropriate logging at INFO level for state transitions.
266-291: LGTM!
The shutdown transition handler correctly catches transition errors, logs with traceback via `logger.exception`, and returns the original result on failure.

docs/design/engine.md (4)
229-264: LGTM!
The new "AgentEngine ↔ TaskEngine Incremental Sync" section is comprehensive and well-structured. It clearly documents:
- Transition sequences per execution path
- Best-effort semantics with isolated failures
- Critical vs. non-critical sync logging
- Direct `submit()` usage rationale
- Snapshot channel alignment
288-290: LGTM!
The `TerminationReason` enum documentation correctly includes the new `PARKED` value alongside the existing reasons.
414-417: LGTM!
The updated `run()` signature correctly includes the new `effective_autonomy?` parameter.
460-465: LGTM!
The post-execution transitions section now properly documents `PARKED` semantics as requested in past review feedback: tasks are suspended by an approval-timeout policy and remain at their current status until resumed. The cross-reference to the incremental sync section is helpful.

tests/unit/engine/test_task_sync.py (5)
1-100: LGTM!
The test module setup is well-organized with clear helper functions (`_make_sync_success`, `_make_sync_failure`, `_make_mock_task_engine`, `_make_execution_result`) that reduce boilerplate and improve test readability.
107-254: LGTM!
The `TestSyncToTaskEngine` class addresses past review feedback:
- Exception propagation tests are now parametrized (lines 209-235)
- Critical flag test now asserts log level via mock (lines 237-254)

Good coverage of all sync scenarios including no-op, success, rejection, and various exception types.
262-357: LGTM!
The `TestTransitionTaskIfNeeded` class provides comprehensive coverage of pre-execution transitions including:
- ASSIGNED → IN_PROGRESS with sync verification
- Passthrough for IN_PROGRESS and no task_execution
- Local-only transition when task_engine is None
442-468: LGTM!
The test correctly uses `@pytest.mark.parametrize` to combine `MAX_TURNS` and `BUDGET_EXHAUSTED` scenarios, addressing the past review feedback about reducing duplication.
527-644: LGTM!
The remaining tests provide excellent coverage of edge cases:
- Shutdown transition failure returns original result
- COMPLETED path works with `task_engine=None`
- Sync failures (rejections and exceptions) don't block local transitions

These tests validate the core "best-effort sync" design requirement.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
```python
# Sync post-recovery status to TaskEngine (typically FAILED,
# depends on recovery strategy).
ctx = execution_result.context
if ctx.task_execution is not None and pre_recovery_status is not None:
    logger.info(
        EXECUTION_ENGINE_TASK_TRANSITION,
        agent_id=agent_id,
        task_id=task_id,
        from_status=pre_recovery_status.value,
        to_status=ctx.task_execution.status.value,
    )
    await sync_to_task_engine(
        self._task_engine,
        target_status=ctx.task_execution.status,
        task_id=task_id,
        agent_id=agent_id,
        reason=f"Post-recovery status: {ctx.task_execution.status.value}",
    )
```
```python
logger.info(
    EXECUTION_ENGINE_TASK_TRANSITION,
    agent_id=agent_id,
    task_id=task_id,
    from_status=pre_fatal_status.value,
    to_status=error_ctx.task_execution.status.value,
)
await sync_to_task_engine(
    self._task_engine,
    target_status=error_ctx.task_execution.status,
    task_id=task_id,
    agent_id=agent_id,
    reason=f"Fatal error recovery: {type(exc).__name__}",
)
```
```python
_log_sync_issue(
    critical=critical,
    agent_id=agent_id,
    task_id=task_id,
    target_status=target_status,
    error=result.error or "Mutation rejected (no error detail)",
    error_code=result.error_code,
)
```
**`error_code` passed without a `None` fallback**
The sibling field `result.error` correctly applies an `or` fallback (`result.error or "Mutation rejected (no error detail)"`), but `result.error_code` is passed directly to `_log_sync_issue` without any guard. `TaskMutationResult.error_code` is typed `str | None`, so a rejection with no error code (e.g. one produced by an internal mutation path that never sets `error_code`) will log `"error_code": null` in structured output. Applying the same treatment keeps the log contract consistent and avoids potential issues in dashboards or alert filters that group by this field.
```suggestion
_log_sync_issue(
    critical=critical,
    agent_id=agent_id,
    task_id=task_id,
    target_status=target_status,
    error=result.error or "Mutation rejected (no error detail)",
    error_code=result.error_code or "unknown",
)
```
Follow-up commit:
- Add status-difference check before post-recovery and fatal-error transition logs/syncs in `agent_engine.py` — prevents misleading `EXECUTION_ENGINE_TASK_TRANSITION` events and rejected no-op mutations when the recovery strategy leaves status unchanged
- Replace manual monkey-patching with `patch.object` for cleaner automatic restoration in `test_task_sync.py`
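The status-difference guard this follow-up describes can be sketched minimally. All names below are hypothetical stand-ins for the project's real enums and call sites: the only point illustrated is skipping the transition log and remote sync when recovery leaves the status unchanged.

```python
from enum import Enum

class TaskStatus(Enum):
    IN_PROGRESS = "in_progress"
    FAILED = "failed"

def should_emit_transition(pre_status: TaskStatus, post_status: TaskStatus) -> bool:
    """Emit a transition event/sync only when the status actually changed."""
    return post_status is not pre_status

# No-op recovery: skip the log and sync, avoiding a rejected no-op mutation.
print(should_emit_transition(TaskStatus.IN_PROGRESS, TaskStatus.IN_PROGRESS))  # → False
# Real transition (e.g. recovery marked the task FAILED): log and sync.
print(should_emit_transition(TaskStatus.IN_PROGRESS, TaskStatus.FAILED))  # → True
```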
```python
await sync_to_task_engine(
    self._task_engine,
    target_status=error_ctx.task_execution.status,
    task_id=task_id,
    agent_id=agent_id,
    reason=f"Fatal error recovery: {type(exc).__name__}",
)
```
**Fatal-error sync missing `critical=True`**
The `sync_to_task_engine` call here uses the default `critical=False`, so any failure is logged at WARNING rather than ERROR. Fatal errors are arguably the most dangerous scenario for leaving a task permanently stuck at `IN_PROGRESS` — the very problem this feature aims to prevent. By contrast, the `ASSIGNED → IN_PROGRESS` sync in `transition_task_if_needed` already uses `critical=True`.
Passing `critical=True` here would surface missed syncs at ERROR level, matching the severity of the error path that landed here:
```suggestion
await sync_to_task_engine(
    self._task_engine,
    target_status=error_ctx.task_execution.status,
    task_id=task_id,
    agent_id=agent_id,
    reason=f"Fatal error recovery: {type(exc).__name__}",
    critical=True,
)
```
The same argument applies to the analogous `sync_to_task_engine` call in `_post_execute` (the `TerminationReason.ERROR` recovery path, lines ~397-403), which also uses the default `critical=False`.
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/ai_company/engine/agent_engine.py
Line: 835-841
Comment:
**Fatal-error sync missing `critical=True`**
The `sync_to_task_engine` call here uses the default `critical=False`, so any failure is logged at WARNING rather than ERROR. Fatal errors are arguably the most dangerous scenario for leaving a task permanently stuck at `IN_PROGRESS` — the very problem this feature aims to prevent. By contrast, the `ASSIGNED → IN_PROGRESS` sync in `transition_task_if_needed` already uses `critical=True`.
Passing `critical=True` here would surface missed syncs at ERROR level, matching the severity of the error path that landed here:
```suggestion
await sync_to_task_engine(
self._task_engine,
target_status=error_ctx.task_execution.status,
task_id=task_id,
agent_id=agent_id,
reason=f"Fatal error recovery: {type(exc).__name__}",
critical=True,
)
```
The same argument applies to the analogous `sync_to_task_engine` call in `_post_execute` (the `TerminationReason.ERROR` recovery path, lines ~397-403) which also uses the default `critical=False`.
🤖 I have created a release *beep* *boop*

---

## [0.1.3](v0.1.2...v0.1.3) (2026-03-13)

### Features

* add Mem0 memory backend adapter ([#345](#345)) ([2788db8](2788db8)), closes [#206](#206)
* centralized single-writer TaskEngine with full CRUD API ([#328](#328)) ([9c1a3e1](9c1a3e1))
* incremental AgentEngine → TaskEngine status sync ([#331](#331)) ([7a68d34](7a68d34)), closes [#323](#323)
* web dashboard pages — views, components, tests, and review fixes ([#354](#354)) ([b165ec4](b165ec4))
* web dashboard with Vue 3 + PrimeVue + Tailwind CSS ([#347](#347)) ([06416b1](06416b1))

### Bug Fixes

* harden coordination pipeline with validators, logging, and fail-fast ([#333](#333)) ([2f10d49](2f10d49)), closes [#205](#205)
* repo-wide security hardening from ZAP, Scorecard, and CodeQL audit ([#357](#357)) ([27eb288](27eb288))

### CI/CD

* add pip-audit, hadolint, OSSF Scorecard, ZAP DAST, and pre-push hooks ([#350](#350)) ([2802d20](2802d20))
* add workflow_dispatch trigger to PR Preview for Dependabot PRs ([#326](#326)) ([4c7b6d9](4c7b6d9))
* bump astral-sh/setup-uv from 7.4.0 to 7.5.0 in the minor-and-patch group ([#335](#335)) ([98dd8ca](98dd8ca))

### Maintenance

* bump the minor-and-patch group across 1 directory with 3 updates ([#352](#352)) ([031b1c9](031b1c9))
* **deps:** bump devalue from 5.6.3 to 5.6.4 in /site in the npm_and_yarn group across 1 directory ([#324](#324)) ([9a9c600](9a9c600))
* migrate docs build from MkDocs to Zensical ([#330](#330)) ([fa8bf1d](fa8bf1d)), closes [#329](#329)

---

This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Summary
`AgentEngine` now syncs task status transitions to `TaskEngine` at each lifecycle point (IN_PROGRESS → IN_REVIEW → COMPLETED), rather than reporting only the final status. This gives real-time visibility into execution progress and improves crash recovery (a crash mid-execution leaves the task at the last-reached stage, not stuck at ASSIGNED).

- `engine/task_sync.py`: Sync and transition logic extracted from `AgentEngine` (1094→858 lines) into a dedicated 263-line module for cohesion and maintainability.
- TaskEngine sync added in `_handle_fatal_error` to prevent tasks being stuck at IN_PROGRESS after fatal errors that bypass the normal post-execution pipeline.
- `TaskEngine._SNAPSHOT_CHANNEL` changed from `"task_engine"` to `"tasks"` to match `CHANNEL_TASKS` in `api.channels`, ensuring WebSocket consumers receive task state events.

Changes
- `src/ai_company/engine/task_sync.py` — `sync_to_task_engine`, `transition_task_if_needed`, `apply_post_execution_transitions`, and internal transition helpers
- `src/ai_company/engine/agent_engine.py`
- `src/ai_company/engine/task_engine.py` — `_SNAPSHOT_CHANNEL` changed to match `CHANNEL_TASKS`
- `src/ai_company/observability/events/execution.py` — `EXECUTION_ENGINE_TASK_SYNCED` and `EXECUTION_ENGINE_SYNC_FAILED` event constants (moved to ENGINE cluster)
- `docs/design/engine.md` — updated `run()` signature, added step 10 cross-reference
- `CLAUDE.md` — updated `engine/` package description
- `tests/unit/engine/test_agent_engine.py`

Review coverage
Pre-reviewed by 9 agents (code-reviewer, python-reviewer, pr-test-analyzer, silent-failure-hunter, comment-analyzer, type-design-analyzer, logging-audit, resilience-audit, docs-consistency). 14 findings addressed:
- Removed dead `expected_version` parameter and `int | None` return type (optimistic concurrency was never wired up)
- Moved `uuid4` import to module level
- Added `_SNAPSHOT_CHANNEL == CHANNEL_TASKS` invariant test
- Added `RecursionError` propagation test

Test plan
Closes #323