Skip to content

Commit d0aea54

Browse files
Add PaginatedHitSource Tests (#144642)
Adds tests for both ClientScrollablePaginatedHitSource and RemoteScrollablePaginatedHitSource to validate the close behaviour of the parent class PaginatedHitSource Relates: elastic/elasticsearch-team#2088
1 parent dee07c7 commit d0aea54

2 files changed

Lines changed: 101 additions & 1 deletion

File tree

modules/reindex/src/test/java/org/elasticsearch/reindex/ClientScrollablePaginatedHitSourceTests.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414
import org.elasticsearch.action.ActionRequest;
1515
import org.elasticsearch.action.ActionResponse;
1616
import org.elasticsearch.action.ActionType;
17+
import org.elasticsearch.action.search.ClearScrollRequest;
18+
import org.elasticsearch.action.search.ClearScrollResponse;
1719
import org.elasticsearch.action.search.SearchRequest;
1820
import org.elasticsearch.action.search.SearchResponse;
1921
import org.elasticsearch.action.search.SearchScrollRequest;
22+
import org.elasticsearch.action.search.TransportClearScrollAction;
2023
import org.elasticsearch.action.search.TransportSearchAction;
2124
import org.elasticsearch.action.search.TransportSearchScrollAction;
2225
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
@@ -37,12 +40,15 @@
3740
import org.elasticsearch.threadpool.TestThreadPool;
3841
import org.elasticsearch.threadpool.ThreadPool;
3942
import org.junit.After;
43+
import org.junit.Assert;
4044
import org.junit.Before;
4145

46+
import java.util.ArrayList;
4247
import java.util.List;
4348
import java.util.concurrent.ArrayBlockingQueue;
4449
import java.util.concurrent.BlockingQueue;
4550
import java.util.concurrent.TimeUnit;
51+
import java.util.concurrent.atomic.AtomicBoolean;
4652
import java.util.concurrent.atomic.AtomicInteger;
4753
import java.util.concurrent.atomic.AtomicReference;
4854
import java.util.function.Consumer;
@@ -52,6 +58,7 @@
5258
import static org.apache.lucene.tests.util.TestUtil.randomSimpleString;
5359
import static org.elasticsearch.common.bytes.BytesReferenceTestUtils.equalBytes;
5460
import static org.elasticsearch.core.TimeValue.timeValueSeconds;
61+
import static org.hamcrest.Matchers.contains;
5562
import static org.hamcrest.Matchers.instanceOf;
5663

5764
public class ClientScrollablePaginatedHitSourceTests extends ESTestCase {
@@ -157,10 +164,61 @@ public void testScrollKeepAlive() {
157164
new SearchRequest().scroll(timeValueSeconds(10))
158165
);
159166

167+
paginatedHitSource.setScroll("scroll_id");
160168
paginatedHitSource.startNextScroll(timeValueSeconds(100));
161169
client.validateRequest(TransportSearchScrollAction.TYPE, (SearchScrollRequest r) -> assertEquals(r.scroll().seconds(), 110));
162170
}
163171

172+
/** When scroll ID is empty or null, close runs cleanup immediately without calling clearScroll. */
173+
public void testCloseWhenScrollIdEmpty() {
174+
MockClient client = new MockClient(threadPool);
175+
TaskId parentTask = new TaskId("thenode", randomInt());
176+
ClientScrollablePaginatedHitSource paginatedHitSource = new ClientScrollablePaginatedHitSource(
177+
logger,
178+
BackoffPolicy.constantBackoff(TimeValue.ZERO, 0),
179+
threadPool,
180+
Assert::fail,
181+
r -> fail(),
182+
e -> fail(),
183+
new ParentTaskAssigningClient(client, parentTask),
184+
new SearchRequest().scroll(timeValueSeconds(10))
185+
);
186+
AtomicBoolean closeCallbackCalled = new AtomicBoolean();
187+
188+
paginatedHitSource.close(() -> closeCallbackCalled.set(true));
189+
190+
assertTrue(closeCallbackCalled.get());
191+
assertFalse(client.hasExecuted(TransportClearScrollAction.TYPE));
192+
}
193+
194+
/** When scroll ID is set, close calls clearScroll and runs cleanup after it completes. */
195+
public void testCloseWhenScrollIdSet() throws InterruptedException {
196+
MockClient client = new MockClient(threadPool);
197+
TaskId parentTask = new TaskId("thenode", randomInt());
198+
ClientScrollablePaginatedHitSource paginatedHitSource = new ClientScrollablePaginatedHitSource(
199+
logger,
200+
BackoffPolicy.constantBackoff(TimeValue.ZERO, 0),
201+
threadPool,
202+
Assert::fail,
203+
r -> fail(),
204+
e -> fail(),
205+
new ParentTaskAssigningClient(client, parentTask),
206+
new SearchRequest().scroll(timeValueSeconds(10))
207+
);
208+
paginatedHitSource.setScroll("scroll_123");
209+
AtomicBoolean closeCallbackCalled = new AtomicBoolean();
210+
211+
paginatedHitSource.close(() -> closeCallbackCalled.set(true));
212+
213+
client.awaitOperation();
214+
client.validateRequest(
215+
TransportClearScrollAction.TYPE,
216+
(ClearScrollRequest r) -> assertThat(r.getScrollIds(), contains("scroll_123"))
217+
);
218+
client.respond(TransportClearScrollAction.TYPE, new ClearScrollResponse(true, 1));
219+
assertTrue(closeCallbackCalled.get());
220+
}
221+
164222
private SearchResponse createSearchResponse() {
165223
// create a simulated response.
166224
SearchHit hit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{}"));
@@ -214,6 +272,7 @@ public void validateRequest(ActionType<Response> actionType, Consumer<? super Re
214272

215273
private static class MockClient extends AbstractClient {
216274
private ExecuteRequest<?, ?> executeRequest;
275+
private final List<ActionType<?>> executedActions = new ArrayList<>();
217276

218277
MockClient(ThreadPool threadPool) {
219278
super(Settings.EMPTY, threadPool, TestProjectResolvers.alwaysThrow());
@@ -225,11 +284,15 @@ protected synchronized <Request extends ActionRequest, Response extends ActionRe
225284
Request request,
226285
ActionListener<Response> listener
227286
) {
228-
287+
executedActions.add(action);
229288
this.executeRequest = new ExecuteRequest<>(action, request, listener);
230289
this.notifyAll();
231290
}
232291

292+
boolean hasExecuted(ActionType<?> action) {
293+
return executedActions.contains(action);
294+
}
295+
233296
@SuppressWarnings("unchecked")
234297
public <Request extends ActionRequest, Response extends ActionResponse> void respondx(
235298
ActionType<Response> action,

modules/reindex/src/test/java/org/elasticsearch/reindex/remote/RemoteScrollablePaginatedHitSourceTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.Version;
2929
import org.elasticsearch.action.search.SearchRequest;
3030
import org.elasticsearch.client.HeapBufferedAsyncResponseConsumer;
31+
import org.elasticsearch.client.ResponseListener;
3132
import org.elasticsearch.client.RestClient;
3233
import org.elasticsearch.common.BackoffPolicy;
3334
import org.elasticsearch.common.ParsingException;
@@ -76,6 +77,7 @@
7677
import static org.mockito.ArgumentMatchers.any;
7778
import static org.mockito.Mockito.doThrow;
7879
import static org.mockito.Mockito.mock;
80+
import static org.mockito.Mockito.never;
7981
import static org.mockito.Mockito.verify;
8082
import static org.mockito.Mockito.when;
8183

@@ -432,6 +434,41 @@ public void testCleanupFailure() throws Exception {
432434
assertTrue(cleanupCallbackCalled.get());
433435
}
434436

437+
/** When scroll ID is empty or null, close runs cleanup immediately without calling clearScroll. */
438+
public void testCloseWhenScrollIdEmpty() throws Exception {
439+
RestClient client = mock(RestClient.class);
440+
RemoteInfo remoteInfo = remoteInfo();
441+
TestRemoteScrollablePaginatedHitSource paginatedHitSource = new TestRemoteScrollablePaginatedHitSource(client, remoteInfo);
442+
AtomicBoolean closeCallbackCalled = new AtomicBoolean();
443+
444+
paginatedHitSource.close(() -> closeCallbackCalled.set(true));
445+
446+
assertTrue(closeCallbackCalled.get());
447+
verify(client, never()).performRequestAsync(any(), any());
448+
verify(client).close();
449+
}
450+
451+
/** When scroll ID is set, close calls clearScroll and runs cleanup after it completes. */
452+
public void testCloseWhenScrollIdSet() throws Exception {
453+
RestClient client = mock(RestClient.class);
454+
when(client.performRequestAsync(any(), any())).thenAnswer(invocation -> {
455+
ResponseListener listener = invocation.getArgument(1);
456+
listener.onSuccess(mock(org.elasticsearch.client.Response.class));
457+
return null;
458+
});
459+
RemoteInfo remoteInfo = remoteInfo();
460+
TestRemoteScrollablePaginatedHitSource paginatedHitSource = new TestRemoteScrollablePaginatedHitSource(client, remoteInfo);
461+
paginatedHitSource.remoteVersion = Version.CURRENT;
462+
paginatedHitSource.setScroll("scroll_123");
463+
AtomicBoolean closeCallbackCalled = new AtomicBoolean();
464+
465+
paginatedHitSource.close(() -> closeCallbackCalled.set(true));
466+
467+
assertTrue(closeCallbackCalled.get());
468+
verify(client).performRequestAsync(any(), any());
469+
verify(client).close();
470+
}
471+
435472
private RemoteScrollablePaginatedHitSource sourceWithMockedRemoteCall(String... paths) throws Exception {
436473
return sourceWithMockedRemoteCall(true, ContentType.APPLICATION_JSON, paths);
437474
}

0 commit comments

Comments
 (0)