Provide a ClusterStateTaskExecutor implementation for simple batch execution#90343
Conversation
|
@elasticmachine run elasticsearch-ci/bwc |
5765bb0 to
f671fb5
Compare
|
I have also adapted five of the existing custom batch executors (that were simply iterating the tasks) to use the new class. There are about 24 implementations of
I am not sure whether we'd need to also try to cover those cases somehow. |
|
Pinging @elastic/es-distributed (Team:Distributed) |
DaveCTurner
left a comment
There was a problem hiding this comment.
Looks great. I left a few suggestions/comments.
| * series of executions, each taking an input cluster state and producing a new cluster state that serves as the | ||
| * input of the next task in the batch. | ||
| */ | ||
| abstract class DefaultBatchExecutor<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor<T> { |
There was a problem hiding this comment.
Naming (and/or marketing) suggestion 😄
| abstract class DefaultBatchExecutor<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor<T> { | |
| abstract class SimpleBatchedExecutor<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor<T> { |
Also I think this could reasonably be a top-level class.
| * @param curState The cluster state on which the task should be executed. | ||
| * @return The resulting cluster state after executing this task. | ||
| */ | ||
| public abstract ClusterState executeTask(TaskContext<T> taskContext, ClusterState curState); |
There was a problem hiding this comment.
I don't think any implementations need access to the whole TaskContext, just the task itself.
| * @param curState The cluster state on which the task should be executed. | ||
| * @return The resulting cluster state after executing this task. | ||
| */ | ||
| public abstract ClusterState executeTask(TaskContext<T> taskContext, ClusterState curState); |
There was a problem hiding this comment.
Also naming suggestion, no need to come up with a more specific name when there's only one of them:
| public abstract ClusterState executeTask(TaskContext<T> taskContext, ClusterState curState); | |
| public abstract ClusterState executeTask(TaskContext<T> taskContext, ClusterState clusterState); |
| */ | ||
| public ClusterState afterBatchExecution( | ||
| BatchExecutionContext<T> batchExecutionContext, | ||
| ClusterState initState, |
There was a problem hiding this comment.
I feel bad about giving folks access to the initial state here. I think that could lead to some unexpected behaviour. Really the only question should be whether there were any changes. WDYT about having two methods, one called if initState == curState and the other called if something changed? Either that or a boolean parameter.
There was a problem hiding this comment.
I think the flag would be good since then the afterBatchExecution logic would be all in one place. If we take away the initState, DeleteDesiredNodesExecutor wouldn't fit into this. I guess it is a special "low-level" executor anyway.
There was a problem hiding this comment.
In DeleteDesiredNodesExecutor we have that initState == curState so that should still be fine. I'm ok with passing in curState, it's only initState that seems potentially problematic.
There was a problem hiding this comment.
so instead of
initialState().copyAndUpdateMetadata(metadata -> metadata.removeCustom(DesiredNodesMetadata.TYPE))
I can just use
clusterState.copyAndUpdateMetadata(metadata -> metadata.removeCustom(DesiredNodesMetadata.TYPE))
and they'd be the same?
There was a problem hiding this comment.
Right.
Although that does raise a good point that BatchExecutionContext gives access to the initial state. And the tasks. I think we don't want that either, we should always run this method within a dropHeadersContext() and then it should only need the state.
| public ClusterState afterBatchExecution( | ||
| BatchExecutionContext<T> batchExecutionContext, | ||
| ClusterState initState, | ||
| ClusterState curState |
There was a problem hiding this comment.
Naming suggestion as above:
| ClusterState curState | |
| ClusterState clusterState |
(at the least we should call it currentState in full, no need to make it harder for non-English speakers for the sake of those 4 chars)
| * @param taskContext The task that failed with an exception. | ||
| * @param e The Exception thrown by the task execution. | ||
| */ | ||
| public void taskFailed(TaskContext<T> taskContext, Exception e) { |
There was a problem hiding this comment.
I expect this not to have any overrides, let's just hard-code it for now.
| * @param taskContext The task that successfully finished execution. | ||
| */ | ||
| public void taskSucceeded(TaskContext<T> taskContext) { | ||
| taskContext.success(() -> {}); |
There was a problem hiding this comment.
I expect this will almost always be overridden, and it would trappily lose a listener notification not to do so. Let's make this abstract and make implementations be explicit if they want a no-op.
| for (final var taskContext : batchExecutionContext.taskContexts()) { | ||
| try (var ignored = taskContext.captureResponseHeaders()) { | ||
| curState = executeTask(taskContext, curState); | ||
| taskSucceeded(taskContext); |
There was a problem hiding this comment.
Rather than asking implementations to manipulate the TaskContext themselves, I think they will almost always just want to call a method on the underlying task. Can we do this instead?
| taskSucceeded(taskContext); | |
| taskContext.success(() -> taskSucceeded(taskContext.getTask())); |
There was a problem hiding this comment.
What about when the task is itself a ClusterStateAckListener? In that case we'd have to ask the executor to implement an empty taskSucceeded and in the batch execution inspect the type of the task? Or taskSucceeded would optionally return the task? Both seem not much better. Or maybe there is a simpler way to deal with that? Or do we explicitly not want to accommodate that case?
There was a problem hiding this comment.
I think we probably want a whole different executor for ack-sensitive tasks, with different callbacks. Typically these things complete their listener in onAllNodesAcked() rather than when the publication completes.
There was a problem hiding this comment.
So I guess I'll just drop it from this PR.
| * series of executions, each taking an input cluster state and producing a new cluster state that serves as the | ||
| * input of the next task in the batch. | ||
| */ | ||
| abstract class DefaultBatchExecutor<T extends ClusterStateTaskListener> implements ClusterStateTaskExecutor<T> { |
There was a problem hiding this comment.
Could we make void clusterStatePublished(ClusterState) a final method and allow an override void clusterStatePublished() which doesn't expose the published state? Implementations shouldn't be using the published state I think.
DaveCTurner
left a comment
There was a problem hiding this comment.
Great, I like it. One tiny comment/suggestion.
...er/src/main/java/org/elasticsearch/reservedstate/service/ReservedStateErrorTaskExecutor.java
Outdated
Show resolved
Hide resolved
…eservedStateErrorTaskExecutor.java Co-authored-by: David Turner <david.turner@elastic.co>
| @Override | ||
| public final ClusterState execute(BatchExecutionContext<T> batchExecutionContext) throws Exception { | ||
| var initState = batchExecutionContext.initialState(); | ||
| var clusterState = initState; | ||
| for (final var taskContext : batchExecutionContext.taskContexts()) { | ||
| try (var ignored = taskContext.captureResponseHeaders()) { | ||
| var task = taskContext.getTask(); | ||
| clusterState = executeTask(task, clusterState); | ||
| taskContext.success(() -> taskSucceeded(task)); | ||
| } catch (Exception e) { | ||
| taskContext.onFailure(e); | ||
| } | ||
| } | ||
| try (var ignored = batchExecutionContext.dropHeadersContext()) { | ||
| return afterBatchExecution(clusterState, clusterState != initState); | ||
| } | ||
| } |
| * | ||
| * @param task The task that successfully finished execution. | ||
| */ | ||
| public abstract void taskSucceeded(T task); |
There was a problem hiding this comment.
I wonder if it is worth adding new interface for tasks:
interface ClusterStateUpdateTask extends ClusterStateTaskListener {
ClusterState execute(ClusterState clusterState);
void onSuccess();
}
I believe this way it would be possible to have a single non abstract executor implementation for most of the cases as we already have dedicated tasks for most of the things.
var executor1 = new BatchExecutor(); // no need for custom after batch execution
var executor2 = new BatchExecutor((clusterState, modified) -> { /*custom after batch execution*/ });
There was a problem hiding this comment.
Yep that seems like a logical followup too. Except:
-
we should allow these tasks to pass a result from
executetoonSuccess(see my other comment) -
I'd rather customise
afterBatchExecutionby overriding a method than passing in a lambda - I find it to be clearer what the code actually does this way. -
the name
ClusterStateUpdateTaskis already widely used for unbatched tasks, we should call it something else
There was a problem hiding this comment.
I'll look into that a bit more. For now I didn't want to force anything on the tasks, but just to save some boilerplate code that gets repeated whenever a basic executor is needed (which we ask everyone with a cluster state update task, to provide).
Btw I think it would be worth doing this in a follow-up. It's a pretty useful pattern, and if there's no obvious way to do it then I bet we'll get folks adding a mutable field to the task object to hold the result which is a bit yucky. I think I'd do this by adding to the executor another type parameter
I also think this is worth doing in a follow-up. |
|
Thanks David, and Ievgen. I'll follow up with more PRs. |
…ecution (elastic#90343) This change provides a basic implementation for batch executors that simply need to execute the tasks in the batch iteratively, producing a cluster state after each task. This allows executing the tasks in the batch as a series of executions, each taking an input cluster state and producing a new cluster state that serves as the input of the next task in the batch.
Closes #89192