Conversation
It is currently difficult to try out new shuffle plugins outside of dask because one cannot easily monkey-patch into all the places that the top-level shuffle.shuffle method is imported within dask itself. To enable external shuffle engines, introduce a registry of known shuffle methods that are tried in priority order. To register a new method, one must provide a function with the same signature as shuffle.shuffle, a priority (different from all existing registered priorities) and a function which decides (given the requested shuffle name, column(s) to shuffle on, and names of the dataframe columns) whether or not a method is suitable.
Avoids a map_partitions call in the case that the default shuffle type is "tasks".
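The registry described above could look roughly like the following sketch. The class and method names here are illustrative, not the actual API proposed in the PR; the error message matches the one in the diff.

```python
# Illustrative sketch of a priority-ordered shuffle registry; names
# and signatures are assumptions, not dask's actual API.
class ShuffleRegistry:
    def __init__(self):
        # priority -> (shuffle function, suitability predicate)
        self._methods = {}

    def register(self, func, priority, can_use):
        # Priorities must be unique so the search order is well defined.
        if priority in self._methods:
            raise ValueError(f"priority {priority} is already registered")
        self._methods[priority] = (func, can_use)

    def get_method(self, name, index, columns):
        # Try methods from highest to lowest priority; the first whose
        # can_use predicate accepts these parameters wins.
        for priority in sorted(self._methods, reverse=True):
            func, can_use = self._methods[priority]
            if can_use(name, index, columns):
                return func
        raise RuntimeError("Unreachable, didn't find a shuffle method")
```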
Can one of the admins verify this patch? Admins can comment

I haven't ripped out the special-casing for

add to allowlist
rjzamora left a comment
Thanks for putting this together @wence- !
I strongly agree with the general idea of making it easier to register custom/external algorithms that may depend on the environment where dask is being used (e.g. distributed execution on a dask-cuda cluster). My primary hesitation with this design is that ShuffleRegistry may be trying to do more work than it needs to.
I also wonder if this is another case where entrypoints would be the easiest way to manage/register custom algorithms (in a way that could be extended to other pluggable algorithms - like groupby aggregations and worker-aware reductions). For example, an external library could register a new algorithm by putting something like this in its setup.cfg:
```
[options.entry_points]
dask.algorithms.shuffle =
    explicit-comms = dask_cuda:ExplicitCommsShuffleMethod
```
```python
index,
shuffle=shuffle,
npartitions=npartitions,
max_branch=max_branch,
```
I imagine that there are some keyword arguments that we will want all methods to support, and some that may be method-specific. So, we may want to pass through something like **kwargs (and be sure to validate the arguments in the method).
```python
priority
    The priority of this method; higher-priority methods are tried first.
can_use
    Function to check if this method is usable for the given
    shuffle parameters. Receives the requested shuffle name
    (e.g. "tasks", "disk"), the index to shuffle on, and a
    list of dataframe column names. Should return True if this
    method can shuffle the dataframe, and False otherwise.
```
I don't think I quite understand why the priority and can_use functionality is necessary here. At the moment, I get the impression that we can get away fine without it (but I could certainly be wrong).
This was to be able to replicate the current functionality, but I suppose if we say "shuffle names are distinct", and you ask for them by name, then priority/can_use is unnecessary.
```python
raise RuntimeError("Unreachable, didn't find a shuffle method")
```

```python
def _task_shuffle_no_partitions(
```
Maybe the underlying shuffle method should just be responsible for deciding whether or not it should create/drop a "_partitions" column?
I get that the current dask.dataframe design tries to simplify things by (usually) creating a single "_partitions" column before calling down to rearrange_by_column. However, it seems like it would be a lot simpler if dask.dataframe just defined a clearly-documented function to generate this "_partitions" column (and required the underlying method to choose whether or not to use it).
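A clearly-documented helper of the kind suggested above might look like this pandas-only sketch (dask itself dispatches on the backend for hashing; the function name is hypothetical):

```python
import pandas as pd

def assign_partitions(df, on, npartitions):
    # Hash the shuffle column(s) and map each row to an output
    # partition. An underlying shuffle method could call this to
    # create "_partitions", or skip it entirely.
    codes = pd.util.hash_pandas_object(df[on], index=False)
    return df.assign(_partitions=(codes % npartitions).astype("int64"))
```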
I don't like this, because it's code in configuration files, so you don't get type-checking and so forth. Moreover, it is more magic: third-party packages that you didn't import would register their shuffle methods without you knowing. With the current scheme, that could allow them to be used.
Yes, with the current scheme in this PR it is indeed more magic. However, I'm not sure that I agree with the current scheme yet. In my mind, entrypoints would just be a simple way for external libraries to register custom algorithms. The user would never use the custom algorithm (or even search for its entrypoint!) if it wasn't explicitly specified.
With a registration-based approach you can require that your registered method satisfies an interface at registration time. With the config-based approach, the new method is "disconnected" statically from the registration mechanism.
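That registration-time check could be as simple as the following sketch; the expected parameter names mirror the call site quoted earlier in this review, but both they and the function name are assumptions.

```python
import inspect

# Parameters every registered shuffle method is assumed to accept,
# mirroring the (index, shuffle, npartitions, max_branch) call site.
EXPECTED_PARAMS = ("df", "index", "shuffle", "npartitions", "max_branch")

def register_shuffle_method(registry, name, func):
    # Fail at registration time if the method's signature doesn't
    # start with the expected parameters.
    params = tuple(inspect.signature(func).parameters)
    if params[: len(EXPECTED_PARAMS)] != EXPECTED_PARAMS:
        raise TypeError(
            f"{name}: expected parameters {EXPECTED_PARAMS}, got {params}"
        )
    registry[name] = func
```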
I suppose that I was worrying about someone registering a catch-all method. This is morally runtime-based interface extension, so any new interfaces that are registered shouldn't be able to affect existing code.
This is a fair point. The future plan is to move away from having users directly import