Velox Task Barriers provide a synchronization mechanism that not only enables efficient task reuse,
important for workloads such as AI training data loading,
but also delivers the strict sequencing and checkpointing semantics required for streaming workloads.
By injecting a barrier split, users guarantee that no subsequent data is processed until the entire
DAG is flushed and the synchronization signal is unblocked. This capability serves two critical
patterns:
Task Reuse: Eliminates the overhead of repeated task initialization and teardown by safely
reconfiguring warm tasks for new queries. This is a recurring pattern in AI training data
loading workloads.
Streaming Processing: Enables continuous data handling with consistent checkpoints, allowing
stateful operators to maintain context across batches without service interruption.
The following example demonstrates the usage pattern: the user adds splits, requests a barrier,
and then must continue to drain results until the barrier is reached. This ensures all in-flight
data leaves the pipeline, allowing the barrier to complete.
// 1. Initialize the task infrastructure once
auto task = Task::create(config, memoryPool);

// 2. Loop to process multiple batches using the same task
while (scheduler.hasMoreRequests()) {
  // A. Get new data splits (e.g., from a file or query)
  auto splits = scheduler.getNextSplits();

  // B. Add splits to the running task
  task->addSplits(sourcePlanNodeId, splits);

  // C. Request a barrier to mark the end of this batch.
  // This injects a BarrierSplit that flows behind the data.
  // Note: Any splits added to the task AFTER this call will be blocked
  // and will not be processed until this barrier is fully cleared.
  auto barrierFuture = task->requestBarrier();

  // D. Drain the pipeline until the barrier is passed.
  // Critical: We must keep pulling results so the task can flush
  // its buffers and reach the barrier.
  while (!barrierFuture.isReady()) {
    auto result = task->next(); // Get next result batch
    if (result != nullptr) {
      userOutput.consume(result);
    }
  }

  // E. Barrier reached.
  // The task has been reset (operators cleared, buffers flushed).
  // It is now ready for the next iteration immediately.
}
Velox is a fully vectorized execution engine[1]. Its internal columnar memory layout enhances cache
locality, exposes more inter-instruction parallelism to CPUs, and enables the use of SIMD instructions,
significantly accelerating large-scale query processing.
However, some operators in Velox utilize a hybrid layout, where datasets can be temporarily converted
to a row-oriented format. The OrderBy operator is one example, where our implementation first
materializes the input vectors into rows, containing both sort keys and payload columns, sorts them, and
converts the rows back to vectors.
In this article, we explain the rationale behind this design decision and provide experimental evidence
supporting it. We show a prototype of a hybrid sorting strategy that materializes only the
sort-key columns, reducing the overhead of materializing payload columns. Contrary to expectations, the
end-to-end performance did not improve—in fact, it was even up to 3× slower. We present the two
variants and discuss why one is counter-intuitively faster than the other.
The OrderBy operator in Velox’s current implementation uses a utility called SortBuffer to perform
the sorting, which consists of three stages:
Input Stage: Serializes input Columnar Vectors into a row format, stored in a RowContainer.
Sort Stage: Sorts based on keys within the RowContainer.
Output Stage: Extracts output vectors column by column from the RowContainer in sorted order.
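The following toy sketch illustrates these three stages conceptually. It uses plain C++ containers rather than Velox's actual SortBuffer and RowContainer classes; the names Row, toRows, and sortAndExtract are illustrative only.

#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

struct Row {
  int64_t key;                       // sort key
  std::vector<std::string> payload;  // payload columns
};

// Input stage: serialize columnar input into rows.
std::vector<Row> toRows(
    const std::vector<int64_t>& keys,
    const std::vector<std::vector<std::string>>& payloadColumns) {
  std::vector<Row> rows(keys.size());
  for (size_t r = 0; r < keys.size(); ++r) {
    rows[r].key = keys[r];
    for (const auto& column : payloadColumns) {
      rows[r].payload.push_back(column[r]);
    }
  }
  return rows;
}

// Sort stage: sort the rows by key.
// Output stage: extract output columns from the sorted rows, column by column.
void sortAndExtract(
    std::vector<Row>& rows,
    std::vector<std::vector<std::string>>& outputColumns) {
  std::sort(rows.begin(), rows.end(), [](const Row& a, const Row& b) {
    return a.key < b.key;
  });
  for (size_t c = 0; c < outputColumns.size(); ++c) {
    for (size_t r = 0; r < rows.size(); ++r) {
      outputColumns[c].push_back(rows[r].payload[c]);
    }
  }
}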
While row-based sorting is more efficient than column-based sorting[2,3], what if we only
materialize the sort key columns? We could then use the resulting sort indices to gather
the payload data into the output vectors directly. This would save the cost of converting
the payload columns to rows and back again. More importantly, it would allow us to spill
the original vectors directly to disk rather than first converting rows back into vectors
for spilling.
We have implemented a non-materializing sort strategy designed
to improve sorting performance. The approach materializes only the sort key columns and their original vector indices,
which are then used to gather the corresponding rows from the original input vectors into the output vector after the
sort completes. It replaces SortBuffer with NonMaterializedSortBuffer, which consists of three stages:
Input Stage: Holds the input vector (its shared pointer) in a list, serializes key columns
and additional index columns (VectorIndex and RowIndex) into rows, stored in a RowContainer.
Sort Stage: Sorts based on keys within the RowContainer.
Output Stage: Extracts the VectorIndex and RowIndex columns, uses them together to gather
the corresponding rows from the original input vectors into the output vector.
In theory, this should have significantly reduced the overhead of materializing payload columns,
especially for wide tables, since only sorting keys are materialized. However, the benchmark results
were the exact opposite of our expectations. Despite successfully eliminating expensive serialization
overhead and reducing the total instruction count, the end-to-end performance was up to 3x slower.
To validate the effectiveness of our new strategy, we designed a benchmark with a varying number of
payload columns:
Inputs: 1000 Input Vectors, 4096 rows per vector.
Number of payload columns: 64, 128, 256.
L2 cache: 80 MiB, L3 cache: 108 MiB.
| numPayloadColumns | Mode | Input Time | Sorting Time | Output Time | Total Time | Desc |
| --- | --- | --- | --- | --- | --- | --- |
| 64 | Row-based | 4.27s | 0.79s | 4.23s | 11.64s | Row-based is 3.9x faster |
| 64 | Columnar | 0.28s | 0.84s | 42.30s | 45.90s | |
| 128 | Row-based | 20.25s | 1.11s | 5.49s | 31.43s | Row-based is 2.0x faster |
| 128 | Columnar | 0.27s | 0.51s | 59.15s | 64.20s | |
| 256 | Row-based | 29.34s | 1.02s | 12.85s | 51.48s | Row-based is 3.0x faster |
| 256 | Columnar | 0.87s | 1.10s | 144.00s | 154.80s | |
The benchmark results confirm that Row-based Sort is the superior strategy,
delivering a 2.0x to 3.9x overall speedup compared to Columnar Sort. While Row-based Sort
incurs a noticeably higher upfront cost during the Input phase (up to roughly 29s at 256
payload columns), it maintains a highly stable and efficient Output phase (at most about 13s).
In contrast, Columnar Sort suffers from severe performance degradation in the Output phase as
the payload grows, with execution times surging from 42s to 144s, resulting in a much slower
total execution time despite its negligible input overhead.
To identify the root cause of the performance divergence, we utilized perf stat to analyze
micro-architectural efficiency and perf mem to profile memory access patterns during the critical
Output phase.
| Metrics | Row-based | Columnar | Desc |
| --- | --- | --- | --- |
| Total Instructions | 555.6 Billion | 475.6 Billion | Row +17% |
| IPC (Instructions Per Cycle) | 2.4 | 0.82 | Row 2.9x Higher |
| LLC Load Misses (Last Level Cache) | 0.14 Billion | 5.01 Billion | Columnar 35x Higher |

| Memory Level | Row-based Output | Columnar Output |
| --- | --- | --- |
| RAM Hit | 5.8% | 38.1% |
| LFB Hit | 1.7% | 18.9% |
The results reveal a stark contrast in CPU utilization. Although the Row-based approach
executes 17% more instructions (due to serialization overhead), it maintains a high IPC of 2.4,
indicating a fully utilized pipeline. In contrast, the Columnar approach suffers from a low IPC
of 0.82, meaning the CPU is stalled for the majority of cycles. This is directly driven by the
35x difference in LLC Load Misses, which forces the Columnar implementation to fetch data from
main memory repeatedly. The memory profile further confirms this bottleneck: Columnar mode is
severely latency-bound, spending 38.1% of its execution time waiting for DRAM (RAM Hit) and
experiencing significant congestion in the Line Fill Buffer (18.9% LFB Hit), while Row-based
mode effectively utilizes the cache hierarchy.
Why does the non-materializing sort, specifically its gather method, cause so many cache misses?
The answer lies in its memory access pattern. Since Velox is a columnar engine, the output is
constructed column by column. For each column in an output vector, the gather process does the following:
It iterates through all rows of the current output vector.
For each row, it locates the corresponding input vector via the sorted vector index.
It then locates the source row within that input vector.
It copies the data from that single source cell to the target cell.
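The sketch below makes this per-cell gather explicit. It uses toy types; SortedRowRef and gatherColumn are illustrative names, not the prototype's actual code.

#include <cstdint>
#include <vector>

// A sorted row reference: which input vector the row came from, and where.
struct SortedRowRef {
  uint32_t vectorIndex; // index of the input vector
  uint32_t rowIndex;    // row position within that input vector
};

// Gather one output column by following the sorted references. Consecutive
// output rows typically point at different input vectors, which is exactly
// the cache-unfriendly random access pattern analyzed below.
template <typename T>
void gatherColumn(
    const std::vector<std::vector<T>>& inputColumn, // this column, one entry per input vector
    const std::vector<SortedRowRef>& sortedRefs,    // produced by the key sort
    std::vector<T>& output) {
  output.resize(sortedRefs.size());
  for (size_t i = 0; i < sortedRefs.size(); ++i) {
    const auto& ref = sortedRefs[i];
    output[i] = inputColumn[ref.vectorIndex][ref.rowIndex];
  }
}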
The sorted indices, by nature, offer low predictability. This forces the gather operation for a single
output column to jump unpredictably across many different input vectors (up to all 1,000 of them in
our benchmark), fetching just one value from each. This random access pattern has two devastating
consequences for performance.
First, at the micro-level, every single data read becomes a "long-distance" memory jump.
The CPU's hardware prefetcher is rendered completely ineffective by this chaotic access pattern,
resulting in almost every lookup yielding a cache miss.
Second, at the macro-level, the problem compounds with each column processed. The sheer
volume of data touched potentially exceeds the size of the L3 cache. This ensures
that by the time we start processing the next payload column, the necessary vectors have already
been evicted from the cache. Consequently, the gather process must re-fetch the same vector
metadata and data from main memory over and over again for each of the 256 payload columns.
This results in 256 passes of cache-thrashing, random memory access, leading to a catastrophic
number of cache misses and explaining the severe performance degradation.
In contrast, Velox’s current row-based approach serializes all input vectors into rows, with
each allocation producing a contiguous buffer that holds a subset of those rows. Despite the
serialization, the row layout preserves strong locality when materializing output
vectors: once rows are in the cache, they can be used to extract multiple output columns.
This leads to much better cache-line utilization and fewer cache misses than a columnar layout,
where each fetched line often yields only a single value per column. Moreover, the largely
sequential scans over contiguous buffers let the hardware prefetcher operate effectively,
boosting throughput even in the presence of serialization overhead.
This study reinforces the core principle of performance engineering: Hardware Sympathy.
Without understanding the characteristics of the memory hierarchy and optimizing for it,
simply reducing the instruction count usually does not guarantee better performance.
Efficiently merging sorted data partitions at scale is crucial for a variety of training data preparation
workloads, especially for Generative Recommenders (GRs), a new paradigm introduced in the paper
Actions Speak Louder than Words: Trillion-Parameter Sequential Transducers for Generative Recommendations.
A key requirement is to merge training data across partitions—for example, merging hourly partitions into
daily ones—while ensuring that all rows sharing the same primary key are stored consecutively. Training data is
typically partitioned and bucketed by primary key, with rows sharing the same key
stored consecutively, so merging across partitions essentially becomes a multi-way merge problem.
Normally, Apache Spark can be used for this sort-merge requirement — for example, via CLUSTER BY.
However, training datasets for a single job can often reach the PB scale, which in turn generates shuffle data at PB scale.
Because we typically apply bucketing and ordering by key when preparing training data in production,
Spark can eliminate the shuffle when merging training data from multiple hourly partitions.
However, each Spark task can only read the files planned from various partitions within a split
sequentially, placing them into the sorter and spilling as needed. Only after all files have been read
does Spark perform a sort-merge of the spilled files. This process produces a large number of small
spill files, which further degrades efficiency.
Moreover, Spark’s spill is row-based with a low compression ratio, resulting in approximately 4 times
amplification compared to the original columnar training data in the data lake. These factors
significantly degrade task stability and performance. Velox has a LocalMerge operator that can be
introduced into Apache Spark via Gluten or PySpark on Velox.
Note: To keep the focus on merging, the remainder of this article also assumes that each partition’s
training data is already sorted by primary key—a common setup in training data pipelines.
The LocalMerge operator consolidates its sources’ outputs into a single, sorted stream of rows.
It runs single-threaded, while its upstream sources may run multi-threaded within the same task,
producing multiple sorted inputs concurrently. For example, when merging 24 hourly partitions into
a single daily partition (as shown in the figure below), the merge plan fragment is split into two pipelines:
Pipeline 0: contains two operators, TableScan and CallbackSink. 24 drivers are instantiated to scan the 24 hourly partitions.
Pipeline 1: contains only a single operator, LocalMerge, with one driver responsible for performing the sort merge.
A CallbackSink operator is installed at the end of each driver in Pipeline 0. It pushes the TableScan
operator’s output vectors into the queues backing the merge streams. Inside LocalMerge, a TreeOfLosers
performs a k-way merge over the 24 merge streams supplied by the Pipeline 0 drivers, producing a single,
globally sorted output stream.
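For intuition, the sketch below shows a k-way merge over already-sorted streams using a binary min-heap. Velox's LocalMerge uses a TreeOfLosers rather than a heap, but the streaming behavior is the same in spirit; kWayMerge is an illustrative name.

#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Merge k already-sorted streams into one globally sorted output.
std::vector<int64_t> kWayMerge(
    const std::vector<std::vector<int64_t>>& sortedStreams) {
  // Each heap entry holds (value, (stream index, position within stream)).
  using Entry = std::pair<int64_t, std::pair<size_t, size_t>>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<>> heap;
  for (size_t s = 0; s < sortedStreams.size(); ++s) {
    if (!sortedStreams[s].empty()) {
      heap.push({sortedStreams[s][0], {s, 0}});
    }
  }
  std::vector<int64_t> out;
  while (!heap.empty()) {
    auto [value, position] = heap.top();
    heap.pop();
    out.push_back(value);
    auto [stream, index] = position;
    if (index + 1 < sortedStreams[stream].size()) {
      heap.push({sortedStreams[stream][index + 1], {stream, index + 1}});
    }
  }
  return out;
}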
Although LocalMerge minimizes comparisons during merging, preserves row-ordering guarantees, and cleanly
isolates the single-threaded merge from the multi-threaded scan phase for predictable performance, it can
cause substantial memory pressure—particularly in training-data pipelines. In these workloads, extremely
wide tables are common, and even after column pruning, thousands of columns may remain.
Moreover, training data is typically stored in PAX-style formats such as Parquet, ORC, or DRWF.
Using Parquet as an example, the reader often needs to keep at least one page per column in memory. As a result,
simply opening a Parquet file with thousands of columns can consume significant memory even before any
merging occurs. Wide schemas further amplify per-column metadata, dictionary pages, and decompression buffers,
inflating the overall footprint. In addition, the k-way merge must hold input vectors from multiple sources
concurrently, which drives peak memory usage even higher.
To cap memory usage and avoid OOM when merging a large number of partitions, we extend LocalMerge to
process fewer local sources at a time, leverage existing spill facilities to persist intermediate
results, and introduce lazy-start activation for merge inputs. Using the case of merging 24 hourly
partitions into a single daily partition, the process is organized into two phases:
Phase 1
Break the scan-and-merge into multiple rounds (e.g., 3 rounds).
In each round, lazily start a limited number of drivers (e.g., drivers 0–7, eight at a time).
The started drivers scan data and push it into the queues backing their respective merge streams.
Perform an in-memory k-way merge and spill the results, producing a spill-file group (one or more spill files per group).
After all inputs from drivers 0–7 are consumed and spilled, the drivers are closed, the file streams opened by their TableScan operators are closed, and the associated memory is released.
Repeat the above steps for the remaining rounds (drivers 8–15, then drivers 16–23), ensuring peak memory stays within budget.
Phase 2
Create a concatenated file stream for each spill-file group produced in Phase 1.
Schedule one async callback for each concatenated stream to prefetch and push data into a merge stream.
Merge the outputs of the three merge streams using a k-way merge (e.g., a loser-tree), and begin streaming the final, globally sorted results to downstream operators.
The number of rows per output batch is limited adaptively by estimating row sizes from the merge streams, which use the average row size observed in their first batch.
Set local_merge_spill_enabled to true to enable spilling for the LocalMerge
operator (it is false by default). Then, set local_merge_max_num_merge_sources to
control the number of merge sources per round according to your memory management strategy.
Note: An executor must be configured for spilling, as it is used to schedule an asynchronous
callback for each concatenated stream to prefetch data and push it into the merge stream.
The number of merge sources is adjusted dynamically based on available memory, rather than being
determined by the local_merge_max_num_merge_sources parameter. The process starts with a small
number of sources, such as 2, and incrementally increases this number for subsequent
rounds (e.g., to 4) as long as sufficient memory is available. The number of sources stops increasing
once it reaches a memory-constrained limit.
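As a sketch, the relevant session properties can be supplied through a string-keyed query configuration. The property names below come from the text above; how the map reaches the Velox query context depends on the embedding engine (Gluten, Prestissimo, PySpark on Velox), and the helper function is illustrative.

#include <string>
#include <unordered_map>

// Illustrative helper: build the query configuration described above.
std::unordered_map<std::string, std::string> localMergeSpillConfig() {
  return {
      // Enable spilling for the LocalMerge operator (false by default).
      {"local_merge_spill_enabled", "true"},
      // Upper bound on merge sources per round; the effective number is
      // still adapted to the available memory, starting small and growing.
      {"local_merge_max_num_merge_sources", "8"},
  };
}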
In this post, I’ll share how we unblocked shared library builds in Velox, the challenges we encountered with our large CMake build system, and the creative solution that let us move forward without disrupting contributors or downstream users.
Velox’s codebase was started in Meta’s internal monorepo, which still serves as its source of truth. Changes from pull requests in the GitHub repository are not merged directly via the web UI. Instead, each change is imported into the internal review and CI tool, Phabricator, as a ‘diff’. There, it has to pass an additional set of CI checks before being merged into the monorepo and, in turn, exported to GitHub as a commit on the ‘main’ branch.
Internally, Meta uses the Buck2 build system which, like its predecessor Buck, promotes small, granular build targets to improve caching and distributed compilation. Externally, however, the open-source community relies on CMake as a build system. When the CMake build system was first added, its structure mirrored the granular nature of the internal build targets.
The result was hundreds of targets: over one hundred library targets, built as static libraries, and more than two hundred executables for tests and examples. While Buck(2) is optimized for managing such granular targets, the same cannot be said of CMake. Each subdirectory maintained its own CMakeLists.txt, and header includes were managed through global include_directories(), with no direct link between targets and their associated header files. This approach encouraged tight coupling across module boundaries. Over the years, dependencies accumulated organically, resulting in a highly interconnected, and in parts cyclic, dependency graph.
The combination of 300+ targets and static linking resulted in a massive build tree, dozens of GiB when building in release mode and several times larger when built with debug information. This grew to the point where we had to provision larger CI runners just to accommodate the build tree in debug mode!
Individual test executables could reach several GiB, making it impractical to transfer them between runners to parallelize test execution across different CI runners, which significantly delayed CI feedback for developers pushing changes to a PR.
This size is the result of each static library containing a full copy of all of its dependencies' object files. With more than 100 strongly connected libraries, we end up with countless redundant copies of the same objects scattered throughout the build tree, blowing up the total size.
CMake usually requires the library dependency graph to be acyclic (a DAG). However, it allows circular dependencies between static libraries, because it is possible to adjust the linker invocation to work around potential issues with missing symbols.
For example, say library A depends on foo() from library B, and B in turn depends on bar() defined in A. If we link them in order A B, the linker will find foo() in B for use in A but will fail to find bar() for use in B. This happens because the linker processes the libraries from left to right and symbols that are not (yet) required are effectively ignored.
The trick is now simply to repeat the entire circular group in the linker arguments A B A B. Previously ignored symbols are now in the list of required symbols and can be resolved when the linker processes the repeated libraries.
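A minimal illustration of that situation follows; the file and function names are made up for this example.

// a.cpp -- compiled into static library A
int foo();                      // defined in library B
int bar() { return 1; }         // bar() is defined here, in A
int useFoo() { return foo(); }  // A depends on B

// b.cpp -- compiled into static library B
int bar();                      // defined in library A
int foo() { return bar(); }     // foo() is defined here, in B; B depends on A

// Linking "main.o libA.a libB.a" fails: by the time the linker processes
// libB.a it needs bar(), but it has already passed libA.a and does not go back.
//   g++ main.o libA.a libB.a              -> undefined reference to bar()
// Repeating the circular group resolves the leftover symbols:
//   g++ main.o libA.a libB.a libA.a libB.a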
However, there is no such workaround for shared libraries. So the moment we attempted to build Velox as a set of shared libraries, CMake failed to configure. The textbook solution would involve refactoring dependencies between targets, explicitly marking dependencies as PUBLIC (forwarded to both direct and transitive dependents) or PRIVATE (required only to build this target itself), and manually breaking cycles. But with hundreds of targets and years of organic growth, this became an unmanageable task.
Since addressing this manually was infeasible, our first instinct as software engineers was, of course, to automate the process. I wrote a Python package that parses all CMakeLists.txt files in the repository using ANTLR4, tracks which source files and headers belong to which target, and reconstructs the library dependency graph. Based on the graph, and on whether headers are included from a source or a header file, the tool grouped the linked targets as either PRIVATE or PUBLIC.
While this worked locally and made it easier to resolve the circular dependencies, it proved far too brittle and disruptive to implement at scale. Velox's high development velocity meant many parts of the code base changed daily, making it impossible to land modifications across more than 250 CMake files in a timely manner while also allowing for thorough review and testing. Maintaining this carefully constructed DAG of dependencies would also have required an unsustainable amount of specialized reviewer attention, time that could be spent much more productively elsewhere.
Given the practical constraints, we needed a solution that would unblock shared builds without disrupting ongoing development. To achieve this, we used a common pattern of larger CMake code bases: project-specific wrapper functions. A special thanks to Marcus Hanwell for introducing me to the pattern he successfully used in VTK.
We created wrapper functions for a selection of the CMake commands that create and manage build targets: add_library became velox_add_library, target_link_libraries turned into velox_link_libraries, and so on. These functions simply forward their arguments to the wrapped core command, unless the option to build Velox as a monolithic library is set. In the latter case, only a single library target is created instead of ~100.
Each call to velox_add_library just adds the relevant source files to the main velox target. To ensure drop-in compatibility, for every original library target, the wrapper creates an ALIAS target referring to velox. This way, all existing code, downstream consumers, old and new test targets, can continue to link to their accustomed velox_xyz targets, which effectively point to the same monolithic library. There is no need to adjust CMake files throughout the codebase, keeping the migration quick and entirely transparent to developers and users.
Some special-case libraries, such as those involving the CUDA driver, still needed to be built separately for runtime or integration reasons. The wrapper logic allows exemptions for these, letting them be built as standalone libraries even in monolithic mode.
Here's a simplified version of the relevant logic:
function(velox_add_library TARGET)
  if(VELOX_MONO_LIBRARY)
    if(TARGET velox)
      # Target already exists, append sources to it.
      target_sources(velox PRIVATE ${ARGN})
    else()
      add_library(velox ${ARGN})
    endif()
    # Create an alias for compatibility.
    add_library(${TARGET} ALIAS velox)
  else()
    # Just call add_library as normal.
    add_library(${TARGET} ${ARGN})
  endif()
endfunction()
This new, monolithic library obviously doesn’t suffer from cyclic dependencies, as it only has external dependencies. So after implementing the wrapper functions, building Velox as a shared library only required some minor adjustments!
We have now switched the default configuration to build the monolithic library and transitioned to build the shared library in our main PR CI and macOS builds.
The results of our efforts can be clearly seen in our Build Metrics Report with the size of the resulting binaries being the biggest win as seen in Table 1. The workflow that collects these metrics uses our regular CI image with GCC 12.2 and ld 2.38.
The new executables built against the shared libvelox.so are a mere ~4% of their previous size! This unlocks improvements across the board both for the local developer experience, packaging/deployments for downstream users as well as CI.
| Size of executables | Static | Shared |
| --- | --- | --- |
| Release | 18.31G | 0.76G |
| Debug | 244G | 8.8G |
Table 1: Total size of test, fuzzer and benchmark executables
Less impressive but still notable is the reduction of the debug build time by almost two hours. The improvement is entirely explained by the much faster link time for the shared build of ~30 minutes, compared to the 2 hours and 20 minutes it takes to link the static build.
| Total CPU Time | Static | Shared |
| --- | --- | --- |
| Release | 13:21h | 13:23h |
| Debug | 12:50h | 11:07h |
Table 2: Total time spent on compilation and linking across all cores
In addition to unblocking shared builds, the wrapper function pattern offers further benefits for future feature additions or build-system-wide adjustments with minimal disruption. We already use the wrappers to automate installation of header files alongside the Velox libraries, with no changes to the targets themselves.
The wrapper functions will also help with defining and enforcing the public Velox API by allowing us to manage header visibility, component libraries like GPU and Cloud extensions and versioning in a consistent and centralized manner.
Switching to C++20 contributes to having a modern ecosystem that is attractive in use, to develop in, and to maintain.
Going forward Velox is looking to enhance the codebase by making use of newer compiler functionalities, such as sanitization checks, by also moving to support newer compiler versions for both GCC and Clang.
Please refer to the C++20 standard for a complete list of new features.
The minimum targeted compiler versions support:
coroutines
the calendar and time zone library in <chrono>
Newer compiler versions implement more and more C++20 language features and library support.
Supporting Velox on newer compiler versions is a continuous effort.
There was some interesting behavior from the compilers: switching to the C++20 standard caused some compile errors in the existing code.
One of these errors was caused by a compiler issue itself.
In GCC 12.2.1, which is used in the CI, the std::string + operator implementation ran into a known issue causing a warning.
Because all warnings are treated as errors, we had to explicitly add an exemption for this particular compiler version.
In general, however, the compile errors we found were reasonably easy to fix.
Most of the changes were compatible with C++17 as well, which means the code is a bit cleaner.
However, one change caused slight trouble because it emitted warnings in C++17, causing build failures due to warnings being turned into errors.
This was the change requiring this to be captured explicitly in lambdas where applicable.
On the other hand, not addressing the lambda captures caused errors in C++20.
Overall, the switch to C++20 took some time but was not overly complicated. No changes to the CI pipelines used in the project were needed. It was limited to CMake and code changes.
This post describes the design principles and software components for extending Velox with hardware acceleration libraries like NVIDIA's cuDF. Velox provides a flexible execution model for hardware accelerators, and cuDF's data structures and algorithms align well with core components in Velox.
Extensibility is a key feature in Velox. The cuDF library integrates with Velox using the DriverAdapter interface to rewrite query plans for GPU execution. The rewriting process allows for operator replacement, operator fusion and operator addition, giving cuDF a lot of freedom when preparing a query plan with GPU operators. Once Velox converts a query plan into executable pipelines, the cuDF DriverAdapter rewrites the plan to use GPU operators:
Generally, the cuDF DriverAdapter replaces operators one-to-one. An exception happens when a GPU operator is not yet implemented in cuDF, in which case a CudfConversion operator must be added to transfer data from GPU to CPU, fall back to CPU execution, and then transfer back to GPU. For end-to-end GPU execution, where cuDF replaces all of the Velox CPU operators, cuDF relies on Velox's pipeline-based execution model to separate stages of execution, partition the work across drivers, and schedule concurrent work on the GPU.
Velox and cuDF benefit from a shared data format, using columnar layout and Arrow-compatible buffers. The CudfVector type in the experimental cuDF backend inherits from Velox's RowVector type, and manages a std::unique_ptr to a cudf::table (link) which owns the GPU-resident data. CudfVector also maintains an rmm::cuda_stream_view (link) to ensure that asynchronously launched GPU operations are stream-ordered. Even when the data resides in GPU memory, CudfVector allows the operator interfaces to follow Velox's standard execution model of producing and consuming RowVector objects.
The first set of GPU operators in the experimental cuDF backend include TableScan, HashJoin, HashAggregation, FilterProject, OrderBy and more. cuDF's C++ API covers many features beyond this initial set, including broad support for list and struct types, high-performance string operations, and configurable null handling. These features are critical for running workloads in Velox with correct semantics for Presto and Spark users. Future work will integrate cuDF support for more Velox operators.
Compared to the typical settings for CPU execution, GPU execution with cuDF benefits from a lower driver count and larger batch size. Whereas Velox CPU performs best with ~1K rows per batch and driver count equal to the number of physical CPU cores, we have found Velox's cuDF backend to perform best with ~1 GiB batch size and 2-8 drivers for a single GPU. cuDF GPU operators launch one or more device-wide kernels and a single driver may not fully utilize GPU compute. Additional drivers improve throughput by queuing up more GPU work and avoiding stalling during host-to-device copies. Adding more drivers increases memory pressure linearly, and query processing throughput saturates once GPU compute is fully utilized. Please check out our talk, “Accelerating Velox with RAPIDS cuDF” from VeloxCon 2025 to learn more.
The experimental cuDF backend in Velox is under active development. It is implemented entirely in C++ and does not require CUDA programming knowledge. There are dozens more APIs in cuDF that can be integrated as Velox operators, and many design areas to explore such as spilling, remote storage connectors, and expression handling.
If you're interested in bringing GPU acceleration to the Velox ecosystem, please join us in the #velox-oss-community channel on the Velox Slack workspace. Your contributions will help push the limits of performance in Presto, Spark and other tools powered by Velox.
Velox depends on several libraries.
Some of these dependencies include open-source libraries from Meta, including Folly and
Facebook Thrift. These libraries are in active development and also depend on each other, so they all have to be updated to the same version at the same time.
Updating these dependencies typically involves modifying the Velox code to align with any public API or semantic changes in these dependencies.
However, a recent upgrade of Folly and Facebook Thrift to version v2025.04.28.00 caused a SEGFAULT only in one unit test in Velox
named velox_functions_remote_client_test.
We immediately put on our gdb gloves and looked at the stack traces. This issue was also reproducible in a debug build.
The SEGFAULT occurred in Facebook Thrift's ThriftServer class during its initialization, but the offending call was invoking the destructor of a certain handler.
However, the corresponding source code pointed to an invocation of a different function, and this code was located inside a Facebook Thrift
header called AsyncProcessor.h.
This handler (RemoteServer) was implemented in Velox as a Thrift definition. Velox compiled this thrift file using Facebook Thrift, and the generated code
was using the ServerInterface class in Facebook Thrift. This ServerInterface class, in turn, derives from both the AsyncProcessorFactory and
ServiceHandlerBase interfaces in Facebook Thrift.
One of the culprits behind SEGFAULTs in the past was a conflict caused by using both Apache Thrift and Facebook Thrift.
However, this was not the root cause this time because we were able to reproduce this issue by just building the test without the Apache Thrift dependency installed.
We were entering a new territory to investigate, and we were not sure where to start.
We then compiled an example called EchoService in Facebook Thrift that was very similar to the RemoteServer, and it worked. Then we copied and compiled the Velox RemoteServer
in Facebook Thrift and that worked too! So the culprit was likely in the compilation flags, which differed between the Facebook Thrift and Velox builds.
We enabled verbose logging for both builds and were able to spot one difference: the GCC coroutines flag was being used in the Facebook Thrift build.
We were also curious about the invocation of the destructor instead of the actual function. We put our gdb gloves back on and dumped the entire
vtable for the RemoteServer class and its base classes. The vtables were different when it was built in Velox vs. Facebook Thrift.
Specifically, the list of functions inherited from ServiceHandlerBase was different.
The vtable for the RemoteServer handler in the Velox build had the following entries:
Tying up both pieces of evidence, we could conclude that Velox generated a different vtable structure compared to what Facebook Thrift (and thus ThriftServer) was built with.
Looking around further, we noticed that the ServiceHandlerBase was conditionally adding functions based on the coroutines compile flag that influences the FOLLY_HAS_COROUTINES macro from the portability header.
As a result, the ThriftServer would access an incorrect function (~ServiceHandlerBase destructor at offset 3 in the first vtable above) instead of the expected
initialization function (semifuture_onStartServing at offset 3 in the second vtable above), thus resulting in a SEGFAULT.
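The effect can be reproduced with a small standalone example: a base class that conditionally declares virtual functions behind a feature macro gets a different vtable layout in translation units compiled with different flags. The names below are illustrative, not the actual Thrift declarations.

// handler_base.h -- illustrative sketch, not the actual Thrift code.
struct HandlerBase {
  virtual ~HandlerBase() = default;
#if defined(HAS_COROUTINES)            // stand-in for FOLLY_HAS_COROUTINES
  virtual void co_onStartServing() {}  // extra vtable slot when enabled
#endif
  virtual void semifuture_onStartServing() {}
};

// If the server library is built with HAS_COROUTINES defined but the code
// providing the handler is built without it (or vice versa), the two sides
// disagree about which vtable slot semifuture_onStartServing() occupies.
// A virtual call through HandlerBase* then dispatches to the wrong entry --
// in our case the destructor -- and crashes.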
We recompiled the Facebook Thrift dependency for Velox with the coroutines compile flag disabled, and the test passed.
At the end of the previous
article, we were halfway
through running our first distributed query:
SELECT l_partkey, count(*) FROM lineitem GROUP BY l_partkey;
We discussed how a query starts, how tasks are set up, and the interactions
between plans, operators, and drivers. We have also presented how the first
stage of the query is executed, from table scan to partitioned output - or the
producer side of the shuffle.
In this article, we will discuss the second query stage, or the consumer side
of the shuffle.
As presented in the previous post, on the first query stage each worker reads
the table, then produces a series of information packets (SerializedPages)
intended for different workers of stage 2. In our example, the lineitem table
has no particular physical partitioning or clustering key. This means that any
row of the table can have any value of l_partkey in any of the files forming
the table. So in order to group data based on l_partkey, we first need to make
sure that rows containing the same values for l_partkey are processed by the
same worker – the data shuffle at the end of stage 1.
The overall query structure is as follows:
The query coordinator distributes table scan splits to stage 1 workers in no
particular order. The workers process these and, as a side effect, fill
destination buffers that will be consumed by stage 2 workers. Assuming there
are 100 stage 2 workers, every stage 1 Driver has its own PartitionedOutput
which has 100 destinations. When the buffered serializations grow large enough,
these are handed off to the worker's OutputBufferManager.
Now let’s focus on the stage 2 query fragment. Each stage 2 worker has the
following plan fragment: PartitionedOutput over Aggregation over LocalExchange
over Exchange.
Each stage 2 Task corresponds to one destination in the OutputBufferManager of
each stage 1 worker. The first stage 2 Task fetches destination 0 from all
the stage 1 Tasks. The second stage 2 Task fetches destination 1 from all the first
stage Tasks, and so on. Everybody talks to everybody else. The shuffle proceeds
without any central coordination.
But let's zoom in to what actually happens at the start of stage 2.
The plan has a LocalExchange node after the Exchange. This becomes two
pipelines: Exchange and LocalPartition on one side, and LocalExchange,
HashAggregation, and PartitionedOutput on the other side.
The Velox Task is intended to be multithreaded, with typically 5 to 30 Drivers.
There can be hundreds of Tasks per stage, thus amounting to thousands of
threads per stage. So, each of the 100 second stage workers is consuming 1/100
of the total output of the first stage. But it is doing this in a multithreaded
manner, with many threads consuming from the ExchangeSource. We will explain
this later.
In order to execute a multithreaded group by, we can either have a thread-safe
hash table, or we can partition the data into n disjoint streams and then
aggregate each stream on its own thread. On a CPU, we almost always prefer to
have threads working on their own memory, so data will be locally partitioned
based on l_partkey using a local exchange. CPUs have complex cache coherence
protocols to give observers a consistent ordered view of memory, so
reconciliation after many cores have written the same cache line is both
mandatory and expensive. Strict memory-to-thread affinity is what makes
multicore scalability work.
To create efficient and independent memory access patterns, the second stage
reshuffles the data using a local exchange. In concept, this is like the remote
exchange between tasks, but is scoped inside the Task. The producer side
(LocalPartition) calculates a hash on the partitioning column l_partkey, and
divides this into one destination per Driver in the consumer pipeline. The
consumer pipeline has a source operator LocalExchange that reads from the queue
filled by LocalPartition. See
LocalPartition.h
for details. Also see the code in
Task.cpp
for setting up the queues between local exchange producers and consumers.
While remote shuffles work with serialized data, local exchanges pass in-memory
vectors between threads. This is also the first time we encounter the notion of
using columnar encodings to accelerate vectorized execution. Velox became known
by its extensive use of such techniques, which we call compressed execution. In
this instance, we use dictionaries to slice vectors across multiple
destinations – we will discuss it next.
Query execution often requires changes to the cardinality (number of rows in a
result) of the data. This is essentially what filters and joins do – they
either reduce the cardinality of the data, e.g., filters or selective joins, or
increase the cardinality of the data, e.g. joins with multiple key matches.
Repartitioning in LocalPartition assigns a destination to each row of the input
based on the partitioning key. It then makes a vector for each destination with
just the rows for that destination. Suppose rows 2, 8 and 100 of the current
input hash to 1. Then the vector for destination 1 would only have rows 2, 8
and 100 from the original input. We could make a vector of three rows by
copying the data. Instead, we save the copy by wrapping each column of the
original input in a DictionaryVector with length 3 and indices 2, 8 and 100.
This is much more efficient than copying, especially for wide and nested data.
Later on, the LocalExchange consumer thread running destination 1 will see a
DictionaryVector of 3 rows. When this is accessed by the HashAggregation
Operator, the aggregation sees a dictionary and will then take the indirection
and will access for row 0 the value at 2 in the base (inner) vector, for row 1
the value at 8, and so forth. The consumer for destination 0 does the exact
same thing but will access, for example, rows 4, 9 and 50.
The base of the dictionaries is the same on all the consumer threads. Each
consumer thread just looks at a different subset. The cores read the same cache
lines, but because the base is not written to (read-only), there is no
cache-coherence overhead.
To summarize, a DictionaryVector<T> is a wrapper around any vector of T. The
DictionaryVector specifies indices, which give indices into the base vector.
Dictionary encoding is typically used when there are few distinct values in a
column. Take the strings “experiment” and “baseline”. If a column only has
these values, it is far more efficient to represent it as a vector with
“experiment” at 0 and “baseline” at 1, and a DictionaryVector that has,
say, 1000 elements, where these are either the index 0 or 1.
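A toy sketch of this encoding, using plain C++ containers rather than Velox's DictionaryVector, makes the indirection concrete:

#include <cstdint>
#include <string>
#include <vector>

int main() {
  // Base vector: the two distinct values.
  std::vector<std::string> base = {"experiment", "baseline"};

  // Dictionary indices: 1000 rows, each an index into `base`.
  std::vector<int32_t> indices(1000);
  for (size_t i = 0; i < indices.size(); ++i) {
    indices[i] = static_cast<int32_t>(i % 2); // alternate between the two values
  }

  // Reading row i of the encoded column is a single indirection:
  // value(i) == base[indices[i]]
  std::string row42 = base[indices[42]];
  (void)row42;
  return 0;
}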
Besides this, DictionaryVectors can also be used to denote a subset or a
reordering of elements of the base. Because all places that accept vectors also
accept DictionaryVectors, the DictionaryVector becomes the universal way of
representing selection. This is a central precept of Velox and other modern
vectorized engines. We will often come across this concept.
We have now arrived at the second pipeline of stage 2. It begins with
LocalExchange, which then feeds into HashAggregation. The LocalExchange picks a
fraction of the Task's input specific to its local destination, about
1/number-of-drivers of the task's input.
We will talk about hash tables, their specific layout and other tricks in a
later post. For now, we look at HashAggregation as a black box. This specific
aggregation is a final aggregation, which is a full pipeline barrier that only
produces output after all input is received.
How does the aggregation know it has received all its input? Let's trace the
progress of the completion signal through the shuffles. A leaf worker knows
that it is at end if the Task has received the “no more splits” message in the
last task update from the coordinator.
So, if one DataSource inside a tableScan is at end and there are no more
splits, this particular TableScan is not blocked and it is at end. This will
have the Driver call PartitionedOutput::noMoreInput() on the tableScan's
PartitionedOutput. This will cause any buffered data for any destination to get
flushed and moved over to OutputBufferManager with a note that no more is
coming. OutputBufferManager knows how many Drivers there are in the TableScan
pipeline. After it has received this many “no more data coming” messages, it
can tell all destinations that this Task will generate no more data for them.
Now, when stage 2 tasks query stage 1 producers, they will know that they are
at end when all the producers have signalled that there is no more data coming.
The response to the get data for destination request has a flag identifying the
last batch. The ExchangeSource on the stage 2 Task sets the no-more-data flag.
This is then observed by all the Drivers, and each of the Exchange Operators sees
it and calls noMoreInput() on its LocalPartition. This queues up a “no
more data” signal in the local exchange queues. If the LocalExchange at the
start of the second pipeline of stage 2 sees a “no more data” from each of its
sources, then it is at end and noMoreInput is called on the HashAggregation.
This is how the end of data propagates. Up until now, HashAggregation has
produced no output, since the counts are not known until all input is received.
Now, HashAggregation starts producing batches of output, which contain the
l_partkey value and the number of times this has been seen. This reaches the
last PartitionedOutput, which in this case has only one destination, the final
worker that produces the result set. This will be at end when all the 100
sources have reported their own end of data.
We have finally walked through the distributed execution of a simple query. We
presented how data is partitioned between workers in the cluster, and then a
second time over inside each worker.
Velox and Presto are designed to aggressively parallelize execution, which
means creating distinct, non-overlapping sets of data to process on each
thread. The more threads, the more throughput. Also remember that for CPU
threads to be effective, they must process tasks that are large enough (often
over 100 microseconds worth of cpu time), and not communicate too much with
other threads or write to memory other threads are writing to. This is
accomplished with local exchanges.
Another important thing to remember from this article is how columnar encodings
(DictionaryVectors, in particular) can be used as a zero-copy way of
representing selection/reorder/duplication. We will see this pattern again with
filters, joins, and other relational operations.
Next we will be looking at joins, filters, and hash aggregations. Stay tuned!
In this article, we will discuss how a distributed compute engine executes a
query similar to the one presented in
our first article:
SELECT l_partkey, count(*) FROM lineitem GROUP BY l_partkey;
We use the TPC-H schema to illustrate the example, and
Prestissimo
as the compute engine orchestrating distributed query execution. Prestissimo is
responsible for the query engine frontend (parsing, resolving metadata,
planning, optimizing) and distributed execution (allocating resources and
shipping query fragments), and Velox is responsible for the execution of plan
fragments within a single worker node. Throughout this article, we will present
which functions are performed by Velox and which by the distributed engine -
Prestissimo, in this example.
Prestissimo first receives the query through a coordinator node, which is
responsible for parsing and planning the query. For our sample query, a
distributed query plan with three query fragments will be created:
The first fragment reads the l_partkey column from the lineitem table and
divides its output according to a hash of l_partkey.
The second fragment reads the output from the first fragment and updates a
hash table from l_partkey containing the number of times the particular value
of l_partkey has been seen (the count(*) aggregate function implementation).
The final fragment then reads the content of the hash tables, once the
second fragment has received all the rows from the first fragment.
The shuffle between the two first fragments partitions the data according to
l_partkey. Suppose there are 100 instances of the second fragment. If the hash
of l_partkey modulo 100 is 0, the row goes to the first task in the second
stage; if it is 1, the row goes to the second task, and so forth. In this way,
each second stage Task gets a distinct subset of the rows. The shuffle between
the second and third stage is a gather, meaning that there is one Task in the
third stage that will read the output of all 100 tasks in the second stage.
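In code, the routing decision described above is just a hash and a modulo. This is a sketch, with std::hash standing in for the engine's actual hash function and destinationTask being an illustrative name:

#include <cstdint>
#include <functional>

// A row with partition key lPartkey is routed to the stage-2 task whose
// index equals hash(l_partkey) modulo the number of stage-2 tasks.
uint32_t destinationTask(int64_t lPartkey, uint32_t numStage2Tasks = 100) {
  return static_cast<uint32_t>(std::hash<int64_t>{}(lPartkey) % numStage2Tasks);
}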
A Stage is the set of Tasks that share the same plan fragment. A Task is the
main integration point between Prestissimo and Velox; it’s the Velox execution
instance that physically processes all or part of the data that passes through
the stage.
To set up the distributed execution, Prestissimo first selects the workers from
the pool of Prestissimo server processes it manages. Assuming that stage 1 is
10 workers wide, it selects 10 server processes and sends the first stage plan
to them. It then selects 100 workers for stage 2 and sends the second stage
plan to these. The last stage that gathers the result has only one worker, so
Prestissimo sends the final plan to only one worker. The set of workers for
each stage may overlap, so a single worker process may host multiple stages of
one query.
Let's now look more closely at what each worker does at query setup time.
In Prestissimo, the message that sets up a Task in a worker is called Task
Update. A Task Update has the following information: the plan, configuration
settings, and an optional list of splits. Splits are further qualified by what
plan node they are intended for, and whether more splits for the recipient plan
node and split group will be coming.
Since split generation involves enumerating files from storage (so they may
take a while), Presto allows splits to be sent to workers asynchronously, such
that the generation of splits can run in parallel with the execution of the
first splits. Therefore, the first task update specifies the plan and the
config. Subsequent ones only add more splits.
Besides the plan, the coordinator provides configs as maps from string key to
string value, both top level and connector level. The connector configs have
settings for each connector; connectors are used by table scan and table writer
to deal with storage and file formats. These configs and other information,
like thread pools, top level memory pool etc. are handed to the Task in a
QueryCtx object. See velox/core/QueryCtx.h in the Velox repo for details.
Once the Velox Task is created, a TaskManager hands it splits to work on. This
is done with Task::addSplit(), and can be done after the Task has started
executing. See velox/exec/Task.h for details.
Let's zoom into what happens at Task creation: A PlanNode tree specifying what
the Task does is given to the Task as part of a PlanFragment. The most
important step done at Task creation is splitting the plan tree into pipelines.
Each pipeline then gets a DriverFactory, which is the factory class that makes
Drivers for the pipeline. The Drivers, in their turn, contain the Operators
that do the work of running the query. The DriverFactories are made in
LocalPlanner.cpp. See LocalPlanner::plan for details.
Following the execution model known as Volcano, the plan is represented by an
operator tree where each node consumes the output of its child operators, and
returns output to the parent operator. The root node is typically a
PartitionedOutputNode or a TableWriteNode. The leaf nodes are either
TableScanNode, ExchangeNode or ValuesNode (used for query literals). The full
set of Velox PlanNode can be found at velox/core/PlanNode.h.
The PlanNodes mostly correspond to Operators. PlanNodes are not executable as
such; they are only a structure describing how to make Drivers and Operators,
which do the actual execution. If the tree of nodes has a single branch, then
the plan is a single pipeline. If it has nodes with more than one child
(input), then the second input of the node becomes a separate pipeline.
Task::start() creates the DriverFactories, which then create the Drivers. To
start execution, the Drivers are queued on a thread pool executor. The main
function that runs Operators is Driver::runInternal(). See this function for
the details of how Operators and the Driver interface: Operator::isBlocked()
determines if the Driver can advance. If it cannot, it goes off thread until a
future is realized, which then puts it back on the executor.
getOutput() retrieves data from an Operator and addInput() feeds data into
another Operator. The order of execution is to advance the last Operator that
can produce output and then feed this to the next Operator. If an Operator
cannot produce output, then getOutput() is called on the Operator before it
until one is found that can produce data. If no operator is blocked and no
Operator can produce output, then the plan is at end. The noMoreInput() method
is called on each operator. This can unblock production of results, for
example, an OrderBy can only produce its output after it knows that it has all
the input.
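A highly simplified sketch of that advancement loop follows. It uses toy stand-ins for the Operator interface; the real logic lives in Driver::runInternal().

#include <memory>
#include <vector>

// Toy stand-ins for Velox types, for illustration only.
struct RowVector {};
using RowVectorPtr = std::shared_ptr<RowVector>;

struct Operator {
  virtual ~Operator() = default;
  virtual bool isBlocked() { return false; }
  virtual RowVectorPtr getOutput() { return nullptr; }
  virtual void addInput(RowVectorPtr /*input*/) {}
  virtual void noMoreInput() {}
};

// One advancement step: find the operator closest to the sink that can
// produce output and feed its output to the next operator.
void advanceOnce(std::vector<std::unique_ptr<Operator>>& ops) {
  for (int i = static_cast<int>(ops.size()) - 2; i >= 0; --i) {
    if (ops[i]->isBlocked()) {
      return; // The Driver goes off-thread until the blocking future is realized.
    }
    if (auto output = ops[i]->getOutput()) {
      ops[i + 1]->addInput(std::move(output));
      return;
    }
  }
  // No operator is blocked and none can produce output: the plan is at end.
  for (auto& op : ops) {
    op->noMoreInput();
  }
}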
The Minimal Pipeline: Table Scan and Repartitioning
Table Scan. With the table scan stage of our sample query, we have one pipeline
with two operators: TableScan and PartitionedOutput. Assume this pipeline has
five Drivers, and that all these five Drivers go on the thread pool executor.
The PartitionedOutput cannot do anything because it has no input. The
TableScan::getOutput() is then called. See velox/exec/TableScan.cpp for
details. The first action TableScan takes is to look for a Split with
task::getSplitOrFuture(). If there is no split available, this returns a
future. The Driver will then park itself off thread and install a callback on
the future that will reschedule the Driver when a split is available.
It could also be the case that there is no split and the Task has been notified
that no more splits will be coming. In this case TableScan would be at the end.
Finally, if a Split is available, TableScan interprets it. Given a TableHandle
specification provided as part of the plan (list of columns and filters), the
Connector (as specified in the Split) makes a DataSource. The DataSource
handles the details of IO and file and table formats.
The DataSource is then given the split. After this, DataSource::next() can be
called repeatedly to get vectors (batches) of output from the file/section of
file specified by the Split. If the DataSource is at the end, TableScan looks
for the next split. See Connector.h for the Connector and DataSource
interfaces.
Repartitioning. Now we have traced execution up to the TableScan returning its
first batch of output. The Driver feeds this to PartitionedOutput::addInput().
See PartitionedOutput.cpp for details. PartitionedOutput first calculates a
hash on the partitioning key, in this case, l_partkey, producing a destination
number for each row in the batch RowVectorPtr input_.
Each destination has a partly filled serialization buffer (VectorStreamGroup)
for each destination worker. If there are 100 Tasks in the second stage, each
PartitionedOutput has 100 destinations, each with one VectorStreamGroup. The
main function of a VectorStreamGroup is append(), which takes a RowVectorPtr
and a set of row numbers in it. It serializes each value identified by the row
numbers and adds it to the partly formed serialization. When enough rows are
accumulated in the VectorStreamGroup, it produces a SerializedPage. See flush()
in PartitionedOutput.cpp.
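Conceptually, this repartitioning step looks like the sketch below. It uses toy types; Velox's real classes are PartitionedOutput, VectorStreamGroup and SerializedPage, and the serialization and size accounting here are deliberately simplified.

#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Toy stand-ins, for illustration only.
struct RowBatch {
  std::vector<int64_t> partitionKey; // l_partkey for each row
};

struct SerializationBuffer {
  std::vector<uint32_t> rowNumbers; // rows appended but not yet flushed
  size_t approxBytes = 0;
};

// Sketch of what PartitionedOutput conceptually does on each input batch:
// hash the partitioning key to pick a destination, append the row to that
// destination's partly filled buffer, and flush buffers that grew too large.
void addInput(
    const RowBatch& input,
    std::vector<SerializationBuffer>& destinations, // one per stage-2 task
    size_t flushThresholdBytes) {
  for (uint32_t row = 0; row < input.partitionKey.size(); ++row) {
    auto dest =
        std::hash<int64_t>{}(input.partitionKey[row]) % destinations.size();
    destinations[dest].rowNumbers.push_back(row);
    destinations[dest].approxBytes += sizeof(int64_t); // toy size estimate
  }
  for (auto& buffer : destinations) {
    if (buffer.approxBytes >= flushThresholdBytes) {
      // In Velox, flush() serializes these rows into a SerializedPage and
      // hands it to the OutputBufferManager; here we just reset the buffer.
      buffer.rowNumbers.clear();
      buffer.approxBytes = 0;
    }
  }
}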
The SerializedPage is a self contained serialized packet of information that
can be transmitted over the wire to the next stage. Each such page only
contains rows intended for the same recipient. These pages are then queued up
in the worker process' OutputBufferManager. Note the code with BlockingReason
in flush(). The buffer manager maintains separate queues of all consumers of
all Tasks. If a queue is full, adding output may block. This returns a future
that is realized when there is space to add more data to the queue. This
depends on when the Task's consumer Task fetches the data.
Shuffles in Prestissimo are implemented by PartitionedOutput at the producer
and Exchange at the consumer end. The OutputBufferManager keeps ready
serialized data for consumers to pick up. The binding of these to the Presto
wire protocols is in TaskManager.cpp for the producer side, and in
PrestoExchangeSource.cpp for the consumer side.
We presented how a plan becomes executable and how data moves between and
inside Operators. We discussed that a Driver can block (go off thread) to wait
for a Split to become available, or to wait for its output to be consumed. We
have now scratched the surface of running a leaf stage of a distributed query.
There is much more to Operators and vectors, though. In the next installment of
Velox Primer, we will look at what the second stage of our minimal sample query
does.
This is the first part of a series of short articles that will take you through
Velox’s internal structures and concepts. In this first part, we will discuss
how distributed queries are executed, how data is shuffled among different
stages, and present Velox concepts such as Tasks, Splits, Pipelines, Drivers,
and Operators that enable such functionality.
Velox is a library that provides the functions that go into a query fragment in
a distributed compute engine. Distributed compute engines, like Presto and
Spark, run so-called exchange parallel plans. Exchange is also known as a data
shuffle, and allows data to flow from one stage to the next. Query fragments
are the things connected by shuffles, and comprise the processing that is
executed within a single worker node. A shuffle takes input from a set of
fragments and routes rows of input to particular consumers based on a
characteristic of the data, i.e., a partitioning key. The consumers of the
shuffle read from the shuffle and get rows of data from whatever producer, such
that the partitioning key of these rows matches the consumer.
Take the following query as an example:
SELECT key, count(*) FROM T GROUP BY key
Suppose it has n leaf fragments that scan different parts of T (the
green circles at stage 1). At the end of the leaf fragment, assume there is a
shuffle that shuffles the rows based on key. There are m consumer fragments
(the yellow circles at stage 2), where each gets a non-overlapping selection of
rows based on column key. Each consumer then constructs a hash table keyed
on key, where they store a count of how many times the specific value of
key has been seen.
Now, if there are 100B different values of key, the hash table would not
conveniently fit on a single machine. For efficiency, there is a point in
dividing this into 100 hash tables of 1B entries each. This is the point of
exchange parallel scale-out. Think of shuffle as a way for consumers to each
consume their own distinct slice of a large stream produced by multiple
workers.
Distributed query engines like Presto and Spark both fit the above description.
The difference is in, among other things, how they do the shuffle, but we will
get back to that later.
Within a worker node, the Velox representation of a query fragment is called a
Task (velox::exec::Task). A task is informed by mainly two things:
Plan - velox::core::PlanNode: specifies what the Task does.
Splits - velox::exec::Split: specifies the data the Task operates on.
The Splits correspond to the plan that the Task is executing. For the first
stage Tasks (table scanning), the Splits specify pieces of files to scan. For
the second stage Tasks (group by), their Splits identify the table scan Tasks
from which the group by reads its input. There are file splits
(velox::connector::ConnectorSplit) and remote splits
(velox::exec::RemoteConnectorSplit). The first identifies data to read, the
second identifies a running Task.
The distributed engine makes PlanNodes and Splits. Velox takes these and makes
Tasks. Tasks send back statistics, errors and other status information to the
distributed engine.
Inside a Task, there are Pipelines. Each pipeline is a linear sequence of
operators (velox::exec::Operator), and operators are the objects that implement
relational logic. In the case of the group by example, the first task has one
pipeline, with a TableScan (velox::exec::TableScan) and a PartitionedOutput
(velox::exec::PartitionedOutput). The second Task also has one pipeline, with
an Exchange, a LocalExchange, HashAggregation, and a
PartitionedOutput.
Each pipeline has one or more Drivers. A Driver is a container for one linear
sequence of Operators, and typically runs on its own thread. The pipeline is
the collection of Drivers with the same sequence of Operators. The individual
Operator instances belong to each Driver. The Drivers belong to the Task. These
are interlinked with smart pointers such that they are kept alive for as long as
needed.
An example of a Task with two pipelines is a hash join, with separate pipelines
for the build and for the probe side. This makes sense because the build must
be complete before the probe can proceed. We will talk more about this later.
The Operators on each Driver communicate with each other through the Driver.
The Driver picks output from one Operator, keeps tabs on stats and passes it to
the next Operator. The data passed between Operators consists of vectors. In
particular, an Operator produces/consumes a RowVector, which is a vector with a
child vector for every column of the relation - it is the equivalent of
RecordBatch in Arrow. All vectors are subclasses of velox::BaseVector.
The first Operator in a Driver is called a source. The source operator takes
Splits to figure out the file or remote Task that provides its data, and produces a
sequence of RowVectors. The last operator in the pipeline is called a sink. The
sink operator does not produce an output RowVector, but rather puts the data
somewhere for a consumer to retrieve. This is typically a PartitionedOutput.
The consumer of the PartitionedOutput is an Exchange in a different task, where
the Exchange is in the source position. Operators that are neither sources nor
sinks are things like FilterProject, HashProbe, HashAggregation and so on, more
on these later.
Operators also contain a state. They can be blocked, accepting input, have
output to produce or not, or may be notified that no more input is coming.
Operators do not call the API functions of other Operators directly, but
instead the Driver decides which Operator to advance next. This has the benefit
that no part of the Driver's state is captured in nested function calls. The
Driver has a flat stack and therefore can go on and off thread at any time,
without having to unwind and restore nested function frames.
We have seen how a distributed query consists of fragments joined by shuffles.
Each fragment has a Task, which has pipelines, Drivers and Operators. PlanNodes
represent what the task is supposed to do. These tell the Task how to set up
Drivers and Operators. Splits tell the Task where to find input, e.g. from
files or from the other Tasks. A Driver corresponds to a thread of execution.
It may be running or blocked, e.g. waiting for data to become available or for
its consumer to consume already produced data. Operators pass data between
themselves as vectors.
In the next article we will discuss the different stages in the life of a query.