
spanconfig: introduce spanconfig.KVSubscriber#69614

Merged
craig[bot] merged 6 commits into cockroachdb:master from
irfansharif:210824.kv-spanconfig-store
Nov 3, 2021

Conversation

@irfansharif
Contributor

@irfansharif irfansharif commented Aug 30, 2021

KVSubscriber presents a consistent[1] snapshot of a spanconfig.StoreReader that's incrementally maintained with changes made to the global span configurations state. The maintenance happens transparently; callers can subscribe to learn about what key spans may have seen a configuration change. After learning about a span update, consulting the embedded StoreReader retrieves an up-to-date[2] config for it.

When a callback is first installed, it's invoked with the [min,max) span -- shorthand to indicate that callers should consult the StoreReader for all spans of interest. Subsequent updates are of the more incremental kind. It's possible that the span updates received are no-ops, i.e. consulting the StoreReader for the given span retrieves the last config observed for that span.[2]

type KVSubscriber interface {
  StoreReader
  Subscribe(func(updated roachpb.Span))
}
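To make the subscription contract concrete, here's a minimal, self-contained sketch in Go. The types (span, toySubscriber) are toy stand-ins for roachpb.Span and the KVSubscriber, not the real cockroach types; it only illustrates that a freshly installed handler first receives the everything-span before incremental updates.

```go
package main

import "fmt"

// span is a toy stand-in for roachpb.Span.
type span struct{ start, end string }

// toySubscriber sketches the subscription contract: a newly installed
// handler is first invoked with the everything-span [min,max), telling the
// caller to refresh all spans of interest; later invocations are incremental.
type toySubscriber struct {
	handlers []func(updated span)
}

func (s *toySubscriber) subscribe(fn func(updated span)) {
	s.handlers = append(s.handlers, fn)
	fn(span{start: "min", end: "max"}) // initial everything-update
}

func (s *toySubscriber) notify(updated span) {
	for _, fn := range s.handlers {
		fn(updated)
	}
}

func main() {
	s := &toySubscriber{}
	s.subscribe(func(sp span) {
		// In the real code, the handler would consult the embedded
		// StoreReader for the updated span here.
		fmt.Printf("consult StoreReader for [%s,%s)\n", sp.start, sp.end)
	})
	s.notify(span{start: "a", end: "b"}) // incremental update
}
```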

Internally we maintain a rangefeed over the global store of span configurations (system.span_configurations), applying updates from it into an embedded spanconfig.Store. A read-only view of this data structure (spanconfig.StoreReader) is exposed as part of the KVSubscriber interface. Rangefeeds used as-is don't offer any ordering guarantees with respect to updates made over non-overlapping keys, which is something we care about[3]. For that reason we make use of a rangefeed buffer, accumulating raw rangefeed updates and flushing them out en masse in timestamp order when the rangefeed frontier is bumped[4]. If the buffer overflows (as dictated by the memory limit the KVSubscriber is instantiated with), the subscriber is wound down and an appropriate error is returned to the caller.
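The buffer-and-flush mechanics can be sketched as follows. This is a toy model, not the real rangefeedbuffer package: events carry plain int timestamps instead of hlc.Timestamp, and the limit is a count rather than a memory budget.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// event is a toy rangefeed update: a key span plus the timestamp it was
// written at (a plain int here; the real code uses hlc.Timestamp).
type event struct {
	span string
	ts   int
}

var errLimitExceeded = errors.New("buffer limit exceeded")

// buffer accumulates raw rangefeed events, which may arrive in any order
// across non-overlapping keys, and flushes them in timestamp order once the
// frontier advances past them.
type buffer struct {
	limit  int
	events []event
}

func (b *buffer) add(ev event) error {
	if len(b.events) >= b.limit {
		return errLimitExceeded // caller winds down and re-subscribes
	}
	b.events = append(b.events, ev)
	return nil
}

// flush returns, in timestamp order, all buffered events at or below the new
// frontier timestamp; anything newer than the frontier stays buffered.
func (b *buffer) flush(frontier int) []event {
	sort.SliceStable(b.events, func(i, j int) bool {
		return b.events[i].ts < b.events[j].ts
	})
	var idx int
	for idx < len(b.events) && b.events[idx].ts <= frontier {
		idx++
	}
	flushed := b.events[:idx]
	b.events = b.events[idx:]
	return flushed
}

func main() {
	b := &buffer{limit: 10}
	_ = b.add(event{span: "[c,d)", ts: 3})
	_ = b.add(event{span: "[a,b)", ts: 1})
	_ = b.add(event{span: "[e,f)", ts: 5})
	fmt.Println(b.flush(4)) // events at ts<=4, emitted in timestamp order
}
```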

When running into the errors above, it's safe for the caller to re-subscribe, effectively re-establishing the underlying rangefeeds. When re-establishing a new rangefeed and populating a spanconfig.Store using the contents of the initial scan[5], we wish to preserve the existing spanconfig.StoreReader. Discarding it would entail either blocking all external readers until a new spanconfig.StoreReader was fully populated, or presenting an inconsistent view of the spanconfig.Store that's currently being populated. For new rangefeeds, what we do instead is route all updates from the initial scan to a fresh spanconfig.Store, and once the initial scan is done, swap it in as the source for the exported spanconfig.StoreReader. During the initial scan, concurrent readers continue to observe the last spanconfig.StoreReader, if any; after the swap, they observe the more up-to-date source instead. Future incremental updates also target the new source. When this source swap occurs, we inform handlers of the need to possibly refresh their view of all configs.
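The swap described above can be sketched with a mutex-guarded handle. This is a simplified model, assuming a toy reader type in place of spanconfig.StoreReader; the point is only that external readers keep a stable handle while the source underneath is replaced wholesale.

```go
package main

import (
	"fmt"
	"sync"
)

// reader is a toy stand-in for spanconfig.StoreReader.
type reader map[string]string

// swappableReader is the exported, long-lived handle. While a re-established
// rangefeed runs its initial scan into a fresh private store, external
// readers keep seeing the old one; once the scan completes, the source is
// swapped under lock.
type swappableReader struct {
	mu  sync.RWMutex
	cur reader
}

func (s *swappableReader) get(key string) string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.cur[key]
}

func (s *swappableReader) swap(fresh reader) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.cur = fresh
}

func main() {
	ext := &swappableReader{cur: reader{"a": "old-config"}}

	// A newly established rangefeed's initial scan populates a fresh store;
	// concurrent readers still observe the old contents.
	fresh := reader{"a": "new-config"}
	fmt.Println(ext.get("a")) // still the old config

	// Scan done: swap at the source, then notify handlers with [min,max).
	ext.swap(fresh)
	fmt.Println(ext.get("a")) // now the new config
}
```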

This commit also wires up the KVSubscriber into KV stores, replacing the use of the gossiped system config span (made possible by the StoreReader interface; this only happens if a testing flag/env var is set).

Release note: None

Footnotes

  1. The contents of the StoreReader at t1 correspond exactly to the contents of the global span configuration state at some t0 <= t1. If the StoreReader is read from at t2 > t1, it's guaranteed to observe a view of the global state at some t >= t0.

  2. For the canonical KVSubscriber implementation, this staleness is typically bounded by the closed timestamp target duration.

  3. For a given key k, its config may be stored as part of a larger span S (where S.start <= k < S.end). It's possible for S to get deleted and replaced with sub-spans S1...SN in the same transaction, if the span is getting split. When applying these updates, we need to make sure to process the deletion event for S before processing S1...SN.

  4. In our example above (deleting the config for S and adding configs for S1...SN), we want to make sure that we apply the full set of updates all at once -- lest we expose the intermediate state where the config for S was deleted but the configs for S1...SN were not yet applied.

  5. When tearing down the subscriber due to underlying errors, we could also surface a checkpoint to use the next time the subscriber is established. That way we can avoid the full initial scan over the span configuration state and simply pick up where we left off with our existing spanconfig.Store.
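The ordering concern in footnotes 3 and 4 can be illustrated with a small sketch. One possible ordering rule (an illustration, not necessarily what the real spanconfig.Store does) is: apply a flushed batch in timestamp order, with deletions ordered before additions at the same timestamp, so the delete of S lands before the adds of S1...SN from the same transaction.

```go
package main

import (
	"fmt"
	"sort"
)

// update is a toy span-config update: a span, its timestamp, and whether
// it's a deletion.
type update struct {
	span     string
	ts       int
	deletion bool
}

// applyOrder sorts a flushed batch so that updates apply in timestamp order,
// with deletions before additions at the same timestamp. This way, when a
// span S is deleted and replaced by S1...SN in one transaction (so all
// updates share a timestamp), the delete of S is processed first.
func applyOrder(batch []update) []update {
	sort.SliceStable(batch, func(i, j int) bool {
		if batch[i].ts != batch[j].ts {
			return batch[i].ts < batch[j].ts
		}
		return batch[i].deletion && !batch[j].deletion
	})
	return batch
}

func main() {
	batch := []update{
		{span: "[a,c)", ts: 7},                 // add S1
		{span: "[a,f)", ts: 7, deletion: true}, // delete S
		{span: "[c,f)", ts: 7},                 // add S2
	}
	for _, u := range applyOrder(batch) {
		fmt.Println(u.span, u.deletion)
	}
}
```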


@irfansharif irfansharif force-pushed the 210824.kv-spanconfig-store branch 2 times, most recently from ed1705e to a5e850f Compare August 31, 2021 17:52
@irfansharif irfansharif force-pushed the 210824.kv-spanconfig-store branch from a5e850f to 9372967 Compare September 14, 2021 19:26
@irfansharif irfansharif changed the title [wip] *: kvwatcher+spanconfig.store spanconfig: introduce spanconfig.{KVWatcher,StoreWriter} Sep 14, 2021
@irfansharif irfansharif marked this pull request as ready for review September 14, 2021 19:27
@irfansharif irfansharif requested review from a team as code owners September 14, 2021 19:27
@irfansharif irfansharif requested a review from a team September 14, 2021 19:27
@irfansharif irfansharif requested a review from a team as a code owner September 14, 2021 19:27
@irfansharif irfansharif requested review from ajwerner, arulajmani and stevendanna and removed request for a team and stevendanna September 14, 2021 19:27
@irfansharif
Contributor Author

irfansharif commented Sep 14, 2021

First commit is from #69658. Still want a bit more testing here and planning to split off the spanconfig.Store work into a stand-alone PR for #69661 to build on top of. But it's ready for a look.

@irfansharif irfansharif force-pushed the 210824.kv-spanconfig-store branch 2 times, most recently from cbecf62 to bced2ec Compare September 15, 2021 21:10
irfansharif added a commit to irfansharif/cockroach that referenced this pull request Sep 15, 2021
In cockroachdb#69172 we introduced a spanconfig.StoreReader interface to abstract
away the gossiped system config span. We motivated that PR by teasing
a future implementation of the same interface, an in-memory data
structure to maintain a mapping between spans and configs
(powered through a view over system.span_configurations introduced in
cockroachdb#69047). This PR introduces just that.

Intended (future) usages:
- cockroachdb#69614 introduces the KVWatcher interface, listening in on
  system.span_configurations. The updates generated by it will be used
  to populate per-store instantiations of this data structure, with an
  eye towards providing a "drop-in" replacement of the gossiped system
  config span (conveniently implementing the sibling
  spanconfig.StoreReader interface).
- cockroachdb#69661 introduces the SQLWatcher interface, listening in on changes to
  system.{descriptor,zones} and generating denormalized span config
  updates for every descriptor/zone config change. These updates will
  need to be diffed against a spanconfig.StoreWriter populated with the
  existing contents of KVAccessor to generate the "targeted" diffs
  KVAccessor expects.

Release note: None
@irfansharif irfansharif force-pushed the 210824.kv-spanconfig-store branch from bced2ec to 861be9c Compare September 15, 2021 21:55
@irfansharif irfansharif changed the title spanconfig: introduce spanconfig.{KVWatcher,StoreWriter} spanconfig: introduce spanconfig.KVWatcher Sep 15, 2021
@irfansharif irfansharif force-pushed the 210824.kv-spanconfig-store branch from 861be9c to 6ef1291 Compare September 15, 2021 22:59
irfansharif added a commit to irfansharif/cockroach that referenced this pull request Sep 16, 2021
irfansharif added a commit to irfansharif/cockroach that referenced this pull request Sep 16, 2021
Grafted from cockroachdb#69269. This seems like a useful primitive for users of
this library. We intend to use it in cockroachdb#69661 and cockroachdb#69614.

Release note: None

Co-authored-by: irfan sharif <irfanmahmoudsharif@gmail.com>
Contributor Author

@irfansharif irfansharif left a comment


Added a simple datadriven test framework and addressed the comments so far. This PR is missing two (small) unit tests:

  • Something that overflows the underlying rangefeed buffer maintained in the KVSubscriber to ensure that we re-establish the feed correctly and always present a consistent view of a spanconfig.StoreReader
  • Something at the kvserver.Store level that ensures that, following a span update, we iterate through the right replicas and install span configs sourced from the KVSubscriber's StoreReader.

Will get to these tomorrow.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)


pkg/kv/kvserver/store.go, line 1988 at r8 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

👍, but consider making the removal of the // Load the system config. logic in a separate commit, in case we got this wrong and it is still needed and we need to bisect/revert.

Ack, will split once ready to merge.


pkg/kv/kvserver/store.go, line 2033 at r16 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

We're grabbing the replica's descriptor twice. Consider moving key := repl.Desc().StartKey up here and passing key.AsRawKey() to ContainsKey.

Done.


pkg/spanconfig/spanconfig.go, line 45 at r7 (raw file):

Previously, irfansharif (irfan sharif) wrote…

I'm having to rework this interface to accommodate retries in the presence of rangefeedbuffer.ErrBufferLimitExceeded errors and it's getting a bit unwieldy. When encountering such an error, we want to close the old rangefeed, create a new one, somehow relay that info to the caller who may want to either re-use the spanconfig.Store they're already using to power all replicas, or start afresh (it'd be safe to re-use here, but it's not super obvious and feels incidental).

If we're instantiating a new spanconfig.Store to store all these updates from the re-established rangefeed, we probably still want to keep the old one around while the new one gets fully populated from the initial rangefeed scan. That suggests some sort of hook into the underlying rangefeed library's OnInitialScan, and perhaps another OnReestablishingRangefeedFeed hook. All that sounds really messy, and I think (?) it's what you're getting at with wanting this to be a bit more tightly coupled with a StoreReader:

What do you think of something like the following:

	Subscribe(context.Context, func(roachpb.Span)) StoreReader

We could also maybe take in a retry.Options, or somehow indicate that the interface can internally retry. We'd export, opaquely, a StoreReader that the callers can consult when retrieving configs for updated spans (notified via the callback). Underneath the hood, we'd deal with all the subtle retry business -- if the buffer limit is reached, we'd close the old rangefeed, establish a new one and populate a new spanconfig.Store, when our scan is done, swap out the reference to StoreReader held at the caller, and invoke the update callback for all spans in the datastructure (we could also be smarter and diff the two spanconfig.Stores).

Made this change above -- PTAL.


pkg/spanconfig/spanconfig.go, line 63 at r16 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Should this say StoreReader?

spanconfig.Store*; divvied up this comment block to a bunch of other places.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 134 at r16 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Can this be called after the subscriber has been started?

Yes, in fact it's the expected pattern (see pkg/server) that KV will have provided an already started KVSubscriber to register callbacks into.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 176 at r16 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Style nit: if these closures aren't capturing anything other than the KVSubscriber receiver, then why return a closure at all? You could write this as a method that accepts (ctx context.Context, ev *roachpb.RangeFeedValue) and then pass a method reference to s.rangeFeedFactory.New. The same comment applies to onFrontierAdvance and onInitialScanDone (where you could store the initialTS on the KVSubscriber).

I guess this is a bit more than a nit. Closures lead to indirection through transparent (unnamed) bundles of state. It's generally better to be explicit, especially if doing so make the code cleaner and less verbose like it will here.

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 224 at r16 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Do we have a guarantee that after we close the old rangefeed, none of these callbacks will ever be invoked again in response to the old rangefeed? It would be bad if the old rangefeed was able to cause changes to the newly created s.buffer.

Yes, it's a guarantee provided by the rangefeed library. Added a comment to the library itself -- see the usage of the RangeFeed.stopped channel -- it's closed by the single thread processing all the events, and .Close() on the rangefeed waits for the stopped channel to have been closed.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):

Is it possible for the frontier to advance before the initial scan is complete?

I don't think so, both from a reading of the code and intuitively from what these frontier bumps are supposed to represent -- "no updates will be observed for a timestamp less than the frontier timestamp" (how are we to guarantee that before finishing the entire scan?)

Is this because a handler can be added after the subscriber has started? If so, is being able to do so important?

Yes, it was written such that a handler could be added after the subscriber was started. We could always pull .Start into the interface itself, and rely on the caller to wire up the callback handler to the embedded StoreReader, but this felt just as easy to reason about and similar to our other uses of rangefeeds (that are started at the pkg/server level).


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 265 at r16 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

It's pretty subtle that s.mu.external points at a fallback reader until the first initial scan, and then points at stale readers during the initial scan of newly established rangefeeds. A reader needs to jump back and forth to the interface definition a few packages up to convince themselves that this isn't a bug. Could you add some commentary here explaining what's going on and why?

Done, both here and as part of the type-level comment.


pkg/spanconfig/spanconfigkvwatcher/span_config_decoder.go, line 50 at r8 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

It's often interesting to run with TESTFLAGS='-benchmem' as well.

BenchmarkSpanConfigDecoder-16            1911889               594.7 ns/op           242 B/op          5 allocs/op
BenchmarkSpanConfigDecoder-16            2025855               585.6 ns/op           241 B/op          5 allocs/op
BenchmarkSpanConfigDecoder-16            1982452               587.9 ns/op           242 B/op          5 allocs/op
BenchmarkSpanConfigDecoder-16            2261464               532.1 ns/op           241 B/op          5 allocs/op
BenchmarkSpanConfigDecoder-16            2220526               536.6 ns/op           241 B/op          5 allocs/op

Contributor

@nvb nvb left a comment


Reviewed 1 of 17 files at r4, 2 of 105 files at r17, 18 of 29 files at r19, 10 of 10 files at r20, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)


pkg/kv/kvserver/store.go, line 1843 at r8 (raw file):

I plan to remove it shortly once we start issuing RPCs for realsies and defaulting to using the new infrastructure; we'll need actual tests then and not just log warnings.

Do you mind leaving a TODO for this?


pkg/kv/kvserver/store.go, line 2015 at r20 (raw file):

	// We'll want to offer all replicas to the split and merge queues. Be a
	// little careful about not spawning too many individual goroutines.

What is this comment in reference to? The s.splitQueue.Async and s.mergeQueue.Async down below? How are we avoiding the 2 goroutines per range?


pkg/spanconfig/spanconfig.go, line 50 at r20 (raw file):

// state (system.span_configurations). The maintenance happens transparently;
// callers can subscribe to learn about what key spans may have seen a
// configuration change. After learning about a span update, consulting the

nit: the expected usage pattern would be clearer if this comment avoided the passive tone at the end. Consider changing the last sentence to:

// After learning about a span update through a callback invocation, callers can
// consult the embedded StoreReader to retrieve an up-to-date[2] config for the
// updated span.

pkg/spanconfig/spanconfig.go, line 69 at r20 (raw file):

//      feeds when errors occur, possibly re-transmitting earlier updates
//      (usually through a lazy [min,max) span) despite possibly not needing to.
type KVSubscriber interface {

Let's try to learn from the fact that we keep running into issues with implicit (and easily misunderstood) contracts regarding callbacks passed through interface boundaries. What can we say about this callback in the interface definition to make it harder to misuse? Which goroutine is it called on? Can multiple invocations of the callback occur concurrently?


pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go, line 36 at r20 (raw file):

// CRDB cluster. It's a concrete implementation of the KVAccessor interface.
type KVAccessor struct {
	db       *kv.DB

Is the refactor in this file unrelated to the rest of the commit? If so, consider extracting it to its own commit. Same thing with the spanconfigtestutils changes.


pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go, line 142 at r20 (raw file):

					mu.Lock()
					mu.lastFrontierTS = ts
					defer mu.Unlock()

nit: either move the defer mu.Unlock() up or remove the defer.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):

I don't think so, both from a reading of the code and intuitively from what these frontier bumps are supposed to represent -- "no updates will be observed for a timestamp less than the frontier timestamp" (how are we to guarantee that before finishing the entire scan?)

Got it, thanks. Assuming we don't get our wires crossed and hear from old rangefeeds, what you say checks out.

Yes, it was written such that a handler could be added after the subscriber was started. We could always pull .Start into the interface itself, and rely on the caller to wire up the callback handler to the embedded StoreReader, but this felt just as easy to reason about and similar to our other uses of rangefeeds (that are started at the pkg/server level).

I'm surprised by this. Is this as easy to reason about? The late binding of the callback means that we have more states that the subscriber can be in. For instance, the initial scan can complete before the handler is registered. I think this is why we have to handle the handler initialization in onFrontierAdvance as well as in onInitialScanDone. It's also why we have to access the handler under the mutex

I also don't think it's similar to our other uses of rangefeeds. The rangefeed client library accepts its onValue callback on creation before its rangefeed is established.

Is there a benefit to this late binding that I'm missing?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 57 at r20 (raw file):

// initial scan, concurrent readers would continue to observe the last
// spanconfig.StoreReader if any. After the swap, it would observe the more
// up-to-date source instead. Subsequent incremental updates target the new

Can we add a claim here about why we are guaranteed that the new StoreReader will be more up-to-date than the old one by the time it is swapped out? I don't think we want to allow regressions.

Even with a claim, is there an assertion we can add somewhere to sanity check this?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 64 at r20 (raw file):

// instead of informing callers with an everything [min,max) span, we could diff
// the two data structures and only emit targeted updates.
type KVSubscriber struct {

Have you given any thought to observability of these components? Are there metrics that would help understand how they are behaving in a real cluster and diagnose poor behavior like rapid restarts?

cc. @andreimatei maybe this would be a good trial ground for some jstable-powered observability.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 86 at r20 (raw file):

		handler            func(roachpb.Span)
		handlerInitialized bool

Does handlerInitialized need to be accessed under lock? Isn't it local to the single rangefeed callback goroutine?

Combined with the comment above, it may be possible to reduce the fields under lock to just the spanconfig.StoreReader. That feels right to me after reading the comment above, as this the only state that's meant to be shared across concurrent actors.

If we do get to that state, you may consider replacing the mutex with an atomic.Value. That will make the atomic swap cleaner, it will make access cheaper (no CPU synchronization cost between concurrent readers), and it will prevent future changes from adding more state under the lock without thinking through ownership carefully.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 171 at r20 (raw file):

func (s *KVSubscriber) establishRangefeed(ctx context.Context) error {
	s.internal = spanconfigstore.New(s.fallback)

The comment Andrew and I were making yesterday was mostly about these fields and how we replace them one-by-one when restarting the rangefeed. Something about how implicit that is feels wrong. The suggestion was to wrap up all of the state scoped to a single rangefeed attempt in a struct. This would allow us to clobber it all at once on restart and have an easier time convincing ourselves (now and in the future when this code inevitably changes) that we're starting from a blank slate on each retry and aren't accidentally forgetting to clear some state and letting it leak into the next retry.

Something like:

s.cur.rangefeed.Close()
s.cur = attempt{}
s.establishRangefeed(ctx) // could even assert that s.cur is empty

If you aren't a fan of that, then it still might be a good idea to add commentary about which fields are scoped to a single retry and add a new teardownRangefeed method that Closes the rangefeed and clear all these fields.


pkg/spanconfig/spanconfigtestutils/utils.go, line 130 at r20 (raw file):

		}

		if strings.HasPrefix(line, deletePrefix) {

nit: this would avoid redundancy by being structured like:

switch {
case strings.HasPrefix(line, deletePrefix):
    ...
case strings.HasPrefix(line, upsertPrefix):
    ...
default:
    t.Fatalf("malformed line %q, expected to find prefix %q or %q",
        line, upsertPrefix, deletePrefix)
}

Release note: None
Re-name only commit.

Release note: None
We'll use it in a future commit. Also rename a field while here.

Release note: None
It's dead code; was just a convenient way of avoiding test log spam way
back in cockroachdb#16354.

Release note: None
Release note: None
Contributor Author

@irfansharif irfansharif left a comment


Added the tests above, but more importantly, I've re-worked things yet again (third time's the charm!). Instead of having the KVSubscriber be a stand-alone component that internally retries and re-establishes feeds, I've made it so the caller can bounce the entire subscriber all at once. Quite a lot of code was simplified as a result -- we're no longer chaining a bunch of rangefeed callbacks together and bouncing the rangefeed from within one of those callbacks. This is very similar to what we're now doing in the SQLWatcher, #71968, which is nice. PTAL.

The top-level Subscribe call is a blocking one, returning only on errors (at which point callers would re-Subscribe). The handlers installed concurrently stay installed, and the exported StoreReader is still accessible. When successfully re-Subscribed, the handlers will learn about it and will be able to consult a fresher StoreReader for new data. This structure feels much easier because the lifecycle management of the entire Subscriber is decoupled from passing updates to the handler, and the code dealing with the subscriber lifecycle can use whatever retry.Options it wants (for now it'll retry indefinitely with some back-off).
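The caller-driven lifecycle described here can be sketched as a retry loop around a blocking subscribe. This is a toy model: subscribe below is a stand-in that fails a couple of times before succeeding, and the backoff is hand-rolled rather than using the real retry.Options.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// subscribe is a stand-in for the blocking KVSubscriber.Subscribe: it
// returns only on error, at which point the caller re-subscribes. Here it's
// hard-coded to fail on the first two attempts.
func subscribe(attempt int) error {
	if attempt < 3 {
		return errors.New("rangefeed buffer limit exceeded")
	}
	return nil // in the real code, this blocks until shutdown
}

func main() {
	backoff := time.Millisecond // the real code would use retry.Options
	for attempt := 1; ; attempt++ {
		err := subscribe(attempt)
		if err == nil {
			// Handlers installed earlier stay installed across retries; on a
			// successful re-subscription they're notified with [min,max) so
			// they can consult the fresher StoreReader.
			fmt.Println("subscribed after", attempt, "attempts")
			return
		}
		fmt.Println("re-subscribing after error:", err)
		time.Sleep(backoff)
		backoff *= 2 // exponential back-off between attempts
	}
}
```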


I've broken reviewable by an accidental amend rebase, so it's kind of slow to work with. If you want a fresh slate for this PR, I'm happy to make one.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)


pkg/kv/kvserver/store.go, line 1843 at r8 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I plan to remove it shortly once we start issuing RPCs for realsies and defaulting to using the new infrastructure; we'll need actual tests then and not just log warnings.

Do you mind leaving a TODO for this?

Done.


pkg/kv/kvserver/store.go, line 2015 at r20 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

What is this comment in reference to? The s.splitQueue.Async and s.mergeQueue.Async down below? How are we avoiding the 2 goroutines per range?

This is in reference to the fact that we have a spanConfigUpdateQueueRateLimiter below to pace ourselves. Copy-pasta from the system config span system, though I suspect we can be much more lenient with the rate limiting here given the updates are targeted (i.e. we know exactly which ranges are changing), whereas previously it wasn't. There's a TODO to this effect near the rate limiter itself.


pkg/spanconfig/spanconfig.go, line 69 at r20 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Let's try to learn from the fact that we keep running into issues with implicit (and easily misunderstood) contracts regarding callbacks passed through interface boundaries. What can we say about this callback in the interface definition to make it harder to misuse? Which goroutine is it called on? Can multiple invocations of the callback occur concurrently?

Single goroutine; added commentary here and near the concrete type.


pkg/spanconfig/spanconfigkvaccessor/kvaccessor.go, line 36 at r20 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Is the refactor in this file unrelated to the rest of the commit? If so, consider extracting it to its own commit. Same thing with the spanconfigtestutils changes.

Done.


pkg/spanconfig/spanconfigkvsubscriber/datadriven_test.go, line 142 at r20 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

nit: either move the defer mu.Unlock() up or remove the defer.

Woops, done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):
I've re-worked some of this control flow so that all these handlers are run on a single thread, and at only two points. The overall re-working is described in the top-level comment, and I think it's a lot more "standard" usage wrt rangefeeds now. There are far fewer fields to begin with, and it's a lot easier to reason about what's happening under the mutex. PTAL -- if it's still messy, I'm happy to fold a Start into the interface itself and disallow lazy bindings.

As for declaring the handler as "initialized" after the initial scan, it wasn't necessary -- we can wait for the subsequent frontier advance to make it so. I just did it this way to opportunistically avoid having the callers deal with back-to-back "everything" updates.

Is there a benefit to this late binding that I'm missing?

The benefit is that we can avoid having to pass in the concrete type into pkg/kv/kvserver by having pkg/server.Start oversee the lifecycle of the Subscriber. To KV it just looks like a pre-initialized, already running subscriber. It's completely agnostic to whether it's running, needs to be bounced, etc.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 57 at r20 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Can we add a claim here about why we are guaranteed that the new StoreReader will be more up-to-date than the old one by the time it is swapped out? I don't think we want to allow regressions.

Even with a claim, is there an assertion we can add somewhere to sanity check this?

It's got to do with the fact that when re-establishing the feed, we're doing so with higher and higher timestamps. I've changed how this automatic re-establishment of feeds works to leave it up to the caller to bounce the Subscriber as a whole. Added an assertion that checks that the initial timestamp we scan with is greater than the last recorded frontier timestamp. As for ensuring that frontier timestamps are monotonically increasing from the initial scan timestamp, there are assertions for it both at the rangefeed level and also at the rangefeed buffer level.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 86 at r20 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Does handlerInitialized need to be accessed under lock? Isn't it local to the single rangefeed callback goroutine?

Combined with the comment above, it may be possible to reduce the fields under lock to just the spanconfig.StoreReader. That feels right to me after reading the comment above, as this the only state that's meant to be shared across concurrent actors.

If we do get to that state, you may consider replacing the mutex with an atomic.Value. That will make the atomic swap cleaner, it will make access cheaper (no CPU synchronization cost between concurrent readers), and it will prevent future changes from adding more state under the lock without thinking through ownership carefully.

Moved handlerInitialized out of the mutex, though still have handler in here after re-working things a bit. See above, curious to see how you feel about it now -- happy to get rid of the lazy binding if it's still messy. Or maybe even use another mutex (handlerMu).


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 171 at r20 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

The comment Andrew and I were making yesterday was mostly about these fields and how we replace them one-by-one when restarting the rangefeed. Something about how implicit that is feels wrong. The suggestion was to wrap up all of the state scoped to a single rangefeed attempt in a struct. This would allow us to clobber it all at once on restart and have an easier time convincing ourselves (now and in the future when this code inevitably changes) that we're starting from a blank slate on each retry and aren't accidentally forgetting to clear some state and letting it leak into the next retry.

Something like:

s.cur.rangefeed.Close()
s.cur = attempt{}
s.establishRangefeed(ctx) // could even assert that s.cur is empty

If you aren't a fan of that, then it still might be a good idea to add commentary about which fields are scoped to a single retry and add a new teardownRangefeed method that Closes the rangefeed and clear all these fields.

See top-level comment, I think we can avoid all this complexity (that I originally introduced 😛).

Contributor

@nvb nvb left a comment


Reviewed 3 of 3 files at r22, 4 of 5 files at r23, 1 of 1 files at r25, 23 of 25 files at r26, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)


pkg/kv/kvserver/store.go, line 2015 at r20 (raw file):

Previously, irfansharif (irfan sharif) wrote…

This is in reference to the fact that we have a spanConfigUpdateQueueRateLimiter below to pace ourselves. Copy-pasta from the system config span system, though I suspect we can be much more lenient with the rate limiting here given the updates are targeted (i.e. we know exactly which ranges are changing), whereas previously it wasn't. There's a TODO to this effect near the rate limiter itself.

Could you move the shouldQueue := up below this comment so that it's clear what it's referring to?

But also, should we be checking for each range whether we should queue? Something seems off about calling AdminN(1) and then queuing 10k ranges. That's especially true with applyAllFromSpanConfigStore.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):

The benefit is that we can avoid having to pass in the concrete type into pkg/kv/kvserver by having pkg/server.Start oversee the lifecycle of the Subscriber. To KV it just looks like a pre-initialized, already running subscriber. It's completely agnostic to whether it's running, needs to be bounced, etc.

Is this pre-initialization needed though? I guess I don't see why the Subscribe and OnSpanConfigUpdate methods both need to exist. I think this is what you mean by "fold a Start into the interface itself and disallow lazy bindings". Is the concern that you don't want KV managing the lifecycle of the subscriber?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 174 at r26 (raw file):

		log.Fatal(ctx, "currently subscribed: only allowed once at any point in time")
	}
	defer func() { atomic.CompareAndSwapInt32(&s.subscribed, 1, 0) }()

nit: isn't this just an atomic.StoreInt32(&s.subscribed, 0)?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 211 at r26 (raw file):

		})
		if err != nil {
			select {

Will these selects with a default clause allow the error to be silently ignored if the main goroutine is busy handling an event on one of the other channels?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 254 at r26 (raw file):

			mu.Unlock()

			select {

Same question here about the default case. I think this only works if the channel is buffered. But if you have multiple buffered channels then you can't make strong claims about which channel is read from first. Should this be reading from the ctx.Done() instead?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 275 at r26 (raw file):

	}
	defer rangefeed.Close()
	s.stopper.AddCloser(rangefeed)

Why do we need this? Aren't we already waiting on the stopper down below?

Contributor Author

@irfansharif irfansharif left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @andreimatei, @arulajmani, and @nvanbenschoten)


pkg/kv/kvserver/store.go, line 2015 at r20 (raw file):

Could you move the shouldQueue := up below this comment so that it's clear what it's referring to?

Done.

But also, should we be checking for each range whether we should queue? Something seems off about calling AdminN(1) and then queuing 10k ranges.

I was considering that, but this is actually exactly what we're doing with the system config span trigger above (since #53605) -- in that regard this is keeping the status quo. How would you feel about actually profiling once it's wired up before making any changes? It's hard to profile now given none of these tables are being written to, but we should be in much better shape next week after #71994 lands.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

The benefit is that we can avoid having to pass in the concrete type into pkg/kv/kvserver by having pkg/server.Start oversee the lifecycle of the Subscriber. To KV it just looks like a pre-initialized, already running subscriber. It's completely agnostic to whether it's running, needs to be bounced, etc.

Is this pre-initialization needed though? I guess I don't see why the Subscribe and OnSpanConfigUpdate methods both need to exist. I think this is what you mean by "fold a Start into the interface itself and disallow lazy bindings". Is the concern that you don't want KV managing the lifecycle of the subscriber?

Yes, I didn't want KV managing the lifecycle of the subscriber; pkg/server felt like a better home for it. The other thing is we can't import the kvsubscriber library in KV directly because of import cycles, so we'd have to pass it in through an interface. I felt it cleaner to not have a Start method on the interface, where any mocked impl would have to block on some surrounding stopper.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 64 at r20 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Have you given any thought to observability of these components? Are there metrics that would help understand how they are behaving in a real cluster and diagnose poor behavior like rapid restarts?

cc. @andreimatei maybe this would be a good trial ground for some jstable-powered observability.

The number of restart attempts would be worth capturing in a metric; they're just logged prominently now. Is that what you were suggesting?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 211 at r26 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Will these selects with a default clause allow the error to be silently ignored if the main goroutine is busy handling an event on one of the other channels?

Doh, good catch. I was originally trying to not have the handler thread block the underlying rangefeed, but that wasn't going to work here either. Changed the contract to have users ensure short-lived, non-blocking callbacks -- more than fine for our purposes.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 254 at r26 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Same question here about the default case. I think this only works if the channel is buffered. But if you have multiple buffered channels then you can't make strong claims about which channel is read from first. Should this be reading from the ctx.Done() instead?

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 275 at r26 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Why do we need this? Aren't we already waiting on the stopper down below?

Oops, don't need it. Removed.

Contributor

@nvb nvb left a comment


This is getting close!

Reviewed 2 of 4 files at r27, 9 of 9 files at r28, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @andreimatei, @arulajmani, @irfansharif, and @nvanbenschoten)


pkg/kv/kvserver/store.go, line 2015 at r20 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Could you move the shouldQueue := up below this comment so that it's clear what it's referring to?

Done.

But also, should we be checking for each range whether we should queue? Something seems off about calling AdminN(1) and then queuing 10k ranges.

I was considering that, but this is actually exactly what we're doing with the system config span trigger above (since #53605) -- in that regard this is keeping the status quo. How would you feel about actually profiling once it's wired up before making any changes? It's hard to profile now given none of these tables are being written to, but we should be in much better shape next week after #71994 lands.

If this is the status quo then I'm ok with not changing behavior in this PR.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 64 at r20 (raw file):

Previously, irfansharif (irfan sharif) wrote…

The number of restart attempts would be worth capturing in a metric, they're just logged prominently now. Is that what you were suggesting?

Yes, the number of restarts would be a worthwhile metric. This can be a separate PR.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 192 at r28 (raw file):

	}

	return err // return the last error

Depending on the retryOpts, we don't always expect this to be reachable. Is that correct? If so, consider alluding to that in a comment.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 257 at r28 (raw file):

		if err := buffer.Add(ctx, &bufferEvent{update, ev.Value.Timestamp}); err != nil {
			select {
			case <-ctx.Done():

I saw this pattern in one of Arul's PRs as well and found it very subtle that it doesn't deadlock. It relies on the contract written far away that a rangefeed callbacks' ctx is canceled when the rangefeed is closed.

Could you leave a comment near each of these case <-ctx.Done(): branches that mentions that we expect to hit this case after the handler goroutine stops listening to errCh and closes the rangefeed?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 310 at r28 (raw file):

	injectedErrCh := s.knobs.KVSubscriberErrorInjectionCh
	internal := spanconfigstore.New(s.fallback)
	var handlerInitialized bool

Could you confirm that we want to re-initialize the handler each time the rangefeed retries?

Contributor Author

@irfansharif irfansharif left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 64 at r20 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Yes, the number of restarts would be a worthwhile metric. This can be a separate PR.

Added to #67679, hoping to burn through this list this month.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 257 at r28 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I saw this pattern in one of Arul's PRs as well and found it very subtle that it doesn't deadlock. It relies on the contract written far away that a rangefeed callbacks' ctx is canceled when the rangefeed is closed.

Could you leave a comment near each of these case <-ctx.Done(): branches that mentions that we expect to hit this case after the handler goroutine stops listening to errCh and closes the rangefeed?

Done. There's actually a stronger rangefeed API guarantee we're relying on. When invoking rangefeed.Close, we actually wait for the rangefeed library to ensure that all inflight, possibly blocking, handlers have finished running. See:

func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
defer close(f.stopped)

The stopped channel is closed before returning from rangefeed.run, the method actually invoking all the handlers one-by-one. Added this as a comment to rangefeed.Close.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 310 at r28 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Could you confirm that we want to re-initialize the handler each time the rangefeed retries?

We do -- added a comment below.

Collaborator

@arulajmani arulajmani left a comment


This thing seems to be coming together! Mostly minor comments from me except for one. Can we reconsider the need to perform lazy binding here?

Reviewed 2 of 22 files at r13, 1 of 1 files at r14, 5 of 21 files at r15, 2 of 105 files at r17, 111 of 112 files at r18, 32 of 33 files at r21, 3 of 3 files at r22, 4 of 5 files at r23, 1 of 1 files at r24, 1 of 1 files at r25, 1 of 25 files at r26.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):
I have similar sentiments on this -- can we fold Subscribe and OnSpanConfigUpdate into a single method and put it on the interface instead? Something like:

type KVSubscriber interface {
   Subscribe(context.Context, retry.Options, func (update roachpb.Span))
}

This lets you forgo the complexity around if the handler has been lazily bound or not. I'm also not sure what lazily binding this handler gets us, considering we only ever allow a single handler.

I felt it cleaner to not have aStart method on the interface where any mocked impl would have to block on some surrounding stopper.

Now that you've pushed the retrying into the Subscribe call, would a mocked impl simply be able to return?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 186 at r27 (raw file):

	}{}

	defer func() {

This could use a comment


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 308 at r27 (raw file):

				}

				for _, ev := range events {

Do you only need this in the else case?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 317 at r27 (raw file):

			}
		case <-initialScanDoneCh:
			events := buffer.Flush(ctx, initialTS)

Might be worth calling out why we're using initialTS here as the timestamp to flush at, considering it's referencing behaviour in the rangefeed package.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 327 at r27 (raw file):

			s.mu.Unlock()

			if handler != nil {

Consider pulling this code out into a function so that you can call it both from here and above the very first time the handler is called.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 310 at r28 (raw file):

	injectedErrCh := s.knobs.KVSubscriberErrorInjectionCh
	internal := spanconfigstore.New(s.fallback)
	var handlerInitialized bool

This could use a comment


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 390 at r29 (raw file):

// OnSpanConfigUpdate installs a callback that's invoked with whatever span may
// have seen a config update.

nit: clarify that only one handler is ever allowed?

Contributor Author

@irfansharif irfansharif left a comment


I'll (timidly) push back again against getting rid of lazy binding for the reasons outlined below; at best it simplifies a nil check and a boolean, and makes for what I think is a more awkward interface to mock out. The latter is something I'm trying to do in #71994, which is where the pushback stems from.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):

Now that you've pushed the retrying into the Subscribe call, would a mocked impl simply be able to return?

No, it'll still have to be a blocking call, right? Unless we want Subscribe to fire off an async task internally -- which is more surprising for the many reasons discussed elsewhere.

I'm also not sure what lazily binding this handler gets us, considering we only ever allow a single handler.

The lazy binding is not related to having a single handler, it's to do with not wanting to bunch together code dealing with the lifecycle the subscription and code that wants to learn about updates. The mock impl considerations are another aspect -- what does it mean to mock the interface that's providing you a retry option?

I still don't really see the complexity around lazy binding; everything else in this package was more intricate and that's where all my bugs were when writing tests. Really, it's one nil check to see if a handler is available and a single bool.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 186 at r27 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

This could use a comment

Added to lastFrontierTS.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 308 at r27 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Do you only need this in the else case?

Yea, that works, done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 317 at r27 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Might be worth calling out why we're using initialTS here as the timestamp to flush at, considering it's referencing behaviour in the rangefeed package.

Renamed it to initialScanTS instead.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 327 at r27 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Consider pulling this code out into a function so that you can call it both from here and above the very first time the handler is called.

Meh, it's two lines. I prefer the inline commentary.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 310 at r28 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

This could use a comment

Added below + at the type level.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 390 at r29 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

nit: clarify that only one handler is ever allowed?

Specified in the interface commentary.

Collaborator

@arulajmani arulajmani left a comment


Happy to chat about this offline, I may not be appreciating fully why we want to keep the lifecycle of subscription and learning about updates separate. I do think it buys us a bit more than simplifying a nil check, though, as it reduces the state this thing needs to track, which makes it easier to reason about.

The latter is something I'm trying to do in #71994, which is where the pushback stems from.

Are you referring to TestSpanConfigUpdateAppliedToReplica here or something else? If that's the one, then I think you could make it work even with the interface suggestion below.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):
I agree, there's not much code complexity around the lazy binding. To me the complexity stems from the fact that it isn't completely obvious to me why these distinct phases exist (as a first time reader of this code).

I could buy that you don't want to bunch together the code that deals with the lifecycle of the subscription and the code that wants to learn about updates, but then I'd expect us to be registering/de-registering multiple handlers while maintaining the rangefeed. We currently register only one handler and it is never deregistered. Do we see that changing in the future? If not, is there a benefit to having this separation?

No, it'll still have to be a blocking call, right? Unless we want Subscribe to fire off an async task internally -- which is more surprising for the many reasons discussed elsewhere.

I wasn't suggesting that Subscribe fire off an async task internally. (I think) earlier you had retry code in the server package that retried this thing in a goroutine. Now that the retry code has been moved inside the kvsubscriber package, a mock implementation can simply no-op. I think I may have misunderstood what you meant above?

Contributor Author

@irfansharif irfansharif left a comment


Yes, TestSpanConfigUpdateAppliedToReplica from that branch is the right one. I'm still not following your suggestion; can you show what that test would look like if we did this differently?

I may not be appreciating fully why we want to keep the lifecycle of subscription and learning about updates separate

Because I feel they're separate things that benefit from the separation. I fear we're going in circles, I don't really have reasons other than ones we've already discussed below. Does this feel like a hard blocker? If not, I'll #meal-meetups expense you a beer for the rubber stamp.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 246 at r16 (raw file):

We currently register only one handler and it is never deregistered. Do we see that changing in the future?

I don't see it changing in the future, no. If it does I'm happy to revisit.

a mock implementation can simply no-op. I think I may have misunderstood what you meant above?

But we want the handler to be called in the same goroutine as Subscribe, so it has to be blocking -- I'm not sure what "simply no-op" means. I'd welcome a code example of the test we're discussing above.

@irfansharif
Contributor Author

Pulled the stopper async task back into Subscribe (I'd benefit from a rule of thumb for when we should vs. not), and introduced a tiny handler type to capture initialization better. Now it's possible to install multiple callbacks with ease. It's as tiny as:

type handler struct {
	initialized bool // tracks whether we need to invoke with a [min,max) span first
	fn          func(update roachpb.Span)
}

// invoke needs a pointer receiver so that the initialized flag persists
// across invocations.
func (h *handler) invoke(update roachpb.Span) {
	if !h.initialized {
		h.fn(keys.EverythingSpan)
		h.initialized = true

		if update.Equal(keys.EverythingSpan) {
			return // we can opportunistically avoid re-invoking with the same update
		}
	}

	h.fn(update)
}

At the callers, it's then simply:

for i := range handlers {
	handler := &handlers[i] // mutated by invoke
	for _, ev := range events {
		handler.invoke(ev.(*bufferEvent).Update.Span)
	}
}

Contributor

@ajwerner ajwerner left a comment


My comments are largely minor or superficial. This is looking good.

Collaborator

@arulajmani arulajmani left a comment


Minor/cleanup comments from me as well.

Reviewed 1 of 4 files at r27, 1 of 9 files at r28.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 37 at r34 (raw file):

// KVSubscriber is used to subscribe to global span configuration changes. It's
// a concrete implementation of the spanconfig.KVSubscriber interface.
//

It might be worth explicitly adding words around the lifecycle of this thing. Maybe a summary of what we discussed in yesterday's pod meeting to get here? Specifically, that we expect there to be a singleton KVSubscriber started per node that establishes and maintains the lifecycle of rangefeeds internally. It also allows all stores on that node to subscribe to updates to particular spans by registering handlers.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 47 at r34 (raw file):

// updates and flushing them out en-masse in timestamp order when the rangefeed
// frontier is bumped[2]. If the buffer overflows (as dictated by the memory
// limit the KVSubscriber is instantiated with), the subscriber is wound down and

Should this say "subscriber retries" internally now?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 50 at r34 (raw file):

// an appropriate error is returned to the caller.
//
// When running into the errors above, it's safe for the caller to re-subscribe

This needs an update


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 67 at r34 (raw file):

//
// TODO(irfansharif): When swapping the old spanconfig.StoreReader for the new,
// instead of informing callers with an everything [min,max) span, we could diff

nit: s/callers/registered handlers/


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 75 at r34 (raw file):

//      getting split. When applying these updates, we need to make sure to
//      process the deletion event for S before processing S1...SN.
// [2]: In our example above deleting the config for S and adding configs for S1...Nwe

Nwe?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 79 at r34 (raw file):

//      lest we expose the intermediate state where the config for S was deleted
//      but the configs for S1...SN were not yet applied.
// [3]: TODO(irfansharif): When tearing down the subscriber due to underlying errors,

Now that we're doing this internally, should we address this TODO? Or, if not, move it inline?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 173 at r34 (raw file):

//      the exported StoreReader will be up-to-date and continue to be
//      incrementally maintained.
func (s *KVSubscriber) Subscribe(ctx context.Context) error {

In line with Andrew's comment about renaming OnSpanConfigUpdate to Subscribe, let's rename this thing to Start?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 199 at r34 (raw file):

Previously, ajwerner wrote…

nit: how do you feel about calling this run or runRangefeed?

+1 on the Start/run pattern we have in other places as well.

pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 217 at r34 (raw file):

	defer func() {
		mu.Lock()
		s.lastFrontierTS = mu.frontierTS

Should this be s.lastFrontierTS.Forward(mu.frontierTS) instead?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 375 at r34 (raw file):

Previously, ajwerner wrote…

nit: I could see renaming this method to Subscribe

+1

pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go, line 17 at r34 (raw file):

// TestingSubscribeInner exports the inner subscription route for testing
// purposes.
func (s *KVSubscriber) TestingSubscribeInner(ctx context.Context) error {

Move this method to a testutils file instead.

Contributor Author

@irfansharif irfansharif left a comment


What in this change obviates the need to do this?

This was a standalone commit unrelated to anything else in the PR. It's actually just dead code, as mentioned in the commit message. Was discussed in this thread: https://reviewable.io/reviews/cockroachdb/cockroach/69614#-MlLiZryEFeTuS3XXoq2.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)


pkg/kv/kvserver/store.go, line 2010 at r36 (raw file):

Previously, ajwerner wrote…

We've got the RSpan now, may as well use it: !sp.ContainsKey(startKey)

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 37 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

It might be worth explicitly adding words around the lifecycle of this thing. Maybe a summary of what we discussed in yesterday's pod meeting to get here? Specifically, that we expect there to be a singleton KVSubscriber started per node that establishes and maintains the lifecycle of rangefeeds internally. It also allows all stores on that node to subscribe to to updates to particular spans by registering handlers.

I feel like how this thing is expected to get used is not commentary well suited here. A lot of the text below talks about the lifecycle, and looking at the uses it's (I hope) clear how it's being used -- "started at the node level once" and "subscribed to by each store".


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 47 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Should this say "subscriber retries" internally now?

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 50 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

This needs an update

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 75 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Nwe?

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 79 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Now that we're doing this internally, should we address this TODO? Or, if not, move it inline?

I'm fine with just leaving the TODO here.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 173 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

In line with Andrew's comment about renaming OnSpanConfigUpdate to Subscribe, let's rename this thing to Start?

Already done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 178 at r34 (raw file):

Previously, ajwerner wrote…

I was thinking something like:

for r := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); r.Next(); {
	const aWhile = 5 * time.Minute // arbitrary, but much longer than a single retry
	started := timeutil.Now()
	if err := s.run(ctx); err != nil {
		if errors.Is(err, context.Canceled) {
			return // we're done here
		}
		ranFor := timeutil.Since(started)
		if ranFor > aWhile {
			r.Reset()
		}
		log.Warningf(ctx, "spanconfig-kvsubscriber failed with %v after %v, retrying...", err, ranFor)
		continue
	}
}

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 199 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…
+1 on the Start/run pattern we have in other places as well.

Already done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 217 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Should this be s.lastFrontierTS.Forward(mu.frontierTS) instead?

Sure, done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 375 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…
  • 1

Already done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 188 at r36 (raw file):

Previously, ajwerner wrote…

what is the no-error return from s.run? Is that the context being canceled? It's not totally obvious. Perhaps add some more commentary and note that case on the run comment?

Done (was already mentioned in run).


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go, line 17 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Move this method to a testutils file instead.

Does this apply? I'm exporting a package internal method.

Collaborator

@arulajmani arulajmani left a comment


:lgtm:

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @ajwerner, @arulajmani, @irfansharif, and @nvanbenschoten)


pkg/spanconfig/spanconfig.go, line 51 at r37 (raw file):

// callers can subscribe to learn about what key spans may have seen a
// configuration change. After learning about a span update through a callback
// invocation, callers can consult the embedded StoreReader to retrieve an

s/callers/subscribers/ (in a couple of places below as well).


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 37 at r34 (raw file):

Previously, irfansharif (irfan sharif) wrote…

I feel like the commentary about how this thing is expected to get used isn't well suited here. A lot of the text below talks about the lifecycle, and looking at the uses it's (I hope) clear how it's being used -- "started at the node level once" and "subscribed to by each store".

IMO the details about the lifecycle are getting lost in some of the details below. I'd strongly encourage you to write about the two phases (start and subscribe) and why they exist, even if you do it in general terms rather than talking about specific callers :)


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 79 at r34 (raw file):

Previously, irfansharif (irfan sharif) wrote…

I'm fine with just leaving the TODO here.

Okay. It's still worth updating the TODO to talk about internally retrying from the last saved checkpoint as opposed to surfacing it to the callers of this function.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 97 at r37 (raw file):

	knobs               *spanconfig.TestingKnobs

	subscribed int32    // accessed atomically

s/subscribed/started


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go, line 17 at r34 (raw file):

Previously, irfansharif (irfan sharif) wrote…

Does this apply? I'm exporting a package internal method.

I would've expected kvsubscriber_test.go to have unit tests and testutils.go to have wrappers around internal methods we want to export for testing purposes.

KVSubscriber presents a consistent[^1] snapshot of a
spanconfig.StoreReader that's incrementally maintained with changes made
to the global span configurations state. The maintenance happens
transparently; callers can subscribe to learn about what key spans may
have seen a configuration change. After learning about a span update,
consulting the embedded StoreReader would retrieve an up-to-date[^2]
config for it.

When a callback is first installed, it's invoked with the [min,max) span
-- a shorthand to indicate that subscribers should consult the
StoreReader for all spans of interest. Subsequent updates are of the
more incremental kind. It's possible that the span updates received are
no-ops, i.e.  consulting the StoreReader for the given span would
retrieve the last config observed for the span[^2].

    type KVSubscriber interface {
      StoreReader
      Subscribe(func(updated roachpb.Span))
    }
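To make the Subscribe contract concrete, here's a minimal sketch of a subscriber callback. The `Span` type, `overlaps` helper, and `everything` sentinel below are simplified stand-ins for the actual roachpb types, used purely for illustration: on the first [min,max) invocation a subscriber refreshes everything, and on incremental updates it refreshes only what overlaps.

```go
package main

import "fmt"

// Span is a simplified stand-in for roachpb.Span; these names are
// illustrative, not the actual CockroachDB types.
type Span struct{ Start, End string }

// overlaps reports whether two [Start,End) spans intersect.
func (s Span) overlaps(o Span) bool {
	return s.Start < o.End && o.Start < s.End
}

// everything plays the role of the [min,max) span delivered on the
// first callback invocation after Subscribe.
var everything = Span{Start: "", End: "\xff"}

func main() {
	// Spans a hypothetical store cares about.
	replicas := []Span{{"a", "c"}, {"m", "p"}}

	// A subscriber callback: on the initial [min,max) update, consult
	// the StoreReader for all spans of interest; otherwise only for
	// the replicas overlapping the updated span.
	onUpdate := func(updated Span) {
		if updated == everything {
			fmt.Println("refresh all replicas")
			return
		}
		for _, r := range replicas {
			if r.overlaps(updated) {
				fmt.Printf("refresh replica %v\n", r)
			}
		}
	}

	onUpdate(everything)     // first callback after Subscribe
	onUpdate(Span{"b", "d"}) // incremental update; overlaps [a,c) only
}
```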

It's expected to be Start-ed once, after which one or many subscribers can
listen in for updates. Internally we maintain a rangefeed over the
global store of span configurations (system.span_configurations),
applying updates from it into an embedded spanconfig.Store. A read-only
view of this data structure (spanconfig.StoreReader) is exposed as part
of the KVSubscriber interface. Rangefeeds used as-is don't offer any
ordering guarantees with respect to updates made over non-overlapping
keys, which is something we care about[^4]. For that reason we make use
of a rangefeed buffer, accumulating raw rangefeed updates and flushing
them out en-masse in timestamp order when the rangefeed frontier is
bumped[^5]. If the buffer overflows (as dictated by the memory limit the
KVSubscriber is instantiated with), the old rangefeed is wound down and
a new one re-established.
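The buffer-and-flush idea above can be sketched as follows. This is an illustrative toy, not the real rangefeedbuffer API: `event`, `buffer`, and the integer timestamps are all simplifications, but the shape is the same -- accumulate raw updates up to a limit, and on a frontier bump emit everything at or below the frontier in timestamp order.

```go
package main

import (
	"fmt"
	"sort"
)

// event is a simplified rangefeed update (illustrative only).
type event struct {
	ts   int // commit timestamp, simplified to an int
	span string
}

type buffer struct {
	limit  int // stand-in for the memory limit
	events []event
}

var errOverflow = fmt.Errorf("buffer limit exceeded")

// add accumulates a raw update, failing once the limit is hit; the
// caller would then wind down and re-establish the rangefeed.
func (b *buffer) add(ev event) error {
	if len(b.events) >= b.limit {
		return errOverflow
	}
	b.events = append(b.events, ev)
	return nil
}

// flush returns, in timestamp order, all events with ts <= frontier;
// events above the frontier are retained for a later flush.
func (b *buffer) flush(frontier int) []event {
	sort.Slice(b.events, func(i, j int) bool { return b.events[i].ts < b.events[j].ts })
	i := sort.Search(len(b.events), func(i int) bool { return b.events[i].ts > frontier })
	flushed := b.events[:i]
	b.events = append([]event(nil), b.events[i:]...)
	return flushed
}

func main() {
	b := &buffer{limit: 4}
	_ = b.add(event{ts: 3, span: "[m,p)"})
	_ = b.add(event{ts: 1, span: "[a,c)"}) // arrives out of timestamp order
	_ = b.add(event{ts: 5, span: "[q,t)"}) // above the frontier; held back

	for _, ev := range b.flush(3) { // frontier bumped to 3
		fmt.Println(ev.ts, ev.span)
	}
}
```

Flushing only at frontier bumps is what gives the all-at-once application from footnote 5: the delete of S and the writes of S1...SN commit at the same timestamp, so they come out of the same flush.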

When running into the internal errors described above, it's safe for us
to re-establish the underlying rangefeeds. When re-establishing a new
rangefeed and populating a spanconfig.Store using the contents of the
initial scan[^3], we wish to preserve the existing
spanconfig.StoreReader. Discarding it would entail either blocking all
external readers until a new spanconfig.StoreReader was fully populated,
or presenting an inconsistent view of the spanconfig.Store that's
currently being populated. For new rangefeeds what we do then is route
all updates from the initial scan to a fresh spanconfig.Store, and once
the initial scan is done, swap at the source for the exported
spanconfig.StoreReader. During the initial scan, concurrent readers
would continue to observe the last spanconfig.StoreReader, if any. After
the swap, they would observe the more up-to-date source instead. Future
incremental updates will also target the new source. When this source
swap occurs, we inform the handler of the need to possibly refresh its
view of all configs.
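A minimal sketch of that swap, assuming a much-simplified `store` type in place of spanconfig.Store (the names `subscriber`, `restart`, and `read` below are hypothetical): initial-scan updates go into a fresh store while readers stay on the old one, and only once the scan completes is the exported view swapped atomically.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// store is a simplified stand-in for spanconfig.Store, mapping span
// keys to configs (illustrative only).
type store map[string]string

type subscriber struct {
	// current holds the store exposed to readers -- the StoreReader view.
	current atomic.Value
}

// restart routes the initial scan into a fresh store, leaving
// concurrent readers on the old one, and swaps only once the scan is
// complete, then tells handlers to refresh their view of all configs.
func (s *subscriber) restart(initialScan map[string]string, onSwap func()) {
	fresh := store{}
	for k, v := range initialScan {
		fresh[k] = v // readers still observe the previous store here
	}
	s.current.Store(fresh) // atomic swap at the source
	onSwap()               // e.g. invoke handlers with the [min,max) span
}

// read consults the currently exported view.
func (s *subscriber) read(k string) (string, bool) {
	st, ok := s.current.Load().(store)
	if !ok {
		return "", false
	}
	v, ok := st[k]
	return v, ok
}

func main() {
	s := &subscriber{}
	s.current.Store(store{"[a,c)": "cfg1"})

	fmt.Println(s.read("[a,c)")) // pre-swap view

	s.restart(map[string]string{"[a,b)": "cfg2"}, func() {
		fmt.Println("refresh [min,max)")
	})
	fmt.Println(s.read("[a,b)")) // post-swap view
}
```

The atomic pointer swap is what lets readers proceed throughout: they either see the entire old store or the entire new one, never a half-populated intermediate.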

This commit also wires up the KVSubscriber into KV stores, replacing the
use of the gossiped system config span (possible given the StoreReader
interface, only happens if a testing flag/env var is set).

[^1]: The contents of the StoreReader at t1 corresponds exactly to the
      contents of the global span configuration state at t0 where
      t0 <= t1. If the StoreReader is read from at t2 where t2 > t1,
      it's guaranteed to observe a view of the global state at t >= t0.
[^2]: For the canonical KVSubscriber implementation, this is typically
      the closed timestamp target duration.
[^3]: The canonical KVSubscriber implementation internally
      re-establishes feeds when errors occur, possibly re-transmitting
      earlier updates (usually through a lazy [min,max) span) despite
      possibly not needing to. We could do a bit better and diff the two
      data structures, emitting only targeted updates.
[^4]: For a given key k, its config may be stored as part of a larger
      span S (where S.start <= k < S.end). It's possible for S to get
      deleted and replaced with sub-spans S1...SN in the same
      transaction if the span is getting split. When applying these
      updates, we need to make sure to process the deletion event for S
      before processing S1...SN.
[^5]: In our example above deleting the config for S and adding configs
      for S1...SN, we want to make sure that we apply the full set of
      updates all at once -- lest we expose the intermediate state where
      the config for S was deleted but the configs for S1...SN were not
      yet applied.
[^6]: When tearing down the subscriber due to underlying errors, we
      could also surface a checkpoint to use the next time the
      subscriber is established. That way we can avoid the full initial
      scan over the span configuration state and simply pick up where we
      left off with our existing spanconfig.Store.

Release note: None
Contributor Author

@irfansharif irfansharif left a comment


Thanks y'all! I'm sure this felt like a doozy.

bors r+

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner, @arulajmani, and @nvanbenschoten)


pkg/spanconfig/spanconfig.go, line 51 at r37 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

s/callers/subscribers/ (in a couple of places below as well).

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 37 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

IMO the details about the lifecycle are getting lost in some of the details below. I'd strongly encourage you to write about the two phases (start and subscribe) and why they exist, even if you do it in general terms rather than talking about specific callers :)

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 79 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

Okay. It's still worth updating the TODO to talk about internally retrying from the last saved checkpoint as opposed to surfacing it to the callers of this function.

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 97 at r37 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

s/subscribed/started

Done.


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber_test.go, line 17 at r34 (raw file):

Previously, arulajmani (Arul Ajmani) wrote…

I would've expected kvsubscriber_test.go to have unit tests and testutils.go to have wrappers around internal methods we want to export for testing purposes.

It's not atypical to export test-only symbols in a _test package, which is why I did it this way (testutils.go would surface this symbol in the godoc and your editor unless it's testutils_test.go 🤷‍♀️).

@craig
Contributor

craig bot commented Nov 3, 2021

Build succeeded:

Contributor

@nvb nvb left a comment


:lgtm:

Reviewed 1 of 2 files at r29, 6 of 10 files at r35, 3 of 4 files at r37, 2 of 2 files at r38, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale)


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 336 at r38 (raw file):

			events := buffer.Flush(ctx, frontierTS)
			s.mu.Lock()

It would have been nice to have added some commentary about this locking. It's pretty subtle, but as we've found, also very important. Maybe we can add a comment next time we're working around here?


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 418 at r38 (raw file):

}

type handler struct {

Much nicer!


pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go, line 428 at r38 (raw file):

		h.initialized = true

		if update.Equal(keys.EverythingSpan) {

Out of curiosity, why can't we always return on this path? Do we ever need the second call to fn if we just called it with keys.EverythingSpan?

@nvb
Contributor

nvb commented Nov 3, 2021

Congrats on landing this. It came out nicely.
