Skip to content

Commit fabcb30

Browse files
committed
Factory methods & toList() & simplify
1 parent 7e5f1e7 commit fabcb30

6 files changed

Lines changed: 25 additions & 35 deletions

File tree

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import java.util.function.Function;
5757
import java.util.function.LongSupplier;
5858
import java.util.function.Supplier;
59-
import java.util.stream.Collectors;
6059

6160
import static org.elasticsearch.monitor.StatusInfo.Status.UNHEALTHY;
6261

@@ -369,10 +368,7 @@ default void close(Mode newMode) {}
369368
class LeaderJoinAccumulator implements JoinAccumulator {
370369
@Override
371370
public void handleJoinRequest(DiscoveryNode sender, ActionListener<Void> joinListener) {
372-
final JoinTask task = new JoinTask(
373-
List.of(new JoinTask.NodeJoinTask(sender, joinReasonService.getJoinReason(sender, Mode.LEADER), joinListener)),
374-
false
375-
);
371+
final JoinTask task = JoinTask.singleNode(sender, joinReasonService.getJoinReason(sender, Mode.LEADER), joinListener);
376372
assert joinTaskExecutor != null;
377373
masterService.submitStateUpdateTask("node-join", task, ClusterStateTaskConfig.build(Priority.URGENT), joinTaskExecutor);
378374
}
@@ -427,15 +423,15 @@ public void close(Mode newMode) {
427423
assert closed == false : "CandidateJoinAccumulator closed";
428424
closed = true;
429425
if (newMode == Mode.LEADER) {
430-
final JoinTask joinTask = new JoinTask(joinRequestAccumulator.entrySet().stream().map(entry -> {
426+
final JoinTask joinTask = JoinTask.completingElection(joinRequestAccumulator.entrySet().stream().map(entry -> {
431427
final DiscoveryNode discoveryNode = entry.getKey();
432428
final ActionListener<Void> listener = entry.getValue();
433429
return new JoinTask.NodeJoinTask(
434430
discoveryNode,
435431
joinReasonService.getJoinReason(discoveryNode, Mode.CANDIDATE),
436432
listener
437433
);
438-
}).collect(Collectors.toList()), true);
434+
}));
439435

440436
joinTaskExecutor = joinTaskExecutorGenerator.get();
441437
masterService.submitStateUpdateTask(

server/src/main/java/org/elasticsearch/cluster/coordination/JoinTask.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,18 @@
1616
import java.util.Collections;
1717
import java.util.List;
1818
import java.util.Objects;
19+
import java.util.stream.Stream;
1920

2021
public record JoinTask(List<NodeJoinTask> nodeJoinTasks, boolean isBecomingMaster) implements ClusterStateTaskListener {
2122

23+
public static JoinTask singleNode(DiscoveryNode node, String reason, ActionListener<Void> listener) {
24+
return new JoinTask(List.of(new NodeJoinTask(node, reason, listener)), false);
25+
}
26+
27+
public static JoinTask completingElection(Stream<NodeJoinTask> nodeJoinTaskStream) {
28+
return new JoinTask(nodeJoinTaskStream.toList(), true);
29+
}
30+
2231
public JoinTask(List<NodeJoinTask> nodeJoinTasks, boolean isBecomingMaster) {
2332
this.nodeJoinTasks = Collections.unmodifiableList(nodeJoinTasks);
2433
this.isBecomingMaster = isBecomingMaster;

server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -175,15 +175,10 @@ public void testUpdatesNodeWithNewRoles() throws Exception {
175175
final ClusterStateTaskExecutor.ClusterTasksResult<JoinTask> result = joinTaskExecutor.execute(
176176
clusterState,
177177
List.of(
178-
new JoinTask(
179-
List.of(
180-
new JoinTask.NodeJoinTask(
181-
actualNode,
182-
"test",
183-
ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
184-
)
185-
),
186-
false
178+
JoinTask.singleNode(
179+
actualNode,
180+
"test",
181+
ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
187182
)
188183
)
189184
);

server/src/test/java/org/elasticsearch/cluster/metadata/AutoExpandReplicasTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public void testOnlyAutoExpandAllocationFilteringAfterAllNodesUpgraded() {
248248
// is the
249249
// master
250250

251-
state = cluster.addNodes(state, Collections.singletonList(newNode));
251+
state = cluster.addNode(state, newNode);
252252

253253
// use allocation filtering
254254
state = cluster.updateSettings(

server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@
103103
import java.util.Map;
104104
import java.util.Set;
105105
import java.util.function.Function;
106-
import java.util.stream.Collectors;
107106

108107
import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom;
109108
import static java.util.stream.Collectors.toMap;
@@ -350,22 +349,15 @@ public ClusterState reroute(ClusterState state, ClusterRerouteRequest request) {
350349
return execute(transportClusterRerouteAction, request, state);
351350
}
352351

353-
public ClusterState addNodes(ClusterState clusterState, List<DiscoveryNode> nodes) {
352+
public ClusterState addNode(ClusterState clusterState, DiscoveryNode discoveryNode) {
354353
return runTasks(
355354
joinTaskExecutor,
356355
clusterState,
357356
List.of(
358-
new JoinTask(
359-
nodes.stream()
360-
.map(
361-
node -> new JoinTask.NodeJoinTask(
362-
node,
363-
"dummy reason",
364-
ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
365-
)
366-
)
367-
.collect(Collectors.toList()),
368-
false
357+
JoinTask.singleNode(
358+
discoveryNode,
359+
"dummy reason",
360+
ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
369361
)
370362
)
371363
);
@@ -376,7 +368,7 @@ public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<Dis
376368
joinTaskExecutor,
377369
clusterState,
378370
List.of(
379-
new JoinTask(
371+
JoinTask.completingElection(
380372
nodes.stream()
381373
.map(
382374
node -> new JoinTask.NodeJoinTask(
@@ -385,8 +377,6 @@ public ClusterState joinNodesAndBecomeMaster(ClusterState clusterState, List<Dis
385377
ActionListener.wrap(() -> { throw new AssertionError("should not complete publication"); })
386378
)
387379
)
388-
.collect(Collectors.toList()),
389-
true
390380
)
391381
)
392382
);

server/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ public ClusterState randomlyUpdateClusterState(
463463
if (randomBoolean()) {
464464
// add node
465465
if (state.nodes().getSize() < 10) {
466-
state = cluster.addNodes(state, Collections.singletonList(createNode()));
466+
state = cluster.addNode(state, createNode());
467467
updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
468468
}
469469
} else {
@@ -476,7 +476,7 @@ public ClusterState randomlyUpdateClusterState(
476476
}
477477
if (randomBoolean()) {
478478
// and add it back
479-
state = cluster.addNodes(state, Collections.singletonList(discoveryNode));
479+
state = cluster.addNode(state, discoveryNode);
480480
updateNodes(state, clusterStateServiceMap, indicesServiceSupplier);
481481
}
482482
}

0 commit comments

Comments
 (0)