Provide a ClusterStateTaskExecutor implementation for simple batch execution #90343

Merged
pxsalehi merged 13 commits into elastic:main from pxsalehi:ps260922-batchClusterStateExecutor
Sep 28, 2022

@pxsalehi
Member

@pxsalehi pxsalehi commented Sep 26, 2022

Closes #89192

@pxsalehi
Member Author

@elasticmachine run elasticsearch-ci/bwc

@pxsalehi pxsalehi force-pushed the ps260922-batchClusterStateExecutor branch from 5765bb0 to f671fb5 September 26, 2022 15:49
@pxsalehi pxsalehi changed the title from "[WIP] Make it easier to write a batchable cluster state update" to "Provide a ClusterStateTaskExecutor implementation for simple batch execution" Sep 27, 2022
@pxsalehi
Member Author

I have also adapted five of the existing custom batch executors (those that were simply iterating over the tasks) to use the new class. About 24 implementations of ClusterStateTaskExecutor are left. I will go through them again and may be able to update one or two more. The rest seem to fall into two categories:

  • They do have some low-level logic to them, although not all of them seem to do anything complicated, and they could potentially be rewritten to be simpler. I am reluctant to touch those since I am not entirely sure what the implications would be. Some do some validation or keep track of the previous tasks in the batch, e.g. AutoCreateAction.

  • Some would have been easy to change, but they pass around some values to their taskContext.success() or have custom logic there relying on the values calculated during task execution. E.g. FinalizeBlocksExecutor and AddBlocksExecutor in MetadataIndexStateService.

I am not sure whether we'd need to also try to cover those cases somehow.

@pxsalehi pxsalehi marked this pull request as ready for review September 27, 2022 12:36
@elasticsearchmachine elasticsearchmachine added the needs:triage label (Requires assignment of a team area label) Sep 27, 2022
@pxsalehi pxsalehi added the >non-issue and :Distributed/Cluster Coordination (Cluster formation and cluster state publication, including cluster membership and fault detection.) labels and removed the needs:triage label Sep 27, 2022
@elasticsearchmachine elasticsearchmachine added the Team:Distributed label (Meta label for distributed team.) Sep 27, 2022
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

Member

@DaveCTurner DaveCTurner left a comment

Looks great. I left a few suggestions/comments.

* series of executions, each taking an input cluster state and producing a new cluster state that serves as the
* input of the next task in the batch.
*/
abstract class DefaultBatchExecutor<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor<T> {
Member

Naming (and/or marketing) suggestion 😄

Suggested change
- abstract class DefaultBatchExecutor<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor<T> {
+ abstract class SimpleBatchedExecutor<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor<T> {

Also I think this could reasonably be a top-level class.

* @param curState The cluster state on which the task should be executed.
* @return The resulting cluster state after executing this task.
*/
public abstract ClusterState executeTask(TaskContext<T> taskContext, ClusterState curState);
Member

I don't think any implementations need access to the whole TaskContext, just the task itself.

* @param curState The cluster state on which the task should be executed.
* @return The resulting cluster state after executing this task.
*/
public abstract ClusterState executeTask(TaskContext<T> taskContext, ClusterState curState);
Member

Also naming suggestion, no need to come up with a more specific name when there's only one of them:

Suggested change
- public abstract ClusterState executeTask(TaskContext<T> taskContext, ClusterState curState);
+ public abstract ClusterState executeTask(TaskContext<T> taskContext, ClusterState clusterState);

*/
public ClusterState afterBatchExecution(
BatchExecutionContext<T> batchExecutionContext,
ClusterState initState,
Member

I feel bad about giving folks access to the initial state here. I think that could lead to some unexpected behaviour. Really the only question should be whether there were any changes. WDYT about having two methods, one called if initState == curState and the other called if something changed? Either that or a boolean parameter.

Member Author

I think the flag would be good since then the afterBatchExecution logic would be all in one place. If we take away the initState, DeleteDesiredNodesExecutor wouldn't fit into this. I guess it is a special "low-level" executor anyway.

Member

In DeleteDesiredNodesExecutor we have that initState == curState so that should still be fine. I'm ok with passing in curState, it's only initState that seems potentially problematic.

Member Author

so instead of

initialState().copyAndUpdateMetadata(metadata -> metadata.removeCustom(DesiredNodesMetadata.TYPE))

I can just use

clusterState.copyAndUpdateMetadata(metadata -> metadata.removeCustom(DesiredNodesMetadata.TYPE))

and they'd be the same?

Member

Right.

Although that does raise a good point that BatchExecutionContext gives access to the initial state. And the tasks. I think we don't want that either, we should always run this method within a dropHeadersContext() and then it should only need the state.
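The equivalence agreed on in this thread can be illustrated with a small standalone sketch. It assumes stand-in types only: a `Map<String, String>` plays the role of the cluster state, `DESIRED_NODES` is a hypothetical placeholder key for `DesiredNodesMetadata.TYPE`, and none of the method bodies are taken from the PR. Because every per-task step returns its input unchanged, the state handed to `afterBatchExecution` is the very same object as the initial state, so the hook does not need the initial state at all.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AfterBatchSketch {

    // Hypothetical placeholder key; stands in for DesiredNodesMetadata.TYPE.
    static final String DESIRED_NODES = "desired_nodes_placeholder";

    // Per-task work is a no-op here, so the state reference never changes.
    static Map<String, String> executeTask(String task, Map<String, String> clusterState) {
        return clusterState;
    }

    // The hook only sees the current state and a "changed" flag, never the initial state.
    static Map<String, String> afterBatchExecution(Map<String, String> clusterState, boolean clusterStateChanged) {
        Map<String, String> copy = new HashMap<>(clusterState);
        copy.remove(DESIRED_NODES);
        return Map.copyOf(copy);
    }

    static Map<String, String> execute(Map<String, String> initialState, List<String> tasks) {
        Map<String, String> clusterState = initialState;
        for (String task : tasks) {
            clusterState = executeTask(task, clusterState);
        }
        // Identity comparison is intentional: it asks "did any task produce a new state object?"
        return afterBatchExecution(clusterState, clusterState != initialState);
    }

    public static void main(String[] args) {
        Map<String, String> initial = Map.of(DESIRED_NODES, "v1", "other", "kept");
        System.out.println(execute(initial, List.of("delete-1", "delete-2"))); // prints {other=kept}
    }
}
```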

public ClusterState afterBatchExecution(
BatchExecutionContext<T> batchExecutionContext,
ClusterState initState,
ClusterState curState
Member

Naming suggestion as above:

Suggested change
- ClusterState curState
+ ClusterState clusterState

(at the least we should call it currentState in full, no need to make it harder for non-English speakers for the sake of those 4 chars)

* @param taskContext The task that failed with an exception.
* @param e The Exception thrown by the task execution.
*/
public void taskFailed(TaskContext<T> taskContext, Exception e) {
Member

I expect this not to have any overrides, let's just hard-code it for now.

* @param taskContext The task that successfully finished execution.
*/
public void taskSucceeded(TaskContext<T> taskContext) {
    taskContext.success(() -> {});
Member

I expect this will almost always be overridden, and it would trappily lose a listener notification not to do so. Let's make this abstract and make implementations be explicit if they want a no-op.
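A minimal sketch of the point about abstract hooks, using made-up class names (`Executor`, `Notifying`, `Silent`) and a plain `String` as the task type rather than anything from Elasticsearch: when the hook has no default body, every subclass has to state what happens on success, and a no-op becomes a visible decision instead of a silent omission.

```java
import java.util.ArrayList;
import java.util.List;

public class AbstractHookSketch {

    abstract static class Executor<T> {
        // No default body: forgetting to notify a listener is a compile error, not a silent no-op.
        abstract void taskSucceeded(T task);

        final void completeAll(List<T> tasks) {
            for (T task : tasks) {
                taskSucceeded(task);
            }
        }
    }

    static class Notifying extends Executor<String> {
        final List<String> notified = new ArrayList<>();

        @Override
        void taskSucceeded(String task) {
            notified.add(task);
        }
    }

    static class Silent extends Executor<String> {
        @Override
        void taskSucceeded(String task) {
            // Explicit no-op: this executor has nothing to notify.
        }
    }

    public static void main(String[] args) {
        Notifying notifying = new Notifying();
        notifying.completeAll(List.of("a", "b"));
        new Silent().completeAll(List.of("c"));
        System.out.println(notifying.notified); // prints [a, b]
    }
}
```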

for (final var taskContext : batchExecutionContext.taskContexts()) {
    try (var ignored = taskContext.captureResponseHeaders()) {
        curState = executeTask(taskContext, curState);
        taskSucceeded(taskContext);
Member

Rather than asking implementations to manipulate the TaskContext themselves, I think they will almost always just want to call a method on the underlying task. Can we do this instead?

Suggested change
- taskSucceeded(taskContext);
+ taskContext.success(() -> taskSucceeded(taskContext.getTask()));

Member Author

What about when the task is itself a ClusterStateAckListener? In that case we'd have to ask the executor to implement an empty taskSucceeded and in the batch execution inspect the type of the task? Or taskSucceeded would optionally return the task? Both seem not much better. Or maybe there is a simpler way to deal with that? Or do we explicitly not want to accommodate that case?

Member

I think we probably want a whole different executor for ack-sensitive tasks, with different callbacks. Typically these things complete their listener in onAllNodesAcked() rather than when the publication completes.

Member Author

So I guess I'll just drop it from this PR.

Member

👍 sounds good to me
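The suggestion at the top of this thread, wrapping the hook in `taskContext.success(...)` so that implementations only ever see the task, can be sketched roughly as follows. Everything here is a stand-in: `Context` is a hypothetical simplification of `TaskContext`, and the sketch assumes that the callback handed to `success` is run later, once publication has completed, rather than immediately. Ack-sensitive tasks, which the thread decides to leave out, are not modelled.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredSuccessSketch {

    /** Hypothetical stand-in for TaskContext: holds a task and a callback to run after publication. */
    static class Context<T> {
        private final T task;
        private Runnable onPublicationSuccess = () -> {};

        Context(T task) {
            this.task = task;
        }

        T getTask() {
            return task;
        }

        // Assumption: the callback is only stored here and invoked later.
        void success(Runnable callback) {
            this.onPublicationSuccess = callback;
        }

        // Simulates the moment the new state has been published.
        void published() {
            onPublicationSuccess.run();
        }
    }

    static class Executor {
        final List<String> log = new ArrayList<>();

        // Implementations receive the task itself, not the context.
        void taskSucceeded(String task) {
            log.add("succeeded:" + task);
        }

        void execute(List<Context<String>> contexts) {
            for (Context<String> context : contexts) {
                String task = context.getTask();
                log.add("executed:" + task);
                context.success(() -> taskSucceeded(task));
            }
        }
    }

    public static void main(String[] args) {
        List<Context<String>> contexts = List.of(new Context<>("a"), new Context<>("b"));
        Executor executor = new Executor();
        executor.execute(contexts);
        contexts.forEach(Context::published);
        System.out.println(executor.log); // prints [executed:a, executed:b, succeeded:a, succeeded:b]
    }
}
```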

* series of executions, each taking an input cluster state and producing a new cluster state that serves as the
* input of the next task in the batch.
*/
abstract class DefaultBatchExecutor<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor<T> {
Member

Could we make void clusterStatePublished(ClusterState) a final method and allow an override void clusterStatePublished() which doesn't expose the published state? Implementations shouldn't be using the published state I think.
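One way to read this request is as a template-method arrangement, sketched below with stand-in types (a `String` in place of `ClusterState`, and hypothetical class names `Executor` and `Counting`). The state-carrying overload is `final` and simply forwards to a no-argument hook, so subclasses can react to publication but have no way to reach the published state.

```java
public class PublishedHookSketch {

    abstract static class Executor {
        // final: subclasses cannot override this overload, so they never receive the published state.
        public final void clusterStatePublished(String newClusterState) {
            clusterStatePublished();
        }

        // Overridable hook that deliberately carries no state.
        public void clusterStatePublished() {}
    }

    static class Counting extends Executor {
        int calls;

        @Override
        public void clusterStatePublished() {
            calls++;
        }
    }

    public static void main(String[] args) {
        Counting executor = new Counting();
        executor.clusterStatePublished("published-state");
        System.out.println(executor.calls); // prints 1
    }
}
```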

Member

@DaveCTurner DaveCTurner left a comment

Great, I like it. One tiny comment/suggestion.

…eservedStateErrorTaskExecutor.java

Co-authored-by: David Turner <david.turner@elastic.co>
Comment on lines +58 to +74
@Override
public final ClusterState execute(BatchExecutionContext<T> batchExecutionContext) throws Exception {
    var initState = batchExecutionContext.initialState();
    var clusterState = initState;
    for (final var taskContext : batchExecutionContext.taskContexts()) {
        try (var ignored = taskContext.captureResponseHeaders()) {
            var task = taskContext.getTask();
            clusterState = executeTask(task, clusterState);
            taskContext.success(() -> taskSucceeded(task));
        } catch (Exception e) {
            taskContext.onFailure(e);
        }
    }
    try (var ignored = batchExecutionContext.dropHeadersContext()) {
        return afterBatchExecution(clusterState, clusterState != initState);
    }
}
Contributor

👍
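For readers without the Elasticsearch types at hand, the loop quoted above can be mimicked in a standalone sketch. The names below (`runBatch`, `append`, `failing`) are invented for illustration, an immutable `List<String>` stands in for the cluster state, and header capturing is left out. The behaviour it is meant to show is that a failing task is recorded and skipped while the state it was given carries on to the next task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class BatchLoopSketch {

    /** A task that returns a new immutable state with one more entry appended. */
    static UnaryOperator<List<String>> append(String entry) {
        return state -> {
            List<String> next = new ArrayList<>(state);
            next.add(entry);
            return List.copyOf(next);
        };
    }

    /** A task that always fails. */
    static UnaryOperator<List<String>> failing() {
        return state -> {
            throw new IllegalStateException("task failed");
        };
    }

    /** Folds the tasks over the state; indexes of failed tasks are collected in failedTaskIndexes. */
    static List<String> runBatch(
        List<String> initialState,
        List<UnaryOperator<List<String>>> tasks,
        List<Integer> failedTaskIndexes
    ) {
        List<String> state = initialState;
        for (int i = 0; i < tasks.size(); i++) {
            try {
                // Only assigned when the task succeeds, so a failure leaves the state untouched.
                state = tasks.get(i).apply(state);
            } catch (RuntimeException e) {
                failedTaskIndexes.add(i);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<String> initial = List.of();
        List<Integer> failed = new ArrayList<>();
        List<String> result = runBatch(initial, List.of(append("a"), failing(), append("b")), failed);
        // prints [a, b] failed=[1] changed=true
        System.out.println(result + " failed=" + failed + " changed=" + (result != initial));
    }
}
```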

*
* @param task The task that successfully finished execution.
*/
public abstract void taskSucceeded(T task);
Contributor

I wonder if it is worth adding new interface for tasks:

interface ClusterStateUpdateTask extends ClusterStateTaskListener {
   ClusterState execute(ClusterState clusterState);
   void onSuccess();
}

I believe this way it would be possible to have a single non abstract executor implementation for most of the cases as we already have dedicated tasks for most of the things.

var executor1 = new BatchExecutor(); // no need for custom after batch execution
var executor2 = new BatchExecutor((clusterState, modified) -> { /*custom after batch execution*/ });

Member

Yep that seems like a logical followup too. Except:

  • we should allow these tasks to pass a result from execute to onSuccess (see my other comment)

  • I'd rather customise afterBatchExecution by overriding a method than passing in a lambda - I find it to be clearer what the code actually does this way.

  • the name ClusterStateUpdateTask is already widely used for unbatched tasks, we should call it something else
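A rough sketch of how the proposal might look once two of these caveats are applied: the task interface gets a different, hypothetical name (`BatchableTask`), and post-batch behaviour is customised by overriding a method rather than by passing a lambda. Passing a result from `execute` to `onSuccess` is not covered here. All types are stand-ins (an immutable `List<String>` for the cluster state), failure handling is omitted, and `onSuccess` is called immediately instead of after publication to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

public class SelfExecutingTaskSketch {

    /** Hypothetical name; the thread notes that ClusterStateUpdateTask is already taken. */
    interface BatchableTask {
        List<String> execute(List<String> clusterState);

        void onSuccess();
    }

    /** A single concrete executor that works for any task implementing the interface. */
    static class BatchExecutor {
        final List<String> execute(List<String> initialState, List<? extends BatchableTask> tasks) {
            List<String> clusterState = initialState;
            for (BatchableTask task : tasks) {
                clusterState = task.execute(clusterState);
                task.onSuccess(); // simplification: invoked right away in this sketch
            }
            return afterBatchExecution(clusterState, clusterState != initialState);
        }

        // Override to customise post-batch behaviour; the default leaves the state as it is.
        List<String> afterBatchExecution(List<String> clusterState, boolean clusterStateChanged) {
            return clusterState;
        }
    }

    static class AppendTask implements BatchableTask {
        private final String entry;
        boolean succeeded;

        AppendTask(String entry) {
            this.entry = entry;
        }

        @Override
        public List<String> execute(List<String> clusterState) {
            List<String> next = new ArrayList<>(clusterState);
            next.add(entry);
            return List.copyOf(next);
        }

        @Override
        public void onSuccess() {
            succeeded = true;
        }
    }

    public static void main(String[] args) {
        AppendTask task = new AppendTask("x");
        List<String> result = new BatchExecutor().execute(List.of(), List.of(task));
        System.out.println(result + " succeeded=" + task.succeeded); // prints [x] succeeded=true
    }
}
```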

Member Author

I'll look into that a bit more. For now I didn't want to force anything on the tasks, just to save some boilerplate code that gets repeated whenever a basic executor is needed (which we ask everyone who submits a cluster state update task to provide).

@DaveCTurner
Member

> Some would have been easy to change, but they pass around some values to their taskContext.success()

Btw I think it would be worth doing this in a follow-up. It's a pretty useful pattern, and if there's no obvious way to do it then I bet we'll get folks adding a mutable field to the task object to hold the result which is a bit yucky.

I think I'd do this by adding to the executor another type parameter TaskResult, having execute return a Tuple<ClusterState,TaskResult>, and passing the result for each task to taskSucceeded.
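The shape described in the paragraph above might look something like this sketch. It is not the follow-up implementation: `Map.Entry` stands in for `Tuple`, an `Integer` counter stands in for the cluster state, and `Executor` and `AddExecutor` are hypothetical names. The point is only that the per-task result travels from `executeTask` to `taskSucceeded` through the executor, so the task object needs no mutable field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TaskResultSketch {

    /** T is the task type, R the per-task result type (the extra type parameter from the comment). */
    abstract static class Executor<T, R> {
        // Returns the new state together with a result for this task.
        abstract Map.Entry<Integer, R> executeTask(T task, Integer clusterState);

        // Receives the result computed for this task during execution.
        abstract void taskSucceeded(T task, R result);

        final Integer execute(Integer initialState, List<T> tasks) {
            Integer clusterState = initialState;
            for (T task : tasks) {
                Map.Entry<Integer, R> outcome = executeTask(task, clusterState);
                clusterState = outcome.getKey();
                taskSucceeded(task, outcome.getValue());
            }
            return clusterState;
        }
    }

    /** Example executor: each task is an increment, each result describes the transition. */
    static class AddExecutor extends Executor<Integer, String> {
        final List<String> notifications = new ArrayList<>();

        @Override
        Map.Entry<Integer, String> executeTask(Integer delta, Integer clusterState) {
            int next = clusterState + delta;
            return Map.entry(next, clusterState + "->" + next);
        }

        @Override
        void taskSucceeded(Integer task, String result) {
            notifications.add(result);
        }
    }

    public static void main(String[] args) {
        AddExecutor executor = new AddExecutor();
        Integer finalState = executor.execute(0, List.of(1, 2));
        System.out.println(finalState + " " + executor.notifications); // prints 3 [0->1, 1->3]
    }
}
```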

> I think we probably want a whole different executor for ack-sensitive tasks, with different callbacks.

I also think this is worth doing in a follow-up.

@pxsalehi pxsalehi merged commit 0958649 into elastic:main Sep 28, 2022
@pxsalehi
Member Author

Thanks David, and Ievgen. I'll follow up with more PRs.

justincr-elastic pushed a commit to justincr-elastic/elasticsearch that referenced this pull request Oct 8, 2022
…ecution (elastic#90343)

This change provides a basic implementation for batch executors that
simply need to execute the tasks in the batch iteratively, producing a
cluster state after each task. This allows executing the tasks in the
batch as a series of executions, each taking an input cluster state and
producing a new cluster state that serves as the input of the next task
in the batch.

Labels

:Distributed/Cluster Coordination (Cluster formation and cluster state publication, including cluster membership and fault detection.)
>non-issue
Team:Distributed (Meta label for distributed team.)
v8.6.0

Development

Successfully merging this pull request may close these issues.

Make it easier to write a batchable cluster state update

4 participants