Enable SequentialReadingService to support MP + Distributed #985
ejguan wants to merge 6 commits into meta-pytorch:main
Conversation
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
```diff
  source_datapipe,
  protocol_type(req_queue, res_queue),
- blocking_request_get=True,
+ blocking_request_get=blocking_request_get,
```
This was a bug that would block the dispatching process, since multiple loops run in the same process and we need to make sure the loops don't block each other.
It should also fix the problem for DDP + MPRS with fullsync.
And, to be bulletproof, I have changed both the distributed and non-distributed tests to run against a non-balanced data shard, to guard the dispatching use cases.
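For context on why the blocking `get` was a problem: when several request-serving loops share one process (as in the dispatching process), any loop that blocks on its queue starves all the others. A minimal single-process sketch of the non-blocking pattern, with all names illustrative rather than torchdata's actual internals:

```python
import queue

def run_loops_non_blocking(loops):
    """Round-robin several request loops inside one process.

    Each (req_queue, handler) pair is one loop. A blocking get() in any
    one loop would stall every other loop in the process, so each loop
    polls non-blockingly and yields when its queue is empty.
    Illustrative sketch only; names are hypothetical.
    """
    results = []
    pending = list(loops)
    while pending:
        for req_queue, handler in list(pending):
            try:
                # non-blocking get, analogous to blocking_request_get=False
                item = req_queue.get(block=False)
            except queue.Empty:
                continue  # yield to the other loops instead of blocking
            if item is None:  # sentinel: this loop is finished
                pending.remove((req_queue, handler))
            else:
                results.append(handler(item))
    return results
```

With `blocking_request_get=True` the first empty queue would hang the whole process; polling lets every loop make progress.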
```rst
Multiprocessing + Distributed
-----------------------------

``SequentialReadingService`` can be used to combine both ``ReadingServices`` together to achieve multiprocessing and distributed training at the same time.
```
Do we expect in the future, it could also be used in OSS to chain disagg reading service + last-mile "on-trainer" Python transformation? :)
I guess so when AIStoreRS or RayRS is provided.
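For readers following along, the chaining idea behind `SequentialReadingService` can be sketched in a few lines. This is a simplified stand-in, not the real implementation: it only shows that each service wraps the DataPipe returned by the previous one (e.g. Distributed first, then MP), and that teardown runs in reverse order.

```python
class SequentialReadingServiceSketch:
    """Toy model of chaining reading services in order.

    Each service's initialize() receives the DataPipe produced by the
    previous service, so e.g. distributed sharding can be applied
    before multiprocessing wraps the graph.
    """

    def __init__(self, *reading_services):
        self.reading_services = reading_services

    def initialize(self, datapipe):
        for rs in self.reading_services:
            datapipe = rs.initialize(datapipe)
        return datapipe

    def finalize(self):
        # tear down in reverse order of initialization
        for rs in reversed(self.reading_services):
            rs.finalize()
```

A disaggregated reading service followed by an on-trainer transformation service would slot into the same chain.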
```diff
  graph = traverse_dps(datapipe)
- non_replicable_dp = find_lca_round_robin_sharding_dp(graph)
- if non_replicable_dp is not None:
+ dispatching_dp = find_lca_round_robin_sharding_dp(graph)
```
Thanks. It's now clearer which part of the DataPipe this is ~
```python
separates graph by multiple pieces and reconnects it using queues.
creates subprocesses.
"""
if dist.is_available() and dist.is_initialized():
```
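The docstring above ("separates graph by multiple pieces and reconnects it using queues") can be illustrated with a minimal round-robin dispatching sketch. This is a single-process toy using `queue.Queue`; the real code moves items between subprocesses over multiprocessing queues, and the function names below are hypothetical:

```python
import queue

def round_robin_dispatch(source, worker_queues):
    """Send items from a non-replicable source to per-worker queues
    in round-robin order, then close each queue with a None sentinel.

    Toy sketch of the dispatching idea only; real implementations run
    the source in a dispatching process and use multiprocessing queues.
    """
    for i, item in enumerate(source):
        worker_queues[i % len(worker_queues)].put(item)
    for q in worker_queues:
        q.put(None)  # sentinel: no more data for this worker

def drain(q):
    """Consume one worker queue until the sentinel arrives."""
    items = []
    while (item := q.get()) is not None:
        items.append(item)
    return items
```

Each worker then reads only from its own queue, which is how a single non-replicable piece of the graph feeds multiple worker pieces.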
```python
pass

def _launch_distributed_training(world_size, *args, fn):
```
I asked ChatGPT what this function does:

> The program is a function `_launch_distributed_training` that launches a distributed training process. The function takes in parameters `world_size`, `*args`, and `fn`. The environment variable `MASTER_ADDR` is set to `TEST_MASTER_ADDR`, and the environment variable `MASTER_PORT` is set to a value returned from a call to the `_get_open_port` function. The function creates a multiprocessing context using the spawn method, and creates a queue `q` using the context.
>
> The function then creates `world_size` processes using the `Process` method of the context and starts each process. The target of each process is the function `fn`, and the arguments for each process are `rank`, `world_size`, `q`, and `*args`. The function stores the created processes in a list `ps`.
>
> The function then uses a while loop to get data from the queue `q` and append it to a list `res`. The loop breaks when a `TerminateSignal` is received from the queue. After the loop, the function joins all processes in the `ps` list. Finally, the function returns the `res` list.

Seems quite correct? ~
What happens if we give the description back to ChatGPT and ask it to write the code~~ "write a function that (quote~)"
Haha, writing those comments would take me more time than writing code.
```python
model(d)
dl.shutdown()
```

Multiprocessing + Distributed
Non-blocking, but it would be nice to add this to our Colab example as well!
Sounds reasonable to me.
Fixes #911
Changes

- Fix a bug where `blocking_request_get` was not sent to the worker process
- Enable `SequentialReadingService` to combine both Distributed and MP ReadingService
- Add documentation for `SequentialReadingService`
- Add tests for `SequentialReadingService`