
streamingccl: add retry into stream ingestion job#85432

Merged
craig[bot] merged 1 commit into cockroachdb:master from gh-casper:running-status on Sep 1, 2022

Conversation

@gh-casper
Contributor

@gh-casper gh-casper commented Aug 1, 2022

This PR adds a job-level retry mechanism to the
stream ingestion job. All errors are retryable by
default unless marked as a permanent job error.

It also records a running status in the job progress
as the job reaches various stages.

Release note: None
Release justification: Cat 4

Closes: #83450
Closes: #82509

@gh-casper gh-casper requested a review from a team as a code owner August 1, 2022 21:59
@cockroach-teamcity
Member

This change is Reviewable

}
ingestWithClient := func() error {
streamID := streaming.StreamID(details.StreamID)
updateRunningStatus(ctx, ingestionJob, fmt.Sprintf("connecting to the producer job %d", streamID))
stevendanna (Collaborator)

We might want to have just one running status that covers both the connecting and planning. Updating the job status isn't free, so let's keep it to the highest value ones.

log.Infof(ctx,
"starting to revert to the specified cutover timestamp for stream ingestion job %d",
ingestionJob.ID())
updateRunningStatus(ctx, ingestionJob, "starting to cut over to the given timestamp")
stevendanna (Collaborator)

should we include the timestamp in this message?

}

log.Infof(ctx, "activating destination tenant %d", details.NewTenantID)
updateRunningStatus(ctx, ingestionJob, "activating destination tenant")
stevendanna (Collaborator)
I think we can elide this one. I think it is reasonable that "cutover" includes the tenant activation.

