Use map_partitions (Blockwise) in to_parquet #8487
Conversation
Is anyone interested in reviewing this? :D (cc @jrbourbeau, @jsignell, @ian-r-rose, @gjoseph92)
dask/dataframe/io/parquet/core.py (outdated):

    kwargs_pass,
    ),
    token="to-parquet-"
    + tokenize(
What if you implement __dask_tokenize__ on ToParquetFunctionWrapper? Then I'd imagine the tokenization logic already in map_partitions would just work.
If we don't pass in token= explicitly, then map_partitions will produce a layer name of the form: f"{funcname(func)}-{tokenize(func, meta, *args, **kwargs)}". Therefore, we would also need to modify the ToParquetFunctionWrapper name to be "to-parquet". Is it worthwhile to add these definitions to ToParquetFunctionWrapper when we can just establish the name here?
Wait, the token= argument to map_partitions is very much mis-named. It should really be called name:
Lines 6028 to 6037 in a504115
With what you have here, you'll get two tokens on the final name (map_partitions will append one automatically). I think just token="to-parquet" and a __dask_tokenize__ method on ToParquetFunctionWrapper (and BlockwiseDep) is what you want here.
Wait, the token= argument to map_partitions is very much mis-named. It should really be called name
Agree
With what you have here, you'll get two tokens on the final name
Agree - I didn't realize this before, and I certainly don't like this.
I think just token="to-parquet" and a __dask_tokenize__ method on ToParquetFunctionWrapper (and BlockwiseDep) is what you want here.
If we specify token="to-parquet", then I don't think ToParquetFunctionWrapper.__dask_tokenize__ will ever be used. In order to avoid making changes in map_partitions, I vote that we just define the __repr__ of ToParquetFunctionWrapper to be "to-parquet", and define __dask_tokenize__ (as you suggested).
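The tokenization idea discussed above can be sketched without dask itself. The tokenize function below is a simplified hashlib-based stand-in for dask.base.tokenize, and the ToParquetFunctionWrapper here is a hypothetical minimal version of the class under review, not dask's actual implementation. The point it illustrates: a __dask_tokenize__ hook lets an object control its own token, so two wrappers with identical write configuration hash to the same value and produce a stable "to-parquet" layer name.

```python
# Minimal sketch (NOT dask's implementation): how a __dask_tokenize__ hook
# can make a function-wrapper object tokenize deterministically.
import hashlib


def tokenize(*args):
    """Simplified stand-in for dask.base.tokenize (hypothetical)."""
    normalized = []
    for a in args:
        # Objects defining __dask_tokenize__ control their own token.
        if hasattr(a, "__dask_tokenize__"):
            normalized.append(a.__dask_tokenize__())
        else:
            normalized.append(repr(a))
    return hashlib.md5(";".join(normalized).encode()).hexdigest()


class ToParquetFunctionWrapper:
    """Hypothetical wrapper mirroring the one discussed in the review."""

    def __init__(self, path, engine, kwargs):
        self.path = path
        self.engine = engine
        self.kwargs = kwargs

    def __dask_tokenize__(self):
        # The token depends only on the write configuration, so two
        # wrappers with the same settings yield the same layer name.
        return tokenize(self.path, self.engine, self.kwargs)


w1 = ToParquetFunctionWrapper("/tmp/out", "pyarrow", {"append": False})
w2 = ToParquetFunctionWrapper("/tmp/out", "pyarrow", {"append": False})
layer_name = f"to-parquet-{tokenize(w1)}"
assert tokenize(w1) == tokenize(w2)
```

With a hook like this, map_partitions' default naming scheme (func name plus token) would already produce a deterministic name, which is the alternative to passing token= explicitly.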
Thanks for all the wonderful reviewing @gjoseph92 ! I hadn't really considered #8453 to be a blocker, since the primary purpose of that PR (in my mind) is the
My interest in leaving
Regarding (2): I have no problem with removing the deprecation warning and just letting old behavior fail. However, I seem to remember @ian-r-rose telling me that there is at least one
Personally, I'd prefer merging #8453 as a separate PR just to keep the git history cleaner. Nice to be able to refer back to the PR that introduced a change and figure out the intention for it.
@martindurant - Do you know what's going on with the
Before:

    @staticmethod
    def write_metadata(parts, fmd, fs, path, append=False, **kwargs):

After:

    @classmethod
    def write_metadata(cls, parts, meta, fs, path, append=False, **kwargs):
Why the change? cls appears unused
Yeah - It was just defined as a classmethod in utils.py, so I changed this to agree - but I guess it could be static if cls isn't used.
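The classmethod-versus-staticmethod point can be shown with a small sketch. The Engine class and its return value here are hypothetical illustrations, not dask's actual engine API: when cls is never referenced in the body, the two decorators are behaviorally identical, so @staticmethod is the simpler signature.

```python
# Sketch of the two signatures under discussion. If `cls` is unused in the
# body (the reviewer's point), @staticmethod behaves the same as @classmethod
# with a simpler signature. Hypothetical class, not dask's engine API.
class Engine:
    @classmethod
    def write_metadata_cm(cls, parts, meta, fs, path, append=False, **kwargs):
        # `cls` is accepted but never used in the body.
        return {"path": path, "append": append, "n_parts": len(parts)}

    @staticmethod
    def write_metadata_sm(parts, meta, fs, path, append=False, **kwargs):
        # Same body, no unused first argument.
        return {"path": path, "append": append, "n_parts": len(parts)}


cm = Engine.write_metadata_cm([1, 2], None, None, "/tmp/x", append=True)
sm = Engine.write_metadata_sm([1, 2], None, None, "/tmp/x", append=True)
assert cm == sm
```

One practical reason to keep @classmethod anyway is consistency: if sibling engine classes define the method as a classmethod (as in utils.py here), matching the signature avoids surprises for subclasses that do use cls.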
pandas 1.4.0... #8626
Thanks for the reviews @gjoseph92 ! Just to avoid confusion - I'll probably wait to merge this until after the fastparquet failures are passing on main (and here).
@rjzamora looks like tons of parquet tests failed only on windows 3.7. I'd assume this is all flakiness (workers couldn't start or something?), but it's slightly concerning to me that when that happened, the computation succeeded and just returned an unexpected result. I wonder if there's a race condition or something in those tests, and whether we could make them more resilient?
Yeah - I haven't looked carefully through the windows 3.7 failures since many of those tests have been failing on all PRs for a while, but it does make sense to investigate ways to avoid flakiness and especially "silent" failures. @gjoseph92 & @jrbourbeau - Is there any concern with merging this particular PR (given that the CI failures are the same here as elsewhere)?
I am not concerned about merging this PR with failures that match those on main.
Sounds good - I don't think anything is likely to change here, so I'll get it out of the way.
This PR adds a new DataFrameOutputLayer and uses it to move the data-writing component of to_parquet into a proper HighLevelGraph Layer class [EDIT: This PR now uses map_partitions (with a new BlockIndex (BlockwiseDep) class) in lieu of a new DataFrameOutputLayer]. Note that this PR required the column-projection changes from #8453, because the current behavior in main is actually a bug.

With these changes, the result of ddf.visualize(optimize_graph=True) (from this reproducer) is now: [graph image] (the "read-parquet", "rename", "reset-index", and "to-parquet" tasks are all fused into "to-parquet")
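As a rough illustration of the BlockIndex (BlockwiseDep) idea from the description, the sketch below is pure Python, not dask's implementation. The names BlockIndex and map_partitions are reused for illustration only: a dependency object that yields each output block's own index (rather than holding data) lets the function mapped over partitions learn which partition it is writing, which is what a Blockwise to_parquet needs for per-partition file names.

```python
# Pure-Python sketch (NOT dask internals) of the BlockIndex / BlockwiseDep
# idea: a "dependency" that returns the block's own index when indexed,
# so the mapped function knows which partition it is handling.
class BlockIndex:
    """Stand-in for a BlockwiseDep that yields each block's index."""

    def __init__(self, numblocks):
        self.numblocks = numblocks

    def __getitem__(self, idx):
        # Instead of returning data, return the index itself.
        return idx


def map_partitions(func, partitions, *extra_deps):
    # Each extra dependency is indexed by the partition number before the
    # call, mirroring how Blockwise substitutes BlockwiseDep arguments.
    return [
        func(part, *(dep[i] for dep in extra_deps))
        for i, part in enumerate(partitions)
    ]


parts = [["a", "b"], ["c"], ["d", "e"]]
written = map_partitions(
    lambda part, block_id: f"part.{block_id}.parquet has {len(part)} rows",
    parts,
    BlockIndex((len(parts),)),
)
# Each partition learns its own index without a dedicated graph layer.
```

In real dask the substitution happens at graph-materialization time inside the Blockwise layer, which is why this approach made the dedicated DataFrameOutputLayer unnecessary and let the writer fuse with upstream tasks.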
Passes pre-commit run --all-files