Skip to content

Commit 7caa4dc

Browse files
Yash Datta
authored and committed
PARQUET-116: Add ConfiguredUserDefined that takes a serializable udp directly
1 parent 0eaabf4 commit 7caa4dc

18 files changed

Lines changed: 264 additions & 94 deletions

File tree

parquet-column/src/main/java/parquet/filter2/predicate/FilterApi.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import parquet.filter2.predicate.Operators.BinaryColumn;
88
import parquet.filter2.predicate.Operators.BooleanColumn;
99
import parquet.filter2.predicate.Operators.Column;
10+
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
1011
import parquet.filter2.predicate.Operators.DoubleColumn;
1112
import parquet.filter2.predicate.Operators.Eq;
1213
import parquet.filter2.predicate.Operators.FloatColumn;
@@ -147,11 +148,17 @@ public static <T extends Comparable<T>, C extends Column<T> & SupportsLtGt> GtEq
147148
/**
148149
* Keeps records that pass the provided {@link UserDefinedPredicate}
149150
*/
150-
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable>
151-
UserDefined<T, U, S> userDefined(Column<T> column, Class<U> clazz, S o) {
152-
return new UserDefined<T, U, S>(column, clazz, o);
151+
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
152+
UserDefined<T, U> userDefined(Column<T> column, Class<U> clazz) {
153+
return new UserDefined<T, U>(column, clazz);
153154
}
154155

156+
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable>
157+
ConfiguredUserDefined<T, U> userDefined(Column<T> column, U udp) {
158+
return new ConfiguredUserDefined<T, U> (column, udp);
159+
}
160+
161+
155162
/**
156163
* Constructs the logical and of two predicates. Records will be kept if both the left and right predicate agree
157164
* that the record should be kept.

parquet-column/src/main/java/parquet/filter2/predicate/FilterPredicate.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,18 @@
33
import java.io.Serializable;
44

55
import parquet.filter2.predicate.Operators.And;
6+
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
67
import parquet.filter2.predicate.Operators.Eq;
78
import parquet.filter2.predicate.Operators.Gt;
89
import parquet.filter2.predicate.Operators.GtEq;
910
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
11+
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
1012
import parquet.filter2.predicate.Operators.Lt;
1113
import parquet.filter2.predicate.Operators.LtEq;
1214
import parquet.filter2.predicate.Operators.Not;
1315
import parquet.filter2.predicate.Operators.NotEq;
1416
import parquet.filter2.predicate.Operators.Or;
1517
import parquet.filter2.predicate.Operators.UserDefined;
16-
1718
/**
1819
* A FilterPredicate is an expression tree describing the criteria for which records to keep when loading data from
1920
* a parquet file. These predicates are applied in multiple places. Currently, they are applied to all row groups at
@@ -49,8 +50,10 @@ public static interface Visitor<R> {
4950
R visit(And and);
5051
R visit(Or or);
5152
R visit(Not not);
52-
<T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> R visit(UserDefined<T, U, S> udp);
53-
<T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> R visit(LogicalNotUserDefined<T, U, S> udp);
53+
<T extends Comparable<T>, U extends UserDefinedPredicate<T> > R visit(UserDefined<T, U> udp);
54+
<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > R visit(ConfiguredUserDefined<T, U> udp);
55+
<T extends Comparable<T>, U extends UserDefinedPredicate<T> > R visit(LogicalNotUserDefined<T, U> udp);
56+
<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > R visit(LogicalNotConfiguredUserDefined<T, U> udp);
5457
}
5558

5659
}

parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverseRewriter.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
import parquet.filter2.predicate.FilterPredicate.Visitor;
66
import parquet.filter2.predicate.Operators.And;
7+
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
78
import parquet.filter2.predicate.Operators.Eq;
89
import parquet.filter2.predicate.Operators.Gt;
910
import parquet.filter2.predicate.Operators.GtEq;
1011
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
12+
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
1113
import parquet.filter2.predicate.Operators.Lt;
1214
import parquet.filter2.predicate.Operators.LtEq;
1315
import parquet.filter2.predicate.Operators.Not;
@@ -86,12 +88,22 @@ public FilterPredicate visit(Not not) {
8688
}
8789

8890
@Override
89-
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> FilterPredicate visit(UserDefined<T, U, S> udp) {
91+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > FilterPredicate visit(UserDefined<T, U> udp) {
9092
return udp;
9193
}
9294

9395
@Override
94-
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> FilterPredicate visit(LogicalNotUserDefined<T, U, S> udp) {
96+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(ConfiguredUserDefined<T, U> udp) {
97+
return udp;
98+
}
99+
100+
@Override
101+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
102+
return udp;
103+
}
104+
105+
@Override
106+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(LogicalNotConfiguredUserDefined<T, U> udp) {
95107
return udp;
96108
}
97109
}

parquet-column/src/main/java/parquet/filter2/predicate/LogicalInverter.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
import parquet.filter2.predicate.FilterPredicate.Visitor;
66
import parquet.filter2.predicate.Operators.And;
7+
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
78
import parquet.filter2.predicate.Operators.Eq;
89
import parquet.filter2.predicate.Operators.Gt;
910
import parquet.filter2.predicate.Operators.GtEq;
1011
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
12+
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
1113
import parquet.filter2.predicate.Operators.Lt;
1214
import parquet.filter2.predicate.Operators.LtEq;
1315
import parquet.filter2.predicate.Operators.Not;
@@ -81,12 +83,22 @@ public FilterPredicate visit(Not not) {
8183
}
8284

8385
@Override
84-
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> FilterPredicate visit(UserDefined<T, U, S> udp) {
85-
return new LogicalNotUserDefined<T, U, S>(udp);
86+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > FilterPredicate visit(UserDefined<T, U> udp) {
87+
return new LogicalNotUserDefined<T, U>(udp);
8688
}
8789

8890
@Override
89-
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> FilterPredicate visit(LogicalNotUserDefined<T, U, S> udp) {
91+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(ConfiguredUserDefined<T, U> udp) {
92+
return new LogicalNotConfiguredUserDefined<T, U>(udp);
93+
}
94+
95+
@Override
96+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > FilterPredicate visit(LogicalNotUserDefined<T, U> udp) {
97+
return udp.getUserDefined();
98+
}
99+
100+
@Override
101+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> FilterPredicate visit(LogicalNotConfiguredUserDefined<T, U> udp) {
90102
return udp.getUserDefined();
91103
}
92104
}

parquet-column/src/main/java/parquet/filter2/predicate/Operators.java

Lines changed: 104 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -340,20 +340,19 @@ public int hashCode() {
340340
}
341341
}
342342

343-
public static final class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> implements FilterPredicate, Serializable {
343+
public static final class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T> > implements FilterPredicate, Serializable {
344344
private final Column<T> column;
345345
private final Class<U> udpClass;
346346
private final String toString;
347-
private final S udpConfig;
347+
348348
private static final String INSTANTIATION_ERROR_MESSAGE =
349349
"Could not instantiate custom filter: %s. User defined predicates must be static classes with a default constructor.";
350350

351-
UserDefined(Column<T> column, Class<U> udpClass, S udpConfigParam) {
351+
UserDefined(Column<T> column, Class<U> udpClass) {
352352
this.column = checkNotNull(column, "column");
353353
this.udpClass = checkNotNull(udpClass, "udpClass");
354354
String name = getClass().getSimpleName().toLowerCase();
355355
this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udpClass.getName() + ")";
356-
this.udpConfig = udpConfigParam;
357356

358357
// defensively try to instantiate the class early to make sure that it's possible
359358
getUserDefinedPredicate();
@@ -369,9 +368,7 @@ public Class<U> getUserDefinedPredicateClass() {
369368

370369
public U getUserDefinedPredicate() {
371370
try {
372-
U udpInstance = udpClass.newInstance();
373-
udpInstance.configure(udpConfig);
374-
return udpInstance;
371+
return udpClass.newInstance();
375372
} catch (InstantiationException e) {
376373
throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
377374
} catch (IllegalAccessException e) {
@@ -411,18 +408,70 @@ public int hashCode() {
411408
}
412409
}
413410

411+
public static final class ConfiguredUserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > implements FilterPredicate {
412+
private final Column<T> column;
413+
private final U udp;
414+
private final String toString;
415+
416+
ConfiguredUserDefined(Column<T> column, U udp) {
417+
this.column = checkNotNull(column, "column");
418+
this.udp = checkNotNull(udp, "udp");
419+
String name = getClass().getSimpleName().toLowerCase();
420+
this.toString = name + "(" + column.getColumnPath().toDotString() + ", " + udp.getClass().getName() + ")";
421+
}
422+
423+
public Column<T> getColumn() {
424+
return column;
425+
}
426+
427+
public U getUserDefinedPredicate() {
428+
return udp;
429+
}
430+
431+
@Override
432+
public <R> R accept(Visitor<R> visitor) {
433+
return visitor.visit(this);
434+
}
435+
436+
@Override
437+
public String toString() {
438+
return toString;
439+
}
440+
441+
@Override
442+
public boolean equals(Object o) {
443+
if (this == o) return true;
444+
if (o == null || getClass() != o.getClass()) return false;
445+
446+
ConfiguredUserDefined that = (ConfiguredUserDefined) o;
447+
448+
if (!column.equals(that.column)) return false;
449+
if (!udp.equals(that.udp)) return false;
450+
451+
return true;
452+
}
453+
454+
@Override
455+
public int hashCode() {
456+
int result = column.hashCode();
457+
result = 31 * result + udp.hashCode();
458+
result = result * 31 + getClass().hashCode();
459+
return result;
460+
}
461+
}
462+
414463
// Represents the inverse of a UserDefined. It is equivalent to not(userDefined), without the use
415464
// of the not() operator
416-
public static final class LogicalNotUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> implements FilterPredicate, Serializable {
417-
private final UserDefined<T, U, S> udp;
465+
public static final class LogicalNotUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T> > implements FilterPredicate, Serializable {
466+
private final UserDefined<T, U> udp;
418467
private final String toString;
419468

420-
LogicalNotUserDefined(UserDefined<T, U, S> userDefined) {
469+
LogicalNotUserDefined(UserDefined<T, U> userDefined) {
421470
this.udp = checkNotNull(userDefined, "userDefined");
422471
this.toString = "inverted(" + udp + ")";
423472
}
424473

425-
public UserDefined<T, U, S> getUserDefined() {
474+
public UserDefined<T, U> getUserDefined() {
426475
return udp;
427476
}
428477

@@ -456,4 +505,48 @@ public int hashCode() {
456505
}
457506
}
458507

508+
// Represents the inverse of a ConfiguredUserDefined. It is equivalent to not(userDefined), without the use
509+
// of the not() operator
510+
public static final class LogicalNotConfiguredUserDefined <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable > implements FilterPredicate, Serializable {
511+
private final ConfiguredUserDefined<T, U> udp;
512+
private final String toString;
513+
514+
LogicalNotConfiguredUserDefined(ConfiguredUserDefined<T, U> configuredUserDefined) {
515+
this.udp = checkNotNull(configuredUserDefined, "configuredUserDefined");
516+
this.toString = "inverted(" + udp + ")";
517+
}
518+
519+
public ConfiguredUserDefined<T, U> getUserDefined() {
520+
return udp;
521+
}
522+
523+
@Override
524+
public <R> R accept(Visitor<R> visitor) {
525+
return visitor.visit(this);
526+
}
527+
528+
@Override
529+
public String toString() {
530+
return toString;
531+
}
532+
533+
@Override
534+
public boolean equals(Object o) {
535+
if (this == o) return true;
536+
if (o == null || getClass() != o.getClass()) return false;
537+
538+
LogicalNotUserDefined that = (LogicalNotUserDefined) o;
539+
540+
if (!udp.equals(that.udp)) return false;
541+
542+
return true;
543+
}
544+
545+
@Override
546+
public int hashCode() {
547+
int result = udp.hashCode();
548+
result = result * 31 + getClass().hashCode();
549+
return result;
550+
}
551+
}
459552
}

parquet-column/src/main/java/parquet/filter2/predicate/SchemaCompatibilityValidator.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@
1313
import parquet.filter2.predicate.Operators.Gt;
1414
import parquet.filter2.predicate.Operators.GtEq;
1515
import parquet.filter2.predicate.Operators.LogicalNotUserDefined;
16+
import parquet.filter2.predicate.Operators.LogicalNotConfiguredUserDefined;
1617
import parquet.filter2.predicate.Operators.Lt;
1718
import parquet.filter2.predicate.Operators.LtEq;
1819
import parquet.filter2.predicate.Operators.Not;
1920
import parquet.filter2.predicate.Operators.NotEq;
2021
import parquet.filter2.predicate.Operators.Or;
2122
import parquet.filter2.predicate.Operators.UserDefined;
23+
import parquet.filter2.predicate.Operators.ConfiguredUserDefined;
2224
import parquet.schema.MessageType;
2325
import parquet.schema.OriginalType;
2426

@@ -129,13 +131,24 @@ public Void visit(Not not) {
129131
}
130132

131133
@Override
132-
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> Void visit(UserDefined<T, U, S> udp) {
134+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > Void visit(UserDefined<T, U> udp) {
133135
validateColumn(udp.getColumn());
134136
return null;
135137
}
136138

137139
@Override
138-
public <T extends Comparable<T>, U extends UserDefinedPredicate<T, S>, S extends Serializable> Void visit(LogicalNotUserDefined<T, U, S> udp) {
140+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> Void visit(ConfiguredUserDefined<T, U> udp) {
141+
validateColumn(udp.getColumn());
142+
return null;
143+
}
144+
145+
@Override
146+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> > Void visit(LogicalNotUserDefined<T, U> udp) {
147+
return udp.getUserDefined().accept(this);
148+
}
149+
150+
@Override
151+
public <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable> Void visit(LogicalNotConfiguredUserDefined<T, U> udp) {
139152
return udp.getUserDefined().accept(this);
140153
}
141154

parquet-column/src/main/java/parquet/filter2/predicate/UserDefinedPredicate.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package parquet.filter2.predicate;
22

3-
import java.io.Serializable;
4-
53
/**
64
* A UserDefinedPredicate decides whether a record should be kept or dropped, first by
75
* inspecting meta data about a group of records to see if the entire group can be dropped,
@@ -12,12 +10,7 @@
1210
*/
1311
// TODO: consider avoiding autoboxing and adding the specialized methods for each type
1412
// TODO: downside is that's fairly unwieldy for users
15-
public abstract class UserDefinedPredicate<T extends Comparable<T>, S extends Serializable> {
16-
17-
/*
18-
* An object that can be used for filtering in the keep method
19-
*/
20-
protected S udpConfig;
13+
public abstract class UserDefinedPredicate<T extends Comparable<T> > {
2114
/**
2215
* A udp must have a default constructor.
2316
* The udp passed to {@link FilterApi} will not be serialized along with its state.
@@ -26,17 +19,8 @@ public abstract class UserDefinedPredicate<T extends Comparable<T>, S extends Se
2619
*/
2720
public UserDefinedPredicate() { }
2821

29-
/*
30-
* This method is used to set the object that is used in the keep method for filtering.
31-
* Called before returning the new instance of this class.
32-
*/
33-
public void configure(S udpConfigParam) {
34-
this.udpConfig = udpConfigParam;
35-
}
36-
3722
/**
3823
* Return true to keep the record with this value, false to drop it.
39-
* o is a filter object that can be used for filtering the value.
4024
*/
4125
public abstract boolean keep(T value);
4226

@@ -102,4 +86,4 @@ public void configure(S udpConfigParam) {
10286
* }
10387
*/
10488
public abstract boolean inverseCanDrop(Statistics<T> statistics);
105-
}
89+
}

parquet-column/src/test/java/parquet/filter2/predicate/DummyUdp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import java.io.Serializable;
44

5-
public class DummyUdp extends UserDefinedPredicate<Integer, Serializable> {
5+
public class DummyUdp extends UserDefinedPredicate<Integer> {
66

77
@Override
88
public boolean keep(Integer value) {

0 commit comments

Comments
 (0)