rangefeedbuffer: introduce a rangefeed buffer #71225
craig[bot] merged 2 commits into cockroachdb:master
Conversation
de30374 to d89057e (Compare)
arulajmani
left a comment
I haven't had a thorough look yet, but I don't think we can use checkpoint events here as they may not encompass the entire rangefeed span. I think this needs to work on frontier timestamps instead.
Maybe we can change the buffer methods to:
func (b *Buffer) Add(ctx context.Context, ev *roachpb.RangeFeedValue) error
func (b *Buffer) Advance(ctx context.Context, ts hlc.Timestamp)
We can then use the OnFrontierAdvance option from #71256 to carve out a pattern like:
r, err := f.RangeFeed(ctx, ...,
func(ctx context.Context, value *roachpb.RangeFeedValue) {
b.Add(ctx, value)
},
rangefeed.WithOnFrontierAdvance(func(ctx context.Context, ts hlc.Timestamp) {
b.Advance(ctx, ts)
}),
Wdyt?
Reviewable status:
complete! 0 of 0 LGTMs obtained
I know, haha, that's what I was getting at here + discussed with you yesterday. I see you've already typed up #71256; I'll rebase.
d89057e to 93d30ad (Compare)
93d30ad to 3858ebf (Compare)
3858ebf to efc15c5 (Compare)
ajwerner
left a comment
Reviewed 1 of 1 files at r4.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Quoted 4 lines of code…
value := ev.Val.Value
if !value.IsPresent() {
    value = ev.Val.PrevValue
}
what is this case?
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 119 at r4 (raw file):
type entry struct {
    *roachpb.RangeFeedEvent
    hlc.Timestamp
}
Why separate timestamp when it's already a field in the event? Function seems better.
Data structure question: we only care about the ordering when we go to deal with a frontier. It feels like it would be simpler to just sort.Sort at Flush point and append during execution? That also allows us to end up freeing up the buffer after a burst and to avoid allocating a new one on flush. The flip side is if there are many flushes before a lot of data, you'll sort for each flush, but I really don't care about that case. The bigger reason to do it is that I think it's simpler. Consider:
type rangefeedEvents []*roachpb.RangeFeedEvent
func (events rangefeedEvents) Len() int { return len(events) }
func (events rangefeedEvents) Swap(i, j int) { events[i], events[j] = events[j], events[i] }
func (events rangefeedEvents) Less(i, j int) bool { return eventTS(events[i]).Less(eventTS(events[j])) }
func eventTS(ev *roachpb.RangeFeedEvent) hlc.Timestamp { /* ... */ }
// Add adds the given rangefeed entry to the buffer.
func (b *Buffer) Add(ctx context.Context, ev *roachpb.RangeFeedEvent) error {
// ...
b.mu.Lock()
defer b.mu.Unlock()
if eventTS(ev).LessEq(b.mu.frontier) {
// If the rangefeed entry is at a timestamp less than or equal to our
// last known checkpoint, we don't need to record it.
return nil
}
if len(b.mu.events) > b.limit {
return ErrBufferLimitExceeded
}
b.mu.events = append(b.mu.events, ev)
return nil
}
func (b *Buffer) Flush(
ctx context.Context, frontier hlc.Timestamp,
) (events []*roachpb.RangeFeedEvent) {
b.mu.Lock()
defer b.mu.Unlock()
if frontier.Less(b.mu.frontier) {
log.Fatalf(ctx, "frontier timestamp regressed: saw %s, previously %s", frontier, b.mu.frontier)
}
sort.Sort(rangefeedEvents(b.mu.events))
idx := sort.Search(len(b.mu.events), func(i int) bool {
return frontier.Less(eventTS(b.mu.events[i]))
})
events = b.mu.events[:idx]
b.mu.events = b.mu.events[idx:]
b.mu.frontier = frontier
return events
}
irfansharif
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
value := ev.Val.Value
if !value.IsPresent() {
    value = ev.Val.PrevValue
}
what is this case?
Just a way to extract a timestamp for a rangefeed value deletion event.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 119 at r4 (raw file):
Previously, ajwerner wrote…
Why separate timestamp when it's already a field in the event? Function seems better.
Data structure question: we only care about the ordering when we go to deal with a frontier. It feels like it would be simpler to just sort.Sort at Flush point and append during execution? […]
Good idea, done for both. One thing I'm not sure about is this bit:
events = b.mu.events[:idx]
b.mu.events = b.mu.events[idx:]
Are we creating an ever-growing slice here, discarding only the prefix when flushing out events lesseq than the frontier timestamp?
efc15c5 to 2d4a1e9 (Compare)
ajwerner
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
Just a way to extract a timestamp for a rangefeed value deletion event.
🤔 the delete should have a timestamp, no?
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 119 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
Good idea, done for both. One thing I'm not sure about is this bit:
events = b.mu.events[:idx]
b.mu.events = b.mu.events[idx:]
Are we creating an ever-growing slice here, discarding only the prefix when flushing out events lesseq than the frontier timestamp?
When the slice re-allocs, it'll be a new backing array.
irfansharif
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
🤔 the delete should have a timestamp, no?
Yea, isn't it ev.Val.PrevValue.Timestamp? That's what this code block was doing.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 119 at r4 (raw file):
Previously, ajwerner wrote…
When the slice re-allocs, it'll be a new backing array.
Ack.
ajwerner
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
Yea, isn't it ev.Val.PrevValue.Timestamp? That's what this code block was doing.
I don't think so. PrevValue will be empty unless you explicitly request it. It will also have the timestamp of the value that preceded the deletion.
Can you write a test that exercises the deletion path?
nvb
left a comment
Reviewed 6 of 6 files at r2, 3 of 4 files at r3, 1 of 1 files at r5, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @irfansharif)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
I don't think so. PrevValue will be empty unless you explicitly request it. It will also have the timestamp of the value that preceded the deletion.
Can you write a test that exercises the deletion path?
I'm also confused by this. The Value should have a timestamp even if it is a deletion tombstone.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
}
func eventTS(ev *roachpb.RangeFeedEvent) hlc.Timestamp {
@arulajmani and I were talking yesterday about whether this structure could be used in the SQLWatcher as well in place of the current eventBuffer he introduced over there. That use would be a little different in that it is zipping together two rangefeeds, but it has a lot of the same requirements. We discussed whether this buffer should be made slightly more generic to accommodate other use cases, essentially by replacing *roachpb.RangeFeedEvent with a type Event interface { Timestamp() hlc.Timestamp }.
We wanted to get your thoughts on that.
2d4a1e9 to e0a4e2e (Compare)
irfansharif
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Hm, looks like I misunderstood the APIs here; wrote a test at the top-level rangefeed package spelling out what the two timestamps should look like when writing/overwriting/deleting keys.
It will also have the timestamp of the value that preceded the deletion.
Looks like this isn't true? I get an empty timestamp for the value preceding the deletion.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
@arulajmani and I were talking yesterday about whether this structure could be used in the SQLWatcher as well in place of the current eventBuffer he introduced over there. That use would be a little different in that it is zipping together two rangefeeds, but it has a lot of the same requirements. We discussed whether this buffer should be made slightly more generic to accommodate other use cases, essentially by replacing *roachpb.RangeFeedEvent with a type Event interface { Timestamp() hlc.Timestamp }.
We wanted to get your thoughts on that.
I don't mind the idea of hiding the event away behind an interface. I imagine then in the SQLWatcher the events we'd store would be the decoded descriptor IDs (decoded as per the right table's schema), which feels cleaner than juggling two instances of this buffer; made that change.
There's little in this library that's now "rangefeed" specific, though I expect it to mostly be used in conjunction with it. Suggestions for where it should be placed or what it should be named?
ac785d0 to b7a0e52 (Compare)
ajwerner
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
PrevValue will be empty unless you explicitly request it.
irfansharif
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
In TestRangefeedValueTimestamps I'm specifying rangefeed.WithDiff(); I'm seeing the previous values just fine -- the timestamps however are empty.
cockroach/pkg/kv/kvserver/rangefeed/processor.go
Lines 632 to 644 in 0417041
Doesn't look like we set a prev-value timestamp above, unlike during the initial scan:
cockroach/pkg/kv/kvclient/rangefeed/rangefeed.go
Lines 258 to 278 in fc803ee
ajwerner
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, irfansharif (irfan sharif) wrote…
cockroach/pkg/kv/kvserver/rangefeed/processor.go
Lines 632 to 644 in 0417041
Doesn't look like we set a prev-value timestamp above, unlike during the initial scan:
cockroach/pkg/kv/kvclient/rangefeed/rangefeed.go
Lines 258 to 278 in fc803ee
Ah, guess we didn't expose the PrevValue timestamp. That was probably smart from an API contract perspective.
cockroach/pkg/kv/kvserver/rangefeed/processor.go
Lines 625 to 627 in efb1a95
More commentary on
cockroach/pkg/roachpb/api.proto
Lines 2384 to 2388 in efb1a95
probably wouldn't hurt.
ajwerner
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
Ah, guess we didn't expose the PrevValue timestamp. That was probably smart from an API contract perspective.
cockroach/pkg/kv/kvserver/rangefeed/processor.go
Lines 625 to 627 in efb1a95
More commentary on
cockroach/pkg/roachpb/api.proto
Lines 2384 to 2388 in efb1a95
probably wouldn't hurt.
Probably also worth reconciling those behaviors. That certainly was not intentional.
arulajmani
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @irfansharif and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, irfansharif (irfan sharif) wrote…
I don't mind the idea of hiding the event away behind an interface. I imagine then in the SQLWatcher the events we'd store would be the decoded descriptor IDs (decoded as per the right table's schema), which feels cleaner than juggling two instances of this buffer; made that change.
There's little in this library that's now "rangefeed" specific, though I expect it to mostly be used in conjunction with it. Suggestions for where it should be placed or what it should be named?
Another thing that has come up in offline discussions with both @irfansharif and @nvanbenschoten, and I'd like to get more thoughts on here, is how we feel about generalizing this even further so that this can track multiple frontier timestamps. Specifically for the SQLWatcher, this would allow us to push the "combined frontier timestamp" into the buffer (as opposed to tracking it in a layer above). We could extend this interface a bit to make this work.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 75 at r6 (raw file):
// recorded (expected to monotonically increase), and future events with
// timestamps less than or equal to it are discarded.
func (b *Buffer) Flush(ctx context.Context, frontier hlc.Timestamp) (events []Event) {
In the EventBuffer from the SQLWatcher PR we were de-duplicating events from the buffer when flushing. Is that something we want to do here as well?
irfansharif
left a comment
TFTRs so far! Do we think this is close to getting merged? I want to build on top of this for the TODOs I've left over on #69614.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 69 at r4 (raw file):
Previously, ajwerner wrote…
Probably also worth reconciling those behaviors. That certainly was not intentional.
Added a commit.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
Another thing that has come up in offline discussions with both @irfansharif and @nvanbenschoten, and I'd like to get more thoughts on here, is how we feel about generalizing this even further so that this can track multiple frontier timestamps. Specifically for the SQLWatcher, this would allow us to push the "combined frontier timestamp" into the buffer (as opposed to tracking it in a layer above). We could extend this interface a bit to make this work.
I'm 👎 on it, especially if you can simply add the buffer events as decoded IDs. There are no other instances (to my knowledge) where we're maintaining multiple (n=2) rangefeeds to keep track of multiple frontier timestamps, so that sounds overgeneralized. What do you imagine that API would look like? Would it give us much over something like the following:
type ... struct {
rangefeedbuffer.Buffer // accumulates descpb.IDs
zonesFrontierTS, descriptorFrontierTS hlc.Timestamp
}
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 75 at r6 (raw file):
Previously, arulajmani (Arul Ajmani) wrote…
In the EventBuffer from the SQLWatcher PR we were de-duplicating events from the buffer when flushing. Is that something we want to do here as well?
I had a TODO here, but decided ultimately against it -- easy enough to do at the caller + difficult now that we're taking in an opaque Event interface.
nvb
left a comment
Reviewed 7 of 7 files at r6, 4 of 4 files at r7, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @irfansharif)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, irfansharif (irfan sharif) wrote…
I'm 👎 on it, especially if you can simply add the buffer events as decoded IDs. There are no other instances (to my knowledge) where we're maintaining multiple (n=2) rangefeeds to keep track of multiple frontier timestamps, so that sounds overgeneralized. What do you imagine that API would look like? Would it give us much over something like the following:
type ... struct {
    rangefeedbuffer.Buffer // accumulates descpb.IDs
    zonesFrontierTS, descriptorFrontierTS hlc.Timestamp
}
+1 to what Irfan said.
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 85 at r6 (raw file):
// Accumulate all events with timestamps <= the given timestamp in sorted
// order.
sort.Sort(b.mu.events)
I think that this is going to heap allocate because you have the interface implemented on a value. If you implement the interface on *events and then pass sort.Sort(&b.mu.events), you can avoid that.
ajwerner
left a comment
Reviewed 6 of 7 files at r6.
Reviewable status: complete! 2 of 0 LGTMs obtained (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
+1 to what Irfan said.
Had written this earlier but forgot to hit publish. Seems like we're in agreement:
Leave generalizing refactors for later, when they’re obvious or better understood?
In both cockroachdb#69614 and cockroachdb#69661 we find ourselves reaching for a thin, memory-bounded buffer to sit on top of a rangefeed. We want to be able to accumulate raw rangefeed events, and when the rangefeed frontier is bumped, flush everything out en masse in timestamp-sorted order. Package rangefeedbuffer introduces such a datastructure.
If we're past the configured memory limit before having observed a frontier bump, we have little recourse -- since we receive checkpoints and rangefeed values on the same stream, we're waiting for a future checkpoint to flush out the accumulated memory. Given this, we simply error out to the caller (expecting them to re-establish the rangefeed).
Release note: None
Co-authored-by: Arul Ajmani <arula@cockroachlabs.com>
99c5df4 to 83acbc5 (Compare)
irfansharif
left a comment
❤️ Thanks y'all!
bors r+
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @arulajmani and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 85 at r6 (raw file):
Previously, nvanbenschoten (Nathan VanBenschoten) wrote…
I think that this is going to heap allocate because you have the interface implemented on a value. If you implement the interface on *events and then pass sort.Sort(&b.mu.events), you can avoid that.
TIL, you're right:
$ go build -gcflags '-m' github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer
...
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go:86:16: b.mu.events escapes to heap
Done.
ajwerner
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 85 at r6 (raw file):
Previously, irfansharif (irfan sharif) wrote…
TIL, you're right:
$ go build -gcflags '-m' github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer
...
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go:86:16: b.mu.events escapes to heap
Done.
an equivalent choice which maybe is worse is to keep the slice for the struct field and cast like: sort.Sort((*events)(&b.mu.events)). Also, I don't think you need to define the methods on the pointer to defeat the allocation as the pointer inherits the method set of the non-pointer.
arulajmani
left a comment
Reviewable status:
complete! 0 of 0 LGTMs obtained (and 2 stale) (waiting on @arulajmani, @irfansharif, and @nvanbenschoten)
pkg/kv/kvclient/rangefeed/rangefeedbuffer/buffer.go, line 111 at r5 (raw file):
What do you imagine that API would look like? Would it give us much over something like the following ...
We could structure it such that frontier timestamp tracking is pushed inside this buffer, instead of the caller having to do it at the layer above. What I had in mind was something like:
type Event interface {
Timestamp() hlc.Timestamp
Type() int // key into the array that tracks frontier timestamps
FrontierAdvance() bool // true if the event corresponds to the frontier being advanced.
}
This would allow the SQLWatcher to construct different event types based on where the event was generated at (system.descriptors or system.zones) and not have to individually track the frontiers of these rangefeeds.
All that being said, looks like this isn't super popular, so I'm fine this being structured as is.
CI flaked on #70220.
Build failed:
There was a discrepancy with whether or not the previous value contained the timestamp of when the previous value was recorded. With the initial scan, we populated the timestamp of the initial scan itself. With regular diffs, we omitted the timestamp. This made for confusing semantics (well, confusing at least for this author).
Release note: None
83acbc5 to d93e7cf (Compare)
CI + bazel infra flake: #71679, retrying. bors r+
Build succeeded:
When unmarshaling descriptor protos into their specific types, we want to pass in the MVCC timestamp at which that descriptor was read. Given we receive these protos through the surrounding rangefeed, we want to use the rangefeed event timestamp. We were erroneously using the timestamp found on the rangefeed event's "previous value", which the API guarantees will be the zero timestamp. (This tripped us up before; we added some commentary + tests in cockroachdb#71225 for the rangefeed library to make this clearer.) Release note: None
73439: spanconfig/sqlwatcher: use the right mvcc timestamp r=irfansharif a=irfansharif
[…]
Co-authored-by: irfan sharif <irfanmahmoudsharif@gmail.com>
Buffer provides a thin memory-bounded buffer to sit on top of a rangefeed. It
accumulates raw rangefeed events¹, which can be flushed out in timestamp
sorted order en masse whenever the rangefeed frontier is bumped. If we
accumulate more events than the limit allows for, we error out to the caller.
We need such a thing in both #69614 and #69661.
Release note: None
First commit is from #71256. Co-authored-by: Arul Ajmani arula@cockroachlabs.com.
Footnotes
¹ Rangefeed error events are propagated to the caller; checkpoint events are discarded. ↩