Skip to content

Commit c8f2a7f

Browse files
committed
Address comments
Signed-off-by: Heng Qian <qianheng@amazon.com>
1 parent b161a3b commit c8f2a7f

2 files changed

Lines changed: 11 additions & 18 deletions

File tree

opensearch/src/main/java/org/opensearch/sql/opensearch/planner/rules/DedupPushdownRule.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -125,10 +125,8 @@ protected void apply(
125125
PlanUtils.addIgnoreNullBucketHintToAggregate(relBuilder);
126126
// peek the aggregate after hint being added
127127
LogicalAggregate aggregate = (LogicalAggregate) relBuilder.build();
128-
// assert aggregate.getInput().getRowType().getFieldCount() == groupByList.size() :
129-
// String.format("The input's field size should be trimmed to equal to group list size %d, but
130-
// got %d", groupByList.size(), aggregate.getInput().getRowType().getFieldCount());
131-
assert aggregate.getGroupSet().asList().equals(newGroupByList);
128+
assert aggregate.getGroupSet().asList().equals(newGroupByList)
129+
: "The group set of aggregate should be exactly the same as the generated group list";
132130

133131
CalciteLogicalIndexScan newScan =
134132
(CalciteLogicalIndexScan) scan.pushDownAggregate(aggregate, targetChildProject);

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/CalciteLogicalIndexScan.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import java.util.List;
1010
import java.util.Map;
1111
import java.util.Objects;
12-
import java.util.Optional;
1312
import java.util.stream.Collectors;
1413
import javax.annotation.Nullable;
1514
import lombok.Getter;
@@ -403,19 +402,15 @@ public AbstractRelNode pushDownLimit(LogicalSort sort, Integer limit, Integer of
403402
// Since the AggPushDownAction is shared among different PushDownContext, its size() may be
404403
// inaccurate(<= the actual size).
405404
// So take the previous limit into account to decide whether it can update the context.
406-
boolean canReduceEstimatedRowsCount = true;
407-
if (pushDownContext.isLimitPushed()) {
408-
Optional<Integer> previousRowCount =
409-
pushDownContext.getQueue().reversed().stream()
410-
.takeWhile(operation -> operation.type() != PushDownType.AGGREGATION)
411-
.filter(operation -> operation.type() == PushDownType.LIMIT)
412-
.findFirst()
413-
.map(operation -> (LimitDigest) operation.digest())
414-
.map(limitDigest -> limitDigest.offset() + limitDigest.limit());
415-
if (previousRowCount.isPresent()) {
416-
canReduceEstimatedRowsCount = totalSize < previousRowCount.get();
417-
}
418-
}
405+
boolean canReduceEstimatedRowsCount =
406+
!pushDownContext.isLimitPushed()
407+
|| pushDownContext.getQueue().reversed().stream()
408+
.takeWhile(op -> op.type() != PushDownType.AGGREGATION)
409+
.filter(op -> op.type() == PushDownType.LIMIT)
410+
.findFirst()
411+
.map(op -> (LimitDigest) op.digest())
412+
.map(d -> totalSize < d.offset() + d.limit())
413+
.orElse(true);
419414

420415
// Push down the limit into the aggregation bucket in advance to detect whether the limit
421416
// can update the aggregation builder

0 commit comments

Comments
 (0)