Skip to content

scheduler: centralized cron job management#1550

Merged
looplj merged 1 commit into
unstablefrom
dev-tmp
Apr 30, 2026
Merged

scheduler: centralized cron job management#1550
looplj merged 1 commit into
unstablefrom
dev-tmp

Conversation

@looplj

@looplj looplj commented Apr 30, 2026

Copy link
Copy Markdown
Owner
  • Add internal/server/scheduler package (TaskSpec with cron+fix-rate modes, Register/Reschedule/Unregister/List/Shutdown)
  • Migrate 8 scattered cron/fix-rate jobs to unified Scheduler
  • Remove 4 self-owned executor instances (backup, gc, channel-probe, provider-quota)
  • Pull system timezone into backup cron via CRONRule.Timezone (fixes [Bug/错误]: 自动备份功能时间引用错误 #1517)
  • Add Reschedule hooks to UpdateAutoBackupSettings, UpdateSystemGeneralSettings, UpdateVideoStorageSettings
  • Fix RunBackupNow not updating lastBackupAt
  • Add timezone indicator to frontend last backup display

Open in Devin Review

@greptile-apps

greptile-apps Bot commented Apr 30, 2026

Copy link
Copy Markdown
Contributor

Greptile Summary

This PR introduces a centralized scheduler package that consolidates 8 previously scattered cron/fix-rate jobs behind a single Register/Reschedule/Unregister/List/Shutdown API, removes 4 self-owned executor instances, and wires timezone-aware rescheduling into the settings mutation resolvers. The prior data-race and doc-comment findings from the previous review have been addressed.

  • The t.spec field written by Reschedule is not guarded by t.mu, so a concurrently-running wrapped closure that panics can race on t.spec.Name in its recover block — the race detector will flag this.
  • Reschedule calls cancelFunc() before attempting the new schedule; if s.schedule then fails, the task is silently left without any active schedule for the rest of the process lifetime.

Confidence Score: 4/5

Safe to merge after addressing the t.spec data race in Reschedule; the silent task-loss on reschedule failure is a low-risk hardening gap.

One P1 finding remains: t.spec = newSpec in Reschedule is not protected by t.mu, creating a data race with the concurrent wrapped closure panic-recovery read of t.spec.Name. The fix is a two-line t.mu.Lock()/Unlock() wrapper. The P2 (silent task loss on reschedule failure) is low probability given hardcoded cron expressions. Prior P1 findings from the previous review thread have all been resolved.

internal/server/scheduler/scheduler.go — the Reschedule method needs t.mu protection around the t.spec write.

Important Files Changed

Filename Overview
internal/server/scheduler/scheduler.go New centralized scheduler; t.spec write in Reschedule is not protected by t.mu, causing a data race with the wrapped panic handler.
internal/server/scheduler/task.go Defines task, TaskSpec, TaskInfo; per-task mutex correctly guards lastRunAt and lastError; FixRate-precedence comment is now accurate.
internal/server/backup/autobackup.go Migrated to scheduler; Reschedule silently loses the backup task on schedule failure; RunBackupNow now correctly updates lastBackupAt.
internal/server/biz/provider_quota.go Migrated to scheduler; loadQuotaCache now fires in the constructor with context.Background() before fx lifecycle starts (noted in prior review).
internal/server/gql/system.resolvers.go Adds Reschedule hooks on UpdateSystemGeneralSettings and UpdateVideoStorageSettings.
internal/server/gc/gc.go Removed self-owned executor and lifecycle hooks; migrated to RegisterScheduledTasks.
internal/server/video_storage/worker.go Removed self-owned executor and cancelFunc; migrated to fix-rate scheduler task with Reschedule support.
frontend/src/features/system/components/backup-settings.tsx Adds IANA timezone name and UTC offset to last-backup display; DST-aware via getTimezoneOffset() on the backup date.
internal/server/biz/fx_module.go Wires all RegisterScheduledTasks calls into OnStart lifecycle hooks consistently.

Sequence Diagram

sequenceDiagram
    participant FX as fx Lifecycle (OnStart)
    participant S as Scheduler
    participant Exec as ScheduledExecutor
    participant Task as task (goroutine)

    FX->>S: Register(ctx, TaskSpec, fn)
    S->>Exec: ScheduleFuncAtCronRate / ScheduleFuncAtFixRate
    Exec-->>S: cancelFunc
    S->>S: store task + cancelFunc

    loop every tick
        Exec->>Task: wrapped(ctx)
        Task->>Task: t.mu.Lock, update lastRunAt/lastError, t.mu.Unlock
        Task->>Task: t.fn(ctx)
    end

    note over FX,Task: Settings change
    FX->>S: Reschedule(ctx, name, newSpec)
    S->>S: cancelFunc() - cancel old schedule
    S->>S: t.spec = newSpec (no t.mu - race risk)
    S->>Exec: ScheduleFuncAtCronRate(wrapped, newSpec)
    Exec-->>S: new cancelFunc

    FX->>S: Shutdown(ctx)
    S->>S: call all cancelFuncs
Loading

Reviews (2): Last reviewed commit: "scheduler: centralized cron job manageme..." | Re-trigger Greptile

greptile-apps[bot]

This comment was marked as resolved.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a centralized scheduler package to manage background tasks across various services, replacing the previous ad-hoc scheduling logic. It refactors the backup, garbage collection, channel probing, and video storage services to register their tasks with the new scheduler. Additionally, the frontend now displays the last backup time with explicit timezone information. Key issues identified include a reference to a non-existent function in the backup service, a potential data race in the scheduler's task status updates, and a logic mismatch between the scheduler's implementation and its documentation regarding task precedence.

Description: "Auto backup to configured data storage",
CronExpr: "0 2 * * *",
Timezone: tz,
}, svc.runBackupPeriodically)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The function svc.runBackupPeriodically is not defined in the backup package (it was removed from autobackup.go). Based on the logic in autobackup.go, this should likely be svc.triggerAutoBackup.

