
[DataPipes] Add group support to the sharding_filter#88424

Closed
VitalyFedyunin wants to merge 7 commits into gh/VitalyFedyunin/116/base from gh/VitalyFedyunin/116/head

Conversation

Contributor

@VitalyFedyunin VitalyFedyunin commented Nov 3, 2022

Stack from ghstack (oldest at bottom):

Differential Revision: D41006747

@pytorch-bot pytorch-bot bot added release notes: dataloader release notes category labels Nov 3, 2022

pytorch-bot bot commented Nov 3, 2022

🔗 Helpful Links

🧪 See artifacts and rendered test results at hud.pytorch.org/pr/88424

Note: Links to docs will display an error until the docs builds have been completed.

✅ No Failures

As of commit 4a1181d:
💚 Looks good so far! There are no failures yet. 💚

This comment was automatically generated by Dr. CI and updates every 15 minutes.


linux-foundation-easycla bot commented Nov 3, 2022

CLA Signed

The committers listed above are authorized under a signed CLA.

@ejguan ejguan self-requested a review November 3, 2022 16:46
Contributor

@ejguan ejguan left a comment


LGTM with a few nit comments.


def __init__(self, source_datapipe: IterDataPipe, sharding_group_filter=None):
    self.source_datapipe = source_datapipe
    self.sharding_group_filter = sharding_group_filter
Contributor


Do we need an extra API to set sharding_group_filter?

Based on the implementation, sharding_group_filter appears to be an integer; could we change it to a set or list to support multiple filters?

Contributor Author


I have no use-cases for it, but it will be trivial to change later if we need to.
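For illustration, a torch-free sketch of the reviewer's suggestion: let the filter accept a single group id, or a set/list of ids. The class name and the normalization logic here are assumptions for the example, not the PR's actual code.

```python
# Hypothetical sketch: normalize sharding_group_filter so that None keeps
# every group, a bare int keeps one group, and a set/list keeps several.
class GroupFilter:
    def __init__(self, sharding_group_filter=None):
        if sharding_group_filter is None:
            self.groups = None  # None means "keep every group"
        elif isinstance(sharding_group_filter, int):
            self.groups = {sharding_group_filter}
        else:
            self.groups = set(sharding_group_filter)

    def accepts(self, group_key):
        # A group passes if no filter is set, or its key is in the filter set.
        return self.groups is None or group_key in self.groups
```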

Comment on lines +60 to +64
if self.sharding_group_filter is None:
    sorted_sharding_groups.append(self.groups[key])
else:
    if key == self.sharding_group_filter:
        sorted_sharding_groups.append(self.groups[key])
Contributor


nit:

if self.sharding_group_filter is None or key == self.sharding_group_filter:
    sorted_sharding_groups.append(self.groups[key])

-def apply_sharding(self, num_of_instances, instance_id):
     self.num_of_instances = num_of_instances
     self.instance_id = instance_id
+def apply_sharding(self, num_of_instances, instance_id, sharding_group=SHARDING_PRIORITIES.DEFAULT):
Contributor


Super nit: Could we add a validation that instance_id < num_of_instances?
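The requested guard could look like the following torch-free sketch (the standalone function name is assumed; in the PR it would live inside `apply_sharding`):

```python
def validate_sharding_args(num_of_instances: int, instance_id: int) -> None:
    # Guard requested in review: an instance id must index into the number
    # of shard instances, i.e. 0 <= instance_id < num_of_instances.
    if not 0 <= instance_id < num_of_instances:
        raise ValueError(
            f"instance_id({instance_id}) must be in [0, {num_of_instances})"
        )

validate_sharding_args(5, 3)  # valid: 3 is a legal shard index out of 5
```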


with self.assertRaises(Exception):
    dp.apply_sharding(2, 1, sharding_group=SHARDING_PRIORITIES.DEFAULT)
    dp.apply_sharding(5, 3, sharding_group=SHARDING_PRIORITIES.MULTIPROCESSING)
Contributor


Could we use a separate self.assertRaises context for the second error?
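What the reviewer asks for, sketched against a minimal stand-in pipe (the stand-in's once-per-group failure behavior is an assumption made purely for illustration):

```python
import unittest

class FakePipe:
    """Stand-in datapipe: re-applying sharding for the same group raises."""
    def __init__(self):
        self._applied = set()

    def apply_sharding(self, num_of_instances, instance_id, sharding_group=0):
        if sharding_group in self._applied:
            raise RuntimeError("sharding already applied for this group")
        self._applied.add(sharding_group)

class TestSeparateContexts(unittest.TestCase):
    def test_one_assert_raises_per_error(self):
        dp = FakePipe()
        dp.apply_sharding(2, 1, sharding_group=0)
        dp.apply_sharding(5, 3, sharding_group=1)
        # One context per expected failure, so each error is verified on
        # its own rather than the first one masking the second:
        with self.assertRaises(RuntimeError):
            dp.apply_sharding(2, 1, sharding_group=0)
        with self.assertRaises(RuntimeError):
            dp.apply_sharding(5, 3, sharding_group=1)
```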

Contributor Author

@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

VitalyFedyunin added a commit that referenced this pull request Nov 4, 2022
ghstack-source-id: 9bf016c
Pull Request resolved: #88424
VitalyFedyunin added a commit that referenced this pull request Nov 7, 2022
ghstack-source-id: a6a1a75
Pull Request resolved: #88424
Contributor Author

@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.

@VitalyFedyunin
Contributor Author

/easycla


@facebook-github-bot
Contributor

@pytorchbot merge

(Initiating merge automatically since Phabricator Diff has merged)

@pytorch-bot pytorch-bot bot added the ciflow/trunk Trigger trunk jobs on your pull request label Nov 7, 2022
@pytorchmergebot
Collaborator

Merge started

Your change will be merged once all checks pass (ETA 0-4 Hours).

Learn more about merging in the wiki.

Questions? Feedback? Please reach out to the PyTorch DevX Team


facebook-github-bot pushed a commit to meta-pytorch/data that referenced this pull request Dec 7, 2022
Summary:
After pytorch/pytorch#88424 is landed, we are able to invoke `apply_sharding` per sharding level (distributed or multiprocessing). This gives `ReadingService` fine-grained control over sharding.
- For `DistributedReadingService`, we only set sharding at the distributed level.
- For `PrototypeMPReadingService`, we set distributed sharding in the main process and multiprocessing sharding in the worker processes. Previously, we set sharding in each worker process based on both distributed and multiprocessing information.
  - `worker_init_fn` no longer needs `DistInfo`, since the `DataPipe` has already been sharded at the distributed level in the main process.
  - `DistInfo` and `ExtraInfo` are combined for `worker_reset_fn`, which synchronizes the distributed seeds across distributed workers and sets worker-local seeds based on both distributed and multiprocessing information.

Pull Request resolved: #916

Reviewed By: mingyuzh

Differential Revision: D41776719

Pulled By: ejguan

fbshipit-source-id: 6042da09f5e83019d536696237028ea20e67d110
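A torch-free toy model of the two-level sharding this enables: each sharding group contributes its own (num_of_instances, instance_id) pair, and the effective shard composes them. The group constants, class name, and composition order below are illustrative assumptions, not the real `SHARDING_PRIORITIES` implementation.

```python
DISTRIBUTED, MULTIPROCESSING = 1, 2  # illustrative group ids

class MiniShardingFilter:
    def __init__(self, source):
        self.source = source
        self.groups = {}  # sharding_group -> (num_of_instances, instance_id)

    def apply_sharding(self, num_of_instances, instance_id, sharding_group):
        self.groups[sharding_group] = (num_of_instances, instance_id)

    def __iter__(self):
        # Compose per-group shards: the combined stride is the product of
        # the per-group instance counts, applied in a deterministic order.
        total, offset = 1, 0
        for group in sorted(self.groups):
            n, i = self.groups[group]
            offset = offset * n + i
            total *= n
        for idx, item in enumerate(self.source):
            if idx % total == offset:
                yield item

pipe = MiniShardingFilter(range(12))
pipe.apply_sharding(2, 0, DISTRIBUTED)      # rank 0 of 2, set in main process
pipe.apply_sharding(3, 1, MULTIPROCESSING)  # worker 1 of 3, set in the worker
```

With both levels applied, the pipe yields every 6th element starting at offset 1, i.e. one shard out of 2 × 3 = 6 combined instances.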
kulinseth pushed a commit to kulinseth/pytorch that referenced this pull request Dec 10, 2022
raise RuntimeError('This implementation of sharding can be only applied once per instance of DataPipeline.',
                   'Already applied to', already_applied_to, 'while trying to apply to', pipe)
-pipe.apply_sharding(num_of_instances, instance_id)
+pipe.apply_sharding(num_of_instances, instance_id, sharding_group=sharding_group)
Contributor


noob question: do `is_shardable` and `apply_sharding` only exist in `ShardingFilterIterDataPipe`?

Also, if there is no `ShardingFilterIterDataPipe`, it looks like no sharding will happen. Shall we raise an error in that case? :)

cc @ejguan
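Regarding the "no sharding will happen" concern, a torch-free sketch of such a check (walking back through `source_datapipe` is a simplifying assumption here; the real code traverses the full datapipe graph):

```python
class Node:
    """Stand-in pipeline node; shardable nodes expose apply_sharding."""
    def __init__(self, source_datapipe=None, shardable=False):
        self.source_datapipe = source_datapipe
        if shardable:
            self.apply_sharding = lambda n, i, sharding_group=0: None

def has_sharding_filter(pipe):
    # Walk back through a linear pipeline looking for any node that
    # implements apply_sharding; callers could warn or raise when this
    # returns False instead of silently skipping sharding.
    while pipe is not None:
        if hasattr(pipe, "apply_sharding"):
            return True
        pipe = getattr(pipe, "source_datapipe", None)
    return False
```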


Labels

ciflow/trunk Trigger trunk jobs on your pull request Merged release notes: dataloader release notes category
