Skip to content

Commit 6acdb30

Browse files
costinbpintea
and Bogdan Pintea authored
[ESQL] Introduce pluggable external datasource framework (#141678)
Enable ESQL to query data outside Elasticsearch indices via the EXTERNAL command. The framework decouples data location (storage) from data format, allowing queries over S3, HTTP, local files, and Iceberg data lakes in formats such as CSV and Parquet. - StorageProvider/StorageObject SPIs for protocol-agnostic byte access; implementations for HTTP, S3, local filesystem with range reads - FormatReader SPI for parsing bytes into ESQL Pages; CSV and Parquet implementations with schema inference and column projection - DataSourcePlugin SPI and DataSourceModule for plugin discovery - Extracted plugins: esql-datasource-http, esql-datasource-csv, esql-datasource-s3, esql-datasource-parquet, esql-datasource-iceberg - ExternalSourceResolver, OperatorFactoryRegistry, AsyncExternalSourceOperatorFactory for resolution and execution - Iceberg TableCatalog with filter pushdown; generic ExternalRelation/ ExternalSourceExec replacing Iceberg-specific plan nodes - GlobExpander, GlobMatcher, FileSet for wildcard file discovery - StorageProviderRegistry per-query config; WITH-clause propagation end-to-end Developed using AI-assisted tooling Co-authored-by: Bogdan Pintea <bogdan.pintea@elastic.co>
1 parent ce37e1a commit 6acdb30

228 files changed

Lines changed: 30005 additions & 3609 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build-tools-internal/version.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ cuvs_java = 25.12.0
2424
ldapsdk = 7.0.3
2525

2626
antlr4 = 4.13.1
27+
iceberg = 1.10.1
2728
# bouncy castle version for non-fips. fips jars use a different version
2829
bouncycastle=1.79
2930
# used by security and idp (need to be in sync due to cross-dependency in testing)

gradle/verification-metadata.xml

Lines changed: 295 additions & 0 deletions
Large diffs are not rendered by default.

test/fixtures/s3-fixture/src/main/java/fixture/s3/S3HttpHandler.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ public void handle(final HttpExchange exchange) throws IOException {
118118
if (blob == null) {
119119
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
120120
} else {
121+
// HEAD response must include Content-Length header for S3 clients (AWS SDK) that read file size
122+
exchange.getResponseHeaders().add("Content-Length", String.valueOf(blob.length()));
123+
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
121124
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1);
122125
}
123126
} else if (request.isListMultipartUploadsRequest()) {
@@ -181,6 +184,9 @@ public void handle(final HttpExchange exchange) throws IOException {
181184
exchange.sendResponseHeaders(RestStatus.NOT_FOUND.getStatus(), -1);
182185
} else {
183186
var range = parsePartRange(exchange);
187+
if (range.end() == null) {
188+
throw new AssertionError("Copy-part range must specify an end: " + range);
189+
}
184190
int start = Math.toIntExact(range.start());
185191
int len = Math.toIntExact(range.end() - range.start() + 1);
186192
var part = sourceBlob.slice(start, len);
@@ -379,16 +385,15 @@ public void handle(final HttpExchange exchange) throws IOException {
379385
return;
380386
}
381387

382-
// S3 supports https://www.rfc-editor.org/rfc/rfc9110.html#name-range. The AWS SDK v1.x seems to always generate range
383-
// requests with a header value like "Range: bytes=start-end" where both {@code start} and {@code end} are always defined
384-
// (sometimes to very high value for {@code end}). It would be too tedious to fully support the RFC so S3HttpHandler only
385-
// supports when both {@code start} and {@code end} are defined to match the SDK behavior.
388+
// S3 supports https://www.rfc-editor.org/rfc/rfc9110.html#name-range
389+
// This handler supports both bounded ranges (bytes=0-100) and open-ended ranges (bytes=100-)
386390
final HttpHeaderParser.Range range = parseRangeHeader(rangeHeader);
387391
if (range == null) {
388392
throw new AssertionError("Bytes range does not match expected pattern: " + rangeHeader);
389393
}
390394
long start = range.start();
391-
long end = range.end();
395+
// For open-ended ranges (bytes=N-), end is null, meaning "to end of file"
396+
long end = range.end() != null ? range.end() : blob.length() - 1;
392397
if (end < start) {
393398
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
394399
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), blob.length());

test/framework/src/main/java/org/elasticsearch/test/fixture/HttpHeaderParser.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,18 @@
1515
public enum HttpHeaderParser {
1616
;
1717

18-
private static final Pattern RANGE_HEADER_PATTERN = Pattern.compile("bytes=([0-9]+)-([0-9]+)");
18+
// Pattern supports both bounded ranges (bytes=0-100) and open-ended ranges (bytes=100-)
19+
private static final Pattern RANGE_HEADER_PATTERN = Pattern.compile("bytes=([0-9]+)-([0-9]*)");
1920
private static final Pattern CONTENT_RANGE_HEADER_PATTERN = Pattern.compile("bytes (?:(\\d+)-(\\d+)|\\*)/(?:(\\d+)|\\*)");
2021

2122
/**
2223
* Parse a "Range" header
2324
*
24-
* Note: only a single bounded range is supported (e.g. <code>Range: bytes={range_start}-{range_end}</code>)
25+
* Supports both bounded and open-ended ranges:
26+
* <ul>
27+
* <li>Bounded: <code>Range: bytes={range_start}-{range_end}</code></li>
28+
* <li>Open-ended: <code>Range: bytes={range_start}-</code> (end is null, meaning "to end of file")</li>
29+
* </ul>
2530
*
2631
* @see <a href="https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Range">MDN: Range header</a>
2732
* @param rangeHeaderValue The header value as a string
@@ -31,18 +36,38 @@ public static Range parseRangeHeader(String rangeHeaderValue) {
3136
final Matcher matcher = RANGE_HEADER_PATTERN.matcher(rangeHeaderValue);
3237
if (matcher.matches()) {
3338
try {
34-
return new Range(Long.parseLong(matcher.group(1)), Long.parseLong(matcher.group(2)));
39+
long start = Long.parseLong(matcher.group(1));
40+
String endGroup = matcher.group(2);
41+
Long end = (endGroup == null || endGroup.isEmpty()) ? null : Long.parseLong(endGroup);
42+
return new Range(start, end);
3543
} catch (NumberFormatException e) {
3644
return null;
3745
}
3846
}
3947
return null;
4048
}
4149

42-
public record Range(long start, long end) {
50+
/**
51+
* A HTTP "Range" from a Range header.
52+
*
53+
* @param start The start of the range (always present)
54+
* @param end The end of the range, or null for open-ended ranges (meaning "to end of file")
55+
*/
56+
public record Range(long start, Long end) {
57+
58+
public Range(long start, long end) {
59+
this(start, (Long) end);
60+
}
61+
62+
/**
63+
* Returns true if this is an open-ended range (no end specified).
64+
*/
65+
public boolean isOpenEnded() {
66+
return end == null;
67+
}
4368

4469
public String headerString() {
45-
return "bytes=" + start + "-" + end;
70+
return end != null ? "bytes=" + start + "-" + end : "bytes=" + start + "-";
4671
}
4772
}
4873

test/framework/src/test/java/org/elasticsearch/http/HttpHeaderParserTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ public void testParseRangeHeaderMultipleRangesNotMatched() {
4343
);
4444
}
4545

46-
public void testParseRangeHeaderEndlessRangeNotMatched() {
47-
assertNull(HttpHeaderParser.parseRangeHeader(Strings.format("bytes=%d-", randomLongBetween(0, Long.MAX_VALUE))));
46+
public void testParseRangeHeaderEndlessRange() {
47+
var bytes = randomLongBetween(0, Long.MAX_VALUE);
48+
assertEquals(new HttpHeaderParser.Range(bytes, null), HttpHeaderParser.parseRangeHeader(Strings.format("bytes=%d-", bytes)));
4849
}
4950

5051
public void testParseRangeHeaderSuffixLengthNotMatched() {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
apply plugin: 'elasticsearch.internal-es-plugin'
9+
apply plugin: 'elasticsearch.publish'
10+
11+
esplugin {
12+
name = 'esql-datasource-csv'
13+
description = 'CSV format support for ESQL external data sources'
14+
classname = 'org.elasticsearch.xpack.esql.datasource.csv.CsvDataSourcePlugin'
15+
extendedPlugins = ['x-pack-esql']
16+
}
17+
18+
base {
19+
archivesName = 'esql-datasource-csv'
20+
}
21+
22+
dependencies {
23+
// SPI interfaces from ESQL core
24+
compileOnly project(path: xpackModule('esql'))
25+
compileOnly project(path: xpackModule('esql-core'))
26+
compileOnly project(path: xpackModule('core'))
27+
compileOnly project(':server')
28+
compileOnly project(xpackModule('esql:compute'))
29+
30+
// Jackson CSV for CSV format reader
31+
implementation "com.fasterxml.jackson.dataformat:jackson-dataformat-csv:${versions.jackson}"
32+
33+
testImplementation project(':test:framework')
34+
testImplementation(testArtifact(project(xpackModule('core'))))
35+
}
36+
37+
tasks.named("dependencyLicenses").configure {
38+
mapping from: /jackson-.*/, to: 'jackson'
39+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
This copy of Jackson JSON processor streaming parser/generator is licensed under the
2+
Apache (Software) License, version 2.0 ("the License").
3+
See the License for details about distribution rights, and the
4+
specific rights regarding derivate works.
5+
6+
You may obtain a copy of the License at:
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# Jackson JSON processor
2+
3+
Jackson is a high-performance, Free/Open Source JSON processing library.
4+
It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
5+
been in development since 2007.
6+
It is currently developed by a community of developers, as well as supported
7+
commercially by FasterXML.com.
8+
9+
## Licensing
10+
11+
Jackson core and extension components may licensed under different licenses.
12+
To find the details that apply to this artifact see the accompanying LICENSE file.
13+
For more information, including possible other licensing options, contact
14+
FasterXML.com (http://fasterxml.com).
15+
16+
## Credits
17+
18+
A list of contributors may be found from CREDITS file, which is included
19+
in some artifacts (usually source distributions); but is always available
20+
from the source code management (SCM) system project uses.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
apply plugin: 'elasticsearch.internal-java-rest-test'
9+
apply plugin: org.elasticsearch.gradle.internal.precommit.CheckstylePrecommitPlugin
10+
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenApisPrecommitPlugin
11+
apply plugin: org.elasticsearch.gradle.internal.precommit.ForbiddenPatternsPrecommitPlugin
12+
apply plugin: org.elasticsearch.gradle.internal.precommit.FilePermissionsPrecommitPlugin
13+
apply plugin: org.elasticsearch.gradle.internal.precommit.LoggerUsagePrecommitPlugin
14+
apply plugin: org.elasticsearch.gradle.internal.precommit.TestingConventionsPrecommitPlugin
15+
16+
dependencies {
17+
// Test fixtures and spec reader infrastructure
18+
javaRestTestImplementation project(xpackModule('esql:qa:testFixtures'))
19+
javaRestTestImplementation project(xpackModule('esql:qa:server'))
20+
javaRestTestImplementation project(xpackModule('esql'))
21+
javaRestTestImplementation(project(path: xpackModule('esql'), configuration: 'testRuntimeElements'))
22+
23+
// S3 fixture infrastructure for mocking S3 operations
24+
javaRestTestImplementation project(':test:fixtures:s3-fixture')
25+
javaRestTestImplementation project(':test:fixtures:aws-fixture-utils')
26+
27+
// Repository S3 module for cluster
28+
clusterModules project(':modules:repository-s3')
29+
clusterPlugins project(':plugins:mapper-size')
30+
clusterPlugins project(':plugins:mapper-murmur3')
31+
32+
// The CSV datasource plugin under test
33+
clusterPlugins project(xpackModule('esql-datasource-csv'))
34+
clusterPlugins project(xpackModule('esql-datasource-http'))
35+
clusterPlugins project(xpackModule('esql-datasource-s3'))
36+
}
37+
38+
// The CSV fixtures (employees.csv and csv-basic.csv-spec) are included
39+
// directly in this module's javaRestTest/resources directory
40+
41+
tasks.named('javaRestTest') {
42+
usesDefaultDistribution("to be triaged")
43+
maxParallelForks = 1
44+
45+
// Increase timeouts for S3 operations which may take longer than standard queries
46+
systemProperty 'tests.rest.client_timeout', '60'
47+
systemProperty 'tests.rest.socket_timeout', '60'
48+
49+
// Enable more verbose logging for debugging
50+
testLogging {
51+
events = ["passed", "skipped", "failed"]
52+
exceptionFormat = "full"
53+
showStandardStreams = false
54+
}
55+
}
56+
57+
restResources {
58+
restApi {
59+
include '_common', 'bulk', 'get', 'indices', 'esql', 'xpack', 'cluster', 'capabilities', 'index'
60+
}
61+
restTests {
62+
includeXpack 'esql'
63+
}
64+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.esql.qa.csv;

import org.elasticsearch.core.PathUtils;
import org.elasticsearch.test.cluster.ElasticsearchCluster;
import org.elasticsearch.test.cluster.local.LocalClusterConfigProvider;
import org.elasticsearch.test.cluster.local.distribution.DistributionType;

import java.net.URISyntaxException;
import java.net.URL;
import java.util.function.Supplier;

import static org.elasticsearch.xpack.esql.datasources.S3FixtureUtils.ACCESS_KEY;
import static org.elasticsearch.xpack.esql.datasources.S3FixtureUtils.SECRET_KEY;

/**
 * Cluster configuration for CSV integration tests.
 *
 * <p>Static factory holder that builds a local {@link ElasticsearchCluster} wired to an
 * S3 HTTP fixture (endpoint, keystore credentials, plain HTTP) and able to read local
 * fixture files through {@code path.repo}.
 */
public final class Clusters {

    private Clusters() {
        // Static-factory utility class; never instantiated.
    }

    /**
     * Builds the shared test cluster.
     *
     * @param s3EndpointSupplier lazily supplies the S3 fixture endpoint (the fixture's
     *                           address is only known once it has started)
     * @param configProvider     additional per-suite cluster configuration, applied last
     */
    public static ElasticsearchCluster testCluster(Supplier<String> s3EndpointSupplier, LocalClusterConfigProvider configProvider) {
        return ElasticsearchCluster.local()
            .distribution(DistributionType.DEFAULT)
            .shared(true)
            // Enable S3 repository plugin for S3 access
            .module("repository-s3")
            // Basic cluster settings
            .setting("xpack.security.enabled", "false")
            .setting("xpack.license.self_generated.type", "trial")
            // Disable ML to avoid native code loading issues in some environments
            .setting("xpack.ml.enabled", "false")
            // Allow the LOCAL storage backend to read fixture files from the test resources directory.
            // The esql-datasource-http plugin's entitlement policy uses shared_repo for file read access.
            .setting("path.repo", fixturesPath())
            // S3 client configuration for accessing the S3HttpFixture
            .setting("s3.client.default.endpoint", s3EndpointSupplier)
            // S3 credentials must be stored in keystore, not as regular settings
            .keystore("s3.client.default.access_key", ACCESS_KEY)
            .keystore("s3.client.default.secret_key", SECRET_KEY)
            // Disable SSL for HTTP fixture
            .setting("s3.client.default.protocol", "http")
            // Disable AWS SDK profile file loading by pointing to non-existent files.
            // This prevents the SDK from trying to read ~/.aws/credentials and ~/.aws/config
            // which would violate Elasticsearch entitlements.
            .environment("AWS_CONFIG_FILE", "/dev/null/aws/config")
            .environment("AWS_SHARED_CREDENTIALS_FILE", "/dev/null/aws/credentials")
            // Apply any additional configuration
            .apply(() -> configProvider)
            .build();
    }

    /**
     * Convenience overload applying no extra per-suite configuration.
     */
    public static ElasticsearchCluster testCluster(Supplier<String> s3EndpointSupplier) {
        return testCluster(s3EndpointSupplier, config -> {});
    }

    /**
     * Resolves the on-disk location of the bundled fixtures so it can be exposed
     * through {@code path.repo}.
     *
     * <p>NOTE(review): this looks up the {@code /iceberg-fixtures} resource from a
     * CSV test helper — confirm this is the intended fixtures directory for this module.
     */
    private static String fixturesPath() {
        URL resourceUrl = Clusters.class.getResource("/iceberg-fixtures");
        if (resourceUrl != null && resourceUrl.getProtocol().equals("file")) {
            try {
                return PathUtils.get(resourceUrl.toURI()).toAbsolutePath().toString();
            } catch (URISyntaxException e) {
                throw new IllegalStateException("Failed to resolve fixtures path", e);
            }
        }
        // Fall back to a safe default; LOCAL tests will fail gracefully
        return "/tmp";
    }
}

0 commit comments

Comments
 (0)