Skip to content

Commit 4868dfc

Browse files
author
Ayushi Arya
committed
Add current_application_duration_ms to cluster state download stats in node stats API; add UT and IT for test coverage
Signed-off-by: Ayushi Arya <ayuaryak@amazon.com>
1 parent 9c29462 commit 4868dfc

6 files changed

Lines changed: 154 additions & 3 deletions

File tree

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,51 @@ public void testRemotePublicationDownloadStats() {
277277
assertDataNodeDownloadStats(nodesStatsResponseDataNode.getNodes().get(0));
278278
}
279279

280+
public void testCurrentApplicationDurationInDownloadStats() throws Exception {
281+
int shardCount = 1;
282+
int replicaCount = 1;
283+
int dataNodeCount = shardCount * (replicaCount + 1);
284+
prepareCluster(1, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
285+
ensureStableCluster(1 + dataNodeCount);
286+
ensureGreen(INDEX_NAME);
287+
288+
String dataNode = internalCluster().getDataNodeNames().stream().collect(Collectors.toList()).get(0);
289+
290+
// Use assertBusy to handle the case where a background cluster state application
291+
// may still be in progress at the moment stats are fetched
292+
assertBusy(() -> {
293+
NodesStatsResponse nodesStatsResponse = client().admin()
294+
.cluster()
295+
.prepareNodesStats(dataNode)
296+
.addMetric(DISCOVERY.metricName())
297+
.get();
298+
299+
NodeStats nodeStats = nodesStatsResponse.getNodes().get(0);
300+
List<PersistedStateStats> persistenceStats = nodeStats.getDiscoveryStats()
301+
.getClusterStateStats()
302+
.getPersistenceStats();
303+
304+
boolean foundMatchingStats = false;
305+
for (PersistedStateStats stats : persistenceStats) {
306+
if (FULL_DOWNLOAD_STATS.equals(stats.getStatsName())
307+
|| DIFF_DOWNLOAD_STATS.equals(stats.getStatsName())) {
308+
foundMatchingStats = true;
309+
assertTrue(
310+
"Expected extended field current_application_duration_ms in " + stats.getStatsName(),
311+
stats.getExtendedFields().containsKey("current_application_duration_ms")
312+
);
313+
// Application is complete, so duration should be 0
314+
assertEquals(
315+
0,
316+
stats.getExtendedFields().get("current_application_duration_ms").get()
317+
);
318+
}
319+
}
320+
assertTrue("Expected at least one FULL_DOWNLOAD_STATS or DIFF_DOWNLOAD_STATS entry in persistence stats", foundMatchingStats);
321+
});
322+
}
323+
324+
280325
public void testRemotePublicationDisabledByRollingRestart() throws Exception {
281326
prepareCluster(3, 2, INDEX_NAME, 1, 2);
282327
ensureStableCluster(5);

server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
import java.util.Random;
106106
import java.util.Set;
107107
import java.util.concurrent.atomic.AtomicBoolean;
108+
import java.util.concurrent.atomic.AtomicLong;
108109
import java.util.function.BiConsumer;
109110
import java.util.function.Consumer;
110111
import java.util.function.Supplier;
@@ -953,9 +954,15 @@ public DiscoveryStats stats() {
953954
}
954955
});
955956
if (remoteClusterStateService != null) {
956-
stats.add(remoteClusterStateService.getFullDownloadStats());
957-
stats.add(remoteClusterStateService.getDiffDownloadStats());
957+
long durationMs = clusterApplier.getCurrentApplicationDurationMs();
958+
PersistedStateStats fullDownloadStats = remoteClusterStateService.getFullDownloadStats();
959+
PersistedStateStats diffDownloadStats = remoteClusterStateService.getDiffDownloadStats();
960+
fullDownloadStats.addToExtendedFields("current_application_duration_ms", new AtomicLong(durationMs));
961+
diffDownloadStats.addToExtendedFields("current_application_duration_ms", new AtomicLong(durationMs));
962+
stats.add(fullDownloadStats);
963+
stats.add(diffDownloadStats);
958964
}
965+
959966
clusterStateStats.setPersistenceStats(stats);
960967
return new DiscoveryStats(new PendingClusterStateStats(0, 0, 0), publicationHandler.stats(), clusterStateStats);
961968
}

