Add support to keep non-replicable DataPipe in the main process#950
ejguan wants to merge 6 commits into meta-pytorch:main
Conversation
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
```python
return d * 2
```

```python
class NonReplicableDataPipe(IterDataPipe):
```
Looks like IterDataPipe is evolving towards DAG nodes...
ShardingRoundRobinDispatcherIterDataPipe is also non-replicable, right? But they seem to have different graph rewrite strategies (one tries to move the DataPipe to the dispatching process and one tries to keep it in the main process).
```python
@mp_ctx_parametrize
def test_non_replicable_datapipe(self, ctx) -> None:
    r"""
    For the pipeline with non-replicable DataPipe, make sure
```
This non-replicable DataPipe also cannot be ShardingRRDispatchDP, right?
No, it can't. Like I said, ShardingRRDispatchDP labels the prior graph non-shardable, while other non-shardable DataPipes label themselves and the subsequent graph non-shardable.
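The two labeling rules described above can be sketched on a linear chain. This is a toy illustration under stated assumptions — the function and node kinds below are invented for this sketch, not the real torchdata traversal (which works on a DAG):

```python
# Toy sketch of the two labeling rules quoted above, on a linear chain
# ordered from source to sink. All names here are invented for
# illustration; the real torchdata traversal operates on a DAG.

def label_shardable(chain):
    """chain: list of (name, kind), where kind is one of
    "normal", "dispatching", or "non_shardable".
    Returns a parallel list of booleans: True if the node stays shardable."""
    n = len(chain)
    shardable = [True] * n
    for i, (_, kind) in enumerate(chain):
        if kind == "dispatching":
            # ShardingRRDispatchDP-style: the *prior* graph becomes non-shardable.
            for j in range(i):
                shardable[j] = False
        elif kind == "non_shardable":
            # fullsync-style: itself and the *subsequent* graph become non-shardable.
            for j in range(i, n):
                shardable[j] = False
    return shardable
```

Under this sketch, a dispatcher in the middle marks everything upstream of it non-shardable, while a `fullsync`-style node marks everything from itself to the sink non-shardable.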
```python
self._main_prefetch_datapipe = end_datapipe

# Attach non-replicable DataPipes
if replicable_dp is not datapipe:
```
I guess this happens when we have a non-replicable DataPipe (and need to keep it in the main process).
And the reason to replace `replicable_dp` with `end_datapipe` is that `end_datapipe` has the "exchange sink" attached (i.e., the `_IterateQueueDataPipes` on line 291)?
We do that because we send `replicable_dp` to the worker process, which returns an `end_datapipe` with the exchange sink attached.

For a graph with a non-replicable tail:

```
non_rep_dp1 (datapipe) -> non_rep_dp2 -> rep_dp1 (replicable_dp) -> rep_dp2 -> ...
```

After replacement, the graph becomes

```
non_rep_dp1 (datapipe) -> non_rep_dp2 -> end_datapipe
```

In this case, we want to return `non_rep_dp1` (`datapipe`) rather than the `end_datapipe`.

For the case where the whole graph is replicable:

```
rep_dp1 (replicable_dp/datapipe) -> rep_dp2 -> ...
```

after launching mp, the graph becomes

```
end_datapipe
```

so we just need to keep `end_datapipe`.
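The splice described above can be sketched with toy nodes. This is a minimal illustration, not the real reading-service code — the class and helper names are invented, and the arrows in the examples mean "reads from" (so `datapipe` is the sink of the graph):

```python
# Toy sketch of the replacement described above. `source` points
# upstream; `datapipe` is the sink. All names are invented for
# illustration only.

class Node:
    def __init__(self, name, source=None):
        self.name = name
        self.source = source  # upstream node, if any

def splice_end_datapipe(datapipe, replicable_dp, end_datapipe):
    """Replace `replicable_dp` (the branch sent to worker processes)
    with `end_datapipe` (the exchange sink the workers hand back)."""
    if replicable_dp is datapipe:
        # Whole graph was replicable: keep only the sink.
        return end_datapipe
    # Walk from the sink upstream until we reach the node that reads
    # from `replicable_dp`, then point it at `end_datapipe` instead.
    node = datapipe
    while node.source is not replicable_dp:
        node = node.source
    node.source = end_datapipe
    return datapipe  # still return the original non-replicable sink

# non_rep_dp1 (datapipe) -> non_rep_dp2 -> rep_dp1 (replicable_dp)
rep_dp1 = Node("rep_dp1")
non_rep_dp2 = Node("non_rep_dp2", rep_dp1)
non_rep_dp1 = Node("non_rep_dp1", non_rep_dp2)
end_dp = Node("end_datapipe")

out = splice_end_datapipe(non_rep_dp1, rep_dp1, end_dp)
```

In the example, `out` is still `non_rep_dp1`, but its upstream chain now ends in the exchange sink rather than the replicable branch.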
ejguan force-pushed from 6446dd3 to 2e0a080
…-pytorch#950)

Summary:
This is useful when `fullsync` is in the pipeline and we don't want this DataPipe running in the worker process.

### Changes
- Change the dispatching-related function names to `dispatching_xxx`
- Make the `fullsync` DataPipe non-replicable
- Add `_find_replicable_branches` to find the last DataPipe prior to any non-replicable DataPipe
- Add graph tests
- In `PrototypeMultiprocessingReadingService`, make sure only the `replicable_datapipe` is sent to the worker process, and replace the `replicable_datapipe` with the `worker_consumer_datapipe`

Pull Request resolved: meta-pytorch#950
Reviewed By: wenleix, NivekT, Miiira
Differential Revision: D42617776
Pulled By: ejguan
fbshipit-source-id: 1138203507934b089025e290597b473ef9be32bb
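For a linear chain, the "find the last DataPipe prior to any non-replicable DataPipe" step from the summary could look roughly like this. This is a toy sketch, not the actual `_find_replicable_branches` implementation (which handles branching graphs); the class and function names below are invented:

```python
# Toy sketch: walk upstream from the sink and return the last
# replicable DataPipe before the non-replicable tail. Assumes the
# `replicable` flags already reflect the labeling rules (a
# non-replicable node also taints everything downstream of it).

class DP:
    def __init__(self, name, source=None, replicable=True):
        self.name = name
        self.source = source        # upstream DataPipe, if any
        self.replicable = replicable

def find_replicable_branch(datapipe):
    """Return the first replicable node found walking upstream from the
    sink (i.e. the last DataPipe before the non-replicable tail), or
    None if no node is replicable."""
    node = datapipe
    while node is not None and not node.replicable:
        node = node.source
    return node

# rep_dp2 -> rep_dp1 -> fullsync (non-replicable) -> sink (non-replicable)
rep_dp2 = DP("rep_dp2")
rep_dp1 = DP("rep_dp1", rep_dp2)
fullsync = DP("fullsync", rep_dp1, replicable=False)
sink = DP("sink", fullsync, replicable=False)

branch = find_replicable_branch(sink)
```

In the example, `branch` is `rep_dp1`: everything up to and including it would be sent to workers, while `fullsync` and the sink stay in the main process.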