
storage: allow queues to process replicas in parallel, increase splitQueue's concurrency #21562

Merged
nvb merged 4 commits into cockroachdb:master from nvb:nvanbenschoten/parQueues2 on Jan 21, 2018
Conversation

@nvb
Contributor

@nvb nvb commented Jan 18, 2018

Up until this point, `baseQueue` has always made the assumption that only one
replica will be processed at any given time. This has been fine until now, but
there are valid cases where we may want to allow a bounded amount of concurrency
within the queues. An example of this is the `splitQueue`, where we may want to
allow a few replicas to be processed in parallel.

This change adds a `maxConcurrency` option to `queueConfig`. `baseQueue` then
changes to work off a process semaphore instead of the previous `processMu`. The
handling of `processMu` was pretty awkward before, so this change also allows us
to clean up some code, remove race conditions that allowed for poorly specified
behavior, and address a TODO.

One of the auxiliary reasons for prioritizing this change is that until now,
queues have always removed their handle to replicas completely once they began
processing. This meant that it was impossible to determine which replicas were
processing at any given time. It was also impossible to register a callback for
the completion of a processing attempt. Both of these were things I wanted to do
in relation to back-pressuring writes on splits (#21357).

Splits should be relatively isolated, other than requiring expensive RocksDB
scans over part of the splitting range to recompute stats. Because of this, a
later commit allows a limited number of splits to be processed at once. This
prevents slow splits from stalling all other splits from ever running and also
improves the splitQueue's ability to keep up with heavy load.

Release note: None

A clock was already accessible through the Store, so passing it separately
just crowded the method signatures.

Release note: None
@nvb nvb requested a review from a team January 18, 2018 22:14
@cockroach-teamcity
Member

This change is Reviewable

@nvb
Contributor Author

nvb commented Jan 18, 2018

To expand on the note about auxiliary reasons for prioritizing this change, this is a WIP of what I picture back-pressuring writes on splits looking like. It relies on the changes to baseQueue we make here, including maintaining a handle to replicaItem through replica processing. An interesting test for that branch will be to watch how it behaves with a very small split size.

@nvb
Contributor Author

nvb commented Jan 19, 2018

I did some testing of the impact parallelizing splits will have on workloads that
stress the split queue. While doing so, I also tested out my WIP branch
to backpressure writes on splits to see how it would impact these workloads.

To do this, I first dropped the range_max_bytes down to 4MB. I then spun up
`kv --min-block-bytes=70000 --max-block-bytes=80000 --splits=0 --concurrency=16`,
which was good at forcing lots of splits. With these relative sizes, it only
took about 50 writes to fully populate a range and push it over the max range
size. I then did some experimentation with the splitQueue, varying the level
of concurrency we gave it.

range_max_bytes=4MB split_concurrency=1

(graph omitted)

range_max_bytes=4MB split_concurrency=4

(graph omitted)

As the test shows, at a concurrency of 1, the split queue was unable to keep up
with the load and the number of ranges that needed to split grew at an unbounded
rate. When I increased the concurrency to 4, the split queue was able to keep up
and it never got stuck. As would be expected, there seemed to be a critical
level of concurrency necessary to keep up with the load. If the queue operated
below this level of concurrency, it would fail to keep up and would never catch up*. If
the queue operated above this level of concurrency, it would be able to keep up
and never get behind. For this test, that crossover point seemed to be between permitting
1 and 4 splits concurrently.

* it might eventually catch up once the range count grew large enough that the load was
sufficiently spread out, but I suspect that this would have taken a long time

I then dropped the range_max_bytes from 4MB to 2MB, meaning that only
about 25 writes were required to fill up a range. I ran the tests again, this time
keeping track of the largest range size over the course of the runs.

range_max_bytes=2MB split_concurrency=1

(graph omitted)

Nathans-MBP:cockroach$ curl http://nathans-mbp:8080/_status/ranges/local 2>/dev/null |\
  jq '[.ranges | .[] | {keyBytes: .state.state.stats.keyBytes | tonumber, valBytes: .state.state.stats.valBytes | tonumber } | .keyBytes + .valBytes] | max' |\
  awk '{print $1/1000000 "MB"}'

59.4194MB

~30x the range max bytes

range_max_bytes=2MB split_concurrency=4

(graph omitted)

Nathans-MBP:cockroach$ curl http://nathans-mbp:8080/_status/ranges/local 2>/dev/null |\
  jq '[.ranges | .[] | {keyBytes: .state.state.stats.keyBytes | tonumber, valBytes: .state.state.stats.valBytes | tonumber } | .keyBytes + .valBytes] | max' |\
  awk '{print $1/1000000 "MB"}'

6.43333MB

~3x the range max bytes

I also tried the second config with my WIP backpressure feature on. The graph looked
similar to range_max_bytes=2MB split_concurrency=4, but the max range size only
got up to 4.91065MB, ~2.5x the range max bytes. The backpressuring didn't put a
hard bound on the range size, but it did seem to help.

Finally, I dropped the range_max_bytes down all the way to 1MB. Here, none of the
graphs looked very good:

range_max_bytes=1MB split_concurrency=1

So bad I stopped testing right away.

range_max_bytes=1MB split_concurrency=4 with and without backpressure

(graph omitted)

While the graphs looked about the same for concurrency=4 with and without
backpressure enabled, there was a pretty clear distinction between the maximum
range size seen between these two test cases. Without it enabled, I saw ranges
get as large as 8.01927MB, ~8x the range max bytes. With it enabled, I saw
ranges grow to 3.59608MB, ~4x the range max bytes. However, the improved
effectiveness at bounding range size came at the clear expense of
99th-percentile latency. I'll continue the exploration of backpressure when I
actually push the change for review, but this is all in line with what we'd
expect.

Finally, I ran an experiment with a much larger amount of split concurrency. Even
with a range size of 1MB, the split queue was able to keep up. This demonstrates
that we may want to consider dynamically increasing the split concurrency when the
splitQueue is struggling to keep up.

range_max_bytes=1MB split_concurrency=16 with backpressure

(graph omitted)

@petermattis
Collaborator

Nice writeup!

> Finally, I ran an experiment with a much larger amount of split concurrency. Even
> with a range size of 1MB, the split queue was able to keep up. This demonstrates
> that we may want to consider dynamically increasing the split concurrency when the
> splitQueue is struggling to keep up.

Dynamically increasing split concurrency sounds a bit much. Splits are quick with these small ranges because the stats computation is so fast. With larger ranges increased concurrency might be a problem. Or maybe not. Rather than dynamically adjusting the concurrency, I think a cluster setting plus the back pressure mechanism would be sufficient in the near term.

@bdarnell
Contributor

:lgtm:

> Dynamically increasing split concurrency sounds a bit much.

Agreed. When would a dynamic adjustment be better than just a higher constant concurrency?


Review status: 0 of 12 files reviewed at latest revision, 2 unresolved discussions, all commit checks successful.


pkg/storage/queue.go, line 288 at r2 (raw file):

	processSem := make(chan struct{}, semCount)
	for i := 0; i < semCount; i++ {
		processSem <- struct{}{}
	}

The semaphore pattern we use elsewhere (e.g. Stopper.RunLimitedAsyncTask) has the channel initially empty (and then we "acquire" by sending to the channel and "release" by receiving)


pkg/storage/queue.go, line 803 at r2 (raw file):

					// Lock processing so that we don't race with the
					// main process loop.
					defer bq.lockProcessing()()

This smells bad to me. Why do we need to shut down everything instead of just locking bq.mu?


Comments from Reviewable

nvb added 3 commits January 19, 2018 23:18
Up until this point, `baseQueue` has always made the assumption that only one
replica will be processed at any given time. This has been fine until now, but
there are valid cases where we may want to allow a bounded amount of concurrency
within the queues. An example of this is the `splitQueue`, where we may want to
allow a few replicas to be processed in parallel.

This change adds a `maxConcurrency` option to `queueConfig`. `baseQueue` then
changes to work off a process semaphore instead of the previous `processMu`. The
handling of `processMu` was pretty awkward before, so this change also allows us
to clean up some code, remove race conditions that allowed for poorly specified
behavior, and address a TODO.

One of the auxiliary reasons for prioritizing this change is that until now,
queues have always removed their handle to replicas completely once they began
processing. This meant that it was impossible to determine which replicas were
processing at any given time. It was also impossible to register a callback for
the completion of a processing attempt. Both of these were things I wanted to do
in relation to back-pressuring writes on splits (cockroachdb#21357).

Release note: None
Splits should be relatively isolated, other than requiring expensive RocksDB
scans over part of the splitting range to recompute stats. Because of this, we
allow a limited number of splits to be processed at once. This prevents slow
splits from stalling all other splits from ever running and also improves the
splitQueue's ability to keep up with heavy load.

Release note: None
`baseQueue.DrainQueue` no longer races with `baseQueue.processLoop`, so we can
address an existing TODO and rely on `DrainQueue` to fully process all enqueued
replicas before returning. This allows us to simplify a few tests.

For each of these tests, I verified that they were still flaky without the
`SucceedsSoon` if `DrainQueue` did not call `lockProcessing` and then verified
that the flake went away when the `lockProcessing` call was added back in.

I'm sure there are other tests that could be improved now, but nothing stuck out
to me when I looked.

Release note: None
@nvb nvb force-pushed the nvanbenschoten/parQueues2 branch from 7d4c027 to bbf95aa Compare January 20, 2018 04:18
@nvb
Contributor Author

nvb commented Jan 20, 2018

TFTR!

> When would a dynamic adjustment be better than just a higher constant concurrency?

That's a good point; we might as well just increase the max concurrency to start with. Based on the experiments I ran above and on Peter's point about larger ranges being more resource intensive, I think a concurrency of 4 for the splitQueue is still a good level for now.


Review status: 0 of 12 files reviewed at latest revision, 2 unresolved discussions, some commit checks pending.


pkg/storage/queue.go, line 288 at r2 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

The semaphore pattern we use elsewhere (e.g. Stopper.RunLimitedAsyncTask) has the channel initially empty (and then we "acquire" by sending to the channel and "release" by receiving)

Done.


pkg/storage/queue.go, line 803 at r2 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

This smells bad to me. Why do we need to shut down everything instead of just locking bq.mu?

Good point, we only needed to acquire from the semaphore once, since we were only running one task at a time. Done.


Comments from Reviewable

@bdarnell
Contributor

Reviewed 6 of 9 files at r1, 2 of 3 files at r2, 1 of 1 files at r4, 3 of 3 files at r5.
Review status: all files reviewed at latest revision, 2 unresolved discussions, all commit checks successful.


Comments from Reviewable

@nvb nvb merged commit 0d0d9f1 into cockroachdb:master Jan 21, 2018
@nvb nvb deleted the nvanbenschoten/parQueues2 branch January 21, 2018 02:13
@knz
Contributor

knz commented Jan 22, 2018

nit: this would have benefited from a release note mentioning the performance improvement.

@nvb
Contributor Author

nvb commented Jan 22, 2018

@knz good point. I just opened a related change (#21673), so I put the release note there. The release note may be slightly misattributed, but I don't think that's a huge deal.

nvb added a commit to nvb/cockroach that referenced this pull request Jan 23, 2018
The change in cockroachdb#21562 made it possible for queues to process replicas in
parallel. In doing so, it changed how `baseQueue` handled the queuing of
replicas that were already processing when `MaybeAdd` was called. This
was not an issue before because only a single replica could ever be
processed at a time, so it was safe to add the replica that was
currently processing back into the queue. That said, the previous approach
could result in replicas being queued with the wrong priority, which
could result in queuing inefficiencies.

With the changes made in cockroachdb#21562, `baseQueue` now needs to be careful that
the same replica doesn't get processed twice at the same time. This is
accomplished by using a `requeue` flag on the `replicaItem`. If the flag
is set when a replica finishes processing, we requeue it. This ensures
that we never skip a necessary processing attempt, and is safe because
all queue processing is idempotent.

This change makes sure that we call `ShouldQueue` again before placing
replicas with the `requeue` flag back in the queue. This isn't strictly
required for correctness (again, because processing is idempotent), but
it ensures that we don't add replicas back into the queue needlessly. It
also ensures that we place the replica back in the queue using the
priority provided by `ShouldQueue` *after* the earlier processing attempt
finishes, instead of the priority it had *before*. This fixes a subtle
issue that we've actually always had.

I suspect that this would have helped keep the queue size down in some
of the testing from cockroachdb#21562. It would have also helped us in keeping
the max range size down because we would have better optimized the
ranges that we prioritized when the queue began to fill.

Release note (performance improvement): Multiple ranges can now split at
the same time, improving our ability to handle hotspot workloads.
