Skip to content

Commit 540b117

Browse files
author
Paul Sanwald
committed
high level REST api: cancel task (#30745)
* Initial commit of rest high level exposure of cancel task * fix javadocs * address some code review comments * update branch to use tasks namespace instead of cluster * High-level client: list tasks failure to not lose nodeId This commit reworks testing for `ListTasksResponse` so that random fields insertion can be tested and xcontent equivalence can be checked too. Proper exclusions need to be configured, and failures need to be tested separately. This helped finding a little problem, whenever there is a node failure returned, the nodeId was lost as it was never printed out as part of the exception toXContent. * added comment * merge from master * re-work CancelTasksResponseTests to separate XContent failure cases from non-failure cases * remove duplication of logic in parser creation * code review changes * refactor TasksClient to support RequestOptions * add tests for parent task id * address final PR review comments, mostly formatting and such
1 parent f4ea2e9 commit 540b117

12 files changed

Lines changed: 442 additions & 19 deletions

File tree

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.http.entity.ContentType;
3030
import org.apache.lucene.util.BytesRef;
3131
import org.elasticsearch.action.DocWriteRequest;
32+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
3233
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
3334
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
3435
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
@@ -108,6 +109,17 @@ private RequestConverters() {
108109
// Contains only status utility methods
109110
}
110111

112+
static Request cancelTasks(CancelTasksRequest cancelTasksRequest) {
113+
Request request = new Request(HttpPost.METHOD_NAME, "/_tasks/_cancel");
114+
Params params = new Params(request);
115+
params.withTimeout(cancelTasksRequest.getTimeout())
116+
.withTaskId(cancelTasksRequest.getTaskId())
117+
.withNodes(cancelTasksRequest.getNodes())
118+
.withParentTaskId(cancelTasksRequest.getParentTaskId())
119+
.withActions(cancelTasksRequest.getActions());
120+
return request;
121+
}
122+
111123
static Request delete(DeleteRequest deleteRequest) {
112124
String endpoint = endpoint(deleteRequest.index(), deleteRequest.type(), deleteRequest.id());
113125
Request request = new Request(HttpDelete.METHOD_NAME, endpoint);
@@ -1092,6 +1104,13 @@ Params withActions(String[] actions) {
10921104
return this;
10931105
}
10941106

1107+
Params withTaskId(TaskId taskId) {
1108+
if (taskId != null && taskId.isSet()) {
1109+
return putParam("task_id", taskId.toString());
1110+
}
1111+
return this;
1112+
}
1113+
10951114
Params withParentTaskId(TaskId parentTaskId) {
10961115
if (parentTaskId != null && parentTaskId.isSet()) {
10971116
return putParam("parent_task_id", parentTaskId.toString());

client/rest-high-level/src/main/java/org/elasticsearch/client/TasksClient.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.elasticsearch.client;
2121

2222
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
24+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
2325
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
2426
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2527

@@ -65,4 +67,45 @@ public void listAsync(ListTasksRequest request, RequestOptions options, ActionLi
6567
restHighLevelClient.performRequestAsyncAndParseEntity(request, RequestConverters::listTasks, options,
6668
ListTasksResponse::fromXContent, listener, emptySet());
6769
}
70+
71+
/**
72+
* Cancel one or more cluster tasks using the Task Management API.
73+
*
74+
* See
75+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
76+
* @param cancelTasksRequest the request
77+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
78+
* @return the response
79+
* @throws IOException in case there is a problem sending the request or parsing back the response
80+
*
81+
*/
82+
public CancelTasksResponse cancel(CancelTasksRequest cancelTasksRequest, RequestOptions options ) throws IOException {
83+
return restHighLevelClient.performRequestAndParseEntity(
84+
cancelTasksRequest,
85+
RequestConverters::cancelTasks,
86+
options,
87+
parser -> CancelTasksResponse.fromXContent(parser),
88+
emptySet()
89+
);
90+
}
91+
92+
/**
93+
* Asynchronously cancel one or more cluster tasks using the Task Management API.
94+
*
95+
* See
96+
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/tasks.html"> Task Management API on elastic.co</a>
97+
* @param cancelTasksRequest the request
98+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
99+
* @param listener the listener to be notified upon request completion
100+
*/
101+
public void cancelAsync(CancelTasksRequest cancelTasksRequest, RequestOptions options, ActionListener<CancelTasksResponse> listener) {
102+
restHighLevelClient.performRequestAsyncAndParseEntity(
103+
cancelTasksRequest,
104+
RequestConverters::cancelTasks,
105+
options,
106+
parser -> CancelTasksResponse.fromXContent(parser),
107+
listener,
108+
emptySet()
109+
);
110+
}
68111
}

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.http.util.EntityUtils;
3030
import org.elasticsearch.action.ActionRequestValidationException;
3131
import org.elasticsearch.action.DocWriteRequest;
32+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
33+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
3234
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
3335
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
3436
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
@@ -1620,6 +1622,23 @@ public void testIndexPutSettings() throws IOException {
16201622
assertEquals(expectedParams, request.getParameters());
16211623
}
16221624

1625+
public void testCancelTasks() {
1626+
CancelTasksRequest request = new CancelTasksRequest();
1627+
Map<String, String> expectedParams = new HashMap<>();
1628+
TaskId taskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
1629+
TaskId parentTaskId = new TaskId(randomAlphaOfLength(5), randomNonNegativeLong());
1630+
request.setTaskId(taskId);
1631+
request.setParentTaskId(parentTaskId);
1632+
expectedParams.put("task_id", taskId.toString());
1633+
expectedParams.put("parent_task_id", parentTaskId.toString());
1634+
Request httpRequest = RequestConverters.cancelTasks(request);
1635+
assertThat(httpRequest, notNullValue());
1636+
assertThat(httpRequest.getMethod(), equalTo(HttpPost.METHOD_NAME));
1637+
assertThat(httpRequest.getEntity(), nullValue());
1638+
assertThat(httpRequest.getEndpoint(), equalTo("/_tasks/_cancel"));
1639+
assertThat(httpRequest.getParameters(), equalTo(expectedParams));
1640+
}
1641+
16231642
public void testListTasks() {
16241643
{
16251644
ListTasksRequest request = new ListTasksRequest();

client/rest-high-level/src/test/java/org/elasticsearch/client/TasksIT.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919

2020
package org.elasticsearch.client;
2121

22+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
23+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
2224
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
2325
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2426
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
27+
import org.elasticsearch.tasks.TaskId;
2528
import org.elasticsearch.tasks.TaskInfo;
2629

2730
import java.io.IOException;
@@ -58,4 +61,26 @@ public void testListTasks() throws IOException {
5861
assertTrue("List tasks were not found", listTasksFound);
5962
}
6063

64+
public void testCancelTasks() throws IOException {
65+
ListTasksRequest listRequest = new ListTasksRequest();
66+
ListTasksResponse listResponse = execute(
67+
listRequest,
68+
highLevelClient().tasks()::list,
69+
highLevelClient().tasks()::listAsync
70+
);
71+
// in this case, probably no task will actually be cancelled.
72+
// this is ok, that case is covered in TasksIT.testTasksCancellation
73+
TaskInfo firstTask = listResponse.getTasks().get(0);
74+
String node = listResponse.getPerNodeTasks().keySet().iterator().next();
75+
76+
CancelTasksRequest cancelTasksRequest = new CancelTasksRequest();
77+
cancelTasksRequest.setTaskId(new TaskId(node, firstTask.getId()));
78+
cancelTasksRequest.setReason("testreason");
79+
CancelTasksResponse response = execute(cancelTasksRequest,
80+
highLevelClient().tasks()::cancel,
81+
highLevelClient().tasks()::cancelAsync);
82+
// Since the task may or may not have been cancelled, assert that we received a response only
83+
// The actual testing of task cancellation is covered by TasksIT.testTasksCancellation
84+
assertThat(response, notNullValue());
85+
}
6186
}

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/ClusterClientDocumentationIT.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,5 @@ public void onFailure(Exception e) {
178178
assertTrue(latch.await(30L, TimeUnit.SECONDS));
179179
}
180180
}
181+
181182
}

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/TasksClientDocumentationIT.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.action.LatchedActionListener;
2525
import org.elasticsearch.action.TaskOperationFailure;
26+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
27+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
2628
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
2729
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2830
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
@@ -146,4 +148,74 @@ public void onFailure(Exception e) {
146148
assertTrue(latch.await(30L, TimeUnit.SECONDS));
147149
}
148150
}
151+
152+
public void testCancelTasks() throws IOException {
153+
RestHighLevelClient client = highLevelClient();
154+
{
155+
// tag::cancel-tasks-request
156+
CancelTasksRequest request = new CancelTasksRequest();
157+
// end::cancel-tasks-request
158+
159+
// tag::cancel-tasks-request-filter
160+
request.setTaskId(new TaskId("nodeId1", 42)); //<1>
161+
request.setActions("cluster:*"); // <2>
162+
request.setNodes("nodeId1", "nodeId2"); // <3>
163+
// end::cancel-tasks-request-filter
164+
165+
}
166+
167+
CancelTasksRequest request = new CancelTasksRequest();
168+
request.setTaskId(TaskId.EMPTY_TASK_ID);
169+
170+
// tag::cancel-tasks-execute
171+
CancelTasksResponse response = client.tasks().cancel(request, RequestOptions.DEFAULT);
172+
// end::cancel-tasks-execute
173+
174+
assertThat(response, notNullValue());
175+
176+
// tag::cancel-tasks-response-tasks
177+
List<TaskInfo> tasks = response.getTasks(); // <1>
178+
// end::cancel-tasks-response-tasks
179+
180+
181+
// tag::cancel-tasks-response-failures
182+
List<ElasticsearchException> nodeFailures = response.getNodeFailures(); // <1>
183+
List<TaskOperationFailure> taskFailures = response.getTaskFailures(); // <2>
184+
// end::-tasks-response-failures
185+
186+
assertThat(response.getNodeFailures(), equalTo(emptyList()));
187+
assertThat(response.getTaskFailures(), equalTo(emptyList()));
188+
}
189+
190+
public void testAsyncCancelTasks() throws InterruptedException {
191+
192+
RestHighLevelClient client = highLevelClient();
193+
{
194+
CancelTasksRequest request = new CancelTasksRequest();
195+
196+
// tag::cancel-tasks-execute-listener
197+
ActionListener<CancelTasksResponse> listener =
198+
new ActionListener<CancelTasksResponse>() {
199+
@Override
200+
public void onResponse(CancelTasksResponse response) {
201+
// <1>
202+
}
203+
@Override
204+
public void onFailure(Exception e) {
205+
// <2>
206+
}
207+
};
208+
// end::cancel-tasks-execute-listener
209+
210+
// Replace the empty listener by a blocking listener in test
211+
final CountDownLatch latch = new CountDownLatch(1);
212+
listener = new LatchedActionListener<>(listener, latch);
213+
214+
// tag::cancel-tasks-execute-async
215+
client.tasks().cancelAsync(request, RequestOptions.DEFAULT, listener); // <1>
216+
// end::cancel-tasks-execute-async
217+
218+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
219+
}
220+
}
149221
}

