
Remove "cleanup" task in dataframe on-disk shuffle #7260

Merged
jsignell merged 1 commit into dask:master from sinclairtarget:shuffle-cleanup on Mar 1, 2021
Conversation

@sinclairtarget
Contributor

This PR removes the partd temporary directory cleanup task from rearrange_by_column_disk that was added by #5977.

Why was it there in the first place?

As best as I can tell, there was a nasty race condition where one process could delete a temporary partd directory that another process was still using. See #5867.

This behavior arose because if you let partd create the temporary directory for you, it will delete it automatically when the partd.File object gets GCed. So the partd.File object going out of scope in one process caused issues for the other process.

So in #5977, a fix was introduced that basically involved managing the lifetime of the temp directory in Dask rather than letting partd do it.
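The two ownership models can be sketched with a toy stand-in for `partd.File` (the `FileStore` class below is illustrative only, not partd's real implementation — the point is just "who owns the temporary directory"):

```python
import os
import shutil
import tempfile

# Toy stand-in for the behavior described above; NOT partd's actual code.
class FileStore:
    """If no path is given, create a temp dir and remove it when GCed."""

    def __init__(self, path=None):
        self._owns_dir = path is None
        self.path = path if path is not None else tempfile.mkdtemp()

    def __del__(self):
        if self._owns_dir:
            shutil.rmtree(self.path, ignore_errors=True)

# Letting the store create the directory: it disappears when the object does,
# which is the GC-triggered deletion that bit the other process in #5867.
store = FileStore()
auto_dir = store.path
del store
assert not os.path.isdir(auto_dir)

# Passing our own directory: it survives the object, as in the #5977 fix.
managed_dir = tempfile.mkdtemp()
store = FileStore(managed_dir)
del store
assert os.path.isdir(managed_dir)
shutil.rmtree(managed_dir)
```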

What problems did it cause?

See #7259. As best as I can tell, the cleanup task makes using partd (as a means of avoiding keeping everything in memory) basically useless, because I'm pretty sure all the partitions are now read from partd before any further computation is done.

If there were a cleanup task in the graph, it would really need to be at the very end of the task chain, no? The way the dependencies are set up now, there's no way for the scheduler to be smart about reading one partition from partd and reducing it according to whatever subsequent tasks are in the graph before reading the next partition from partd.
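That dependency shape can be illustrated with a hand-built dict graph (a toy model, not the actual graph dask's shuffle generates): a cleanup task that depends on every read, with every write depending on cleanup, forces all reads to finish before any write starts.

```python
from dask.local import get_sync

events = []

def read(i):
    events.append(("read", i))
    return i

def cleanup(_reads):
    events.append("cleanup")

def write(part, _cleanup_done):
    events.append(("write", part))
    return part

# cleanup depends on every read, and every write depends on cleanup,
# so no write can start until *all* reads have finished.
graph = {
    ("read", 0): (read, 0),
    ("read", 1): (read, 1),
    "cleanup": (cleanup, [("read", 0), ("read", 1)]),
    ("write", 0): (write, ("read", 0), "cleanup"),
    ("write", 1): (write, ("read", 1), "cleanup"),
}

get_sync(graph, [("write", 0), ("write", 1)])

reads = [i for i, e in enumerate(events) if isinstance(e, tuple) and e[0] == "read"]
writes = [i for i, e in enumerate(events) if isinstance(e, tuple) and e[0] == "write"]
assert max(reads) < min(writes)  # every read happens before any write
```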

You're just deleting stuff. Surely that breaks things?

Maybe? But here's my reasoning why not:

  1. There is this line, also added in #5977 (Fixed flaky test-rearrange), that ensures that the temporary directory is cleaned up on program exit. So the temp directory still gets cleaned up, just not until the process exits. Perhaps this could be an issue for a long-running process that generates lots of partd files? The prior behavior would have prevented /tmp from filling up with lots of partd directories while the process was still going. I don't know how much of a concern this is.
  2. As far as the original issue (#5867, Flaky test test_rearrange) goes, because we're still passing our own temp directory path to partd, we don't have the GC-triggered directory deletion problem. partd.File won't do that if you give it a path. I'm unclear on why using partd.file.cleanup_files doesn't cause this same issue, but that's behavior that's already in master.
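Point 1 — exit-time cleanup — can be done with the standard library. A minimal sketch of the idea (dask's actual line may use a different mechanism):

```python
import atexit
import os
import shutil
import tempfile

# Create the temp directory ourselves and register its removal at process
# exit, instead of relying on partd's GC-triggered deletion.
tmpdir = tempfile.mkdtemp(suffix=".partd")
atexit.register(shutil.rmtree, tmpdir, ignore_errors=True)

assert os.path.isdir(tmpdir)  # stays alive for the rest of the process
```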

Does this address the motivating issue?

Here is the resource profiler output for the sort script I was running in #7259, using this branch:

[image: resource profiler output for the sort script on this branch]

@TomAugspurger
Member

Apologies for the difficulties #5977 caused.

> because I'm pretty sure all the partitions are now read from partd before any further computation is done.

I don't think that's the case; at least, it wasn't the intention. It's been a while, but my hope was that https://github.com/dask/dask/pull/5977/files#diff-f1e38476dc4a4713fecd130660c4c21dd8122aa3bef55d05c1452a25961f3a86R420-R425 would create a task graph where the later tasks depend on just a small result indicating that the shuffle stage was finished.

@sinclairtarget
Contributor Author

@TomAugspurger I could easily be misunderstanding how the task graph works. (First time contributor!)

Looking some more at what _noop returns (it looks like the key of the relevant collect-1 task?), is the idea that the key gets resolved into the partition again later somehow? But if that's supposed to happen after the partd directory has been deleted, where can the partition come from if not somewhere in memory?

More broadly:

How can we clean up the partd directory midway through a computation without already having read everything from the partd file into memory?

Like it would make sense to me if the cleanup task appeared at the end of the below task graph (from my sort example) after the write_partition tasks. But if the write_partition tasks all indirectly depend on cleanup, doesn't that mean that no writes can happen before the partd file is removed, and therefore that all reads from the partd file are forced to happen before all writes?

[image: task graph for the sort example]

@sinclairtarget
Contributor Author

Just to add some more info, here is the ordering of tasks for my sort / set_index test script using 2021.2.0:
[image: task ordering on 2021.2.0]

I believe the shuffle-collect tasks reading from partd are tasks 12 and 14 (while tasks 17 and 22 are the noops). Notice how both read tasks are executed before the write tasks, which are tasks 21 and 26, I think. Also note that the cleanup task, which deletes the on-disk partd data, is task 16. So the partd directory gets deleted before any writes happen. Which is why I think everything must be in memory.

Here's the ordering of tasks on this branch:
[image: task ordering on this branch]

Here, one of the writes happens as task 16, and is then followed by the collect / read from partd step for the second partition as task 17. So only one partition has to be in memory.
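The difference can be mimicked with the same kind of toy dict graph as before: with no shared cleanup task between them, each write depends only on its own read, and dask's ordering heuristics are free to finish one read/write chain before starting the next (this is a hand-built illustration, not the real shuffle graph):

```python
from dask.local import get_sync

events = []

def read(i):
    events.append(("read", i))
    return i

def write(part):
    events.append(("write", part))
    return part

# Two independent read -> write chains, no shared cleanup task in between.
graph = {
    ("read", 0): (read, 0),
    ("read", 1): (read, 1),
    ("write", 0): (write, ("read", 0)),
    ("write", 1): (write, ("read", 1)),
}

get_sync(graph, [("write", 0), ("write", 1)])

reads = [i for i, e in enumerate(events) if e[0] == "read"]
writes = [i for i, e in enumerate(events) if e[0] == "write"]
# At least one write happens before the last read: the chains interleave,
# so only one partition needs to be in memory at a time.
assert min(writes) < max(reads)
```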

@TomAugspurger
Member

Thanks for the additional info! I won't really have time to dig into this, but I'm sufficiently convinced.

Can you try running this a bunch of times locally on this branch to see if you can surface the original failure in #5867? I don't recall if I could reproduce it with pytest-repeat, or if it needed a completely clean slate.

cc @jrbourbeau, this could probably go in before the release if @sinclairtarget has a chance to stress test it locally.

@jsignell
Member

The release has been pushed out to next week, so that should take some pressure off.

@sinclairtarget
Contributor Author

@TomAugspurger I've run the test that surfaced that original error 100 times with:

#! /bin/bash

set -e

for i in {1..100}
do
    # Quote the -k expression so bash doesn't try to glob the brackets
    pytest dask/dataframe/tests/test_shuffle.py -k "test_rearrange[processes-disk]"
done

Pretty low-tech, but I saw no errors. Let me know if there's anything else I can do.

@sinclairtarget
Contributor Author

I also checked out 2.13.0 (which I think was the last release before #5977 went in) and ran the same test 100 times, but sadly didn't see an error.

@jsignell
Member

jsignell commented Mar 1, 2021

Ok it seems like we can merge this and keep an eye out for that flaky test. Does that sound reasonable @TomAugspurger?

@TomAugspurger
Member

TomAugspurger commented Mar 1, 2021 via email

@jsignell jsignell merged commit 8cf6673 into dask:master Mar 1, 2021
@jsignell
Member

jsignell commented Mar 1, 2021

Thanks @sinclairtarget!

dcherian added a commit to dcherian/dask that referenced this pull request Mar 8, 2021


Successfully merging this pull request may close these issues.

Memory performance regression in Dask Dataframe when shuffling on disk
