Conversation
dask/utils.py
Outdated
| import pyarrow  # noqa
|
| return "p2p"
| except ImportError:
| return "tasks"
We could add a warning/info log suggesting to install pyarrow if we hit this exception.
Should we add a p2p extras target that requires distributed and a suitable pyarrow version?
A p2p extra feels way too specific. I'd rather add this to the dataframe target but that'd obviously be a bigger change
@rjzamora I think you mentioned wanting to try P2P shuffling out at some point. If you have any feedback, it would certainly be welcome. @quasiben are there folks you're aware of that would be interested in trying P2P and sharing feedback on their experience?

It seems we're installing a couple of rather old pyarrow versions on some jobs, e.g. the Python 3.8 jobs are installing pyarrow 4. Not sure why that is, since we're not pinning anything.

Ok, looks like it's just the Python 3.8 builds that are using this ancient pyarrow version. I haven't checked the entire matrix, but py3.9+ seems to be using pyarrow 11.X, so the tests are actually covering the p2p shuffle ;)

This failure is caused by shuffling and task fusion not playing together nicely at the moment, due to our reliance on the task name being unchanged. @fjetter: Could you re-run CI after the latest changes to dask/distributed#7578? The error should then pop up again.

Since this affects merges, maybe this goes away with #9900 + dask/distributed#7514?
dask/utils.py
Outdated
| # We might loose annotations if low level fusion is active
| if not dask.config.get("optimization.fuse.active"):
I added this guard to only toggle to p2p if low level fusion is disabled. We are relying on annotations and low level fusion apparently strips them sometimes.
Force-pushed from e596510 to 7062b82
Tests are looking good. Anybody brave enough to ✅? @mrocklin maybe? :)

I don't have enough hands-on experience with this to approve, I think, especially given that we're up against a release day. Unfortunately I'm going to take the cowardly way here and defer to you and @hendrikmakait, who have the appropriate context to make this decision. I will say, though, that if you both feel good about this, then I encourage you to move forward with it boldly.
hendrikmakait left a comment
Thanks @fjetter! The guards seem reasonable to me and should avoid user pains. Given the lack of negative feedback and the strictly positive results we've seen in testing, I'd say we should go ahead and roll this out. @wence-: Any chance you managed to break things in the meantime and would want to veto this?
As discussed offline, we should document the config value users can set to fall back globally and rename it from a top-level
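For illustration, assuming the nested key introduced in this PR's config diff, a global fallback in a user's `dask.yaml` might look like:

```yaml
# Hypothetical user config, assuming the nested key from this PR's diff:
dataframe:
  shuffle:
    algorithm: tasks  # fall back to the task-based shuffle globally
```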
I haven't had a chance yet (and for the cases I was going to try "tasks" is already broken, or at least doesn't complete). In dask-cudf we explicitly override the shuffle default, so I think I don't have any veto reasons.
| shuffle-compression: null # compression for on disk-shuffling. Partd supports ZLib, BZ2, SNAPPY
| shuffle:
|   algorithm: null
|   compression: null # compression for on disk-shuffling. Partd supports ZLib, BZ2, SNAPPY
If it's only about disk-based shuffling, what about dataframe.shuffle.disk.compression?
I could see this being used elsewhere, so I think it's fine for a specific algorithm to ignore such a value; I'd suggest keeping it as is.
Co-authored-by: Hendrik Makait <hendrik.makait@gmail.com>
Co-authored-by: Lawrence Mitchell <wence@gmx.li>
| # We might lose annotations if low level fusion is active
| if not dask.config.get("optimization.fuse.active"):
I'm a bit confused about why this is needed (not necessarily saying it's wrong, just that I'm lacking context). @hendrikmakait you said
This failure is cause by shuffling and task fusion not playing together nicely at the moment due to our reliance on the task name being unchanged
Can you point me to where I can read more about this?
Possibly related, are we accounting for dask.dataframe and dask.array treating this config option slightly differently? dask.array will perform low-level task fusion by default, while dask.dataframe won't.
dask.array
dask/dask/array/optimization.py
Lines 52 to 55 in a71c15b
dask.dataframe
dask/dask/dataframe/optimize.py
Lines 27 to 31 in a71c15b
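A toy illustration of that asymmetry (the helper names are hypothetical; the behaviour is as described above for the linked optimizer code, where the shared config value defaults to None):

```python
def array_fuses(fuse_active):
    # dask.array's optimizer: low-level fusion runs unless the config
    # value is explicitly False.
    return fuse_active is not False

def dataframe_fuses(fuse_active):
    # dask.dataframe's optimizer: low-level fusion runs only when the
    # config value is explicitly True.
    return fuse_active is True

# With the default value of None the two collections disagree:
assert array_fuses(None) is True
assert dataframe_fuses(None) is False
```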
Sorry for being late to the party, I'm still catching up from some recent PTO
I'm a bit confused about why this is needed (not necessarily saying it's wrong, just that I'm lacking context).
Task fusion both changes the key name and drops annotations. We currently rely on both, so if anybody toggles fusion on, we cannot use P2P.
Possibly related, are we accounting for dask.dataframe and dask.array treating this config option slightly differently?
This code is only relevant for dask.dataframe
This would enable P2P shuffling by default for most shuffle based dataframe workloads (set_index, groupby w/ou split_out, etc.) iff pyarrow is installed.
There are still a couple of cases that are hard-coded to tasks because they went through an elaborate evaluation, and I don't feel entirely comfortable toggling them right now without testing at least once. See #9826 for some discussion.
We haven't received a lot of feedback yet (see also dask/distributed#7509), but we haven't encountered any critical issues during internal validation, so I'd suggest we flip the switch unless there are major objections (e.g. CI complains very hard).
The discussion issue also outlines a couple of pros and cons of moving to this new algorithm, but I believe the benefits outweigh the costs for almost all users.