Suggested change
}, svc.runBackupPeriodically)
}, svc.triggerAutoBackup)

Comment on lines +145 to +162
wrapped := func(ctx context.Context) {
t.lastRunAt = time.Now()
t.lastError = ""

func() {
defer func() {
if r := recover(); r != nil {
t.lastError = fmt.Sprintf("panic: %v", r)
log.Error(ctx, "scheduler: task panicked",
log.String("name", t.spec.Name),
log.Any("panic", r),
)
}
}()

t.fn(ctx)
}()
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a data race on t.lastRunAt and t.lastError. These fields are updated within the wrapped function, which is executed concurrently by the executor, but they are read in the List() method (lines 123-124) without proper synchronization. Although List() holds a read lock on the scheduler, the writer (wrapped) does not acquire the lock when performing these updates.

	wrapped := func(ctx context.Context) {
		s.mu.Lock()
		t.lastRunAt = time.Now()
		t.lastError = ""
		s.mu.Unlock()

		func() {
			defer func() {
				if r := recover(); r != nil {
					s.mu.Lock()
					t.lastError = fmt.Sprintf("panic: %v", r)
					s.mu.Unlock()
					log.Error(ctx, "scheduler: task panicked",
						log.String("name", t.spec.Name),
						log.Any("panic", r),
					)
				}
			}()

			t.fn(ctx)
		}()
	}

Comment on lines +167 to +178
if t.spec.FixRate > 0 {
cancelFunc, err = s.executor.ScheduleFuncAtFixRate(wrapped, t.spec.FixRate)
} else {
tz := t.spec.Timezone
if tz == "" {
tz = "UTC"
}
cancelFunc, err = s.executor.ScheduleFuncAtCronRate(
wrapped,
executors.CRONRule{Expr: t.spec.CronExpr, Timezone: tz},
)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The implementation of task scheduling precedence contradicts the documentation in task.go. The documentation states that CronExpr takes precedence if both are set, but the code checks t.spec.FixRate > 0 first, giving precedence to the fixed rate.

Suggested change
if t.spec.FixRate > 0 {
cancelFunc, err = s.executor.ScheduleFuncAtFixRate(wrapped, t.spec.FixRate)
} else {
tz := t.spec.Timezone
if tz == "" {
tz = "UTC"
}
cancelFunc, err = s.executor.ScheduleFuncAtCronRate(
wrapped,
executors.CRONRule{Expr: t.spec.CronExpr, Timezone: tz},
)
}
if t.spec.CronExpr != "" {
tz := t.spec.Timezone
if tz == "" {
tz = "UTC"
}
cancelFunc, err = s.executor.ScheduleFuncAtCronRate(
wrapped,
executors.CRONRule{Expr: t.spec.CronExpr, Timezone: tz},
)
} else {
cancelFunc, err = s.executor.ScheduleFuncAtFixRate(wrapped, t.spec.FixRate)
}

devin-ai-integration[bot]

This comment was marked as resolved.

- Add internal/server/scheduler package (TaskSpec with cron+fix-rate modes, Register/Reschedule/Unregister/List/Shutdown)
- Migrate 8 scattered cron/fix-rate jobs to unified Scheduler
- Remove 4 self-owned executor instances (backup, gc, channel-probe, provider-quota)
- Pull system timezone into backup cron via CRONRule.Timezone (fixes #1517)
- Add Reschedule hooks to UpdateAutoBackupSettings, UpdateSystemGeneralSettings, UpdateVideoStorageSettings
- Fix RunBackupNow not updating lastBackupAt
- Add timezone indicator to frontend last backup display
Comment on lines +94 to +96
return nil
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 t.spec write races with the panic-handler read in running tasks

Reschedule writes t.spec = newSpec while holding s.mu (exclusive), but the old wrapped closure that may still be executing concurrently reads t.spec.Name in its recover block without holding t.mu:

// schedule(), wrapped closure
log.Error(ctx, "scheduler: task panicked",
    log.String("name", t.spec.Name),  // reads t.spec without t.mu
    ...
)

List() correctly guards its read with t.mu.Lock(), but Reschedule's write (t.spec = newSpec) is only serialized against List() via s.mu, not against a concurrently-running wrapped goroutine. Running with -race will flag this immediately whenever a rescheduled task panics.

Fix: acquire t.mu.Lock() around the t.spec = newSpec assignment in Reschedule:

t.mu.Lock()
t.spec = newSpec
t.mu.Unlock()

@looplj looplj merged commit e81da01 into unstable Apr 30, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug/错误]: 自动备份功能时间引用错误

1 participant