docs/java-rest/high-level/supported-apis.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,5 +140,7 @@ include::snapshot/verify_repository.asciidoc[]
140140
The Java High Level REST Client supports the following Tasks APIs:
141141

142142
* <<java-rest-high-tasks-list>>
143+
* <<java-rest-high-cluster-cancel-tasks>>
143144

144145
include::tasks/list_tasks.asciidoc[]
146+
include::tasks/cancel_tasks.asciidoc[]
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
[[java-rest-high-cluster-cancel-tasks]]
2+
=== Cancel Tasks API
3+
4+
The Cancel Tasks API allows cancellation of a currently running task.
5+
6+
==== Cancel Tasks Request
7+
8+
A `CancelTasksRequest`:
9+
10+
["source","java",subs="attributes,callouts,macros"]
11+
--------------------------------------------------
12+
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-request]
13+
--------------------------------------------------
14+
There are no required parameters. The task cancellation command supports the same
15+
task selection parameters as the list tasks command.
16+
17+
==== Parameters
18+
19+
["source","java",subs="attributes,callouts,macros"]
20+
--------------------------------------------------
21+
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-request-filter]
22+
--------------------------------------------------
23+
<1> Cancel a task
24+
<2> Cancel only cluster-related tasks
25+
<3> Cancel all tasks running on nodes nodeId1 and nodeId2
26+
27+
==== Synchronous Execution
28+
29+
["source","java",subs="attributes,callouts,macros"]
30+
--------------------------------------------------
31+
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-execute]
32+
--------------------------------------------------
33+
34+
==== Asynchronous Execution
35+
36+
The asynchronous execution requires `CancelTasksRequest` instance and an
37+
`ActionListener` instance to be passed to the asynchronous method:
38+
39+
["source","java",subs="attributes,callouts,macros"]
40+
--------------------------------------------------
41+
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-execute-async]
42+
--------------------------------------------------
43+
<1> The `CancelTasksRequest` to execute and the `ActionListener` to use
44+
when the execution completes
45+
46+
The asynchronous method does not block and returns immediately. Once it is
47+
completed the `ActionListener` is called back using the `onResponse` method
48+
if the execution successfully completed or using the `onFailure` method if
49+
it failed.
50+
51+
A typical listener for `CancelTasksResponse` looks like:
52+
53+
["source","java",subs="attributes,callouts,macros"]
54+
--------------------------------------------------
55+
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[cancel-tasks-execute-listener]
56+
--------------------------------------------------
57+
<1> Called when the execution is successfully completed. The response is
58+
provided as an argument
59+
<2> Called in case of a failure. The raised exception is provided as an argument
60+
61+
==== Cancel Tasks Response
62+
63+
["source","java",subs="attributes,callouts,macros"]
64+
--------------------------------------------------
65+
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-tasks]
66+
--------------------------------------------------
67+
<1> List of cancelled tasks
68+
69+
["source","java",subs="attributes,callouts,macros"]
70+
--------------------------------------------------
71+
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-calc]
72+
--------------------------------------------------
73+
<1> List of cancelled tasks grouped by a node
74+
<2> List of cancelled tasks grouped by a parent task
75+
76+
["source","java",subs="attributes,callouts,macros"]
77+
--------------------------------------------------
78+
include-tagged::{doc-tests}/TasksClientDocumentationIT.java[list-tasks-response-failures]
79+
--------------------------------------------------
80+
<1> List of node failures
81+
<2> List of task cancellation failures
82+

