fix(scheduler): queue processor should respect maxConcurrency config in global config #1482

Merged: yohamta0 merged 6 commits into main from fix-queue-processor on Dec 14, 2025

Conversation

yohamta0 (Collaborator) commented Dec 14, 2025

Summary by CodeRabbit

  • New Features

    • Configurable exponential backoff for scheduler retries.
    • Global queues from configuration persist when idle and keep configured concurrency.
  • Improvements

    • Increased server startup timeout for greater reliability.
    • Queue management now distinguishes persistent global queues from dynamic queues.
  • Tests

    • Added integration test validating global-queue concurrency.
    • Updated scheduler tests to inject a fast backoff config for deterministic retries.

coderabbitai Bot commented Dec 14, 2025

Walkthrough

Introduces a configurable exponential backoff for the scheduler queue processor, marks config-defined queues as global (preserved and treated differently for concurrency), adds an integration test for global-queue max concurrency (duplicated in the diff), and increases test server startup timeout to 10 seconds.

Changes

| Cohort | File(s) | Summary |
|---|---|---|
| Queue Processor Core | internal/service/scheduler/queue_processor.go | Added BackoffConfig and DefaultBackoffConfig(); made QueueProcessor accept functional options (QueueProcessorOption) and WithBackoffConfig(); added backoffConfig field; introduced isGlobal on queue and isGlobalQueue() accessor; preserve global queues during cleanup and skip updating their max-concurrency at runtime. |
| Queue Processor Tests | internal/service/scheduler/queue_processor_test.go | Replaced global backoff constant mutations with a testBackoffConfig() helper and inject via WithBackoffConfig(...); marked TestQueueProcessor_StrictFIFO to run in parallel. |
| Integration Tests | internal/integration/scheduler_test.go | Added TestGlobalQueueMaxConcurrency integration test (appears twice in diff) that configures a global queue with maxActiveRuns, enqueues multiple DAG runs, starts the scheduler, and asserts concurrent starts by comparing run start times. |
| Test Server | internal/test/server.go | Increased SetupServer startup timeout from 5s to 10s. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Areas requiring extra attention:

  • Duplicate TestGlobalQueueMaxConcurrency in internal/integration/scheduler_test.go — likely a merge artifact.
  • Global vs dynamic queue lifecycle and cleanup behavior across runs.
  • New variadic NewQueueProcessor options — verify existing call sites compile and receive correct defaults.
  • Timing-based assertions in the new integration test — potential flakiness under slow CI.

Poem

🐰 I hopped through queues both global and new,
Tweaked backoffs that tumble, then gently renew.
Tests race the sunrise to prove runs can play,
Ten seconds to wake—then all start in a fray.
A rabbit-approved scheduler, snug for the day.

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The pull request title clearly summarizes the main change: making the queue processor respect maxConcurrency configuration defined in the global config, which aligns with the key behavioral changes across multiple files including BackoffConfig, global queue persistence, and max concurrency handling. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 88.89% which is sufficient. The required threshold is 80.00%. |

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1f14ac2 and f3e5175.

📒 Files selected for processing (1)
  • internal/integration/scheduler_test.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/integration/scheduler_test.go
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest

coderabbitai Bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
internal/integration/scheduler_test.go (1)

252-256: Consider failing the test if start time parsing fails.

The current logic silently skips runs where ParseTime fails or returns zero. This could mask test issues where DAGs didn't actually start. Consider failing explicitly to ensure all runs have valid start times.

-		startedAt, err := stringutil.ParseTime(status.StartedAt)
-		if err == nil && !startedAt.IsZero() {
-			startTimes = append(startTimes, startedAt)
-		}
+		startedAt, err := stringutil.ParseTime(status.StartedAt)
+		require.NoError(t, err, "Failed to parse start time for run %s", runID)
+		require.False(t, startedAt.IsZero(), "Start time is zero for run %s", runID)
+		startTimes = append(startTimes, startedAt)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 23a29f3 and 9825c39.

📒 Files selected for processing (4)
  • internal/integration/scheduler_test.go (2 hunks)
  • internal/service/scheduler/queue_processor.go (5 hunks)
  • internal/service/scheduler/queue_processor_test.go (3 hunks)
  • internal/test/server.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common

Files:

  • internal/test/server.go
  • internal/integration/scheduler_test.go
  • internal/service/scheduler/queue_processor.go
  • internal/service/scheduler/queue_processor_test.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks

Files:

  • internal/integration/scheduler_test.go
  • internal/service/scheduler/queue_processor_test.go
🧠 Learnings (1)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)

Applied to files:

  • internal/integration/scheduler_test.go
🧬 Code graph analysis (3)
internal/integration/scheduler_test.go (3)
internal/common/config/config.go (3)
  • Config (9-39)
  • Queues (240-243)
  • QueueConfig (246-249)
internal/core/status.go (1)
  • Queued (12-12)
internal/core/execution/queue.go (1)
  • QueueStore (16-35)
internal/service/scheduler/queue_processor.go (1)
internal/common/config/config.go (1)
  • Queues (240-243)
internal/service/scheduler/queue_processor_test.go (1)
internal/service/scheduler/queue_processor.go (3)
  • BackoffConfig (24-28)
  • NewQueueProcessor (90-122)
  • WithBackoffConfig (83-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Test on ubuntu-latest
🔇 Additional comments (13)
internal/test/server.go (1)

58-59: LGTM!

The timeout increase from 5s to 10s provides a more generous buffer for server startup in test environments, which is sensible given the additional queue processing complexity introduced in this PR.

internal/integration/scheduler_test.go (4)

142-155: Well-structured test setup.

Using WithConfigMutator to configure the global queue before setup ensures the config is properly propagated. This correctly tests the scenario where global queue settings should override DAG-level settings.


157-200: LGTM!

The test design is sound: using a 1-second sleep in each DAG step allows clear differentiation between concurrent execution (all start within ~2s) and sequential execution (starts ~3+ seconds apart). The queue name correctly matches the configured global queue.


207-241: LGTM!

The scheduler execution pattern mirrors the existing TestQueueProcessing test, maintaining consistency. The 15s timeout for queue draining is reasonable given CI variability.


264-282: LGTM!

The pairwise start time comparison correctly identifies whether DAGs executed concurrently. The 2-second threshold appropriately distinguishes concurrent execution from sequential processing (which would show ~3+ seconds gap with 1-second sleep steps).

internal/service/scheduler/queue_processor.go (6)

23-37: LGTM!

Clean separation of backoff configuration with sensible production defaults. The exported type enables test injection of faster backoff values without modifying production behavior.


55-77: LGTM!

The isGlobal flag cleanly distinguishes config-defined queues from dynamically created ones. The isGlobalQueue() accessor properly uses mutex protection, consistent with other accessors like maxConc().


79-87: LGTM!

Idiomatic functional options pattern that enables clean configuration injection without breaking existing callers of NewQueueProcessor.


89-122: LGTM!

The initialization order is correct: defaults are set first, then options are applied to allow overrides. Global queues from config are properly marked with isGlobal: true.


194-228: LGTM!

The cleanup logic correctly preserves global queues while allowing dynamic queues to be garbage collected when inactive. This ensures config-defined queue settings persist across idle periods.
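The cleanup rule can be sketched as follows. This is a simplified model, not the dagu implementation: the plain `map`, the `active` counter, and the `cleanupInactive` name are all hypothetical, and the actual code may track activity and store queues differently.

```go
package main

import "fmt"

// queue is a stripped-down stand-in for the scheduler's queue type.
type queue struct {
	isGlobal bool // true for queues defined in the global config
	active   int  // hypothetical count of queued or running items
}

// cleanupInactive removes idle dynamic queues and always keeps global ones,
// so their configured settings survive idle periods.
func cleanupInactive(queues map[string]*queue) {
	for name, q := range queues {
		if q.isGlobal {
			continue // config-defined queues are never garbage collected
		}
		if q.active == 0 {
			delete(queues, name) // deleting during range is safe in Go
		}
	}
}

func main() {
	queues := map[string]*queue{
		"global-idle":  {isGlobal: true, active: 0},
		"dynamic-idle": {isGlobal: false, active: 0},
		"dynamic-busy": {isGlobal: false, active: 2},
	}
	cleanupInactive(queues)
	fmt.Println(len(queues)) // the idle global and the busy dynamic queue remain
}
```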


403-421: This is the core fix for the PR.

Global queues now correctly retain their config-defined maxConcurrency instead of being overridden by individual DAG's maxActiveRuns. Dynamic (DAG-based) queues continue to inherit the DAG's setting. The backoff configuration is properly applied from the processor's config.
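A minimal sketch of that rule, under stated assumptions: the `isGlobalQueue()` and `maxConc()` accessor names come from this review, while the struct layout and the `setMaxConcFromDAG` method are hypothetical stand-ins for whatever the processor actually does when it reads a DAG's maxActiveRuns.

```go
package main

import (
	"fmt"
	"sync"
)

// queue holds the concurrency limit and the global flag behind a mutex.
type queue struct {
	mu             sync.Mutex
	maxConcurrency int
	isGlobal       bool
}

// isGlobalQueue reports whether the queue was defined in the global config.
func (q *queue) isGlobalQueue() bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.isGlobal
}

// maxConc returns the current concurrency limit.
func (q *queue) maxConc() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.maxConcurrency
}

// setMaxConcFromDAG (hypothetical) applies a DAG's maxActiveRuns only to
// dynamic queues; global queues keep their config-defined limit.
func (q *queue) setMaxConcFromDAG(maxActiveRuns int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.isGlobal {
		return
	}
	q.maxConcurrency = maxActiveRuns
}

func main() {
	global := &queue{maxConcurrency: 3, isGlobal: true}
	dynamic := &queue{maxConcurrency: 1}

	global.setMaxConcFromDAG(1)  // ignored: the queue is global
	dynamic.setMaxConcFromDAG(5) // applied: the queue is dynamic

	fmt.Println(global.isGlobalQueue(), global.maxConc())
	fmt.Println(dynamic.isGlobalQueue(), dynamic.maxConc())
}
```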

internal/service/scheduler/queue_processor_test.go (2)

17-24: LGTM!

The fast backoff configuration (10ms initial, 50ms max, 2 retries) keeps test execution quick while still validating retry behavior. Good practice to use a helper function for consistent test configuration.


26-87: LGTM!

Adding t.Parallel() follows the coding guidelines for test files. Using the functional option WithBackoffConfig(testBackoffConfig()) is cleaner than mutating global backoff defaults, and keeps tests isolated. As per coding guidelines, favoring table-driven cases and shared fixtures from internal/test is encouraged.

yohamta0 merged commit 3ebfa3c into main on Dec 14, 2025
5 checks passed
yohamta0 deleted the fix-queue-processor branch on December 14, 2025 08:56
codecov Bot commented Dec 14, 2025

Codecov Report

❌ Patch coverage is 65.71429% with 12 lines in your changes missing coverage. Please review.
✅ Project coverage is 59.91%. Comparing base (0498640) to head (f3e5175).
⚠️ Report is 3 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| internal/service/scheduler/queue_processor.go | 65.71% | 11 Missing and 1 partial ⚠️ |
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #1482      +/-   ##
==========================================
- Coverage   59.91%   59.91%   -0.01%     
==========================================
  Files         195      195              
  Lines       21886    21913      +27     
==========================================
+ Hits        13114    13129      +15     
- Misses       7364     7376      +12     
  Partials     1408     1408              
| Files with missing lines | Coverage Δ |
|---|---|
| internal/service/scheduler/queue_processor.go | 39.57% <65.71%> (+0.72%) ⬆️ |

... and 2 files with indirect coverage changes


Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 23a29f3...f3e5175. Read the comment docs.
