Second prototype to do pre-sharding work in single process#555
Second prototype to do pre-sharding work in single process#555VitalyFedyunin wants to merge 4 commits intogh/VitalyFedyunin/7/basefrom
Conversation
[ghstack-poisoned]
[ghstack-poisoned]
[ghstack-poisoned]
[ghstack-poisoned]
|
Hi @VitalyFedyunin! Thank you for your pull request. We require contributors to sign our Contributor License Agreement, and yours needs attention. You currently have a record in our system, but the CLA is no longer valid, and will need to be resubmitted. ProcessIn order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (eg your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA. Once the CLA is signed, our tooling will perform checks and validations. Afterwards, the pull request will be tagged with If you have received this in error or have any questions, please contact us at cla@meta.com. Thanks! |
…processing (#919) Summary: This PR is created on top of #555. And, this PR extends `PrototypeMultiprocessingReadingService` to accept non-replicable DataPipe. And, this PR depends on pytorch/pytorch#90769 ### Main Changes - Add a way to launch a process to fetch data from non-replicable DataPipes and send data to worker processes in a round-robin manner - Add `ShardingRoundRobinDispatcher` (functional name `sharding_round_robin_dispatch`) to indicate non-replicable DataPipe - Add `MultipleDataPipesToQueuesLoop` to connect non-sharding process to request/response queues - Add `find_lca_non_replicable_dp` as a graph function to determine the lowest common ancestor of all non-replicabble DataPipes. This would guarantee that all non-replicable DataPipes will be running in a single dispatching process - In each multiprocessing worker process, - If All datapipes are replicable, apply multiprocessing sharding to the graph - If not, worker would use `find_replicable_branches` to apply mp sharding to those replicable branches, because all non-replicable branches have been properly sharded by routing data round-robinly to worker processes. - Properly get `ResetEpochResponse` from protocol via `get_response_reset_epoch` - [x] Add tests for two graph functions - [x] Add test to launch non-shardable DataPipe process - Add documents - [x] Replicable DataPipe/Non-replicable DataPipe in multiprocessing - [x] How PrototypeMPRS handles the above two types of DataPipe Please check the link for doc: https://ejguan.github.io/dataloader2.html#dynamic-sharding ## nit Changes - Rename `Spawn` to `Create` as the process has not been started Pull Request resolved: #919 Reviewed By: wenleix Differential Revision: D42004034 Pulled By: ejguan fbshipit-source-id: 5b0b1cb7c2781c4f45240d21f37d457b9729b9a4
Stack from ghstack (oldest at bottom):