
Add support to keep non-replicable DataPipe in the main process #950

Closed
ejguan wants to merge 6 commits into meta-pytorch:main from ejguan:keep_non_replicable_in_main_process

@ejguan (Contributor) commented Jan 19, 2023

This is useful when fullsync is in the pipeline and we don't want this DataPipe to run in a worker process.

Changes

  • Rename the dispatching-related functions to dispatching_xxx
  • Make the fullsync DataPipe non-replicable
  • Add _find_replicable_branches to find the last DataPipe prior to any non-replicable DataPipe
    • Add graph tests
  • In PrototypeMultiprocessingReadingService, make sure only replicable_datapipe is sent to the worker processes, then replace replicable_datapipe with worker_consumer_datapipe.
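
The branch search described in the list above can be illustrated on a toy graph. This is a minimal sketch, not the torchdata implementation: `Node`, its `replicable` flag, and `find_replicable_heads` are hypothetical names, and the sketch assumes a DataPipe is only safe to send to workers when it and everything upstream of it are replicable.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass(eq=False)
class Node:
    """Hypothetical stand-in for a DataPipe in a graph."""
    name: str
    sources: List["Node"] = field(default_factory=list)  # upstream nodes
    replicable: bool = True


def find_replicable_heads(end: Node) -> List[Node]:
    """Walk upstream from `end` and return the last nodes whose whole
    upstream sub-graph is replicable (assumption of this sketch)."""
    cache: Dict[int, bool] = {}

    def fully_replicable(node: Node) -> bool:
        key = id(node)
        if key not in cache:
            cache[key] = node.replicable and all(
                fully_replicable(src) for src in node.sources
            )
        return cache[key]

    heads: List[Node] = []
    seen: Set[int] = set()

    def visit(node: Node) -> None:
        if id(node) in seen:
            return
        seen.add(id(node))
        if fully_replicable(node):
            heads.append(node)      # this branch can go to the workers
        else:
            for src in node.sources:
                visit(src)          # keep looking further upstream

    visit(end)
    return heads


# source -> map -> fullsync (non-replicable) -> batch
source = Node("source")
mapped = Node("map", [source])
sync = Node("fullsync", [mapped], replicable=False)
batch = Node("batch", [sync])
print([n.name for n in find_replicable_heads(batch)])  # ['map']
```

In this toy pipeline, `fullsync` and `batch` would stay in the main process, while the branch ending at `map` is the part that could be replicated.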

@facebook-github-bot added the CLA Signed label Jan 19, 2023
@ejguan requested a review from wenleix January 19, 2023 14:37
@facebook-github-bot (Contributor)

@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

return d * 2


class NonReplicableDataPipe(IterDataPipe):
Contributor

Looks like IterDataPipe is evolving towards DAG nodes...

ShardingRoundRobinDispatcherIterDataPipe is also non-replicable, right? But the two seem to use different graph-rewrite strategies: one is moved to the dispatching process and the other is kept in the main process.

    @mp_ctx_parametrize
    def test_non_replicable_datapipe(self, ctx) -> None:
        r"""
        For the pipeline with non-replicable DataPipe, make sure
Contributor

This non-replicable DataPipe also cannot be ShardingRRDispatchDP, right?

Contributor Author

No, it can't. Like I said, ShardingRRDispatchDP labels the prior graph non-shardable, while any other non-shardable DataPipe labels itself and the subsequent graph non-shardable.
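
The two labelling directions in this reply can be sketched for a linear pipeline. This is only an illustration under stated assumptions: `assign_processes`, the kind strings, and the process names are hypothetical, and placing the dispatcher itself in the dispatching process is an assumption of the sketch rather than something confirmed in this thread.

```python
from typing import List, Tuple


def assign_processes(pipeline: List[Tuple[str, str]]) -> List[Tuple[str, str]]:
    """Label each stage of a linear pipeline (ordered source -> end).

    Each stage is (name, kind) with kind in
    {"normal", "dispatcher", "non_replicable"} (hypothetical labels).
    """
    kinds = [kind for _, kind in pipeline]
    # A dispatcher marks everything before it (and, by assumption, itself).
    last_dispatch = max(
        (i for i, kind in enumerate(kinds) if kind == "dispatcher"), default=-1
    )
    # Any other non-replicable stage marks itself and everything after it.
    first_non_rep = next(
        (i for i, kind in enumerate(kinds) if kind == "non_replicable"), len(kinds)
    )

    placement = []
    for i, (name, _) in enumerate(pipeline):
        if i <= last_dispatch:
            place = "dispatching"
        elif i >= first_non_rep:
            place = "main"
        else:
            place = "worker"
        placement.append((name, place))
    return placement


stages = [
    ("reader", "normal"),
    ("round_robin_dispatch", "dispatcher"),
    ("map", "normal"),
    ("fullsync", "non_replicable"),
    ("batch", "normal"),
]
for name, place in assign_processes(stages):
    print(f"{name}: {place}")
```

Under these assumptions only `map` ends up replicated in the workers; the stages before it belong to the dispatching side and the stages from `fullsync` onward stay in the main process.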

self._main_prefetch_datapipe = end_datapipe

# Attach non-replicable DataPipes
if replicable_dp is not datapipe:
Contributor

I guess this happens when we have a non-replicable DataPipe (and need to keep it in the main process).

And is the reason for replacing replicable_dp with end_datapipe that end_datapipe has the "exchange sink" attached (i.e. the _IterateQueueDataPipes on line 291)?

Contributor Author

We do that because replicable_dp is sent to the worker processes, which gives back an end_datapipe with the exchange sink attached.

non_rep_dp1 (datapipe) -> non_rep_dp2 -> rep_dp_1 (replicable_dp) -> rep_dp2 -> ...

After replacement, the graph becomes

non_rep_dp1 (datapipe) -> non_rep_dp2 -> end_datapipe

In this case, we want to return non_rep_dp1 (datapipe) rather than the end_datapipe.

For the case where the whole graph is replicable

rep_dp_1 (replicable_dp/datapipe) -> rep_dp2 -> ...

After launching mp, the graph becomes

end_datapipe

So, we just need to keep end_datapipe.
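
The two cases above can be mimicked with a toy graph. This is a minimal sketch assuming a simple `Node` class with a `sources` list; `Node`, `attach_worker_output`, and `chain` are hypothetical helpers for illustration, not torchdata APIs.

```python
from dataclasses import dataclass, field
from typing import List, Set


@dataclass(eq=False)
class Node:
    """Hypothetical stand-in for a DataPipe; `sources` points upstream."""
    name: str
    sources: List["Node"] = field(default_factory=list)


def attach_worker_output(datapipe: Node, replicable_dp: Node, end_datapipe: Node) -> Node:
    """Return the DataPipe the main process should iterate over."""
    if replicable_dp is datapipe:
        # Whole graph was replicable: only the worker output remains.
        return end_datapipe
    # Otherwise swap replicable_dp for end_datapipe inside the main graph.
    stack, seen = [datapipe], set()  # type: List[Node], Set[int]
    while stack:
        node = stack.pop()
        if id(node) in seen:
            continue
        seen.add(id(node))
        node.sources = [end_datapipe if s is replicable_dp else s for s in node.sources]
        stack.extend(s for s in node.sources if s is not end_datapipe)
    return datapipe


def chain(node: Node) -> List[str]:
    """Names along a linear graph, from the end back to the source."""
    names = []
    while True:
        names.append(node.name)
        if not node.sources:
            return names
        node = node.sources[0]


rep_dp2 = Node("rep_dp2")
rep_dp_1 = Node("rep_dp_1", [rep_dp2])
non_rep_dp2 = Node("non_rep_dp2", [rep_dp_1])
non_rep_dp1 = Node("non_rep_dp1", [non_rep_dp2])
end_datapipe = Node("end_datapipe")

result = attach_worker_output(non_rep_dp1, rep_dp_1, end_datapipe)
print(chain(result))  # ['non_rep_dp1', 'non_rep_dp2', 'end_datapipe']
```

When `replicable_dp` and `datapipe` are the same object, the helper simply returns `end_datapipe`, which matches the fully replicable case.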


@ejguan force-pushed the keep_non_replicable_in_main_process branch from 6446dd3 to 2e0a080 January 23, 2023 17:47

@facebook-github-bot (Contributor)

@ejguan merged this pull request in 2ca1fa6.

SvenDS9 pushed a commit to SvenDS9/PytorchData that referenced this pull request Jan 24, 2023
…-pytorch#950)

Summary:
This is useful when `fullsync` is in the pipeline and we don't want this DataPipe to run in a worker process.

### Changes

- Rename the dispatching-related functions to `dispatching_xxx`
- Make the `fullsync` DataPipe non-replicable
- Add `_find_replicable_branches` to find the last DataPipe prior to any non-replicable DataPipe
  - Add graph tests
- In `PrototypeMultiprocessingReadingService`, make sure only `replicable_datapipe` is sent to the worker processes, then replace `replicable_datapipe` with `worker_consumer_datapipe`.

Pull Request resolved: meta-pytorch#950

Reviewed By: wenleix, NivekT, Miiira

Differential Revision: D42617776

Pulled By: ejguan

fbshipit-source-id: 1138203507934b089025e290597b473ef9be32bb