Skip to content

Commit bc09478

Browse files
lw authored and facebook-github-bot committed
[TensorPipe] Use the new multi-payload message API (#37919)
Summary: In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effects on over-the-wire format and thus on performance. Pull Request resolved: #37919 ghstack-source-id: 103572164 Test Plan: On both workers ``` import os import torch import torch.distributed.rpc as rpc os.environ["MASTER_ADDR"] = "127.0.0.1" os.environ["MASTER_PORT"] = "8765" ``` On worker 0 ``` rpc.init_rpc(name="foo", rank=0, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0})) ``` On worker 1 ``` rpc.init_rpc(name="bar", rank=1, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0})) ``` On worker 0 ``` In [15]: rpc.rpc_sync("bar", torch.add, args=(torch.full((2,2), 1), torch.full((2,2), 2))) Out[15]: tensor([[3., 3.], [3., 3.]]) In [16]: rpc.rpc_sync("bar", torch.add, args=(1, 2)) Out[16]: 3 ``` Differential Revision: D21425536 fbshipit-source-id: a0ec2be825556b39aff018a2834baf815a6d8fa5
1 parent 978ad16 commit bc09478

4 files changed

Lines changed: 36 additions & 34 deletions

File tree

test/cpp/rpc/test_tensorpipe_serialization.cpp

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,20 @@ TEST(TensorpipeSerialize, Base) {
2525
torch::distributed::rpc::TensorPipeEntry tpEntry =
2626
torch::distributed::rpc::tensorpipeSerialize(sendingRpcMessage);
2727
tensorpipe::Message sendingTpMessage = std::move(tpEntry.message);
28-
EXPECT_EQ(sendingTpMessage.tensors.size(), 2);
2928

30-
// Mimic receiving message descriptor
29+
// Mimic receiving message descriptor: recvingTpMessage is a copy of
30+
// sendingTpMessage except for the data pointers which are left null.
3131
tensorpipe::Message recvingTpMessage;
32-
recvingTpMessage.length = sendingTpMessage.length;
3332
recvingTpMessage.metadata = sendingTpMessage.metadata;
33+
recvingTpMessage.payloads.reserve(sendingTpMessage.payloads.size());
34+
for (auto& tpPayload : sendingTpMessage.payloads) {
35+
tensorpipe::Message::Payload p;
36+
p.length = tpPayload.length;
37+
p.metadata = tpPayload.metadata;
38+
recvingTpMessage.payloads.push_back(std::move(p));
39+
}
40+
EXPECT_EQ(
41+
recvingTpMessage.payloads.size(), sendingTpMessage.payloads.size());
3442
recvingTpMessage.tensors.reserve(sendingTpMessage.tensors.size());
3543
for (auto& tpTensor : sendingTpMessage.tensors) {
3644
tensorpipe::Message::Tensor t;
@@ -46,23 +54,18 @@ TEST(TensorpipeSerialize, Base) {
4654
// 2. Fill pointers to tensorpipe message
4755
torch::distributed::rpc::Message recvingRpcMessage =
4856
torch::distributed::rpc::tensorpipeAllocateMessage(recvingTpMessage);
49-
EXPECT_EQ(
50-
recvingRpcMessage.tensors().size(), recvingTpMessage.tensors.size());
51-
recvingTpMessage.data = (uint8_t*)(recvingRpcMessage.payload().data());
52-
for (int i = 0; i < recvingRpcMessage.tensors().size(); i++) {
53-
auto& rpcTensor = recvingRpcMessage.tensors()[i];
54-
auto& tpTensor = recvingTpMessage.tensors[i];
55-
tpTensor.data = (uint8_t*)(rpcTensor.data_ptr());
56-
}
5757

5858
// Mimic tensorpipe data transfer
59+
for (int i = 0; i < recvingTpMessage.payloads.size(); i++) {
60+
tensorpipe::Message::Payload& srcPayload = sendingTpMessage.payloads[i];
61+
tensorpipe::Message::Payload& dstPayload = recvingTpMessage.payloads[i];
62+
memcpy(dstPayload.data, srcPayload.data, srcPayload.length);
63+
}
5964
for (int i = 0; i < recvingTpMessage.tensors.size(); i++) {
60-
auto& srcTensor = sendingTpMessage.tensors[i];
61-
auto& dstTensor = recvingTpMessage.tensors[i];
65+
tensorpipe::Message::Tensor& srcTensor = sendingTpMessage.tensors[i];
66+
tensorpipe::Message::Tensor& dstTensor = recvingTpMessage.tensors[i];
6267
memcpy(dstTensor.data, srcTensor.data, srcTensor.length);
6368
}
64-
memcpy(recvingTpMessage.data, sendingTpMessage.data, sendingTpMessage.length);
65-
recvingTpMessage.metadata = sendingTpMessage.metadata;
6669

6770
// Data is ready
6871
EXPECT_EQ(mtype, recvingRpcMessage.type());

torch/csrc/distributed/rpc/tensorpipe_agent.cpp

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,6 @@ void TensorPipeAgent::pipeRead(
122122

123123
// Allocate memory and fill in pointers
124124
Message rpcMessage = tensorpipeAllocateMessage(tpMessage);
125-
TORCH_INTERNAL_ASSERT(
126-
rpcMessage.tensors().size() == tpMessage.tensors.size(),
127-
"Tensor num mismatch");
128-
tpMessage.data = (uint8_t*)(rpcMessage.payload().data());
129-
for (size_t i = 0; i < rpcMessage.tensors().size(); i++) {
130-
auto& rpcTensor = rpcMessage.tensors()[i];
131-
auto& tpTensor = tpMessage.tensors[i];
132-
tpTensor.data = (uint8_t*)(rpcTensor.data_ptr());
133-
}
134125

135126
pipe->read(
136127
std::move(tpMessage),

torch/csrc/distributed/rpc/utils.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -362,8 +362,10 @@ TensorPipeEntry tensorpipeSerialize(const Message& rpcMessage) {
362362
c10::List<at::Tensor> tensors = cloneSparseTensors(rpcMessage.tensors());
363363

364364
// Payload
365-
tpMessage.data = (uint8_t*)(payload.data());
366-
tpMessage.length = payload.size();
365+
tensorpipe::Message::Payload tpPayload;
366+
tpPayload.data = (uint8_t*)(payload.data());
367+
tpPayload.length = payload.size();
368+
tpMessage.payloads.push_back(std::move(tpPayload));
367369

368370
// Metadata - encode rpc message type and message id into
369371
// 8 bytes respectively
@@ -412,9 +414,15 @@ TensorPipeEntry tensorpipeSerialize(const Message& rpcMessage) {
412414
std::move(copiedTensors)};
413415
}
414416

415-
Message tensorpipeAllocateMessage(const tensorpipe::Message& tpMessage) {
417+
Message tensorpipeAllocateMessage(tensorpipe::Message& tpMessage) {
416418
// Payload, message type and message id
417-
std::vector<char> payload(tpMessage.length);
419+
TORCH_INTERNAL_ASSERT(
420+
tpMessage.payloads.size() == 1,
421+
"message expected to contain 1 payload, whereas it contained ",
422+
tpMessage.payloads.size(),
423+
" payloads");
424+
std::vector<char> payload(tpMessage.payloads[0].length);
425+
tpMessage.payloads[0].data = (uint8_t*)(payload.data());
418426
TORCH_INTERNAL_ASSERT(
419427
tpMessage.metadata.size() == 2 * sizeof(int64_t),
420428
"message metadata must be ",
@@ -430,7 +438,7 @@ Message tensorpipeAllocateMessage(const tensorpipe::Message& tpMessage) {
430438
// Tensors
431439
std::vector<torch::Tensor> tensors;
432440
tensors.reserve(tpMessage.tensors.size());
433-
for (const tensorpipe::Message::Tensor& tpTensor : tpMessage.tensors) {
441+
for (tensorpipe::Message::Tensor& tpTensor : tpMessage.tensors) {
434442
const std::string& metadata = tpTensor.metadata;
435443
size_t metadataPos = 0;
436444
auto metaDataReadFunc = [&](char* buf, size_t n) -> size_t {
@@ -451,9 +459,10 @@ Message tensorpipeAllocateMessage(const tensorpipe::Message& tpMessage) {
451459

452460
torch::jit::Unpickler unpickler(
453461
metaDataReadFunc, nullptr, nullptr, sectionReadFunc, {});
454-
auto ival = unpickler.parse_ivalue();
455-
auto&& t = ival.toTensor();
456-
tensors.emplace_back(std::move(t));
462+
c10::IValue ival = unpickler.parse_ivalue();
463+
at::Tensor rpcTensor = ival.toTensor();
464+
tpTensor.data = (uint8_t*)(rpcTensor.data_ptr());
465+
tensors.emplace_back(std::move(rpcTensor));
457466
}
458467

459468
return Message(std::move(payload), std::move(tensors), mType, mId);

torch/csrc/distributed/rpc/utils.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,7 @@ TORCH_API TensorPipeEntry tensorpipeSerialize(const Message& rpcMessage);
6060
// necessary information for memory allocation, like payload length
6161
// and tensor metadata. The returned RPC message doesn't have any
6262
// data, but would be valid after tensorpipe finishs data transfer.
63-
TORCH_API Message
64-
tensorpipeAllocateMessage(const tensorpipe::Message& tpMessage);
63+
TORCH_API Message tensorpipeAllocateMessage(tensorpipe::Message& tpMessage);
6564

6665
// Some Tensors are effectively views of larger Tensors, where only a small
6766
// subset of the Storage data is referenced. This normally is good and avoids

0 commit comments

Comments (0)