Skip to content

Commit ee83da0

Browse files
committed
And a callback-accepting simple batching executor
1 parent 1b91b8d commit ee83da0

2 files changed

Lines changed: 17 additions & 20 deletions

File tree

server/src/main/java/org/elasticsearch/cluster/ClusterStateTaskExecutor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.IdentityHashMap;
1414
import java.util.List;
1515
import java.util.Map;
16+
import java.util.function.BiFunction;
1617

1718
public interface ClusterStateTaskExecutor<T extends ClusterStateTaskListener> {
1819
/**
@@ -151,18 +152,25 @@ public String describeTasks(List<T> tasks) {
151152
}
152153

153154
static <T extends ClusterStateUpdateTask> ClusterStateTaskExecutor<T> simpleBatched() {
154-
return (currentState, tasks) -> {
155+
return simpleBatched((originalState, updatedState) -> updatedState);
156+
}
157+
158+
static <T extends ClusterStateUpdateTask> ClusterStateTaskExecutor<T> simpleBatched(
159+
BiFunction<ClusterState, ClusterState, ClusterState> clusterStateCallback
160+
) {
161+
return (originalState, tasks) -> {
155162
ClusterTasksResult.Builder<T> builder = ClusterTasksResult.builder();
156-
ClusterState state = currentState;
163+
ClusterState updatedState = originalState;
157164
for (T task : tasks) {
158165
try {
159-
state = task.execute(state);
166+
updatedState = task.execute(updatedState);
160167
builder.success(task);
161168
} catch (Exception e) {
162169
builder.failure(task, e);
163170
}
164171
}
165-
return builder.build(state);
172+
updatedState = clusterStateCallback.apply(originalState, updatedState);
173+
return builder.build(updatedState);
166174
};
167175
}
168176
}

server/src/main/java/org/elasticsearch/cluster/metadata/MetadataUpdateSettingsService.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
1818
import org.elasticsearch.cluster.ClusterState;
1919
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
20-
import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult;
2120
import org.elasticsearch.cluster.block.ClusterBlock;
2221
import org.elasticsearch.cluster.block.ClusterBlocks;
2322
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -72,23 +71,13 @@ public MetadataUpdateSettingsService(
7271
this.indicesService = indicesService;
7372
this.shardLimitValidator = shardLimitValidator;
7473
this.threadPool = threadPool;
75-
this.executor = (currentState, tasks) -> {
76-
ClusterTasksResult.Builder<AckedClusterStateUpdateTask> builder = ClusterTasksResult.builder();
77-
ClusterState state = currentState;
78-
for (AckedClusterStateUpdateTask task : tasks) {
79-
try {
80-
state = task.execute(state);
81-
builder.success(task);
82-
} catch (Exception e) {
83-
builder.failure(task, e);
84-
}
85-
}
86-
if (state != currentState) {
74+
this.executor = ClusterStateTaskExecutor.simpleBatched((originalState, updatedState) -> {
75+
if (updatedState != originalState) {
8776
// reroute in case things change that require it (like number of replicas)
88-
state = allocationService.reroute(state, "settings update");
77+
updatedState = allocationService.reroute(updatedState, "settings update");
8978
}
90-
return builder.build(state);
91-
};
79+
return updatedState;
80+
});
9281
}
9382

9483
public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {

0 commit comments

Comments
 (0)