Skip to content

[TensorPipe] Use the new multi-payload message API#37919

Closed
lw wants to merge 3 commits intogh/lw/2/basefrom
gh/lw/2/head
Closed

[TensorPipe] Use the new multi-payload message API#37919
lw wants to merge 3 commits intogh/lw/2/basefrom
gh/lw/2/head

Conversation

@lw
Copy link
Copy Markdown
Contributor

@lw lw commented May 6, 2020

Stack from ghstack:

In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effects on over-the-wire format and thus on performance.

Differential Revision: D21425536

Differential Revision: D21425536

lw added a commit that referenced this pull request May 6, 2020
Differential Revision: [D21425536](https://our.internmc.facebook.com/intern/diff/D21425536/)

ghstack-source-id: 103569097
Pull Request resolved: #37919
@dr-ci
Copy link
Copy Markdown

dr-ci Bot commented May 6, 2020

💊 Build failures summary and remediations

As of commit 9c42c5f (more details on the Dr. CI page):



🕵️ 1 new failure recognized by patterns

The following build failures do not appear to be due to upstream breakages:

See CircleCI build pytorch_linux_backward_compatibility_check_test (1/1)

Step: "Run tests" (full log | diagnosis details | 🔁 rerun)

May 06 15:55:52 The PR is introducing backward incompatible changes to the operator library. Please contact PyTorch team to confirm whether this change is wanted or not.
May 06 15:55:52 processing existing schema:  aten::var.out(Tensor self, int[1] dim, bool unbiased=True, bool keepdim=False, *, Tensor(a!) out) -> (Tensor(a!)) 
May 06 15:55:52 processing existing schema:  aten::var.names_dim(Tensor self, str[1] dim, bool unbiased=True, bool keepdim=False) -> (Tensor) 
May 06 15:55:52 processing existing schema:  aten::var.names_out(Tensor self, str[1] dim, bool unbiased=True, bool keepdim=False, *, Tensor(a!) out) -> (Tensor(a!)) 
May 06 15:55:52 processing existing schema:  aten::var_mean(Tensor self, bool unbiased=True) -> (Tensor, Tensor) 
May 06 15:55:52 processing existing schema:  aten::var_mean.dim(Tensor self, int[1] dim, bool unbiased=True, bool keepdim=False) -> (Tensor, Tensor) 
May 06 15:55:52 processing existing schema:  aten::var_mean.names_dim(Tensor self, str[1] dim, bool unbiased=True, bool keepdim=False) -> (Tensor, Tensor) 
May 06 15:55:52 processing existing schema:  aten::view_as(Tensor self, Tensor other) -> (Tensor) 
May 06 15:55:52 processing existing schema:  aten::where.self(Tensor condition, Tensor self, Tensor other) -> (Tensor) 
May 06 15:55:52 processing existing schema:  aten::where(Tensor condition) -> (Tensor[]) 
May 06 15:55:52 processing existing schema:  aten::_s_where(Tensor condition, Tensor self, Tensor other) -> (Tensor) 
May 06 15:55:52 The PR is introducing backward incompatible changes to the operator library. Please contact PyTorch team to confirm whether this change is wanted or not.  
May 06 15:55:52  
May 06 15:55:52 Broken ops: [ 
May 06 15:55:52 	quantized::conv_unpack(__torch__.torch.classes.quantized.Conv2dPackedParamsBase packed_weights) -> (Tensor unpacked_weights, Tensor? B_origin) 
May 06 15:55:52 ] 
May 06 15:55:52 + cleanup 
May 06 15:55:52 + retcode=1 
May 06 15:55:52 + set +x 
May 06 15:55:52 =================== sccache compilation log =================== 
May 06 15:55:52 =========== If your build fails, please take a look at the log above for possible reasons =========== 
May 06 15:55:52 Compile requests                 0 

🚧 1 fixed upstream failure:

These were probably caused by upstream breakages that were already fixed.

Please rebase on the viable/strict branch (expand for instructions)

Since your merge base is older than viable/strict, run these commands:

git fetch https://github.com/pytorch/pytorch viable/strict
git rebase FETCH_HEAD

Check out the recency history of this "viable master" tracking branch.


This comment was automatically generated by Dr. CI (expand for details).Follow this link to opt-out of these comments for your Pull Requests.

Please report bugs/suggestions on the GitHub issue tracker.

See how this bot performed.

This comment has been revised 10 times.

Copy link
Copy Markdown
Contributor

@mrshenli mrshenli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add some description to this PR?

torch::distributed::rpc::TensorPipeEntry tpEntry =
torch::distributed::rpc::tensorpipeSerialize(sendingRpcMessage);
tensorpipe::Message sendingTpMessage = std::move(tpEntry.message);
EXPECT_EQ(sendingTpMessage.tensors.size(), 2);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why removing this check? Is the number of tensors no longer 2?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the main purpose of this test is that tensorpipeAllocateMessage is the inverse of tensorpipeSerialize, that is, that by passing through one and then the other one obtains the original RPC message. The implementation details of the format that these two functions choose to adopt is irrelevant to the correctness of the test, as long as they both agree on it. So for example even if the RPC message contained two tensors this doesn't have to be the case for the TP message: it could deduplicate them (if they have the same storage), coalesce them, treat one as a payload (if it's small enough that latency is more important than bandwidth), ... The test should be robust to these changes.

