Don't require dask config shuffle default #9826
Conversation
…_dask_config_shuffle_default
fjetter
left a comment
@jrbourbeau @rjzamora I'd like to get this merged asap. I can revert to the previous behavior for the occurrences where this changes something, but I would actually prefer having a consistent pick of these algorithms unless there are concerns about it.
FWIW there is for instance dask/distributed#5554 that provides an example where disk shuffle is significantly slower than tasks based shuffle. However, I believe this is caused by the data distribution and not due to "where the shuffle is used".
Overall I'm wondering why we pick the one algo over the other in some circumstance.
FWIW this change would make it very simple to switch to P2P once that is ready.
| """Determine the default shuffle behavior based on split_out""" | ||
| if shuffle is None: | ||
| if split_out > 1: | ||
| return shuffle or config.get("shuffle", None) or "tasks" |
This effectively changes the default value for local execution from tasks to disk. I'm not sure why tasks was chosen here.
The code was introduced in #9504, and the only comment I could find referencing this code area is #9504 (comment), which does not mention the choice of tasks.
@fjetter - I'm sorry that the explanation was lost somewhere (my fault).
The explicit use of "tasks" as the default shuffle algorithm was entirely based on performance. Right now, we automatically use the shuffle-based groupby algorithm when split_out>1. However, the shuffle-based algorithm is only consistently faster than the tree-only algorithm in these cases when shuffle="tasks" (at least this was true in my local experiments). When I used the default shuffle="disk" option for many split_out=2 reductions, I saw a large regression in performance.
| "In order to aggregate with 'median', you must use shuffling-based " | ||
| "aggregation (e.g., shuffle='tasks')" | ||
| ) | ||
| shuffle = shuffle or config.get("shuffle", None) or "tasks" |
| token="aggregate", | ||
| split_every=split_every, | ||
| split_out=split_out, | ||
| shuffle=shuffle if isinstance(shuffle, str) else "tasks", |
| ) | ||
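The last keyword above coerces any non-string value (e.g. `shuffle=True`) to `"tasks"`. A minimal illustration of that coercion (`coerce_shuffle` is a hypothetical helper name, not dask API):

```python
def coerce_shuffle(shuffle):
    """Mirror `shuffle if isinstance(shuffle, str) else "tasks"`.

    Note that this maps *any* non-string, including False, to "tasks";
    callers are expected to have already decided that a shuffle happens.
    """
    return shuffle if isinstance(shuffle, str) else "tasks"


assert coerce_shuffle("disk") == "disk"  # explicit string passes through
assert coerce_shuffle(True) == "tasks"   # shuffle=True selects the tasks default
```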
```python
def get_default_shuffle_algorithm() -> str:
```
It makes sense to replace the various config.get("shuffle", None) calls throughout the codebase with get_default_shuffle_algorithm(). However, we really do want to avoid using "disk" for the shuffle-based groupby code path. Otherwise we will definitely be introducing performance regressions for the threaded scheduler when split_out>1.
Perhaps get_default_shuffle_algorithm could accept an optional argument to override the fallback shuffle-algorithm default? e.g. get_default_shuffle_algorithm(fallback="tasks")?
I chose to keep these sections the way they are and left a comment. When we eventually switch to P2P we'll revisit all of this again.
There was a report about disk being awfully slow a while ago, see dask/distributed#5554, which came up in the context of dask/distributed#5502 (which is basically what this issue is about). My guess is that this is not just true for the groupby but for everything. We might want to consider dropping disk entirely or changing the default for everything to tasks; this obviously requires more benchmarking.

The reason why I dislike picking tasks for one operation but disk for another is that the performance characteristics of both algorithms depend more or less exclusively on the data distribution and the number of partitions (apart from hardware performance), and not on whether we're performing a set_index, shuffle, groupby, etc.

I'll revert the behavioral change for now. I don't want this PR to be blocked by this.
I agree that it feels messy to use a different shuffle default for different algorithms. With that said, I do think it is safe to consider the groupby case to be somewhat "special". The difference is that the true default behavior for groupby is to not include any shuffle at all; it just so happens that it is possible to boost the performance of the split_out>1 case by injecting a task-based shuffle.
I don't think you need a particularly unusual data distribution to show that the shuffle-based groupby is only better than the conventional ACA groupby when shuffle is not set to "disk":

```python
import time

from dask.datasets import timeseries


def run(shuffle="tasks"):
    ddf = timeseries(
        id_lam=int(1e18),
        start='2000-01-01',
        end='2001-06-01',
        freq='1s',
        seed=15,
    )
    agg = ddf.groupby(
        "id",
        sort=False,
    ).agg(
        {"x": "sum", "y": "mean"},
        split_out=4,
        shuffle=shuffle,
    )
    t0 = time.time()
    result = agg.compute()
    total_time = time.time() - t0
    print(f"shuffle: {shuffle}\ntime: {total_time} sec\nnum-rows: {len(result)}\n")


run(shuffle="tasks")  # time: 10.846901416778564 sec
run(shuffle=False)    # time: 18.84290647506714 sec
run(shuffle="disk")   # time: 26.46630048751831 sec
```

Here we see why we probably shouldn't use "disk" by default for this code path.
This example makes me wonder whether we should drop support for disk entirely. Are there any known cases where it does perform better? Anyhow, let's not blow up the scope of this PR, please. I restored the previous behavior but removed the need for the config to be set. This PR is now blocking dask/distributed#7482.
rjzamora
left a comment
I'd also like to use get_default_shuffle_algorithm for the shuffle-based groupby, but I agree with the decision to hold off on moving away from "tasks" for now. Thanks for being flexible.
This is a follow-up to #9808 and standardizes how we determine the default shuffle algorithm in the context of a default client.
Closes #9807