server/src/main/java/org/elasticsearch/action/admin/cluster/node/tasks/cancel/CancelTasksResponse.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,48 @@
1919

2020
package org.elasticsearch.action.admin.cluster.node.tasks.cancel;
2121

22-
import org.elasticsearch.action.FailedNodeException;
22+
import org.elasticsearch.ElasticsearchException;
2323
import org.elasticsearch.action.TaskOperationFailure;
2424
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
25+
import org.elasticsearch.common.ParseField;
26+
import org.elasticsearch.common.Strings;
27+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
28+
import org.elasticsearch.common.xcontent.XContentBuilder;
29+
import org.elasticsearch.common.xcontent.XContentParser;
2530
import org.elasticsearch.tasks.TaskInfo;
2631

32+
import java.io.IOException;
2733
import java.util.List;
2834

35+
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.optionalConstructorArg;
36+
2937
/**
3038
* Returns the list of tasks that were cancelled
3139
*/
3240
public class CancelTasksResponse extends ListTasksResponse {
3341

42+
private static final ConstructingObjectParser<CancelTasksResponse, Void> PARSER =
43+
setupParser("cancel_tasks_response", CancelTasksResponse::new);
44+
3445
public CancelTasksResponse() {
3546
}
3647

37-
public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends FailedNodeException>
48+
public CancelTasksResponse(List<TaskInfo> tasks, List<TaskOperationFailure> taskFailures, List<? extends ElasticsearchException>
3849
nodeFailures) {
3950
super(tasks, taskFailures, nodeFailures);
4051
}
52+
53+
@Override
54+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
55+
return super.toXContent(builder, params);
56+
}
57+
58+
public static CancelTasksResponse fromXContent(XContentParser parser) {
59+
return PARSER.apply(parser, null);
60+
}
61+
62+
@Override
63+
public String toString() {
64+
return Strings.toString(this, true, true);
65+
}
4166
}

0 commit comments

Comments
 (0)