When divisions has repeats, set_index puts all data in the last partition instead of balancing it #8437

@gjoseph92

Description

Reported in https://stackoverflow.com/a/70178087/17100540 by @DahnJ.

Given imbalanced data, you want the same value to be split across multiple output partitions:

In [1]: import dask.dataframe as dd
In [2]: import pandas as pd
In [3]: df = pd.DataFrame({"A": [0] * 10, "B": range(10)})
In [4]: ddf = dd.from_pandas(df, npartitions=5)
In [5]: s = ddf.set_index("A")
In [6]: s.divisions
(0, 0, 0, 0, 0, 0)

However, when the shuffle actually happens, all the data ends up in the last possible partition:

In [7]: import dask
In [8]: dask.compute(*s.to_delayed())
Out[8]: 
(Empty DataFrame
 Columns: [B]
 Index: [],
 Empty DataFrame
 Columns: [B]
 Index: [],
 Empty DataFrame
 Columns: [B]
 Index: [],
 Empty DataFrame
 Columns: [B]
 Index: [],
    B
 A   
 0  2
 0  3
 0  4
 0  5
 0  8
 0  9
 0  0
 0  1
 0  6
 0  7)

EDIT: I realized that although set_index will calculate (0, 0, 0, 0, 0, 0) as good divisions, if you pass in divisions=(0, 0, 0, 0, 0, 0), you'll get ValueError: New division must be unique, except for the last element. So clearly there's some disagreement about whether repeated values are even valid divisions (xref #8393, cc @charlesbluca).

This, plus @SultanOrazbayev's last code snippet on SO, makes me think DataFrame generally doesn't support duplicate divisions, and it's simply a bug that set_index doesn't deduplicate the output of partition_quantiles. That said, I think we should support duplicate divisions, since it's a reasonable thing to need when you have imbalanced data.

The problem is in the set_partitions_pre step of set_index, where we calculate which output partition number each row should belong to. We call searchsorted on divisions (as a Series); s is the values we're reindexing by:

partitions = divisions.searchsorted(s, side="right") - 1

Because of side="right", when there are duplicate values in divisions, searchsorted returns the insertion point just past the last duplicate, so after subtracting 1 every row maps to the index of the last duplicate. That's why all the data ends up in the last possible partition. With side="left", it would be the other way around (all data in the first partition).
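The effect is easy to reproduce with the stdlib bisect module, whose bisect_right/bisect_left match searchsorted's side="right"/side="left":

```python
# Reproducing the behavior with stdlib bisect: bisect_right matches
# searchsorted(side="right"), bisect_left matches side="left".
from bisect import bisect_left, bisect_right

divisions = (0, 0, 0, 0, 0, 0)  # 6 divisions -> 5 output partitions

# side="right": insertion point past the last duplicate, so every 0
# maps to index 5, which presumably gets clipped to the last real
# partition (4).
print(bisect_right(divisions, 0) - 1)  # 5

# side="left": insertion point before the first duplicate, so every 0
# maps to the first partition instead.
print(bisect_left(divisions, 0) - 1)  # -1
```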

With some clever counting, I think we could deal with this while still using searchsorted. We'd probably want to remove duplicates from divisions and keep an auxiliary list of how many times each division repeats (basically compute a run-length encoding). Then, if a row's target division value is duplicated, we pick randomly among the N candidate partitions? Or something like that.
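A hypothetical sketch of that scheme, using stdlib bisect in place of searchsorted (the function name and the clipping rules are mine, not dask's):

```python
# Hypothetical sketch: spread rows whose index value equals a repeated
# division across all the partitions bounded by a copy of that value.
import random
from bisect import bisect_left, bisect_right

def assign_partition(value, divisions, rng=random):
    nparts = len(divisions) - 1  # n+1 divisions bound n partitions
    left = bisect_left(divisions, value)    # searchsorted side="left"
    right = bisect_right(divisions, value)  # searchsorted side="right"
    if right - left <= 1:
        # Unique (or absent) division value: keep the current
        # side="right" rule, clipped into the valid partition range.
        return min(max(right - 1, 0), nparts - 1)
    # Repeated division value: since divisions are per-partition
    # min/max bounds, any partition touching a copy is a valid target.
    return rng.randint(max(left - 1, 0), min(right - 1, nparts - 1))

assign_partition(0.5, (0, 1, 1, 2))  # -> 0 (unambiguous)
assign_partition(1, (0, 1, 1, 2))    # -> one of 0, 1, 2 at random
```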

The way the output partition is selected from N options is probably the trickiest part. Assuming that divisions represents an approximately uniform partitioning of the data, then I think picking from the N options at random would maintain that desired distribution.

Though I'm not sure how to handle the "edge" partitions. For divisions=(0, 1, 1, 2), a value of 1 could go to any of the three output partitions. However, the first and last partitions will also get 0s and 2s, respectively, whereas the middle partition will only get 1s. So to maintain uniform partition sizes, you'd want to send more 1s to the middle partition than the others—basically bias the random selection towards that? The problem is, you don't know how much to do so, because you don't know how many 1s there are relative to 0s and 2s, or even how many other values there are between 0 and 1.
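To make that concrete, here's a toy enumeration of the candidate partitions for divisions=(0, 1, 1, 2) (the candidates helper is illustrative only, not dask code):

```python
# For divisions = (0, 1, 1, 2) there are three partitions: partition 0
# spans [0, 1], partition 1 spans [1, 1], partition 2 spans [1, 2].
# Only the value 1 is ambiguous, and the middle partition can only
# ever receive 1s.
from collections import Counter
import random

def candidates(value):
    # Partitions that could legally hold `value` (illustrative helper).
    if value < 1:
        return [0]        # only the first partition covers [0, 1)
    if value > 1:
        return [2]        # only the last partition covers (1, 2]
    return [0, 1, 2]      # a 1 fits any of the three partitions

# Same divisions, different shares of 1s: a uniform random choice
# can't know how many 1s to divert to the middle partition to keep
# the partition sizes even.
rng = random.Random(0)
for ones in (2, 20):
    data = [0.5] * 5 + [1] * ones + [1.5] * 5
    sizes = Counter(rng.choice(candidates(v)) for v in data)
    print(dict(sorted(sizes.items())))
```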

Environment:

  • Dask version: a5aecac
  • Python version: 3.8.8
  • Operating System: macOS
  • Install method (conda, pip, source): source
