Automatically prepare indices for splitting #27451
Conversation
Today we require users to prepare their indices for split operations. Yet, we can do this automatically when an index is created, which would make the split feature a much more appealing option since it no longer has any prerequisites. This change automatically sets the number of routing shards such that an index is guaranteed to be able to split at least once into twice as many shards. The number of routing shards is scaled towards the default shard limit per index, so indices with a smaller number of shards can be split more often than larger ones. For instance, an index with 1 or 2 shards can be split 10x (until it approaches 1024 shards) while an index created with 128 shards can only be split 3x by a factor of 2. Please note this is just a default value and users can still prepare their indices with `index.number_of_routing_shards` for custom splitting. NOTE: this change has an impact on document distribution since we are changing the hash space. Documents are still uniformly distributed across all shards, but since we are artificially changing the number of buckets in the consistent hashing space, documents might be hashed into different shards compared to previous versions. This is a 7.0 only change.
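To make the scaling concrete, here is a minimal standalone sketch of the described default, based on the formula worked out in the review below; the class and method names are illustrative, not the actual Elasticsearch API:

    // Illustrative sketch only -- names are not the real Elasticsearch API.
    public class RoutingShardsSketch {
        // The default is chosen so the routing shards land at or near the 1024 default shard limit.
        static int defaultNumRoutingShards(int numShards) {
            int log2MaxNumShards = 10; // logBase2(1024)
            int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
            int numSplits = Math.max(1, log2MaxNumShards - log2NumShards); // always allow at least one split
            return numShards << numSplits;
        }

        public static void main(String[] args) {
            System.out.println(defaultNumRoutingShards(1));   // 1024: can split 10x by a factor of 2
            System.out.println(defaultNumRoutingShards(128)); // 1024: can split 3x by a factor of 2
            System.out.println(defaultNumRoutingShards(600)); // 1200: still guaranteed at least one split
        }
    }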
bleskes
left a comment
I did an initial pass and left some comments. The basic setup looks great to me. I also like and see the need for the ability to "reconfigure" the number of routing shards when the source index has one shard (as otherwise what people chose may not be a divisor of our automatic routing shard value). I'm a bit concerned by the complexity of trying to leave the routing shards as they were if possible and otherwise calculating a fresh one. Why can't we go with the simple option of always re-calculating the number of routing shards (and allowing to set it) if the source has 1 shard?
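The divisibility constraint in question, roughly (a sketch inferred from the diffs in this thread, not the actual validation code):

    // A split target is only valid when it divides the source's routing shards
    // evenly, so the consistent-hash buckets regroup cleanly. When the source
    // has a single shard, this PR recomputes the routing shards instead, so any
    // target becomes reachable.
    static boolean targetFitsRoutingShards(int routingNumShards, int numTargetShards) {
        return routingNumShards % numTargetShards == 0;
    }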
    int routingFactor = getRoutingFactor(numSourceShards, numTargetShards);
    // now we verify that the numRoutingShards is valid in the source index
    int routingNumShards = sourceIndexMetadata.getRoutingNumShards();
    // note: if the number of shards is 1 in the source index we can just assume it's correct since from 1 we can split into anything
I understand it's important here to bypass the assertions as we don't have any relationship between the source routing shards and the target one in the case where the source has only one physical shard. I think the "validate this in various places in the code" part is maybe a leftover from a previous iteration?
    // note: if the number of shards is 1 in the source index we can just assume it's correct since from 1 we can split into anything
    // this is important to special case here since we use this to validate this in various places in the code but allow to split from
    // 1 to N but we never modify the sourceIndexMetadata to accommodate for that
    int routingNumShards = numSourceShards == 1 ? numTargetShards : sourceIndexMetadata.getRoutingNumShards();
nit: maybe do something like:

    final int selectedShard;
    if (numSourceShards == 1) {
        selectedShard = 0;
    } else {
        // ... the current logic ...
        selectedShard = shardId / routingFactor;
    }
    return new ShardId(sourceIndexMetadata.getIndex(), selectedShard);

will be easier to read, I think.
     * the less default split operations are supported
     */
    public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
        if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
can you clarify why this needs to be version dependent?
there is a comment in the line below?!
yeah, I get this means we only enable the new behavior once the cluster is fully upgraded, but I don't see why we care? I mean, if the master is a 7.0.0 master, we can start creating indices with a different hashing logic and not worry about it?
it's mainly for testing purposes and to make BWC behavior more predictable; otherwise some REST tests will randomly fail
    }

    public void testSplitFromOneToN() {
        splitToN(1, 5, 10);
I guess you had an explicit reason not to have a shrink-to-one-shard-then-split-again test (where we can pick split values that don't line up with the source index)? Alternatively we can explicitly set the routing shards on the source index to something that doesn't make sense when we start.
I am not sure I understand what you mean
The test now starts with a 1-shard source and then splits twice. Both of these always have a valid number of routing shards in the source index. I think the interesting part of the test is to see how we reset the number of routing shards. For example, start with a 3-shard index, shrink to 1 (the number of routing shards stays 3), then split to, say, 2. Does that help?
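Spelling out that scenario with the divisibility check sketched earlier (my numbers, chosen to trip the check):

    int routingNumShards = 3; // a 3-shard index; shrinking to 1 shard leaves routing shards at 3
    int numTargetShards = 2;  // now split the 1-shard index to 2
    // 3 % 2 != 0: the inherited routing shards don't fit the target, so the
    // split must reset/recompute them -- the code path this test would cover.
    assert routingNumShards % numTargetShards != 0;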
    final int routingShards = shardSplits[2] * randomIntBetween(1, 10);
    Settings.Builder settings = Settings.builder().put(indexSettings())
        .put("number_of_shards", shardSplits[0])
        .put("index.number_of_routing_shards", routingShards);
Shouldn't we randomly still do this?
we can... I will do it
        splitToN(1, randomSplit, randomSplit * 2);
    }

    private void splitToN(int... shardSplits) {
I see why you did it this way as it was easier to refactor, but I think we should bite the bullet and give these proper variable names.
fair enough, I will do that
what would you name it, a, b, c? :D I mean I can do step1, step2, step3, but I'm not sure how much it will buy us; it's just a test
    body:
      script:
        lang: painless
        source: if (ctx._source.user == "kimchy") {ctx.op = "update"} else {ctx.op = "junk"}
because the doc order changes and then we get "update" instead of "junk", and that is invalid too.
jpountz
left a comment
The change looks good to me overall, but I think we should add some comments to make it easier to understand. Also maybe add a note to the migration guide since this will change how documents are spread across shards when the routing factor is not set at index-creation time?
    // note: if the number of shards is 1 in the source index we can just assume it's correct since from 1 we can split into anything
    // this is important to special case here since we use this to validate this in various places in the code but allow to split from
    // 1 to N but we never modify the sourceIndexMetadata to accommodate for that
    int routingNumShards = numSourceShards == 1 ? numTargetShards : sourceIndexMetadata.getRoutingNumShards();
    } else {
        assert IndexMetaData.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING.exists(indexSettingsBuilder.build()) == false
            : "index.number_of_routing_shards should not be present on the target index on resize";
    public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
        if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
            // only select this automatically for indices that are created on or after 7.0
            int base = 10; // logBase2(1024)
I think it's worth sharing the 1024 constant with the max value in buildNumberOfShardsSetting to be clearer where this comes from?
not sure how you envisioned this to work?
I was just thinking of having a public static final int DEFAULT_MAX_NUM_SHARDS = 1024 somewhere, and using it in buildNumberOfShardsSetting and here: int base = (int) (Math.log(DEFAULT_MAX_NUM_SHARDS) / Math.log(2))
        if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
            // only select this automatically for indices that are created on or after 7.0
            int base = 10; // logBase2(1024)
            return numShards * 1 << Math.max(1, (base - (int) (Math.log(numShards) / Math.log(2))));
It feels a bit weird to me that this will generate numbers that are greater than the maximum number of shards. Should we change the formula a bit so that the result is always in 513...1024 when the number of shards is in 1..512?
This probably deserves some comments as well, eg. I presume that the fact we do Math.max(1, ...) instead of Math.max(0, ...) is to make sure that we can always split at least once?
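For example (my arithmetic on the quoted formula, not from the thread):

    // The flooring (int) cast lets results overshoot 1024 for shard counts
    // that are not powers of two, which is the oddity raised above:
    int base = 10;
    for (int numShards : new int[] {1, 3, 600}) {
        int result = numShards * 1 << Math.max(1, base - (int) (Math.log(numShards) / Math.log(2)));
        System.out.println(numShards + " -> " + result); // 1 -> 1024, 3 -> 1536, 600 -> 1200
    }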
yeah I will do that.
    assertEquals(ratio, (double)(intRatio), 0.0d);
    assertTrue(1 < ratio);
    assertTrue(ratio <= 1024);
    assertEquals(0, intRatio % 2);
should we assert that intRatio is a power of two by checking that intRatio & (intRatio - 1) is zero? (or intRatio == Integer.highestOneBit(intRatio) if you find it easier to read)
+1 I will work on a change for this
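Both spellings of the check, for reference (assuming a positive value; strictly, both would also accept 0, so callers should check intRatio > 0 first):

    int intRatio = 1024;
    // A power of two has exactly one bit set; subtracting 1 sets all lower
    // bits and clears it, so the AND is zero only for powers of two.
    assert (intRatio & (intRatio - 1)) == 0;
    // Equivalent: the highest set bit is the only set bit.
    assert intRatio == Integer.highestOneBit(intRatio);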
Improved index-split docs
        splitToN(1, randomSplit, randomSplit * 2);
    }

    private void splitToN(int sourceShards, int firstSplitShards, int secondSplitShards) {
jpountz
left a comment
I left a comment about the computation of the default number of routing shards.
    public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
        if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
            // only select this automatically for indices that are created on or after 7.0 this will prevent this new behaviour
            // until we have a fully upgraded cluster see {@link IndexMetaDataE#
    assertTrue(1 < ratio);
    assertTrue(ratio <= 1024);
    assertEquals(0, intRatio % 2);
    assertEquals("ratio is not a power of two", intRatio, Integer.highestOneBit(intRatio));
            // until we have a fully upgraded cluster see {@link IndexMetaDataE#
            int base = 9; // logBase2(512)
            final int minNumSplits = 1;
            return numShards * 1 << Math.max(minNumSplits, (base - (int) (Math.log(numShards) / Math.log(2))));
I think it'd be better for results to be in 513..1024 than 512..1023 by ceiling the log? Feel free to ignore but I'd do the following:

    // We use as a default number of routing shards the highest number that can be
    // expressed as {@code numShards * 2^x} that is less than or equal to the
    // maximum number of shards: 1024.
    int log2MaxNumShards = 10; // logBase2(1024)
    int log2NumShards = 32 - Integer.numberOfLeadingZeros(numShards - 1); // ceil(logBase2(numShards))
    int numSplits = log2MaxNumShards - log2NumShards;
    numSplits = Math.max(1, numSplits); // ensure the index can be split at least once
    return numShards * 1 << numSplits;

And then in tests make sure that the value is in 513..1024 for any number of shards in 0..512, and equal to numShards * 2 for any number of shards that is greater than 512?
jpountz
left a comment
LGTM, but please wait for Boaz's review before merging as he had different concerns from me.
bleskes
left a comment
LGTM. Thanks for this. It must have been a pain to hunt down all the test failures.
     * the less default split operations are supported
     */
    public static int calculateNumRoutingShards(int numShards, Version indexVersionCreated) {
        if (indexVersionCreated.onOrAfter(Version.V_7_0_0_alpha1)) {
    Settings.Builder firstSplitSettingsBuilder = Settings.builder()
        .put("index.number_of_replicas", 0)
        .put("index.number_of_shards", firstSplitShards);
    if (sourceShards == 1 && useRoutingPartition == false && randomBoolean()) { // try to set it if we have a source index with 1 shard