Fully exhaust datapipes that are needed to construct a dataset #6076

pmeier merged 15 commits into pytorch:main
Conversation
After some offline discussion with @NicolasHug, it became clear that the situation is not as straightforward as I thought it was. For example, the child datapipes from the

@ejguan @NivekT @VitalyFedyunin what is the correct way here?
IMHO, it depends on your Dataset:

```python
label_dp = resource_dp.filter(is_meta_file)
labels = list(label_dp)
image_dp = resource_dp.filter(is_image_file)
```

```python
label_dp, image_dp = resource_dp.demux(classify_fn, 2)
label_dict = label_dp.to_map_datapipe()
image_dp.zip_with_map(label_dict)
...
```
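To make the buffering concern concrete, here is a toy two-way demultiplexer in pure Python (`demux_two` is a made-up helper for illustration, not torchdata's `Demultiplexer`). It shows how eagerly draining one child during dataset construction fills the sibling's buffer before iteration of the final pipe even starts:

```python
from collections import deque

def demux_two(source, classify_fn):
    """Toy two-way demux: splits `source` into two child iterators.

    As with torchdata's Demultiplexer, reading from one child while items
    destined for the other arrive forces those items into a buffer.
    """
    buffers = (deque(), deque())
    it = iter(source)

    def child(idx):
        other = 1 - idx
        while True:
            if buffers[idx]:
                yield buffers[idx].popleft()
                continue
            try:
                item = next(it)
            except StopIteration:
                return
            if classify_fn(item) == idx:
                yield item
            else:
                buffers[other].append(item)

    return child(0), child(1), buffers

files = ["a.cls", "1.jpg", "2.jpg", "b.cls", "3.jpg"]
labels, images, buffers = demux_two(files, lambda f: 0 if f.endswith(".cls") else 1)

# Eagerly exhausting the label child during dataset construction ...
assert list(labels) == ["a.cls", "b.cls"]
# ... leaves every image item sitting in the other child's buffer.
assert list(buffers[1]) == ["1.jpg", "2.jpg", "3.jpg"]
```

Only once the `images` child is iterated does that buffer drain again, which is exactly the state this PR wants to avoid carrying across dataset construction.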
That only matters if you have read one of your child DataPipes eagerly during construction (it seems that is no longer the case after your code change in this PR, since it no longer calls
So for my own understanding: the result of @ejguan's proposal was to use the map together with a `Mapper`:

```python
dp = ...
map = IterToMapConverter(dp)

other_dp = ...
other_dp = Mapper(other_dp, map.__getitem__, input_col=...)
```

Do you see any downside with that?

Regardless of the approach chosen, I found that using

```python
from time import perf_counter

import torch

from torchvision.prototype import datasets

for name, config in [
    ("imagenet", dict(split="val")),
    ("cub200", dict(year="2011")),
]:
    warmup_times = []
    for _ in range(5):
        tic = perf_counter()
        for sample in datasets.load(name, **config):
            break
        tac = perf_counter()
        warmup_times.append(tac - tic)

    print(f"Warmup for {name} took on average {float(torch.tensor(warmup_times).mean()):.2f} seconds")
```

Running this on

Running it on this PR in the current state prints

Any idea what could cause this? I'm aware that meta-pytorch/data#454 proposes a speed-up, but the implementation on
```diff
  bounding_boxes_dp = CSVParser(bounding_boxes_dp, dialect="cub200")
- bounding_boxes_dp = Mapper(bounding_boxes_dp, image_files_map.get, input_col=0)
+ bounding_boxes_dp = Mapper(bounding_boxes_dp, image_files_map.__getitem__, input_col=0)
```
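One behavioral difference worth spelling out (the keys below are made up for illustration): `dict.get` swallows missing keys, while `dict.__getitem__` raises, which is what makes the latter mirror the strictness of `zip_with_map`:

```python
# Hypothetical mapping from annotation id to image file.
image_files_map = {"001": "images/001.jpg"}

# dict.get silently turns a missing key into None, so a malformed
# annotation row would propagate None downstream:
assert image_files_map.get("999") is None

# dict.__getitem__ fails loudly instead, surfacing the bad row immediately:
raised = False
try:
    image_files_map["999"]
except KeyError:
    raised = True
assert raised
```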
This is a good trick to get behavior similar to `zip_with_map`.
cc: @NivekT
pmeier left a comment
This PR indeed solves the issue reported in #6515. The CI https://github.com/pytorch/vision/runs/8089441510 ran before the fresh nightly hit. There are still errors, but they are unrelated and reported in pytorch/pytorch#80267 (comment).
Maybe we can have another go at this PR given that it has a much narrower scope than #6128? cc @NivekT @ejguan
NivekT left a comment
I agree with your PR description:

1. We don't want the buffer of `demux` to have anything in it prior to the start of the iteration of the final DataPipe.
2. Fully exhausting the DataPipe (so that it resets the next time it starts) or avoiding `demux` should prevent the issue stated in 1.

If that is the goal, then LGTM. We can check that the buffer is empty before the next start if `demux` is used in both iterations.
I do have a question:

Are the `ResourceWarning: unclosed` warnings (which only happen for Python 3.9 on Windows) caused by what you described above, or is that a separate issue? I think #6128 is trying to fix that.
Yes, this is related. Although #6128 patches more things, I think this PR is sufficient to get rid of the warnings, given that they were always related to items left in a buffer.
…et (#6076)

Reviewed By: jdsgomes

Differential Revision: D39543282

fbshipit-source-id: c43b9bc0acde33e9b2aa56402dae69a47ccd22d2
Failures that will be fixed by this PR are only visible on Python >= 3.8. See #6065 for details.
Some datasets store a file inside an archive that needs to be read completely in order to construct the final datapipe. While it is tempting to do this in one `Demultiplexer`, this has the downside that (depending on how the archive is structured) the `Demultiplexer` now has items in the buffer before we actually start iterating on the final datapipe.

To avoid that, we should always exhaust datapipes completely in case we need them while constructing another. This means there are two changes to be made:

1. Take the file out of the `classify_fn` of the `Demultiplexer` and use a `Filter` to extract it separately.
2. Replace `next(iter(dp))` with `list(dp)` to make sure `dp` is exhausted.
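The second change can be illustrated with a plain generator standing in for a datapipe (this is not torchdata code, just a sketch of the iteration semantics): `next(iter(dp))` leaves the pipe suspended mid-iteration, while `list(dp)` runs it to completion so it can reset cleanly.

```python
def make_dp(log):
    """Hypothetical datapipe-like generator that records whether it was
    allowed to run to completion."""
    yield from ["meta.txt", "1.jpg", "2.jpg"]
    log.append("exhausted")

log = []
first = next(iter(make_dp(log)))   # stops after the first item
assert first == "meta.txt"
assert "exhausted" not in log       # the generator never ran to the end

log = []
items = list(make_dp(log))          # drains the pipe completely
assert items == ["meta.txt", "1.jpg", "2.jpg"]
assert "exhausted" in log
```

For a real `Demultiplexer`, the half-consumed state is what leaves items in its buffer (and unclosed file handles behind), which is why fully exhausting is the safer default during dataset construction.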