server/src/main/java/org/opensearch/cluster/service/ClusterApplier.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ public interface ClusterApplier {
6464
*/
6565
void onNewClusterState(String source, Supplier<ClusterState> clusterStateSupplier, ClusterApplyListener listener);
6666

67+
/**
68+
* Returns the duration in milliseconds of the currently running cluster state application,
69+
* or 0 if no application is in progress
70+
*/
71+
default long getCurrentApplicationDurationMs(){
72+
return 0;
73+
}
74+
6775
/**
6876
* Listener for results of cluster state application
6977
*

server/src/main/java/org/opensearch/cluster/service/ClusterApplierService.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements
129129

130130
private final ClusterManagerMetrics clusterManagerMetrics;
131131

132+
// System.nanoTime() reading taken when the in-progress cluster state application started; 0 while no application is running
133+
private volatile long applicationStartTimeNanos;
134+
135+
136+
132137
public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
133138
this(nodeName, settings, clusterSettings, threadPool, new ClusterManagerMetrics(NoopMetricsRegistry.INSTANCE));
134139
}
@@ -468,7 +473,7 @@ private void runTask(UpdateTask task) {
468473
logger.debug("processing [{}]: ignoring, cluster applier service not started", task.source);
469474
return;
470475
}
471-
476+
this.applicationStartTimeNanos = System.nanoTime();
472477
logger.debug("processing [{}]: execute", task.source);
473478
final ClusterState previousClusterState = state.get();
474479

@@ -492,6 +497,7 @@ private void runTask(UpdateTask task) {
492497
e
493498
);
494499
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
500+
this.applicationStartTimeNanos = 0;
495501
task.listener.onFailure(task.source, e);
496502
return;
497503
}
@@ -500,6 +506,7 @@ private void runTask(UpdateTask task) {
500506
TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, currentTimeInMillis() - startTimeMS));
501507
logger.debug("processing [{}]: took [{}] no change in cluster state", task.source, executionTime);
502508
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
509+
this.applicationStartTimeNanos = 0;
503510
task.listener.onSuccess(task.source);
504511
} else {
505512
if (logger.isTraceEnabled()) {
@@ -524,6 +531,7 @@ private void runTask(UpdateTask task) {
524531
newClusterState.stateUUID()
525532
);
526533
warnAboutSlowTaskIfNeeded(executionTime, task.source, stopWatch);
534+
this.applicationStartTimeNanos = 0;
527535
// Then we call the ClusterApplyListener of the task
528536
task.listener.onSuccess(task.source);
529537
} catch (Exception e) {
@@ -555,6 +563,7 @@ private void runTask(UpdateTask task) {
555563
// failing to apply a cluster state with an exception indicates a bug in validation or in one of the appliers; if we
556564
// continue we will retry with the same cluster state but that might not help.
557565
assert applicationMayFail();
566+
this.applicationStartTimeNanos = 0;
558567
task.listener.onFailure(task.source, e);
559568
}
560569
}
@@ -785,6 +794,20 @@ protected boolean applicationMayFail() {
785794
return false;
786795
}
787796

797+
/**
798+
* Returns the duration in milliseconds of the currently running cluster state application,
799+
* or 0 if no application is in progress.
800+
*/
801+
@Override
802+
public long getCurrentApplicationDurationMs() {
803+
long startNanos = this.applicationStartTimeNanos;
804+
if (startNanos == 0) {
805+
return 0;
806+
}
807+
return TimeValue.nsecToMSec(System.nanoTime() - startNanos);
808+
}
809+
810+
788811
/**
789812
* Pre-commit State of the cluster-applier
790813
* @return ClusterState

server/src/test/java/org/opensearch/cluster/coordination/PersistedStateStatsTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@
1313

1414
import java.util.concurrent.atomic.AtomicLong;
1515

16+
import org.opensearch.common.io.stream.BytesStreamOutput;
17+
import org.opensearch.core.common.io.stream.StreamInput;
18+
import java.io.IOException;
19+
20+
1621
public class PersistedStateStatsTests extends OpenSearchTestCase {
1722
private PersistedStateStats persistedStateStats;
1823

@@ -59,4 +64,26 @@ public void testAddMultipleFields() {
5964
assertEquals(42, persistedStateStats.getExtendedFields().get(fieldName1).get());
6065
assertEquals(84, persistedStateStats.getExtendedFields().get(fieldName2).get());
6166
}
67+
//serialization with extendedFields
68+
public void testSerializationRoundTripWithExtendedFields() throws IOException {
69+
PersistedStateStats original = new PersistedStateStats("test_download");
70+
original.stateSucceeded();
71+
original.stateTook(100);
72+
original.addToExtendedFields("current_application_duration_ms", new AtomicLong(5000));
73+
74+
// Serialize
75+
BytesStreamOutput out = new BytesStreamOutput();
76+
original.writeTo(out);
77+
78+
// Deserialize
79+
StreamInput in = out.bytes().streamInput();
80+
PersistedStateStats deserialized = new PersistedStateStats(in);
81+
82+
assertEquals("test_download", deserialized.getStatsName());
83+
assertEquals(1, deserialized.getSuccessCount());
84+
assertEquals(100, deserialized.getTotalTimeInMillis());
85+
assertTrue(deserialized.getExtendedFields().containsKey("current_application_duration_ms"));
86+
assertEquals(5000, deserialized.getExtendedFields().get("current_application_duration_ms").get());
87+
}
88+
6289
}

server/src/test/java/org/opensearch/cluster/service/ClusterApplierServiceTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -677,6 +677,47 @@ public void onFailure(String source, Exception e) {
677677
verifyNoInteractions(listenerslatencyHistogram);
678678
}
679679

680+
//testing for application duration tracking
681+
682+
public void testGetCurrentApplicationDurationMsReturnsZeroWhenIdle() {
683+
// No task running → should return 0
684+
assertEquals(0, clusterApplierService.getCurrentApplicationDurationMs());
685+
}
686+
687+
public void testGetCurrentApplicationDurationMsDuringApplication() throws Exception {
688+
689+
CountDownLatch taskStarted = new CountDownLatch(1);
690+
CountDownLatch taskCanFinish = new CountDownLatch(1);
691+
692+
clusterApplierService.onNewClusterState("test", () -> {
693+
taskStarted.countDown();
694+
try { taskCanFinish.await(); } catch (InterruptedException e) {}
695+
return ClusterState.builder(clusterApplierService.state()).build();
696+
}, new ClusterApplier.ClusterApplyListener() {
697+
@Override public void onSuccess(String source) {}
698+
@Override public void onFailure(String source, Exception e) {}
699+
});
700+
701+
taskStarted.await(); // Wait for task to start
702+
long duration = clusterApplierService.getCurrentApplicationDurationMs();
703+
assertTrue("Duration should be >= 0 during application, got: " + duration, duration >= 0);
704+
taskCanFinish.countDown(); // Let task finish
705+
}
706+
//after apllication completion returns 0
707+
public void testGetCurrentApplicationDurationMsResetsAfterCompletion() throws Exception {
708+
CountDownLatch taskDone = new CountDownLatch(1);
709+
710+
clusterApplierService.onNewClusterState("test",
711+
() -> ClusterState.builder(clusterApplierService.state()).build(),
712+
new ClusterApplier.ClusterApplyListener() {
713+
@Override public void onSuccess(String source) { taskDone.countDown(); }
714+
@Override public void onFailure(String source, Exception e) { taskDone.countDown(); }
715+
});
716+
717+
taskDone.await();
718+
assertEquals(0, clusterApplierService.getCurrentApplicationDurationMs());
719+
}
720+
680721
static class TimedClusterApplierService extends ClusterApplierService {
681722

682723
final ClusterSettings clusterSettings;

0 commit comments

Comments
 (0)