Conversation
Greptile SummaryThis PR introduces a centralized
Confidence Score: 4/5Safe to merge after addressing the t.spec data race in Reschedule; the silent task-loss on reschedule failure is a low-risk hardening gap. One P1 finding remains: t.spec = newSpec in Reschedule is not protected by t.mu, creating a data race with the concurrent wrapped closure panic-recovery read of t.spec.Name. The fix is a two-line t.mu.Lock()/Unlock() wrapper. The P2 (silent task loss on reschedule failure) is low probability given hardcoded cron expressions. Prior P1 findings from the previous review thread have all been resolved. internal/server/scheduler/scheduler.go — the Reschedule method needs t.mu protection around the t.spec write. Important Files Changed
Sequence DiagramsequenceDiagram
participant FX as fx Lifecycle (OnStart)
participant S as Scheduler
participant Exec as ScheduledExecutor
participant Task as task (goroutine)
FX->>S: Register(ctx, TaskSpec, fn)
S->>Exec: ScheduleFuncAtCronRate / ScheduleFuncAtFixRate
Exec-->>S: cancelFunc
S->>S: store task + cancelFunc
loop every tick
Exec->>Task: wrapped(ctx)
Task->>Task: t.mu.Lock, update lastRunAt/lastError, t.mu.Unlock
Task->>Task: t.fn(ctx)
end
note over FX,Task: Settings change
FX->>S: Reschedule(ctx, name, newSpec)
S->>S: cancelFunc() - cancel old schedule
S->>S: t.spec = newSpec (no t.mu - race risk)
S->>Exec: ScheduleFuncAtCronRate(wrapped, newSpec)
Exec-->>S: new cancelFunc
FX->>S: Shutdown(ctx)
S->>S: call all cancelFuncs
Reviews (2): Last reviewed commit: "scheduler: centralized cron job manageme..." | Re-trigger Greptile |
There was a problem hiding this comment.
Code Review
This pull request introduces a centralized scheduler package to manage background tasks across various services, replacing the previous ad-hoc scheduling logic. It refactors the backup, garbage collection, channel probing, and video storage services to register their tasks with the new scheduler. Additionally, the frontend now displays the last backup time with explicit timezone information. Key issues identified include a reference to a non-existent function in the backup service, a potential data race in the scheduler's task status updates, and a logic mismatch between the scheduler's implementation and its documentation regarding task precedence.
| Description: "Auto backup to configured data storage", | ||
| CronExpr: "0 2 * * *", | ||
| Timezone: tz, | ||
| }, svc.runBackupPeriodically) |
There was a problem hiding this comment.
| wrapped := func(ctx context.Context) { | ||
| t.lastRunAt = time.Now() | ||
| t.lastError = "" | ||
|
|
||
| func() { | ||
| defer func() { | ||
| if r := recover(); r != nil { | ||
| t.lastError = fmt.Sprintf("panic: %v", r) | ||
| log.Error(ctx, "scheduler: task panicked", | ||
| log.String("name", t.spec.Name), | ||
| log.Any("panic", r), | ||
| ) | ||
| } | ||
| }() | ||
|
|
||
| t.fn(ctx) | ||
| }() | ||
| } |
There was a problem hiding this comment.
There is a data race on t.lastRunAt and t.lastError. These fields are updated within the wrapped function, which is executed concurrently by the executor, but they are read in the List() method (lines 123-124) without proper synchronization. Although List() holds a read lock on the scheduler, the writer (wrapped) does not acquire the lock when performing these updates.
wrapped := func(ctx context.Context) {
s.mu.Lock()
t.lastRunAt = time.Now()
t.lastError = ""
s.mu.Unlock()
func() {
defer func() {
if r := recover(); r != nil {
s.mu.Lock()
t.lastError = fmt.Sprintf("panic: %v", r)
s.mu.Unlock()
log.Error(ctx, "scheduler: task panicked",
log.String("name", t.spec.Name),
log.Any("panic", r),
)
}
}()
t.fn(ctx)
}()
}| if t.spec.FixRate > 0 { | ||
| cancelFunc, err = s.executor.ScheduleFuncAtFixRate(wrapped, t.spec.FixRate) | ||
| } else { | ||
| tz := t.spec.Timezone | ||
| if tz == "" { | ||
| tz = "UTC" | ||
| } | ||
| cancelFunc, err = s.executor.ScheduleFuncAtCronRate( | ||
| wrapped, | ||
| executors.CRONRule{Expr: t.spec.CronExpr, Timezone: tz}, | ||
| ) | ||
| } |
There was a problem hiding this comment.
The implementation of task scheduling precedence contradicts the documentation in task.go. The documentation states that CronExpr takes precedence if both are set, but the code checks t.spec.FixRate > 0 first, giving precedence to the fixed rate.
| if t.spec.FixRate > 0 { | |
| cancelFunc, err = s.executor.ScheduleFuncAtFixRate(wrapped, t.spec.FixRate) | |
| } else { | |
| tz := t.spec.Timezone | |
| if tz == "" { | |
| tz = "UTC" | |
| } | |
| cancelFunc, err = s.executor.ScheduleFuncAtCronRate( | |
| wrapped, | |
| executors.CRONRule{Expr: t.spec.CronExpr, Timezone: tz}, | |
| ) | |
| } | |
| if t.spec.CronExpr != "" { | |
| tz := t.spec.Timezone | |
| if tz == "" { | |
| tz = "UTC" | |
| } | |
| cancelFunc, err = s.executor.ScheduleFuncAtCronRate( | |
| wrapped, | |
| executors.CRONRule{Expr: t.spec.CronExpr, Timezone: tz}, | |
| ) | |
| } else { | |
| cancelFunc, err = s.executor.ScheduleFuncAtFixRate(wrapped, t.spec.FixRate) | |
| } |
- Add internal/server/scheduler package (TaskSpec with cron+fix-rate modes, Register/Reschedule/Unregister/List/Shutdown) - Migrate 8 scattered cron/fix-rate jobs to unified Scheduler - Remove 4 self-owned executor instances (backup, gc, channel-probe, provider-quota) - Pull system timezone into backup cron via CRONRule.Timezone (fixes #1517) - Add Reschedule hooks to UpdateAutoBackupSettings, UpdateSystemGeneralSettings, UpdateVideoStorageSettings - Fix RunBackupNow not updating lastBackupAt - Add timezone indicator to frontend last backup display
| return nil | ||
| } | ||
|
|
There was a problem hiding this comment.
t.spec write races with the panic-handler read in running tasks
Reschedule writes t.spec = newSpec while holding s.mu (exclusive), but the old wrapped closure that may still be executing concurrently reads t.spec.Name in its recover block without holding t.mu:
// schedule(), wrapped closure
log.Error(ctx, "scheduler: task panicked",
log.String("name", t.spec.Name), // reads t.spec without t.mu
...
)List() correctly guards its read with t.mu.Lock(), but Reschedule's write (t.spec = newSpec) is only serialized against List() via s.mu, not against a concurrently-running wrapped goroutine. Running with -race will flag this immediately whenever a rescheduled task panics.
Fix: acquire t.mu.Lock() around the t.spec = newSpec assignment in Reschedule:
t.mu.Lock()
t.spec = newSpec
t.mu.Unlock()
Uh oh!
There was an error while loading. Please reload this page.