Implementing thread based PrefetcherIterDataPipe #770

VitalyFedyunin wants to merge 8 commits into gh/VitalyFedyunin/21/base
Conversation
[ghstack-poisoned]
ejguan
left a comment
A few comments about threading below.
```python
time.sleep(PRODUCER_SLEEP_INTERVAL)
```

```python
def __iter__(self):
    self.reset()
```

```python
# TODO: Potential optimization is changing buffer from list to dequeue
self.prefetch_buffer = []
```
Yeah, I agree with changing to a deque, since it's thread-safe.
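For illustration, here is a minimal sketch of what a deque-backed prefetch buffer could look like. This is not the PR's implementation; the names `prefetch`, `_worker`, and `BUFFER_SIZE` are hypothetical, and it relies on `collections.deque` single-element `append`/`popleft` being thread-safe under CPython:

```python
import threading
import time
from collections import deque

BUFFER_SIZE = 10               # hypothetical cap, mirrors buffer_size in the PR
PRODUCER_SLEEP_INTERVAL = 0.001

def prefetch(source):
    buffer = deque()           # single-element append/popleft are thread-safe in CPython
    done = threading.Event()

    def _worker():
        # Producer thread: fill the buffer ahead of the consumer
        for item in source:
            while len(buffer) >= BUFFER_SIZE:   # back off when the buffer is full
                time.sleep(PRODUCER_SLEEP_INTERVAL)
            buffer.append(item)
        done.set()

    t = threading.Thread(target=_worker, daemon=True)
    t.start()
    # Consumer side: drain the buffer until the producer is finished and empty
    while not done.is_set() or buffer:
        if buffer:
            yield buffer.popleft()
        else:
            time.sleep(PRODUCER_SLEEP_INTERVAL)
    t.join()
```

With a plain list, `pop(0)` would be O(n); `deque.popleft()` is O(1), which is the optimization the TODO above refers to.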
```python
class _PrefetchData:
    def __init__(self, source_datapipe, buffer_size):
        self.run_prefetcher = True
```
And, should we add a thread lock around run_prefetcher?
My thread lock is the GIL =)
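The point being made is that under CPython's GIL, a single attribute read or write is atomic, so a simple boolean flag shared between two threads does not need an explicit `threading.Lock`. A small sketch (the `run_demo` helper is hypothetical, only the `run_prefetcher` flag comes from the PR):

```python
import threading
import time

class _PrefetchData:
    def __init__(self):
        # A plain attribute write/read is atomic under CPython's GIL,
        # so this boolean flag needs no explicit threading.Lock.
        self.run_prefetcher = True

def run_demo():
    data = _PrefetchData()

    def worker():
        while data.run_prefetcher:   # unlocked read of the flag
            time.sleep(0.001)

    t = threading.Thread(target=worker)
    t.start()
    data.run_prefetcher = False      # single atomic store stops the worker
    t.join(timeout=5)
    return not t.is_alive()
```

A lock would only become necessary if the flag were part of a larger compound update (read-modify-write) that must appear atomic.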
```python
except communication.iter.InvalidStateResetRequired:
    stop_iteration = True
except communication.iter.TerminateRequired:
    prefetch_data.run_prefetcher = False
```
Do we need to handle those two exceptions? communication is more or less a submodule for MultiprocessingReadingService. I would personally prefer to remove those exceptions from a DataPipe.
I have to handle them here, because this is a separate thread that needs to be terminated gracefully in case the source DataPipe goes out of commission.
Oh, I see. This prefetcher is not only attached to the main process but also to child processes.
ejguan
left a comment
LGTM
I assume tests will be added after the corresponding changes are made to PrototypeMPRS.
NivekT
left a comment
Do we intend for this DataPipe to be user-facing? (I'm guessing not?) If it is, it would be good to add a docstring and add it to torchdata.datapipes.iter.rst.
```python
def reset_iterator(self):
    self.reset()
```
Question: What is the expected behavior of `reset_iterator` in PrototypeRS? How is that different from the usual DataPipe `reset`?
```python
if self.buffer_size < 1:
    yield from self.source_datapipe
```
This case should not be possible because of the check in `__init__`.
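The comment refers to constructor validation that rejects invalid sizes up front, which would make the `buffer_size < 1` branch in `__iter__` dead code. A sketch of that kind of check (the class name, default value, and error message below are hypothetical, not copied from the PR):

```python
class Prefetcher:
    def __init__(self, source_datapipe, buffer_size=10):
        if buffer_size < 1:
            # Rejecting invalid sizes here makes the
            # `if self.buffer_size < 1` branch in __iter__ unreachable
            raise ValueError("buffer_size must be at least 1")
        self.source_datapipe = source_datapipe
        self.buffer_size = buffer_size
```

If the fallback branch is kept anyway, it silently degrades to no prefetching rather than signaling a misconfiguration.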
```python
prefetch_data = _PrefetchData(self.source_datapipe, self.buffer_size)
self.prefetch_data = prefetch_data
```
nit: Rename these to prefetcher_thread_worker or something?
@VitalyFedyunin has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
Differential Revision: [D39816751](https://our.internmc.facebook.com/intern/diff/D39816751)
Stack from ghstack (oldest at bottom):
Differential Revision: D39816751