
Updates for P2P shuffle skeleton #8392

Closed
gjoseph92 wants to merge 8 commits into dask:main from gjoseph92:p2p-shuffle-draft

Conversation

@gjoseph92
Collaborator

Changes to dask to support peer-to-peer shuffling implemented in dask/distributed#5520.

The primary changes (besides supporting shuffle="p2p") are adjusting tests so they no longer force the synchronous scheduler, and instead use whatever scheduler is currently set (including a distributed client). No tests actually seemed to rely on forcing the synchronous scheduler.

  • Tests added / passed
  • Passes pre-commit run --all-files

@gjoseph92
Collaborator Author

gjoseph92 commented Nov 17, 2021

With these changes, everything in test_shuffle.py passes under dask/distributed#5520 except test_shuffle_sort:

def test_shuffle_sort(shuffle_method):
    df = pd.DataFrame({"x": [1, 2, 3, 2, 1], "y": [9, 8, 7, 1, 5]})
    ddf = dd.from_pandas(df, npartitions=3)
    df2 = df.set_index("x").sort_index()
    ddf2 = ddf.set_index("x", shuffle=shuffle_method)
    assert_eq(ddf2.loc[2:3], df2.loc[2:3])

This is a known problem that won't be fixed yet; see https://github.com/gjoseph92/distributed/blob/p2p-shuffle/proposal/distributed/shuffle/shuffle-design.md#graph-rewrite-on-cull for discussion.

conftest.py (outdated diff)

-@pytest.fixture(params=["disk", "tasks"])
+@pytest.fixture(
Collaborator Author


Copied from #8250 (comment)

I'm not sure we'll want exactly this in the end. I think we may want something that can be controlled with a command-line flag to pytest (https://stackoverflow.com/a/63271994/17100540). That way:

  • In dask/dask, shuffle tests would not use p2p
  • In dask/distributed, within distributed/shuffle/tests/test_dask.py, we can `from dask.dataframe.tests import test_shuffle`; this will cause the dask tests to run within distributed's CI
  • In dask/distributed, we invoke pytest with the --shuffle-p2p pytest flag we set up, so the imported tests from dask only use the p2p shuffle fixture.
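
The flag-controlled fixture idea above could be sketched roughly like this in a conftest.py. This is a hypothetical illustration, not what was actually implemented; the `--shuffle-p2p` flag name comes from the comment, but the hook details are assumptions:

```python
# Hypothetical conftest.py sketch: parametrize the shuffle_method fixture
# differently depending on a custom pytest command-line flag.

def pytest_addoption(parser):
    parser.addoption(
        "--shuffle-p2p",
        action="store_true",
        default=False,
        help="Parametrize shuffle tests with only the p2p shuffle",
    )

def pytest_generate_tests(metafunc):
    # Generate the shuffle_method params dynamically instead of hardcoding
    # them in a @pytest.fixture(params=...) decorator.
    if "shuffle_method" in metafunc.fixturenames:
        if metafunc.config.getoption("--shuffle-p2p"):
            params = ["p2p"]
        else:
            params = ["disk", "tasks"]
        metafunc.parametrize("shuffle_method", params)
```

With this, `pytest dask/dataframe/tests/test_shuffle.py` would run the disk/tasks variants, while `pytest --shuffle-p2p ...` in distributed's CI would run only the p2p variant of the imported tests.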

gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Nov 18, 2021
All pass with dask/dask#8392. Rather crude;
needs unit testing.
gjoseph92 added a commit to gjoseph92/distributed that referenced this pull request Nov 19, 2021
All pass with dask/dask#8392. Rather crude;
needs unit testing.
@gjoseph92 gjoseph92 changed the title [DNM] Updates for P2P shuffle skeleton Updates for P2P shuffle skeleton Dec 16, 2021
@gjoseph92 gjoseph92 marked this pull request as ready for review December 16, 2021 03:51
@gjoseph92 gjoseph92 self-assigned this Dec 16, 2021
@gjoseph92
Copy link
Collaborator Author

@fjetter ready for final review?

Member

@fjetter fjetter left a comment


I can see test failures in a few different shuffling tests.

Sometimes it's a broken process pool, other times a wrong assert (e.g. in test_set_index_overlap). I guess this is connected to removing the sync scheduler, since some tasks may now be submitted to a process pool.

Before merging we should verify these failures are unrelated and/or fix the problems.

@gjoseph92
Collaborator Author

I ran test_set_index_overlap 1,000 times locally with 8 concurrent pytest processes and it never failed, which makes me think this will be hard to reproduce. However, the fact that it failed twice in the same way, with a row simply missing from the shuffle output, concerns me. Not because these changes have actually broken anything (that test doesn't even use the parametrized shuffle fixture, so it always uses the default shuffle), but because using scheduler=threads instead of scheduler=sync in assert_eq may have inadvertently revealed an existing bug.

The process pool tests that failed on Windows both explicitly created a ProcessPool within the test, and only one was even a DataFrame test; neither was related to the sync changes, and I think they can be ignored. The macOS failures were some weird CI thing with the TileDB conda package. So the only test I'm concerned about is test_set_index_overlap. We'll see what happens with this merge of main.

@gjoseph92
Collaborator Author

Welp, test_groupby.py::test_cumulative[disk-cumsum-key1-d], test_groupby.py::test_cumulative[disk-cumprod-key1-d], and test_shuffle.py::test_set_index_overlap failed in CI. Though they were all set to use the disk-based shuffle, the groupby tests don't even do a shuffle, so the shuffle mode is irrelevant.

I have gotten test_cumulative to fail with the same error locally, but still not test_set_index_overlap.

The tests are vaguely similar, in that they both involve columns with repeated values like [1, 2, 3, 4, 4, 5, 6, 7] (repeated 4, ends up in both partitions). In test_set_index_overlap, the repeated value is getting dropped from the shuffle output. In test_cumulative, the repeated value is getting repeated in groupby output (group 4 shows up twice), or something like that?

Still feeling like these are revealing actual bugs, possibly thread-unsafe code in pandas and/or partd.

As a workaround, if we don't want to deal with these bugs right now, I suppose I could make assert_eq take a scheduler="sync" kwarg, so the default will stay the same for all these tests, but in the P2P shuffle tests we can explicitly pass scheduler=None or scheduler=client.

Using @ian-r-rose's suggestion from dask#8250 (comment). Don't actually merge this yet!
All tests pass except `test_shuffle_sort`, because it has the gall to slice from the DataFrame after shuffling! This causes rows to get culled, so some of the output tasks we try to set worker restrictions on don't exist.

For how simple it is, this is a very broken case in the overall shuffle design. It's completely dependent on none of the output tasks getting culled. And it's actually not so simple to solve, because
a) we should do the shuffle differently if we don't need all output partitions
b) figuring out which tasks have been culled is tricky; requires either custom optimization or talking to scheduler at runtime
c) the outputs needed _could change_ at runtime (first you compute just 5 partitions, then immediately change your mind and submit the whole thing—the `transfer`/`barrier` tasks haven't changed, nor have the `unpacks`. There are just more `unpacks`. But the new `unpacks` are actually broken if we cleverly skipped sending some data in the original `transfer`s.)
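
The culling mechanics can be illustrated with dask's low-level graph API. The task names below mimic the transfer/barrier/unpack layout described above but are made up for the sketch, not taken from the real shuffle graphs:

```python
# Demonstration of graph culling, the mechanism behind the breakage:
# slicing means only some output keys are requested, so cull() drops the
# other unpack tasks, and worker restrictions set for those dropped task
# keys would point at tasks that no longer exist in the graph.
from dask.optimization import cull

dsk = {
    "transfer": (lambda: None,),
    "barrier": (lambda t: None, "transfer"),
    "unpack-0": (lambda b: 0, "barrier"),
    "unpack-1": (lambda b: 1, "barrier"),
    "unpack-2": (lambda b: 2, "barrier"),
}

# Computing everything keeps all three unpack tasks...
full, _ = cull(dsk, ["unpack-0", "unpack-1", "unpack-2"])
assert set(full) == set(dsk)

# ...but computing a slice culls the rest of the outputs.
sliced, _ = cull(dsk, ["unpack-0"])
assert set(sliced) == {"transfer", "barrier", "unpack-0"}
```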
This reverts commits d6de9a8 and a158490.

Seems like the fixture has to be defined in distributed now? So we can just redefine it there. Not sure why that wasn't the case last time I tried.
`pytest -n 4 --count=50 dask/dataframe/tests/test_multi.py -k hash_join` and they all passed, so I don't think this was necessary.
@gjoseph92
Collaborator Author

This is no longer needed to merge dask/distributed#5520. I've pulled out #8559, #8557, and #8396 into separate PRs, and removed them from this branch.

This branch now represents most of the tweaks needed to make all the dask/dask shuffling tests pass using the p2p shuffle, with one glaring exception: every test would have to specify scheduler=None to assert_eq. We'll have to think about how to handle that when we actually decide to run all these tests in distributed's CI.

I'm closing this, but it'll be good to have for posterity whenever we do want to get these tests running with the p2p shuffle.

@gjoseph92 gjoseph92 closed this Jan 12, 2022