[PrototypeRS] Adding 'pause' and 'resume' operations to halt DataPipes#879
NivekT wants to merge 45 commits into gh/NivekT/100/base from
    graph = traverse_dps(source_datapipe)
    for dp, _ in graph.values():
        if hasattr(dp, "full_stop") and callable(dp.full_stop):
            dp.full_stop()
    protocol.response_full_stop()
Is there any potential issue with traversing through the graph and calling .full_stop() on all of them?
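To make the question concrete, here is a minimal sketch of the "walk the graph and stop whatever can be stopped" pattern. It does not use torchdata: `ToyPipe`, `walk_upstream`, and `full_stop_all` are hypothetical names, and the toy graph is a simple chain rather than whatever `traverse_dps` returns. It only illustrates the `hasattr`/`callable` guard and the fact that pipes without a `full_stop` method are silently skipped.

```python
class ToyPipe:
    """Hypothetical stand-in for a DataPipe; not the torchdata class."""

    def __init__(self, name, source=None, stoppable=True):
        self.name = name
        self.source = source  # upstream pipe, or None for the root
        self.stopped = False
        if stoppable:
            # Only some pipes expose full_stop(), which is why the guard exists.
            self.full_stop = self._full_stop

    def _full_stop(self):
        self.stopped = True


def walk_upstream(pipe):
    """Yield the pipe and every pipe upstream of it (a linear toy graph)."""
    while pipe is not None:
        yield pipe
        pipe = pipe.source


def full_stop_all(last_pipe):
    """Call full_stop() on every pipe that defines a callable full_stop."""
    stopped_names = []
    for dp in walk_upstream(last_pipe):
        full_stop = getattr(dp, "full_stop", None)
        if callable(full_stop):
            full_stop()
            stopped_names.append(dp.name)
    return stopped_names


source = ToyPipe("source")
mapper = ToyPipe("mapper", source=source, stoppable=False)
prefetcher = ToyPipe("prefetcher", source=mapper)
stopped = full_stop_all(prefetcher)
```

In this toy, pipes are stopped from the consumer end towards the source. Whether the order matters for real DataPipes with background threads is exactly the open question above.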
Another thing to discuss: should we do double underscore or something to protect
The goal of this PR is to fully stop DataPipe activities in preparation for snapshotting (which requires a halted state), so that buffers will not change while we record the snapshot.
NivekT left a comment:
Updated the implementation, but I still need some test cases. Suggestions are welcome.
ejguan left a comment:
Can we change the name to `pause`? `full_stop` sounds like an operation run at the end of an epoch.
I forgot to mention one scenario. If I iterate over the datapipe after pausing it:

    >>> dp.pause()
    >>> for d in dp:
    ...     print(d)  # Nothing should be printed
    >>> dp.resume()
    >>> for d in dp:
    ...     print(d)
    # Values
If you try to call
If we want to achieve mini-epoch for OSS as well, we might need it to be
I see. I think what we can do is: inside the internals of PrototypeRS, if it cannot submit a Publicly, DataLoader2 can behave differently (maybe like you said return
The goal of this PR is to fully stop DataPipe activities in preparation for snapshotting (which requires a halted state), so that buffers will not change while we record the snapshot.

This PR doesn't handle pausing or resuming with `sharding_round_robin_dispatch`. This will be fixed in an upcoming PR and is tracked by #991.

---

Implementation note (feel free to discuss):

`_pause` exists on both `DataLoader2Iterator` and `DataLoader2` as a private method.
`resume` exists on `DataLoader2Iterator` as a public method, but it is private on `DataLoader2`.
`limit` and `clear_limit` exist only on `DataLoader2Iterator`, as public methods.
- The limit persists through iterations (with the iterator) until it is manually cleared via `clear_limit`.
- Creating a new iterator (`iter(dl2)`) will not retain the previous limit, because the limit is attached to the iterator.

Differential Revision: [D41744759](https://our.internmc.facebook.com/intern/diff/D41744759)
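The two bullet points about `limit` can be illustrated with a toy iterator. This is only a sketch under assumptions: `ToyLoader` and `ToyLoaderIterator` are hypothetical classes, not `DataLoader2` or `DataLoader2Iterator`, and the choice that hitting the limit ends the current pass and that `resume` restarts the count is assumed for illustration rather than taken from the PR.

```python
class ToyLoader:
    """Hypothetical loader; each iter() call builds a fresh iterator."""

    def __init__(self, data):
        self.data = list(data)

    def __iter__(self):
        return ToyLoaderIterator(iter(self.data))


class ToyLoaderIterator:
    """Hypothetical iterator that owns its own limit state."""

    def __init__(self, source_iter):
        self._source_iter = source_iter
        self._limit = None   # None means "no limit"
        self._yielded = 0    # items yielded since the last resume()

    def __iter__(self):
        return self

    def __next__(self):
        if self._limit is not None and self._yielded >= self._limit:
            # Assumed behaviour: hitting the limit ends the current pass.
            raise StopIteration
        item = next(self._source_iter)
        self._yielded += 1
        return item

    def limit(self, num_items):
        self._limit = num_items
        self._yielded = 0

    def clear_limit(self):
        self._limit = None

    def resume(self):
        # Assumed behaviour: resuming restarts the count but keeps the limit.
        self._yielded = 0


loader = ToyLoader(range(10))
it = iter(loader)
it.limit(3)
first = list(it)             # stops after three items
it.resume()
second = list(it)            # the limit is still attached to this iterator
it.clear_limit()
it.resume()
rest = list(it)              # no limit any more, runs to the end
fresh = list(iter(loader))   # a new iterator starts without any limit
```

Because the limit lives on the iterator object, nothing needs to be reset on the loader when a new iterator is created.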
ejguan left a comment:
The PR looks good in terms of test cases. I left a few comments below.
            self._executor = None

        def resume(self):
            self._executor = _PrefetchExecutor(iter(self.datapipe), 1, self._callback_fn, self.timeout)
It seems that `resume` for FullSync is going to re-read from the beginning. Is this expected? Otherwise, we can raise an exception for FullSync saying that it doesn't support checkpointing yet.
Is there a way for it not to read from the beginning again? Halting the executor by changing `__iter__`?
For now, I can raise an exception in both `pause` and `resume`.
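The re-reading concern comes from calling `iter(...)` again inside `resume`: on a re-iterable source this yields a brand-new iterator positioned at the start. The sketch below contrasts the three options discussed here. All class names are hypothetical, `ToyExecutor` is not `_PrefetchExecutor`, and the toy ignores items that a real prefetcher may already hold in flight when it is paused.

```python
from itertools import islice


class ToyExecutor:
    """Hypothetical stand-in for an executor wrapping an iterator."""

    def __init__(self, iterator):
        self.iterator = iterator

    def take(self, n):
        # Pull up to n items from the wrapped iterator.
        return list(islice(self.iterator, n))


class RestartingPipe:
    """resume() builds a new iterator, so reading restarts from the beginning."""

    def __init__(self, data):
        self.data = data
        self._executor = ToyExecutor(iter(self.data))

    def pause(self):
        self._executor = None

    def resume(self):
        self._executor = ToyExecutor(iter(self.data))

    def take(self, n):
        return self._executor.take(n)


class PositionKeepingPipe(RestartingPipe):
    """Keeps one iterator alive across pause(), so resume() continues."""

    def __init__(self, data):
        self.data = data
        self._iterator = iter(data)
        self._executor = ToyExecutor(self._iterator)

    def resume(self):
        self._executor = ToyExecutor(self._iterator)


class UnsupportedPipe(RestartingPipe):
    """The interim option from the thread: refuse to pause or resume."""

    def pause(self):
        raise RuntimeError("pause is not supported by this pipe yet")

    def resume(self):
        raise RuntimeError("resume is not supported by this pipe yet")


restarting = RestartingPipe([0, 1, 2, 3, 4])
before_restart = restarting.take(2)
restarting.pause()
restarting.resume()
after_restart = restarting.take(2)   # starts over

keeping = PositionKeepingPipe([0, 1, 2, 3, 4])
before_keep = keeping.take(2)
keeping.pause()
keeping.resume()
after_keep = keeping.take(2)         # continues where it left off
```

Raising in both `pause` and `resume` is the conservative choice until the position-keeping variant can be made to work with the real executor.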
CI is expected to fail until the PyTorch core nightly updates.
@NivekT has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
Addressed and resolved all but two of the comments. CI is being re-run after the nightly update.
ejguan left a comment:
Overall LGTM. Thank you. I left a few comments to help simplify the test and a comment for internal users.
        self.thread = thread
        self.thread.start()

        while prefetch_data.run_prefetcher:
One more thing: should this line be `while not prefetch_data.stop_iteration:`? The current implementation means that the iterator exits earlier than it is supposed to.
It's a little fuzzy to me whether we are able to call `next` on the iterator of this DataPipe after `pause`.
I guess my main question is whether `pause` of a DataPipe is exposed to users.
I think you are right. Though at a glance, changing that `while` condition will also require changing the exception handlers.
In particular, it should become `except StopIteration: prefetch_data.run_prefetcher = False`, and the same for `except communication.iter.InvalidStateResetRequired`.
Overall, it should be:
- `prefetch_data.run_prefetcher`: indicates whether we should be prefetching. It should become `False` when the source runs out of elements, on an invalid state, on termination, and possibly when another exception is raised.
- `prefetch_data.stop_iteration`: indicates when we should stop yielding elements. It should mostly be the same as above, except that when the source is out of elements, it should keep yielding until the buffer is empty.
There may be other corner cases I have not stated here. We can test it out and see.
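The split between the two flags can be sketched with a small producer/consumer pair. This is one possible reading of the proposal, not the PR's code: `PrefetchState`, `producer`, and `consume` are hypothetical names, the buffer is a plain `deque`, and only the "source ran out" case is handled (no invalid state, no pause). The consumer keeps yielding until `stop_iteration` is set and the buffer has been drained.

```python
import threading
import time
from collections import deque


class PrefetchState:
    """Hypothetical state shared between the producer thread and the consumer."""

    def __init__(self, source, buffer_size):
        self.source = iter(source)
        self.buffer_size = buffer_size
        self.buffer = deque()
        self.run_prefetcher = True   # producer keeps fetching while True
        self.stop_iteration = False  # set once no more items will arrive


def producer(state):
    while state.run_prefetcher:
        if len(state.buffer) < state.buffer_size:
            try:
                state.buffer.append(next(state.source))
            except StopIteration:
                # Source exhausted: stop fetching and tell the consumer
                # that nothing else will be added to the buffer.
                state.run_prefetcher = False
                state.stop_iteration = True
        else:
            time.sleep(0.001)  # buffer full, wait for the consumer


def consume(source, buffer_size=2):
    state = PrefetchState(source, buffer_size)
    thread = threading.Thread(target=producer, args=(state,), daemon=True)
    thread.start()
    try:
        # Exit only when the producer is done AND the buffer is empty,
        # so buffered items are not dropped when the source runs out.
        while not (state.stop_iteration and not state.buffer):
            if state.buffer:
                yield state.buffer.popleft()
            else:
                time.sleep(0.001)
    finally:
        # Make sure the producer thread exits even on early termination.
        state.run_prefetcher = False
        thread.join()


result = list(consume(range(5)))
```

Looping on `run_prefetcher` alone in the consumer would exit as soon as the source is exhausted and could drop whatever is still buffered, which is the early-exit problem raised above.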
        except StopIteration:
            prefetch_data.stop_iteration = True
        except communication.iter.InvalidStateResetRequired:
            prefetch_data.stop_iteration = True
Related to my last comment: `stop_iteration` should dominate `run_prefetcher`. When `stop_iteration` is set, there is no reason to proceed with prefetching, right?
Whenever `stop_iteration` turns `True`, `run_prefetcher` should be `False`.
What you stated should be true, but it requires changing other parts as well.