Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/141331.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
area: ES|QL
issues: []
pr: 141331
summary: Use avg metric for AMD default metric
type: enhancement
1 change: 1 addition & 0 deletions server/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@
exports org.elasticsearch.index.mapper.blockloader;
exports org.elasticsearch.index.mapper.blockloader.docvalues;
exports org.elasticsearch.index.mapper.blockloader.docvalues.fn;
exports org.elasticsearch.index.mapper.blockloader.docvalues.tracking;
exports org.elasticsearch.index.mapper.blockloader.script;
exports org.elasticsearch.readiness to org.elasticsearch.internal.sigterm;
exports org.elasticsearch.inference.metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1289,58 +1289,58 @@ max_deriv:double | step:datetime | pod:k

DefaultMetricWithFrom
required_capability: ts_command_v0
required_capability: aggregate_metric_double_default_metric
required_capability: aggregate_metric_double_avg_as_default_metric
Comment thread
limotova marked this conversation as resolved.
FROM k8s-downsampled
| STATS max = max(network.eth0.tx), std_dev = ROUND(STD_DEV(network.eth0.tx), 7) by pod
| sort pod
;

max:double | std_dev:double | pod:keyword
1060.0 | 331.5018947 | one
824.0 | 151.9744943 | three
1419.0 | 364.4118888 | two
1060.0 | 275.6970067 | one
824.0 | 184.1213952 | three
1419.0 | 356.9865993 | two
;

DefaultMetricWithTS
required_capability: ts_command_v0
required_capability: aggregate_metric_double_default_metric
required_capability: aggregate_metric_double_avg_as_default_metric
TS k8s-downsampled
| STATS max = max(network.eth0.tx), std_dev = ROUND(STD_DEV(network.eth0.tx), 7) by pod
| sort pod
;

max:double | std_dev:double | pod:keyword
1060.0 | 355.3786713 | one
824.0 | 180.6918802 | three
815.0 | 102.3469046 | two
998.0 | 335.8395781 | one
734.0 | 228.887527 | three
576.0 | 58.4465568 | two
;

DefaultMetricWithFromImplicitCasting
required_capability: ts_command_v0
required_capability: aggregate_metric_double_default_metric
required_capability: aggregate_metric_double_avg_as_default_metric
FROM k8s*
| STATS min = min(network.eth0.tx), std_dev = ROUND(STD_DEV(network.eth0.tx), 7) by pod
| sort pod
;

min:double | std_dev:double | pod:keyword
18.0 | 380.8643432 | one
48.0 | 340.5483293 | three
20.0 | 431.2070165 | two
18.0 | 375.1480039 | one
48.0 | 347.6778006 | three
20.0 | 428.6422763 | two
;

DefaultMetricWithTSImplicitCasting
required_capability: ts_command_v0
required_capability: aggregate_metric_double_default_metric
required_capability: aggregate_metric_double_avg_as_default_metric
TS k8s*
| STATS min = min(network.eth0.tx), std_dev = ROUND(STD_DEV(network.eth0.tx), 7) by pod
| STATS min = min(network.eth0.tx), max = max(network.eth0.tx), std_dev = ROUND(STD_DEV(network.eth0.tx), 7) by pod
| sort pod
;

min:double | std_dev:double | pod:keyword
193.0 | 427.9853269 | one
715.0 | 208.6955678 | three
602.0 | 375.7757842 | two
min:double | max:double | std_dev:double | pod:keyword
176.0 | 1380.0 | 428.6282305 | one
239.0 | 1241.0 | 366.7649929 | three
438.0 | 1716.0 | 450.5833552 | two
;

