Add sanity checks to divisions setter #8806
Conversation
Adds a few sanity checks to the `divisions` property setter:

- New divisions are compatible with the existing `npartitions`
- New divisions don't mix None and non-None values
- New divisions are sorted (except for ordered categorical dtypes, where that's hard)
- New divisions are unique, except for the last value

These sanity checks turned up a number of bugs that needed to be fixed:

- `map_partitions` with `align_dataframes=False` would select the wrong output divisions; any use of this was relying on the caller manually fixing things later.
- `set_index` could result in non-unique divisions being used in certain cases (e.g. `(0, 0, 0, 1, 2, 3, 3)`). We now deduplicate values in places where divisions are computed, as needed.
- A few category-dtype related fixes. Category dtypes are leaky abstractions; all of this feels a little hacky, but tests seem to pass.
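For readers unfamiliar with the divisions contract, the checks listed above can be sketched as a standalone function. This is a simplified illustration, not dask's actual implementation; `check_divisions` is a hypothetical name, and the real setter also special-cases ordered categoricals:

```python
def check_divisions(divisions, npartitions):
    """Hypothetical standalone sketch of the new divisions sanity checks."""
    if not isinstance(divisions, tuple):
        raise TypeError("divisions must be a tuple")
    # Compatible with the existing partition count: one boundary per edge.
    if len(divisions) != npartitions + 1:
        raise ValueError(
            f"This dataframe has npartitions={npartitions}, divisions should "
            f"be a tuple of length={npartitions + 1}, got {len(divisions)}"
        )
    if None in divisions:
        # Unknown divisions are all-None; mixing None and values is an error.
        if any(d is not None for d in divisions):
            raise ValueError("divisions may not mix None and non-None values")
    else:
        # Known divisions must be sorted...
        if list(divisions) != sorted(divisions):
            raise ValueError("divisions must be sorted")
        # ...and unique, except that the last value may repeat the previous one.
        if len(set(divisions[:-1])) != len(divisions) - 1:
            raise ValueError("divisions must be unique, except for the last element")
```

For example, `check_divisions((0, 1, 2, 2), 3)` passes (the duplicate last boundary is allowed), while `(0, 0, 1, 2)` raises.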
```python
@divisions.setter
def divisions(self, value):
    if not isinstance(value, tuple):
```
This seemingly un-deprecates passing in iterables (the check and deprecation warning below are specifically for tuple types).
Ideally, I'd love to see this type-hinted, with the setter accepting an `Iterable` and the property returning a tuple, but there is some long-running and somewhat contentious discussion about whether and how to support that in mypy.
But if we take the deprecation seriously, we'd check for a tuple type specifically here and actually annotate the property appropriately.
I think I was confused about what the final decision was on this. I'm happy to leave it at just tuple for the setter. I do think the methods that take a `divisions` kwarg should at least allow a list (and maybe a generic iterable).
👍 for leaving it as tuple and type hinting. No objection to letting methods take a list/other iterable. It will mean a couple of extra casts in places, though I'm generally in favor of us feeling some pain when the API allows for looser types.
```diff
 mins, maxes, lens = _compute_partition_stats(df.index, allow_overlap=True, **kwargs)
 if len(mins) == len(df.divisions) - 1:
-    df.divisions = tuple(mins) + (maxes[-1],)
+    df._divisions = tuple(mins) + (maxes[-1],)
```
Why avoid validation here and elsewhere? Philosophically, who is the intended audience for the divisions setter? To me, user code should rarely be setting divisions (reading your response to #8802, I think you would agree).
Is it so bad to ask that Dask-internal usage go through the validation step as well?
We avoid the setter in cases where we are explicitly mutating the structure of `df`. In-place methods that may change how partitioning is done need to avoid the length check, since the number of partitions may have changed. Everywhere else we should use the setter, which is how all the other bugs were caught.
```diff
@@ -1317,8 +1338,10 @@ def repartition(
         For convenience if given an integer this will defer to npartitions
```
This docstring should probably be updated to recommend tuple (needn't be in this PR, though).
Methods with a divisions kwarg should work with any iterable (or at least tuple/list), I don't think a change is needed here.
```python
import pandas as pd
import dask.dataframe as dd
import dask

def get_df(bid):
    df = pd.DataFrame({"id": [bid, bid, bid], "value": [bid + 0.1, bid + 0.2, bid + 0.3]})
    df = df.set_index("id")
    return df

ddf = dd.from_delayed([dask.delayed(get_df)(i) for i in range(5)])
# ddf.divisions = (0, 4, 4)  # ValueError: This dataframe has npartitions=5, divisions should be a tuple of length=6, got 3
ddf.divisions = (0, 1, 2, 3, 4, 4)  # Required before repartitioning can be done
ddf = ddf.repartition(divisions=(0, 3, 4))
ddf.divisions = (0, 4, 4)  # **This doesn't do anything.**
full = ddf.compute()  # gives all 15 rows
part1 = ddf.get_partition(0).compute()  # Gives id 0, 1, 2 (9 rows)
part2 = ddf.get_partition(1).compute()  # Gives id 3, 4 (6 rows)
```

I was just testing this fix. Looks like manually assigning divisions (the line with `# This doesn't do anything`) still doesn't work.
That shouldn't do anything.
As far as dask is concerned, this is a fine thing to do. The only problem is that the assigned divisions don't match the current partitioning structure, but that can't be detected without an expensive computation.
```diff
 )
 self._meta = meta
-self._divisions = tuple(divisions)
+self.divisions = tuple(divisions)
```
This was committed accidentally while debugging why cudf was hitting a RecursionError, but I think I'll leave it (provided tests pass). The only reason I see to remove it is for performance issues, but dask dataframe already has enough overhead that this should be negligible.
+1 on more validation wherever appropriate
I see. I am curious what the use case is for allowing users to set the divisions property at all.
In `dask/dataframe/core.py` (outdated diff):

```python
raise ValueError("divisions must be sorted")
if len(value[:-1]) != len(list(unique(value[:-1]))):
    raise ValueError(
        "divisions must be unique, except for the last element"
```
I am a little hesitant about this assertion. It is entirely possible to have a well-behaved dataframe with empty divisions, for instance if you do a slice. In that case the divisions that you get have dupes, but they are still right.
I'm not sure I follow how this would fail for empty divisions? Can you provide an example?
Ok, this is contrived:

```python
import dask
ddf = dask.datasets.timeseries(freq="15min")
ddf = ddf.sort_values("name")
output = ddf[ddf.name == "Zelda"]
```

The first 29 partitions are empty:

```python
[len(p) for p in output.partitions]
```
I guess it's not really that it would fail for empty partitions and more what should the divisions be for empty partitions? Because right now we use duplicate values to indicate empty partitions. What can we use instead?
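Under the convention described here (duplicate boundary values marking empty partitions), those partitions can be spotted from the divisions alone. A hypothetical helper, assuming sorted, non-None divisions where partition `i` spans the half-open range `[divisions[i], divisions[i+1])`:

```python
def empty_partition_flags(divisions):
    # A repeated boundary value means partition i spans [v, v), which is
    # how (per the discussion above) empty partitions are currently
    # encoded.  Note this sketch ignores the last partition's special
    # case: its right bound is inclusive, so a duplicated final value
    # can still hold rows.
    return [divisions[i] == divisions[i + 1] for i in range(len(divisions) - 1)]
```

For example, `empty_partition_flags((0, 0, 1, 2, 2, 3))` flags partitions 0 and 3 as empty.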
Hmmm, that's a fair point, I hadn't thought of that. @gjoseph92 and I talked earlier today about this restriction and were pro adding it, since it found and fixed a few bugs in the `set_index` code, but you're correct that it doesn't work well for cases like this (as written). However, the code above doesn't trigger the error, since the divisions of `output` are still unknown; the check is only triggered when running the following:

```python
output.divisions = output.compute_current_divisions()
```

There is code in several places that assumes that divisions are all unique though (`loc`, for example). We'll either want to harden those code bits against duplicate divisions (doable), or provide a nicer workflow for culling duplicate divisions down (for example, the work done in `compute_current_divisions` would also be able to know which partitions are empty and could be dropped).
Anyway, I think that for this specific PR dropping this check here is fine and makes sense. We should still keep the sorted checks (and I think the fixes done for the uniqueness check), but the actual assertion here is unnecessary.
Yeah that totally makes sense. I am also ok with leaving that check in since as you say there are places where non-empty divisions are assumed. I would like to have a longer discussion at some point about what the expectations should be for divisions. I think there are perfectly valid reasons for having duplicate divisions without empty partitions just because the index has enough of one value that it makes sense to split all the rows for a particular index value into multiple partitions.
> I think there are perfectly valid reasons for having duplicate divisions

Very much agreed. I'd like to see us support this. But...

> I am also ok with leaving that check in since as you say there are places where non-empty divisions are assumed

Also agreed. The truth is that right now, we don't support duplicate divisions. The current data model for DataFrame seems to assume that divisions are all unique. So adding logic to validate that that contract is upheld is a good thing, even if we want to change that contract down the line. IMO, making the current data model better-defined and better-validated will only make it easier to change that data model in the future, since you can more confidently know what the rest of the code is expecting, and how your changes need to change those expectations.
Yeah I am happy with that agreement. Thanks for engaging with me on this.
Normal users should never need to change this property. But some user code already sets divisions this way, as does a lot of internal dask-dataframe code. The purpose of the sanity checks here is to catch errors like you had in #8802, not to prevent users from doing this at all.
This also needs a merge with main since the absolute imports PR landed.
I also want to revert the enforcement of unique divisions (but not the fixes applied for those). We currently do support non-unique divisions in some operations.
We do support non-unique divisions in many (but not all) places; this check is too strict to add right now, since it would break user code.
Not sure why this wasn't failing before?
Provided tests pass, I think this should be good to go.
Ok, there's a failing test.
Adds a few sanity checks to the `divisions` property setter, motivated by #8802 (fixes #8802):

- New divisions are compatible with the existing `npartitions`
- New divisions don't mix None and non-None values
- New divisions are sorted (except for ordered categorical dtypes, where that's hard)
- New divisions are unique, except for the last value

These sanity checks turned up a number of bugs that needed to be fixed:

- `map_partitions` with `align_dataframes=False` would select the wrong output divisions; any use of this was relying on the caller manually fixing things later.
- `set_index` could result in non-unique divisions being used in certain cases (e.g. `(0, 0, 0, 1, 2, 3, 3)`). We now deduplicate values in places where divisions are computed, as needed. Note that this means that certain operations can result in fewer than the requested number of partitions (e.g. now `df.repartition(npartitions=npartitions).npartitions <= npartitions`). I think this is still fine: before, we'd have the requested number of partitions, but some partitions might be completely empty. This may be considered a fix for "When `divisions` has repeats, `set_index` puts all data in the last partition instead of balancing it" (#8437)? (cc @gjoseph92)
- A few category-dtype related fixes. Category dtypes are leaky abstractions; all of this feels a little hacky, but tests seem to pass.

Note that this removes the deprecation warnings added in #8393; I believe those have been released long enough now to remove them.