Skip to content

Commit 0d89de4

Browse files
author
Rishav Sagar
committed
Add setting to limit total number of shards on a cluster
Signed-off-by: Rishav Sagar <rissag@amazon.com>
1 parent da11337 commit 0d89de4

5 files changed

Lines changed: 154 additions & 8 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
8383
- Add support to disallow search request with preference parameter with strict weighted shard routing([#5874](https://github.com/opensearch-project/OpenSearch/pull/5874))
8484
- Changing ExtensionActionRequest streaminput constructor to be public ([#6094](https://github.com/opensearch-project/OpenSearch/pull/6094))
8585
- Adds support for minimum compatible version for extensions ([#6003](https://github.com/opensearch-project/OpenSearch/pull/6003))
86+
- Add a guardrail to limit the maximum number of shards on the cluster ([#6143](https://github.com/opensearch-project/OpenSearch/pull/6143))
8687

8788
### Dependencies
8889
- Update nebula-publishing-plugin to 19.2.0 ([#5704](https://github.com/opensearch-project/OpenSearch/pull/5704))

server/src/internalClusterTest/java/org/opensearch/cluster/shards/ClusterShardLimitIT.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.opensearch.action.support.master.AcknowledgedResponse;
4242
import org.opensearch.client.Client;
4343
import org.opensearch.cluster.ClusterState;
44+
import org.opensearch.cluster.metadata.IndexMetadata;
4445
import org.opensearch.cluster.metadata.Metadata;
4546
import org.opensearch.common.Priority;
4647
import org.opensearch.common.network.NetworkModule;
@@ -247,6 +248,21 @@ public void testIndexCreationOverLimitForDotIndexesFail() {
247248
assertFalse(clusterState.getMetadata().hasIndex(".test-index"));
248249
}
249250

251+
public void testCreateIndexWithMaxClusterShardSetting() {
252+
int maxAllowedShards = 2;
253+
int extraShardCount = 5;
254+
// Getting total active shards in the cluster.
255+
int currentActiveShards = client().admin().cluster().prepareHealth().get().getActiveShards();
256+
try {
257+
setMaxShardLimitCluster(maxAllowedShards);
258+
prepareCreate("test").setSettings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, extraShardCount).build()).get();
259+
} catch (final IllegalArgumentException ex) {
260+
verifyException(maxAllowedShards, currentActiveShards, extraShardCount, ex);
261+
} finally {
262+
setMaxShardLimitCluster(-1);
263+
}
264+
}
265+
250266
/**
251267
* The test checks if the index starting with the .ds- can be created if the node has
252268
* number of shards equivalent to the cluster.max_shards_per_node and the cluster.ignore_dot_indexes
@@ -764,6 +780,12 @@ private void verifyException(int dataNodes, ShardCounts counts, IllegalArgumentE
764780
assertEquals(expectedError, e.getMessage());
765781
}
766782

783+
private void setMaxShardLimitCluster(int limit) {
784+
final Settings settings = Settings.builder().put(ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER.getKey(),
785+
limit).build();
786+
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(settings).get());
787+
}
788+
767789
private void verifyException(int maxShards, int currentShards, int extraShards, IllegalArgumentException e) {
768790
String expectedError = "Validation Failed: 1: this action would add ["
769791
+ extraShards

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,7 @@ public void apply(Settings value, Settings current, Settings previous) {
261261
Metadata.DEFAULT_REPLICA_COUNT_SETTING,
262262
Metadata.SETTING_CREATE_INDEX_BLOCK_SETTING,
263263
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE,
264+
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
264265
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
265266
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
266267
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,

server/src/main/java/org/opensearch/indices/ShardLimitValidator.java

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
* @opensearch.internal
5959
*/
6060
public class ShardLimitValidator {
61+
62+
public static final String SETTING_MAX_SHARDS_PER_CLUSTER = "cluster.routing.allocation.total_shards_limit";
6163
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE = Setting.intSetting(
6264
"cluster.max_shards_per_node",
6365
1000,
@@ -66,6 +68,14 @@ public class ShardLimitValidator {
6668
Setting.Property.NodeScope
6769
);
6870

71+
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER = Setting.intSetting(
72+
SETTING_MAX_SHARDS_PER_CLUSTER,
73+
-1,
74+
-1,
75+
Setting.Property.Dynamic,
76+
Setting.Property.NodeScope
77+
);
78+
6979
public static final Setting<Boolean> SETTING_CLUSTER_IGNORE_DOT_INDEXES = Setting.boolSetting(
7080
"cluster.ignore_dot_indexes",
7181
false,
@@ -74,13 +84,16 @@ public class ShardLimitValidator {
7484
);
7585

7686
protected final AtomicInteger shardLimitPerNode = new AtomicInteger();
87+
protected final AtomicInteger shardLimitPerCluster = new AtomicInteger();
7788
private final SystemIndices systemIndices;
7889
private volatile boolean ignoreDotIndexes;
7990

8091
public ShardLimitValidator(final Settings settings, ClusterService clusterService, SystemIndices systemIndices) {
8192
this.shardLimitPerNode.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings));
93+
this.shardLimitPerCluster.set(SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER.get(settings));
8294
this.ignoreDotIndexes = SETTING_CLUSTER_IGNORE_DOT_INDEXES.get(settings);
8395
clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE, this::setShardLimitPerNode);
96+
clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER, this::setShardLimitPerCluster);
8497
clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_IGNORE_DOT_INDEXES, this::setIgnoreDotIndexes);
8598
this.systemIndices = systemIndices;
8699
}
@@ -89,6 +102,10 @@ private void setShardLimitPerNode(int newValue) {
89102
this.shardLimitPerNode.set(newValue);
90103
}
91104

105+
private void setShardLimitPerCluster(int newValue) {
106+
this.shardLimitPerCluster.set(newValue);
107+
}
108+
92109
/**
93110
* Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_NODE} setting.
94111
* @return the current value of the setting
@@ -97,6 +114,14 @@ public int getShardLimitPerNode() {
97114
return shardLimitPerNode.get();
98115
}
99116

117+
/**
118+
* Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER} setting.
119+
* @return the current value of the setting
120+
*/
121+
public int getShardLimitPerCluster() {
122+
return shardLimitPerCluster.get();
123+
}
124+
100125
private void setIgnoreDotIndexes(boolean newValue) {
101126
this.ignoreDotIndexes = newValue;
102127
}
@@ -211,11 +236,16 @@ private boolean isDataStreamIndex(String indexName) {
211236
* an operation. If empty, a sign that the operation is valid.
212237
*/
213238
public Optional<String> checkShardLimit(int newShards, ClusterState state) {
214-
return checkShardLimit(newShards, state, getShardLimitPerNode());
239+
return checkShardLimit(newShards, state, getShardLimitPerNode(), getShardLimitPerCluster());
215240
}
216241

217242
// package-private for testing
218-
static Optional<String> checkShardLimit(int newShards, ClusterState state, int maxShardsPerNodeSetting) {
243+
static Optional<String> checkShardLimit(
244+
int newShards,
245+
ClusterState state,
246+
int maxShardsPerNodeSetting,
247+
int maxShardsPerClusterSetting
248+
) {
219249
int nodeCount = state.getNodes().getDataNodes().size();
220250

221251
// Only enforce the shard limit if we have at least one data node, so that we don't block
@@ -224,9 +254,12 @@ static Optional<String> checkShardLimit(int newShards, ClusterState state, int m
224254
return Optional.empty();
225255
}
226256
int maxShardsPerNode = maxShardsPerNodeSetting;
227-
int maxShardsInCluster = maxShardsPerNode * nodeCount;
228-
int currentOpenShards = state.getMetadata().getTotalOpenIndexShards();
257+
int maxShardsInCluster = maxShardsPerClusterSetting;
258+
if (maxShardsInCluster == -1) {
259+
maxShardsInCluster = maxShardsPerNode * nodeCount;
260+
}
229261

262+
int currentOpenShards = state.getMetadata().getTotalOpenIndexShards();
230263
if ((currentOpenShards + newShards) > maxShardsInCluster) {
231264
String errorMessage = "this action would add ["
232265
+ newShards

server/src/test/java/org/opensearch/indices/ShardLimitValidatorTests.java

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,11 @@
5959
import static org.opensearch.cluster.metadata.MetadataIndexStateServiceTests.addClosedIndex;
6060
import static org.opensearch.cluster.metadata.MetadataIndexStateServiceTests.addOpenedIndex;
6161
import static org.opensearch.cluster.shards.ShardCounts.forDataNodeCount;
62-
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES;
63-
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
6462
import static org.mockito.Mockito.mock;
6563
import static org.mockito.Mockito.when;
64+
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES;
65+
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
66+
import static org.opensearch.indices.ShardLimitValidator.SETTING_MAX_SHARDS_PER_CLUSTER;
6667

6768
public class ShardLimitValidatorTests extends OpenSearchTestCase {
6869

@@ -75,7 +76,7 @@ public void testOverShardLimit() {
7576
ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas());
7677

7778
int shardsToAdd = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
78-
Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode());
79+
Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode(), -1);
7980

8081
int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
8182
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
@@ -93,6 +94,30 @@ public void testOverShardLimit() {
9394
);
9495
}
9596

97+
public void testOverShardLimitWithMaxShardCountLimit() {
98+
int nodesInCluster = randomIntBetween(1, 90);
99+
ShardCounts counts = forDataNodeCount(nodesInCluster);
100+
101+
ClusterState state = createClusterForShardLimitTest(nodesInCluster, counts.getFirstIndexShards(), counts.getFirstIndexReplicas());
102+
int shardsToAdd = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
103+
int maxShardLimitOnCluster = shardsToAdd - 1;
104+
Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode(), maxShardLimitOnCluster);
105+
106+
int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
107+
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
108+
assertTrue(errorMessage.isPresent());
109+
assertEquals(
110+
"this action would add ["
111+
+ totalShards
112+
+ "] total shards, but this cluster currently has ["
113+
+ currentShards
114+
+ "]/["
115+
+ maxShardLimitOnCluster
116+
+ "] maximum shards open",
117+
errorMessage.get()
118+
);
119+
}
120+
96121
public void testUnderShardLimit() {
97122
int nodesInCluster = randomIntBetween(2, 90);
98123
// Calculate the counts for a cluster 1 node smaller than we have to ensure we have headroom
@@ -104,7 +129,7 @@ public void testUnderShardLimit() {
104129

105130
int existingShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
106131
int shardsToAdd = randomIntBetween(1, (counts.getShardsPerNode() * nodesInCluster) - existingShards);
107-
Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode());
132+
Optional<String> errorMessage = ShardLimitValidator.checkShardLimit(shardsToAdd, state, counts.getShardsPerNode(), -1);
108133

109134
assertFalse(errorMessage.isPresent());
110135
}
@@ -152,6 +177,54 @@ public void testNonSystemIndexCreationFails() {
152177
);
153178
}
154179

180+
public void testNonSystemIndexCreationFailsWithMaxShardLimitOnCluster() {
181+
final int maxShardLimitOnCluster = 1;
182+
Settings limitOnlySettings = Settings.builder()
183+
.put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 1)
184+
.put(SETTING_CLUSTER_IGNORE_DOT_INDEXES.getKey(), false)
185+
.put(SETTING_MAX_SHARDS_PER_CLUSTER, maxShardLimitOnCluster)
186+
.build();
187+
final ShardLimitValidator shardLimitValidator = createTestShardLimitService(limitOnlySettings);
188+
final Settings settings = Settings.builder()
189+
.put(SETTING_VERSION_CREATED, Version.CURRENT)
190+
.put(SETTING_NUMBER_OF_SHARDS, 1)
191+
.put(SETTING_NUMBER_OF_REPLICAS, 1)
192+
.build();
193+
final ClusterState state = createClusterForShardLimitTest(1, 1, 0);
194+
final ValidationException exception = expectThrows(
195+
ValidationException.class,
196+
() -> shardLimitValidator.validateShardLimit("abc", settings, state)
197+
);
198+
assertEquals(
199+
"Validation Failed: 1: this action would add ["
200+
+ 2
201+
+ "] total shards, but this cluster currently has ["
202+
+ 1
203+
+ "]/["
204+
+ maxShardLimitOnCluster
205+
+ "] maximum shards open;",
206+
exception.getMessage()
207+
);
208+
}
209+
210+
public void testNonSystemIndexCreationPassesWithMaxShardLimitOnCluster() {
211+
final int maxShardLimitOnCluster = 100;
212+
Settings limitOnlySettings = Settings.builder()
213+
.put(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), 1)
214+
.put(SETTING_CLUSTER_IGNORE_DOT_INDEXES.getKey(), false)
215+
.put(SETTING_MAX_SHARDS_PER_CLUSTER, maxShardLimitOnCluster)
216+
.build();
217+
final ShardLimitValidator shardLimitValidator = createTestShardLimitService(limitOnlySettings);
218+
final Settings settings = Settings.builder()
219+
.put(SETTING_VERSION_CREATED, Version.CURRENT)
220+
.put(SETTING_NUMBER_OF_SHARDS, 1)
221+
.put(SETTING_NUMBER_OF_REPLICAS, 1)
222+
.build();
223+
final ClusterState state = createClusterForShardLimitTest(1, 1, 0);
224+
shardLimitValidator.validateShardLimit("abc", settings, state);
225+
}
226+
227+
155228
/**
156229
* This test validates that index starting with dot creation Succeeds
157230
* when the setting cluster.ignore_dot_indexes is set to true.
@@ -489,6 +562,22 @@ public static ClusterState createClusterForShardLimitTest(
489562
);
490563
}
491564

565+
/**
566+
* Creates a {@link ShardLimitValidator} for testing with the given setting and a mocked cluster service.
567+
*
568+
* @param limitOnlySettings the setting used for creating ShardLimitValidator.
569+
* @return a test instance
570+
*/
571+
private static ShardLimitValidator createTestShardLimitService(final Settings limitOnlySettings) {
572+
// Use a mocked clusterService - for unit tests we won't be updating the setting anyway.
573+
ClusterService clusterService = mock(ClusterService.class);
574+
when(clusterService.getClusterSettings()).thenReturn(
575+
new ClusterSettings(limitOnlySettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
576+
);
577+
578+
return new ShardLimitValidator(limitOnlySettings, clusterService, new SystemIndices(emptyMap()));
579+
}
580+
492581
/**
493582
* Creates a {@link ShardLimitValidator} for testing with the given setting and a mocked cluster service.
494583
*

0 commit comments

Comments
 (0)