Conversation
cc @wence- to keep you in the loop.
```diff
-    return pa.concat_tables(shards)
+    table = pa.concat_tables(shards)
+    df = table.to_pandas(self_destruct=True)
+    return df.astype(meta.dtypes)
```
This might not be the most performant version, but I also don't know if it's much of a problem. I'll run an A/B test on the existing test suite.
My guess is it'd be good to avoid a cast if possible (at least for strings) via types_mapper= in pa.Table.to_pandas(). For example, as is, this will create object-backed string[python] columns first and then cast them to string[pyarrow].
This definitely isn't a blocker for this PR, but let's add a # TODO: comment if we don't include that logic now
I'll create a follow-up ticket once this is merged.
This seems like a good first pass. Thanks for putting this together so quickly. My guess is that generating the object dtype columns will slow things down, both for normal reasons, and for GIL + networking reasons.
I like that we didn't feel a need to block on this though.
Yes, this slows things down significantly. Mapping the string types (pa.string and pa.large_string) to pd.ArrowDtype makes the conversion zero-copy and should speed up the follow-up astype calls.
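For reference, a minimal sketch of such a mapping, assuming current pyarrow/pandas APIs (the helper name `_arrow_string_mapper` is made up for illustration, not the eventual implementation):

```python
import pandas as pd
import pyarrow as pa


def _arrow_string_mapper(arrow_type):
    # Hypothetical helper: map Arrow string types directly to pandas'
    # Arrow-backed dtype so no intermediate object-dtype columns are built.
    if arrow_type in (pa.string(), pa.large_string()):
        return pd.ArrowDtype(arrow_type)
    return None  # fall back to pyarrow's default conversion


# df = table.to_pandas(types_mapper=_arrow_string_mapper, self_destruct=True)
```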
```diff
     left = ext.get_output_partition(
         shuffle_id_left, barrier_left, output_partition
-    ).drop(columns=_HASH_COLUMN_NAME)
+    ).drop(columns=_HASH_COLUMN_NAME, errors="ignore")
```
This is inelegant, but so is adding the hash column to the meta. If anybody has strong preferences, please speak up.
This seems fine to me. Thanks for adding the informative comment 👌
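For illustration, the effect of errors="ignore" when the frame lacks the hash column (the column value below is a placeholder, not the real constant):

```python
import pandas as pd

_HASH_COLUMN_NAME = "__hash"  # placeholder value, for illustration only

meta = pd.DataFrame({"a": pd.Series([], dtype="int64")})  # meta never carries the hash column

# Without errors="ignore" this would raise KeyError; with it, the drop is a no-op.
meta.drop(columns=_HASH_COLUMN_NAME, errors="ignore")
```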
jrbourbeau left a comment
Nice! Thanks @hendrikmakait. Overall the changes here look good to me. I left some minor comments / questions
Have you tried this out with the example in dask/dask#10326?
```diff
         shards.append(sr.read_all())
-    return pa.concat_tables(shards)
+    table = pa.concat_tables(shards)
+    df = table.to_pandas(self_destruct=True)
```
I'm a little nervous about self_destruct=True as the docstring (https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas) says it's experimental and "If you use the object after calling to_pandas with this option it will crash your program".
I took it as a recommendation from https://arrow.apache.org/docs/python/pandas.html#reducing-memory-use-in-table-to-pandas
From what I understand, it would wreak havoc if we used table after the call to to_pandas. Given that we return after the next line, that shouldn't be a problem.
Thanks for pointing to the extra docs. I guess I was nervous about pyarrow-backed dtypes in pandas specifically. But I would hope in that case the memory wouldn't be deallocated if pandas was still using it after the pyarrow -> pandas handoff. If we're running this against a few use cases that do things after a shuffle (which it sounds like we are), then we should have sufficient coverage.
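For context, a toy version of the pattern that docs section recommends (this PR only passes self_destruct=True; the example table here is made up):

```python
import pyarrow as pa

table = pa.table({"x": [1, 2, 3], "y": ["a", "b", "c"]})

# self_destruct=True lets Arrow release each column's buffers as soon as it
# has been converted; `table` must not be used after this call.
df = table.to_pandas(self_destruct=True)
del table  # only `df` is used from here on

print(df.dtypes)
```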
distributed/shuffle/_shuffle.py (Outdated)
```python
from dask.dataframe import DataFrame

check_dtype_support(df._meta)
meta = df._meta.copy()
```
I see this is so we can reuse the same meta a few lines below. I'm now wondering why we need a copy there
I think we can skip the copy here as long as we have the other one in place.
```diff
             out = await self.offload(_)
         except KeyError:
-            out = self.schema.empty_table().to_pandas()
+            out = self.meta.copy()
```
Similar question here re: copy
I am hesitant to return several references to the same dataframe object for independent partitions since they are not immutable.
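A toy illustration of the aliasing concern (not the actual shuffle code):

```python
import pandas as pd

meta = pd.DataFrame({"a": pd.Series([], dtype="int64")})

# If two empty output partitions shared the same object, mutating one
# would silently change the other ...
p1 = p2 = meta
p1["b"] = pd.Series([], dtype="float64")
assert "b" in p2.columns

# ... whereas handing out copies keeps partitions independent.
p3 = meta.copy()
p3["c"] = pd.Series([], dtype="float64")
assert "c" not in meta.columns
```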
| f"col{next(counter)}": pd.array( | ||
| ["lorem ipsum"] * 100, | ||
| dtype="string[python]", |
Thanks for adding the extra test coverage here
Hmm, looks like there are some related test failures popping up: https://github.com/dask/distributed/actions/runs/5157181512/jobs/9289216557?pr=7879
Yes, here are the results: before / after
Whoops, forgot to check in a couple of changes to the tests.
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

20 files ±0   20 suites ±0   11h 29m 14s ⏱️ - 18m 24s

Results for commit d2a69cd. ± Comparison against base commit 8301cb7.
Closes #7420
Closes dask/dask#10326
@jrbourbeau: As discussed offline yesterday, here's the version that uses meta instead of a pa.Schema.

- `pre-commit run --all-files`