[TensorPipe] Use the new multi-payload message API #37919
lw wants to merge 3 commits into gh/lw/2/base
Conversation
Differential Revision: [D21425536](https://our.internmc.facebook.com/intern/diff/D21425536/)
💊 Build failures summary and remediations — as of commit 9c42c5f (more details on the Dr. CI page):
🕵️ 1 new failure recognized by patterns. The following build failures do not appear to be due to upstream breakages.
mrshenli left a comment
Could you please add some description to this PR?
```cpp
torch::distributed::rpc::TensorPipeEntry tpEntry =
    torch::distributed::rpc::tensorpipeSerialize(sendingRpcMessage);
tensorpipe::Message sendingTpMessage = std::move(tpEntry.message);
EXPECT_EQ(sendingTpMessage.tensors.size(), 2);
```
Why remove this check? Is the number of tensors no longer 2?
I think the main purpose of this test is that tensorpipeAllocateMessage is the inverse of tensorpipeSerialize, that is, that by passing through one and then the other one obtains the original RPC message. The implementation details of the format that these two functions choose to adopt is irrelevant to the correctness of the test, as long as they both agree on it. So for example even if the RPC message contained two tensors this doesn't have to be the case for the TP message: it could deduplicate them (if they have the same storage), coalesce them, treat one as a payload (if it's small enough that latency is more important than bandwidth), ... The test should be robust to these changes.
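The round-trip property described here can be illustrated with a toy sketch (hypothetical names, not the actual `tensorpipeSerialize`/`tensorpipeAllocateMessage` API): the test pins down only the interface contract, so the wire format is free to change underneath it.

```python
import json

# Toy serializer pair: the encoding below is an implementation detail and
# could change (deduplication, coalescing, ...) without breaking the test.
def serialize(message: dict) -> bytes:
    return json.dumps(message, sort_keys=True).encode()

def deserialize(wire: bytes) -> dict:
    return json.loads(wire.decode())

# Spec-level test: deserialize is the inverse of serialize.
# It never inspects the serialized bytes themselves.
original = {"payloads": ["abc", "def"], "tensors": [1, 2]}
assert deserialize(serialize(original)) == original
```

A test written this way survives a change of wire format, whereas a test that counted the serialized tensors would have to be rewritten with every such change.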
```cpp
// Allocate memory and fill in pointers
Message rpcMessage = tensorpipeAllocateMessage(tpMessage);
TORCH_INTERNAL_ASSERT(
    rpcMessage.tensors().size() == tpMessage.tensors.size(),
```
It looks like tensorpipeAllocateMessage does add tensors into rpcMessage; why remove this check here?
Same reason as above, the number of tensors in the TP message is an implementation detail and may not be equal to the number of tensors in the RPC message.
Right now TensorPipe is not deduplicating tensors that share the same storage, so I think the numbers of tensors in the RPC message and the TensorPipe message are always the same. Let's keep these checks until we handle duplicated tensor storage?
Yes, it is true that at the moment that condition would be true and the check would pass. My take is that tests should only be allowed to check what is in the specification of a function, i.e., the semantics of its interface. If they do instead check the implementation then they may need to be updated each time the code changes which means they are not robust and don't really provide any safeguard.
This guard is there to make sure each update of the implementation doesn't break the critical assumption, at least until we start to handle duplicated tensor storage, which will break it.
The only "critical" assumption, IMO, is that deserializing a serialized message gives back the original. Which is now exactly the only thing the tests are checking.
Sorry, I added a description to the diff after I had exported it; let me re-export it.
In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are, instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effect on the over-the-wire format and thus on performance.

Pull Request resolved: #37919
ghstack-source-id: 103572164
Differential Revision: [D21425536](https://our.internmc.facebook.com/intern/diff/D21425536/)
mrshenli left a comment
LGTM! Test failures are irrelevant.
```cpp
// Allocate memory and fill in pointers
Message rpcMessage = tensorpipeAllocateMessage(tpMessage);
TORCH_INTERNAL_ASSERT(
    rpcMessage.tensors().size() == tpMessage.tensors.size(),
```
This guard is there to make sure each update of the implementation doesn't break the critical assumption, at least until we start to handle duplicated tensor storage, which will break it.
```cpp
TORCH_INTERNAL_ASSERT(
    tpMessage.payloads.size() == 1,
    "message expected to contain 1 payload, whereas it contained ",
    tpMessage.payloads.size(),
    " payloads");
std::vector<char> payload(tpMessage.payloads[0].length);
tpMessage.payloads[0].data = (uint8_t*)(payload.data());
```
@lw If we look at this one, the check is only valid for the current interface usage. When we start handling duplicated tensor storage and put all tensors' metadata into payloads[1], this will break. Does that mean this check should be removed too?
This check is being performed inside the function, not by any of its users or by the tests.
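The distinction being made can be sketched as follows (toy Python, not the actual PyTorch code): an assertion *inside* a function guards the implementation's own invariant and is updated together with the implementation, while tests should only exercise the public contract.

```python
# Hypothetical allocator mirroring the single-payload invariant above.
def allocate_message(payloads):
    # Internal invariant: this implementation currently expects exactly
    # one payload. If the format changes, this line changes with it.
    assert len(payloads) == 1, f"expected 1 payload, got {len(payloads)}"
    return bytearray(len(payloads[0]))

# Callers and tests check only observable behavior, not the format.
buf = allocate_message([b"hello"])
assert len(buf) == 5
```

If the implementation later adopts a two-payload format, only the internal assertion moves; a test written against the contract would be unaffected.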
This pull request has been merged in bc09478.
Summary: In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are, instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effect on the over-the-wire format and thus on performance.

Pull Request resolved: pytorch#37919
ghstack-source-id: 103572164

Test Plan: On both workers

```
import os
import torch
import torch.distributed.rpc as rpc

os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "8765"
```

On worker 0

```
rpc.init_rpc(
    name="foo",
    rank=0,
    backend=rpc.backend_registry.BackendType.TENSORPIPE,
    world_size=2,
    rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
        worker_name_to_id={"foo": 0, "bar": 0}),
)
```

On worker 1

```
rpc.init_rpc(
    name="bar",
    rank=1,
    backend=rpc.backend_registry.BackendType.TENSORPIPE,
    world_size=2,
    rpc_backend_options=rpc.TensorPipeRpcBackendOptions(
        worker_name_to_id={"foo": 0, "bar": 0}),
)
```

On worker 0

```
In [15]: rpc.rpc_sync("bar", torch.add, args=(torch.full((2, 2), 1), torch.full((2, 2), 2)))
Out[15]:
tensor([[3., 3.],
        [3., 3.]])

In [16]: rpc.rpc_sync("bar", torch.add, args=(1, 2))
Out[16]: 3
```

Differential Revision: D21425536
fbshipit-source-id: a0ec2be825556b39aff018a2834baf815a6d8fa5
Stack from ghstack:
In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are, instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effect on the over-the-wire format and thus on performance.
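The benefit of the multi-payload API can be sketched with toy Python (illustrative only, not the TensorPipe interface): with a single-payload message, multiple buffers must first be copied into one contiguous block, whereas a multi-payload message can carry them as-is.

```python
payloads = [b"header", b"body", b"trailer"]

# Old single-payload style: one extra copy into contiguous memory
# before the message can be handed to the transport.
single = b"".join(payloads)

# New multi-payload style: the message carries the list of buffers
# directly, so no coalescing copy is needed on the sending side.
message = {"payloads": payloads}

# Both representations carry the same bytes over the wire, which is
# why the change has no effect on the wire format or performance.
assert b"".join(message["payloads"]) == single
```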
Differential Revision: D21425536