Add sanity checks to divisions setter #8806

Merged
jsignell merged 7 commits into dask:main from jcrist:divisions-sanity-checks
Mar 25, 2022

Conversation

@jcrist
Member

@jcrist jcrist commented Mar 14, 2022

Adds a few sanity checks to the divisions property setter, motivated by #8802 (fixes #8802).

  • New divisions are compatible with the existing npartitions
  • New divisions don't mix None and non-None values
  • New divisions are sorted (except for ordered categorical dtypes, where that's not easily doable given our current data structures)
  • New divisions are unique except for the last value

These sanity checks turned up a number of bugs that needed to be fixed.

  • map_partitions with align_dataframes=False would select the wrong output divisions; any use of this relied on the caller manually fixing things up afterwards.

  • set_index could result in non-unique divisions in certain cases (e.g. (0, 0, 0, 1, 2, 3, 3)). We now deduplicate values where divisions are computed, as needed. Note that this means certain operations can result in fewer partitions than requested (e.g. now df.repartition(npartitions=npartitions).npartitions <= npartitions). I think this is still fine - before, we'd have the requested number of partitions, but some partitions might be completely empty. This may be considered a fix for #8437 ("When divisions has repeats, set_index puts all data in the last partition instead of balancing it") (cc @gjoseph92).

  • A few category-dtype related fixes. Category dtypes are leaky abstractions; all of this feels a little hacky, but tests seem to pass.

Note that this removes the deprecation warnings added in #8393, I believe those have been released long enough now to remove them.
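
The checks in the list above can be sketched as a standalone function. This is a simplified illustration, not the actual dask source (and note the uniqueness check was later relaxed in this PR); the length-mismatch error message mirrors the one dask raises:

```python
def check_divisions(divisions, npartitions):
    """Sanity checks for a new ``divisions`` tuple (simplified sketch)."""
    if not isinstance(divisions, tuple):
        raise TypeError("divisions must be a tuple")
    # New divisions must be compatible with the existing npartitions
    if len(divisions) != npartitions + 1:
        raise ValueError(
            f"This dataframe has npartitions={npartitions}, divisions "
            f"should be a tuple of length={npartitions + 1}, got {len(divisions)}"
        )
    if None in divisions:
        # All-None divisions mean "unknown"; mixing None and values is invalid
        if any(d is not None for d in divisions):
            raise ValueError("divisions may not mix None and non-None values")
    else:
        # Known divisions must be sorted...
        if any(a > b for a, b in zip(divisions, divisions[1:])):
            raise ValueError("divisions must be sorted")
        # ...and unique, except possibly for the last element
        if len(set(divisions[:-1])) != len(divisions[:-1]):
            raise ValueError("divisions must be unique, except for the last element")
```

For example, `check_divisions((0, 4, 4), 5)` fails the length check, while `check_divisions((0, 1, 2, 3, 4, 4), 5)` passes all four.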


    @divisions.setter
    def divisions(self, value):
        if not isinstance(value, tuple):
Collaborator

This seemingly un-deprecates passing in iterables (the check and deprecation warning below are specifically for tuple types).

Ideally, I'd love to see this type hinted and have the setter accept an Iterable, and the property return a tuple, but there is some long-running and somewhat contentious discussion about whether and how to support that in mypy.

But if we take the deprecation seriously, we'd check for a tuple type specifically here and actually annotate the property appropriately.

Member Author

I think I was confused about what the final decision on this was. I'm happy to leave it as just tuple for the setter - I do think the methods that take a divisions kwarg should at least allow a list (and maybe a generic iterable).

Collaborator

👍 for leaving it as tuple and type hinting. No objection to letting methods take a list/other iterable. It will mean a couple of extra casts in places, though I'm generally in favor of us feeling some pain when the API allows for looser types.

  mins, maxes, lens = _compute_partition_stats(df.index, allow_overlap=True, **kwargs)
  if len(mins) == len(df.divisions) - 1:
-     df.divisions = tuple(mins) + (maxes[-1],)
+     df._divisions = tuple(mins) + (maxes[-1],)
Collaborator

Why avoid validation here and elsewhere? Philosophically, who is the intended audience for the divisions setter? To me, user code should rarely be setting divisions (reading your response to #8802, I think you would agree).

Is it so bad to ask that Dask internal usage go through the validation step as well?

Member Author

We avoid the setter in cases where we are explicitly mutating the structure of df: in-place methods that may change how the partitioning is done need to avoid the length check, since the number of partitions may have changed. Everywhere else we use the setter, which is how all the other bugs were caught.
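
The pattern described here - most code routed through a validating setter, with in-place structural mutation writing the private attribute directly - might be sketched like this (a hypothetical simplification, not dask's actual classes):

```python
class Frame:
    """Simplified sketch of a collection with a validating divisions setter."""

    def __init__(self, divisions):
        self._divisions = tuple(divisions)

    @property
    def npartitions(self):
        return len(self._divisions) - 1

    @property
    def divisions(self):
        return self._divisions

    @divisions.setter
    def divisions(self, value):
        # Normal code paths go through here and get the length check
        if len(value) != self.npartitions + 1:
            raise ValueError("divisions length incompatible with npartitions")
        self._divisions = tuple(value)

    def repartition_inplace(self, new_divisions):
        # In-place structural mutation: the number of partitions changes,
        # so the length check above would spuriously fail. Write the
        # private attribute directly instead.
        self._divisions = tuple(new_divisions)
```

With this layout, a same-length reassignment passes through the setter's check, while a structure-changing mutation bypasses it by design.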

Collaborator

Makes sense

@@ -1317,8 +1338,10 @@ def repartition(
For convenience if given an integer this will defer to npartitions
Collaborator

This docstring should probably be updated to recommend tuple (needn't be in this PR, though).

Member Author

Methods with a divisions kwarg should work with any iterable (or at least tuple/list), so I don't think a change is needed here.

@rajeee

rajeee commented Mar 14, 2022

import pandas as pd
import dask.dataframe as dd
import dask
def get_df(bid):
    df = pd.DataFrame({"id": [bid, bid, bid], "value": [bid+0.1, bid+0.2, bid+0.3]})
    df = df.set_index('id')
    return df

ddf = dd.from_delayed([dask.delayed(get_df)(i) for i in range(5)])
# ddf.divisions=(0, 4, 4) # ValueError: This dataframe has npartitions=5, divisions should be a tuple of length=6, got 3
ddf.divisions=(0, 1, 2, 3, 4, 4) # Required before repartitioning can be done
ddf = ddf.repartition(divisions=(0, 3, 4))
ddf.divisions=(0, 4, 4)  # **This doesn't do anything.** 
full = ddf.compute() # gives all 15 rows
part1 = ddf.get_partition(0).compute() # Gives id 0, 1, 2 (9 rows)
part2 = ddf.get_partition(1).compute() # Gives id 3, 4 (6 rows)

I was just testing this fix. It looks like manually assigning divisions (the line with # This doesn't do anything) still doesn't work.

@jcrist
Member Author

jcrist commented Mar 14, 2022

ddf.divisions=(0, 4, 4) # This doesn't do anything.

That shouldn't do anything.

  • ddf already has 2 partitions, the new divisions also have 2 partitions
  • the new divisions are sorted
  • the new divisions are unique except for the final value

As far as dask is concerned, this is a fine thing to do. The only thing wrong is that they don't match the current partitioning structure, but that can't be detected without an expensive computation.

@rajeee

rajeee commented Mar 14, 2022

ddf.divisions=(0, 4, 4) # This doesn't do anything.

That shouldn't do anything.

  • ddf already has 2 partitions, the new divisions also have 2 partitions
  • the new divisions are sorted
  • the new divisions are unique except for the final value

As far as dask is concerned, this is a fine thing to do. The only thing wrong is that they don't match the current partitioning structure, but that can't be detected without an expensive computation.

When I do the ddf.get_partition(0).compute(), shouldn't it honor the new divisions? Otherwise, what's the point of setting the divisions?

@jcrist
Member Author

jcrist commented Mar 14, 2022

Setting the divisions manually like df.divisions = ... is something you should rarely need to do. It doesn't change anything about the computations dask will perform (which is why ddf.get_partition(0) is still the same); it only changes the metadata associated with the current dataframe. If you're implementing a custom algorithm or something, you may need to manually set the divisions like this, but if all you're looking for is changing the partitioning structure, then you should be using repartition or set_index instead.

  )
  self._meta = meta
- self._divisions = tuple(divisions)
+ self.divisions = tuple(divisions)
Member Author

This was committed accidentally while debugging why cudf was hitting a RecursionError, but I think I'll leave it (provided tests pass). The only reason I see to remove it is for performance issues, but dask dataframe already has enough overhead that this should be negligible.

Collaborator

+1 on more validation wherever appropriate

@rajeee

rajeee commented Mar 14, 2022

I see. I am curious, then, what the use case for allowing the divisions property to be set is? One use case I can see is setting the initial divisions when creating a dataframe from delayed objects (ddf.divisions=(0, 1, 2, 3, 4, 4) above), but that could also be done by passing a divisions argument to from_delayed. If there are no other use cases, I feel like allowing users to set this property creates unnecessary confusion, since the change won't be reflected in the computation.

            raise ValueError("divisions must be sorted")
        if len(value[:-1]) != len(list(unique(value[:-1]))):
            raise ValueError(
                "divisions must be unique, except for the last element"
Member

I am a little hesitant about this assertion. It is entirely possible to have a well-behaved dataframe with empty divisions - for instance if you do a slice. In that case the divisions that you get have dupes, but they are still right.

Member Author

I'm not sure I follow how this would fail for empty divisions? Can you provide an example?

Member

Ok this is contrived:

import dask

ddf = dask.datasets.timeseries(freq="15min")
ddf = ddf.sort_values("name")
output = ddf[ddf.name == "Zelda"]

The first 29 partitions are empty:

[len(p) for p in output.partitions]

Member

I guess it's not really that it would fail for empty partitions and more what should the divisions be for empty partitions? Because right now we use duplicate values to indicate empty partitions. What can we use instead?

Member Author

Hmmm, that's a fair point, I hadn't thought of that. @gjoseph92 and I talked earlier today about this restriction and were pro adding it since it found and fixed a few bugs in the set_index code, but you're correct in that it doesn't work well for cases like this (as written). However, that code above doesn't trigger the error, since the divisions of output are still unknown, only when running the following is this check triggered:

output.divisions = output.compute_current_divisions()

There is code in several places that assumes divisions are all unique, though (loc, for example). We'll either want to harden those code paths against duplicate divisions (doable), or provide a nicer workflow for culling duplicate divisions (for example, the work done in compute_current_divisions would also be able to tell which partitions are empty and could be dropped).
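
The culling workflow suggested here could look something like the following sketch: given per-partition mins, maxes, and lengths (the kind of statistics something like _compute_partition_stats returns), drop the empty partitions and rebuild divisions from the rest. The helper name is hypothetical, not a dask API:

```python
def cull_empty_partitions(mins, maxes, lens):
    """Sketch: drop empty partitions and rebuild divisions.

    ``mins``, ``maxes``, ``lens`` are per-partition statistics; empty
    partitions have length 0 (and may have no usable min/max). Returns
    the indices of the kept partitions and the new divisions tuple.
    """
    keep = [i for i, n in enumerate(lens) if n > 0]
    if not keep:
        return [], ()
    new_mins = [mins[i] for i in keep]
    new_max = maxes[keep[-1]]
    return keep, tuple(new_mins) + (new_max,)
```

For instance, with lengths (0, 0, 3, 2) only the last two partitions survive, and the divisions are rebuilt from their mins plus the final max.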

Member Author

Anyway, I think that for this specific PR dropping this check here is fine and makes sense. We should still keep the sorted checks (and I think the fixes done for the uniqueness check), but the actual assertion here is unnecessary.

Member

Yeah that totally makes sense. I am also ok with leaving that check in since as you say there are places where non-empty divisions are assumed. I would like to have a longer discussion at some point about what the expectations should be for divisions. I think there are perfectly valid reasons for having duplicate divisions without empty partitions just because the index has enough of one value that it makes sense to split all the rows for a particular index value into multiple partitions.

Collaborator

I think there are perfectly valid reasons for having duplicate divisions

Very much agreed. I'd like to see us support this. But...

I am also ok with leaving that check in since as you say there are places where non-empty divisions are assumed

Also agreed. The truth is that right now, we don't support duplicate divisions. The current data model for DataFrame seems to assume that divisions are all unique. So adding logic to validate that that contract is upheld is a good thing, even if we want to change that contract down the line. IMO, making the current data model better-defined and better-validated will only make it easier to change that data model in the future, since you can more confidently know what the rest of the code is expecting, and how your changes need to change those expectations.

Member

Yeah I am happy with that agreement. Thanks for engaging with me on this.

@jcrist
Member Author

jcrist commented Mar 14, 2022

If there are no other use cases, I feel like allowing users to set this property creates unnecessary confusion since this change won't be reflected in the computation.

Normal users should never need to change this property. But some user code already sets divisions this way, as does a lot of internal dask-dataframe code. The purpose of the sanity checks here is to catch errors like you had in #8802, not to prevent users from doing this at all.

Member

@jsignell jsignell left a comment

Should have started with this, but @jcrist thanks so much for taking this on! I think this represents a nice clean up.

Contributor

@phobson phobson left a comment

This looks good and makes sense to me -- the failures related to parquet files are curious. I haven't investigated those.

@jsignell
Member

This also needs a merge with main since the absolute imports PR landed.

@jcrist
Copy link
Member Author

jcrist commented Mar 18, 2022

I also want to revert the enforcement of unique divisions (but not the fixes applied for those). We currently do support non-unique divisions in some operations (loc-like things are the main operations that require unique divisions). While it'd be good to make things more uniform in the future (I'd argue we should fix loc), for now we shouldn't break old code that happened to work.

jcrist added 2 commits March 18, 2022 12:24

  • We do support non-unique divisions in many (but not all) places; this check is too strict to add right now since it would break user code.
  • Not sure why this wasn't failing before?

@jsignell jsignell removed the "almost done" (Work is almost done!) label Mar 18, 2022
@jcrist
Member Author

jcrist commented Mar 18, 2022

Provided tests pass, I think this should be good to go.

@jcrist
Member Author

jcrist commented Mar 18, 2022

Ok, there's a failing loc test (divisions aren't sorted). Not sure why these tests didn't fail before - perhaps GitHub Actions was degraded and they didn't run properly? Either way, this looks like a deeper fix than I have time for today.


Development

Successfully merging this pull request may close these issues.

Setting divisions on dask dataframe collapses rows.

6 participants