kvcoord: Fix error handling and retries in mux rangefeed client#102094
craig[bot] merged 1 commit into cockroachdb:master
Conversation
force-pushed from 8d4b84c to 67edbe1
I'll be busy with L2 today. @aliher1911 @pavelkalinnikov can you have a look?
	// If we've cleared the descriptor on failure, re-lookup.
-	if !token.Valid() {
+	if !stream.token.Valid() {
		var err error
Most of this loop checks and modifies stream.Something fields. This suggests that much of this code logically belongs to the stream type, rather than muxer. Could this be encapsulated in the stream as a few stream lifecycle methods, and the muxer would just orchestrate?
Perhaps it can; it's more of a result of inheriting "dna" from the regular rangefeed.
Do you feel strongly that such refactoring should be done right now?
Not strongly, but I feel it would improve understanding of this code. It wouldn't be a large refactoring, I think. Instead of rangefeedMuxer.startActiveRangeFeed(stream), it would be an activeMuxRangeFeed.start() method which would otherwise do the same thing?
I misunderstood; this is of course no big deal to do. I'm not sure that moving this logic to start() would necessarily be better. Even though startActiveRangeFeed operates mostly on the activeMuxRangeFeed structure, that's not the only thing happening; there are many dependencies on rangefeedMuxer: establishing the RPC connection to the node, and access to many members of rangefeedMuxer.
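The refactor being discussed can be sketched as a toy model (the type and method names echo the PR; all bodies and fields here are hypothetical simplifications, not the real kvcoord code): the stream owns its lifecycle method and reaches back into the muxer only for shared dependencies.

```go
package main

import "fmt"

// muxer stands in for rangefeedMuxer: it owns shared dependencies,
// such as establishing RPC connections to nodes.
type muxer struct{ connects int }

// dial models the muxer-owned dependency (RPC connection setup).
func (m *muxer) dial(node int) string {
	m.connects++
	return fmt.Sprintf("conn-to-n%d", node)
}

// stream stands in for activeMuxRangeFeed.
type stream struct {
	node      int
	transport string
}

// start is the stream-owned lifecycle method suggested in review: it
// mutates only its own fields, using the muxer for shared services.
func (s *stream) start(m *muxer) error {
	if s.transport == "" {
		s.transport = m.dial(s.node)
	}
	return nil
}

func main() {
	m := &muxer{}
	s := &stream{node: 1}
	if err := s.start(m); err != nil {
		panic(err)
	}
	fmt.Println(s.transport) // conn-to-n1
}
```

Either shape does the same work; the question in the thread is only where the lifecycle logic reads most clearly.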
Right, there are some dependencies, as always. Dependencies can/should be manageable if abstractions are clear, and clear abstractions are important for readability and maintainability. Can we clarify the abstractions here (maybe with better comments)? So far, the boundaries aren't obvious to me; as a small example, in this PR some fields are modified from both types. I understand that there might be no short way to refactor in this PR (I didn't try, you know better), but in the longer run I feel like we need clearer abstractions here. I'll make another review pass in a few hours if that's ok; feel free to polish / add some comments in the meantime as you feel necessary.
As you say: things are not simple. Multiple goroutines are involved; resources acquired at one point on one goroutine (catchup scan) are released on another goroutine (which manages many rangefeeds). I've introduced …
pav-kv left a comment
@miretskiy Thanks for adding the diagram! It helped me to understand this design a bit better. Looks like a bunch of streams that used to be in their own goroutines, flattened into one goroutine? With a few signals that drive it, primarily the events loop.
I've brushed up the diagram a bit, see the suggestions.
I'll make another pass tomorrow morning. I see the benefit of this fix, but hopefully in the morning I'll have a better focus on nitty-gritty details.
	}()

	errInfo, err := handleRangefeedError(ctx, reason)
	if err != nil {
Could reuse the new resetRouting method for replacing lines 484-485?
Ah sorry, it's probably incorrect to do that because resetRouting also resets the transport. And in that branch, we only want to evict the token. Revert?
Nah; it's correct. Every time we evict the token, we re-resolve it and recreate the transport.
Exactly right; we are replacing N goroutines with ... N goroutines, where N is the number of nodes as opposed to the number of ranges.
	// start begins execution of activeMuxRangeFeed.
	// Backoff is applied if same stream restarts.
	func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error {
It would be nice to outline the states in which this function can be called, and in which state it can leave the activeRangeFeed. I.e. are there any invariants? If an error is returned, what does it mean - is it a transient/recoverable or permanent error that gets propagated all the way up to the client?
From the diagram above, it seems that there are 2 states from which we can enter this:
- A brand new activeMuxRangeFeed start. The transport is nil, catchupRes not nil, and token can be valid or not (commonly still valid).
- A restart after a transient error. In this case the transport, catchupRes and token might still be valid.
Comment added to this function and to the catchupRes member field to describe catchupRes lifetime.
	streamID := atomic.AddInt64(&m.seqID, 1)

	// Start a retry loop for sending the batch to the range.
	for s.retry.Next() {
Carrying the transport, token and friends seems beneficial.
I'm not sure that reusing the retry state across starts is great, if I understand this implementation. Say, start succeeds, and we run the activeMuxRangeFeed for a while. Then a transient error happens during the event handling loop, we try to restart the feed. Why would we use the same retry state that we used to start the feed in the first place?
The delay in retry will keep growing, so with every transient error we will be starting the feed slower and slower. I think we should sometimes (when things are going smoothly again) decrease the delay, or reset the state of retry. For example, if the time since the last successful start is more than a threshold, we could s.retry.Reset() here.
This way, we will ensure that there is no spinning if all we do is trying to start the feed. Once feed is started, we'll reset the retry state so that the next restart will be snappy again.
It's not that bad since the backoff is bounded by 1 sec; but I was thinking the same, I just wanted to keep things as close to the existing impl as possible. Having said that, since you're voicing the same concern I had, I removed the retry state from the activeMux. No need to reset it. Basically, if you were able to establish a node connection, plus send an initial request, then you've done enough work, and retry can happen immediately.
Verified this fix is fine with decommission test.
force-pushed from bb602dc to c27731f
pav-kv left a comment
CI seems unhappy, please fix.
Fix error handling and retries when restarting rangefeeds. A big difference between regular rangefeed and mux rangefeed is that regular rangefeed has a dedicated goroutine per range. This goroutine is responsible for running a rangefeed, handling its errors, managing state pertaining to the RPC call (transport, retry information, routing information, etc), and restarting the rangefeed with backoff as needed.

Mux rangefeed, on the other hand, is not "loop" based. Prior to this PR, mux rangefeed, when it encountered a transient error, would lose a lot of the restart state mentioned above. For example, it would lose the transport information, so that the restart would run against the same node as before, resulting, potentially, in busy loops. Those busy loops (where the RPC call is restarted against the same node/replica that just experienced an error) would tend to make tests flaky since they would take a longer time to converge to the state expected by the tests (such as `TestDecommission`).

This PR fixes this loss of state pertaining to a single range restart by associating this state with the long-lived `activeMuxRangeFeed` state.

Fixes cockroachdb#96630
Fixes cockroachdb#100783
Informs cockroachdb#99631
Informs cockroachdb#101614

Release note: None
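The core idea in the commit message can be modeled in a few lines (fields and method are simplified hypotheticals, not the real kvcoord types): restart state hangs off a long-lived per-range struct instead of a per-range goroutine's stack, so a transient error no longer discards the routing/backoff state.

```go
package main

import "fmt"

// activeMuxRangeFeed is a toy stand-in for the long-lived per-range
// struct described above. Because the struct outlives any one RPC
// attempt, its fields survive transient errors.
type activeMuxRangeFeed struct {
	node    int // simplified routing state: which node we last talked to
	attempt int // simplified retry/backoff information
}

// onTransientError models a restart decision: keep the struct (and thus
// the carried state), advance the backoff counter, and move off the node
// that just failed to avoid the busy loop described in the commit message.
func (s *activeMuxRangeFeed) onTransientError(failedNode, otherNode int) {
	s.attempt++
	if s.node == failedNode {
		s.node = otherNode
	}
}

func main() {
	s := &activeMuxRangeFeed{node: 1}
	s.onTransientError(1, 2)
	fmt.Println(s.node, s.attempt) // 2 1
}
```

Without the carried state, every restart would begin from scratch and could land on node 1 again; with it, the retry deliberately routes elsewhere.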
bors r+
Build failed (retrying...)
Build succeeded
Encountered an error creating backports. Some common things that can go wrong:
You might need to create your backport manually using the backport tool.

error creating merge commit from c0a5c30 to blathers/backport-release-23.1-102094: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict []
You may need to manually resolve merge conflicts with the backport tool. Backport to branch 23.1.x failed. See errors above.

error creating merge commit from c0a5c30 to blathers/backport-release-23.1.0-102094: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict []
You may need to manually resolve merge conflicts with the backport tool. Backport to branch 23.1.0 failed. See errors above.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.
release-23.1: kvcoord: Fix error handling and retries in mux rangefeed client (#102094)
Fix error handling and retries when restarting rangefeeds. A big difference between regular rangefeed and mux rangefeed is that regular rangefeed has a dedicated goroutine per range. This goroutine is responsible for running a rangefeed, handling its errors, managing state pertaining to the RPC call (transport, retry information, routing information, etc), and restarting the rangefeed with backoff as needed.

Mux rangefeed, on the other hand, is not "loop" based. Prior to this PR, mux rangefeed, when it encountered a transient error, would lose a lot of the restart state mentioned above. For example, it would lose the transport information, so that the restart would run against the same node as before, resulting, potentially, in busy loops. Those busy loops (where the RPC call is restarted against the same node/replica that just experienced an error) would tend to make tests flaky since they would take a longer time to converge to the state expected by the tests (such as `TestDecommission`).

This PR fixes this loss of state pertaining to a single range restart by associating this state with the long-lived `activeMuxRangeFeed` state.

Fixes #96630
Fixes #100783
Informs #99631
Informs #101614

Release note: None