
Use c10::ThreadPool to send messages #23968

Closed
mrshenli wants to merge 9 commits into gh/mrshenli/8/base from gh/mrshenli/8/head

Conversation

@mrshenli (Contributor) commented Aug 7, 2019

Stack from ghstack:

Existing ProcessGroupAgent uses a single thread to send all messages and a
single thread to listen for and process all received messages. This causes
performance issues and also prevents nested RPCs. For example, when running
a nested RPC A->B->A->B, the second recv on B cannot start until the first
recv on B finishes. If the second recv is triggered by a nested RPC issued
from the first recv, it will deadlock. Ideally, we should expose something
like a responder or FutureResult to the Python land to support nested
asynchronous UDFs.

This diff adds a shared ThreadPool for send and recv. Send uses it to send
out messages, and recv uses it to process received messages. There is still
a dedicated thread that listens for incoming messages and adds them to the
task queue. There are two goals: 1) speed up ProcessGroupAgent; 2) use the
ThreadPool as a temporary solution for (a small number of) nested RPCs.

Differential Revision: D16695091
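
For illustration, here is a minimal sketch of the shared-pool pattern this diff moves toward; the class, method, and member names below are assumptions for readability, not the actual ProcessGroupAgent code:

```cpp
// Sketch of a shared send/recv pool (assumed, simplified names). A dedicated
// listener thread keeps receiving; both sends and the processing of received
// messages run on worker threads drawn from one shared c10::ThreadPool.
#include <atomic>
#include <string>
#include <utility>
#include <c10/core/thread_pool.h>

struct Message {
  std::string payload;
  int dst = 0;
};

class AgentSketch {
 public:
  explicit AgentSketch(int numSendRecvThreads)
      : threadPool_(numSendRecvThreads) {}

  // send() returns immediately; the blocking ProcessGroup send happens on a
  // pool worker thread instead of a single dedicated send thread.
  void send(Message message) {
    threadPool_.run([this, m = std::move(message)]() mutable {
      doBlockingSend(std::move(m));
    });
  }

  // One dedicated thread keeps listening; each received message is handed to
  // the same pool, so handling one request (which may itself issue a nested
  // RPC) does not block receiving the next one.
  void listenLoop() {
    while (running_) {
      Message incoming = blockingRecv();
      threadPool_.run([this, m = std::move(incoming)]() mutable {
        processReceived(std::move(m));
      });
    }
  }

 private:
  void doBlockingSend(Message /*m*/) {}
  Message blockingRecv() { return Message{}; }
  void processReceived(Message /*m*/) {}

  std::atomic<bool> running_{true};
  c10::ThreadPool threadPool_;
};
```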

@pytorchbot added the oncall: distributed (Add this issue/PR to distributed oncall triage queue) label Aug 7, 2019
mrshenli added a commit that referenced this pull request Aug 7, 2019
@aazzolini (Contributor)

Since the single goal of this PR is speedup, I believe it needs to come along with a micro-benchmark. Could you measure the improvement in terms of latency?

@ilia-cher self-requested a review August 7, 2019 20:33
@ilia-cher (Contributor) left a comment

Second that; perf-related PRs should demonstrate an improvement on a (micro-)benchmark.

@ilia-cher (Contributor)

Also, please add more information on what you're trying to achieve here, e.g. I wonder how many new thread pool instances there are going to be in a training process?

@mrshenli (Contributor, Author) commented Aug 7, 2019

@aazzolini @ilia-cher

I intended to try out c10::ThreadPool on sends first, before adding it for recvs, to unblock @xush6528. Will add a micro-benchmark later.

@ilia-cher
There will be ~~one send ThreadPool and one recv ThreadPool~~ one shared ThreadPool for send and receive per ProcessGroupAgent per process. Is there any concern about this?

BTW, is it true that c10::ThreadPool is making a copy of the function and bound args?
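
For context, here is a small, generic illustration of what by-value lambda capture implies when enqueuing work onto a pool; whether c10::ThreadPool additionally copies the wrapping std::function internally is exactly the open question above, and the names below are only for this example:

```cpp
// Generic C++ capture semantics when enqueuing work (illustrative only).
#include <iostream>
#include <memory>
#include <vector>
#include <c10/core/thread_pool.h>

int main() {
  c10::ThreadPool pool(/*pool_size=*/2);

  auto payload = std::make_shared<std::vector<int>>(1000, 7);

  // Capturing the shared_ptr by value copies the pointer (cheap) and bumps
  // the ref count; the 1000-element vector itself is not copied. Capturing
  // the vector by value instead would copy every element into the closure.
  pool.run([payload]() {
    std::cout << "payload size: " << payload->size() << std::endl;
  });

  pool.waitWorkComplete();  // wait for the enqueued task before exiting
  return 0;
}
```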

mrshenli added a commit that referenced this pull request Aug 7, 2019
xush6528 pushed a commit to xush6528/pytorch that referenced this pull request Aug 8, 2019
xush6528 pushed a commit to xush6528/pytorch that referenced this pull request Aug 9, 2019
@gqchen self-requested a review August 9, 2019 22:53
@mrshenli (Contributor, Author)
Hey @pritamdamania87, thanks for reviewing. Shall we hold follow-up reviews on this PR for a bit? I have some updates on this one but cannot export it after rebasing onto #23569. Will update when #23569 has landed.

mrshenli added a commit that referenced this pull request Aug 15, 2019
@mrshenli (Contributor, Author)

Micro-benchmark:

before:

test_stress_heavy_rpc (test_rpc.RpcTest) ... 
Rank 3 finished testing heavy_rpc 20 times in 1.841923713684082 seconds.
Rank 0 finished testing heavy_rpc 20 times in 1.8617804050445557 seconds.
Rank 1 finished testing heavy_rpc 20 times in 1.9007983207702637 seconds.
Rank 2 finished testing heavy_rpc 20 times in 1.9105055332183838 seconds.

test_stress_light_rpc (test_rpc.RpcTest) ... 
Rank 2 finished testing light_rpc 1000 times in 1.124621868133545 seconds.
Rank 0 finished testing light_rpc 1000 times in 1.1577978134155273 seconds.
Rank 3 finished testing light_rpc 1000 times in 1.170255422592163 seconds.
Rank 1 finished testing light_rpc 1000 times in 1.1736280918121338 seconds.

after (num_send_recv_threads = 4, default):

test_stress_heavy_rpc (test_rpc.RpcTest) ... 
Rank 1 finished testing heavy_rpc 20 times in 0.5223677158355713 seconds.
Rank 0 finished testing heavy_rpc 20 times in 0.5939044952392578 seconds.
Rank 2 finished testing heavy_rpc 20 times in 0.6136205196380615 seconds.
Rank 3 finished testing heavy_rpc 20 times in 0.6397840976715088 seconds.

test_stress_light_rpc (test_rpc.RpcTest) ... 
Rank 3 finished testing light_rpc 1000 times in 0.5930540561676025 seconds.
Rank 0 finished testing light_rpc 1000 times in 0.5937092304229736 seconds.
Rank 2 finished testing light_rpc 1000 times in 0.6050472259521484 seconds.
Rank 1 finished testing light_rpc 1000 times in 0.6078634262084961 seconds.

after (num_send_recv_threads = 8):

test_stress_heavy_rpc (test_rpc.RpcTest) ... 
Rank 1 finished testing heavy_rpc 20 times in 0.34965944290161133 seconds.
Rank 3 finished testing heavy_rpc 20 times in 0.3574059009552002 seconds.
Rank 2 finished testing heavy_rpc 20 times in 0.3725595474243164 seconds.
Rank 0 finished testing heavy_rpc 20 times in 0.41245269775390625 seconds.

test_stress_light_rpc (test_rpc.RpcTest) ... 
Rank 3 finished testing light_rpc 1000 times in 0.5265188217163086 seconds.
Rank 2 finished testing light_rpc 1000 times in 0.5312702655792236 seconds.
Rank 0 finished testing light_rpc 1000 times in 0.5387604236602783 seconds.
Rank 1 finished testing light_rpc 1000 times in 0.541905403137207 seconds.

@mrshenli (Contributor, Author) commented Aug 15, 2019

@aazzolini @ilia-cher @pritamdamania87 @pietern I think I addressed all the comments above and added a micro-benchmark; could you please take another look? Thanks!

mrshenli added a commit that referenced this pull request Aug 16, 2019
Review comment on torch/csrc/distributed/rpc/ProcessGroupAgent.cpp:

```cpp
}

void ProcessGroupAgent::listenLoop() {
  google::InitGoogleLogging("distributed.rpc");
```
@mrshenli (Contributor, Author):

This line helped me get rid of the following warning for the LOG(INFO) (Thanks! @pritamdamania87 and @jamarshon):

WARNING: Logging before InitGoogleLogging() is written to STDERR

But it doesn't feel right to call it here. I searched a bit and saw that LOG is used extensively in caffe2/ but almost not at all in torch/ or aten/. @soumith @dzhulgakov @gchanan @ezyang In general, what is the recommended API for logging? Or are we intentionally avoiding it?
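
For reference, here is a minimal standalone glog example of the initialization being discussed; it is not PyTorch-specific and only illustrates why the warning appears when LOG(INFO) runs before InitGoogleLogging():

```cpp
// Minimal glog usage sketch (assumes linking against glog). If the LOG(INFO)
// line runs before InitGoogleLogging(), glog prints:
//   WARNING: Logging before InitGoogleLogging() is written to STDERR
// which is why initialization normally lives at process startup rather than
// inside a library function such as listenLoop().
#include <glog/logging.h>

int main(int /*argc*/, char* argv[]) {
  google::InitGoogleLogging(argv[0]);  // once per process, typically in main()
  LOG(INFO) << "rpc agent listening";
  return 0;
}
```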

Contributor:

I think @dzhulgakov knows what the current state here is

Review comment on torch/csrc/distributed/rpc/ProcessGroupAgent.cpp:

```cpp
      pg_->send(payload, work.to_, work.to_ /* channelTag */));
}
for (auto& pendingSend : pendingSends) {
  pendingSend->wait();
```
@xush6528 (Contributor) commented Aug 16, 2019:

I'm curious about the reason you want to wait for the GLOO device thread.

@mrshenli (Contributor, Author) commented Aug 17, 2019:

Because work will be destructed after the lambda function finishes, and the send function itself does not keep the tensor alive. If the destruction happens before the send finishes, the behavior will be undefined.

@xush6528 (Contributor) commented Aug 20, 2019:

@mrshenli

The change you are making here, waiting for the GLOO device thread to finish the ProcessGroup::Work, is similar to waiting, in network programming, for the OS kernel to send out its internal buffer; it makes your user-land code essentially blocking. This is a performance regression we should be able to avoid.

I wonder whether it is possible to keep alive the heap-allocated ProcessGroup::Work object returned by ProcessGroup::send(..).

The solution is to capture the shared_ptr of ProcessGroup::Work by value in the lambda (adding a ref count to the heap object) to keep it alive. That way, you don't need to call pendingSend->wait();

@mrshenli (Contributor, Author):

@xush6528 could you please elaborate? By the work object do you mean SendWork? If so, why would using a shared_ptr help? The SendWork is already captured by this lambda. Or are you referring to a different lambda than this one?

@mrshenli (Contributor, Author):

Or we could have a separate GC thread for send, and enqueue all async send handles to that thread. The GC thread would then wait on the send handles in order and delete each one when done.

@xush6528 (Contributor):

@mrshenli Sorry, I mean the ProcessGroup::Work you are waiting for. Updated the above comment.

@mrshenli (Contributor, Author):

> Because work will be destructed after the lambda function finishes, and the send function itself does not keep the tensor alive. If the destruction happens before the send finishes, the behavior will be undefined.

Oh, my bad; when I say work above, I actually mean the preamble and payload tensors. We need to keep those tensors alive until the send finishes. And I agree that we can optimize this by capturing the ProcessGroup::Work and those tensors (all as std::shared_ptrs) in a separate lambda and waiting there. Let me file an issue for this later. Thanks!
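
For illustration, here is a self-contained sketch of that keep-alive idea; the types and names are placeholders (FakeWork stands in for ProcessGroup::Work), and this is not the actual change tracked in the follow-up issue:

```cpp
// Sketch: instead of blocking on pendingSend->wait(), capture shared_ptrs by
// value so the async work handle and its buffers outlive the enqueuing scope.
#include <memory>
#include <vector>
#include <c10/core/thread_pool.h>

struct FakeWork {        // stand-in for the handle returned by an async send
  void wait() {}         // blocks until the transport is done with the buffers
};

void enqueueSend(c10::ThreadPool& pool) {
  auto buffers = std::make_shared<std::vector<char>>(1024);
  auto work = std::make_shared<FakeWork>();  // pretend this came from send()

  // The lambda copies both shared_ptrs, bumping their ref counts; the buffers
  // and the work handle stay alive until wait() returns on a pool thread, so
  // the caller never blocks on the device thread itself.
  pool.run([work, buffers]() { work->wait(); });
}
```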

@mrshenli (Contributor, Author):

See #24946

@zou3519 deleted the gh/mrshenli/8/head branch August 17, 2019 00:49
@facebook-github-bot (Contributor)

This pull request has been merged in 99dea08.

xxtEchjovs44 pushed a commit to xxtEchjovs44/pytorch that referenced this pull request Jan 29, 2020
laurentdupin pushed a commit to laurentdupin/pytorch that referenced this pull request Apr 24, 2026

Labels

Merged, oncall: distributed (Add this issue/PR to distributed oncall triage queue)
