|
9 | 9 | import java.util.List; |
10 | 10 | import java.util.Map; |
11 | 11 | import java.util.Objects; |
12 | | -import java.util.Optional; |
13 | 12 | import java.util.stream.Collectors; |
14 | 13 | import javax.annotation.Nullable; |
15 | 14 | import lombok.Getter; |
@@ -403,19 +402,15 @@ public AbstractRelNode pushDownLimit(LogicalSort sort, Integer limit, Integer of |
403 | 402 | // Since the AggPushDownAction is shared among different PushDownContext, its size() may be |
404 | 403 | // inaccurate(<= the actual size). |
405 | 404 | // So take the previous limit into account to decide whether it can update the context. |
406 | | - boolean canReduceEstimatedRowsCount = true; |
407 | | - if (pushDownContext.isLimitPushed()) { |
408 | | - Optional<Integer> previousRowCount = |
409 | | - pushDownContext.getQueue().reversed().stream() |
410 | | - .takeWhile(operation -> operation.type() != PushDownType.AGGREGATION) |
411 | | - .filter(operation -> operation.type() == PushDownType.LIMIT) |
412 | | - .findFirst() |
413 | | - .map(operation -> (LimitDigest) operation.digest()) |
414 | | - .map(limitDigest -> limitDigest.offset() + limitDigest.limit()); |
415 | | - if (previousRowCount.isPresent()) { |
416 | | - canReduceEstimatedRowsCount = totalSize < previousRowCount.get(); |
417 | | - } |
418 | | - } |
| 405 | + boolean canReduceEstimatedRowsCount = |
| 406 | + !pushDownContext.isLimitPushed() |
| 407 | + || pushDownContext.getQueue().reversed().stream() |
| 408 | + .takeWhile(op -> op.type() != PushDownType.AGGREGATION) |
| 409 | + .filter(op -> op.type() == PushDownType.LIMIT) |
| 410 | + .findFirst() |
| 411 | + .map(op -> (LimitDigest) op.digest()) |
| 412 | + .map(d -> totalSize < d.offset() + d.limit()) |
| 413 | + .orElse(true); |
419 | 414 |
|
420 | 415 | // Push down the limit into the aggregation bucket in advance to detect whether the limit |
421 | 416 | // can update the aggregation builder |
|
0 commit comments