Case statement for Aggregate Metric Double (Standard Mode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,11 @@ public enum Cap {
*/
AGGREGATE_METRIC_DOUBLE_DEFAULT_METRIC,

/**
* Support avg as a possible default metric for aggregate_metric_double
*/
AGGREGATE_METRIC_DOUBLE_AVG_AS_DEFAULT_METRIC,

/**
* Support change point detection "CHANGE_POINT".
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.index.mapper.blockloader.ConstantNull;
import org.elasticsearch.index.mapper.blockloader.docvalues.BlockDocValuesReader;
import org.elasticsearch.index.mapper.blockloader.docvalues.DoublesBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.IntsBlockLoader;
import org.elasticsearch.index.mapper.blockloader.docvalues.tracking.NumericDvSingletonOrSorted;
import org.elasticsearch.index.mapper.blockloader.docvalues.tracking.TrackingNumericDocValues;

import java.io.IOException;
import java.util.EnumMap;
Expand Down Expand Up @@ -152,4 +155,104 @@ public void close() {
Releasables.close(minReader, maxReader, sumReader, countReader);
}
}

public static class AvgBlockLoader extends BlockDocValuesReader.DocValuesBlockLoader {
NumberFieldMapper.NumberFieldType sumFieldType;
NumberFieldMapper.NumberFieldType countFieldType;

AvgBlockLoader(EnumMap<AggregateMetricDoubleFieldMapper.Metric, NumberFieldMapper.NumberFieldType> availableMetrics) {
if (availableMetrics.containsKey(AggregateMetricDoubleFieldMapper.Metric.sum) == false
|| availableMetrics.containsKey(AggregateMetricDoubleFieldMapper.Metric.value_count) == false) {
sumFieldType = null;
countFieldType = null;
} else {
sumFieldType = availableMetrics.get(AggregateMetricDoubleFieldMapper.Metric.sum);
countFieldType = availableMetrics.get(AggregateMetricDoubleFieldMapper.Metric.value_count);
}
}

@Override
public AllReader reader(CircuitBreaker breaker, LeafReaderContext context) throws IOException {
if (sumFieldType == null || countFieldType == null) {
return ConstantNull.READER;
}
NumericDvSingletonOrSorted dvSumValues = NumericDvSingletonOrSorted.get(breaker, context, sumFieldType.name());
NumericDvSingletonOrSorted dvValueCountValues = NumericDvSingletonOrSorted.get(breaker, context, countFieldType.name());
if (dvSumValues == null || dvValueCountValues == null) {
return ConstantNull.READER;
}
assert dvSumValues.sorted() == null && dvValueCountValues.sorted() == null
: "aggregate metric doubles shouldn't have multi-values";

TrackingNumericDocValues trackingSumValues = dvSumValues.singleton();
TrackingNumericDocValues trackingValueCountValues = dvValueCountValues.singleton();
var sumValues = trackingSumValues.docValues();
var valueCountValues = trackingValueCountValues.docValues();

return new BlockDocValuesReader(breaker) {
@Override
public void close() {
Releasables.close(trackingSumValues, trackingValueCountValues);
}

private int docID = -1;

@Override
protected int docId() {
return docID;
}

@Override
public String toString() {
return "BlockDocValuesReader.AggregateMetricDoubleAvg";
}

@Override
public Block read(BlockFactory factory, Docs docs, int offset, boolean nullsFiltered) throws IOException {
int expectedCount = docs.count() - offset;
if (sumValues == null || valueCountValues == null) {
return factory.constantNulls(expectedCount);
}
try (DoubleBuilder builder = factory.doublesFromDocValues(expectedCount)) {
int lastDoc = -1;

for (int i = offset; i < docs.count(); i++) {
int doc = docs.get(i);
if (doc < lastDoc) {
throw new IllegalStateException("docs within same block must be in order");
}
if (sumValues.advanceExact(doc) && valueCountValues.advanceExact(doc)) {
this.docID = doc;
lastDoc = doc;
double sum = NumericUtils.sortableLongToDouble(sumValues.longValue());
long count = valueCountValues.longValue();
builder.appendDouble(sum / count);
} else {
builder.appendNull();
}
}
return builder.build();
}
}

@Override
public void read(int docId, StoredFields storedFields, Builder builder) throws IOException {
DoubleBuilder blockBuilder = (DoubleBuilder) builder;
if (sumValues.advanceExact(docId) && valueCountValues.advanceExact(docId)) {
this.docID = docId;
var sum = NumericUtils.sortableLongToDouble(sumValues.longValue());
var count = valueCountValues.longValue();
blockBuilder.appendDouble(sum / count);
} else {
blockBuilder.appendNull();
}
}
};
}

@Override
public Builder builder(BlockFactory factory, int expectedCount) {
throw new UnsupportedOperationException("AvgBlockLoader does not have a corresponding builder");
}
Comment thread
limotova marked this conversation as resolved.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import java.util.stream.Collectors;

import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.elasticsearch.index.mapper.blockloader.BlockLoaderFunctionConfig.Function.AMD_DEFAULT;

/** A {@link FieldMapper} for a field containing aggregate metrics such as min/max/value_count etc. */
public class AggregateMetricDoubleFieldMapper extends FieldMapper {
Expand Down Expand Up @@ -415,34 +416,10 @@ public LeafAggregateMetricDoubleFieldData load(LeafReaderContext context) {
@Override
public SortedNumericDoubleValues getAggregateMetricValues(final Metric metric) {
try {
final SortedNumericDocValues values = DocValues.getSortedNumeric(
context.reader(),
subfieldName(getFieldName(), metric)
return new AggregateMetricValues(
DocValues.getSortedNumeric(context.reader(), subfieldName(getFieldName(), metric)),
metric
);

return new SortedNumericDoubleValues() {
@Override
public int docValueCount() {
return values.docValueCount();
}

@Override
public boolean advanceExact(int doc) throws IOException {
return values.advanceExact(doc);
}

@Override
public double nextValue() throws IOException {
long v = values.nextValue();
if (metric == Metric.value_count) {
// Only value_count metrics are encoded as integers
return v;
} else {
// All other metrics are encoded as doubles
return NumericUtils.sortableLongToDouble(v);
}
}
};
} catch (IOException e) {
throw new IllegalStateException("Cannot load doc values", e);
}
Expand Down Expand Up @@ -564,6 +541,38 @@ public BucketedSort newBucketedSort(
};
}

private static class AggregateMetricValues extends SortedNumericDoubleValues {
final SortedNumericDocValues values;
final Metric metric;

private AggregateMetricValues(SortedNumericDocValues values, Metric metric) {
this.values = values;
this.metric = metric;
}

@Override
public int docValueCount() {
return values.docValueCount();
}

@Override
public boolean advanceExact(int doc) throws IOException {
return values.advanceExact(doc);
}

@Override
public double nextValue() throws IOException {
long v = values.nextValue();
if (metric == Metric.value_count) {
// Only value_count metrics are encoded as integers
return v;
} else {
// All other metrics are encoded as doubles
return NumericUtils.sortableLongToDouble(v);
}
}
}

@Override
public ValueFetcher valueFetcher(SearchExecutionContext context, String format) {
return SourceValueFetcher.identity(name(), context, format);
Expand All @@ -573,31 +582,14 @@ public ValueFetcher valueFetcher(SearchExecutionContext context, String format)
public BlockLoader blockLoader(BlockLoaderContext blContext) {
BlockLoaderFunctionConfig cfg = blContext.blockLoaderFunctionConfig();
if (cfg != null) {
var function = cfg.function();
Metric metric = switch (function) {
case AMD_COUNT -> Metric.value_count;
case AMD_MAX -> Metric.max;
case AMD_MIN -> Metric.min;
case AMD_SUM -> Metric.sum;
// TODO: temporary until we combine https://github.com/elastic/elasticsearch/pull/141331
case AMD_DEFAULT -> {
String name = name();
if (metricFields.containsKey(Metric.max)) {
yield Metric.max;
}
for (Metric m : Metric.values()) {
if (metricFields.containsKey(m)) {
yield m;
}
}
throw new IllegalStateException("No metric found for aggregate metric field [" + name + "]");
}
default -> null;
return switch (cfg.function()) {
case AMD_DEFAULT -> new AggregateMetricDoubleBlockLoader.AvgBlockLoader(metricFields);
case AMD_COUNT -> getIndividualBlockLoader(Metric.value_count);
case AMD_MAX -> getIndividualBlockLoader(Metric.max);
case AMD_MIN -> getIndividualBlockLoader(Metric.min);
case AMD_SUM -> getIndividualBlockLoader(Metric.sum);
default -> new AggregateMetricDoubleBlockLoader(metricFields);
};
if (metric == null) {
return new AggregateMetricDoubleBlockLoader(metricFields);
}
return getIndividualBlockLoader(metric);
}
return new AggregateMetricDoubleBlockLoader(metricFields);
}
Expand Down
Loading