Skip to content

Commit f7be012

Browse files
committed
Fall back to sub-aggregation when composite aggregation does not support the group-by buckets (#4413)
* Fallback to sub-aggregation if composite aggregation doesn't support
  Signed-off-by: Heng Qian <qianheng@amazon.com>
* merging main
  Signed-off-by: Heng Qian <qianheng@amazon.com>
* Address comments
  Signed-off-by: Heng Qian <qianheng@amazon.com>
* Address comments
  Signed-off-by: Heng Qian <qianheng@amazon.com>
---------
Signed-off-by: Heng Qian <qianheng@amazon.com>
(cherry picked from commit abd5bd3)
Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent 5b751ec commit f7be012

14 files changed

Lines changed: 400 additions & 96 deletions

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteBinCommandIT.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -943,4 +943,54 @@ public void testStatsWithBinsOnTimeField_Avg() throws IOException {
943943
rows(41.8, "2024-07-01 00:04:00"),
944944
rows(50.0, "2024-07-01 00:05:00"));
945945
}
946+
947+
// Checks count() grouped by region and by @timestamp binned with bins=3 on the events_null
// index, with bucket_nullable=false: verifies both the result schema and the data rows.
@Test
948+
public void testStatsWithBinsOnTimeAndTermField_Count() throws IOException {
949+
// TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317
// (presumably the helper below skips the test unless pushdown is enabled; it is defined
// elsewhere -- confirm)
950+
enabledOnlyWhenPushdownIsEnabled();
951+
952+
JSONObject result =
953+
executeQuery(
954+
"source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false count() by"
955+
+ " region, @timestamp");
956+
// TODO: @timestamp should keep date as its type, to be addressed by this issue:
957+
// https://github.com/opensearch-project/sql/issues/4317
// Until then the schema check below expects @timestamp to be reported as "string".
958+
verifySchema(
959+
result,
960+
schema("count()", null, "bigint"),
961+
schema("region", null, "string"),
962+
schema("@timestamp", null, "string"));
963+
// auto_date_histogram will choose span=5m for bins=3
// NOTE(review): the 00:03:00 and 00:01:00 keys below are not 5-minute boundaries; presumably
// the histogram interval is picked per region bucket rather than globally -- confirm.
964+
verifyDataRows(
965+
result,
966+
rows(1, "eu-west", "2024-07-01 00:03:00"),
967+
rows(2, "us-east", "2024-07-01 00:00:00"),
968+
rows(1, "us-east", "2024-07-01 00:05:00"),
969+
rows(2, "us-west", "2024-07-01 00:01:00"));
970+
}
971+
972+
// Checks avg(cpu_usage) grouped by region and by @timestamp binned with bins=3 on the
// events_null index, with bucket_nullable=false: verifies the result schema and the data rows.
@Test
973+
public void testStatsWithBinsOnTimeAndTermField_Avg() throws IOException {
974+
// TODO: Remove this after addressing https://github.com/opensearch-project/sql/issues/4317
// (presumably the helper below skips the test unless pushdown is enabled; it is defined
// elsewhere -- confirm)
975+
enabledOnlyWhenPushdownIsEnabled();
976+
977+
JSONObject result =
978+
executeQuery(
979+
"source=events_null | bin @timestamp bins=3 | stats bucket_nullable=false "
980+
+ " avg(cpu_usage) by region, @timestamp");
981+
// TODO: @timestamp should keep date as its type, to be addressed by this issue:
982+
// https://github.com/opensearch-project/sql/issues/4317
// Until then the schema check below expects @timestamp to be reported as "string".
983+
verifySchema(
984+
result,
985+
schema("avg(cpu_usage)", null, "double"),
986+
schema("region", null, "string"),
987+
schema("@timestamp", null, "string"));
988+
// auto_date_histogram will choose span=5m for bins=3
// NOTE(review): the 00:03:00 and 00:01:00 keys below are not 5-minute boundaries; presumably
// the histogram interval is picked per region bucket rather than globally -- confirm.
989+
verifyDataRows(
990+
result,
991+
rows(42.1, "eu-west", "2024-07-01 00:03:00"),
992+
rows(50.25, "us-east", "2024-07-01 00:00:00"),
993+
rows(50, "us-east", "2024-07-01 00:05:00"),
994+
rows(40.25, "us-west", "2024-07-01 00:01:00"));
995+
}
946996
}

integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,40 @@ public void testExplainStatsWithBinsOnTimeField() throws IOException {
358358
"source=events | bin @timestamp bins=3 | stats avg(cpu_usage) by @timestamp"));
359359
}
360360

