Conversation
| return self.error is not None |
| class _PrefetchExecutor: |
This executor is based on the internal implementation: https://fburl.com/code/7dk6mvs4
On top of that implementation, I added `prefetch_size` and attached an index to the `Expected` object to make sure it can work with `Prefetch` in the future.
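For context, the pattern described here can be sketched in plain Python. The names `Expected` and `prefetch_size` follow the comment above; everything else (the semaphore-based buffer bound, the worker loop) is a hypothetical illustration, not torchdata's actual code:

```python
# Hypothetical sketch of a prefetch executor: a worker thread pulls items
# from the source iterator ahead of consumption, tagging each result with
# an index so downstream code can reorder or resume later.
import threading
from collections import deque
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Expected:
    index: int
    data: Any = None
    error: Optional[Exception] = None

    def has_error(self) -> bool:
        return self.error is not None


class PrefetchExecutor:
    def __init__(self, source, prefetch_size: int = 2):
        self._source = iter(source)
        self._buffer = deque()
        self._slots = threading.Semaphore(prefetch_size)  # bounds buffered items
        self._ready = threading.Semaphore(0)              # counts filled slots
        self._index = 0
        self._thread = threading.Thread(target=self._worker, daemon=True)
        self._thread.start()

    def _worker(self):
        while True:
            self._slots.acquire()
            try:
                item = Expected(self._index, data=next(self._source))
            except Exception as exc:  # includes StopIteration as end marker
                self._buffer.append(Expected(self._index, error=exc))
                self._ready.release()
                return
            self._index += 1
            self._buffer.append(item)
            self._ready.release()

    def __iter__(self):
        return self

    def __next__(self):
        self._ready.acquire()
        item = self._buffer.popleft()
        self._slots.release()
        if isinstance(item.error, StopIteration):
            raise StopIteration
        if item.error is not None:
            raise item.error
        return item.data
```

The semaphore pair keeps at most `prefetch_size` items in flight, and errors from the source are delivered to the consumer at the position where they occurred rather than crashing the worker thread silently.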
Adding test now.
Force-pushed ffe3a1e to fd98a28
@ejguan has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.
test/test_distributed.py
Outdated
| data_length = 23 |
| dp = IterableWrapper(list(range(data_length))).sharding_filter().fullsync() |
Without fullsync, this pipeline would hang forever.
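The hang comes from uneven shards: 23 elements distributed round-robin leave some ranks with one more element than others, so the short rank finishes early while the longer ranks block in a collective waiting for it. The shard sizes are easy to verify in plain Python (the world size of 4 is a hypothetical choice for illustration):

```python
# Round-robin sharding of 23 elements across 4 ranks: ranks 0-2 get
# 6 elements each, rank 3 gets only 5, which is why a full-sync step
# is needed to stop all ranks at the same iteration.
data_length = 23
world_size = 4
shards = [list(range(data_length))[rank::world_size] for rank in range(world_size)]
sizes = [len(s) for s in shards]
print(sizes)  # [6, 6, 6, 5]
```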
| # LICENSE file in the root directory of this source tree. |
| # Use the same timeout as PyTorch Distributed |
| default_timeout_in_s = 30 * 60 |
Just a comment (no change needed): we should also put other defaults here, such as the default buffer size.
| which is caused by uneven sharded data (functional name: ``fullsync``). It should |
| be appended at the end of the graph of ``DataPipe`` by ``DistributedReadingService`` |
| automatically. |
Question: do we recommend against usage of this DataPipe outside of a ReadingService? If not, can we potentially include an example?
Makes sense. I will add one, even though we should always recommend that users rely on the ReadingService.
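The core idea behind ``fullsync`` can be illustrated without an actual process group: every rank learns (via a MIN collective over remaining element counts) how many steps all ranks can safely take, and stops there so nobody blocks on an exhausted peer. A single-process simulation of that handshake (hypothetical helper name, not the real implementation, which pads or truncates per batch rather than up front):

```python
# Simulate fullsync's stopping rule: all "ranks" iterate for exactly
# min(shard lengths) steps, standing in for an all_reduce(MIN) over
# each rank's remaining-element count.
def fullsync_truncate(shards):
    agreed_steps = min(len(shard) for shard in shards)
    return [shard[:agreed_steps] for shard in shards]

# 23 elements sharded round-robin over 4 ranks -> lengths [6, 6, 6, 5]
shards = [list(range(23))[rank::4] for rank in range(4)]
synced = fullsync_truncate(shards)
print([len(s) for s in synced])  # [5, 5, 5, 5]
```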
| def __getstate__(self): |
|     if IterDataPipe.getstate_hook is not None: |
|         return IterDataPipe.getstate_hook(self) |
|     state = ( |
|         self.datapipe, |
|         self.timeout, |
|     ) |
|     return state |
IMHO, checkpointing for fullsync or prefetch is a little tricky.
Let's confirm the expected behavior. When we take a checkpoint, we should pause any further prefetching and save all prefetched data into a buffer. Then we serialize the buffer and the inner datapipe (because we have to serialize the datapipe after prefetching is done). Only when we start iteration again would we resume prefetching.
WDYT: @VitalyFedyunin @NivekT
Then the whole logic of fullsync would need to change. This gets even more complicated when the data ends while the prefetched items are being moved into the buffer. I might open a new PR to implement serialization.
Yeah, I think we should stop the prefetch and capture the current data. This can be similar to the internal client snapshot, so https://fburl.com/code/6hrjawgh may be a helpful reference.
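The pause-then-serialize behavior discussed here can be sketched with a toy prefetcher whose `__getstate__` persists the buffer alongside the inner datapipe, so iteration resumes where it left off after unpickling. This is a hypothetical illustration (a real version would also need to stop the worker thread safely before capturing state):

```python
import pickle


class Prefetcher:
    """Toy prefetcher whose checkpoint saves the prefetched buffer plus
    the inner datapipe; prefetching restarts lazily on next iteration."""

    def __init__(self, datapipe, prefetch_size=3):
        self.datapipe = list(datapipe)
        self.prefetch_size = prefetch_size
        self._buffer = []   # items fetched ahead of consumption
        self._cursor = 0    # position of the underlying "worker"

    def _prefetch(self):
        while len(self._buffer) < self.prefetch_size and self._cursor < len(self.datapipe):
            self._buffer.append(self.datapipe[self._cursor])
            self._cursor += 1

    def __iter__(self):
        while True:
            self._prefetch()
            if not self._buffer:
                return
            yield self._buffer.pop(0)

    def __getstate__(self):
        # Checkpoint: stop prefetching, persist buffer + inner datapipe.
        return {
            "datapipe": self.datapipe,
            "prefetch_size": self.prefetch_size,
            "_buffer": list(self._buffer),
            "_cursor": self._cursor,
        }

    def __setstate__(self, state):
        self.__dict__.update(state)


pf = Prefetcher(range(6))
it = iter(pf)
first = [next(it) for _ in range(2)]       # consume two items
restored = pickle.loads(pickle.dumps(pf))  # checkpoint mid-stream
rest = list(restored)
print(first + rest)  # [0, 1, 2, 3, 4, 5]
```

Because the buffered-but-unconsumed items travel with the checkpoint, no element is dropped or duplicated across the save/restore boundary, which is exactly the invariant the comments above are after.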
I will land this PR for now. Listing two follow-up works:

Changes:
- `_PrefetchExecutor` to run prefetching in multi-threading
- `PrefetchIterDataPipe`
- `FullSyncIterDataPipe`

(`FullSyncIterDataPipe` is unclear to me)