Description
Looking through Dask's codebase, it seems like there isn't a consistent type for a Dask object's `divisions`. In some places (like `set_sorted_index`) we return an object whose `divisions` is a tuple, while in others (such as `set_partition`) we return an object whose `divisions` is a list. This becomes an issue when we compare divisions between different objects: two objects' divisions can contain identical elements and still not be seen as equal.
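A minimal illustration in plain Python (no Dask needed) of why this matters: a list and a tuple never compare equal, even when their elements are identical, so a list-valued `divisions` and a tuple-valued `divisions` look different to `==` and `!=`.

```python
# Two "divisions" with identical elements but different container types
list_divisions = [0, 10, 20, 30, 39]
tuple_divisions = (0, 10, 20, 30, 39)

print(list_divisions == tuple_divisions)         # False: list vs tuple never compare equal
print(tuple(list_divisions) == tuple_divisions)  # True once both are tuples

# The same holds for unknown divisions
print([None] * 5 == (None,) * 5)                 # False
```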
Some questions that come to mind:
- Is there an ideal type for `divisions`? I would assume tuples, since `divisions` is generally treated as immutable even in the list case, but list functionality is used in several places in the codebase to assemble `divisions`.
- If there is an ideal type for `divisions`, how can we enforce it? It seems like one reason this problem exists is that in most places, list and tuple `divisions` behave exactly the same; it is typically only when they are compared that issues arise. One potential solution would be to make `divisions` a property with a setter method that either:
  - Implicitly converts the input value to whatever type we want `divisions` to be
  - Raises an error if the input value is not the proper `divisions` type
- If there is no ideal type for `divisions`, is there a workaround for comparisons?
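A rough sketch of the property-with-setter idea, covering both options (coerce or raise). `FrameWithDivisions` and its `strict` flag are hypothetical names for illustration only; this is not Dask's actual `_Frame` class or implementation.

```python
from typing import Any, Iterable, Tuple


class FrameWithDivisions:
    """Hypothetical stand-in for a Dask collection (not Dask's real class)."""

    def __init__(self, divisions: Iterable[Any], strict: bool = False) -> None:
        self._strict = strict
        self.divisions = divisions  # routed through the property setter below

    @property
    def divisions(self) -> Tuple[Any, ...]:
        return self._divisions

    @divisions.setter
    def divisions(self, value: Iterable[Any]) -> None:
        if self._strict and not isinstance(value, tuple):
            # Option 2: reject anything that is not already a tuple
            raise TypeError(f"divisions must be a tuple, got {type(value).__name__}")
        # Option 1: normalize any iterable (list, tuple, ...) to a tuple
        self._divisions = tuple(value)


a = FrameWithDivisions([0, 10, 20])
b = FrameWithDivisions((0, 10, 20))
print(a.divisions == b.divisions)  # True: both stored as tuples

try:
    FrameWithDivisions([0, 10, 20], strict=True)
except TypeError as err:
    print(err)  # divisions must be a tuple, got list
```

The coercing option keeps existing code that builds `divisions` as a list working unchanged, while the strict option would surface every call site that passes a list, at the cost of more churn.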
cc @jsignell, as I noticed you are doing some work on divisions in #8379
EDIT:
To give additional context, I encountered this issue while debugging some breakage in dask-sql:
In some cases, when performing JOIN operations, dask-sql implicitly calls `single_partition_join` through `dd.merge`. Recently, #8341 refactored this function, which, among other things, changed the divisions of the merged result from a tuple to a list (I don't think this was the intent of the PR, just a side effect).
This causes breakage later on in dask-sql if we subscript the result of this merge with a Series (something like `df[df[col].where(...)]`) whose divisions contain the same elements but are stored as a tuple, because `DataFrame.__getitem__` compares divisions to decide whether to call `_maybe_align_partitions`:
Lines 4108 to 4113 in f588189
```python
if isinstance(key, Series):
    # do not perform dummy calculation, as columns will not be changed.
    if self.divisions != key.divisions:
        from .multi import _maybe_align_partitions

        self, key = _maybe_align_partitions([self, key])
```
And `_maybe_align_partitions` does a divisions equality check to decide whether to actually call `align_partitions` (which fails if not all divisions are known):
Lines 166 to 169 in f588189
```python
divisions = dfs[0].divisions
if not all(df.divisions == divisions for df in dfs):
    dfs2 = iter(align_partitions(*dfs)[0])
    return [a if not isinstance(a, _Frame) else next(dfs2) for a in args]
```
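One possible workaround for the comparisons, if no single type is enforced: compare divisions by content rather than with a raw `==`/`!=`. `divisions_equal` below is a hypothetical helper, not an existing Dask function; a check like the one above could use it in place of `df.divisions == divisions`.

```python
from typing import Any, Sequence


def divisions_equal(a: Sequence[Any], b: Sequence[Any]) -> bool:
    """Hypothetical helper: compare divisions by content, ignoring list vs tuple."""
    return tuple(a) == tuple(b)


# With plain == these differ only by container type and would trigger alignment
print([None] * 5 == (None,) * 5)                  # False
print(divisions_equal([None] * 5, (None,) * 5))   # True
print(divisions_equal([0, 10, 20], (0, 10, 30)))  # False: real differences still detected

# Used the way _maybe_align_partitions checks all frames against the first one
all_divisions = [[0, 10, 20], (0, 10, 20)]
first = all_divisions[0]
print(all(divisions_equal(d, first) for d in all_divisions))  # True
```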
Here's a minimal reproducer of that particular issue:
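```python
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame({"a": list(range(40))})
ddf = dd.from_pandas(df, npartitions=4)
cond = ddf.a > 20

# set unknown divisions with identical elements but different types (list vs tuple)
ddf.divisions = [None] * 5
cond.divisions = (None,) * 5

# the list != tuple check routes this through align_partitions,
# which fails because the divisions are unknown
ddf[cond]
```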