
Simplify contents of parts list in read_parquet #7066

Merged
jrbourbeau merged 16 commits into dask:master from rjzamora:simplify-pyarrow-parquet
Jan 26, 2021

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Jan 14, 2021

This is a prerequisite for #7042

The goal of this PR is to remove several types of information that are currently stored in every element of the central parts list used to build the raw read_parquet task graph. The focus is on (1) information that is the same for every output partition, and (2) data that is not msgpack serializable. Both criteria must be satisfied to implement an efficient Blockwise version of read_parquet.

In order to strip redundant information (partitioning info, categories, schema, etc.) out of the "kwargs" field of every parts element, we introduce a new common_kwargs dictionary that is constructed in Engine.read_metadata and attached only to the first element of parts. It would be cleaner to return a dedicated common_kwargs element from read_metadata, but I avoided this in order to preserve backwards compatibility for external projects using that API.
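A minimal sketch of the idea (the part layout and helper below are illustrative, not the engine's exact code): shared settings are stored once, attached to the first parts element, and merged back into each partition's kwargs at read time.

```python
def read_partition_kwargs(part, common_kwargs):
    # Per-part kwargs win on conflict; everything shared (schema,
    # categories, partitioning info) comes from the single common dict.
    return {**common_kwargs, **part.get("kwargs", {})}

common = {"categories": ["name"], "schema": "<schema placeholder>"}
parts = [
    {"piece": ("a.parquet", [0]), "kwargs": {"row_groups": [0]}},
    {"piece": ("b.parquet", [1]), "kwargs": {"row_groups": [1]}},
]
# Attach common settings only to the first element, as the PR describes:
parts[0]["common_kwargs"] = common

merged = read_partition_kwargs(parts[1], parts[0]["common_kwargs"])
```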

In order to ensure that all information in parts can be serialized by msgpack, this PR also removes the read_from_paths=False option (available for the "pyarrow-dataset" engine). That option required file-fragment objects to be passed in parts. To preserve row-wise filtering, we now recreate the fragment from the path and row-group list at IO time.

This PR also adds the necessary logic to allow read_from_paths=True to work with row-wise filtering. Since the move to Blockwise (not in this PR) will require all information in parts to be msgpack serializable, row-wise filtering can now be accomplished either by pickling each fragment or by recreating the fragment at IO time. It seems reasonable that some workflows may benefit from the read_from_paths=True approach (which also corresponds to a smaller graph).

@rjzamora rjzamora marked this pull request as ready for review January 15, 2021 20:32
@rjzamora
Member Author

@martindurant - The idea here is to prepare read_parquet for the move to Blockwise. Not sure if you will have time to take a look at this, but it would be great to get your eyes on this :)

Note that I would also like to follow up this PR with some cleanup in the "fastparquet" engine, but I need to do a bit of experimentation to make sure it is totally doable. I am hoping that we can align the FP engine more with the pyarrow engines but I suspect that we will need to handle partition-column construction manually if we want to avoid re-reading the global _metadata in every task and/or passing large/complex objects in the graph. I am thinking that, like engine="pyarrow", we can specify the data path, row-group list, and partitioning information for each element of parts. This way, the ParquetFile object can be created from the data file itself, which should include significantly less metadata to parse than _metadata. Note that it would be nice to remove the fmd and row_groups from the ParquetFile, and then re-attach the necessary row_groups in the task. Unfortunately, this would require partition-specific data to be pickled in distributed (which is not allowed in the current Blockwise+IO plan).
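Under the plan sketched above, a per-partition payload could look something like the following (field names are invented for illustration, not the engine's actual keys). The point is that every value is a plain path/list/str, so it stays msgpack-serializable:

```python
# Illustrative only: a parts element built entirely from msgpack-friendly
# primitives (no ParquetFile, fragment, or other complex objects).
part = {
    "piece": ("dataset/name=Alice/part.0.parquet", [0, 2]),  # path + row-group ids
    "partition_keys": {"name": "Alice"},  # hive partition info as plain strings
}

def is_msgpack_friendly(obj):
    # Conservative check: only containers of simple builtin types qualify.
    if isinstance(obj, (str, bytes, int, float, bool)) or obj is None:
        return True
    if isinstance(obj, (list, tuple)):
        return all(is_msgpack_friendly(v) for v in obj)
    if isinstance(obj, dict):
        return all(
            is_msgpack_friendly(k) and is_msgpack_friendly(v)
            for k, v in obj.items()
        )
    return False
```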

@martindurant
Member

Unfortunately, this would require partition-specific data to be pickled in distributed (which is not allowed in the current Blockwise+IO plan).

Quick question on this: what can be sent to each partition? I thought we were sending at least the file path, so it would make sense to send the row-group binary, if we already have it.

@rjzamora
Member Author

Quick question on this: what can be sent to each partition? I thought we were sending at least the file path, so it would make sense to send the row-group binary, if we already have it.

That's true: we can send the row group if we convert it to bytes first.
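For illustration (a plain dict stands in for the real fastparquet row-group object here): since the row-group Thrift objects pickle with the standard module, a parts element can carry serialized bytes instead of the live object, and the read task can reattach it.

```python
import pickle

# Stand-in for a fastparquet row-group object; the real one is a Thrift
# structure, which the discussion notes pickles fine with standard pickle.
row_group = {"file_path": "part.0.parquet", "num_rows": 1000}

payload = pickle.dumps(row_group)   # bytes, safe to place in a parts element
restored = pickle.loads(payload)    # reattached inside the read task
```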

# We can return as a separate element in the future, but
# should avoid breaking the API for now.
if len(parts):
    parts[0]["common_kwargs"] = {"categories": categories_dict or categories}
Member

So I would suggest that the ParquetFile without any row groups can be passed as common_kwargs

row_groups = pf.row_groups  # becomes the list of parts as above
pf.row_groups = []
pf.fmd.row_groups = []

Note that ParquetFile._set_attrs really does make unnecessary extra references to things which could be properties, hence the need to set empty lists twice. The individual row group objects serialise just fine with standard pickle, this is built into fastparquet.

Member Author

I agree that we should pass the "stripped-down" ParquetFile object in the "common_kwargs". However, since it will require a bit of "fastparquet"-engine re-organization (which could get slightly messy), I intend to do that in a separate PR.

@martindurant
Member

The stuff in core and fp is fine. The stuff in arrow is hard to read, there is so much of it :|. On a limited first scan, it appears to be OK, but I'm sure I'm missing detail.

@rjzamora
Member Author

The stuff in arrow is hard to read, there is so much of it :|

This comment doesn't surprise me at all, but I'm still sorry to hear it. I'm always open to suggestions, but I know it's hard to know where to start.

Some motivation to get these "simple" changes in... I am hoping to raise another PR, dependent on this one, with fastparquet-focused changes. In that branch, I rewrote most of the FastParquetEngine to align with both pyarrow engines. All the tests are passing with the changes, and the performance problem discussed in #6376 is mostly resolved (time to read a partitioned dataset with 2,500 files improves by >10x).

@martindurant
Member

Feel free to test your branch with dask/fastparquet#549 to see if it helps (I may make some more improvements there yet)

@rjzamora rjzamora marked this pull request as draft January 25, 2021 17:23
@rjzamora rjzamora marked this pull request as ready for review January 25, 2021 21:30
Member

@jrbourbeau jrbourbeau left a comment

Thanks @rjzamora! Just left a few minor comments. Otherwise, is this good to go in your eyes?

"partitioning", default_partitioning
partitioning=partitioning["obj"].discover(
*partitioning.get("args", []),
**partitioning.get("kwargs", {}),
Member

It looks like we may have dropped using max_partition_dictionary_size=-1 as a default keyword argument here and in other places we call discover

Member Author

@rjzamora rjzamora Jan 25, 2021

Sorry, I think you wrote this while I was working on the comment below. I removed this default value because it somehow prevents us from using pickle on a ParquetFileFragment object. The error does not show up during serialization/deserialization, but in the subsequent arrow_table.to_pandas() call.

As far as I can tell, removing max_partition_dictionary_size=-1 is not causing any (correctness) problems, because we check the types and manually convert the partition columns to Categorical after the data is read in anyway. With that said, I'm sure it would be slightly more performant if this option worked.

@rjzamora
Member Author

@jrbourbeau - I made some revisions here after investigating the performance implications of the original read_from_paths change. I am pretty confident that everything in this PR is fairly safe now.

Notes

  • We are still allowing ParquetFileFragment objects to be passed in the graph when a filter is specified. However, we are now honoring read_from_paths=True. Note that the overhead of pickling/unpickling each fragment (not included in this PR) is clearly less than the overhead of regenerating a fragment in every task. Therefore, it seems unlikely that the user will want to use read_from_paths=True unless their graph is getting too large.

  • I removed the max_partition_dictionary_size=-1 argument that was previously being used for the default HivePartitioning discovery. It seems that this was somehow causing problems with round-trip serialization (which resulted in errors like the one discussed in #7079, "[parquet] Is filters supposed to work in conjunction with partition_on?"). @jorisvandenbossche - I'm including a reproducer below. Would it make sense to raise this on Arrow's Jira?

Click for Reproducer
import dask
import dask.dataframe as dd
from pyarrow import dataset as pa_ds
import pyarrow.parquet as pq
import pickle

# Write partitioned dataset
path = "debug.partitioned"
ddf = dask.datasets.timeseries()
ddf.to_parquet(path, engine="pyarrow", partition_on=["name"])

# Causes Error:
# ( Note that this works fine:  partitioning_kwargs = {} )
partitioning_kwargs = {"max_partition_dictionary_size": -1}

# Create a dataset
ds = pa_ds.dataset(
    path,
    format="parquet",
    partitioning=pa_ds.HivePartitioning.discover(**partitioning_kwargs),
)
schema = ds.schema
columns = schema.names

# Read a fragment (Works fine)
frag = list(ds.get_fragments())[0]
frag.to_table(schema=schema, columns=columns).to_pandas()

# Round-trip pickle, then read
frag2 = pickle.loads(pickle.dumps(frag))
frag2.to_table(schema=schema, columns=columns).to_pandas()
---------------------------------------------------------------------------
ArrowException                            Traceback (most recent call last)
<ipython-input-6-4c4231261128> in <module>
      1 frag2 = pickle.loads(pickle.dumps(frag))
----> 2 frag2.to_table(schema=schema, columns=columns).to_pandas()

~/miniconda/miniconda3/envs/cudf_18/lib/python3.7/site-packages/pyarrow/array.pxi in pyarrow.lib._PandasConvertible.to_pandas()

~/miniconda/miniconda3/envs/cudf_18/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.Table._to_pandas()

~/miniconda/miniconda3/envs/cudf_18/lib/python3.7/site-packages/pyarrow/pandas_compat.py in table_to_blockmanager(options, table, categories, ignore_metadata, types_mapper)
    777     _check_data_column_metadata_consistency(all_columns)
    778     columns = _deserialize_column_index(table, all_columns, column_indexes)
--> 779     blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
    780 
    781     axes = [columns, index]

~/miniconda/miniconda3/envs/cudf_18/lib/python3.7/site-packages/pyarrow/pandas_compat.py in _table_to_blocks(options, block_table, categories, extension_columns)
   1113     columns = block_table.column_names
   1114     result = pa.lib.table_to_blocks(options, block_table, categories,
-> 1115                                     list(extension_columns.keys()))
   1116     return [_reconstruct_block(item, columns, extension_columns)
   1117             for item in result]

~/miniconda/miniconda3/envs/cudf_18/lib/python3.7/site-packages/pyarrow/table.pxi in pyarrow.lib.table_to_blocks()

~/miniconda/miniconda3/envs/cudf_18/lib/python3.7/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowException: Unknown error: Wrapping �H3��� failed

@rjzamora
Member Author

Just left a few minor comments. Otherwise, is this good to go in your eyes?

@jrbourbeau - Thanks for the review! I do think this is good to go. I didn't intend to include the max_partition_dictionary_size change, but it was a necessary "bug fix" that I needed to make to run through some local benchmarks.

@jrbourbeau jrbourbeau merged commit 5694253 into dask:master Jan 26, 2021
@rjzamora rjzamora deleted the simplify-pyarrow-parquet branch January 26, 2021 16:51
@jorisvandenbossche
Member

I removed the max_partition_dictionary_size=-1 argument that was previously being used for the default HivePartitioning discovery. It seems that this was somehow causing problems with round-trip serialization (which resulted in errors like the one discussed in #7079). @jorisvandenbossche - I'm including a reproducer below. Would it make sense to raise this on Arrow's Jira?

Yes, please report such issues (and thanks for the easy reproducer!). Now, it seems it is already fixed in master (and I assume in pyarrow 3.0.0 as well), but I opened https://issues.apache.org/jira/browse/ARROW-11400 for it (we should still add a test for it; I don't recall intentionally fixing it, and it seems to be an issue specifically with the partition expression when it uses dictionary type).

As far as I can tell - removing the max_partition_dictionary_size=-1 is not causing any (correctness) problems, because we are checking the types and manually converting the partition columns to Categorical after the data is read in anyway. With that said, I'm sure it would be slightly more performant if this option worked.

Yeah, this was done anyway for integer types because pyarrow didn't do that before pyarrow 2.0. In principle that code could have been dropped when requiring pyarrow >= 2.0, but now that will only be possible when requiring pyarrow >= 3.0.

filesystem=fs,
partitioning=dataset_kwargs.get(
"partitioning", default_partitioning
partitioning=partitioning["obj"].discover(
Member

I don't fully understand this change.
I would expect that a user can do dd.read_parquet(...., dataset_kwargs=dict(partitioning=ds.partitioning(...))), i.e. passing a pyarrow Partitioning object. But if I read the code correctly, a user would now have to specify partitioning={"obj": ds.partitioning(...)}?

Member Author

Sorry, I should have sought your advice on this earlier!

Before this PR, the "default" partitioning was defined as:

partitioning = pa_ds.HivePartitioning.discover(max_partition_dictionary_size=-1)

A key goal of this PR was to allow a fragment to be regenerated from a single path while still capturing the partitioning. However, the tests were failing when I tried using the same partitioning object on the worker at IO time. The failures were resolved when I instead generated a new Partitioning object on each worker at IO time. I'll try to build a simple reproducer.

Member Author

To clarify, I don't actually know/understand the correct usage of this partitioning argument. Right now, we are asking the user to specify something like:

from pyarrow import dataset as pa_ds

partitioning={"obj": pa_ds.HivePartitioning}

Member Author

Update: If we are dropping the max_partition_dictionary_size=-1 argument, we can theoretically do something like you suggested (dataset_kwargs=dict(partitioning=ds.partitioning(...))). However, the only problem now is that the Partitioning object is not pickleable (so it will not work with dask.distributed):

from pyarrow import dataset as pa_ds
import pickle

partitioning = pa_ds.partitioning(flavor="hive")
pickle.dumps(partitioning)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-3-d5558ce55217> in <module>
      3 
      4 partitioning = pa_ds.partitioning(flavor="hive")
----> 5 pickle.dumps(partitioning)

~/miniconda/miniconda3/envs/cudf_18/lib/python3.7/site-packages/pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so in pyarrow._dataset.PartitioningFactory.__reduce_cython__()

TypeError: self.factory,self.wrapped cannot be converted to a Python object for pickling

Member

Hmm, I suppose we never thought about making those picklable, that should be possible to implement though.

Now, is the reason you need to serialize it that you are trying to move computation away from the client (and put more information in the graph)?
Because in principle, you only need this partitioning object for the initial creation of the dataset. Once you have the parts, you shouldn't need it in read_partition (to recreate the fragment, it needs a partition_expression (fragment.partition_expression, which is picklable, I think), but not the partitioning object).

Member

To clarify, we want to avoid passing any ParquetFileFragment objects in the graph when possible

This is for the size of the graph?
It might be good to check if this is actually a problem in this case. Because to reconstruct a fragment, you need to pass (serialize) the path, the filesystem, the partition expression and the row_group_ids. But that's basically how serializing a fragment works right now, by serializing those components and reconstructing the fragment based on that.
See https://github.com/apache/arrow/blob/a8faa481c4c1bac146558b36113f6422ef2fcd2e/python/pyarrow/_dataset.pyx#L934-L942 (it also needs to serialize the function object, so that's a difference of course)
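A toy version of the reconstruction-based pickling pointed to above (the class and helper names are invented for illustration; pyarrow's real implementation lives in _dataset.pyx): serialize the ingredients rather than the live object, and rebuild via a module-level function.

```python
import pickle

def _rebuild_fragment(path, row_groups, partition_expression):
    # Module-level reconstructor, so pickle can reference it by name.
    return FakeFragment(path, row_groups, partition_expression)

class FakeFragment:
    """Invented stand-in for a parquet file fragment."""

    def __init__(self, path, row_groups, partition_expression):
        self.path = path
        self.row_groups = row_groups
        self.partition_expression = partition_expression

    def __reduce__(self):
        # Pickle only the reconstruction ingredients, not the object state.
        return _rebuild_fragment, (
            self.path,
            self.row_groups,
            self.partition_expression,
        )

frag = FakeFragment("part.0.parquet", [0, 1], b"<expr>")
frag2 = pickle.loads(pickle.dumps(frag))
```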

Member Author

This is for the size of the graph?

Yes, I recently added logic to avoid passing ParquetFileFragment objects in the graph, both to minimize graph size (only a problem for extreme partition counts) and to avoid the need to pickle any partition-specific inputs for the final _read_table logic (this last part is to improve performance once read_parquet is moved under the Blockwise umbrella).

So, in an ideal world, we would not need any partition-specific inputs to the final _read_table call that are not msgpack-serializable. At the moment, we achieve this by regenerating the dataset and fragment at IO time, but this is certainly not ideal. It would be better to use the approach above, but the partition_expression would still need to be pickled (all other inputs are either msgpack-serializable or the same for all output partitions). Also, it seems that passing the partition_expression increases the graph size by an order of magnitude or more. It looks like I can get the size down a lot by using __reduce__ directly, but I'd rather not rely on private methods.

Any advice on reducing the size of partition_expression?
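One quick way to see how much each candidate per-task payload contributes to graph size is to compare pickled sizes. The "expression" below is an invented byte blob standing in for a pickled pyarrow partition_expression, sized roughly like the ~900-byte measurement mentioned later in this thread:

```python
import pickle

# A path + row-group-id payload (the msgpack-friendly option) ...
path_only = ("dataset/part.0.parquet", [0, 1])
# ... versus the same payload plus an expression-sized opaque blob.
with_expression = ("dataset/part.0.parquet", [0, 1], b"x" * 900)

small = len(pickle.dumps(path_only))
large = len(pickle.dumps(with_expression))
```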

Member

Just to be sure (I am not that familiar with the graph / scheduler internals), you are explicitly distinguishing msgpack vs pickle serializable? (because that's more performant / smaller size?)

Also, it seems that passing the partition_expression increases the graph size by an order of magnitude or more. It looks like I can get the size down a lot by using __reduce__ directly, but I'd rather not rely on private methods.

That seems strange to me, that using __reduce__ directly would help. Checking: a simple expression (expr = ds.field("a") == "A") has a pickle size of 908 bytes (which indeed seems quite a bit), but the buffer returned from __reduce__ has a size of 818 bytes, so I don't fully understand how using it directly can improve things by a lot.

Looking at the expression serialization implementation (https://github.com/apache/arrow/blob/add501e2c619bbc864223e8d015bf615c38f9f80/cpp/src/arrow/dataset/expression.cc#L1097-L1100), that's certainly not very space efficient right now. That might be worth opening a JIRA for.

Member Author

Just to be sure (I am not that familiar with the graph / scheduler internals), you are explicitly distinguishing msgpack vs pickle serializable? (because that's more performant / smaller size?)

Pickle introduces security vulnerabilities that make it "illegal" to use on the scheduler process (msgpack does not). When we move to Blockwise, the scheduler needs to be able to materialize the graph and replace "abstract" indices with the real partition-dependent inputs for each partition. These inputs can be pickled, but they would need to be pickled individually for every output partition on the client (and then unpickled on the worker) so that the scheduler can inject the correct inputs. Therefore, the only reason to avoid the need for pickle is to avoid the overhead of calling pickle.dumps and pickle.loads for every task in the graph.
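A sketch of the per-task cost just described, using plain tuples in place of the real partition-dependent inputs: with N output partitions, any non-msgpack input must be pickled once per task on the client and unpickled once per task on the worker, rather than being serialized once for the whole graph.

```python
import pickle

n_partitions = 1000
per_part_inputs = [("part.%d.parquet" % i, [i]) for i in range(n_partitions)]

# Client side: one pickle.dumps call per task in the graph ...
payloads = [pickle.dumps(x) for x in per_part_inputs]
# ... and worker side: one pickle.loads call per task.
restored = [pickle.loads(p) for p in payloads]
```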

That seems strange to me that using __reduce__ directly would help. Checking, and a simple expression (expr = ds.field("a") == "A") has a pickle size of 908 bytes (which indeed seems quite a bit), but the buffer returned from __reduce__ has a size of 818 bytes, so I don't fully understand how using this directly can improve it by a lot.

I think you are right - I am having trouble reproducing my earlier measurement (so it was probably my mistake).

Member

Pickle introduces security vulnerabilities that make it "illegal" to use on the scheduler process (msgpack does not)

(plus, the scheduler will not always be python in the future!)
