Skip to content

Commit 32ef22c

Browse files
committed
Add Builder in ThreadPoolStats and refactor usage
Signed-off-by: Jean Kim <bgshhd95@gmail.com>
1 parent 3ffa49a commit 32ef22c

6 files changed

Lines changed: 230 additions & 43 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2222
- Onboarding new maven snapshots publishing to s3 ([#19619](https://github.com/opensearch-project/OpenSearch/pull/19619))
2323
- Remove MultiCollectorWrapper and use MultiCollector in Lucene instead ([#19595](https://github.com/opensearch-project/OpenSearch/pull/19595))
2424
- Change implementation for `percentiles` aggregation for latency improvement ([#19648](https://github.com/opensearch-project/OpenSearch/pull/19648))
25+
- Refactor the ThreadPoolStats.Stats class to use the Builder pattern instead of constructors ([#19306](https://github.com/opensearch-project/OpenSearch/pull/19306))
2526

2627
### Fixed
2728
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
@@ -42,6 +43,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4243
- Bump `stefanzweifel/git-auto-commit-action` from 6 to 7 ([#19689](https://github.com/opensearch-project/OpenSearch/pull/19689))
4344

4445
### Deprecated
46+
- Deprecated existing constructors in ThreadPoolStats.Stats in favor of the new Builder ([#19306](https://github.com/opensearch-project/OpenSearch/pull/19306))
4547

4648
### Removed
4749

server/src/main/java/org/opensearch/threadpool/ThreadPool.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,18 @@ public ThreadPoolStats stats() {
567567
rejected = ((XRejectedExecutionHandler) rejectedExecutionHandler).rejected();
568568
}
569569
}
570-
stats.add(new ThreadPoolStats.Stats(name, threads, queue, active, rejected, largest, completed, waitTimeNanos, parallelism));
570+
stats.add(
571+
new ThreadPoolStats.Stats.Builder().name(name)
572+
.threads(threads)
573+
.queue(queue)
574+
.active(active)
575+
.rejected(rejected)
576+
.largest(largest)
577+
.completed(completed)
578+
.waitTimeNanos(waitTimeNanos)
579+
.parallelism(parallelism)
580+
.build()
581+
);
571582
}
572583
return new ThreadPoolStats(stats);
573584
}
@@ -606,14 +617,14 @@ public ExecutorService executor(String name) {
606617
/**
607618
* Schedules a one-shot command to run after a given delay. The command is run in the context of the calling thread.
608619
*
609-
* @param command the command to run
610-
* @param delay delay before the task executes
620+
* @param command the command to run
621+
* @param delay delay before the task executes
611622
* @param executor the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes
612-
* the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the
613-
* command completes.
623+
* the meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the
624+
* command completes.
614625
* @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if
615-
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
616-
* the ScheduledFuture will cannot interact with it.
626+
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
627+
* the ScheduledFuture will cannot interact with it.
617628
* @throws OpenSearchRejectedExecutionException if the task cannot be scheduled for execution
618629
*/
619630
@Override

server/src/main/java/org/opensearch/threadpool/ThreadPoolStats.java

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,28 @@ public static class Stats implements Writeable, ToXContentFragment, Comparable<S
7373
private final long waitTimeNanos;
7474
private final int parallelism;
7575

76+
/**
77+
* Private constructor that takes a builder.
78+
* This is the sole entry point for creating a new Stats object.
79+
* @param builder The builder instance containing all the values.
80+
*/
81+
private Stats(Builder builder) {
82+
this.name = builder.name;
83+
this.threads = builder.threads;
84+
this.queue = builder.queue;
85+
this.active = builder.active;
86+
this.rejected = builder.rejected;
87+
this.largest = builder.largest;
88+
this.completed = builder.completed;
89+
this.waitTimeNanos = builder.waitTimeNanos;
90+
this.parallelism = builder.parallelism;
91+
}
92+
93+
/**
94+
* This constructor will be deprecated starting in version 3.3.0.
95+
* Use {@link Builder} instead.
96+
*/
97+
@Deprecated
7698
public Stats(String name, int threads, int queue, int active, long rejected, int largest, long completed, long waitTimeNanos) {
7799
this.name = name;
78100
this.threads = threads;
@@ -226,6 +248,77 @@ public int compareTo(Stats other) {
226248
return compare;
227249
}
228250
}
251+
252+
/**
253+
* Builder for the {@link Stats} class.
254+
* Provides a fluent API for constructing a Stats object.
255+
*/
256+
public static class Builder {
257+
private String name = "";
258+
private int threads = 0;
259+
private int queue = 0;
260+
private int active = 0;
261+
private long rejected = 0;
262+
private int largest = 0;
263+
private long completed = 0;
264+
private long waitTimeNanos = 0;
265+
private int parallelism = 0;
266+
267+
public Builder() {}
268+
269+
public Builder name(String name) {
270+
this.name = name;
271+
return this;
272+
}
273+
274+
public Builder threads(int threads) {
275+
this.threads = threads;
276+
return this;
277+
}
278+
279+
public Builder queue(int queue) {
280+
this.queue = queue;
281+
return this;
282+
}
283+
284+
public Builder active(int active) {
285+
this.active = active;
286+
return this;
287+
}
288+
289+
public Builder rejected(long rejected) {
290+
this.rejected = rejected;
291+
return this;
292+
}
293+
294+
public Builder largest(int largest) {
295+
this.largest = largest;
296+
return this;
297+
}
298+
299+
public Builder completed(long completed) {
300+
this.completed = completed;
301+
return this;
302+
}
303+
304+
public Builder waitTimeNanos(long waitTimeNanos) {
305+
this.waitTimeNanos = waitTimeNanos;
306+
return this;
307+
}
308+
309+
public Builder parallelism(int parallelism) {
310+
this.parallelism = parallelism;
311+
return this;
312+
}
313+
314+
/**
315+
* Creates a {@link Stats} object from the builder's current state.
316+
* @return A new Stats instance.
317+
*/
318+
public Stats build() {
319+
return new Stats(this);
320+
}
321+
}
229322
}
230323

231324
private List<Stats> stats;

server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -734,17 +734,16 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) throws IOExcep
734734
List<ThreadPoolStats.Stats> threadPoolStatsList = new ArrayList<>();
735735
for (int i = 0; i < numThreadPoolStats; i++) {
736736
threadPoolStatsList.add(
737-
new ThreadPoolStats.Stats(
738-
randomAlphaOfLengthBetween(3, 10),
739-
randomIntBetween(1, 1000),
740-
randomIntBetween(1, 1000),
741-
randomIntBetween(1, 1000),
742-
randomNonNegativeLong(),
743-
randomIntBetween(1, 1000),
744-
randomIntBetween(1, 1000),
745-
randomIntBetween(-1, 10),
746-
-1 // Non-ForkJoinPool: use -1
747-
)
737+
new ThreadPoolStats.Stats.Builder().name(randomAlphaOfLengthBetween(3, 10))
738+
.threads(randomIntBetween(1, 1000))
739+
.queue(randomIntBetween(1, 1000))
740+
.active(randomIntBetween(1, 1000))
741+
.rejected(randomNonNegativeLong())
742+
.largest(randomIntBetween(1, 1000))
743+
.completed(randomIntBetween(1, 1000))
744+
.waitTimeNanos(randomIntBetween(-1, 10))
745+
.parallelism(-1) // Non-ForkJoinPool: use -1
746+
.build()
748747
);
749748
}
750749
threadPoolStats = new ThreadPoolStats(threadPoolStatsList);

server/src/test/java/org/opensearch/index/autoforcemerge/AutoForceMergeManagerTests.java

Lines changed: 71 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,18 @@ public void testNodeValidatorWithHealthyResources() {
240240
when(cpu.getPercent()).thenReturn((short) 50);
241241
when(jvm.getHeapUsedPercent()).thenReturn((short) 60);
242242
ThreadPoolStats stats = new ThreadPoolStats(
243-
Arrays.asList(new ThreadPoolStats.Stats(
244-
ThreadPool.Names.FORCE_MERGE, 1, 0, 0, 0, 1, 0, 0, -1
245-
))
243+
Arrays.asList(
244+
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
245+
.threads(1)
246+
.queue(0)
247+
.active(0)
248+
.rejected(0)
249+
.largest(1)
250+
.completed(0)
251+
.waitTimeNanos(0)
252+
.parallelism(-1)
253+
.build()
254+
)
246255
);
247256
when(threadPool.stats()).thenReturn(stats);
248257

@@ -256,9 +265,17 @@ public void testNodeValidatorWithFeatureSwitch() {
256265
when(cpu.getPercent()).thenReturn((short) 50);
257266
when(jvm.getHeapUsedPercent()).thenReturn((short) 60);
258267
ThreadPoolStats stats = new ThreadPoolStats(
259-
Arrays.asList(new ThreadPoolStats.Stats(
260-
ThreadPool.Names.FORCE_MERGE, 1, 0, 0, 0, 1, 0, 0, -1
261-
))
268+
Arrays.asList(new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
269+
.threads(1)
270+
.queue(0)
271+
.active(0)
272+
.rejected(0)
273+
.largest(1)
274+
.completed(0)
275+
.waitTimeNanos(0)
276+
.parallelism(-1)
277+
.build()
278+
)
262279
);
263280
when(threadPool.stats()).thenReturn(stats);
264281
Settings settings = getConfiguredClusterSettings(false, false, Collections.emptyMap());
@@ -335,9 +352,18 @@ public void testNodeValidatorWithInsufficientForceMergeThreads() {
335352
when(cpu.getPercent()).thenReturn((short) 50);
336353
when(jvm.getHeapUsedPercent()).thenReturn((short) 50);
337354
ThreadPoolStats stats = new ThreadPoolStats(
338-
Arrays.asList(new ThreadPoolStats.Stats(
339-
ThreadPool.Names.FORCE_MERGE, 1, 1, 1, 0, 1, 0, -1, -1
340-
))
355+
Arrays.asList(
356+
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
357+
.threads(1)
358+
.queue(1)
359+
.active(1)
360+
.rejected(0)
361+
.largest(1)
362+
.completed(0)
363+
.waitTimeNanos(-1)
364+
.parallelism(-1)
365+
.build()
366+
)
341367
);
342368
when(threadPool.stats()).thenReturn(stats);
343369
AutoForceMergeManager autoForceMergeManager = clusterSetupWithNode(getConfiguredClusterSettings(true, true, Collections.emptyMap()), getNodeWithRoles(DATA_NODE_1, Set.of(DiscoveryNodeRole.DATA_ROLE)));
@@ -474,7 +500,18 @@ public void testForceMergeOperationOnDataNodeWithFailingMerges() throws IOExcept
474500
ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads);
475501
when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService);
476502
ThreadPoolStats stats = new ThreadPoolStats(
477-
Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1, -1))
503+
Arrays.asList(
504+
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
505+
.threads(forceMergeThreads)
506+
.queue(0)
507+
.active(0)
508+
.rejected(0)
509+
.largest(forceMergeThreads)
510+
.completed(0)
511+
.waitTimeNanos(-1)
512+
.parallelism(-1)
513+
.build()
514+
)
478515
);
479516
when(threadPool.stats()).thenReturn(stats);
480517

@@ -524,7 +561,18 @@ public void testForceMergeOperationOnDataNodeOfWarmEnabledCluster() throws IOExc
524561
ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads);
525562
when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService);
526563
ThreadPoolStats stats = new ThreadPoolStats(
527-
Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1, -1))
564+
Arrays.asList(
565+
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
566+
.threads(forceMergeThreads)
567+
.queue(0)
568+
.active(0)
569+
.rejected(0)
570+
.largest(forceMergeThreads)
571+
.completed(0)
572+
.waitTimeNanos(-1)
573+
.parallelism(-1)
574+
.build()
575+
)
528576
);
529577
when(threadPool.stats()).thenReturn(stats);
530578
IndexService indexService1 = mock(IndexService.class);
@@ -580,7 +628,18 @@ public void testForceMergeOperationOnDataNodeWithThreadInterruption() throws Int
580628
ExecutorService executorService = Executors.newFixedThreadPool(forceMergeThreads);
581629
when(threadPool.executor(ThreadPool.Names.FORCE_MERGE)).thenReturn(executorService);
582630
ThreadPoolStats stats = new ThreadPoolStats(
583-
Arrays.asList(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, forceMergeThreads, 0, 0, 0, forceMergeThreads, 0, -1, -1))
631+
Arrays.asList(
632+
new ThreadPoolStats.Stats.Builder().name(ThreadPool.Names.FORCE_MERGE)
633+
.threads(forceMergeThreads)
634+
.queue(0)
635+
.active(0)
636+
.rejected(0)
637+
.largest(forceMergeThreads)
638+
.completed(0)
639+
.waitTimeNanos(-1)
640+
.parallelism(-1)
641+
.build()
642+
)
584643
);
585644
when(threadPool.stats()).thenReturn(stats);
586645

server/src/test/java/org/opensearch/threadpool/ThreadPoolStatsTests.java

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,20 @@
5454
public class ThreadPoolStatsTests extends OpenSearchTestCase {
5555
public void testThreadPoolStatsSort() throws IOException {
5656
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
57-
stats.add(new ThreadPoolStats.Stats("z", -1, 0, 0, 0, 0, 0L, 0L, -1));
58-
stats.add(new ThreadPoolStats.Stats("m", 3, 0, 0, 0, 0, 0L, 0L, -1));
59-
stats.add(new ThreadPoolStats.Stats("m", 1, 0, 0, 0, 0, 0L, 0L, -1));
60-
stats.add(new ThreadPoolStats.Stats("d", -1, 0, 0, 0, 0, 0L, 0L, -1));
61-
stats.add(new ThreadPoolStats.Stats("m", 2, 0, 0, 0, 0, 0L, 0L, -1));
62-
stats.add(new ThreadPoolStats.Stats("t", -1, 0, 0, 0, 0, 0L, 0L, -1));
63-
stats.add(new ThreadPoolStats.Stats("a", -1, 0, 0, 0, 0, 0L, 0L, -1));
57+
ThreadPoolStats.Stats.Builder defaultStats = new ThreadPoolStats.Stats.Builder().queue(0)
58+
.active(0)
59+
.rejected(0)
60+
.largest(0)
61+
.completed(0L)
62+
.waitTimeNanos(0L)
63+
.parallelism(-1);
64+
stats.add(defaultStats.name("z").threads(-1).build());
65+
stats.add(defaultStats.name("m").threads(3).build());
66+
stats.add(defaultStats.name("m").threads(1).build());
67+
stats.add(defaultStats.name("d").threads(-1).build());
68+
stats.add(defaultStats.name("m").threads(2).build());
69+
stats.add(defaultStats.name("t").threads(-1).build());
70+
stats.add(defaultStats.name("a").threads(-1).build());
6471

6572
List<ThreadPoolStats.Stats> copy = new ArrayList<>(stats);
6673
Collections.sort(copy);
@@ -131,7 +138,16 @@ public void testStatsCompareToWithParallelism() {
131138
}
132139

133140
public void testStatsGetters() {
134-
ThreadPoolStats.Stats stats = new ThreadPoolStats.Stats("test", 1, 2, 3, 4L, 5, 6L, 7L, 8);
141+
ThreadPoolStats.Stats stats = new ThreadPoolStats.Stats.Builder().name("test")
142+
.threads(1)
143+
.queue(2)
144+
.active(3)
145+
.rejected(4L)
146+
.largest(5)
147+
.completed(6L)
148+
.waitTimeNanos(7L)
149+
.parallelism(8)
150+
.build();
135151
assertEquals("test", stats.getName());
136152
assertEquals(1, stats.getThreads());
137153
assertEquals(2, stats.getQueue());
@@ -147,11 +163,18 @@ public void testThreadPoolStatsToXContent() throws IOException {
147163
try (BytesStreamOutput os = new BytesStreamOutput()) {
148164

149165
List<ThreadPoolStats.Stats> stats = new ArrayList<>();
150-
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SEARCH, -1, 0, 0, 0, 0, 0L, 0L, -1));
151-
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.WARMER, -1, 0, 0, 0, 0, 0L, -1L, -1));
152-
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.GENERIC, -1, 0, 0, 0, 0, 0L, -1L, -1));
153-
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.FORCE_MERGE, -1, 0, 0, 0, 0, 0L, -1L, -1));
154-
stats.add(new ThreadPoolStats.Stats(ThreadPool.Names.SAME, -1, 0, 0, 0, 0, 0L, -1L, -1));
166+
ThreadPoolStats.Stats.Builder defaultStats = new ThreadPoolStats.Stats.Builder().threads(-1)
167+
.queue(0)
168+
.active(0)
169+
.rejected(0)
170+
.largest(0)
171+
.completed(0L)
172+
.parallelism(-1);
173+
stats.add(defaultStats.name(ThreadPool.Names.SEARCH).waitTimeNanos(0L).build());
174+
stats.add(defaultStats.name(ThreadPool.Names.WARMER).waitTimeNanos(-1L).build());
175+
stats.add(defaultStats.name(ThreadPool.Names.GENERIC).waitTimeNanos(-1L).build());
176+
stats.add(defaultStats.name(ThreadPool.Names.FORCE_MERGE).waitTimeNanos(-1L).build());
177+
stats.add(defaultStats.name(ThreadPool.Names.SAME).waitTimeNanos(-1L).build());
155178

156179
ThreadPoolStats threadPoolStats = new ThreadPoolStats(stats);
157180
try (XContentBuilder builder = new XContentBuilder(MediaTypeRegistry.JSON.xContent(), os)) {

0 commit comments

Comments
 (0)