Skip to content

Commit 64e4dc2

Browse files
committed
Write prefix partition for tsid
1 parent e6d0bd2 commit 64e4dc2

16 files changed

Lines changed: 619 additions & 61 deletions

benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,8 @@ private static IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeE
264264
optimizedMergeEnabled,
265265
BinaryDVCompressionMode.COMPRESSED_ZSTD_LEVEL_1,
266266
true,
267-
NUMERIC_LARGE_BLOCK_SHIFT
267+
NUMERIC_LARGE_BLOCK_SHIFT,
268+
false
268269
);
269270
config.setCodec(new Elasticsearch93Lucene104Codec() {
270271
@Override

server/src/main/java/org/elasticsearch/index/IndexVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ private static Version parseUnchecked(String version) {
239239
public static final IndexVersion IGNORED_SOURCE_AS_DOC_VALUES = def(9_078_0_00, Version.LUCENE_10_4_0);
240240
public static final IndexVersion TIME_SERIES_USE_SYNTHETIC_ID_DEFAULT = def(9_079_0_00, Version.LUCENE_10_4_0);
241241
public static final IndexVersion TSID_SINGLE_PREFIX_BYTE_FEATURE_FLAG = def(9_080_00_0, Version.LUCENE_10_4_0);
242+
public static final IndexVersion WRITE_TSID_PREFIX_PARTITION = def(9_081_00_0, Version.LUCENE_10_4_0);
242243
/*
243244
* STOP! READ THIS FIRST! No, really,
244245
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

server/src/main/java/org/elasticsearch/index/codec/PerFieldFormatSupplier.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.codecs.KnnVectorsFormat;
1414
import org.apache.lucene.codecs.PostingsFormat;
1515
import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat;
16+
import org.elasticsearch.cluster.routing.TsidBuilder;
1617
import org.elasticsearch.common.util.BigArrays;
1718
import org.elasticsearch.core.Nullable;
1819
import org.elasticsearch.index.IndexMode;
@@ -198,10 +199,19 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
198199
}
199200

200201
if (useTSDBDocValuesFormat(field)) {
201-
var indexCreatedVersion = mapperService.getIndexSettings().getIndexVersionCreated();
202+
IndexSettings indexSettings = mapperService.getIndexSettings();
203+
var indexCreatedVersion = indexSettings.getIndexVersionCreated();
202204
boolean useLargeNumericBlockSize = mapperService.getIndexSettings().isUseTimeSeriesDocValuesFormatLargeNumericBlockSize();
203205
boolean useLargeBinaryBlockSize = mapperService.getIndexSettings().isUseTimeSeriesDocValuesFormatLargeBinaryBlockSize();
204-
return TSDBDocValuesFormatFactory.createDocValuesFormat(indexCreatedVersion, useLargeNumericBlockSize, useLargeBinaryBlockSize);
206+
boolean writePartitions = indexSettings.getMode() == IndexMode.TIME_SERIES
207+
&& TsidBuilder.useSingleBytePrefixLayout(indexCreatedVersion)
208+
&& indexCreatedVersion.onOrAfter(IndexVersions.WRITE_TSID_PREFIX_PARTITION);
209+
return TSDBDocValuesFormatFactory.createDocValuesFormat(
210+
indexCreatedVersion,
211+
useLargeNumericBlockSize,
212+
useLargeBinaryBlockSize,
213+
writePartitions
214+
);
205215
}
206216

207217
return docValuesFormat;
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.index.codec.tsdb;
11+
12+
import org.apache.lucene.index.LeafReaderContext;
13+
import org.apache.lucene.search.IndexSearcher;
14+
import org.elasticsearch.core.Nullable;
15+
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
16+
17+
import java.io.IOException;
18+
19+
/**
20+
* An extension for doc-values which is the primary sort key, and values are grouped by some prefix bytes. One example is `_tsid` in
21+
* time-series, where time-series can be grouped by the first few bytes, and the query engine can process a slice of time-series
22+
* containing all "continuous" time-series sharing the same prefix. This is necessary instead of querying a slice of docs because some
23+
* aggregations such as rate require all data points from a single time-series to be processed by the same operator.
24+
*/
25+
public interface PartitionedDocValues {
26+
/**
27+
* @param prefixes the prefix keys
28+
* @param startDocs the starting doc IDs corresponding to each prefix key
29+
* @param numPartitions the actual number of prefixes available in the partition
30+
*/
31+
record PrefixPartitions(int[] prefixes, int[] startDocs, int numPartitions) {
32+
33+
}
34+
35+
/**
36+
* Returns the prefix partitions from the doc-values of this field, if one exists.
37+
* @param reused an existing prefix partitions instance that can be reused to avoid allocating memory
38+
*/
39+
@Nullable
40+
PrefixPartitions prefixPartitions(PrefixPartitions reused) throws IOException;
41+
42+
/**
43+
* Whether this doc-values has prefix partition.
44+
*/
45+
boolean hasPrefixPartitions();
46+
47+
/**
48+
* Check if the given index searcher can be partitioned by tsid prefix.
49+
* @param searcher the index searcher to check
50+
* @return true if all non-empty segments support tsid prefix partitioning
51+
*/
52+
static boolean canPartitionByTsidPrefix(IndexSearcher searcher) throws IOException {
53+
for (LeafReaderContext leafContext : searcher.getLeafContexts()) {
54+
var sortedDV = leafContext.reader().getSortedDocValues(TimeSeriesIdFieldMapper.NAME);
55+
// empty segment
56+
if (sortedDV == null) {
57+
continue;
58+
}
59+
if (sortedDV instanceof PartitionedDocValues partition && partition.hasPrefixPartitions()) {
60+
continue;
61+
}
62+
return false;
63+
}
64+
return true;
65+
}
66+
}

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.lucene.util.packed.DirectMonotonicWriter;
4545
import org.apache.lucene.util.packed.PackedInts;
4646
import org.elasticsearch.core.IOUtils;
47+
import org.elasticsearch.core.Nullable;
4748
import org.elasticsearch.index.codec.tsdb.BinaryDVCompressionMode;
4849
import org.elasticsearch.index.codec.tsdb.TSDBDocValuesEncoder;
4950

@@ -79,6 +80,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
7980
private final DocOffsetsCodec.Encoder docOffsetsEncoder;
8081
private final int blockBytesThreshold;
8182
private final int blockCountThreshold;
83+
private final boolean writePrefixPartitions;
8284

8385
ES819TSDBDocValuesConsumer(
8486
BinaryDVCompressionMode binaryDVCompressionMode,
@@ -94,7 +96,8 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
9496
String dataCodec,
9597
String dataExtension,
9698
String metaCodec,
97-
String metaExtension
99+
String metaExtension,
100+
boolean writePrefixPartitions
98101
) throws IOException {
99102
this.binaryDVCompressionMode = binaryDVCompressionMode;
100103
this.enablePerBlockCompression = enablePerBlockCompression;
@@ -136,6 +139,7 @@ final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer {
136139
maxDoc = state.segmentInfo.maxDoc();
137140
this.skipIndexIntervalSize = skipIndexIntervalSize;
138141
this.enableOptimizedMerge = enableOptimizedMerge;
142+
this.writePrefixPartitions = writePrefixPartitions;
139143
success = true;
140144
} finally {
141145
if (success == false) {
@@ -158,7 +162,7 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti
158162
writeSkipIndex(field, producer);
159163
}
160164

161-
writeField(field, producer, -1, null);
165+
writeField(field, producer, -1, null, null);
162166
}
163167

164168
private boolean shouldEncodeOrdinalRange(FieldInfo field, long maxOrd, int numDocsWithValue, long numValues) {
@@ -168,8 +172,13 @@ private boolean shouldEncodeOrdinalRange(FieldInfo field, long maxOrd, int numDo
168172
&& (numDocsWithValue / maxOrd) >= minDocsPerOrdinalForOrdinalRangeEncoding;
169173
}
170174

171-
private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd, OffsetsAccumulator offsetsAccumulator)
172-
throws IOException {
175+
private long[] writeField(
176+
FieldInfo field,
177+
TsdbDocValuesProducer valuesProducer,
178+
long maxOrd,
179+
OffsetsAccumulator offsetsAccumulator,
180+
PrefixedPartitionsWriter partitionsWriter
181+
) throws IOException {
173182
int numDocsWithValue = 0;
174183
long numValues = 0;
175184

@@ -200,6 +209,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
200209
if (maxOrd == 1) {
201210
// Special case for maxOrd of 1, signal -1 that no blocks will be written
202211
meta.writeInt(-1);
212+
if (partitionsWriter != null) {
213+
partitionsWriter.trackDoc(0, 0);
214+
}
203215
} else if (shouldEncodeOrdinalRange(field, maxOrd, numDocsWithValue, numValues)) {
204216
assert offsetsAccumulator == null;
205217
// When a field is sorted, use ordinal range encode for long runs of the same ordinal.
@@ -218,6 +230,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
218230
);
219231
long lastOrd = 0;
220232
startDocs.add(0);
233+
if (partitionsWriter != null) {
234+
partitionsWriter.trackDoc(0, lastOrd);
235+
}
221236
for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) {
222237
if (disiAccumulator != null) {
223238
disiAccumulator.addDocId(doc);
@@ -226,6 +241,9 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
226241
if (nextOrd != lastOrd) {
227242
lastOrd = nextOrd;
228243
startDocs.add(doc);
244+
if (partitionsWriter != null) {
245+
partitionsWriter.trackDoc(doc, nextOrd);
246+
}
229247
}
230248
}
231249
startDocs.add(maxDoc);
@@ -255,7 +273,11 @@ private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer,
255273
offsetsAccumulator.addDoc(count);
256274
}
257275
for (int i = 0; i < count; ++i) {
258-
buffer[bufferSize++] = values.nextValue();
276+
final long v = values.nextValue();
277+
buffer[bufferSize++] = v;
278+
if (partitionsWriter != null) {
279+
partitionsWriter.trackDoc(doc, v);
280+
}
259281
if (bufferSize == numericBlockSize) {
260282
indexWriter.add(data.getFilePointer() - valuesDataOffset);
261283
if (maxOrd >= 0) {
@@ -694,13 +716,23 @@ public long cost() {
694716
if (addTypeByte) {
695717
meta.writeByte((byte) 0); // multiValued (0 = singleValued)
696718
}
697-
SortedDocValues sorted = valuesProducer.getSorted(field);
698-
int maxOrd = sorted.getValueCount();
699-
writeField(field, producer, maxOrd, null);
700-
addTermsDict(DocValues.singleton(valuesProducer.getSorted(field)));
719+
final SortedDocValues sorted = valuesProducer.getSorted(field);
720+
final int maxOrd = sorted.getValueCount();
721+
var partitionWriter = primarySortFieldNumber == field.number && writePrefixPartitions ? new PrefixedPartitionsWriter() : null;
722+
addTermsDict(DocValues.singleton(sorted), partitionWriter);
723+
if (partitionWriter != null) {
724+
partitionWriter.prepareForTrackingDocs();
725+
}
726+
writeField(field, producer, maxOrd, null, partitionWriter);
727+
if (primarySortFieldNumber == field.number) {
728+
meta.writeByte(partitionWriter != null ? (byte) 1 : (byte) 0);
729+
}
730+
if (partitionWriter != null) {
731+
partitionWriter.flush(data, meta);
732+
}
701733
}
702734

703-
private void addTermsDict(SortedSetDocValues values) throws IOException {
735+
private void addTermsDict(SortedSetDocValues values, @Nullable PrefixedPartitionsWriter partitionWriter) throws IOException {
704736
final long size = values.getValueCount();
705737
meta.writeVLong(size);
706738

@@ -755,6 +787,9 @@ private void addTermsDict(SortedSetDocValues values) throws IOException {
755787
}
756788
bufferedOutput.writeBytes(term.bytes, term.offset + prefixLength, suffixLength);
757789
}
790+
if (partitionWriter != null) {
791+
partitionWriter.trackTerm(term, ord);
792+
}
758793
maxLength = Math.max(maxLength, term.length);
759794
previous.copyBytes(term);
760795
++ord;
@@ -860,16 +895,16 @@ private void writeSortedNumericField(FieldInfo field, TsdbDocValuesProducer valu
860895
int numDocsWithField = valuesProducer.mergeStats.sumNumDocsWithField();
861896
long numValues = valuesProducer.mergeStats.sumNumValues();
862897
if (numDocsWithField == numValues) {
863-
writeField(field, valuesProducer, maxOrd, null);
898+
writeField(field, valuesProducer, maxOrd, null, null);
864899
} else {
865900
assert numValues > numDocsWithField;
866901
try (var accumulator = new OffsetsAccumulator(dir, context, data, numDocsWithField)) {
867-
writeField(field, valuesProducer, maxOrd, accumulator);
902+
writeField(field, valuesProducer, maxOrd, accumulator, null);
868903
accumulator.build(meta, data);
869904
}
870905
}
871906
} else {
872-
long[] stats = writeField(field, valuesProducer, maxOrd, null);
907+
long[] stats = writeField(field, valuesProducer, maxOrd, null, null);
873908
int numDocsWithField = Math.toIntExact(stats[0]);
874909
long numValues = stats[1];
875910
assert numValues >= numDocsWithField;
@@ -1012,7 +1047,7 @@ public long cost() {
10121047
}
10131048
}, maxOrd);
10141049

1015-
addTermsDict(valuesProducer.getSortedSet(field));
1050+
addTermsDict(valuesProducer.getSortedSet(field), null);
10161051
}
10171052

10181053
@Override

server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValues
5454
static final int VERSION_START = 0;
5555
static final int VERSION_BINARY_DV_COMPRESSION = 1;
5656
static final int VERSION_NUMERIC_LARGE_BLOCKS = 2;
57-
static final int VERSION_CURRENT = VERSION_NUMERIC_LARGE_BLOCKS;
57+
static final int VERSION_PREFIX_PARTITIONS = 3;
58+
static final int VERSION_CURRENT = VERSION_PREFIX_PARTITIONS;
5859

5960
static final int TERMS_DICT_BLOCK_LZ4_SHIFT = 6;
6061
static final int TERMS_DICT_BLOCK_LZ4_SIZE = 1 << TERMS_DICT_BLOCK_LZ4_SHIFT;
@@ -142,6 +143,7 @@ private static boolean getOptimizedMergeEnabledDefault() {
142143
final DocOffsetsCodec docOffsetsCodec;
143144
final int blockBytesThreshold;
144145
final int blockCountThreshold;
146+
final boolean writePrefixPartitions;
145147

146148
public static ES819TSDBDocValuesFormat getInstance(boolean useLargeNumericBlock) {
147149
return useLargeNumericBlock ? new ES819TSDBDocValuesFormat(NUMERIC_LARGE_BLOCK_SHIFT) : new ES819TSDBDocValuesFormat();
@@ -218,7 +220,8 @@ public ES819TSDBDocValuesFormat(
218220
binaryDVCompressionMode,
219221
enablePerBlockCompression,
220222
numericBlockShift,
221-
DocOffsetsCodec.GROUPED_VINT
223+
DocOffsetsCodec.GROUPED_VINT,
224+
false
222225
);
223226
}
224227

@@ -230,7 +233,8 @@ public ES819TSDBDocValuesFormat(
230233
BinaryDVCompressionMode binaryDVCompressionMode,
231234
final boolean enablePerBlockCompression,
232235
final int numericBlockShift,
233-
DocOffsetsCodec docOffsetsCodec
236+
DocOffsetsCodec docOffsetsCodec,
237+
boolean writePrefixPartitions
234238
) {
235239
this(
236240
codecName,
@@ -242,7 +246,8 @@ public ES819TSDBDocValuesFormat(
242246
numericBlockShift,
243247
docOffsetsCodec,
244248
BINARY_DV_BLOCK_BYTES_THRESHOLD_DEFAULT,
245-
BINARY_DV_BLOCK_COUNT_THRESHOLD_DEFAULT
249+
BINARY_DV_BLOCK_COUNT_THRESHOLD_DEFAULT,
250+
writePrefixPartitions
246251
);
247252
}
248253

@@ -256,7 +261,8 @@ public ES819TSDBDocValuesFormat(
256261
final int numericBlockShift,
257262
DocOffsetsCodec docOffsetsCodec,
258263
int blockBytesThreshold,
259-
int blockCountThreshold
264+
int blockCountThreshold,
265+
boolean writePrefixPartitions
260266
) {
261267
super(codecName);
262268
assert numericBlockShift == NUMERIC_BLOCK_SHIFT || numericBlockShift == NUMERIC_LARGE_BLOCK_SHIFT : numericBlockShift;
@@ -272,6 +278,7 @@ public ES819TSDBDocValuesFormat(
272278
this.docOffsetsCodec = docOffsetsCodec;
273279
this.blockBytesThreshold = blockBytesThreshold;
274280
this.blockCountThreshold = blockCountThreshold;
281+
this.writePrefixPartitions = writePrefixPartitions;
275282
}
276283

277284
@Override
@@ -290,7 +297,8 @@ public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOExcept
290297
DATA_CODEC,
291298
DATA_EXTENSION,
292299
META_CODEC,
293-
META_EXTENSION
300+
META_EXTENSION,
301+
writePrefixPartitions
294302
);
295303
}
296304

0 commit comments

Comments
 (0)