Skip to content

Commit fddbb70

Browse files
authored
Add configurable system limitations for subsearch and join command (opensearch-project#4501)
* Add configurable system limitations for subsearch and join command Signed-off-by: Lantao Jin <ltjin@amazon.com> * Fix IT Signed-off-by: Lantao Jin <ltjin@amazon.com> * typo Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix IT Signed-off-by: Lantao Jin <ltjin@amazon.com> * remove rollback in doc Signed-off-by: Lantao Jin <ltjin@amazon.com> * address comments Signed-off-by: Lantao Jin <ltjin@amazon.com> * fix typo Signed-off-by: Lantao Jin <ltjin@amazon.com> * Fix IT Signed-off-by: Lantao Jin <ltjin@amazon.com> --------- Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 4d416db commit fddbb70

52 files changed

Lines changed: 1711 additions & 64 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

common/src/main/java/org/opensearch/sql/common/setting/Settings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public enum Key {
3333
PPL_REX_MAX_MATCH_LIMIT("plugins.ppl.rex.max_match.limit"),
3434
PPL_VALUES_MAX_LIMIT("plugins.ppl.values.max.limit"),
3535
PPL_SYNTAX_LEGACY_PREFERRED("plugins.ppl.syntax.legacy.preferred"),
36+
PPL_SUBSEARCH_MAXOUT("plugins.ppl.subsearch.maxout"),
37+
PPL_JOIN_SUBSEARCH_MAXOUT("plugins.ppl.join.subsearch_maxout"),
3638

3739
/** Enable Calcite as execution engine */
3840
CALCITE_ENGINE_ENABLED("plugins.calcite.enabled"),

core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class CalcitePlanContext {
3535
public final ExtendedRexBuilder rexBuilder;
3636
public final FunctionProperties functionProperties;
3737
public final QueryType queryType;
38-
public final Integer querySizeLimit;
38+
public final SysLimit sysLimit;
3939

4040
/** This thread local variable is only used to skip script encoding in script pushdown. */
4141
public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false);
@@ -61,9 +61,9 @@ public class CalcitePlanContext {
6161

6262
@Getter public Map<String, RexLambdaRef> rexLambdaRefMap;
6363

64-
private CalcitePlanContext(FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
64+
private CalcitePlanContext(FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
6565
this.config = config;
66-
this.querySizeLimit = querySizeLimit;
66+
this.sysLimit = sysLimit;
6767
this.queryType = queryType;
6868
this.connection = CalciteToolsHelper.connect(config, TYPE_FACTORY);
6969
this.relBuilder = CalciteToolsHelper.create(config, TYPE_FACTORY, connection);
@@ -102,12 +102,12 @@ public Optional<RexCorrelVariable> peekCorrelVar() {
102102
}
103103

104104
public CalcitePlanContext clone() {
105-
return new CalcitePlanContext(config, querySizeLimit, queryType);
105+
return new CalcitePlanContext(config, sysLimit, queryType);
106106
}
107107

108108
public static CalcitePlanContext create(
109-
FrameworkConfig config, Integer querySizeLimit, QueryType queryType) {
110-
return new CalcitePlanContext(config, querySizeLimit, queryType);
109+
FrameworkConfig config, SysLimit sysLimit, QueryType queryType) {
110+
return new CalcitePlanContext(config, sysLimit, queryType);
111111
}
112112

113113
/**

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,8 @@
133133
import org.opensearch.sql.ast.tree.UnresolvedPlan;
134134
import org.opensearch.sql.ast.tree.Values;
135135
import org.opensearch.sql.ast.tree.Window;
136+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
137+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
136138
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
137139
import org.opensearch.sql.calcite.utils.BinUtils;
138140
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
@@ -1136,6 +1138,15 @@ private Optional<RexLiteral> extractAliasLiteral(RexNode node) {
11361138
public RelNode visitJoin(Join node, CalcitePlanContext context) {
11371139
List<UnresolvedPlan> children = node.getChildren();
11381140
children.forEach(c -> analyze(c, context));
1141+
// add join.subsearch_maxout limit to subsearch side
1142+
if (context.sysLimit.joinSubsearchLimit() >= 0) {
1143+
PlanUtils.replaceTop(
1144+
context.relBuilder,
1145+
LogicalSystemLimit.create(
1146+
SystemLimitType.JOIN_SUBSEARCH_MAXOUT,
1147+
context.relBuilder.peek(),
1148+
context.relBuilder.literal(context.sysLimit.joinSubsearchLimit())));
1149+
}
11391150
if (node.getJoinCondition().isEmpty()) {
11401151
// join-with-field-list grammar
11411152
List<String> leftColumns = context.relBuilder.peek(1).getRowType().getFieldNames();

core/src/main/java/org/opensearch/sql/calcite/CalciteRexNodeVisitor.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,13 @@
6868
import org.opensearch.sql.ast.expression.subquery.ExistsSubquery;
6969
import org.opensearch.sql.ast.expression.subquery.InSubquery;
7070
import org.opensearch.sql.ast.expression.subquery.ScalarSubquery;
71+
import org.opensearch.sql.ast.expression.subquery.SubqueryExpression;
7172
import org.opensearch.sql.ast.tree.UnresolvedPlan;
73+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
74+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
7275
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
7376
import org.opensearch.sql.calcite.utils.PlanUtils;
77+
import org.opensearch.sql.calcite.utils.SubsearchUtils;
7478
import org.opensearch.sql.common.utils.StringUtils;
7579
import org.opensearch.sql.data.type.ExprType;
7680
import org.opensearch.sql.exception.CalciteUnsupportedException;
@@ -465,7 +469,7 @@ private RexNode extractRexNodeFromAlias(RexNode node) {
465469
public RexNode visitInSubquery(InSubquery node, CalcitePlanContext context) {
466470
List<RexNode> nodes = node.getChild().stream().map(child -> analyze(child, context)).toList();
467471
UnresolvedPlan subquery = node.getQuery();
468-
RelNode subqueryRel = resolveSubqueryPlan(subquery, context);
472+
RelNode subqueryRel = resolveSubqueryPlan(subquery, node, context);
469473
if (subqueryRel.getRowType().getFieldCount() != nodes.size()) {
470474
throw new SemanticCheckException(
471475
"The number of columns in the left hand side of an IN subquery does not match the number"
@@ -489,7 +493,7 @@ public RexNode visitScalarSubquery(ScalarSubquery node, CalcitePlanContext conte
489493
return context.relBuilder.scalarQuery(
490494
b -> {
491495
UnresolvedPlan subquery = node.getQuery();
492-
return resolveSubqueryPlan(subquery, context);
496+
return resolveSubqueryPlan(subquery, node, context);
493497
});
494498
}
495499

@@ -498,21 +502,44 @@ public RexNode visitExistsSubquery(ExistsSubquery node, CalcitePlanContext conte
498502
return context.relBuilder.exists(
499503
b -> {
500504
UnresolvedPlan subquery = node.getQuery();
501-
return resolveSubqueryPlan(subquery, context);
505+
return resolveSubqueryPlan(subquery, node, context);
502506
});
503507
}
504508

505-
private RelNode resolveSubqueryPlan(UnresolvedPlan subquery, CalcitePlanContext context) {
509+
private RelNode resolveSubqueryPlan(
510+
UnresolvedPlan subquery, SubqueryExpression subqueryExpression, CalcitePlanContext context) {
506511
boolean isNestedSubquery = context.isResolvingSubquery();
507512
context.setResolvingSubquery(true);
508513
// clear and store the outer state
509514
boolean isResolvingJoinConditionOuter = context.isResolvingJoinCondition();
510515
if (isResolvingJoinConditionOuter) {
511516
context.setResolvingJoinCondition(false);
512517
}
513-
RelNode subqueryRel = subquery.accept(planVisitor, context);
518+
subquery.accept(planVisitor, context);
519+
520+
if (context.sysLimit.subsearchLimit() > 0 && !(subqueryExpression instanceof ScalarSubquery)) {
521+
// Add subsearch.maxout limit to exists-in subsearch:
522+
// Cannot add system limit to the top of subquery simply.
523+
// Instead, add system limit under the correlated conditions.
524+
SubsearchUtils.SystemLimitInsertionShuttle shuttle =
525+
new SubsearchUtils.SystemLimitInsertionShuttle(context);
526+
RelNode replacement = context.relBuilder.peek().accept(shuttle);
527+
if (!shuttle.isCorrelatedConditionFound()) {
528+
// If no correlated condition found, add system limit to the top of subquery.
529+
replacement =
530+
LogicalSystemLimit.create(
531+
SystemLimitType.SUBSEARCH_MAXOUT,
532+
replacement,
533+
context.relBuilder.literal(context.sysLimit.subsearchLimit()));
534+
}
535+
PlanUtils.replaceTop(context.relBuilder, replacement);
536+
}
514537
// pop the inner plan
515-
context.relBuilder.build();
538+
RelNode subqueryRel = context.relBuilder.build();
539+
// if maxout = 0, return empty results
540+
if (context.sysLimit.subsearchLimit() == 0) {
541+
subqueryRel = context.relBuilder.values(subqueryRel.getRowType()).build();
542+
}
516543
// clear the exists subquery resolving state
517544
// restore to the previous state
518545
if (isResolvingJoinConditionOuter) {
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite;
7+
8+
import org.opensearch.sql.common.setting.Settings;
9+
10+
public record SysLimit(Integer querySizeLimit, Integer subsearchLimit, Integer joinSubsearchLimit) {
11+
/** Create SysLimit from Settings. */
12+
public static SysLimit fromSettings(Settings settings) {
13+
return settings == null
14+
? UNLIMITED_SUBSEARCH
15+
: new SysLimit(
16+
settings.getSettingValue(Settings.Key.QUERY_SIZE_LIMIT),
17+
settings.getSettingValue(Settings.Key.PPL_SUBSEARCH_MAXOUT),
18+
settings.getSettingValue(Settings.Key.PPL_JOIN_SUBSEARCH_MAXOUT));
19+
}
20+
21+
/** No limitation on subsearch */
22+
public static SysLimit UNLIMITED_SUBSEARCH = new SysLimit(10000, -1, -1);
23+
24+
/** For testing only */
25+
public static SysLimit DEFAULT = new SysLimit(10000, 10000, 50000);
26+
}

core/src/main/java/org/opensearch/sql/calcite/plan/LogicalSystemLimit.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@ public enum SystemLimitType {
3030
*
3131
* <p>This type is used to indicate that the limit is applied to the system level.
3232
*/
33-
QUERY_SIZE_LIMIT
33+
QUERY_SIZE_LIMIT,
34+
/** The max output from subsearch to join against. */
35+
JOIN_SUBSEARCH_MAXOUT,
36+
/** Max output to return from a subsearch. */
37+
SUBSEARCH_MAXOUT,
3438
}
3539

3640
@Getter private final SystemLimitType type;

core/src/main/java/org/opensearch/sql/calcite/utils/CalciteUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,14 @@
77

88
import static org.opensearch.sql.common.setting.Settings.Key.CALCITE_ENGINE_ENABLED;
99

10+
import java.util.Collection;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.function.Predicate;
14+
import java.util.stream.Collectors;
1015
import lombok.experimental.UtilityClass;
16+
import org.apache.commons.lang3.tuple.ImmutablePair;
17+
import org.apache.commons.lang3.tuple.Pair;
1118

1219
@UtilityClass
1320
public class CalciteUtils {
@@ -16,4 +23,10 @@ public static UnsupportedOperationException getOnlyForCalciteException(String fe
1623
return new UnsupportedOperationException(
1724
feature + " is supported only when " + CALCITE_ENGINE_ENABLED.getKeyValue() + "=true");
1825
}
26+
27+
public static <T> Pair<List<T>, List<T>> partition(
28+
Collection<T> collection, Predicate<T> predicate) {
29+
Map<Boolean, List<T>> map = collection.stream().collect(Collectors.partitioningBy(predicate));
30+
return new ImmutablePair<>(map.get(true), map.get(false));
31+
}
1932
}

core/src/main/java/org/opensearch/sql/calcite/utils/PlanUtils.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import static org.apache.calcite.rex.RexWindowBounds.preceding;
1313

1414
import com.google.common.collect.ImmutableList;
15+
import java.lang.reflect.Method;
1516
import java.util.ArrayList;
1617
import java.util.List;
1718
import java.util.Objects;
@@ -26,6 +27,7 @@
2627
import org.apache.calcite.rel.logical.LogicalProject;
2728
import org.apache.calcite.rel.type.RelDataType;
2829
import org.apache.calcite.rex.RexCall;
30+
import org.apache.calcite.rex.RexCorrelVariable;
2931
import org.apache.calcite.rex.RexInputRef;
3032
import org.apache.calcite.rex.RexNode;
3133
import org.apache.calcite.rex.RexOver;
@@ -486,4 +488,36 @@ public static String getActualSignature(List<RelDataType> argTypes) {
486488
.collect(Collectors.joining(","))
487489
+ "]";
488490
}
491+
492+
/**
493+
* Check if the RexNode contains any CorrelVariable.
494+
*
495+
* @param node the RexNode to check
496+
* @return true if the RexNode contains any CorrelVariable, false otherwise
497+
*/
498+
static boolean containsCorrelVariable(RexNode node) {
499+
try {
500+
node.accept(
501+
new RexVisitorImpl<Void>(true) {
502+
@Override
503+
public Void visitCorrelVariable(RexCorrelVariable correlVar) {
504+
throw new RuntimeException("Correl found");
505+
}
506+
});
507+
return false;
508+
} catch (Exception e) {
509+
return true;
510+
}
511+
}
512+
513+
/** Adds a rel node to the top of the stack while preserving the field names and aliases. */
514+
static void replaceTop(RelBuilder relBuilder, RelNode relNode) {
515+
try {
516+
Method method = RelBuilder.class.getDeclaredMethod("replaceTop", RelNode.class);
517+
method.setAccessible(true);
518+
method.invoke(relBuilder, relNode);
519+
} catch (Exception e) {
520+
throw new IllegalStateException("Unable to invoke RelBuilder.replaceTop", e);
521+
}
522+
}
489523
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.calcite.utils;
7+
8+
import java.util.List;
9+
import lombok.Getter;
10+
import lombok.RequiredArgsConstructor;
11+
import lombok.experimental.UtilityClass;
12+
import org.apache.calcite.plan.RelOptUtil;
13+
import org.apache.calcite.rel.RelNode;
14+
import org.apache.calcite.rel.RelShuttleImpl;
15+
import org.apache.calcite.rel.logical.LogicalCorrelate;
16+
import org.apache.calcite.rel.logical.LogicalFilter;
17+
import org.apache.calcite.rel.logical.LogicalIntersect;
18+
import org.apache.calcite.rel.logical.LogicalJoin;
19+
import org.apache.calcite.rel.logical.LogicalMinus;
20+
import org.apache.calcite.rel.logical.LogicalUnion;
21+
import org.apache.calcite.rex.RexNode;
22+
import org.apache.calcite.rex.RexUtil;
23+
import org.apache.commons.lang3.tuple.Pair;
24+
import org.opensearch.sql.calcite.CalcitePlanContext;
25+
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
26+
27+
@UtilityClass
28+
public class SubsearchUtils {
29+
30+
/** Insert a system_limit under correlate conditions. */
31+
private static RelNode insertSysLimitUnderCorrelateConditions(
32+
LogicalFilter logicalFilter, CalcitePlanContext context) {
33+
// Before:
34+
// LogicalFilter(condition=[AND(=($cor0.SAL, $2), >($1, 1000.0:DECIMAL(5, 1)))])
35+
// After:
36+
// LogicalFilter(condition=[=($cor0.SAL, $2)])
37+
// LogicalSystemLimit(fetch=[1], type=[SUBSEARCH_MAXOUT])
38+
// LogicalFilter(condition=[>($1, 1000.0:DECIMAL(5, 1))])
39+
RexNode originalCondition = logicalFilter.getCondition();
40+
List<RexNode> conditions = RelOptUtil.conjunctions(originalCondition);
41+
Pair<List<RexNode>, List<RexNode>> result =
42+
CalciteUtils.partition(conditions, PlanUtils::containsCorrelVariable);
43+
if (result.getLeft().isEmpty()) {
44+
return logicalFilter;
45+
}
46+
47+
RelNode input = logicalFilter.getInput();
48+
if (!result.getRight().isEmpty()) {
49+
RexNode nonCorrelCondition =
50+
RexUtil.composeConjunction(context.rexBuilder, result.getRight());
51+
input = LogicalFilter.create(input, nonCorrelCondition);
52+
}
53+
input =
54+
LogicalSystemLimit.create(
55+
LogicalSystemLimit.SystemLimitType.SUBSEARCH_MAXOUT,
56+
input,
57+
context.relBuilder.literal(context.sysLimit.subsearchLimit()));
58+
if (!result.getLeft().isEmpty()) {
59+
RexNode correlCondition = RexUtil.composeConjunction(context.rexBuilder, result.getLeft());
60+
input = LogicalFilter.create(input, correlCondition);
61+
}
62+
return input;
63+
}
64+
65+
/** Insert a system_limit under correlated conditions by visiting a plan tree. */
66+
@RequiredArgsConstructor
67+
public static class SystemLimitInsertionShuttle extends RelShuttleImpl {
68+
69+
private final CalcitePlanContext context;
70+
@Getter private boolean correlatedConditionFound = false;
71+
72+
@Override
73+
public RelNode visit(LogicalFilter filter) {
74+
RelNode newFilter = insertSysLimitUnderCorrelateConditions(filter, context);
75+
if (newFilter != filter) {
76+
correlatedConditionFound = true;
77+
return newFilter;
78+
}
79+
return super.visitChildren(filter);
80+
}
81+
82+
@Override
83+
public RelNode visit(LogicalJoin node) {
84+
return node;
85+
}
86+
87+
@Override
88+
public RelNode visit(LogicalCorrelate node) {
89+
return node;
90+
}
91+
92+
@Override
93+
public RelNode visit(LogicalUnion node) {
94+
return node;
95+
}
96+
97+
@Override
98+
public RelNode visit(LogicalIntersect node) {
99+
return node;
100+
}
101+
102+
@Override
103+
public RelNode visit(LogicalMinus node) {
104+
return node;
105+
}
106+
}
107+
}

0 commit comments

Comments
 (0)