|
65 | 65 | import java.util.List; |
66 | 66 | import java.util.Locale; |
67 | 67 | import java.util.Set; |
| 68 | +import java.util.stream.Collectors; |
68 | 69 |
|
69 | 70 | import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; |
70 | 71 |
|
@@ -227,34 +228,38 @@ public ShardFailedClusterStateTaskExecutor(AllocationService allocationService, |
227 | 228 | @Override |
228 | 229 | public BatchResult<ShardRoutingEntry> execute(ClusterState currentState, List<ShardRoutingEntry> tasks) throws Exception { |
229 | 230 | BatchResult.Builder<ShardRoutingEntry> batchResultBuilder = BatchResult.builder(); |
230 | | - Set<ShardRoutingEntry> nonTrivialTasks = Collections.newSetFromMap(new IdentityHashMap<>()); |
231 | | - List<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>(tasks.size()); |
| 231 | + Set<ShardRoutingEntry> tasksRequiringAllocationService = Collections.newSetFromMap(new IdentityHashMap<>()); |
| 232 | + |
232 | 233 | for (ShardRoutingEntry task : tasks) { |
233 | 234 | RoutingNodes.RoutingNodeIterator routingNodeIterator = |
234 | 235 | currentState.getRoutingNodes().routingNodeIter(task.getShardRouting().currentNodeId()); |
235 | 236 | if (routingNodeIterator != null) { |
236 | 237 | for (ShardRouting maybe : routingNodeIterator) { |
237 | 238 | if (task.getShardRouting().isSameAllocation(maybe)) { |
238 | | - nonTrivialTasks.add(task); |
239 | | - failedShards.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)); |
| 239 | + tasksRequiringAllocationService.add(task); |
240 | 240 | break; |
241 | 241 | } |
242 | 242 | } |
243 | 243 | } |
244 | | - if (!nonTrivialTasks.contains(task)) { |
| 244 | + if (!tasksRequiringAllocationService.contains(task)) { |
245 | 245 | // the requested shard does not exist |
246 | 246 | batchResultBuilder.success(task); |
247 | 247 | } |
248 | 248 | } |
249 | 249 | ClusterState maybeUpdatedState = currentState; |
250 | 250 | try { |
| 251 | + List<FailedRerouteAllocation.FailedShard> failedShards = |
| 252 | + tasksRequiringAllocationService |
| 253 | + .stream() |
| 254 | + .map(task -> new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)) |
| 255 | + .collect(Collectors.toList()); |
251 | 256 | RoutingAllocation.Result result = allocationService.applyFailedShards(currentState, failedShards); |
252 | 257 | if (result.changed()) { |
253 | 258 | maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build(); |
254 | 259 | } |
255 | | - batchResultBuilder.successes(nonTrivialTasks); |
| 260 | + batchResultBuilder.successes(tasksRequiringAllocationService); |
256 | 261 | } catch (Throwable t) { |
257 | | - batchResultBuilder.failures(nonTrivialTasks, t); |
| 262 | + batchResultBuilder.failures(tasksRequiringAllocationService, t); |
258 | 263 | } |
259 | 264 | return batchResultBuilder.build(maybeUpdatedState); |
260 | 265 | } |
|
0 commit comments