|
34 | 34 | import org.opensearch.core.xcontent.ToXContent; |
35 | 35 | import org.opensearch.core.xcontent.XContentBuilder; |
36 | 36 | import org.opensearch.securityanalytics.model.ThreatIntelFeedData; |
| 37 | +import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; |
37 | 38 | import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction; |
38 | 39 | import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; |
39 | 40 | import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; |
40 | | -import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; |
41 | 41 | import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; |
42 | | -import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; |
| 42 | +import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; |
43 | 43 | import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService; |
44 | 44 | import org.opensearch.securityanalytics.util.IndexUtils; |
45 | 45 | import org.opensearch.securityanalytics.util.SecurityAnalyticsException; |
|
56 | 56 | import java.util.List; |
57 | 57 | import java.util.Map; |
58 | 58 | import java.util.Optional; |
59 | | -import java.util.concurrent.CountDownLatch; |
60 | 59 | import java.util.regex.Matcher; |
61 | 60 | import java.util.regex.Pattern; |
62 | 61 | import java.util.stream.Collectors; |
@@ -104,21 +103,13 @@ public void getThreatIntelFeedData( |
104 | 103 | ActionListener<List<ThreatIntelFeedData>> listener |
105 | 104 | ) { |
106 | 105 | try { |
107 | | - |
108 | 106 | String tifdIndex = getLatestIndexByCreationDate(); |
109 | 107 | if (tifdIndex == null) { |
110 | 108 | createThreatIntelFeedData(listener); |
111 | 109 | } else { |
112 | | - SearchRequest searchRequest = new SearchRequest(tifdIndex); |
113 | | - searchRequest.source().size(9999); //TODO: convert to scroll |
114 | | - String finalTifdIndex = tifdIndex; |
115 | | - client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { |
116 | | - log.error(String.format( |
117 | | - "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); |
118 | | - listener.onFailure(e); |
119 | | - })); |
| 110 | + fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); |
120 | 111 | } |
121 | | - } catch (InterruptedException e) { |
| 112 | + } catch (Exception e) { |
122 | 113 | log.error("Failed to get threat intel feed data", e); |
123 | 114 | listener.onFailure(e); |
124 | 115 | } |
@@ -150,21 +141,16 @@ public void createIndexIfNotExists(final String indexName, final ActionListener< |
150 | 141 | .mapping(getIndexMapping()).timeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)); |
151 | 142 | StashedThreadContext.run( |
152 | 143 | client, |
153 | | - () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() { |
154 | | - @Override |
155 | | - public void onResponse(CreateIndexResponse response) { |
156 | | - if (response.isAcknowledged()) { |
157 | | - listener.onResponse(response); |
158 | | - } else { |
159 | | - onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR)); |
160 | | - } |
161 | | - } |
162 | | - |
163 | | - @Override |
164 | | - public void onFailure(Exception e) { |
165 | | - listener.onFailure(e); |
166 | | - } |
167 | | - }) |
| 144 | + () -> client.admin().indices().create(createIndexRequest, |
| 145 | + ActionListener.wrap( |
| 146 | + response -> { |
| 147 | + if (response.isAcknowledged()) |
| 148 | + listener.onResponse(response); |
| 149 | + else |
| 150 | + listener.onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR)); |
| 151 | + |
| 152 | + }, listener::onFailure |
| 153 | + )) |
168 | 154 | ); |
169 | 155 | } |
170 | 156 |
|
@@ -223,28 +209,20 @@ public void parseAndSaveThreatIntelFeedDataCSV( |
223 | 209 | } |
224 | 210 | bulkRequestList.add(bulkRequest); |
225 | 211 |
|
226 | | - GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(new ActionListener<>() { |
227 | | - @Override |
228 | | - public void onResponse(Collection<BulkResponse> bulkResponses) { |
229 | | - int idx = 0; |
230 | | - for (BulkResponse response: bulkResponses) { |
231 | | - BulkRequest request = bulkRequestList.get(idx); |
232 | | - if (response.hasFailures()) { |
233 | | - throw new OpenSearchException( |
234 | | - "error occurred while ingesting threat intel feed data in {} with an error {}", |
235 | | - StringUtils.join(request.getIndices()), |
236 | | - response.buildFailureMessage() |
237 | | - ); |
238 | | - } |
| 212 | + GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> { |
| 213 | + int idx = 0; |
| 214 | + for (BulkResponse response : bulkResponses) { |
| 215 | + BulkRequest request = bulkRequestList.get(idx); |
| 216 | + if (response.hasFailures()) { |
| 217 | + throw new OpenSearchException( |
| 218 | + "error occurred while ingesting threat intel feed data in {} with an error {}", |
| 219 | + StringUtils.join(request.getIndices()), |
| 220 | + response.buildFailureMessage() |
| 221 | + ); |
239 | 222 | } |
240 | | - listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName))); |
241 | 223 | } |
242 | | - |
243 | | - @Override |
244 | | - public void onFailure(Exception e) { |
245 | | - listener.onFailure(e); |
246 | | - } |
247 | | - }, bulkRequestList.size()); |
| 224 | + listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName))); |
| 225 | + }, listener::onFailure), bulkRequestList.size()); |
248 | 226 |
|
249 | 227 | for (int i = 0; i < bulkRequestList.size(); ++i) { |
250 | 228 | saveTifds(bulkRequestList.get(i), timeout, bulkResponseListener); |
@@ -291,52 +269,47 @@ public void deleteThreatIntelDataIndex(final List<String> indices) { |
291 | 269 | .prepareDelete(indices.toArray(new String[0])) |
292 | 270 | .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) |
293 | 271 | .setTimeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)) |
294 | | - .execute(new ActionListener<>() { |
295 | | - @Override |
296 | | - public void onResponse(AcknowledgedResponse response) { |
297 | | - if (response.isAcknowledged() == false) { |
298 | | - onFailure(new OpenSearchException("failed to delete data[{}]", String.join(",", indices))); |
299 | | - } |
300 | | - } |
301 | | - |
302 | | - @Override |
303 | | - public void onFailure(Exception e) { |
304 | | - log.error("unknown exception:", e); |
305 | | - } |
306 | | - }) |
| 272 | + .execute(ActionListener.wrap( |
| 273 | + response -> { |
| 274 | + if (response.isAcknowledged() == false) { |
| 275 | + log.error(new OpenSearchException("failed to delete threat intel feed index[{}]", |
| 276 | + String.join(",", indices))); |
| 277 | + } |
| 278 | + }, e -> log.error("failed to delete threat intel feed index [{}]", e) |
| 279 | + )) |
307 | 280 | ); |
308 | 281 | } |
309 | 282 |
|
310 | | - private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) throws InterruptedException { |
311 | | - CountDownLatch countDownLatch = new CountDownLatch(1); |
| 283 | + private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) { |
312 | 284 | client.execute( |
313 | 285 | PutTIFJobAction.INSTANCE, |
314 | 286 | new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)), |
315 | | - new ActionListener<>() { |
316 | | - @Override |
317 | | - public void onResponse(AcknowledgedResponse acknowledgedResponse) { |
318 | | - log.debug("Acknowledged threat intel feed updater job created"); |
319 | | - countDownLatch.countDown(); |
320 | | - String tifdIndex = getLatestIndexByCreationDate(); |
321 | | - |
322 | | - SearchRequest searchRequest = new SearchRequest(tifdIndex); |
323 | | - searchRequest.source().size(9999); //TODO: convert to scroll |
324 | | - String finalTifdIndex = tifdIndex; |
325 | | - client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { |
326 | | - log.error(String.format( |
327 | | - "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); |
| 287 | + ActionListener.wrap( |
| 288 | + r -> { |
| 289 | + if (false == r.isAcknowledged()) { |
| 290 | + listener.onFailure(new Exception("Failed to acknowledge Put Tif job action")); |
| 291 | + return; |
| 292 | + } |
| 293 | + log.debug("Acknowledged threat intel feed updater job created"); |
| 294 | + String tifdIndex = getLatestIndexByCreationDate(); |
| 295 | + fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); |
| 296 | + }, e -> { |
| 297 | + log.debug("Failed to create threat intel feed updater job", e); |
328 | 298 | listener.onFailure(e); |
329 | | - })); |
330 | | - } |
331 | | - |
332 | | - @Override |
333 | | - public void onFailure(Exception e) { |
334 | | - log.debug("Failed to create threat intel feed updater job", e); |
335 | | - countDownLatch.countDown(); |
336 | | - } |
337 | | - } |
| 299 | + } |
| 300 | + ) |
338 | 301 | ); |
339 | | - countDownLatch.await(); |
| 302 | + } |
| 303 | + |
| 304 | + private void fetchThreatIntelFeedDataFromIndex(String tifdIndex, ActionListener<List<ThreatIntelFeedData>> listener) { |
| 305 | + SearchRequest searchRequest = new SearchRequest(tifdIndex); |
| 306 | + searchRequest.source().size(9999); //TODO: convert to scroll |
| 307 | + String finalTifdIndex = tifdIndex; |
| 308 | + client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { |
| 309 | + log.error(String.format( |
| 310 | + "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); |
| 311 | + listener.onFailure(e); |
| 312 | + })); |
340 | 313 | } |
341 | 314 |
|
342 | 315 | private String getIndexMapping() { |
|
0 commit comments