Skip to content

Commit ff6c5ed

Browse files
authored
ILM add cluster update timeout on step retry (#54878)
This commits adds a timeout when moving ILM back on to a failed step. In case the master is struggling with processing the cluster update requests these ones will expire (as we'll send them again anyway on the next ILM loop run) ILM more descriptive source messages for cluster updates Use the configured ILM step master timeout setting
1 parent d67df3a commit ff6c5ed

2 files changed

Lines changed: 62 additions & 34 deletions

File tree

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

Lines changed: 48 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.xpack.ilm.history.ILMHistoryItem;
3333
import org.elasticsearch.xpack.ilm.history.ILMHistoryStore;
3434

35+
import java.util.Locale;
3536
import java.util.function.LongSupplier;
3637

3738
import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_ORIGINATION_DATE;
@@ -204,33 +205,45 @@ void onErrorMaybeRetryFailedStep(String policy, IndexMetadata indexMetadata) {
204205
int currentRetryAttempt = lifecycleState.getFailedStepRetryCount() == null ? 1 : 1 + lifecycleState.getFailedStepRetryCount();
205206
logger.info("policy [{}] for index [{}] on an error step due to a transitive error, moving back to the failed " +
206207
"step [{}] for execution. retry attempt [{}]", policy, index, lifecycleState.getFailedStep(), currentRetryAttempt);
207-
clusterService.submitStateUpdateTask("ilm-retry-failed-step", new ClusterStateUpdateTask() {
208-
@Override
209-
public ClusterState execute(ClusterState currentState) {
210-
return IndexLifecycleTransition.moveClusterStateToPreviouslyFailedStep(currentState, index,
211-
nowSupplier, stepRegistry, true);
212-
}
208+
clusterService.submitStateUpdateTask(
209+
String.format(Locale.ROOT, "ilm-retry-failed-step {policy [%s], index [%s], failedStep [%s]}", policy, index,
210+
failedStep.getKey()),
211+
new ClusterStateUpdateTask() {
213212

214-
@Override
215-
public void onFailure(String source, Exception e) {
216-
logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed",
217-
failedStep.getKey().getName(), index), e);
218-
}
213+
@Override
214+
public TimeValue timeout() {
215+
// we can afford to drop these requests if they timeout as on the next {@link
216+
// IndexLifecycleRunner#runPeriodicStep} run the policy will still be in the ERROR step, as we haven't been able
217+
// to move it back into the failed step, so we'll try again
218+
return LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(clusterService.state().metadata().settings());
219+
}
219220

220-
@Override
221-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
222-
if (oldState.equals(newState) == false) {
223-
IndexMetadata newIndexMeta = newState.metadata().index(index);
224-
Step indexMetaCurrentStep = getCurrentStep(stepRegistry, policy, newIndexMeta);
225-
StepKey stepKey = indexMetaCurrentStep.getKey();
226-
if (stepKey != null && stepKey != TerminalPolicyStep.KEY && newIndexMeta != null) {
227-
logger.trace("policy [{}] for index [{}] was moved back on the failed step for as part of an automatic " +
228-
"retry. Attempting to execute the failed step [{}] if it's an async action", policy, index, stepKey);
229-
maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey);
221+
@Override
222+
public ClusterState execute(ClusterState currentState) {
223+
return IndexLifecycleTransition.moveClusterStateToPreviouslyFailedStep(currentState, index,
224+
nowSupplier, stepRegistry, true);
225+
}
226+
227+
@Override
228+
public void onFailure(String source, Exception e) {
229+
logger.error(new ParameterizedMessage("retry execution of step [{}] for index [{}] failed",
230+
failedStep.getKey().getName(), index), e);
231+
}
232+
233+
@Override
234+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
235+
if (oldState.equals(newState) == false) {
236+
IndexMetadata newIndexMeta = newState.metadata().index(index);
237+
Step indexMetaCurrentStep = getCurrentStep(stepRegistry, policy, newIndexMeta);
238+
StepKey stepKey = indexMetaCurrentStep.getKey();
239+
if (stepKey != null && stepKey != TerminalPolicyStep.KEY && newIndexMeta != null) {
240+
logger.trace("policy [{}] for index [{}] was moved back on the failed step for as part of an automatic " +
241+
"retry. Attempting to execute the failed step [{}] if it's an async action", policy, index, stepKey);
242+
maybeRunAsyncAction(newState, newIndexMeta, policy, stepKey);
243+
}
230244
}
231245
}
232-
}
233-
});
246+
});
234247
} else {
235248
logger.debug("policy [{}] for index [{}] on an error step after a terminal error, skipping execution", policy, index);
236249
}
@@ -338,7 +351,7 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
338351
}
339352
} else if (currentStep instanceof ClusterStateActionStep || currentStep instanceof ClusterStateWaitStep) {
340353
logger.debug("[{}] running policy with current-step [{}]", indexMetadata.getIndex().getName(), currentStep.getKey());
341-
clusterService.submitStateUpdateTask("ilm-execute-cluster-state-steps",
354+
clusterService.submitStateUpdateTask(String.format(Locale.ROOT, "ilm-execute-cluster-state-steps [%s]", currentStep),
342355
new ExecuteStepsUpdateTask(policy, indexMetadata.getIndex(), currentStep, stepRegistry, this, nowSupplier));
343356
} else {
344357
logger.trace("[{}] ignoring step execution from cluster state change event [{}]", index, currentStep.getKey());
@@ -351,7 +364,9 @@ void runPolicyAfterStateChange(String policy, IndexMetadata indexMetadata) {
351364
*/
352365
private void moveToStep(Index index, String policy, Step.StepKey currentStepKey, Step.StepKey newStepKey) {
353366
logger.debug("[{}] moving to step [{}] {} -> {}", index.getName(), policy, currentStepKey, newStepKey);
354-
clusterService.submitStateUpdateTask("ilm-move-to-step",
367+
clusterService.submitStateUpdateTask(
368+
String.format(Locale.ROOT, "ilm-move-to-step {policy [%s], index [%s], currentStep [%s], nextStep [%s]}", policy,
369+
index.getName(), currentStepKey, newStepKey),
355370
new MoveToNextStepUpdateTask(index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, clusterState ->
356371
{
357372
IndexMetadata indexMetadata = clusterState.metadata().index(index);
@@ -368,7 +383,9 @@ private void moveToStep(Index index, String policy, Step.StepKey currentStepKey,
368383
private void moveToErrorStep(Index index, String policy, Step.StepKey currentStepKey, Exception e) {
369384
logger.error(new ParameterizedMessage("policy [{}] for index [{}] failed on step [{}]. Moving to ERROR step",
370385
policy, index.getName(), currentStepKey), e);
371-
clusterService.submitStateUpdateTask("ilm-move-to-error-step",
386+
clusterService.submitStateUpdateTask(
387+
String.format(Locale.ROOT, "ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(),
388+
currentStepKey),
372389
new MoveToErrorStepUpdateTask(index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, clusterState -> {
373390
IndexMetadata indexMetadata = clusterState.metadata().index(index);
374391
registerFailedOperation(indexMetadata, e);
@@ -379,8 +396,11 @@ private void moveToErrorStep(Index index, String policy, Step.StepKey currentSte
379396
* Set step info for the given index inside of its {@link LifecycleExecutionState} without
380397
* changing other execution state.
381398
*/
382-
private void setStepInfo(Index index, String policy, Step.StepKey currentStepKey, ToXContentObject stepInfo) {
383-
clusterService.submitStateUpdateTask("ilm-set-step-info", new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo));
399+
private void setStepInfo(Index index, String policy, @Nullable Step.StepKey currentStepKey, ToXContentObject stepInfo) {
400+
clusterService.submitStateUpdateTask(
401+
String.format(Locale.ROOT, "ilm-set-step-info {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(),
402+
currentStepKey),
403+
new SetStepInfoUpdateTask(index, policy, currentStepKey, stepInfo));
384404
}
385405

386406
/**

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -622,8 +622,11 @@ public void testRunPolicyClusterStateActionStep() {
622622

623623
runner.runPolicyAfterStateChange(policyName, indexMetadata);
624624

625-
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-execute-cluster-state-steps"),
626-
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step)));
625+
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(
626+
Mockito.eq("ilm-execute-cluster-state-steps [{\"phase\":\"phase\",\"action\":\"action\"," +
627+
"\"name\":\"cluster_state_action_step\"} => null]"),
628+
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step))
629+
);
627630
Mockito.verifyNoMoreInteractions(clusterService);
628631
}
629632

@@ -640,8 +643,11 @@ public void testRunPolicyClusterStateWaitStep() {
640643

641644
runner.runPolicyAfterStateChange(policyName, indexMetadata);
642645

643-
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-execute-cluster-state-steps"),
644-
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step)));
646+
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(
647+
Mockito.eq("ilm-execute-cluster-state-steps [{\"phase\":\"phase\",\"action\":\"action\"," +
648+
"\"name\":\"cluster_state_action_step\"} => null]"),
649+
Mockito.argThat(new ExecuteStepsUpdateTaskMatcher(indexMetadata.getIndex(), policyName, step))
650+
);
645651
Mockito.verifyNoMoreInteractions(clusterService);
646652
}
647653

@@ -690,15 +696,17 @@ public void testRunPolicyThatDoesntExist() {
690696
.numberOfShards(randomIntBetween(1, 5)).numberOfReplicas(randomIntBetween(0, 5)).build();
691697
// verify that no exception is thrown
692698
runner.runPolicyAfterStateChange(policyName, indexMetadata);
693-
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(Mockito.matches("ilm-set-step-info"),
699+
Mockito.verify(clusterService, Mockito.times(1)).submitStateUpdateTask(
700+
Mockito.eq("ilm-set-step-info {policy [cluster_state_action_policy], index [my_index], currentStep [null]}"),
694701
Mockito.argThat(new SetStepInfoUpdateTaskMatcher(indexMetadata.getIndex(), policyName, null,
695702
(builder, params) -> {
696703
builder.startObject();
697704
builder.field("reason", "policy [does_not_exist] does not exist");
698705
builder.field("type", "illegal_argument_exception");
699706
builder.endObject();
700707
return builder;
701-
})));
708+
}))
709+
);
702710
Mockito.verifyNoMoreInteractions(clusterService);
703711
}
704712

0 commit comments

Comments
 (0)