fix: queue processing bug #1595

Merged
yohamta0 merged 11 commits into main from queue-processing-bug
Jan 21, 2026
Conversation

@yohamta0 yohamta0 commented Jan 21, 2026

Collaborator

Fix an issue reported by @ghansham

Summary by CodeRabbit

  • Bug Fixes

    • Enhanced queue processor stability with improved error handling and recovery mechanisms for DAG file operations during concurrent processing.
  • Tests

    • Significantly expanded integration test coverage for queue management and scheduling. Reorganized tests with improved fixtures and assertions to verify concurrency limits, priority handling, and DAG execution behavior.

coderabbitai Bot commented Jan 21, 2026

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

The changes introduce a new queue integration test framework with reusable fixtures and test helpers, then establish a dedicated queue_test.go module. Additionally, the queue processor's DAG reading logic is enhanced with a retry mechanism featuring exponential backoff to handle race conditions during DAG file writes.

Changes

| Cohort / File(s) | Change Summary |
|---|---|
| **Queue integration test fixtures**<br>`internal/intg/queue/fixture_test.go` | New test fixture providing reusable scaffolding for DAG-based queue testing, including configuration helpers (`WithQueue`, `WithGlobalQueue`), enqueue operations with optional priorities, scheduler management (`StartScheduler`, `Stop`), and concurrency verification utilities (`WaitDrain`, `AssertConcurrent`). |
| **Queue test suite**<br>`internal/intg/queue/queue_test.go` | New integration test suite with four tests (`TestBasicProcessing`, `TestGlobalConcurrency`, `TestDAGMaxActiveRuns`, `TestPriorityOrdering`) validating queue processing, concurrency limits, DAG-level active run constraints, and priority ordering semantics. |
| **Scheduler integration test cleanup**<br>`internal/intg/sched_test.go` | Large-scale pruning removing queue-related tests (`TestQueueProcessing`, `TestGlobalQueueMaxConcurrency`, `TestDAGQueueMaxActiveRuns`, and others) with retention of only `TestCronScheduleRunsTwice`; consolidation of queue tests into dedicated `queue_test.go`. |
| **Queue processor retry mechanism**<br>`internal/service/scheduler/queue_processor.go` | Introduces `readDAGFromAttempt` helper with 3-attempt retry loop and exponential backoff to handle race conditions where DAG files are not yet written during item processing; adds context-aware cancellation handling during backoff delays. |
| **Queue processor test refactoring**<br>`internal/service/scheduler/queue_processor_test.go` | Comprehensive refactor from monolithic test setup to fixture-based approach using new types (`syncBuffer`, `queueFixture`); replaces hard-coded configuration with fixture-driven processor initialization; expands test coverage with new cases for dynamic concurrency, global queues, priority ordering, and failure scenarios. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • dagu-org/dagu#1482: Modifies queue_processor.go to change DAG-read behavior and how queue maxConcurrency is updated for non-global queues with retry/backoff logic.
  • dagu-org/dagu#1457: Alters queue_processor.go's item processing flow by changing how item data is read and adding retry/backoff adjustments to control flow.

🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 28.57% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Title check | ❓ Inconclusive | The title 'fix: queue processing bug' is vague and generic, lacking specificity about which aspect of queue processing is being fixed. | Clarify the title with a more specific description of the bug being fixed, e.g., 'fix: retry logic in queue processor for race condition' or similar. |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 9

🤖 Fix all issues with AI agents
In `@internal/intg/queue/fixture_test.go`:
- Around line 150-160: Stop() currently ignores the value received from
f.schedDone so scheduler errors from RunCommandWithError are lost; update Stop()
to read the error from f.schedDone in the select, capture it into a variable
(e.g., err := <-f.schedDone), and assert/handle it (for tests call
f.t.Fatalf("scheduler error on shutdown: %v", err) or fail the test if err !=
nil). Keep the existing timeout branch but ensure the non-timeout path checks
the error and fails when it's not nil; reference fixture.Stop, f.schedDone,
f.th.Cancel(), and RunCommandWithError when making the change.
- Around line 101-115: The helper enqueueWithPriority silently ignores errors;
update it to check and fail tests on errors by capturing returned errors from
CreateAttempt, os.MkdirAll, att.Open, att.Write, att.Close and
f.th.QueueStore.Enqueue and assert with require.NoError(f.t, err) after each
call (keeping the same variables and flow in enqueueWithPriority) so failures
surface consistently like other fixture methods.
- Around line 140-148: The WaitDrain closure currently ignores errors from
QueueStore.List which can make the drain check succeed falsely; update the
closure in function WaitDrain to capture the returned error (items, err :=
f.th.QueueStore.List(f.th.Context, f.queue)), assert it with
require.NoError(f.t, err) (or require.NoErrorf to include context) before using
len(items), and keep the existing f.t.Logf for visibility; reference WaitDrain,
QueueStore.List, f.th.QueueStore, f.t.Logf and require.Eventually to locate and
modify the code.
- Around line 177-185: In collectStartTimes, don't ignore errors from
f.th.DAGRunStore.FindAttempt, att.ReadStatus, and stringutil.ParseTime; update
the function to call require.NoError (and require.NotNil where appropriate) on
the returned error/values so the test fails fast instead of causing nil derefs
or silent zero timestamps—specifically wrap the calls to
f.th.DAGRunStore.FindAttempt, att.ReadStatus, and stringutil.ParseTime in error
checks using require.NoError (and require.NotNil for att/st) inside
collectStartTimes.

