
kvcoord: Fix error handling and retries in mux rangefeed client #102094

Merged
craig[bot] merged 1 commit into cockroachdb:master from miretskiy:mux
Apr 26, 2023

Conversation

@miretskiy
Contributor

Fix error handling and retries when restarting rangefeeds. A big difference between regular rangefeed and mux rangefeed is that regular rangefeed has a dedicated goroutine per range. This goroutine is responsible for running a rangefeed, handling its errors, managing state pertaining to the RPC call (transport, retry information, routing information, etc.), and restarting the rangefeed with backoff as needed.

Mux rangefeed, on the other hand, is not "loop" based. Prior to this PR, when mux rangefeed encountered a transient error, it would lose much of the restart state mentioned above. For example, it would lose the transport information, so the restart would run against the same node as before, potentially resulting in busy loops. Those busy loops (where the RPC call is restarted against the same node/replica that just experienced an error) tended to make tests flaky, since they took longer to converge to the state expected by the tests (such as TestDecommission).

This PR fixes this loss of state pertaining to single-range restarts by associating that state with the long-lived activeMuxRangeFeed state.

Fixes #96630
Fixes #100783
Informs #99631
Informs #101614

Release note: None

@miretskiy miretskiy requested review from a team and erikgrinaker April 23, 2023 14:49
@cockroach-teamcity
Member

This change is Reviewable

@miretskiy miretskiy added backport-23.1.x PAST MAINTENANCE SUPPORT: 23.1 patch releases via ER request only backport-23.1.0 labels Apr 23, 2023
@miretskiy miretskiy force-pushed the mux branch 2 times, most recently from 8d4b84c to 67edbe1 Compare April 23, 2023 15:19
@erikgrinaker
Contributor

I'll be busy with L2 today. @aliher1911 @pavelkalinnikov can you have a look?

// If we've cleared the descriptor on failure, re-lookup.
if !token.Valid() {
var err error
if !stream.token.Valid() {
Collaborator

Most of this loop checks and modifies stream.Something fields. This suggests that much of this code logically belongs to the stream type, rather than the muxer. Could this be encapsulated in the stream as a few stream lifecycle methods, with the muxer just orchestrating?

Contributor Author

Perhaps it can; it's more a result of inheriting "DNA" from the regular rangefeed.
Do you feel strongly that such a refactoring should be done right now?

Collaborator

Not strongly, but I feel it would improve understanding of this code. It wouldn't be a large refactoring I think. Instead of rangefeedMuxer.startActiveRangeFeed(stream), it would be a activeMuxRangeFeed.start() method which would otherwise do the same thing?


@miretskiy
Contributor Author

Not strongly, but I feel it would improve understanding of this code. It wouldn't be a large refactoring I think. Instead of rangefeedMuxer.startActiveRangeFeed(stream), it would be a activeMuxRangeFeed.start() method which would otherwise do the same thing?

I misunderstood; this is of course no big deal to do. I'm not sure that moving this logic to start() would necessarily be better. Even though startActiveRangeFeed operates mostly on the activeMuxRangeFeed structure, that's not the only thing happening; there are many dependencies on rangefeedMuxer: establishing the RPC connection to the node, access to many members of rangefeedMuxer.

@pav-kv
Collaborator

pav-kv commented Apr 24, 2023

I'm not sure that moving this logic to start() would be better necessarily. Even though startActiveRangeFeed operates mostly on "activeMuxRangeFeed" structure -- that's not the only thing that's happening; there are many dependencies on "rangefeedMuxer" -- establishing rpc connection to the node, access to many members of rangefeedMuxer.

Right, there are some dependencies, as always. Dependencies can and should be manageable if the abstractions are clear, and clear abstractions are important for readability and maintainability. Can we clarify the abstractions here? (Maybe better comments around activeMuxRangeFeed would help.)

So far, the activeMuxRangeFeed is a simple struct with just a release() method, and a one-line comment. At the same time, the concept behind this struct seems to be an entire "session" of a rangefeed, right? With a non-trivial lifecycle. To me it would make sense to encapsulate the lifecycle of this "session" into a proper type, even if it requires some plumbing.

As a small example, in this PR some fields of activeMuxRangeFeed (transport and token) are released directly, but in addition there is the activeMuxRangeFeed.release() method. As a user of this type, how would I know when I need to call release() or when I need to manually access the fields?

I understand that there might be no short way to refactor in this PR (I didn't try, you know better), but in the longer run I feel like we need clearer abstractions here.

I'll make another review pass in a few hours if that's OK; feel free to polish / add some comments in the meantime as you feel necessary.

@miretskiy
Contributor Author

As a small example, in this PR some fields of activeMuxRangeFeed (transport and token) are released directly, but in addition there is the activeMuxRangeFeed.release() method. As a user of this type, how would I know when I need to call release() or when I need to manually access the fields?

As you say: things are not simple. Multiple goroutines are involved; resources acquired at one point on one goroutine (the catch-up scan) are released on another goroutine (which manages many rangefeeds).
Sometimes you need to release the transport and sometimes you don't. Sometimes you need to evict the routing token and sometimes you don't. There is a lot of complexity here, and similar complexity exists in the existing rangefeed implementation. A similar split exists there (partial rangefeed runs the loop and handles errors; singleRangeFeed also handles errors, may or may not release the catch-up reservation, and does some retries of its own).

I've introduced a start method as you suggested. I also added resetRouting to help manage the routing token/transport.

Collaborator

@pav-kv pav-kv left a comment

@miretskiy Thanks for adding the diagram! It helped me to understand this design a bit better. Looks like a bunch of streams that used to be in their own goroutines, flattened into one goroutine? With a few signals that drive it, primarily the events loop.

I've brushed up the diagram a bit, see the suggestions.

I'll make another pass tomorrow morning. I see the benefit of this fix, but hopefully in the morning I'll have a better focus on nitty-gritty details.

}()

errInfo, err := handleRangefeedError(ctx, reason)
if err != nil {
Collaborator

Could reuse the new resetRouting method for replacing lines 484-485?

Contributor Author

Ack.

Collaborator

Ah sorry, it's probably incorrect to do that because resetRouting also resets the transport. And in that branch, we only want to evict the token. Revert?

Contributor Author

Nah; it's correct. Every time we evict the token, we re-resolve it and recreate the transport.

@miretskiy
Contributor Author

Thanks for adding the diagram! It helped me to understand this design a bit better. Looks like a bunch of streams that used to be in their own goroutines, flattened into one goroutine? With a few signals that drive it, primarily the events loop.

Exactly right; we are replacing N goroutines with ... N goroutines, where N is the number of nodes as opposed to the number of ranges.

Comment on lines +232 to +246
// start begins execution of activeMuxRangeFeed.
// Backoff is applied if same stream restarts.
func (s *activeMuxRangeFeed) start(ctx context.Context, m *rangefeedMuxer) error {
Collaborator

It would be nice to outline the states in which this function can be called, and the state in which it can leave the activeRangeFeed. I.e., are there any invariants? If an error is returned, what does it mean: is it a transient/recoverable error, or a permanent one that gets propagated all the way up to the client?

From the diagram above, it seems that there are 2 states from which we can enter this:

  • A brand new activeMuxRangeFeed start. The transport is nil, catchupRes not nil, and token can be valid or not (commonly still valid).
  • A restart after a transient error. In this case the transport, catchupRes and token might still be valid.

Contributor Author

I added a comment to this function and to the catchupRes member field to describe the catchupRes lifetime.


streamID := atomic.AddInt64(&m.seqID, 1)

// Start a retry loop for sending the batch to the range.
for s.retry.Next() {
Collaborator

@pav-kv pav-kv Apr 25, 2023

Carrying the transport, token and friends seems beneficial.

I'm not sure that reusing the retry state across starts is great, if I understand this implementation correctly. Say start succeeds, and we run the activeMuxRangeFeed for a while. Then a transient error happens during the event handling loop, and we try to restart the feed. Why would we use the same retry state that we used to start the feed in the first place?

The delay in retry will keep growing, so with every transient error we will be starting the feed more and more slowly. I think we should sometimes (when things are going smoothly again) decrease the delay, or reset the retry state. For example, if the time since the last successful start exceeds a threshold, we could s.retry.Reset() here.

This way, we ensure there is no spinning if all we do is try to start the feed. Once the feed has started, we reset the retry state so that the next restart is snappy again.

Contributor Author

It's not that bad, since the backoff is bounded by 1 second; but I was thinking the same thing, and just wanted to keep things as close to the existing implementation as possible. Having said that, since you're voicing the same concern I had, I removed the retry state from the activeMux. No need to reset it: basically, if you were able to establish a node connection and send an initial request, then you've done enough work, and the retry can happen immediately.

Verified that this fix is fine with the decommission test.

@miretskiy miretskiy force-pushed the mux branch 2 times, most recently from bb602dc to c27731f Compare April 25, 2023 13:39
Collaborator

@pav-kv pav-kv left a comment

CI seems unhappy, please fix.

@miretskiy
Contributor Author

bors r+

@craig
Contributor

craig bot commented Apr 26, 2023

Build failed (retrying...):

@craig
Contributor

craig bot commented Apr 26, 2023

Build succeeded:

@craig craig bot merged commit d9aa438 into cockroachdb:master Apr 26, 2023
@blathers-crl

blathers-crl bot commented Apr 26, 2023

Encountered an error creating backports. Some common things that can go wrong:

  1. The backport branch might have already existed.
  2. There was a merge conflict.
  3. The backport branch contained merge commits.

You might need to create your backport manually using the backport tool.


error creating merge commit from c0a5c30 to blathers/backport-release-23.1-102094: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict []

you may need to manually resolve merge conflicts with the backport tool.

Backport to branch 23.1.x failed. See errors above.


error creating merge commit from c0a5c30 to blathers/backport-release-23.1.0-102094: POST https://api.github.com/repos/cockroachdb/cockroach/merges: 409 Merge conflict []

you may need to manually resolve merge conflicts with the backport tool.

Backport to branch 23.1.0 failed. See errors above.


🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.


Labels

backport-23.1.x PAST MAINTENANCE SUPPORT: 23.1 patch releases via ER request only


Development

Successfully merging this pull request may close these issues.

rangefeed: mux rangefeeds failing in metamorphic test builds
kv/kvserver: TestDecommission failed

4 participants