With these changes, everything in `dask/dataframe/tests/test_shuffle.py` lines 268 to 275 (at 6c5b482) […]. This is a known problem that won't be fixed yet; see https://github.com/gjoseph92/distributed/blob/p2p-shuffle/proposal/distributed/shuffle/shuffle-design.md#graph-rewrite-on-cull for discussion.
conftest.py (outdated suggestion):

```diff
- @pytest.fixture(params=["disk", "tasks"])
+ @pytest.fixture(
```
Copied from #8250 (comment)

I'm not sure we'll want exactly this in the end. I think we may want something that can be controlled with a command-line flag to pytest (https://stackoverflow.com/a/63271994/17100540)? That way:

- In dask/dask, shuffle tests would not use `p2p`.
- In dask/distributed, within `distributed/shuffle/tests/test_dask.py`, we can `from dask.dataframe.tests import test_shuffle`—this will cause the dask tests to run within distributed's CI.
- In dask/distributed, we invoke pytest with the `--shuffle-p2p` pytest flag we set up, so the imported tests from dask only use the `p2p` shuffle fixture.
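A sketch of what such a flag-controlled setup could look like — this is an assumption, not the final implementation: the flag name `--shuffle-p2p` comes from the comment above, while the fixture name `shuffle_method` and the use of the `pytest_generate_tests` hook (rather than a fixture with fixed `params`) are illustrative.

```python
# Hypothetical conftest.py sketch (assumed names: `--shuffle-p2p` flag,
# `shuffle_method` fixture); gates the p2p parameter behind a pytest flag.

def pytest_addoption(parser):
    # Opt-in flag: dask/distributed's CI would pass this; dask/dask's would not.
    parser.addoption(
        "--shuffle-p2p",
        action="store_true",
        default=False,
        help="Also parametrize shuffle tests with the p2p method",
    )

def pytest_generate_tests(metafunc):
    # Parametrize any test requesting `shuffle_method` with the allowed methods.
    if "shuffle_method" in metafunc.fixturenames:
        methods = ["disk", "tasks"]
        if metafunc.config.getoption("--shuffle-p2p"):
            methods.append("p2p")
        metafunc.parametrize("shuffle_method", methods)
```

With this, plain `pytest` runs shuffle tests with `disk`/`tasks` only, and `pytest --shuffle-p2p` adds the `p2p` parametrization.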
All pass with dask/dask#8392. Rather crude; needs unit testing.
@fjetter ready for final review?
fjetter left a comment:

I can see test failures in a few different shuffling tests. Sometimes it's a broken process pool; I guess this is connected to removing the sync scheduler, since some tasks may now be submitted to a process pool. Other times there is a wrong assert, e.g. in `test_set_index_overlap`.

Before merging we should verify this is not related and/or fix the problems.
I ran […]. The process pool tests that failed on Windows both explicitly created a `ProcessPool` within the test, and only one was even a DataFrame test; neither was related to the […].
Welp, I have gotten […]. The tests are vaguely similar, in that they both involve columns with repeated values like […]. Still feeling like these are revealing actual bugs, possibly thread-unsafe code in pandas and/or partd. As a workaround, if we don't want to deal with these bugs right now, I suppose I could make […].
Using @ian-r-rose's suggestion from dask#8250 (comment). Don't actually merge this yet!
All tests pass except `test_shuffle_sort`, because it has the gall to slice from the DataFrame after shuffling! This causes rows to get culled, so some of the output tasks we try to set worker restrictions on don't exist. For how simple it is, this is a very broken case in the overall shuffle design, which is completely dependent on none of the output tasks getting culled. And it's actually not so simple to solve, because:

a) we should do the shuffle differently if we don't need all output partitions;
b) figuring out which tasks have been culled is tricky—it requires either custom optimization or talking to the scheduler at runtime;
c) the outputs needed _could change_ at runtime (first you compute just 5 partitions, then immediately change your mind and submit the whole thing—the `transfer`/`barrier` tasks haven't changed, nor have the `unpacks`; there are just more `unpacks`. But the new `unpacks` are actually broken if we cleverly skipped sending some data in the original `transfer`s).
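The culling problem above can be pictured with a toy graph. This is pure Python, not dask's actual optimizer; the task names and the `cull` helper are illustrative stand-ins for dask's cull optimization, which keeps only tasks reachable from the requested outputs.

```python
# Toy task graph mimicking the shuffle's shape; names are illustrative only.
graph = {
    "transfer-0": [],
    "transfer-1": [],
    "barrier": ["transfer-0", "transfer-1"],
    "unpack-0": ["barrier"],
    "unpack-1": ["barrier"],
    "unpack-2": ["barrier"],
}

def cull(graph, wanted):
    # Keep only tasks reachable (via dependencies) from the wanted outputs,
    # as dask's cull optimization does.
    keep, stack = set(), list(wanted)
    while stack:
        key = stack.pop()
        if key not in keep:
            keep.add(key)
            stack.extend(graph[key])
    return {k: deps for k, deps in graph.items() if k in keep}

# Computing only one output partition culls the other unpack tasks, so any
# worker restrictions set on "unpack-1"/"unpack-2" would reference keys that
# no longer exist in the graph.
culled = cull(graph, ["unpack-0"])
```

Note that the `transfer` and `barrier` tasks survive unchanged, which is exactly why later submitting the full graph reuses them even if they skipped sending data for the culled partitions.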
I ran `pytest -n 4 --count=50 dask/dataframe/tests/test_multi.py -k hash_join` and they all passed, so I don't think this was necessary.
Force-pushed from 7f89183 to f49d8a4.
This is no longer needed to merge dask/distributed#5520. I've pulled out #8559, #8557, and #8396 into separate PRs, and removed them from this branch. This branch now represents most of the tweaks needed to make all the dask/dask shuffling tests pass using the p2p shuffle, with one glaring exception: every test would have to specify […].

I'm closing this, but it'll be good to have for posterity whenever we do want to get these tests running with the p2p shuffle.
Changes to dask to support peer-to-peer shuffling, implemented in dask/distributed#5520.

The primary changes (besides supporting `shuffle="p2p"`) are adjusting tests to not force the synchronous scheduler, but to use whatever scheduler is set as the current scheduler (including a distributed client). No tests actually seemed to be relying on this.

Passes `pre-commit run --all-files`.
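The scheduler change can be pictured with a toy model. This is pure Python; `config_set` and `compute` are made-up stand-ins loosely mirroring `dask.config.set` and `.compute(scheduler=...)`. Forcing `scheduler="sync"` in a test ignores the surrounding context, while deferring picks up whatever scheduler is currently configured, e.g. a distributed client.

```python
import contextlib

# Hypothetical stand-in for dask's global scheduler config.
_config = {"scheduler": None}

@contextlib.contextmanager
def config_set(**kwargs):
    # Temporarily override the config, like dask.config.set does.
    old = dict(_config)
    _config.update(kwargs)
    try:
        yield
    finally:
        _config.clear()
        _config.update(old)

def compute(value, scheduler=None):
    # Explicit argument wins; otherwise fall back to config, then a default.
    chosen = scheduler or _config["scheduler"] or "threads"
    return chosen, value

with config_set(scheduler="distributed"):
    forced = compute(1, scheduler="sync")  # pins sync, ignoring the context
    deferred = compute(1)                  # picks up the configured scheduler
default = compute(1)                       # outside any context: the default
```

Tests that hard-coded the equivalent of `scheduler="sync"` could never exercise the p2p shuffle, which needs a distributed client; deferring to the current scheduler is what lets the same tests run under both.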