feat(native-pos): MaterializeExchange: [1/n] Add MaterializedExchange operators and plan wiring support (#27573)#27573
Conversation
Reviewer's GuideIntroduces a new flat-buffer-based shuffle path via ExchangeWrite/ExchangeRead operators and a shared ExchangeOutputBuffer, along with unit tests and memory-reclamation behavior, to replace the existing partition-and-shuffle pipeline without touching Velox core. Sequence diagram for ExchangeWrite pushing data to ShuffleWritersequenceDiagram
actor Driver
participant ExchangeWrite
participant ExchangeOutputBuffer
participant ShuffleWriter
Driver->>ExchangeWrite: addInput(inputBatch)
activate ExchangeWrite
ExchangeWrite->>ExchangeWrite: initializeInput(inputBatch)
ExchangeWrite->>ExchangeWrite: computePartitions(rawInput, numRows)
ExchangeWrite->>ExchangeWrite: serializeRows(compactRow, numRows)
alt flatBufferSize_ >= targetSizeInBytes_
ExchangeWrite->>ExchangeWrite: flushBatch()
loop partitions
ExchangeWrite->>ExchangeOutputBuffer: enqueue(partition, RowGroupIOBuf, future)
activate ExchangeOutputBuffer
ExchangeOutputBuffer->>ExchangeOutputBuffer: append to PartitionQueue
ExchangeOutputBuffer->>ExchangeOutputBuffer: maybeApplyBackpressure(future)
alt partition bufferedBytes >= partitionDrainThreshold
ExchangeOutputBuffer->>ExchangeOutputBuffer: drainPartition(partition)
ExchangeOutputBuffer->>ExchangeOutputBuffer: coalesceRowGroups()
ExchangeOutputBuffer->>ShuffleWriter: collect(partition, key, dataView)
end
deactivate ExchangeOutputBuffer
end
end
deactivate ExchangeWrite
Driver->>ExchangeWrite: noMoreInput()
activate ExchangeWrite
ExchangeWrite->>ExchangeWrite: finalizeDriver()
ExchangeWrite->>ExchangeOutputBuffer: flushBatch()
ExchangeWrite->>velox_exec_Task: allPeersFinished(planNodeId, driver, peerFuture, promises, peers)
alt last driver
ExchangeWrite->>ExchangeOutputBuffer: noMoreData()
activate ExchangeOutputBuffer
ExchangeOutputBuffer->>ExchangeOutputBuffer: drainAll()
ExchangeOutputBuffer->>ShuffleWriter: noMoreData(success=true)
deactivate ExchangeOutputBuffer
else task terminating
ExchangeWrite->>ExchangeOutputBuffer: abort()
end
deactivate ExchangeWrite
Sequence diagram for ExchangeRead consuming batched shuffle pagessequenceDiagram
actor Driver
participant ExchangeRead
participant ExchangeClient
participant ShuffleExchangeSource
Driver->>ExchangeRead: getOutput()
activate ExchangeRead
alt currentPages_ empty
ExchangeRead->>ExchangeClient: next(callerId)
ExchangeClient->>ShuffleExchangeSource: requestPages()
ShuffleExchangeSource-->>ExchangeClient: batched pages (kFormatBatched)
ExchangeClient-->>ExchangeRead: currentPages_
end
alt rows_ empty
ExchangeRead->>ExchangeRead: parseCurrentPages()
loop pages
ExchangeRead->>ShuffleSerializedPage: rows(driverId)
ShuffleSerializedPage-->>ExchangeRead: vector string_view batchedValues
loop batchedValues
ExchangeRead->>ExchangeRead: expandBatchedPage(pageData)
end
end
end
alt rows_ not empty
ExchangeRead->>ExchangeRead: deserializeNextBatch()
ExchangeRead-->>Driver: RowVector batch
else no rows
ExchangeRead-->>Driver: null
end
alt nextRow_ == rows_.size()
ExchangeRead->>ExchangeRead: resetOutputState()
end
deactivate ExchangeRead
Driver->>ExchangeRead: close()
activate ExchangeRead
ExchangeRead->>ExchangeRead: record stats
ExchangeRead->>ExchangeClient: close()
deactivate ExchangeRead
Class diagram for new ExchangeWrite/ExchangeRead shuffle pathclassDiagram
class ExchangeWriteNode {
+ExchangeWriteNode(id, keys, numPartitions, outputType, partitionFunctionSpec, source, buffer)
+int numPartitions()
+vector~TypedExprPtr~ keys()
+ExchangeOutputBuffer* buffer()
+PartitionFunctionSpec partitionFunctionSpec()
+RowTypePtr outputType()
+vector~PlanNodePtr~ sources()
+folly_dynamic serialize()
+static PlanNodePtr create(obj, context)
-int numPartitions_
-vector~TypedExprPtr~ keys_
-RowTypePtr outputType_
-shared_ptr~PartitionFunctionSpec~ partitionFunctionSpec_
-vector~PlanNodePtr~ sources_
-shared_ptr~ExchangeOutputBuffer~ buffer_
}
class ExchangeWrite {
+ExchangeWrite(operatorId, ctx, planNode)
+void addInput(RowVectorPtr input)
+RowVectorPtr getOutput()
+bool needsInput()
+void noMoreInput()
+BlockingReason isBlocked(ContinueFuture* future)
+bool isFinished()
+void close()
-void initializeInput(RowVectorPtr input)
-void finalizeDriver()
-void flushBatch()
-void computePartitions(RowVector rawInput, int32_t numRows)
-void serializeRows(CompactRow compactRow, int32_t numRows)
-unique_ptr~IOBuf~ buildPartitionPage(vector~int32_t~ rowIndices)
-ExchangeOutputBuffer* buffer_
-int32_t numDestinations_
-unique_ptr~PartitionFunction~ partitionFunction_
-vector~column_index_t~ outputChannels_
-int64_t targetSizeInBytes_
-optional~int32_t~ fixedRowSize_
-BlockingReason blockingReason_
-ContinueFuture future_
-bool finished_
-bool driverFinalized_
-RowVectorPtr output_
-vector~uint32_t~ partitions_
-int32_t rowCount_
-int64_t flatBufferSize_
-vector~char~ flatBuffer_
-vector~int64_t~ rowOffsets_
-vector~int32_t~ rowSizes_
-vector~uint32_t~ rowPartitions_
}
class ExchangeOutputBuffer {
+ExchangeOutputBuffer(numPartitions, writer, bufferPool, maxBufferedBytes, partitionDrainThreshold)
+~ExchangeOutputBuffer()
+bool enqueue(int32_t partition, unique_ptr~IOBuf~ rowGroup, ContinueFuture* future)
+int64_t drainPartition(int32_t partition)
+uint64_t drainAll()
+void noMoreData()
+void abort()
+uint64_t reclaim(uint64_t targetBytes)
+int64_t bufferedBytes() const
+int64_t currentDrainThreshold() const
+void setNumDrivers(uint32_t numDrivers)
+bool noMoreDrivers()
+F14FastMap~string,int64_t~ stats() const
+unique_ptr~IOBuf~ allocateTrackedIOBuf(size_t size)
+int32_t numPartitions() const
+MemoryPool* pool() const
-void maybeUnblockProducers(vector~ContinuePromise~& promises)
-void finishAndClose()
-void flushToWriter(int32_t partition, unique_ptr~IOBuf~ data)
-static unique_ptr~IOBuf~ coalesceRowGroups(deque~unique_ptr~IOBuf~~& rowGroups)
-bool maybeApplyBackpressure(ContinueFuture* future)
-static void freeTrackedIOBuf(void* buf, void* userData)
-shared_ptr~ShuffleWriter~ writer_
-shared_ptr~MemoryPool~ bufferPool_
-int32_t numPartitions_
-int64_t maxBufferedBytes_
-int64_t continueBufferedBytes_
-int64_t configuredDrainThreshold_
-atomic~int64_t~ partitionDrainThreshold_
-atomic~bool~ finished_
-atomic~int64_t~ bufferedBytes_
-vector~PartitionQueue~ partitions_
-mutex stateMutex_
-uint32_t numDrivers_
-uint32_t numFinishedDrivers_
-vector~ContinuePromise~ promises_
-atomic~int64_t~ totalDrainedBytes_
-atomic~int64_t~ drainCount_
-atomic~int64_t~ backpressureCount_
-atomic~int64_t~ reclaimCount_
-vector~atomic~int64_t~~ collectCountPerPartition_
}
class PartitionQueue {
+mutex mutex
+deque~unique_ptr~IOBuf~~ rowGroups
+atomic~int64_t~ bufferedBytes
}
class ExchangeOutputBufferReclaimer {
+ExchangeOutputBufferReclaimer(ExchangeOutputBuffer* buffer)
+uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes, uint64_t maxWaitMs, Stats& stats)
-ExchangeOutputBuffer* buffer_
}
class ExchangeReadNode {
+ExchangeReadNode(id, outputType)
+RowTypePtr outputType() const
+vector~PlanNodePtr~ sources() const
+bool requiresExchangeClient() const
+bool requiresSplits() const
+string_view name() const
+folly_dynamic serialize() const
+static PlanNodePtr create(obj, context)
-RowTypePtr outputType_
}
class ExchangeRead {
+ExchangeRead(operatorId, ctx, exchangeReadNode, exchangeClient)
+RowVectorPtr getOutput()
+void close()
-VectorSerde* getSerde()
-void expandBatchedPage(string_view pageData)
-void resetOutputState()
-uint64_t parseCurrentPages()
-RowVectorPtr deserializeNextBatch()
-int64_t numInputBatches_
-int64_t totalRows_
-vector~string_view~ rows_
-size_t nextRow_
}
class ExchangeWriteTranslator {
+unique_ptr~Operator~ toOperator(DriverCtx* ctx, int32_t id, PlanNodePtr node)
}
class ExchangeReadTranslator {
+unique_ptr~Operator~ toOperator(DriverCtx* ctx, int32_t id, PlanNodePtr node, shared_ptr~ExchangeClient~ exchangeClient)
}
velox_core_PlanNode <|-- ExchangeWriteNode
velox_exec_Operator <|-- ExchangeWrite
velox_exec_Operator_PlanNodeTranslator <|-- ExchangeWriteTranslator
velox_core_PlanNode <|-- ExchangeReadNode
velox_exec_Exchange <|-- ExchangeRead
velox_exec_Operator_PlanNodeTranslator <|-- ExchangeReadTranslator
velox_memory_MemoryReclaimer <|-- ExchangeOutputBufferReclaimer
ExchangeWriteNode o--> ExchangeOutputBuffer
ExchangeWrite --> ExchangeOutputBuffer : uses
ExchangeOutputBuffer --> ShuffleWriter : writer_
ExchangeOutputBuffer --> velox_memory_MemoryPool : bufferPool_
ExchangeOutputBufferReclaimer --> ExchangeOutputBuffer : buffer_
ExchangeRead --> velox_exec_ExchangeClient : exchangeClient
Flow diagram for new flat-buffer shuffle path vs legacy pathflowchart LR
subgraph Planner
PON["PartitionedOutputNode"]
end
PON -->|useExchangeWrite = false| LegacyPath
PON -->|useExchangeWrite = true| NewExchangeWriteNode
subgraph LegacyPath["Legacy shuffle path"]
direction LR
PnS["Plan: PartitionAndSerialize"]
LocalPartition["LocalPartition operator"]
ShuffleWrite["ShuffleWrite operator"]
PnS --> LocalPartition --> ShuffleWrite
end
subgraph NewPath["New flat-buffer shuffle path"]
direction LR
NewExchangeWriteNode["ExchangeWriteNode (plan)"]
ExchangeWriteOp["ExchangeWrite operator"]
EOB["ExchangeOutputBuffer"]
SW["ShuffleWriter"]
Storage["Shuffle storage (remote)"]
NewExchangeWriteNode --> ExchangeWriteOp
ExchangeWriteOp --> EOB
EOB --> SW
SW --> Storage
end
subgraph ReadSide["Read side"]
direction LR
RSN["RemoteSourceNode"]
RSN -->|useExchangeWrite = false| LegacyShuffleRead["ShuffleRead operator"]
RSN -->|useExchangeWrite = true| NewExchangeReadNode["ExchangeReadNode (plan)"]
NewExchangeReadNode --> ExchangeReadOp["ExchangeRead operator"]
Storage --> ExchangeClient["ExchangeClient / ShuffleExchangeSource"]
ExchangeClient --> ExchangeReadOp
ExchangeReadOp --> Downstream["Downstream operators"]
end
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey - I've found 3 issues, and left some high level feedback:
- ExchangeOutputBuffer::noMoreDrivers() manually unlocks and re-locks stateMutex_ while holding a std::lock_guard, which is undefined behavior; consider restructuring this to avoid manual mutex manipulation (e.g., use a scoped lock, release it before finishAndClose(), and avoid calling lock/unlock directly).
- ExchangeRead::expandBatchedPage() reads only the first int32_t of RowGroupHeader and ignores the compressed flag and header layout helpers; using serializer::detail::RowGroupHeader::read() (or at least validating the compressed field) would make this parsing more robust to future header changes and reduce the risk of format drift.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- ExchangeOutputBuffer::noMoreDrivers() manually unlocks and re-locks stateMutex_ while holding a std::lock_guard, which is undefined behavior; consider restructuring this to avoid manual mutex manipulation (e.g., use a scoped lock, release it before finishAndClose(), and avoid calling lock/unlock directly).
- ExchangeRead::expandBatchedPage() reads only the first int32_t of RowGroupHeader and ignores the compressed flag and header layout helpers; using serializer::detail::RowGroupHeader::read() (or at least validating the compressed field) would make this parsing more robust to future header changes and reduce the risk of format drift.
## Individual Comments
### Comment 1
<location path="presto-native-execution/presto_cpp/main/operators/ExchangeOutputBuffer.cpp" line_range="245-251" />
<code_context>
+}
+
+// Called by each driver when done. The last driver triggers finishAndClose.
+bool ExchangeOutputBuffer::noMoreDrivers() {
+ std::lock_guard<std::mutex> lock(stateMutex_);
+ ++numFinishedDrivers_;
+ if (numDrivers_ > 0 && numFinishedDrivers_ >= numDrivers_) {
+ stateMutex_.unlock();
+ finishAndClose();
+ stateMutex_.lock();
+ return true;
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** Manual unlocking of a mutex managed by std::lock_guard causes undefined behavior.
In `noMoreDrivers()` you create `std::lock_guard<std::mutex> lock(stateMutex_);` and then call `stateMutex_.unlock()`/`lock()` directly:
```c++
std::lock_guard<std::mutex> lock(stateMutex_);
++numFinishedDrivers_;
if (numDrivers_ > 0 && numFinishedDrivers_ >= numDrivers_) {
stateMutex_.unlock();
finishAndClose();
stateMutex_.lock();
return true;
}
```
Because `lock_guard` unconditionally calls `unlock()` in its destructor, this pattern can introduce races or deadlocks.
Use `std::unique_lock<std::mutex>` if you need to temporarily release the mutex (call `unlock()` before `finishAndClose()`), or compute an `isLast` flag while holding the lock, then release the lock and call `finishAndClose()` without any manual `lock()`/`unlock()` on `stateMutex_`.
</issue_to_address>
### Comment 2
<location path="presto-native-execution/presto_cpp/main/operators/ExchangeRead.cpp" line_range="176-177" />
<code_context>
+ size_t remaining = pageData.size();
+
+ // Iterate over one or more RowGroupHeaders in the buffer.
+ while (remaining >= kPageHeaderSize) {
+ int32_t uncompressedSize;
+ std::memcpy(&uncompressedSize, ptr, sizeof(int32_t));
+ ptr += kPageHeaderSize;
</code_context>
<issue_to_address>
**suggestion (bug_risk):** RowGroupHeader parsing assumes a specific header layout and ignores compression flag.
Here we only memcpy the first `int32_t` and then skip `kPageHeaderSize`, implicitly assuming the header layout is `[uncompressedSize][compressedSize][compressedFlag...]` and that `compressed` is always false with `uncompressedSize == compressedSize`. Any change to `RowGroupHeader::write()` (layout, new fields, compressed output) would likely break this silently. Please consider parsing via the header’s `read()` (or otherwise mirroring its fields explicitly) and validating the `compressed` flag before treating the next `uncompressedSize` bytes as raw payload.
Suggested implementation:
```cpp
const char* ptr = pageData.data();
size_t remaining = pageData.size();
// Iterate over one or more RowGroupHeaders in the buffer.
while (remaining >= kPageHeaderSize) {
serializer::detail::RowGroupHeader header;
header.read(ptr);
ptr += kPageHeaderSize;
remaining -= kPageHeaderSize;
// Validate that we are not accidentally treating a compressed RowGroup as
// uncompressed raw payload. This must stay in sync with RowGroupHeader::write().
VELOX_CHECK(
!header.compressed,
"Compressed RowGroup encountered in ExchangeRead::expandBatchedPage, "
"but only uncompressed RowGroups are supported");
const auto uncompressedSize = header.uncompressedSize;
VELOX_CHECK_GE(
remaining,
static_cast<size_t>(uncompressedSize),
"Page data truncated: expected {} bytes, got {}",
uncompressedSize,
remaining);
// Parse TRowSize-framed rows within this RowGroup.
```
I assumed the API and field names for `serializer::detail::RowGroupHeader`:
- `void RowGroupHeader::read(const char* src);`
- Public fields: `int32_t uncompressedSize;` and `bool compressed;`
If the actual API differs, you should adjust:
1. Replace `header.read(ptr);` with the correct deserialization call (e.g. `header.read(const uint8_t*&, size_t&)` or similar), updating `ptr`/`remaining` as required if `read` advances them.
2. Replace `header.uncompressedSize` with the correct accessor (e.g. `header.uncompressed_size()` or `header.uncompressedBytes`).
3. Replace `header.compressed` with the correct flag (e.g. `header.isCompressed()`, `header.compression != Compression::NONE`, etc.).
4. If `RowGroupHeader::read()` throws or returns a status on error instead of `VELOX_CHECK`, adapt the error handling accordingly.
The key behavioral requirements are:
- Deserialize via `RowGroupHeader::read()` rather than relying on raw layout.
- Use the header’s declared uncompressed size to bound the payload slice.
- Explicitly validate that the header does not indicate compressed data before treating the next `uncompressedSize` bytes as raw row payload.
</issue_to_address>
### Comment 3
<location path="presto-native-execution/presto_cpp/main/operators/tests/ExchangeTest.cpp" line_range="190-199" />
<code_context>
+ auto writer = ShuffleInterfaceFactory::factory(shuffleName_)
+ ->createWriter(writeInfo, pool());
+
+ auto buffer = std::make_shared<ExchangeOutputBuffer>(
+ numPartitions,
+ std::shared_ptr<ShuffleWriter>(std::move(writer)),
+ std::move(bufferPool),
+ /*maxBufferedBytes=*/100L * 1024 * 1024,
+ /*partitionDrainThreshold=*/ExchangeOutputBuffer::kDefaultDrainThreshold);
+
+ // Wire the reclaimer to the buffer (normally done at construction).
</code_context>
<issue_to_address>
**suggestion (testing):** Extend reclaim test to assert that buffer memory is fully released after abort/noMoreData
Consider adding an assertion that after `buffer->abort()` (or a noMoreData()/drainAll() sequence), `buffer->pool()->usedBytes()` returns to 0 (or the expected minimal overhead). This would confirm that all RowGroup memory is fully released to the pool, not just reduced by reclaim.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| auto buffer = std::make_shared<ExchangeOutputBuffer>( | ||
| numPartitions, writer, writerPool, kMaxBufferedBytes); | ||
|
|
||
| // Build partition key expressions on column 0. | ||
| std::vector<core::TypedExprPtr> keys{ | ||
| std::make_shared<core::FieldAccessTypedExpr>( | ||
| dataType->childAt(0), dataType->nameOf(0))}; | ||
|
|
||
| // Create HashPartitionFunctionSpec on column 0. | ||
| auto partitionFunctionSpec = |
There was a problem hiding this comment.
suggestion (testing): Extend reclaim test to assert that buffer memory is fully released after abort/noMoreData
Consider adding an assertion that after buffer->abort() (or a noMoreData()/drainAll() sequence), buffer->pool()->usedBytes() returns to 0 (or the expected minimal overhead). This would confirm that all RowGroup memory is fully released to the pool, not just reduced by reclaim.
…angeOutputBuffer for new exchange path (prestodb#27573) Summary: Add ExchangeWrite and ExchangeRead operators — a new shuffle write/read path that replaces PnS + LocalPartition + ShuffleWrite with a single operator using the flat-buffer model, without any Velox core changes. New classes (all in presto_cpp/main/operators): - ExchangeOutputBuffer: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - ExchangeWriteNode + ExchangeWrite: flat-buffer CompactRow serialization, O(1) memory w.r.t. partition count, RowGroupHeader+TRowSize framing - ExchangeReadNode + ExchangeRead: reads kFormatBatched pages only - Translators for both operators Planner wiring: useExchangeWrite flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
06d893c to
d5dcffe
Compare
…angeOutputBuffer for new exchange path (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
d5dcffe to
7e420b7
Compare
…ngeOutputBuffer for new exchange path (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
7e420b7 to
956e7e5
Compare
…ngeOutputBuffer for new exchange path (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
956e7e5 to
60b0d40
Compare
…ngeOutputBuffer for new exchange path (prestodb#27573) Summary: Pull Request resolved: prestodb#27573 #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
60b0d40 to
481b93e
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
481b93e to
1e7a6d7
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
1e7a6d7 to
002aba6
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
002aba6 to
32c82f2
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: Pull Request resolved: prestodb#27573 #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
21a1d26 to
6e027ed
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
6e027ed to
0b4aac9
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
0b4aac9 to
8fc5ca6
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
8fc5ca6 to
44d5dcf
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: Pull Request resolved: prestodb#27573 #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
44d5dcf to
3565bfa
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
3565bfa to
eb04e88
Compare
…putBuffer for new exchange path (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
eb04e88 to
5ca581d
Compare
…rk native (1/n) (prestodb#27573) Summary: Pull Request resolved: prestodb#27573 #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
5ca581d to
ae354cb
Compare
…rk native (1/n) (prestodb#27573) Summary: Pull Request resolved: prestodb#27573 #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
ae354cb to
d15b956
Compare
…rk native (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
d15b956 to
cbeb58b
Compare
…rk native (1/n) (prestodb#27573) Summary: #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
cbeb58b to
861afe4
Compare
…rk native (1/n) (prestodb#27573) Summary: Pull Request resolved: prestodb#27573 #### Summary Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta) But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only as long as those threads are writing data for disjoint set of partitions. We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering) To accomplish this, instead of updating existing operators, we add new operators as below - `ExchangeWrite+ExchangeOutputBuffer` - `ExchangeRead` operators — a new shuffle write/read path these replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator ### Current architecture {F1988112256} ### New architecture {F1988112251} #### New classes (all in presto_cpp/main/operators): - `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - `ExchangeWriteNode` + `ExchangeWrite`: Buffers all input data into single contiguous buffer then writes out per partition `enqueue` calls into the ExchangeOutputBuffer - `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only - Translators for both operators ### Configs - `exchange.materialization.enabled=true` - `exchange.materialization.partitioning-row-batch-buffer-size=16mb` - `exchange.materialization.per-partition-buffer-size=130kb` ### Planner wiring `exchange.materialization.enabled=true` flag gates ExchangeWriteNode (write) and ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode. Uses allPeersFinished for writer close — no setNumDrivers needed. Differential Revision: D100365767
|
Please add a release note - or |
|
Please edit the PR title to follow semantic commit style to pass the failing and required CI check. See the failure in the test for advice. |
…utputBuffer MaterializedExchange (prestodb#27573) Summary: Add ExchangeWrite and ExchangeRead operators with ExchangeOutputBuffer — the core components of the materialized exchange shuffle path. New classes (all in presto_cpp/main/operators): - ExchangeOutputBuffer: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort - ExchangeWriteNode + ExchangeWrite: flat-buffer CompactRow serialization, O(1) memory w.r.t. partition count, RowGroupHeader+TRowSize framing - ExchangeReadNode + ExchangeRead: reads kFormatBatched pages only - Translators for both operators Uses allPeersFinished for writer close — no setNumDrivers needed. Planner wiring and configs: - Configs.h/cpp: add exchange.materialization.enabled (default: false), exchange.materialization.partitioning-row-batch-buffer-size (16MB), exchange.materialization.per-partition-buffer-size (130KB) - PrestoToVeloxQueryPlan.cpp: when enabled, create ExchangeWriteNode (write) and ExchangeReadNode (read) instead of PnS+LocalPartition+ShuffleWrite - PrestoServer.cpp: register ExchangeWriteTranslator + ExchangeReadTranslator - PlanConverterTest: test batchPlanConversion with exchange path enabled Reviewed By: xiaoxmeng Differential Revision: D100365767
Add
MaterializedOutput(writer-side) andMaterializedExchange(reader-side) operators withMaterializedOutputBuffer— the C++ Velox core of the materialized exchange shuffle path. Whenexchange.materialization.enabledis on, these replace the legacyPartitionAndSerialize + LocalPartition + ShuffleWritechain on the write side andShuffleReadon the read side.New classes (all in
presto_cpp/main/operators/):MaterializedOutputBuffer— thread-safe shared per-partition buffer between N concurrentMaterializedOutputdrivers and oneShuffleWriter. Each partition has its ownPartitionBuffer(singlestd::mutex+ deque + byte counter) so different partitions drain in parallel.PartitionBuffer::enqueue()takes the partition lock once: pushes the RowGroup, checks the threshold, and on a hit atomically swaps the deque out, coalesces into one contiguous IOBuf, and callswriter_->collect()— all under that same lock. This eliminates the original design's single global drain mutex where every driver serialized behind whichever driver was mid-collect.MaterializedOutputNode+MaterializedOutput— writer-side operator. Flat-buffer CompactRow serialization, O(1) memory w.r.t. partition count,RowGroupHeader + TRowSizeframing. Lifecycle viaMaterializedOutputBuffer::setNumDrivers+noMoreDrivers(last driver runsfinishAndClose).MaterializedExchangeNode+MaterializedExchange— reader-side operator extendingvelox::exec::Exchange. ReadskFormatBatchedpages, paired withMaterializedOutputNodefor symmetric A/B switching with the legacy path.MaterializedOutputTranslator,MaterializedExchangeTranslator.Memory model:
MaterializedOutputBufferowns a tracking pool; RowGroup IOBufs are allocated viaallocateTrackedIOBuf()so buffered bytes are visible to the Velox arbitrator. Writer runs in a separate system pool.maxBufferedBytesis the hard cap; producers get aContinueFuturewhen buffered bytes cross the cap and resume after the drain pulls bytes back below the 90% hysteresis (continueBufferedBytes_ = maxBufferedBytes * 9/10).Stats (via
MaterializedOutputBuffer::stats()):materializedOutputBuffer.{totalDrainedBytes, drainCount, backpressureCount, currentDrainThreshold, bufferPoolUsedBytes, bufferPoolPeakBytes, totalCollectCalls}plus per-partition collect counts.Planner wiring and configs:
Configs.{h,cpp}: addexchange.materialization.enabled(defaultfalse),exchange.materialization.partitioning-row-batch-buffer-size(16 MiB),exchange.materialization.per-partition-buffer-size(130 KiB =kDefaultDrainThreshold).PrestoToVeloxQueryPlan.cpp: when enabled, write-side fragment conversion createsMaterializedOutputNodeinstead ofPartitionAndSerialize + LocalPartition + ShuffleWrite; read-side createsMaterializedExchangeNodeinstead ofShuffleRead.PrestoServer.cpp: registerMaterializedOutputTranslator+MaterializedExchangeTranslator.PlanConverterTest::batchPlanConversioncovers the matex path.