361+
// Compares the explain output of two "bin @timestamp bins=3 | stats bucket_nullable=false ..."
// queries grouped by @timestamp and region against their expected YAML plans: the first uses
// count(), the second avg(cpu_usage).
@Test
362+
public void testExplainStatsWithSubAggregation() throws IOException {
// Presumably skips the test unless pushdown is enabled; the helper is defined elsewhere.
363+
enabledOnlyWhenPushdownIsEnabled();
// Case 1: count() by @timestamp, region.
364+
String expected = loadExpectedPlan("explain_stats_bins_on_time_and_term.yaml");
365+
assertYamlEqualsJsonIgnoreId(
366+
expected,
367+
explainQueryToString(
368+
"source=events | bin @timestamp bins=3 | stats bucket_nullable=false count() by"
369+
+ " @timestamp, region"));
370+
371+
// Case 2: avg(cpu_usage) by @timestamp, region; reuses the `expected` local.
expected = loadExpectedPlan("explain_stats_bins_on_time_and_term2.yaml");
372+
assertYamlEqualsJsonIgnoreId(
373+
expected,
374+
explainQueryToString(
375+
"source=events | bin @timestamp bins=3 | stats bucket_nullable=false avg(cpu_usage) by"
376+
+ " @timestamp, region"));
377+
}
378+
379+
// Asserts that explaining a bin + stats query grouped by @timestamp and region throws when the
// query does not set bucket_nullable=false.
@Test
380+
public void bucketNullableNotSupportSubAggregation() throws IOException {
381+
// TODO: Don't throw exception after addressing
382+
// https://github.com/opensearch-project/sql/issues/4317
383+
// When bucketNullable is true, sub-aggregation is not supported, hence we cannot push down the
384+
// aggregation in this query. Because of issue
385+
// https://github.com/opensearch-project/sql/issues/4317,
386+
// bin aggregation on a timestamp field won't work if it has not been pushed down.
387+
enabledOnlyWhenPushdownIsEnabled();
// Any Exception type is accepted; the query below leaves bucket_nullable unset.
388+
assertThrows(
389+
Exception.class,
390+
() ->
391+
explainQueryToString(
392+
"source=events | bin @timestamp bins=3 | stats count() by @timestamp, region"));
393+
}
394+
361395
@Test
362396
public void testExplainBinWithSpan() throws IOException {
363397
String expected = loadExpectedPlan("explain_bin_span.yaml");
@@ -673,15 +707,15 @@ public void testPushdownLimitIntoAggregation() throws IOException {
673707
"source=opensearch-sql_test_index_account | stats count() by state | sort state | head"
674708
+ " 100 | head 10 from 10 "));
675709

