[CudaIpc 2/3]: IPC handle exchange #3910
Conversation

Review updated until commit 3957f14

!test
wujingyue left a comment
To help me understand this better, is it possible to add some unit tests to this PR? I tend to review a PR starting from tests unless it's trivial.
You can look at the test in #3911.

I finally added a test.
!test
CMakeLists.txt (Outdated)

    ${LIBCUPTI}
    ${TORCH_LIBRARIES}
    dl
    cuda
Needed for cuMemGetAddressRange; otherwise we get an "invalid context" error. But according to #3907 there is clearly an issue with how we load the driver API in general, so I'm linking to it directly in the meantime.
    if (isOptionEnabled(EnableOption::HostIrLowering)) {
    -  hie_ = std::make_unique<hir::HostIrEvaluator>(
    -      hir::HostIrEvaluator(std::move(hic)));
    +  hie_ = std::make_unique<hir::HostIrEvaluator>(std::move(hic));
cc @nsarka
My compiler complains otherwise.
    // all ranks set `send_tensor`
    send_tensor.copy_(generate_tensor(repetition, my_rank));
    torch::cuda::synchronize();
    communicator_->barrier();
I had to move it to after the cudaMemcpy, because it is a "put" algorithm: we need to wait until the sender side has finished writing before validating.
If instead we wrote a "get" algorithm, the barrier would have been in the right place, and the justification would have been: we need to wait until the send side has finished setting up its buffer before reading.
In any case we need a synchronization across iterations.
I thought the semaphore is supposed to take care of this sender/receiver synchronization and therefore avoids the barrier. Am I missing something?
> I thought the semaphore is supposed to take care of this sender/receiver synchronization and therefore avoids the barrier. Am I missing something?

You are right: semaphores are used for synchronization in the p2p primitives implemented in the next PR. But here we have a standalone unit test sharing the IPC handles, as per your request, so some synchronization needs to be added.
csrc/multidevice/ipc_handle.h (Outdated)

     private:
      using KeyType = std::tuple<int64_t, at::Tensor, P2PCommunication*>;
I'm quite unsure about the key type. My impression from #3912 has been that the IPC handle is registered per tensor, not per communication. For example, when rank 0 sends the same buffer to rank 1's buffer1 and buffer2, can't rank 0 use the same IPC handle? (Of course, it would have to maintain some ref counts so the buffer is not prematurely deallocated before both reads are done.)
You also need semaphores for the synchronization, one per communication, hence this key type.
Besides the semaphores, one could think of reusing the same buffer's IPC handles for several P2Ps using the same buffer. But I don't think that is a good idea. It would only save us some cudaIpcGetMemHandle calls, which is a really minor improvement, but wouldn't save us all the rest: the semaphore, setting/getting the TCP store, the barrier, etc.
And we still need to be consistent with the symmetry assumption. If rank 0 sends the same buffer to rank 1 and rank 2, we need two exchanges (0/1 and 0/2), and therefore a way to not hit the cache even though the buffer is the same. The same holds for ranks 0 and 1 involved in two p2p communications, e.g., rank 0 sends buffer a to rank 1's buffer b and rank 0 sends buffer a to rank 1's buffer c.
Does it make sense?
I'm pretty sure what you have here is a valid solution. Lots of my questions come from the fact that I'm new to CUDA IPC and I'm trying to figure out the first principles. Therefore, in addition to one solution, I'm trying to understand the design space and why certain solutions are preferred.
Re: semaphores. Yes, I understood that one semaphore per P2P communication is a valid solution. I also imagine a semaphore can be extended to deal with multiple senders or receivers, because there are "counting semaphores" which hold a count. Do people consider this for implementing collectives like allgather and allreduce?
Re: symmetry assumption. I'm sure you mentioned this somewhere and I forgot what it is and what it buys us. Do you have a reference?
> I'm pretty sure what you have here is a valid solution. Lots of my questions come from the fact that I'm new to CUDA IPC and I'm trying to figure out the first principles. Therefore, in addition to one solution, I'm trying to understand the design space and why certain solutions are preferred.
No problem, I'm also happy to brainstorm! :)

> Re: semaphores. Yes, I understood that one semaphore per P2P communication is a valid solution. I also imagine a semaphore can be extended to deal with multiple senders or receivers, because there are "counting semaphores" which hold a count. Do people consider this for implementing collectives like allgather and allreduce?
"Counters" are definitely possible, but have two problems:
- cuStream ops do not have a read-and-write primitive, so we cannot easily implement incrementing a counter
- There is even less "atomic add" operations, and thus we would need to handle race conditions between ranks anyway.
Besides this, I can't see immediate benefit from using less semaphore allocations, if not a slightly reduced memory footprint.
> Re: symmetry assumption. I'm sure you mentioned this somewhere and I forgot what it is and what it buys us. Do you have a reference?
I don't have a reference; I am using my own definition here. Btw, this assumption is implicit in this PR and we need to make it explicit and more robust in the future. What I mean in this PR by the symmetry assumption is: "if we hit the cache locally, we hit the cache globally". IOW, we only check the local cache to decide whether we can use the IpcHandle, without a further interprocess synchronization.
This assumption is not necessarily true if, e.g., across iterations, one rank changes the buffer and the other doesn't.
However, this assumption can be enforced in nvFuser, at least for internal buffers that we allocate ourselves (we will probably need to add a new allocation attribute for that).
wujingyue left a comment

LGTM! While I still have several questions on the big picture that we can keep discussing in the PR, I believe it implements fundamental building blocks that will enable IPC inside nvFuser.
    const ExpressionEvaluator& expr_evaluator_;
    std::unordered_map<KeyType, std::unique_ptr<P2pIpcHandle>, KeyHash, KeyEqual>
        handles_;
A non-blocking question: apparently, this is not deallocated until the end of the program, so all handles and buffers live until the end. In the future, do we plan to have HostIrEvaluator::handle(P2PCommunication*) stream-wait for the semaphore and a later Deallocate IR deallocate the buffer? (That would make sense to me.)
I missed this comment earlier, sorry.

> all handles and buffers live until the end

Regarding IPC handles and semaphores: yes, their lifetime is owned by the P2pIpcHandle object.
Regarding data buffers, IIUC, in the current version, I think the answer is no. Indeed, the buffer's lifetime is managed by PyTorch's at::Tensor, and the IPC handles do not hold any reference to the at::Tensor.
However, we might want to fix this to ensure that the buffer is still alive before we close the handle, since the doc says:

> Calling cudaFree on an exported memory region before calling cudaIpcCloseMemHandle in the importing context will result in undefined behavior.

Letting the IpcHandle hold the at::Tensor should fix this.

> a later Deallocate IR to deallocate the buffer

Yes, that's also what I have in mind. The Deallocate IR should take care of cleaning up not only the data buffer but also any potential IPC handle pointing to it.
> Letting the IpcHandle hold the at::Tensor should fix this

I added that.
    P2PCommunicationType::SEND,
    send_tv,
    IrBuilder::create<Val>(send_peer),
    CommunicatorBackend::kNccl);
