jobs: allow jobs to be created and started separately #42521
craig[bot] merged 1 commit into cockroachdb:master
I typed this in my protected timestamp PR, figured I'd pull it out.
spaskob
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, and @spaskob)
pkg/jobs/jobs.go, line 493 at r1 (raw file):
}
j.mu.Lock()
Was there a race condition here previously?
pkg/jobs/registry.go, line 223 at r1 (raw file):
// DEFAULT unique_rowid(), to avoid a race condition where the job exists
// in the jobs table but is not yet present in our registry, which would
// allow another node to adopt it.
This comment block belongs above the r.makeJobID() statement.
spaskob
left a comment
I added some comments, not sure if you see them.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @dt)
spaskob
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @dt)
pkg/jobs/jobs.go, line 105 at r1 (raw file):
// Start starts a previously unstarted job. The job may already have been
// created but must not have ever been started.
func (j *Job) Start(ctx context.Context, resultsCh chan<- tree.Datums) (<-chan error, error) {
Would it make more sense for this function to be called on the registry, passing the job as a parameter, since the registry is responsible for the lifecycle management of jobs? The code of startJob is also in the registry, so this would make it easier for the reader of the code.
ajwerner
left a comment
I see them. Other than the moving of the method does the high level approach make sense?
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @dt and @spaskob)
pkg/jobs/jobs.go, line 105 at r1 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
Would it make more sense for this function to be called on the registry, passing the job as a parameter, since the registry is responsible for the lifecycle management of jobs? The code of startJob is also in the registry, so this would make it easier for the reader of the code.
It very much makes sense. I just wasn't clear on what to call it given we have StartJob. Maybe I'll move the existing one to CreateAndStartJob?
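The API shape being discussed here can be sketched roughly as follows. This is a minimal illustration, not the code in pkg/jobs: the `Job` and `Registry` types, their fields, and the `CreateJob` method are hypothetical stand-ins, and only the names `StartJob` and `CreateAndStartJob` come from the thread.

```go
package main

import (
	"errors"
	"fmt"
)

// Job and Registry are hypothetical stand-ins, not the pkg/jobs types.
type Job struct {
	ID      int64
	started bool
}

type Registry struct {
	nextID int64
	jobs   map[int64]*Job
}

func NewRegistry() *Registry {
	return &Registry{jobs: make(map[int64]*Job)}
}

// CreateJob records a job in the registry without starting it.
func (r *Registry) CreateJob() *Job {
	r.nextID++
	j := &Job{ID: r.nextID}
	r.jobs[j.ID] = j
	return j
}

// StartJob starts a job that was created earlier and never started.
func (r *Registry) StartJob(j *Job) error {
	if _, ok := r.jobs[j.ID]; !ok {
		return errors.New("job is not known to this registry")
	}
	if j.started {
		return errors.New("job was already started")
	}
	j.started = true
	return nil
}

// CreateAndStartJob keeps the old one-step behavior as a thin wrapper.
func (r *Registry) CreateAndStartJob() (*Job, error) {
	j := r.CreateJob()
	if err := r.StartJob(j); err != nil {
		return nil, err
	}
	return j, nil
}

func main() {
	r := NewRegistry()
	j := r.CreateJob()
	fmt.Println(r.StartJob(j)) // first start succeeds
	fmt.Println(r.StartJob(j)) // second start is rejected
}
```

Keeping both entry points on the registry means the lifecycle rules (known job, started at most once) live in one place.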
pkg/jobs/jobs.go, line 493 at r1 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
Was there a race condition here previously?
Look at all the accessors. The mutex and comments imply that things are safe for concurrency, and yet we don't use it at all here. Imagine somebody calls Job.Progress() and Job.Update() concurrently.
Either way, this should be in a different PR. I'll remove it.
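For readers unfamiliar with the pattern in question, a minimal sketch of accessors that consistently take the mutex might look like this. The `guardedJob` type, its `progress` field, and the `runConcurrentUpdates` helper are hypothetical; the real `Job` accessors have different signatures and state.

```go
package main

import (
	"fmt"
	"sync"
)

// guardedJob is a hypothetical stand-in for a job whose mutable state
// sits behind a mutex; it is not the real jobs.Job.
type guardedJob struct {
	mu struct {
		sync.Mutex
		progress float64
	}
}

// Progress reads the field under the lock.
func (j *guardedJob) Progress() float64 {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.mu.progress
}

// Update applies fn to the field under the same lock, so a concurrent
// Progress call never observes a partial write.
func (j *guardedJob) Update(fn func(old float64) float64) {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.mu.progress = fn(j.mu.progress)
}

// runConcurrentUpdates has n goroutines each add 1 to the progress
// while also reading it.
func runConcurrentUpdates(j *guardedJob, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			j.Update(func(old float64) float64 { return old + 1 })
			_ = j.Progress()
		}()
	}
	wg.Wait()
}

func main() {
	j := &guardedJob{}
	runConcurrentUpdates(j, 100)
	fmt.Println(j.Progress())
}
```

If any writer skipped the lock, running this under `go run -race` would be expected to flag the unsynchronized access.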
pkg/jobs/registry.go, line 223 at r1 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
This comment block belongs above the r.makeJobID() statement.
true.
947d6aa to ee70dcd
ajwerner
left a comment
This didn't totally work because we were creating jobs (using Created()) outside of StartJob() and weren't giving the job a lease (see Job.insert()), which opened them up to being started by other nodes. Should be fixed now.
See how this makes you feel. I don't have super strong feelings, I just knew you were messing around in this area and felt like I should share this at the very least for the sake of discussion of the approach.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @dt and @spaskob)
spaskob
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, and @spaskob)
pkg/jobs/jobs.go, line 105 at r1 (raw file):
Previously, ajwerner wrote…
It very much makes sense. I just wasn't clear on what to call it given we have StartJob. Maybe I'll move the existing one to CreateAndStartJob?
Thanks, that's a long name but precise.
pkg/jobs/jobs.go, line 493 at r1 (raw file):
Previously, ajwerner wrote…
Look at all the accessors. The mutex and comments imply that things are safe for concurrency, and yet we don't use it at all here. Imagine somebody calls Job.Progress() and Job.Update() concurrently.
Either way, this should be in a different PR. I'll remove it.
Thanks, the reason I am asking is that there's a lot of flakiness in the jobs/bulkio tests and I wonder if some of it was caused by this race. Some of the tests are disabled, so it's hard to tell if they would improve. Furthermore, some are flaky only on TC. Not related to this PR, but what's your advice on how to periodically retry the ones that are paused and maybe enable them?
pkg/jobs/registry.go, line 220 at r2 (raw file):
}
if j.ID() == nil {
What are your thoughts on moving this block into CreateAndStartJob? Creating the ID in StartJob may cause some subtle bugs down the road; it's probably better to fail fast if the ID is nil here.
spaskob
left a comment
Thank you, this has been very useful! It looks good, I added one more comment that I don't feel strongly about.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @dt, and @spaskob)
spaskob
left a comment
Reviewable status:
complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @dt, and @spaskob)
ee70dcd to 8c8e01e
ajwerner
left a comment
TFTR!
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @dt and @spaskob)
pkg/jobs/jobs.go, line 105 at r1 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
Thanks, that's a long name but precise.
Done.
pkg/jobs/jobs.go, line 493 at r1 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
Thanks, the reason I am asking is that there's a lot of flakiness in the jobs/bulkio tests and I wonder if some of it was caused by this race. Some of the tests are disabled, so it's hard to tell if they would improve. Furthermore, some are flaky only on TC. Not related to this PR, but what's your advice on how to periodically retry the ones that are paused and maybe enable them?
I don't think so. At least I don't know where insert would be called in parallel with anything else.
pkg/jobs/registry.go, line 223 at r1 (raw file):
Previously, ajwerner wrote…
true.
Done.
pkg/jobs/registry.go, line 220 at r2 (raw file):
Previously, spaskob (Spas Bojanov) wrote…
What are your thoughts on moving this block into CreateAndStartJob? Creating the ID in StartJob may cause some subtle bugs down the road; it's probably better to fail fast if the ID is nil here.
Hmm yeah I could see doing that. I didn't want to write the job record until after I had verified that we could create the resumer. This way the semantics are that you could call NewJob() and then StartJob(). I really don't have strong feelings.
I moved it around to construct the resumer above common shared logic and it felt worse because then I needed to ensure that we did have an ID.
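The ordering concern can be sketched as below, assuming a simplified registry that looks up a resumer constructor by job type before recording anything. The `resumer` interface, the `registry` struct, and the `written` slice are hypothetical and only illustrate the "verify first, write second" ordering, not the real registry code.

```go
package main

import (
	"errors"
	"fmt"
)

// resumer and the constructor table are hypothetical stand-ins.
type resumer interface{ Resume() error }

type noopResumer struct{}

func (noopResumer) Resume() error { return nil }

type registry struct {
	constructors map[string]func() resumer
	written      []string // job types whose records were "written"
}

// startJob looks up the resumer first and writes the record only if that
// succeeds, so an unknown job type leaves nothing behind.
func (r *registry) startJob(jobType string) error {
	newResumer, ok := r.constructors[jobType]
	if !ok {
		return errors.New("no resumer registered for " + jobType)
	}
	res := newResumer()
	r.written = append(r.written, jobType)
	return res.Resume()
}

func main() {
	r := &registry{constructors: map[string]func() resumer{
		"backup": func() resumer { return noopResumer{} },
	}}
	fmt.Println(r.startJob("backup"), r.startJob("unknown"), r.written)
}
```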
spaskob
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @dt, and @spaskob)
pkg/jobs/registry.go, line 220 at r2 (raw file):
Previously, ajwerner wrote…
Hmm yeah I could see doing that. I didn't want to write the job record until after I had verified that we could create the resumer. This way the semantics are that you could call NewJob() and then StartJob(). I really don't have strong feelings.
I moved it around to construct the resumer above common shared logic and it felt worse because then I needed to ensure that we did have an ID.
SG
I'm pretty sure the build flakes are unrelated but I'd like it to build before I bors it.
Prior to this commit, if a creator of a job wanted to get the results it would
have to create and start it at the same time by calling jobs.Registry.StartJob.
That method would take a jobs.Record, create a new *jobs.Job and start it
immediately, sending results on a passed channel and returning an error
channel.
This is awkward because sometimes callers might want to do other work on the
same transaction which is creating the job. Furthermore those callers might
not want to start the job until after the creating transaction commits.
This refactor provides a mechanism for callers to create and store a job
in a transaction and then start it after that transaction commits. Importantly
this new job creation mechanism also takes out a lease on the job so that no
other node will adopt the job before the current node unless the current node
dies.
Here's an example:
```
var j *jobs.Job
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) (err error) {
	if j, err = registry.CreateJobWithTxn(ctx, record, txn); err != nil {
		return err
	}
	...
}); err != nil {
	...
}
resultsCh := make(chan tree.Datums)
errCh, err := registry.StartJob(ctx, resultsCh, j)
...
```
Release note: None
8c8e01e to e4f52a3
bors r=spaskob
I'm merging this for now. We can roll it back when we provide a better mechanism.
Merge conflict (retrying...)
42170: opt: refactor limit hint propagation into a required physical property r=savoie a=savoie

This patch follows up on #42086 and finishes the refactor of the heuristic planner `applyLimits` function. It removes the last of the heuristic planner limit handling, and introduces the LimitHint required physical property, which is represented by a float64 in anticipation of future improvements that will use statistics to inform better estimations.

Previously, in the case where a limit hint was propagated down to a limitNode that had an offset but no limit, the limit hint would be discarded. In this patch, an OffsetOp's limit hint will be propagated to its children, having been increased enough to ensure that the required number of rows can be discarded by the offset.

Another change from the behaviour of the heuristic planner's handling of limit hints is in the case of subqueries. `applyLimit` would not be called on subqueries created with `Builder.addSubquery`, so no nodes within subqueries had soft limits set. Since handling of required physical properties occurs before subqueries are created, this ceases to be a special case and soft limits may be set for scanNodes within subqueries.

Besides these two departures from the heuristic planner's limit propagation, this patch should not have introduced further changes affecting the ultimate value of `scanNode.softLimit`. With this refactor, improvements to limit hint handling can be made within the optimizer.

Release note: None

42521: jobs: allow jobs to be created and started separately r=spaskob a=ajwerner

Prior to this commit, if a creator of a job wanted to get the results it would have to create and start it at the same time by calling jobs.Registry.StartJob. That method would take a jobs.Record, create a new *jobs.Job and start it immediately, sending results on a passed channel and returning an error channel.

This is awkward because sometimes callers might want to do other work on the same transaction which is creating the job. Furthermore those callers might not want to start the job until after the creating transaction commits.

This refactor provides a mechanism for callers to create and store a job in a transaction and then start it after that transaction commits.

Here's an example:

```
j := registry.NewJob(record)
if err := db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
	if err := j.WithTxn(txn).Created(ctx); err != nil {
		return err
	}
	...
}); err != nil {
	...
}
resultsCh := make(chan tree.Datums)
errCh, err := j.Start(ctx, resultsCh)
...
```

Release note: None

Co-authored-by: Céline O'Neil <celineloneil@gmail.com>
Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
Build succeeded