fix: resolve orchestrator deadlock on parallel task completion #178

rileyhilliard merged 2 commits into main
Conversation
Caution: Review failed. The pull request is closed.

Walkthrough

Adds an `allDone` signal channel so the orchestrator's dispatcher can shut down once all expected results are collected, instead of deadlocking.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Dispatcher
    participant Workers
    participant ResultCollector
    participant allDone as allDone (chan)
    Client->>Dispatcher: Run(tasks, hosts)
    Dispatcher->>Workers: populate taskQueue / send tasks
    loop per-task processing
        Workers->>Workers: execute task
        alt success or terminal failure
            Workers->>ResultCollector: send TaskResult
            ResultCollector->>ResultCollector: increment collected
        else retryable / host unreachable
            Workers->>Dispatcher: send TaskInfo to requeueChan
            Dispatcher->>Dispatcher: requeue or mark host unavailable
        end
        Dispatcher->>Dispatcher: monitor allDone in select
    end
    Note over ResultCollector, allDone: when collected == expected
    ResultCollector->>allDone: close allDone
    allDone-->>Dispatcher: unblock dispatcher select
    Dispatcher->>Dispatcher: drain requeueChan, emit failures if all hosts unavailable
    Dispatcher->>Client: return final aggregated results
```
…el tasks complete (#177) Workers blocked on taskQueue while the dispatcher blocked on requeueChan, creating a circular dependency that prevented shutdown. Added an allDone signal channel (guarded by sync.Once) that fires when all expected results are collected, letting the dispatcher exit and unblock the workers.
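The shutdown pattern the commit message describes can be sketched in miniature. This is a hypothetical, simplified model (names like `runTasks` and the string-typed channels are illustrative only; the real orchestrator in `internal/parallel/orchestrator.go` adds retries, host tracking, and context cancellation): a collector closes `allDone` once every expected result has arrived, which lets the dispatcher stop waiting on `requeueChan`, close `taskQueue`, and unblock the workers.

```go
package main

import (
	"fmt"
	"sync"
)

// runTasks sketches the fixed shutdown sequence. Without allDone, the
// dispatcher blocks on requeueChan while workers block on taskQueue,
// and neither side can ever close the other's channel.
func runTasks(tasks []string) int {
	taskQueue := make(chan string, len(tasks))
	results := make(chan string, len(tasks))
	requeueChan := make(chan string, len(tasks))
	allDone := make(chan struct{})
	var once sync.Once

	var wg sync.WaitGroup
	for w := 0; w < 2; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for t := range taskQueue { // exits only when taskQueue is closed
				results <- t + ": ok"
			}
		}()
	}

	// collector: fire allDone when all expected results are in
	collected := 0
	done := make(chan int)
	go func() {
		for range results {
			collected++
			if collected == len(tasks) {
				once.Do(func() { close(allDone) })
			}
		}
		done <- collected
	}()

	for _, t := range tasks {
		taskQueue <- t
	}

	// dispatcher: wait for requeued tasks OR completion
	for running := true; running; {
		select {
		case t := <-requeueChan:
			taskQueue <- t
		case <-allDone: // breaks the circular wait
			running = false
		}
	}
	close(taskQueue) // unblocks workers
	wg.Wait()
	close(requeueChan)
	close(results) // lets the collector loop end
	return <-done
}

func main() {
	fmt.Println(runTasks([]string{"vet", "lint"})) // prints 2
}
```

The `sync.Once` guard matters because `close` on an already-closed channel panics; wrapping it makes a second "all collected" trigger a harmless no-op.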
c78dbb2 to 223ee9f
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/parallel/orchestrator.go (1)
168-199: ⚠️ Potential issue | 🟠 Major

Requeued tasks can be lost when `allHostsUnavailable()` triggers mid-dispatch.

If some tasks have already been sent to `taskQueue` (removed from `pending`) before `allHostsUnavailable()` returns true, those tasks may be picked up by workers, fail, get sent to `requeueChan`, and then be silently discarded: the dispatcher has already returned, and the cleanup goroutine just closes `requeueChan` without draining it.

This means those tasks produce no `TaskResult`, so `collected` never reaches `expectedResults`, and `allDone` never fires. The function still returns (because `resultChan` gets closed by the cleanup goroutine), but with incomplete results.

Consider draining `requeueChan` after `wg.Wait()` and emitting failure results for any orphaned tasks before closing `resultChan`. Proposed fix in the cleanup goroutine (lines 258-263):

```diff
 go func() {
 	wg.Wait()
 	close(requeueChan)
+	// Drain any orphaned requeued tasks and emit failure results
+	for task := range requeueChan {
+		resultChan <- TaskResult{
+			TaskName:  task.Name,
+			TaskIndex: task.Index,
+			Command:   task.Command,
+			Host:      "none",
+			ExitCode:  1,
+			Error:     fmt.Errorf("all hosts unavailable"),
+			StartTime: time.Now(),
+			EndTime:   time.Now(),
+		}
+	}
 	<-dispatcherDone
 	close(resultChan)
 }()
```

Note that `close(requeueChan)` makes `range requeueChan` drain the remaining buffered items: it only delivers items sent before the close, and since the workers have already exited by the time `wg.Wait()` returns, no new sends can happen.
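The closing observation rests on general Go channel semantics: closing a buffered channel does not discard buffered values; a `range` receiver drains everything sent before the close, then exits. A minimal standalone demonstration:

```go
package main

import "fmt"

// drainAfterClose shows that values buffered before close(ch) are still
// delivered to a ranging receiver; the range loop exits once the buffer
// is empty and the channel is closed.
func drainAfterClose() []int {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // forbids further sends, but buffered values remain readable

	var out []int
	for v := range ch {
		out = append(out, v)
	}
	return out
}

func main() {
	fmt.Println(drainAfterClose()) // prints [1 2]
}
```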
🤖 Fix all issues with AI agents
In `@internal/parallel/orchestrator_test.go`:
- Around line 870-875: Replace the weak "at least one" assertion on
result.TaskResults with a strict equality check against the expected count:
assert that len(result.TaskResults) equals the expectedResults (which is
len(o.tasks) in this test) so the test fails if any task result is missing; keep
the existing loop that asserts each tr.Success() is false to ensure all returned
results are failures.
🧹 Nitpick comments (1)
internal/parallel/orchestrator.go (1)
206-243: Dispatcher dropping requeued tasks on `allDone` (lines 237-238) is safe only if the invariant holds.

When `allDone` fires in the inner select (line 237), the requeued `task` read at line 212 is silently dropped. This is correct only if `collected >= expectedResults` guarantees every task already has a result. Since requeued tasks explicitly skip sending a result (lines 385-402 in `hostWorkerWithRequeue`), a requeued task in flight here should mean `collected` hasn't counted it yet, which contradicts `allDone` having fired.

In practice this path is likely unreachable under normal conditions, but it's worth a brief comment explaining why the drop is safe (or adding a defensive failure result).
…iple tasks, only 1 result was being produced instead of N
Closes #177
Summary
Parallel tasks hang indefinitely after all tasks complete. The reported symptom is correct, but the root cause is different from what the issue describes.
Root cause analysis
#177 attributes the hang to a signal handler goroutine leak (`signal.Notify` without `signal.Stop`). That's not it. A goroutine blocked on `<-sigChan` can't prevent Go from exiting, because Go terminates all goroutines when `main()` returns. The `workflow.go` cleanup pattern is good hygiene but isn't what keeps the process alive.

The actual problem is a channel deadlock in the orchestrator's shutdown sequence (`internal/parallel/orchestrator.go`). After all tasks finish:

- Workers block reading from `taskQueue`
- The dispatcher blocks in `for task := range requeueChan`, waiting for requeued tasks that will never come
- The cleanup goroutine blocks on `wg.Wait()`, waiting for workers to exit

Circular dependency: workers wait on `taskQueue` (closed by the dispatcher) while the dispatcher waits on `requeueChan` (closed only after workers exit). Nobody can make progress.

Reproduction
Built the binary, ran `rr quick-check` (parallel: [vet, lint]). Process hung after both tasks completed. Used `kill -SIGABRT` to get goroutine dumps confirming workers stuck in `[select]` at the `taskQueue` read and the dispatcher stuck on `requeueChan`.

Fix
Added an `allDone` signal channel that gets closed (via `sync.Once`) when all expected results are collected. The dispatcher now selects on `allDone` alongside `requeueChan` and `ctx.Done()`, so it can exit once everything's accounted for. This closes `taskQueue`, which unblocks workers, and the whole shutdown chain completes.

Changes

- Added an `allDone` channel to break the circular shutdown dependency in `orchestrator.Run()`
- Guarded `close(allDone)` with `sync.Once` to prevent a double-close if a future code path produces unexpected extra results
- The dispatcher selects on `allDone` in both the initial dispatch loop and the post-dispatch requeue loop
- Tracked `expectedResults` and the `collected` count in the result-gathering loop

Testing
- Added `TestOrchestrator_RemotePath_CompletesWithoutDeadlock` regression test with 4 scenarios (single/multi task, single/multi host, more tasks than hosts)
The signal handler cleanup the reporter flagged (
signal.Stop/closeinparallel.go,run.go,task.go) is a real gap worth fixing for hygiene, but it's a separate concern. It doesn't cause the hang and should be addressed in a follow-up.Summary by CodeRabbit