In `@internal/service/scheduler/queue_processor_test.go`:
- Around line 109-121: The test fixture function enqueueToQueue currently
ignores errors from CreateAttempt, Open, Write, Close and Enqueue; update
enqueueToQueue (and preserve enqueueWithPriority) to capture each returned error
(e.g. run, err := f.dagRunStore.CreateAttempt(...)) and assert success using
require.NoError(f.t, err) (or require.NoError(requireCtx, err) consistent with
the file) immediately after each call (CreateAttempt, run.Open, run.Write,
run.Close, and f.queueStore.Enqueue) so failures fail fast and tests are
reliable.
- Around line 197-208: The test TestQueueProcessor_ConcurrencyLimit ignores the
error return from f.queueStore.List(f.ctx, "conc-dag"); update this test (and
the preceding similar case) to capture the returned error and assert it with
require.NoError(t, err) before using items, e.g., change calls to "items, err :=
f.queueStore.List(...)" and then "require.NoError(t, err)" so backend failures
are surfaced; make the same fix in the neighboring test that also drops the List
error.
- Around line 80-92: In enqueueRuns, several calls ignore errors
(dagRunStore.CreateAttempt, run.Open, run.Write, run.Close, queueStore.Enqueue).
Replace the blank identifier assignments with explicit checks using
require.NoError on the fixture's testing field (use require.NoError(f.t, err))
immediately after each call (for CreateAttempt's err, and for err returned by
Open, Write, Close, Enqueue) so the test fixture fails fast on setup errors;
locate these in the queueFixture.enqueueRuns function and add the
require.NoError checks referencing those call results.
- Around line 166-195: In TestQueueProcessor_PriorityOrdering, each call to
item.Data() (for item1..item4) ignores the returned error; update the four calls
to capture the error (e.g., ref1, err := item1.Data()) and add
require.NoError(t, err) before accessing ref1.ID (and similarly for
item2..item4) so data errors are validated prior to assertions; change only the
item.Data() assignments and add the require.NoError checks in the existing test.
- Around line 154-164: The test TestQueueProcessor_ItemsRemainOnFailure ignores
the error returned by f.queueStore.List which can mask store failures; update
the assertion to capture the error (e.g., items, err := f.queueStore.List(f.ctx,
"fifo-dag")) and assert require.NoError(t, err) before require.Len so the test
fails on store errors; ensure you reference f.queueStore.List and use
require.NoError(t, err) then proceed to require.Len(t, items, 2, "...").
🧹 Nitpick comments (1)
internal/service/scheduler/queue_processor.go (1)

503-551: Consider reusing BackoffConfig for DAG-read retries.

Hard-coded retry counts/intervals make tuning harder and diverge from the processor’s configurable backoff settings.

@yohamta0 yohamta0 merged commit af20428 into main Jan 21, 2026
4 checks passed
@yohamta0 yohamta0 deleted the queue-processing-bug branch January 21, 2026 11:03
codecov Bot commented Jan 21, 2026

Codecov Report

❌ Patch coverage is 82.60870% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 61.82%. Comparing base (8852bc9) to head (4c31140).
⚠️ Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| internal/service/scheduler/queue_processor.go | 86.36% | 2 Missing and 1 partial ⚠️ |
| internal/runtime/manager.go | 0.00% | 1 Missing ⚠️ |
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1595      +/-   ##
==========================================
- Coverage   61.83%   61.82%   -0.01%     
==========================================
  Files         281      282       +1     
  Lines       31173    31462     +289     
==========================================
+ Hits        19276    19452     +176     
- Misses      10179    10279     +100     
- Partials     1718     1731      +13     
| Files with missing lines | Coverage Δ |
|---|---|
| internal/runtime/manager.go | 22.83% <0.00%> (ø) |
| internal/service/scheduler/queue_processor.go | 44.51% <86.36%> (+6.21%) ⬆️ |

... and 13 files with indirect coverage changes


Δ = absolute <relative> (impact), ø = not affected, ? = missing data