Skip to content

kvflowhandle: implement kvflowcontrol.Handle#96642

Merged
craig[bot] merged 1 commit intocockroachdb:masterfrom
irfansharif:230205.kvflowhandle
Feb 18, 2023
Merged

kvflowhandle: implement kvflowcontrol.Handle#96642
craig[bot] merged 1 commit intocockroachdb:masterfrom
irfansharif:230205.kvflowhandle

Conversation

@irfansharif
Copy link
Copy Markdown
Contributor

@irfansharif irfansharif commented Feb 6, 2023

Part of #95563.

kvflowcontrol.Handle is used to interface with replication flow control;
it's typically backed by a node-level kvflowcontrol.Controller. Handles
are held on replicas initiating replication traffic, i.e. are both the
leaseholder and raft leader, and manage multiple streams underneath
(typically one per active member of the raft group).

When replicating log entries, these replicas choose the log position
(term+index) the data is to end up at, and use this handle to track the
token deductions on a per log position basis. When informed of admitted
log entries on the receiving end of the stream, we free up tokens by
specifying the highest log position up to which we've admitted
(below-raft admission, for a given priority, takes log position into
account -- see kvflowcontrolpb.AdmittedRaftLogEntries for more details).

We also extend the testing framework introduced in #95905 to also
support writing tests for kvflowcontrol.Handle -- it's now pulled into
its own kvflowsimulator package. We're able to write tests that look
like the following:

# Set up a triply connected handle (to s1, s2, s3) and start issuing
# writes at 1MiB/s. For two of the streams, return tokens at exactly
# the rate its being deducted (1MiB/s). For the third stream (s3),
# we return flow tokens at only 0.5MiB/s.
timeline
t=0s         handle=h op=connect    stream=t1/s1   log-position=1/0
t=0s         handle=h op=connect    stream=t1/s2   log-position=1/0
t=0s         handle=h op=connect    stream=t1/s3   log-position=1/0
t=[0s,50s)   handle=h class=regular adjust=-1MiB/s   rate=10/s
t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s   rate=10/s stream=t1/s1
t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s   rate=10/s stream=t1/s2
t=[0.2s,50s) handle=h class=regular adjust=+0.5MiB/s rate=10/s stream=t1/s3
----

# Observe:
# - Total available tokens flatlines at 32MiB since flow tokens for
#   s3 eventually depletes and later bounces off of 0MiB. We
#   initially have 3*16MiB = 48MiB worth of flow tokens, and end
#   up at 48MiB-16MiB = 32MiB.
# - Initially the rate of token deductions (3*1MiB/s = 3MiB/s) is
#   higher than the token returns (1MiB/s+1MiB/s+0.5MiB/s =
#   2.5MiB/s), but after we start shaping it to the slowest
#   stream, they end up matching at (0.5MiB/s*3 = 1.5MiB/s).
# - The blocked stream count bounces between 0 and 1 as the s3
#   stream gets blocked/unblocked as tokens are
#   deducted/returned. The demand for tokens (1MiB/s) is higher
#   than the replenishment rate (0.5MiB/s).
# - The overall admission rate is reduced from 30 reqs/s to 25
#   reqs/s, mapping to token deduction rate of 3MiB/s to 2.5MiB/s
#   (1MiB/s divvied across 10 reqs). The difference between 30
#   reqs/s and 25 reqs/s is found in the +5 reqs/s accumulating in
#   the wait queue.
plot
kvadmission.flow_controller.regular_tokens_available            unit=MiB
kvadmission.flow_controller.regular_tokens_{deducted,returned}  unit=MiB/s rate=true
kvadmission.flow_controller.regular_blocked_stream_count        unit=streams
kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true
----
----
 47.7 ┼╮
 46.6 ┤╰─╮
 45.6 ┤  ╰─╮
 44.5 ┤    ╰╮
 43.5 ┤     ╰─╮
 42.4 ┤       ╰╮
 41.4 ┤        ╰─╮
 40.3 ┤          ╰─╮
 39.3 ┤            ╰╮
 38.2 ┤             ╰─╮
 37.2 ┤               ╰─╮
 36.1 ┤                 ╰╮
 35.1 ┤                  ╰─╮
 34.0 ┤                    ╰─╮
 33.0 ┤                      ╰╮
 31.9 ┤                       ╰───────────────
            regular_tokens_available (MiB)

 3.0 ┤╭───────────────────────╮
 2.8 ┤│                       │
 2.6 ┤╭────────────────────────╮
 2.4 ┤│                       ╰│
 2.2 ┤│                        │
 2.0 ┤│                        │
 1.8 ┤│                        │
 1.6 ┤│                        ╰─────────────
 1.4 ┤│
 1.2 ┤│
 1.0 ┤│
 0.8 ┤│
 0.6 ┤│
 0.4 ┤│
 0.2 ┤│
 0.0 ┼╯
      rate(regular_tokens_{deducted,returned}) (MiB/s)

 1.0 ┤                                 ╭╮   ╭
 0.9 ┤                            ╭╮   ││   │
 0.9 ┤                            ││   ││   │
 0.8 ┤                            ││   ││   │
 0.7 ┤                            ││   ││   │
 0.7 ┤                         ╭╮ ││╭╮ ││   │
 0.6 ┤                         ││ ││││ ││╭─╮│
 0.5 ┤                         │╰╮│││╰╮│││ ││
 0.5 ┤                         │ ││││ ││││ ││
 0.4 ┤                         │ ││││ ││││ ││
 0.3 ┤                         │ ││││ ││││ ││
 0.3 ┤                         │ ╰╯││ ││││ ││
 0.2 ┤                         │   ││ ╰╯╰╯ ╰╯
 0.1 ┤                        ╭╯   ╰╯
 0.1 ┤                        │
 0.0 ┼────────────────────────╯
       regular_blocked_stream_count (streams)

 30.0 ┤╭───────────────────────╮
 28.0 ┤│                       ╰╮
 26.0 ┤│                        ╰─────────────
 24.0 ┤│
 22.0 ┤│
 20.0 ┤│
 18.0 ┤│
 16.0 ┤│
 14.0 ┤│
 12.0 ┤│
 10.0 ┤│
  8.0 ┤│
  6.0 ┤│                        ╭─────────────
  4.0 ┤│                        │
  2.0 ┤│                       ╭╯
  0.0 ┼────────────────────────╯
       rate(regular_requests_{admitted,waiting}) (reqs/s)
