
streamingccl: support canceling an ingestion job#83310

Merged
craig[bot] merged 2 commits into cockroachdb:master from gh-casper:cancel-ingestion
Jul 31, 2022

Conversation

@gh-casper
Contributor

@gh-casper gh-casper commented Jun 24, 2022

This PR supports the intended behavior when canceling
an ingestion job: canceling the producer job, which releases the
protected timestamp for the source tenant. The
destination tenant key ranges are left for
the user to clean up with crdb_internal.destroy_tenant.

This PR also expands crdb_internal.complete_stream_replication
to take a second argument, 'successfulIngestion', so that a
replication stream can be completed even when cutover doesn't
happen, e.g., when the ingestion gets canceled and a revert
happens; in that case the producer job is canceled.

Release note (sql change): crdb_internal.complete_stream_replication
now takes a successfulIngestion argument, which indicates whether this
stream ingestion finished successfully.

Closes: #57409

@gh-casper gh-casper requested review from a team, miretskiy, samiskin and stevendanna June 24, 2022 07:50
@gh-casper gh-casper requested review from a team as code owners June 24, 2022 07:50
@gh-casper gh-casper added the A-tenant-streaming Including cluster streaming label Jun 24, 2022
@cockroach-teamcity
Member

This change is Reviewable

Contributor Author

@gh-casper gh-casper left a comment

Please only review the top two commits.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @miretskiy, @samiskin, and @stevendanna)

return err
}

defer func() {
Contributor

I would move this function into a helper.

}

// TODO(casper): handle timeout when we have very large tenant with
// saving checkpoints and retrying.
Contributor

I'm not sure about the timeout (we haven't set it, have we? And the context doesn't have a deadline set either); but I do think we need to add checks to ensure the requirements specified by the ClearRange request: namely, we need to ensure that the spans are offline.
https://github.com/cockroachdb/cockroach/blob/master/pkg/roachpb/api.proto#L384-L384

Collaborator

I believe we have some planned work to ensure that the tenant is offline throughout the ingestion process.

spans = spans[:0]
for _, raw := range b.RawResponse().Responses {
r := raw.GetClearRange()
if r.ResumeSpan != nil {
Contributor

I'm not sure GetClearRange returns resume spans. I mean it has a header, and the field; but I'm not sure it ever returns resume spans. My hunch is not; but would be nice to double check with @erikgrinaker

Contributor

No, ClearRange doesn't return resume spans.


// Pause ingestion.
c.destSysSQL.Exec(t, fmt.Sprintf("PAUSE JOB %d", ingestionJobID))
fmt.Println("after pausing job")
Contributor

stray fmt.Println

destTenantPrefix := keys.MakeTenantPrefix(c.args.destTenantID)
rows, err := c.destKvDB.Scan(ctx, destTenantPrefix, destTenantPrefix.PrefixEnd(), 10)
require.NoError(t, err)
require.Empty(t, rows)
Contributor

nice.

tree.Overload{
Types: tree.ArgTypes{
{"stream_id", types.Int},
{"ingestion_cutover", types.Bool},
Contributor

Do we ever not want to do this? I.e. do we imagine this function, which is intended to be invoked from the destination cluster, will ever be invoked with this set to false?

Is the intention to store some information about cutover event in the src cluster? If so, perhaps having a timestamp makes more sense. Or even some sort of proto message indicating the reason for cutover, timestamp, etc..?

Collaborator

@stevendanna stevendanna left a comment

Overall the last two commits LGTM modulo the existing comments from @miretskiy.

ptsRecord, err := execConfig.ProtectedTimestampProvider.GetRecord(evalCtx.Ctx(), txn,
*payload.GetStreamReplication().ProtectedTimestampRecordID)
if err != nil {
if err == protectedts.ErrNotExists || err == nil {
Collaborator

Should we use errors.Is here?

}

// TODO(casper): handle timeout when we have very large tenant with
// saving checkpoints and retrying.
Collaborator

I believe we have some planned work to ensure that the tenant is offline throughout the ingestion process.

Contributor Author

@gh-casper gh-casper left a comment



pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 313 at r4 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I would move this function into a helper.

Done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 341 at r4 (raw file):

Previously, stevendanna (Steven Danna) wrote…

I believe we have some planned work to ensure that the tenant is offline throughout the ingestion process.

Yes, this is the work that Shiranka is working on. Added a TODO here.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 345 at r4 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

No, ClearRange doesn't return resume spans.

Okay. Removed. Added a TODO to deal with large tenant.


pkg/ccl/streamingccl/streamproducer/stream_lifetime.go line 162 at r3 (raw file):

Previously, stevendanna (Steven Danna) wrote…

Should we use errors.Is here?

Done


pkg/sql/sem/builtins/replication_builtins.go line 303 at r3 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Do we ever not want to do this? I.e. do we imagine this function, which is intended to be invoked from the destination cluster, will ever be invoked with this set to false?

Is the intention to store some information about cutover event in the src cluster? If so, perhaps having a timestamp makes more sense. Or even some sort of proto message indicating the reason for cutover, timestamp, etc..?

This bool basically tells whether the stream ingestion ends with cutover or not. If it ends with cutover, then it is a successful ingestion and we mark the producer job as succeeded. If not, we don't need to track this stream replication anymore, so we just cancel the producer job. I think this makes sense logically.

I made two commits in this PR; the first introduces this flag, the purpose being to support canceling the producer job when the ingestion job is being canceled and the tenant key range gets reverted.


pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 370 at r4 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

stray fmt.Println

Done.


pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 482 at r4 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

nice.

Done.

// ingestion anymore.
defer cancelProducerJob(ctx,
streaming.StreamID(details.StreamID),
streamingccl.StreamAddress(details.StreamAddress))
Contributor

you are missing () after defer, I believe.
Also, did any tests fail?

Contributor

Do we want to cancel if db.Run below failed?

// TODO(casper): deal with very large tenant.
// TODO(casper): ensure the tenant is offline while we revert it.
var b kv.Batch
b.AddRawRequest(&roachpb.ClearRangeRequest{
Contributor

Don't we want to do this only if the job was cancelled? And not e.g. because some error happened?

@shermanCRL shermanCRL added this to the 22.2 milestone Jun 28, 2022
Contributor Author

@gh-casper gh-casper left a comment



pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 334 at r9 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Do we want to cancel if db.Run below failed?

No. cancelProducerJob is a function defined outside of this function body, so adding () after defer wouldn't compile.

Yes, if db.Run below fails, we still want to cancel the producer job because the source job's
protected timestamp is no longer needed, as this ingestion job won't resume ingestion anymore (it is already in cancel-requested status). This makes me consider doing cancelProducerJob before ClearRange.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 339 at r9 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Don't we want to do this only if the job was cancelled? And not e.g. because some error happened?

We will pause on error, so only CANCEL JOB can trigger the ClearRange.

// The job will ingest events from StartTime onwards during the current run.
// This may change when the ingestion job gets resumed from the previous checkpoint.
util.hlc.Timestamp start_time = 3 [(gogoproto.nullable) = false];

Contributor

seems like it's a pure code movement? any reasons for this?

}
// Make sure the producer job is active before start the stream replication.
var status streampb.StreamReplicationStatus
for r := retry.Start(ro); r.Next(); {
Contributor

Are there any cases when the reason why we hit this loop (e.g. retry due to an error) is because the error was with the client? And in such cases we would want to re-create client to re-establish connection?

It feels like this retry loop belongs "on top":
for retry ... {
ingest()
}

func ingest() {
client = connect;
if err { return }
return distIngest()
}

// Make sure the producer job is active before start the stream replication.
var status streampb.StreamReplicationStatus
for r := retry.Start(ro); r.Next(); {
status, err = client.Heartbeat(ctx, streamID, startTime)
Contributor

I thought that the ingestion processor was supposed to heartbeat src cluster periodically; I only see this happen here... where do we periodically ping src cluster?

streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs(
streamAddress, topology, sqlInstanceIDs, initialHighWater, ingestionJobID, streamID, oldTenantID, newTenantID)
streamAddress, topology, sqlInstanceIDs, progress.GetStreamIngest().StartTime,
ingestionJob.ID(), streamID, details.TenantID, details.NewTenantID)
Contributor

I don't see in this code how we ever retry (i.e. hit the for r := retry.Start(ro); r.Next(); { line). It seems that we bubble up every error encountered in this function.

distStreamIngestion for example could return an error for any number of legitimate reasons (node crashed; or perhaps restarted, etc).

I think part of this is code structure, as mentioned above:
have a resumer call a function ingestWithRetry; that function should be simple:

func ingestWithRetry() {
   for retry ... {
     err := ingest()
     // Handle error appropriately
     if retryableError {
        // reload job; log error, etc
     } else {
       // pause the job
     }
   }
}

How we handle errors -- depends. We could say all errors get retried, or perhaps we retry some of the errors.

Contributor Author

@gh-casper gh-casper left a comment



pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 139 at r7 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

Are there any cases when the reason why we hit this loop (e.g. retry due to an error) is because the error was with the client? And in such cases we would want to re-create client to re-establish connection?

It feels like this retry loop belongs "on top":
for retry ... {
ingest()
}

func ingest() {
client = connect;
if err { return }
return distIngest()
}

Nah, this retry just makes sure the producer job is active/running before we start doing the real ingestion (plan, ingest, cutover).

This is useful in two scenarios:

  1. When we create the stream ingestion in the first place, the producer job may still be pending or not adopted yet (it's an adoptable job).
  2. When we resume the stream ingestion through RESUME JOB id, we want to make sure the producer job doesn't time out and is still pending; otherwise we will pause on error again.

Yes, you are right that we should retry the whole ingest as well. I intend to do that in another PR, which covers retries in both the ingestion stage and the clear-range stage in the case of cancelation. That work is tracked in #83450.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 140 at r7 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I thought that the ingestion processor was supposed to heartbeat src cluster periodically; I only see this happen here... where do we periodically ping src cluster?

This ping just checks the producer job status, as we need to make sure the producer job is up and running either when we first start it or when we resume from paused; Heartbeat is the perfect function to call here.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 184 at r7 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I don't see in this code how we ever retry (i.e. hit the for r := retry.Start(ro); r.Next(); { line). It seems that we bubble up every error encountered in this function.

distStreamIngestion for example could return an error for any number of legitimate reasons (node crashed; or perhaps restarted, etc).

I think part of this is code structure, as mentioned above:
have a resumer call a function ingestWithRetry; that function should be simple:

func ingestWithRetry() {
   for retry ... {
     err := ingest()
     // Handle error appropriately
     if retryableError {
        // reload job; log error, etc
     } else {
       // pause the job
     }
   }
}

How we handle errors -- depends. We could say all errors get retried, or perhaps we retry some of the errors.

Yes, I agree with this. Retry is a big thing I need to tackle; I just didn't want to include it in this PR as it's relatively large and needs its own tests.

#83450 tracks this work and it's my next step.


pkg/jobs/jobspb/jobs.proto line 114 at r7 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

seems like it's a pure code movement? any reasons for this?

Start_time indicates where we will resume from the last ingestion. It's empty when we start the ingestion in the first place, and it's the last checkpoint when we resume ingestion. From the perspective of the whole stream ingestion journey, it's more a piece of progress information, so I moved it from StreamIngestionDetails to StreamIngestionProgress.

@gh-casper gh-casper requested a review from miretskiy June 28, 2022 22:03
@miretskiy
Contributor

Nah, this retry just makes sure the producer job is active/running before we start doing the real ingestion (plan, ingest, cutover).

If that's the purpose, then I suggest you introduce waitUntilProducerActive function that does just that.

Even so, I find that your stated purpose of ensuring the producer job is active/running is not really met by the
code as I see an error return if Heartbeat failed.

Contributor Author

@gh-casper gh-casper left a comment

Sure, moved it into waitUntilProducerActive. We should return an error when the heartbeat fails. If the producer job is not created yet, pending, or in another transient status, it won't report an error; an error happens when there is a connection error or another runtime error on the producer job.

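A rough sketch of the waitUntilProducerActive shape described above; the status values, the heartbeat signature, and the attempt bound are assumptions for illustration, not the real streaming client API:

```go
package main

import (
	"errors"
	"fmt"
)

// status models a producer job state; these values and the heartbeat
// signature are assumptions for illustration, not the real client API.
type status int

const (
	statusPending status = iota
	statusRunning
	statusFailed
)

// waitUntilProducerActive polls the producer job via heartbeat,
// tolerating transient states for a bounded number of attempts and
// failing fast on hard errors (e.g. connection failures), which the
// caller surfaces so the ingestion job pauses instead of retrying.
func waitUntilProducerActive(maxAttempts int, heartbeat func() (status, error)) error {
	for i := 0; i < maxAttempts; i++ {
		st, err := heartbeat()
		if err != nil {
			return err // hard error: surface it, do not retry here
		}
		switch st {
		case statusRunning:
			return nil
		case statusPending:
			continue // transient: producer not adopted yet, poll again
		default:
			return errors.New("producer job in terminal state")
		}
	}
	return errors.New("producer job did not become active")
}

func main() {
	polls := 0
	err := waitUntilProducerActive(5, func() (status, error) {
		polls++
		if polls < 3 {
			return statusPending, nil
		}
		return statusRunning, nil
	})
	fmt.Println(err, polls) // <nil> 3
}
```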

@miretskiy
Contributor

miretskiy commented Jun 29, 2022 via email

Contributor

@miretskiy miretskiy left a comment

I would highly encourage that after addressing comments, this PR is split into independent PRs to simplify the review process.

Reviewed 4 of 10 files at r6, 8 of 15 files at r7, 11 of 14 files at r8, 1 of 1 files at r11, all commit messages.


pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go line 405 at r11 (raw file):

		if !ok {
			err = errors.Errorf("received progress update from unrecognized partition %s", partition)
			return true

I have to say, or even insist, that changes like this deserve to be in separate PR; The change to
introduce

  // Mapping between a source cluster partition ID and its subscribing sql instance ID
  // in the destination cluster. This describes the flow topology of the replication stream.
  map<string, uint32> subscribing_sql_instances = 6 [(gogoproto.nullable) = false,
    (gogoproto.customname) = "SubscribingSQLInstances"];

seems innocuous enough. But it's not -- it's an important change that serializes topology information
on the destination cluster, and ensures that this topology is correct -- for example by returning the error above. Serialization of state like this raises questions as to the correctness of such state when resuming.

I'm not necessarily saying that this is bad. But I am saying that such changes should be considered carefully.
For example, this is a frontier processor; this progress is initialized when frontier processor has been constructed. Part of this initialization is the initialization of the above subscribing sql instances.
What happens if that set changes? What happens if, say, the src cluster introduces a new node and decommissions the old one? The frontier processor needs to retry the whole flow -- but it's not clear to me what happens with that map. Is it updated because we retry? Where? Is it possible that this map gets out of date?

I just have hard time answering these questions and if the review makes it hard for me to answer those questions, perhaps such changes deserve to be reviewed independently and with enough surrounding context.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 76 at r11 (raw file):

		}
		lagInfo.SlowestSourcePartitionTimestamp = hlc.MaxTimestamp
		lagInfo.FastestSourcePartitionTimestamp = hlc.MinTimestamp

not sure what these are meant to accomplish... Seems to me that slow/fast is pretty little information
that doesn't actually convey much. If the whole thing is stuck, both slow and fast will be pretty close;
have you considered just using span.Frontier for this?


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 113 at r11 (raw file):

// Ping the producer job until it is active/running, returns nil when
// the job is active.

nit: waits until the job is active?


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 125 at r11 (raw file):

		Multiplier:     2,
		MaxBackoff:     10 * time.Second,
		MaxRetries:     5,

So, we have 1 + 2 + 4 + 8 + 16 = 31 seconds for the job to become active?
Is that a bit too aggressive?
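The arithmetic here can be checked with a tiny model of capped exponential backoff; this is an illustrative helper summing per-attempt delays, not the real util/retry package:

```go
package main

import "fmt"

// totalBackoff sums the delays (in seconds) of a capped exponential
// backoff: start at initial, multiply after each retry, clamp each
// delay at maxBackoff. A toy model of the retry.Options under
// discussion, not the real cockroach util/retry package.
func totalBackoff(initial, multiplier, maxBackoff float64, retries int) float64 {
	total, d := 0.0, initial
	for i := 0; i < retries; i++ {
		if d > maxBackoff {
			d = maxBackoff
		}
		total += d
		d *= multiplier
	}
	return total
}

func main() {
	// Initial 1s, multiplier 2, cap 16s, 5 retries: 1+2+4+8+16.
	fmt.Println(totalBackoff(1, 2, 16, 5)) // 31
	// With the cap lowered to 5s and 4 retries: 1+2+4+5.
	fmt.Println(totalBackoff(1, 2, 5, 4)) // 12
}
```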


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 133 at r11 (raw file):

		if err != nil {
			return errors.Wrapf(err, "failed to resume ingestion job %d due to producer job error",
				ingestionJobID)

help me understand: we attempt to heartbeat the client. Connection to source cluster fails; we then immediately return from this function. What is the point of the retry loop above?


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 232 at r11 (raw file):

// Resume is part of the jobs.Resumer interface.
func (s *streamIngestionResumer) Resume(resumeCtx context.Context, execCtx interface{}) error {
	p := execCtx.(sql.JobExecContext)

nit: while here: p is a bit of a remnant from the time when this used to be Planner;
mind renaming it to execCtx?


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 241 at r11 (raw file):

		errorMessage := fmt.Sprintf(errorFmt, err)
		log.Warningf(resumeCtx, errorFmt, err)
		return s.job.PauseRequested(resumeCtx, p.Txn(), func(ctx context.Context,

What happens on the producer? Should we attempt to best effort pause it?
Do we rely on the fact that the producer should also pause itself?

Please add a comment about this.


pkg/ccl/streamingccl/streampb/stream.proto line 148 at r11 (raw file):

    // among all partitions.
    google.protobuf.Duration slowest_fastest_partition_ingestion_lag = 7
    [(gogoproto.nullable) = false, (gogoproto.stdduration) = true];

have you considered just storing the span frontier or portion of this frontier
instead of restricting to 2 timestamps/spans?


pkg/ccl/streamingccl/streamproducer/stream_lifetime.go line 252 at r11 (raw file):

		func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
			if md.Status == jobs.StatusRunning || md.Status == jobs.StatusPending {
				if !ingestionCutover {

having descriptive status/reason why we are completing stream would improve api;
it will also allow you to update producer job description and record some metadata -- such as
when was it completed, why, etc....


pkg/streaming/api.go line 78 at r11 (raw file):

	// 'ingestionCutover' indicates whether the stream ingestion has been cutover.
	CompleteReplicationStream(
		evalCtx *eval.Context, txn *kv.Txn, streamID StreamID, ingestionCutover bool,

I'm not a fan of this boolean argument; I honestly don't know what it means.
Presumably it's true if it's a "normal" cutover... Why do we care that the source
cluster knows about the reason why it's being canceled? Is it observability?
Does it make sense to have a "reason" as some type of a enum or a struct or a string, or even
an error?


pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 70 at r11 (raw file):

	`)
	},
	srcNumNodes:  1,

perhaps we should use more than 1

Contributor Author

@gh-casper gh-casper left a comment

My assumption for the review process of the two dependent PRs (pause-on-error and clear-range-on-cancel) was that you only review the commits relevant to the PR description. Take this PR for example: only review the top two commits. To review the other commit, please leave the comment on the other PR.



pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 76 at r11 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

not sure what these are meant to accomplish... Seems to me that slow/fast is pretty little information
that doesn't actually convey much. If the whole thing is stuck, both slow and wait will be pretty close;
have you considered just using span.Frontier for this?

See the other comment related to this. Adding a TODO here.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 125 at r11 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

So, we have 1 + 2 + 4 + 8 + 16 = 31 seconds for the job to become active?
Is that a bit too aggressive?

Changed it to 1 + 2 + 4 + 5 = 12 seconds.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 133 at r11 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

help me understand: we attempt to heartbeat the client. Connection to source cluster fails; we then immediately return from this function. What is the point of the retry loop above?

The retry loop is to deal with transient producer job statuses like pending, cancel-requested, and reverting. A connection failure will make the ingestion error out and pause instead of retrying here; users will fix that and resume the job manually.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 241 at r11 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

What happens on the producer? Should we attempt to best effort pause it?
Do we rely on the fact that the producer should also pause itself?

Please add a comment about this.

Done.


pkg/ccl/streamingccl/streamproducer/stream_lifetime.go line 252 at r11 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

having descriptive status/reason why we are completing stream would improve api;
it will also allow you to update producer job description and record some metadata -- such as
when was it completed, why, etc....

Made several changes: replaced ingestionCutover with successfulIngestion as it makes more sense, and added updating the running status when we complete the replication stream.


pkg/streaming/api.go line 78 at r11 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I'm not a fan of this boolean argument; I honestly don't know what it means.
Presumably it's true if it's a "normal" cutover... Why do we care that the source
cluster knows about the reason why it's being canceled? Is it observability?
Does it make sense to have a "reason" as some type of a enum or a struct or a string, or even
an error?

It is not the reason we care about; we just need to tell whether the ingestion finished successfully so that the producer job can be marked succeeded or canceled. So I have now replaced ingestionCutover with successfulIngestion.


pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 70 at r11 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

perhaps we should use more than 1

Using more than 1 doesn't make any difference now, as we don't have more than one partition. There is ongoing work to test multiple partitions. I added a TODO item in the test file earlier; I don't think I need to add another TODO here.


pkg/ccl/streamingccl/streamingest/stream_ingestion_frontier_processor.go line 405 at r11 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I have to say, or even insist, that changes like this deserve to be in separate PR; The change to
introduce

  // Mapping between a source cluster partition ID and its subscribing sql instance ID
  // in the destination cluster. This describes the flow topology of the replication stream.
  map<string, uint32> subscribing_sql_instances = 6 [(gogoproto.nullable) = false,
    (gogoproto.customname) = "SubscribingSQLInstances"];

seems innocuous enough. But it's not -- it's an important change that serializes topology information
on the destination cluster, and ensures that this topology is correct -- for example by returning the error above. Serialization of state like this raises questions as to the correctness of such state when resuming.

I'm not necessarily saying that this is bad. But I am saying that such changes should be considered carefully.
For example, this is a frontier processor; this progress is initialized when frontier processor has been constructed. Part of this initialization is the initialization of the above subscribing sql instances.
What happens if that set changes? What happens if say src cluster introduces a new node, and decomissions the old one? The frontier processor needs to retry the whole flow -- but it's not clear to me what happens with that map. Is it updated because we retry? Where? Is it possible that this map gets out of date?

I just have hard time answering these questions and if the review makes it hard for me to answer those questions, perhaps such changes deserve to be reviewed independently and with enough surrounding context.

It was in a separate PR (stream_ingestion_stats) and has already been checked in.

This map is populated when we create a new flow. We will create a new flow (re-plan) when we retry. In the future, we will also re-plan on the fly even when we don't retry, for example if the src cluster topology changes and there are many non-local rangefeeds, or if dest cluster instances expand or shrink.


pkg/ccl/streamingccl/streampb/stream.proto line 148 at r11 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

have you considered just storing the span frontier or portion of this frontier
instead of restricting to 2 timestamps/spans?

Yes, I have considered this and intend to track them once we switch the frontier_processor from tracking partition IDs to tracking actual spans as the frontier in the near future. Added a TODO here.

@gh-casper gh-casper requested a review from erikgrinaker July 1, 2022 22:07
Contributor Author

@gh-casper gh-casper left a comment

But now that we have all the comments in this PR, we'll just keep them here (no need to comment on the pause-on-error PR). The stream-ingestion-stats PR got merged in, no need to worry. I'll merge the pause-on-error PR separately once this PR is approved.

Next time, I won't create separate PRs that depend on each other. It just causes too much confusion, given that GitHub doesn't support it well, and it takes too much time for me to do the syncing and rebasing.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker, @miretskiy, @samiskin, and @stevendanna)

@miretskiy
Contributor

It just causes too much confusion given that GitHub doesn't support it well. Also it takes too much time for me to do the syncing and rebasing.

I understand that this is sometimes inconvenient, but you should optimize for the reader -- i.e., the person reviewing the code.

Contributor Author

@gh-casper gh-casper left a comment


Done, please review.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker, @miretskiy, @samiskin, and @stevendanna)

@gh-casper gh-casper force-pushed the cancel-ingestion branch 2 times, most recently from 7c49f26 to 040c8c2 Compare July 12, 2022 22:12
Contributor Author

@gh-casper gh-casper left a comment


Did some investigation on the bytes-left-after-clearrange bug. In the test, I shut down the tenant SQL servers before canceling the ingestion. This is not ideal, but we may have a follow-up PR that disallows any process from touching this tenant's key range after the ingestion job gets canceled, unless the user creates another ingestion.

Please review again. @miretskiy @stevendanna

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker, @miretskiy, @samiskin, and @stevendanna)

Collaborator

@stevendanna stevendanna left a comment


Thanks for pushing this forward. As we discussed, I think the only must-have is removing the cleanup on cancellation for now.

})
}

func (s *streamIngestionResumer) cancelProducerJob(
Collaborator


We don't have to do it in this PR. But I wonder if this should be configurable. Since this will release the protected timestamp on the producer side.

Comment on lines +385 to +400
if tr.State == descpb.TenantInfo_ACTIVE {
return errors.Errorf("error in cancel stream ingestion: cannot delete an active tenant %s",
details.NewTenantID)
}

// TODO(casper): deal with very large tenant.
tenantInfo := descpb.TenantInfo{
ID: p.GetStreamIngestion().NewTenantID.ToUint64(),
State: descpb.TenantInfo_DROP,
}
// GC the tenant's data as well as the system tenant's knowledge about this tenant.
if err = sql.GCTenantSync(ctx, jobExecCtx.ExecCfg(), &tenantInfo); err != nil {
log.Warningf(ctx,
"encountered error when clearing tenant key ranges for ingestion job %d", s.job.ID())
return err
}
Collaborator


Per recent discussions, I think for now we may want to avoid doing this altogether and have a second command: either just SELECT crdb_internal.destroy_tenant(ID, true) or a DROP TENANT command that we create.

if isTimedOut(j) {
	return errors.Errorf("replication stream %d timed out", p.job.ID())
}

prog := j.Progress()
Collaborator


Should we also be checking for this condition at startup to avoid having to wait for one liveness tracking interval?

return p.releaseProtectedTimestamp(ctx, execCfg)
case jobspb.StreamReplicationProgress_FINISHED_UNSUCCESSFULLY:
return j.Update(ctx, nil, func(txn *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
ju.UpdateStatus(jobs.StatusCancelRequested)
Collaborator


Fine for now, but let's think through whether we want to release the protected timestamp in the case of it finishing unsuccessfully.

Contributor Author

@gh-casper gh-casper left a comment


Was trying to add logic to wait a while after the last ingested time using TrailingMetaCallback, but it turns out to be non-trivial work: we have to let the frontier processor collect trailing ProducerMetadatas from all ingestion processors, which needs some modification to the code. Will file another issue for this.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker, @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 342 at r16 (raw file):

Previously, stevendanna (Steven Danna) wrote…

We don't have to do it in this PR. But I wonder if this should be configurable. Since this will release the protected timestamp on the producer side.

As mentioned in the other comment, we can drop the producer job's protected timestamp once we enter the cancel-requested state.


pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go line 400 at r16 (raw file):

Previously, stevendanna (Steven Danna) wrote…

Per recent discussions, I think for now we may want to avoid doing this altogether and have a second command: either just SELECT crdb_internal.destroy_tenant(ID, true) or a DROP TENANT command that we create.

Done.


pkg/ccl/streamingccl/streamproducer/producer_job.go line 104 at r16 (raw file):

Previously, stevendanna (Steven Danna) wrote…

Should we also be checking for this condition at startup to avoid having to wait for one liveness tracking interval?

Done.

Code quote:

			prog := j.Progress()

pkg/ccl/streamingccl/streamproducer/producer_job.go line 110 at r16 (raw file):

Previously, stevendanna (Steven Danna) wrote…

Fine for now, but lets think through whether we want to release the protected timestamp in the case of it finishing unnecessfully.

If we decide to cancel, then once the job enters the cancel-requested state, we won't be able to utilize the producer job's protected timestamp anymore: we cannot unpause a cancel-requested job unless we support restoring an existing tenant with the deprecated StartFrom. Either way, we have to clean up this producer job.
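The producer-side completion semantics discussed above can be modeled with a short sketch (types and names are illustrative, not the real jobspb/jobs API): a successful ingestion releases the protected timestamp directly, while an unsuccessful one moves the producer job to cancel-requested, after which the protected timestamp is no longer usable since a cancel-requested job cannot be unpaused.

```go
package main

import "fmt"

// streamStatus stands in for jobspb.StreamReplicationProgress's
// stream-status values in this simplified model.
type streamStatus int

const (
	finishedSuccessfully streamStatus = iota
	finishedUnsuccessfully
)

type producerState struct {
	jobStatus   string
	ptsReleased bool
}

// complete applies the behavior described in the thread: success releases
// the protected timestamp; failure requests cancellation of the producer
// job (cleanup, including the PTS, then happens via the cancel path).
func complete(s *producerState, st streamStatus) {
	switch st {
	case finishedSuccessfully:
		s.ptsReleased = true
		s.jobStatus = "succeeded"
	case finishedUnsuccessfully:
		s.jobStatus = "cancel-requested"
	}
}

func main() {
	ok := &producerState{jobStatus: "running"}
	complete(ok, finishedSuccessfully)
	fmt.Println(ok.jobStatus, ok.ptsReleased) // succeeded true

	bad := &producerState{jobStatus: "running"}
	complete(bad, finishedUnsuccessfully)
	fmt.Println(bad.jobStatus, bad.ptsReleased) // cancel-requested false
}
```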

@gh-casper gh-casper changed the title streamingccl: support canceling a ingestion job streamingccl: support canceling an ingestion job Jul 19, 2022
@gh-casper gh-casper requested a review from stevendanna July 19, 2022 16:44
Contributor

@miretskiy miretskiy left a comment


Reviewed 1 of 20 files at r13, 2 of 27 files at r15, 1 of 11 files at r17, 1 of 4 files at r18.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker, @miretskiy, @samiskin, and @stevendanna)

@miretskiy miretskiy self-requested a review July 25, 2022 21:24
to support canceling a producer job

Previously, we could only make a producer job finish normally
through complete_stream_replication after the ingestion is cut over.
Now we expand complete_stream_replication to take a second argument,
'successfulIngestion', to support completing a replication stream
even when cutover doesn't happen, e.g., when ingestion gets canceled
and a revert happens; this cancels the producer job.

Release note (sql change): expand crdb_internal.complete_stream_replication
to take a successfulIngestion argument, which indicates whether this
stream ingestion finished successfully.
This commit supports the intended behavior when canceling
an ingestion job: canceling the producer job, which releases the
protected timestamp for the source tenant. The
destination tenant's key ranges are left for
the user to clean up with crdb_internal.destroy_tenant.

Release note: None
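A minimal sketch of the cancellation path these commits describe, using invented names (the real hooks are the ingestion job's OnFailOrCancel and its cancelProducerJob helper): canceling the ingestion job cancels the producer job, which releases the source tenant's protected timestamp, and the destination tenant's key ranges are deliberately left in place for the user to remove with crdb_internal.destroy_tenant.

```go
package main

import "fmt"

// cluster is a toy stand-in that records the cleanup steps in order.
type cluster struct{ log []string }

// cancelProducerJob models the source-side effect of cancellation:
// the producer job is canceled and its protected timestamp released.
func (c *cluster) cancelProducerJob() {
	c.log = append(c.log, "producer job canceled")
	c.log = append(c.log, "protected timestamp released")
}

// onFailOrCancel models the ingestion job's cleanup hook; note that it
// does NOT clear the destination tenant's data in this design.
func (c *cluster) onFailOrCancel() {
	c.cancelProducerJob()
	c.log = append(c.log, "destination tenant left for crdb_internal.destroy_tenant")
}

func main() {
	c := &cluster{}
	c.onFailOrCancel()
	for _, l := range c.log {
		fmt.Println(l)
	}
}
```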
@gh-casper
Contributor Author

bors r+

@craig
Contributor

craig bot commented Jul 31, 2022

Build succeeded:


Labels

A-tenant-streaming Including cluster streaming

Projects

None yet

Development

Successfully merging this pull request may close these issues.

streaming: stream ingestion job should clear range OnFailOrCancel

6 participants