
[ESQL] Columnar I/O and vectorized block conversion for external sources#143703

Merged
costin merged 10 commits into elastic:main from costin:esql/group-b-channel-api
Mar 6, 2026

@costin
Member

@costin costin commented Mar 5, 2026

Reduces overhead on the hot path for external datasource I/O across columnar formats
(ORC, Parquet) and text formats (CSV, NDJSON).

**I/O SPI**: Adds positional `readBytes(long, ByteBuffer)` to `StorageObject` so providers
can use native buffer I/O instead of stream-based reads. Local files use `FileChannel`,
GCS uses `ReadChannel`, others get a stream-based default.
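
A minimal sketch of the positional-read idea, not the actual SPI: `PositionalSource`, `openStream`, `LocalFileSource`, and `InMemorySource` are hypothetical names introduced for illustration, and only the `readBytes(long, ByteBuffer)` signature comes from the description above. The local-file path uses `FileChannel.read(ByteBuffer, long)`; every other provider inherits a stream-based default.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical stand-in for StorageObject; only readBytes(long, ByteBuffer) is named in the PR. */
interface PositionalSource {
    /** Assumed helper: opens a stream positioned at the given byte offset. */
    InputStream openStream(long position) throws IOException;

    /** Stream-based default: copies through a temporary byte[] until the buffer is full or EOF. */
    default int readBytes(long position, ByteBuffer target) throws IOException {
        try (InputStream in = openStream(position)) {
            byte[] chunk = new byte[Math.min(Math.max(target.remaining(), 1), 8192)];
            int total = 0;
            while (target.hasRemaining()) {
                int n = in.read(chunk, 0, Math.min(chunk.length, target.remaining()));
                if (n < 0) {
                    break; // EOF before the buffer was filled
                }
                target.put(chunk, 0, n);
                total += n;
            }
            return total;
        }
    }
}

/** Local files: FileChannel reads straight into the caller's buffer at an absolute offset. */
final class LocalFileSource implements PositionalSource {
    private final Path path;

    LocalFileSource(Path path) {
        this.path = path;
    }

    @Override
    public InputStream openStream(long position) throws IOException {
        FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
        return Channels.newInputStream(channel.position(position));
    }

    @Override
    public int readBytes(long position, ByteBuffer target) throws IOException {
        // Opening a channel per call keeps the sketch short; a real provider would likely reuse it.
        try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ)) {
            int total = 0;
            while (target.hasRemaining()) {
                int n = channel.read(target, position + total); // positional read, no seek
                if (n < 0) {
                    break;
                }
                total += n;
            }
            return total;
        }
    }
}

/** Any other provider implements only openStream and inherits the default readBytes. */
final class InMemorySource implements PositionalSource {
    private final byte[] data;

    InMemorySource(byte[] data) {
        this.data = data;
    }

    @Override
    public InputStream openStream(long position) {
        int from = (int) Math.min(position, data.length);
        return new ByteArrayInputStream(data, from, data.length - from);
    }
}

final class PositionalReadDemo {
    /** Reads 4 bytes at offset 3 through both paths and joins the results. */
    static String demo() {
        byte[] data = "0123456789".getBytes(StandardCharsets.US_ASCII);
        try {
            Path file = Files.createTempFile("positional", ".bin");
            try {
                Files.write(file, data);
                return read(new LocalFileSource(file)) + "|" + read(new InMemorySource(data));
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static String read(PositionalSource source) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        int n = source.readBytes(3, buffer);
        return new String(buffer.array(), 0, n, StandardCharsets.US_ASCII);
    }
}
```

Both paths return the same bytes; the difference is that the channel path writes directly into the caller's `ByteBuffer`, while the default goes through an intermediate `byte[]`.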

**ORC**: Replaces per-element `Block.Builder` loops with bulk array operations via
`ColumnBlockConversions`. The no-null path does `Arrays.copyOf` + vector wrap; nulls
use a `BitSet` from the boolean mask.
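
The two paths can be sketched with plain arrays. This is an illustration under assumptions, not the `ColumnBlockConversions` code: `BulkColumnSketch` and `LongColumn` are hypothetical names, and the `long[]` values plus `boolean[]` null mask only mimic the shape of a columnar batch.

```java
import java.util.Arrays;
import java.util.BitSet;

final class BulkColumnSketch {
    /** Hypothetical holder standing in for a block/vector; not the ES|QL Block API. */
    static final class LongColumn {
        final long[] values;
        final BitSet nulls; // null means "no nulls", i.e. the plain vector case

        LongColumn(long[] values, BitSet nulls) {
            this.values = values;
            this.nulls = nulls;
        }

        boolean isNull(int row) {
            return nulls != null && nulls.get(row);
        }
    }

    /** Converts the first rowCount entries of a column batch in bulk. */
    static LongColumn longColumn(long[] source, boolean[] isNull, boolean noNulls, int rowCount) {
        // One bulk copy: trims to rowCount and decouples the result from the reusable batch array.
        long[] values = Arrays.copyOf(source, rowCount);
        if (noNulls) {
            return new LongColumn(values, null);
        }
        // Null path: translate the boolean mask into a BitSet, one bit per null row.
        BitSet nulls = new BitSet(rowCount);
        for (int row = 0; row < rowCount; row++) {
            if (isNull[row]) {
                nulls.set(row);
            }
        }
        return new LongColumn(values, nulls);
    }

    /** Narrowing variant for int columns stored as longs; throws if a value does not fit. */
    static int[] intsFromLongs(long[] source, int rowCount) {
        int[] out = new int[rowCount];
        for (int row = 0; row < rowCount; row++) {
            out[row] = Math.toIntExact(source[row]);
        }
        return out;
    }
}
```

The null path still loops over the mask, but the per-row work is a single bit set rather than a builder call per element.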

**Parquet**: Replaces row-at-a-time `GroupRecordConverter` materialization with
column-at-a-time decoding via `ColumnReadStoreImpl`. Eliminates `Group` allocation,
per-row dispatch, and the `getString`/`getBytes` round-trip for binary columns.

**Parallel text parsing**: Introduces `SegmentableFormatReader` SPI and
`ParallelParsingCoordinator` for intra-file parallel parsing of line-oriented formats.
A file is split into byte-range segments at record boundaries, each segment is parsed on
a separate thread, and results are reassembled in order. CSV boundary detection is
quote-aware (RFC 4180, handles escaped `""`). The coordinator handles executor rejection,
uses volatile signaling for cross-thread close, and applies a bounded timeout on shutdown.
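
As a rough illustration of quote-aware splitting (a sketch, not the `CsvFormatReader` implementation): the hypothetical `CsvSegmenter.split` below scans from the start of the data, toggles an in-quotes flag on every `"`, and cuts only at newlines seen outside quotes. Because an escaped `""` toggles the flag twice, it leaves the state unchanged. The sketch assumes the scan starts at a record start and that the whole input is in memory.

```java
import java.util.ArrayList;
import java.util.List;

final class CsvSegmenter {
    /**
     * Splits data into [start, end) byte ranges that each end on a record boundary.
     * A range is closed at the first unquoted newline once it holds at least minSegmentBytes.
     */
    static List<int[]> split(byte[] data, int minSegmentBytes) {
        List<int[]> segments = new ArrayList<>();
        int start = 0;
        boolean inQuotes = false;
        for (int i = 0; i < data.length; i++) {
            byte b = data[i];
            if (b == '"') {
                // Opening quote, closing quote, or one half of an escaped "" pair.
                inQuotes = !inQuotes;
            } else if (b == '\n' && !inQuotes && i + 1 - start >= minSegmentBytes) {
                segments.add(new int[] { start, i + 1 });
                start = i + 1;
            }
        }
        if (start < data.length) {
            segments.add(new int[] { start, data.length }); // trailing data without a final cut
        }
        return segments;
    }
}
```

A newline inside a quoted field never becomes a cut point, so each range can be handed to an independent parser and the parsed results concatenated in range order.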

The parallel parsing path was iteratively optimized: first for correctness and safety,
then by replacing byte-by-byte boundary scanning with 8 KB buffered reads, and finally
by eliminating the `InputStream → Reader → String → BytesRef` chain in favor of direct
`byte[]` scanning with zero-copy `BytesRef` views.

### JMH results (`ParallelParsingBenchmark`, 200K lines)

| Threads | Baseline | After | Improvement |
|---|---|---|---|
| 1 | 10.99 ms | 4.02 ms | **-63%** |
| 2 | 12.06 ms | 4.27 ms | **-65%** |
| 4 | 10.91 ms | 3.60 ms | **-67%** |

Developed using AI-assisted tooling

@costin costin added >enhancement Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) :Analytics/ES|QL AKA ESQL v9.4.0 labels Mar 5, 2026
@costin costin requested a review from bpintea March 5, 2026 18:15
@elasticsearchmachine
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine
Collaborator

Hi @costin, I've created a changelog YAML for you.

@costin costin changed the title from "[ESQL] Add positional readBytes API to StorageObject SPI" to "[ESQL] Columnar I/O and vectorized block conversion for external sources" Mar 5, 2026
@costin costin added the ES|QL|DS ES|QL datasources label Mar 5, 2026
@elastic elastic deleted a comment from coderabbitai bot Mar 6, 2026
@coderabbitai
Contributor

coderabbitai bot commented Mar 6, 2026

Important

Review skipped

Auto reviews are limited based on label configuration.

🏷️ Required labels (at least one) (2)
  • Team:Delivery
  • Team:Search - Inference


costin added 9 commits March 6, 2026 11:53
Extend the datasource SPI with a positional readBytes(long, ByteBuffer)
method that enables zero-copy I/O for columnar formats like Parquet and
ORC, eliminating temporary byte array allocations and data duplication
when reading into ByteBuffers.

Replace element-by-element Block.Builder conversion in ORC with bulk
array copy + BlockFactory.newXxxArrayVector wrapping. Add shared
ColumnBlockConversions utility for reuse by future Parquet ColumnReader.
Fix Parquet getString/getBytes UTF-8 round-trip with getBinary.

…oding

Replace row-at-a-time Group materialization with ColumnReadStoreImpl
and ColumnReader to decode each column independently into typed arrays.
Eliminates Group object allocation, per-row type dispatch, and String
round-trips for binary columns.

The GcsStorageObject.readBytes opened a new ReadChannel per call,
overwhelming the mock GCS server during Parquet footer reads.
Simplify ParquetStorageObjectAdapter to use a single stream-based
I/O path for all reads including ByteBuffer operations.

…ders

Improve the ParallelParsingCoordinator's concurrency safety and
performance characteristics, informed by analysis of ClickHouse's
ParallelParsingInputFormat and DuckDB's CSV Parser 2.0 architectures.

Key changes:
- Handle RejectedExecutionException per-segment to prevent consumer
  deadlock when the executor is saturated or shut down
- Make closed field volatile for cross-thread visibility
- Add 60s timeout on allDone.await() to prevent indefinite blocking
- Increase minimum segment size from 64 KB to 1 MiB (per ClickHouse
  benchmarks showing 100 KB chunks are ~40% slower)
- Implement SegmentableFormatReader on CsvFormatReader with RFC 4180
  quote-aware boundary detection (handles escaped quotes "")
- Implement SegmentableFormatReader on NdJsonFormatReader with CRLF
  support
- Reuse CsvMapper across iterators (thread-safe after configuration)
- Increase BufferedReader buffer from 8 KB to 64 KB
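
The first three safety items above can be sketched together. This is a hedged illustration, not the `ParallelParsingCoordinator` code: `OrderedSegmentCoordinator` and `parseAll` are hypothetical names, and running a rejected segment inline on the caller thread is an assumption made here so that the latch always reaches zero. The PR only says that rejection is handled per segment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Function;

final class OrderedSegmentCoordinator {
    // volatile so a close() from another thread is seen by workers before they start parsing
    private volatile boolean closed;

    void close() {
        closed = true;
    }

    /** Parses every segment on the executor and returns the results in segment order. */
    <S, R> List<R> parseAll(List<S> segments, Function<S, R> parser, Executor executor, long timeout, TimeUnit unit)
        throws InterruptedException, TimeoutException {
        int count = segments.size();
        AtomicReferenceArray<R> results = new AtomicReferenceArray<>(count);
        AtomicReference<RuntimeException> failure = new AtomicReference<>();
        CountDownLatch allDone = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            final int slot = i;
            Runnable task = () -> {
                try {
                    if (!closed) {
                        results.set(slot, parser.apply(segments.get(slot)));
                    }
                } catch (RuntimeException e) {
                    failure.compareAndSet(null, e); // keep the first failure only
                } finally {
                    allDone.countDown(); // always count down, or the consumer would wait forever
                }
            };
            try {
                executor.execute(task);
            } catch (RejectedExecutionException e) {
                task.run(); // assumption: fall back to the caller thread for this segment
            }
        }
        // Bounded wait instead of an indefinite await().
        if (!allDone.await(timeout, unit)) {
            closed = true;
            throw new TimeoutException("segments still running after " + timeout + " " + unit);
        }
        if (failure.get() != null) {
            throw failure.get();
        }
        List<R> ordered = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            ordered.add(results.get(i));
        }
        return ordered;
    }

    /** Usage example: with rejectAll the pool is shut down first, so every submit is rejected. */
    static List<Integer> demo(boolean rejectAll) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        if (rejectAll) {
            pool.shutdown();
        }
        try {
            return new OrderedSegmentCoordinator().parseAll(
                Arrays.asList("a", "bb", "ccc"), String::length, pool, 60, TimeUnit.SECONDS);
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Results are written into a slot per segment, so the output order does not depend on which worker finishes first.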

JMH benchmark results (200K lines, in-memory, ParallelParsingBenchmark):

Before (baseline):
  parallelism=1: 10.991 ± 0.111 ms/op
  parallelism=2: 12.063 ± 1.162 ms/op
  parallelism=4: 10.913 ± 1.124 ms/op

After (this change):
  parallelism=1: 11.308 ± 0.603 ms/op
  parallelism=2: 11.545 ± 2.389 ms/op
  parallelism=4:  9.493 ± 1.056 ms/op

The 4-thread case shows ~13% improvement (10.913 → 9.493 ms/op) while
the 2-thread overhead is reduced from 9.7% to 2.1% over single-thread.

Replace byte-by-byte stream.read() in findNextRecordBoundary with 8KB
buffered reads to reduce virtual dispatch overhead during boundary
probing. Update benchmark reader to use 64KB BufferedReader buffer and
reuse BytesRef scratch buffer instead of allocating per line.

JMH (200K lines, ParallelParsingBenchmark):
  parallelism=1: 11.308 -> 8.381 ms/op (-26%)
  parallelism=2: 11.545 -> 8.628 ms/op (-25%)
  parallelism=4:  9.493 -> 6.978 ms/op (-26%)
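
A small sketch of chunked boundary probing for a newline-delimited format, assuming LF or CRLF terminators only (CR-only input is not handled here). `BufferedBoundaryScan` and the `boundaryOf` helper are hypothetical; only the method name `findNextRecordBoundary` and the 8 KB chunk size come from the commit message, and the real signature may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class BufferedBoundaryScan {
    private static final int CHUNK = 8 * 1024;

    /**
     * Returns the number of bytes from the stream's current position up to and including
     * the first '\n', or -1 if the stream ends first. One read() call covers up to 8 KB
     * instead of a single byte.
     */
    static long findNextRecordBoundary(InputStream in) throws IOException {
        byte[] buffer = new byte[CHUNK];
        long consumed = 0;
        int n;
        while ((n = in.read(buffer, 0, buffer.length)) > 0) {
            for (int i = 0; i < n; i++) {
                if (buffer[i] == '\n') {
                    return consumed + i + 1;
                }
            }
            consumed += n;
        }
        return -1;
    }

    /** Convenience wrapper for in-memory data. */
    static long boundaryOf(byte[] data) {
        try {
            return findNextRecordBoundary(new ByteArrayInputStream(data));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The stream is typically read past the boundary, so a caller would use the returned offset to position the segment rather than continue reading from the same stream.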
Replace the InputStream -> BufferedReader -> readLine() -> String ->
BytesRef chain with direct byte[] buffer scanning. The segment is read
into a single byte[] via readAllBytes(), newlines are found by scanning
the buffer, and BytesRef views point directly into the buffer bytes for
zero-copy until the block builder does its one required copy.

This eliminates 2 byte[] allocations + 2 copies per line (String from
readLine + getBytes(UTF_8) for BytesRef) and removes the BufferedReader
and InputStreamReader wrappers entirely.

JMH (200K lines, ParallelParsingBenchmark):
  parallelism=1: 8.381 -> 4.020 ms/op (-52%)
  parallelism=2: 8.628 -> 4.272 ms/op (-50%)
  parallelism=4: 6.978 -> 3.602 ms/op (-48%)

Cumulative improvement from baseline (6.1):
  parallelism=1: 10.991 -> 4.020 ms/op (-63%)
  parallelism=4: 10.913 -> 3.602 ms/op (-67%)
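
The direct-scanning approach can be sketched as follows. `Slice` is a hypothetical stand-in for Lucene's `BytesRef` (shared array, offset, length) so the example stays self-contained, and `LineSlicer` is an illustrative name. Each line becomes a view into the segment buffer, and no per-line `String` or `byte[]` is created.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class LineSlicer {
    /** Hypothetical stand-in for a BytesRef-style view: shared array plus offset and length. */
    static final class Slice {
        final byte[] bytes;
        final int offset;
        final int length;

        Slice(byte[] bytes, int offset, int length) {
            this.bytes = bytes;
            this.offset = offset;
            this.length = length;
        }

        /** Decodes the view; only used for inspection, the hot path never calls this. */
        String utf8() {
            return new String(bytes, offset, length, StandardCharsets.UTF_8);
        }
    }

    /** Scans the buffer once and returns one view per line, without copying line bytes. */
    static List<Slice> lines(byte[] buffer) {
        List<Slice> out = new ArrayList<>();
        int start = 0;
        for (int i = 0; i < buffer.length; i++) {
            if (buffer[i] == '\n') {
                // Drop a trailing '\r' so CRLF input yields the same views as LF input.
                int end = (i > start && buffer[i - 1] == '\r') ? i - 1 : i;
                out.add(new Slice(buffer, start, end - start));
                start = i + 1;
            }
        }
        if (start < buffer.length) {
            out.add(new Slice(buffer, start, buffer.length - start)); // last line without newline
        }
        return out;
    }
}
```

The sketch allocates one small view object per line for clarity; reusing a single scratch view, as the benchmark reader is described as doing with `BytesRef`, would remove that allocation as well.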
Use Locale.ROOT to satisfy the forbidden-apis check that
disallows locale-sensitive String.format overloads.
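
For illustration, a locale-independent version of the benchmark's line builder might look like the sketch below. `BenchmarkLines.line` is a hypothetical helper, and the actual change in the benchmark may be shaped differently. Passing `Locale.ROOT` keeps the zero-padded number independent of the JVM's default locale, which can otherwise affect how digits are rendered.

```java
import java.util.Locale;

final class BenchmarkLines {
    /** Builds one CSV line with a zero-padded, locale-independent counter. */
    static String line(int i) {
        return "line-" + String.format(Locale.ROOT, "%08d", i) + ",value-" + (i % 1000) + "\n";
    }
}
```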
@costin costin force-pushed the esql/group-b-channel-api branch from 634c8f5 to 6a5ba0b Compare March 6, 2026 09:57
@costin costin enabled auto-merge (squash) March 6, 2026 10:01
Contributor

@bpintea bpintea left a comment


🤖 Reviewed using AI tools.

public void setup() {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < lineCount; i++) {
sb.append("line-").append(String.format("%08d", i)).append(",value-").append(i % 1000).append("\n");
Contributor


Surprised String.format(formatString) didn't trigger the CI.

@costin costin merged commit b0c54bc into elastic:main Mar 6, 2026
36 checks passed
@costin costin deleted the esql/group-b-channel-api branch March 6, 2026 14:19
szybia added a commit to szybia/elasticsearch that referenced this pull request Mar 6, 2026
…locations

sidosera pushed a commit to sidosera/elasticsearch that referenced this pull request Mar 6, 2026
…ces (elastic#143703)

costin added a commit to costin/elasticsearch that referenced this pull request Mar 9, 2026
Fix String.format to use Locale.ROOT in ParallelParsingBenchmark
(bpintea review comment). Add unit tests for findNextRecordBoundary
in CsvFormatReader and NdJsonFormatReader covering quote-aware
boundary detection, CRLF handling, buffer edge cases, and EOF.
Extend ColumnBlockConversionsTests with repeating null, array copy,
and larger-array edge cases for int/boolean/doubleFromLongs. Add
RangeStorageObjectTests for readBytes with non-zero position,
exact-size buffers, and partial reads.
costin added a commit that referenced this pull request Mar 10, 2026
Address review feedback from bpintea on PR #143703 and close test
coverage gaps identified in the columnar I/O and parallel parsing
changes.

Fix `String.format` to use `Locale.ROOT` in `ParallelParsingBenchmark`
(bpintea review comment). Add dedicated unit tests for the
`findNextRecordBoundary` implementations in `CsvFormatReader` (RFC 4180
quote-aware boundary detection, escaped quotes, buffer edge cases) and
`NdJsonFormatReader` (newline, CRLF, CR-only, buffer split). Extend
`ColumnBlockConversionsTests` with parity edge cases for
`intColumnFromLongs`, `booleanColumnFromLongs`, and
`doubleColumnFromLongs` (repeating null, array copy independence, larger
array than row count). Add `RangeStorageObjectTests` scenarios for
`readBytes` with non-zero position, exact-size buffers, and partial
reads from the middle of a range.

Developed using AI-assisted tooling.

Labels

:Analytics/ES|QL AKA ESQL >enhancement ES|QL|DS ES|QL datasources Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v9.4.0
