kvserver,kvflowcontrol: integrate flow control #98308
craig[bot] merged 12 commits into cockroachdb:master
Conversation
sumeerbhola
left a comment
I did a rough pass. Just some small comments.
Reviewed 2 of 37 files at r26, 6 of 74 files at r34, 3 of 66 files at r35, 1 of 29 files at r37, 7 of 63 files at r38, 12 of 45 files at r39, 4 of 18 files at r41, 1 of 2 files at r42.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @irfansharif, @pavelkalinnikov, @smg260, and @tbg)
-- commits line 106 at r42:
nit: two settings?
pkg/kv/kvserver/flow_control_replica_integration.go line 88 at r42 (raw file):
// tied to the lifetime of a leaseholder replica having raft leadership. We
// don't intercept lease acquisitions/transfers -- simply raft leadership.
// This is ok since leadership follows the lease.
We may have discussed this before, but I can't remember -- is the idea here that we don't worry about not being the leaseholder because that situation is benign: since we won't evaluate proposals at this replica, we won't deduct any tokens, so the flow control machinery will go unused. And once we also become the leaseholder, it will start getting exercised? If yes, this could use a code comment, since "is ok since leadership follows the lease" doesn't give much confidence to a reader about correctness in the intermediate states when they are not colocated.
pkg/kv/kvserver/flow_control_replica_integration.go line 99 at r42 (raw file):
localRepl, found := f.lastKnownReplicas.GetReplicaDescriptorByID(f.replicaForFlowControl.getReplicaID())
if !found {
	log.Fatalf(ctx, "leader (replid=%d) didn't find self in last known replicas (%s)",
Is this assertion relying on the fact that replicaForFlowControl is locked, so the descriptor could not have changed state while this callback is ongoing?
pkg/kv/kvserver/flow_control_replica_integration.go line 170 at r42 (raw file):
// We're observing ourselves get removed from the raft group, but
// are still retaining raft leadership. Close the underlying handle
// and bail.
Is there a reason this is not an assertion failure like the previous case? I think we need a code comment explaining why some things are tolerated and some are not -- it's making me slightly nervous about this code model where the notification name tells us something (like onBecameLeader) and then we fetch some state inside that notification and expect it to be consistent with what we were told.
pkg/kv/kvserver/flow_control_replica_integration.go line 232 at r42 (raw file):
for _, repl := range f.lastKnownReplicas.Descriptors() {
	if repl.ReplicaID == ourReplicaID {
		continue
should we assert that ourReplicaID is not in disconnectedStores?
pkg/kv/kvserver/flow_control_replica_integration.go line 318 at r42 (raw file):
if _, found := pausedFollowers[repl.ReplicaID]; found { // As of 4/23, we don't make any strong guarantees around the set of
who is "we" in this statement? Is this saying pausedFollowers can include replicas that are not in the range descriptor, so we'll return such non-existent replicas from notActivelyReplicatingTo and call f.innerHandle.DisconnectStream(ctx, stream) on it?
A longer comment would be helpful.
irfansharif
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @herkolategan, @pavelkalinnikov, @smg260, @sumeerbhola, and @tbg)
pkg/kv/kvserver/flow_control_replica_integration.go line 88 at r42 (raw file):
Previously, sumeerbhola wrote…
We may have discussed this before, but I can't remember -- is the idea here that we don't worry about not being leaseholder as that is a benign situation in that since we won't evaluate proposals at this replica we won't deduct any tokens, so the flow control stuff will go unused. And once we also become the leaseholder they will start getting exercised? If yes, this could use a code comment since "is ok since leadership follows the lease" doesn't give much confidence to a reader about correctness in the intermediate states when they are not colocated.
The referenced "I5" had more detail that I've copied over here. But yes, to everything you said.
// When leadership is lost we release all held flow tokens. Tokens are only
// deducted at proposal time when the proposing replica is both the raft
// leader and leaseholder (the latter is tautological since only
// leaseholders propose). We're relying on timely acquisition of raft
// leadership by the leaseholder to not be persistently over admitting.
pkg/kv/kvserver/flow_control_replica_integration.go line 99 at r42 (raw file):
Previously, sumeerbhola wrote…
Is this assertion relying on the fact that replicaForFlowControl is locked, so the descriptor could not have changed state while this callback is ongoing?
Yes. The other reason this assertion here exists is because the "local" stream is never disconnected, which you've commented on below. There's an equivalent assertion for the local stream never getting disconnected.
pkg/kv/kvserver/flow_control_replica_integration.go line 170 at r42 (raw file):
Previously, sumeerbhola wrote…
Is there a reason this is not an assertion failure like the previous case. I think we need a code comment explaining why some things are tolerated and some are not -- it's making me slightly nervous about this code model where the notification name tells us something (like onBecameLeader) and then we fetch some state inside that notification and expect it to be consistent with what we were told.
It's not an assertion because this is a valid code path we can hit -- it gets triggered in TestFlowControlRaftMembershipRemoveSelf (which I've referenced). The story here is simple, I think: the descriptor changed, and the change includes the handler being removed. It's not inconsistent with what we're being told (onDescChanged). The notification API makes no claim that onBecameFollower can only come after onDescChanged.
pkg/kv/kvserver/flow_control_replica_integration.go line 232 at r42 (raw file):
Previously, sumeerbhola wrote…
should we assert that ourReplicaID is not in disconnectedStores?
We're already asserting that in the following code snippet. It runs before the one place where disconnectedStores is written to:
// disconnectStreams disconnects replication streams for the given replicas.
func (f *replicaFlowControlIntegrationImpl) disconnectStreams(
	ctx context.Context, toDisconnect []roachpb.ReplicaDescriptor, reason string,
) {
	ourReplicaID := f.replicaForFlowControl.getReplicaID()
	for _, repl := range toDisconnect {
		if repl.ReplicaID == ourReplicaID {
			log.Fatal(ctx, "replica attempting to disconnect from itself")
		}
pkg/kv/kvserver/flow_control_replica_integration.go line 318 at r42 (raw file):
Previously, sumeerbhola wrote…
who is "we" in this statement? Is this saying pausedFollowers can include replicas that are not in the range descriptor, so we'll return such non-existent replicas from notActivelyReplicatingTo and call f.innerHandle.DisconnectStream(ctx, stream) on it?
A longer comment would be helpful.
Removed the "we" but meant it in the royal pronoun sense 😛
It's not possible to return non-existent replicas from this method through notActivelyReplicatingTo and invoke DisconnectStream on them, because the loop iterates through the descriptor's replicas. Expanded the comment:
// As of 6/23, there are no strong guarantees around the set of
// paused followers we're tracking, nothing that ensures that what's
// tracked is guaranteed to be a member of the range descriptor.
// This is why we treat the range descriptor derived state as
// authoritative (we're using it in the loop iteration and only
// tracking replicas here that are both paused AND part of the
// descriptor).
sumeerbhola
left a comment
Reviewed 1 of 32 files at r43.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @herkolategan, @irfansharif, @pavelkalinnikov, @smg260, and @tbg)
pkg/kv/kvserver/flow_control_replica_integration.go line 105 at r43 (raw file):
// This assertion relies on replicaForFlowControl being locked, so the
// descriptor could not have changed state while this callback is
// ongoing. We also never disconnect the local stream until
incomplete sentence
Thanks for the reviews! 🍻🍻🍻 bors r+ single on p=420

Build failed:

Timed out.

bors r+ single on

The CI run itself had succeeded, but bors for some reason timed out. Asked internally.

Canceled.

Appended a commit to bump the bors timeout to 1h20m to see if it helps. bors r+ single on p=420

Build failed:
This commit integrates the kvflowcontrol.Dispatch with the
kvserver-level RaftTransport. When log entries are admitted below raft,
we'll want to inform the origin nodes of this fact, effectively
returning the flow tokens that were deducted when replicating the log
entry to us. We repurpose the existing RaftTransport for this
communication -- we piggyback these flow token returns[^1] on raft
messages already bound to nodes we're returning tokens to. We also
guarantee delivery of tokens in the presence of idle RaftTransport
connections[^2].
We had to introduce some protocol changes here. When a client
establishes a RaftMessageRequestBatch stream, it sends along to the
server the set of all StoreIDs it has. It's populated on the first
RaftMessageRequestBatch sent along MultiRaft.RaftMessageBatch gRPC
stream identifying at least one store, and then populated once more if
any additional stores have been initialized[^3]. This data is used by
the kvflowcontrol machinery to track the exact set of stores on the
client node. It uses this information to react to the gRPC streams
breaking. Since these streams are used to piggyback information about which
log entries were admitted below raft[^4] in order for the server-side to
free up flow tokens, if the stream breaks we possibly risk leaking these
tokens. So when these streams break, we use information about the
client's stores to release all held tokens[^5].
We're not using this code just yet; this commit is just the below-raft
integration with kvflowcontrol. The subsequent commit will introduce the
above-raft integration where we'll actually deduct flow tokens at the
sender, encode proposals using EntryEncoding{Standard,Sideloaded}WithAC,
which in turn enqueues virtual work items in below-raft admission queues
for asynchronous admission. Once asynchronously admitted, using the
changes in this commit, we'll return flow tokens using the now-wired-up
kvflowcontrol.Dispatch interface.
---
Suggested reading order for reviewers:
- (*RaftTransport).kvflowControl
Brief comment block which tries to give a lay of the land.
- flow_control_stores.go
Integration interface+implementation that's going to be used by the
RaftTransport to return flow tokens to the specific locally held
kvflowcontrol.Handles, after learning about admitted raft log entries
from remote nodes. It's implemented more fully in the subsequent commit.
- flow_control_raft_transport.go
Contains the set of new dependencies now used in the RaftTransport
code for flow token purposes. It also includes the interfaces that
show how the RaftTransport informs individual replicas that it's no
longer connected to specific (remote) stores. They're used more fully
in the subsequent commit.
- raft_transport.go
The actual code changes to the RaftTransport.
- flow_token_transport_test.go and flow_token_transport/*
Datadriven test to understand how the various pieces fit together.
- kvflowdispatch/*
Adds some metrics and unit testing for the canonical
kvflowcontrol.Dispatch implementation (previously implemented).
---
[^1]: In the form of kvflowcontrolpb.AdmittedRaftLogEntries.
[^2]: See kvserver.TestFlowTokenTransport.
[^3]: This two-step process is because of how and when we allocate
StoreIDs. Ignoring nodes that are bootstrapping the cluster (which
just picks the initial set of StoreIDs -- see
pkg/server.bootstrapCluster), whenever a new node is added, it's
assigned a node ID and store ID by an existing node in CRDB (see
kvpb.JoinNodeResponse). Subsequent store IDs, for multi-store
nodes, are generated by the joining node by incrementing a
sequence ID generator (see
pkg/server.(*Node).initializeAdditionalStores). All of which is to
say that the very first time we issue a RaftMessageRequestBatch,
we might not have all the StoreIDs. But we will very shortly
after, and certainly before any replicas get allocated to the
additional store.
[^4]: See kvflowcontrolpb.AdmittedRaftLogEntries and its use in
RaftMessageRequest.
[^5]: See I1 from kvflowcontrol/doc.go.
Release note: None
Part of cockroachdb#95563. This PR integrates various kvflowcontrol components into the critical path for replication traffic. It does so by introducing two "integration interfaces" in the kvserver package to intercept various points of a replica's lifecycle, using them to manage the underlying replication streams and flow tokens. The integration is mediated through two cluster settings:
- kvadmission.flow_control.enabled
  This is a top-level kill-switch to revert to pre-kvflowcontrol behavior where follower writes unilaterally deducted IO tokens without blocking.
- kvadmission.flow_control.mode
  It can take on one of two settings, each exercising the flow control machinery to varying degrees.
  - apply_to_elastic
    Only applies admission delays to elastic traffic.
  - apply_to_all
    Applies admission delays to {regular,elastic} traffic.
When the mode is changed, we simply admit all waiting requests. This risks possibly over-admitting work, but that's ok -- we assume these mode changes are rare events and done under supervision. These settings are hooked into in the kvadmission and kvflowcontroller packages. As for the actual integration interfaces in kvserver, they are:
- replicaFlowControlIntegration: used to integrate with replication flow control. It intercepts various points in a replica's lifecycle, like it acquiring raft leadership or losing it, or its raft membership changing, etc. Accessing it requires Replica.mu to be held, exclusively (this is asserted on in the canonical implementation).

  type replicaFlowControlIntegration interface {
    handle() (kvflowcontrol.Handle, bool)
    onBecameLeader(context.Context)
    onBecameFollower(context.Context)
    onDescChanged(context.Context)
    onFollowersPaused(context.Context)
    onReplicaDestroyed(context.Context)
    onProposalQuotaUpdated(context.Context)
  }

- replicaForFlowControl abstracts the interface of an individual Replica, as needed by replicaFlowControlIntegration.
  type replicaForFlowControl interface {
    assertLocked()
    annotateCtx(context.Context) context.Context
    getTenantID() roachpb.TenantID
    getReplicaID() roachpb.ReplicaID
    getRangeID() roachpb.RangeID
    getDescriptor() *roachpb.RangeDescriptor
    pausedFollowers() map[roachpb.ReplicaID]struct{}
    isFollowerActive(context.Context, roachpb.ReplicaID) bool
    appliedLogPosition() kvflowcontrolpb.RaftLogPosition
    withReplicaProgress(f func(roachpb.ReplicaID, rafttracker.Progress))
  }

Release note: None
This commit adds rudimentary /inspectz-style pages to CRDB. These hang
off of <url>/inspectz and support a few registered endpoints to inspect
kvflowcontrol state in JSON form.
- /inspectz/kvflowcontroller: marshals the state of registered flow
control streams, and how many {regular,elastic} tokens are available
for each one.
- /inspectz/kvflowhandles: marshals the state of all in-memory
kvflowcontrol.Handles held per leader+leaseholder replica, showing
all deducted tokens and corresponding log positions, and what
stream(s) each replica is connected to. It also supports querying
for specific ranges through a ?ranges=<int>[,<int>] query parameter.
To power these endpoints we introduced proto representations for various
components under pkg/../kvflowcontrol/kvflowinspectpb. Select
kvflowcontrol interfaces now expose an Inspect() method, returning the
relevant proto.
Other than just observing proto-state like the 90s, this commit also
adds indexed crdb_internal vtables to do more sophisticated filtering of
these inspectz-protos. It's easy to combine these tables to understand
exactly which ranges are blocked on flow tokens, and for which streams
in particular. They are:
CREATE TABLE crdb_internal.kv_flow_controller (
tenant_id INT NOT NULL,
store_id INT NOT NULL,
available_regular_tokens INT NOT NULL,
available_elastic_tokens INT NOT NULL
)
CREATE TABLE crdb_internal.kv_flow_control_handles (
range_id INT NOT NULL,
tenant_id INT NOT NULL,
store_id INT NOT NULL,
total_tracked_tokens INT NOT NULL,
INDEX(range_id)
)
CREATE TABLE crdb_internal.kv_flow_token_deductions (
range_id INT NOT NULL,
tenant_id INT NOT NULL,
store_id INT NOT NULL,
priority STRING NOT NULL,
log_term INT NOT NULL,
log_index INT NOT NULL,
tokens INT NOT NULL,
INDEX(range_id)
)
To see the set of ranges blocked on regular flow tokens, one can run
something like:
SELECT range_id,
crdb_internal.humanize_bytes(available_regular_tokens)
FROM crdb_internal.kv_flow_controller AS c
INNER JOIN crdb_internal.kv_flow_control_handles AS hs ON
c.tenant_id = hs.tenant_id AND c.store_id = hs.store_id
WHERE available_regular_tokens <= 0;
Or if looking to understand how many active replication streams each
leaseholder+leader range is shaping write traffic through, something
like:
SELECT range_id, count(*) AS streams
FROM crdb_internal.kv_flow_control_handles
GROUP BY (range_id)
ORDER BY streams DESC;
Release note: None
We add TestFlowControlIntegration, exercising the kvflowcontrol
integration interface (replicaFlowControlIntegration) introduced in an
earlier commit. It tests how the underlying kvflowcontrol.Handle is
constructed/destroyed, and its stream-management APIs invoked, as
replicas acquire raft leadership, lose it, observe paused and/or
inactive followers, change range descriptors, and react to raft progress
updates. We can write tests of the following form:
# Observe how the integration layer deals with paused followers.
# Start off with a triply replicated range r1/t1, with replicas on
# n1/s1, n2/s2, and n3/s3 (with replica IDs 1-3 respectively).
init tenant=t1 range=r1 replid=1
----
state descriptor=(1,2,3) applied=1/10
----
# Set up replid=1 (declared in init above) to be the raft leader. It
# should connect to all three replication streams.
integration op=became-leader
----
initialized flow control handle for r1/t1
connected to replication stream t1/s1 starting at log-position=1/10
connected to replication stream t1/s2 starting at log-position=1/10
connected to replication stream t1/s3 starting at log-position=1/10
# Pause replid=2. Observe that we disconnect the stream to t1/s2.
state descriptor=(1,2,3) paused=(2)
----
integration op=followers-paused
----
disconnected from replication stream t1/s2
These are still unit tests, testing the interface at just the
interface-level. We'll introduce more end-to-end integration testing of
an actual replica's lifecycle in subsequent commits.
Release note: None
Release note: None
TestFlowControl* are end-to-end tests of the kvflowcontrol
machinery, replicating + admitting individual writes. They make use of
an echotest-backed test harness to make it possible to observe
sophisticated KV interactions. We can now write tests that look like
this:
-- Flow token metrics, before issuing the regular 1MiB replicated
-- write.
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvadmission%tokens%'
ORDER BY name ASC;
kvadmission.flow_controller.elastic_tokens_available | 0 B
kvadmission.flow_controller.elastic_tokens_deducted | 0 B
kvadmission.flow_controller.elastic_tokens_returned | 0 B
kvadmission.flow_controller.elastic_tokens_unaccounted | 0 B
kvadmission.flow_controller.regular_tokens_available | 0 B
kvadmission.flow_controller.regular_tokens_deducted | 0 B
kvadmission.flow_controller.regular_tokens_returned | 0 B
kvadmission.flow_controller.regular_tokens_unaccounted | 0 B
-- Flow token metrics from n1 after issuing the regular 1MiB
-- replicated write, and it being admitted on n1, n2 and n3. We
-- should see 3*1MiB = 3MiB of {regular,elastic} tokens deducted and
-- returned, and {8*3=24MiB,16*3=48MiB} of {regular,elastic} tokens
-- available. Everything should be accounted for.
SELECT name, crdb_internal.humanize_bytes(value::INT8)
FROM crdb_internal.node_metrics
WHERE name LIKE '%kvadmission%tokens%'
ORDER BY name ASC;
kvadmission.flow_controller.elastic_tokens_available | 24 MiB
kvadmission.flow_controller.elastic_tokens_deducted | 3.0 MiB
kvadmission.flow_controller.elastic_tokens_returned | 3.0 MiB
kvadmission.flow_controller.elastic_tokens_unaccounted | 0 B
kvadmission.flow_controller.regular_tokens_available | 48 MiB
kvadmission.flow_controller.regular_tokens_deducted | 3.0 MiB
kvadmission.flow_controller.regular_tokens_returned | 3.0 MiB
kvadmission.flow_controller.regular_tokens_unaccounted | 0 B
----
----
Release note: None
This commit documents the kvflowcontrol integration interfaces
introduced in earlier commits across flow_control_*.go, grouping
commentary and interfaces in a top-level flow_control_integration.go,
and makes minor simplifications where applicable. It's helpful to read
kvflowcontrol/{doc,kvflowcontrol}.go to understand the library
components in question, and also the comment block on
replicaFlowControlIntegration.
Here's how the various pieces fit together:
┌───────────────────┐
│ Receiver (client) │
├───────────────────┴─────────────────────┬─┬─┐
┌──○ kvflowcontrolpb.AdmittedRaftLogEntries │ │ │
│ └─────────────────────────────────────────┴─┴─┘
│ ┌───────────────────┐
│ │ Receiver (client) │
│ ├───────────────────┴─────────────────────┬─┬─┐
┌─────────────────────▶─┼──○ kvflowcontrolpb.AdmittedRaftLogEntries │ │ │
│ │ └─────────────────────────────────────────┴─┴─┘
['1] gRPC streams │
connecting/disconnecting [1] RaftMessageBatch
│ │
│ ┌─────────────────┐ │
│ │ Sender (server) │ │
│ ├─────────────────┴──│────────────────┐ ┌────────────────────────────────────────┐
│ │ RaftTransport │ │ │ StoresForFlowControl │
│ │ │ │ │ │
│ │ │ │ │ ┌───────────────────────────────────┐ │
│ │ └────[2] Lookup ─┼────────┼─▶│ kvflowcontrol.Handles ○─┼──┐
│ │ │ │ └───────────────────────────────────┘ │ │
│ │ ┌─────────────────────────────────┐ │ │ ┌───────────────────────────────────┐ │ │
└──┼▶│ connectionTrackerForFlowControl │ ├──['2]──┼─▶│ RaftTransportDisconnectedListener │ │ │
│ └─────────────────────────────────┘ │ │ └──────○────────────────────────────┘ │ │
└─────────────────▲───────────────────┘ └─────────┼──────────────────────────────┘ │
│ │ │
│ ['3] onRaftTransportDisconnected [3] ReturnTokensUpto
│ │ │
│ │ │
│ ┌──────────────────────────────┼─────────────────────────────────┼─────────┬─┬─┐
│ │ replicaFlowControlIntegration│ ┌──────────────────────▼───────┐ │ │ │
│ │ │ │ kvflowcontrol.Handle │ │ │ │
│ │ onBecameLeader() ▼ └───────────────────▲─▲────────┘ │ │ │
│ │ onBecameFollower() ○────['4] DisconnectStream ──┘ │ │ │ │
│ │ onDescChanged() ◀─── ["5] tryReconnect ──────┐ │ │ │ │
│ │ onFollowersPaused() ○─── ["7] ConnectStream ────┼─┘ │ │ │
│ │ = onRaftTransportDisconnected() ┌───────────────────▼──────────┐ │ │ │
│ │ = onRaftTicked() │ replicaForFlowControl │ │ │ │
│ │ onReplicaDestroyed() │ │ │ │ │
│ │ │ getDescriptor() │ │ │ │
["6] isConnectedTo │ │ getPausedFollowers() │ │ │ │
│ │ │ getBehindFollowers() │ │ │ │
│ │ │ getInactiveFollowers() │ │ │ │
└───────┼─────────────────────────────────────────▶ = getDisconnectedFollowers() │ │ │ │
│ └──────────────────────────────┘ │ │ │
└──────────────────────────────────────────────────────────────────────────┴─┴─┘
The "server" and "client" demarcations refer to the server and client-side of
the RaftTransport stream. "Sender" and "Receiver" is kvflowcontrol verbiage,
referring to where proposals originate (and flow tokens deducted) and the
remote follower nodes where they're received. Below-raft admission happens
asynchronously on the receiver nodes, of which the sender is informed, which
in turn lets it release flow tokens and unblock further proposals.
Notation:
- Stacked boxes (with " │││" on the right hand side) indicate that there are
multiple of a kind. Like multiple replicaFlowControlIntegration
implementations (one per locally held replica), multiple
kvflowcontrolpb.AdmittedRaftLogEntries, etc.
- [<digit>], [<digit>'], and [<digit>"] denote independent sequences,
explained in text below.
---
A. How are flow tokens returned after work is admitted below-raft on remote,
receiver nodes?
[1]: When work gets admitted below-raft on the receiver, the sender (where
work originated, and flow tokens were deducted) is informed of the fact
through the RaftMessageBatch gRPC stream. There are two bi-directional
raft transport streams between a pair of nodes. We piggyback
kvflowcontrolpb.AdmittedRaftLogEntries on raft messages being sent from
the RaftMessageBatch client to the RaftMessageBatch server.
[2]: We lookup the relevant kvflowcontrol.Handle from the set of
kvflowcontrol.Handles, to inform it of below-raft admission.
[3]: We use the relevant kvflowcontrol.Handle (hanging off of some locally
held replica) to return relevant previously deducted flow tokens.
The piggy-backing from [1] and the intercepting of piggy-backed messages and
kvflowcontrol.Handle lookup from [2] both happen in the RaftTransport layer,
in raft_transport.go. The set of local kvflowcontrol.Handles is exposed
through the StoresForFlowControl interface, backed by local stores and their
contained replicas. Each replica exposes the underlying handle through the
replicaFlowControlIntegration interface.
---
B. How do we react to raft transport streams breaking? (I1 from
kvflowcontrol/doc.go)
['1]: The server-side of RaftMessageBatch observes every client-initiated
stream breaking. The connectionTrackerForFlowControl, used within the
RaftTransport layer, also monitors all live gRPC streams to understand
exactly the set of clients we're connected to.
['2]: Whenever any raft transport gRPC stream breaks, we notify components of
this fact through the RaftTransportDisconnectedListener interface.
['3]: This in turn informs all locally held replicas, through the
replicaFlowControlIntegration interface.
['4]: We actively disconnect streams for replicas we just disconnected from
as informed by the raft transport.
Note that we actually plumb down information about exactly which raft
transport streams broke. It's not enough to simply inform the various
replicaFlowControlIntegrations of some transport stream breaking, and for
them to then determine which streams to disconnect. This is because it's
possible for the streams to be re-established in the interim, or for there to
be another active stream from the same client but using a different RPC
class. We still want to free up all tokens for that replication stream, lest
we leak flow tokens in transit on the particular stream that broke.
---
C. What happens when the raft transport streams reconnect? (I1 from
kvflowcontrol/doc.go)
["5]: The replicaFlowControlIntegration interface is used to periodically
reconnect previously disconnected streams. This is driven primarily
through the onRaftTicked() API, but also happens opportunistically
through onFollowersPaused(), onRaftTransportDisconnected(), etc.
["6]: We check whether we're connected to remote replicas via the
raftTransportForFlowControl.isConnectedTo(). This is powered by the
connectionTrackerForFlowControl embedded in the RaftTransport which
monitors all active gRPC streams as seen on the server-side.
["7]: If we're now connected to previously disconnected replicas, we inform
the underlying kvflowcontrol.Handle in order to deduct flow tokens for
subsequent proposals.
---
replicaFlowControlIntegration is used to integrate with replication flow
control. It intercepts various points in a replica's lifecycle, like it
acquiring raft leadership or losing it, or its raft membership changing, etc.
Accessing it requires Replica.mu to be held, exclusively (this is asserted on
in the canonical implementation). The "external" state is mediated by the
replicaForFlowControl interface. The state transitions look as follows:
─ ─ ─ ─ ─ ─ ─ ┌───── onDestroyed ──────────────────▶ ╳╳╳╳╳╳╳╳╳╳╳╳╳
─ ─ ─ ─ ─ ─ ┐ │ │ ┌─── onDescChanged(removed=self) ──▶ ╳ destroyed ╳
┌──────── onBecameLeader ─────────┐ │ │ ╳╳╳╳╳╳╳╳╳╳╳╳╳
│ │ │ │ │ │
○ ○ ○ ▼ ○ ○
┌ ─ ─ ─ ─ ─ ─ ─ ┐ ┌──────────────┐
─ ─ ─ ○ follower │ leader │ ○─────────────────────────────┐
└ ─ ─ ─ ─ ─ ─ ─ ┘ └──────────────┘ │
▲ ▲ ○ ▲ onDescChanged │
│ │ │ │ onFollowersPaused │
─ ─ ─ ─ ─ ─ ─ └──────── onBecameFollower ───────┘ └────── onRaftTransportDisconnected ─┘
onRaftTicked
We're primarily interested in transitions to/from the leader state -- the
equivalent transitions from the follower state are no-ops.
- onBecameLeader is when the replica acquires raft leadership. At this
point we initialize the underlying kvflowcontrol.Handle and other
internal tracking state to handle subsequent transitions.
- onBecameFollower is when the replica loses raft leadership. We close the
underlying kvflowcontrol.Handle and clear other tracking state.
- onDescChanged is when the range descriptor changes. We react to changes
by disconnecting streams for replicas no longer part of the range,
connecting streams for new members of the range, closing the underlying
kvflowcontrol.Handle + clearing tracking state if we ourselves are no
longer part of the range.
- onFollowersPaused is when the set of paused followers have changed. We
react to it by disconnecting streams for newly paused followers, or
reconnecting to newly unpaused ones.
- onRaftTransportDisconnected is when we're no longer connected to some
replicas via the raft transport. We react to it by disconnecting relevant
streams.
- onRaftTicked is invoked periodically, and refreshes the set of streams
we're connected to. It disconnects streams to inactive followers and/or
reconnects to now-active followers. It also observes raft progress state
for individual replicas, disconnecting from ones we're not actively
replicating to (because they're too far behind on their raft log, in need
of snapshots, or because we're unaware of their committed log indexes).
It also reconnects streams if the raft progress changes.
- onDestroyed is when the replica is destroyed. Like onBecameFollower, we
close the underlying kvflowcontrol.Handle and clear other tracking state.
Release note: None
TestFlowControlAdmissionPostSplitMerge walks through what happens when below-raft admission occurs after a range undergoes splits/merges. It does this by blocking and later unblocking below-raft admission, verifying that:
- tokens for the RHS are released at the post-merge leaseholder,
- admission for the RHS post-merge does not cause a double return of tokens,
- admission for the LHS can happen post-merge,
- admission for the LHS and RHS can happen post-split.
Release note: None
Disable kvadmission.flow_control.enabled by default. We'll re-enable it on master shortly after some baking time while it's switched off. We need to ensure that there are zero performance regressions when switched off, and that the integration code does not exercise new machinery when turned off. Merging this as turned-off-by-default also reduces revert likelihood. Release note: None
We've seen the per-PR stress jobs take upwards of 55m and be successful. This happens for PRs with a wide surface area. Release note: None
Hit #104639 again. Appending a commit to skip it. bors r+ single on p=99

Build failed:

Any day now. bors r+ single on p=99

Build failed:
It's flaky. See cockroachdb#104639.
Release note: None
We see this (benign) data race under --stress --race. Squash it by
targeted use of atomics.
Read at 0x00c0039c06a8 by goroutine 31161:
github.com/cockroachdb/cockroach/pkg/util/admission.(*StoreWorkQueue).gcSequencers()
github.com/cockroachdb/cockroach/pkg/util/admission/work_queue.go:2176 +0x196
github.com/cockroachdb/cockroach/pkg/util/admission.makeStoreWorkQueue.func1()
github.com/cockroachdb/cockroach/pkg/util/admission/work_queue.go:2162 +0x5b
Previous write at 0x00c0039c06a8 by goroutine 31105:
github.com/cockroachdb/cockroach/pkg/util/admission.(*sequencer).sequence()
github.com/cockroachdb/cockroach/pkg/util/admission/sequencer.go:61 +0x190
github.com/cockroachdb/cockroach/pkg/util/admission.(*StoreWorkQueue).sequenceReplicatedWork()
github.com/cockroachdb/cockroach/pkg/util/admission/work_queue.go:2193 +0x145
github.com/cockroachdb/cockroach/pkg/util/admission.(*StoreWorkQueue).Admit()
Release note: None
The other subtest in #104639 flakes, skipping the whole thing. bors r+ single on p=21

Build succeeded:
Part of #95563. See individual commits.
Release note: None