// Allocate memory and fill in pointers
Message rpcMessage = tensorpipeAllocateMessage(tpMessage);
TORCH_INTERNAL_ASSERT(
rpcMessage.tensors().size() == tpMessage.tensors.size(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like tensorpipeAllocateMessage does add tensors into rpcMessage, why removing this check here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same reason as above, the number of tensors in the TP message is an implementation detail and may not be equal to the number of tensors in the RPC message.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now tensorpipe id not deduplicating tensors sharing the same storage. I think the number of tensors between RPC message and TensorPipe message are always the same. Let's keep these checks until handling duplicated tensor storage?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is true that at the moment that condition would be true and the check would pass. My take is that tests should only be allowed to check what is in the specification of a function, i.e., the semantics of its interface. If they do instead check the implementation then they may need to be updated each time the code changes which means they are not robust and don't really provide any safeguard.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guard is right to make sure each update of the implementation doesn't break the critical assumption, unless we start to handle the duplicated tensor storage, which will break the assumption.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only "critical" assumption, IMO, is that deserializing a serialized message gives back the original. Which is now exactly the only thing the tests are checking.

@lw
Copy link
Copy Markdown
Contributor Author

lw commented May 6, 2020

Sorry, I added a descriptor to the diff after I had exported it, let me re-export it.

In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effects on over-the-wire format and thus on performance.



Differential Revision: [D21425536](https://our.internmc.facebook.com/intern/diff/D21425536/)

Differential Revision: [D21425536](https://our.internmc.facebook.com/intern/diff/D21425536)

[ghstack-poisoned]
lw added a commit that referenced this pull request May 6, 2020
In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effects on over-the-wire format and thus on performance.

Pull Request resolved: #37919


ghstack-source-id: 103572164

Differential Revision: [D21425536](https://our.internmc.facebook.com/intern/diff/D21425536/)
Copy link
Copy Markdown
Contributor

@mrshenli mrshenli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Test failures are irrelevant.

// Allocate memory and fill in pointers
Message rpcMessage = tensorpipeAllocateMessage(tpMessage);
TORCH_INTERNAL_ASSERT(
rpcMessage.tensors().size() == tpMessage.tensors.size(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This guard is right to make sure each update of the implementation doesn't break the critical assumption, unless we start to handle the duplicated tensor storage, which will break the assumption.

Comment on lines +419 to +425
TORCH_INTERNAL_ASSERT(
tpMessage.payloads.size() == 1,
"message expected to contain 1 payload, whereas it contained ",
tpMessage.payloads.size(),
" payloads");
std::vector<char> payload(tpMessage.payloads[0].length);
tpMessage.payloads[0].data = (uint8_t*)(payload.data());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lw If we look at this one, the check is just valid for current interface usage. When start to handle duplicated tensor storage and put all tensors' metadata to payload[1], this will break. Does that mean this should be removed..?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is being performed inside the function, not by any of its users or by the tests.

@facebook-github-bot
Copy link
Copy Markdown
Contributor

This pull request has been merged in bc09478.

2 similar comments
@facebook-github-bot
Copy link
Copy Markdown
Contributor

This pull request has been merged in bc09478.

@facebook-github-bot
Copy link
Copy Markdown
Contributor

This pull request has been merged in bc09478.

@facebook-github-bot facebook-github-bot deleted the gh/lw/2/head branch May 10, 2020 14:16
laurentdupin pushed a commit to laurentdupin/pytorch that referenced this pull request Apr 24, 2026
Summary:
In D21209901 TensorPipe added support for a vector of payloads inside each message, instead of a single one, so that users with multiple payloads can send them separately as they are instead of having to copy them into a new block of contiguous memory. The PyTorch agent is using the old API, which is preventing us from deleting it. This change has no effects on over-the-wire format and thus on performance.

Pull Request resolved: pytorch#37919

ghstack-source-id: 103572164

Test Plan:
On both workers
```
import os
import torch
import torch.distributed.rpc as rpc
os.environ["MASTER_ADDR"] = "127.0.0.1"
os.environ["MASTER_PORT"] = "8765"
```
On worker 0
```
rpc.init_rpc(name="foo", rank=0, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 1
```
rpc.init_rpc(name="bar", rank=1, backend=rpc.backend_registry.BackendType.TENSORPIPE, world_size=2, rpc_backend_options=rpc.TensorPipeRpcBackendOptions(worker_name_to_id={"foo": 0, "bar": 0}))
```
On worker 0
```
In [15]: rpc.rpc_sync("bar", torch.add, args=(torch.full((2,2), 1), torch.full((2,2), 2)))
Out[15]:
tensor([[3., 3.],
        [3., 3.]])

In [16]: rpc.rpc_sync("bar", torch.add, args=(1, 2))
Out[16]: 3
```

Differential Revision: D21425536

fbshipit-source-id: a0ec2be825556b39aff018a2834baf815a6d8fa5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants