Optimize the composite aggregation for match_all and range queries#28745
jimczi merged 24 commits into elastic:master
Conversation
This change refactors the composite aggregation to add an execution mode that visits documents in the order of the values
present in the leading source of the composite definition. This mode does not need to visit all documents, since it can terminate
the collection early once the leading source value is greater than the lowest value in the queue.
Instead of collecting the documents in the order of their doc_id, this mode uses the inverted lists (or the bkd tree for numerics) to collect documents
in the order of the values present in the leading source.
For instance the following aggregation:
```
"composite" : {
"sources" : [
{ "value1": { "terms" : { "field": "timestamp", "order": "asc" } } }
],
"size": 10
}
```
... can use the field `timestamp` to collect the documents with the 10 lowest values for the field instead of visiting all documents.
For composite aggregations with more than one source, the execution can terminate early as soon as one of the 10 lowest values produces enough
composite buckets. For instance, if visiting the documents with the two lowest timestamps creates 10 composite buckets, we can terminate the collection early since it
is guaranteed that the third lowest timestamp cannot create a composite key that compares lower than those already collected.
This mode can execute iff:
* The leading source in the composite definition uses an indexed field of type `date` (works also with `date_histogram` source), `integer`, `long` or `keyword`.
* The query is a match_all query or a range query over the field that is used as the leading source in the composite definition.
* The sort order of the leading source is the natural order (ascending since postings and numerics are sorted in ascending order only).
If these conditions are not met, this aggregation visits each document like any other aggregation.
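The early-termination idea described above can be sketched in isolation. This is a minimal, hypothetical illustration (not the actual producer code from the PR): the leading-source values arrive in ascending index order, a plain `TreeSet` stands in for the composite queue, and collection stops at the first value that can no longer enter a full queue.

```java
import java.util.TreeSet;

public class EarlyTerminationSketch {

    // Returns how many values were actually visited before termination.
    static int collect(long[] sortedLeadingValues, int size) {
        TreeSet<Long> queue = new TreeSet<>(); // keeps the `size` lowest distinct values
        int visited = 0;
        for (long value : sortedLeadingValues) { // ascending, like postings / BKD order
            visited++;
            if (queue.size() == size && value > queue.last()) {
                break; // early termination: no later value can beat the queue
            }
            queue.add(value);
            if (queue.size() > size) {
                queue.pollLast();
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        long[] timestamps = {1, 2, 2, 3, 5, 8, 13, 21};
        // With size=3 the distinct values {1, 2, 3} fill the queue; the first
        // value strictly greater than 3 stops the collection.
        System.out.println(collect(timestamps, 3)); // prints 5
    }
}
```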
```
@Override
public boolean needsScores() {
    if (collector == null) {
```

It seems that this should really never occur? Should we make this an assertion instead?

++, I replaced it with an assert.
```
protected boolean processBucket(LeafReaderContext context, LeafBucketCollector sub,
                                DocIdSetIterator iterator, Comparable<?> leadSourceBucket) throws IOException {
    final int[] topCompositeCollected = new int[1];
    final boolean[] hasCollected = new boolean[1];
```

why do these need to be arrays if they only contain a single element?

Because the values are set in the anonymous class below; I find it nicer than using an Atomic, and I saw this pattern in other locations in the codebase.
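The pattern discussed in this thread can be shown in isolation (hypothetical names, not code from the PR): local variables captured by an anonymous class or lambda must be effectively final, so a one-element array serves as a cheap mutable holder.

```java
public class ArrayHolderSketch {

    interface Visitor {
        void visit(int doc);
    }

    static int countVisits(int[] docs) {
        // The reference is final, but the array cell is mutable from the lambda.
        final int[] collected = new int[1];
        Visitor v = doc -> collected[0]++;
        for (int doc : docs) {
            v.visit(doc);
        }
        return collected[0];
    }

    public static void main(String[] args) {
        System.out.println(countVisits(new int[]{4, 7, 9})); // prints 3
    }
}
```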
```
// of the composite definition and terminates when the leading source value is guaranteed to be
// greater than the lowest composite bucket in the queue.
sortedBucketProducer.processLeaf(context.query(), ctx, sub);
throw new CollectionTerminatedException();
```

I wonder if it would be clearer for the SortedBucketProducer to throw the exception rather than throwing it here?

I pushed a commit that adds a comment regarding why we throw an exception here.
colings86 left a comment:

LGTM (when the build gets to green 😄)
jpountz left a comment:

I still need to dig more to fully understand how it works, but I like the idea. Some comments:
- As we are adding more abstractions to the implementation to support optimizations, I wish we had dedicated tests for these abstractions too, like the sorted docs producer, the queue, etc.
- Can we also disable this optimization when there is a high ratio of deleted docs? I don't think that would be a problem in general, as merges make sure there are no more than 50% deleted documents, but this doesn't hold anymore for users of security, since hidden docs appear as deleted. Maybe also require that numDocs / maxDoc > 0.5?
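The suggested guard boils down to a ratio check. A minimal sketch, assuming `numDocs` and `maxDoc` follow Lucene's `IndexReader` semantics (live document count vs. total doc id space, deletions included); the method name is illustrative:

```java
public class DeletedDocsHeuristic {

    // Allow the optimization only when more than half of the doc id space
    // refers to live documents, i.e. fewer than 50% deleted docs.
    static boolean optimizationAllowed(int numDocs, int maxDoc) {
        return (double) numDocs / maxDoc > 0.5;
    }

    public static void main(String[] args) {
        System.out.println(optimizationAllowed(90, 100)); // true:  10% deleted
        System.out.println(optimizationAllowed(40, 100)); // false: 60% deleted
    }
}
```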
```
if (needsScores) {
    Scorer scorer = weight.scorer(entry.context);
    // We don't need to check if the scorer is null
    // since we are sure that there are documents to replay (docIdSetIterator is not empty).
```

how do we know it is not empty?
Thanks for looking @jpountz. I've pushed some commits to address your comments: the optimization is disabled when there are more than 50% deleted documents, and I've added more tests for the new abstractions. Can you take another look?
Jim and I discussed moving away from the deferring framework so that it looks a bit less weird, e.g. due to feeding collect with out-of-order ids.
I pushed a commit that rewrites the deferring framework. @jpountz can you take another look?
jpountz left a comment:

I still need to continue this review (this is a big change!), but at first sight I find it more readable than the previous version!
```
    afterValue = (BytesRef) value;
} else if (value.getClass() == String.class) {
    afterValue = new BytesRef((String) value);
} else {
```

do we need to accept both BytesRef and String?
```
private LeafBucketCollector getSecondPassCollector(LeafBucketCollector subCollector) {
    return new LeafBucketCollector() {
        @Override
        public void collect(int doc, long bucket) throws IOException {
```

let's assert that bucket is 0?
```
return new LeafBucketCollector() {
    @Override
    public void collect(int doc, long bucket) throws IOException {
        currentValue = filterValue;
```

do we need to set it for every doc?
```
if (dvs.advanceExact(doc)) {
    int num = dvs.docValueCount();
    for (int i = 0; i < num; i++) {
        currentValue = dvs.nextValue();
```

this means currentValue will always be the highest value in case of a multi-valued field, is that ok?

currentValue is only valid for the current composite bucket; next.collect() below will fill the other sources' currentValue, and the last collector in the chain will check whether the final composite bucket should be added to the queue. We don't use currentValue outside of these recursive calls.
jpountz left a comment:

OK, I had a more thorough review and I like the change in general. There are one or two places where it might make too strong assumptions about the value of the cost, but other than that it looks good to me. I'd also like to see more comments to explain how things work. I suggested some javadoc improvements.
```
/**
 * The type of this source.
```

maybe mention how it's used?
```
abstract String type();

/**
 * Copies the current value in <code>slot</code>.
```

maybe say how it's supposed to know about the current value?
```
abstract int compare(int from, int to);

/**
 * Compares the current value with the value in <code>slot</code>.
```

maybe say that the current value is the one from the last copyCurrent call?
```
deferredCollectors.preCollection();
for (Entry entry : entries) {
    DocIdSetIterator docIdSetIterator = entry.docIdSet.iterator();
    if (docIdSetIterator == null || docIdSetIterator.cost() == 0) {
```

using cost() == 0 is a bit unsafe since cost() may be completely off
```
final int[] topCompositeCollected = new int[1];
final boolean[] hasCollected = new boolean[1];
int cost = (int) iterator.cost();
final DocIdSetBuilder.BulkAdder adder = builder != null ? builder.grow(cost) : null;
```

This optimization is unsafe since cost may be inaccurate. I would be OK if it was only called with postings, but it looks like this method is sometimes called with the result of DocIdSetBuilder.finish, which gives approximate costs in the dense case.
@jpountz I pushed more changes to fix the issue with the cost approximation and added some javadocs. I think it's ready for another round ;)
```
// we need to add the matching document in the builder
// so we build a bulk adder from the approximate cost of the iterator
// and rebuild the adder during the collection if needed
int remainingBits = (int) iterator.cost();
```

Let's do a Math.min(cost, Integer.MAX_VALUE) rather than a blind cast?
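The rationale for the suggestion: `DocIdSetIterator.cost()` returns a `long`, and a blind `(int)` cast of a value above `Integer.MAX_VALUE` silently overflows, so the value should be clamped before narrowing. A small self-contained sketch (the wrapper method name is illustrative):

```java
public class CostClampSketch {

    // Clamp before narrowing: cost() may legitimately exceed Integer.MAX_VALUE,
    // and a raw (int) cast would wrap around (e.g. (int) (1L << 40) is 0).
    static int clampCost(long cost) {
        return (int) Math.min(cost, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        System.out.println(clampCost(42L));      // prints 42
        System.out.println(clampCost(1L << 40)); // prints 2147483647
    }
}
```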
```
@Override
public void grow(int count) {
    remaining = count;
```

I don't think you need to count the number of remaining docs here, the BKD tree does it for you

I need it because we build one doc id set per bucket (not per bkd leaf), so if the values are different inside a leaf I need to know the number of remaining docs in that leaf to create the new doc id set builder.

I see, thanks for explaining.
```
// we need to add the matching document in the builder
// so we build a bulk adder from the approximate cost of the iterator
// and rebuild the adder during the collection if needed
int remainingBits = (int) iterator.cost();
```

let's do min(Integer.MAX_VALUE, iterator.cost())
Hi! It seems that the optimization for index sorting was replaced by the execution mode that visits documents in the order of the values present in the leading source. I wonder why the previous optimization could not be retained. The condition for index sorting is easier to meet and can terminate the aggregation early on each segment without the conditions on the leading source and query. Grateful for any help!
Closes #28688