----
----

Release note: None

@cockroach-teamcity
Copy link
Copy Markdown
Member

This change is Reviewable

@irfansharif irfansharif force-pushed the 230205.kvflowhandle branch 5 times, most recently from f07a68e to 65f81eb Compare February 10, 2023 15:30
@irfansharif irfansharif marked this pull request as ready for review February 10, 2023 15:31
@irfansharif irfansharif requested a review from a team as a code owner February 10, 2023 15:31
@irfansharif irfansharif force-pushed the 230205.kvflowhandle branch 4 times, most recently from 18b66a8 to 93aa058 Compare February 11, 2023 12:20
Copy link
Copy Markdown
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still reading

Reviewed 7 of 26 files at r1.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif)


pkg/kv/kvserver/kvflowcontrol/doc.go line 368 at r1 (raw file):

//       - Since we don't want to wait below raft, one way bound the lag between
//         appended entries and applied ones is to only release flow tokens for
//         an entry at position P once the applied state position >= P - delta.

I don't think we can link the return of tokens to state machine application without being very careful. If state machine application on one range is not happening because quorum has been lost due to some other node failure, we don't want that to exhaust tokens such that it causes interference on other unrelated ranges.

I think the simpler thing would be to not let state machine application get significantly behind due to local scheduling reasons. IIRC, we will be asynchronously doing the the raft log writes, since those require fsync, and not necessarily decoupling the async raft log writes and state machine application (which almost never fsyncs, so could be done by the same goroutine that is doing the async raft log writes). So we may be able to structurally avoid a "scheduling issue" that causes state machine application to lag behind raft log writes.


pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go line 61 at r1 (raw file):

	// with the given priority, create-time, and over the given stream. This
	// blocks until there are flow tokens available or the stream disconnects,
	// subject to context cancellation.

If the stream disconnects, will Admit be successful?


pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go line 390 at r1 (raw file):

// (signaled return true if ctx is canceled), and admit can increment the right
// errored metric underneath. We'll have to plumb this to the (test) caller too
// to prevent it from deducting tokens for canceled requests.

hmm, I overlooked this in the previous PR. This is similar to Admit and we don't exercise Admit in tests and instead exercise this. This seems wrong in that we are not quite testing the production code.
Can we share most of the code between the sync and async implementation?


pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go line 90 at r1 (raw file):

	requestsBypassed = metric.Metadata{
		Name:        "kvadmission.flow_controller.%s_requests_bypassed",
		Help:        "Number of %s requests that bypassed the flow controller (due to disconnecting streams)",

disconnecting or disconnected? Things that were waiting and while waiting a stream was disconnected?


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 122 at r1 (raw file):

		lowerBound := h.mu.perStreamLowerBound[c.Stream()]
		if !(lowerBound.Less(pos)) {

Why not let thing race and not do any deduction if the lower bound has already passed the pos?
It is often hard to maintain synchronization invariants across different components.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 125 at r1 (raw file):

			log.Fatalf(ctx, "observed regression in lower bound (%s <= %s)", pos, lowerBound)
		}
		h.mu.perStreamLowerBound[c.Stream()] = pos

I misunderstood the lower-bound. I thought it was maintained based on ReturnTokensUpto. But it is because []tracked needs to be ordered by pos and we don't want to have to do work to keep it sorted.

Copy link
Copy Markdown
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't read the tests yet. Generally looks good.

Reviewed 10 of 26 files at r1, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif)


pkg/kv/kvserver/kvflowcontrol/doc.go line 318 at r1 (raw file):

