Skip to content

[Feature][Task] Add a new abstract task-AbstractLoopStateTask #11103

@ruanwenjun

Description

@ruanwenjun

Search before asking

  • I had searched in the issues and found no similar feature requirement.

Description

Background

Right now, there exist some tasks, that need to call a third-party platform, and query the task instance status from the third-party platform until the task instance is finished (success/failed).

image

Class design

image

We can create a base class for this kink of task

 /**
 * This class is the base class for all loop task type.
 * <p>
 * The loop task type means, we will submit a task, and loop the task status until the task is finished.
 */
public abstract class BaseLoopTaskExecutor<SubmitTaskMethodT extends LoopTaskSubmitTaskMethodDefinition, QueryTaskMethodT extends LoopTaskQueryStatusMethodDefinition>
    extends AbstractTaskExecutor {

    protected final LoopTaskDefinition<SubmitTaskMethodT, QueryTaskMethodT> loopTaskDefinition;
    // init this parameters in init method(todo: we need to refactor the init method)
    protected AbstractParameters loopTaskParameters;

    protected BaseLoopTaskExecutor(@NonNull TaskExecutionContext taskExecutionContext,
                                   @NonNull LoopTaskDefinition<SubmitTaskMethodT, QueryTaskMethodT> loopTaskDefinition) {
        super(taskExecutionContext);
        this.loopTaskDefinition = loopTaskDefinition;
    }

    @Override
    public void handle() throws TaskException {
        try {
            LoopTaskSubmitTaskMethodDefinition submitTaskMethod = loopTaskDefinition.getSubmitTaskMethod();
            LoopTaskQueryStatusMethodDefinition queryTaskStateMethod = loopTaskDefinition.getQueryTaskStateMethod();
            long loopInterval = queryTaskStateMethod.getTaskInstanceStatusQueryInterval().toMillis();

            LoopTaskInstanceInfo loopTaskInstanceInfo = submitTaskMethod.submitLoopTaskInstance();
            this.appIds = loopTaskInstanceInfo.getTaskInstanceId();
            while (!queryTaskStateMethod.queryTaskInstanceStatus(loopTaskInstanceInfo).isFinished()) {
                Thread.sleep(loopInterval);
            }
        } catch (InterruptedException e) {
            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
            logger.error("The current loop thread has been interrupted", e);
            Thread.currentThread().interrupt();
            throw new TaskException("The current loop thread has been interrupted");
        } catch (Exception ex) {
            setExitStatusCode(TaskConstants.EXIT_CODE_FAILURE);
            logger.error("Loop task execute error", ex);
            throw new TaskException("Loop task execute error", ex);
        }
    }
}

If someone wants to implement this kind of task, he only needs to provide a LoopTaskDefinition.

In deed, we can provide the default implementation for HTTP call, if someone submitTask/ queryStatus by HTTP call, he can just provide a YAML config, we will translate these to method (But I think only a few people will use this).

Use case

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

Status

Done

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions