Make enrich policy execution cancelable #77188

Merged
martijnvg merged 2 commits into elastic:master from martijnvg:enrich_cancellable_policy_execution
Sep 8, 2021

Conversation

@martijnvg
Member

@martijnvg martijnvg commented Sep 2, 2021

The policy cancellation implementation is best-effort.
Prior to each transport action call, the policy runner
checks whether the corresponding task has been cancelled.
If so, no further action is performed; otherwise,
execution continues as planned.

The policy execution task is also made the parent task
of transport action requests that are being executed
by the policy runner. This allows cancellation while
certain transport actions are being executed (e.g. reindex).
It should also provide better insight into which other tasks
are related to a policy execution task.

In addition to this change, a step field is added to
the enrich policy status. This field contains
the name of the currently executing transport action
request. This gives better insight into what
a policy execution is doing.
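
The best-effort check and the step field described above can be illustrated with a minimal, self-contained Java sketch. It is not the code from this PR: `SketchPolicyTask`, `SketchPolicyRunner`, and the step names other than `ReindexRequest` are hypothetical stand-ins, and each transport action is modelled as a plain `Runnable`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the policy execution task: it only tracks
// a cancellation flag and the name of the step currently being executed.
final class SketchPolicyTask {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private volatile String step = null;

    void cancel() { cancelled.set(true); }
    boolean isCancelled() { return cancelled.get(); }
    void setStep(String step) { this.step = step; }
    String getStep() { return step; }
}

// Hypothetical runner: each "transport action" is a named Runnable.
final class SketchPolicyRunner {
    static final class Step {
        final String name;
        final Runnable action;
        Step(String name, Runnable action) { this.name = name; this.action = action; }
    }

    // Runs the steps in order and returns the names of the steps that ran.
    // Cancellation is only checked before a step starts (best-effort):
    // a step that is already running is not interrupted by this loop.
    static List<String> run(SketchPolicyTask task, List<Step> steps) {
        List<String> executed = new ArrayList<>();
        for (Step s : steps) {
            if (task.isCancelled()) {
                break; // no further action is performed
            }
            task.setStep(s.name); // exposed as the "step" in the task status
            s.action.run();
            executed.add(s.name);
        }
        return executed;
    }
}

final class CancellableRunnerSketch {
    public static void main(String[] args) {
        SketchPolicyTask task = new SketchPolicyTask();
        // Illustrative step names; the second step simulates a cancel
        // request arriving while the reindex is in flight.
        List<SketchPolicyRunner.Step> steps = List.of(
            new SketchPolicyRunner.Step("CreateIndexRequest", () -> {}),
            new SketchPolicyRunner.Step("ReindexRequest", task::cancel),
            new SketchPolicyRunner.Step("ForceMergeRequest", () -> {})
        );
        List<String> executed = SketchPolicyRunner.run(task, steps);
        System.out.println(executed + ", last step: " + task.getStep());
    }
}
```

In this sketch the step that is running when the cancel arrives still finishes, and only the following step is skipped. In the real change, cancellation of an in-flight action such as reindex relies on the parent task relationship instead.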

Relates #48988

Example usage

(assuming policy with name my-policy exists)

Execute policy in async style manner:

POST /_enrich/policy/my-policy/_execute?wait_for_completion=false

Response:

{
    "task": "09IIwYXKQWy-T1ACgFNH_g:514"
}

Get task information about policy execution:

GET /_tasks/09IIwYXKQWy-T1ACgFNH_g:514

Response:

{
    "completed": false,
    "task": {
        "node": "09IIwYXKQWy-T1ACgFNH_g",
        "id": 514,
        "type": "enrich",
        "action": "policy_execution",
        "status": {
            "phase": "RUNNING",
            "step": "ReindexRequest"
        },
        "description": "executing enrich policy [my-policy]",
        "start_time_in_millis": 1630588070973,
        "running_time_in_nanos": 5638319278,
        "cancellable": true,
        "cancelled": false,
        "parent_task_id": "09IIwYXKQWy-T1ACgFNH_g:456",
        "headers": {}
    }
}

Cancel the policy execution:

POST /_tasks/09IIwYXKQWy-T1ACgFNH_g:514/_cancel

Response:

{
    "nodes": {
        "09IIwYXKQWy-T1ACgFNH_g": {
            "name": "runTask-0",
            "transport_address": "127.0.0.1:9300",
            "host": "127.0.0.1",
            "ip": "127.0.0.1:9300",
            "roles": [
                "data",
                "data_cold",
                "data_content",
                "data_frozen",
                "data_hot",
                "data_warm",
                "ingest",
                "master",
                "ml",
                "remote_cluster_client",
                "transform"
            ],
            "attributes": {
                "ml.machine_memory": "34359738368",
                "xpack.installed": "true",
                "testattr": "test",
                "ml.max_open_jobs": "512",
                "ml.max_jvm_size": "536870912"
            },
            "tasks": {
                "09IIwYXKQWy-T1ACgFNH_g:514": {
                    "node": "mt5fbxn5TsKAwpEKUjxCQw",
                    "id": 514,
                    "type": "enrich",
                    "action": "policy_execution",
                    "start_time_in_millis": 1630588091095,
                    "running_time_in_nanos": 7400096666,
                    "cancellable": true,
                    "cancelled": true,
                    "parent_task_id": "09IIwYXKQWy-T1ACgFNH_g:456",
                    "headers": {}
                }
            }
        }
    }
}

@martijnvg martijnvg added the >enhancement, :Distributed/Ingest Node, v8.0.0, and v7.16.0 labels Sep 2, 2021
@elasticmachine elasticmachine added the Team:Data Management (obsolete) DO NOT USE. This team no longer exists. label Sep 2, 2021
@elasticmachine
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

private static final String STEP_FIELD = "step";

private final String phase;
private final String step;
Member Author

Maybe a different name?

Member

It's a weird situation because the phases here are more like a "status" than a "phase", but naming things is the worst. Step is a fine name in my opinion.

ensureEnrichIndexIsReadOnly(createdEnrichIndex);
}

public void testRunnerCancel() throws Exception {
Member Author

@martijnvg martijnvg Sep 2, 2021

I think a true integration test of policy execution cancellation isn't possible (not without running into test-related failures)?

@martijnvg martijnvg mentioned this pull request Sep 2, 2021
10 tasks
Member

@jbaiera jbaiera left a comment

LGTM! Just a few comments/questions for my own knowledge here.

Comment on lines +1483 to +1489
if (action.equals(randomActionType)) {
    testTaskManager.getCancellableTasks()
        .values()
        .stream()
        .filter(cancellableTask -> cancellableTask instanceof ExecuteEnrichPolicyTask)
        .forEach(task -> testTaskManager.cancel(task, "", () -> {}));
}
Member

Clever!

Comment on lines +535 to +539
if (task.isCancelled()) {
    String message = "cancelled policy execution [" + policyName + "], status [" + Strings.toString(task.getStatus()) + "]";
    listener.onFailure(new TaskCancelledException(message));
    return;
}
Member

There isn't a way to propagate a task cancellation to its child tasks by default, is there? Just asking for my own knowledge here, not suggesting any changes.

Member Author

Via this if clause, the node task lets any thread that continues execution know that the node task has been cancelled, and the on-failure invocation triggers an info log in InternalExecutePolicyAction line 121. The TaskCancellationService#doCancelTaskAndDescendants(...) method handles cancellation of child tasks (which, in the case of ExecuteEnrichPolicyTask, are any cancellable tasks backed by transport action calls invoked from the wrapped client in EnrichPolicyRunner, e.g. reindex). The TaskCancellationService knows about this because in line 540 of the EnrichPolicyRunner class the ExecuteEnrichPolicyTask's id is set as the parent task.
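
The parent/child relationship described in this comment can be illustrated with a toy registry. This is a rough sketch only: `SketchTaskRegistry` and its `cancelTaskAndDescendants` method are hypothetical and far simpler than the real TaskCancellationService, which works across nodes. The point is just that setting a parent task id is what lets a cancel request reach child tasks.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, single-node task registry; not the Elasticsearch API.
final class SketchTaskRegistry {
    static final class SketchTask {
        final long id;
        final Long parentId; // null when the task has no parent
        boolean cancelled;
        SketchTask(long id, Long parentId) { this.id = id; this.parentId = parentId; }
    }

    // LinkedHashMap keeps registration order, so results are deterministic.
    private final Map<Long, SketchTask> tasks = new LinkedHashMap<>();
    private long nextId = 1;

    SketchTask register(Long parentId) {
        SketchTask task = new SketchTask(nextId++, parentId);
        tasks.put(task.id, task);
        return task;
    }

    // Marks the given task as cancelled, then recursively cancels every
    // task that names it as parent. Returns the ids that were cancelled.
    List<Long> cancelTaskAndDescendants(long id) {
        List<Long> cancelledIds = new ArrayList<>();
        SketchTask task = tasks.get(id);
        if (task == null) {
            return cancelledIds;
        }
        task.cancelled = true;
        cancelledIds.add(id);
        for (SketchTask candidate : new ArrayList<>(tasks.values())) {
            if (candidate.parentId != null && candidate.parentId.longValue() == id) {
                cancelledIds.addAll(cancelTaskAndDescendants(candidate.id));
            }
        }
        return cancelledIds;
    }
}

final class ParentTaskCancellationSketch {
    public static void main(String[] args) {
        SketchTaskRegistry registry = new SketchTaskRegistry();
        // The policy execution task is the parent of the reindex task.
        SketchTaskRegistry.SketchTask policy = registry.register(null);
        SketchTaskRegistry.SketchTask reindex = registry.register(policy.id);
        // A task without that parent id is left alone.
        SketchTaskRegistry.SketchTask unrelated = registry.register(null);

        registry.cancelTaskAndDescendants(policy.id);
        System.out.println("policy cancelled: " + policy.cancelled);
        System.out.println("reindex cancelled: " + reindex.cancelled);
        System.out.println("unrelated cancelled: " + unrelated.cancelled);
    }
}
```

Without the parent id, the reindex task in this sketch would be indistinguishable from the unrelated one, and a cancel of the policy task would never reach it.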

Member

Thanks for that explanation!

@martijnvg martijnvg merged commit 9e0caac into elastic:master Sep 8, 2021
martijnvg added a commit to martijnvg/elasticsearch that referenced this pull request Sep 8, 2021
martijnvg added a commit to martijnvg/elasticsearch that referenced this pull request Sep 8, 2021
martijnvg added a commit that referenced this pull request Sep 8, 2021
Backport #77188 to 7.x branch.

martijnvg added a commit that referenced this pull request Sep 8, 2021

Labels

:Distributed/Ingest Node Execution or management of Ingest Pipelines >enhancement Team:Data Management (obsolete) DO NOT USE. This team no longer exists. v7.16.0 v8.0.0-alpha2
