Remove "cleanup" task in dataframe on-disk shuffle#7260
Conversation
is already cleaned up by other means.
Apologies for the difficulties #5977 caused.
I don't think that's the case, at least it wasn't the intention. It's been a while, but my hope was that https://github.com/dask/dask/pull/5977/files#diff-f1e38476dc4a4713fecd130660c4c21dd8122aa3bef55d05c1452a25961f3a86R420-R425 would create a task graph where the later tasks depend on just a small result indicating that the shuffle stage was finished.
@TomAugspurger I could easily be misunderstanding how the task graph works. (First-time contributor!) More broadly: how can we clean up the partd directory midway through a computation without already having read everything from the partd file into memory?
Thanks for the additional info! I won't really have time to dig into this, but I'm sufficiently convinced. Can you try running this a bunch of times locally on this branch to see if you can surface the original failure in #5867? I don't recall whether I could reproduce it. cc @jrbourbeau, this could probably go in before the release if @sinclairtarget has a chance to stress test it locally.
The release has been pushed out to next week, so that should take some pressure off.
@TomAugspurger I've run the test that surfaced that original error 100 times with: Pretty low-tech, but I saw no errors. Let me know if there's anything else I can do.
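The exact command used isn't shown in the thread, but a low-tech stress loop like the one described can be sketched in Python (`TEST_CMD` here is a trivial stand-in for the real pytest invocation):

```python
# Hypothetical stress loop: rerun a single test command many times and stop
# at the first failure. TEST_CMD is a placeholder, not the command from the PR.
import subprocess
import sys

TEST_CMD = [sys.executable, "-c", "pass"]  # stand-in for e.g. a pytest invocation

for i in range(100):
    result = subprocess.run(TEST_CMD)
    if result.returncode != 0:
        print(f"failed on run {i}")
        break
else:
    # The for-else only runs if the loop never hit `break`.
    print("all 100 runs passed")
```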
I also checked out 2.13.0 (which I think was the last release before #5977 went in) and ran the same test 100 times, but sadly didn't see an error.
OK, it seems like we can merge this and keep an eye out for that flaky test. Does that sound reasonable, @TomAugspurger?
Thanks @sinclairtarget! |
* upstream/master: (43 commits)
  * bump version to 2021.03.0
  * Bump minimum version of distributed (dask#7328)
  * Fix `percentiles_summary` with `dask_cudf` (dask#7325)
  * Temporarily revert recent Array.__setitem__ updates (dask#7326)
  * Blockwise.clone (dask#7312)
  * NEP-35 duck array update (dask#7321)
  * Don't allow setting `.name` for array (dask#7222)
  * Use nearest interpolation for creating percentiles of integer input (dask#7305)
  * Test `exp` with CuPy arrays (dask#7322)
  * Check that computed chunks have right size and dtype (dask#7277)
  * pytest.mark.flaky (dask#7319)
  * Contributing docs: add note to pull the latest git tags before pip installing Dask (dask#7308)
  * Support for Python 3.9 (dask#7289)
  * Add broadcast-based merge implementation (dask#7143)
  * Add split_every to graph_manipulation (dask#7282)
  * Typo in optimize docs (dask#7306)
  * dask.graph_manipulation support for xarray.Dataset (dask#7276)
  * Add plot width and height support for Bokeh 2.3.0 (dask#7297)
  * Add numpy functions tri, triu_indices, triu_indices_from, tril_indices, tril_indices_from (dask#6997)
  * Remove "cleanup" task in dataframe on-disk shuffle. The partd directory (dask#7260)
  * ...



This PR removes the partd temporary-directory cleanup task from `rearrange_by_column_disk` that was added by #5977.
Why was it there in the first place?
As best as I can tell, there was a nasty race condition where one process could delete a temporary partd directory that another process was still using. See #5867.
This behavior arose because if you let partd create the temporary directory for you, it will delete it automatically when the `partd.File` object gets GCed. So the `partd.File` object going out of scope in one process caused issues for the other process.
So in #5977, a fix was introduced that basically involved managing the lifetime of the temp directory in Dask rather than letting partd do it.
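A minimal sketch of that GC-triggered cleanup pattern, using a stand-in class rather than partd's actual implementation: if the object creates its own temp directory, the directory dies with the object, which is exactly the race described above when another process is still reading from it.

```python
import os
import shutil
import tempfile

class TempDirStore:
    """Stand-in for a partd.File-like store (not partd's real code): with no
    path, it creates a temp directory and deletes it when the object dies."""

    def __init__(self, path=None):
        if path is None:
            self.path = tempfile.mkdtemp(suffix=".partd")
            self._owns_dir = True   # we created it, so we delete it on GC
        else:
            self.path = path
            self._owns_dir = False  # caller manages the directory's lifetime

    def __del__(self):
        if self._owns_dir and os.path.exists(self.path):
            shutil.rmtree(self.path)

# When the store owns the directory, dropping the last reference removes it,
# even if someone else still needs the files on disk.
store = TempDirStore()
auto_path = store.path
del store
print(os.path.exists(auto_path))  # the auto-created directory is gone

# With an explicit path, the directory outlives the object, so its lifetime
# can be managed externally (the approach #5977 took, in Dask itself).
managed = tempfile.mkdtemp(suffix=".partd")
store2 = TempDirStore(path=managed)
del store2
print(os.path.exists(managed))  # still there
shutil.rmtree(managed)
```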
What problems did it cause?
See #7259. As best as I can tell, the cleanup task makes using partd (as a means of avoiding keeping everything in memory) basically useless, because I'm pretty sure all the partitions are now read from partd before any further computation is done.
If there were a cleanup task in the graph, it would really need to be at the very end of the task chain, no? The way the dependencies are set up now, there's no way for the scheduler to be smart about reading one partition from partd and reducing it according to whatever subsequent tasks are in the graph before reading the next partition from partd.
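To illustrate the dependency shapes in question (with simplified, hypothetical keys, not Dask's real shuffle graph), the two situations can be sketched as plain task-graph dicts:

```python
# Hypothetical task-graph sketch. If every downstream task depends on a cleanup
# task, and the cleanup task depends on every partition read, then nothing
# downstream can start until all partitions have been read back from partd:
graph_with_cleanup = {
    "read-0": ("get_partition", 0),
    "read-1": ("get_partition", 1),
    "cleanup": ("remove_partd_dir", ["read-0", "read-1"]),
    "process-0": ("downstream", "read-0", "cleanup"),
    "process-1": ("downstream", "read-1", "cleanup"),
}

# Without the cleanup task, each downstream task depends only on its own
# partition read, so the scheduler can read one partition, reduce it, and
# release it before touching the next:
graph_without_cleanup = {
    "read-0": ("get_partition", 0),
    "read-1": ("get_partition", 1),
    "process-0": ("downstream", "read-0"),
    "process-1": ("downstream", "read-1"),
}

# Every process-* key in the first graph transitively depends on every read-*.
print("cleanup" in graph_with_cleanup, "cleanup" in graph_without_cleanup)
```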
You're just deleting stuff. Surely that breaks things?
Maybe? But here's my reasoning why not:
- It kept `/tmp` from filling up with lots of partd directories while the process was still going. I don't know how much of a concern this is.
- `partd.File` won't do that if you give it a path. I'm unclear on why using `partd.file.cleanup_files` doesn't cause this same issue, but that's behavior that's already in `master`.

Does this address the motivating issue?
Here is the resource profiler output for the sort script I was running in #7259, using this branch:
- [ ] Passes `black dask` / `flake8 dask`