2727package org.opensearch.alerting.alerts
2828
2929import org.apache.logging.log4j.LogManager
30- import org.apache.lucene.index.IndexNotFoundException
3130import org.opensearch.ResourceAlreadyExistsException
31+ import org.opensearch.action.ActionListener
3232import org.opensearch.action.admin.cluster.state.ClusterStateRequest
33+ import org.opensearch.action.admin.cluster.state.ClusterStateResponse
3334import org.opensearch.action.admin.indices.alias.Alias
3435import org.opensearch.action.admin.indices.create.CreateIndexRequest
3536import org.opensearch.action.admin.indices.create.CreateIndexResponse
@@ -38,6 +39,7 @@ import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
3839import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
3940import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
4041import org.opensearch.action.admin.indices.rollover.RolloverRequest
42+ import org.opensearch.action.admin.indices.rollover.RolloverResponse
4143import org.opensearch.action.support.IndicesOptions
4244import org.opensearch.action.support.master.AcknowledgedResponse
4345import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX
@@ -145,7 +147,6 @@ class AlertIndices(
145147
146148 fun onMaster () {
147149 try {
148- // TODO: Change current actionGet requests within rolloverHistoryIndex() rolloverAndDeleteHistoryIndices() to use suspendUntil
149150 // try to rollover immediately as we might be restarting the cluster
150151 rolloverHistoryIndex()
151152 // schedule the next rollover for approx MAX_AGE later
@@ -279,9 +280,9 @@ class AlertIndices(
279280 deleteOldHistoryIndices()
280281 }
281282
282- private fun rolloverHistoryIndex (): Boolean {
283+ private fun rolloverHistoryIndex () {
283284 if (! historyIndexInitialized) {
284- return false
285+ return
285286 }
286287
287288 // We have to pass null for newIndexName in order to get Elastic to increment the index count.
@@ -291,17 +292,24 @@ class AlertIndices(
291292 .settings(Settings .builder().put(" index.hidden" , true ).build())
292293 request.addMaxIndexDocsCondition(historyMaxDocs)
293294 request.addMaxIndexAgeCondition(historyMaxAge)
294- val response = client.admin().indices().rolloverIndex(request).actionGet(requestTimeout)
295- if (! response.isRolledOver) {
296- logger.info(" $HISTORY_WRITE_INDEX not rolled over. Conditions were: ${response.conditionStatus} " )
297- } else {
298- lastRolloverTime = TimeValue .timeValueMillis(threadPool.absoluteTimeInMillis())
299- }
300- return response.isRolledOver
295+ client.admin().indices().rolloverIndex(
296+ request,
297+ object : ActionListener <RolloverResponse > {
298+ override fun onResponse (response : RolloverResponse ) {
299+ if (! response.isRolledOver) {
300+ logger.info(" $HISTORY_WRITE_INDEX not rolled over. Conditions were: ${response.conditionStatus} " )
301+ } else {
302+ lastRolloverTime = TimeValue .timeValueMillis(threadPool.absoluteTimeInMillis())
303+ }
304+ }
305+ override fun onFailure (e : Exception ) {
306+ logger.error(" $HISTORY_WRITE_INDEX not roll over failed." )
307+ }
308+ }
309+ )
301310 }
302311
303312 private fun deleteOldHistoryIndices () {
304- val indicesToDelete = mutableListOf<String >()
305313
306314 val clusterStateRequest = ClusterStateRequest ()
307315 .clear()
@@ -310,8 +318,27 @@ class AlertIndices(
310318 .local(true )
311319 .indicesOptions(IndicesOptions .strictExpand())
312320
313- val clusterStateResponse = client.admin().cluster().state(clusterStateRequest).actionGet()
321+ client.admin().cluster().state(
322+ clusterStateRequest,
323+ object : ActionListener <ClusterStateResponse > {
324+ override fun onResponse (clusterStateResponse : ClusterStateResponse ) {
325+ if (! clusterStateResponse.state.metadata.indices.isEmpty) {
326+ val indicesToDelete = getIndicesToDelete(clusterStateResponse)
327+ logger.info(" Deleting old history indices viz $indicesToDelete " )
328+ deleteAllOldHistoryIndices(indicesToDelete)
329+ } else {
330+ logger.info(" No Old History Indices to delete" )
331+ }
332+ }
333+ override fun onFailure (e : Exception ) {
334+ logger.error(" Error fetching cluster state" )
335+ }
336+ }
337+ )
338+ }
314339
340+ private fun getIndicesToDelete (clusterStateResponse : ClusterStateResponse ): List <String > {
341+ val indicesToDelete = mutableListOf<String >()
315342 for (entry in clusterStateResponse.state.metadata.indices) {
316343 val indexMetaData = entry.value
317344 val creationTime = indexMetaData.creationDate
@@ -331,26 +358,48 @@ class AlertIndices(
331358 indicesToDelete.add(indexMetaData.index.name)
332359 }
333360 }
361+ return indicesToDelete
362+ }
334363
364+ private fun deleteAllOldHistoryIndices (indicesToDelete : List <String >) {
335365 if (indicesToDelete.isNotEmpty()) {
336366 val deleteIndexRequest = DeleteIndexRequest (* indicesToDelete.toTypedArray())
337- val deleteIndexResponse = client.admin().indices().delete(deleteIndexRequest).actionGet()
338-
339- if (! deleteIndexResponse.isAcknowledged) {
340- logger.error(" Could not delete one or more Alerting history indices: $indicesToDelete . Retrying one by one." )
341- for (index in indicesToDelete) {
342- try {
343- val singleDeleteRequest = DeleteIndexRequest (* indicesToDelete.toTypedArray())
344- val singleDeleteResponse = client.admin().indices().delete(singleDeleteRequest).actionGet()
367+ client.admin().indices().delete(
368+ deleteIndexRequest,
369+ object : ActionListener <AcknowledgedResponse > {
370+ override fun onResponse (deleteIndicesResponse : AcknowledgedResponse ) {
371+ if (! deleteIndicesResponse.isAcknowledged) {
372+ logger.error(" Could not delete one or more Alerting history indices: $indicesToDelete . Retrying one by one." )
373+ deleteOldHistoryIndex(indicesToDelete)
374+ }
375+ }
376+ override fun onFailure (e : Exception ) {
377+ logger.error(" Delete for Alerting History Indices $indicesToDelete Failed. Retrying one By one." )
378+ deleteOldHistoryIndex(indicesToDelete)
379+ }
380+ }
381+ )
382+ }
383+ }
345384
346- if (! singleDeleteResponse.isAcknowledged) {
347- logger.error(" Could not delete one or more Alerting history indices: $index " )
385+ private fun deleteOldHistoryIndex (indicesToDelete : List <String >) {
386+ for (index in indicesToDelete) {
387+ val singleDeleteRequest = DeleteIndexRequest (* indicesToDelete.toTypedArray())
388+ client.admin().indices().delete(
389+ singleDeleteRequest,
390+ object : ActionListener <AcknowledgedResponse > {
391+ override fun onResponse (acknowledgedResponse : AcknowledgedResponse ? ) {
392+ if (acknowledgedResponse != null ) {
393+ if (! acknowledgedResponse.isAcknowledged) {
394+ logger.error(" Could not delete one or more Alerting history indices: $index " )
395+ }
348396 }
349- } catch (e: IndexNotFoundException ) {
350- logger.debug(" $index was already deleted. ${e.message} " )
397+ }
398+ override fun onFailure (e : Exception ) {
399+ logger.debug(" Exception ${e.message} while deleting the index $index " )
351400 }
352401 }
353- }
402+ )
354403 }
355404 }
356405}
0 commit comments