Skip to content

Commit 52fe8aa

Browse files
RyanL1997 and penghuo authored
[BugFix] Fix Memory Exhaustion for Multiple Filtering Operations in PPL (#4841)
* [BugFix] Fix Regex OOM when there are 10+ regex clauses
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* fix unit tests
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* fix tests
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* fix explain tests and corresponding commands
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* fix explain tests for testFilterPushDownExplain
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* peng - isolate the fix logic to its own visitor class
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* Directly apply Calcite CoreRules.FILTER_MERGE before VolcanoPlanner plan
  Co-authored-by: Peng Huo <penghuo@gmail.com>
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* fix the UTs
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* fix the ITs after rebase
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* fix clickbench IT and more ITs
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* address comments from peng
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>
* add yaml test
  Signed-off-by: Jialiang Liang <jiallian@amazon.com>

---------

Signed-off-by: Jialiang Liang <jiallian@amazon.com>
Co-authored-by: Peng Huo <penghuo@gmail.com>
1 parent 679d8ca commit 52fe8aa

37 files changed

Lines changed: 291 additions & 126 deletions

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,11 +16,15 @@
1616
import lombok.extern.log4j.Log4j2;
1717
import org.apache.calcite.jdbc.CalciteSchema;
1818
import org.apache.calcite.plan.RelTraitDef;
19+
import org.apache.calcite.plan.hep.HepPlanner;
20+
import org.apache.calcite.plan.hep.HepProgram;
21+
import org.apache.calcite.plan.hep.HepProgramBuilder;
1922
import org.apache.calcite.rel.RelCollation;
2023
import org.apache.calcite.rel.RelCollations;
2124
import org.apache.calcite.rel.RelNode;
2225
import org.apache.calcite.rel.core.Sort;
2326
import org.apache.calcite.rel.logical.LogicalSort;
27+
import org.apache.calcite.rel.rules.FilterMergeRule;
2428
import org.apache.calcite.schema.SchemaPlus;
2529
import org.apache.calcite.sql.parser.SqlParser;
2630
import org.apache.calcite.tools.FrameworkConfig;
@@ -52,6 +56,9 @@
5256
@AllArgsConstructor
5357
@Log4j2
5458
public class QueryService {
59+
private static final HepProgram FILTER_MERGE_PROGRAM =
60+
new HepProgramBuilder().addRuleInstance(FilterMergeRule.Config.DEFAULT.toRule()).build();
61+
5562
private final Analyzer analyzer;
5663
private final ExecutionEngine executionEngine;
5764
private final Planner planner;
@@ -100,6 +107,7 @@ public void executeWithCalcite(
100107
CalcitePlanContext.create(
101108
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
102109
RelNode relNode = analyze(plan, context);
110+
relNode = mergeAdjacentFilters(relNode);
103111
RelNode optimized = optimize(relNode, context);
104112
RelNode calcitePlan = convertToCalcitePlan(optimized);
105113
executionEngine.execute(calcitePlan, context, listener);
@@ -145,6 +153,7 @@ public void explainWithCalcite(
145153
context.run(
146154
() -> {
147155
RelNode relNode = analyze(plan, context);
156+
relNode = mergeAdjacentFilters(relNode);
148157
RelNode optimized = optimize(relNode, context);
149158
RelNode calcitePlan = convertToCalcitePlan(optimized);
150159
executionEngine.explain(calcitePlan, format, context, listener);
@@ -259,6 +268,16 @@ public RelNode analyze(UnresolvedPlan plan, CalcitePlanContext context) {
259268
return getRelNodeVisitor().analyze(plan, context);
260269
}
261270

271+
/**
272+
* Run Calcite FILTER_MERGE once so adjacent filters created during analysis can collapse before
273+
* the rest of optimization.
274+
*/
275+
private RelNode mergeAdjacentFilters(RelNode relNode) {
276+
HepPlanner planner = new HepPlanner(FILTER_MERGE_PROGRAM);
277+
planner.setRoot(relNode);
278+
return planner.findBestExp();
279+
}
280+
262281
/** Analyze {@link UnresolvedPlan}. */
263282
public LogicalPlan analyze(UnresolvedPlan plan, QueryType queryType) {
264283
return analyzer.analyze(plan, new AnalysisContext(queryType));

integ-test/src/test/resources/expectedOutput/calcite/big5/composite_date_histogram_daily.yaml

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -5,8 +5,7 @@ calcite:
55
LogicalProject(count()=[$1], span(`@timestamp`,1d)=[$0])
66
LogicalAggregate(group=[{0}], count()=[COUNT()])
77
LogicalProject(span(`@timestamp`,1d)=[SPAN($17, 1, 'd')])
8-
LogicalFilter(condition=[IS NOT NULL($17)])
9-
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2022-12-30 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-07 12:00:00':VARCHAR)))])
10-
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
8+
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2022-12-30 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-07 12:00:00':VARCHAR)))])
9+
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
1110
physical: |
12-
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[@timestamp], FILTER->SEARCH($0, Sarg[['2022-12-30 00:00:00':VARCHAR..'2023-01-07 12:00:00':VARCHAR); NULL AS FALSE]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), span(`@timestamp`,1d)], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2022-12-30T00:00:00.000Z","to":"2023-01-07T12:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"_source":{"includes":["@timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"span(`@timestamp`,1d)":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"1d"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[@timestamp], FILTER->SEARCH($0, Sarg[['2022-12-30 00:00:00':VARCHAR..'2023-01-07 12:00:00':VARCHAR)]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), span(`@timestamp`,1d)], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2022-12-30T00:00:00.000Z","to":"2023-01-07T12:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"_source":{"includes":["@timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"span(`@timestamp`,1d)":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"1d"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/big5/composite_terms.yaml

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -5,8 +5,7 @@ calcite:
55
LogicalProject(count()=[$2], process.name=[$0], cloud.region=[$1])
66
LogicalAggregate(group=[{0, 1}], count()=[COUNT()])
77
LogicalProject(process.name=[$7], cloud.region=[$14])
8-
LogicalFilter(condition=[AND(IS NOT NULL($7), IS NOT NULL($14))])
9-
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2023-01-02 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-02 10:00:00':VARCHAR)))])
10-
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
8+
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2023-01-02 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-02 10:00:00':VARCHAR)), IS NOT NULL($7), IS NOT NULL($14))])
9+
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
1110
physical: |
12-
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[process.name, cloud.region, @timestamp], FILTER->SEARCH($2, Sarg[['2023-01-02 00:00:00':VARCHAR..'2023-01-02 10:00:00':VARCHAR)]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), process.name, cloud.region], SORT->[1 DESC LAST, 2 ASC FIRST], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2023-01-02T00:00:00.000Z","to":"2023-01-02T10:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"_source":{"includes":["process.name","cloud.region","@timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"process.name":{"terms":{"field":"process.name","missing_bucket":false,"order":"desc"}}},{"cloud.region":{"terms":{"field":"cloud.region","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[process.name, cloud.region, @timestamp], FILTER->AND(SEARCH($2, Sarg[['2023-01-02 00:00:00':VARCHAR..'2023-01-02 10:00:00':VARCHAR)]:VARCHAR), IS NOT NULL($0), IS NOT NULL($1)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), process.name, cloud.region], SORT->[1 DESC LAST, 2 ASC FIRST], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"range":{"@timestamp":{"from":"2023-01-02T00:00:00.000Z","to":"2023-01-02T10:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},{"exists":{"field":"process.name","boost":1.0}},{"exists":{"field":"cloud.region","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["process.name","cloud.region","@timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"process.name":{"terms":{"field":"process.name","missing_bucket":false,"order":"desc"}}},{"cloud.region":{"terms":{"field":"cloud.region","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/big5/composite_terms_keyword.yaml

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -5,8 +5,7 @@ calcite:
55
LogicalProject(count()=[$3], process.name=[$0], cloud.region=[$1], aws.cloudwatch.log_stream=[$2])
66
LogicalAggregate(group=[{0, 1, 2}], count()=[COUNT()])
77
LogicalProject(process.name=[$7], cloud.region=[$14], aws.cloudwatch.log_stream=[$34])
8-
LogicalFilter(condition=[AND(IS NOT NULL($7), IS NOT NULL($14), IS NOT NULL($34))])
9-
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2023-01-02 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-02 10:00:00':VARCHAR)))])
10-
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
8+
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2023-01-02 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-02 10:00:00':VARCHAR)), IS NOT NULL($7), IS NOT NULL($14), IS NOT NULL($34))])
9+
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
1110
physical: |
12-
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[process.name, cloud.region, @timestamp, aws.cloudwatch.log_stream], FILTER->SEARCH($2, Sarg[['2023-01-02 00:00:00':VARCHAR..'2023-01-02 10:00:00':VARCHAR)]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1, 2},count()=COUNT()), PROJECT->[count(), process.name, cloud.region, aws.cloudwatch.log_stream], SORT->[1 DESC LAST, 2 ASC FIRST, 3 ASC FIRST], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2023-01-02T00:00:00.000Z","to":"2023-01-02T10:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"_source":{"includes":["process.name","cloud.region","@timestamp","aws.cloudwatch.log_stream"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"process.name":{"terms":{"field":"process.name","missing_bucket":false,"order":"desc"}}},{"cloud.region":{"terms":{"field":"cloud.region","missing_bucket":false,"order":"asc"}}},{"aws.cloudwatch.log_stream":{"terms":{"field":"aws.cloudwatch.log_stream","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[process.name, cloud.region, @timestamp, aws.cloudwatch.log_stream], FILTER->AND(SEARCH($2, Sarg[['2023-01-02 00:00:00':VARCHAR..'2023-01-02 10:00:00':VARCHAR)]:VARCHAR), IS NOT NULL($0), IS NOT NULL($1), IS NOT NULL($3)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1, 2},count()=COUNT()), PROJECT->[count(), process.name, cloud.region, aws.cloudwatch.log_stream], SORT->[1 DESC LAST, 2 ASC FIRST, 3 ASC FIRST], LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"range":{"@timestamp":{"from":"2023-01-02T00:00:00.000Z","to":"2023-01-02T10:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},{"exists":{"field":"process.name","boost":1.0}},{"exists":{"field":"cloud.region","boost":1.0}},{"exists":{"field":"aws.cloudwatch.log_stream","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["process.name","cloud.region","@timestamp","aws.cloudwatch.log_stream"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10,"sources":[{"process.name":{"terms":{"field":"process.name","missing_bucket":false,"order":"desc"}}},{"cloud.region":{"terms":{"field":"cloud.region","missing_bucket":false,"order":"asc"}}},{"aws.cloudwatch.log_stream":{"terms":{"field":"aws.cloudwatch.log_stream","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/big5/date_histogram_minute_agg.yaml

Lines changed: 3 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -4,8 +4,7 @@ calcite:
44
LogicalProject(count()=[$1], span(`@timestamp`,1m)=[$0])
55
LogicalAggregate(group=[{0}], count()=[COUNT()])
66
LogicalProject(span(`@timestamp`,1m)=[SPAN($17, 1, 'm')])
7-
LogicalFilter(condition=[IS NOT NULL($17)])
8-
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2023-01-01 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-03 00:00:00':VARCHAR)))])
9-
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
7+
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2023-01-01 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-03 00:00:00':VARCHAR)))])
8+
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
109
physical: |
11-
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[@timestamp], FILTER->SEARCH($0, Sarg[['2023-01-01 00:00:00':VARCHAR..'2023-01-03 00:00:00':VARCHAR); NULL AS FALSE]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), span(`@timestamp`,1m)], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2023-01-01T00:00:00.000Z","to":"2023-01-03T00:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"_source":{"includes":["@timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"span(`@timestamp`,1m)":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"1m"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
10+
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[@timestamp], FILTER->SEARCH($0, Sarg[['2023-01-01 00:00:00':VARCHAR..'2023-01-03 00:00:00':VARCHAR)]:VARCHAR), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), span(`@timestamp`,1m)], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"range":{"@timestamp":{"from":"2023-01-01T00:00:00.000Z","to":"2023-01-03T00:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}},"_source":{"includes":["@timestamp"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":10000,"sources":[{"span(`@timestamp`,1m)":{"date_histogram":{"field":"@timestamp","missing_bucket":false,"order":"asc","fixed_interval":"1m"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/big5/keyword_in_range.yaml

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -3,8 +3,7 @@ calcite:
33
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
44
LogicalProject(agent=[$0], process=[$6], log=[$8], message=[$11], tags=[$12], cloud=[$13], input=[$15], @timestamp=[$17], ecs=[$18], data_stream=[$20], meta=[$24], host=[$26], metrics=[$27], aws=[$30], event=[$35])
55
LogicalSort(fetch=[10])
6-
LogicalFilter(condition=[AND(>=($17, TIMESTAMP('2023-01-01 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-03 00:00:00':VARCHAR)))])
7-
LogicalFilter(condition=[query_string(MAP('query', 'process.name:kernel':VARCHAR))])
8-
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
6+
LogicalFilter(condition=[AND(query_string(MAP('query', 'process.name:kernel':VARCHAR)), >=($17, TIMESTAMP('2023-01-01 00:00:00':VARCHAR)), <($17, TIMESTAMP('2023-01-03 00:00:00':VARCHAR)))])
7+
CalciteLogicalIndexScan(table=[[OpenSearch, big5]])
98
physical: |
109
CalciteEnumerableIndexScan(table=[[OpenSearch, big5]], PushDownContext=[[PROJECT->[agent, process, log, message, tags, cloud, input, @timestamp, ecs, data_stream, meta, host, metrics, aws, event], FILTER->AND(query_string(MAP('query', 'process.name:kernel':VARCHAR)), SEARCH($7, Sarg[['2023-01-01 00:00:00':VARCHAR..'2023-01-03 00:00:00':VARCHAR)]:VARCHAR)), LIMIT->10, LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10,"timeout":"1m","query":{"bool":{"must":[{"query_string":{"query":"process.name:kernel","fields":[],"type":"best_fields","default_operator":"or","max_determinized_states":10000,"enable_position_increments":true,"fuzziness":"AUTO","fuzzy_prefix_length":0,"fuzzy_max_expansions":50,"phrase_slop":0,"escape":false,"auto_generate_synonyms_phrase_query":true,"fuzzy_transpositions":true,"boost":1.0}},{"range":{"@timestamp":{"from":"2023-01-01T00:00:00.000Z","to":"2023-01-03T00:00:00.000Z","include_lower":true,"include_upper":false,"format":"date_time","boost":1.0}}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["agent","process","log","message","tags","cloud","input","@timestamp","ecs","data_stream","meta","host","metrics","aws","event"],"excludes":[]}}, requestedTotalSize=10, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)