//      application, create lots of overlapping files/sublevels in L0.
// - We want to pace the initial rate of log appends while factoring in the
//   effect of the subsequent state machine application on L0 (modulo [^9]). We

I think we can do the same thing here as I mentioned in the other comment (not let state machine application get significantly behind the raft log writes). It simpler than having to factor in the effect of subsequent state machine application.


pkg/kv/kvserver/kvflowcontrol/doc.go line 323 at r1 (raw file):

//   for it to be sufficiently caught before deducting/blocking for flow tokens.
//   Until that point we can use flow tokens on sender nodes that wish to send
//   MsgApps to the newly-restarted node. Similar to the steady state, flow

It seems like we have the following modes with flow tokens:

  • Connected: Waiting on flow tokens at Admit time and deducting after proposal evaluation. This is the common case.
  • Disconnected: Not waiting on flow tokens and not deducting after proposal evaluation.
  • Catchup: Not waiting on flow tokens before proposal eval and not deducting after proposal eval. These proposals get queued below raft, and need waiting on and deducting tokens before sending to the store.

State transitions are connected => disconnected => catchup => connected/disconnected
Is that consistent with your mental model?

The state transitions in this roughly sketched approach are per range/Handle. This makes sense since we want the transition to disconnected to happen when pausing (among other things), which may be only for a subset of ranges sending to that store. And different ranges will finish catchup at different times. So we can have one range doing catchup competing with another range in connected state for the same tokens. This is ok-ish since we have isolation at the work class level. But there is a concern that if two ranges are seeing regular WorkClass and range R1 is connected and R2 transitions from disconnected => catchup that it can consume all the tokens causing a slowdown for user-facing work in R1. We can fix this by separating out a token category for regular work catchup: something like 16MB for regular, 8MB for elastic, 8MB for regular-catchup.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 39 at r1 (raw file):

		// (identified by their log positions) have been admitted below-raft,
		// streams disconnect, or the handle closed entirely.
		perStreamTokenTracker map[kvflowcontrol.Stream]*kvflowtokentracker.Tracker

Can you consolidate these two maps into a single one.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 42 at r1 (raw file):

		// perStreamLowerBound tracks on a per-stream basis the log position
		// below which we're not allowed to deduct tokens. It's used only for
		// assertions

We should document in the interface the requirement that DeductTokensFor requires that the RaftLogPositions be monotonically increasing.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 72 at r1 (raw file):

		// introducing cluster settings that disable flow control entirely or
		// for regular traffic. We risk returning tokens that we never deducted
		// otherwise.

