|
17 | 17 | import org.elasticsearch.cluster.AckedClusterStateUpdateTask; |
18 | 18 | import org.elasticsearch.cluster.ClusterState; |
19 | 19 | import org.elasticsearch.cluster.ClusterStateTaskExecutor; |
20 | | -import org.elasticsearch.cluster.ClusterStateTaskExecutor.ClusterTasksResult; |
21 | 20 | import org.elasticsearch.cluster.block.ClusterBlock; |
22 | 21 | import org.elasticsearch.cluster.block.ClusterBlocks; |
23 | 22 | import org.elasticsearch.cluster.routing.RoutingTable; |
@@ -72,23 +71,13 @@ public MetadataUpdateSettingsService( |
72 | 71 | this.indicesService = indicesService; |
73 | 72 | this.shardLimitValidator = shardLimitValidator; |
74 | 73 | this.threadPool = threadPool; |
75 | | - this.executor = (currentState, tasks) -> { |
76 | | - ClusterTasksResult.Builder<AckedClusterStateUpdateTask> builder = ClusterTasksResult.builder(); |
77 | | - ClusterState state = currentState; |
78 | | - for (AckedClusterStateUpdateTask task : tasks) { |
79 | | - try { |
80 | | - state = task.execute(state); |
81 | | - builder.success(task); |
82 | | - } catch (Exception e) { |
83 | | - builder.failure(task, e); |
84 | | - } |
85 | | - } |
86 | | - if (state != currentState) { |
| 74 | + this.executor = ClusterStateTaskExecutor.simpleBatched((originalState, updatedState) -> { |
| 75 | + if (updatedState != originalState) { |
87 | 76 | // reroute in case things change that require it (like number of replicas) |
88 | | - state = allocationService.reroute(state, "settings update"); |
| 77 | + updatedState = allocationService.reroute(updatedState, "settings update"); |
89 | 78 | } |
90 | | - return builder.build(state); |
91 | | - }; |
| 79 | + return updatedState; |
| 80 | + }); |
92 | 81 | } |
93 | 82 |
|
94 | 83 | public void updateSettings(final UpdateSettingsClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) { |
|
0 commit comments