Skip to content

Commit fe62472

Browse files
authored
Update request builder after pushdown sort into agg buckets (#4541)
Signed-off-by: Lantao Jin <ltjin@amazon.com>
1 parent 42a415f commit fe62472

File tree

4 files changed

+74
-15
lines changed

4 files changed

+74
-15
lines changed
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
setup:
2+
- do:
3+
query.settings:
4+
body:
5+
transient:
6+
plugins.calcite.enabled : true
7+
- do:
8+
bulk:
9+
index: test
10+
refresh: true
11+
body:
12+
- '{"index": {}}'
13+
- '{"category":"A","has_flag":true,"value":10}'
14+
- '{"index": {}}'
15+
- '{"category":"B","has_flag":true,"value":20}'
16+
- '{"index": {}}'
17+
- '{"category":"C","has_flag":true,"value":30}'
18+
- '{"index": {}}'
19+
- '{"category":"D","has_flag":false,"value":40}'
20+
- '{"index": {}}'
21+
- '{"category":"E","has_flag":false,"value":50}'
22+
- '{"index": {}}'
23+
- '{"category":"F","has_flag":false,"value":60}'
24+
25+
---
26+
teardown:
27+
- do:
28+
query.settings:
29+
body:
30+
transient:
31+
plugins.calcite.enabled : false
32+
33+
---
34+
"Join with fields":
35+
- skip:
36+
features:
37+
- headers
38+
- allowed_warnings
39+
- do:
40+
headers:
41+
Content-Type: 'application/json'
42+
ppl:
43+
body:
44+
query: source=test | stats COUNT() as cnt by category, has_flag | fields category, has_flag, cnt | join left=L right=R ON L.has_flag = R.has_flag [source=test | stats COUNT() as overall_cnt by has_flag]
45+
46+
- match: { total: 6 }

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchSortIndexScanRule.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ protected OpenSearchSortIndexScanRule(Config config) {
2323
public void onMatch(RelOptRuleCall call) {
2424
final Sort sort = call.rel(0);
2525
final AbstractCalciteIndexScan scan = call.rel(1);
26+
if (sort.getConvention() != scan.getConvention()) {
27+
return;
28+
}
2629

2730
var collations = sort.collation.getFieldCollations();
2831
AbstractCalciteIndexScan newScan = scan.pushDownSort(collations);

opensearch/src/main/java/org/opensearch/sql/opensearch/request/AggregateAnalyzer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,8 @@ private static Pair<AggregationBuilder, MetricParser> createRegularAggregation(
380380
case AVG -> Pair.of(
381381
helper.build(args.getFirst(), AggregationBuilders.avg(aggFieldName)),
382382
new SingleValueParser(aggFieldName));
383+
// 1. Only case SUM, skip SUM0 / COUNT since calling avg() in DSL should be faster.
384+
// 2. To align with databases, SUM0 is not preferred now.
383385
case SUM -> Pair.of(
384386
helper.build(args.getFirst(), AggregationBuilders.sum(aggFieldName)),
385387
new SingleValueParser(aggFieldName));

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -258,20 +258,8 @@ public AbstractCalciteIndexScan pushDownSort(List<RelFieldCollation> collations)
258258
// aggregators.
259259
return null;
260260
}
261-
262-
// Propagate the sort to the new scan
263261
RelTraitSet traitsWithCollations = getTraitSet().plus(RelCollations.of(collations));
264-
AbstractCalciteIndexScan newScan =
265-
buildScan(
266-
getCluster(),
267-
traitsWithCollations,
268-
hints,
269-
table,
270-
osIndex,
271-
getRowType(),
272-
// Existing collations are overridden (discarded) by the new collations,
273-
pushDownContext.cloneWithoutSort());
274-
262+
PushDownContext pushDownContextWithoutSort = this.pushDownContext.cloneWithoutSort();
275263
AbstractAction<?> action;
276264
Object digest;
277265
if (pushDownContext.isAggregatePushed()) {
@@ -281,7 +269,27 @@ public AbstractCalciteIndexScan pushDownSort(List<RelFieldCollation> collations)
281269
aggAction ->
282270
aggAction.pushDownSortIntoAggBucket(collations, getRowType().getFieldNames());
283271
digest = collations;
272+
pushDownContextWithoutSort.add(PushDownType.SORT, digest, action);
273+
return buildScan(
274+
getCluster(),
275+
traitsWithCollations,
276+
hints,
277+
table,
278+
osIndex,
279+
getRowType(),
280+
pushDownContextWithoutSort.clone());
284281
} else {
282+
// Propagate the sort to the new scan
283+
AbstractCalciteIndexScan newScan =
284+
buildScan(
285+
getCluster(),
286+
traitsWithCollations,
287+
hints,
288+
table,
289+
osIndex,
290+
getRowType(),
291+
// Existing collations are overridden (discarded) by the new collations,
292+
pushDownContextWithoutSort);
285293
List<SortBuilder<?>> builders = new ArrayList<>();
286294
for (RelFieldCollation collation : collations) {
287295
int index = collation.getFieldIndex();
@@ -310,9 +318,9 @@ public AbstractCalciteIndexScan pushDownSort(List<RelFieldCollation> collations)
310318
}
311319
action = (OSRequestBuilderAction) requestBuilder -> requestBuilder.pushDownSort(builders);
312320
digest = builders.toString();
321+
newScan.pushDownContext.add(PushDownType.SORT, digest, action);
322+
return newScan;
313323
}
314-
newScan.pushDownContext.add(PushDownType.SORT, digest, action);
315-
return newScan;
316324
} catch (Exception e) {
317325
if (LOG.isDebugEnabled()) {
318326
LOG.debug("Cannot pushdown the sort {}", getCollationNames(collations), e);

0 commit comments

Comments
 (0)