Is this supposed to be the CUDA backend?
Not yet -- it will be in the next PR.
In this test the backend is meaningless, since we don't use HostIrEvaluator; we just need to create the Expr* to feed IpcHandleCache::exchangeHandles.
    // to be exported by batch (thus the function taking a vector of
    // P2PCommunication*) to improve performance and to avoid creating deadlocks
    // when import and export orders differ across ranks.
    void exchangeHandles(const std::vector<P2PCommunication*>& communications);
While I understood how it's used in the unit test, it's unclear when/how we will call this in practice. But I'm happy to defer this to following PRs.
When a P2PCommunication is in a for loop, it's no longer possible to exchangeHandles at the beginning of the program because peer and tensor depend on the loop index. Therefore, do you expect this to be called at the beginning of each control flow scope and with all P2PCommunications that are guaranteed to happen in that scope?
Hopefully it will become clearer in #3911.

> When a P2PCommunication is in a for loop, it's no longer possible to exchangeHandles at the beginning of the program because peer and tensor depend on the loop index.

It is not a problem; actually, we precisely intend to use exchangeHandles inside a for loop. That is the reason why the caching is not only based on the symbolic P2PCommunication*, but also on the runtime values (at::Tensor buffer, int64_t peer), which are evaluated at each for-loop iteration.

> Therefore, do you expect this to be called at the beginning of each control flow scope and with all P2PCommunications that are guaranteed to happen in that scope?

No, each iteration can trigger a new exchange if the cache is missed.
merging is blocked by the lazy-loading issue, but this time with
!test

!test

!build

!test

!test

!test

!test

!test
On top of
prerequisite to:

What

- Adds an Expr node hir::ShareMemHandles to represent this op. We cannot embed the op in the Send/Recv semantics because we need to group the handle exchanges between matching sends and recvs to avoid deadlocks.

How

Most of the implementation is in multidevice/ipc_handle.cpp:
- IpcHandle, representing the IPC handle that is exchanged. This class is supplemented with a semaphore, which is a local cuda buffer allocated on the exporter's device.
- IpcHandleCache, which handles exchanging and caching the IPC handles. Caching is done with respect to a combination of runtime and symbolic ingredients: (runtime peer, at::Tensor, Expr*). This caching allows an arbitrary number of p2p comms between pairs of ranks.