
Pushdown boolean filter to parquet filters#6261

Closed
TomAugspurger wants to merge 6 commits into dask:master from TomAugspurger:pushdown-filters

Conversation

@TomAugspurger
Member

This (very, very rough draft) PR implements a kind of predicate pushdown. An expression like

b = dd.read_parquet("/tmp/data.parquet", engine="pyarrow")
c = b[b.B == 1]

is optimized to something like

c = dd.read_parquet("/tmp/data.parquet", engine="pyarrow", filters=[[("B", "=", 1)]])

The basic stages are

  1. Detect that we have a pushdown opportunity (a boolean comparison following a column getitem following a read_parquet).
  2. Rewrite the read_parquet to insert the filters implied by step 1.

To facilitate the detection of pushdown opportunities, I've made a couple of light subclasses of Blockwise. The motivation is to avoid having to dive deeply into the blockwise structure just to figure out things like "which column was selected?".

@TomAugspurger
Member Author

TomAugspurger commented May 29, 2020

cc @gforsyth for reference. Not really ready for review yet.

Member

@mrocklin mrocklin left a comment


I'm very excited to see this start. Thanks for beginning this @TomAugspurger

flat_keys = list(core.flatten(keys))
dsk = optimize_read_parquet_getitem(dsk, keys=flat_keys)
if config.get("optimization.dataframe.io.parquet.predicate_pushdown"):
    dsk = optimize_read_parquet_predicate_pushdown(dsk, keys=flat_keys)
Member


Are there cases when we wouldn't want this optimization?

Member Author


Probably not. Right now the primary motivation is to be able to toggle the behavior on and off easily.

into_kwargs = {}
if func == operator.getitem and getitem_key is not None:
    into = BlockwiseGetitem
    into_kwargs["getitem_key"] = getitem_key
Member


Alternatively we could also be explicit about using a BlockwiseGetitem Mapping type when constructing graphs. I would not be surprised if in the future Dask graph construction looks a lot more like selecting from a variety of types of mappings rather than constructing dicts by hand. I don't know though, and I'd welcome conversation on this topic if you have experience here.

Member Author


Yes I think I like that suggestion better.

Member Author


Ahh, this might be a bit hard though. The call stack is roughly

DataFrame.__getitem__ ->
  partitionwise_graph ->
    blockwise ->
      Blockwise (or a BlockwiseGetitem, BlockwiseFilter, etc.)

So it's a bit hard for DataFrame.__getitem__ to specify the mapping type. We would presumably need something like a partitionwise_getitem_graph and partitionwise_getitem, which doesn't feel much cleaner.

numblocks,
concatenate=None,
new_axes=None,
getitem_key=None,
Member


After looking through the code for a bit I'm still not sure I understand exactly how this keyword is supposed to work.

Member Author


I'm not really happy with the name here. The intent is to capture the value 'B' in the expression

df['B']

But it's not restricted to being a simple scalar. It can also be a more complicated expression:

In [24]: df = dd.read_parquet("/tmp/data.parquet/", engine="pyarrow")

In [25]: df['B'].dask.layers[df.B._name].getitem_key
Out[25]: 'B'

In [26]: df2 = df[df.B == 1]

In [27]: df2.dask.layers[df2._name].getitem_key
Out[27]: BlockwiseFilter<(('getitem-3a656fd14824e63ee9448be31f58035b', ('.0',)), (1, None)) -> eq-b37a6a08cbf5864e07ef16e236c2e22c>

In [28]: df2.dask.layers[df2._name].getitem_key.filter_key
Out[28]: (<function _operator.eq(a, b, /)>, 1)

Happy to have better names here.

@beckernick
Member

beckernick commented Jun 1, 2020

cc @VibhuJawa @ayushdg


numblocks,
concatenate=None,
new_axes=None,
filter_key=None,
Member Author


I might call this something like filter_expression or filter_condition, since it's a tuple of (callable, other)

In [28]: df2.dask.layers[df2._name].getitem_key.filter_key
Out[28]: (<function _operator.eq(a, b, /)>, 1)
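Since the condition is a (callable, other) pair, applying it to a partition is mechanical; a stdlib-only illustration using row dicts instead of DataFrames (apply_filter_condition is a hypothetical name, not part of this PR):

```python
import operator


def apply_filter_condition(rows, condition, column):
    """Filter a list of row dicts with a (callable, operand) pair.

    ``condition`` mirrors the (func, other) tuple stored on the layer,
    e.g. (operator.eq, 1). Hypothetical helper for illustration only.
    """
    func, other = condition
    return [row for row in rows if func(row[column], other)]


# One partition's rows, filtered with the captured condition.
rows = [{"B": 0}, {"B": 1}, {"B": 1}, {"B": 2}]
matched = apply_filter_condition(rows, (operator.eq, 1), "B")
```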



@TomAugspurger
Member Author

Still some test failures. I'm failing to update the graph properly during the optimization.

@TomAugspurger
Member Author

TomAugspurger commented Jun 9, 2020

Small update: the current test failures are a result of the optimized ParquetSubgraph we construct having fewer parts than the old one

(Pdb) old
ParquetSubgraph<name='read-parquet-f40c15c3a3e19cf06f9cf50940ea0529', n_parts=14, columns=['signal1', 'fake_categorical1', 'fake_categorical2']>
(Pdb) new
ParquetSubgraph<name='read-parquet-23ea2e645a34b971a5ee41db45a0a74b', n_parts=5, columns=['signal1', 'fake_categorical1', 'fake_categorical2']>
(Pdb) len(old), len(new)
(14, 5)

Performance-wise, I think that's probably what we want. We want to avoid dealing at all with pieces that have been optimized out. But this makes the rewriting process more involved, as downstream operations already contain references to the optimized-out partitions

(Pdb) pp new_hlg.layers[block.output]
BlockwiseGetitem<(('read-parquet-23ea2e645a34b971a5ee41db45a0a74b', ('.0',)), ('eq-567a8dfe6ca952ed8e7d5b9c3ba9d6dd', ('.0',))) -> getitem-f637692806b448eab812f57bbd5e4286>
(Pdb) pp len(new_hlg.layers[block.output])
14

Planning to investigate how much work updating dependent blocks is (naively, it sounds tricky). If it proves too difficult I might ensure that the new ParquetSubgraph has the same number of parts. I'd expect this will still give a speedup.

@TomAugspurger
Member Author

Planning to investigate how much work updating dependent blocks is (naively, it sounds tricky).

I think it's actually not possible to completely remove these keys, and probably not desirable anyway.

Something like the following

df = dd.read_parquet(...)
df[df.A == 1].compute()

will eventually call compute with all the keys for the dataframe, which will have all the partitions. We can't just remove them.

Thinking about it more, though, I think we can just return the empty meta for those partitions. We don't actually have to touch the filesystem again for those partitions like I was worrying.
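That approach could be sketched like this (rewrite_with_empty_parts and its arguments are hypothetical names; the real ParquetSubgraph rewrite in this PR is more involved):

```python
def rewrite_with_empty_parts(name, n_parts, kept_parts, meta, read_part):
    """Build per-partition tasks where partitions that the pushdown
    filtered out simply return the empty ``meta`` instead of reading.

    Hypothetical helper illustrating the idea: the key count stays at
    ``n_parts`` so downstream references remain valid.
    """
    tasks = {}
    for i in range(n_parts):
        if i in kept_parts:
            # Partition survives the filter: keep the real read task.
            tasks[(name, i)] = lambda i=i: read_part(i)
        else:
            # Optimized out: no filesystem access, just the empty meta.
            tasks[(name, i)] = lambda: meta
    return tasks


# Example: 14 input parts, of which the filters keep only 5.
meta = []  # stands in for the empty DataFrame meta
tasks = rewrite_with_empty_parts(
    "read-parquet-abc", 14, {0, 3, 5, 8, 11}, meta, lambda i: [i]
)
```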

@TomAugspurger
Member Author

The latest version ensures the optimized graph has the same number of partitions / parts.

My current plan is to clean this up when I have time (tomorrow or Thursday), and then it will be in a reviewable state.

@mrocklin
Member

Returning empty _meta seems sensible to me.

This points us toward the question of whether or not we want to build a higher-level expression system, beyond high level graphs.

@TomAugspurger
Member Author

cc @martindurant.

@martindurant
Member

Quick note from the top of the thread: the output expression should actually be

c = dd.read_parquet("/tmp/data.parquet", engine="pyarrow", filters=[[("B", "==", 1)]])
c[c.B == 1]

i.e., assuming we successfully select among the partitions, we still need to filter the output. This means that column B must be among the chosen columns loaded from the dataset. (unless we are dealing with a partition-by-directory case where a field only has one value per input file)
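To see why the residual filter is needed: Parquet `filters=` only prunes whole pieces (row groups or partitions) using their statistics, so surviving pieces can still contain non-matching rows. A stdlib-only sketch with made-up row groups and min/max statistics:

```python
import operator

# Pushdown can only skip whole row groups whose min/max statistics
# exclude the predicate; surviving groups still need an in-memory
# filter. Row groups and values here are made up for illustration.
row_groups = [
    {"min": 0, "max": 2, "values": [0, 1, 2]},
    {"min": 3, "max": 5, "values": [3, 4, 5]},  # pruned: 1 not in [3, 5]
    {"min": 1, "max": 4, "values": [1, 3, 4]},
]

# Step 1: pushdown -- skip row groups whose statistics exclude B == 1.
surviving = [rg for rg in row_groups if rg["min"] <= 1 <= rg["max"]]

# Step 2: residual filter -- surviving groups still carry other values,
# so the loaded data must be filtered again row by row.
loaded = [v for rg in surviving for v in rg["values"]]
result = [v for v in loaded if operator.eq(v, 1)]
```

This is why column B must still be loaded: step 2 needs its values even after step 1 has done its pruning.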

Also, there is a follow-on to this that may have been mentioned elsewhere, that where the order of operations commute, it's nearly always better to do filtering before other things for all cases, not just parquet.

Member

@martindurant martindurant left a comment


I just left a couple of comments for now. The two optimize functions are pretty hard for me to read, might take some time.

dataframe:
  io:
    parquet:
      predicate_pushdown: true
Member


Name might imply that actual row filtering is happening in the read library, which is not the case.


ddf = dd.read_parquet(fn, engine=engine, name="df-")
result = ddf[ddf["A"] == 1]
dsk, = dask.optimize(result)
Member


Shouldn't we investigate the contents of dsk?

ddf = dd.read_parquet(fn, engine=engine, name="df-")
result = ddf[ddf["A"] == 1]
dsk, = dask.optimize(result)
result.compute()
Member


Assert that it contains no A == 1

# got the names backwards
column_layer, filter_layer = filter_layer, column_layer
func, other = filter_layer.getitem_key.filter_key
symbols = {"lt": "<", "le": "<=", "eq": "=", "gt": ">", "ge": ">="}
Member


fastparquet used "==" for equals, and I thought pyarrow allowed both.
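One way to sidestep the engine difference would be to emit `"=="` consistently, since fastparquet documents it and pyarrow has accepted both spellings. A hedged sketch (to_filter is a hypothetical helper; which symbols each engine accepts should be checked against its docs):

```python
import operator

# Map comparison callables to predicate symbols for ``filters=``.
# "==" is used for equality here; whether a given engine also accepts
# "=" is an assumption worth verifying per engine.
SYMBOLS = {
    operator.lt: "<",
    operator.le: "<=",
    operator.eq: "==",
    operator.gt: ">",
    operator.ge: ">=",
}


def to_filter(column, func, other):
    """Translate a captured (func, other) pair into a filters tuple."""
    return (column, SYMBOLS[func], other)
```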

@TomAugspurger
Member Author

I'm going to close this since an approach based on the new Layer abstraction is likely to work better here.
