fix(scheduler): queue processor should respect maxConcurrency config in global config #1482
Walkthrough
Introduces a configurable exponential backoff for the scheduler queue processor, marks config-defined queues as global (preserved and treated differently for concurrency), adds an integration test for global-queue max concurrency (duplicated in the diff), and increases the test server startup timeout to 10 seconds.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 0
🧹 Nitpick comments (1)
internal/integration/scheduler_test.go (1)
252-256: Consider failing the test if start time parsing fails.
The current logic silently skips runs where ParseTime fails or returns zero. This could mask test issues where DAGs didn't actually start. Consider failing explicitly to ensure all runs have valid start times.
- startedAt, err := stringutil.ParseTime(status.StartedAt)
- if err == nil && !startedAt.IsZero() {
-     startTimes = append(startTimes, startedAt)
- }
+ startedAt, err := stringutil.ParseTime(status.StartedAt)
+ require.NoError(t, err, "Failed to parse start time for run %s", runID)
+ require.False(t, startedAt.IsZero(), "Start time is zero for run %s", runID)
+ startTimes = append(startTimes, startedAt)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
internal/integration/scheduler_test.go (2 hunks)
internal/service/scheduler/queue_processor.go (5 hunks)
internal/service/scheduler/queue_processor_test.go (3 hunks)
internal/test/server.go (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*.go: Backend entrypoint in cmd/ orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under internal/* (for example internal/runtime, internal/persistence)
Keep Go files gofmt/goimports clean; use tabs, PascalCase for exported symbols (SchedulerClient), lowerCamelCase for locals, and Err... names for package-level errors
Repository linting relies on golangci-lint; prefer idiomatic Go patterns, minimal global state, and structured logging helpers in internal/common
Files:
internal/test/server.go
internal/integration/scheduler_test.go
internal/service/scheduler/queue_processor.go
internal/service/scheduler/queue_processor_test.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
**/*_test.go: Co-locate Go tests as *_test.go; favour table-driven cases and cover failure paths
Use stretchr/testify/require and shared fixtures from internal/test instead of duplicating mocks
Files:
internal/integration/scheduler_test.go
internal/service/scheduler/queue_processor_test.go
🧠 Learnings (1)
📚 Learning: 2025-12-04T10:34:17.062Z
Learnt from: CR
Repo: dagu-org/dagu PR: 0
File: AGENTS.md:0-0
Timestamp: 2025-12-04T10:34:17.062Z
Learning: Applies to **/*.go : Backend entrypoint in `cmd/` orchestrates the scheduler and CLI; runtime, persistence, and service layers sit under `internal/*` (for example `internal/runtime`, `internal/persistence`)
Applied to files:
internal/integration/scheduler_test.go
🧬 Code graph analysis (3)
internal/integration/scheduler_test.go (3)
  internal/common/config/config.go (3)
    Config (9-39)
    Queues (240-243)
    QueueConfig (246-249)
  internal/core/status.go (1)
    Queued (12-12)
  internal/core/execution/queue.go (1)
    QueueStore (16-35)
internal/service/scheduler/queue_processor.go (1)
  internal/common/config/config.go (1)
    Queues (240-243)
internal/service/scheduler/queue_processor_test.go (1)
  internal/service/scheduler/queue_processor.go (3)
    BackoffConfig (24-28)
    NewQueueProcessor (90-122)
    WithBackoffConfig (83-87)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Test on ubuntu-latest
🔇 Additional comments (13)
internal/test/server.go (1)
58-59: LGTM!
The timeout increase from 5s to 10s provides a more generous buffer for server startup in test environments, which is sensible given the additional queue processing complexity introduced in this PR.
internal/integration/scheduler_test.go (4)
142-155: Well-structured test setup.
Using WithConfigMutator to configure the global queue before setup ensures the config is properly propagated. This correctly tests the scenario where global queue settings should override DAG-level settings.
157-200: LGTM!
The test design is sound: using a 1-second sleep in each DAG step allows clear differentiation between concurrent execution (all start within ~2s) and sequential execution (starts ~3+ seconds apart). The queue name correctly matches the configured global queue.
207-241: LGTM!
The scheduler execution pattern mirrors the existing TestQueueProcessing test, maintaining consistency. The 15s timeout for queue draining is reasonable given CI variability.
264-282: LGTM!
The pairwise start time comparison correctly identifies whether DAGs executed concurrently. The 2-second threshold appropriately distinguishes concurrent execution from sequential processing (which would show ~3+ seconds gap with 1-second sleep steps).
internal/service/scheduler/queue_processor.go (6)
23-37: LGTM!
Clean separation of backoff configuration with sensible production defaults. The exported type enables test injection of faster backoff values without modifying production behavior.
55-77: LGTM!
The isGlobal flag cleanly distinguishes config-defined queues from dynamically created ones. The isGlobalQueue() accessor properly uses mutex protection, consistent with other accessors like maxConc().
79-87: LGTM!
Idiomatic functional options pattern that enables clean configuration injection without breaking existing callers of NewQueueProcessor.
89-122: LGTM!
The initialization order is correct: defaults are set first, then options are applied to allow overrides. Global queues from config are properly marked with isGlobal: true.
194-228: LGTM!
The cleanup logic correctly preserves global queues while allowing dynamic queues to be garbage collected when inactive. This ensures config-defined queue settings persist across idle periods.
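The role of the isGlobal flag can be illustrated with a small sketch. It assumes a simplified in-memory registry: the `queue` and `registry` types, the `active` counter, and `effectiveMaxConcurrency` are hypothetical, and the real processor (which uses mutex-protected accessors) is likely structured differently. The concurrency rule follows the PR's description: global queues keep the configured maxConcurrency, while dynamic queues take the DAG's maxActiveRuns.

```go
package main

import "fmt"

// queue is a simplified stand-in for the processor's queue state.
type queue struct {
	maxConcurrency int
	isGlobal       bool // true for queues defined in the global config
	active         int  // queued or running items (assumed bookkeeping)
}

// registry maps queue names to their state.
type registry struct {
	queues map[string]*queue
}

// cleanup drops dynamic queues that are idle and keeps every global
// queue, so config-defined settings survive idle periods.
func (r *registry) cleanup() {
	for name, q := range r.queues {
		if q.isGlobal || q.active > 0 {
			continue
		}
		delete(r.queues, name) // deleting during range is safe in Go
	}
}

// effectiveMaxConcurrency keeps the configured limit for global queues
// and uses the DAG's maxActiveRuns for dynamic ones.
func (q *queue) effectiveMaxConcurrency(dagMaxActiveRuns int) int {
	if q.isGlobal {
		return q.maxConcurrency
	}
	return dagMaxActiveRuns
}

func main() {
	r := &registry{queues: map[string]*queue{
		"global-queue": {maxConcurrency: 3, isGlobal: true},
		"idle-dag":     {maxConcurrency: 1},
		"busy-dag":     {maxConcurrency: 1, active: 2},
	}}
	r.cleanup()

	fmt.Println(len(r.queues))                                       // 2
	fmt.Println(r.queues["global-queue"].effectiveMaxConcurrency(1)) // 3
	fmt.Println(r.queues["busy-dag"].effectiveMaxConcurrency(5))     // 5
}
```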
403-421: This is the core fix for the PR.
Global queues now correctly retain their config-defined maxConcurrency instead of being overridden by individual DAG's maxActiveRuns. Dynamic (DAG-based) queues continue to inherit the DAG's setting. The backoff configuration is properly applied from the processor's config.
internal/service/scheduler/queue_processor_test.go (2)
17-24: LGTM!
The fast backoff configuration (10ms initial, 50ms max, 2 retries) keeps test execution quick while still validating retry behavior. Good practice to use a helper function for consistent test configuration.
26-87: LGTM!
Adding t.Parallel() follows the coding guidelines for test files. Using the functional option WithBackoffConfig(testBackoffConfig()) is cleaner than mutating global backoff defaults, and keeps tests isolated. As per coding guidelines, favoring table-driven cases and shared fixtures from internal/test is encouraged.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1482      +/-   ##
==========================================
- Coverage   59.92%   59.91%   -0.01%
==========================================
  Files         195      195
  Lines       21886    21913      +27
==========================================
+ Hits        13114    13129      +15
- Misses       7364     7376      +12
  Partials     1408     1408
... and 2 files with indirect coverage changes
Summary by CodeRabbit
New Features
Improvements
Tests