Move custom sort function logic to internal sort_values#8571
```python
if self.npartitions == 1:
    return self.map_partitions(sort_function, **sort_kwargs)
```
Noting that we would also need to move the single partition sort case to the internal sort_values so that custom sorting functions are supported there; this has the side effect of making it so multi-column sorting is no longer supported for single partition cases, which is niche enough that I don't think this would cause any downstream breakage
Could you explain this a bit more, maybe add a snippet or a test demonstrating the behavior change? From reading your comment, it seems that now the test you included in #8345 would fail, but it seems to pass?
Sure! So before, if we were to try something like

```python
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({"x": [1, 2, 3, 1, 2, 3], "y": [1, 2, 3, 4, 5, 6]})
ddf = dd.from_pandas(df, npartitions=1)
ddf.sort_values(by=["x", "y"]).compute()
```

Because the dataframe has a single partition, we would reach the map_partitions call I removed above before reaching Dask's input validation checks in shuffle.sort_values, one of which checks that by is only a single column:
dask/dask/dataframe/shuffle.py (lines 88 to 97 in 3e7d1d0)
Now that the map_partitions call is moved to be after these input validation checks, the above snippet would trigger the NotImplementedError, even though it is possible for the single-partition dataframe to be sorted by multiple columns.
As I'm writing this out, I realize that it should be possible to modify this check of by somewhat to allow multiple columns when the dataframe has a single partition - I will try making those changes now 🙂
> it seems that now the test you included in #8345 would fail, but it seems to pass?
That test achieves multi-column sorting in a different, somewhat hacky way: it passes a single by-column to Dask's sort_values and multiple by-columns to the custom sorting function (which is essentially just a wrapper around the partition library's sort_values). Dask performs a rough initial "sort" using the single column it has been given, then hands off the remaining work of the multi-column sort to the partition library (pandas, cuDF, etc.), which actually supports multi-column sorting.
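The two-stage idea above can be sketched in plain pandas. This is an illustrative stand-in, not dask's actual shuffle code: the "rough sort" and the chunk boundaries here are hypothetical simplifications of what dask's quantile-based partitioning does.

```python
import pandas as pd

df = pd.DataFrame({"x": [3, 1, 2, 1, 3, 2], "y": [6, 4, 5, 1, 3, 2]})

# Stand-in for dask's rough single-column shuffle: order by "x" and cut the
# frame into partitions so rows with equal "x" land in the same partition
# (dask's quantile-based boundaries generally achieve the same thing).
rough = df.sort_values(by="x")
parts = [rough[rough["x"] <= 1], rough[rough["x"] > 1]]

# The custom sort_function (here just pandas' own sort_values) then finishes
# the multi-column sort within each partition.
result = pd.concat(p.sort_values(by=["x", "y"]) for p in parts)

print(result.equals(df.sort_values(by=["x", "y"])))  # True
```

Because the rough pass already grouped equal "x" values into the same partition, concatenating the per-partition multi-column sorts reproduces the full sort.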
Thanks for the detailed explanation! That makes sense. I'm wondering if it would be good to add something like your snippet above (`ddf.sort_values(by=["x", "y"]).compute()`) as a test?
Yeah that's a good idea - I'll add a single partition sorting test
I think #8345 and this conversation are helpful context.

@ian-r-rose Can you take another look here when you get a chance?
ian-r-rose left a comment
Sorry for the slow review @charlesbluca!
dask/dataframe/tests/test_shuffle.py (outdated)

```python
@pytest.mark.parametrize("by", ["a", "b"])
@pytest.mark.parametrize("nelem", [10, 500])
@pytest.mark.parametrize("nparts", [1, 10])
def test_sort_values(nelem, nparts, by, ascending):
```
Looks like a legitimate set of test failures -- nparts is no longer a parameterized value here, and should be removed.
```python
if not isinstance(by, list):
    by = [by]
if len(by) > 1 and df.npartitions > 1:
    raise NotImplementedError(
```
I think it would still be helpful to do a str check here. Naively, I tried passing in a dd.Series for by (pandas accepts this, and different parts of the dask API accept this as well, such as set_index). On main this leads to the helpful NotImplementedError, but with this change the error is much more opaque because it gets further than this validation step.
Now, it should also be possible to write this function so it takes a dd.Series, but that's probably outside the scope of this PR.
Ended up consolidating this all into the same check, with an error message that provides a general overview of the expected input and when multi-column sorting is available
```diff
- ddf = dd.from_pandas(df, npartitions=nparts)
+ ddf = dd.from_pandas(df, npartitions=10)
```
```python
with dask.config.set(scheduler="single-threaded"):
```
Is this configuration important? I'd expect it to work regardless of the scheduler I use, and it doesn't actually seem to take effect, since we leave the config block before ever computing.
This is here for debugging purposes, as sorting helper functions like set_partitions_pre run in parallel on a multi-threaded scheduler and it's not possible to debug each individual run of the function unless this config option is set.
You're correct in expecting this to work without the single-threaded scheduler, happy to remove if you think that's the best option here
I don't have a strong opinion on whether it should stay, though maybe a comment that it's only here for debugging purposes would be helpful
ian-r-rose left a comment
Thanks for your patience on this @charlesbluca! I'm happy with where this stands now
This PR moves the handling of custom sorting functions to shuffle.sort_values, so that usages of the internal sort_values function will not have to manually specify a default sort_function and sort_function_kwargs.

cc @rjzamora who raised this concern in the downstream implementation of this in rapidsai/cudf#9789
`pre-commit run --all-files`