Concurrent Segment Search Overview and Aggs HLD #6798

@sohami


Is your feature request related to a problem? Please describe.
The Concurrent Search plugin was added as a sandbox plugin, with some pending work tracked here. This issue provides an overview of the current functionality and high-level design options for supporting aggregations, along with other open items that should be considered.

Describe the solution you'd like

Background:

Today, indexing and search requests in OpenSearch are powered by the Lucene library. An OpenSearch index consists of multiple shards, and each shard is a Lucene index. Each shard (Lucene index) can contain multiple segments. When a search request is served at the index level, it is scattered to all of the index's shards (with an upper limit per node), and the responses from all the shards are gathered and merged into the final response returned to the user. On each shard, the search request is executed sequentially over the segments in that shard. Lucene supports concurrent search over multiple segments, and this was added to OpenSearch as an experimental sandbox plugin. Currently the concurrent search plugin doesn't support Aggs and has a few other gaps, which are captured below; separate issues will eventually be created for them.

Overview of Sequential + Concurrent Segment Search:

In OpenSearch, for each search request, sequential query execution at the shard level looks something like the flow below. The coordinator-level search request can be performed in multiple phases: (i) CanMatch (optional): pre-filters shards based on time range; (ii) DFS (optional): improves the relevance of document scores by using global term frequency and document frequency instead of local shard-level information; (iii) Query: the actual search happens on each shard, and only docIds are returned along with scores; and (iv) Fetch: the requested fields for each document are retrieved after the query phase. The most expensive of these is the query phase, where all the scoring and document collection takes place across the segments. This is the phase that uses the Lucene search API, and it is where concurrency is introduced to search across different segments in the shard (Lucene index).

The current query phase performs the operations below on the single search thread executing that phase.

[Figure: QueryPhase]

Example for a simple query:

[Figure: QueryExample]

With concurrent search support, the query execution plan will look like the flow below. A Segment Slice is a group of segments assigned to a concurrent searcher thread, which performs the search/collection over those segments. Depending on the document count and segment count, the IndexSearcher partitions all the segments into multiple slices. Lucene uses 2 parameters, MAX_SEGMENTS_PER_SLICE (default 5) and MAX_DOCS_PER_SLICE (default 250K), to create the slice groups. While the concurrent searcher threads search their assigned slices, the main search thread also gets the last slice assigned to it for execution. Upon completion, it calls reduce on the CollectorManager tree with the list of all the collector trees created per slice. The reduce mechanism merges all the collected documents into the final document list, which is used to populate the query result. Note: Today, if any aggregation operation is present in the search request, the sequential path is executed instead of the concurrent search path.
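To make the slicing rule concrete, here is a minimal sketch in Python of how segments could be grouped into slices using the two limits mentioned above. It is a simplified model, not Lucene's code: segments are represented only by their document counts, `build_slices` is a hypothetical helper name, and the exact grouping order in a given Lucene version may differ.

```python
from typing import List

# Defaults quoted in the text above.
MAX_SEGMENTS_PER_SLICE = 5
MAX_DOCS_PER_SLICE = 250_000


def build_slices(segment_doc_counts: List[int],
                 max_docs: int = MAX_DOCS_PER_SLICE,
                 max_segments: int = MAX_SEGMENTS_PER_SLICE) -> List[List[int]]:
    """Group segments (modeled by their doc counts) into slices.

    Assumption: segments are visited largest first, and a slice is closed
    once it reaches the segment limit or exceeds the doc limit.
    """
    slices: List[List[int]] = []
    current: List[int] = []
    current_docs = 0
    for docs in sorted(segment_doc_counts, reverse=True):
        if docs > max_docs:
            # A segment larger than the doc limit gets a slice of its own.
            slices.append([docs])
            continue
        current.append(docs)
        current_docs += docs
        if len(current) >= max_segments or current_docs > max_docs:
            slices.append(current)
            current, current_docs = [], 0
    if current:
        # Leftover segments form the final slice.
        slices.append(current)
    return slices


segments = [400_000, 120_000, 90_000, 60_000, 50_000, 10_000, 5_000, 2_000, 1_000]
slices = build_slices(segments)
for i, s in enumerate(slices):
    print(f"slice {i}: {len(s)} segment(s), {sum(s)} docs")
```

With these nine hypothetical segments the sketch produces three slices: the 400K-doc segment alone, a group of three mid-sized segments, and a group of the five smallest ones.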

Concurrent segment search flow:
[Figure: ConcurrentSearch]

Design Concurrent Search for Aggs:

To support the concurrent search model, the operators or collectors need to implement the CollectorManager interface provided by Lucene. Operators such as TopDocs, TopFields, TopScoreDocs, etc., which are native to Lucene, also provide a mechanism to create the corresponding CollectorManager and to reduce across the leaf-slice collectors in the reduce phase. OpenSearch has to use those that are inherently supported and implement a CollectorManager for others, like EarlyTerminatingCollector, which are not native to Lucene. Similarly, none of the aggregation operators are native to Lucene; they are developed only in the OpenSearch application layer using the Lucene Collector interface.
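The CollectorManager contract boils down to two operations: create a fresh collector per slice, then reduce all per-slice collectors into one result. The sketch below models that pattern in Python for a top-N collector. The class names `TopNCollector` and `TopNCollectorManager` are hypothetical and only mirror the shape of Lucene's `newCollector()` / `reduce(...)` methods; this is not the Lucene API itself.

```python
import heapq
from typing import List, Tuple

ScoredDoc = Tuple[float, int]  # (score, doc_id)


class TopNCollector:
    """Keeps the n best-scoring docs seen by a single slice."""

    def __init__(self, n: int) -> None:
        self.n = n
        self.heap: List[ScoredDoc] = []  # min-heap of the current top n

    def collect(self, doc_id: int, score: float) -> None:
        if len(self.heap) < self.n:
            heapq.heappush(self.heap, (score, doc_id))
        else:
            # Push the new hit and drop the lowest-scoring one.
            heapq.heappushpop(self.heap, (score, doc_id))


class TopNCollectorManager:
    """Creates one collector per slice and merges them at the end."""

    def __init__(self, n: int) -> None:
        self.n = n

    def new_collector(self) -> TopNCollector:
        return TopNCollector(self.n)

    def reduce(self, collectors: List[TopNCollector]) -> List[ScoredDoc]:
        merged = [hit for c in collectors for hit in c.heap]
        return heapq.nlargest(self.n, merged)


# Hits per slice as (doc_id, score); each slice gets its own collector.
slice_hits = [
    [(1, 0.3), (2, 0.9)],
    [(7, 0.5), (8, 0.1), (9, 0.7)],
]
manager = TopNCollectorManager(n=2)
collectors = []
for hits in slice_hits:
    collector = manager.new_collector()
    for doc_id, score in hits:
        collector.collect(doc_id, score)
    collectors.append(collector)

top = manager.reduce(collectors)
print(top)
```

Because each slice owns its collector, no state is shared during collection; all cross-slice work is confined to `reduce`.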

Properties of Aggregators:

  • All the Aggregator operators in OpenSearch are derived from the BucketCollector class, which implements the Collector interface of Lucene. BucketCollector has 2 major types: 1) DeferringBucketCollector and 2) Aggregator, where Aggregator is further split into 3 categories: a) MetricsAggregator, b) BucketAggregators, and c) NonCollectingAggregator (for fields with no mapping).
  • The Aggregator tree can be nested with parent-child relationships. For example, a nested bucket aggregator can have a top level that collects all the docs across, say, 5 buckets, and within each bucket a child aggregator collects 5 sub-buckets, and so on.
  • Global Aggregators are executed after all the other aggregators have run during the search phase. They work on all the documents instead of only the documents meeting the search criteria.
  • After the aggregator collectors (both global and non-global) collect the documents during the query/search phase, an InternalAggregations object is created for the aggregation tree; this is an intermediate data structure that holds the aggregation results. This result is returned to the coordinator, which performs the final merge/reduce of results from all the shards.

To support concurrency across all aggregators, we can use one of the following options; the recommended one is Option 3.

Option 1:

  • Provide a CollectorManager for each aggregator, or some common CollectorManager implementation that can take care of creating the different Aggregator collectors (or BucketCollectors) per slice. The collectors of each slice need their own state (or internal data structure), which is populated with the documents collected from that slice.
  • We need a mechanism for the different Aggregators to reduce the collected documents across slices in the reduce phase. In the reduce phase, the results for each aggregator operator in the aggregation tree must be merged across all the slices. This can be tricky with nested aggregation operators and depends on the internal state or data structure each aggregation uses to collect documents.
  • As part of the reduce phase, the final state of all collected results/buckets in the aggregator tree must be created as a single reduced aggregator tree, similar to what it would look like when executed sequentially.
  • After the reduce phase, all the other operations on Aggregators (or BucketCollectors) can remain as is, i.e. creating the intermediate InternalAggregation objects to serialize and send to the coordinator.
  • Global aggregators are executed separately later on with a matchAll query, after the above Lucene search phase, so they will need different plumbing. This is because global aggregators work on all the documents, whereas the others work on the results returned by the query (if there is any filter like range or term matching, etc.).
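The sketch below illustrates why Option 1 requires per-operator reduce logic: two aggregators with different internal state need different merge rules. `TermsState` and `AvgState` are hypothetical stand-ins written in Python for illustration; they are not OpenSearch classes, and real aggregators keep considerably more complex state.

```python
from collections import Counter
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class TermsState:
    """Per-slice state of a terms-like aggregator: doc count per term."""
    counts: Counter = field(default_factory=Counter)

    def collect(self, term: str) -> None:
        self.counts[term] += 1

    def merge(self, other: "TermsState") -> None:
        # Counter.update adds counts instead of overwriting them.
        self.counts.update(other.counts)


@dataclass
class AvgState:
    """Per-slice state of an avg-like aggregator: running sum and count."""
    total: float = 0.0
    count: int = 0

    def collect(self, value: float) -> None:
        self.total += value
        self.count += 1

    def merge(self, other: "AvgState") -> None:
        # Sum and count must be merged; averaging the averages is wrong.
        self.total += other.total
        self.count += other.count

    def value(self) -> Optional[float]:
        return self.total / self.count if self.count else None


# Documents per slice as (term, numeric value).
slice_docs: List[List[Tuple[str, float]]] = [
    [("a", 10.0), ("b", 20.0)],
    [("a", 30.0)],
]

terms_per_slice, avg_per_slice = [], []
for docs in slice_docs:
    terms, avg = TermsState(), AvgState()
    for term, value in docs:
        terms.collect(term)
        avg.collect(value)
    terms_per_slice.append(terms)
    avg_per_slice.append(avg)

# Reduce phase: each operator merges its own kind of state.
reduced_terms, reduced_avg = TermsState(), AvgState()
for t in terms_per_slice:
    reduced_terms.merge(t)
for a in avg_per_slice:
    reduced_avg.merge(a)

print(dict(reduced_terms.counts), reduced_avg.value())
```

Here the merged average is 20.0, whereas naively averaging the two per-slice averages (15.0 and 30.0) would give 22.5. Every operator carries this kind of state-specific rule, which is the cost called out under Cons below.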

Pros:

  • No changes in coordinator layer
  • No changes in serialization protocol between coordinator and shards
  • Scalable: Collection and reduce happen concurrently at the shard level, where each shard performs the operations for its own set of collectors. There are 2 phases of reduce: one at the shard level and another at the coordinator level (the latter is the same as today).

Cons:

  • There are 50 aggregator operators, and depending on how the state of each aggregator differs, we will need to understand and update each operator to implement the reduce phase. This can be both time-consuming and error-prone.

Option 2:

  • Have CollectorManager for Aggregator (same as above)
  • Don't perform CollectorManager::reduce for Aggregation collectors on shards. Instead, create the intermediate InternalAggregation data structures for a list of Aggregation collectors (one per slice) at each level of the aggregation tree, compared to a single Aggregation collector today.
  • Update the coordinator to handle a list of InternalAggregation objects per aggregator operator in the tree (instead of 1) from each shard and perform the merge/reduce on these objects. The coordinator already performs this merge across shards. For simplicity, we can think of it today as merging a list of InternalAggregation with 1 object per shard. With this option it needs to merge a list of lists of InternalAggregation, with 1 list of objects per shard. This is true for each level of the aggregation tree and for all the different aggregations in the request.

Pros:

  • Uses the existing mechanism to reduce the intermediate aggregation results at the coordinator layer instead of implementing reduce in each Aggregation collector
  • Simpler and faster to implement compared to Option 1

Cons:

  • Not scalable: The number of InternalAggregation objects that need to be reduced at the coordinator grows from one per shard to shard count x slice count per shard. So when we want high parallelism (or slice count), the coordinator becomes the bottleneck, because reduce happens only at that level instead of in 2 phases. This can also increase memory and CPU consumption on the coordinator, since the number of entries from each shard increases and each slice can carry duplicate entries for the same bucket (e.g. histogram buckets). To keep memory in check we can lower the batch limit of the buffered aggregations (default is DEFAULT_BATCHED_REDUCE_SIZE = 512) so that reduce happens more frequently, but that still consumes CPU cycles.
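As a rough worked example of this fan-in, the Python snippet below counts how many partial results and bucket entries reach the coordinator. The cluster shape (20 shards, 8 slices per shard, 100 histogram buckets) is an assumed illustration, the helper names are hypothetical, and the bucket figure is a worst case in which every slice sees every bucket.

```python
def coordinator_inputs(shards: int, slices_per_shard: int) -> int:
    """Partial aggregation results the coordinator must reduce, per aggregator."""
    return shards * slices_per_shard


def worst_case_bucket_entries(shards: int, slices_per_shard: int, buckets: int) -> int:
    """Bucket entries received if every partial result contains every bucket."""
    return coordinator_inputs(shards, slices_per_shard) * buckets


shards, slices_per_shard, buckets = 20, 8, 100  # assumed example values

# Today (or with a shard-level reduce): one partial result per shard.
sequential_inputs = coordinator_inputs(shards, 1)
sequential_entries = worst_case_bucket_entries(shards, 1, buckets)

# Option 2: one partial result per slice of every shard.
option2_inputs = coordinator_inputs(shards, slices_per_shard)
option2_entries = worst_case_bucket_entries(shards, slices_per_shard, buckets)

print(sequential_inputs, option2_inputs)    # partial results at the coordinator
print(sequential_entries, option2_entries)  # bucket entries at the coordinator
```

Under these assumptions the coordinator reduces 160 partial results instead of 20, and up to 16,000 bucket entries instead of 2,000.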

Option 3 (Recommended):

  • Use a hybrid of Option 1 and Option 2, where the collect phase remains the same and the reduce on Aggregation collectors doesn't happen as part of CollectorManager::reduce in the Lucene search phase.
  • After all collection is performed, the intermediate InternalAggregation data structures are created. The first level of merge/reduce then happens at the shard level, merging the InternalAggregation objects across the slices on that shard. The shard then sends the merged output to the coordinator.
  • The coordinator again performs the reduce for results across shards, which is the same as the current behavior.
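The two-phase reduce can be sketched as follows, in Python, with histogram-style bucket counts standing in for InternalAggregation objects. The same merge function is used at both levels, which mirrors the idea of reusing the existing reduce mechanism. The data and the `reduce_partials` helper are hypothetical.

```python
from collections import Counter
from typing import Dict, List


def reduce_partials(partials: List[Counter]) -> Counter:
    """Merge partial bucket counts; stands in for the existing reduce logic."""
    merged: Counter = Counter()
    for partial in partials:
        merged.update(partial)  # adds counts bucket by bucket
    return merged


# shard -> per-slice partial results (bucket key -> doc count)
cluster: Dict[str, List[Counter]] = {
    "shard0": [Counter({0: 3, 10: 1}), Counter({0: 2, 20: 4})],
    "shard1": [Counter({10: 5}), Counter({10: 1, 20: 1}), Counter({0: 1})],
}

# Phase 1: each shard reduces across its own slices.
shard_results = {shard: reduce_partials(parts) for shard, parts in cluster.items()}

# Phase 2: the coordinator reduces one result per shard, as it does today.
final = reduce_partials(list(shard_results.values()))

# For comparison: a single flat reduce over every slice (Option 2 style).
flat = reduce_partials([p for parts in cluster.values() for p in parts])

print(dict(final))
print(len(shard_results), "coordinator inputs instead of",
      sum(len(parts) for parts in cluster.values()))
```

Both paths yield the same buckets, but with the shard-level reduce the coordinator only sees one input per shard (2 here) rather than one per slice (5 here).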

Pros:

  • All the pros of Option 1 and Option 2
    • No changes in coordinator layer
    • No changes in serialization protocol between coordinator and shards
    • Faster to implement as well by reusing the existing merge/reduce mechanism
  • Scalable: Collection happens concurrently at the shard level, where each shard performs the operations for its own set of collectors. Reduce happens in 2 phases: a) at the shard level and b) at the coordinator level (the latter is the same as today), so it does not add extra work or use more resources on the coordinator, unlike Option 2.

Cons:

  • May need to handle the profile scenario differently here as the reduce phase is happening outside the Lucene search API call.

Option 4:

  • Have CollectorManager for Aggregator (same as above)
  • Instead of performing the reduce operation on each aggregator collector, make the internal data structure which needs to be shared across slices thread safe

Pros:

  • Can have smaller memory footprint at the cost of lock contention when multiple slice threads are trying to update the same shared data structure

Cons:

  • Will be complex to implement while ensuring no new bugs creep in. Most of the internal structures are implemented in OpenSearch and do not use generic data structures that could be replaced by their concurrent versions. With a complex nested aggregation tree, it can become difficult to understand and handle all the cases. For example, buckets in histograms are implemented as a kind of hash table using BigArrays.
  • We may not see a benefit from concurrent aggregation execution because of the locking on the shared data structure.
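For contrast, a minimal Python sketch of the shared-structure approach is shown below: all slice threads write into one bucket table guarded by a lock. `SharedBuckets` and `search_slice` are hypothetical names, and a plain dictionary-like counter stands in for the custom OpenSearch structures that would actually need to be made thread safe.

```python
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class SharedBuckets:
    """One bucket table shared by all slice threads, guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._counts: Counter = Counter()

    def collect(self, key: int) -> None:
        # Every update from every slice thread contends on this lock.
        with self._lock:
            self._counts[key] += 1

    def snapshot(self) -> Dict[int, int]:
        with self._lock:
            return dict(self._counts)


def search_slice(buckets: SharedBuckets, values: List[int], interval: int = 10) -> None:
    """Histogram-style collection: round each value down to its bucket key."""
    for value in values:
        buckets.collect((value // interval) * interval)


slice_values = [[1, 2, 15], [3, 27], [11, 12, 29]]  # assumed field values per slice
shared = SharedBuckets()
with ThreadPoolExecutor(max_workers=len(slice_values)) as pool:
    # list(...) waits for all slices and surfaces any exception.
    list(pool.map(lambda values: search_slice(shared, values), slice_values))

result = shared.snapshot()
print(result)
```

There is no reduce step and only one copy of the bucket table, which is the memory advantage; the price is that each `collect` call serializes on the lock.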

Concurrent segment search with Aggs:

[Figure: Concurrent Search with Aggs]

Different Agg operators supported in OpenSearch:
[Figure: Aggregation Operators]

Describe alternatives you've considered
All the options are listed above

Additional context
Tasks that will be done incrementally, in addition to what is called out in the META issue:

  • Understanding concurrent search implementation and current Aggregator support in OpenSearch
  • High Level Design Option for supporting Aggregation with concurrent segment search
  • Prototype for recommended option
  • Implementation for recommended approach without profile support
  • Profile support for aggregation with concurrent search
  • Implementing Global Aggregation executed post query phase in concurrent manner
  • Support to dynamically enable/disable the concurrent search path
  • Add a mechanism to change the slice calculation by dynamically updating the max documents and segment count considered for each slice
  • Enable all the search ITs/UTs with and without concurrent segment search
  • Run OSB benchmarks with concurrent search enabled
  • Explore any new metrics that can be added for concurrent segment search, such as threadpool stats, number of searches using the concurrent vs. sequential path, etc.
  • Explore changes needed for the Task Resource tracking framework and search backpressure with concurrent segment search
  • Future work that can also be considered: (i) dynamically choosing whether to perform concurrent search on a request based on resource availability on the node.

Labels: Search:Performance, discuss, enhancement
