[ESQL] Columnar I/O and vectorized block conversion for external sources #143703

costin merged 10 commits into elastic:main
Conversation
Pinging @elastic/es-analytical-engine (Team:Analytics)

Hi @costin, I've created a changelog YAML for you.
Extend the datasource SPI with a positional readBytes(long, ByteBuffer) method that enables zero-copy I/O for columnar formats like Parquet and ORC, eliminating temporary byte array allocations and data duplication when reading into ByteBuffers.
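A minimal sketch of what such a positional-read SPI can look like (the interface and class names here are illustrative, not the actual `StorageObject` SPI): a stream-based default for providers without positional I/O, and a `FileChannel` override for local files that reads straight into the caller's `ByteBuffer` without a temporary array.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch of a positional-read SPI; names are hypothetical.
interface PositionalSource {
    InputStream newStream(long position) throws IOException;

    // Default: fall back to stream-based reads, copying into the buffer.
    default int readBytes(long position, ByteBuffer dst) throws IOException {
        try (InputStream in = newStream(position)) {
            byte[] tmp = new byte[dst.remaining()];
            int n = in.readNBytes(tmp, 0, tmp.length);
            dst.put(tmp, 0, n);
            return n;
        }
    }
}

// Local-file provider: positional read directly into the ByteBuffer.
final class LocalFileSource implements PositionalSource {
    private final Path path;

    LocalFileSource(Path path) { this.path = path; }

    @Override
    public InputStream newStream(long position) throws IOException {
        FileChannel ch = FileChannel.open(path, StandardOpenOption.READ);
        ch.position(position);
        return Channels.newInputStream(ch);
    }

    @Override
    public int readBytes(long position, ByteBuffer dst) throws IOException {
        try (FileChannel ch = FileChannel.open(path, StandardOpenOption.READ)) {
            return ch.read(dst, position); // no temp array, no extra copy
        }
    }
}
```

The override is what makes columnar footer/page reads cheap: a Parquet or ORC reader hands over its own buffer and position, and the provider fills it in one call.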
Replace element-by-element Block.Builder conversion in ORC with bulk array copy + BlockFactory.newXxxArrayVector wrapping. Add shared ColumnBlockConversions utility for reuse by future Parquet ColumnReader. Fix Parquet getString/getBytes UTF-8 round-trip with getBinary.
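The bulk-conversion idea can be sketched as follows (a simplified standalone version; the real `ColumnBlockConversions` wraps the arrays in `BlockFactory` vectors, which is omitted here): the no-null path is a single `Arrays.copyOf`, and the nullable path narrows values while recording null positions in a `BitSet`.

```java
import java.util.Arrays;
import java.util.BitSet;

// Illustrative sketch of bulk column conversion; not the actual
// ColumnBlockConversions class.
final class ColumnConversionSketch {

    // No-null fast path: one bulk copy, then (in the real code) a vector wrap.
    static long[] longColumn(long[] decoded, int rowCount) {
        return Arrays.copyOf(decoded, rowCount);
    }

    // Nullable path: narrow long values to int and build a BitSet null mask
    // from the boolean validity mask (true = value present).
    static int[] intColumnFromLongs(long[] decoded, boolean[] isPresent, int rowCount, BitSet nulls) {
        int[] values = new int[rowCount];
        for (int i = 0; i < rowCount; i++) {
            if (isPresent[i]) {
                values[i] = (int) decoded[i];
            } else {
                nulls.set(i); // record null position for the block
            }
        }
        return values;
    }
}
```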
Replace row-at-a-time Group materialization with ColumnReadStoreImpl and ColumnReader to decode each column independently into typed arrays. This eliminates Group object allocation, per-row type dispatch, and String round-trips for binary columns.
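The column-at-a-time shape can be illustrated with a toy flat-schema decoder (this simulates the definition-level idea; it does not use the parquet-column API, where a `ColumnReader` obtained from a `ColumnReadStore` plays this role): non-null values are stored densely, and a definition level of 0 marks a null at that row.

```java
import java.util.BitSet;

// Toy column decoder for a flat schema (maxDefinitionLevel = 1).
// Illustrative only; the real path drives parquet-column's ColumnReader.
final class ColumnDecodeSketch {

    // defLevels has one entry per row: 0 = null, 1 = value present.
    // nonNullValues holds the densely packed non-null values in row order.
    static int[] decodeColumn(int[] defLevels, int[] nonNullValues, BitSet nulls) {
        int[] out = new int[defLevels.length];
        int v = 0; // cursor into the dense value array
        for (int row = 0; row < defLevels.length; row++) {
            if (defLevels[row] == 0) {
                nulls.set(row); // null row: no value consumed
            } else {
                out[row] = nonNullValues[v++];
            }
        }
        return out;
    }
}
```

Each column is drained independently in one tight loop, so there is no per-row dispatch across columns and no intermediate row object.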
GcsStorageObject.readBytes opened a new ReadChannel per call, overwhelming the mock GCS server during Parquet footer reads. Simplify ParquetStorageObjectAdapter to use a single stream-based I/O path for all reads, including ByteBuffer operations.
Improve the ParallelParsingCoordinator's concurrency safety and performance characteristics, informed by analysis of ClickHouse's ParallelParsingInputFormat and DuckDB's CSV Parser 2.0 architectures. Key changes:

- Handle RejectedExecutionException per-segment to prevent consumer deadlock when the executor is saturated or shut down
- Make the `closed` field volatile for cross-thread visibility
- Add a 60s timeout on `allDone.await()` to prevent indefinite blocking
- Increase the minimum segment size from 64 KB to 1 MiB (per ClickHouse benchmarks showing 100 KB chunks are ~40% slower)
- Implement SegmentableFormatReader on CsvFormatReader with RFC 4180 quote-aware boundary detection (handles escaped quotes `""`)
- Implement SegmentableFormatReader on NdJsonFormatReader with CRLF support
- Reuse CsvMapper across iterators (thread-safe after configuration)
- Increase the BufferedReader buffer from 8 KB to 64 KB

JMH benchmark results (200K lines, in-memory, ParallelParsingBenchmark):

```
Before (baseline):
  parallelism=1: 10.991 ± 0.111 ms/op
  parallelism=2: 12.063 ± 1.162 ms/op
  parallelism=4: 10.913 ± 1.124 ms/op
After (this change):
  parallelism=1: 11.308 ± 0.603 ms/op
  parallelism=2: 11.545 ± 2.389 ms/op
  parallelism=4:  9.493 ± 1.056 ms/op
```

The 4-thread case shows ~13% improvement (10.913 → 9.493 ms/op), while the 2-thread overhead over single-thread drops from 9.7% to 2.1%.
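The quote-aware boundary rule can be sketched as follows (a simplified standalone version, not the actual CsvFormatReader code): a newline only terminates a record when the scanner is outside a quoted field, and a doubled `""` inside quotes is an escape, not a closing quote.

```java
// Simplified RFC 4180 boundary scan: returns the index just past the first
// newline that ends a record, or -1 if the range ends mid-record.
// Illustrative sketch, not the actual CsvFormatReader implementation.
final class CsvBoundarySketch {
    static int findRecordBoundary(byte[] buf, int from, int to) {
        boolean inQuotes = false;
        for (int i = from; i < to; i++) {
            byte b = buf[i];
            if (b == '"') {
                if (inQuotes && i + 1 < to && buf[i + 1] == '"') {
                    i++; // escaped quote "" inside a quoted field: skip both
                } else {
                    inQuotes = !inQuotes; // quoted field opens or closes
                }
            } else if (b == '\n' && inQuotes == false) {
                return i + 1; // record boundary: first unquoted newline
            }
        }
        return -1; // no complete record in this range
    }
}
```

This is what lets a segment start be snapped to a safe record boundary: a worker probing from an arbitrary byte offset must not treat a newline embedded in a quoted field as a record break.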
Replace byte-by-byte `stream.read()` in `findNextRecordBoundary` with 8 KB buffered reads to reduce virtual dispatch overhead during boundary probing. Update the benchmark reader to use a 64 KB BufferedReader buffer and reuse a BytesRef scratch buffer instead of allocating per line.

JMH (200K lines, ParallelParsingBenchmark):

```
parallelism=1: 11.308 -> 8.381 ms/op (-26%)
parallelism=2: 11.545 -> 8.628 ms/op (-25%)
parallelism=4:  9.493 -> 6.978 ms/op (-26%)
```
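The buffered probe can be sketched like this (an illustrative standalone version; the real code also has to account for bytes read past the boundary, which this sketch ignores): instead of one virtual `read()` call per byte, pull 8 KB chunks and scan them with a plain array loop.

```java
import java.io.IOException;
import java.io.InputStream;

// Illustrative sketch: probe forward for the next '\n' using chunked reads
// rather than per-byte stream.read() calls.
final class BoundaryProbeSketch {
    private static final int PROBE_BUFFER = 8 * 1024; // 8 KB probe buffer

    // Returns the number of bytes up to and including the newline,
    // or -1 if EOF is reached without finding one.
    static long skipToNextLine(InputStream in) throws IOException {
        byte[] buf = new byte[PROBE_BUFFER];
        long consumed = 0;
        int n;
        while ((n = in.read(buf)) != -1) {
            for (int i = 0; i < n; i++) {
                if (buf[i] == '\n') {
                    return consumed + i + 1;
                }
            }
            consumed += n;
        }
        return -1; // EOF without a newline
    }
}
```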
Replace the InputStream -> BufferedReader -> readLine() -> String -> BytesRef chain with direct byte[] buffer scanning. The segment is read into a single byte[] via readAllBytes(), newlines are found by scanning the buffer, and BytesRef views point directly into the buffer bytes for zero-copy operation until the block builder makes its one required copy. This eliminates two byte[] allocations and two copies per line (the String from readLine plus getBytes(UTF_8) for the BytesRef) and removes the BufferedReader and InputStreamReader wrappers entirely.

JMH (200K lines, ParallelParsingBenchmark):

```
parallelism=1: 8.381 -> 4.020 ms/op (-52%)
parallelism=2: 8.628 -> 4.272 ms/op (-50%)
parallelism=4: 6.978 -> 3.602 ms/op (-48%)

Cumulative improvement from baseline (6.1):
parallelism=1: 11.308 -> 4.020 ms/op (-64%)
parallelism=4: 10.913 -> 3.602 ms/op (-67%)
```
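The zero-copy scan can be sketched as follows (a simplified version; the real code emits Lucene `BytesRef` views over the shared buffer, modeled here as an offset/length record): read the segment once, then emit views into that single byte[] instead of allocating a String plus a byte[] per line.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of zero-copy line scanning over a segment buffer.
final class LineScanSketch {
    // Stand-in for a BytesRef: a view into a shared byte[], no copy.
    record Slice(byte[] bytes, int offset, int length) {
        String utf8() { return new String(bytes, offset, length, StandardCharsets.UTF_8); }
    }

    static List<Slice> scanLines(byte[] segment) {
        List<Slice> lines = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < segment.length; i++) {
            if (segment[i] == '\n') {
                // Strip a preceding '\r' so CRLF and LF both yield clean lines.
                int end = (i > start && segment[i - 1] == '\r') ? i - 1 : i;
                lines.add(new Slice(segment, start, end - start)); // view, no copy
                start = i + 1;
            }
        }
        if (start < segment.length) {
            lines.add(new Slice(segment, start, segment.length - start)); // trailing line
        }
        return lines;
    }
}
```

The only copy left is the one the block builder must make when it appends the bytes into its own storage.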
Use Locale.ROOT to satisfy the forbidden-apis check that disallows locale-sensitive String.format overloads.
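For reference, the locale-pinned form looks like this (a generic example, not the exact benchmark line):

```java
import java.util.Locale;

final class LocaleFormatExample {
    // The locale-sensitive String.format(fmt, args) overload is forbidden;
    // pinning Locale.ROOT keeps digit formatting independent of the JVM's
    // default locale.
    static String pad(int i) {
        return String.format(Locale.ROOT, "%08d", i);
    }
}
```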
Force-pushed from 634c8f5 to 6a5ba0b
bpintea left a comment
🤖 Reviewed using AI tools.
```java
public void setup() {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < lineCount; i++) {
        sb.append("line-").append(String.format("%08d", i)).append(",value-").append(i % 1000).append("\n");
```
Surprised `String.format(formatString)` didn't trigger the CI.
Merge upstream/main:

```
* upstream/main: (153 commits)
  ES|QL: Update docs for TOP_SNIPPETS and DECAY (elastic#143739)
  Correctly include endpoint id in log msg in AuthorizationPoller (elastic#143743)
  Bar searching or sorting on _seq_no when disabled (elastic#143600)
  Generalize `testClientCancellation` test (elastic#143586)
  JSON_EXTRACT: zero-copy byte slicing for object, array, and number extraction (elastic#143702)
  Track recycler pages in circuit breaker (elastic#143738)
  [ESQL] Enable distributed pipeline breakers for external sources via FragmentExec (elastic#143696)
  Adding 'mode' and 'codec' fields to ES monitoring template (elastic#143673)
  [ESQL] Columnar I/O and vectorized block conversion for external sources (elastic#143703)
  Fix flaky MMR diversification YAML tests (elastic#143706)
  ES|QL codegen: check builder arguments for vector support (elastic#143724)
  Add Views Security Model (elastic#141050)
  ESQL: Prevent pushdown of unmapped fields in filters and sorts (elastic#143460)
  Don't run seq_no pruning tests in release CI (elastic#143725)
  ESQL: Support intra-row field references in ROW command (elastic#140217)
  ES|QL: Remove implicit limit in FORK branches in CSV tests (elastic#143601)
  IndexRoutingTests with and without synthetic id (elastic#143566)
  Synthetic id upgrade test in serverless (elastic#142471)
  Disable "Review skipped" comments for PRs without specified labels (elastic#143728)
  Cleanup ES|QL T-Digest code duplication, add memory accounting (elastic#143662)
  ...
```
[ESQL] Columnar I/O and vectorized block conversion for external sources (elastic#143703)

Reduces overhead on the hot path for external datasource I/O across columnar formats (ORC, Parquet) and text formats (CSV, NDJSON).

**I/O SPI**: Adds positional `readBytes(long, ByteBuffer)` to StorageObject so providers can use native buffer I/O instead of stream-based reads. Local files use FileChannel, GCS uses ReadChannel, others get a stream-based default.

**ORC**: Replaces per-element Block.Builder loops with bulk array operations via `ColumnBlockConversions`. The no-null path does `Arrays.copyOf` + vector wrap; nulls use a BitSet from the boolean mask.

**Parquet**: Replaces `GroupRecordConverter` row-at-a-time with `ColumnReadStoreImpl` column-at-a-time decoding. Eliminates Group allocation, per-row dispatch, and the `getString`/`getBytes` round-trip for binary columns.

**Parallel text parsing**: Introduces `SegmentableFormatReader` SPI and `ParallelParsingCoordinator` for intra-file parallel parsing of line-oriented formats. A file is split into byte-range segments at record boundaries, each segment is parsed on a separate thread, and results are reassembled in order. CSV boundary detection is quote-aware (RFC 4180, handles escaped `""`). The coordinator handles executor rejection, uses volatile signaling for cross-thread close, and applies a bounded timeout on shutdown.

The parallel parsing path was iteratively optimized: first for correctness and safety, then by replacing byte-by-byte boundary scanning with 8 KB buffered reads, and finally by eliminating the `InputStream → Reader → String → BytesRef` chain in favor of direct `byte[]` scanning with zero-copy `BytesRef` views.

### JMH results (`ParallelParsingBenchmark`, 200K lines)

| Threads | Baseline | After | Improvement |
|---|---|---|---|
| 1 | 10.99 ms | 4.02 ms | **-63%** |
| 2 | 12.06 ms | 4.27 ms | **-65%** |
| 4 | 10.91 ms | 3.60 ms | **-67%** |

Developed using AI-assisted tooling
Fix String.format to use Locale.ROOT in ParallelParsingBenchmark (bpintea review comment). Add unit tests for findNextRecordBoundary in CsvFormatReader and NdJsonFormatReader covering quote-aware boundary detection, CRLF handling, buffer edge cases, and EOF. Extend ColumnBlockConversionsTests with repeating null, array copy, and larger-array edge cases for int/boolean/doubleFromLongs. Add RangeStorageObjectTests for readBytes with non-zero position, exact-size buffers, and partial reads.
Address review feedback from bpintea on PR #143703 and close test coverage gaps identified in the columnar I/O and parallel parsing changes. Fix `String.format` to use `Locale.ROOT` in `ParallelParsingBenchmark` (bpintea review comment). Add dedicated unit tests for the `findNextRecordBoundary` implementations in `CsvFormatReader` (RFC 4180 quote-aware boundary detection, escaped quotes, buffer edge cases) and `NdJsonFormatReader` (newline, CRLF, CR-only, buffer split). Extend `ColumnBlockConversionsTests` with parity edge cases for `intColumnFromLongs`, `booleanColumnFromLongs`, and `doubleColumnFromLongs` (repeating null, array copy independence, larger array than row count). Add `RangeStorageObjectTests` scenarios for `readBytes` with non-zero position, exact-size buffers, and partial reads from the middle of a range. Developed using AI-assisted tooling.