Add row-wise filtering step to read_parquet #13334

rapids-bot[bot] merged 21 commits into rapidsai:branch-23.06

Conversation
cc @charlesbluca (for both dask-expr and dask-sql visibility)
wence-
left a comment
I think some of the core logic can be tidied up a bit, and am not sure this works correctly for more than two ANDed or ORed filters.
python/cudf/cudf/io/parquet.py
Outdated
and df.index.name is None
and df.index.start == 0
and df.index.step == 1
Is the thought that if the index is not a default rangeindex then it must be intentional and so we must not reset it? How are we to know that the "default" rangeindex is not intentional?
I don't have strong feeling about how we handle the default index. I just know that pandas/pyarrow avoids carrying around a filtered index if/when the original DataFrame would have had a default index anyway.
python/cudf/cudf/io/parquet.py
Outdated
conjunctions.append(_comparisons[op](df[column], value))

disjunctions.append(
    operator.and_(*conjunctions)
and_ has type bool -> bool -> bool so this only works if you have exactly two conjunctions.
You want (handles length-1 conjunctions as well):
disjunctions.append(functools.reduce(operator.and_, conjunctions))
Ah right - I did completely forget that operator.and_/or_ are binops
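A minimal illustration of the binop issue, with plain booleans standing in for column masks:

```python
import operator
from functools import reduce

masks = [True, True, False]

# operator.and_ is strictly binary, so star-unpacking only works for
# exactly two masks; operator.and_(*masks) raises TypeError here.

# reduce folds the binop left-to-right over any number of masks,
# and returns the single element unchanged for a length-1 list:
combined = reduce(operator.and_, masks)  # (True & True) & False
print(combined)  # False
print(reduce(operator.and_, [True]))  # True
```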
python/cudf/cudf/io/parquet.py
Outdated
operator.or_(*disjunctions)
if len(disjunctions) > 1
else disjunctions[0]
Same issue here: this wants to be functools.reduce(operator.or_, disjunctions)
python/cudf/cudf/io/parquet.py
Outdated
return df.reset_index(drop=True) if reset_index else df


def _apply_post_filters(df, filters):
Can I propose a rewrite like this, which I think separates the handler logic from the conjunction/disjunction a little more clearly? WDYT?
(Probably needs some minor modifications for py 3.9 compat (with the type-annotations)).
from functools import reduce, partial
import operator
import warnings

import numpy as np


def handle_in(column, value):
    if not isinstance(value, (list, set, tuple)):
        raise TypeError("Value of 'in' filter must be a list, set, or tuple.")
    return reduce(operator.or_, (operator.eq(column, v) for v in value))


def handle_is(column, value, *, negate):
    if value not in {np.nan, None}:
        raise TypeError("Value of 'is' or 'is not' filter must be np.nan or None.")
    return ~column.isna() if negate else column.isna()


def _apply_post_filters(df, filters: list[tuple | list[tuple]]):
    # Apply DNF filters to an in-memory DataFrame
    #
    # Disjunctive normal form (DNF) means that the inner-most
    # tuple describes a single column predicate. These inner
    # predicates are combined with an AND conjunction into a
    # larger predicate. The outer-most list then combines all
    # of the combined filters with an OR disjunction.
    if not filters:
        # No filters to apply
        return df
    handlers = {
        "==": operator.eq,
        "!=": operator.ne,
        "<": operator.lt,
        "<=": operator.le,
        ">": operator.gt,
        ">=": operator.ge,
        "in": handle_in,
        "is": partial(handle_is, negate=False),
        "is not": partial(handle_is, negate=True),
    }
    try:
        # Disjunction loop
        #
        # All elements of `disjunctions` shall be combined with
        # an `OR` disjunction (operator.or_)
        disjunctions = []
        expressions = [f if isinstance(f, list) else [f] for f in filters]
        for expr in expressions:
            conjunction = reduce(
                operator.and_,
                (handlers[op](df[column], value) for (column, op, value) in expr),
            )
            disjunctions.append(conjunction)
        return df[reduce(operator.or_, disjunctions)]
    except (KeyError, TypeError):
        warnings.warn(...)
        return df
The reduction over the disjunctions could be merged in (so no looping) but I think it's a little less readable (would be something like):
return df[
    reduce(
        operator.or_,
        (
            reduce(
                operator.and_,
                (handlers[op](df[column], value) for (column, op, value) in expr),
            )
            for expr in expressions
        ),
    )
]
Thanks for the suggestion. I revised the code to look more like your example (there were some minor bugs, and I couldn't quite get the type annotations right yet, so I left them out). Wouldn't mind going with the loop-free code, but didn't get a chance to try it yet.
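For reference, the loop-free reduction can be exercised standalone. This sketch uses pandas as a stand-in for cudf, with the handler table trimmed to the two operators used; the data is illustrative:

```python
import operator
from functools import reduce

import pandas as pd

df = pd.DataFrame({"x": range(6), "y": list("aabbcc")})

# DNF filter: (x < 3) OR (y == "c")
filters = [[("x", "<", 3)], [("y", "==", "c")]]
handlers = {"<": operator.lt, "==": operator.eq}

expressions = [f if isinstance(f, list) else [f] for f in filters]
mask = reduce(
    operator.or_,
    (
        reduce(
            operator.and_,
            (handlers[op](df[col], val) for (col, op, val) in expr),
        )
        for expr in expressions
    ),
)
result = df[mask]
print(len(result))  # rows 0-2 match x < 3, rows 4-5 match y == "c" -> 5
```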
([[("y", "==", "c")], [("x", "<", 3)]], 6),
([("y", "==", "c"), ("x", ">=", 5)], 1),
([[("y", "==", "c")], [("x", "<", 3)]], 5),
([[("y", "==", "c")], [("x", "in", (0, 9))]], 4),
Can you add some tests with more than two predicates being anded together (and being ored together)?
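Such a case could look like this (pandas stands in for cudf; the data and expected count are illustrative):

```python
import pandas as pd

df = pd.DataFrame({"x": range(10), "y": list("aabbccddee")})

# Three ANDed predicates: x >= 2 AND x < 8 AND y != "c"
mask = (df["x"] >= 2) & (df["x"] < 8) & (df["y"] != "c")
print(int(mask.sum()))  # rows 2, 3, 6, 7 -> 4
```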
(Linking the discussion in #12512)
changed the title: "post_filters step to read_parquet" → "row-wise filtering step to read_parquet"

Small update: Removed the
python/cudf/cudf/io/parquet.py
Outdated
# All elements of `disjunctions` shall be combined with
# an `OR` disjunction (operator.or_)
disjunctions = []
for expr in filters if isinstance(filters[0], list) else [filters]:
I am not fully sure this is correct handling (my suggestion might also have been wrong).
AIUI, these are possible inputs as filters:

- (A, op, B) => (A op B)
- [(A, op, B)] => (A op B)
- [(A, op, B), (C, op, D)] => (A op B) v (C op D)
- [[(A, op, B), (C, op, D)], (E, op, F)] => ((A op B) ^ (C op D)) v (E op F)
- [[(A, op, B), (C, op, D)], [(E, op, F), (G, op, H)]] => ((A op B) ^ (C op D)) v ((E op F) ^ (G op H))

So the input type is tuple | list[tuple | list[tuple]].
But this code only handles tuple | list[list[tuple]].
TBF, my code only handled list[tuple | list[tuple]].
To rephrase, who should do the sanitisation of the filters argument to this function? It would be much easier if, by the time we got here, we always just had list[list[tuple]]. That sanitisation could either be pushed up to read_parquet or else here but a little bit earlier, so we would say something like:
def _apply_filters(df, filters: tuple | list[tuple | list[tuple]]):
    if isinstance(filters, tuple):  # singleton (A op B)
        filters = [filters]
    filters = [f if isinstance(f, list) else [f] for f in filters]
    ...
    for expr in filters:  # Now everything is pre-processed correctly.
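That sanitisation step can be sketched as a standalone helper (the name `normalize_filters` is hypothetical):

```python
def normalize_filters(filters):
    """Coerce tuple | list[tuple | list[tuple]] into list[list[tuple]]."""
    if isinstance(filters, tuple):  # singleton (A op B)
        filters = [filters]
    return [f if isinstance(f, list) else [f] for f in filters]

print(normalize_filters(("x", "==", 1)))
# [[('x', '==', 1)]]
print(normalize_filters([("x", "==", 1), [("y", "<", 2)]]))
# [[('x', '==', 1)], [('y', '<', 2)]]
```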
wence-
left a comment
Mostly commentary, but a few minor cleanups.
python/cudf/cudf/io/parquet.py
Outdated
"Value of 'in' filter must be a " "list, set, or tuple."
"Value of 'in' or 'not in' filter must be a list, set, or tuple."
)
return reduce(operator.or_, (operator.eq(column, v) for v in value))
python/cudf/cudf/io/parquet.py
Outdated
f"filters must be None, or non-empty List[Tuple] "
f"or List[List[Tuple]]. Got {filters}"
)
if not filters or not isinstance(filters, list):
Is it easier to accept empty containers as "no filters" and normalise them to None (rather than requiring specifically None as empty filters).
So:
if filters:
    ...  # validate
else:
    return None
python/cudf/cudf/io/parquet.py
Outdated
f"got {predicate}."
)

filters = filters if isinstance(filters[0], list) else [filters]
OK, so now we definitively have a list-of-lists.
python/cudf/cudf/io/parquet.py
Outdated
filters = filters if isinstance(filters[0], list) else [filters]
for conjunction in filters:
    if not conjunction or not isinstance(conjunction, list):
OK, so each entry must be a non-empty list.
python/cudf/cudf/io/parquet.py
Outdated
if not conjunction or not isinstance(conjunction, list):
    raise TypeError(msg)
for predicate in conjunction:
    _validate_predicate(predicate)
And each entry in that non-empty list must be a 3-tuple of appropriate type.
python/cudf/cudf/io/parquet.py
Outdated
for predicate in conjunction:
    _validate_predicate(predicate)
tl;dr: nothing to do here.
Could write:
if not all(isinstance(item, tuple) and len(item) == 3 for item in conjunction):
    raise TypeError("Every predicate must be ...")

But I guess then it's hard to point at the bad one, unless one abuses the walrus operator like so:

if not all(isinstance(item := pred, tuple) and len(pred) == 3 for pred in conjunction):
    raise TypeError(... item)

Which is kind of ugly.
Right - I don't expect the number of predicates to get very large here. It seems reasonable to just call _validate_predicate in a loop.
/merge

@rjzamora do you expect the work done here to be roughly the same for

Yes, the

Thanks, I'll go ahead and open a PR to do the same for ORC then!
Follow-up to #13334 for the special case that the `filters` argument includes column names that are **not** included in the current column projection (i.e. the `columns` argument). Although this pattern is not a common case at the moment, it is perfectly valid, and will become more common when cudf is used as a dask-expr backend (since the predicate-pushdown optimizations in dask-expr are significantly more advanced than those in dask-dataframe). **Note**: Prior to #13334, the special case targeted by this PR would not have produced any run-time errors, but it also wouldn't have produced proper filtering in many cases. Now that `cudf.read_parquet` **does** produce proper row-wise filtering, it turns out that we now need to sacrifice a bit of IO in cases like this. Although this is unfortunate, I personally feel that it is still worthwhile to enforce row-wise filtering. Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) Approvers: - Lawrence Mitchell (https://github.com/wence-) URL: #13666
Closes #13324
Description
This PR adds a `post_filters` argument to `cudf.read_parquet`, which is set equal to the `filters` argument by default. When this argument is set, the specified DNF (disjunctive normal form) filter expression will be applied to the in-memory `cudf.DataFrame` object after IO is performed. The overall result of this PR is that the behavior of `cudf.read_parquet` becomes more consistent with that of `pd.read_parquet`, in the sense that the default result will now enforce filters at a row-wise granularity for both libraries.

Note on the "need" for distinct `filters` and `post_filters` arguments: My hope is that `post_filters` will eventually go away. However, I added a distinct argument for two general reasons:

1. pyarrow does not support `"is"` and `"is not"` operands in `filters`. Therefore, we cannot pass along all filters from `dask`/`dask_cudf` down to `cudf.read_parquet` using the existing `filters` argument, because we rely on pyarrow to filter out row-groups (note that dask implements its own filter-conversion utility to avoid this problem). I'm hoping pyarrow will eventually adopt these comparison types (xref: [Python] Add support for "is" and "is not" to `pyarrow.parquet.filters_to_expression` apache/arrow#34504).
2. When `cudf.read_parquet` is called from `dask_cudf.read_parquet`, row-group filtering will have already been applied. Therefore, it is convenient to specify that you only need cudf to provide the post-IO row-wise filtering step. Otherwise, we are effectively duplicating some metadata processing.

My primary concern with adding `post_filters` is the idea that row-wise filtering could be added at the cuio/libcudf level in the future. In that (hypothetical) case, `post_filters` wouldn't really be providing any value, but we would probably be able to deprecate it without much pain (if any).

Additional Context

This feature is ultimately needed to support general predicate-pushdown optimizations in Dask Expressions (dask-expr). This is because the high-level optimization logic becomes much simpler when a filter-based operation of a `ReadParquet` expression can be iteratively "absorbed" into the root `ReadParquet` expression.

Checklist