Simplify contents of parts list in read_parquet#7066
jrbourbeau merged 16 commits into dask:master
Conversation
@martindurant - Note that I would also like to follow up this PR with some cleanup in the "fastparquet" engine, but I need to do a bit of experimentation to make sure it is totally doable. I am hoping that we can align the FP engine more with the pyarrow engines, but I suspect that we will need to handle partition-column construction manually if we want to avoid re-reading the global _metadata in every task and/or passing large/complex objects in the graph. I am thinking that, like …
Quick question on this: what can be sent to each partition? I thought we were sending at least the file path, so it would make sense to send the row-group binary, if we already have it.
That's true - we can send the row group if we convert it to bytes first.
```python
# We can return as a separate element in the future, but
# should avoid breaking the API for now.
if len(parts):
    parts[0]["common_kwargs"] = {"categories": categories_dict or categories}
```
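A consumer of this convention would pop the shared kwargs off the first part and merge them into every task's kwargs. The sketch below is a hypothetical illustration of that pattern, not dask's actual code:

```python
# Hypothetical sketch: kwargs common to all partitions are stored only on
# the first element of ``parts`` and merged back into each task's kwargs.
parts = [
    {"piece": "part.0.parquet", "kwargs": {"filters": None},
     "common_kwargs": {"categories": ["name"]}},
    {"piece": "part.1.parquet", "kwargs": {"filters": None}},
]

# Pop the shared kwargs off the first part once...
common_kwargs = parts[0].pop("common_kwargs", {})
# ...then build the per-task kwargs by merging (task-specific keys win).
per_task_kwargs = [{**common_kwargs, **p["kwargs"]} for p in parts]

assert all(kw["categories"] == ["name"] for kw in per_task_kwargs)
```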
So I would suggest that the ParquetFile without any row groups can be passed as common_kwargs
row_groups = pf.row_groups # becomes the list of parts as above
pf.row_groups = []
pf.fmd.row_groups = []
Note that ParquetFile._set_attrs really does make unnecessary extra references to things which could be properties, hence the need to set empty lists twice. The individual row group objects serialise just fine with standard pickle, this is built into fastparquet.
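The stripped-ParquetFile pattern described above can be sketched with a stand-in class. `ParquetFileStub` and its attributes are hypothetical illustrations, not fastparquet's real API (the real object also needs `pf.fmd.row_groups` cleared, as noted):

```python
import pickle

class ParquetFileStub:
    """Hypothetical stand-in for fastparquet's ParquetFile."""
    def __init__(self, row_groups, schema):
        self.row_groups = row_groups
        self.schema = schema

# Global metadata referencing many row groups.
pf = ParquetFileStub(row_groups=[{"rg": i} for i in range(1000)], schema="s")

# Keep the row groups as the per-task ``parts`` list...
parts = pf.row_groups
# ...and strip them from the shared object before pickling it once.
pf.row_groups = []
common = pickle.dumps(pf)

# Each task reattaches only its own row group to a fresh copy.
task_pf = pickle.loads(common)
task_pf.row_groups = [parts[3]]
assert task_pf.row_groups == [{"rg": 3}]
# The stripped object pickles much smaller than the full one.
assert len(common) < len(pickle.dumps(ParquetFileStub(parts, "s")))
```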
I agree that we should pass the "stripped-down" ParquetFile object in the "common_kwargs". However, since it will require a bit of "fastparquet"-engine re-organization (which could get slightly messy), I intend to do that in a separate PR.
The stuff in core and fp is fine. The stuff in arrow is hard to read, there is so much of it :|. On a limited first scan, it appears to be OK, but I'm sure I'm missing detail.
This comment doesn't surprise me at all, but I'm still sorry to hear it. I'm always open to suggestions, but I know it's hard to know where to start. Some motivation to get these "simple" changes in... I am hoping to raise another PR, dependent on this one, with fastparquet-focused changes. In that branch, I rewrote most of the …
Feel free to test your branch with dask/fastparquet#549 to see if it helps (I may make some more improvements there yet)
jrbourbeau
left a comment
Thanks @rjzamora! Just left a few minor comments. Otherwise, is this good to go in your eyes?
```diff
-        "partitioning", default_partitioning
+    partitioning=partitioning["obj"].discover(
+        *partitioning.get("args", []),
+        **partitioning.get("kwargs", {}),
```
It looks like we may have dropped using max_partition_dictionary_size=-1 as a default keyword argument here and in other places we call discover
Sorry - I think you wrote this while I was working on the comment below. I removed this default value because it is somehow preventing us from using pickle on a ParquetFileFragment object. The error does not show up in the serialization/deserialization, but in the following arrow_table.to_pandas() call.
As far as I can tell - removing the max_partition_dictionary_size=-1 is not causing any (correctness) problems, because we are checking the types and manually converting the partition columns to Categorical after the data is read in anyway. With that said, I'm sure it would be slightly more performant if this option worked.
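The manual conversion being referred to can be sketched roughly as follows (a simplified illustration of converting a partition column to Categorical after the read, not dask's actual code path):

```python
import pandas as pd

# After reading, a hive-partitioned column may arrive as plain strings...
df = pd.DataFrame({"name": ["a", "b", "a"], "x": [1, 2, 3]})
# ...and is converted to Categorical manually, so correctness does not
# depend on pyarrow's max_partition_dictionary_size option.
df["name"] = df["name"].astype("category")
assert str(df["name"].dtype) == "category"
```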
@jrbourbeau - I made some revisions here after investigating the performance implications of the original approach. Notes:
Reproducer:

```python
import dask
import dask.dataframe as dd
from pyarrow import dataset as pa_ds
import pyarrow.parquet as pq
import pickle

# Write partitioned dataset
path = "debug.partitioned"
ddf = dask.datasets.timeseries()
ddf.to_parquet(path, engine="pyarrow", partition_on=["name"])

# Causes error:
# (Note that this works fine: partitioning_kwargs = {})
partitioning_kwargs = {"max_partition_dictionary_size": -1}

# Create a dataset
ds = pa_ds.dataset(
    path,
    format="parquet",
    partitioning=pa_ds.HivePartitioning.discover(**partitioning_kwargs),
)
schema = ds.schema
columns = schema.names

# Read a fragment (works fine)
frag = list(ds.get_fragments())[0]
frag.to_table(schema=schema, columns=columns).to_pandas()

# Round-trip pickle, then read
frag2 = pickle.loads(pickle.dumps(frag))
frag2.to_table(schema=schema, columns=columns).to_pandas()
```
@jrbourbeau - Thanks for the review! I do think this is good to go. I didn't intend to include the …
Yes, please report such issues (and thanks for the easy reproducer!). Now, it seems it is already fixed in master (and I assume pyarrow 3.0.0 as well), but I opened https://issues.apache.org/jira/browse/ARROW-11400 for it (we should still add a test for it, don't recall intentionally fixing it, it seems an issue specifically with the partition expression if it uses dictionary type)
Yeah, this was done anyway for integer types because pyarrow didn't do that before pyarrow 2.0. In principle that code could have been dropped if only supporting pyarrow >= 2.0, but now that will only be possible when only supporting pyarrow >= 3.0.
```diff
     filesystem=fs,
-    partitioning=dataset_kwargs.get(
-        "partitioning", default_partitioning
+    partitioning=partitioning["obj"].discover(
```
I don't fully understand this change.
I would expect that a user can do dd.read_parquet(..., dataset_kwargs=dict(partitioning=ds.partitioning(...))), i.e. passing a pyarrow Partitioning object. But if I read the code correctly, a user would now have to specify partitioning={"obj": ds.partitioning(...)}?
Sorry - I should have sought your advice on this earlier!
Before this PR, the "default" partitioning was defined as:
```python
partitioning = pa_ds.HivePartitioning.discover(max_partition_dictionary_size=-1)
```

A key goal of this PR was to allow a fragment to be regenerated from a single path, while still capturing the partitioning. However, the tests were failing when I tried using the same partitioning object on the worker at IO time. The failures were resolved when I, instead, generated a new Partitioning object on each worker at IO time. I'll try to build a simple reproducer.
To clarify, I don't actually know/understand the correct usage of this partitioning argument. Right now, we are asking the user to specify something like:
```python
from pyarrow import dataset as pa_ds

partitioning = {"obj": pa_ds.HivePartitioning}
```
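Given that dict form, the engine expands it at dataset-creation time by calling `.discover()` with any user-provided args/kwargs. A runnable sketch with a hypothetical stand-in for `pa_ds.HivePartitioning` (the `flavor` kwarg here is illustrative only):

```python
class HivePartitioningStub:
    """Hypothetical stand-in for pa_ds.HivePartitioning."""
    @staticmethod
    def discover(*args, **kwargs):
        return ("discovered", args, kwargs)

# User-facing dict form discussed above: the class plus optional
# args/kwargs for its .discover() factory.
partitioning = {"obj": HivePartitioningStub, "kwargs": {"flavor": "hive"}}

# The engine expands the dict when creating the dataset.
result = partitioning["obj"].discover(
    *partitioning.get("args", []),
    **partitioning.get("kwargs", {}),
)
assert result == ("discovered", (), {"flavor": "hive"})
```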
Update: If we are dropping the max_partition_dictionary_size=-1 argument, we can theoretically do something like you suggested (dataset_kwargs=dict(partitioning=ds.partitioning(...))). However, the only problem now is that the Partitioning object is not pickleable (so it will not work with dask.distributed):
```python
from pyarrow import dataset as pa_ds
import pickle

partitioning = pa_ds.partitioning(flavor="hive")
pickle.dumps(partitioning)
```

```
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-3-d5558ce55217> in <module>
      3
      4 partitioning = pa_ds.partitioning(flavor="hive")
----> 5 pickle.dumps(partitioning)

~/miniconda/miniconda3/envs/cudf_18/lib/python3.7/site-packages/pyarrow/_dataset.cpython-37m-x86_64-linux-gnu.so in pyarrow._dataset.PartitioningFactory.__reduce_cython__()

TypeError: self.factory,self.wrapped cannot be converted to a Python object for pickling
```
Hmm, I suppose we never thought about making those picklable, that should be possible to implement though.
Now, the reason you need to serialize it, is this because you are trying to move computation away from the client (and put more information in the graph)?
Because in principle, you only need this partitioning object for the initial creation of the dataset; once you have the parts, you shouldn't need it in read_partition (to recreate the fragment, it needs a partition_expression (fragment.partition_expression, which is picklable I think), but not the partitioning object)
To clarify, we want to avoid passing any ParquetFileFragment objects in the graph when possible
This is for the size of the graph?
It might be good to check if this is actually a problem in this case. Because to reconstruct a fragment, you need to pass (serialize) the path, the filesystem, the partition expression and the row_group_ids. But that's basically how serializing a fragment works right now, by serializing those components and reconstructing the fragment based on that.
See https://github.com/apache/arrow/blob/a8faa481c4c1bac146558b36113f6422ef2fcd2e/python/pyarrow/_dataset.pyx#L934-L942 (it also needs to serialize the function object, so that's a difference of course)
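The component-wise serialization described here can be sketched generically. `FragmentStub` is a hypothetical stand-in for `ParquetFileFragment`, not pyarrow's implementation:

```python
import pickle

class FragmentStub:
    """Hypothetical stand-in: a fragment rebuilt from its components."""
    def __init__(self, path, partition_expression, row_group_ids):
        self.path = path
        self.partition_expression = partition_expression
        self.row_group_ids = row_group_ids

    def __reduce__(self):
        # Tell pickle to serialize only the cheap components and to
        # reconstruct the fragment from them on the other side.
        return (
            FragmentStub,
            (self.path, self.partition_expression, self.row_group_ids),
        )

frag = FragmentStub("debug.partitioned/name=a/part.0.parquet", "name == 'a'", [0, 1])
frag2 = pickle.loads(pickle.dumps(frag))
assert frag2.row_group_ids == [0, 1]
```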
This is for the size of the graph?
Yes - I recently added logic to avoid passing ParquetFileFragment objects in the graph to minimize the graph size (only a problem for extreme partition counts), and also to avoid the need for pickling any partition-specific inputs for the final _read_table logic (this last part is to improve performance once read_parquet is moved under the Blockwise umbrella).
So, in an ideal world, we would not need any partition-specific inputs to the final _read_table call that are not msgpack-serializable. At the moment, we achieve this by regenerating the dataset and fragment at IO time, but this is certainly not ideal. It would be better to use the approach above, but the partition_expression will still need to be pickled (all other inputs are either msgpack-serializable, or are the same for all output partitions). Also, it seems that passing the partition_expression increases the graph size by an order of magnitude or more. It looks like I can get the size down a lot by using __reduce__ directly, but I'd rather not rely on private methods.
Any advice on reducing the size of partition_expression?
Just to be sure (I am not that familiar with the graph / scheduler internals), you are explicitly distinguishing msgpack vs pickle serializable? (because that's more performant / smaller size?)
Also, it seems that passing the partition_expression increases the graph size by an order of magnitude or more. It looks like I can get the size down a lot by using __reduce__ directly, but I'd rather not rely on private methods.
That seems strange to me that using __reduce__ directly would help. Checking, and a simple expression (expr = ds.field("a") == "A") has a pickle size of 908 bytes (which indeed seems quite a bit), but the buffer returned from __reduce__ has a size of 818 bytes, so I don't fully understand how using this directly can improve it by a lot.
Looking at the expression serialization implementation (https://github.com/apache/arrow/blob/add501e2c619bbc864223e8d015bf615c38f9f80/cpp/src/arrow/dataset/expression.cc#L1097-L1100), that's certainly not very space efficient right now. That might be worth opening a JIRA for.
Just to be sure (I am not that familiar with the graph / scheduler internals), you are explicitly distinguishing msgpack vs pickle serializable? (because that's more performant / smaller size?)
Pickle introduces security vulnerabilities that make it "illegal" to use on the scheduler process (msgpack does not). When we move to Blockwise, the scheduler needs to be able to materialize the graph, and replace "abstract" indices with the real partition-dependent inputs for each partition. These inputs can be pickled, but they would need to be pickled individually for every output partition on the client (and then unpickled on the worker), so that the scheduler can inject the correct inputs. Therefore, the only reason to avoid the need for pickle is to avoid the overhead of calling pickle.dumps and pickle.loads for every task in the graph.
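The per-task overhead being avoided can be illustrated with a small sketch (the task payloads here are made up for illustration):

```python
import pickle

# Hypothetical per-task payloads (path plus row-group ids).
tasks = [{"path": f"part.{i}.parquet", "row_groups": [i]} for i in range(4)]

# With pickle-only inputs, the client serializes every task's payload
# individually and each worker deserializes it again...
payloads = [pickle.dumps(t) for t in tasks]
restored = [pickle.loads(p) for p in payloads]
assert restored == tasks
# ...whereas msgpack-friendly primitives (str, int, list, dict) can be
# embedded in the graph directly, skipping that per-task round trip.
```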
That seems strange to me that using __reduce__ directly would help. Checking, and a simple expression (expr = ds.field("a") == "A") has a pickle size of 908 bytes (which indeed seems quite a bit), but the buffer returned from __reduce__ has a size of 818 bytes, so I don't fully understand how using this directly can improve it by a lot.
I think you are right - I am having trouble reproducing my earlier measurement (so it was probably my mistake).
Pickle introduces security vulnerabilities that make it "illegal" to use on the scheduler process (msgpack does not)
(plus, the scheduler will not always be python in the future!)
This is a prerequisite for #7042
The goal of this PR is to remove several types of information that are currently stored in every element of the central `parts` list used to build the raw `read_parquet` task graph. The focus is on (1) information that is the same for every output partition, and (2) data that is not `msgpack`-serializable. Both of these criteria are required to implement an efficient `Blockwise` version of `read_parquet`.

In order to strip redundant information (partitioning info, categories, schema, etc.) out of the `"kwargs"` field of every `parts` element, we introduce a new `common_kwargs` dictionary that is constructed in `Engine.read_metadata` and only attached to the first element of `parts`. It would be cleaner to return a dedicated `common_kwargs` element from `read_metadata`, but I avoided this in order to provide backwards compatibility for external projects using that API.

In order to ensure that all information in `parts` can be serialized by `msgpack`, this PR also removes the `read_from_paths=False` option (available for the "pyarrow-dataset" engine). That option was requiring file-fragment objects to be passed in `parts`. To preserve row-wise filtering, we now recreate the fragment from the path and row-group list at IO time.

This PR also adds the necessary logic to allow `read_from_paths=True` to work with row-wise filtering. Since the move to `Blockwise` (not in this PR) will require all information in `parts` to be `msgpack`-serializable, row-wise filtering can now be accomplished by pickling each fragment, or by recreating the fragment at IO time. It seems reasonable that some workflows may benefit from the `read_from_paths=True` approach (which also corresponds to a smaller graph).
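The `msgpack`-serializability constraint on `parts` can be approximated with a simple recursive check (a rough illustration based on msgpack's primitive types, not dask's validation logic):

```python
def msgpack_safe(obj):
    """Rough approximation: msgpack handles None, bool, int, float, str,
    bytes, and lists/dicts of the same."""
    if obj is None or isinstance(obj, (bool, int, float, str, bytes)):
        return True
    if isinstance(obj, (list, tuple)):
        return all(msgpack_safe(v) for v in obj)
    if isinstance(obj, dict):
        return all(msgpack_safe(k) and msgpack_safe(v) for k, v in obj.items())
    return False

class FragmentStand:
    """Stand-in for an arbitrary object such as a ParquetFileFragment."""

# A parts element holding only a path and row-group ids is fine...
assert msgpack_safe({"piece": ("part.0.parquet", [0, 1])})
# ...but one holding an arbitrary object is not.
assert not msgpack_safe({"piece": FragmentStand()})
```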