
streamingccl: support DeleteRange in tenant stream replication#84553

Merged
craig[bot] merged 2 commits into cockroachdb:master from gh-casper:del-range
Aug 4, 2022

Conversation

Contributor

@gh-casper gh-casper commented Jul 18, 2022

This PR supports the DeleteRange operation both in stream producers,
which process DelRange events from the rangefeed, and in stream
ingestion processors, which ingest DelRange events as SSTs into the
destination.

For versions >= 22.2, SSTs can also contain range tombstones, i.e.,
DelRange; this PR also supports parsing and ingesting them.

The ingestion processor uses a separate SST writer to ingest range
tombstones. A follow-up PR will extend SSTBatcher to support
adding MVCCRangeKeys.

This PR creates an envelope type for RangeFeedSSTable with an
additional field, "RegisteredSpan", tracking the span of the
rangefeed registration this event is supposed to match. This
helps the rangefeed receiver avoid processing rangefeed events
multiple times when it has multiple matching spans registered.

This PR also removes the GenerationEvent, which is no longer
a valid concept in the current consumer-tracked design.

Closes: #70433
Closes: #83810

Release note: None

@gh-casper gh-casper requested a review from a team as a code owner July 18, 2022 02:53


// GetKV implements the Event interface.
func (ce checkpointEvent) GetKV() *roachpb.KeyValue {
func (dre delRangeEvent) GetKV() *roachpb.KeyValue {
Contributor

I think this really highlights the fact that having a generic Event type is not helping:
every implementation has all but one method returning nil.

I think Event could be just a concrete type (a union).
Okay to leave a TODO for a cleanup later.
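For illustration, the union shape being suggested could look like this sketch in plain Go; all type and field names here (`Event`, `EventType`, `describe`, etc.) are hypothetical stand-ins, not the actual cockroach types:

```go
package main

import "fmt"

// EventType discriminates the union members.
type EventType int

const (
	KVEvent EventType = iota
	SSTableEvent
	DeleteRangeEvent
	CheckpointEvent
)

type KeyValue struct{ Key, Value []byte }
type Span struct{ Start, End []byte }

// Event is a concrete union: Type says which payload field is set,
// instead of N interface implementations where every method but one
// returns nil.
type Event struct {
	Type        EventType
	KV          *KeyValue
	SSTable     []byte
	DeleteRange *Span
	Checkpoint  *Span
}

func describe(ev Event) string {
	switch ev.Type {
	case DeleteRangeEvent:
		return fmt.Sprintf("del range [%s, %s)", ev.DeleteRange.Start, ev.DeleteRange.End)
	case KVEvent:
		return fmt.Sprintf("kv %s", ev.KV.Key)
	default:
		return "other"
	}
}

func main() {
	ev := Event{Type: DeleteRangeEvent, DeleteRange: &Span{[]byte("a"), []byte("c")}}
	fmt.Println(describe(ev)) // del range [a, c)
}
```

Consumers then switch on `Type` rather than type-asserting an interface, which makes the "all but one method returns nil" boilerplate go away.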

sst *roachpb.RangeFeedSSTable, op func(keyVal roachpb.KeyValue),
// Extract the received SST to only contain data within the boundaries of spans in the partition
// spec, execute the specified operation on each MVCC key value in the trimmed SSTable. and
// execute the specified operation on each MVCCRangeKey value in the trimmed SSTable.
Contributor

nit: a bit better wrapping; also ". and" --> just start a new sentence.

rangeKeySstFile *storage.MemFile
// Separated SSTWriter to write MVCC range keys and will be replaced after
// SSTBatcher supports adding MVCCRangeKeys.
rangeKeySSTWriter storage.SSTWriter
Contributor

would be nice to split those range-related fields into a separate struct
rangeState struct {
....
}
or similar.
Also would be nice to have a pretty substantial comment on the mechanics of how that data should be flushed, frontier updates, etc.


sip.rangeKeySstFile = &storage.MemFile{}
sip.rangeKeySSTWriter = storage.MakeIngestionSSTWriter(ctx, evalCtx.Settings, sip.rangeKeySstFile)

Contributor

do we want to create this lazily?
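Lazy creation here would look roughly like the following sketch; `processor` and `writer` are hypothetical stand-ins for the ingestion processor and its SST writer, not the real fields:

```go
package main

import "fmt"

// writer stands in for an SST writer in this sketch.
type writer struct{ entries int }

type processor struct {
	// rangeKeyWriter stays nil until the first range tombstone arrives,
	// so streams that never see a DeleteRange never pay the allocation.
	rangeKeyWriter *writer
}

// maybeInitWriter allocates the writer on first use and returns the
// same instance on every later call.
func (p *processor) maybeInitWriter() *writer {
	if p.rangeKeyWriter == nil {
		p.rangeKeyWriter = &writer{}
	}
	return p.rangeKeyWriter
}

func main() {
	p := &processor{}
	p.maybeInitWriter().entries++ // first call allocates
	p.maybeInitWriter().entries++ // later calls reuse the same writer
	fmt.Println(p.rangeKeyWriter.entries)
}
```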

MVCCValueHeader: enginepb.MVCCValueHeader{LocalTimestamp: hlc.ClockTimestamp{WallTime: 0}},
})
if err != nil {
panic(err)
Contributor

can we fail permanently instead?

Contributor

@erikgrinaker erikgrinaker left a comment

Some high-level comments around range tombstones, leaving the detailed review to those more familiar with tenant streaming.

}); err != nil {
return err
}
} else if hasRange {
Contributor

This isn't strictly correct, since it's possible for a range key and point key to be initially surfaced together. This can only happen for point keys without timestamps, i.e. intents or inline values, where the point key and range key's start bound are identical. Normally, we handle this by caching and comparing RangeBounds().Key (until #84379).

However, we can do something far more efficient here, by processing point and range keys separately, since we ingest them separately anyway. That is, first use IterKeyTypePointsOnly to process the point keys, then IterKeyTypeRangesOnly to process the range keys.

}
if hasPoint, hasRange := iter.HasPointAndRange(); hasPoint {
mvccKeyValOp(iter.Key(), iter.Value())
} else if hasRange {
Contributor

As mentioned in another comment, let's iterate over point keys and range keys separately, unless we need to emit them interleaved in order (in which case we need to detect changes to RangeBounds().Key).

mergeDelRange := func(unsafeRangeKey storage.MVCCRangeKey) {
delRanges, ok := timestampToDelRanges[unsafeRangeKey.Timestamp]
if ok {
for _, delRange := range delRanges {
Contributor

Since we iterate across and add range keys in order (left-to-right in the key space), I believe we can just check the last member's end key here rather than iterating over all of them, which avoids making this quadratic.
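The suggestion can be sketched in isolation: since range keys are added left-to-right, only the last buffered entry ever needs checking, which keeps the merge linear instead of quadratic (`span` and `mergeOrdered` are illustrative names, not the PR's code):

```go
package main

import "fmt"

type span struct{ start, end string }

// mergeOrdered appends r to out, merging with the last element when
// they touch or overlap. Because input arrives in start-key order,
// only the last element can possibly overlap r: O(n) overall instead
// of re-scanning every buffered span per insert.
func mergeOrdered(out []span, r span) []span {
	if n := len(out); n > 0 && out[n-1].end >= r.start {
		if r.end > out[n-1].end {
			out[n-1].end = r.end // extend the last span
		}
		return out
	}
	return append(out, r) // disjoint: start a new span
}

func main() {
	var out []span
	for _, r := range []span{{"a", "c"}, {"b", "d"}, {"x", "z"}} {
		out = mergeOrdered(out, r)
	}
	fmt.Println(out) // [{a d} {x z}]
}
```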

bytes data = 1;
Span span = 2 [(gogoproto.nullable) = false];
util.hlc.Timestamp write_ts = 3 [(gogoproto.nullable) = false, (gogoproto.customname) = "WriteTS"];
Span registered_span = 4 [(gogoproto.nullable) = false];
Contributor

This seems a bit iffy. Shouldn't the client know this and keep track of it? Is it because the DistSender hides this internally?

Also, I see why we might need this on the SST (which isn't trimmed), but the DeleteRange should be truncated to the registration bounds already, so I don't think it should be necessary there in any case?

Contributor Author

@gh-casper gh-casper left a comment

Thanks! Resolved Yevgeniy's comments as the first batch. Will work on Erik's in the next batch.

There is some flakiness in the added tests, primarily due to the test setup. I'll keep digging into those while in review.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @gh-casper, @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/event.go line 129 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I think this really highlights the fact that having generic Event type is not helping.
every implementation has all but 1 method returning nil;

I think Event could be just a concrete type (a union).
Okay to leave a TODO for a cleanup later.

Added a todo here.


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 108 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

would be nice to split those range related fields into separate struct
rangeState struct {
....
}
or similar.
Also would be nice to have a pretty substantial comment the mechanics of how that data should be flushed; frontier updates, etc.

Done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 265 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

do we want to create this lazily?

Done.


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 490 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

can we fail permanently instead?

Done. Also have done some refactoring.

Code quote:

rangeTombstone

Contributor Author

@gh-casper gh-casper left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @erikgrinaker, @gh-casper, @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 623 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

This isn't strictly correct, since it's possible for a range key and point key to be initially surfaced together. This can only happen for point keys without timestamps, i.e. intents or inline values, where the point key and range key's start bound are identical. Normally, we handle this by caching and comparing RangeBounds().Key (until #84379).

However, we can do something far more efficient here, by processing point and range keys separately, since we ingest them separately anyway. That is, first use IterKeyTypePointsOnly to process the point keys, then IterKeyTypeRangesOnly to process the range keys.

What does RangeBounds mean? Does it return the boundary of all of the current RangeKeys()?
E.g. RangeKeys() -> ["a", "c"], ["b", "d"], RangeBounds() -> ["a", "d"]


pkg/roachpb/api.proto line 2798 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

This seems a bit iffy. Shouldn't the client know this and keep track of it? Is it because the DistSender hides this internally?

Also, I see why we might need this on the SST (which isn't trimmed), but the DeleteRange should be truncated to the registration bounds already, so I don't think it should be necessary there in any case?

If the client creates a rangefeed with two spans:
rf := RangefeedFactory.New(...) rf.Start(ctx, []roachpb.Span{"1", "2"})

Whenever an SST overlaps with both "1" and "2", the same *RangeFeedSSTable event will be emitted twice, as there are two registrations. I want to avoid this. I considered checking whether the current event is the same pointer as the last received one, but this is hacky and may not be correct, as these events may not be sent consecutively.

So I added RegisteredSpan here to let the client know which overlapping span this event is about. You might ask what happens if the client starts the rangefeed with two identical spans, but that's the client's problem in the first place.
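The trimming that RegisteredSpan enables can be sketched as plain span intersection; `span` and `intersect` are simplified stand-ins, not the roachpb types:

```go
package main

import "fmt"

type span struct{ start, end string }

// intersect returns the overlap of the SST's span with the registered
// span, and whether they overlap at all. A consumer that receives the
// same SST event once per overlapping registration can trim each copy
// to its RegisteredSpan so no key is ingested twice.
func intersect(sst, registered span) (span, bool) {
	out := sst
	if registered.start > out.start {
		out.start = registered.start
	}
	if registered.end < out.end {
		out.end = registered.end
	}
	if out.start >= out.end {
		return span{}, false // empty intersection
	}
	return out, true
}

func main() {
	sst := span{"b", "f"}
	// Two registrations overlapping the same SST: the trimmed results
	// {b d} and {d f} partition the SST span with no duplication.
	for _, reg := range []span{{"a", "d"}, {"d", "g"}} {
		if trimmed, ok := intersect(sst, reg); ok {
			fmt.Println(trimmed)
		}
	}
}
```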

Contributor

@erikgrinaker erikgrinaker left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @gh-casper, @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 623 at r1 (raw file):

Previously, gh-casper wrote…

What does RangeBounds mean? It returns the boundary of all of current RangeKeys()?
RangeKeys() -> ["a", "c"], ["b", "d"], RangeBounds() -> ["a", "d"]

Because of fragmentation, all range keys at a given position will all have the same bounds. See: https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/mvcc-range-tombstones.md#data-structure-mvcc-range-keys

So for your example of overlapping range keys [a-c) and [b-d), these would get fragmented into [a-b), [b-c), and [c-d), and you would only see the ones overlapping the iterator position.
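The fragmentation invariant can be illustrated with a toy computation; this is a simplification (real fragmentation also tracks which versions cover each fragment), with illustrative names:

```go
package main

import (
	"fmt"
	"sort"
)

type span struct{ start, end string }

// fragments splits overlapping range keys at every unique boundary, so
// that at any iterator position all overlapping fragments share
// identical bounds -- the invariant described in the tech note above.
func fragments(spans []span) []span {
	bounds := map[string]bool{}
	for _, s := range spans {
		bounds[s.start] = true
		bounds[s.end] = true
	}
	var cuts []string
	for b := range bounds {
		cuts = append(cuts, b)
	}
	sort.Strings(cuts)
	var out []span
	for i := 0; i+1 < len(cuts); i++ {
		frag := span{cuts[i], cuts[i+1]}
		// Keep the fragment if at least one input span covers it.
		for _, s := range spans {
			if s.start <= frag.start && frag.end <= s.end {
				out = append(out, frag)
				break
			}
		}
	}
	return out
}

func main() {
	// [a-c) and [b-d) fragment into [a-b), [b-c), [c-d).
	fmt.Println(fragments([]span{{"a", "c"}, {"b", "d"}}))
}
```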


pkg/roachpb/api.proto line 2798 at r1 (raw file):

Have considered using if the current event is the same pointer as the last received one. This is hacky and may not be correct, as this event may not be sent consecutively.

Yeah, I agree, we definitely shouldn't do that.

I suppose this is fine. Since it's the caller's responsibility to trim the SST, we may as well make it easy for them to do so. However, we should:

  • Mention in the comment that the caller should use this to trim the contents (as outlined in the comment below).
  • Add an assertion in TestReplicaRangefeed.
  • Add the SST span and registration span to kvclient/rangefeed/OnSSTable(), and test it in TestWithOnSSTable.

We should remove this for DeleteRange though, since that's always truncated to the registration span.

Are there any mixed-version compatibility concerns here? In other words, is it possible to use this streaming in a cluster that hasn't yet completed an upgrade to 22.2, where 22.1 nodes will not be setting this field? If so, we'll need a version gate here. We should also mention in the comment that this field is new in 22.2.

@miretskiy miretskiy requested a review from erikgrinaker July 20, 2022 12:32
Contributor

@miretskiy miretskiy left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @erikgrinaker, @gh-casper, @miretskiy, @samiskin, and @stevendanna)


pkg/roachpb/api.proto line 2798 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Have considered using if the current event is the same pointer as the last received one. This is hacky and may not be correct, as this event may not be sent consecutively.

Yeah, I agree, we definitely shouldn't do that.

I suppose this is fine. Since it's the caller's responsibility to trim the SST, we may as well make it easy for them to do so. However, we should:

  • Mention in the comment that the caller should use this to trim the contents (as outlined in the comment below).
  • Add an assertion in TestReplicaRangefeed.
  • Add the SST span and registration span to kvclient/rangefeed/OnSSTable(), and test it in TestWithOnSSTable.

We should remove this for DeleteRange though, since that's always truncated to the registration span.

Are there any mixed-version compatibility concerns here? In other words, is it possible to use this streaming in a cluster that hasn't yet completed an upgrade to 22.2, where 22.1 nodes will not be setting this field? If so, we'll need a version gate here. We should also mention in the comment that this field is new in 22.2.

What I don't understand is why we need this at the API level.
When starting a rangefeed, the dist sender rangefeed knows exactly the bounds it's dealing with (partial rangefeed).
Couldn't we extend the partial rangefeed to return bounds? I realize right now it's a bit clunky because a single rangefeed just does


			select {
			case eventCh <- event:
			case <-ctx.Done():
				return args.Timestamp, ctx.Err()
			}

where eventCh is a *RangeFeedEvent channel, with events arriving from the rangefeed directly;
but I can't imagine this being that hard a change to make in the client API, or perhaps we could introduce another method that takes a different kind of channel, and have this information available on the client side?

Wdyt @erikgrinaker ?

@miretskiy miretskiy self-requested a review July 20, 2022 12:32
Contributor

@erikgrinaker erikgrinaker left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @gh-casper, @miretskiy, @samiskin, and @stevendanna)


pkg/roachpb/api.proto line 2798 at r1 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

What I don't understand is why we need this at the API level.
When starting rangefeed, dist sender rangefeed knows exactly the bounds it's dealing with (partial rangefeed).
Couldn't we extend partial rangefeed to return bounds? I realize right now it's a bit clunky because single rangefeed just does


			select {
			case eventCh <- event:
			case <-ctx.Done():
				return args.Timestamp, ctx.Err()
			}

where eventCh is a *RangeFeedEvent channel, with event arriving from rangefeed directly;
but I can't imagine this being that hard to make the change in the client API, or perhaps introduce another method that can take a different kind of channel, and have this information available on the client side?

Wdyt @erikgrinaker ?

I was thinking the same thing, but then we'd have to introduce additional data structures on the client side to track it and such. It just seemed excessive, especially given that this is limited to a single event type that is a bit of an outlier.

Since exceeding the registration bounds is a property of this one single event, it seems reasonable to also include the relevant bounds in the event.

@miretskiy
Contributor

was thinking the same thing, but then we'd have to introduce additional data structures on the client side to track it and such. It just seemed excessive, especially given that this is limited to a single event type that is a bit of an outlier.

Since exceeding the registration bounds is a property of this one single event, it seems reasonable to also include the relevant bounds in the event.

But is it really that excessive? singleRangeFeed already has the span -- the bounds; it's just a matter of communicating that this *RangeFeedEvent belongs to that span... All of the information is available in the client. The alternative is a larger RPC, more data that needs to be transferred, and a change to api.proto (with all of the questions around mixed-version state).

@miretskiy
Contributor

was thinking the same thing, but then we'd have to introduce additional data structures on the client side to track it and such. It just seemed excessive, especially given that this is limited to a single event type that is a bit of an outlier.
Since exceeding the registration bounds is a property of this one single event, it seems reasonable to also include the relevant bounds in the event.

But is it really that excessive? singleRangeFeed already has the span -- the bounds; it's just a matter of communicating that this *RangeFeedEvent belongs to that span... All of the information is available in the client. The alternative is a larger RPC, more data that needs to be transferred, and a change to api.proto (with all of the questions around mixed-version state).

Come to think of it, I think it's trivial; all we need is to abstract away this one piece of code:

select {
case eventCh <- event:
case <-ctx.Done():
	return args.Timestamp, ctx.Err()
}

That can be trivially done with:

type EventReceiver func(ctx context.Context, bounds roachpb.Span) error

Add a flavor of rangefeed that takes EventReceiver instead of eventCh.
Done.

@erikgrinaker
Contributor

Come to think of it, I think it's trivial; all we need to abstract away the 1 piece of code:

I think I have a mild preference for a channel send, since we avoid making any assumptions about how quickly the caller will respect context cancellation (if at all). We could of course wrap it in a message envelope. I feel like we're just adding on cruft for the benefit of a very specialized use-case though. But I don't have a particularly strong opinion either way, both of these approaches seem fine with me.

@miretskiy
Contributor

I feel like we're just adding on cruft for the benefit of a very specialized use-case though.

So, would that cruft be better on kv server side? I think that's the question -- and my preference is to ... not add cruft on rangefeed server?
Wdyt?

@erikgrinaker
Contributor

erikgrinaker commented Jul 20, 2022

I feel like we're just adding on cruft for the benefit of a very specialized use-case though.

So, would that cruft be better on kv server side? I think that's the question -- and my preference is to ... not add cruft on rangefeed server? Wdyt?

I see it more like we're adding on cruft to the one offending event, rather than for all events. But again, no strong opinion, and it does seem conceptually cleaner to do it client-side, so we can give that a shot if everyone's on board.

We should measure whether it has any impact on event throughput though, since it'll affect the much hotter value events too.

Contributor Author

@gh-casper gh-casper left a comment

Okay, let me try message envelope approach, will have a separate commit.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @gh-casper, @miretskiy, @samiskin, and @stevendanna)

@gh-casper gh-casper requested a review from a team as a code owner July 21, 2022 10:53
Contributor Author

@gh-casper gh-casper left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @erikgrinaker, @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 623 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Because of fragmentation, all range keys at a given position will all have the same bounds. See: https://github.com/cockroachdb/cockroach/blob/master/docs/tech-notes/mvcc-range-tombstones.md#data-structure-mvcc-range-keys

So for your example of overlapping range keys [a-c) and [b-d), these would get fragmented into [a-b), [b-c), and [c-d), and you would only see the ones overlapping the iterator position.

Done.


pkg/ccl/streamingccl/streamproducer/event_stream.go line 375 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

As mentioned in another comment, let's iterate over point keys and range keys separately, unless we need to emit them interleaved in order (in which case we need to detect changes to RangeBounds().Key).

Done.


pkg/ccl/streamingccl/streamproducer/event_stream.go line 425 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Since we iterate across and add range keys in order (left-to-right in the key space), I believe we can just check the last member's end key here rather than iterating over all of them, which avoids making this quadratic.

Done.


pkg/roachpb/api.proto line 2798 at r1 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

I was thinking the same thing, but then we'd have to introduce additional data structures on the client side to track it and such. It just seemed excessive, especially given that this is limited to a single event type that is a bit of an outlier.

Since exceeding the registration bounds is a property of this one single event, it seems reasonable to also include the relevant bounds in the event.

Done.

@gh-casper gh-casper requested a review from erikgrinaker July 22, 2022 18:55
@shermanCRL shermanCRL added the A-tenant-streaming Including cluster streaming label Jul 23, 2022
Contributor

@erikgrinaker erikgrinaker left a comment

Nice, range key and rangefeed parts look good at a high level! I'll leave the detailed review of the tenant streaming parts to that team.

Reviewed 30 of 30 files at r3, 18 of 18 files at r4, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @gh-casper, @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/utils.go line 63 at r4 (raw file):

			break
		}
		if hasPoint, _ := pointIter.HasPointAndRange(); hasPoint {

We don't need to check this. With IterKeyTypePointsOnly, this will always be true for a valid iterator. Ditto for IterKeyTypeRangesOnly.


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 750 at r4 (raw file):

			return 0, err
		}
		if err = r.rangeKeySSTWriter.PutMVCCRangeKey(rangeKeyVal.RangeKey, mvccValue); err != nil {

I have an open PR for PutRawMVCCRangeKey(), which will avoid having to decode/encode the value when copying like this.

#84962

We should make sure we update this once it's merged.


pkg/kv/kvclient/rangefeed_envelope.go line 16 at r3 (raw file):

// RangeFeedEnvelope is envelope type that encapsulates the roachpb.RangeFeedEvent.
type RangeFeedEnvelope struct {

I don't love the "envelope" name, but don't have any great other ideas either. Maybe Message? What do you think?

Also, we should probably move this into kvcoord, which is the package that constructs and emits them via DistSender.


pkg/kv/kvclient/rangefeed_envelope.go line 23 at r4 (raw file):

	// The span of the rangefeed registration that overlaps with SST span if
	// event has an underlying roachpb.RangeFeedSSTable event.
	RegisteredSpan *roachpb.Span

This can be a value rather than a pointer.


pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go line 509 at r3 (raw file):

				}
			case *roachpb.RangeFeedSSTable:
				envelope.RegisteredSpan = &span

Shouldn't we always set this regardless of event type?


pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go line 520 at r3 (raw file):

			select {
			case eventCh <- &envelope:

We should consider sending a value rather than a pointer, which avoids it escaping to the heap. Ran some quick benchmarks which just built and sent the structs over a buffered channel:

BenchmarkChannelEvent-24            	 6036284	       197.3 ns/op
BenchmarkChannelEnvelope-24         	 5102538	       235.1 ns/op
BenchmarkChannelEnvelopeValue-24    	 5612910	       217.3 ns/op

So sending it by value cuts about half of the overhead of the envelope.
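The by-value send can be sketched as follows; `message` and `event` are stand-in types (the real envelope wraps a *roachpb.RangeFeedEvent), but the point carries over: sending the envelope by value avoids a heap allocation for the envelope itself, only the inner event pointer is shared.

```go
package main

import "fmt"

type event struct{ key string }

type message struct {
	ev             *event
	registeredSpan [2]string
}

// produce sends each envelope by value: no &message{...} allocation
// per event, which is what made the value variant cheaper in the
// benchmarks quoted above.
func produce(ch chan<- message, evs []*event, span [2]string) {
	for _, ev := range evs {
		ch <- message{ev: ev, registeredSpan: span} // value send
	}
	close(ch)
}

func main() {
	ch := make(chan message, 4)
	go produce(ch, []*event{{"a"}, {"b"}}, [2]string{"a", "z"})
	for m := range ch {
		fmt.Println(m.ev.key, m.registeredSpan)
	}
}
```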


pkg/kv/kvserver/replica_rangefeed_test.go line 380 at r3 (raw file):

	require.NoError(t, sstWriter.Finish())

	// Does this mean all ts in sst should be equal to the batch ts?

Yes.

@gh-casper gh-casper force-pushed the del-range branch 2 times, most recently from fa15435 to 7e8c53c Compare July 27, 2022 21:31
Contributor Author

@gh-casper gh-casper left a comment

Thanks Erik. Please review for the streaming part. @stevendanna @dt @miretskiy

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @erikgrinaker, @miretskiy, @samiskin, and @stevendanna)

Contributor

How do we feel about introducing

func (ds *DistSender) ConsumeRangeFeed(
	ctx context.Context, spans []SpanTimePair, withDiff bool, consume func(context.Context, *roachpb.RangeFeedEvent),
) error {

Are there other alternatives that would leave the callers as they are, while enabling easy integration with streaming?

@erikgrinaker ?

Contributor

Wouldn't we then have to migrate all callers over to this new ConsumeRangeFeed method anyway, which is presumably an even larger change since it'd introduce callbacks everywhere?

I don't have a strong opinion on channels vs callbacks -- slight preference for channels because they can be combined via select. If you generally prefer callbacks on the client side then that's fine with me, since CDC is the primary consumer anyway.

I'm not sure if it's the best use of @gh-casper's time for him to keep rewriting this though. If he'd rather keep it as-is and work on something more important then I'd be +1 on that.

*roachpb.RangeFeedEvent

// The span of the rangefeed registration that overlaps with SST span if
// event has an underlying roachpb.RangeFeedSSTable event.
Contributor

I wouldn't mention anything about the SST;
isn't this span the "range" against which the rangefeed was established?

@shermanCRL shermanCRL added this to the 22.2 milestone Jul 31, 2022
Collaborator

@stevendanna stevendanna left a comment

Overall this looks like a pretty reasonable step to me. I have one question about whether we need to sort the range keys.

I also think we probably want some memory monitoring here, just like we'll want some for the existing kv batching, but we can do that in a follow-up.

batcher *bulk.SSTBatcher
curKVBatch mvccKeyValues
// curRangeKVBatch temporarily batches MVCCRangeKeys to be ingested separately
// with 'rangeKeySSTWriter' below.
Collaborator

Should this be 'rangeBatcher' below?

if err != nil {
return 0, err
}
if err = r.rangeKeySSTWriter.PutMVCCRangeKey(rangeKeyVal.RangeKey, mvccValue); err != nil {
Collaborator

Looks like you might be able to use PutRawMVCCRangeKey now to avoid the decode.

r.rangeKeySSTWriter = r.rangeKeySSTWriterMaker()
}

for _, rangeKeyVal := range rangeKeys {
Collaborator

Are there ordering constraints on how we write range keys into the SST? That is, do we need to sort these like we sort KV keys?

Contributor

@erikgrinaker erikgrinaker Aug 2, 2022

Yes, they must be written in key order. Multiple versions of the same range key can be written in any order though, as long as the key order is maintained.
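The ordering requirement can be sketched as a sort over the buffered batch; `rangeKey` and `sortForSST` are stand-ins (not storage.MVCCRangeKey), and newest-first version order is one valid choice since any version order works per the comment above:

```go
package main

import (
	"fmt"
	"sort"
)

// rangeKey stands in for an MVCC range key in this sketch.
type rangeKey struct {
	start, end string
	ts         int64 // wall time; higher = newer
}

// sortForSST orders buffered range keys the way an SST writer needs
// them: strictly by key order, with versions of the same key sorted
// newest to oldest.
func sortForSST(keys []rangeKey) {
	sort.Slice(keys, func(i, j int) bool {
		if keys[i].start != keys[j].start {
			return keys[i].start < keys[j].start
		}
		if keys[i].end != keys[j].end {
			return keys[i].end < keys[j].end
		}
		return keys[i].ts > keys[j].ts // versions: any order is legal
	})
}

func main() {
	keys := []rangeKey{{"c", "d", 5}, {"a", "b", 3}, {"a", "b", 7}}
	sortForSST(keys)
	fmt.Println(keys) // [{a b 7} {a b 3} {c d 5}]
}
```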

This PR creates kvcoord.RangeFeedMessage, encapsulating
RangeFeedEvent with an additional RegisteredSpan field, which
is the span of the rangefeed registration that emits this
event.

This field helps rangefeed clients properly trim a received
SST down to a single registered span. When a rangefeed is
created with multiple spans, rangefeed consumers may otherwise
end up receiving duplicate SSTs; this field enables the proper
trimming.

Release note: None
This PR supports the DeleteRange operation both in the producer,
which processes DelRange events from the rangefeed, and in the
stream ingestion processor, which ingests DelRange events as SSTs
into the destination.

For versions >= 22.2, SSTs can also contain range tombstones, i.e.,
DelRange; this PR also supports ingesting them.

The ingestion processor uses a separate SST writer to ingest range
tombstones, as SSTBatcher does not support adding MVCCRangeKeys
yet.

This PR also removes the GenerationEvent, which is no longer
a valid concept in the current consumer-tracked design.

Release note: None
Contributor Author

@gh-casper gh-casper left a comment

Memory monitoring in ingestion is something I plan to do in August, either through BufferAdder or by setting up a dedicated memory monitoring mechanism.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @erikgrinaker, @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 133 at r9 (raw file):

Previously, stevendanna (Steven Danna) wrote…

Should this be 'rangeBatcher' below?

Done.

Code quote:

curRangeKVBatch

pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 667 at r9 (raw file):

Previously, stevendanna (Steven Danna) wrote…

[nit] should we use this function down below in bufferKV as well?

right, good call.


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 745 at r9 (raw file):

Previously, erikgrinaker (Erik Grinaker) wrote…

Yes, they must be written in key order. Multiple versions of the same range key can be written in any order though, as long as the key order is maintained.

Now I sort them before batching. Also refactored the rangeBatcher to make it cleaner.


pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go line 750 at r9 (raw file):

Previously, stevendanna (Steven Danna) wrote…

Looks like you might be able to use PutRawMVCCRangeKey now to avoid the decode.

Done.

Code quote:

PutMVCCRangeKey

pkg/kv/kvclient/kvcoord/rangefeed_message.go line 22 at r7 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I wouldn't mention anything about SST;
Isn't this span a "range" against which rangefeed was established?

Done.

Code quote:

has an underlying roachpb.RangeFeedSSTable event.


// TODO(casper): disabled due to error when setting a cluster setting
// "setting updated but timed out waiting to read new value"
skip.UnderStressRace(t, "disabled under stress race")
Collaborator

Let's open an issue for this so we don't forget.

Contributor Author

@gh-casper gh-casper left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @dt, @erikgrinaker, @miretskiy, @samiskin, and @stevendanna)


pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go line 330 at r11 (raw file):

Previously, stevendanna (Steven Danna) wrote…

Let's open an issue for this so we don't forget.

Done. #85615

Code quote:

	// TODO(casper): disabled due to error when setting a cluster setting
	// "setting updated but timed out waiting to read new value"

@gh-casper
Contributor Author

bors r+

@craig
Contributor

craig bot commented Aug 4, 2022

Build succeeded:

@craig craig bot merged commit 9bd9d64 into cockroachdb:master Aug 4, 2022

Labels

A-tenant-streaming Including cluster streaming


Development

Successfully merging this pull request may close these issues:

  • streamingccl: make MVCC bulk operations well-tested
  • kvserver: C2C handling of DeleteRange aka MVCC range tombstones

6 participants