Skip to content

Commit 415dc1a

Browse files
committed
jobs: fix missing creation info in resumed jobs
Release note: none.
1 parent 762ccdb commit 415dc1a

2 files changed

Lines changed: 27 additions & 5 deletions

File tree

pkg/jobs/adopt.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,8 @@ COALESCE(last_run, created) + least(
141141

142142
resumeQueryBaseCols = "status, payload, progress, crdb_internal.sql_liveness_is_alive(claim_session_id)"
143143
resumeQueryWhereBase = `id = $1 AND claim_session_id = $2`
144-
resumeQueryWithBackoff = `SELECT ` + resumeQueryBaseCols + `, ` + canRunClause + ` AS can_run` +
145-
` FROM system.jobs, ` + canRunArgs + " WHERE " + resumeQueryWhereBase
144+
resumeQueryWithBackoff = `SELECT ` + resumeQueryBaseCols + `, ` + canRunClause + ` AS can_run,` +
145+
` created_by_type, created_by_id FROM system.jobs, ` + canRunArgs + " WHERE " + resumeQueryWhereBase
146146
)
147147

148148
// getProcessQuery returns the query that selects the jobs that are claimed
@@ -301,7 +301,13 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven
301301
if err != nil {
302302
return err
303303
}
304-
job := &Job{id: jobID, registry: r}
304+
305+
createdBy, err := unmarshalCreatedBy(row[5], row[6])
306+
if err != nil {
307+
return err
308+
}
309+
310+
job := &Job{id: jobID, registry: r, createdBy: createdBy}
305311
job.mu.payload = *payload
306312
job.mu.progress = *progress
307313
job.sessionID = s.ID()

pkg/jobs/jobs_test.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1751,10 +1751,21 @@ func TestJobLifecycle(t *testing.T) {
17511751
t.Run("job with created by fields", func(t *testing.T) {
17521752
createdByType := "internal_test"
17531753

1754+
resumerJob := make(chan *jobs.Job, 1)
1755+
jobs.RegisterConstructor(
1756+
jobspb.TypeBackup, func(j *jobs.Job, _ *cluster.Settings) jobs.Resumer {
1757+
return jobs.FakeResumer{
1758+
OnResume: func(ctx context.Context) error {
1759+
resumerJob <- j
1760+
return nil
1761+
},
1762+
}
1763+
})
1764+
17541765
jobID := registry.MakeJobID()
17551766
record := jobs.Record{
1756-
Details: jobspb.RestoreDetails{},
1757-
Progress: jobspb.RestoreProgress{},
1767+
Details: jobspb.BackupDetails{},
1768+
Progress: jobspb.BackupProgress{},
17581769
CreatedBy: &jobs.CreatedByInfo{Name: createdByType, ID: 123},
17591770
}
17601771
job, err := registry.CreateAdoptableJobWithTxn(ctx, record, jobID, nil /* txn */)
@@ -1764,6 +1775,11 @@ func TestJobLifecycle(t *testing.T) {
17641775
require.NoError(t, err)
17651776
require.NotNil(t, loadedJob.CreatedBy())
17661777
require.Equal(t, job.CreatedBy(), loadedJob.CreatedBy())
1778+
registry.TestingNudgeAdoptionQueue()
1779+
resumedJob := <-resumerJob
1780+
require.NotNil(t, resumedJob.CreatedBy())
1781+
require.Equal(t, job.CreatedBy(), resumedJob.CreatedBy())
1782+
17671783
})
17681784
}
17691785

0 commit comments

Comments
 (0)