Create a mode: time_series option for indices#75638
nik9000 wants to merge 46 commits into elastic:master from

Conversation
Enabling this setting will put the index into a mode optimized for time
series data, grouping documents whose fields are annotated with
`dimension`. Because time series data is usually write-once, we also
disable the ability to update or delete documents.
We have big plans for things we can do with data organized in this way,
but for now the primary advantage of enabling `time_series_mode` is that
it shrinks the index on disk. We did some local tests with k8s
monitoring data in ECS format. Here are some figures for a single
sharded index without replicas, force merged to a single segment:
```
uncompressed JSON:                      150GB
defaults:                                60GB
best_compression:                        39GB
time_series_mode:                        33GB
best_compression and time_series_mode:   22GB
```
((NOCOMMIT: get the size measurements after running the speed tests))
So the compression you get is similar to `best_compression`, and it can
operate "on top of" `best_compression` to further reduce the index size.
The price is, as mentioned above, disabling update and delete of
documents. In addition, there is a performance cost on ingest
and on merge. Loading the data from above took:
```
                                           <load>       <force_merge>
best_compression:                          2.6 hours    0.8 hours
best_compression, sort on all dimensions:
time_series_mode:                          (measuring)
best_compression and time_series_mode:     (measuring)
```
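For reference, creating an index in this mode might look something like the sketch below. This is hedged: the `index.time_series_mode` setting name comes from this PR and may change (`mode: time_series` was proposed in review), and the index name `my-k8s-metrics`, the `pod.id` dimension, and the metric field `n` are made-up illustrations:

```
PUT localhost:9200/my-k8s-metrics
{
  "settings": {
    "index.time_series_mode": true,
    "index.codec": "best_compression"
  },
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "pod": {
        "properties": {
          "id": { "type": "keyword", "dimension": true }
        }
      },
      "n": { "type": "long" }
    }
  }
}
```

Stacking `best_compression` on top of time series mode is what produced the 22GB figure above.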
In addition to the space savings, this creates an unsearchable `_tsid`
field which functions as a shorthand for aggregating on all the
dimensions. So you can run this:
```
POST localhost:9200/test/_search?pretty
{
  "size": 0,
  "aggs": {
    "tsid": {
      "terms": { "field": "_tsid" },
      "aggs": {
        "max_n": { "max": { "field": "n" } }
      }
    }
  }
}
```
and the `key` field in the JSON of each terms bucket will contain all of
the dimensions. Something like this:
```
"aggregations" : {
"tsid" : {
"buckets" : [
{"key":{"dim1":"a","dim2":"a"},"doc_count":6,"max_n":{"value":6}},
{"key":{"dim1":"a","dim2":"b"},"doc_count":6,"max_n":{"value":6}},
{"key":{"dim1":"a","dim2":"c"},"doc_count":6,"max_n":{"value":6}},
{"key":{"dim1":"a","dim2":"d"},"doc_count":6,"max_n":{"value":6}},
{"key":{"dim1":"b","dim2":"a"},"doc_count":6,"max_n":{"value":6}},
{"key":{"dim1":"b","dim2":"b"},"doc_count":6,"max_n":{"value":6}},
{"key":{"dim1":"b","dim2":"c"},"doc_count":6,"max_n":{"value":6}},
{"key":{"dim1":"b","dim2":"d"},"doc_count":6,"max_n":{"value":6}},
{"key":{"dim1":"c","dim2":"a"},"doc_count":6,"max_n":{"value":6}},
{"key":{"dim1":"c","dim2":"b"},"doc_count":6,"max_n":{"value":6}}
]
}
}
```
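The idea behind `_tsid` and its use for routing can be illustrated with a small sketch. This is Python and purely illustrative: the real encoding is an internal binary format, and the function names here are made up. The key properties are that dimension name/value pairs are sorted by name so any document from the same time series produces the same id, and that the id can be hashed into a routing value:

```python
import base64
import hashlib
import json

def time_series_id(dimensions: dict) -> bytes:
    """Build a deterministic id from dimension name/value pairs.

    Sorting by dimension name means two documents with the same
    dimensions always get the same id, regardless of field order.
    """
    items = sorted(dimensions.items())
    return json.dumps(items, separators=(",", ":")).encode("utf-8")

def routing_value(tsid: bytes) -> str:
    """Hide the tsid in the _routing field, base64 encoded."""
    return base64.b64encode(tsid).decode("ascii")

def shard_for(tsid: bytes, number_of_shards: int) -> int:
    """Pick a shard by hashing the tsid (illustrative hash choice)."""
    digest = hashlib.md5(tsid).digest()
    return int.from_bytes(digest[:4], "big") % number_of_shards
```

Field order in the source document doesn't matter: `time_series_id({"dim2": "a", "dim1": "a"})` and `time_series_id({"dim1": "a", "dim2": "a"})` produce the same bytes, so they route to the same shard.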
```
        "alias routing incompatible with the destination index [" + abstraction.getName() + "] because it is in time series mode"
    );
}
routing(routingFromTimeSeries(abstraction, timeSeriesGeneratorLookup));
```
I'd like to put this into its own field and modify the routing behavior to pick it up. But I've not done that in this PR because I think it's a fairly mechanical change and I believe I'd have to disable a lot of BWC tests for it. I'd like to do that in a follow up. So instead I'm hiding the tsid in the _routing field, base 64 encoding it. Ewwwwww. But it works and we can zap it in a follow up.
Maybe you already looked into it, but this might require some changes for _split to keep working with time series indices if we did that. _split looks at the value of the _routing field to know how to split shards in a way that still honors the routing key (see ShardSplittingQuery).
OOOH! I hadn't thought of it. You've given me a wonderful thing to experiment with. I imagine I've broken this. I'll look!
Shard splitting query is indeed busted here. We'll need to revive it, but I'd like to wait for a follow up for that.
```
    return routing;
}
```

```
// TODO this is the same sort of code we have in bulk action already. we should share
```
Over the years we've grown a lot of very similar code over in TransportBulkAction. We should unify these, somehow.
```
 public static final Predicate<String> INDEX_SETTINGS_KEY_PREDICATE = (s) -> s.startsWith(IndexMetadata.INDEX_SETTING_PREFIX);

-public static final Set<Setting<?>> BUILT_IN_INDEX_SETTINGS = Set.of(
+private static final Set<Setting<?>> ALWAYS_ENABLED_BUILT_IN_INDEX_SETTINGS = Set.of(
```
This makes sure that the index setting is only registered if the feature flag is enabled or we're on a snapshot. I tried to do this in a way that "looks small" in the diff so it's easier for us to reason about.
```
@@ -443,34 +447,28 @@ protected String contentType() {

 @Override
 protected void parseCreateField(DocumentParserContext context) throws IOException {
```
I wanted very much to reuse the parsing code but it was super twisty. I believe the new code doesn't change the behavior but lets us reuse it during tsid extraction.
```
@Nullable
public MappedFieldType getFieldType(String fieldName) {
    // TODO use time series id generation?
```
It'd be complex, but we already have the time series id generation code on the coordinating node. So - if you are querying all of the dimensions - we could generate the time series id that must match and then use the routing it produces to limit the nodes we land on.
```
 * mapping's {@link CompressedXContent} but {@link CompressedXContent#equals(Object)}
 * will try to decompress the mapping if the crc matches but the compressed bytes
 * don't. That's wasteful for us - probably for everyone. If the crc and compressed
 * bytes match that's a match.
```
It's wasteful specifically because if we don't match we need to decompress the bytes anyway. Since that's slow, we don't want to decompress a second time. I'm fairly sure it'd be safe to modify the equals implementation in CompressedXContent, but I don't want to tempt fate in this PR.
```
@Override
public int hashCode() {
    return mapping.hashCode();
```
CompressedXContent#hashCode is fast, so it's safe to just delegate to it.
```
@Override
public Object format(BytesRef value) {
    try {
        return TimeSeriesIdGenerator.parse(new BytesArray(value).streamInput());
```
I chose to make parse static here so any code can decode a _tsid without knowing what mapping it came from. We could go the other way - it'd save a few bytes of encoding - but I don't think it's worth it.
```
 }

-public void testIndexSorting() {
+public void testIndexSortingNoDocValues() {
```
I just renamed this method; I didn't change what it does. All of the removed lines are just relocated into buildIndexSort.
```
assertNotNull(documentMapper.sourceMapper());
assertNotNull(documentMapper.IndexFieldMapper());
assertEquals(10, documentMapper.mappers().getMapping().getMetadataMappersMap().size());
assertEquals(10, documentMapper.mappers().getMatchingFieldNames("*").size());
```
These assertions failed in my PR but were hard to read. So I made them easier to read while fixing them.
@elasticsearchmachine run elasticsearch-ci/part-2

Wow, you are so efficient!
Not so much! We've been talking about this for a while. And I was working
on this for a while too!
Great stuff! I have 2 questions:
That's a great question! One whose answer I'll want to stick in the javadocs somewhere. But I'll reply here too. We sort on tsid first because all the things we want to do work better that way. Well, two things I can think of:

@imotov I sorted by
I like the simplicity of the binary choice for dimension fields, though I had wondered about this too for the case of one high-cardinality dimension and another low-cardinality dimension, since I think we'd generally prefer sorting by the low-cardinality dimension first and then by the high-cardinality dimension. Maybe one way to make this work (in a follow-up PR, this one is complex already) would be to allow configuring an index sort on timeseries indices, but only using dimension fields. And then the timeseries mode would add
I like the simplicity too! Being able to configure index sorting with special rules could do the trick. I think it's safe to keep this in mind but experiment more with it later. @weizijun had a similar proposal: allow users to specify which dimensions are used in routing. If you don't use a dimension in routing then it is just used for index sorting. It'd be useful to force similar documents even closer together, though I don't know how much space savings it'd buy. @weizijun, do you have some experimental results?

I'd like to throw this out there: traditionally Elasticsearch routes documents to a particular shard based on the

This PR will cause more unbalanced shards. For sure. But not as many as just going crazy with routing would, because it scatters different time series to different shards randomly. We do expect some time series to get more documents than others. But we expect there to be many, many time series, so in most cases the chattier time series will get scattered across the shards evenly.

Now, looping around to the routing proposal: it feels like exactly the kind of thing that'd make it easy to accidentally route the high traffic time series to the same shard. That doesn't mean it's not a good idea too. But, like giving folks more control over the sort order, it's something I want to do later. And carefully. Or not at all.
That's a good question. Since we will be returning data in ascending order, it might be easier for us to iterate over the data in ascending order as well; in that case we don't have to flip it later on. I don't think ordering of
I agree with @jpountz and @nik9000, things are already very complicated as they are, but I also agree with @hendrikmuhs that data indeed have hierarchical structures. I just want to add a caveat that naturally occurring hierarchies do not always match optimizations. Let's take for example

So a natural hierarchy would be job, instance, cpu and then mode. However, a typical query would most likely be interested in the average non-idle cpu rate

I think this idea is definitely worth exploring, but I would like to spend more time working with this type of data before we start working on this sort of optimization. I just don't think we will get it right at the moment.
I'll drop that into a comment. I think it's a thing we could change our minds on in the future if we need to.
You can't write any documents into the index, but you can create one. This is useful because you might want to create an index and then use dynamic mappings and templates to fill in the dimensions later.
I've updated the timings in the PR. I don't have a rally track for this (yet) so I'm taking some fairly rough numbers. Indexing my data takes about 2.6 hours with or without time series mode. It takes about 10% longer if you sort on all dimensions (2.9 hours) instead of generating the time series id and sorting by that. We like generating the time series id anyway because we want to aggregate and route on it - but it's nice to know we get some time savings by sorting on it.

Similarly, we get time savings when you force merge down to a single segment. The force merge with just best_compression takes 0.8 hours. In time series mode it takes 110% longer (1.7 hours). This seems to be dominated by merging the sorted segments; if you just use best_compression the segments aren't sorted at all, so this isn't surprising. The force_merge time when you sort on every dimension is 2.2 hours. That's almost as long as it takes to index the data with best_compression! Still, these show areas for improvement - merging sorted segments is slower than we'd like.

Finally, I got some new size estimates. Remember, this data uncompressed is 150GB. I keep it compressed with zstd and it clocks in at 18GB; gzipped it is 21GB. When you index it with best_compression a single shard is, like we said above, 39.3GB. That's with the search index, doc values, and the _source compressed with zlib. Doing it in time series mode it is 21.8GB, pretty close to gzip even though we still have the search index and doc values! That's lovely. But we can do better. We will do better. In a follow up.
Pinging @elastic/es-analytics-geo (Team:Analytics)
I've converted this back to a draft because I've discovered some "fun" stuff around recoveries. The trick I do with putting the tsid into the
imotov
left a comment
Nice! I really like where this is going. Left a few minor comments.
I noticed that there are 11 TODOs in this PR. Are you planning to fulfill some of them in this PR, or are they all for follow-up PRs?
```
 @Override
 public boolean hasTextCharacters() {
-    throw new UnsupportedOperationException("use text() instead");
+    return false;
```
This is the most confusing method in XContentParser :) Good catch!
```
body:
  settings:
    index:
      time_series_mode: true
```
I wonder if this is the first of possibly other indices with specialized operation modes. These modes are most likely to be mutually exclusive. So, would it make more sense to have mode: time_series here?
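Spelling out the suggested shape (hypothetical, mirroring this reviewer's proposal rather than anything merged in the PR), the settings fragment would instead read:

```
body:
  settings:
    index:
      mode: time_series
```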
```
this.type = mapping.getRoot().name();
this.mappingLookup = MappingLookup.fromMapping(mapping);
this.mappingSource = mapping.toCompressedXContent();
timeSeriesIdGenerator = inTimeSeriesMode ? mapping.buildTimeSeriesIdGenerator() : null;
```
For a moment, the absence of `this.` here confused me.
Digging deeper, that statement gives me an uneasy feeling for some reason. I feel like if we make it the responsibility of the Mapping to buildTimeSeriesIdGenerator, shouldn't we supply Mapping with enough information for it to know whether building it makes sense in the first place? Should this logic go into Mapping? WDYT?
I'll play with moving this. @romseygeek suggested moving it to MappingLookup. I'll try both and see how I feel.
```
}

private final Function<Index, LocalIndex> lookupLocalIndex;
private final Function<IndexMetadata, TimeSeriesIdGenerator> buildTimeSeriedIdGenerator;
```
```
    }
}

private abstract static class Value {
```
It is an internal class, but I think Value still sounds a bit too generic. When I first saw AsyncValue I thought it was some generic class that calculates values asynchronously. In reality, Value here is basically an optional (in case of exception), versioned (to match the mapping version) TimeSeriesIdGenerator. So it contains a really specific thing. I think we can increase readability if we replaced Value with IdGeneratorProvider or IdConfig or something else that doesn't sound like a generic mechanism for lazily storing arbitrary values.
```
    }
    return new IpTsidGen(nullValue);
}

private static class IpTsidGen extends TimeSeriesIdGenerator.StringLeaf {
```
```
    }
    return new KeywordTsidGen(nullValue);
}

private static class KeywordTsidGen extends TimeSeriesIdGenerator.StringLeaf {
```
```
    Collections.sort(dimensionNames);
    throw new IllegalArgumentException("Document must contain one of the dimensions " + dimensionNames);
}
Collections.sort(values, Comparator.comparing(Map.Entry::getKey));
```
I think there is a dedicated method for that - Map.Entry.comparingByKey()
```
root.extract(values, "", parser);
if (values.isEmpty()) {
    List<String> dimensionNames = new ArrayList<>();
    root.collectDimensionNames("", dimensionNames::add);
```
You can probably move this to the constructor; you have to run collectDimensionNames anyway.
I didn't want to keep the string sitting around, though. In case there are zillions of these things.
imotov
left a comment
I did another pass and left a couple of minor comments. Besides disabling specifying dimensions in dynamic mappings, which we discussed, and possibly dealing with copy_to into dimension fields, I think it looks good. It would be great if somebody from @elastic/es-distributed could take a look at it as well.
```
public void preParse(DocumentParserContext context) {
    String routing = context.sourceToParse().routing();
    if (context.indexSettings().mode() == IndexMode.TIME_SERIES) {
        // TODO when we stop storing the tsid in the routing fail any request with routing in time series mode
```
I have been thinking about that. Allowing routing in the time_series mode might actually make sense. Yes, it is allowing users to shoot themselves in the foot, but routing is an advanced feature that already enables screwing things up on "standard" indices, and it also enables advanced users to build solutions they wouldn't be able to build otherwise. Having only a few elements of the TSID in the routing key doesn't break anything, but it can speed things up and help in case of multi-tenant solutions.
I suppose so. I can revive support for routing once we have a field on IndexRequest specifically for the tsid. That was something I was hoping to do in a follow up.
```
}
if (INDEX_SORT_MISSING_SETTING.exists(settings)) {
    throw new IllegalArgumentException("Can't set [" + INDEX_SORT_MISSING_SETTING.getKey() + "] in time series mode");
}
```
Feels like it can be replaced with something like

```
for (Setting<?> setting : indexMode.unsupportedSettings()) {
    if (setting.exists(settings)) {
        throw new IllegalArgumentException("Can't set [" + setting.getKey() + "] in time series mode");
    }
}
```

or moved completely into mode.checkSettings(settings).
I am not sure what's up. It's... icky. The dynamic mapping runs on the data node and doesn't tell the coordinating node what to do. I think the plan for this PR is to accept that it's broken and then try to fix it in a follow up. Alternatively, we could disallow dynamic mappings adding dimensions. That might be ok in the short term, but I think folks need some way of dynamically adding dimensions, and dynamic mappings are the most sensible way to do it in the framework we have now. So we'd have to get it working sooner or later.
I've pushed a change to detect when we modify the list of dimensions with a dynamic mapping update and reject the document. That'll have to do in the short term. In the long term we'll have to make it work, but that's a bigger thing. Big enough that we talked about taking another approach to this change entirely. But we believe the majority of the change required to get dynamic mappings working lines up with the change required to get hard time bounds on indices working - so we're going to do it anyway. Probably. My plan is to scrounge up more reviews for this ASAP, then get this in and start on the dynamic mapping updates.
For anyone following along from home: it took me a while to get good reviews on this, but we found enough "fun" corner cases involving races with mapping that we're rethinking this. We're unlikely to merge this code as it stands, but fairly likely to use it as inspiration for something that will be pretty similar in practice. Just without the races.
This adds more tests copied from our original TSDB prototype in PR elastic#75638 that are still applicable to time series mode indices. There are a bunch of disabled assertions because we are not yet generating `_tsid`, but the non-disabled assertions are useful. And we will soon be generating the `_tsid`, so we can re-enable those assertions.