fix(queue): reclaim phantom lane slots that wedge embedded-run dispatch#12
Conversation
On the membrane deployment, WhatsApp DM agent turns intermittently stalled forever at embedded_run:started: the liveness snapshot showed active=0 waiting=0 queued=1 while one entry sat queued, the stage-stall watchdog logged nothing, and no drainLane/lane-wait/lane-task logs fired during the stall (tulgey#238). Root cause (hypothesis 1, reproduced): the embedded-run session lane is enqueued with NO finite taskTimeoutMs — only the inner global-lane enqueue carried withLaneTimeout. A session-lane task whose promise never settles holds its activeTaskIds slot forever. Subsequent enqueues pump, but the while-loop sees activeTaskIds.size >= maxConcurrent and returns silently, so queued work never dispatches and nothing logs. The stuck-session recovery only resets the session lane (released=0 when the slot is the phantom), so the loop re-enters. Same "unsettled promise leaves a slot held" family as #11. Fix (minimal + observable): - Track per-active-task start + progress timestamps; in pump(), before giving up on a saturated lane, reclaim any slot held past a 10-minute no-progress ceiling (bumps generation so the late completion is ignored, mirroring the #11 steal pattern) and warn `lane slot reclaimed`. - Emit a throttled `lane dispatch blocked` warn at the previously-silent decision point (lane, active, maxConcurrent, queued, oldestActiveAgeMs). - Apply withLaneTimeout to the embedded-run session lane too, so the outer slot self-expires like the global one already did. Tests (both hang/fail pre-fix): "reclaims a phantom slot whose untimed task never settles" and "warns at the dispatch-blocked decision point". Refs imperfect-co/tulgey#238 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address cr review: bumping state.generation in reclaimStalledSlots invalidated every in-flight task on the lane, not just the phantom — on a multi-concurrency lane a healthy sibling's completion would be silently dropped (completeTask returns false, no pump/wake). Reclaim now removes only the stalled taskIds; completeTask no-ops via a membership check so a late completion of a reclaimed task frees nothing and does not re-pump. Adds a multi-concurrency regression test: reclaiming one stalled slot leaves a healthy sibling task intact. Refs imperfect-co/tulgey#238 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Warning Review limit reached
More reviews will be available in 51 minutes and 28 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughThis PR hardens command-queue lane concurrency control by detecting and reclaiming phantom active slots held by non-progressing tasks. It introduces per-lane stall bookkeeping, implements slot reclamation in the pump loop, and applies the same timeout/progress tracking to embedded-agent session lanes to prevent indefinite queue exhaustion. ChangesStall Detection and Recovery Infrastructure
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/process/command-queue.ts`:
- Around line 283-291: When reclaiming stalled tasks you remove IDs from
state.activeTaskIds but never wake any waiters; call notifyActiveTaskWaiters()
after deleting the stalled IDs (before returning) so waitForActiveTasks()
re-checks and unblocks any waiters; locate the loop that deletes taskId from
state.activeTaskIds / state.activeTaskInfo in command-queue.ts and add a call to
notifyActiveTaskWaiters() (the existing method notifyActiveTaskWaiters)
immediately after the deletions and before the diag.warn/return.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3d294079-07eb-4e25-885a-eab5fa663dfa
📒 Files selected for processing (3)
src/agents/embedded-agent-runner/run.tssrc/process/command-queue.test.tssrc/process/command-queue.ts
| for (const taskId of stalled) { | ||
| state.activeTaskIds.delete(taskId); | ||
| state.activeTaskInfo.delete(taskId); | ||
| } | ||
| diag.warn( | ||
| `lane slot reclaimed: lane=${state.lane} reclaimed=${stalled.length} ` + | ||
| `ceilingMs=${LANE_SLOT_STALL_CEILING_MS} active=${state.activeTaskIds.size} queued=${state.queue.length}`, | ||
| ); | ||
| return stalled.length; |
There was a problem hiding this comment.
Notify active-task waiters when reclaim frees a slot.
Reclaiming removes the taskId from activeTaskIds, but waitForActiveTasks() is only re-checked via notifyActiveTaskWaiters(). Since reclaimed tasks now take the no-op path in completeTask(), a waiter captured before reclaim can hang forever even though the slot was freed.
Suggested fix
for (const taskId of stalled) {
state.activeTaskIds.delete(taskId);
state.activeTaskInfo.delete(taskId);
}
+ notifyActiveTaskWaiters();
diag.warn(
`lane slot reclaimed: lane=${state.lane} reclaimed=${stalled.length} ` +
`ceilingMs=${LANE_SLOT_STALL_CEILING_MS} active=${state.activeTaskIds.size} queued=${state.queue.length}`,
);📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for (const taskId of stalled) { | |
| state.activeTaskIds.delete(taskId); | |
| state.activeTaskInfo.delete(taskId); | |
| } | |
| diag.warn( | |
| `lane slot reclaimed: lane=${state.lane} reclaimed=${stalled.length} ` + | |
| `ceilingMs=${LANE_SLOT_STALL_CEILING_MS} active=${state.activeTaskIds.size} queued=${state.queue.length}`, | |
| ); | |
| return stalled.length; | |
| for (const taskId of stalled) { | |
| state.activeTaskIds.delete(taskId); | |
| state.activeTaskInfo.delete(taskId); | |
| } | |
| notifyActiveTaskWaiters(); | |
| diag.warn( | |
| `lane slot reclaimed: lane=${state.lane} reclaimed=${stalled.length} ` + | |
| `ceilingMs=${LANE_SLOT_STALL_CEILING_MS} active=${state.activeTaskIds.size} queued=${state.queue.length}`, | |
| ); | |
| return stalled.length; |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/process/command-queue.ts` around lines 283 - 291, When reclaiming stalled
tasks you remove IDs from state.activeTaskIds but never wake any waiters; call
notifyActiveTaskWaiters() after deleting the stalled IDs (before returning) so
waitForActiveTasks() re-checks and unblocks any waiters; locate the loop that
deletes taskId from state.activeTaskIds / state.activeTaskInfo in
command-queue.ts and add a call to notifyActiveTaskWaiters() (the existing
method notifyActiveTaskWaiters) immediately after the deletions and before the
diag.warn/return.
…aimed Address cr review: give the healthy sibling a fresh taskTimeoutProgressAtMs callback so the ceiling check spares it (previously it would also look stalled, weakening the assertion), capture the trigger promise instead of fire-and-forget, and assert the post-reclaim lane state synchronously before the queued task's microtask frees its slot. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…ettled flushes (#15) A flush whose downstream work never settles (hung pre-agent media call in processMessage) parked every later inbound for that chat key on a dead promise chain: messages logged as Inbound, never reached the command queue, and the chat went permanently silent until restart re-primed the same poison. Fourth member of the unbounded await-the-prior-holder family (session lock #6, models.json #11, lane slot #12). Bounded at OPENCLAW_INBOUND_CHAIN_WAIT_MS (default 5 min) with a warn naming the chat key. Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Prod symptom
On the membrane deployment (fork 2026.6.3 @ 9a1a211, already includes models.json lock fix #11 + watchdog #10), WhatsApp DM agent turns intermittently stalled forever at
embedded_run:started(imperfect-co/tulgey#238):[diagnostic] stalled session: ... state=idle queueDepth=1 reason=active_work_without_progress activeWorkKind=embedded_run lastProgress=embedded_run:started recovery=noneactive=0 waiting=0 queued=1— idle while one entry sits queueddrainLane/lane wait/lane tasklogs fired during the stall — the silent failure was inside the queue's dispatch decisionreleased=0and re-entered the loopRoot cause (hypothesis 1, reproduced)
The embedded-run session lane was enqueued with no finite
taskTimeoutMs— only the inner global-lane enqueue carriedwithLaneTimeout(run.tsenqueueGlobalvsenqueueSession). A session-lane task whose promise never settles holds itsactiveTaskIdsslot forever. Later enqueuespump(), but thewhileloop seesactiveTaskIds.size >= maxConcurrentand returns silently — queued work never dispatches and nothing logs. Stuck-session recovery only resets the session lane (resetCommandLane(sessionLane)), and with the phantom being the session slot it released 0. Same "unsettled promise leaves a slot held" family as #11.Fix (minimal + observable)
pump(), before giving up on a saturated lane, reclaim any slot held past a 10-minute no-progress ceiling and warnlane slot reclaimed. Only the stalled taskIds are removed;completeTaskno-ops via a membership check so a healthy sibling on a multi-concurrency lane is never dropped (per cr review).lane dispatch blockedwarn at the previously-silent decision point (lane, active, maxConcurrent, queued, oldestActiveAgeMs).withLaneTimeoutto the embedded-run session lane too, so the outer slot self-expires like the global one already did.Tests
Both hang/fail against unfixed source (the phantom test hangs to the 120s test timeout — the exact prod stall):
reclaims a phantom slot whose untimed task never settles (tulgey#238)reclaiming one stalled slot leaves a healthy sibling task intactwarns at the dispatch-blocked decision point that was previously silentFull
src/process/command-queue.test.tssuite (33) + runner lane/timeout tests green; typecheck clean.Follow-up
lane slot reclaimed/lane dispatch blockedwarns appear for any recurrence and that no session stalls past the ceiling.🤖 Generated with Claude Code
Summary by CodeRabbit
Bug Fixes
Tests