streamingccl: add retry into stream ingestion job #85432
craig[bot] merged 1 commit into cockroachdb:master
Conversation
| } | ||
| ingestWithClient := func() error { | ||
| streamID := streaming.StreamID(details.StreamID) | ||
| updateRunningStatus(ctx, ingestionJob, fmt.Sprintf("connecting to the producer job %d", streamID)) |
We might want to have just one running status that covers both the connecting and planning. Updating the job status isn't free, so let's keep it to the highest value ones.
| log.Infof(ctx, | ||
| "starting to revert to the specified cutover timestamp for stream ingestion job %d", | ||
| ingestionJob.ID()) | ||
| updateRunningStatus(ctx, ingestionJob, "starting to cut over to the given timestamp") |
should we include the timestamp in this message?
| } | ||
|
|
||
| log.Infof(ctx, "activating destination tenant %d", details.NewTenantID) | ||
| updateRunningStatus(ctx, ingestionJob, "activating destination tenant") |
I think we can elide this one. I think it is reasonable that "cutover" includes the tenant activation.
| updateRunningStatus(ctx, ingestionJob, "running the SQL flow for the stream ingestion job") | ||
| if err = distStreamIngest(ctx, execCtx, sqlInstanceIDs, ingestionJob.ID(), planCtx, dsp, | ||
| streamIngestionSpecs, streamIngestionFrontierSpec); err != nil { | ||
| fmt.Println("ctx after running ingestion flow: ", ctx) |
| ro := retry.Options{ | ||
| InitialBackoff: 3 * time.Second, | ||
| Multiplier: 2, | ||
| MaxBackoff: 10 * time.Second, | ||
| MaxRetries: 5, | ||
| } |
One issue here is that a failure that happened 2 days ago feels like it should be irrelevant to a failure that happens today, but because nothing resets our retry counter, that failure matters forever in terms of both the backoff and the max count.
I wonder, is there a reason to ever stop retrying? What's the advantage of going into a paused state vs retrying? I suppose it means we stop forcing the source cluster to do work.
| if retryCount != 0 { | ||
| status = fmt.Sprintf("retrying stream ingestion in the %d round with previous error: %s", retryCount, err) | ||
| } | ||
| updateRunningStatus(ctx, ingestionJob, status) | ||
| err = ingest(ctx, execCtx, ingestionJob) |
These statuses are going to be pretty rapidly overwritten by the statuses we set in ingest(). Perhaps we should just set the status after we get the err so that the user knows that we've gotten an error and are waiting to retry.
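The suggestion can be sketched as follows, with stub helpers standing in for the real `updateRunningStatus` and `ingest` in `stream_ingestion_job.go` (hypothetical simplification, not the PR's code): the status is set only after an attempt fails, so the "waiting to retry" message is what persists during the backoff window instead of being overwritten by statuses set inside `ingest()`.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Stubs standing in for the real job helpers.
var status string

func updateRunningStatus(s string) { status = s }

// ingest fails on the first two attempts, then succeeds.
func ingest(attempt int) error {
	if attempt < 2 {
		return errors.New("node unavailable")
	}
	return nil
}

func main() {
	const maxRetries = 5
	backoff := 3 * time.Second
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = ingest(attempt); err == nil {
			break
		}
		// Set the status *after* the failure, so during the backoff the
		// user sees that an error occurred and a retry is pending.
		updateRunningStatus(fmt.Sprintf(
			"stream ingestion waiting to retry after error: %s", err))
		_ = backoff // in the real job: sleep here, then grow the backoff
	}
	fmt.Println(status)
}
```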
force-pushed from e9918cf to 97dd180
gh-casper left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @miretskiy, @samiskin, and @stevendanna)
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 207 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
We might want to have just one running status that covers both the connecting and planning. Updating the job status isn't free, so let's keep it to the highest value ones.
Done.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 260 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
stray println
Done.
Code quote:
fmt.Println

pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 271 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
should we include the timestamp in this message?
Done. Added in maybeRevertToCutoverTimestamp.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 277 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
I think we can elide this one. I think it is reasonable that "cutover" includes the tenant activation.
Done.
Code quote:
log.Infof(ctx, "activating destination tenant %d", details.NewTenantID)

pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 302 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
One issue here is that a failure that happened 2 days ago feels like it should be irrelevant to a failure that happens today, but because nothing resets our retry counter, that failure matters forever in terms of both the backoff and the max count.
I wonder, is there a reason to ever stop retrying? What's the advantage of going into a paused state vs retrying? I suppose it means we stop forcing the source cluster to do work.
If the destination keeps failing, I think it's reasonable to just let it pause; a paused state makes it easier for the operator to fix something. I can make the retries last longer.
I think it also makes sense, later, to have the producer side tell the ingestion side when the producer job will expire, and surface that in the running status or after pausing.
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 312 at r1 (raw file):
Previously, stevendanna (Steven Danna) wrote…
These statuses are going to be pretty rapidly overwritten by the statuses we set in ingest(). Perhaps we should just set the status after we get the err so that the user knows that we've gotten an error and are waiting to retry.
Done.
|
|
||
| ctx := context.Background() | ||
| ingestErrCh := make(chan error, 1) | ||
| ingestionTimes := 0 |
ingestionStarts could be clearer here
| updateRunningStatus(ctx, ingestionJob, | ||
| fmt.Sprintf("stream ingestion waits for retrying after error %s", err)) |
Assuming updateRunningStatus doesn't log anything, it'd be nice to log every time we're about to retry.
| ro := retry.Options{ | ||
| InitialBackoff: 3 * time.Second, | ||
| Multiplier: 2, | ||
| MaxBackoff: 1 * time.Minute, | ||
| MaxRetries: 60, | ||
| } |
Steven's earlier comment seems like it'd still apply here, where this is expected to be an incredibly long running job and persisting consequences of previous eventually-successful retries is probably not great.
If you'd rather address that in a different issue/pr I'd at least put a comment here highlighting the concern.
| _, alternateSrcTenantConn := serverutils.StartTenant(t, c.srcCluster.Server(1), base.TestTenantArgs{TenantID: c.args.srcTenantID, DisableCreateTenant: true, SkipTenantCheck: true}) | ||
| _, alternateSrcTenantConn := serverutils.StartTenant(t, c.srcCluster.Server(1), | ||
| base.TestTenantArgs{TenantID: c.args.srcTenantID, DisableCreateTenant: true, SkipTenantCheck: true}) |
This part we can just leave SkipTenantCheck being unset, since the source tenant should exist and should be active and we don't care why this fails if that isn't the case. The purpose of the other place with SkipTenantCheck was to try to connect to an inactive destination tenant and verify that even without a testserver's TenantCheck it still fails.
force-pushed from 97dd180 to 556c370
This PR supports the ingestion job having its own retry mechanism. All errors are retryable by default unless marked as a permanent job error. Also adds a running status into the job progress when the job reaches various stages. Release note: None. Release justification: low risk, high benefit changes to existing functionality.
force-pushed from 556c370 to 745e6be
gh-casper left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @miretskiy, @samiskin, and @stevendanna)
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 311 at r2 (raw file):
Previously, samiskin (Shiranka Miskin) wrote…
Steven's earlier comment seems like it'd still apply here, where this is expected to be an incredibly long running job and persisting consequences of previous eventually-successful retries is probably not great.
If you'd rather address that in a different issue/pr I'd at least put a comment here highlighting the concern.
I think what he meant in the previous comment is that if we update the status at the beginning of a retry, it gets overridden very quickly by the following update. But if we update the status after a retry fails, the status can be shown for quite a while before the next retry (max backoff is 1 minute).
pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 324 at r2 (raw file):
Previously, samiskin (Shiranka Miskin) wrote…
Assuming updateRunningStatus doesn't log anything, it'd be nice to log every time we're about to retry.
Done.
pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 531 at r2 (raw file):
Previously, samiskin (Shiranka Miskin) wrote…
ingestionStarts could be clearer here
Done.
pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 813 at r2 (raw file):
Previously, samiskin (Shiranka Miskin) wrote…
This part we can just leave SkipTenantCheck unset, since the source tenant should exist and should be active, and we don't care why this fails if that isn't the case. The purpose of the other place with SkipTenantCheck was to try to connect to an inactive destination tenant and verify that even without a testserver's TenantCheck it still fails.
Done.
Code quote:
alternateSrcSysSQL

bors r+

Build succeeded
This PR supports the ingestion job having its own
retry mechanism. All errors are retryable by default
unless marked as a permanent job error.
Also adds a running status into the job progress when
the job reaches various stages.
Release note: None
Release justification: Cat 4
Closes: #83450
Closes: #82509