kvcoord: Implement CloseStream for MuxRangeFeed#108335
kvcoord: Implement CloseStream for MuxRangeFeed#108335craig[bot] merged 1 commit intocockroachdb:masterfrom
Conversation
erikgrinaker
left a comment
There was a problem hiding this comment.
Reviewed 6 of 6 files at r1, all commit messages.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/kv/kvpb/api.proto line 2993 at r1 (raw file):
// When this bit is set, the server should attempt, as best effort, to // quickly terminate rangefeed for this stream. bool close_stream = 6;
It's unfortunate that we're overloading RangeFeedRequest both for starting and closing a stream, it makes it harder to reason about how different fields interact. For example, when I set close_stream, what happens if I don't set stream_id, or pass a wrong span? MuxRangeFeed should really take a separate oneof message type, but I guess we're too far down this path now. :(
Let's at least document that only stream_id matters when we set close_stream. Also, I think we should guard against stream_id and close_stream being set in Node.RangeFeed() and error out.
We should call out that callers must check the V23_2 version gate before using this. Otherwise, if a 23.2 node sends a close_stream request to a 23.1 node, it will result in a new rangefeed being spun up.
pkg/server/node.go line 224 at r1 (raw file):
} metaClosedMuxRangeFeedStreams = metric.Metadata{ Name: "rpc.streams.mux_rangefeed.closed_streams",
I'm not sure we need this? It's slightly ambiguous, in that a client can close streams both by closing the connection and explicitly closing each stream. I think I'd just avoid the confusion and not have this, unless you feel strongly about it.
pkg/server/node.go line 1819 at r1 (raw file):
// nice to clean up anyway. activeStreams.Range(func(key, value any) bool { value.(*setRangeIDEventSink).cancel()
Let's do ctx, cancel := context.WithCancel(stream.Context()); defer cancel() instead, and use ctx throughout. That will cancel all the child contexts too.
pkg/server/node.go line 1832 at r1 (raw file):
if req.CloseStream { if !n.storeCfg.Settings.Version.IsActive(stream.Context(), clusterversion.V23_2) { return errors.AssertionFailedf("unexpected CloseStream(%d) request (min version %s)",
The RPC server should never reject functionality based on version gates, the client gets to decide. Otherwise, version gate rollout races will cause spurious errors. See:
cockroach/pkg/clusterversion/cockroach_versions.go
Lines 54 to 57 in c6d599f
pkg/server/node.go line 1846 at r1 (raw file):
// just before we receive close request. So, just print out a warning. if log.V(1) { log.Infof(stream.Context(), "ignoring likely benign CloseStream race for stream %d", req.StreamID)
Let's not speculate on why the client sent this, and just say "closing unknown rangefeed stream ID %d" or something.
pkg/server/node.go line 1875 at r1 (raw file):
streamSink.cancel() if streamSink.closedByClient.Load() && errors.Is(err, context.Canceled) {
I think we can avoid closedByClient here if we instead check streamSink.ctx.Err() != nil. The client doesn't care about the stream any more anyway.
pkg/server/node.go line 1880 at r1 (raw file):
// so that kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED gets returned // to the client. err = nil
Let's explicitly set err = kvpb.NewRangeFeedRetryError(REASON_RANGEFEED_CLOSED) here. Otherwise, in the case of a version gate race, the code below can fall through to REASON_REPLICA_REMOVED.
erikgrinaker
left a comment
There was a problem hiding this comment.
Thanks for making the effort to get this in!
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker)
pkg/kv/kvpb/api.proto line 2993 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
It's unfortunate that we're overloading
RangeFeedRequestboth for starting and closing a stream, it makes it harder to reason about how different fields interact. For example, when I setclose_stream, what happens if I don't setstream_id, or pass a wrongspan? MuxRangeFeed should really take a separateoneofmessage type, but I guess we're too far down this path now. :(Let's at least document that only
stream_idmatters when we setclose_stream. Also, I think we should guard againststream_idandclose_streambeing set inNode.RangeFeed()and error out.We should call out that callers must check the
V23_2version gate before using this. Otherwise, if a 23.2 node sends aclose_streamrequest to a 23.1 node, it will result in a new rangefeed being spun up.
Added NewCloseStreamRequest request helper method as a safety to make sure v23.2 gate is there (since we don't have real callers right now).
Added checks in regular rangefeed.
pkg/server/node.go line 224 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
I'm not sure we need this? It's slightly ambiguous, in that a client can close streams both by closing the connection and explicitly closing each stream. I think I'd just avoid the confusion and not have this, unless you feel strongly about it.
Sure -- just wanted to have at least some observability into explicit close requests. But i'm okay dropping this for now.
pkg/server/node.go line 1819 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Let's do
ctx, cancel := context.WithCancel(stream.Context()); defer cancel()instead, and usectxthroughout. That will cancel all the child contexts too.
Nice and neat. Thanks.
pkg/server/node.go line 1832 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
The RPC server should never reject functionality based on version gates, the client gets to decide. Otherwise, version gate rollout races will cause spurious errors. See:
cockroach/pkg/clusterversion/cockroach_versions.go
Lines 54 to 57 in c6d599f
Cool; makes sense. To be safe (i.e. not to forget to use correct version gate when we need to use this RPC), I also added a utility method NewCloseStreamRequest which errors out if v23.2 is not active.
pkg/server/node.go line 1875 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
I think we can avoid
closedByClienthere if we instead checkstreamSink.ctx.Err() != nil. The client doesn't care about the stream any more anyway.
I'm a bit worried relying just on ctx.Err() -- it can be non-nil when e.g. parent context is cancelled too; in that case I think it's correct to return an error.
What I can do is do "LoadAndDelete" above -- if context was cancelled, and !loaded -- then I think we know the stream was explicitly closed.
erikgrinaker
left a comment
There was a problem hiding this comment.
Reviewed 6 of 6 files at r2, 2 of 2 files at r3, all commit messages.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/kv/kvpb/api.proto line 2993 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Added NewCloseStreamRequest request helper method as a safety to make sure v23.2 gate is there (since we don't have real callers right now).
Added checks in regular rangefeed.
May still want to add a comment here, since caller can just construct this directly if they want to.
pkg/server/node.go line 1875 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I'm a bit worried relying just on ctx.Err() -- it can be non-nil when e.g. parent context is cancelled too; in that case I think it's correct to return an error.
What I can do is do "LoadAndDelete" above -- if context was cancelled, and !loaded -- then I think we know the stream was explicitly closed.
Ok, makes sense -- I don't have a good intuition for the error handling here. I'd still be inclined to check ctx.Err() != nil here, since it's generally better form -- context.Canceled can originate from a child context too, without our context being cancelled. It doesn't matter in this case though because streamClosedByClient would be false. Your call.
pkg/server/node.go line 1800 at r2 (raw file):
// cancelled once MuxRangeFeed exits. ctx, cancel := context.WithCancel(n.AnnotateCtx(stream.Context())) cancel()
defer cancel()
Also, still a few sites that use stream.Context(), let's make sure we use ctx throughout.
miretskiy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @erikgrinaker)
pkg/kv/kvpb/api.proto line 2993 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
May still want to add a comment here, since caller can just construct this directly if they want to.
Ack.
pkg/server/node.go line 1875 at r1 (raw file):
Previously, erikgrinaker (Erik Grinaker) wrote…
Ok, makes sense -- I don't have a good intuition for the error handling here. I'd still be inclined to check
ctx.Err() != nilhere, since it's generally better form --context.Canceledcan originate from a child context too, without our context being cancelled. It doesn't matter in this case though becausestreamClosedByClientwould be false. Your call.
Okay; I'm good using streamCtx.Err != nil
1197210 to
ec2106c
Compare
Extend MuxRangeFeed protocol to support explicit, caller initiated CloseStream operation. The caller may decide to stop receiving events for a particular stream, which is part of MuxRangeFeed. The caller may issue a request to MuxRangeFeed server to close the stream. The server will cancel underlying range feed, and return a `RangeFeedRetryError_REASON_RANGEFEED_CLOSED` error as a response. Note, current mux rangefeed clinet does not use this request. The code to support cancellation is added pre-emptively in case this functionality will be required in the future to support restarts due to stuck rangefeeds. Epic: CRDB-26372 Release note: None
|
bors r+ |
|
Build succeeded: |
Extend MuxRangeFeed protocol to support explicit,
caller initiated CloseStream operation.
The caller may decide to stop receiving events
for a particular stream, which is part of MuxRangeFeed. The caller may issue a request to MuxRangeFeed server to close the stream. The server will cancel underlying range feed, and return a
RangeFeedRetryError_REASON_RANGEFEED_CLOSEDerror as a response.Note, current mux rangefeed clinet does not use this request. The code to support cancellation is added pre-emptively in case this functionality will be required in the future to support restarts due to stuck rangefeeds.
Epic: CRDB-26372
Release note: None