fix: reduce scheduler retry and notification scanning#2274
Conversation
Skip DAG-run notification event reads while no destinations are configured by advancing the monitor cursor to the current head. Use file-backed retry candidate sidecars for scheduler retry scans, with rebuild and stale-candidate cleanup paths so status files remain the source of truth. Add regression coverage for no-destination notifications and retry candidate listing.
📝 WalkthroughWalkthroughThis PR adds file-based retry candidate persistence and integrates it into the retry scanner to optimize failed DAG run discovery. Concurrently, it optimizes the notification monitor to skip event processing when no destinations are configured. The changes span storage persistence, scheduler logic, and monitor behavior. ChangesRetry candidate tracking and scheduler integration
Monitor notification optimization
🎯 3 (Moderate) | ⏱️ ~20 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@internal/persis/file/dagrun/retry_candidates.go`:
- Around line 91-94: The code currently swallows read/unmarshal errors from
readRetryCandidateFile(candidatePath) and simply continues, which lets corrupted
retry-candidate sidecars silently block retries; update the error path so that
on failure you remove or quarantine the corrupted sidecar (e.g., delete
candidatePath), log the corruption, and mark the corresponding dagrun/day as
dirty or enqueue a rebuild (call the existing markDayDirty/markDagRunDirty or
rebuild trigger helper used elsewhere) so retries are rescheduled; ensure you
still continue processing after cleanup so the loop remains robust.
In `@internal/service/chatbridge/monitor_external_test.go`:
- Around line 193-200: The Never assertion's short 50*time.Millisecond window in
the require.Never call makes the "old-run" absence check flaky; increase the
timeout (the first duration argument) to a larger value (e.g.,
200-500*time.Millisecond) to allow async delivery paths to settle while keeping
the poll interval (second argument) unchanged, targeting the call that iterates
transport.deliveredNames() and looks for "old-run".
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 10c68639-cf5c-468a-9700-ed148f7be43e
📒 Files selected for processing (8)
internal/persis/file/dagrun/attempt.gointernal/persis/file/dagrun/retry_candidates.gointernal/persis/file/dagrun/retry_candidates_external_test.gointernal/service/chatbridge/monitor.gointernal/service/chatbridge/monitor_external_test.gointernal/service/scheduler/export_test.gointernal/service/scheduler/retry_scanner.gointernal/service/scheduler/retry_scanner_external_test.go
| candidate, err := readRetryCandidateFile(candidatePath) | ||
| if err != nil { | ||
| continue | ||
| } |
There was a problem hiding this comment.
Corrupted retry-candidate sidecars are silently dropped without self-healing.
At Line 91-Line 94, read/unmarshal failures are ignored with continue. A bad sidecar can then suppress retries for that run indefinitely because this path does not mark the day dirty or trigger rebuild.
Suggested fix
candidatePath := filepath.Join(candidateDir, entry.Name())
candidate, err := readRetryCandidateFile(candidatePath)
if err != nil {
- continue
+ // self-heal broken sidecars
+ _ = fileutil.Remove(candidatePath)
+ _ = fileutil.WriteFileAtomic(
+ filepath.Join(dayPath, retryCandidateDirtyFileName),
+ []byte("dirty\n"),
+ 0600,
+ )
+ continue
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/persis/file/dagrun/retry_candidates.go` around lines 91 - 94, The
code currently swallows read/unmarshal errors from
readRetryCandidateFile(candidatePath) and simply continues, which lets corrupted
retry-candidate sidecars silently block retries; update the error path so that
on failure you remove or quarantine the corrupted sidecar (e.g., delete
candidatePath), log the corruption, and mark the corresponding dagrun/day as
dirty or enqueue a rebuild (call the existing markDayDirty/markDagRunDirty or
rebuild trigger helper used elsewhere) so retries are rescheduled; ensure you
still continue processing after cleanup so the loop remains robust.
| require.Never(t, func() bool { | ||
| for _, name := range transport.deliveredNames() { | ||
| if name == "old-run" { | ||
| return true | ||
| } | ||
| } | ||
| return false | ||
| }, 50*time.Millisecond, 5*time.Millisecond) |
There was a problem hiding this comment.
Increase the require.Never window to reduce CI timing flakes.
Line 193’s 50*time.Millisecond window is tight for asynchronous polling/delivery paths and can pass/fail nondeterministically on slower runners. Use a longer window so the “old-run is never delivered” assertion is stable.
Suggested change
- require.Never(t, func() bool {
+ require.Never(t, func() bool {
for _, name := range transport.deliveredNames() {
if name == "old-run" {
return true
}
}
return false
- }, 50*time.Millisecond, 5*time.Millisecond)
+ }, 300*time.Millisecond, 10*time.Millisecond)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| require.Never(t, func() bool { | |
| for _, name := range transport.deliveredNames() { | |
| if name == "old-run" { | |
| return true | |
| } | |
| } | |
| return false | |
| }, 50*time.Millisecond, 5*time.Millisecond) | |
| require.Never(t, func() bool { | |
| for _, name := range transport.deliveredNames() { | |
| if name == "old-run" { | |
| return true | |
| } | |
| } | |
| return false | |
| }, 300*time.Millisecond, 10*time.Millisecond) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@internal/service/chatbridge/monitor_external_test.go` around lines 193 - 200,
The Never assertion's short 50*time.Millisecond window in the require.Never call
makes the "old-run" absence check flaky; increase the timeout (the first
duration argument) to a larger value (e.g., 200-500*time.Millisecond) to allow
async delivery paths to settle while keeping the poll interval (second argument)
unchanged, targeting the call that iterates transport.deliveredNames() and looks
for "old-run".
There was a problem hiding this comment.
1 issue found across 3 files (changes from recent commits).
Tip: Review your code locally with the cubic CLI to iterate faster.
Re-trigger cubic
Summary
Root Cause
The scheduler-related growth came from two independent history-reading paths: the notification monitor read DAG-run events even when there were no destinations to notify, and the retry scanner used generic failed-status listing across current-day run history. Both paths scaled with accumulated history rather than actionable work.
Testing
go test ./internal/persis/file/dagrun -run 'TestStoreListRetryCandidates(RebuildsDirtyCandidateDirectory|RemovesCandidateWhenRunIsGone|IgnoresChildAttemptStatusFiles)' -count=1go test ./internal/persis/file/dagrun -count=1go test ./internal/persis/file/dagrun ./internal/persis/file/eventstore ./internal/service/chatbridge ./internal/service/scheduler ./internal/service/healthcheck -count=1go test ./internal/core/exec ./internal/runtime/... -count=1git diff --checkSummary by cubic
Reduce memory and I/O by removing broad history scans in scheduler retries and notification monitoring. Skip event reads when no destinations are set and use file-backed retry-candidate sidecars for targeted retry scans.
NotificationMonitoradvances the source cursor to head whenNotificationDestinations()is empty, avoiding DAG-run event reads and only delivering future events once destinations are set..dagrun.retry-candidatessidecars updated on status writes;RetryScannerusesListRetryCandidateswhen available (falls back toListStatusesotherwise) to avoid wide failed-status scans.Written for commit aacbf2e. Summary will update on new commits.
Summary by CodeRabbit