[Feature Request] OpenSearchArrowClient (OSAC) to support consuming Arrow Flight Streams #1502

@rishabhmaurya

Description

Is your feature request related to a problem? Please describe

Arrow Flight as a new transport, together with streaming support in Arrow format, was introduced in opensearch-project/OpenSearch#16679. Building on that, I would like to propose OpenSearchArrowClient (OSAC), an enhancement to the OpenSearch Java Client that adds Arrow Flight support. It delivers search and aggregation results as Arrow streams, reuses the existing SearchRequest APIs, and handles authentication transparently.

Describe the solution you'd like

Details on the Arrow Flight Java client: https://arrow.apache.org/docs/java/flight.html#

Motivation

Enable Arrow stream consumption for OpenSearch data.
Maintain consistency with existing request-building APIs.
Simplify auth for Flight interactions.

Long term, we would like consumers of every OpenSearch API that produces columnar data to migrate to OSAC.
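To make the "simplify auth" goal concrete, here is a minimal sketch of the kind of header a wrapper like OSAC could build once and attach to every Flight call. The class `FlightAuthHeaders`, its methods, and the assumption that the Flight server accepts HTTP Basic credentials in an `authorization` header are all hypothetical; the proposal does not specify the auth mechanism.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of any existing client: builds the call headers
// that a wrapper such as OSAC could attach to every Flight request.
final class FlightAuthHeaders {
    private FlightAuthHeaders() {}

    // Returns an HTTP Basic credential value: "Basic " + base64("username:password").
    static String basicCredential(String username, String password) {
        String pair = username + ":" + password;
        String encoded = Base64.getEncoder()
            .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    // Assumption: the server reads credentials from an "authorization" header.
    static Map<String, String> forUser(String username, String password) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("authorization", basicCredential(username, password));
        return headers;
    }

    public static void main(String[] args) {
        // Same sample credentials as the usage example in this issue.
        System.out.println(forUser("admin", "password"));
    }
}
```

With something like this inside the client, callers would pass credentials once at construction time and never touch Flight call options directly.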

[Image: OSAC sequence diagram; source attached as OSAC SeqDiag.txt]

Sample usage:

Search

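import java.nio.charset.StandardCharsets;

import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;

// Proposed API, not yet implemented. The arguments appear to be REST host/port,
// Flight host/port, username, password, and a TLS flag.
OpenSearchArrowClient client = new OpenSearchArrowClient(
    "localhost", 9200, "localhost", 50051, "admin", "password", true
);

// The request is built with the existing SearchRequest APIs.
SearchRequest request = new SearchRequest("logs-*")
    .source(SearchSourceBuilder.searchSource()
        .query(QueryBuilders.matchQuery("message", "error"))
        .sort("timestamp", SortOrder.DESC)
        .size(100));

// Each call to next() loads one record batch into the shared root.
try (FlightStream stream = client.searchAsStream(request)) {
    while (stream.next()) {
        VectorSchemaRoot root = stream.getRoot();
        VarCharVector idVector = (VarCharVector) root.getVector("_id");
        Float4Vector scoreVector = (Float4Vector) root.getVector("_score");
        VarCharVector sourceVector = (VarCharVector) root.getVector("_source");

        // Process batch; decode the raw bytes explicitly as UTF-8
        for (int i = 0; i < root.getRowCount(); i++) {
            String id = new String(idVector.get(i), StandardCharsets.UTF_8);
            float score = scoreVector.get(i);
            String source = new String(sourceVector.get(i), StandardCharsets.UTF_8);
            System.out.printf("ID: %s, Score: %.1f, Source: %s%n", id, score, source);
        }
    }
}
client.shutdown();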

Sample result:

_id                 _score  _source
log_2025-03-21_001  2.3     {"timestamp": "2025-03-21T10:00:00", "message": "disk error"}
log_2025-03-21_002  1.9     {"timestamp": "2025-03-21T09:55:12", "message": "network error"}
log_2025-03-20_003  1.7     {"timestamp": "2025-03-20T23:45:00", "message": "auth error"}

Aggregation

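import java.nio.charset.StandardCharsets;

import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;

// Same proposed client, here constructed without credentials.
OpenSearchArrowClient client = new OpenSearchArrowClient(
    "localhost", 9200, "localhost", 50051, false
);

// Two-level terms aggregation: top 5 regions, then top 5 categories per region.
TermsAggregationBuilder byRegion = AggregationBuilders.terms("by_region")
    .field("region").size(5)
    .subAggregation(AggregationBuilders.terms("by_category").field("category").size(5));
SearchRequest request = new SearchRequest("sales")
    .source(SearchSourceBuilder.searchSource().aggregation(byRegion));

try (FlightStream stream = client.searchAsStream(request)) {
    while (stream.next()) {
        VectorSchemaRoot root = stream.getRoot();
        VarCharVector regionVector = (VarCharVector) root.getVector("region");
        VarCharVector categoryVector = (VarCharVector) root.getVector("category");
        BigIntVector countVector = (BigIntVector) root.getVector("doc_count");

        // Process batch; decode the raw bytes explicitly as UTF-8
        for (int i = 0; i < root.getRowCount(); i++) {
            String region = new String(regionVector.get(i), StandardCharsets.UTF_8);
            String category = new String(categoryVector.get(i), StandardCharsets.UTF_8);
            long count = countVector.get(i);
            System.out.printf("Region: %s, Category: %s, Count: %d%n", region, category, count);
        }
    }
}
client.shutdown();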

Sample result:

region  category     doc_count
North   Electronics  120
North   Clothing     85
South   Electronics  95
South   Furniture    60
West    Books        75
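
The rows above have one (region, category, doc_count) tuple per leaf bucket of the nested terms aggregation. As a rough sketch of that flattening, assuming the server emits one row per sub-bucket and repeats the parent key, the logic could look like the following. `BucketFlattener`, `Bucket`, and `Row` are hypothetical names for illustration, not OpenSearch or Arrow classes, and the parent counts are illustrative values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a two-level terms aggregation; not OpenSearch classes.
final class BucketFlattener {

    // One bucket of a terms aggregation, with optional sub-buckets.
    static final class Bucket {
        final String key;
        final long docCount;
        final List<Bucket> children;

        Bucket(String key, long docCount, List<Bucket> children) {
            this.key = key;
            this.docCount = docCount;
            this.children = children;
        }
    }

    // One flattened output row: (region, category, doc_count).
    static final class Row {
        final String region;
        final String category;
        final long docCount;

        Row(String region, String category, long docCount) {
            this.region = region;
            this.category = category;
            this.docCount = docCount;
        }
    }

    // Emits one row per leaf bucket, repeating the parent key on each row.
    // A region without sub-buckets contributes no rows in this sketch.
    static List<Row> flatten(List<Bucket> regions) {
        List<Row> rows = new ArrayList<>();
        for (Bucket region : regions) {
            for (Bucket category : region.children) {
                rows.add(new Row(region.key, category.key, category.docCount));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Bucket> regions = List.of(
            new Bucket("North", 205, List.of(
                new Bucket("Electronics", 120, List.of()),
                new Bucket("Clothing", 85, List.of()))),
            new Bucket("West", 75, List.of(
                new Bucket("Books", 75, List.of()))));
        for (Row row : flatten(regions)) {
            System.out.printf("%s %s %d%n", row.region, row.category, row.docCount);
        }
    }
}
```

Flattening keeps the schema fixed at three columns regardless of how many buckets each level returns, which is what lets the result travel as ordinary Arrow record batches.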

Note:

  1. Nested hits and aggregations can be handled in Arrow format, but the examples here do not cover them yet. I will add such examples as we progress.
  2. Search hits and aggregation responses cannot be combined, because the columnar format has a rigid schema.
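
To illustrate the second note, here is a toy sketch of a batch with a fixed column schema. It is a plain-Java stand-in, not the Arrow API, and `FixedSchemaBatch` is a hypothetical name. A batch declared with the hit columns accepts hit rows and rejects a row shaped like the aggregation result.

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a columnar batch with a fixed schema; not the Arrow API.
final class FixedSchemaBatch {
    private final List<String> columns;
    private final List<List<Object>> data = new ArrayList<>();

    FixedSchemaBatch(String... columnNames) {
        this.columns = List.of(columnNames);
        for (int i = 0; i < columnNames.length; i++) {
            data.add(new ArrayList<>()); // one value list per column
        }
    }

    // Appends a row only if it declares exactly this batch's columns, in order.
    void append(List<String> rowColumns, Object... values) {
        if (!columns.equals(rowColumns)) {
            throw new IllegalArgumentException(
                "schema mismatch: expected " + columns + " but got " + rowColumns);
        }
        if (values.length != columns.size()) {
            throw new IllegalArgumentException("wrong number of values");
        }
        for (int i = 0; i < values.length; i++) {
            data.get(i).add(values[i]);
        }
    }

    int rowCount() {
        return data.isEmpty() ? 0 : data.get(0).size();
    }

    public static void main(String[] args) {
        FixedSchemaBatch hits = new FixedSchemaBatch("_id", "_score", "_source");
        hits.append(List.of("_id", "_score", "_source"),
            "log_2025-03-21_001", 2.3f, "{\"message\": \"disk error\"}");
        try {
            // An aggregation row has different columns, so it cannot share the batch.
            hits.append(List.of("region", "category", "doc_count"),
                "North", "Electronics", 120L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println("rows in hits batch: " + hits.rowCount());
    }
}
```

In practice this suggests that a caller who needs both hits and aggregations would issue them as separate streams, each with its own schema.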

Next Steps

  • Prototype OpenSearchArrowClient.
  • Test with OpenSearch Flight server (#16691, #16963).
  • Refine based on feedback.

Related component

Search:Query Capabilities

Describe alternatives you've considered

No response

Additional context

No response

Labels

enhancement (New feature or request)
