
Use map_partitions (Blockwise) in to_parquet #8487

Merged
rjzamora merged 27 commits into dask:main from rjzamora:dataframe-io-layer-write
Feb 2, 2022

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Dec 14, 2021

This PR adds a new DataFrameOutputLayer and uses it to move the data-writing component of to_parquet into a proper HighLevelGraph Layer class [EDIT: This PR now uses map_partitions (with partition_info supplied by a new BlockIndex(BlockwiseDep) class) in lieu of a new DataFrameOutputLayer]. Note that this PR required the column-projection changes from #8453, because the current behavior in main is actually a bug.

With these changes, the result of ddf.visualize(optimize_graph=True) (from this reproducer) is now:

[Screenshot (2021-12-14): fused task graph from ddf.visualize(optimize_graph=True)]

(The "read-parquet", "rename", "reset-index", and "to-parquet" tasks are all fused into "to-parquet")
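The blockwise-write pattern described above can be sketched with a toy model. All names here (ToyBlockIndex, toy_map_partitions) are hypothetical stand-ins for dask's map_partitions / BlockIndex(BlockwiseDep) machinery, not the real API:

```python
class ToyBlockIndex:
    """Stand-in for a BlockwiseDep that yields each block's index per task."""

    def __init__(self, npartitions):
        self.npartitions = npartitions

    def __getitem__(self, idx):
        # dask's BlockIndex produces the block coordinates as a tuple
        return (idx,)


def toy_map_partitions(func, partitions, *args):
    """Apply ``func`` to each partition, expanding ToyBlockIndex args per block."""
    results = []
    for i, part in enumerate(partitions):
        expanded = [a[i] if isinstance(a, ToyBlockIndex) else a for a in args]
        results.append(func(part, *expanded))
    return results


def write_partition(df_part, block_index):
    # A real implementation would write df_part to storage; here we just
    # return the per-partition file name it would use
    return f"part.{block_index[0]}.parquet"


parts = [["row0"], ["row1"], ["row2"]]
filenames = toy_map_partitions(write_partition, parts, ToyBlockIndex(len(parts)))
```

Because the write step is now an ordinary blockwise function call, upstream blockwise tasks can be fused with it, which is what produces the single fused "to-parquet" tasks in the graph above.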

@rjzamora rjzamora changed the title from "Use DataFrameOutputLayer in to_parquet" to "Use map_partitions (Blockwise) in to_parquet" Dec 20, 2021
@rjzamora rjzamora added the highlevelgraph (Issues relating to HighLevelGraphs.) label Jan 19, 2022
@rjzamora
Member Author

Is anyone interested in reviewing this? :D (cc @jrbourbeau, @jsignell, @ian-r-rose, @gjoseph92)

@gjoseph92 gjoseph92 self-requested a review January 19, 2022 23:48
@ian-r-rose ian-r-rose self-requested a review January 19, 2022 23:57
kwargs_pass,
),
token="to-parquet-"
+ tokenize(
Collaborator

What if you implement __dask_tokenize__ on ToParquetFunctionWrapper? Then I'd imagine the tokenization logic already in map_partitions would just work.

Member Author

If we don't pass in token= explicitly, then map_partitions will produce a layer name of the form: f"{funcname(func)}-{tokenize(func, meta, *args, **kwargs)}". Therefore, we would also need to modify the ToParquetFunctionWrapper name to be "to-parquet". Is it worthwhile to add these definitions to ToParquetFunctionWrapper when we can just establish the name here?

Collaborator

Wait, the token= argument to map_partitions is very much mis-named. It should really be called name:

dask/dask/dataframe/core.py

Lines 6028 to 6037 in a504115

name = kwargs.pop("token", None)
parent_meta = kwargs.pop("parent_meta", None)

assert callable(func)
if name is not None:
    token = tokenize(meta, *args, **kwargs)
else:
    name = funcname(func)
    token = tokenize(func, meta, *args, **kwargs)
name = f"{name}-{token}"

With what you have here, you'll get two tokens on the final name (map_partitions will append one automatically). I think just token="to-parquet" and a __dask_tokenize__ method on ToParquetFunctionWrapper (and BlockwiseDep) is what you want here.
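The double-token problem can be reproduced with a toy version of the naming logic quoted above. Here ``toy_tokenize`` and ``layer_name`` are illustrative stand-ins for dask.base.tokenize and the map_partitions naming code, not the real functions:

```python
import hashlib


def toy_tokenize(*args):
    # Deterministic stand-in for dask.base.tokenize
    return hashlib.md5(repr(args).encode()).hexdigest()[:8]


def layer_name(func_name, args, token=None):
    # Mirrors the quoted map_partitions logic: a trailing hash token is
    # ALWAYS appended, whether or not token= was supplied
    name = token if token is not None else func_name
    return f"{name}-{toy_tokenize(*args)}"


h = toy_tokenize("x")

# Passing a pre-tokenized string duplicates the hash suffix...
double = layer_name("write", ("x",), token="to-parquet-" + h)
# ...whereas a plain prefix yields a single hash suffix
single = layer_name("write", ("x",), token="to-parquet")
```

With the pre-tokenized argument, ``double`` carries two hash segments while ``single`` carries one, which is the duplication being pointed out here.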

Member Author

Wait, the token= argument to map_partitions is very much mis-named. It should really be called name

Agree

With what you have here, you'll get two tokens on the final name

Agree - I didn't realize this before, and I certainly don't like this.

I think just token="to-parquet" and a __dask_tokenize__ method on ToParquetFunctionWrapper (and BlockwiseDep) is what you want here.

If we specify token="to-parquet", then I don't think ToParquetFunctionWrapper.__dask_tokenize__ will ever be used. In order to avoid making changes in map_partitions, I vote that we just define the __repr__ of ToParquetFunctionWrapper to be "to-parquet", and define __dask_tokenize__ (as you suggested).
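The proposal above (a "to-parquet" __repr__ plus __dask_tokenize__) might look roughly like the following. This is a hypothetical, simplified sketch — the attributes are illustrative and this is not the actual ToParquetFunctionWrapper definition:

```python
class ToParquetFunctionWrapper:
    """Illustrative callable wrapper for the per-partition parquet write."""

    def __init__(self, engine, path, fs, kwargs):
        self.engine = engine
        self.path = path
        self.fs = fs
        self.kwargs = kwargs

    def __repr__(self):
        # dask.utils.funcname falls back to str(func) for objects without
        # __name__, so this yields the "to-parquet" layer label
        return "to-parquet"

    def __dask_tokenize__(self):
        # Deterministic state consumed by dask.base.tokenize
        return (self.engine, self.path, self.kwargs)

    def __call__(self, df, block_index):
        # A real implementation would write ``df`` to a partition file
        raise NotImplementedError


wrapper = ToParquetFunctionWrapper(
    "pyarrow", "out.parquet", None, {"compression": "snappy"}
)
```

With these two methods defined, map_partitions' default naming logic would produce a "to-parquet-<token>" layer name without any explicit token= argument.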

@gjoseph92 gjoseph92 left a comment (Collaborator)

Tiny nits, but I think this is ready, right? Just blocked by #8453?

@rjzamora
Member Author

I think this is ready, right? Just blocked by #8453?

Thanks for all the wonderful reviewing @gjoseph92 ! I hadn't really considered #8453 to be a blocker, since the primary purpose of that PR (in my mind) is the creation_info feature. However, I now see that that PR includes changes that we have not really "agreed on," but that have been copied into this PR. Perhaps we should just agree on the pertinent changes here:

  1. Does it make sense for DataFrameLayer to still exist now that it is only acting as a label?
  2. Do we really need to raise a FutureWarning when the "old" behavior of project_columns is encountered?

My interest in leaving DataFrameLayer around for now (1) is that I do intend to use it to store necessary information for multi-layer column projection (and eventually predicate pushdown). For example, we may use that class to define a base required_columns method designed to return the input columns required to produce a specific set of output columns (for that specific Layer). This would be similar to project_columns, but we would be returning a set of column names, rather than a new Layer.

Regarding (2): I have no problem with removing the deprecation warning and just letting the old behavior fail. However, I seem to remember @ian-r-rose telling me that there is at least one DataFrameIOLayer definition in downstream code.
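The required_columns idea described above can be sketched as follows. The API is entirely hypothetical — class and method names here are illustrative, not dask's actual layer classes:

```python
class ToyDataFrameLayer:
    """Illustrative base class for layers that support column projection."""

    def required_columns(self, output_columns):
        # Default: assume a 1:1 column mapping (elementwise layers)
        return set(output_columns)


class ToyRenameLayer(ToyDataFrameLayer):
    """Example layer that renames columns via ``mapping`` (old -> new)."""

    def __init__(self, mapping):
        self.mapping = mapping

    def required_columns(self, output_columns):
        # Map each requested output column back to the input column
        # needed to produce it (unmapped columns pass through unchanged)
        inverse = {new: old for old, new in self.mapping.items()}
        return {inverse.get(col, col) for col in output_columns}


layer = ToyRenameLayer({"a": "x"})
needed = layer.required_columns({"x", "b"})
```

Unlike project_columns, which returns a new Layer, this returns only the set of input column names, which is what would let a multi-layer optimization walk column requirements backward through the graph.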

@gjoseph92
Collaborator

Personally, I'd prefer merging #8453 as a separate PR just to keep the git history cleaner. Nice to be able to refer back to the PR that introduced a change and figure out the intention for it.

@rjzamora
Member Author

@martindurant - Do you know what's going on with the test_parquet.py::test_timestamp96 CI failures? Is this maybe being caused by this PR (or my recent changes)?

- @staticmethod
- def write_metadata(parts, fmd, fs, path, append=False, **kwargs):
+ @classmethod
+ def write_metadata(cls, parts, meta, fs, path, append=False, **kwargs):
Collaborator

Why the change? cls appears unused

Member Author

Yeah - It was just defined as a classmethod in utils.py, so I changed this to agree - but I guess it could be static if cls isn't used.

@martindurant
Member

Do you know what's going on with the test_parquet.py::test_timestamp96

pandas 1.4.0... #8626

@gjoseph92 gjoseph92 left a comment (Collaborator)

@rjzamora nice work, this seems good to me and will be a big improvement for users.

I did just realize we should probably open issues for converting the other to_* methods to use blockwise similarly.

@rjzamora
Member Author

Thanks for the reviews @gjoseph92 !

Just to avoid confusion - I'll probably wait to merge this until after the fastparquet tests are passing on main (and here).

@gjoseph92
Collaborator

@rjzamora looks like tons of parquet tests failed only on windows 3.7. I'd assume this is all flakiness (workers couldn't start or something?), but it's slightly concerning to me that when that happened, the computation succeeded and just returned an unexpected result. I wonder if there's a race condition or something in those tests, and whether we could make them more resilient?

@rjzamora
Member Author

looks like tons of parquet tests failed only on windows 3.7. I'd assume this is all flakiness (workers couldn't start or something?), but it's slightly concerning to me that when that happened, the computation succeeded and just returned an unexpected result. I wonder if there's a race condition or something in those tests, and whether we could make them more resilient?

Yeah - I haven't looked carefully through the windows 3.7 failures since many of those tests have been failing on all PRs for a while, but it does make sense to investigate ways to avoid flakiness and especially "silent" failures.

@gjoseph92 & @jrbourbeau - Is there any concern with merging this particular PR (given that the CI failures are the same here as elsewhere)?

@jsignell
Member

jsignell commented Feb 2, 2022

I am not concerned about merging this PR with failures that match those on main.

@rjzamora
Member Author

rjzamora commented Feb 2, 2022

I am not concerned about merging this PR with failures that match those on main.

Sounds good - I don't think anything is likely to change here, so I'll get it out of the way.
(It will also be nice to get task fusion in to_parquet!)

@rjzamora rjzamora merged commit d98c1dd into dask:main Feb 2, 2022
@rjzamora rjzamora deleted the dataframe-io-layer-write branch February 2, 2022 22:07

Labels

dataframe, highlevelgraph (Issues relating to HighLevelGraphs.), io, parquet


Development

Successfully merging this pull request may close these issues.

Suboptimal graph structure when read-writing a parquet

5 participants