Move custom sort function logic to internal sort_values#8571

Merged
jsignell merged 8 commits into dask:main from charlesbluca:internal-custom-sort
Feb 18, 2022
Conversation

@charlesbluca
Member

This PR moves the handling of custom sorting functions to shuffle.sort_values, so that usages of the internal sort_values function will not have to manually specify a default sort_function and sort_function_kwargs.

cc @rjzamora who raised this concern in the downstream implementation of this in rapidsai/cudf#9789

  • Closes #xxxx
  • Tests added / passed
  • Passes pre-commit run --all-files

Comment on lines -4369 to -4377
if self.npartitions == 1:
    return self.map_partitions(sort_function, **sort_kwargs)
Member Author


Noting that we would also need to move the single-partition sort case into the internal sort_values so that custom sorting functions are supported there. This has the side effect that multi-column sorting is no longer supported for single-partition cases, which is niche enough that I don't think it would cause any downstream breakage.

Contributor


Could you explain this a bit more, maybe with a snippet or a test demonstrating the behavior change? From your comment, it sounds like the test you added in #8345 should now fail, yet it seems to pass?

Member Author


Sure! So before, if we were to try something like

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"x": [1, 2, 3, 1, 2, 3], "y": [1, 2, 3, 4, 5, 6]})
ddf = dd.from_pandas(df, npartitions=1)

ddf.sort_values(by=["x", "y"]).compute()

Because the dataframe has a single partition, we would reach the map_partitions call I removed above before hitting Dask's input validation checks in shuffle.sort_values, one of which checks that by is only a single column:

if not isinstance(by, str):
    # support ["a"] as input
    if isinstance(by, list) and len(by) == 1 and isinstance(by[0], str):
        by = by[0]
    else:
        raise NotImplementedError(
            "Dataframe only supports sorting by a single column which must "
            "be passed as a string or a list of a single string.\n"
            "You passed %s" % str(by)
        )

Now that the map_partitions call happens after these input validation checks, the above snippet triggers the NotImplementedError, even though it is perfectly possible to sort a single-partition dataframe by multiple columns.

As I'm writing this out, I realize that it should be possible to relax this check on by to allow multiple columns when the dataframe has a single partition - I will try making those changes now 🙂

> it seems that now the test you included in #8345 would fail, but it seems to pass?

That test achieves multi-column sorting in a different, somewhat hacky way: it passes a single by column to Dask's sort_values and multiple by columns to the custom sorting function (which is essentially just a wrapper around the partition library's sort_values). Dask performs a rough initial "sort" using the single column it has been given, then hands off the rest of the multi-column sort to the partition library (pandas, cuDF, etc.) that actually supports multi-column sorting.
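As a minimal pandas-only sketch of this pattern (partition_sort is a hypothetical stand-in for the custom sort_function; the Dask shuffle machinery is omitted):

```python
import pandas as pd

# The "custom sort function" is essentially a thin wrapper around the
# partition library's own sort_values. Dask only needs a single column
# ("x") to shuffle the data, while the per-partition sort function
# receives the full multi-column `by` list via sort_function_kwargs.
def partition_sort(partition: pd.DataFrame, **kwargs) -> pd.DataFrame:
    return partition.sort_values(**kwargs)

df = pd.DataFrame({"x": [3, 1, 2, 1], "y": [0, 9, 5, 2]})

# Mimic one partition's work: a full multi-column sort done by pandas.
result = partition_sort(df, by=["x", "y"])
```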

Contributor


Thanks for the detailed explanation! That makes sense. I'm wondering if it would be good to add something like your ddf.sort_values(by=["x", "y"]).compute() snippet above as a test?

Member Author


Yeah that's a good idea - I'll add a single partition sorting test

@ian-r-rose self-requested a review on January 19, 2022 17:15
@scharlottej13
Contributor

I think #8345 and this conversation are helpful context.

@bryanwweber
Contributor

@ian-r-rose Can you take another look here when you get a chance?

@jsignell added the hygiene (Improve code quality and reduce maintenance overhead) and needs review (Needs review from a contributor) labels on Feb 4, 2022
Collaborator

@ian-r-rose left a comment


Sorry for the slow review @charlesbluca!

@pytest.mark.parametrize("by", ["a", "b"])
@pytest.mark.parametrize("nelem", [10, 500])
@pytest.mark.parametrize("nparts", [1, 10])
def test_sort_values(nelem, nparts, by, ascending):
Collaborator


Looks like a legitimate set of test failures -- nparts is no longer a parameterized value here, and should be removed.

Member Author


Good catch!

if not isinstance(by, list):
    by = [by]
if len(by) > 1 and df.npartitions > 1:
    raise NotImplementedError(
Collaborator


I think it would still be helpful to do a str check here. Naively, I tried passing in a dd.Series for by (pandas accepts this, and different parts of the dask API accept this as well, such as set_index). On main this leads to the helpful NotImplementedError, but with this change the error is much more opaque because it gets further than this validation step.

Now, it should also be possible to write this function so it takes a dd.Series, but that's probably outside the scope of this PR.

Member Author


Ended up consolidating this all into the same check, with an error message that gives a general overview of the expected input and of when multi-column sorting is available.
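As a rough sketch of the consolidated validation described above (validate_by is a hypothetical helper, not the exact code merged in this PR):

```python
# Illustrative sketch: normalize `by` to a list of column names, reject
# non-string inputs (e.g. a dd.Series), and only allow multiple columns
# when the dataframe has a single partition.
def validate_by(by, npartitions):
    if not isinstance(by, list):
        by = [by]
    # Reject anything that is not a plain column name.
    if any(not isinstance(b, str) for b in by):
        raise NotImplementedError(
            "`by` must be a string or a list of strings naming columns; "
            "you passed %s" % str(by)
        )
    # Multi-column sorting is only supported for a single partition.
    if len(by) > 1 and npartitions > 1:
        raise NotImplementedError(
            "Sorting by multiple columns is only supported when the "
            "dataframe has a single partition."
        )
    return by
```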

-    ddf = dd.from_pandas(df, npartitions=nparts)
+    ddf = dd.from_pandas(df, npartitions=10)

     with dask.config.set(scheduler="single-threaded"):
Collaborator


Is this configuration important? I'd expect it to work regardless of the scheduler I use, and it doesn't actually seem to take effect, since we leave the config block before ever computing.

Member Author


This is here for debugging purposes, as sorting helper functions like set_partitions_pre run in parallel on a multi-threaded scheduler and it's not possible to debug each individual run of the function unless this config option is set.

You're correct in expecting this to work without the single-threaded scheduler; happy to remove it if you think that's the best option here.

Collaborator


I don't have a strong opinion on whether it should stay, though maybe a comment that it's only here for debugging purposes would be helpful
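Such a comment might look like the following sketch inside the test (the wording is hypothetical, and dask is assumed to already be imported in the test module):

```python
# NOTE: the single-threaded scheduler is set only for debugging; it lets
# you step into per-partition helpers like set_partitions_pre with pdb,
# which is not practical under the default multi-threaded scheduler.
with dask.config.set(scheduler="single-threaded"):
    ...
```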

Collaborator

@ian-r-rose left a comment


Thanks for your patience on this @charlesbluca! I'm happy with where this stands now

@jsignell removed the needs review (Needs review from a contributor) label on Feb 18, 2022
@jsignell merged commit 2b72812 into dask:main on Feb 18, 2022