fix(native): Preserve Java aggregation iteration order across protocol#27903
Conversation
Reviewer's GuideAdds an explicit aggregation output ordering to the Presto protocol and native converter so Prestissimo workers build AggregationNode schemas in the same order as the Java planner, keeping it backward compatible, and introduces tests to lock in ordering and compatibility behavior. Sequence diagram for aggregation output ordering across Java, protocol, and native convertersequenceDiagram
participant JavaAggregationNode
participant ProtocolSerialization
participant ProtocolAggregationNode
participant VeloxQueryPlanConverterBase
JavaAggregationNode->>JavaAggregationNode: getAggregations
JavaAggregationNode->>JavaAggregationNode: getAggregationOutputs
JavaAggregationNode->>ProtocolSerialization: to_json
ProtocolSerialization->>ProtocolAggregationNode: to_json_key aggregations
ProtocolSerialization->>ProtocolAggregationNode: to_json_key aggregationOutputs
ProtocolAggregationNode->>ProtocolSerialization: from_json
ProtocolSerialization->>ProtocolAggregationNode: from_json_key aggregations
ProtocolSerialization->>ProtocolAggregationNode: from_json_key aggregationOutputs
VeloxQueryPlanConverterBase->>ProtocolAggregationNode: toVeloxQueryPlan
VeloxQueryPlanConverterBase->>ProtocolAggregationNode: read aggregationOutputs
alt aggregationOutputs non_empty
VeloxQueryPlanConverterBase->>VeloxQueryPlanConverterBase: use aggregationOutputs
else aggregationOutputs empty
VeloxQueryPlanConverterBase->>ProtocolAggregationNode: iterate aggregations
end
VeloxQueryPlanConverterBase->>VeloxQueryPlanConverterBase: toAggregations
File-Level Changes
Assessment against linked issues
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've left some high level feedback:
- AggregationNode.getAggregationOutputs() allocates a new ArrayList and wrapper on every call; if this method is used in hot paths, consider caching the list or storing it as a field derived once from aggregations to avoid repeated allocations.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- AggregationNode.getAggregationOutputs() allocates a new ArrayList and wrapper on every call; if this method is used in hot paths, consider caching the list or storing it as a field derived once from aggregations to avoid repeated allocations.Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
050a7d4 to
653dbf9
Compare
|
Addressed sourcery-ai's feedback in the latest push: |
653dbf9 to
386c644
Compare
|
Pushed a test fix: |
386c644 to
e4aca4f
Compare
|
Pushed a follow-up fix for |
e4aca4f to
ededdf5
Compare
|
Push 3 fixes the GPU build. |
The Prestissimo protocol struct protocol::AggregationNode stores aggregations as Map<VRE, Aggregation> = std::map<VRE>, which iterates in sorted name order. The Java side iterates in LinkedHashMap insertion order. When these orders diverge (e.g. approx_distinct_* mixed with sum_*), the native AggregationNode output schema differs from the Java planner's, shifting channel positions and causing type mismatches at exchange operators: type_->kindEquals(vector.type()) Type mismatch: BIGINT vs. DOUBLE Operator: LocalPartition(...) prestodb#27493 partially addressed this by switching Java rules from HashMap to LinkedHashMap (deterministic insertion order), but did not align Java with the native side's sort. Queries whose variable names happened to sort consistently with insertion order passed; others still crashed. Add an explicit aggregationOutputs: List<VariableReferenceExpression> field, populated on the Java side from aggregations.keySet() in LinkedHashMap order. The native converter uses this list when present and falls back to map iteration only for older coordinators that do not send the field. ``` == RELEASE NOTES == Native Execution Changes * Fix runtime type-mismatch crashes at exchange operators in Prestissimo when aggregation variable names sort differently from their Java allocation order. The protocol now carries an explicit aggregation output ordering so native workers build the AggregationNode output schema in the order the Java planner intended. ``` Fixes prestodb#27902
ededdf5 to
dfb76b2
Compare
|
@feilong-liu when you get a chance, could you take a look? Thanks! Also adding @amitkdutta as a reviewer. |
prestodb#27903) ## Summary `protocol::AggregationNode::aggregations` is `std::map<VRE, Aggregation>` on the native side (sorted by variable name); the Java side iterates `LinkedHashMap` insertion order. When the two orders diverge — e.g. `approx_distinct_*` mixed with `sum_*`, alphabetical sort `approx_distinct < sum` but Java inserts `sum`s first — the native `AggregationNode` output schema differs from the Java planner's. Channel positions shift and type mismatches surface at exchange operators: ``` type_->kindEquals(vector.type()) Type mismatch: BIGINT vs. DOUBLE Operator: LocalPartition(...) ``` prestodb#27493 partially addressed this by switching three Java optimizer rules from `HashMap` to `LinkedHashMap` (deterministic insertion order), but did not align Java with the native side's sort. Queries whose variable names happened to sort consistently with their insertion order passed; others still crashed. This adds an explicit `aggregationOutputs: List<VariableReferenceExpression>` field, populated on the Java side from `aggregations.keySet()` in `LinkedHashMap` order. The native converter uses this list when present and falls back to `std::map` iteration only for older coordinators that don't send the field (so rolling upgrades are safe). Fixes prestodb#27902. ## Changes - `presto-spi/.../AggregationNode.java`: `@JsonProperty(READ_ONLY) getAggregationOutputs()` — derived from `aggregations`, no constructor change. - `presto_protocol_core.h`/`.cpp`: `List<VRE> aggregationOutputs` on `protocol::AggregationNode`, with backward-compatible `from_json`. - `PrestoToVeloxQueryPlan.cpp`: converter uses `aggregationOutputs` when non-empty; falls back to map iteration otherwise. - Tests: protocol round-trip + backward-compat (`AggregationNodeTest.cpp`); Java getter ordering (`TestPreAggregateBeforeGroupId`). ## Test plan - [ ] `mvn test -pl presto-main-base -Dtest=TestPreAggregateBeforeGroupId#testAggregationOutputsPreservesInsertionOrder` - [ ] `presto-native-execution` build + run `presto_protocol_test --gtest_filter="AggregationNodeTest.*"` - [ ] Existing `TestPreAggregateBeforeGroupId` suite stays green - [ ] Manually verify a reproducer query (`approx_distinct(...)` + `sum(...)` under `GROUPING SETS` with `optimizer.pre_aggregate_before_grouping_sets=true`) no longer crashes on a Prestissimo cluster ## Release notes \`\`\` == RELEASE NOTES == Native Execution Changes * Fix runtime type-mismatch crashes at exchange operators in Prestissimo when aggregation variable names sort differently from their Java allocation order. The protocol now carries an explicit aggregation output ordering so native workers build the AggregationNode output schema in the order the Java planner intended. \`\`\` ## Summary by Sourcery Ensure native AggregationNode output schemas follow Java planner aggregation ordering by explicitly propagating aggregation output variables through the protocol and converter. Bug Fixes: - Prevent type-mismatch failures in native execution caused by differing aggregation iteration orders between Java and C++. Enhancements: - Expose an explicit aggregationOutputs list on AggregationNode in the Java SPI and carry it through the native protocol for deterministic aggregation output ordering. Tests: - Add Java and C++ regression tests to verify aggregation output ordering is preserved and that the protocol remains backward compatible when aggregationOutputs is absent.
Summary
protocol::AggregationNode::aggregationsisstd::map<VRE, Aggregation>on the native side (sorted by variable name); the Java side iteratesLinkedHashMapinsertion order. When the two orders diverge — e.g.approx_distinct_*mixed withsum_*, alphabetical sortapprox_distinct < sumbut Java insertssums first — the nativeAggregationNodeoutput schema differs from the Java planner's. Channel positions shift and type mismatches surface at exchange operators:#27493 partially addressed this by switching three Java optimizer rules from
HashMaptoLinkedHashMap(deterministic insertion order), but did not align Java with the native side's sort. Queries whose variable names happened to sort consistently with their insertion order passed; others still crashed.This adds an explicit
aggregationOutputs: List<VariableReferenceExpression>field, populated on the Java side fromaggregations.keySet()inLinkedHashMaporder. The native converter uses this list when present and falls back tostd::mapiteration only for older coordinators that don't send the field (so rolling upgrades are safe).Fixes #27902.
Changes
presto-spi/.../AggregationNode.java:@JsonProperty(READ_ONLY) getAggregationOutputs()— derived fromaggregations, no constructor change.presto_protocol_core.h/.cpp:List<VRE> aggregationOutputsonprotocol::AggregationNode, with backward-compatiblefrom_json.PrestoToVeloxQueryPlan.cpp: converter usesaggregationOutputswhen non-empty; falls back to map iteration otherwise.AggregationNodeTest.cpp); Java getter ordering (TestPreAggregateBeforeGroupId).Test plan
mvn test -pl presto-main-base -Dtest=TestPreAggregateBeforeGroupId#testAggregationOutputsPreservesInsertionOrderpresto-native-executionbuild + runpresto_protocol_test --gtest_filter="AggregationNodeTest.*"TestPreAggregateBeforeGroupIdsuite stays greenapprox_distinct(...)+sum(...)underGROUPING SETSwithoptimizer.pre_aggregate_before_grouping_sets=true) no longer crashes on a Prestissimo clusterRelease notes
```
== RELEASE NOTES ==
Native Execution Changes
```
Summary by Sourcery
Ensure native AggregationNode output schemas follow Java planner aggregation ordering by explicitly propagating aggregation output variables through the protocol and converter.
Bug Fixes:
Enhancements:
Tests: