Skip to content

Commit ff57c54

Browse files
authored
Removing All Usages of Action Get Method Calls and adding the listeners (#130)
Signed-off-by: Aditya Jindal <aditjind@amazon.com>
1 parent 0c4c2fc commit ff57c54

1 file changed

Lines changed: 75 additions & 26 deletions

File tree

alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertIndices.kt

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@
2727
package org.opensearch.alerting.alerts
2828

2929
import org.apache.logging.log4j.LogManager
30-
import org.apache.lucene.index.IndexNotFoundException
3130
import org.opensearch.ResourceAlreadyExistsException
31+
import org.opensearch.action.ActionListener
3232
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
33+
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
3334
import org.opensearch.action.admin.indices.alias.Alias
3435
import org.opensearch.action.admin.indices.create.CreateIndexRequest
3536
import org.opensearch.action.admin.indices.create.CreateIndexResponse
@@ -38,6 +39,7 @@ import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest
3839
import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse
3940
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
4041
import org.opensearch.action.admin.indices.rollover.RolloverRequest
42+
import org.opensearch.action.admin.indices.rollover.RolloverResponse
4143
import org.opensearch.action.support.IndicesOptions
4244
import org.opensearch.action.support.master.AcknowledgedResponse
4345
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALERT_INDEX
@@ -145,7 +147,6 @@ class AlertIndices(
145147

146148
fun onMaster() {
147149
try {
148-
// TODO: Change current actionGet requests within rolloverHistoryIndex() rolloverAndDeleteHistoryIndices() to use suspendUntil
149150
// try to rollover immediately as we might be restarting the cluster
150151
rolloverHistoryIndex()
151152
// schedule the next rollover for approx MAX_AGE later
@@ -279,9 +280,9 @@ class AlertIndices(
279280
deleteOldHistoryIndices()
280281
}
281282

282-
private fun rolloverHistoryIndex(): Boolean {
283+
private fun rolloverHistoryIndex() {
283284
if (!historyIndexInitialized) {
284-
return false
285+
return
285286
}
286287

287288
// We have to pass null for newIndexName in order to get Elastic to increment the index count.
@@ -291,17 +292,24 @@ class AlertIndices(
291292
.settings(Settings.builder().put("index.hidden", true).build())
292293
request.addMaxIndexDocsCondition(historyMaxDocs)
293294
request.addMaxIndexAgeCondition(historyMaxAge)
294-
val response = client.admin().indices().rolloverIndex(request).actionGet(requestTimeout)
295-
if (!response.isRolledOver) {
296-
logger.info("$HISTORY_WRITE_INDEX not rolled over. Conditions were: ${response.conditionStatus}")
297-
} else {
298-
lastRolloverTime = TimeValue.timeValueMillis(threadPool.absoluteTimeInMillis())
299-
}
300-
return response.isRolledOver
295+
client.admin().indices().rolloverIndex(
296+
request,
297+
object : ActionListener<RolloverResponse> {
298+
override fun onResponse(response: RolloverResponse) {
299+
if (!response.isRolledOver) {
300+
logger.info("$HISTORY_WRITE_INDEX not rolled over. Conditions were: ${response.conditionStatus}")
301+
} else {
302+
lastRolloverTime = TimeValue.timeValueMillis(threadPool.absoluteTimeInMillis())
303+
}
304+
}
305+
override fun onFailure(e: Exception) {
306+
logger.error("$HISTORY_WRITE_INDEX not roll over failed.")
307+
}
308+
}
309+
)
301310
}
302311

303312
private fun deleteOldHistoryIndices() {
304-
val indicesToDelete = mutableListOf<String>()
305313

306314
val clusterStateRequest = ClusterStateRequest()
307315
.clear()
@@ -310,8 +318,27 @@ class AlertIndices(
310318
.local(true)
311319
.indicesOptions(IndicesOptions.strictExpand())
312320

313-
val clusterStateResponse = client.admin().cluster().state(clusterStateRequest).actionGet()
321+
client.admin().cluster().state(
322+
clusterStateRequest,
323+
object : ActionListener<ClusterStateResponse> {
324+
override fun onResponse(clusterStateResponse: ClusterStateResponse) {
325+
if (!clusterStateResponse.state.metadata.indices.isEmpty) {
326+
val indicesToDelete = getIndicesToDelete(clusterStateResponse)
327+
logger.info("Deleting old history indices viz $indicesToDelete")
328+
deleteAllOldHistoryIndices(indicesToDelete)
329+
} else {
330+
logger.info("No Old History Indices to delete")
331+
}
332+
}
333+
override fun onFailure(e: Exception) {
334+
logger.error("Error fetching cluster state")
335+
}
336+
}
337+
)
338+
}
314339

340+
private fun getIndicesToDelete(clusterStateResponse: ClusterStateResponse): List<String> {
341+
val indicesToDelete = mutableListOf<String>()
315342
for (entry in clusterStateResponse.state.metadata.indices) {
316343
val indexMetaData = entry.value
317344
val creationTime = indexMetaData.creationDate
@@ -331,26 +358,48 @@ class AlertIndices(
331358
indicesToDelete.add(indexMetaData.index.name)
332359
}
333360
}
361+
return indicesToDelete
362+
}
334363

364+
private fun deleteAllOldHistoryIndices(indicesToDelete: List<String>) {
335365
if (indicesToDelete.isNotEmpty()) {
336366
val deleteIndexRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
337-
val deleteIndexResponse = client.admin().indices().delete(deleteIndexRequest).actionGet()
338-
339-
if (!deleteIndexResponse.isAcknowledged) {
340-
logger.error("Could not delete one or more Alerting history indices: $indicesToDelete. Retrying one by one.")
341-
for (index in indicesToDelete) {
342-
try {
343-
val singleDeleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
344-
val singleDeleteResponse = client.admin().indices().delete(singleDeleteRequest).actionGet()
367+
client.admin().indices().delete(
368+
deleteIndexRequest,
369+
object : ActionListener<AcknowledgedResponse> {
370+
override fun onResponse(deleteIndicesResponse: AcknowledgedResponse) {
371+
if (!deleteIndicesResponse.isAcknowledged) {
372+
logger.error("Could not delete one or more Alerting history indices: $indicesToDelete. Retrying one by one.")
373+
deleteOldHistoryIndex(indicesToDelete)
374+
}
375+
}
376+
override fun onFailure(e: Exception) {
377+
logger.error("Delete for Alerting History Indices $indicesToDelete Failed. Retrying one By one.")
378+
deleteOldHistoryIndex(indicesToDelete)
379+
}
380+
}
381+
)
382+
}
383+
}
345384

346-
if (!singleDeleteResponse.isAcknowledged) {
347-
logger.error("Could not delete one or more Alerting history indices: $index")
385+
private fun deleteOldHistoryIndex(indicesToDelete: List<String>) {
386+
for (index in indicesToDelete) {
387+
val singleDeleteRequest = DeleteIndexRequest(*indicesToDelete.toTypedArray())
388+
client.admin().indices().delete(
389+
singleDeleteRequest,
390+
object : ActionListener<AcknowledgedResponse> {
391+
override fun onResponse(acknowledgedResponse: AcknowledgedResponse?) {
392+
if (acknowledgedResponse != null) {
393+
if (!acknowledgedResponse.isAcknowledged) {
394+
logger.error("Could not delete one or more Alerting history indices: $index")
395+
}
348396
}
349-
} catch (e: IndexNotFoundException) {
350-
logger.debug("$index was already deleted. ${e.message}")
397+
}
398+
override fun onFailure(e: Exception) {
399+
logger.debug("Exception ${e.message} while deleting the index $index")
351400
}
352401
}
353-
}
402+
)
354403
}
355404
}
356405
}

0 commit comments

Comments
 (0)