Skip to content

Commit aed014f

Browse files
committed
jobs: use low-priority transactions for claim, adopt, cancel/pause
These transactions can be slow and long-running, and they hold locks while they run. This is unfortunate for UX reasons, because foreground reads of the jobs table contend with them. Release note (bug fix): Improved availability of the jobs table for reads in large, global clusters by running background tasks at low priority.
1 parent 3eeb35f commit aed014f

2 files changed

Lines changed: 26 additions & 5 deletions

File tree

pkg/jobs/adopt.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
1919
"github.com/cockroachdb/cockroach/pkg/kv"
20+
"github.com/cockroachdb/cockroach/pkg/roachpb"
2021
"github.com/cockroachdb/cockroach/pkg/security"
2122
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2223
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
@@ -47,6 +48,11 @@ const (
4748
// available.
4849
func (r *Registry) claimJobs(ctx context.Context, s sqlliveness.Session) error {
4950
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
51+
// Run the claim transaction at low priority to ensure that it does not
52+
// contend with foreground reads.
53+
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
54+
return errors.WithAssertionFailure(err)
55+
}
5056
numRows, err := r.ex.Exec(
5157
ctx, "claim-jobs", txn, `
5258
UPDATE system.jobs
@@ -283,6 +289,11 @@ func (r *Registry) runJob(
283289

284290
func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
285291
return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
292+
// Run the pause/cancel transaction at low priority to ensure that it does not
293+
// contend with foreground reads.
294+
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
295+
return errors.WithAssertionFailure(err)
296+
}
286297
// Note that we have to buffer all rows first - before processing each
287298
// job - because we have to make sure that the query executes without an
288299
// error (otherwise, the system.jobs table might diverge from the jobs

pkg/jobs/registry.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/cockroachdb/cockroach/pkg/base"
2323
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
2424
"github.com/cockroachdb/cockroach/pkg/kv"
25+
"github.com/cockroachdb/cockroach/pkg/roachpb"
2526
"github.com/cockroachdb/cockroach/pkg/security"
2627
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
2728
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -659,16 +660,25 @@ func (r *Registry) Start(
659660
}
660661

661662
removeClaimsFromDeadSessions := func(ctx context.Context, s sqlliveness.Session) {
662-
if _, err := r.ex.QueryRowEx(
663-
ctx, "expire-sessions", nil,
664-
sessiondata.InternalExecutorOverride{User: security.RootUserName()}, `
663+
if err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
664+
// Run the expiration transaction at low priority to ensure that it does
665+
// not contend with foreground reads. Note that the adoption and cancellation
666+
// queries also use low priority so they will interact nicely.
667+
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
668+
return errors.WithAssertionFailure(err)
669+
}
670+
_, err := r.ex.ExecEx(
671+
ctx, "expire-sessions", nil,
672+
sessiondata.InternalExecutorOverride{User: security.RootUserName()}, `
665673
UPDATE system.jobs
666674
SET claim_session_id = NULL
667675
WHERE claim_session_id <> $1
668676
AND status IN `+claimableStatusTupleString+`
669677
AND NOT crdb_internal.sql_liveness_is_alive(claim_session_id)`,
670-
s.ID().UnsafeBytes(),
671-
); err != nil {
678+
s.ID().UnsafeBytes(),
679+
)
680+
return err
681+
}); err != nil {
672682
log.Errorf(ctx, "error expiring job sessions: %s", err)
673683
}
674684
}

0 commit comments

Comments
 (0)