Closing streams to avoid testing issues #6128
VitalyFedyunin wants to merge 10 commits into gh/VitalyFedyunin/1/base from
Conversation
[ghstack-poisoned]
```python
)
if self._split == "train_noval":
    for i in split_dp:
        StreamWrapper.cleanup_structure(i)
```
Noob question: what is the functionality of `cleanup_structure`?
As a topic for discussion: when GC is not helping, we have to close streams manually. I have some prototypes/ideas for adding debug info to find such leftovers. [ghstack-poisoned]
pmeier left a comment
Just dropping in to add some context. Note that the spurious errors were not visible on Python 3.7, which is currently the only version our CI tests against. Either merge #6065 first or at least temporarily activate the other versions to see whether this PR actually fixes them.
```python
ann_path, ann_buffer = ann_data

image = EncodedImage.from_file(image_buffer)
image_buffer.close()
```
The errors we have seen in our test suite have never been with these files, but only with archives.
Tests complain that the archive stream is not closed. This happens because the child (unpacked file) stream also remains open and holds a reference to its parent. In pytorch/pytorch#78952 and meta-pytorch/data#560 we introduced a mechanism to close parent streams once every child is closed.
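The parent/child closing mechanism referenced above can be illustrated with a minimal sketch: each child stream holds a reference to its parent archive stream, and the parent is closed automatically once its last child is closed. The `TrackedStream` class and its attributes here are illustrative only, not the actual `StreamWrapper` API.

```python
import io


class TrackedStream:
    """Toy stand-in for a stream wrapper that tracks parent/child links."""

    def __init__(self, raw, parent=None):
        self.raw = raw
        self.parent = parent
        self.child_count = 0
        self.closed = False
        if parent is not None:
            parent.child_count += 1

    def close(self):
        if self.closed:
            return
        self.closed = True
        self.raw.close()
        if self.parent is not None:
            self.parent.child_count -= 1
            # Close the parent archive once its last child is gone.
            if self.parent.child_count == 0:
                self.parent.close()


archive = TrackedStream(io.BytesIO(b"archive bytes"))
member = TrackedStream(io.BytesIO(b"member bytes"), parent=archive)

member.close()
print(archive.closed)  # True: closing the last child also closed the parent
```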
The GC workaround does not work inside test environments because they keep references to the file handles, so we have to close the file handles manually. [ghstack-poisoned]
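A minimal illustration of the problem described above: as long as some harness keeps a reference to the file handle, reference-count-driven cleanup never fires, and the stream stays open until it is closed explicitly. The `held_refs` list here is a hypothetical stand-in for a test framework caching arguments or locals.

```python
import io

held_refs = []  # stands in for a test framework holding on to objects

stream = io.BytesIO(b"data")
held_refs.append(stream)

del stream  # our own name goes away...
still_open = not held_refs[0].closed  # ...but the held reference keeps it alive

held_refs[0].close()  # manual close is the only reliable cleanup here
print(still_open, held_refs[0].closed)  # True True
```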
Rebased; it would be nice to land this to clean up torchdata's dependency tests.
Thanks for the effort @VitalyFedyunin! I left some questions and suggestions inline. If we have reached consensus on everything, I can take over and implement it if you want me to.
```python
image_buffer.close()
ann = read_mat(ann_buffer)
ann_buffer.close()
```
Instead of doing that in every dataset individually, can't we just do it in
and
? I think so far we don't have a case where we need to read from the same file handle twice. Plus, that would only work if the stream is seekable, which I'm not sure we can guarantee.
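The seekability concern can be demonstrated in a few lines: an in-memory buffer supports `seek()`, so it can be read twice, but that is exactly the property we cannot guarantee for arbitrary streams coming out of a datapipe. This is only an illustration of the constraint, not code from the PR.

```python
import io

buffer = io.BytesIO(b"payload")
first = buffer.read()

# Reading a second time only works if the stream can be rewound.
can_rewind = buffer.seekable()
if can_rewind:
    buffer.seek(0)
second = buffer.read()

print(first == second, can_rewind)  # True True for BytesIO
```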
```python
def __iter__(self) -> Iterator[Tuple[str, Dict[str, str]]]:
    for _, file in self.datapipe:
        file = (line.decode() for line in file)
    for _, fh in self.datapipe:
```
I'm ok with the closing here, but why the rename? Can you revert that?
```python
for i in scenes_dp:
    janitor(i)
```
- Can we make the loop variable more expressive?
- Can we use `torchdata.janitor` instead to make it more clear where this is coming from?
```diff
-for i in scenes_dp:
-    janitor(i)
+for _, file in scenes_dp:
+    janitor(file)
```
Plus, do we even need to use torchdata.janitor here? Would just .close() be sufficient?
```diff
-for i in scenes_dp:
-    janitor(i)
+for _, file in scenes_dp:
+    file.close()
```
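The trade-off between the two suggestions above can be sketched as follows: `.close()` assumes each element is itself a stream, whereas a janitor-style helper can tolerate elements that have no `close()` method at all. `janitor_sketch` here only mimics the idea and is not torchdata's actual `janitor`.

```python
import io


def janitor_sketch(obj):
    """Close obj if it looks closeable; silently ignore everything else."""
    close = getattr(obj, "close", None)
    if callable(close):
        close()


stream = io.BytesIO(b"data")
janitor_sketch(stream)          # closes the stream
janitor_sketch(("path", None))  # tolerated, where .close() would raise AttributeError
print(stream.closed)  # True
```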
```python
anns, image_meta = ann_data

sample = self._prepare_image(image_data)
```
Could you revert the formatting changes?
```python
def _prepare_image(self, data: Tuple[str, BinaryIO]) -> Dict[str, Any]:
    path, buffer = data
    image = close_buffer(EncodedImage.from_file, buffer)
```
If `EncodedImage.from_file` closes the buffer automatically, we also don't need this wrapper.
```python
for i in split_dp:
    janitor(i)
```
Plus, don't we need to do the same on extra_split_dp in the else branch?
```python
def close_buffer(fn: Callable, buffer: IO) -> Any:
```
I think this was only used once and can be superseded if our read functions clean up after themselves. So this can probably be removed.
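For context, a likely shape of the `close_buffer` helper under discussion, reconstructed from its call site `close_buffer(EncodedImage.from_file, buffer)`: call the reader on the buffer, then close the buffer unconditionally. This is a sketch, not the actual torchvision implementation.

```python
import io
from typing import IO, Any, Callable


def close_buffer(fn: Callable[[IO], Any], buffer: IO) -> Any:
    """Apply fn to buffer and close the buffer even if fn raises."""
    try:
        return fn(buffer)
    finally:
        buffer.close()


buffer = io.BytesIO(b"raw bytes")
data = close_buffer(lambda b: b.read(), buffer)
print(data, buffer.closed)  # b'raw bytes' True
```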
```python
try:
    sample = next(iter(dataset))
    iterator = iter(dataset)
```
Instead of sticking with the iterator pattern here, can't we just simply do

```python
samples = list(dataset)
if not samples:
    raise AssertionError(...)
sample = samples[0]
...
```

```python
iterator = iter(dataset)
one_element = next(iterator)
```
```python
if len(StreamWrapper.session_streams) > 0:
    raise Exception(StreamWrapper.session_streams)
```
Could you explain what this does? Is StreamWrapper.session_streams just a counter for open streams? If yes, why are we only testing this here and not in the other tests? If this is something we should check in general, we can use a decorator like
```python
import functools

import pytest


def check_unclosed_streams(test_fn):
    @functools.wraps(test_fn)
    def wrapper(*args, **kwargs):
        if len(StreamWrapper.session_streams) > 0:
            raise pytest.UsageError("Some previous test didn't clean up")
        test_fn(*args, **kwargs)
        if len(StreamWrapper.session_streams) > 0:
            raise AssertionError("This test didn't clean up")

    return wrapper
```
@VitalyFedyunin In #6647 I redid this PR with all my suggested changes. We can take the discussion there if you want.
Hi @VitalyFedyunin! Thank you for your pull request. We require contributors to sign our Contributor License Agreement, and yours needs attention. You currently have a record in our system, but the CLA is no longer valid and will need to be resubmitted.

Process: in order for us to review and merge your suggested changes, please sign at https://code.facebook.com/cla. If you are contributing on behalf of someone else (e.g. your employer), the individual CLA may not be sufficient and your employer may need to sign the corporate CLA. Once the CLA is signed, our tooling will perform checks and validations; afterwards, the pull request will be tagged accordingly.

If you have received this in error or have any questions, please contact us at cla@meta.com. Thanks!
Superseded by #6647.
Stack from ghstack (oldest at bottom):
The GC workaround does not work inside test environments because they keep references to the file handles, so we have to close the file handles manually.