
feat(native-pos): MaterializeExchange: [1/n] Add MaterializedExchange operators and plan wiring support (#27573) #27573

Merged
shrinidhijoshi merged 1 commit into prestodb:master from shrinidhijoshi:export-D100365767
May 6, 2026


@shrinidhijoshi shrinidhijoshi commented Apr 13, 2026

Collaborator

Add MaterializedOutput (writer-side) and MaterializedExchange (reader-side) operators with MaterializedOutputBuffer — the C++ Velox core of the materialized exchange shuffle path. When exchange.materialization.enabled is on, these replace the legacy PartitionAndSerialize + LocalPartition + ShuffleWrite chain on the write side and ShuffleRead on the read side.

New classes (all in presto_cpp/main/operators/):

  • MaterializedOutputBuffer — thread-safe shared per-partition buffer between N concurrent MaterializedOutput drivers and one ShuffleWriter. Each partition has its own PartitionBuffer (single std::mutex + deque + byte counter) so different partitions drain in parallel. PartitionBuffer::enqueue() takes the partition lock once: pushes the RowGroup, checks the threshold, and on a hit atomically swaps the deque out, coalesces into one contiguous IOBuf, and calls writer_->collect() — all under that same lock. This eliminates the original design's single global drain mutex where every driver serialized behind whichever driver was mid-collect.
  • MaterializedOutputNode + MaterializedOutput — writer-side operator. Flat-buffer CompactRow serialization, O(1) memory w.r.t. partition count, RowGroupHeader + TRowSize framing. Lifecycle via MaterializedOutputBuffer::setNumDrivers + noMoreDrivers (last driver runs finishAndClose).
  • MaterializedExchangeNode + MaterializedExchange — reader-side operator extending velox::exec::Exchange. Reads kFormatBatched pages, paired with MaterializedOutputNode for symmetric A/B switching with the legacy path.
  • Plan-node translators: MaterializedOutputTranslator, MaterializedExchangeTranslator.
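The per-partition locking described for MaterializedOutputBuffer can be illustrated with a minimal standalone sketch. It is not the PR's implementation: `PartitionBufferSketch` and `CollectFn` are hypothetical names, `std::string` stands in for IOBuf, and the callback stands in for `writer_->collect()`. It only shows the single-lock pattern: push, check the threshold, swap the deque out, coalesce, and hand the result to the writer, all under one partition mutex.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for the writer's collect(partition, data) call.
using CollectFn = std::function<void(int partition, const std::string& data)>;

// One instance per partition: a mutex, a queue of row groups, a byte counter.
class PartitionBufferSketch {
 public:
  PartitionBufferSketch(int partition, std::size_t drainThreshold, CollectFn collect)
      : partition_(partition),
        drainThreshold_(drainThreshold),
        collect_(std::move(collect)) {}

  // Takes the partition lock once: pushes the row group, checks the threshold,
  // and drains if it is reached. Returns true if a drain ran.
  bool enqueue(std::string rowGroup) {
    std::lock_guard<std::mutex> lock(mutex_);
    bufferedBytes_ += rowGroup.size();
    rowGroups_.push_back(std::move(rowGroup));
    if (bufferedBytes_ < drainThreshold_) {
      return false;
    }
    drainLocked();
    return true;
  }

  // Flushes whatever is still buffered, e.g. at end of input.
  void flush() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!rowGroups_.empty()) {
      drainLocked();
    }
  }

 private:
  // Caller must hold mutex_. Swaps the queue out, coalesces the row groups
  // into one contiguous buffer, and passes it to the writer callback.
  void drainLocked() {
    std::deque<std::string> pending;
    pending.swap(rowGroups_);
    std::string coalesced;
    coalesced.reserve(bufferedBytes_);
    for (const auto& rowGroup : pending) {
      coalesced.append(rowGroup);
    }
    bufferedBytes_ = 0;
    collect_(partition_, coalesced);
  }

  const int partition_;
  const std::size_t drainThreshold_;
  CollectFn collect_;
  std::mutex mutex_;
  std::deque<std::string> rowGroups_;
  std::size_t bufferedBytes_{0};
};
```

Because each partition owns its mutex, two drivers writing to different partitions never contend in this sketch; only drivers targeting the same partition serialize behind each other.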

Memory model:

  • MaterializedOutputBuffer owns a tracking pool; RowGroup IOBufs are allocated via allocateTrackedIOBuf() so buffered bytes are visible to the Velox arbitrator. Writer runs in a separate system pool.
  • Cooperative backpressure: maxBufferedBytes is the hard cap; producers get a ContinueFuture when buffered bytes cross the cap and resume after the drain pulls bytes back below the 90% hysteresis (continueBufferedBytes_ = maxBufferedBytes * 9/10).
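The hysteresis in the backpressure rule can be sketched as a small state machine. This is an illustrative, single-threaded model under stated assumptions: `BackpressureGateSketch` is a hypothetical name, blocking is triggered with `>=` at the cap (the exact comparison is an assumption), and a boolean replaces the ContinueFuture/ContinuePromise machinery.

```cpp
#include <cassert>
#include <cstdint>

// Toy model of cap-plus-hysteresis backpressure (no threads, no futures).
class BackpressureGateSketch {
 public:
  explicit BackpressureGateSketch(int64_t maxBufferedBytes)
      : maxBufferedBytes_(maxBufferedBytes),
        // Resume threshold at 90% of the cap, as in the description above.
        continueBufferedBytes_(maxBufferedBytes * 9 / 10) {}

  // Records newly buffered bytes. Returns true if producers should block.
  bool add(int64_t bytes) {
    bufferedBytes_ += bytes;
    if (bufferedBytes_ >= maxBufferedBytes_) {
      blocked_ = true;
    }
    return blocked_;
  }

  // Records drained bytes. Returns true if blocked producers should resume,
  // which happens only once usage falls below the 90% threshold.
  bool drain(int64_t bytes) {
    bufferedBytes_ -= bytes;
    if (blocked_ && bufferedBytes_ < continueBufferedBytes_) {
      blocked_ = false;
      return true;
    }
    return false;
  }

  bool blocked() const { return blocked_; }
  int64_t bufferedBytes() const { return bufferedBytes_; }

 private:
  const int64_t maxBufferedBytes_;
  const int64_t continueBufferedBytes_;
  int64_t bufferedBytes_{0};
  bool blocked_{false};
};
```

The gap between the cap and the resume threshold keeps producers from flapping between blocked and unblocked when usage hovers near the cap.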

Stats (via MaterializedOutputBuffer::stats()):
materializedOutputBuffer.{totalDrainedBytes, drainCount, backpressureCount, currentDrainThreshold, bufferPoolUsedBytes, bufferPoolPeakBytes, totalCollectCalls} plus per-partition collect counts.

Planner wiring and configs:

  • Configs.{h,cpp}: add exchange.materialization.enabled (default false), exchange.materialization.partitioning-row-batch-buffer-size (16 MiB), exchange.materialization.per-partition-buffer-size (130 KiB = kDefaultDrainThreshold).
  • PrestoToVeloxQueryPlan.cpp: when enabled, write-side fragment conversion creates MaterializedOutputNode instead of PartitionAndSerialize + LocalPartition + ShuffleWrite; read-side creates MaterializedExchangeNode instead of ShuffleRead.
  • PrestoServer.cpp: register MaterializedOutputTranslator + MaterializedExchangeTranslator.
  • PlanConverterTest::batchPlanConversion covers the matex path.
== NO RELEASE NOTE ==

@shrinidhijoshi shrinidhijoshi requested review from a team as code owners April 13, 2026 17:45
@prestodb-ci prestodb-ci added the from:Meta PR from Meta label Apr 13, 2026
@sourcery-ai

sourcery-ai Bot commented Apr 13, 2026

Contributor

Reviewer's Guide

Introduces a new flat-buffer-based shuffle path via ExchangeWrite/ExchangeRead operators and a shared ExchangeOutputBuffer, along with unit tests and memory-reclamation behavior, to replace the existing partition-and-shuffle pipeline without touching Velox core.

Sequence diagram for ExchangeWrite pushing data to ShuffleWriter

sequenceDiagram
  actor Driver
  participant ExchangeWrite
  participant ExchangeOutputBuffer
  participant ShuffleWriter

  Driver->>ExchangeWrite: addInput(inputBatch)
  activate ExchangeWrite
  ExchangeWrite->>ExchangeWrite: initializeInput(inputBatch)
  ExchangeWrite->>ExchangeWrite: computePartitions(rawInput, numRows)
  ExchangeWrite->>ExchangeWrite: serializeRows(compactRow, numRows)
  alt flatBufferSize_ >= targetSizeInBytes_
    ExchangeWrite->>ExchangeWrite: flushBatch()
    loop partitions
      ExchangeWrite->>ExchangeOutputBuffer: enqueue(partition, RowGroupIOBuf, future)
      activate ExchangeOutputBuffer
      ExchangeOutputBuffer->>ExchangeOutputBuffer: append to PartitionQueue
      ExchangeOutputBuffer->>ExchangeOutputBuffer: maybeApplyBackpressure(future)
      alt partition bufferedBytes >= partitionDrainThreshold
        ExchangeOutputBuffer->>ExchangeOutputBuffer: drainPartition(partition)
        ExchangeOutputBuffer->>ExchangeOutputBuffer: coalesceRowGroups()
        ExchangeOutputBuffer->>ShuffleWriter: collect(partition, key, dataView)
      end
      deactivate ExchangeOutputBuffer
    end
  end
  deactivate ExchangeWrite

  Driver->>ExchangeWrite: noMoreInput()
  activate ExchangeWrite
  ExchangeWrite->>ExchangeWrite: finalizeDriver()
  ExchangeWrite->>ExchangeOutputBuffer: flushBatch()
  ExchangeWrite->>velox_exec_Task: allPeersFinished(planNodeId, driver, peerFuture, promises, peers)
  alt last driver
    ExchangeWrite->>ExchangeOutputBuffer: noMoreData()
    activate ExchangeOutputBuffer
    ExchangeOutputBuffer->>ExchangeOutputBuffer: drainAll()
    ExchangeOutputBuffer->>ShuffleWriter: noMoreData(success=true)
    deactivate ExchangeOutputBuffer
  else task terminating
    ExchangeWrite->>ExchangeOutputBuffer: abort()
  end
  deactivate ExchangeWrite

Sequence diagram for ExchangeRead consuming batched shuffle pages

sequenceDiagram
  actor Driver
  participant ExchangeRead
  participant ExchangeClient
  participant ShuffleExchangeSource

  Driver->>ExchangeRead: getOutput()
  activate ExchangeRead
  alt currentPages_ empty
    ExchangeRead->>ExchangeClient: next(callerId)
    ExchangeClient->>ShuffleExchangeSource: requestPages()
    ShuffleExchangeSource-->>ExchangeClient: batched pages (kFormatBatched)
    ExchangeClient-->>ExchangeRead: currentPages_
  end

  alt rows_ empty
    ExchangeRead->>ExchangeRead: parseCurrentPages()
    loop pages
      ExchangeRead->>ShuffleSerializedPage: rows(driverId)
      ShuffleSerializedPage-->>ExchangeRead: vector string_view batchedValues
      loop batchedValues
        ExchangeRead->>ExchangeRead: expandBatchedPage(pageData)
      end
    end
  end

  alt rows_ not empty
    ExchangeRead->>ExchangeRead: deserializeNextBatch()
    ExchangeRead-->>Driver: RowVector batch
  else no rows
    ExchangeRead-->>Driver: null
  end

  alt nextRow_ == rows_.size()
    ExchangeRead->>ExchangeRead: resetOutputState()
  end
  deactivate ExchangeRead

  Driver->>ExchangeRead: close()
  activate ExchangeRead
  ExchangeRead->>ExchangeRead: record stats
  ExchangeRead->>ExchangeClient: close()
  deactivate ExchangeRead

Class diagram for new ExchangeWrite/ExchangeRead shuffle path

classDiagram
  class ExchangeWriteNode {
    +ExchangeWriteNode(id, keys, numPartitions, outputType, partitionFunctionSpec, source, buffer)
    +int numPartitions()
    +vector~TypedExprPtr~ keys()
    +ExchangeOutputBuffer* buffer()
    +PartitionFunctionSpec partitionFunctionSpec()
    +RowTypePtr outputType()
    +vector~PlanNodePtr~ sources()
    +folly_dynamic serialize()
    +static PlanNodePtr create(obj, context)
    -int numPartitions_
    -vector~TypedExprPtr~ keys_
    -RowTypePtr outputType_
    -shared_ptr~PartitionFunctionSpec~ partitionFunctionSpec_
    -vector~PlanNodePtr~ sources_
    -shared_ptr~ExchangeOutputBuffer~ buffer_
  }

  class ExchangeWrite {
    +ExchangeWrite(operatorId, ctx, planNode)
    +void addInput(RowVectorPtr input)
    +RowVectorPtr getOutput()
    +bool needsInput()
    +void noMoreInput()
    +BlockingReason isBlocked(ContinueFuture* future)
    +bool isFinished()
    +void close()
    -void initializeInput(RowVectorPtr input)
    -void finalizeDriver()
    -void flushBatch()
    -void computePartitions(RowVector rawInput, int32_t numRows)
    -void serializeRows(CompactRow compactRow, int32_t numRows)
    -unique_ptr~IOBuf~ buildPartitionPage(vector~int32_t~ rowIndices)
    -ExchangeOutputBuffer* buffer_
    -int32_t numDestinations_
    -unique_ptr~PartitionFunction~ partitionFunction_
    -vector~column_index_t~ outputChannels_
    -int64_t targetSizeInBytes_
    -optional~int32_t~ fixedRowSize_
    -BlockingReason blockingReason_
    -ContinueFuture future_
    -bool finished_
    -bool driverFinalized_
    -RowVectorPtr output_
    -vector~uint32_t~ partitions_
    -int32_t rowCount_
    -int64_t flatBufferSize_
    -vector~char~ flatBuffer_
    -vector~int64_t~ rowOffsets_
    -vector~int32_t~ rowSizes_
    -vector~uint32_t~ rowPartitions_
  }

  class ExchangeOutputBuffer {
    +ExchangeOutputBuffer(numPartitions, writer, bufferPool, maxBufferedBytes, partitionDrainThreshold)
    +~ExchangeOutputBuffer()
    +bool enqueue(int32_t partition, unique_ptr~IOBuf~ rowGroup, ContinueFuture* future)
    +int64_t drainPartition(int32_t partition)
    +uint64_t drainAll()
    +void noMoreData()
    +void abort()
    +uint64_t reclaim(uint64_t targetBytes)
    +int64_t bufferedBytes() const
    +int64_t currentDrainThreshold() const
    +void setNumDrivers(uint32_t numDrivers)
    +bool noMoreDrivers()
    +F14FastMap~string,int64_t~ stats() const
    +unique_ptr~IOBuf~ allocateTrackedIOBuf(size_t size)
    +int32_t numPartitions() const
    +MemoryPool* pool() const
    -void maybeUnblockProducers(vector~ContinuePromise~& promises)
    -void finishAndClose()
    -void flushToWriter(int32_t partition, unique_ptr~IOBuf~ data)
    -static unique_ptr~IOBuf~ coalesceRowGroups(deque~unique_ptr~IOBuf~~& rowGroups)
    -bool maybeApplyBackpressure(ContinueFuture* future)
    -static void freeTrackedIOBuf(void* buf, void* userData)
    -shared_ptr~ShuffleWriter~ writer_
    -shared_ptr~MemoryPool~ bufferPool_
    -int32_t numPartitions_
    -int64_t maxBufferedBytes_
    -int64_t continueBufferedBytes_
    -int64_t configuredDrainThreshold_
    -atomic~int64_t~ partitionDrainThreshold_
    -atomic~bool~ finished_
    -atomic~int64_t~ bufferedBytes_
    -vector~PartitionQueue~ partitions_
    -mutex stateMutex_
    -uint32_t numDrivers_
    -uint32_t numFinishedDrivers_
    -vector~ContinuePromise~ promises_
    -atomic~int64_t~ totalDrainedBytes_
    -atomic~int64_t~ drainCount_
    -atomic~int64_t~ backpressureCount_
    -atomic~int64_t~ reclaimCount_
    -vector~atomic~int64_t~~ collectCountPerPartition_
  }

  class PartitionQueue {
    +mutex mutex
    +deque~unique_ptr~IOBuf~~ rowGroups
    +atomic~int64_t~ bufferedBytes
  }

  class ExchangeOutputBufferReclaimer {
    +ExchangeOutputBufferReclaimer(ExchangeOutputBuffer* buffer)
    +uint64_t reclaim(MemoryPool* pool, uint64_t targetBytes, uint64_t maxWaitMs, Stats& stats)
    -ExchangeOutputBuffer* buffer_
  }

  class ExchangeReadNode {
    +ExchangeReadNode(id, outputType)
    +RowTypePtr outputType() const
    +vector~PlanNodePtr~ sources() const
    +bool requiresExchangeClient() const
    +bool requiresSplits() const
    +string_view name() const
    +folly_dynamic serialize() const
    +static PlanNodePtr create(obj, context)
    -RowTypePtr outputType_
  }

  class ExchangeRead {
    +ExchangeRead(operatorId, ctx, exchangeReadNode, exchangeClient)
    +RowVectorPtr getOutput()
    +void close()
    -VectorSerde* getSerde()
    -void expandBatchedPage(string_view pageData)
    -void resetOutputState()
    -uint64_t parseCurrentPages()
    -RowVectorPtr deserializeNextBatch()
    -int64_t numInputBatches_
    -int64_t totalRows_
    -vector~string_view~ rows_
    -size_t nextRow_
  }

  class ExchangeWriteTranslator {
    +unique_ptr~Operator~ toOperator(DriverCtx* ctx, int32_t id, PlanNodePtr node)
  }

  class ExchangeReadTranslator {
    +unique_ptr~Operator~ toOperator(DriverCtx* ctx, int32_t id, PlanNodePtr node, shared_ptr~ExchangeClient~ exchangeClient)
  }

  velox_core_PlanNode <|-- ExchangeWriteNode
  velox_exec_Operator <|-- ExchangeWrite
  velox_exec_Operator_PlanNodeTranslator <|-- ExchangeWriteTranslator
  velox_core_PlanNode <|-- ExchangeReadNode
  velox_exec_Exchange <|-- ExchangeRead
  velox_exec_Operator_PlanNodeTranslator <|-- ExchangeReadTranslator
  velox_memory_MemoryReclaimer <|-- ExchangeOutputBufferReclaimer

  ExchangeWriteNode o--> ExchangeOutputBuffer
  ExchangeWrite --> ExchangeOutputBuffer : uses
  ExchangeOutputBuffer --> ShuffleWriter : writer_
  ExchangeOutputBuffer --> velox_memory_MemoryPool : bufferPool_
  ExchangeOutputBufferReclaimer --> ExchangeOutputBuffer : buffer_
  ExchangeRead --> velox_exec_ExchangeClient : exchangeClient

Flow diagram for new flat-buffer shuffle path vs legacy path

flowchart LR
  subgraph Planner
    PON["PartitionedOutputNode"]
  end

  PON -->|useExchangeWrite = false| LegacyPath
  PON -->|useExchangeWrite = true| NewExchangeWriteNode

  subgraph LegacyPath["Legacy shuffle path"]
    direction LR
    PnS["Plan: PartitionAndSerialize"]
    LocalPartition["LocalPartition operator"]
    ShuffleWrite["ShuffleWrite operator"]
    PnS --> LocalPartition --> ShuffleWrite
  end

  subgraph NewPath["New flat-buffer shuffle path"]
    direction LR
    NewExchangeWriteNode["ExchangeWriteNode (plan)"]
    ExchangeWriteOp["ExchangeWrite operator"]
    EOB["ExchangeOutputBuffer"]
    SW["ShuffleWriter"]
    Storage["Shuffle storage (remote)"]

    NewExchangeWriteNode --> ExchangeWriteOp
    ExchangeWriteOp --> EOB
    EOB --> SW
    SW --> Storage
  end

  subgraph ReadSide["Read side"]
    direction LR
    RSN["RemoteSourceNode"]
    RSN -->|useExchangeWrite = false| LegacyShuffleRead["ShuffleRead operator"]
    RSN -->|useExchangeWrite = true| NewExchangeReadNode["ExchangeReadNode (plan)"]
    NewExchangeReadNode --> ExchangeReadOp["ExchangeRead operator"]
    Storage --> ExchangeClient["ExchangeClient / ShuffleExchangeSource"]
    ExchangeClient --> ExchangeReadOp
    ExchangeReadOp --> Downstream["Downstream operators"]
  end

File-Level Changes

Change Details Files
Add ExchangeWrite operator and plan node to serialize partitioned rows into per-partition flat-buffer RowGroup pages and push them into a shared output buffer.
  • Define ExchangeWriteNode plan node with partitioning keys, partition function spec, output type, and injected ExchangeOutputBuffer, including (de)serialization logic for the plan node.
  • Implement ExchangeWrite operator that projects input, computes partitions, serializes rows using CompactRow into a contiguous flat buffer, groups rows by partition, frames them with RowGroupHeader + TRowSize, and enqueues per-partition IOBufs into ExchangeOutputBuffer with backpressure support.
  • Use allPeersFinished barrier for driver coordination so that the last driver drains and closes the buffer/writer, recording runtime stats and handling task-abort paths via buffer->abort().
  • Register ExchangeWriteTranslator to construct ExchangeWrite from ExchangeWriteNode in the operator registry.
presto-native-execution/presto_cpp/main/operators/ExchangeWrite.h
presto-native-execution/presto_cpp/main/operators/ExchangeWrite.cpp
Introduce ExchangeOutputBuffer as a shared, partitioned output queue with memory tracking, backpressure, and reclamation semantics for shuffle writers.
  • Implement ExchangeOutputBuffer to hold per-partition queues of RowGroup IOBufs, track total and per-partition buffered bytes via a dedicated MemoryPool, and drive ShuffleWriter::collect() calls on threshold-based drains.
  • Provide allocateTrackedIOBuf with a custom free callback so row-group buffers are tracked under the buffer pool and visible to the Velox arbitrator.
  • Implement backpressure using a global buffered-bytes high-watermark and ContinuePromise/Future pairs that are fulfilled when drains reduce buffered usage below a resume threshold.
  • Add reclaim logic that halves the partitionDrainThreshold (bounded by kMinDrainThreshold), then drains largest partitions first to free memory under pressure, and expose stats including drain/backpressure/reclaim counts and per-partition collect counts.
  • Add ExchangeOutputBufferReclaimer that delegates MemoryReclaimer::reclaim calls to the buffer’s reclaim method so the arbitrator can shed buffer memory safely.
presto-native-execution/presto_cpp/main/operators/ExchangeOutputBuffer.h
presto-native-execution/presto_cpp/main/operators/ExchangeOutputBuffer.cpp
Add ExchangeRead operator and plan node to consume batched shuffle pages produced by ExchangeWrite, parse CompactRow payloads, and output RowVectors.
  • Define ExchangeReadNode plan node with outputType-only schema and (de)serialization, marked as requiring an ExchangeClient and splits.
  • Implement ExchangeRead operator as a specialized Exchange that expects kFormatBatched CompactRow pages, expands RowGroupHeader + big-endian TRowSize-framed rows into string_views, and deserializes them into RowVectors via CompactRow::deserialize.
  • Implement adaptive output batch sizing based on estimated row size and preferredOutputBatchBytes, and record basic stats (input batches, total rows) on close.
  • Register ExchangeReadTranslator to build ExchangeRead from ExchangeReadNode using an injected ExchangeClient.
presto-native-execution/presto_cpp/main/operators/ExchangeRead.h
presto-native-execution/presto_cpp/main/operators/ExchangeRead.cpp
Add end-to-end and memory-behavior tests for the new exchange path using a local persistent shuffle implementation.
  • Create ExchangeTest fixture that wires ExchangeWrite/ExchangeRead/ShuffleRead operators, registers a LocalPersistentShuffleFactory-backed ShuffleInterface and ShuffleExchangeSource, and uses temp directories for local shuffle files.
  • Add end-to-end tests that write via ExchangeWrite and read via ExchangeRead over LocalShuffle for various scenarios: basic small data, large data, single partition, many partitions, and empty input, asserting result equivalence.
  • Test ExchangeOutputBuffer memory tracking under a constrained pool by enqueuing many fixed-size RowGroups until a VeloxRuntimeError (OOM) is thrown and verifying pool usedBytes is non-zero.
  • Test reclaim behavior by wiring an ExchangeOutputBufferReclaimer, enqueuing data, invoking reclaim to force drains and threshold halving, verifying reduced pool usage and earlier subsequent drains, enforcing a minimum drain threshold, and checking emitted stats.
  • Ensure shuffle directory cleanup after each test to avoid file leakage between test runs.
presto-native-execution/presto_cpp/main/operators/tests/ExchangeTest.cpp
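The size-prefixed row framing mentioned for ExchangeWrite and ExchangeRead can be illustrated with a round-trip sketch. Several things here are assumptions rather than facts from the PR: the row-size type is taken to be a 4-byte unsigned integer (`RowSizeSketch`), the function names are hypothetical, and the RowGroupHeader that precedes the rows is left out entirely. The sketch only shows big-endian length prefixes being written and then expanded into `string_view`s.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <string_view>
#include <vector>

// Assumption: the row-size prefix is a 4-byte unsigned integer.
using RowSizeSketch = uint32_t;

// Appends one row as [big-endian size][row bytes].
void appendFramedRow(std::string& out, std::string_view row) {
  const auto size = static_cast<RowSizeSketch>(row.size());
  for (int shift = 24; shift >= 0; shift -= 8) {
    out.push_back(static_cast<char>((size >> shift) & 0xFF));
  }
  out.append(row.data(), row.size());
}

// Splits a payload of framed rows into views that point into the payload.
// Throws if a size prefix or a row body is truncated.
std::vector<std::string_view> splitFramedRows(std::string_view payload) {
  std::vector<std::string_view> rows;
  std::size_t offset = 0;
  while (offset < payload.size()) {
    if (payload.size() - offset < sizeof(RowSizeSketch)) {
      throw std::runtime_error("truncated row size prefix");
    }
    RowSizeSketch size = 0;
    for (std::size_t i = 0; i < sizeof(RowSizeSketch); ++i) {
      size = (size << 8) | static_cast<unsigned char>(payload[offset + i]);
    }
    offset += sizeof(RowSizeSketch);
    if (payload.size() - offset < size) {
      throw std::runtime_error("truncated row body");
    }
    rows.push_back(payload.substr(offset, size));
    offset += size;
  }
  return rows;
}
```

The returned views alias the input buffer, so the payload must outlive them; a reader that keeps `string_view`s across calls has to keep the underlying pages alive for the same reason.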


@sourcery-ai sourcery-ai Bot left a comment


Hey - I've found 3 issues, and left some high level feedback:

  • ExchangeOutputBuffer::noMoreDrivers() manually unlocks and re-locks stateMutex_ while holding a std::lock_guard, which is undefined behavior; consider restructuring this to avoid manual mutex manipulation (e.g., use a scoped lock, release it before finishAndClose(), and avoid calling lock/unlock directly).
  • ExchangeRead::expandBatchedPage() reads only the first int32_t of RowGroupHeader and ignores the compressed flag and header layout helpers; using serializer::detail::RowGroupHeader::read() (or at least validating the compressed field) would make this parsing more robust to future header changes and reduce the risk of format drift.
## Individual Comments

### Comment 1
<location path="presto-native-execution/presto_cpp/main/operators/ExchangeOutputBuffer.cpp" line_range="245-251" />
<code_context>
+}
+
+// Called by each driver when done. The last driver triggers finishAndClose.
+bool ExchangeOutputBuffer::noMoreDrivers() {
+  std::lock_guard<std::mutex> lock(stateMutex_);
+  ++numFinishedDrivers_;
+  if (numDrivers_ > 0 && numFinishedDrivers_ >= numDrivers_) {
+    stateMutex_.unlock();
+    finishAndClose();
+    stateMutex_.lock();
+    return true;
+  }
</code_context>
<issue_to_address>
**issue (bug_risk):** Manual unlocking of a mutex managed by std::lock_guard causes undefined behavior.

In `noMoreDrivers()` you create `std::lock_guard<std::mutex> lock(stateMutex_);` and then call `stateMutex_.unlock()`/`lock()` directly:

```c++
std::lock_guard<std::mutex> lock(stateMutex_);
++numFinishedDrivers_;
if (numDrivers_ > 0 && numFinishedDrivers_ >= numDrivers_) {
  stateMutex_.unlock();
  finishAndClose();
  stateMutex_.lock();
  return true;
}
```
Because `lock_guard` unconditionally calls `unlock()` in its destructor, this pattern is fragile: if `finishAndClose()` throws, the destructor unlocks a mutex the thread no longer owns, which is undefined behavior.

Use `std::unique_lock<std::mutex>` if you need to temporarily release the mutex (call `unlock()` before `finishAndClose()`), or compute an `isLast` flag while holding the lock, then release the lock and call `finishAndClose()` without any manual `lock()`/`unlock()` on `stateMutex_`.
</issue_to_address>
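
The `isLast` variant suggested in Comment 1 can be sketched as follows. This is a standalone illustration, not the PR's code: `DriverTrackerSketch` is a hypothetical class, and `finishAndClose()` is reduced to a counter so the control flow can be checked in isolation. The final `return false` for non-last drivers is an assumption, since the quoted snippet is cut off before that point.

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>

// Hypothetical stand-in for the driver bookkeeping in the output buffer.
class DriverTrackerSketch {
 public:
  void setNumDrivers(uint32_t numDrivers) {
    std::lock_guard<std::mutex> lock(stateMutex_);
    numDrivers_ = numDrivers;
  }

  // Called by each driver when done. Returns true for the last driver.
  bool noMoreDrivers() {
    bool isLast = false;
    {
      // The guard owns the mutex for this scope only; no manual lock/unlock.
      std::lock_guard<std::mutex> lock(stateMutex_);
      ++numFinishedDrivers_;
      isLast = numDrivers_ > 0 && numFinishedDrivers_ >= numDrivers_;
    }
    // stateMutex_ is released here, so finishAndClose() may take it again.
    if (isLast) {
      finishAndClose();
    }
    return isLast;
  }

  int finishCount() {
    std::lock_guard<std::mutex> lock(stateMutex_);
    return finishCount_;
  }

 private:
  // Placeholder for the real drain-and-close work.
  void finishAndClose() {
    std::lock_guard<std::mutex> lock(stateMutex_);
    ++finishCount_;
  }

  std::mutex stateMutex_;
  uint32_t numDrivers_{0};
  uint32_t numFinishedDrivers_{0};
  int finishCount_{0};
};
```

Scoping the guard in its own block keeps the unlock exception-safe and makes it obvious that `finishAndClose()` runs without `stateMutex_` held.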

### Comment 2
<location path="presto-native-execution/presto_cpp/main/operators/ExchangeRead.cpp" line_range="176-177" />
<code_context>
+  size_t remaining = pageData.size();
+
+  // Iterate over one or more RowGroupHeaders in the buffer.
+  while (remaining >= kPageHeaderSize) {
+    int32_t uncompressedSize;
+    std::memcpy(&uncompressedSize, ptr, sizeof(int32_t));
+    ptr += kPageHeaderSize;
</code_context>
<issue_to_address>
**suggestion (bug_risk):** RowGroupHeader parsing assumes a specific header layout and ignores compression flag.

Here we only memcpy the first `int32_t` and then skip `kPageHeaderSize`, implicitly assuming the header layout is `[uncompressedSize][compressedSize][compressedFlag...]` and that `compressed` is always false with `uncompressedSize == compressedSize`. Any change to `RowGroupHeader::write()` (layout, new fields, compressed output) would likely break this silently. Please consider parsing via the header’s `read()` (or otherwise mirroring its fields explicitly) and validating the `compressed` flag before treating the next `uncompressedSize` bytes as raw payload.

Suggested implementation:

```cpp
  const char* ptr = pageData.data();
  size_t remaining = pageData.size();

  // Iterate over one or more RowGroupHeaders in the buffer.
  while (remaining >= kPageHeaderSize) {
    serializer::detail::RowGroupHeader header;
    header.read(ptr);
    ptr += kPageHeaderSize;
    remaining -= kPageHeaderSize;

    // Validate that we are not accidentally treating a compressed RowGroup as
    // uncompressed raw payload. This must stay in sync with RowGroupHeader::write().
    VELOX_CHECK(
        !header.compressed,
        "Compressed RowGroup encountered in ExchangeRead::expandBatchedPage, "
        "but only uncompressed RowGroups are supported");

    const auto uncompressedSize = header.uncompressedSize;

    VELOX_CHECK_GE(
        remaining,
        static_cast<size_t>(uncompressedSize),
        "Page data truncated: expected {} bytes, got {}",
        uncompressedSize,
        remaining);

    // Parse TRowSize-framed rows within this RowGroup.

```

I assumed the API and field names for `serializer::detail::RowGroupHeader`:

- `void RowGroupHeader::read(const char* src);`
- Public fields: `int32_t uncompressedSize;` and `bool compressed;`

If the actual API differs, you should adjust:

1. Replace `header.read(ptr);` with the correct deserialization call (e.g. `header.read(const uint8_t*&, size_t&)` or similar), updating `ptr`/`remaining` as required if `read` advances them.
2. Replace `header.uncompressedSize` with the correct accessor (e.g. `header.uncompressed_size()` or `header.uncompressedBytes`).
3. Replace `header.compressed` with the correct flag (e.g. `header.isCompressed()`, `header.compression != Compression::NONE`, etc.).
4. If `RowGroupHeader::read()` throws or returns a status on error instead of `VELOX_CHECK`, adapt the error handling accordingly.

The key behavioral requirements are:
- Deserialize via `RowGroupHeader::read()` rather than relying on raw layout.
- Use the header’s declared uncompressed size to bound the payload slice.
- Explicitly validate that the header does not indicate compressed data before treating the next `uncompressedSize` bytes as raw row payload.
</issue_to_address>

### Comment 3
<location path="presto-native-execution/presto_cpp/main/operators/tests/ExchangeTest.cpp" line_range="190-199" />
<code_context>
+  auto writer = ShuffleInterfaceFactory::factory(shuffleName_)
+                    ->createWriter(writeInfo, pool());
+
+  auto buffer = std::make_shared<ExchangeOutputBuffer>(
+      numPartitions,
+      std::shared_ptr<ShuffleWriter>(std::move(writer)),
+      std::move(bufferPool),
+      /*maxBufferedBytes=*/100L * 1024 * 1024,
+      /*partitionDrainThreshold=*/ExchangeOutputBuffer::kDefaultDrainThreshold);
+
+  // Wire the reclaimer to the buffer (normally done at construction).
</code_context>
<issue_to_address>
**suggestion (testing):** Extend reclaim test to assert that buffer memory is fully released after abort/noMoreData

Consider adding an assertion that after `buffer->abort()` (or a noMoreData()/drainAll() sequence), `buffer->pool()->usedBytes()` returns to 0 (or the expected minimal overhead). This would confirm that all RowGroup memory is fully released to the pool, not just reduced by reclaim.
</issue_to_address>
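
The kind of assertion suggested in Comment 3 can be illustrated with a toy model of the reclaim behavior described in the reviewer's guide: halve the drain threshold (bounded below by a minimum), drain the largest partitions first, and release everything on abort. All names and values here are hypothetical (`ReclaimableBufferSketch`, `kMinDrainThresholdSketch = 16`), plain byte counters replace IOBufs and the memory pool, and stopping once the target is met is an assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <numeric>
#include <vector>

// Hypothetical lower bound for the drain threshold.
constexpr int64_t kMinDrainThresholdSketch = 16;

class ReclaimableBufferSketch {
 public:
  ReclaimableBufferSketch(std::size_t numPartitions, int64_t drainThreshold)
      : partitionBytes_(numPartitions, 0), drainThreshold_(drainThreshold) {}

  void enqueue(std::size_t partition, int64_t bytes) {
    partitionBytes_.at(partition) += bytes;
  }

  // Halves the threshold (never below the minimum), then drains partitions
  // from largest to smallest until at least targetBytes have been freed.
  int64_t reclaim(int64_t targetBytes) {
    drainThreshold_ = std::max(kMinDrainThresholdSketch, drainThreshold_ / 2);
    std::vector<std::size_t> order(partitionBytes_.size());
    std::iota(order.begin(), order.end(), std::size_t{0});
    std::sort(order.begin(), order.end(), [&](std::size_t a, std::size_t b) {
      return partitionBytes_[a] > partitionBytes_[b];
    });
    int64_t freed = 0;
    for (std::size_t partition : order) {
      if (freed >= targetBytes) {
        break;
      }
      freed += partitionBytes_[partition];
      partitionBytes_[partition] = 0;
    }
    return freed;
  }

  // Drops all buffered data.
  void abort() {
    std::fill(partitionBytes_.begin(), partitionBytes_.end(), int64_t{0});
  }

  int64_t usedBytes() const {
    return std::accumulate(
        partitionBytes_.begin(), partitionBytes_.end(), int64_t{0});
  }

  int64_t drainThreshold() const { return drainThreshold_; }

 private:
  std::vector<int64_t> partitionBytes_;
  int64_t drainThreshold_;
};
```

A test in the spirit of the suggestion would enqueue data, call `reclaim()`, check that usage dropped, then call `abort()` and assert that `usedBytes()` is back to zero rather than merely reduced.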


Comment thread presto-native-execution/presto_cpp/main/operators/ExchangeOutputBuffer.cpp Outdated
Comment thread presto-native-execution/presto_cpp/main/operators/ExchangeRead.cpp Outdated
Comment on lines +190 to +199
auto buffer = std::make_shared<ExchangeOutputBuffer>(
numPartitions, writer, writerPool, kMaxBufferedBytes);

// Build partition key expressions on column 0.
std::vector<core::TypedExprPtr> keys{
std::make_shared<core::FieldAccessTypedExpr>(
dataType->childAt(0), dataType->nameOf(0))};

// Create HashPartitionFunctionSpec on column 0.
auto partitionFunctionSpec =


@meta-codesync meta-codesync Bot changed the title [Sapphire][Shuffle](1/n) Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path [Sapphire][Shuffle](1/n) Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (#27573) Apr 13, 2026
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 13, 2026
…angeOutputBuffer for new exchange path (prestodb#27573)

Summary:

Add ExchangeWrite and ExchangeRead operators — a new shuffle write/read path
that replaces PnS + LocalPartition + ShuffleWrite with a single operator using
the flat-buffer model, without any Velox core changes.

New classes (all in presto_cpp/main/operators):
- ExchangeOutputBuffer: shared per-partition queue buffer owning ShuffleWriter,
  ContinueFuture backpressure, error propagation on close/abort
- ExchangeWriteNode + ExchangeWrite: flat-buffer CompactRow serialization,
  O(1) memory w.r.t. partition count, RowGroupHeader+TRowSize framing
- ExchangeReadNode + ExchangeRead: reads kFormatBatched pages only
- Translators for both operators

Planner wiring: useExchangeWrite flag gates ExchangeWriteNode (write) and
ExchangeReadNode (read) creation from PartitionedOutputNode/RemoteSourceNode.

Uses allPeersFinished for writer close — no setNumDrivers needed.

Differential Revision: D100365767
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 14, 2026
…angeOutputBuffer for new exchange path (prestodb#27573)

Summary:

#### Summary

The current ShuffleWrite path does single-threaded, row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta).
But the shuffle writer is actually thread-safe if it is accessed in a partition-exclusive manner, i.e. multiple threads can write to it
as long as those threads write data for disjoint sets of partitions.

We can exploit this by creating a per-partition buffer before the shuffle write operator and writing into it from multiple threads, with one lock per partition. Every time we write into a partition buffer and it is full, we also flush it in the same operation. Because many rows are now batched in the buffer, we write the whole batch as a single row from the shuffle writer's point of view, which reduces the per-row overhead inside the shuffle writer (checksumming, memcpy/buffering).

To accomplish this, instead of updating the existing operators, we add new ones:

- `ExchangeWrite` + `ExchangeOutputBuffer` (write side)
- `ExchangeRead` (read side)

Together they form a new shuffle write/read path that replaces `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator.

### Current architecture

{F1988112256}

### New architecture

 {F1988112251}  

#### New classes (all in presto_cpp/main/operators):

- `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter,
  ContinueFuture backpressure, error propagation on close/abort
- `ExchangeWriteNode` + `ExchangeWrite`: buffers all input data into a single contiguous buffer, then issues per-partition `enqueue` calls into the `ExchangeOutputBuffer`
- `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only
- Translators for both operators
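
To make the write-side flow concrete, here is a rough sketch of buffering rows into one contiguous buffer and then slicing it per partition. It is an illustration only: `FlatRowBuffer`, `RowRef` and `decodeRows` are hypothetical names, and the 4-byte host-endian size prefix is an assumed framing, not the actual RowGroupHeader/TRowSize layout.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <string>
#include <vector>

// One row in the flat buffer: its target partition and where it lives.
struct RowRef {
  int partition;
  size_t offset;  // start of the size prefix in the flat buffer
  size_t length;  // size prefix plus payload
};

class FlatRowBuffer {
 public:
  // Appends [uint32 size][payload] to the single contiguous buffer.
  void append(int partition, const std::string& payload) {
    const uint32_t size = static_cast<uint32_t>(payload.size());
    const size_t offset = bytes_.size();
    bytes_.resize(offset + sizeof(size) + payload.size());
    std::memcpy(&bytes_[offset], &size, sizeof(size));
    if (!payload.empty()) {
      std::memcpy(&bytes_[offset + sizeof(size)], payload.data(), payload.size());
    }
    rows_.push_back({partition, offset, sizeof(size) + payload.size()});
  }

  // Groups the framed rows by partition. Each map entry is what a single
  // per-partition enqueue call would carry. Resets the buffer afterwards.
  std::map<int, std::string> drainByPartition() {
    std::map<int, std::string> out;
    for (const RowRef& r : rows_) {
      out[r.partition].append(bytes_, r.offset, r.length);
    }
    bytes_.clear();
    rows_.clear();
    return out;
  }

 private:
  std::string bytes_;         // all rows, back to back
  std::vector<RowRef> rows_;  // index into bytes_
};

// Reader-side counterpart: splits a batch back into row payloads.
std::vector<std::string> decodeRows(const std::string& batch) {
  std::vector<std::string> rows;
  size_t pos = 0;
  while (pos < batch.size()) {
    uint32_t size = 0;
    assert(pos + sizeof(size) <= batch.size());
    std::memcpy(&size, batch.data() + pos, sizeof(size));
    pos += sizeof(size);
    assert(pos + size <= batch.size());
    rows.emplace_back(batch, pos, size);
    pos += size;
  }
  return rows;
}
```

Because every row carries its own size prefix, a batch of many rows can travel through the shuffle writer as one opaque record and still be split apart on the read side.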

### Configs

- `exchange.materialization.enabled=true`
- `exchange.materialization.partitioning-row-batch-buffer-size=16mb`
- `exchange.materialization.per-partition-buffer-size=130kb`

### Planner wiring

The `exchange.materialization.enabled=true` flag gates creation of `ExchangeWriteNode` (write) and
`ExchangeReadNode` (read) from `PartitionedOutputNode`/`RemoteSourceNode`.
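
The gating amounts to a two-by-two decision, sketched below. `SourceNodeKind` and `chooseExchangeNode` are hypothetical names for illustration, the returned strings are labels rather than real plan nodes, and `ShuffleRead` as the legacy read operator is taken from the PR description.

```cpp
#include <cassert>
#include <string>

// The two planner inputs named above.
enum class SourceNodeKind { kPartitionedOutput, kRemoteSource };

// Returns a label for the node the translator would create for `kind`,
// given the value of exchange.materialization.enabled.
std::string chooseExchangeNode(SourceNodeKind kind, bool materializationEnabled) {
  if (kind == SourceNodeKind::kPartitionedOutput) {
    return materializationEnabled
        ? "ExchangeWriteNode"
        : "PartitionAndSerialize + LocalPartition + ShuffleWrite";
  }
  return materializationEnabled ? "ExchangeReadNode" : "ShuffleRead";
}
```

Both sides switch on the same flag, so a query uses either the new path end to end or the legacy path end to end.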

Writer close uses `allPeersFinished`, so no `setNumDrivers` call is needed.
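
The "last driver closes the writer" pattern can be approximated with a countdown, assuming `allPeersFinished` reports true only to the last driver to arrive. `PeerBarrier` below is a hypothetical stand-in, not the Velox API; it takes the peer count explicitly, whereas in the operator that count is tracked by the task rather than by the buffer.

```cpp
#include <atomic>
#include <cassert>

class PeerBarrier {
 public:
  explicit PeerBarrier(int numPeers) : remaining_(numPeers) {
    assert(numPeers > 0);
  }

  // Each driver calls this once when it has no more input. Returns true for
  // exactly one caller, the last to arrive, which then closes the writer.
  bool finish() {
    return remaining_.fetch_sub(1) == 1;
  }

 private:
  std::atomic<int> remaining_;
};
```

Only the caller that sees `true` runs the close step, so the writer is closed exactly once regardless of the order in which drivers finish.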

Differential Revision: D100365767
@meta-codesync meta-codesync Bot changed the title from "[Sapphire][Shuffle](1/n) Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (#27573)" to "feat(native-pos): (1/n) Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (#27573)" on Apr 14, 2026
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 14, 2026
…ngeOutputBuffer for new exchange path (prestodb#27573)

@shrinidhijoshi shrinidhijoshi changed the title from "feat(native-pos): (1/n) Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (#27573)" to "feat(native-pos): Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path [1/n] (#27573)" on Apr 14, 2026
@meta-codesync meta-codesync Bot changed the title from "feat(native-pos): Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path [1/n] (#27573)" to "feat(native-pos): (1/n) Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (#27573)" on Apr 14, 2026
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 14, 2026
…ngeOutputBuffer for new exchange path (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 14, 2026
…ngeOutputBuffer for new exchange path (prestodb#27573)

@shrinidhijoshi shrinidhijoshi changed the title from "feat(native-pos): (1/n) Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (#27573)" to "feat(native-pos): Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (1/n) (#27573)" on Apr 14, 2026
@meta-codesync meta-codesync Bot changed the title feat(native-pos): Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (1/n) (#27573) feat(native-pos): Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (1/n) (#27573) Apr 14, 2026
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 14, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 14, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 14, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 14, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 15, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 15, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 15, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 15, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 15, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 15, 2026
…putBuffer for new exchange path (1/n) (prestodb#27573)

Summary:

#### Summary

Current ShuffleWrite path does single threaded row-by-row processing to maintain thread safety for the shuffle writer (cosco internally at Meta)
But the shuffleWriter is actually thread safe if it is accessed partition exclusive manner. i.e. multi threads can write to it but only
as long as those threads are writing data for disjoint set of partitions.

We can exploit this by just creating a per partition buffer before the shuffle write operator and write multi threaded into this buffer with locks per partition. Everytime we write into a partition buffer, if it is full, we also flush it in the same operation. Also, now that we have batched many rows in our buffer, we write the whole batch as 1 row from shuffle writer POV, which reduces the overhead of per row operations inside the shuffle writer (checksumming, memcpy/buffering)

To accomplish this, instead of updating the existing operators, we add new operators that form a new shuffle write/read path:

- `ExchangeWrite` + `ExchangeOutputBuffer`
- `ExchangeRead`

These replace `PartitionAndSerialize + LocalPartition + ShuffleWrite` with a single operator.

### Current architecture

{F1988112256}

### New architecture

 {F1988112251}  

#### New classes (all in presto_cpp/main/operators):

- `ExchangeOutputBuffer`: shared per-partition queue buffer owning ShuffleWriter,
  ContinueFuture backpressure, error propagation on close/abort
- `ExchangeWriteNode` + `ExchangeWrite`: buffers all input data into a single contiguous buffer, then issues per-partition `enqueue` calls into the `ExchangeOutputBuffer`
- `ExchangeReadNode` + `ExchangeRead`: reads kFormatBatched pages only
- Translators for both operators

### Configs

- `exchange.materialization.enabled=true`
- `exchange.materialization.partitioning-row-batch-buffer-size=16mb`
- `exchange.materialization.per-partition-buffer-size=130kb`
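
To make the buffer sizes above concrete, here is a small sketch that converts such size strings to bytes. It assumes binary units (1kb = 1024 bytes), which is an assumption rather than something this PR states; `parseSizeBytes`, `MaterializedExchangeSettings`, and `exampleSettings` are hypothetical names and are not the parsing code used by the server.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>

// Parses strings such as "16mb" or "130kb" into a byte count.
// Assumption: units are binary (kb = 2^10, mb = 2^20, gb = 2^30).
inline uint64_t parseSizeBytes(const std::string& text) {
  size_t pos = 0;
  while (pos < text.size() &&
         std::isdigit(static_cast<unsigned char>(text[pos]))) {
    ++pos;
  }
  if (pos == 0) {
    throw std::invalid_argument("size must start with digits: " + text);
  }
  const uint64_t value = std::stoull(text.substr(0, pos));

  // Lower-case the unit suffix so "MB" and "mb" are treated the same.
  std::string unit;
  for (size_t i = pos; i < text.size(); ++i) {
    unit += static_cast<char>(
        std::tolower(static_cast<unsigned char>(text[i])));
  }
  if (unit.empty() || unit == "b") return value;
  if (unit == "kb") return value << 10;
  if (unit == "mb") return value << 20;
  if (unit == "gb") return value << 30;
  throw std::invalid_argument("unknown size unit: " + unit);
}

// Hypothetical holder for the three settings listed above.
struct MaterializedExchangeSettings {
  bool enabled;
  uint64_t rowBatchBufferBytes;      // partitioning-row-batch-buffer-size
  uint64_t perPartitionBufferBytes;  // per-partition-buffer-size
};

// Builds the settings using the example values from the list above.
inline MaterializedExchangeSettings exampleSettings() {
  return {true, parseSizeBytes("16mb"), parseSizeBytes("130kb")};
}
```

Under the binary-unit assumption, `16mb` is 16,777,216 bytes and `130kb` is 133,120 bytes.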

### Planner wiring

The `exchange.materialization.enabled=true` flag gates creation of `ExchangeWriteNode` (write) and
`ExchangeReadNode` (read) from `PartitionedOutputNode`/`RemoteSourceNode`.

The writer is closed via `allPeersFinished`, so no `setNumDrivers` call is needed.
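
The gating described above amounts to a single switch at plan-conversion time. The sketch below only illustrates that decision; `ShufflePath`, `selectShufflePath`, `writeSideFor`, and `readSideFor` are hypothetical names, and the real conversion in `PrestoToVeloxQueryPlan.cpp` builds plan nodes rather than returning strings.

```cpp
#include <cassert>
#include <string>

// Hypothetical enum naming the two shuffle paths discussed in this PR.
enum class ShufflePath { kLegacy, kMaterialized };

// Chooses the path from the exchange.materialization.enabled flag.
inline ShufflePath selectShufflePath(bool materializationEnabled) {
  return materializationEnabled ? ShufflePath::kMaterialized
                                : ShufflePath::kLegacy;
}

// Names what a PartitionedOutputNode is converted into on the write side.
inline std::string writeSideFor(ShufflePath path) {
  return path == ShufflePath::kMaterialized
      ? "ExchangeWriteNode"
      : "PartitionAndSerialize + LocalPartition + ShuffleWrite";
}

// Names what a RemoteSourceNode is converted into on the read side.
inline std::string readSideFor(ShufflePath path) {
  return path == ShufflePath::kMaterialized ? "ExchangeReadNode"
                                            : "ShuffleRead";
}
```

Keeping the choice behind one flag lets the new path be switched on and off against the legacy path without changing the rest of the plan.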

Differential Revision: D100365767
@shrinidhijoshi shrinidhijoshi changed the title feat(native-pos): Add new ExchangeRead, ExchangeWrite and ExchangeOutputBuffer for new exchange path (1/n) (#27573) feat(native-pos): Add new materialized Exchange path in presto-on-spark native (1/n) Apr 15, 2026
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 15, 2026
…rk native (1/n) (prestodb#27573)

@meta-codesync meta-codesync Bot changed the title feat(native-pos): Add new materialized Exchange path in presto-on-spark native (1/n) feat(native-pos): Add new materialized Exchange path in presto-on-spark native (1/n) (#27573) Apr 15, 2026
shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 15, 2026
…rk native (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 15, 2026
…rk native (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 16, 2026
…rk native (1/n) (prestodb#27573)

shrinidhijoshi added a commit to shrinidhijoshi/presto that referenced this pull request Apr 16, 2026
…rk native (1/n) (prestodb#27573)

@steveburnett

Contributor

Please add a release note - or NO RELEASE NOTE - following the Release Notes Guidelines to pass the failing but not required CI check.

@steveburnett

Contributor

Please edit the PR title to follow semantic commit style to pass the failing and required CI check. See the failure in the test for advice.

…utputBuffer MaterializedExchange (prestodb#27573)

Summary:

Add ExchangeWrite and ExchangeRead operators with ExchangeOutputBuffer — the core components of the materialized exchange shuffle path.

New classes (all in presto_cpp/main/operators):
- ExchangeOutputBuffer: shared per-partition queue buffer owning ShuffleWriter, ContinueFuture backpressure, error propagation on close/abort
- ExchangeWriteNode + ExchangeWrite: flat-buffer CompactRow serialization, O(1) memory w.r.t. partition count, RowGroupHeader+TRowSize framing
- ExchangeReadNode + ExchangeRead: reads kFormatBatched pages only
- Translators for both operators
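
The "RowGroupHeader+TRowSize framing" mentioned above is a form of length-prefixed framing: a group header followed by size-prefixed rows in one flat buffer. The sketch below shows the general idea only. The actual layouts are not given here, so the `RowGroupHeader` fields, the 32-bit `TRowSize`, and the helpers `frameRows`/`unframeRows` are all assumptions made for illustration, and sizes are written in host byte order.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Assumed width of the per-row size prefix.
using TRowSize = uint32_t;

// Hypothetical header: only carries the number of rows in the group.
struct RowGroupHeader {
  uint32_t numRows;
};

// Writes header + (size, bytes) pairs into one contiguous buffer.
inline std::string frameRows(const std::vector<std::string>& rows) {
  const RowGroupHeader header{static_cast<uint32_t>(rows.size())};
  std::string out;
  out.append(reinterpret_cast<const char*>(&header), sizeof(header));
  for (const auto& row : rows) {
    const TRowSize size = static_cast<TRowSize>(row.size());
    out.append(reinterpret_cast<const char*>(&size), sizeof(size));
    out.append(row);
  }
  return out;
}

// Reads the rows back by walking the size prefixes.
inline std::vector<std::string> unframeRows(const std::string& buffer) {
  assert(buffer.size() >= sizeof(RowGroupHeader));
  RowGroupHeader header;
  std::memcpy(&header, buffer.data(), sizeof(header));
  size_t offset = sizeof(header);

  std::vector<std::string> rows;
  rows.reserve(header.numRows);
  for (uint32_t i = 0; i < header.numRows; ++i) {
    TRowSize size;
    assert(offset + sizeof(size) <= buffer.size());
    std::memcpy(&size, buffer.data() + offset, sizeof(size));
    offset += sizeof(size);
    assert(offset + size <= buffer.size());
    rows.emplace_back(buffer.data() + offset, size);
    offset += size;
  }
  return rows;
}
```

With this kind of framing a whole group of rows travels through the writer as one opaque unit, and the reader recovers row boundaries from the size prefixes alone.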

Uses allPeersFinished for writer close — no setNumDrivers needed.

Planner wiring and configs:
- Configs.h/cpp: add exchange.materialization.enabled (default: false), exchange.materialization.partitioning-row-batch-buffer-size (16MB), exchange.materialization.per-partition-buffer-size (130KB)
- PrestoToVeloxQueryPlan.cpp: when enabled, create ExchangeWriteNode (write) and ExchangeReadNode (read) instead of PnS+LocalPartition+ShuffleWrite
- PrestoServer.cpp: register ExchangeWriteTranslator + ExchangeReadTranslator
- PlanConverterTest: test batchPlanConversion with exchange path enabled

Reviewed By: xiaoxmeng

Differential Revision: D100365767