[EventEngine] Implement work-stealing in the EventEngine ThreadPool #32869

Merged: drfloob merged 55 commits into grpc:master from drfloob:waitless on May 8, 2023

Member

@drfloob drfloob commented Apr 14, 2023

This PR implements a work-stealing thread pool for use inside EventEngine implementations. Because of historical risks here, I've guarded the new implementation behind an experiment flag: GRPC_EXPERIMENTS=work_stealing. Current default behavior is the original thread pool implementation.

Benchmarks look very promising:

```
bazel test \
--test_timeout=300 \
--config=opt -c opt \
--test_output=streamed \
--test_arg='--benchmark_format=csv' \
--test_arg='--benchmark_min_time=0.15' \
--test_arg='--benchmark_filter=_FanOut' \
--test_arg='--benchmark_repetitions=15' \
--test_arg='--benchmark_report_aggregates_only=true' \
test/cpp/microbenchmarks:bm_thread_pool
```

2023-05-04: bm_thread_pool benchmark results on my local machine (64 core ThreadRipper PRO 3995WX, 256GB memory), comparing this PR to master:

image

2023-05-04: bm_thread_pool benchmark results in the Linux RBE environment (unsure of machine configuration, likely small), comparing this PR to master.

image

paulosjca pushed a commit to paulosjca/grpc that referenced this pull request May 4, 2023
…c#32935)

Fix: grpc#18075

From comments in grpc#18075, `CPython`
reinitializes the `GIL` after the `pthread_atfork` child handler, so we
shouldn't use any `GIL`-related functions in the child handler, which is what
we're currently doing. This PR uses `os.register_at_fork` to replace
`pthread_atfork` to prevent any undesired behavior.

This also seems to fix a thread hanging issue caused by changes in
core: grpc#32869

### Testing:
* Passed existing fork tests. (Note that due to some issues in `Bazel`,
this change was not verified by `Bazel runs_per_test`).
* Tested by patching the core PR, which fixed the Python fork tests:
grpc#32933
@drfloob drfloob requested review from soheilhy and removed request for gnossen, jtattermusch and veblush May 4, 2023 22:44
@Vignesh2208 Vignesh2208 self-requested a review May 5, 2023 22:41
Contributor

@Vignesh2208 Vignesh2208 left a comment

Hi AJ, reviewed the core logic and left a few suggestions. I haven't reviewed the tests yet.

  PosixEventEngine::PosixEventEngine()
      : connection_shards_(std::max(2 * gpr_cpu_num_cores(), 1u)),
-       executor_(std::make_shared<ThreadPool>()),
+       executor_(MakeThreadPool(grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u))),

Contributor

I think the max was 32u in the original thread pool. Maybe we should retain that here and everywhere else?

Member Author

Two primary reasons why I changed it:

  1. We've had multiple complaints about an excessive number of threads for idle gRPC processes. For complete control, users can implement their own EventEngine. However, with the code in this PR, if 32 threads are needed then they will eventually be spun up (after a ~25s warmup period). Note from the benchmark results that even 8 threads here outperform the previous implementation with 32 threads.

  2. The auto-scaling mechanism here is nascent. I have one concrete idea for improved auto-scaling, but with a set of target platforms to test on, we can benchmark various auto-scalers. Getting that set of platforms is tricky in OSS, though.
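
To make the sizing in this thread concrete, here is a minimal sketch of clamping the initial pool size to the range [2, 16] based on the core count. It uses `std::clamp` and `std::thread::hardware_concurrency()` as stand-ins for `grpc_core::Clamp` and `gpr_cpu_num_cores()`; `InitialPoolSize` and `DetectedPoolSize` are hypothetical names for illustration, not gRPC APIs.

```cpp
#include <algorithm>
#include <thread>

// Hypothetical helper: clamp a core count to the [2, 16] range, mirroring the
// PR's call grpc_core::Clamp(gpr_cpu_num_cores(), 2u, 16u).
unsigned InitialPoolSize(unsigned cores) {
  return std::clamp(cores, 2u, 16u);
}

// Assumption: std::thread::hardware_concurrency() stands in for
// gpr_cpu_num_cores(). It may return 0 when the count is unknown, in which
// case the clamp still yields the minimum of 2.
unsigned DetectedPoolSize() {
  return InitialPoolSize(std::thread::hardware_concurrency());
}
```

Under this sketch a 64-core machine would start with 16 threads. Per the reply above, the pool can still grow past its initial size if the workload needs it.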

EventEngine::Closure* WorkStealingThreadPool::TheftRegistry::StealOne() {
grpc_core::MutexLock lock(&mu_);
EventEngine::Closure* closure;
for (auto* queue : queues_) {

Contributor

One potential future optimization: if we had a data structure that returns the queue with the largest backlog (in terms of queue length), it might be preferable to steal from that queue first.

It can even be coarse-grained: queues can be put into three buckets (SMALL, MEDIUM, LARGE) depending on their length, and while stealing we can iterate over large queues first before moving on to medium and small queues.

Member Author

The bucketing idea is interesting. We could also institute queue priorities; I've seen some good results there. I just wonder if the overhead of queue juggling would negate the benefits of optimizing for queue time.

> One potential future optimization here could be that if we have a data structure that returns the queue with the highest backlog (in-terms of queue length), it might be preferable to steal from that queue first.

Maybe so. In my chats with @soheilhy, optimizing for queue time was not terribly fruitful in their experiments. Further, this is pop-most-recent because we have a high-performance queue implementation that avoids mutexes for LIFO operations. It has some very rare atomic flake issues, so I planned to land that as a subsequent improvement.

But these are all things we can experiment with if the performance difference is meaningful.
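
The bucketing idea from this thread could be sketched roughly as follows. This is an illustration only, not the PR's implementation: `WorkQueue`, `BucketedTheftRegistry`, `Classify`, and the length thresholds (8 and 64) are all hypothetical, and the queue here is a plain mutex-guarded `std::deque` rather than the queue used in gRPC.

```cpp
#include <cstddef>
#include <deque>
#include <functional>
#include <initializer_list>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

using Closure = std::function<void()>;

// Simplified mutex-guarded queue, for illustration only.
class WorkQueue {
 public:
  void Add(Closure c) {
    std::lock_guard<std::mutex> lock(mu_);
    closures_.push_back(std::move(c));
  }
  // LIFO pop, mirroring the "pop-most-recent" behavior mentioned above.
  std::optional<Closure> PopMostRecent() {
    std::lock_guard<std::mutex> lock(mu_);
    if (closures_.empty()) return std::nullopt;
    Closure c = std::move(closures_.back());
    closures_.pop_back();
    return c;
  }
  std::size_t Size() const {
    std::lock_guard<std::mutex> lock(mu_);
    return closures_.size();
  }

 private:
  mutable std::mutex mu_;
  std::deque<Closure> closures_;
};

enum class Bucket { kLarge, kMedium, kSmall };

// Hypothetical thresholds, chosen only for this example.
Bucket Classify(std::size_t length) {
  if (length >= 64) return Bucket::kLarge;
  if (length >= 8) return Bucket::kMedium;
  return Bucket::kSmall;
}

class BucketedTheftRegistry {
 public:
  void Enroll(WorkQueue* queue) {
    std::lock_guard<std::mutex> lock(mu_);
    queues_.push_back(queue);
  }
  // Tries LARGE queues first, then MEDIUM, then SMALL.
  std::optional<Closure> StealOne() {
    std::lock_guard<std::mutex> lock(mu_);
    for (Bucket bucket : {Bucket::kLarge, Bucket::kMedium, Bucket::kSmall}) {
      for (WorkQueue* queue : queues_) {
        if (Classify(queue->Size()) != bucket) continue;
        if (auto closure = queue->PopMostRecent()) return closure;
      }
    }
    return std::nullopt;
  }

 private:
  std::mutex mu_;
  std::vector<WorkQueue*> queues_;
};
```

Note that this sketch re-reads every queue's size on each pass, which is one form of the "queue juggling" overhead raised in the reply; whether the better victim selection pays for that cost would have to be measured.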

thread_running_.store(true);
while (true) {
absl::SleepFor(absl::Milliseconds(
(backoff_.NextAttemptTime() - grpc_core::Timestamp::Now()).millis()));

Contributor

Why do we back off exponentially here instead of using a fixed sleep duration?

Member Author

Benchmarks and efficiency :-)

If the pool is highly active, we want a vigilant lifeguard because it will wake idle workers faster than the workers will wake themselves. If gRPC is idle, having the lifeguard wake up every 50 millis is needlessly expensive.

Contributor

Doesn't this mean that through line 267, the backoff timer keeps multiplying even when there is an idle thread AND work to be done (i.e., the work queue is not empty)?

Shouldn't the backoff timer multiply only when there are idle threads but no work to be done (i.e., the pool is empty)?

Member Author

Indeed, good catch. Fixed.
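
The behavior agreed on in this thread (grow the sleep interval only while the pool has no pending work, and stay vigilant otherwise) can be sketched as below. This is a simplified stand-in, not the PR's backoff code: `LifeguardBackoff`, `NextSleep`, and all parameter values are hypothetical.

```cpp
#include <algorithm>
#include <chrono>

// Hypothetical backoff helper for a lifeguard-style monitoring thread.
class LifeguardBackoff {
 public:
  using Millis = std::chrono::milliseconds;

  LifeguardBackoff(Millis initial, double multiplier, Millis max)
      : initial_(initial),
        multiplier_(multiplier),
        max_(max),
        current_(initial) {}

  // Returns how long to sleep before the next check.
  // - If work is pending, reset to the shortest interval so idle workers are
  //   woken promptly.
  // - If the pool is empty, return the current interval and grow the next one
  //   geometrically, capped at max_.
  Millis NextSleep(bool pool_has_pending_work) {
    if (pool_has_pending_work) {
      current_ = initial_;
      return current_;
    }
    Millis sleep = current_;
    auto grown = static_cast<Millis::rep>(current_.count() * multiplier_);
    current_ = std::min(max_, Millis(grown));
    return sleep;
  }

 private:
  Millis initial_;
  double multiplier_;
  Millis max_;
  Millis current_;
};
```

With illustrative values of 10 ms initial, a multiplier of 2, and a 40 ms cap, an idle pool would see sleeps of 10, 20, 40, 40 ms, and the first check that finds pending work drops the interval back to 10 ms.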

state.counters["pop_rate"] = benchmark::Counter(
element_count * state.iterations(), benchmark::Counter::kIsRate);
state.counters["pop_attempts"] = pop_attempts;
state.counters["hit_rate"] =

Contributor

Can you add a comment on what the hit_rate is measuring?

Member Author

Done

}

TYPED_TEST(ThreadPoolTest, ScalesWhenBackloggedFromSingleThreadLocalQueue) {
int pool_thread_count = 8;

Contributor

Is there an upper limit on the number of threads spawned by the thread pool? If not, can you create some k * pool_thread_count callbacks at line 161 and line 187 to verify that the lifeguard is able to create more and more threads as necessary?

Member Author

There is no upper limit. As is, this test ensures that scaling happens, which is essential. I'm not sure if we can safely specify a minimum scale amount to assert for all implementations. Do you think some N threads would make this a better test?

The biggest issue with "more and more (N) threads" is that the pool implementation creates at most 1 thread per second, so the test could take a while depending on N.
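
To illustrate why asserting on N extra threads makes the test slow, here is a minimal sketch of a limiter that allows at most one thread start per interval. It is an assumption about how such a limit could be expressed, not the pool's actual code; `SpawnRateLimiter` and `TryAcquire` are hypothetical names.

```cpp
#include <chrono>

// Hypothetical limiter: permits at most one spawn per min_interval.
class SpawnRateLimiter {
 public:
  using Clock = std::chrono::steady_clock;

  explicit SpawnRateLimiter(Clock::duration min_interval)
      : min_interval_(min_interval) {}

  // Returns true if a new thread may be started at time `now`, and records
  // the spawn. Returns false if the previous spawn was too recent.
  bool TryAcquire(Clock::time_point now) {
    if (has_spawned_ && now - last_spawn_ < min_interval_) return false;
    has_spawned_ = true;
    last_spawn_ = now;
    return true;
  }

 private:
  Clock::duration min_interval_;
  bool has_spawned_ = false;
  Clock::time_point last_spawn_{};
};
```

With a one-second interval, growing the pool by N threads takes on the order of N seconds of wall-clock time, so a test that waits for a large N would run correspondingly long.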

void ScheduleSelf(ThreadPool* p) {
p->Run([p] { ScheduleSelf(p); });
TYPED_TEST(ThreadPoolTest, ForkStressTest) {
// Runs a large number of closures and multiple simulated fork events,

Contributor

Can you add more comments for this test? Is it testing that a Fork operation is not blocked indefinitely?

Member Author

Done

Contributor

@Vignesh2208 Vignesh2208 left a comment

Mostly looks good. Left a few clarifications.

Member Author

drfloob commented May 8, 2023

> Mostly looks good. Left a few clarifications.

Thanks Vignesh! I've updated the PR and responded to all comments.

@drfloob drfloob requested a review from Vignesh2208 May 8, 2023 19:13
@drfloob drfloob changed the title [EventEngine] Implement work stealing in the EventEngine ThreadPool [EventEngine] Implement work-stealing in the EventEngine ThreadPool May 8, 2023
Member Author

drfloob commented May 8, 2023

A test cherrypick was clean, no need for it. Thanks for the review!

@drfloob drfloob merged commit 3fb738b into grpc:master May 8, 2023
@copybara-service copybara-service bot added the imported label May 9, 2023
wanlin31 pushed a commit that referenced this pull request May 18, 2023
)

Fix: #18075

From comments in #18075, `CPython`
reinitializes the `GIL` after the `pthread_atfork` child handler, so we
shouldn't use any `GIL`-related functions in the child handler, which is what
we're currently doing. This PR uses `os.register_at_fork` to replace
`pthread_atfork` to prevent any undesired behavior.

This also seems to fix a thread hanging issue caused by changes in
core: #32869

### Testing:
* Passed existing fork tests. (Note that due to some issues in `Bazel`,
this change was not verified by `Bazel runs_per_test`).
* Tested by patching the core PR, which fixed the Python fork tests:
#32933
wanlin31 pushed a commit that referenced this pull request May 18, 2023
…32869)

This PR implements a work-stealing thread pool for use inside
EventEngine implementations. Because of historical risks here, I've
guarded the new implementation behind an experiment flag:
`GRPC_EXPERIMENTS=work_stealing`. Current default behavior is the
original thread pool implementation.

Benchmarks look very promising:

```
bazel test \
--test_timeout=300 \
--config=opt -c opt \
--test_output=streamed \
--test_arg='--benchmark_format=csv' \
--test_arg='--benchmark_min_time=0.15' \
--test_arg='--benchmark_filter=_FanOut' \
--test_arg='--benchmark_repetitions=15' \
--test_arg='--benchmark_report_aggregates_only=true' \
test/cpp/microbenchmarks:bm_thread_pool
```

2023-05-04: `bm_thread_pool` benchmark results on my local machine (64
core ThreadRipper PRO 3995WX, 256GB memory), comparing this PR to
master:


![image](https://user-images.githubusercontent.com/295906/236315252-35ed237e-7626-486c-acfa-71a36f783d22.png)

2023-05-04: `bm_thread_pool` benchmark results in the Linux RBE
environment (unsure of machine configuration, likely small), comparing
this PR to master.


![image](https://user-images.githubusercontent.com/295906/236317164-2c5acbeb-fdac-4737-9b2d-4df9c41cb825.png)

---------

Co-authored-by: drfloob <drfloob@users.noreply.github.com>

Labels

bloat/medium, imported, lang/c++, lang/core, lang/php, per-call-memory/neutral, per-channel-memory/neutral, release notes: no
