
[FEA]: external shuffle registration #9521

Draft

wence- wants to merge 4 commits into dask:main from wence-:wence/feature/shuffle-registration

Conversation

@wence- (Contributor) commented Sep 28, 2022

It is currently difficult to try out new shuffle plugins outside of
dask because one cannot easily monkey-patch into all the places where
the top-level shuffle.shuffle method is imported within dask itself.

To enable external shuffle engines, introduce a registry of known
shuffle methods that are tried in priority order. To register a new
method, one must provide a function with the same signature as
shuffle.shuffle, a priority (distinct from all existing registered
priorities), and a function which decides (given the requested shuffle
name, the column(s) to shuffle on, and the names of the dataframe
columns) whether the method is suitable.

  • Closes #xxxx
  • Tests added / passed
  • Passes pre-commit run --all-files
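A minimal sketch of the registry described above, assuming the register/dispatch interface outlined in the description (class and function names here are illustrative, not the final API):

```python
from typing import Callable


class ShuffleRegistry:
    """Illustrative sketch of a priority-ordered shuffle-method registry."""

    def __init__(self) -> None:
        # priority -> (can_use predicate, shuffle implementation)
        self._methods: dict[int, tuple[Callable, Callable]] = {}

    def register(self, priority: int, can_use: Callable, method: Callable) -> None:
        # Priorities must be distinct from all existing registered priorities.
        if priority in self._methods:
            raise ValueError(f"priority {priority} is already registered")
        self._methods[priority] = (can_use, method)

    def dispatch(self, name, index, columns) -> Callable:
        # Try methods from highest to lowest priority; first suitable one wins.
        for priority in sorted(self._methods, reverse=True):
            can_use, method = self._methods[priority]
            if can_use(name, index, columns):
                return method
        raise RuntimeError("no registered shuffle method is suitable")


registry = ShuffleRegistry()
registry.register(
    10,
    lambda name, index, columns: name == "tasks",
    lambda df, *args, **kwargs: f"task shuffle of {df}",
)
```

Dispatch would then pick the highest-priority method whose can_use predicate accepts the requested shuffle parameters.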

Avoids a map_partitions call in the case that the default shuffle type
is "tasks".
@GPUtester (Collaborator) commented:

Can one of the admins verify this patch?

Admins can comment "ok to test" to allow this one PR to run, or "add to allowlist" to allow all future PRs from the same author to run.

@wence- (Contributor, Author) commented Sep 28, 2022

I haven't ripped out the special-casing for the p2p shuffle (I think it could be moved to just be registered from distributed with this scheme), partly because I do not understand all the cases in which it can be used, and partly because I want to see whether this approach meets with general approval.

@quasiben (Member) commented:

add to allowlist

@rjzamora (Member) left a comment

Thanks for putting this together @wence- !

I strongly agree with the general idea of making it easier to register custom/external algorithms that may depend on the environment where dask is being used (e.g. distributed execution on a dask-cuda cluster). My primary hesitation with this design is that ShuffleRegistry may be trying to do more work than it needs to.

I also wonder if this is another case where entrypoints would be the easiest way to manage/register custom algorithms (in a way that could be extended to other pluggable algorithms - like groupby aggregations and worker-aware reductions). For example, an external library could register a new algorithm by putting something like this in its setup.cfg:

   [options.entry_points]
   dask.algorithms.shuffle =
       explicit-comms = dask_cuda:ExplicitCommsShuffleMethod

   index,
   shuffle=shuffle,
   npartitions=npartitions,
   max_branch=max_branch,
@rjzamora (Member) commented:

I imagine that there are some keyword arguments that we will want all methods to support, and some that may be method-specific. So, we may want to pass through something like **kwargs (and be sure to validate the arguments in the method).
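A sketch of that pattern: a shared positional interface with **kwargs passed through, where each method pulls out the keywords it understands and rejects the rest (the function name and defaults here are illustrative):

```python
def task_shuffle(df, column, npartitions, **kwargs):
    # Hypothetical shuffle method: consume the method-specific keywords
    # and raise on anything unexpected, rather than relying on all
    # methods sharing one fixed signature.
    max_branch = kwargs.pop("max_branch", 32)
    if kwargs:
        raise TypeError(
            f"task_shuffle got unexpected arguments: {sorted(kwargs)}"
        )
    return f"shuffle {column} into {npartitions} partitions (max_branch={max_branch})"
```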

Comment on lines +416 to +423
   priority
       The priority of this method; higher-priority methods are tried first.
   can_use
       Function to check if this method is usable for the given
       shuffle parameters. Receives the requested shuffle name
       (e.g. "tasks", "disk"), the index to shuffle on, and a
       list of dataframe column names. Should return True if this
       method can shuffle the dataframe, and False otherwise.
@rjzamora (Member) commented:

I don't think I quite understand why the priority and can_use functionality is necessary here. At the moment, I get the impression that we can get away fine without it (but I could certainly be wrong).

@wence- (Contributor, Author) replied:

This was to be able to replicate the current functionality, but I suppose that if we say "shuffle names are distinct", and you ask for them by name, then priority/can_use is unnecessary.
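If names are distinct and always requested explicitly, the registry collapses to a plain mapping; a sketch of that simplification (function names are illustrative):

```python
_shuffle_methods: dict = {}


def register_shuffle(name, method):
    # Shuffle names are required to be distinct, so registration is
    # just an insert into a name-keyed mapping.
    if name in _shuffle_methods:
        raise ValueError(f"shuffle method {name!r} is already registered")
    _shuffle_methods[name] = method


def get_shuffle(name):
    # Look up a method by its explicit name; no priority ordering or
    # can_use predicate is needed.
    try:
        return _shuffle_methods[name]
    except KeyError:
        raise ValueError(f"unknown shuffle method {name!r}") from None


register_shuffle("tasks", lambda df: f"task shuffle of {df}")
```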

   raise RuntimeError("Unreachable, didn't find a shuffle method")


   def _task_shuffle_no_partitions(
@rjzamora (Member) commented:

Maybe the underlying shuffle method should just be responsible for deciding whether or not it should create/drop a "_partitions" column?

I get that the current dask.dataframe design tries to simplify things by (usually) creating a single "_partitions" column before calling down to rearrange_by_column. However, it seems like it would be a lot simpler if dask.dataframe just defined a clearly-documented function to generate this "_partitions" column (and required the underlying method to choose whether or not to use it).

@wence- (Contributor, Author) commented Sep 28, 2022

I also wonder if this is another case where entrypoints would be the easiest way to manage/register custom algorithms (in a way that could be extended to other pluggable algorithms - like groupby aggregations and worker-aware reductions). For example, an external library could register a new algorithm by putting something like this in its setup.cfg:

I don't like this, because it's code in configuration files, so you don't get type-checking and so forth. Moreover, it is more magic: third party packages that you didn't import would register their shuffle methods without you knowing. With the current scheme, that could allow them to be used (shuffle=None) without you knowing. Which I guess argues in favour of explicitly-named shuffle methods (and requiring a call to rearrange_by_columns to specify the method).

@rjzamora (Member) commented Sep 28, 2022

it's code in configuration files, so you don't get type-checking and so forth

The setup.cfg just specifies where the entrypoint code is. So, I don't think there is any less type-checking and so forth at all.

it is more magic: third party packages that you didn't import would register their shuffle methods without you knowing. With the current scheme, that could allow them to be used (shuffle=None) without you knowing. Which I guess argues in favour of explicitly-named shuffle methods (and requiring a call to rearrange_by_columns to specify the method).

Yes, with the current scheme in this PR it is indeed more magic. However, I'm not sure that I agree with the current scheme yet. In my mind, entrypoints would just be a simple way for external libraries to register custom algorithms. The user would never use the custom algorithm (or even search for its entrypoint!) if it wasn't specified in the dask.config or by a user-specified shuffle kwarg.

@wence- (Contributor, Author) commented Sep 28, 2022

it's code in configuration files, so you don't get type-checking and so forth

The setup.cfg just specifies where the endpoint code is. So, I don't think there is any less type-checking and so forth at all.

With a registration-based approach you can require that your registered method satisfies an interface in the registration:

   from typing import Callable

   def register(name: str, method: Callable[[int, int], bool]) -> None:
       ...

   register("foo", lambda x: x)  # mypy complains: signature doesn't match

With the config-based approach, the new method is "disconnected" statically from the registration mechanism.

@wence- (Contributor, Author) commented Sep 28, 2022

The user would never use the custom algorithm (or even search for its entrypoint!) if it wasn't specified in the dask.config or by a user-specified shuffle kwarg.

I suppose that I was worrying about someone registering a catch-all method. This is morally runtime-based interface extension, so any new interfaces that are registered shouldn't be able to affect existing code.

@rjzamora (Member) commented Sep 28, 2022

With a registration-based approach you can require that your registered method satisfies an interface in the registration

This is a fair point. The future plan is to move away from having users directly import dask_cudf in their code, but to use the dask.config and entrypoints to specify the desired backend (dask/design-docs#1, #9475). I suppose the backend entrypoint for cudf would just need to be responsible for registering any cudf-specific algorithms. I think the plan for dask_cuda is similar.
