Use c10::ThreadPool to send messages #23968
mrshenli wants to merge 9 commits into gh/mrshenli/8/base from
Conversation
Existing ProcessGroupAgent uses a single thread to send all messages. To speed this up, it now uses a thread pool. Differential Revision: [D16695091](https://our.internmc.facebook.com/intern/diff/D16695091/)
Pull Request resolved: #23968 Existing ProcessGroupAgent uses a single thread to send all messages. To speed up, it now uses a thread pool. ghstack-source-id: 87896432 Differential Revision: [D16695091](https://our.internmc.facebook.com/intern/diff/D16695091/)
Since the single goal of this PR is a speedup, I believe it needs to come with a micro-benchmark. Could you measure the improvement in terms of latency?
ilia-cher left a comment
Second that; perf-related PRs should demonstrate an improvement on a (micro-)benchmark.
Also, please add more information on what you're trying to achieve here. E.g., I wonder how many new thread pool instances there are going to be in a training process?
I intended to try out @ilia-cher BTW, is it true that
Pull Request resolved: #23968 Existing ProcessGroupAgent uses a single thread to send all messages, and a single thread to listen for and process all received messages. This causes both performance issues and prevents nested RPCs. For example, when running a nested RPC A->B->A->B, the second recv on B cannot start until the first recv on B finishes. If the second recv is triggered by a nested RPC in the first recv, it will deadlock. Ideally, we should expose something like a responder or FutureResult to the Python land to support nested asynchronous UDFs. This diff adds a shared ThreadPool for send and recv. Send uses it to send out messages, and recv uses it to process received messages. There is still a dedicated thread to listen for incoming messages and add them to the task queue. There are two goals: 1) speed up ProcessGroupAgent 2) use the ThreadPool as a temporary solution for (a small number of) nested RPCs ghstack-source-id: 87907929 Differential Revision: [D16695091](https://our.internmc.facebook.com/intern/diff/D16695091/)
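The design described above (one dedicated listener thread feeding a shared worker pool that handles both send and recv work) can be sketched with plain C++11 primitives. This is a minimal illustrative stand-in, not the actual ProcessGroupAgent or c10::ThreadPool code; `SimplePool` and `run_demo` are hypothetical names:

```cpp
#include <atomic>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal stand-in for the shared send/recv pool described above.
class SimplePool {
 public:
  explicit SimplePool(size_t n) {
    for (size_t i = 0; i < n; ++i) {
      workers_.emplace_back([this] {
        for (;;) {
          std::function<void()> task;
          {
            std::unique_lock<std::mutex> lock(mu_);
            cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
            if (stop_ && tasks_.empty()) return;
            task = std::move(tasks_.front());
            tasks_.pop();
          }
          task();  // run send/recv work outside the lock
        }
      });
    }
  }

  void enqueue(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

  ~SimplePool() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      stop_ = true;
    }
    cv_.notify_all();
    for (auto& w : workers_) w.join();  // drains remaining tasks first
  }

 private:
  std::vector<std::thread> workers_;
  std::queue<std::function<void()>> tasks_;
  std::mutex mu_;
  std::condition_variable cv_;
  bool stop_ = false;
};

// A dedicated "listener" hands each incoming message to the pool, so a
// recv that issues a nested RPC no longer blocks other recvs.
int run_demo(int numMessages) {
  std::atomic<int> processed{0};
  {
    SimplePool pool(4);
    std::thread listener([&] {
      for (int i = 0; i < numMessages; ++i) {
        pool.enqueue([&processed] { ++processed; });
      }
    });
    listener.join();
  }  // pool destructor drains the queue and joins workers
  return processed.load();
}
```

Because the pool has multiple workers, a handler that blocks on a nested RPC only occupies one worker instead of stalling the single recv thread, which is the deadlock scenario the PR description outlines.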
Hey @pritamdamania87, thanks for reviewing. Shall we hold follow-up reviews on this PR for a bit? I have some updates on this one but cannot export it after rebasing onto #23569. Will update when #23569 is landed.
Micro-benchmark:

- before:
- after (num_send_recv_threads = 4, default):
- after (num_send_recv_threads = 8):
@aazzolini @ilia-cher @pritamdamania87 @pietern I think I addressed all the comments above and added a micro-benchmark; could you please take another look? Thanks!
```cpp
}

void ProcessGroupAgent::listenLoop() {
  google::InitGoogleLogging("distributed.rpc");
```
This line helped me get rid of the following warning for the LOG(INFO) (thanks @pritamdamania87 and @jamarshon):

WARNING: Logging before InitGoogleLogging() is written to STDERR

But it doesn't feel right to call it here. I searched a bit and saw that LOG is used extensively in caffe2/ but almost not at all in torch/ or aten/. @soumith @dzhulgakov @gchanan @ezyang In general, what is the recommended API for logging? Or are we intentionally avoiding it?
I think @dzhulgakov knows what the current state here is
```cpp
      pg_->send(payload, work.to_, work.to_ /* channelTag */));
}
for (auto& pendingSend : pendingSends) {
  pendingSend->wait();
```
I'm curious about the reason you want to wait for the Gloo device thread.
Because work will be destructed after the lambda finishes, and the send function itself does not keep the tensor alive. If destruction happens before the send finishes, the behavior is undefined.
The change you are making here, waiting for the Gloo device thread to finish the ProcessGroup::Work, is similar, in network programming terms, to waiting for the OS kernel to send out the buffer it holds, making your user-land code essentially blocking. This is a performance regression we should be able to avoid.
I wonder whether it is possible to keep the heap-allocated ProcessGroup::Work object returned by ProcessGroup::send(..) alive.
The solution is capturing the shared_ptr to the ProcessGroup::Work by value in the lambda (adding a ref count to the heap object) to keep it alive. This way, you don't need to call pendingSend->wait();
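The lifetime-extension trick suggested here (capturing a `shared_ptr` by value in a lambda so the heap object outlives the scope that created it) can be shown in isolation. `FakeWork` and `make_callback` are illustrative stand-ins, not ProcessGroupAgent types:

```cpp
#include <functional>
#include <memory>
#include <string>

// Stand-in for a pending ProcessGroup::Work plus the buffers it needs.
struct FakeWork {
  std::string payload;
};

// Returns a callback that owns the work via a shared_ptr captured by
// value. Even after the caller's local shared_ptr goes out of scope,
// the lambda's copy keeps the FakeWork alive until the callback itself
// is destroyed, so no blocking wait() is required to keep it valid.
std::function<size_t()> make_callback() {
  auto work = std::make_shared<FakeWork>();
  work->payload = "hello";
  return [work] { return work->payload.size(); };  // ref count is now 2
}  // the caller-side shared_ptr dies here; the lambda's copy survives
```

The same idea applies to the tensors backing a pending send: capture them (as `shared_ptr`s) by value wherever the completion runs, and the destructor fires only after the last holder releases them.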
@xush6528 could you please elaborate? By the work object do you mean SendWork? If so, why would using a shared_ptr help? The SendWork is already captured by this lambda. Or are you referring to a different lambda than this one?
Or we could have a separate GC thread for send, and enqueue all async send handles to that thread. The GC thread would then wait on the send handles in order and delete each one when done.
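The GC-thread alternative floated here might look like the sketch below, with `std::future` standing in for `ProcessGroup::Work` handles. `SendReaper` and `reaper_demo` are hypothetical names, not part of the PR:

```cpp
#include <atomic>
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>
#include <thread>

// "GC" thread: senders enqueue handles and return immediately; one
// background thread waits on each handle in order and drops it.
class SendReaper {
 public:
  SendReaper() : reaper_([this] { loop(); }) {}

  void enqueue(std::future<void> handle) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      handles_.push(std::move(handle));
    }
    cv_.notify_one();
  }

  ~SendReaper() {
    {
      std::lock_guard<std::mutex> lock(mu_);
      done_ = true;
    }
    cv_.notify_one();
    reaper_.join();  // drains all remaining handles before returning
  }

 private:
  void loop() {
    for (;;) {
      std::future<void> handle;
      {
        std::unique_lock<std::mutex> lock(mu_);
        cv_.wait(lock, [this] { return done_ || !handles_.empty(); });
        if (handles_.empty()) return;  // done_ set and nothing pending
        handle = std::move(handles_.front());
        handles_.pop();
      }
      handle.wait();  // block the reaper, not the sender
    }
  }

  std::queue<std::future<void>> handles_;
  std::mutex mu_;
  std::condition_variable cv_;
  bool done_ = false;
  std::thread reaper_;  // declared last: starts after state is ready
};

int reaper_demo(int n) {
  std::atomic<int> sent{0};
  {
    SendReaper reaper;
    for (int i = 0; i < n; ++i) {
      reaper.enqueue(std::async(std::launch::async, [&sent] { ++sent; }));
    }
  }  // destructor waits for every pending "send" before returning
  return sent.load();
}
```

The trade-off versus the shared_ptr-capture approach is one extra thread and strictly in-order reaping, in exchange for a single place where completion (and any error from the handle) is observed.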
@mrshenli Sorry, I mean the ProcessGroup::Work you are waiting for. Updated the above comment.
> Because work will be destructed after the lambda function finishes, and the send function itself does not keep the tensor alive. If destruction happens before send finish, the behavior will be undefined.

Oh, my bad. When I said work above, I actually meant the preamble and payload tensors. We need to keep those tensors alive until the send finishes. And I agree that we can optimize this by capturing the ProcessGroup::Work and those tensors (all as std::shared_ptrs) in a separate lambda and waiting there. Let me file an issue for this later. Thanks!
This pull request has been merged in 99dea08.
Stack from ghstack:
Existing ProcessGroupAgent uses a single thread to send all messages, and
a single thread to listen for and process all received messages. This causes
performance issues and also prevents nested RPCs. For example, when
running a nested RPC A->B->A->B, the second recv on B cannot start until
the first recv on B finishes. If the second recv is triggered by a nested
RPC in the first recv, it will deadlock. Ideally, we should expose something
like a responder or FutureResult to the Python land to support nested
asynchronous UDFs.

This diff adds a shared ThreadPool for send and recv. Send uses it to send
out messages, and recv uses it to process received messages. There is still
a dedicated thread to listen for incoming messages and add them to the task
queue.

There are two goals: 1) speed up ProcessGroupAgent 2) use the ThreadPool as
a temporary solution for (a small number of) nested RPCs
Differential Revision: D16695091