Add basic backend dispatching to dask-expr #728
Conversation
…to creation-dispatch
@phofl - I know you are busy with many things, but I'd be interested to know if you have any thoughts/concerns about this PR. My hope is that there is nothing controversial here: we are reusing the existing dispatching mechanisms and conventions from …
`dask_expr/_backends.py` (Outdated)
```python
try:
    impl = self._lookup[backend]
except KeyError:
    entrypoints = detect_entrypoints(f"dask_expr.{self._module_name}.backends")
```
The only reason we cannot use `CreationDispatch` directly is the fact that `dask_cudf` is already using the `"dask.dataframe.backends"` path to expose the `"cudf"` backend entrypoint for `dask.dataframe`.
We can just use `CreationDispatch` directly after dask/dask#10794 makes it possible to modify the entrypoint path upon initialization.
Okay, removed this now that dask/dask#10794 is in.
Scratch that - I should probably wait for the next dask/dask release to remove `DXCreationDispatch`
`dask_expr/_backends.py` (Outdated)
```python
dataframe_creation_dispatch = DXCreationDispatch(
    module_name="dataframe",
    default="pandas",
    entrypoint_class=DataFrameBackendEntrypoint,
    name="dataframe_creation_dispatch",
)
```
Note that we want to introduce a new `CreationDispatch` object for dask-expr, because we want to distinguish "legacy" dask-dataframe creation functions from newer expression-based creation functions (to be discovered at the `"dask_expr.dataframe.backend"` entrypoint path instead of `"dask.dataframe.backend"`).
In both legacy and expr-based code, the same `"dask.dataframe.backend"` config value is used to specify the default backend.
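To make the lookup-then-fallback behavior concrete, here is a minimal, self-contained sketch of the dispatching pattern discussed above. This is not the real `CreationDispatch` API - the `discover` hook is a hypothetical stand-in for entrypoint detection, used here only for illustration:

```python
class MiniCreationDispatch:
    """Toy model of backend dispatching: consult registered backends
    first, and only fall back to (simulated) entrypoint discovery on a
    KeyError, mirroring the try/except shown in the diff above."""

    def __init__(self, default, discover=None):
        self._default = default
        self._lookup = {}
        # `discover` stands in for entrypoint detection (hypothetical hook)
        self._discover = discover or (lambda name: None)

    def register_backend(self, name, impl):
        self._lookup[name] = impl
        return impl

    def dispatch(self, backend=None):
        backend = backend or self._default
        try:
            return self._lookup[backend]
        except KeyError:
            impl = self._discover(backend)
            if impl is None:
                raise ValueError(f"No backend registered for {backend!r}")
            self._lookup[backend] = impl  # cache discovered backends
            return impl

dispatcher = MiniCreationDispatch(default="pandas")
dispatcher.register_backend("pandas", impl="pandas-backend")
```

A library like `dask_cudf` would correspond to the `discover` path here: its backend is not registered up front, but is found via an entrypoint the first time it is requested.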
```diff
 def new_collection(expr):
     """Create new collection from an expr"""
     meta = expr._meta
     expr._name  # Ensure backend is imported
-    if is_dataframe_like(meta):
-        return DataFrame(expr)
-    elif is_series_like(meta):
-        return Series(expr)
-    elif is_index_like(meta):
-        return Index(expr)
-    else:
-        return Scalar(expr)
+    return get_collection_type(meta)(expr)
```
Note that we have been using `new_collection` from the beginning, because we knew we needed to make this change to support other backends (i.e. cudf) long term.
The behavior of `new_collection` is now consistent with the behavior of `new_dd_object` in `dask.dataframe`.
```python
from dask.utils import Dispatch

get_collection_type = Dispatch("get_collection_type")
```
I don't think we will need many dispatch functions here, so we could also define this in `dask.dataframe.dispatch`.
This is now publicly exposed as `dask_expr.get_collection_type`. It can always be moved to `dask.dataframe.dispatch` in the future.
I've disabled the released CI because we enabled a couple of other things that aren't in released dask yet. You can remove everything here that depends on a released dask and wouldn't be necessary with `main` now. I don't intend to release before the next dask release anyway, so testing against the released version isn't very important.
Awesome. Thanks!
thx |
Mostly addresses #15027

dask/dask-expr#728 exposed the necessary mechanisms for us to define a custom dask-expr backend for `cudf`. The new dispatching mechanisms are effectively the same as those in `dask.dataframe`. The only difference is that we are now registering/implementing "expression-based" collections.

This PR does the following:

- Defines a basic `DataFrameBackendEntrypoint` class for collection creation, and registers new collections using `get_collection_type`.
- Refactors the `dask_cudf` import structure to properly support the `"dataframe.query-planning"` configuration.
- Modifies CI to test dask-expr support for some of the `dask_cudf` tests. This coverage can be expanded in follow-up work.

~~**Experimental Change**: This PR patches `dask_expr._expr.Expr.__new__` to enable type-based dispatching. This effectively allows us to surgically replace problematic `Expr` subclasses that do not work for cudf-backed data. For example, this PR replaces the upstream `TakeLast` expression to avoid using `squeeze` (since this method is not supported by cudf). This particular fix can be moved upstream relatively easily. However, having this kind of "patching" mechanism may be valuable for more complicated pandas/cudf discrepancies.~~

## Usage example

```python
from dask import config

config.set({"dataframe.query-planning": True})

import dask_cudf

df = dask_cudf.DataFrame.from_dict(
    {"x": range(100), "y": [1, 2, 3, 4] * 25, "z": ["1", "2"] * 50},
    npartitions=10,
)
df["y2"] = df["x"] + df["y"]
agg = df.groupby("y").agg({"y2": "mean"})["y2"]
agg.simplify().pprint()
```

Dask cuDF should now be using dask-expr for "query planning":

```
Projection: columns='y2'
  GroupbyAggregation: arg={'y2': 'mean'} observed=True split_out=1 'y'
    Assign: y2=
      Projection: columns=['y']
        FromPandas: frame='<dataframe>' npartitions=10 columns=['x', 'y']
      Add:
        Projection: columns='x'
          FromPandas: frame='<dataframe>' npartitions=10 columns=['x', 'y']
        Projection: columns='y'
          FromPandas: frame='<dataframe>' npartitions=10 columns=['x', 'y']
```

## TODO

- [x] Add basic tests
- [x] Confirm that general design makes sense

**Follow Up Work**:

- Expand dask-expr test coverage
- Fix local and upstream bugs
- Add documentation once "critical mass" is reached

Authors:

- Richard (Rick) Zamora (https://github.com/rjzamora)
- Lawrence Mitchell (https://github.com/wence-)
- Vyas Ramasubramani (https://github.com/vyasr)
- Bradley Dice (https://github.com/bdice)

Approvers:

- Lawrence Mitchell (https://github.com/wence-)
- Ray Douglass (https://github.com/raydouglass)

URL: #14805
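The experimental `Expr.__new__` patch described above amounts to type-based substitution at construction time: when upstream code constructs a problematic class, a registered backend-specific subclass is instantiated instead. Here is a self-contained sketch of the idea only - the `_substitutions` registry and the class bodies are hypothetical, not dask-expr's actual attributes:

```python
class Expr:
    """Toy base class demonstrating __new__-based subclass substitution."""

    # Hypothetical registry: upstream class -> backend-specific replacement
    _substitutions = {}

    def __new__(cls, *args, **kwargs):
        override = Expr._substitutions.get(cls)
        if override is not None and override is not cls:
            # Construct the registered replacement instead of `cls`
            return object.__new__(override)
        return object.__new__(cls)

    def __init__(self, frame):
        self.frame = frame

class TakeLast(Expr):
    """Stand-in for the upstream expression (imagine it calls `squeeze`)."""

    def compute(self):
        return self.frame[-1]

class CudfTakeLast(TakeLast):
    """Stand-in for a cudf-friendly replacement that avoids `squeeze`."""

    def compute(self):
        return self.frame[-1]

# Registering the replacement reroutes upstream construction: because
# CudfTakeLast is a subclass of TakeLast, __init__ still runs normally.
Expr._substitutions[TakeLast] = CudfTakeLast
expr = TakeLast([1, 2, 3])
```

The key property is that upstream code never needs to know about the replacement: it keeps constructing `TakeLast`, and the backend-specific subclass is what it actually gets.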
It seems like we will temporarily need to allow external libraries (i.e. cudf) to register a `DaskBackendEntrypoint` for both `dask.dataframe` and `dask_expr`. I think the easiest approach is to let cudf use a `"dask-expr.dataframe.backend"` entrypoint path for dask-expr (and continue using `"dask.dataframe.backend"` in the legacy `dask.dataframe` API).

This PR also introduces a simple `get_collection_type` dispatch function (the dask-expr version of `get_parallel_type`). This completely breaks cudf support for now, but dask-expr + cudf is already broken, and this is probably the only way to fix it anyway.

NOTE: This PR does not introduce any new dispatching mechanisms (like #321). It just adds enough code to support the existing dispatching mechanisms used in `dask.dataframe`.