Return result from cluster state task execution #83562

Merged
elasticsearchmachine merged 5 commits into elastic:master from
DaveCTurner:2022-02-06-master-service-non-void-task-result
Feb 10, 2022

Conversation

@DaveCTurner (Member) commented Feb 6, 2022

The `MasterService` executes batches of tasks which compute changes to
the `ClusterState`. After executing each batch the `MasterService`
publishes the updated cluster state and notifies every task in the batch
when the publication completes. Many tasks compute some kind of result
during their execution which needs to be made available to the
publication completion handler for subsequent activities.

Today there's no good general way to pass anything to the completion
handler other than the fact that the publication succeeded. Some tasks
work around this by storing their result in the `ClusterState` itself.
Others use the executor to capture the result and pass it through.
Neither solution works well with batching: later tasks in a batch may
overwrite the part of the `ClusterState` containing the results of
earlier tasks, and batching executors are re-used across batches.

This commit adjusts the `ClusterStateTaskExecutor` interface so that now
implementations must supply a listener for each task they successfully
execute. The `MasterService` collects the listeners for the batch and
notifies them all when publication completes. This gives the executor
control over the completion handler of each task which lets it pass in
any extra data needed.
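A minimal sketch of the pattern (hypothetical names, not the actual Elasticsearch API): the executor computes a per-task result and returns a listener that captures it, and the master service notifies every listener once publication completes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the pattern described above, not the real
// Elasticsearch API: each successfully executed task yields a listener
// capturing its per-task result; the "master service" invokes all the
// listeners after the (simulated) publication completes.
public class BatchSketch {
    interface PublishListener {
        void onPublished();
    }

    record ExecutedTask(String task, PublishListener listener) {}

    // The "executor": computes a result per task and wraps it in a listener.
    static List<ExecutedTask> executeBatch(List<String> tasks, List<String> log) {
        List<ExecutedTask> executed = new ArrayList<>();
        for (String task : tasks) {
            String result = task.toUpperCase(); // per-task result
            executed.add(new ExecutedTask(task, () -> log.add("published: " + result)));
        }
        return executed;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<ExecutedTask> executed = executeBatch(List.of("a", "b"), log);
        executed.forEach(t -> t.listener().onPublished()); // publication completes
        System.out.println(log); // [published: A, published: B]
    }
}
```

Because each listener closes over its own result, nothing has to be smuggled through the `ClusterState` or stashed in the (re-used) executor.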

@DaveCTurner DaveCTurner added WIP >refactoring :Distributed/Cluster Coordination Cluster formation and cluster state publication, including cluster membership and fault detection. v8.2.0 labels Feb 6, 2022
@DaveCTurner DaveCTurner force-pushed the 2022-02-06-master-service-non-void-task-result branch 3 times, most recently from 12fa733 to 3e483b9 Compare February 6, 2022 15:05
@DaveCTurner DaveCTurner force-pushed the 2022-02-06-master-service-non-void-task-result branch from 3e483b9 to e281ed5 Compare February 6, 2022 16:36
@DaveCTurner DaveCTurner marked this pull request as ready for review February 6, 2022 18:01
@elasticmachine elasticmachine added the Team:Distributed Meta label for distributed team. label Feb 6, 2022
@elasticmachine (Collaborator) commented:

Pinging @elastic/es-distributed (Team:Distributed)

@DaveCTurner DaveCTurner requested review from idegtiarenko and original-brownbear and removed request for original-brownbear February 6, 2022 18:01

A Contributor commented on this hunk:

    public boolean isSuccess() {
        return this == SUCCESS;

    assert failure == null ^ taskListener == null;

Should this assertion be moved to the constructor?
public TaskResult {
  assert failure == null ^ taskListener == null;
}


@DaveCTurner (Member, Author) replied Feb 9, 2022:
TIL records can have constructors :) 31331cb
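For reference, the compact-constructor form being suggested looks roughly like this (a hypothetical `TaskResult`-like record, not the PR's exact class):

```java
// Hypothetical record illustrating the invariant discussed above:
// exactly one of `failure` and `taskListener` must be non-null.
public record TaskResult(Exception failure, Runnable taskListener) {
    public TaskResult { // compact constructor: runs before the fields are assigned
        assert failure == null ^ taskListener == null;
    }

    public boolean isSuccess() {
        return failure == null;
    }
}
```

Putting the assertion in the compact constructor checks the invariant once at construction time rather than on every `isSuccess()` call.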

A Contributor commented on this hunk:

    public ClusterTasksResult<Task> execute(final ClusterState currentState, final List<Task> tasks) throws Exception {
        final DiscoveryNodes.Builder remainingNodesBuilder = DiscoveryNodes.builder(currentState.nodes());
        boolean removed = false;
        final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.builder();
Nit: I would simplify to:

Suggested change:

    -final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.builder();
    +var resultBuilder = ClusterTasksResult.<Task>builder();
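With `var` the type argument has to move onto the call as an explicit type witness; a minimal illustration with a hypothetical generic factory (unrelated to the real builder class):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal illustration of the suggestion above: a hypothetical generic
// factory method, unrelated to the real ClusterTasksResult.builder().
public class VarWitness {
    static <T> List<T> newBuilder() {
        return new ArrayList<>();
    }

    public static void main(String[] args) {
        // Without var: the type argument is inferred from the declared type.
        final List<String> explicit = newBuilder();
        // With var: the type argument is supplied as an explicit type witness.
        var inferred = VarWitness.<String>newBuilder();
        inferred.add("task");
        System.out.println(inferred); // [task]
    }
}
```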

@original-brownbear (Contributor) commented:

@DaveCTurner can you give me a little time here? I'd like to go over this one as well. I should be able to get to it this afternoon.

@DaveCTurner (Member, Author) commented Feb 9, 2022:

I'd rather not actually rewrite any of the executors in this PR to keep it self-contained, but Ievgen and I found it helpful to consider how one might rewrite them using this new API. For instance:

diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java
index 4c8b12bb7c7..595e056b8e9 100644
--- a/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java
+++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java
@@ -12,8 +12,9 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeAction;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
-import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.desirednodes.DesiredNodesSettingsValidator;
@@ -28,13 +29,14 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.List;
 import java.util.Locale;
 
 import static java.lang.String.format;
 
 public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction<UpdateDesiredNodesRequest, UpdateDesiredNodesResponse> {
     private final DesiredNodesSettingsValidator settingsValidator;
-    private final ClusterStateTaskExecutor<ClusterStateUpdateTask> taskExecutor;
+    private final ClusterStateTaskExecutor<UpdateDesiredNodesTask> taskExecutor;
 
     @Inject
     public TransportUpdateDesiredNodesAction(
@@ -58,7 +60,7 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
             ThreadPool.Names.SAME
         );
         this.settingsValidator = settingsValidator;
-        this.taskExecutor = new DesiredNodesClusterStateTaskExecutor();
+        this.taskExecutor = new UpdateDesiredNodesTaskExecutor();
     }
 
     @Override
@@ -79,29 +81,8 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
 
             clusterService.submitStateUpdateTask(
                 "update-desired-nodes",
-                new ClusterStateUpdateTask(Priority.URGENT, request.masterNodeTimeout()) {
-                    volatile boolean replacedExistingHistoryId = false;
-
-                    @Override
-                    public ClusterState execute(ClusterState currentState) {
-                        final ClusterState updatedState = updateDesiredNodes(currentState, request);
-                        final DesiredNodes previousDesiredNodes = DesiredNodesMetadata.latestFromClusterState(currentState);
-                        final DesiredNodes latestDesiredNodes = DesiredNodesMetadata.latestFromClusterState(updatedState);
-                        replacedExistingHistoryId = previousDesiredNodes != null
-                            && previousDesiredNodes.hasSameHistoryId(latestDesiredNodes) == false;
-                        return updatedState;
-                    }
-
-                    @Override
-                    public void onFailure(Exception e) {
-                        listener.onFailure(e);
-                    }
-
-                    @Override
-                    public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
-                        listener.onResponse(new UpdateDesiredNodesResponse(replacedExistingHistoryId));
-                    }
-                },
+                new UpdateDesiredNodesTask(request, listener),
+                ClusterStateTaskConfig.build(Priority.URGENT),
                 taskExecutor
             );
         } catch (Exception e) {
@@ -109,6 +90,54 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
         }
     }
 
+    record UpdateDesiredNodesTask(UpdateDesiredNodesRequest request, ActionListener<UpdateDesiredNodesResponse> listener)
+        implements
+            ClusterStateTaskListener {
+
+        @Override
+        public void onFailure(Exception e) {
+            listener.onFailure(e);
+        }
+
+        @Override
+        public void clusterStateProcessed(ClusterState oldState, ClusterState newState) {
+            assert false : "not called";
+        }
+    }
+
+    static class UpdateDesiredNodesTaskExecutor implements ClusterStateTaskExecutor<UpdateDesiredNodesTask> {
+
+        @Override
+        public ClusterTasksResult<UpdateDesiredNodesTask> execute(ClusterState currentState, List<UpdateDesiredNodesTask> tasks)
+            throws Exception {
+
+            var results = ClusterTasksResult.<UpdateDesiredNodesTask>builder();
+
+            for (UpdateDesiredNodesTask task : tasks) {
+                try {
+                    final ClusterState updatedState = updateDesiredNodes(currentState, task.request);
+                    final DesiredNodes previousDesiredNodes = DesiredNodesMetadata.latestFromClusterState(currentState);
+                    final DesiredNodes latestDesiredNodes = DesiredNodesMetadata.latestFromClusterState(updatedState);
+                    currentState = updatedState;
+
+                    final boolean replacedExistingHistoryId = previousDesiredNodes != null
+                        && previousDesiredNodes.hasSameHistoryId(latestDesiredNodes) == false;
+
+                    results.success(
+                        task,
+                        task.listener.delegateFailure(
+                            (delegate, cs) -> delegate.onResponse(new UpdateDesiredNodesResponse(replacedExistingHistoryId))
+                        )
+                    );
+                } catch (Exception e) {
+                    results.failure(task, e);
+                }
+            }
+
+            return results.build(currentState);
+        }
+    }
+
     static ClusterState updateDesiredNodes(ClusterState currentState, UpdateDesiredNodesRequest request) {
         DesiredNodesMetadata desiredNodesMetadata = currentState.metadata().custom(DesiredNodesMetadata.TYPE, DesiredNodesMetadata.EMPTY);
         DesiredNodes latestDesiredNodes = desiredNodesMetadata.getLatestDesiredNodes();

(obvs this can be more streamlined too, there's no real need to construct a cluster state N times any more, but that's a separate thing)

@DaveCTurner DaveCTurner removed the WIP label Feb 9, 2022
@original-brownbear (Contributor) left a review:
LGTM sorry for the delay, this took me a while to think through :) Good stuff, thanks David!

@DaveCTurner DaveCTurner added the auto-merge-without-approval Automatically merge pull request when CI checks pass (NB doesn't wait for reviews!) label Feb 10, 2022
@elasticsearchmachine elasticsearchmachine merged commit 3991961 into elastic:master Feb 10, 2022
@DaveCTurner DaveCTurner deleted the 2022-02-06-master-service-non-void-task-result branch February 10, 2022 12:40
@DaveCTurner
Copy link
Copy Markdown
Member Author

Thanks both 🙂

DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Feb 10, 2022
Today the `MasterService` permits clients to submit a batch of tasks
which it guarantees to execute together, but the only place that this
functionality is used in production code is for completing an election.
It was done this way so that each join could succeed or fail
independently, but since elastic#83562 we can track the status of joins through
to completion without needing them all to be separate tasks.

This commit introduces a `JoinTask` which represents the whole batch of
joins as a single task. It also gives us a place to hang the strange
`_FINISH_ELECTION_` task that was used to flag whether a batch was an
election-completing batch or not.

Relates elastic#83784
elasticsearchmachine pushed a commit that referenced this pull request Feb 11, 2022
DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Feb 19, 2022
The `MasterService` executes batches of tasks which compute changes to
the `ClusterState`. After executing each batch the `MasterService`
publishes the updated cluster state and may notify tasks in the batch
when the nodes have acked the state (or failed, or timed out). Many
tasks compute some kind of result during their execution which needs to
be made available to the ack completion handler for subsequent
activities.

Today there's no good general way to pass anything to the ack completion
handler other than the fact that the update was acked or not. Some tasks
work around this by storing their result in the `ClusterState` itself.
Others use the executor to capture the result and pass it through.
Neither solution works well with batching: later tasks in a batch may
overwrite the part of the `ClusterState` containing the results of
earlier tasks, and batching executors are re-used across batches.

This commit adjusts the `ClusterStateTaskExecutor` interface so that now
implementations that wish to listen for acks must supply a listener for
each task they successfully execute. The `MasterService` collects the
listeners for the batch and notifies them as acks are received. This
gives the executor control over the ack handler of each task which lets
it pass in any extra data needed.

Effectively this is the same as elastic#83562 but for ack listeners instead of
publish listeners.
elasticsearchmachine pushed a commit that referenced this pull request Feb 22, 2022
probakowski pushed a commit to probakowski/elasticsearch that referenced this pull request Feb 23, 2022