Skip to content

Commit d67df3a

Browse files
authored
ILM use Priority.IMMEDIATE for stop ILM cluster update (elastic#54909)
This changes the priority of the cluster state update that stops ILM altogether to `IMMEDIATE`. We've chosen to change this as it can be useful to temporarily stop ILM if a cluster is overwhelmed, but a `NORMAL` priority can see the "stop ILM update" not make it up the tasks queue. On the same note, we're keeping the `start ILM` cluster update priority to `NORMAL` on purpose such that we only start `ILM` if the cluster can handle it.
1 parent c7dc033 commit d67df3a

5 files changed

Lines changed: 152 additions & 8 deletions

File tree

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.LocalNodeMasterListener;
1919
import org.elasticsearch.cluster.metadata.IndexMetadata;
2020
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.Priority;
2122
import org.elasticsearch.common.Strings;
2223
import org.elasticsearch.common.component.Lifecycle.State;
2324
import org.elasticsearch.common.settings.Settings;
@@ -363,7 +364,13 @@ assert isClusterServiceStoppedOrClosed() : "close is called by closing the plugi
363364
}
364365

365366
public void submitOperationModeUpdate(OperationMode mode) {
366-
clusterService.submitStateUpdateTask("ilm_operation_mode_update", OperationModeUpdateTask.ilmMode(mode));
367+
OperationModeUpdateTask ilmOperationModeUpdateTask;
368+
if (mode == OperationMode.STOPPING || mode == OperationMode.STOPPED) {
369+
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.IMMEDIATE, mode);
370+
} else {
371+
ilmOperationModeUpdateTask = OperationModeUpdateTask.ilmMode(Priority.NORMAL, mode);
372+
}
373+
clusterService.submitStateUpdateTask("ilm_operation_mode_update {OperationMode " + mode.name() + "}", ilmOperationModeUpdateTask);
367374
}
368375

