Return result from cluster state task execution #83562
The `MasterService` executes batches of tasks which compute changes to the `ClusterState`. After executing each batch the `MasterService` publishes the updated cluster state and notifies every task in the batch when the publication completes. Many tasks compute some kind of result during their execution which needs to be made available to the publication completion handler for subsequent activities. Today there's no good general way to pass anything to the completion handler other than the fact that the publication succeeded. Some tasks work around this by storing their result in the `ClusterState` itself. Others use the executor to capture the result and pass it through. Neither solution works well with batching: later tasks in a batch may overwrite the part of the `ClusterState` containing the results of earlier tasks, and batching executors are re-used across batches. This commit adjusts the `ClusterStateTaskExecutor` interface so that now implementations must supply a listener for each task they successfully execute. The `MasterService` collects the listeners for the batch and notifies them all when publication completes. This gives the executor control over the completion handler of each task which lets it pass in any extra data needed.
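To make the new contract concrete, here is a minimal, self-contained sketch of the per-task-listener idea. All names here (`PerTaskListenerSketch`, `TaskAndListener`, `payload`) are hypothetical stand-ins, not the real Elasticsearch interfaces; the point is only the shape: the executor pairs each successfully executed task with a listener that already captures the task's result, so nothing needs to be smuggled through the `ClusterState` or the executor instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for the real interfaces, to illustrate
// the new contract: the executor returns one completion listener per task.
public class PerTaskListenerSketch {

    record ClusterState(int version) {}

    interface Task {
        String payload();
    }

    // Each executed task is paired with a listener that has already captured
    // the per-task result computed during execution.
    record TaskAndListener(Task task, Consumer<ClusterState> onPublished) {}

    static List<TaskAndListener> execute(ClusterState currentState, List<Task> tasks) {
        List<TaskAndListener> results = new ArrayList<>();
        for (Task task : tasks) {
            // A per-task result computed while executing the batch...
            String computed = task.payload().toUpperCase();
            // ...is captured by the listener, so the publication-completion
            // handler can use it without stashing it in the cluster state.
            results.add(new TaskAndListener(task, newState ->
                System.out.println(computed + " published in version " + newState.version())));
        }
        return results;
    }

    public static void main(String[] args) {
        ClusterState published = new ClusterState(2);
        // The master service would notify every collected listener once
        // publication of the batch completes.
        for (TaskAndListener entry : execute(new ClusterState(1), List.of(() -> "task-a", () -> "task-b"))) {
            entry.onPublished().accept(published);
        }
    }
}
```

Because the listener is created per task inside the executor, a batching executor reused across batches never accumulates state, and later tasks in a batch cannot clobber earlier tasks' results.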
Pinging @elastic/es-distributed (Team:Distributed)
public boolean isSuccess() {
    return this == SUCCESS;
...
assert failure == null ^ taskListener == null;
Should this assertion be moved to constructor?
public TaskResult {
    assert failure == null ^ taskListener == null;
}
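The suggestion above relies on a record's compact constructor, which runs on every instantiation and is a natural home for an invariant like "exactly one of `failure` and `taskListener` is set". A runnable sketch, with a `Runnable` as a hypothetical stand-in for the listener type:

```java
// Demonstrates enforcing an XOR invariant in a record's compact constructor,
// as suggested in the review. `Runnable` stands in for the real listener type.
public class TaskResultDemo {

    record TaskResult(Exception failure, Runnable taskListener) {
        TaskResult {
            // exactly one of the two components must be non-null
            assert failure == null ^ taskListener == null;
        }

        boolean isSuccess() {
            return taskListener != null;
        }
    }

    public static void main(String[] args) {
        TaskResult ok = new TaskResult(null, () -> {});
        TaskResult failed = new TaskResult(new RuntimeException("boom"), null);
        System.out.println(ok.isSuccess() + " " + failed.isSuccess());
    }
}
```

Putting the assertion in the constructor means it guards every creation site rather than only the code path that happened to contain the inline assert.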
public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
    final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
    boolean removed = false;
    final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.builder();
Nit: I would simplify to:
- final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.builder();
+ var resultBuilder = ClusterTasksResult.<Task>builder();
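The nit works because `var` still gives the variable its full generic type when the type argument is supplied as an explicit type witness on the static factory call. A small standalone illustration, using a hypothetical `builder()` factory in place of `ClusterTasksResult.builder()`:

```java
import java.util.ArrayList;

// Shows how `var` plus an explicit type witness on a static factory call
// replaces a repeated generic declared type. `builder()` here is a
// hypothetical stand-in for ClusterTasksResult.builder().
public class VarTypeWitness {

    static <T> ArrayList<T> builder() {
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        // Verbose form: final ArrayList<String> result = VarTypeWitness.builder();
        // With var, the type argument moves onto the call site:
        var result = VarTypeWitness.<String>builder();
        result.add("task");
        System.out.println(result);
    }
}
```

The inferred type of `result` is still `ArrayList<String>`; the witness is what keeps inference from falling back to `ArrayList<Object>`.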
@DaveCTurner can you give me a little time here? I'd like to go over this one as well. I should be able to get to it this afternoon.
I'd rather not actually rewrite any of the executors in this PR to keep it self-contained, but Ievgen and I found it helpful to consider how one might rewrite them using this new API. For instance:

diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java
index 4c8b12bb7c7..595e056b8e9 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java
@@ -12,8 +12,9 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.desirednodes.DesiredNodesSettingsValidator;
@@ -28,13 +29,14 @@ import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
+import java.util.List;
import java.util.Locale;
import static java.lang.String.format;
public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction<UpdateDesiredNodesRequest, UpdateDesiredNodesResponse> {
private final DesiredNodesSettingsValidator settingsValidator;
- private final ClusterStateTaskExecutor<ClusterStateUpdateTask> taskExecutor;
+ private final ClusterStateTaskExecutor<UpdateDesiredNodesTask> taskExecutor;
@Inject
public TransportUpdateDesiredNodesAction(
@@ -58,7 +60,7 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
ThreadPool.Names.SAME
);
this.settingsValidator = settingsValidator;
- this.taskExecutor = new DesiredNodesClusterStateTaskExecutor();
+ this.taskExecutor = new UpdateDesiredNodesTaskExecutor();
}
@Override
@@ -79,29 +81,8 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
clusterService.submitStateUpdateTask(
"update-desired-nodes",
- new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
- volatile boolean replacedExistingHistoryId = false;
-
- @Override
- public ClusterState execute(ClusterState currentState) {
- final ClusterState updatedState = updateDesiredNodes(currentState, request);
- final DesiredNodes previousDesiredNodes = DesiredNodesMetadata.latestFromClusterState(currentState);
- final DesiredNodes latestDesiredNodes = DesiredNodesMetadata.latestFromClusterState(updatedState);
- replacedExistingHistoryId = previousDesiredNodes != null
- && previousDesiredNodes.hasSameHistoryId(latestDesiredNodes) == false;
- return updatedState;
- }
-
- @Override
- public void onFailure(Exception e) {
- listener.onFailure(e);
- }
-
- @Override
- public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
- listener.onResponse(new UpdateDesiredNodesResponse(replacedExistingHistoryId));
- }
- },
+ new UpdateDesiredNodesTask(request, listener),
+ ClusterStateTaskConfig.build(Priority.URGENT),
taskExecutor
);
} catch (Exception e) {
@@ -109,6 +90,54 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
}
}
+ record UpdateDesiredNodesTask(UpdateDesiredNodesRequest request, ActionListener<UpdateDesiredNodesResponse> listener)
+ implements
+ ClusterStateTaskListener {
+
+ @Override
+ public void onFailure(Exception e) {
+ listener.onFailure(e);
+ }
+
+ @Override
+ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+ assert false : "not called";
+ }
+ }
+
+ static class UpdateDesiredNodesTaskExecutor implements ClusterStateTaskExecutor<UpdateDesiredNodesTask> {
+
+ @Override
+ public ClusterTasksResult<UpdateDesiredNodesTask> execute(ClusterState currentState, List<UpdateDesiredNodesTask> tasks)
+ throws Exception {
+
+ var results = ClusterTasksResult.<UpdateDesiredNodesTask>builder();
+
+ for (UpdateDesiredNodesTask task : tasks) {
+ try {
+ final ClusterState updatedState = updateDesiredNodes(currentState, task.request);
+ final DesiredNodes previousDesiredNodes = DesiredNodesMetadata.latestFromClusterState(currentState);
+ final DesiredNodes latestDesiredNodes = DesiredNodesMetadata.latestFromClusterState(updatedState);
+ currentState = updatedState;
+
+ final boolean replacedExistingHistoryId = previousDesiredNodes != null
+ && previousDesiredNodes.hasSameHistoryId(latestDesiredNodes) == false;
+
+ results.success(
+ task,
+ task.listener.delegateFailure(
+ (delegate, cs) -> delegate.onResponse(new UpdateDesiredNodesResponse(replacedExistingHistoryId))
+ )
+ );
+ } catch (Exception e) {
+ results.failure(task, e);
+ }
+ }
+
+ return results.build(currentState);
+ }
+ }
+
static ClusterState updateDesiredNodes(ClusterState currentState, UpdateDesiredNodesRequest request) {
DesiredNodesMetadata desiredNodesMetadata = currentState.metadata().custom(DesiredNodesMetadata.TYPE, DesiredNodesMetadata.EMPTY);
DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes();

(obvs this can be more streamlined too, there's no real need to construct a cluster state N times any more, but that's a separate thing)
original-brownbear left a comment:

LGTM, sorry for the delay, this took me a while to think through :) Good stuff, thanks David!
Thanks both 🙂
Today the `MasterService` permits clients to submit a batch of tasks which it guarantees to execute together, but the only place that this functionality is used in production code is for completing an election. It was done this way so that each join could succeed or fail independently, but since elastic#83562 we can track the status of joins through to completion without needing them all to be separate tasks. This commit introduces a `JoinTask` which represents the whole batch of joins as a single task. It also gives us a place to hang the strange `_FINISH_ELECTION_` task that was used to flag whether a batch was an election-completing batch or not. Relates elastic#83784
The `MasterService` executes batches of tasks which compute changes to the `ClusterState`. After executing each batch the `MasterService` publishes the updated cluster state and may notify tasks in the batch when the nodes have acked the state (or failed, or timed out). Many tasks compute some kind of result during their execution which needs to be made available to the ack completion handler for subsequent activities. Today there's no good general way to pass anything to the ack completion handler other than the fact that the update was acked or not. Some tasks work around this by storing their result in the `ClusterState` itself. Others use the executor to capture the result and pass it through. Neither solution works well with batching: later tasks in a batch may overwrite the part of the `ClusterState` containing the results of earlier tasks, and batching executors are re-used across batches. This commit adjusts the `ClusterStateTaskExecutor` interface so that now implementations that wish to listen for acks must supply a listener for each task they successfully execute. The `MasterService` collects the listeners for the batch and notifies them as acks are received. This gives the executor control over the ack handler of each task which lets it pass in any extra data needed. Effectively this is the same as elastic#83562 but for ack listeners instead of publish listeners.