streamingccl: support canceling an ingestion job#83310
craig[bot] merged 2 commits into cockroachdb:master
Conversation
gh-casper
left a comment
Please only review the top two commits.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @miretskiy, @samiskin, and @stevendanna)
return err
}

defer func() {
I would move this function into a helper.
}

// TODO(casper): handle timeout when we have very large tenant with
// saving checkpoints and retrying.
I'm not sure about timeout (we haven't set it have we? And the context doesn't have a deadline set either); but I do think we need to add checks to ensure the requirements specified by the ClearRange request: namely, we need to ensure that the spans are offline.
https://github.com/cockroachdb/cockroach/blob/master/pkg/roachpb/api.proto#L384-L384
I believe we have some planned work to ensure that the tenant is offline throughout the ingestion process.
spans = spans[:0]
for _, raw := range b.RawResponse().Responses {
  r := raw.GetClearRange()
  if r.ResumeSpan != nil {
I'm not sure GetClearRange returns resume spans. I mean it has a header, and the field; but I'm not sure it ever returns resume spans. My hunch is not; but would be nice to double check with @erikgrinaker
No, ClearRange doesn't return resume spans.
// Pause ingestion.
c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID))
fmt.Println("after pausing job")
destTenantPrefix := keys.MakeTenantPrefix(c.args.destTenantID)
rows, err := c.destKvDB.Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
require.NoError(t, err)
require.Empty(t, rows)
tree.Overload{
  Types: tree.ArgTypes{
    {"stream_id", types.Int},
    {"ingestion_cutover", types.Bool},
Do we ever not want to do this? I.e., do we imagine this function, which is intended to be invoked from the destination cluster, will ever want to invoke this set to false?
Is the intention to store some information about cutover event in the src cluster? If so, perhaps having a timestamp makes more sense. Or even some sort of proto message indicating the reason for cutover, timestamp, etc..?
stevendanna
left a comment
Overall the last two commits LGTM modulo the existing comments from @miretskiy.
ptsRecord, err := execConfig.ProtectedTimestampProvider.GetRecord(evalCtx.Ctx(), txn,
  *payload.GetStreamReplication().ProtectedTimestampRecordID)
if err != nil {
  if err == protectedts.ErrNotExists || err == nil {
Should we use errors.Is here?
}

// TODO(casper): handle timeout when we have very large tenant with
// saving checkpoints and retrying.
I believe we have some planned work to ensure that the tenant is offline throughout the ingestion process.
Force-pushed from 41665f3 to 1ca080f.
gh-casper
left a comment
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 313 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I would move this function into a helper.
Done.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 341 at r4 (raw file):
Previously, stevendanna (Steven Danna) wrote…
I believe we have some planned work to ensure that the tenant is offline throughout the ingestion process.
Yes, this is the work that Shiranka is working on. Added a TODO here.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 345 at r4 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
No, ClearRange doesn't return resume spans.
Okay. Removed. Added a TODO to deal with large tenant.
pkg/ccl/streamingccl/streamproducer/stream_lifetime.go line 162 at r3 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Should we use errors.Is here?
Done
pkg/sql/sem/builtins/replication_builtins.go line 303 at r3 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Do we ever not want to do this? I.e., do we imagine this function, which is intended to be invoked from the destination cluster, will ever want to invoke this set to false?
Is the intention to store some information about cutover event in the src cluster? If so, perhaps having a timestamp makes more sense. Or even some sort of proto message indicating the reason for cutover, timestamp, etc..?
This bool basically tells whether the stream ingestion ends with a cutover or not. If it ends with a cutover, then this is a successful ingestion and we mark the producer job succeeded. If not, then we don't need to track this stream replication anymore, so we just cancel the producer job. I think logically this makes sense.
I made two commits in this PR; the first commit introduces this flag, its purpose being to support canceling the producer job when the ingestion job is being canceled and the tenant key range gets reverted.
pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 370 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
stray fmt.Println
Done.
Code quote:
fmt.Println("after pausing job")

pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 482 at r4 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
nice.
Done.
// ingestion anymore.
defer cancelProducerJob(ctx,
  streaming.StreamID(details.StreamID),
  streamingccl.StreamAddress(details.StreamAddress))
you are missing () after defer, I believe.
Also, did any tests fail?
Do we want to cancel if db.Run below failed?
// TODO(casper): deal with very large tenant.
// TODO(casper): ensure the tenant is offline while we revert it.
var b kv.Batch
b.AddRawRequest(&roachpb.ClearRangeRequest{
Don't we want to do this only if the job was cancelled, and not e.g. because some error happened?
gh-casper
left a comment
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 334 at r9 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Do we want to cancel if db.Run below failed?
No. cancelProducerJob is a function defined outside of this function body. Adding () after defer doesn't make it compile.
Yes, if db.Run below failed, we still want to cancel the producer job because the source job's
protected timestamp is no longer needed as this ingestion job won't resume ingestion anymore (already in cancel-requested status). This makes me consider doing cancelProducerJob before ClearRange.
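For readers following the defer discussion: in Go, `defer f(args)` is already a deferred call, so no extra `()` is needed; the trailing `()` is only required when deferring a function literal. A small self-contained illustration (names are made up for the example):

```go
package main

import "fmt"

var order []string

func cleanup(name string) {
	order = append(order, "cleanup:"+name)
}

func run() {
	// defer of a direct call: `defer cleanup(...)` is already a call
	// expression, so no extra () is needed. Arguments are evaluated now;
	// the call itself runs at function exit.
	defer cleanup("producer job")

	// defer of a function literal needs the trailing () to make it a call.
	defer func() {
		order = append(order, "closure")
	}()

	order = append(order, "body")
}

func main() {
	run()
	// Deferred calls run last-in, first-out after the body.
	fmt.Println(order)
}
```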
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 339 at r9 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Don't we want to do this only if the job was cancelled, and not e.g. because some error happened?
We will pause on error, so only CANCEL JOB can trigger the ClearRange.
Force-pushed from 1ca080f to 6af99c6.
// The job will ingest events from StartTime onwards during the current run.
// This may change when the ingestion job gets resumed from the previous checkpoint.
util.hlc.Timestamp start_time = 3 [(gogoproto.nullable) = false];
seems like it's a pure code movement? any reasons for this?
}
// Make sure the producer job is active before start the stream replication.
var status streampb.StreamReplicationStatus
for r := retry.Start(ro); r.Next(); {
Are there any cases when the reason why we hit this loop (e.g. retry due to an error) is because the error was with the client? And in such cases we would want to re-create client to re-establish connection?
It feels like this retry loop belongs "on top":
for retry ... {
  ingest()
}

func ingest() {
  client = connect
  if err { return }
  return distIngest()
}
// Make sure the producer job is active before start the stream replication.
var status streampb.StreamReplicationStatus
for r := retry.Start(ro); r.Next(); {
  status, err = client.Heartbeat(ctx, streamID, startTime)
I thought that the ingestion processor was supposed to heartbeat src cluster periodically; I only see this happen here... where do we periodically ping src cluster?
streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
-  streamAddress, topology, sqlInstanceIDs, initialHighWater, ingestionJobID, streamID, oldTenantID, newTenantID)
+  streamAddress, topology, sqlInstanceIDs, progress.GetStreamIngest().StartTime,
+  ingestionJob.ID(), streamID, details.TenantID, details.NewTenantID)
I don't see in this code how we ever retry (i.e. hit the for r := retry.Start(ro); r.Next(); { line). It seems that we bubble up every error encountered in this function.
distStreamIngestion for example could return an error for any number of legitimate reasons (node crashed; or perhaps restarted, etc).
I think part of this is code structure, as mentioned above:
have a resumer call a function ingestWithRetry; that function should be simple:
func ingestWithRetry() {
  for retry ... {
    err := ingest()
    // Handle error appropriately
    if retryableError {
      // reload job; log error, etc
    } else {
      // pause the job
    }
  }
}
How we handle errors -- depends. We could say all errors get retried, or perhaps we retry some of the errors.
gh-casper
left a comment
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 139 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Are there any cases when the reason why we hit this loop (e.g. retry due to an error) is because the error was with the client? And in such cases we would want to re-create client to re-establish connection?
It feels like this retry loop belongs "on top":
for retry ... {
  ingest()
}

func ingest() {
  client = connect
  if err { return }
  return distIngest()
}
Nah, this retry just makes sure the producer job is active/running before we start doing the real ingestion (plan, ingest, cutover).
This is useful in two scenarios:
- when we create the stream ingestion in the first place, the producer job may still be pending or not adopted yet (it's an adoptable job).
- when we resume the stream ingestion through RESUME JOB id, we want to make sure the producer job doesn't time out and is still pending, otherwise we will pause on error again.
Yes, you are right that we should retry the whole ingest as well. I intend to do it in another PR that covers retry in both the ingesting stage and the clear-range stage in the case of cancel. This work is tracked in #83450.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 140 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I thought that the ingestion processor was supposed to heartbeat src cluster periodically; I only see this happen here... where do we periodically ping src cluster?
This ping is just to check the producer job status, as we need to make sure the producer job is up and running either when we first start it or when we resume from paused; Heartbeat is the perfect function to call here.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 184 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I don't see in this code how we ever retry (i.e. hit the for r := retry.Start(ro); r.Next(); { line). It seems that we bubble up every error encountered in this function. distStreamIngestion for example could return an error for any number of legitimate reasons (node crashed; or perhaps restarted, etc).
I think part of this is code structure, as mentioned above: have a resumer call a function ingestWithRetry; that function should be simple:

func ingestWithRetry() {
  for retry ... {
    err := ingest()
    // Handle error appropriately
    if retryableError {
      // reload job; log error, etc
    } else {
      // pause the job
    }
  }
}

How we handle errors -- depends. We could say all errors get retried, or perhaps we retry some of the errors.
Yes, I agree to this. Retry is a big thing I need to tackle, just didn't want to include it in this PR as it's relatively large and needs its own tests.
#83450 tracks this work and it's my next step.
pkg/jobs/jobspb/jobs.proto line 114 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
seems like it's a pure code movement? any reasons for this?
Start_time is where we will resume from the last ingestion. It's empty when we start the ingestion in the first place, and it's the last checkpoint when we resume. From the perspective of the whole stream ingestion journey, it's more a piece of progress information, so I moved it from StreamIngestionDetails to StreamIngestionProgress.
If that's the purpose, then I suggest you introduce … Even so, I find that your stated purpose of ensuring …
Force-pushed from 6af99c6 to 7b99411.
gh-casper
left a comment
Sure, moved it into waitUntilProducerActive. We should return an error when heartbeat fails. If the producer job is not created yet, pending, or in some other transient status, it won't report an error. Errors happen when there is a connection error or other runtime error on the producer job.
Also, is it possible to rebase so that already merged PRs do not make the situation more complicated?
miretskiy
left a comment
I would highly encourage that after addressing comments, this PR is split into independent PRs to simplify review process.
Reviewed 4 of 10 files at r6, 8 of 15 files at r7, 11 of 14 files at r8, 1 of 1 files at r11, all commit messages.
pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go line 405 at r11 (raw file):
if !ok {
  err = errors.Errorf("received progress update from unrecognized partition %s", partition)
  return true
I have to say, or even insist, that changes like this deserve to be in a separate PR. The change to introduce
// Mapping between a source cluster partition ID and its subscribing sql instance ID
// in the destination cluster. This describes the flow topology of the replication stream.
map<string, uint32> subscribing_sql_instances = 6 [(gogoproto.nullable) = false,
(gogoproto.customname) = "SubscribingSQLInstances"];
seems innocuous enough. But it's not -- it's an important change that serializes topology information
on the destination cluster, and ensures that this topology is correct -- for example by returning the error above. Serialization of state like this raises questions as to the correctness of such state when resuming.
I'm not necessarily saying that this is bad. But I am saying that such changes should be considered carefully.
For example, this is a frontier processor; this progress is initialized when frontier processor has been constructed. Part of this initialization is the initialization of the above subscribing sql instances.
What happens if that set changes? What happens if, say, the src cluster introduces a new node and decommissions the old one? The frontier processor needs to retry the whole flow -- but it's not clear to me what happens with that map. Is it updated because we retry? Where? Is it possible that this map gets out of date?
I just have hard time answering these questions and if the review makes it hard for me to answer those questions, perhaps such changes deserve to be reviewed independently and with enough surrounding context.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 76 at r11 (raw file):
}
lagInfo.SlowestSourcePartitionTimestamp = hlc.MaxTimestamp
lagInfo.FastestSourcePartitionTimestamp = hlc.MinTimestamp
not sure what these are meant to accomplish... Seems to me that slow/fast is pretty little information that doesn't actually convey much. If the whole thing is stuck, both slow and fast will be pretty close;
have you considered just using span.Frontier for this?
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 113 at r11 (raw file):
// Ping the producer job until it is active/running, returns nil when
// the job is active.
nit: waits until the job is active?
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 125 at r11 (raw file):
Multiplier: 2,
MaxBackoff: 10 * time.Second,
MaxRetries: 5,
So, we have 1 + 2 + 4 + 8 + 16 = 31 seconds for the job to become active?
Is that a bit too aggressive?
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 133 at r11 (raw file):
if err != nil {
  return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error", ingestionJobID)
help me understand: we attempt to heartbeat the client. Connection to source cluster fails; we then immediately return from this function. What is the point of the retry loop above?
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 232 at r11 (raw file):
// Resume is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
  p := execCtx.(sql.JobExecContext)
nit: while here: p is a bit of a remnant from the time when this used to be Planner;
mind renaming it to execCtx?
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 241 at r11 (raw file):
errorMessage := fmt.Sprintf(errorFmt, err)
log.Warningf(resumeCtx, errorFmt, err)
return s.job.PauseRequested(resumeCtx, p.Txn(), func(ctx context.Context,
What happens on the producer? Should we attempt to best effort pause it?
Do we rely on the fact that the producer should also pause itself?
Please add a comment about this.
pkg/ccl/streamingccl/streampb/stream.proto line 148 at r11 (raw file):
// among all partitions.
google.protobuf.Duration slowest_fastest_partition_ingestion_lag = 7
  [(gogoproto.nullable) = false, (gogoproto.stdduration) = true];
have you considered just storing the span frontier or portion of this frontier
instead of restricting to 2 timestamps/spans?
pkg/ccl/streamingccl/streamproducer/stream_lifetime.go line 252 at r11 (raw file):
func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
  if md.Status == jobs.StatusRunning || md.Status == jobs.StatusPending {
    if !ingestionCutover {
having descriptive status/reason why we are completing stream would improve api;
it will also allow you to update producer job description and record some metadata -- such as
when was it completed, why, etc....
pkg/streaming/api.go line 78 at r11 (raw file):
// 'ingestionCutover' indicates whether the stream ingestion has been cutover.
CompleteReplicationStream(
  evalCtx *eval.Context, txn *kv.Txn, streamID StreamID, ingestionCutover bool,
I'm not a fan of this boolean argument; I honestly don't know what it means.
Presumably it's true if it's a "normal" cutover... Why do we care that the source
cluster knows about the reason why it's being canceled? Is it observability?
Does it make sense to have a "reason" as some type of a enum or a struct or a string, or even
an error?
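One way to read this suggestion: replace the bare bool with an explicit reason type that the producer could also record as metadata. A hypothetical sketch (these names are illustrative and not the actual streaming API; the PR ultimately settled on a successfulIngestion bool instead):

```go
package main

import "fmt"

// CompletionReason sketches an enum-style "reason" for completing a
// replication stream, as an alternative to a bare bool.
type CompletionReason int

const (
	ReasonUnknown CompletionReason = iota
	ReasonCutoverComplete   // normal cutover: producer job succeeds
	ReasonIngestionCanceled // ingestion canceled: producer job canceled
)

// String makes the reason self-describing when stored in job metadata.
func (r CompletionReason) String() string {
	switch r {
	case ReasonCutoverComplete:
		return "cutover complete"
	case ReasonIngestionCanceled:
		return "ingestion canceled"
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(ReasonCutoverComplete)
}
```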
pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 70 at r11 (raw file):
`)
},
srcNumNodes: 1,
perhaps we should use more than 1
Force-pushed from 7b99411 to d126d93.
gh-casper
left a comment
My assumption for the review process of the two dependent PRs (pause on error and clear range on cancel) was that you would only review the commits relevant to each PR's description. Take this PR for example: only review the top two commits. To review the other commit, please put the comment in the other PR.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 76 at r11 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
not sure what these are meant to accomplish... Seems to me that slow/fast is pretty little information
that doesn't actually convey much. If the whole thing is stuck, both slow and fast will be pretty close;
have you considered just using span.Frontier for this?
See the other comment related to this. Adding a TODO here.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 125 at r11 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
So, we have 1 + 2 + 4 + 8 + 16 = 31 seconds for the job to become active?
Is that a bit too aggressive?
Changed it to 1 + 2 + 4 + 5 = 11 seconds.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 133 at r11 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
help me understand: we attempt to heartbeat the client. Connection to source cluster fails; we then immediately return from this function. What is the point of the retry loop above?
The retry loop is to deal with transient producer job status like pending, cancel-requested, reverting. Connection failure will make the ingestion error out and pause, instead of retrying here. Users will fix that and resume job manually.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 241 at r11 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
What happens on the producer? Should we attempt to best-effort pause it?
Do we rely on the fact that the producer should also pause itself? Please add a comment about this.
Done.
Code quote:
PauseRequested

pkg/ccl/streamingccl/streamproducer/stream_lifetime.go line 252 at r11 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
having descriptive status/reason why we are completing stream would improve api;
it will also allow you to update producer job description and record some metadata -- such as
when was it completed, why, etc....
Made several changes: replaced ingestionCutover with successfulIngestion as it makes more sense, and added updating the running status when we complete the replication stream.
pkg/streaming/api.go line 78 at r11 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I'm not a fan of this boolean argument; I honestly don't know what it means.
Presumably it's true if it's a "normal" cutover... Why do we care that the source
cluster knows about the reason why it's being canceled? Is it observability?
Does it make sense to have a "reason" as some type of a enum or a struct or a string, or even
an error?
It is not the reason we care about; we just need to tell whether the ingestion finished successfully so that the producer job can be marked succeeded or canceled, so I have now replaced ingestionCutover with successfulIngestion.
pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 70 at r11 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
perhaps we should use more than 1
Using more than 1 doesn't make any difference now, as we don't have more than one partition. There is ongoing work to test multiple partitions. I added a TODO earlier in the test file; I don't think I need to add another one here.
pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go line 405 at r11 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I have to say, or even insist, that changes like this deserve to be in a separate PR. The change to introduce

// Mapping between a source cluster partition ID and its subscribing sql instance ID
// in the destination cluster. This describes the flow topology of the replication stream.
map<string, uint32> subscribing_sql_instances = 6 [(gogoproto.nullable) = false,
  (gogoproto.customname) = "SubscribingSQLInstances"];

seems innocuous enough. But it's not -- it's an important change that serializes topology information on the destination cluster, and ensures that this topology is correct -- for example by returning the error above. Serialization of state like this raises questions as to the correctness of such state when resuming. I'm not necessarily saying that this is bad. But I am saying that such changes should be considered carefully.
For example, this is a frontier processor; this progress is initialized when the frontier processor has been constructed. Part of this initialization is the initialization of the above subscribing sql instances.
What happens if that set changes? What happens if, say, the src cluster introduces a new node and decommissions the old one? The frontier processor needs to retry the whole flow -- but it's not clear to me what happens with that map. Is it updated because we retry? Where? Is it possible that this map gets out of date? I just have a hard time answering these questions, and if the review makes it hard for me to answer them, perhaps such changes deserve to be reviewed independently and with enough surrounding context.
It was in a separate PR (stream_ingestion_stats) and has already been checked in.
This map is populated when we create a new flow. We will create a new flow (re-plan) when we retry. In the future, we will also re-plan on the fly even when we don't retry, for example when the src cluster topology changes and there are many non-local rangefeeds, or when dest cluster instances expand or shrink.
pkg/ccl/streamingccl/streampb/stream.proto line 148 at r11 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
have you considered just storing the span frontier or portion of this frontier
instead of restricting to 2 timestamps/spans?
Yes, have considered this and intended to track them once we switch from tracking partition id as frontier to tracking actual spans as frontier in the frontier_processor in the near future. Added a TODO here.
gh-casper
left a comment
But now that we have all the comments in this PR, we just keep them here (no need to comment on the pause-on-error PR). The stream-ingestion-stats PR got merged in, no need to worry. I'll merge the pause-on-error PR separately once this PR is approved.
Next time, I won't create separate PRs that have a dependency again. It just causes too much confusion, given that GitHub doesn't support it well, and it takes too much time for me to do the syncing and rebasing.
I understand that this is sometimes inconvenient; but you should optimize for the reader -- i.e. the person reviewing the code.
Force-pushed from d126d93 to 8fc7730.
gh-casper
left a comment
Done, please review.
Force-pushed from 7c49f26 to 040c8c2.
gh-casper
left a comment
Did some investigation on the bytes-left-after-ClearRange bug. In the test, I shut down the tenant SQL servers before canceling the ingestion. This is not ideal, but we may have a follow-up PR that disallows any process from touching this tenant's key range after the ingestion job gets canceled, unless the user creates another ingestion.
Please review again. @miretskiy @stevendanna
Force-pushed from 040c8c2 to 31128b2.
stevendanna
left a comment
Thanks for pushing this forward. As we discussed, I think the only must-have is removing the cleanup on cancellation for now.
})
}

func (s *streamIngestionResumer) cancelProducerJob(
We don't have to do it in this PR. But I wonder if this should be configurable. Since this will release the protected timestamp on the producer side.
if tr.State == descpb.TenantInfo_ACTIVE {
  return errors.Errorf("error in cancel stream ingestion: cannot delete an active tenant %s",
    details.NewTenantID)
}

// TODO(casper): deal with very large tenant.
tenantInfo := descpb.TenantInfo{
  ID:    p.GetStreamIngestion().NewTenantID.ToUint64(),
  State: descpb.TenantInfo_DROP,
}
// GC the tenant's data as well as the system tenant's knowledge about this tenant.
if err = sql.GCTenantSync(ctx, jobExecCtx.ExecCfg(), &tenantInfo); err != nil {
  log.Warningf(ctx,
    "encountered error when clearing tenant key ranges for ingestion job %d", s.job.ID())
  return err
}
Per recent discussions, I think for now we may want to avoid doing this altogether and have a second command: either just SELECT crdb_internal.destroy_tenant(ID, true) or a DROP TENANT command that we create.
if isTimedOut(j) {
  return errors.Errorf("replication stream %d timed out", p.job.ID())
}

prog := j.Progress()
Should we also be checking for this condition at startup to avoid having to wait for one liveness tracking interval?
return p.releaseProtectedTimestamp(ctx, execCfg)
case jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY:
  return j.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
    ju.UpdateStatus(jobs.StatusCancelRequested)
Fine for now, but let's think through whether we want to release the protected timestamp in the case of it finishing unsuccessfully.
Force-pushed from 31128b2 to ee5315d.
gh-casper
left a comment
Was trying to add logic to wait a while after the last ingested time using TrailingMetaCallback, but it turns out to be non-trivial work: we have to let the frontier processor collect trailing ProducerMetadatas from all ingestion processors, which needs some modification to the code. Will file another issue for this.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 342 at r16 (raw file):
Previously, stevendanna (Steven Danna) wrote…
We don't have to do it in this PR. But I wonder if this should be configurable. Since this will release the protected timestamp on the producer side.
As mentioned in other comment, we can drop the producer job protected timestamp once we enter cancel-requested state.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 400 at r16 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Per recent discussions, I think for now we may want to avoid doing this altogether and have a second command: either just SELECT crdb_internal.destroy_tenant(ID, true) or a DROP TENANT command that we create.
Done.
pkg/ccl/streamingccl/streamproducer/producer_job.go line 104 at r16 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Should we also be checking for this condition at startup to avoid having to wait for one liveness tracking interval?
Done.
Code quote:
prog := j.Progress()

pkg/ccl/streamingccl/streamproducer/producer_job.go line 110 at r16 (raw file):
Previously, stevendanna (Steven Danna) wrote…
Fine for now, but lets think through whether we want to release the protected timestamp in the case of it finishing unnecessfully.
If we decide to cancel and once the job enters cancel-requested state, we won't be able to utilize the producer job's protected timestamp anymore: we cannot unpause a cancel-requested job unless we support restoring an existing tenant with deprecated StartFrom. Either way, we have to clean up this producer job.
Force-pushed from ee5315d to 2683ab1.
miretskiy
left a comment
Reviewed 1 of 20 files at r13, 2 of 27 files at r15, 1 of 11 files at r17, 1 of 4 files at r18.
support canceling a producer job

Previously we could only make a producer job finish normally through complete_stream_replication after the ingestion is cutover. Now we expand complete_stream_replication to take a 2nd argument 'successfulIngestion' to support completing the replication stream even when cutover doesn't happen, e.g., when the ingestion gets canceled and a revert happens; this cancels the producer job.

Release note (sql change): expand crdb_internal.complete_stream_replication to take a successfulIngestion argument, which indicates if this stream ingestion finished successfully.

This commit supports the intended behavior when canceling an ingestion job: canceling the producer job, which releases the protected timestamp for the source tenant. The destination tenant key ranges are left for the user to clean up with crdb_internal.destroy_tenant.

Release note: None
Force-pushed from 2683ab1 to a71ddbc.
bors r+
Build succeeded:
Closes: #57409