369376
/**

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/OperationModeUpdateTask.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,28 +11,40 @@
1111
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1212
import org.elasticsearch.cluster.metadata.Metadata;
1313
import org.elasticsearch.common.Nullable;
14-
import org.elasticsearch.xpack.core.ilm.OperationMode;
14+
import org.elasticsearch.common.Priority;
1515
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
16+
import org.elasticsearch.xpack.core.ilm.OperationMode;
1617
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
1718

19+
/**
20+
* This task updates the operation mode state for ILM.
21+
*
22+
* As stopping ILM proved to be an action we want to sometimes take in order to allow clusters to stabilise when under heavy load this
23+
* task might run at {@link Priority#IMMEDIATE} priority so please make sure to keep this task as lightweight as possible.
24+
*/
1825
public class OperationModeUpdateTask extends ClusterStateUpdateTask {
1926
private static final Logger logger = LogManager.getLogger(OperationModeUpdateTask.class);
2027
@Nullable
2128
private final OperationMode ilmMode;
2229
@Nullable
2330
private final OperationMode slmMode;
2431

25-
private OperationModeUpdateTask(OperationMode ilmMode, OperationMode slmMode) {
32+
private OperationModeUpdateTask(Priority priority, OperationMode ilmMode, OperationMode slmMode) {
33+
super(priority);
2634
this.ilmMode = ilmMode;
2735
this.slmMode = slmMode;
2836
}
2937

3038
public static OperationModeUpdateTask ilmMode(OperationMode mode) {
31-
return new OperationModeUpdateTask(mode, null);
39+
return ilmMode(Priority.NORMAL, mode);
40+
}
41+
42+
public static OperationModeUpdateTask ilmMode(Priority priority, OperationMode mode) {
43+
return new OperationModeUpdateTask(priority, mode, null);
3244
}
3345

3446
public static OperationModeUpdateTask slmMode(OperationMode mode) {
35-
return new OperationModeUpdateTask(null, mode);
47+
return new OperationModeUpdateTask(Priority.NORMAL, null, mode);
3648
}
3749

3850
OperationMode getILMOperationMode() {

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/TransportStopILMAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.cluster.block.ClusterBlockLevel;
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1818
import org.elasticsearch.cluster.service.ClusterService;
19+
import org.elasticsearch.common.Priority;
1920
import org.elasticsearch.common.inject.Inject;
2021
import org.elasticsearch.common.io.stream.StreamInput;
2122
import org.elasticsearch.tasks.Task;
@@ -50,7 +51,8 @@ protected AcknowledgedResponse read(StreamInput in) throws IOException {
5051
@Override
5152
protected void masterOperation(Task task, StopILMRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
5253
clusterService.submitStateUpdateTask("ilm_operation_mode_update",
53-
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
54+
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.IMMEDIATE, request, listener) {
55+
5456
@Override
5557
public ClusterState execute(ClusterState currentState) {
5658
return (OperationModeUpdateTask.ilmMode(OperationMode.STOPPING)).execute(currentState);

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.node.DiscoveryNode;
1919
import org.elasticsearch.cluster.node.DiscoveryNodes;
2020
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.Priority;
2122
import org.elasticsearch.common.collect.ImmutableOpenMap;
2223
import org.elasticsearch.common.component.Lifecycle.State;
2324
import org.elasticsearch.common.settings.ClusterSettings;
@@ -40,8 +41,10 @@
4041
import org.elasticsearch.xpack.core.ilm.ShrinkStep;
4142
import org.elasticsearch.xpack.core.ilm.Step;
4243
import org.elasticsearch.xpack.core.scheduler.SchedulerEngine;
44+
import org.hamcrest.Description;
4345
import org.junit.After;
4446
import org.junit.Before;
47+
import org.mockito.ArgumentMatcher;
4548
import org.mockito.Mockito;
4649

4750
import java.time.Clock;
@@ -62,9 +65,11 @@
6265
import static org.hamcrest.Matchers.equalTo;
6366
import static org.mockito.Matchers.any;
6467
import static org.mockito.Matchers.anyString;
68+
import static org.mockito.Matchers.argThat;
6569
import static org.mockito.Matchers.eq;
6670
import static org.mockito.Mockito.doAnswer;
6771
import static org.mockito.Mockito.mock;
72+
import static org.mockito.Mockito.verify;
6873
import static org.mockito.Mockito.when;
6974

7075
public class IndexLifecycleServiceTests extends ESTestCase {
@@ -242,7 +247,8 @@ private void verifyCanStopWithStep(String stoppableStep) {
242247
doAnswer(invocationOnMock -> {
243248
changedOperationMode.set(true);
244249
return null;
245-
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
250+
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"),
251+
any(OperationModeUpdateTask.class));
246252
indexLifecycleService.applyClusterState(event);
247253
indexLifecycleService.triggerPolicies(currentState, true);
248254
assertTrue(changedOperationMode.get());
@@ -294,7 +300,8 @@ public void testRequestedStopOnSafeAction() {
294300
assertThat(task.getILMOperationMode(), equalTo(OperationMode.STOPPED));
295301
moveToMaintenance.set(true);
296302
return null;
297-
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update"), any(OperationModeUpdateTask.class));
303+
}).when(clusterService).submitStateUpdateTask(eq("ilm_operation_mode_update {OperationMode STOPPED}"),
304+
any(OperationModeUpdateTask.class));
298305

299306
indexLifecycleService.applyClusterState(event);
300307
indexLifecycleService.triggerPolicies(currentState, randomBoolean());
@@ -310,6 +317,40 @@ public void testExceptionStillProcessesOtherIndicesOnMaster() {
310317
doTestExceptionStillProcessesOtherIndices(true);
311318
}
312319

320+
public void testOperationModeUpdateTaskPriority() {
321+
indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPING);
322+
verifyOperationModeUpdateTaskPriority(OperationMode.STOPPING, Priority.IMMEDIATE);
323+
indexLifecycleService.submitOperationModeUpdate(OperationMode.STOPPED);
324+
verifyOperationModeUpdateTaskPriority(OperationMode.STOPPED, Priority.IMMEDIATE);
325+
indexLifecycleService.submitOperationModeUpdate(OperationMode.RUNNING);
326+
verifyOperationModeUpdateTaskPriority(OperationMode.RUNNING, Priority.NORMAL);
327+
}
328+
329+
private void verifyOperationModeUpdateTaskPriority(OperationMode mode, Priority expectedPriority) {
330+
verify(clusterService).submitStateUpdateTask(
331+
Mockito.eq("ilm_operation_mode_update {OperationMode " + mode.name() +"}"),
332+
argThat(new ArgumentMatcher<OperationModeUpdateTask>() {
333+
334+
Priority actualPriority = null;
335+
336+
@Override
337+
public boolean matches(Object argument) {
338+
if (argument instanceof OperationModeUpdateTask == false) {
339+
return false;
340+
}
341+
actualPriority = ((OperationModeUpdateTask) argument).priority();
342+
return actualPriority == expectedPriority;
343+
}
344+
345+
@Override
346+
public void describeTo(Description description) {
347+
description.appendText("the cluster state update task priority must be "+ expectedPriority+" but got: ")
348+
.appendText(actualPriority.name());
349+
}
350+
})
351+
);
352+
}
353+
313354
@SuppressWarnings("unchecked")
314355
public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
315356
String policy1 = randomAlphaOfLengthBetween(1, 20);
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ilm.action;
8+
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.support.ActionFilters;
11+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
12+
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
15+
import org.elasticsearch.cluster.service.ClusterService;
16+
import org.elasticsearch.common.Priority;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.tasks.TaskId;
19+
import org.elasticsearch.test.ESTestCase;
20+
import org.elasticsearch.threadpool.ThreadPool;
21+
import org.elasticsearch.transport.TransportService;
22+
import org.elasticsearch.xpack.core.ilm.StopILMRequest;
23+
import org.elasticsearch.xpack.core.ilm.action.StopILMAction;
24+
import org.hamcrest.Description;
25+
import org.mockito.ArgumentMatcher;
26+
27+
import static java.util.Collections.emptyMap;
28+
import static org.mockito.Matchers.argThat;
29+
import static org.mockito.Matchers.eq;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.verify;
32+
33+
public class TransportStopILMActionTests extends ESTestCase {
34+
35+
private static final ActionListener<AcknowledgedResponse> EMPTY_LISTENER = new ActionListener<>() {
36+
@Override
37+
public void onResponse(AcknowledgedResponse response) {
38+
39+
}
40+
41+
@Override
42+
public void onFailure(Exception e) {
43+
44+
}
45+
};
46+
47+
@SuppressWarnings("unchecked")
48+
public void testStopILMClusterStatePriorityIsImmediate() {
49+
ClusterService clusterService = mock(ClusterService.class);
50+
51+
TransportStopILMAction transportStopILMAction = new TransportStopILMAction(mock(TransportService.class),
52+
clusterService, mock(ThreadPool.class), mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
53+
Task task = new Task(randomLong(), "transport", StopILMAction.NAME, "description",
54+
new TaskId(randomLong() + ":" + randomLong()), emptyMap());
55+
StopILMRequest request = new StopILMRequest();
56+
transportStopILMAction.masterOperation(task, request, ClusterState.EMPTY_STATE, EMPTY_LISTENER);
57+
58+
verify(clusterService).submitStateUpdateTask(
59+
eq("ilm_operation_mode_update"),
60+
argThat(new ArgumentMatcher<AckedClusterStateUpdateTask<AcknowledgedResponse>>() {
61+
62+
Priority actualPriority = null;
63+
64+
@Override
65+
public boolean matches(Object argument) {
66+
if (argument instanceof AckedClusterStateUpdateTask == false) {
67+
return false;
68+
}
69+
actualPriority = ((AckedClusterStateUpdateTask<AcknowledgedResponse>) argument).priority();
70+
return actualPriority == Priority.IMMEDIATE;
71+
}
72+
73+
@Override
74+
public void describeTo(Description description) {
75+
description.appendText("the cluster state update task priority must be URGENT but got: ")
76+
.appendText(actualPriority.name());
77+
}
78+
})
79+
);
80+
}
81+
82+
}

0 commit comments

Comments
 (0)