Dont require dask config shuffle default #9826

Merged

rjzamora merged 9 commits into dask:main from fjetter:dont_require_dask_config_shuffle_default on Jan 20, 2023
Conversation

@fjetter fjetter commented Jan 12, 2023

This is a follow-up to #9808 and standardizes how we determine the default shuffle algorithm in the context of a default client.

Closes #9807
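For context, a rough sketch of the kind of selection logic being standardized. This is illustrative only, not dask's actual code: the function name and exact rules are assumptions. The idea is that an explicitly configured value wins, and otherwise the default depends on whether a distributed (default) client is active.

```python
def default_shuffle_algorithm(configured, has_distributed_client):
    # Hypothetical sketch: an explicit config value always wins.
    if configured is not None:
        return configured
    # Assumption: task-based shuffle with a distributed scheduler,
    # disk-based shuffle for local execution.
    return "tasks" if has_distributed_client else "disk"

print(default_shuffle_algorithm(None, True))    # "tasks"
print(default_shuffle_algorithm(None, False))   # "disk"
print(default_shuffle_algorithm("p2p", False))  # "p2p"
```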

@fjetter fjetter (Member, Author) left a comment

@jrbourbeau @rjzamora I'd like to get this merged ASAP. I can revert to the previous behavior for the occurrences where this changes something, but I would actually prefer a consistent choice of these algorithms unless there are concerns about it.

FWIW, there is for instance dask/distributed#5554, which provides an example where the disk shuffle is significantly slower than the task-based shuffle. However, I believe this is caused by the data distribution and not by "where the shuffle is used".
Overall, I'm wondering why we pick one algorithm over the other in certain circumstances.

FWIW this change would make it very simple to switch to P2P once that is ready.

"""Determine the default shuffle behavior based on split_out"""
if shuffle is None:
if split_out > 1:
return shuffle or config.get("shuffle", None) or "tasks"
Member Author

This effectively changes the default value for local execution from tasks to disk. I'm not sure why tasks was chosen here.
The code was introduced in #9504, and the only comment I could find referencing this code area is #9504 (comment), which does not mention the choice of tasks.

Member

@fjetter - I'm sorry that the explanation was lost somewhere (my fault).

The explicit use of "tasks" as the default shuffle algorithm was entirely based on performance. Right now, we automatically use the shuffle-based groupby algorithm when split_out>1. However, the shuffle-based algorithm is only consistently faster than the tree-only algorithm in these cases when shuffle="tasks" (at least this was true in my local experiments). When I used the default shuffle="disk" option for many split_out=2 reductions, I saw a large regression in performance.

"In order to aggregate with 'median', you must use shuffling-based "
"aggregation (e.g., shuffle='tasks')"
)
shuffle = shuffle or config.get("shuffle", None) or "tasks"
Member Author

This was introduced in #9302. There is a commit, 786e76e, that mentions setting the default to tasks, but there is no explanation.

cc @rjzamora

token="aggregate",
split_every=split_every,
split_out=split_out,
shuffle=shuffle if isinstance(shuffle, str) else "tasks",
Member Author

This was introduced in #9516, which references #9302 but doesn't explain why tasks was chosen over disk.

)


def get_default_shuffle_algorithm() -> str:
Member

It makes sense to replace the various config.get("shuffle", None) calls throughout the codebase with get_default_shuffle_algorithm(). However, we really do want to avoid using "disk" for the shuffle-based groupby code path. Otherwise we will definitely be introducing performance regressions for the threaded scheduler when split_out>1.

Perhaps get_default_shuffle_algorithm could accept an optional argument to override the fallback shuffle-algorithm default, e.g. get_default_shuffle_algorithm(fallback="tasks")?

Member Author

I chose to keep these sections the way they are and left a comment. When we eventually switch to P2P, we'll revisit all of this.


fjetter commented Jan 20, 2023

There was a report about disk being awfully slow a while ago, see dask/distributed#5554, which came up in the context of dask/distributed#5502 (which is basically what this issue is about).

My guess is that this is not just true for groupby but for everything. We might want to consider dropping disk entirely or changing the default for everything to tasks. This obviously requires more benchmarking.

The reason I dislike picking tasks for one operation but disk for another is that the performance characteristics of both algorithms depend more or less exclusively on the data distribution and the number of partitions (apart from hardware performance), not on whether we're performing a set_index, shuffle, groupby, etc.
My guess is that your measurements use a similarly asymmetric data distribution to the one described in dask/distributed#5554. Is your benchmark code/data still available somewhere?


I'll revert the behavioral change for now; I don't want this PR to be blocked by this.

fjetter added a commit to fjetter/distributed that referenced this pull request Jan 20, 2023

rjzamora commented Jan 20, 2023

The reason why I dislike picking tasks for the one operation but disk for the other is because performance characteristics for both algorithms are more or less exclusively depending on the data distribution and number of partitions..

I agree that it feels messy to use a different shuffle default for different algorithms. With that said, I do think it is safe to consider the groupby case to be somewhat "special". The difference is that the true default behavior for groupby is to not include any shuffle at all. It just so happens that it is possible to boost the performance of groupby(...).agg(.., split_out>1) by leveraging a task-based shuffle (but not a disk-based shuffle).
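A toy, pure-Python illustration of the two strategies being contrasted here: a tree-style (ACA) aggregation, where every key ends up in a single combined result, versus a shuffle-based reduction that buckets keys into split_out output partitions first. The names and structure are illustrative only, not dask's implementation:

```python
from collections import defaultdict

# Toy per-partition partial sums (key -> value).
partitions = [{"a": 1, "b": 2}, {"a": 3, "c": 4}, {"b": 5}]


def tree_reduce(parts):
    # shuffle=False path: combine all partial results into one output.
    out = defaultdict(int)
    for part in parts:
        for key, value in part.items():
            out[key] += value
    return dict(out)


def shuffle_reduce(parts, split_out):
    # Shuffle-based path: route each key to one of split_out buckets
    # by hash, then aggregate each bucket independently.
    buckets = [defaultdict(int) for _ in range(split_out)]
    for part in parts:
        for key, value in part.items():
            buckets[hash(key) % split_out][key] += value
    return [dict(b) for b in buckets]


print(tree_reduce(partitions))  # {'a': 4, 'b': 7, 'c': 4}
print(shuffle_reduce(partitions, 2))
```

Because each key is routed to exactly one bucket, merging the shuffle_reduce outputs reproduces the tree_reduce result; the trade-off under discussion is purely about how the intermediate shuffle data moves (tasks vs. disk).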

Is your benchmark code/data still available somewhere?

I don't think you need a particularly unusual data distribution to show that the shuffle-based groupby is only better than the conventional ACA groupby when shuffle is not set to "disk". For example, I'm sure the performance of shuffle="disk" would be pretty low across the board for Ian's results shared in #9302 (comment). Another example (a high-cardinality groupby where a shuffle-based reduction should outperform shuffle=False):

import time
from dask.datasets import timeseries

def run(shuffle="tasks"):
    ddf = timeseries(
        id_lam=int(1e18),
        start='2000-01-01',
        end='2001-06-01',
        freq='1s',
        seed=15,
    )
    agg = ddf.groupby(
        "id",
        sort=False,
    ).agg(
        {"x": "sum", "y": "mean"},
        split_out=4, 
        shuffle=shuffle,
    )

    t0 = time.time()
    result = agg.compute()
    total_time = time.time() - t0
    print(f"shuffle: {shuffle}\ntime: {total_time} sec\nnum-rows: {len(result)}\n")

run(shuffle="tasks")  # time: 10.846901416778564 sec
run(shuffle=False)  # time: 18.84290647506714 sec
run(shuffle="disk")  # time: 26.46630048751831 sec

Here we see why we probably shouldn't use shuffle="disk" over shuffle=False as a default.

@fjetter
Copy link
Member Author

fjetter commented Jan 20, 2023

I don't think you need a particularly unusual data distribution to show that the shuffle-based groupby is only better than the conventional ACA groupby when shuffle is not set to "disk".

This example makes me wonder whether we should drop support for disk entirely. Are there any known cases where it performs better?

Anyhow, let's not blow up the scope of this PR, please.


I restored the previous behavior but removed the need for the config to be set. This PR is now blocking dask/distributed#7482.

@rjzamora rjzamora (Member) left a comment

I'd also like to use get_default_shuffle_algorithm for the shuffle-based groupby, but agree with the decision to hold off on moving from "tasks" for now. Thanks for being flexible.

@rjzamora rjzamora merged commit 60c8d6e into dask:main Jan 20, 2023
fjetter added a commit to fjetter/distributed that referenced this pull request Jan 26, 2023
@fjetter fjetter deleted the dont_require_dask_config_shuffle_default branch March 30, 2023 11:14

Development

Successfully merging this pull request may close these issues:
Distributed client/scheduler detection without dask.config

3 participants