[Feature Request] OpenSearchArrowClient (OSAC) to support consuming Arrow Flight Streams #1502
Description
Is your feature request related to a problem? Please describe
Arrow Flight was introduced as a new transport, together with streaming support in Arrow format, in opensearch-project/OpenSearch#16679. Building on that, I would like to propose OpenSearchArrowClient (OSAC), which enhances the OpenSearch Java Client with Arrow Flight support. It delivers search and aggregation results as Arrow streams, reuses the SearchRequest APIs, and handles authentication transparently.
Describe the solution you'd like
Details on the Arrow Flight Java client: https://arrow.apache.org/docs/java/flight.html#
Motivation
- Enable Arrow stream consumption for OpenSearch data.
- Maintain consistency with existing request-building APIs.
- Simplify auth for Flight interactions.
- Long term, we would like to migrate consumption of all OpenSearch APIs that produce columnar data to OSAC.
Sample usage:
Search
OpenSearchArrowClient client = new OpenSearchArrowClient(
    "localhost", 9200, "localhost", 50051, "admin", "password", true
);
SearchRequest request = new SearchRequest("logs-*")
    .source(SearchSourceBuilder.searchSource()
        .query(QueryBuilders.matchQuery("message", "error"))
        .sort("timestamp", SortOrder.DESC)
        .size(100));
try (FlightStream stream = client.searchAsStream(request)) {
    while (stream.next()) {
        VectorSchemaRoot root = stream.getRoot();
        VarCharVector idVector = (VarCharVector) root.getVector("_id");
        Float4Vector scoreVector = (Float4Vector) root.getVector("_score");
        VarCharVector sourceVector = (VarCharVector) root.getVector("_source");
        // Process batch. Arrow VarChar values are UTF-8 bytes, so decode them
        // explicitly (java.nio.charset.StandardCharsets) instead of relying on
        // the platform default charset.
        for (int i = 0; i < root.getRowCount(); i++) {
            String id = new String(idVector.get(i), StandardCharsets.UTF_8);
            float score = scoreVector.get(i);
            String source = new String(sourceVector.get(i), StandardCharsets.UTF_8);
            System.out.printf("ID: %s, Score: %.1f, Source: %s%n", id, score, source);
        }
    }
}
client.shutdown();

| _id | _score | _source |
|---|---|---|
| log_2025-03-21_001 | 2.3 | {"timestamp": "2025-03-21T10:00:00", "message": "disk error"} |
| log_2025-03-21_002 | 1.9 | {"timestamp": "2025-03-21T09:55:12", "message": "network error"} |
| log_2025-03-20_003 | 1.7 | {"timestamp": "2025-03-20T23:45:00", "message": "auth error"} |
Aggregation
OpenSearchArrowClient client = new OpenSearchArrowClient(
    "localhost", 9200, "localhost", 50051, false
);
TermsAggregationBuilder byRegion = AggregationBuilders.terms("by_region")
    .field("region").size(5)
    .subAggregation(AggregationBuilders.terms("by_category").field("category").size(5));
SearchRequest request = new SearchRequest("sales")
    .source(SearchSourceBuilder.searchSource().aggregation(byRegion));
try (FlightStream stream = client.searchAsStream(request)) {
    while (stream.next()) {
        VectorSchemaRoot root = stream.getRoot();
        VarCharVector regionVector = (VarCharVector) root.getVector("region");
        VarCharVector categoryVector = (VarCharVector) root.getVector("category");
        BigIntVector countVector = (BigIntVector) root.getVector("doc_count");
        // Process batch. Arrow VarChar values are UTF-8 bytes, so decode them
        // explicitly (java.nio.charset.StandardCharsets).
        for (int i = 0; i < root.getRowCount(); i++) {
            String region = new String(regionVector.get(i), StandardCharsets.UTF_8);
            String category = new String(categoryVector.get(i), StandardCharsets.UTF_8);
            long count = countVector.get(i);
            System.out.printf("Region: %s, Category: %s, Count: %d%n", region, category, count);
        }
    }
}
client.shutdown();

| region | category | doc_count |
|---|---|---|
| North | Electronics | 120 |
| North | Clothing | 85 |
| South | Electronics | 95 |
| South | Furniture | 60 |
| West | Books | 75 |
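
The table above suggests that the two-level terms aggregation is flattened into one row per (region, category) pair. How the Flight server actually performs this flattening is not specified in this issue, so the following is only a minimal sketch of one plausible reading. `NestedBucketFlattener`, `Bucket`, and `Row` are hypothetical names that are not part of OpenSearch or Arrow, and the region-level doc counts are illustrative values, since the table only shows leaf counts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch: flattens two-level terms buckets into columnar rows. */
public class NestedBucketFlattener {

    /** A terms bucket with a key, a doc count, and optional sub-buckets (hypothetical type). */
    public static final class Bucket {
        public final String key;
        public final long docCount;
        public final List<Bucket> children;

        public Bucket(String key, long docCount, List<Bucket> children) {
            this.key = key;
            this.docCount = docCount;
            this.children = children;
        }

        public static Bucket leaf(String key, long docCount) {
            return new Bucket(key, docCount, Collections.<Bucket>emptyList());
        }
    }

    /** One flat output row with the columns region | category | doc_count. */
    public static final class Row {
        public final String region;
        public final String category;
        public final long docCount;

        public Row(String region, String category, long docCount) {
            this.region = region;
            this.category = category;
            this.docCount = docCount;
        }
    }

    /** Emits one row per leaf bucket and repeats the parent key in the first column. */
    public static List<Row> flatten(List<Bucket> regions) {
        List<Row> rows = new ArrayList<>();
        for (Bucket region : regions) {
            for (Bucket category : region.children) {
                rows.add(new Row(region.key, category.key, category.docCount));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // Region-level counts are illustrative; only the leaf counts come from the table.
        List<Bucket> regions = Arrays.asList(
            new Bucket("North", 205, Arrays.asList(
                Bucket.leaf("Electronics", 120), Bucket.leaf("Clothing", 85))),
            new Bucket("South", 155, Arrays.asList(
                Bucket.leaf("Electronics", 95), Bucket.leaf("Furniture", 60))),
            new Bucket("West", 75, Arrays.asList(Bucket.leaf("Books", 75))));
        for (Row row : flatten(regions)) {
            System.out.printf("%s | %s | %d%n", row.region, row.category, row.docCount);
        }
    }
}
```

In this reading the parent bucket's own doc count is dropped, which matches the three columns in the table; a real implementation might choose to expose it as an extra column.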
Note:
- Nested hits and aggregations can be handled in Arrow format, but they are not covered by the examples here. I will add such examples as the work progresses.
- Search hits and aggregation responses cannot be combined in a single stream, because the columnar format has a rigid schema.
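
To illustrate the second note: every batch in one stream has to share the same column layout, and the hits layout (`_id`, `_score`, `_source`) differs from the aggregation layout (`region`, `category`, `doc_count`). The sketch below models a schema as an ordered list of column names and checks whether a set of batches could share one stream. `RigidSchemaCheck` and `sameSchema` are hypothetical helpers for illustration only, not part of Arrow or the proposed client.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: batches in one stream must all share the same column layout. */
public class RigidSchemaCheck {

    /** Returns true only if every batch schema equals the first one (same names, same order). */
    public static boolean sameSchema(List<List<String>> batchSchemas) {
        if (batchSchemas.isEmpty()) {
            return true;
        }
        List<String> first = batchSchemas.get(0);
        for (List<String> schema : batchSchemas) {
            if (!first.equals(schema)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> hits = Arrays.asList("_id", "_score", "_source");
        List<String> aggs = Arrays.asList("region", "category", "doc_count");
        System.out.println(sameSchema(Arrays.asList(hits, hits))); // true
        System.out.println(sameSchema(Arrays.asList(hits, aggs))); // false
    }
}
```

Under this constraint, a caller that needs both hits and aggregations would presumably consume them as two separate streams.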
Next Steps
- Prototype OpenSearchArrowClient.
- Test with OpenSearch Flight server (#16691, #16963).
- Refine based on feedback.
Related component
Search:Query Capabilities
Describe alternatives you've considered
No response
Additional context
No response
