Guarantee the DataPipe iterator is reset once all loops have received the reset request in the dispatching process #994
ejguan wants to merge 6 commits into meta-pytorch:main
Conversation
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
test/test_remote_io.py (outdated)

```python
[["s3://ai2-public-datasets/charades/"], 18],  # folder without '/'
[["s3://ai2-public-datasets/charad"], 18],  # prefix
[
    (
```
```python
def __del__(self):
    try:
        self.finalize()
    except AttributeError:
        pass
```
NivekT: Just a comment - a little bit surprised we need this, given that ProtoMultiRS's finalize seems to be catching every possible AttributeError? Maybe the issue is Distributed? Thoughts?
ejguan: ProtoMPRS doesn't handle every AttributeError - for example, self._worker_processes from
https://github.com/pytorch/data/blob/98222ad72ee7a29e676646e6b3f9173576410320/torchdata/dataloader2/reading_service.py#L345
Technically speaking, I should remove those try-except clauses in finalize to simplify the codebase.
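To illustrate the failure mode this guard covers, here is a minimal sketch (the class and helper names are hypothetical; only `_worker_processes` mirrors the attribute from the linked line): if `__init__` raises before an attribute is assigned, Python still runs `__del__` on the partially constructed object, so `finalize()` can hit an AttributeError.

```python
# Minimal sketch of why __del__ guards finalize() with AttributeError.
# FakeReadingService and _spawn_workers are made-up names for illustration.
class FakeReadingService:
    def __init__(self):
        # If this raises, _worker_processes is never assigned, but
        # __del__ still runs on the partially constructed object.
        self._worker_processes = self._spawn_workers()

    def _spawn_workers(self):
        raise RuntimeError("failed to spawn workers")

    def finalize(self):
        for proc in self._worker_processes:  # AttributeError if __init__ failed
            proc.join()

    def __del__(self):
        try:
            self.finalize()
        except AttributeError:
            pass  # partially constructed; nothing to clean up
```

Constructing `FakeReadingService()` raises RuntimeError; without the guard, `__del__` would additionally emit an "Exception ignored in" traceback for the AttributeError during cleanup.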
```python
# Ensure only reset iterator once for the dispatching process
if reset_iterator_counter is not None:
    reset_iterator_counter.increment()
    while not reset_iterator_counter.is_reached():
        yield True
    # Sync between loops within the dispatching process
    source_datapipe.reset_iterator()
    yield True
    reset_iterator_counter.reset()
```
ejguan: You might need to do something similar to this for resuming the dispatching process. cc: @NivekT
NivekT: Just to confirm I understand - this is to handle the situation where some workers are handling GetNextRequest while some are trying to reset? You want all GetNext to be done before the dispatching process executes the reset?
ejguan: Not workers - it only happens in the dispatching process, when multiple leaf DataPipes share the same data source (round-robin demux on the same DataPipe) in a single process.
It handles the case where some loops have received reset while the others haven't: we want to hold off on GetNext requests until all loops have received reset. Otherwise, the data source could be reset in the middle of iteration for the other loops. (A sketch of the counter is below.)
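For readers following along, a minimal sketch of how such a counter can act as a barrier across the loops (the class name, field names, and loop count here are assumptions; only the increment/is_reached/reset API comes from the snippet above):

```python
# Hypothetical barrier counter: each loop in the dispatching process
# increments on reset; is_reached() becomes True only once every loop
# has checked in, so no loop proceeds while another is still mid-epoch.
class ResetCounter:
    def __init__(self, expected_loops: int) -> None:
        self.exp_cnt = expected_loops
        self.cnt = 0

    def increment(self) -> None:
        self.cnt += 1
        assert self.cnt <= self.exp_cnt

    def is_reached(self) -> bool:
        return self.cnt == self.exp_cnt

    def reset(self) -> None:
        self.cnt = 0


# Three loops sharing one source via round-robin demux:
counter = ResetCounter(expected_loops=3)
counter.increment()
assert not counter.is_reached()  # 1 of 3 loops has seen the reset request
counter.increment()
counter.increment()
assert counter.is_reached()      # all loops checked in; safe to reset the source
counter.reset()                  # ready for the next epoch
```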
I will rerun all tests tomorrow once the PyTorch nightly has been updated.
NivekT left a comment

LGTM! Hope the nightly gets uploaded and all the CIs get fixed
```python
if self._reached:
    return self._reached
```
NivekT: nit: These two lines can be removed? But I guess it is slightly faster, so I'm indifferent.
ejguan: Lol, you are right. I will remove them tomorrow.
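For context, a sketch of the simplification under discussion (class and field names are assumed from the two quoted lines, not the actual torchdata code):

```python
# Once _reached latches to True it stays True until reset, so the early
# return is redundant - the computation below returns the same value.
class LatchedCounter:
    def __init__(self, exp_cnt: int) -> None:
        self.cnt = 0
        self.exp_cnt = exp_cnt
        self._reached = False

    def is_reached(self) -> bool:
        # Removed per the nit:
        # if self._reached:
        #     return self._reached
        if self.cnt == self.exp_cnt:
            self._reached = True
        return self._reached
```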
The test takes way more time to finish. However, I can't really reproduce it either on Linux or Mac.
Edit: Found the culprit test.
Also, I am going to remove all S3-related commits. To fix the S3 tests, I plan to rely on #997.
Changes
- Fix S3 tests in "Fix test_remote_io.py due to mutating public s3 bucket" #997
- Delete `thread` from `Prefetcher` when gc gets involved. This would prevent a racing condition when both the `finally` clause in the generator and the `reset` function are accessing the same `thread`.
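A minimal sketch of that race and the shape of the fix (the class skeleton and method names are assumptions, not the actual torchdata Prefetcher): the generator's `finally` block can run from garbage collection at the same time `reset()` is tearing down the prefetching thread, so the teardown takes ownership of the reference and removes it from the instance before joining.

```python
import threading

# Hypothetical sketch: both the generator's finally block (which may run
# when gc collects the generator) and reset() used to touch the same
# self.thread. Swapping the reference out before joining means each
# teardown path sees the thread at most once.
class SketchPrefetcher:
    def __init__(self) -> None:
        self.thread = None

    def _start(self) -> None:
        self.thread = threading.Thread(target=lambda: None, daemon=True)
        self.thread.start()

    def _stop(self) -> None:
        # Take ownership of the reference and clear it on the instance,
        # so a second caller finds nothing left to tear down.
        thread = self.thread
        self.thread = None
        if thread is not None:
            thread.join()

    def __iter__(self):
        self._start()
        try:
            yield from ()
        finally:
            self._stop()  # may run from gc when the generator is collected

    def reset(self) -> None:
        self._stop()
```

Whichever of the `finally` block or `reset()` runs second finds `self.thread` already cleared and returns without touching a half-joined thread - the same effect the PR achieves by deleting `thread` from the `Prefetcher`.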