|
78 | 78 | import org.opensearch.test.OpenSearchTestCase; |
79 | 79 | import org.opensearch.threadpool.ThreadPool; |
80 | 80 | import org.opensearch.threadpool.ThreadPool.Names; |
| 81 | +import org.hamcrest.MatcherAssert; |
81 | 82 | import org.junit.Before; |
82 | 83 |
|
83 | 84 | import java.nio.charset.StandardCharsets; |
|
104 | 105 |
|
105 | 106 | import static java.util.Collections.emptyMap; |
106 | 107 | import static java.util.Collections.emptySet; |
| 108 | +import static org.hamcrest.Matchers.contains; |
107 | 109 | import static org.hamcrest.Matchers.equalTo; |
108 | 110 | import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
| 111 | +import static org.hamcrest.Matchers.hasSize; |
109 | 112 | import static org.hamcrest.Matchers.instanceOf; |
110 | 113 | import static org.hamcrest.Matchers.is; |
111 | 114 | import static org.hamcrest.Matchers.notNullValue; |
112 | 115 | import static org.hamcrest.Matchers.nullValue; |
113 | 116 | import static org.hamcrest.Matchers.sameInstance; |
114 | 117 | import static org.mockito.Mockito.any; |
115 | | -import static org.mockito.Mockito.anyInt; |
116 | 118 | import static org.mockito.Mockito.anyString; |
117 | 119 | import static org.mockito.Mockito.argThat; |
118 | 120 | import static org.mockito.Mockito.doAnswer; |
@@ -1106,27 +1108,23 @@ public void testExecuteFailureWithNestedOnFailure() throws Exception { |
1106 | 1108 | verify(completionHandler, times(1)).accept(Thread.currentThread(), null); |
1107 | 1109 | } |
1108 | 1110 |
|
1109 | | - public void testBulkRequestExecutionWithFailures() throws Exception { |
| 1111 | + public void testBulkRequestExecutionWithFailures() { |
1110 | 1112 | BulkRequest bulkRequest = new BulkRequest(); |
1111 | 1113 | String pipelineId = "_id"; |
1112 | 1114 |
|
1113 | | - int numRequest = scaledRandomIntBetween(8, 64); |
1114 | | - int numIndexRequests = 0; |
1115 | | - for (int i = 0; i < numRequest; i++) { |
1116 | | - DocWriteRequest request; |
| 1115 | + int numIndexRequests = scaledRandomIntBetween(4, 32); |
| 1116 | + for (int i = 0; i < numIndexRequests; i++) { |
| 1117 | + IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); |
| 1118 | + indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); |
| 1119 | + bulkRequest.add(indexRequest); |
| 1120 | + } |
| 1121 | + int numOtherRequests = scaledRandomIntBetween(4, 32); |
| 1122 | + for (int i = 0; i < numOtherRequests; i++) { |
1117 | 1123 | if (randomBoolean()) { |
1118 | | - if (randomBoolean()) { |
1119 | | - request = new DeleteRequest("_index", "_id"); |
1120 | | - } else { |
1121 | | - request = new UpdateRequest("_index", "_id"); |
1122 | | - } |
| 1124 | + bulkRequest.add(new DeleteRequest("_index", "_id")); |
1123 | 1125 | } else { |
1124 | | - IndexRequest indexRequest = new IndexRequest("_index").id("_id").setPipeline(pipelineId).setFinalPipeline("_none"); |
1125 | | - indexRequest.source(Requests.INDEX_CONTENT_TYPE, "field1", "value1"); |
1126 | | - request = indexRequest; |
1127 | | - numIndexRequests++; |
| 1126 | + bulkRequest.add(new UpdateRequest("_index", "_id")); |
1128 | 1127 | } |
1129 | | - bulkRequest.add(request); |
1130 | 1128 | } |
1131 | 1129 |
|
1132 | 1130 | CompoundProcessor processor = mock(CompoundProcessor.class); |
@@ -1155,23 +1153,22 @@ public void testBulkRequestExecutionWithFailures() throws Exception { |
1155 | 1153 | clusterState = IngestService.innerPut(putRequest, clusterState); |
1156 | 1154 | ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, previousClusterState)); |
1157 | 1155 |
|
1158 | | - @SuppressWarnings("unchecked") |
1159 | | - BiConsumer<Integer, Exception> requestItemErrorHandler = mock(BiConsumer.class); |
1160 | | - @SuppressWarnings("unchecked") |
1161 | | - final BiConsumer<Thread, Exception> completionHandler = mock(BiConsumer.class); |
| 1156 | + final Map<Integer, Exception> errorHandler = new HashMap<>(); |
| 1157 | + final Map<Thread, Exception> completionHandler = new HashMap<>(); |
1162 | 1158 | ingestService.executeBulkRequest( |
1163 | | - numRequest, |
| 1159 | + numIndexRequests + numOtherRequests, |
1164 | 1160 | bulkRequest.requests(), |
1165 | | - requestItemErrorHandler, |
1166 | | - completionHandler, |
| 1161 | + errorHandler::put, |
| 1162 | + completionHandler::put, |
1167 | 1163 | indexReq -> {}, |
1168 | 1164 | Names.WRITE, |
1169 | 1165 | bulkRequest |
1170 | 1166 | ); |
1171 | 1167 |
|
1172 | | - verify(requestItemErrorHandler, times(numIndexRequests)).accept(anyInt(), argThat(o -> o.getCause().equals(error))); |
| 1168 | + MatcherAssert.assertThat(errorHandler.entrySet(), hasSize(numIndexRequests)); |
| 1169 | + errorHandler.values().forEach(e -> assertEquals(e.getCause(), error)); |
1173 | 1170 |
|
1174 | | - verify(completionHandler, times(1)).accept(Thread.currentThread(), null); |
| 1171 | + MatcherAssert.assertThat(completionHandler.keySet(), contains(Thread.currentThread())); |
1175 | 1172 | } |
1176 | 1173 |
|
1177 | 1174 | public void testBulkRequestExecution() throws Exception { |
|
0 commit comments