
Enable P2P shuffling by default#9991

Merged
fjetter merged 9 commits into dask:main from fjetter:p2p_by_default
Feb 24, 2023

Conversation

@fjetter (Member) commented Feb 22, 2023

This would enable P2P shuffling by default for most shuffle-based dataframe workloads (set_index, groupby with split_out, etc.), if and only if pyarrow is installed.

There are still a couple of cases that are hard-coded to tasks because they went through an elaborate evaluation, and I don't feel entirely comfortable toggling them right now without testing at least once. See #9826 for some discussion.

We haven't received a lot of feedback yet (see also dask/distributed#7509), but we also haven't encountered any critical issues during internal validation, so I suggest we flip the switch unless there are major objections (e.g. CI complains very hard).
The discussion issue also outlines a couple of pros and cons of moving to this new algorithm, but I believe the benefits outweigh the costs for almost all users.

dask/utils.py Outdated
Comment on lines +2095 to +2099
try:
    import pyarrow  # noqa

    return "p2p"
except ImportError:
    return "tasks"
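For context, the fallback logic in this excerpt can be sketched as a standalone helper (the function name here is illustrative, not dask's actual API):

```python
import importlib


def default_shuffle_method() -> str:
    # Prefer the P2P shuffle when pyarrow is importable, since P2P
    # relies on pyarrow; otherwise fall back to the task-based shuffle.
    try:
        importlib.import_module("pyarrow")
    except ImportError:
        return "tasks"
    return "p2p"
```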
Member Author

We could add a warning/info log suggesting to install pyarrow if we hit this exception.
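A minimal sketch of that suggestion (the logger name and message wording are hypothetical, not what dask actually does):

```python
import importlib
import logging

logger = logging.getLogger("dask.shuffle")  # hypothetical logger name


def default_shuffle_method() -> str:
    try:
        importlib.import_module("pyarrow")
        return "p2p"
    except ImportError:
        # Hint to the user that installing pyarrow unlocks P2P
        logger.info(
            "Falling back to the 'tasks' shuffle; install pyarrow "
            "to enable the 'p2p' shuffle."
        )
        return "tasks"
```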

Member

Should we add a p2p extras target that requires distributed and a suitable pyarrow version?
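Such an extras target might look roughly like this in setup.py (a sketch; the pyarrow version pin is a placeholder, not something settled in this thread):

```python
# Hypothetical extras_require entry; the minimum pyarrow version
# here is a placeholder, not a decided pin.
extras_require = {
    "p2p": ["distributed", "pyarrow>=7.0"],
}
```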

Member Author

A p2p extra feels way too specific. I'd rather add this to the dataframe target, but that'd obviously be a bigger change.

Member

@jrbourbeau left a comment

Thanks @fjetter! I've not thought too deeply about this yet, but will devote some cycles to it.

FWIW it looks like the dask/tests/test_distributed.py::test_fused_blockwise_dataframe_merge failure is genuinely related to the change in default

Also cc @quasiben who asked me about this a few days ago

@jrbourbeau
Member

We haven't received a lot of feedback (see also dask/distributed#7509)

@rjzamora I think you mentioned wanting to try P2P shuffling out at some point. If you have any feedback, it would certainly be welcome. @quasiben are there folks you're aware of that would be interested in trying P2P and sharing feedback on their experience?

@quasiben
Member

Yes, @wence- met up with @fjetter and others and is going to start experimenting shortly

@fjetter
Member Author

fjetter commented Feb 23, 2023

It seems we're installing a couple of rather old pyarrow versions on some jobs, e.g. python3.8 jobs are installing pyarrow 4. Not sure why that is since we're not pinning anything.

@fjetter fjetter marked this pull request as ready for review February 23, 2023 14:51
@fjetter
Member Author

fjetter commented Feb 23, 2023

Ok, looks like it's just the python 3.8 builds that are using this ancient pyarrow version. I haven't checked the entire matrix but py3.9+ seems to be using pyarrow 11.X so the tests are actually covering the p2p shuffle ;)

@hendrikmakait
Member

This failure is caused by shuffling and task fusion not playing together nicely at the moment, due to our reliance on the task name being unchanged. @fjetter: Could you re-run CI after the latest changes to dask/distributed#7578? The error should then pop up again.

@fjetter
Member Author

fjetter commented Feb 23, 2023

Since this affects merges, maybe this goes away with #9900 + dask/distributed#7514 ?

dask/utils.py Outdated
Comment on lines +2095 to +2096
# We might lose annotations if low level fusion is active
if not dask.config.get("optimization.fuse.active"):
Member Author

I added this guard to only toggle to p2p if low level fusion is disabled. We are relying on annotations and low level fusion apparently strips them sometimes.
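Combined with the pyarrow check, the guard can be sketched as follows (illustrative names, not dask's internal API):

```python
import importlib.util


def pick_shuffle_method(fuse_active: bool) -> str:
    # Low-level fusion may rename keys and strip the annotations that
    # the P2P shuffle relies on, so only pick "p2p" when fusion is off.
    if fuse_active:
        return "tasks"
    if importlib.util.find_spec("pyarrow") is not None:
        return "p2p"
    return "tasks"
```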

@fjetter
Member Author

fjetter commented Feb 24, 2023

Tests are looking good. Anybody brave enough to ✅ ? @mrocklin maybe? :)

@mrocklin
Member

I don't have enough hands-on experience with this to approve, I think, especially given that we're up against a release day. Unfortunately I'm going to take the cowardly way here and defer to you and @hendrikmakait, who have the appropriate context to make this decision.

I will say though that, if you both feel good about this, then I encourage you to move forward with it boldly.

Member

@hendrikmakait left a comment

Thanks @fjetter! The guards seem reasonable to me and should avoid user pains. Given the lack of negative feedback and the strictly positive results we've seen in testing, I'd say we should go ahead and roll this out. @wence-: Any chance you managed to break things in the meantime and would want to veto this?

@hendrikmakait
Member

As discussed offline, we should document the config value users can set to fall back globally, and rename it from a top-level shuffle to dataframe.shuffle (with a deprecation cycle).
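A deprecation cycle for such a rename could look roughly like this (a plain-dict sketch; dask's real config machinery handles renamed keys differently):

```python
import warnings


def get_shuffle_algorithm(config: dict):
    # Prefer the new nested key, falling back to the deprecated
    # top-level "shuffle" key with a warning.
    new = config.get("dataframe", {}).get("shuffle", {}).get("algorithm")
    if new is not None:
        return new
    old = config.get("shuffle")
    if old is not None:
        warnings.warn(
            "'shuffle' is deprecated; use 'dataframe.shuffle.algorithm'",
            FutureWarning,
        )
    return old
```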

@wence-
Contributor

wence- commented Feb 24, 2023

@wence-: Any chance you managed to break things in the meantime and would want to veto this?

I haven't had a chance yet (and for the cases I was going to try "tasks" is already broken, or at least doesn't complete). In dask-cudf we explicitly override the shuffle default, so I think I don't have any veto reasons.

shuffle-compression: null # compression for on-disk shuffling. Partd supports ZLib, BZ2, SNAPPY
shuffle:
  algorithm: null
  compression: null # compression for on-disk shuffling. Partd supports ZLib, BZ2, SNAPPY
Member

If it's only about disk-based shuffling, what about dataframe.shuffle.disk.compression?

Member Author

I could see this being used elsewhere, and I think it's fine for a specific algorithm to ignore such a value, so I'd suggest keeping it as is.

fjetter and others added 2 commits February 24, 2023 18:38
Co-authored-by: Hendrik Makait <hendrik.makait@gmail.com>
Co-authored-by: Lawrence Mitchell <wence@gmx.li>
@fjetter fjetter mentioned this pull request Feb 24, 2023
@github-actions github-actions bot added the documentation Improve or add to documentation label Feb 24, 2023
Comment on lines +2095 to +2096
# We might lose annotations if low level fusion is active
if not dask.config.get("optimization.fuse.active"):
Member

I'm a bit confused about why this is needed (not necessarily saying it's wrong, just that I'm lacking context). @hendrikmakait you said

This failure is caused by shuffling and task fusion not playing together nicely at the moment due to our reliance on the task name being unchanged

Can you point me to where I can read more about this?

Member

Possibly related, are we accounting for dask.dataframe and dask.array treating this config option slightly differently? dask.array will perform low-level task fusion by default, while dask.dataframe won't.

dask.array

# Perform low-level fusion unless the user has
# specified False explicitly.
if config.get("optimization.fuse.active") is False:
    return dsk

dask.dataframe

# Do not perform low-level fusion unless the user has
# specified True explicitly. The configuration will
# be None by default.
if not config.get("optimization.fuse.active"):
    return dsk
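The asymmetry pointed out above becomes concrete with the default value of None: the two checks disagree. A small illustration:

```python
def array_fuses(active) -> bool:
    # dask.array: fuse unless the user set False explicitly,
    # so the default None still fuses.
    return active is not False


def dataframe_fuses(active) -> bool:
    # dask.dataframe: fuse only if the user set a truthy value
    # explicitly, so the default None does not fuse.
    return bool(active)
```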

Sorry for being late to the party, I'm still catching up from some recent PTO

Member Author

I'm a bit confused about why this is needed (not necessarily saying it's wrong, just that I'm lacking context).

Task fusion both changes the key name and drops annotations; we currently rely on both. If anybody toggles fusion on, we cannot use P2P.

Possibly related, are we accounting for dask.dataframe and dask.array treating this config option slightly differently?

This code is only relevant for dask.dataframe

Labels: dataframe, documentation