Add ddf.compute_current_divisions to get divisions on a sorted index or column #8517
dask/dataframe/shuffle.py (outdated)

```python
has_overlap = any(mins[i] <= maxes[i - 1] for i in range(1, len(mins)))
if has_overlap:
    raise ValueError(
        f"Partitions must be sorted ascending by {'index' if col is None else col}"
    )
```
You may have just waited many minutes to get this error message, I wonder if we can include any more information from our hard-earned mins and maxes to say which partitions overlap, what the overlapping values are, what you can do about it, etc.?
Also, the fact that there's a fix_overlap method makes me wonder if we should always error in this case, or if sometimes we should return the divisions with overlap but then, if you're going to set them, require you to use fix_overlap?
Yes you are definitely right. I was trying to just think about optimizing for the best-case scenario, but we should think about how to give more informative errors. I am leaning towards preferring informative errors rather than trying to resolve overlap. Especially in this method that just computes existing divisions.
Yeah, it would just be nice if in order to fix the overlap, you didn't have to recompute the divisions again, since mins, maxes has all the information we need.
But I'm not sure how much this matters? As a user, when would you call compute_current_divisions? I'd think only when you believe the current partitions are already sorted. So if they weren't actually sorted, that would indicate that some assumptions were actually wrong and that you needed to change your code (add a sort_values or set_index or something before), not necessarily that you'd want overlap automatically fixed?
Though I also don't know how fix_overlap works, and how much more efficient it is.
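To make the idea above concrete, here is a hypothetical sketch of how the already-computed mins and maxes could be used to name the offending partitions in the error message. The function name is illustrative, not a dask internal:

```python
def find_overlapping_partitions(mins, maxes):
    """Return (left, right) index pairs of adjacent partitions whose
    value ranges overlap, i.e. the right partition starts at or below
    where the left one ends."""
    return [
        (i - 1, i)
        for i in range(1, len(mins))
        if mins[i] <= maxes[i - 1]
    ]

# Partition 1 starts at 2 but partition 0 runs up to 4, so they overlap.
print(find_overlapping_partitions([1, 2, 6], [4, 5, 9]))  # [(0, 1)]
```

An error message could then report these pairs (and the boundary values) instead of only saying the partitions are not sorted.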
dask/dataframe/shuffle.py (outdated)

```python
mins, maxes = compute_mins_and_maxes(df, **kwargs)

df.divisions = tuple(mins) + (list(maxes)[-1],)
overlap = [i for i in range(1, len(mins)) if mins[i] <= maxes[i - 1]]
```
Why'd you flip the >= to <= here? I think what you have now is right, but I'm not sure if fix_overlap is expecting the inverse for some reason.
Thanks for reminding me! Yeah I think it was wrong before and sending too much to fix_overlap. But I need to double check that theory.
jsignell left a comment:
Thanks for looking this over Gabe! I'm going to try to add some tests this week.
```python
if (
    sorted(mins) != list(mins)
    or sorted(maxes) != list(maxes)
    or any(a > b for a, b in zip(mins, maxes))
):
```
I don't think this line really did anything meaningful. I can't imagine how the min of an array could ever be greater than the max.
dask/dataframe/shuffle.py (outdated)

```diff
-    or any(a > b for a, b in zip(mins, maxes))
+    sorted(non_empty_mins) != non_empty_mins
+    or sorted(non_empty_maxes) != non_empty_maxes
+    or any(a < b for a, b in zip(non_empty_mins[1:], non_empty_maxes[:-1]))
```
This line checks that the min of the next partition is always greater than or equal to the max of the partition before. So it catches cases like:

```
(1, 4)
(2, 5)
```
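That predicate can be exercised on its own; a standalone sketch (helper name assumed, not from the PR):

```python
def partitions_overlap(mins, maxes):
    # The min of each partition (after the first) must be >= the max
    # of the partition before it; otherwise the ranges interleave.
    return any(a < b for a, b in zip(mins[1:], maxes[:-1]))

print(partitions_overlap([1, 2], [4, 5]))  # True: (1, 4) and (2, 5) overlap
print(partitions_overlap([1, 5], [4, 9]))  # False: ranges are disjoint
```

Note that an exact tie (a partition starting at the previous partition's max) passes this strict `<` check, which is the duplicate-at-the-boundary case discussed further down.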
```python
maxes = remove_nans(maxes)

non_empty_mins = [m for m, l in zip(mins, lens) if l != 0]
non_empty_maxes = [m for m, l in zip(maxes, lens) if l != 0]
```
We don't care about empty partitions when computing current divisions because we will never be repartitioning. We guarantee that `input.npartitions + 1 == len(divisions)`.
So the idea here is that functions calling `_compute_partition_stats(allow_overlap=True)` are prepared for the fact that the number of mins/maxes might not match the number of divisions?

Yeah, that's the idea. This seemed like the safest way to do the right thing when fixing overlap.
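A toy illustration of that invariant in pure Python (the helper name is assumed, not dask's internal API):

```python
def divisions_from_stats(mins, maxes):
    # Divisions are the per-partition minimums plus the overall maximum,
    # so len(divisions) is always npartitions + 1.
    return tuple(mins) + (maxes[-1],)

mins, maxes = [0, 10, 20], [9, 19, 29]
divisions = divisions_from_stats(mins, maxes)
print(divisions)       # (0, 10, 20, 29)
print(len(divisions))  # npartitions (3) + 1 == 4
```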
Ok @gjoseph92 I am starting to be pretty happy with this.
gjoseph92 left a comment:
Looking good to me as well!
dask/dataframe/shuffle.py (outdated)

```python
):
    raise ValueError(
        "Partitions have overlapping values, so divisions are non-unique. "
        "Use `set_index(sorted=True)` with no `divisions` to allow dask to fix the overlap."
    )
```
Is no divisions necessary? Or would it be possible to print the divisions you should use in this error message, with duplicates removed, so that you can save having to do a compute pass over the data again?
If the divisions are sorted properly, but just have duplicates, then I think the only thing we can do right now is combine all the duplicates into one partition. Seems like the feeling in #8437 was that we can't handle duplicate divisions right now, so there's no other choice.
I think you can either set sorted=True or you can set divisions. If you set both then we expect divisions to be exactly the right size for the current npartitions.
Ah I see. You only get to go down the fix_overlap code path if you don't pass in divisions. And you're thinking that doing that—even though it requires eagerly re-computing divisions again—will ultimately be more performant than a shuffle-based set_index, because it gets to use the fix_overlap process.
I'm not sure about that; I feel like the fact that set_index(divisions=...) would be fully lazy whereas set_index(sorted=True) would require re-computing the statistics you just computed might make the shuffle more worthwhile, even though you don't get to benefit from the fact that it's already sorted.
Also, why make allow_overlap=False only warn?
Yeah, that makes sense about `set_index(divisions=...)` being potentially better. When I was comparing them I was trying to minimize the number of tasks, but thinking it over this morning that clearly doesn't make sense - we are ok having more tasks if it means that we can be lazy.
> Also, why make `allow_overlap=False` only warn?
I wanted to be able to output the values even if they hit this scenario. Otherwise how will they know what to put into the `set_index(divisions=...)`? I'm fine to switch it back if you prefer to error. There is a test case that demonstrates a potentially confusing scenario:

```python
A = pd.DataFrame({"key": [1, 2, 3, 4, 4, 5, 6, 7], "value": list("abcd" * 2)})
a = dd.from_pandas(A, npartitions=2)
with pytest.warns(UserWarning, match="Partitions have overlapping values"):
    divisions = a.compute_current_divisions("key")
```

You'll notice that the "key" is sorted, it's just got an unfortunate duplicate right at the partition split, so you end up with [1, 2, 3, 4] in the first partition and [4, 5, 6, 7] in the second. This counts as an overlap because if we just set (0, 4, 7) as the divisions we will drop rows in chained operations because dask won't know about the 4 in the first partition.
Ahh I see. I think that's a good reason to keep it as a warning then. And then actually setting those divisions anywhere (or via set_index) should give you an error.
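A pure-Python sketch of why that duplicate at the boundary drops rows. The lookup function below is a simplified stand-in for how divisions route a key to a partition, not dask's actual code:

```python
from bisect import bisect_right

divisions = (0, 4, 7)                      # the divisions from the comment above
partitions = [[1, 2, 3, 4], [4, 5, 6, 7]]  # the split from the test case

def partition_for(key, divisions):
    # Simplified stand-in: pick the last division <= key, clamped to
    # the final partition.
    return min(bisect_right(divisions, key) - 1, len(divisions) - 2)

print(partition_for(4, divisions))  # 1 -> only the second partition is searched
print(4 in partitions[0])           # True -> this matching row would be missed
```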
I don't know if it may come in useful or not, but I worked on a private method to calculate "appropriate" divisions. The idea is that, in order for each partition to imply afterwards a similar computational load (assuming that we have a cluster with as many machines as partitions), there are 2 possibilities if we have an index/column that can have duplicate values:
Here's the code. The 1st option implies calls … It's also interesting to note that the 2 above strategies give the same result for an index/column whose values are all unique.

```python
def compute_divisions(df: DaskDataFrame) -> List[Any]:
    """Internal function to compute divisions as a list"""
    # Number of partitions to use
    part_num = df.npartitions
    # Branch depending on strategy
    if strategy == "distinct":
        # Sort distinct values
        sorted_ind = sorted(df.index.unique().compute())
        aux = len(sorted_ind)
        if aux == 0:  # empty dataframe -> nothing to do
            return []
        # Compute partition size if divisions were perfectly even
        part_size = max(1, int(ceil(aux / part_num)))  # try to divide equally; if partitions > values, set to 1
        # Compute divisions
        divisions = sorted_ind[::part_size]  # go over distinct values using part_size as step, giving divisions...
        divisions.append(sorted_ind[-1])  # ...except the last one
    # By ops
    else:
        # Count the number of times each element is present
        index_counts: PandasSeries = df.index.value_counts().compute().sort_index()
        aux = index_counts.sum()
        if aux == 0:  # empty dataframe -> nothing to do
            return []
        # Compute partition size if divisions were perfectly even
        part_size = max(1, aux // part_num)  # try to divide equally; if more partitions than distinct values, set to 1
        # Compute divisions
        split_point, size, divisions = None, 0, []
        for ind, count in index_counts.items():  # go over elements and counts
            if split_point is None:  # new split point must be appended
                divisions.append(ind)  # split point is current element
                split_point = ind  # new partition can start
            size += count  # add count to current partition size
            if size >= part_size:  # current partition size overflowed theoretical limit
                size = 0  # restore size as new partition starts
                split_point = None  # mark this as None so next one appended is the beginning of next partition
        # Append last division if not already present
        if split_point != ind:
            divisions.append(ind)
    # Done
    return divisions
```
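The "by ops" loop can be exercised without dask; this pure-Python adaptation (function name assumed) takes sorted `(value, count)` pairs in place of a computed `value_counts` Series:

```python
def divisions_by_counts(index_counts, npartitions):
    """index_counts: sorted (value, count) pairs."""
    total = sum(count for _, count in index_counts)
    if total == 0:
        return []
    part_size = max(1, total // npartitions)  # target rows per partition
    split_point, size, divisions = None, 0, []
    for ind, count in index_counts:
        if split_point is None:        # start a new partition at this value
            divisions.append(ind)
            split_point = ind
        size += count
        if size >= part_size:          # partition is full; next value starts a new one
            size, split_point = 0, None
    if split_point != ind:             # close the last partition
        divisions.append(ind)
    return divisions

# 8 rows split into 2 partitions of ~4 rows each.
print(divisions_by_counts([(1, 2), (2, 2), (3, 2), (4, 2)], 2))  # [1, 3, 4]
```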
Title changed from "ddf.calculate_divisions to get divisions on a sorted index or column" to "ddf.compute_current_divisions to get divisions on a sorted index or column"
Co-authored-by: Gabe Joseph <gjoseph92@gmail.com>
@jorloplaz that is very interesting. Thanks for sharing! That will be the next step probably, explicitly computing not the current divisions but the ideal divisions given different scenarios like you describe.

I might be misunderstanding you @jorloplaz, but this seems like the same purpose as the …

I'll take a look @gjoseph92 to …

Ok I am going to merge this @gjoseph92 🤞 😬
The goal of this is to create a well documented method for calculating divisions. The idea is that people can use this method and save the result rather than recalculating divisions potentially every time they set an index.
In my understanding #8435 describes a future state where setting index is split into separate pieces that are all explicit:
Here are some examples that I think would be better supported by this new flow. Start by reading in from a single csv file; this results in a RangeIndex and no partitions.

- Sorted index
- Sorted column
- Unsorted column