[torch.distributed][RPC] Add test case for nested RPC #24036
xush6528 wants to merge 3 commits into pytorch:master
Conversation
Summary: Run a Python UDF over RPC: 1. pickle the Python UDF; 2. pass the pickle to C++; 3. C++ passes it over RPC from client to server; 4. the server calls the runPythonUDF() Python function to unpickle and run the UDF, using the Python embedder; 5. serialize the result in C++; 6. pass the serialized result back from server to client and deserialize it in C++; 7. return it to Python. Differential Revision: D16390764 fbshipit-source-id: 1a589f932f25b65d9b2227d6a18b919a98d2df02
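The seven steps above can be sketched in a single process with plain `pickle`; here `run_python_udf` is a Python stand-in for the C++-invoked hook described in the commit, not the PR's actual entry point, and the C++ transport steps are collapsed into passing bytes around.

```python
import pickle

def my_udf(x, y):
    return x + y

# Steps 1-2: client pickles the Python UDF plus its arguments into raw bytes
# (this is the payload that C++ would carry over RPC in step 3).
request = pickle.dumps((my_udf, (2, 3)))

def run_python_udf(payload):
    # Step 4: server-side hook unpickles and runs the UDF.
    fn, args = pickle.loads(payload)
    # Step 5: serialize the result for the trip back.
    return pickle.dumps(fn(*args))

# Steps 6-7: result bytes travel back to the client and are deserialized.
response = run_python_udf(request)
result = pickle.loads(response)
assert result == 5
```

Note that `pickle` serializes a module-level function by reference (module plus qualified name), so the receiving side must be able to import the same function; this is why the real implementation needs a Python interpreter embedded on the server.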
Summary: Pull Request resolved: pytorch#23968 The existing ProcessGroupAgent uses a single thread to send all messages, and a single thread to listen for and process all received messages. This causes performance issues and also prevents nested RPCs. For example, when running a nested RPC A->B->A->B, the second recv on B cannot start until the first recv on B finishes. If the second recv is triggered by a nested RPC inside the first recv, it deadlocks. Ideally, we should expose something like a responder or FutureResult to the Python land to support nested asynchronous UDFs. This diff adds a shared ThreadPool for send and recv: send uses it to send out messages, and recv uses it to process received messages. There is still a dedicated thread to listen for incoming messages and add them to the task queue. There are two goals: 1) speed up ProcessGroupAgent; 2) use the ThreadPool as a temporary solution for (a small number of) nested RPCs. ghstack-source-id: 87896432 Differential Revision: D16695091 fbshipit-source-id: b05c6c9749d4801a4c11a5bf1e660b60e0688163
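The structure this commit describes, a dedicated listener thread that only enqueues work plus a shared pool that runs it, can be sketched in plain Python. This is an illustrative stand-in, not the actual ProcessGroupAgent code; all names here are invented for the sketch.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

NUM_SEND_RECV_THREADS = 4                       # shared by send and recv work
pool = ThreadPoolExecutor(max_workers=NUM_SEND_RECV_THREADS)
incoming = queue.Queue()
processed = []

def process_recv(msg):
    # Runs on a pool thread. A handler here could itself issue a nested RPC,
    # because other pool threads remain free to serve the nested reply.
    processed.append(msg)

def listen_loop():
    # Dedicated listener thread: never runs user code, only hands work to
    # the pool, so it is never blocked by a long-running RPC function.
    while True:
        msg = incoming.get()
        if msg is None:
            return
        pool.submit(process_recv, msg)

listener = threading.Thread(target=listen_loop)
listener.start()
for i in range(3):
    incoming.put("request-%d" % i)
incoming.put(None)           # sentinel: stop the listener
listener.join()
pool.shutdown(wait=True)     # drain the pool before inspecting results
```

The key property is that `incoming.get()` is the only blocking call on the listener thread, so message arrival and message processing never compete for the same thread.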
Summary: # Issue The implementation in pytorch#23228 has a recvLoop thread that blocks on running a requested RPC function. This can't meet the requirement in pytorch#23110, whose proposal implies a worker could send out a nested RPC. "Nested RPC" means an RPC callee can send out another RPC before the first RPC returns; since the worker's recv thread would be busy waiting for the return of the first RPC function, there is no thread left to receive the return value of the second RPC function. A diagram showing this issue: pytorch#23228 (comment). # Solution - Add a test case to capture this requirement. - Add debugging utilities that can be quite useful for debugging tricky RPC cases. # Misc - Add a debugging utility in common_distributed for tracing RPC calls. Differential Revision: D16682122 fbshipit-source-id: 062e77faa5e8467cb3cfe5b0a16333c2762768a9
from common_utils import load_tests, run_tests


class WorkerContext(object):
Perhaps I should write a C++ class for it.
And it should eventually evolve into a WorkerDiscoveryService Interface.
Good point! I think we can keep this here for now. If we do want to evolve this into a WorkerDiscoveryService, we might need to have a discussion on the features and API and make sure it works for different RPC backends.
Summary: Pull Request resolved: pytorch#24036 # Issue The implementation in pytorch#23228 has a recvLoop thread that blocks on running a requested RPC function. This can't meet the requirement in pytorch#23110, whose proposal implies a worker could send out a nested RPC. "Nested RPC" means an RPC callee can send out another RPC before the first RPC returns; since the worker's recv thread would be busy waiting for the return of the first RPC function, there is no idle thread left to receive the return value of the second RPC function. A diagram showing this issue: pytorch#23228 (comment). A more extensive discussion by mrshenli: pytorch#23569 (comment). # Solution - Add a test case to capture this requirement. # Misc - Add debugging utilities that can be quite useful for tracing RPC behaviors and debugging tricky RPC cases similar to this one. Differential Revision: D16682122 fbshipit-source-id: ffa78eb20af4e2cf9476998fa544ab940035cae9
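The deadlock described above can be reproduced torch-free with a thread pool standing in for the agent's recv threads: the handler for the first RPC blocks on a nested call, which can only complete if an idle thread exists to run it. All names below are illustrative, not from the PR.

```python
from concurrent.futures import ThreadPoolExecutor

def run_nested_rpc(num_recv_threads):
    pool = ThreadPoolExecutor(max_workers=num_recv_threads)

    def outer_handler():
        # Handler for RPC #1: issues nested RPC #2 and blocks on its reply.
        nested = pool.submit(lambda: "nested-reply")
        return nested.result(timeout=1)  # stuck if no thread can run it

    try:
        result = pool.submit(outer_handler).result(timeout=2)
    except Exception:  # TimeoutError: the nested reply never arrived
        result = "deadlock"
    pool.shutdown(wait=False)
    return result

# Single recv thread: outer_handler occupies the only thread while waiting
# for the nested future, so the nested task never starts -> deadlock.
assert run_nested_rpc(1) == "deadlock"
# With a spare thread (as with the shared pool), the nested call completes.
assert run_nested_rpc(2) == "nested-reply"
```

This is exactly the shape of the A->B->A pattern the test case exercises: submitting dependent work to the same pool that is running the caller is safe only when the pool is larger than the nesting depth, which is why the pool is a temporary fix for a small number of nested RPCs rather than a general solution.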
def sprint(*args, **kwargs):
    date_str = datetime.datetime.today().strftime("%m%d %H:%M:%S.%f")
    print(
nit: Is there a way to toggle this? Sometimes the printed message becomes less readable, especially when args/kwargs contain large tensors.
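One possible way to address this nit is to gate the debug print on an environment variable; `RPC_DEBUG` is a hypothetical variable name for this sketch, not something the PR defines.

```python
import datetime
import os

def sprint(*args, **kwargs):
    # Silently drop debug output unless explicitly enabled, e.g. by
    # running tests with RPC_DEBUG=1.
    if not os.environ.get("RPC_DEBUG"):
        return
    date_str = datetime.datetime.today().strftime("%m%d %H:%M:%S.%f")
    print(date_str, *args, **kwargs)
```

For the large-tensor readability concern, truncating each argument's repr (for example via the standard library's `reprlib.repr`) before printing would be a complementary option.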
if WorkerContext.worker_name() == "worker0":
    to = "worker1"
    ret = dist.rpc(to, nested_rpc, args=(WorkerContext.worker_name(), self.num_send_recv_threads, to), async_call=False)
    assert ret is None, str(ret)
nit: Do we want to use a not-None return value here, just to explicitly check results are correct?
dist.init_process_group(
    backend="gloo", rank=self.rank, world_size=self.world_size, store=store
)
WorkerContext(worker_id=self.rank, world_size=self.world_size)
nit: After this line, WorkerContext._INITIALIZED is already True? Should we actually put this after init_rpc?
pritamdamania87 left a comment:
Can we update the title and description of this PR to reflect what we're addressing in the PR? The title suggests we're only adding some unit tests, but the PR actually touches a lot of the core RPC code and some similar changes exist in other PRs.
@pritamdamania87 I think most changes to the core part will be gone after a rebase. But yes, if there are indeed changes to the RPC core, let's make sure the PR description says so.

Will get back to this later.

Not urgent