Skip to content

Commit fe6377c

Browse files
author
Lucy Zhang
committed
jobs,*: make job clients responsible for generating IDs
Job IDs used to be randomly generated in the job registry when creating a job, which meant that they were not stable across txn restarts when jobs were created in a txn closure meant to be idempotent. For `StartableJobs`, we also created a tracing span and registered the "new" job each time `CreateStartableJobWithTxn` was called, so those spans and registrations would leak in the presence of restarts. This commit adds a job ID argument to registry methods that create jobs, so that callers can generate stable IDs. It also modifies the `StartableJob` API to help ensure jobs (identified by a stable ID) are only registered once in the presence of restarts: `CreateStartableJobWithTxn` now takes a pointer to the caller's `*StartableJob` (callers pass `&sj`), and will not create a tracing span and register the job again if that `*StartableJob` is already non-nil. This API is not ideal because it's probably easy to use it incorrectly, but it at least makes the correct behavior on txn restarts possible. Release note: None
1 parent bdcdf88 commit fe6377c

30 files changed

Lines changed: 323 additions & 209 deletions

pkg/ccl/backupccl/backup_planning.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,41 +1224,42 @@ func backupPlanHook(
12241224
if backupStmt.Options.Detached {
12251225
// When running inside an explicit transaction, we simply create the job
12261226
// record. We do not wait for the job to finish.
1227-
aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
1228-
ctx, jr, p.ExtendedEvalContext().Txn)
1227+
jobID := p.ExecCfg().JobRegistry.MakeJobID()
1228+
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
1229+
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
12291230
if err != nil {
12301231
return err
12311232
}
12321233

1233-
if err := doWriteBackupManifestCheckpoint(ctx, *aj.ID()); err != nil {
1234+
if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil {
12341235
return err
12351236
}
12361237

12371238
// The protect timestamp logic for a DETACHED BACKUP can be run within the
12381239
// same txn as the BACKUP is being planned in, because we do not wait for
12391240
// the BACKUP job to complete.
1240-
err = protectTimestampForBackup(ctx, p, p.ExtendedEvalContext().Txn, *aj.ID(), spans,
1241+
err = protectTimestampForBackup(ctx, p, p.ExtendedEvalContext().Txn, jobID, spans,
12411242
startTime, endTime, backupDetails)
12421243
if err != nil {
12431244
return err
12441245
}
12451246

1246-
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))}
1247+
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
12471248
collectTelemetry()
12481249
return nil
12491250
}
12501251

12511252
var sj *jobs.StartableJob
1253+
jobID := p.ExecCfg().JobRegistry.MakeJobID()
12521254
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
1253-
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
1254-
if err != nil {
1255+
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil {
12551256
return err
12561257
}
1257-
if err := doWriteBackupManifestCheckpoint(ctx, *sj.ID()); err != nil {
1258+
if err := doWriteBackupManifestCheckpoint(ctx, jobID); err != nil {
12581259
return err
12591260
}
12601261

1261-
return protectTimestampForBackup(ctx, p, txn, *sj.ID(), spans, startTime, endTime,
1262+
return protectTimestampForBackup(ctx, p, txn, jobID, spans, startTime, endTime,
12621263
backupDetails)
12631264
}); err != nil {
12641265
if sj != nil {

pkg/ccl/backupccl/restore_job.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1766,7 +1766,7 @@ func (r *restoreResumer) dropDescriptors(
17661766
Progress: jobspb.SchemaChangeGCProgress{},
17671767
NonCancelable: true,
17681768
}
1769-
if _, err := jr.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil {
1769+
if _, err := jr.CreateJobWithTxn(ctx, gcJobRecord, jr.MakeJobID(), txn); err != nil {
17701770
return err
17711771
}
17721772

pkg/ccl/backupccl/restore_planning.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,23 +1755,21 @@ func doRestorePlan(
17551755
if restoreStmt.Options.Detached {
17561756
// When running in detached mode, we simply create the job record.
17571757
// We do not wait for the job to finish.
1758-
aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
1759-
ctx, jr, p.ExtendedEvalContext().Txn)
1758+
jobID := p.ExecCfg().JobRegistry.MakeJobID()
1759+
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
1760+
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
17601761
if err != nil {
17611762
return err
17621763
}
1763-
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))}
1764+
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
17641765
collectTelemetry()
17651766
return nil
17661767
}
17671768

17681769
var sj *jobs.StartableJob
1770+
jobID := p.ExecCfg().JobRegistry.MakeJobID()
17691771
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
1770-
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
1771-
if err != nil {
1772-
return err
1773-
}
1774-
return nil
1772+
return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
17751773
}); err != nil {
17761774
if sj != nil {
17771775
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {

pkg/ccl/backupccl/restore_schema_change_creation.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,11 @@ func createTypeChangeJobFromDesc(
146146
// Type change jobs are not cancellable.
147147
NonCancelable: true,
148148
}
149-
job, err := jr.CreateJobWithTxn(ctx, record, txn)
150-
if err != nil {
149+
jobID := jr.MakeJobID()
150+
if _, err := jr.CreateJobWithTxn(ctx, record, jobID, txn); err != nil {
151151
return err
152152
}
153-
log.Infof(ctx, "queued new type schema change job %d for type %d", *job.ID(), typ.GetID())
153+
log.Infof(ctx, "queued new type schema change job %d for type %d", jobID, typ.GetID())
154154
return nil
155155
}
156156

@@ -201,18 +201,18 @@ func createSchemaChangeJobsFromMutations(
201201
},
202202
Progress: jobspb.SchemaChangeProgress{},
203203
}
204-
newJob, err := jr.CreateJobWithTxn(ctx, jobRecord, txn)
205-
if err != nil {
204+
jobID := jr.MakeJobID()
205+
if _, err := jr.CreateJobWithTxn(ctx, jobRecord, jobID, txn); err != nil {
206206
return err
207207
}
208208
newMutationJob := descpb.TableDescriptor_MutationJob{
209209
MutationID: mutationID,
210-
JobID: *newJob.ID(),
210+
JobID: jobID,
211211
}
212212
mutationJobs = append(mutationJobs, newMutationJob)
213213

214214
log.Infof(ctx, "queued new schema change job %d for table %d, mutation %d",
215-
*newJob.ID(), tableDesc.ID, mutationID)
215+
jobID, tableDesc.ID, mutationID)
216216
}
217217
tableDesc.MutationJobs = mutationJobs
218218
return nil

pkg/ccl/changefeedccl/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ go_library(
4040
"//pkg/kv/kvserver",
4141
"//pkg/kv/kvserver/closedts",
4242
"//pkg/kv/kvserver/protectedts",
43+
"//pkg/kv/kvserver/protectedts/ptpb:ptpb_go_proto",
4344
"//pkg/roachpb",
4445
"//pkg/security",
4546
"//pkg/server/telemetry",

pkg/ccl/changefeedccl/changefeed_stmt.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/cockroachdb/cockroach/pkg/kv"
2929
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
3030
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
31+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
3132
"github.com/cockroachdb/cockroach/pkg/roachpb"
3233
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
3334
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -327,13 +328,17 @@ func changefeedPlanHook(
327328
// changeFrontier.manageProtectedTimestamps for more details on the handling of
328329
// protected timestamps.
329330
var sj *jobs.StartableJob
331+
jobID := p.ExecCfg().JobRegistry.MakeJobID()
330332
{
331333
var protectedTimestampID uuid.UUID
332334
var spansToProtect []roachpb.Span
335+
var ptr *ptpb.Record
333336
if hasInitialScan := initialScanFromOptions(details.Opts); hasInitialScan {
334337
protectedTimestampID = uuid.MakeV4()
335338
spansToProtect = makeSpansToProtect(p.ExecCfg().Codec, details.Targets)
336339
progress.GetChangefeed().ProtectedTimestampRecord = protectedTimestampID
340+
ptr = jobsprotectedts.MakeRecord(protectedTimestampID, jobID,
341+
statementTime, spansToProtect)
337342
}
338343

339344
jr := jobs.Record{
@@ -348,19 +353,15 @@ func changefeedPlanHook(
348353
Details: details,
349354
Progress: *progress.GetChangefeed(),
350355
}
351-
createJobAndProtectedTS := func(ctx context.Context, txn *kv.Txn) (err error) {
352-
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
353-
if err != nil {
356+
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
357+
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil {
354358
return err
355359
}
356-
if protectedTimestampID == uuid.Nil {
357-
return nil
360+
if ptr != nil {
361+
return p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, ptr)
358362
}
359-
ptr := jobsprotectedts.MakeRecord(protectedTimestampID, *sj.ID(),
360-
statementTime, spansToProtect)
361-
return p.ExecCfg().ProtectedTimestampProvider.Protect(ctx, txn, ptr)
362-
}
363-
if err := p.ExecCfg().DB.Txn(ctx, createJobAndProtectedTS); err != nil {
363+
return nil
364+
}); err != nil {
364365
if sj != nil {
365366
if err := sj.CleanupOnRollback(ctx); err != nil {
366367
log.Warningf(ctx, "failed to cleanup aborted job: %v", err)
@@ -392,7 +393,7 @@ func changefeedPlanHook(
392393
case <-ctx.Done():
393394
return ctx.Err()
394395
case resultsCh <- tree.Datums{
395-
tree.NewDInt(tree.DInt(*sj.ID())),
396+
tree.NewDInt(tree.DInt(jobID)),
396397
}:
397398
return nil
398399
}

pkg/ccl/importccl/import_stmt.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -914,30 +914,31 @@ func importPlanHook(
914914
if isDetached {
915915
// When running inside an explicit transaction, we simply create the job
916916
// record. We do not wait for the job to finish.
917-
aj, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
918-
ctx, jr, p.ExtendedEvalContext().Txn)
917+
jobID := p.ExecCfg().JobRegistry.MakeJobID()
918+
_, err := p.ExecCfg().JobRegistry.CreateAdoptableJobWithTxn(
919+
ctx, jr, jobID, p.ExtendedEvalContext().Txn)
919920
if err != nil {
920921
return err
921922
}
922923

923-
if err = protectTimestampForImport(ctx, p, p.ExtendedEvalContext().Txn, *aj.ID(), spansToProtect,
924+
if err = protectTimestampForImport(ctx, p, p.ExtendedEvalContext().Txn, jobID, spansToProtect,
924925
walltime, importDetails); err != nil {
925926
return err
926927
}
927928

928929
addToFileFormatTelemetry(format.Format.String(), "started")
929-
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(*aj.ID()))}
930+
resultsCh <- tree.Datums{tree.NewDInt(tree.DInt(jobID))}
930931
return nil
931932
}
932933

933934
var sj *jobs.StartableJob
935+
jobID := p.ExecCfg().JobRegistry.MakeJobID()
934936
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
935-
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
936-
if err != nil {
937+
if err := p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr); err != nil {
937938
return err
938939
}
939940

940-
return protectTimestampForImport(ctx, p, txn, *sj.ID(), spansToProtect, walltime, importDetails)
941+
return protectTimestampForImport(ctx, p, txn, jobID, spansToProtect, walltime, importDetails)
941942
}); err != nil {
942943
if sj != nil {
943944
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {
@@ -1915,7 +1916,8 @@ func (r *importResumer) dropTables(
19151916
Progress: jobspb.SchemaChangeGCProgress{},
19161917
NonCancelable: true,
19171918
}
1918-
if _, err := execCfg.JobRegistry.CreateJobWithTxn(ctx, gcJobRecord, txn); err != nil {
1919+
if _, err := execCfg.JobRegistry.CreateJobWithTxn(
1920+
ctx, gcJobRecord, execCfg.JobRegistry.MakeJobID(), txn); err != nil {
19191921
return err
19201922
}
19211923

pkg/ccl/importccl/import_stmt_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4716,7 +4716,7 @@ func TestImportControlJobRBAC(t *testing.T) {
47164716
})
47174717

47184718
startLeasedJob := func(t *testing.T, record jobs.Record) *jobs.StartableJob {
4719-
job, err := registry.CreateAndStartJob(ctx, nil, record)
4719+
job, err := registry.TestingCreateAndStartJob(ctx, nil, record)
47204720
require.NoError(t, err)
47214721
return job
47224722
}

pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ func TestStreamIngestionJobRollBack(t *testing.T) {
6363
},
6464
Progress: jobspb.StreamIngestionProgress{},
6565
}
66-
j, err := registry.CreateAndStartJob(ctx, nil, streamIngestJobRecord)
66+
j, err := registry.TestingCreateAndStartJob(ctx, nil, streamIngestJobRecord)
6767
require.NoError(t, err)
6868

6969
// Insert more data in the table. These changes should be rolled back during job

pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ func ingestionPlanHook(
9898
}
9999

100100
var sj *jobs.StartableJob
101+
jobID := p.ExecCfg().JobRegistry.MakeJobID()
101102
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
102-
sj, err = p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, jr, txn)
103-
return err
103+
return p.ExecCfg().JobRegistry.CreateStartableJobWithTxn(ctx, &sj, jobID, txn, jr)
104104
}); err != nil {
105105
if sj != nil {
106106
if cleanupErr := sj.CleanupOnRollback(ctx); cleanupErr != nil {

0 commit comments

Comments
 (0)