It seems ok to return tokens for accounting purposes, since ReturnTokensUpto would return only what has been actually deducted.
Could the disable case be handled at a higher level by not calling Admit?
We also need to handle the transitions from enabled => disabled and vice versa, which may be simpler by always having a Handle and sometimes not using it.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 123 at r1 (raw file):

		lowerBound := h.mu.perStreamLowerBound[c.Stream()]
		if !(lowerBound.Less(pos)) {
			log.Fatalf(ctx, "observed regression in lower bound (%s <= %s)", pos, lowerBound)

How about checking the lower bound before calling Track? This would allow for some defensive code where we log an error if this invariant is violated and if not violated we would call Track. I slightly worry that with various failure modes and state transitions involving disconnected/catchup/disconnected that we may end up with a small race and it is better not to crash the node.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 154 at r1 (raw file):

		// acquired (whichever comes after). This is used to assert against
		// regressions in token deductions (i.e. deducting tokens for indexes
		// lower than the current term/lease).

We may want to weaken this assertion to a error log too.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 165 at r1 (raw file):

	h.mu.perStreamTokenTracker[stream].Untrack(ctx, pri, pos,
		func(tokens kvflowcontrol.Tokens, _ kvflowcontrolpb.RaftLogPosition) {

why bother with this RaftLogPosition param?


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 170 at r1 (raw file):

	)

	lowerBound := h.mu.perStreamLowerBound[stream]

Should this lowerBound be moved into the Tracker since that's the one trying to maintain a sorted slice.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 191 at r1 (raw file):

	}
	h.mu.connections = append(h.mu.connections, newConnectedStream(stream))
	sort.Slice(h.mu.connections, func(i, j int) bool { // sort for deadlock avoidance

I assume the deadlock case is what we will introduce in the future with a blocking AdmitForCatchup that will block and deduct tokens.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 214 at r1 (raw file):

	h.mu.perStreamTokenTracker[stream].Iter(ctx,
		func(pri admissionpb.WorkPriority, tokens kvflowcontrol.Tokens, _ kvflowcontrolpb.RaftLogPosition) bool {

why have this RaftLogPosition param?


pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go line 95 at r1 (raw file):

		}

		f(deduction.tokens, deduction.position)

It would be cheaper to accumulate the tokens to return in the loop and then call the function to return them in one shot. Also, it seems simpler to just return the tokens that are freed from Untrack instead of passing around a closure.


pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go line 122 at r1 (raw file):

	for pri, deductions := range dt.trackedM {
		for _, deduction := range deductions {
			if !f(pri, deduction.tokens, deduction.position) {

Here to we could just return the total tokens and clear the tracker since it seems the only production Iter use case is to return all the tokens. We shouldn't adopt this more expensive callback approach if its only useful for testing -- tests can use their own iteration.

Copy link
Copy Markdown
Contributor Author

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/doc.go line 318 at r1 (raw file):

Previously, sumeerbhola wrote…

I think we can do the same thing here as I mentioned in the other comment (not let state machine application get significantly behind the raft log writes). It simpler than having to factor in the effect of subsequent state machine application.

Ack. But BTW, this problem (= large deferred state machine application) might still exist even if at the individual replica level we made it so that state machine application never got significantly behind the raft log writes. Consider the pathological case described in the text below:

//       - For what it's worth, this "deferred application with high read-amp"
//         was also a problem before async raft storage writes. Consider many
//         replicas on an LSM, all of which appended a few large raft log entries
//         without applying, and at apply time across all those replicas, we end
//         up inverting the LSM.

pkg/kv/kvserver/kvflowcontrol/doc.go line 323 at r1 (raw file):

State transitions are connected => disconnected => catchup => connected/disconnected. Is that consistent with your mental model?

Yes.

We can fix this by separating out a token category for regular work catchup: something like 16MB for regular, 8MB for elastic, 8MB for regular-catchup.

Yes, I was thinking we'd do this this -- a separate category of tokens for performance isolation. BTW, I wasn't thinking we'd use the Handle for this catchup work, but perhaps something very similar that's also layered above the kvflowcontrol.Controller dealing with these "catchup tokens". But now that you mention it, maybe using the Handle here makes sense -- for any range we would never be using catchup tokens and also {regular,elastic} tokens. Maybe that means we could also just use elastic tokens for catchup work.


pkg/kv/kvserver/kvflowcontrol/doc.go line 368 at r1 (raw file):

If state machine application on one range is not happening because quorum has been lost due to some other node failure, we don't want that to exhaust tokens such that it causes interference on other unrelated ranges.

Good point. Added.

IIRC, we will be asynchronously doing the the raft log writes, since those require fsync, and not necessarily decoupling the async raft log writes and state machine application (which almost never fsyncs, so could be done by the same goroutine that is doing the async raft log writes).

I was anticipating #94854 and #94853, which I've now linked to in the text. This comment thread is also now visible on those issues. I agree this would be simpler. Perhaps we can revisit I11 and [^9] once we try to decouple log appends and state machine applications.


pkg/kv/kvserver/kvflowcontrol/kvflowcontrol.go line 61 at r1 (raw file):

Previously, sumeerbhola wrote…

If the stream disconnects, will Admit be successful?

Yes.


pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller.go line 390 at r1 (raw file):

Previously, sumeerbhola wrote…

hmm, I overlooked this in the previous PR. This is similar to Admit and we don't exercise Admit in tests and instead exercise this. This seems wrong in that we are not quite testing the production code.
Can we share most of the code between the sync and async implementation?

I tried sharing code between the sync and async implementations but it got awkward. So what I did instead was add tests for the production Handle.Admit() implementation in TestHandleAdmit. It ends up testing the production implementation of Controller.Admit() as well.


pkg/kv/kvserver/kvflowcontrol/kvflowcontroller/kvflowcontroller_metrics.go line 90 at r1 (raw file):
Disconnecting. These requests were waiting and while waiting a stream was disconnected. If you look at (h *Handle) DisconnectStream we get rid of the connection from Handle.mu.connections to subsequent calls to Handle.Admit() don't try to Controller.Admit using the now-disconnected stream. Reworded the help text to be clearer:

Number of {regular,elastic} waiting requests that bypassed the flow controller due to disconnecting streams.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 39 at r1 (raw file):

Previously, sumeerbhola wrote…

Can you consolidate these two maps into a single one.

Done (by moving the per-stream lower-bound tracking into the tracker itself).


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 42 at r1 (raw file):

Previously, sumeerbhola wrote…

We should document in the interface the requirement that DeductTokensFor requires that the RaftLogPositions be monotonically increasing.

Done.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 72 at r1 (raw file):

since ReturnTokensUpto would return only what has been actually deducted.

Ah, good point.

Could the disable case be handled at a higher level by not calling Admit? We also need to handle the transitions from enabled => disabled and vice versa, which may be simpler by always having a Handle and sometimes not using it.

Hm, yes, that does sound simpler than what I did in the prototype. Edited the TODO text.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 122 at r1 (raw file):

Why not let thing race and not do any deduction if the lower bound has already passed the pos?

Done.

It is often hard to maintain synchronization invariants across different components.

You're very likely right here, but for now I've kept a log.Errorf since I think it could indicate somewhat buggy use (or just racy if we don't go out of our way to add synchronization). Edit: Ah, I see you suggested log.Errorf-ing below anyway.

// We're trying to track a token deduction at a position less than the
// stream's lower-bound. Shout loudly but ultimately no-op. This
// regression indicates buggy usage since:
// - token deductions are done so with monotonically increasing log
//   positions (see Handle.DeductTokensFor);
//   - the monotonically increasing log positions for token deductions
//     also extends to positions at which streams are connected,
//     which typically happen when (a) members are added to the raft
//     group, (b) previously crashed follower nodes restart, (c)
//     replicas are unpaused, or (d) caught back up via snapshots (see
//     Handle.ConnectStream).
// - token returns upto some log position don't precede deductions at
//   lower log positions (see Handle.ReturnTokensUpto);
log.Errorf(ctx, "observed raft log position less than per-stream lower bound (%s <= %s)",
	pos, dt.lowerBound)

pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 123 at r1 (raw file):

How about checking the lower bound before calling Track? This would allow for some defensive code where we log an error if this invariant is violated and if not violated we would call Track.

Done. Also see comment above re: log.Errorf.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 125 at r1 (raw file):
I think your original understanding was right. This lower bound is_ in fact maintained based on ReturnTokensUpto (which mentions "Once returned, subsequent attempts to return tokens upto the same position or lower are no-ops" in the interface comment) and also ConnectStream (which now mentions "The log position is used as a lower bound, beneath which all token deductions/returns are rendered no-ops" in the interface comment).

But it is because []tracked needs to be ordered by pos and we don't want to have to do work to keep it sorted.

Yes, but this is achieved using an assertion when appending to that slice, not using the lower-bound. From (*Tracker) Track:

if len(dt.trackedM[pri]) >= 1 {
	last := dt.trackedM[pri][len(dt.trackedM[pri])-1]
	if !last.position.Less(pos) {
		log.Fatalf(ctx, "expected in order tracked log positions (%s < %s)",
			last.position, pos)
	}
}
dt.trackedM[pri] = append(dt.trackedM[pri], tracked{
	tokens:   tokens,
	position: pos,
})

pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 154 at r1 (raw file):

Previously, sumeerbhola wrote…

We may want to weaken this assertion to a error log too.

I assume you meant this comment for the various "operating on a closed handle" assertions. Done.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 165 at r1 (raw file):

Previously, sumeerbhola wrote…

why bother with this RaftLogPosition param?

Removed. It was used in tests, see:

# Untrack a subset of normal-pri tokens, up to 4/23. This should get rid of four
# tracked tokens.
untrack pri=normal-pri up-to=4/23
----
pri=normal-pri
  tokens=1B log-position=4/20
  tokens=1B log-position=4/21
  tokens=1B log-position=4/22
  tokens=1B log-position=4/23

pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 170 at r1 (raw file):

Previously, sumeerbhola wrote…

Should this lowerBound be moved into the Tracker since that's the one trying to maintain a sorted slice.

Done. But see comment above -- this lowerBound is not what's used for maintaining the sorted []tracked slice, it's used to render token deductions/returns at positions less than the lower bound as no-ops.


pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 191 at r1 (raw file):

Previously, sumeerbhola wrote…

I assume the deadlock case is what we will introduce in the future with a blocking AdmitForCatchup that will block and deduct tokens.

Updated the comment to say:

// Sort connections based on store IDs (this is the order in which we
// invoke Controller.Admit) for predictability. If in the future we use
// flow tokens for raft log catchup (see I11 and [^9] in
// kvflowcontrol/doc.go), we may want to introduce an Admit-variant that
// both blocks and deducts tokens before sending catchup MsgApps. In
// that case, this sorting will help avoid deadlocks.

pkg/kv/kvserver/kvflowcontrol/kvflowhandle/kvflowhandle.go line 214 at r1 (raw file):

Previously, sumeerbhola wrote…

why have this RaftLogPosition param?

Removed, but same as above, it was used in two tests:

  1. TestTracker
iter
----
pri=low-pri
  tokens=1B log-position=4/24
  tokens=1B log-position=4/26
  tokens=1B log-position=4/28
pri=normal-pri
  tokens=1B log-position=4/25
  tokens=1B log-position=4/27
  tokens=1B log-position=4/29
  1. TestUsingSimulation where the simulator itself ties token deductions to corresponding raft log positions, and we use this tracking when returning tokens.
//     t=[<duration>,<duration>) handle=<string> class={regular,elastic} \
//     adjust={+,-}<bytes>/s rate=<int>/s [stream=t<int>/s<int>] \
//     [deduction-delay=<duration>]                                          (B)
// ...
//     B. Similar to A except using a named handle instead, which internally
//     deducts tokens from all connected streams or if returning tokens, does so
//     for the named stream. Token deductions from a handle are tied to
//     monotonically increasing raft log positions starting from position the
//     underlying stream was connected to (using C). When returning tokens, we
//     translate the byte token value to the corresponding raft log prefix
//     (token returns with handles are in terms of raft log positions).
timeline
t=0s        handle=h op=connect    stream=t1/s1   log-position=4/0
t=0s        handle=h op=connect    stream=t1/s2   log-position=4/0
t=0s        handle=h op=connect    stream=t1/s3   log-position=4/0
t=[0s,8s)   handle=h class=regular adjust=-2MiB/s rate=10/s
t=[10s,18s) handle=h class=regular adjust=+2MiB/s rate=10/s stream=t1/s1

pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go line 95 at r1 (raw file):

Previously, sumeerbhola wrote…

It would be cheaper to accumulate the tokens to return in the loop and then call the function to return them in one shot. Also, it seems simpler to just return the tokens that are freed from Untrack instead of passing around a closure.

Done.


pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go line 122 at r1 (raw file):

Previously, sumeerbhola wrote…

Here to we could just return the total tokens and clear the tracker since it seems the only production Iter use case is to return all the tokens. We shouldn't adopt this more expensive callback approach if its only useful for testing -- tests can use their own iteration.

Done.

Copy link
Copy Markdown
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good. I just need to read through some of the handle_ files

Reviewed 1 of 26 files at r1, 13 of 15 files at r2, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @irfansharif)


-- commits line 61 at r2:
nice simulations!


pkg/kv/kvserver/kvflowcontrol/doc.go line 323 at r1 (raw file):
Yes, I think we should use the Handle for this too.

Maybe that means we could also just use elastic tokens for catchup work.

That's a good point. Catchup is elastic. And if there is enough regular work that there are no tokens for elastic, then catchup should be deprioritized. The only concern I can think of is that if R2 is doing catchup and has regular work, and R3 is doing an index backfill, then R2 is more important.


pkg/kv/kvserver/kvflowcontrol/doc.go line 368 at r1 (raw file):

Previously, irfansharif (irfan sharif) wrote…

If state machine application on one range is not happening because quorum has been lost due to some other node failure, we don't want that to exhaust tokens such that it causes interference on other unrelated ranges.

Good point. Added.

IIRC, we will be asynchronously doing the the raft log writes, since those require fsync, and not necessarily decoupling the async raft log writes and state machine application (which almost never fsyncs, so could be done by the same goroutine that is doing the async raft log writes).

I was anticipating #94854 and #94853, which I've now linked to in the text. This comment thread is also now visible on those issues. I agree this would be simpler. Perhaps we can revisit I11 and [^9] once we try to decouple log appends and state machine applications.

Ack. I added a comment to one of those issues.

Copy link
Copy Markdown
Collaborator

@sumeerbhola sumeerbhola left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm:

Reviewed 4 of 26 files at r1, 2 of 15 files at r2.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @irfansharif)


pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_overview line 134 at r2 (raw file):



 4.0 ┤              ╭╮       ╭───────╮

nit: how much smoothing is happening in this rate calculation? It does look at bit odd that we don't show 4MB/s throughout that interval. Worth a comment at least.


pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_shared_stream line 1 at r2 (raw file):

# Demonstrate the behavior when a single stream is share across two handles h1

nit: shared


pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_single_slow_stream line 37 at r2 (raw file):

#   blocked/unblocked as tokens are deducted/returned. The demand for tokens
#   (1MiB/s) is higher than the replenishment rate (0.5MiB/s).
# - The overall admission rate is reduced from 30 reqs/s to 25 reqs/s,

why 30 req/s and not 10 req/s. Is this just because the metric is counting the same request 3 times because of the 3 stores?
And why is the reduction then to 25 req/s instead of 15 req/s. The plot below also shows 25.


pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_stream_disconnection line 30 at r2 (raw file):

# (s3, at 0.5MiB/s). We see the blocked stream count bouncing between 0 and 1
# as tokens get depleted and replenished (demand here is higher than the
# replenishment rate).

It would be good to see the admission rate until s3 gets exhausted, which then causes the rate to drop to s3's replenishment rate, and then after s3 disconnects see the admission rate climb back.

Part of cockroachdb#95563.

kvflowcontrol.Handle is used to interface with replication flow control;
it's typically backed by a node-level kvflowcontrol.Controller. Handles
are held on replicas initiating replication traffic, i.e. are both the
leaseholder and raft leader, and manage multiple streams underneath
(typically one per active member of the raft group).

When replicating log entries, these replicas choose the log position
(term+index) the data is to end up at, and use this handle to track the
token deductions on a per log position basis. When informed of admitted
log entries on the receiving end of the stream, we free up tokens by
specifying the highest log position up to which we've admitted
(below-raft admission, for a given priority, takes log position into
account -- see kvflowcontrolpb.AdmittedRaftLogEntries for more details).

We also extend the testing framework introduced in cockroachdb#95905 to also
support writing tests for kvflowcontrol.Handle -- it's now pulled into
its own kvflowsimulator package. We're able to write tests that look
like the following:

    # Set up a triply connected handle (to s1, s2, s3) and start issuing
    # writes at 1MiB/s. For two of the streams, return tokens at exactly
    # the rate its being deducted (1MiB/s). For the third stream (s3),
    # we return flow tokens at only 0.5MiB/s.
    timeline
    t=0s         handle=h op=connect    stream=t1/s1   log-position=1/0
    t=0s         handle=h op=connect    stream=t1/s2   log-position=1/0
    t=0s         handle=h op=connect    stream=t1/s3   log-position=1/0
    t=[0s,50s)   handle=h class=regular adjust=-1MiB/s   rate=10/s
    t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s   rate=10/s stream=t1/s1
    t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s   rate=10/s stream=t1/s2
    t=[0.2s,50s) handle=h class=regular adjust=+0.5MiB/s rate=10/s stream=t1/s3
    ----

    # Observe:
    # - Total available tokens flatlines at 32MiB since flow tokens for
    #   s3 eventually depletes and later bounces off of 0MiB. We
    #   initially have 3*16MiB = 48MiB worth of flow tokens, and end
    #   up at 48MiB-16MiB = 32MiB.
    # - Initially the rate of token deductions (3*1MiB/s = 3MiB/s) is
    #   higher than the token returns (1MiB/s+1MiB/s+0.5MiB/s =
    #   2.5MiB/s), but after we start shaping it to the slowest
    #   stream, they end up matching at (0.5MiB/s*3 = 1.5MiB/s).
    # - The blocked stream count bounces between 0 and 1 as the s3
    #   stream gets blocked/unblocked as tokens are
    #   deducted/returned. The demand for tokens (1MiB/s) is higher
    #   than the replenishment rate (0.5MiB/s).
    # - The overall admission rate is reduced from 30 reqs/s to 25
    #   reqs/s, mapping to token deduction rate of 3MiB/s to 2.5MiB/s
    #   (1MiB/s divvied across 10 reqs). The difference between 30
    #   reqs/s and 25 reqs/s is found in the +5 reqs/s accumulating in
    #   the wait queue.
    plot
    kvadmission.flow_controller.regular_tokens_available            unit=MiB
    kvadmission.flow_controller.regular_tokens_{deducted,returned}  unit=MiB/s rate=true
    kvadmission.flow_controller.regular_blocked_stream_count        unit=streams
    kvadmission.flow_controller.regular_requests_{admitted,waiting} unit=reqs/s rate=true
    ----
    ----
     47.7 ┼╮
     46.6 ┤╰─╮
     45.6 ┤  ╰─╮
     44.5 ┤    ╰╮
     43.5 ┤     ╰─╮
     42.4 ┤       ╰╮
     41.4 ┤        ╰─╮
     40.3 ┤          ╰─╮
     39.3 ┤            ╰╮
     38.2 ┤             ╰─╮
     37.2 ┤               ╰─╮
     36.1 ┤                 ╰╮
     35.1 ┤                  ╰─╮
     34.0 ┤                    ╰─╮
     33.0 ┤                      ╰╮
     31.9 ┤                       ╰───────────────
                regular_tokens_available (MiB)

     3.0 ┤╭───────────────────────╮
     2.8 ┤│                       │
     2.6 ┤╭────────────────────────╮
     2.4 ┤│                       ╰│
     2.2 ┤│                        │
     2.0 ┤│                        │
     1.8 ┤│                        │
     1.6 ┤│                        ╰─────────────
     1.4 ┤│
     1.2 ┤│
     1.0 ┤│
     0.8 ┤│
     0.6 ┤│
     0.4 ┤│
     0.2 ┤│
     0.0 ┼╯
          rate(regular_tokens_{deducted,returned}) (MiB/s)

     1.0 ┤                                 ╭╮   ╭
     0.9 ┤                            ╭╮   ││   │
     0.9 ┤                            ││   ││   │
     0.8 ┤                            ││   ││   │
     0.7 ┤                            ││   ││   │
     0.7 ┤                         ╭╮ ││╭╮ ││   │
     0.6 ┤                         ││ ││││ ││╭─╮│
     0.5 ┤                         │╰╮│││╰╮│││ ││
     0.5 ┤                         │ ││││ ││││ ││
     0.4 ┤                         │ ││││ ││││ ││
     0.3 ┤                         │ ││││ ││││ ││
     0.3 ┤                         │ ╰╯││ ││││ ││
     0.2 ┤                         │   ││ ╰╯╰╯ ╰╯
     0.1 ┤                        ╭╯   ╰╯
     0.1 ┤                        │
     0.0 ┼────────────────────────╯
           regular_blocked_stream_count (streams)

     30.0 ┤╭───────────────────────╮
     28.0 ┤│                       ╰╮
     26.0 ┤│                        ╰─────────────
     24.0 ┤│
     22.0 ┤│
     20.0 ┤│
     18.0 ┤│
     16.0 ┤│
     14.0 ┤│
     12.0 ┤│
     10.0 ┤│
      8.0 ┤│
      6.0 ┤│                        ╭─────────────
      4.0 ┤│                        │
      2.0 ┤│                       ╭╯
      0.0 ┼────────────────────────╯
           rate(regular_requests_{admitted,waiting}) (reqs/s)
    ----
    ----

Release note: None
Copy link
Copy Markdown
Contributor Author

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @sumeerbhola)


pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_overview line 134 at r2 (raw file):

Previously, sumeerbhola wrote…

nit: how much smoothing is happening in this rate calculation? It does look at bit odd that we don't show 4MB/s throughout that interval. Worth a comment at least.

Done (added a comment + extended timescales to show 4MiB/s for a while longer).


pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_single_slow_stream line 37 at r2 (raw file):

Is this just because the metric is counting the same request 3 times because of the 3 stores?

Yes.

And why is the reduction then to 25 req/s instead of 15 req/s. The plot below also shows 25.

It's because that graph was plotting metrics from the controller's POV. When a handle wants to admit work along 3 streams s1-s3, where s3 is the bottleneck, when iterating through s1-s3, if it's able to admit on s1 and s2 before being blocked on s3, those metrics are incremented. But this is a bit onerous to explain and easy to misread since work is not actually getting admitted "at the handle level", so I added a few metrics to add visibility into admission rates and wait durations from the handle's perspective, across all connected streams. Now it looks like:

# Set up a triply connected handle (to s1, s2, s3) and start issuing writes at
# 1MiB/s. For two of the streams, return tokens at exactly the rate its being
# deducted (1MiB/s). For the third stream (s3), we return flow tokens at only
# 0.5MiB/s.
timeline
t=0s         handle=h op=connect    stream=t1/s1   log-position=1/0
t=0s         handle=h op=connect    stream=t1/s2   log-position=1/0
t=0s         handle=h op=connect    stream=t1/s3   log-position=1/0
t=[0s,50s)   handle=h class=regular adjust=-1MiB/s   rate=10/s
t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s   rate=10/s stream=t1/s1
t=[0.2s,50s) handle=h class=regular adjust=+1MiB/s   rate=10/s stream=t1/s2
t=[0.2s,50s) handle=h class=regular adjust=+0.5MiB/s rate=10/s stream=t1/s3
----

# The overall admission rate through the handle is reduced from 10 reqs/s
# (corresponding to 1MiB/s) to 5 reqs/s (corresponding to 0.5MiB/s), the
# difference now found in the +5 reqs/s accumulating in the wait queue.
plot
kvadmission.flow_handle.regular_requests_{admitted,waiting}    unit=reqs/s rate=true
----
 10.0 ┤╭───────────────────────╮
  9.3 ┤│                       │
  8.7 ┤│                       │
  8.0 ┤│                       ╰╮
  7.3 ┤│                        │
  6.7 ┤│                        │
  6.0 ┤│                        │
  5.3 ┤│                        ╭─────────────
  4.7 ┤│                        │
  4.0 ┤│                        │
  3.3 ┤│                        │
  2.7 ┤│                        │
  2.0 ┤│                       ╭╯
  1.3 ┤│                       │
  0.7 ┤│                       │
  0.0 ┼────────────────────────╯
       rate(flow_handle.regular_requests_{admitted,waiting}) (reqs/s)

pkg/kv/kvserver/kvflowcontrol/kvflowsimulator/testdata/handle_stream_disconnection line 30 at r2 (raw file):

Previously, sumeerbhola wrote…

It would be good to see the admission rate until s3 gets exhausted, which then causes the rate to drop to s3's replenishment rate, and then after s3 disconnects see the admission rate climb back.

Done. It looks like the following, with 4 phases:

  • slowest stream tokens available, high admission rate
  • slowest stream tokens depleted, admission rate shaped to slow stream
  • slowest stream disconnects, earlier requests waiting for admission are admitted in a burst
  • high admission rate (again) now that the slow stream is not considered

  25.0 ┤                     ╭─╮
  22.3 ┤                    ╭╯ │
  19.7 ┤                    │  │
  17.0 ┤                   ╭╯  │
  14.3 ┤                   │   │
  11.7 ┼───────╮           │   ╰───────────────
   9.0 ┤       ╰─╮         │
   6.3 ┤         ╰╭────────╮
   3.7 ┤        ╭─╯        │
   1.0 ┼────────╯          │   ╭───────────────
  -1.7 ┤                   │   │
  -4.3 ┤                   │   │
  -7.0 ┤                   ╰╮  │
  -9.7 ┤                    │  │
 -12.3 ┤                    ╰─╮│
 -15.0 ┤                      ╰╯
        rate(flow_handle.regular_requests_{admitted,waiting}) (reqs/s)

Copy link
Copy Markdown
Contributor Author

@irfansharif irfansharif left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TFTR!

bors r+

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @sumeerbhola)

@craig
Copy link
Copy Markdown
Contributor

craig bot commented Feb 18, 2023

Build succeeded:

@craig craig bot merged commit c95bef0 into cockroachdb:master Feb 18, 2023
@irfansharif irfansharif deleted the 230205.kvflowhandle branch February 18, 2023 02:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants