Pushdown boolean filter to parquet filters #6261
TomAugspurger wants to merge 6 commits into dask:master
Conversation
cc @gforsyth for reference. Not really ready for review yet.
mrocklin left a comment
I'm very excited to see this start. Thanks for beginning this @TomAugspurger
```python
flat_keys = list(core.flatten(keys))
dsk = optimize_read_parquet_getitem(dsk, keys=flat_keys)
if config.get("optimization.dataframe.io.parquet.predicate_pushdown"):
    dsk = optimize_read_parquet_predicate_pushdown(dsk, keys=flat_keys)
```
Are there cases when we wouldn't want this optimization?
Probably not. Right now the primary motivation is to easily toggle the behavior on and off.
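As a toy sketch of that toggle (a plain dict stands in for dask's real config machinery; the key name is taken from this PR's diff), the rewrite only runs when the flag is set:

```python
# Toy sketch: gate the rewrite behind a config flag so the behavior
# can be toggled on and off. A plain dict stands in for dask.config,
# and the rewrite itself is a stand-in for the real optimization pass.
config = {"optimization.dataframe.io.parquet.predicate_pushdown": True}

def optimize_graph(dsk):
    # Only rewrite the graph when the flag is enabled.
    if config.get("optimization.dataframe.io.parquet.predicate_pushdown"):
        return {**dsk, "pushed-down": True}
    return dsk
```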
```python
into_kwargs = {}
if func == operator.getitem and getitem_key is not None:
    into = BlockwiseGetitem
    into_kwargs["getitem_key"] = getitem_key
```
Alternatively we could also be explicit about using a BlockwiseGetitem Mapping type when constructing graphs. I would not be surprised if in the future Dask graph construction looks a lot more like selecting from a variety of types of mappings rather than constructing dicts by hand. I don't know though, and I'd welcome conversation on this topic if you have experience here.
Yes I think I like that suggestion better.
Ahh, this might be a bit hard though. The call stack is roughly

```
DataFrame.__getitem__ ->
    partitionwise_graph ->
        blockwise ->
            Blockwise (or a BlockwiseGetitem, BlockwiseFilter, etc.)
```

So it's a bit hard for `DataFrame.__getitem__` to specify the mapping type. We would presumably need something like a `partitionwise_getitem_graph` and `partitionwise_getitem`, which doesn't feel much cleaner.
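A minimal sketch of what such a mapping subclass could look like (names taken from this PR; the real dask `Blockwise` layer carries far more state than this):

```python
# Hedged sketch: a light Blockwise-style subclass that only records
# extra metadata (the selected key) so an optimizer can inspect it
# without diving into the task graph. Illustrative only -- the real
# dask Blockwise has much more machinery.
class Blockwise:
    def __init__(self, output):
        self.output = output

class BlockwiseGetitem(Blockwise):
    def __init__(self, output, getitem_key=None):
        super().__init__(output)
        self.getitem_key = getitem_key

layer = BlockwiseGetitem("getitem-abc123", getitem_key="B")
```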
```python
numblocks,
concatenate=None,
new_axes=None,
getitem_key=None,
```
After looking through the code for a bit I'm still not sure I understand exactly how this keyword is supposed to work.
I'm not really happy with the name here. The intent is to capture the value `'B'` in the expression `df['B']`. But it's not restricted to being a simple scalar. It can also be a more complicated expression:

```python
In [24]: df = dd.read_parquet("/tmp/data.parquet/", engine="pyarrow")

In [25]: df['B'].dask.layers[df.B._name].getitem_key
Out[25]: 'B'

In [26]: df2 = df[df.B == 1]

In [27]: df2.dask.layers[df2._name].getitem_key
Out[27]: BlockwiseFilter<(('getitem-3a656fd14824e63ee9448be31f58035b', ('.0',)), (1, None)) -> eq-b37a6a08cbf5864e07ef16e236c2e22c>

In [28]: df2.dask.layers[df2._name].getitem_key.filter_key
Out[28]: (<function _operator.eq(a, b, /)>, 1)
```

Happy to have better names here.
```python
numblocks,
concatenate=None,
new_axes=None,
filter_key=None,
```
I might call this something like `filter_expression` or `filter_condition`, since it's a tuple of `(callable, other)`:

```python
In [28]: df2.dask.layers[df2._name].getitem_key.filter_key
Out[28]: (<function _operator.eq(a, b, /)>, 1)
```
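A small illustration (hypothetical helper, plain lists instead of dask series) of how such a `(callable, other)` pair would be applied element-wise:

```python
import operator

# Hypothetical helper: apply a (callable, other) filter pair,
# e.g. (operator.eq, 1), element-wise to a column's values.
def apply_filter(values, filter_pair):
    func, other = filter_pair
    return [func(v, other) for v in values]

mask = apply_filter([0, 1, 2, 1], (operator.eq, 1))
# mask == [False, True, False, True]
```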
Still some test failures. I'm failing to update the graph properly during the optimization.
Small update: the current test failures are a result of the optimized `ParquetSubgraph` having fewer partitions than the original:

```python
(Pdb) old
ParquetSubgraph<name='read-parquet-f40c15c3a3e19cf06f9cf50940ea0529', n_parts=14, columns=['signal1', 'fake_categorical1', 'fake_categorical2']>
(Pdb) new
ParquetSubgraph<name='read-parquet-23ea2e645a34b971a5ee41db45a0a74b', n_parts=5, columns=['signal1', 'fake_categorical1', 'fake_categorical2']>
(Pdb) len(old), len(new)
(14, 5)
```

Performance-wise, I think that's probably what we want. We want to avoid dealing at all with pieces that have been optimized out. But this makes the rewriting process more involved, as downstream operations already contain references to the optimized-out partitions:

```python
(Pdb) pp new_hlg.layers[block.output]
BlockwiseGetitem<(('read-parquet-23ea2e645a34b971a5ee41db45a0a74b', ('.0',)), ('eq-567a8dfe6ca952ed8e7d5b9c3ba9d6dd', ('.0',))) -> getitem-f637692806b448eab812f57bbd5e4286>
(Pdb) pp len(new_hlg.layers[block.output])
14
```

Planning to investigate how much work updating dependent blocks is (naively, it sounds tricky). If it proves too difficult I might ensure that the new `ParquetSubgraph` has the same number of parts. I'd expect this will still give a speedup.
I think it's actually not possible to completely remove these keys, and probably not desirable anyway. Something like the following

```python
df = dd.read_parquet(...)
df[df.A == 1].compute()
```

will eventually call compute with all the keys for the dataframe, which will have all the partitions. We can't just remove them. Thinking more though, I think we just return the empty partitions.
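A toy sketch of that idea (all names hypothetical): the reader keeps every partition key so downstream layers' references stay valid, and pruned pieces simply yield empty results:

```python
# Toy sketch: keep all n_parts keys in the optimized layer; pruned
# partitions map to an empty result instead of disappearing, so any
# downstream task referencing ('read-parquet', i) still resolves.
def build_layer(parts_to_load, n_parts):
    def read(i):
        # Stand-in for reading a real partition from disk.
        return ["data"] if i in parts_to_load else []
    return {("read-parquet", i): read(i) for i in range(n_parts)}

layer = build_layer({0, 3}, 5)
```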
The latest version ensures the optimized graph has the same number of partitions / parts. My current plan is to clean this up when I have time (tomorrow or Thursday); it will then be in a reviewable state.
Returning empty partitions works. This points us to the question of whether or not we want to build a higher-level expression system, beyond high level graphs.
cc @martindurant.
Quick note from the top of the thread: the output expression still needs the filter applied; i.e., assuming we successfully select among the partitions, we still need to filter the output. This means that column B must be among the chosen columns loaded from the dataset (unless we are dealing with a partition-by-directory case, where a field only has one value per input file).

Also, there is a follow-on to this that may have been mentioned elsewhere: where the order of operations commutes, it's nearly always better to do filtering before other things, for all cases, not just parquet.
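A toy illustration of that point: statistics-based filters can only drop whole partitions, so rows inside surviving partitions still need the row-level mask, which is why `B` must stay among the loaded columns:

```python
# Toy sketch: min/max statistics prune whole partitions, but the
# surviving partitions can still contain non-matching rows, so a
# row-level filter on column "B" must run afterwards.
partitions = [
    {"B": [0, 0, 0]},   # stats min=0, max=0 -> pruned for B == 1
    {"B": [0, 1, 2]},   # stats min=0, max=2 -> kept, but has B != 1 rows
]

def prune(parts, value):
    # Keep only partitions whose [min, max] range could contain value.
    return [p for p in parts if min(p["B"]) <= value <= max(p["B"])]

kept = prune(partitions, 1)
# Row-level filtering is still required on the surviving partition.
rows = [b for p in kept for b in p["B"] if b == 1]
```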
martindurant left a comment
I just left a couple of comments for now. The two optimize functions are pretty hard for me to read, might take some time.
```yaml
dataframe:
  io:
    parquet:
      predicate_pushdown: true
```
Name might imply that actual row filtering is happening in the read library, which is not the case.
```python
ddf = dd.read_parquet(fn, engine=engine, name="df-")
result = ddf[ddf["A"] == 1]
dsk, = dask.optimize(result)
```
Shouldn't we investigate the contents of dsk?
```python
ddf = dd.read_parquet(fn, engine=engine, name="df-")
result = ddf[ddf["A"] == 1]
dsk, = dask.optimize(result)
result.compute()
```
Assert that the result contains no A == 1?
```python
# got the names backwards
column_layer, filter_layer = filter_layer, column_layer
func, other = filter_layer.getitem_key.filter_key
symbols = {"lt": "<", "le": "<=", "eq": "=", "gt": ">", "ge": ">="}
```
fastparquet used "==" for equals, and I thought pyarrow allowed both.
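A hedged sketch of translating the captured `(callable, other)` pair into a parquet-style filter tuple, spelling equality as `"=="` since fastparquet expects it and pyarrow accepts both forms:

```python
import operator

# Sketch: map comparison callables to the filter symbols the parquet
# engines expect. "==" is the safer spelling for equality because
# fastparquet uses "==" and pyarrow accepts both "=" and "==".
SYMBOLS = {
    operator.lt: "<",
    operator.le: "<=",
    operator.eq: "==",
    operator.gt: ">",
    operator.ge: ">=",
}

def to_parquet_filter(column, func, other):
    # Build a (column, symbol, value) filter tuple from the captured pair.
    return (column, SYMBOLS[func], other)

to_parquet_filter("A", operator.eq, 1)  # ("A", "==", 1)
```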
I'm going to close this since an approach based on the new
This (very, very rough draft) PR implements a kind of predicate pushdown. An expression like
Is optimized to something like
The basic stages are
To facilitate the detection of pushdown opportunities, I've made a couple light subclasses of Blockwise. The motivation is to avoid having to dive deeply into the blockwise structure just to figure out things like "which column was selected?".
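Since the example code above was lost in rendering, here is a hedged, toy end-to-end sketch of the rewrite (dict stand-ins throughout; none of these are the PR's actual classes or functions, and the real optimization still applies the row-level filter afterwards):

```python
import operator

# Toy model of the optimization: a filter over a parquet read is
# rewritten into a read that carries the predicate as a
# parquet-level filter. All names here are illustrative.
def read_parquet(path, filters=None):
    return {"op": "read-parquet", "path": path, "filters": list(filters or [])}

SYMBOLS = {operator.eq: "==", operator.lt: "<", operator.gt: ">"}

def push_down(filter_expr):
    # filter_expr: ("filter", read_call, (column, func, other))
    _, read_call, (col, func, other) = filter_expr
    new_filters = read_call["filters"] + [(col, SYMBOLS[func], other)]
    return dict(read_call, filters=new_filters)

expr = ("filter", read_parquet("/tmp/data.parquet"), ("A", operator.eq, 1))
optimized = push_down(expr)
# optimized["filters"] == [("A", "==", 1)]
```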