Use @timestamp field to route documents to a backing index of a data stream #82079
martijnvg merged 12 commits into elastic:master
Conversation
ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser);
String timestampAsString = parser.text();
// TODO: deal with nanos too here.
We could add something (a format string) to MappingMetadata that indicates how the @timestamp field should be parsed. We would need to fetch this from the latest backing index of a data stream.
Alternatively, we can add the information about how the @timestamp field should be parsed to the DataStream class, which feels like a better place, since we know this prior to selecting the right backing index based on the @timestamp field here.
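A minimal sketch of the second option, assuming the data stream itself carries the timestamp format (the `TimestampFormatHolder` name and the ISO-8601 default are hypothetical; the real `DataStream` class looks different):

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

// Hypothetical sketch: a data-stream-level timestamp format, so the coordinating
// node can parse @timestamp without consulting backing-index mappings.
public class TimestampFormatHolder {
    private final DateTimeFormatter format;

    public TimestampFormatHolder(DateTimeFormatter format) {
        this.format = format;
    }

    // Parse a raw @timestamp value using the data stream's configured format.
    public Instant parseTimestamp(String raw) {
        TemporalAccessor accessor = format.parse(raw);
        return Instant.from(accessor);
    }

    public static void main(String[] args) {
        TimestampFormatHolder holder = new TimestampFormatHolder(DateTimeFormatter.ISO_INSTANT);
        System.out.println(holder.parseTimestamp("2022-01-07T19:04:41Z")); // prints 2022-01-07T19:04:41Z
    }
}
```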
Pinging @elastic/es-analytics-geo (Team:Analytics)
Pinging @elastic/es-data-management (Team:Data Management)
imotov left a comment
Looks good to me in general from the TSDB perspective. Left a couple of suggestions.
Index result = dataStream.selectWriteIndex(timestamp, metadata);
if (result == null) {
    throw new IllegalArgumentException("no index available for a document with an @timestamp of [" + timestampAsString + "]");
It would be great if we could add a bit more useful information here. I think I would rephrase it as "the document timestamp [2022-01-07T19:04:41Z] is outside of ranges of currently writable indices: [[2022-01-06T00:00:00.000Z-2022-01-06T16:02:12.251Z], [2022-01-06T16:02:12.251Z-2022-01-06T20:02:12.251Z]]" or something like this.
}
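For illustration, a rough sketch of how the suggested message could be assembled (`IndexRange` is a hypothetical stand-in; the real code would read the start/end time settings of each writable backing index):

```java
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

public class TimestampErrorMessage {
    // Hypothetical stand-in for a backing index's start/end time settings.
    public record IndexRange(Instant start, Instant end) {}

    // Build the more informative error message suggested in the review.
    public static String message(Instant timestamp, List<IndexRange> writableRanges) {
        String ranges = writableRanges.stream()
            .map(r -> "[" + r.start() + "-" + r.end() + "]")
            .collect(Collectors.joining(", ", "[", "]"));
        return "the document timestamp [" + timestamp
            + "] is outside of ranges of currently writable indices: " + ranges;
    }

    public static void main(String[] args) {
        System.out.println(message(
            Instant.parse("2022-01-07T19:04:41Z"),
            List.of(new IndexRange(
                Instant.parse("2022-01-06T00:00:00Z"),
                Instant.parse("2022-01-06T16:02:12.251Z")))));
    }
}
```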
@Override
public Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
It feels like a bit too much logic for a data class, which the IndexRequest essentially is. I wonder if it makes more sense as part of IndexAbstraction instead.
dakrone left a comment
I left a few really minor comments, but otherwise this looks good to me.
Temporal slices of backing indices never overlap within a data stream, so either one backing index is selected or none.
Can you point me to where we do this validation? I wanted to get more familiar with it.
*/
int route(IndexRouting indexRouting);
default Index getConcreteWriteIndex(IndexAbstraction ia, Metadata metadata) {
try (XContentParser parser = contentType.xContent().createParser(TS_EXTRACT_CONFIG, source().streamInput())) {
    ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
    ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.nextToken(), parser);
    ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser);
I think we need to support epoch millis here also, correct? I tested it locally and it works, but I'm not sure why this doesn't blow up, since I would expect it to fail when a document with "@timestamp": 12309123 is indexed.
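A sketch of the branching this implies; in the actual parser the distinction is made on VALUE_STRING vs VALUE_NUMBER tokens, but the idea reduces to roughly this (class and method names are illustrative):

```java
import java.time.Instant;

public class FlexibleTimestampParser {
    // Accept either an epoch-millis number or an ISO-8601 string for @timestamp.
    public static Instant parse(Object rawTimestamp) {
        if (rawTimestamp instanceof Number n) {
            // e.g. "@timestamp": 12309123
            return Instant.ofEpochMilli(n.longValue());
        }
        // e.g. "@timestamp": "2022-01-07T19:04:41Z"
        return Instant.parse(rawTimestamp.toString());
    }

    public static void main(String[] args) {
        System.out.println(parse(12309123L));              // epoch millis
        System.out.println(parse("2022-01-07T19:04:41Z")); // ISO string
    }
}
```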
return indices.get(indices.size() - 1);
}
public Index selectWriteIndex(Instant timestamp, Metadata metadata)
Can you rename this to be something TSDB specific, like selectTimeseriesWriteIndex (since we may end up adding different selection criteria in the future) and add javadocs?
assertThat(validate.getMessage(), containsString("pipeline cannot be an empty string"));
}
public void testGetConcreteWriteIndex()
Can you add a test that uses epoch millis for the @timestamp instead of a string to ensure that it picks the right backing index also?
I added a test for this and also added logic for this. The case of providing the timestamp as a number representing millis since the epoch failed. I fixed this via: d70aa28
The validation that validates the start and end time settings across backing indices doesn't yet exist. But maybe we can do this differently, now that we are going to make time series typed data streams. So I'd like to add this validation after we've made data streams aware of index modes.
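For illustration, a hedged sketch of what such a non-overlap validation across backing indices might look like (the `Slice` record and method name are hypothetical, not the eventual Elasticsearch implementation):

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class TimeSliceValidator {
    // Hypothetical [start, end) time slice of one backing index.
    public record Slice(String index, Instant start, Instant end) {}

    // Reject configurations where two backing indices' time ranges overlap,
    // so that any timestamp can match at most one writable backing index.
    public static void validateNoOverlap(List<Slice> slices) {
        List<Slice> sorted = slices.stream()
            .sorted(Comparator.comparing(Slice::start))
            .collect(Collectors.toList());
        for (int i = 1; i < sorted.size(); i++) {
            Slice previous = sorted.get(i - 1);
            Slice current = sorted.get(i);
            if (current.start().isBefore(previous.end())) {
                throw new IllegalArgumentException("backing indices [" + previous.index()
                    + "] and [" + current.index() + "] have overlapping time ranges");
            }
        }
    }
}
```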
dakrone left a comment
LGTM also!
So I'd like to add this validation after we've made data streams aware of index modes.
Makes sense, thanks for the heads up
Currently, documents that target data streams are resolved to the write index of the targeted data stream.
This change adjusts this logic in the bulk API to first parse the @timestamp field and then select the right backing index based on this timestamp. If the parsed timestamp of a document falls between a backing index's start_time and end_time, then this backing index is used as the write index. Note that this logic is only enabled for TSDB data streams. Temporal slices of backing indices never overlap within a data stream, so either one backing index is selected or none.
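The selection described above can be sketched roughly as follows (`BackingIndex` and the `String` return type are simplified stand-ins for the real DataStream/Index classes):

```java
import java.time.Instant;
import java.util.List;

public class WriteIndexSelector {
    // Simplified stand-in for a backing index with its start_time/end_time settings.
    public record BackingIndex(String name, Instant startTime, Instant endTime) {}

    // Returns the single backing index whose [start_time, end_time) slice contains
    // the timestamp, or null when no writable index covers it. Slices never
    // overlap within a data stream, so at most one index can match.
    public static String selectWriteIndex(Instant timestamp, List<BackingIndex> backingIndices) {
        for (BackingIndex index : backingIndices) {
            boolean inRange = timestamp.isBefore(index.startTime()) == false
                && timestamp.isBefore(index.endTime());
            if (inRange) {
                return index.name();
            }
        }
        return null;
    }
}
```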
Relates #74660