676-
expected = loadExpectedPlan("explain_limit_agg_pushdown_bucket_nullable1.json");
677-
assertJsonEqualsIgnoreId(
710+
expected = loadExpectedPlan("explain_limit_agg_pushdown_bucket_nullable1.yaml");
711+
assertYamlEqualsJsonIgnoreId(
678712
expected,
679713
explainQueryToString(
680714
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by"
681715
+ " state | head 100 | head 10 from 10 "));
682716

683-
expected = loadExpectedPlan("explain_limit_agg_pushdown_bucket_nullable2.json");
684-
assertJsonEqualsIgnoreId(
717+
expected = loadExpectedPlan("explain_limit_agg_pushdown_bucket_nullable2.yaml");
718+
assertYamlEqualsJsonIgnoreId(
685719
expected,
686720
explainQueryToString(
687721
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by"
@@ -853,15 +887,15 @@ public void testExplainCountsByAgg() throws IOException {
853887
public void testExplainSortOnMetricsNoBucketNullable() throws IOException {
854888
// TODO enhancement later: https://github.com/opensearch-project/sql/issues/4282
855889
enabledOnlyWhenPushdownIsEnabled();
856-
String expected = loadExpectedPlan("explain_agg_sort_on_metrics1.json");
857-
assertJsonEqualsIgnoreId(
890+
String expected = loadExpectedPlan("explain_agg_sort_on_metrics1.yaml");
891+
assertYamlEqualsJsonIgnoreId(
858892
expected,
859893
explainQueryToString(
860894
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by"
861895
+ " state | sort `count()`"));
862896

863-
expected = loadExpectedPlan("explain_agg_sort_on_metrics2.json");
864-
assertJsonEqualsIgnoreId(
897+
expected = loadExpectedPlan("explain_agg_sort_on_metrics2.yaml");
898+
assertYamlEqualsJsonIgnoreId(
865899
expected,
866900
explainQueryToString(
867901
"source=opensearch-sql_test_index_account | stats bucket_nullable=false count() by"
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
5+
LogicalProject(count()=[$1], state=[$0])
6+
LogicalAggregate(group=[{0}], count()=[COUNT()])
7+
LogicalProject(state=[$7])
8+
LogicalFilter(condition=[IS NOT NULL($7)])
9+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
10+
physical: |
11+
EnumerableLimit(fetch=[10000])
12+
EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])
13+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), PROJECT->[count(), state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(sort0=[$0], dir0=[ASC-nulls-first])
5+
LogicalProject(count()=[$2], gender=[$0], state=[$1])
6+
LogicalAggregate(group=[{0, 1}], count()=[COUNT()])
7+
LogicalProject(gender=[$4], state=[$7])
8+
LogicalFilter(condition=[AND(IS NOT NULL($4), IS NOT NULL($7))])
9+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
10+
physical: |
11+
EnumerableLimit(fetch=[10000])
12+
EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])
13+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), gender, state]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"gender":{"terms":{"field":"gender.keyword","missing_bucket":false,"order":"asc"}}},{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(offset=[10], fetch=[10])
5+
LogicalSort(fetch=[100])
6+
LogicalProject(count()=[$1], state=[$0])
7+
LogicalAggregate(group=[{0}], count()=[COUNT()])
8+
LogicalProject(state=[$7])
9+
LogicalFilter(condition=[IS NOT NULL($7)])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], state=[$t0])
14+
EnumerableLimit(offset=[10], fetch=[10])
15+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), LIMIT->100, LIMIT->[10 from 10]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":20,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalSort(offset=[10], fetch=[10])
5+
LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[100])
6+
LogicalProject(count()=[$1], state=[$0])
7+
LogicalAggregate(group=[{0}], count()=[COUNT()])
8+
LogicalProject(state=[$7])
9+
LogicalFilter(condition=[IS NOT NULL($7)])
10+
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
11+
physical: |
12+
EnumerableLimit(fetch=[10000])
13+
EnumerableCalc(expr#0..1=[{inputs}], count()=[$t1], state=[$t0])
14+
EnumerableLimit(offset=[10], fetch=[10])
15+
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT()), SORT->[0 ASC FIRST], LIMIT->100, LIMIT->[10 from 10]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"composite_buckets":{"composite":{"size":20,"sources":[{"state":{"terms":{"field":"state.keyword","missing_bucket":false,"order":"asc"}}}]}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

integ-test/src/test/resources/expectedOutput/calcite/explain_stats_bins_on_time.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ calcite:
88
physical: |
99
EnumerableLimit(fetch=[10000])
1010
EnumerableCalc(expr#0..1=[{inputs}], expr#2=[0], expr#3=[>($t1, $t2)], count()=[$t1], @timestamp=[$t0], $condition=[$t3])
11-
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null},"aggregations":{"count()":{"value_count":{"field":"_index"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count()=COUNT())], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(count()=[$2], @timestamp=[$0], region=[$1])
5+
LogicalAggregate(group=[{0, 1}], count()=[COUNT()])
6+
LogicalProject(@timestamp=[$15], region=[$7])
7+
LogicalFilter(condition=[IS NOT NULL($7)])
8+
LogicalProject(environment=[$0], status_code=[$2], service=[$3], host=[$4], memory_usage=[$5], response_time=[$6], cpu_usage=[$7], region=[$8], bytes_sent=[$9], _id=[$10], _index=[$11], _score=[$12], _maxscore=[$13], _sort=[$14], _routing=[$15], @timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())])
9+
CalciteLogicalIndexScan(table=[[OpenSearch, events]])
10+
physical: |
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0, 1},count()=COUNT()), PROJECT->[count(), @timestamp, region], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"region":{"terms":{"field":"region","size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":{"_key":"asc"}},"aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
calcite:
2+
logical: |
3+
LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])
4+
LogicalProject(avg(cpu_usage)=[$2], @timestamp=[$0], region=[$1])
5+
LogicalAggregate(group=[{0, 1}], avg(cpu_usage)=[AVG($2)])
6+
LogicalProject(@timestamp=[$15], region=[$7], cpu_usage=[$6])
7+
LogicalFilter(condition=[IS NOT NULL($7)])
8+
LogicalProject(environment=[$0], status_code=[$2], service=[$3], host=[$4], memory_usage=[$5], response_time=[$6], cpu_usage=[$7], region=[$8], bytes_sent=[$9], _id=[$10], _index=[$11], _score=[$12], _maxscore=[$13], _sort=[$14], _routing=[$15], @timestamp=[WIDTH_BUCKET($1, 3, -(MAX($1) OVER (), MIN($1) OVER ()), MAX($1) OVER ())])
9+
CalciteLogicalIndexScan(table=[[OpenSearch, events]])
10+
physical: |
11+
CalciteEnumerableIndexScan(table=[[OpenSearch, events]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={1, 2},avg(cpu_usage)=AVG($0)), PROJECT->[avg(cpu_usage), @timestamp, region], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","aggregations":{"region":{"terms":{"field":"region","size":1000,"min_doc_count":1,"shard_min_doc_count":0,"show_term_doc_count_error":false,"order":{"_key":"asc"}},"aggregations":{"@timestamp":{"auto_date_histogram":{"field":"@timestamp","buckets":3,"minimum_interval":null},"aggregations":{"avg(cpu_usage)":{"avg":{"field":"cpu_usage"}}}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])

0 commit comments

Comments
 (0)