updateRunningStatus(ctx, ingestionJob, "running the SQL flow for the stream ingestion job")
if err = distStreamIngest(ctx, execCtx, sqlInstanceIDs, ingestionJob.ID(), planCtx, dsp,
streamIngestionSpecs, streamIngestionFrontierSpec); err != nil {
fmt.Println("ctx after running ingestion flow: ", ctx)
stevendanna (Collaborator)
stray println

Comment on lines +297 to +314
ro := retry.Options{
	InitialBackoff: 3 * time.Second,
	Multiplier:     2,
	MaxBackoff:     10 * time.Second,
	MaxRetries:     5,
}
stevendanna (Collaborator)
One issue here is that a failure that happened 2 days ago feels like it should be irrelevant to a failure that happens today, but because nothing resets our retry counter, that failure matters forever in terms of both the backoff and the max count.

I wonder, is there a reason to ever stop retrying? What's the advantage of going into a paused state vs retrying? I suppose it means we stop forcing the source cluster to do work.

Comment on lines +308 to +319
if retryCount != 0 {
status = fmt.Sprintf("retrying stream ingestion in the %d round with previous error: %s", retryCount, err)
}
updateRunningStatus(ctx, ingestionJob, status)
err = ingest(ctx, execCtx, ingestionJob)
stevendanna (Collaborator)
These statuses are going to be pretty rapidly overwritten by the statuses we set in ingest(). Perhaps we should just set the status after we get the error, so that the user knows that we've gotten an error and are waiting to retry.

Contributor Author

@gh-casper gh-casper left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 207 at r1 (raw file):

Previously, stevendanna (Steven Danna) wrote…

We might want to have just one running status that covers both the connecting and planning. Updating the job status isn't free, so let's keep it to the highest value ones.

Done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 260 at r1 (raw file):

Previously, stevendanna (Steven Danna) wrote…

stray println

Done.

Code quote:

fmt.Println

pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 271 at r1 (raw file):

Previously, stevendanna (Steven Danna) wrote…

should we include the timestamp in this message?

Done. Added in maybeRevertToCutoverTimestamp.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 277 at r1 (raw file):

Previously, stevendanna (Steven Danna) wrote…

I think we can elide this one. I think it is reasonable that "cutover" includes the tenant activation.

Done.

Code quote:

log.Infof(ctx, "activating destination tenant %d", details.NewTenantID)

pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 302 at r1 (raw file):

Previously, stevendanna (Steven Danna) wrote…

One issue here is that a failure that happened 2 days ago feels like it should be irrelevant to a failure that happens today, but because nothing resets our retry counter, that failure matters forever in terms of both the backoff and the max count.

I wonder, is there a reason to ever stop retrying? What's the advantage of going into a paused state vs retrying? I suppose it means we stop forcing the source cluster to do work.

If the destination keeps failing, I think it's reasonable to just let it pause; it makes it easier for an operator to fix something in a paused state. I can make the retries last longer.

I think it would also make sense later to let the producer side tell the ingestion side when the producer job will expire, and surface that in the running status or after pausing.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 312 at r1 (raw file):

Previously, stevendanna (Steven Danna) wrote…

These statuses are going to be pretty rapidly overwritten by the status' we set in ingest(). Perhaps we should just set the status after we get the err so that the user knows that we've gotten an error and are waiting to retry.

Done.


ctx := context.Background()
ingestErrCh := make(chan error, 1)
ingestionTimes := 0
samiskin (Contributor)

ingestionStarts could be more clear here

Comment on lines +323 to +324
updateRunningStatus(ctx, ingestionJob,
fmt.Sprintf("stream ingestion waits for retrying after error %s", err))
samiskin (Contributor)

Assuming updateRunningStatus doesn't log anything, it'd be nice to log every time we're about to retry.

Comment on lines +306 to +314
ro := retry.Options{
	InitialBackoff: 3 * time.Second,
	Multiplier:     2,
	MaxBackoff:     1 * time.Minute,
	MaxRetries:     60,
}
Contributor

@samiskin samiskin commented Aug 24, 2022

Steven's earlier comment seems like it'd still apply here, where this is expected to be an incredibly long running job and persisting consequences of previous eventually-successful retries is probably not great.

If you'd rather address that in a different issue/pr I'd at least put a comment here highlighting the concern.

Comment on lines +781 to +813
_, alternateSrcTenantConn := serverutils.StartTenant(t, c.srcCluster.Server(1), base.TestTenantArgs{TenantID: c.args.srcTenantID, DisableCreateTenant: true, SkipTenantCheck: true})
_, alternateSrcTenantConn := serverutils.StartTenant(t, c.srcCluster.Server(1),
base.TestTenantArgs{TenantID: c.args.srcTenantID, DisableCreateTenant: true, SkipTenantCheck: true})
samiskin (Contributor)

This part we can just leave SkipTenantCheck being unset, since the source tenant should exist and should be active and we don't care why this fails if that isn't the case. The purpose of the other place with SkipTenantCheck was to try to connect to an inactive destination tenant and verify that even without a testserver's TenantCheck it still fails.

This PR adds a job-level retry mechanism to the
stream ingestion job. All errors are retryable by
default unless marked as a permanent job error.

It also records a running status in the job progress
as the job reaches various stages.

Release note: None

Release justification: low risk, high benefit
changes to existing functionality
Contributor Author

@gh-casper gh-casper left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 311 at r2 (raw file):

Previously, samiskin (Shiranka Miskin) wrote…

Steven's earlier comment seems like it'd still apply here, where this is expected to be an incredibly long running job and persisting consequences of previous eventually-successful retries is probably not great.

If you'd rather address that in a different issue/pr I'd at least put a comment here highlighting the concern.

I think what he meant in the previous comment is that if we update the status at the beginning of the retry, it gets overridden very quickly by the following update. But if we update the status after a retry fails, the status can be shown for quite a while before the next retry (the max backoff is 1 minute).


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 324 at r2 (raw file):

Previously, samiskin (Shiranka Miskin) wrote…

Assuming updateRunningStatus doesn't log anything, it'd be nice to log every time we're about to retry.

Done.


pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 531 at r2 (raw file):

Previously, samiskin (Shiranka Miskin) wrote…

ingestionStarts could be more clear here

Done.


pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 813 at r2 (raw file):

Previously, samiskin (Shiranka Miskin) wrote…

This part we can just leave SkipTenantCheck being unset, since the source tenant should exist and should be active and we don't care why this fails if that isn't the case. The purpose of the other place with SkipTenantCheck was to try to connect to an inactive destination tenant and verify that even without a testserver's TenantCheck it still fails.

Done.

Code quote:

alternateSrcSysSQL

@gh-casper
Contributor Author

bors r+

@craig
Contributor

craig bot commented Sep 1, 2022

Build succeeded:

@craig craig bot merged commit b316a5e into cockroachdb:master Sep 1, 2022