fix: queue processing bug #1595
Important: Review skipped. Auto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI.

📝 Walkthrough

The changes introduce a new queue integration test framework with reusable fixtures and test helpers, then establish a dedicated `queue_test.go` module. Additionally, the queue processor's DAG reading logic is enhanced with a retry mechanism featuring exponential backoff to handle race conditions during DAG file writes.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks | ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 9
🤖 Fix all issues with AI agents
In `@internal/intg/queue/fixture_test.go`:
- Around line 150-160: Stop() currently ignores the value received from
f.schedDone so scheduler errors from RunCommandWithError are lost; update Stop()
to read the error from f.schedDone in the select, capture it into a variable
(e.g., err := <-f.schedDone), and assert/handle it (for tests call
f.t.Fatalf("scheduler error on shutdown: %v", err) or fail the test if err !=
nil). Keep the existing timeout branch but ensure the non-timeout path checks
the error and fails when it's not nil; reference fixture.Stop, f.schedDone,
f.th.Cancel(), and RunCommandWithError when making the change.
- Around line 101-115: The helper enqueueWithPriority silently ignores errors;
update it to check and fail tests on errors by capturing returned errors from
CreateAttempt, os.MkdirAll, att.Open, att.Write, att.Close and
f.th.QueueStore.Enqueue and assert with require.NoError(f.t, err) after each
call (keeping the same variables and flow in enqueueWithPriority) so failures
surface consistently like other fixture methods.
- Around line 140-148: The WaitDrain closure currently ignores errors from
QueueStore.List which can make the drain check succeed falsely; update the
closure in function WaitDrain to capture the returned error (items, err :=
f.th.QueueStore.List(f.th.Context, f.queue)), assert it with
require.NoError(f.t, err) (or require.NoErrorf to include context) before using
len(items), and keep the existing f.t.Logf for visibility; reference WaitDrain,
QueueStore.List, f.th.QueueStore, f.t.Logf and require.Eventually to locate and
modify the code.
- Around line 177-185: In collectStartTimes, don't ignore errors from
f.th.DAGRunStore.FindAttempt, att.ReadStatus, and stringutil.ParseTime; update
the function to call require.NoError (and require.NotNil where appropriate) on
the returned error/values so the test fails fast instead of causing nil derefs
or silent zero timestamps—specifically wrap the calls to
f.th.DAGRunStore.FindAttempt, att.ReadStatus, and stringutil.ParseTime in error
checks using require.NoError (and require.NotNil for att/st) inside
collectStartTimes.
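To illustrate the first item above (`Stop()` dropping the value received from `f.schedDone`), here is a minimal, standard-library-only sketch of the shutdown wait. `waitForShutdown` is a hypothetical stand-in, not the fixture's actual method; in the real test the returned error would presumably be passed to `require.NoError(f.t, err)` or `f.t.Fatalf`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForShutdown is a hypothetical stand-in for the fixture's Stop() logic.
// It waits for the scheduler goroutine to report its result on done and
// returns that result instead of discarding it.
func waitForShutdown(done <-chan error, timeout time.Duration) error {
	select {
	case err := <-done:
		// Non-timeout path: surface the scheduler's error, if any.
		if err != nil {
			return fmt.Errorf("scheduler error on shutdown: %w", err)
		}
		return nil
	case <-time.After(timeout):
		// Timeout path is kept, as the review comment asks.
		return errors.New("scheduler did not stop before the timeout")
	}
}

func main() {
	// Simulate a scheduler goroutine that exits with an error.
	done := make(chan error, 1)
	done <- errors.New("boom")

	fmt.Println(waitForShutdown(done, time.Second))
	// prints "scheduler error on shutdown: boom"
}
```

Returning the error keeps the helper usable outside tests; a fixture can simply fail the test when the result is non-nil.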
In `@internal/service/scheduler/queue_processor_test.go`:
- Around line 109-121: The test fixture function enqueueToQueue currently
ignores errors from CreateAttempt, Open, Write, Close and Enqueue; update
enqueueToQueue (and preserve enqueueWithPriority) to capture each returned error
(e.g. run, err := f.dagRunStore.CreateAttempt(...)) and assert success using
require.NoError(f.t, err) (or require.NoError(requireCtx, err) consistent with
the file) immediately after each call (CreateAttempt, run.Open, run.Write,
run.Close, and f.queueStore.Enqueue) so failures fail fast and tests are
reliable.
- Around line 197-208: The test TestQueueProcessor_ConcurrencyLimit ignores the
error return from f.queueStore.List(f.ctx, "conc-dag"); update this test (and
the preceding similar case) to capture the returned error and assert it with
require.NoError(t, err) before using items, e.g., change calls to "items, err :=
f.queueStore.List(...)" and then "require.NoError(t, err)" so backend failures
are surfaced; make the same fix in the neighboring test that also drops the List
error.
- Around line 80-92: In enqueueRuns, several calls ignore errors
(dagRunStore.CreateAttempt, run.Open, run.Write, run.Close, queueStore.Enqueue).
Replace the blank identifier assignments with explicit checks using
require.NoError on the fixture's testing field (use require.NoError(f.t, err))
immediately after each call (for CreateAttempt's err, and for err returned by
Open, Write, Close, Enqueue) so the test fixture fails fast on setup errors;
locate these in the queueFixture.enqueueRuns function and add the
require.NoError checks referencing those call results.
- Around line 166-195: In TestQueueProcessor_PriorityOrdering, each call to
item.Data() (for item1..item4) ignores the returned error; update the four calls
to capture the error (e.g., ref1, err := item1.Data()) and add
require.NoError(t, err) before accessing ref1.ID (and similarly for
item2..item4) so data errors are validated prior to assertions; change only the
item.Data() assignments and add the require.NoError checks in the existing test.
- Around line 154-164: The test TestQueueProcessor_ItemsRemainOnFailure ignores
the error returned by f.queueStore.List which can mask store failures; update
the assertion to capture the error (e.g., items, err := f.queueStore.List(f.ctx,
"fifo-dag")) and assert require.NoError(t, err) before require.Len so the test
fails on store errors; ensure you reference f.queueStore.List and use
require.NoError(t, err) then proceed to require.Len(t, items, 2, "...").
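The `List` comments above share one failure mode: when the error is dropped, a broken store is indistinguishable from an empty queue. A small sketch of that difference follows. The `lister` interface and `failingStore` type are simplified assumptions for illustration; the real `QueueStore.List` also takes a context and returns queue items rather than strings.

```go
package main

import (
	"errors"
	"fmt"
)

// lister is a hypothetical, reduced stand-in for the queue store.
type lister interface {
	List(queue string) ([]string, error)
}

// failingStore simulates a backend failure: no items and a non-nil error.
type failingStore struct{}

func (failingStore) List(string) ([]string, error) {
	return nil, errors.New("backend unavailable")
}

// countUnchecked mirrors the flagged pattern: the error is discarded, so a
// failed List looks exactly like an empty queue.
func countUnchecked(s lister, queue string) int {
	items, _ := s.List(queue)
	return len(items)
}

// countChecked returns the error so the caller (for example a test using
// require.NoError) can fail before asserting on the item count.
func countChecked(s lister, queue string) (int, error) {
	items, err := s.List(queue)
	if err != nil {
		return 0, fmt.Errorf("list queue %q: %w", queue, err)
	}
	return len(items), nil
}

func main() {
	s := failingStore{}

	// Looks like a drained queue even though the store is broken.
	fmt.Println(countUnchecked(s, "fifo-dag"))

	// The checked variant makes the failure visible.
	_, err := countChecked(s, "fifo-dag")
	fmt.Println(err)
}
```

The same reasoning applies to the fixture helpers: checking each setup error right after the call keeps a later assertion from passing or failing for the wrong reason.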
🧹 Nitpick comments (1)
internal/service/scheduler/queue_processor.go (1)
503-551: Consider reusing BackoffConfig for DAG-read retries. Hard-coded retry counts/intervals make tuning harder and diverge from the processor’s configurable backoff settings.
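As a rough sketch of what a config-driven DAG-read retry could look like: the `backoffConfig` type, its field names, and `retryWithBackoff` below are assumptions made for illustration and are not the project's actual `BackoffConfig` or the code in `queue_processor.go`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// backoffConfig is a hypothetical config type; the field names are
// assumptions, not the project's BackoffConfig.
type backoffConfig struct {
	InitialInterval time.Duration
	MaxInterval     time.Duration
	MaxRetries      int
}

// errNotReady simulates a DAG file that is still being written.
var errNotReady = errors.New("DAG file not fully written yet")

// retryWithBackoff calls op until it succeeds or MaxRetries retries are used
// up. The wait doubles after each failure and is capped at MaxInterval.
func retryWithBackoff(cfg backoffConfig, op func() error) error {
	wait := cfg.InitialInterval
	var err error
	for attempt := 0; attempt <= cfg.MaxRetries; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if attempt == cfg.MaxRetries {
			break // no sleep after the final attempt
		}
		time.Sleep(wait)
		wait *= 2
		if wait > cfg.MaxInterval {
			wait = cfg.MaxInterval
		}
	}
	return fmt.Errorf("giving up after %d attempts: %w", cfg.MaxRetries+1, err)
}

func main() {
	cfg := backoffConfig{
		InitialInterval: time.Millisecond,
		MaxInterval:     8 * time.Millisecond,
		MaxRetries:      3,
	}

	calls := 0
	err := retryWithBackoff(cfg, func() error {
		calls++
		if calls < 3 {
			return errNotReady // first two reads hit the write race
		}
		return nil
	})
	fmt.Println(calls, err) // prints "3 <nil>"
}
```

Passing the intervals and retry count in through one struct means the DAG-read path can be tuned together with the processor's other backoff settings instead of through separate constants.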
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #1595 +/- ##
==========================================
- Coverage 61.83% 61.82% -0.01%
==========================================
Files 281 282 +1
Lines 31173 31462 +289
==========================================
+ Hits 19276 19452 +176
- Misses 10179 10279 +100
- Partials 1718 1731 +13
... and 13 files with indirect coverage changes
Fix an issue reported by @ghansham
Summary by CodeRabbit
Bug Fixes
Tests