[DataLoader2] Fix apply_sharding to accept one sharding_filter per branch (#90769)
ejguan wants to merge 3 commits into pytorch:master
Conversation
🔗 Helpful Links: 🧪 See artifacts and rendered test results at hud.pytorch.org/pr/90769
Note: Links to docs will display an error until the docs builds have been completed. ✅ No Failures as of commit 6db5dd5. This comment was automatically generated by Dr. CI and updates every 15 minutes.
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
NivekT
left a comment
LGTM! Thanks for adding this!
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
NivekT
left a comment
Thanks for updating the error message!
@pytorchbot merge (Initiating merge automatically since Phabricator Diff has merged)
Merge started. Your change will be merged once all checks pass (ETA 0-4 hours). Learn more about merging in the wiki. Questions? Feedback? Please reach out to the PyTorch DevX Team.
…processing (#919)

Summary: This PR is created on top of #555 and extends `PrototypeMultiprocessingReadingService` to accept non-replicable DataPipes. It depends on pytorch/pytorch#90769.

### Main Changes
- Add a way to launch a process that fetches data from non-replicable DataPipes and sends data to worker processes in a round-robin manner
- Add `ShardingRoundRobinDispatcher` (functional name `sharding_round_robin_dispatch`) to indicate a non-replicable DataPipe
- Add `MultipleDataPipesToQueuesLoop` to connect the non-sharding process to request/response queues
- Add `find_lca_non_replicable_dp` as a graph function to determine the lowest common ancestor of all non-replicable DataPipes. This guarantees that all non-replicable DataPipes run in a single dispatching process
- In each multiprocessing worker process:
  - If all DataPipes are replicable, apply multiprocessing sharding to the graph
  - Otherwise, the worker uses `find_replicable_branches` to apply multiprocessing sharding to the replicable branches, because the non-replicable branches have already been properly sharded by routing data to worker processes in round-robin order
- Properly get `ResetEpochResponse` from the protocol via `get_response_reset_epoch`
- [x] Add tests for the two graph functions
- [x] Add a test that launches the non-shardable DataPipe process
- Add documentation:
  - [x] Replicable vs. non-replicable DataPipes in multiprocessing
  - [x] How PrototypeMPRS handles the above two types of DataPipe

Please check the link for the doc: https://ejguan.github.io/dataloader2.html#dynamic-sharding

## nit Changes
- Rename `Spawn` to `Create`, as the process has not been started yet

Pull Request resolved: #919
Reviewed By: wenleix
Differential Revision: D42004034
Pulled By: ejguan
fbshipit-source-id: 5b0b1cb7c2781c4f45240d21f37d457b9729b9a4
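The round-robin dispatching described above can be sketched in plain Python. This is a minimal illustration of the idea only, not the torchdata implementation: the function name `round_robin_dispatch` and the use of lists in place of inter-process queues are assumptions for the sketch.

```python
from itertools import cycle

def round_robin_dispatch(source, num_workers):
    """Assign each item from a single non-replicable source to worker
    buckets in round-robin order, mimicking how one dispatching process
    could feed several replicated worker pipelines."""
    buckets = [[] for _ in range(num_workers)]
    for item, bucket in zip(source, cycle(buckets)):
        bucket.append(item)
    return buckets

# One non-replicable stream split across 3 workers:
workers = round_robin_dispatch(range(10), 3)
# workers[0] == [0, 3, 6, 9]; workers[1] == [1, 4, 7]; workers[2] == [2, 5, 8]
```

Because every item goes to exactly one worker, the non-replicable branch is effectively sharded without ever being replicated, which is why the workers only need to apply multiprocessing sharding to the remaining replicable branches.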
Changes:
Allow multiple `sharding_filter`s in the pipeline as long as they are not on the same branch.

Example: In order to properly shard `DP1` and `DP2`, we should allow multiple `sharding_filter`s.
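A minimal pure-Python sketch of the semantics this change permits: each branch applies its own modulo-based filter before the branches are joined. The function `sharding_filter` below is a stand-in for the DataPipe operation of the same name, and the names `dp1`/`dp2` are illustrative, not code from this PR.

```python
def sharding_filter(source, num_instances, instance_id):
    """Stand-in for DataPipe.sharding_filter: keep every
    num_instances-th element, offset by instance_id."""
    return [x for i, x in enumerate(source) if i % num_instances == instance_id]

# Two separate branches (DP1 and DP2), each with its own filter,
# joined afterwards -- one sharding_filter per branch is valid
# because the filters never stack on the same data path.
dp1 = sharding_filter(range(0, 10), 2, 0)   # shard 0 of branch 1
dp2 = sharding_filter(range(10, 20), 2, 0)  # shard 0 of branch 2
joined = list(zip(dp1, dp2))
```

Had both filters sat on the same branch, the data would be sharded twice (keeping only every `num_instances**2`-th element), which is the situation `apply_sharding` still rejects.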