Fixed flaky test-rearrange #5977
Conversation
Puts file cleanup in the task graph, rather than relying on Partd.
@mrocklin you mentioned that you have concerns about this approach. Can you share them when you get a chance, before I put more time into finishing it off / finding an alternative?
mrocklin
left a comment
OK, so now I see that we're still using the existing cleanup mechanism in partd, so presumably this is unlikely to be less robust. I guess I'm coming around. I do have a few comments below though.
    p.append(d, fsync=True)
except Exception:
    try:
        p.drop()
I think this matches the behavior on master in the case where something in shuffle_group_3 raises an exception. Previously, p would go out of scope and be garbage collected, which includes a call to partd.File.drop.
Because we're now explicitly providing a path to create the partd.File, we're responsible for cleaning up.
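To illustrate the point, here is a minimal, self-contained sketch of the cleanup responsibility being discussed. `FileStore` and `safe_append` are hypothetical stand-ins (not dask or partd names) for a store created at an explicit, caller-provided path:

```python
import os
import shutil
import tempfile

# Hypothetical stand-in for partd.File: a tiny on-disk store created at
# an explicit, caller-provided path.
class FileStore:
    def __init__(self, path):
        self.path = path

    def append(self, data):
        for key, value in data.items():
            with open(os.path.join(self.path, key), "ab") as f:
                f.write(value)

    def drop(self):
        shutil.rmtree(self.path, ignore_errors=True)


def safe_append(store, data):
    # Mirror the old behavior: previously p went out of scope on error
    # and garbage collection called drop(); with an explicit path we
    # must drop (and remove the directory) ourselves.
    try:
        store.append(data)
    except Exception:
        try:
            store.drop()
        finally:
            shutil.rmtree(store.path, ignore_errors=True)
        raise


path = tempfile.mkdtemp(suffix=".partd")
store = FileStore(path)
safe_append(store, {"x": b"some bytes"})
wrote = os.path.exists(os.path.join(path, "x"))
store.drop()  # cleanup is now our job, not the garbage collector's
```

The key change from master is that nothing implicit removes the directory anymore, so both the error path and the success path must call `drop()` explicitly.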
    path = None

    if path:
        shutil.rmtree(path, ignore_errors=True)
It's low priority, but some folks might create their own PartD objects here. In the future we might want a more robust solution to finding a File object.
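One possible shape for that "more robust solution" would be walking the chain of wrapper objects to find the underlying File. This is purely a sketch: the `File` and `Wrapper` classes below are illustrative stand-ins for partd wrappers (which expose the wrapped store under attributes such as `partd`, `fast`, or `slow`), and `find_file` is a hypothetical helper, not an existing dask or partd function:

```python
# Walk a chain of wrapper objects depth-first, looking for an
# underlying File with an on-disk path.
class File:
    def __init__(self, path):
        self.path = path


class Wrapper:  # stand-in for a partd wrapper holding an inner store
    def __init__(self, partd):
        self.partd = partd


def find_file(p, attrs=("partd", "fast", "slow")):
    """Search wrapper attributes recursively for a File instance."""
    if isinstance(p, File):
        return p
    for attr in attrs:
        inner = getattr(p, attr, None)
        if inner is not None:
            found = find_file(inner, attrs)
            if found is not None:
                return found
    return None  # a custom store with no File underneath


p = Wrapper(Wrapper(File("/tmp/example.partd")))
f = find_file(p)
```

A search like this would let cleanup work even when users compose their own PartD stacks, at the cost of relying on wrapper attribute conventions.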
dask/dataframe/shuffle.py
Outdated
    (name1, i): (collect, p, i, df._meta, barrier_token) for i in range(npartitions)
}
cleanup_token = "cleanup-" + always_new_token
dsk5 = {cleanup_token: (cleanup_partd_files, p, list(dsk4))}
I think that you're bringing back all of the results into a single task with list(dsk4).
If we take this approach then we might need an intermediary set of tasks that take the results of dsk4 and return None for each. That way we maintain the dependencies but don't move the data around.
Mmm OK. Do you know, does dsk3 at https://github.com/dask/dask/pull/5977/files#diff-83ae5352ddc87bd80c831102addd9b1eL407 have a similar problem? Perhaps that result isn't as large.
I don't think so, my guess is that the output tasks in dsk2 write things to disk and then return None.
The output tasks of dsk4 read from disk and return dataframes, so this is more of a concern.
Instead, I think that the solution is to have a dsk4b that maps lambda x: None across the outputs of dsk4.
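The dsk4b idea can be sketched end to end with plain dicts and a toy scheduler. The names `dsk4`/`dsk4b`/`dsk5` follow the discussion; `collect`, `cleanup_partd_files`, the `"partd-path"` string, and the minimal `get` function are all simplified stand-ins, not dask's real implementations:

```python
def collect(i):
    return list(range(1000))  # stands in for a large dataframe partition

def cleanup_partd_files(path, barriers):
    # the real task would shutil.rmtree(path, ignore_errors=True)
    return None

npartitions = 3
dsk4 = {("collect", i): (collect, i) for i in range(npartitions)}
# dsk4b maps each (large) output of dsk4 to None, so the cleanup task
# depends on all of dsk4 without the dataframes themselves moving.
dsk4b = {("noop", i): (lambda x: None, ("collect", i)) for i in range(npartitions)}
dsk5 = {"cleanup": (cleanup_partd_files, "partd-path", list(dsk4b))}
graph = {**dsk4, **dsk4b, **dsk5}

def get(dsk, key):
    """Minimal recursive scheduler, just enough to run this sketch."""
    if isinstance(key, list):
        return [get(dsk, k) for k in key]
    task = dsk.get(key, key)
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        return func(*(get(dsk, a) for a in args))
    return task

result = get(graph, "cleanup")
```

The dependency edges are preserved (cleanup cannot run until every collect has run), but only Nones flow into the cleanup task instead of npartitions worth of dataframes.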
    compute_divisions(c)

    # TODO: Fix sporadic failure on Python 3.8 and remove this xfail mark
I've removed this test, but I can restore it if needed.
It's asserting that there are some files left around, but I'm not sure that we want that / can reliably assert that. My understanding was that we wanted them to be cleaned up automatically.
This concerns me. We will often fail at this. What if some exception happens during computation and we never get to the cleanup step?
1b704d7 attempts to make this clearer. The idea is to wrap calls using the
TomAugspurger
left a comment
Gave this another look and I'm sufficiently happy with where it's at.
Thanks for your work on this @TomAugspurger

Puts file cleanup in the task graph, rather than relying on Partd to do it for us. I'm not really happy with my approach, but wanted to try this on CI a few times.
Closes #5867