
Add ddf.compute_current_divisions to get divisions on a sorted index or column#8517

Merged
jsignell merged 5 commits into dask:main from jsignell:calculate_divisions on Mar 8, 2022

Conversation

@jsignell (Member) commented Dec 23, 2021

The goal of this is to create a well-documented method for calculating divisions. The idea is that people can use this method and save the result, rather than recalculating divisions potentially every time they set an index.

In my understanding #8435 describes a future state where setting the index is split into separate pieces that are all explicit:

  • (if needed) shuffle so that the data is sorted by the new index column
  • calculate divisions for the partitions of that new index column
  • set the index to the new column and the divisions to the new divisions

Here are some examples that I think would be better supported by this new flow:

Start by reading in from a single csv file. This results in a RangeIndex and no known divisions:

import dask
import dask.dataframe as dd

# write a single csv to a file
dask.datasets.timeseries(freq="1H").compute().to_csv("timeseries.csv")

# read in from the file and repartition
ddf = dd.read_csv("timeseries.csv").repartition(npartitions=3)
ddf.divisions
# (None, None, None, None)

Sorted index

# get the divisions of the dataframe index
divisions = ddf.compute_current_divisions()
# (0, 240, 480, 719)

# set the divisions on the dataframe
ddf.divisions = divisions

Sorted column

# get the divisions of timestamp
divisions = ddf.compute_current_divisions("timestamp")
# ('2000-01-01 00:00:00',
#  '2000-01-11 00:00:00',
#  '2000-01-21 00:00:00',
#  '2000-01-30 23:00:00')

# set the index and divisions on the dataframe
ddf = ddf.set_index("timestamp", divisions=divisions)

Unsorted column

# sort dataframe by new index column
ddf = ddf.sort_values("name")

# get the divisions of name
divisions = ddf.compute_current_divisions("name")
# ('Alice', 'Hannah', 'Norbert', 'Zelda')

# set the index and divisions on the dataframe
ddf = ddf.set_index("name", divisions=divisions)

has_overlap = any(mins[i] <= maxes[i - 1] for i in range(1, len(mins)))
if has_overlap:
    raise ValueError(
        f"Partitions must be sorted ascending by {'index' if col is None else col}"
@gjoseph92 (Collaborator):
You may have just waited many minutes to get this error message, I wonder if we can include any more information from our hard-earned mins and maxes to say which partitions overlap, what the overlapping values are, what you can do about it, etc.?

Also, the fact that there's a fix_overlap method makes me wonder if we should always error in this case, or if sometimes we should return the divisions with overlap but then, if you're going to set them, require you to use fix_overlap?

@jsignell (Member, Author):

Yes you are definitely right. I was trying to just think about optimizing for the best-case scenario, but we should think about how to give more informative errors. I am leaning towards preferring informative errors rather than trying to resolve overlap. Especially in this method that just computes existing divisions.

@gjoseph92 (Collaborator):

Yeah, it would just be nice if, in order to fix the overlap, you didn't have to recompute the divisions again, since mins and maxes have all the information we need.

But I'm not sure how much this matters. As a user, when would you call compute_current_divisions? I'd think only when you believe the current partitions are already sorted. So if they weren't actually sorted, that would indicate that some assumptions were wrong and that you needed to change your code (add a sort_values or set_index or something before), not necessarily that you'd want overlap automatically fixed?

Though I also don't know how fix_overlap works, and how much more efficient it is.
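
The more informative error Gabe is asking for could look something like this sketch. `describe_overlap` is a hypothetical helper (not part of the PR), assuming `mins` and `maxes` are plain sequences of per-partition statistics:

```python
def describe_overlap(mins, maxes, col=None):
    """Hypothetical helper: build an informative message from the
    hard-earned per-partition mins/maxes, naming each pair of adjacent
    partitions whose value ranges overlap. Returns None if sorted."""
    name = "index" if col is None else repr(col)
    overlaps = [
        f"partition {i - 1} [{mins[i - 1]}, {maxes[i - 1]}] overlaps "
        f"partition {i} [{mins[i]}, {maxes[i]}]"
        for i in range(1, len(mins))
        if mins[i] <= maxes[i - 1]
    ]
    if not overlaps:
        return None
    return (
        f"Partitions must be sorted ascending by {name}. "
        "Overlapping partitions:\n  " + "\n  ".join(overlaps)
    )

# Names both offending partitions and their value ranges
msg = describe_overlap([1, 2], [4, 5], col="key")
```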

mins, maxes = compute_mins_and_maxes(df, **kwargs)

df.divisions = tuple(mins) + (list(maxes)[-1],)
overlap = [i for i in range(1, len(mins)) if mins[i] <= maxes[i - 1]]
@gjoseph92 (Collaborator):

Why'd you flip the >= to <= here? I think what you have now is right, but I'm not sure if fix_overlap is expecting the inverse for some reason.

@jsignell (Member, Author):

Thanks for reminding me! Yeah I think it was wrong before and sending too much to fix_overlap. But I need to double check that theory.

@jsignell (Member, Author) left a comment:

Thanks for looking this over Gabe! I'm going to try to add some tests this week.

@jsignell jsignell force-pushed the calculate_divisions branch from acf95eb to c4a2cf3 Compare February 23, 2022 17:15
@github-actions github-actions bot removed the io label Feb 23, 2022
if (
    sorted(mins) != list(mins)
    or sorted(maxes) != list(maxes)
    or any(a > b for a, b in zip(mins, maxes))
@jsignell (Member, Author):

I don't think this line really did anything meaningful. I can't imagine how the min of an array could ever be greater than the max.

sorted(non_empty_mins) != non_empty_mins
or sorted(non_empty_maxes) != non_empty_maxes
or any(a < b for a, b in zip(non_empty_mins[1:], non_empty_maxes[:-1]))
@jsignell (Member, Author):

This line checks that the min of the next partition is always greater than or equal to the max of the partition before. So it catches cases like:

(1, 4)
(2, 5)
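
A tiny sketch (plain lists standing in for the per-partition statistics) showing the quoted condition catching exactly this case:

```python
# The two partitions from the example: (min, max) = (1, 4) and (2, 5)
mins, maxes = [1, 2], [4, 5]

# The quoted check: flag when a partition's min falls below the previous
# partition's max (partition 1 starts at 2, inside partition 0's range)
overlap = any(a < b for a, b in zip(mins[1:], maxes[:-1]))
print(overlap)  # True
```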

maxes = remove_nans(maxes)

non_empty_mins = [m for m, l in zip(mins, lens) if l != 0]
non_empty_maxes = [m for m, l in zip(maxes, lens) if l != 0]
@jsignell (Member, Author):

We don't care about empty partitions when computing current divisions because we will never be repartitioning. We guarantee that:

input.npartitions + 1 == len(divisions)

@gjoseph92 (Collaborator):

So the idea here is that functions calling _compute_partition_stats(allow_overlap=True) are prepared for the fact that the number of mins/maxes might not match the number of divisions?

@jsignell (Member, Author):

Yeah, that's the idea. This seemed like the safest way to do the right thing when fixing overlap.
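
The guarantee can be illustrated with plain sequences (a sketch of the divisions convention, not the PR's implementation). The numbers echo the RangeIndex example earlier in the thread; the intermediate maxes are assumed for illustration:

```python
# Per-partition statistics for a frame of 720 rows in 3 partitions
mins = [0, 240, 480]     # each partition's minimum index value
maxes = [239, 479, 719]  # each partition's maximum index value (assumed)
npartitions = len(mins)

# Divisions: every partition's min, closed by the last partition's max,
# so there are always exactly npartitions + 1 of them
divisions = tuple(mins) + (maxes[-1],)
assert len(divisions) == npartitions + 1
print(divisions)  # (0, 240, 480, 719)
```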

@jsignell (Member, Author):

Ok @gjoseph92 I am starting to be pretty happy with this.

@jsignell jsignell requested a review from gjoseph92 February 23, 2022 22:20
@jsignell jsignell added the bug (Something is broken) and feature (Something is missing) labels Feb 24, 2022
@jsignell jsignell added the needs review (Needs review from a contributor) label Feb 24, 2022
@gjoseph92 (Collaborator) left a comment:

Looking good to me as well!

):
    raise ValueError(
        "Partitions have overlapping values, so divisions are non-unique. "
        "Use `set_index(sorted=True)` with no `divisions` to allow dask to fix the overlap."
@gjoseph92 (Collaborator):

Is "no `divisions`" necessary? Or would it be possible to print the divisions you should use in this error message, with duplicates removed, so that you can avoid having to do another compute pass over the data?

If the divisions are sorted properly, but just have duplicates, then I think the only thing we can do right now is combine all the duplicates into one partition. Seems like the feeling in #8437 was that we can't handle duplicate divisions right now, so there's no other choice.

@jsignell (Member, Author):

I think you can either set sorted=True or you can set divisions. If you set both then we expect divisions to be exactly the right size for the current npartitions.

@gjoseph92 (Collaborator):

Ah I see. You only get to go down the fix_overlap code path if you don't pass in divisions. And you're thinking that doing that—even though it requires eagerly re-computing divisions again—will ultimately be more performant than a shuffle-based set_index, because it gets to use the fix_overlap process.

I'm not sure about that; I feel like the fact that set_index(divisions=...) would be fully lazy whereas set_index(sorted=True) would require re-computing the statistics you just computed might make the shuffle more worthwhile, even though you don't get to benefit from the fact that it's already sorted.

@gjoseph92 (Collaborator):

Also, why make allow_overlap=False only warn?

@jsignell (Member, Author):

Yeah, that makes sense about set_index(divisions=...) being potentially better. When I was comparing them I was trying to minimize the number of tasks, but thinking it over this morning, that clearly doesn't make sense: we are ok having more tasks if it means that we can be lazy.

> Also, why make allow_overlap=False only warn?

I wanted to be able to output the values even if they hit this scenario. Otherwise how will they know what to put into the set_index(divisions=...)? I'm fine to switch it back if you prefer to error. There is a test case that demonstrates a potentially confusing scenario:

    A = pd.DataFrame({"key": [1, 2, 3, 4, 4, 5, 6, 7], "value": list("abcd" * 2)})
    a = dd.from_pandas(A, npartitions=2)
    with pytest.warns(UserWarning, match="Partitions have overlapping values"):
        divisions = a.compute_current_divisions("key")

You'll notice that "key" is sorted; it just has an unfortunate duplicate right at the partition split, so you end up with [1, 2, 3, 4] in the first partition and [4, 5, 6, 7] in the second. This counts as an overlap because if we just set (0, 4, 7) as the divisions, we will drop rows in chained operations: dask won't know about the 4 in the first partition.
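
The dropped-row hazard can be made concrete with a toy partition lookup. This is a hypothetical sketch of division-based routing (`partition_for` is not dask's actual code), using the divisions from the comment above:

```python
import bisect

divisions = (0, 4, 7)       # naive divisions, with the duplicate 4 at the split
partition_0 = [1, 2, 3, 4]  # keys physically stored in partition 0
partition_1 = [4, 5, 6, 7]  # keys physically stored in partition 1

def partition_for(key, divisions):
    # Partition i holds keys in [divisions[i], divisions[i+1]),
    # with the last partition closed on the right
    i = bisect.bisect_right(divisions, key) - 1
    return min(i, len(divisions) - 2)

# A lookup for key 4 is routed only to partition 1 ...
print(partition_for(4, divisions))  # 1
# ... so the 4 physically sitting in partition_0 is never inspected,
# and chained operations silently drop that row.
```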

@gjoseph92 (Collaborator):

Ahh I see. I think that's a good reason to keep it as a warning then. And then actually setting those divisions anywhere (or via set_index) should give you an error.

@jorloplaz (Contributor) commented Feb 25, 2022

I don't know if it may come in useful or not, but I worked on a private method to calculate "appropriate" divisions. The idea is that, so that each partition later implies a similar computational load (assuming a cluster with as many machines as partitions), there are two possibilities when the index/column can contain duplicate values:

  • Partitioning by mere distinct values, that is, making each partition have a number of distinct values which is as equal as possible (but without caring about the number of rows). This guarantees that a groupby call later has a similar number of groups, but can imply a very different number of rows in each partition.
  • Partitioning by a similar number of rows, that is, just accounting for how many rows there are in each partition so that they are as equally sized as possible (but without caring about how many distinct values these rows come from). This guarantees a similar amount of memory being used in each partition, but may result in a very different number of distinct (i.e., unique) values in each partition.

Here's the code. The first option calls unique, while the second calls value_counts. We do this on the index, but it could be done for a column as well.

It's also interesting to note that the two strategies give the same result for an index/column whose values are all unique.

from math import ceil
from typing import Any, List

import dask.dataframe as dd
import pandas as pd


def compute_divisions(df: dd.DataFrame, strategy: str = "distinct") -> List[Any]:
    """Internal function to compute divisions as a list.

    strategy: "distinct" partitions by distinct values; anything else
    partitions by row counts. (The original snippet referenced
    ``strategy`` as a free variable; it is taken as a parameter here.)
    """
    # Number of partitions to use
    part_num = df.npartitions

    # Branch depending on strategy
    if strategy == "distinct":
        # Sort distinct index values
        sorted_ind = sorted(df.index.unique().compute())
        aux = len(sorted_ind)
        if aux == 0:  # empty dataframe -> nothing to do
            return []
        # Partition size if divisions were perfectly even; if there are
        # more partitions than values, fall back to 1
        part_size = max(1, int(ceil(aux / part_num)))
        # Every part_size-th distinct value becomes a division ...
        divisions = sorted_ind[::part_size]
        # ... and the last value closes the final partition
        divisions.append(sorted_ind[-1])

    # By number of rows
    else:
        # Count the number of times each element is present
        index_counts: pd.Series = df.index.value_counts().compute().sort_index()
        aux = index_counts.sum()
        if aux == 0:  # empty dataframe -> nothing to do
            return []
        # Target rows per partition; if there are more partitions than
        # rows, fall back to 1
        part_size = max(1, aux // part_num)
        split_point, size, divisions = None, 0, []
        for ind, count in index_counts.items():  # walk values in sorted order
            if split_point is None:  # this value starts a new partition
                divisions.append(ind)
                split_point = ind
            size += count  # add count to the current partition's size
            if size >= part_size:  # partition reached its target size
                size = 0
                split_point = None  # next value starts a new partition
        # Append the last value as the closing division if not already there
        if split_point != ind:
            divisions.append(ind)

    # Done
    return divisions
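
The row-count branch can be exercised without dask by replaying the same loop over a plain value-to-count mapping. `divisions_by_row_count` is a hypothetical pure-Python mirror of the logic above; the input values are made up:

```python
from collections import Counter

def divisions_by_row_count(values, part_num):
    # Pure-Python mirror of the count-based branch: walk values in sorted
    # order, starting a new partition whenever the running row count
    # reaches the target size
    index_counts = dict(sorted(Counter(values).items()))
    total = sum(index_counts.values())
    if total == 0:
        return []
    part_size = max(1, total // part_num)
    split_point, size, divisions = None, 0, []
    for ind, count in index_counts.items():
        if split_point is None:      # this value starts a new partition
            divisions.append(ind)
            split_point = ind
        size += count
        if size >= part_size:        # partition reached its target size
            size = 0
            split_point = None
    if split_point != ind:           # close with the last value
        divisions.append(ind)
    return divisions

# The duplicated 4 lands wholly in the first partition:
print(divisions_by_row_count([1, 2, 3, 4, 4, 5, 6, 7], 2))  # [1, 5, 7]
```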

@jsignell jsignell removed the needs review (Needs review from a contributor) label Feb 25, 2022
@jsignell jsignell changed the title Add ddf.calculate_divisions to get divisions on a sorted index or column Add ddf.compute_current_divisions to get divisions on a sorted index or column Feb 25, 2022
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
@jsignell (Member, Author):

@jorloplaz that is very interesting. Thanks for sharing! That will be the next step probably, explicitly computing not the current divisions but the ideal divisions given different scenarios like you describe.

@gjoseph92 (Collaborator):

> I worked on a private method to calculate "appropriate" divisions

I might be misunderstanding you @jorloplaz, but this seems to serve the same purpose as partition_quantiles, which set_index uses internally to pick new "good" divisions. It seems like your method might be tuned for some more specific cases, but you might find the docstring in partitionquantiles.py interesting. After this PR gets merged, we're discussing exposing this partition_quantiles logic through a compute_good_divisions() method (#8435 (comment)).

@jorloplaz (Contributor) commented Mar 1, 2022

I'll take a look at partitionquantiles.py, @gjoseph92; thanks for pointing to that. By the way, there's a problem with that function and ExtensionDtypes: it seems to rely on pure numpy for the calculation, while it should delegate to pandas. We're discussing this issue in #5720. Maybe you know what the exact problem is?

@jsignell (Member, Author) commented Mar 8, 2022

Ok I am going to merge this @gjoseph92 🤞 😬


Labels

bug (Something is broken), dataframe, feature (Something is missing)


Development

Successfully merging this pull request may close these issues.

set_index with sorted=True can drop rows in dataframes with empty partitions

3 participants