Skip to content

Commit 80dd650

Browse files
committed
Address comments
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent aae3e85 commit 80dd650

22 files changed

Lines changed: 654 additions & 546 deletions

File tree

benchmarks/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ dependencies {
5151
// us to invoke the JMH uberjar as usual.
5252
exclude group: 'net.sf.jopt-simple', module: 'jopt-simple'
5353
}
54+
api project(':libs:opensearch-concurrent-queue')
5455
api "org.openjdk.jmh:jmh-core:$versions.jmh"
5556
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh"
5657
// Dependencies of JMH
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.benchmark.queue;
10+
11+
import org.opensearch.common.queue.Lockable;
12+
import org.opensearch.common.queue.LockableConcurrentQueue;
13+
import org.openjdk.jmh.annotations.Benchmark;
14+
import org.openjdk.jmh.annotations.BenchmarkMode;
15+
import org.openjdk.jmh.annotations.Fork;
16+
import org.openjdk.jmh.annotations.Level;
17+
import org.openjdk.jmh.annotations.Measurement;
18+
import org.openjdk.jmh.annotations.Mode;
19+
import org.openjdk.jmh.annotations.OutputTimeUnit;
20+
import org.openjdk.jmh.annotations.Param;
21+
import org.openjdk.jmh.annotations.Scope;
22+
import org.openjdk.jmh.annotations.Setup;
23+
import org.openjdk.jmh.annotations.State;
24+
import org.openjdk.jmh.annotations.Threads;
25+
import org.openjdk.jmh.annotations.Warmup;
26+
import org.openjdk.jmh.infra.Blackhole;
27+
28+
import java.util.LinkedList;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.locks.ReentrantLock;
31+
32+
/**
33+
* JMH benchmark for {@link LockableConcurrentQueue} measuring throughput of
34+
* lock-and-poll / add-and-unlock cycles under varying concurrency levels.
35+
* <p>
36+
* Includes two benchmark groups:
37+
* <ul>
38+
* <li>{@code pollAndReturn} — minimal overhead: poll an entry and immediately return it.</li>
39+
* <li>{@code writerWorkload} — simulates a writer pool: poll an entry, perform simulated
40+
* document writes (CPU work), then return the entry. Models the composite writer
41+
* checkout-write-return cycle.</li>
42+
* </ul>
43+
*/
44+
@Fork(3)
45+
@Warmup(iterations = 5, time = 1)
46+
@Measurement(iterations = 10, time = 1)
47+
@BenchmarkMode(Mode.Throughput)
48+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
49+
@State(Scope.Benchmark)
50+
@SuppressWarnings("unused")
51+
public class LockableConcurrentQueueBenchmark {
52+
53+
@Param({ "1", "4", "8" })
54+
int concurrency;
55+
56+
@Param({ "16", "64" })
57+
int poolSize;
58+
59+
private LockableConcurrentQueue<LockableEntry> queue;
60+
61+
@Setup(Level.Iteration)
62+
public void setup() {
63+
queue = new LockableConcurrentQueue<>(LinkedList::new, concurrency);
64+
for (int i = 0; i < poolSize; i++) {
65+
LockableEntry entry = new LockableEntry();
66+
entry.lock();
67+
queue.addAndUnlock(entry);
68+
}
69+
}
70+
71+
// ---- pollAndReturn: minimal overhead benchmarks ----
72+
73+
@Benchmark
74+
@Threads(1)
75+
public LockableEntry pollAndReturn_1thread() {
76+
return pollAndReturn();
77+
}
78+
79+
@Benchmark
80+
@Threads(4)
81+
public LockableEntry pollAndReturn_4threads() {
82+
return pollAndReturn();
83+
}
84+
85+
@Benchmark
86+
@Threads(8)
87+
public LockableEntry pollAndReturn_8threads() {
88+
return pollAndReturn();
89+
}
90+
91+
private LockableEntry pollAndReturn() {
92+
LockableEntry entry = queue.lockAndPoll();
93+
if (entry != null) {
94+
queue.addAndUnlock(entry);
95+
}
96+
return entry;
97+
}
98+
99+
// ---- writerWorkload: simulated writer pool benchmarks ----
100+
101+
@Benchmark
102+
@Threads(4)
103+
public void writerWorkload_4threads(Blackhole bh) {
104+
writerWorkload(bh);
105+
}
106+
107+
@Benchmark
108+
@Threads(8)
109+
public void writerWorkload_8threads(Blackhole bh) {
110+
writerWorkload(bh);
111+
}
112+
113+
@Benchmark
114+
@Threads(16)
115+
public void writerWorkload_16threads(Blackhole bh) {
116+
writerWorkload(bh);
117+
}
118+
119+
/**
120+
* Simulates a writer pool cycle: checkout an entry, perform CPU work
121+
* representing document indexing across multiple formats, then return it.
122+
*/
123+
private void writerWorkload(Blackhole bh) {
124+
LockableEntry entry = queue.lockAndPoll();
125+
if (entry != null) {
126+
// Simulate document write work (field additions across formats)
127+
bh.consume(simulateDocumentWrite(entry));
128+
queue.addAndUnlock(entry);
129+
}
130+
}
131+
132+
/**
133+
* Simulates the CPU cost of writing a document to multiple data formats.
134+
* Performs arithmetic work to prevent JIT elimination while keeping
135+
* the hold time realistic relative to a real addDoc call.
136+
*/
137+
private static long simulateDocumentWrite(LockableEntry entry) {
138+
long result = entry.hashCode();
139+
// ~10 field additions across 2 formats
140+
for (int i = 0; i < 20; i++) {
141+
result ^= (result << 13);
142+
result ^= (result >> 7);
143+
result ^= (result << 17);
144+
}
145+
return result;
146+
}
147+
148+
static final class LockableEntry implements Lockable {
149+
private final ReentrantLock lock = new ReentrantLock();
150+
151+
@Override
152+
public void lock() {
153+
lock.lock();
154+
}
155+
156+
@Override
157+
public boolean tryLock() {
158+
return lock.tryLock();
159+
}
160+
161+
@Override
162+
public void unlock() {
163+
lock.unlock();
164+
}
165+
}
166+
}

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/ConcurrentQueue.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import java.util.Iterator;
1212
import java.util.Queue;
13+
import java.util.concurrent.ConcurrentHashMap;
1314
import java.util.concurrent.locks.Lock;
1415
import java.util.concurrent.locks.ReentrantLock;
1516
import java.util.function.Predicate;
@@ -32,6 +33,8 @@ public final class ConcurrentQueue<T> {
3233
private final Lock[] locks;
3334
private final Queue<T>[] queues;
3435
private final Supplier<Queue<T>> queueSupplier;
36+
/** Maps each entry to its queue index so that {@link #remove} can go directly to the right queue. */
37+
private final ConcurrentHashMap<T, Integer> queueIndex;
3538

3639
ConcurrentQueue(Supplier<Queue<T>> queueSupplier, int concurrency) {
3740
if (concurrency < MIN_CONCURRENCY || concurrency > MAX_CONCURRENCY) {
@@ -41,6 +44,7 @@ public final class ConcurrentQueue<T> {
4144
}
4245
this.concurrency = concurrency;
4346
this.queueSupplier = queueSupplier;
47+
this.queueIndex = new ConcurrentHashMap<>();
4448
locks = new Lock[concurrency];
4549
@SuppressWarnings({ "rawtypes", "unchecked" })
4650
Queue<T>[] queues = new Queue[concurrency];
@@ -63,6 +67,7 @@ void add(T entry) {
6367
if (lock.tryLock()) {
6468
try {
6569
queue.add(entry);
70+
queueIndex.put(entry, index);
6671
return;
6772
} finally {
6873
lock.unlock();
@@ -75,6 +80,7 @@ void add(T entry) {
7580
lock.lock();
7681
try {
7782
queue.add(entry);
83+
queueIndex.put(entry, index);
7884
} finally {
7985
lock.unlock();
8086
}
@@ -93,6 +99,7 @@ T poll(Predicate<T> predicate) {
9399
T entry = it.next();
94100
if (predicate.test(entry)) {
95101
it.remove();
102+
queueIndex.remove(entry);
96103
return entry;
97104
}
98105
}
@@ -112,6 +119,7 @@ T poll(Predicate<T> predicate) {
112119
T entry = it.next();
113120
if (predicate.test(entry)) {
114121
it.remove();
122+
queueIndex.remove(entry);
115123
return entry;
116124
}
117125
}
@@ -123,12 +131,29 @@ T poll(Predicate<T> predicate) {
123131
}
124132

125133
boolean remove(T entry) {
134+
Integer queueIdx = queueIndex.get(entry);
135+
if (queueIdx != null) {
136+
final Lock lock = locks[queueIdx];
137+
final Queue<T> queue = queues[queueIdx];
138+
lock.lock();
139+
try {
140+
if (queue.remove(entry)) {
141+
queueIndex.remove(entry);
142+
return true;
143+
}
144+
} finally {
145+
lock.unlock();
146+
}
147+
}
148+
// Fallback: entry may have been re-added to a different queue between the index lookup
149+
// and the lock acquisition, so scan all stripes.
126150
for (int i = 0; i < concurrency; ++i) {
127151
final Lock lock = locks[i];
128152
final Queue<T> queue = queues[i];
129153
lock.lock();
130154
try {
131155
if (queue.remove(entry)) {
156+
queueIndex.remove(entry);
132157
return true;
133158
}
134159
} finally {
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.common.queue;
10+
11+
import java.io.Closeable;
12+
import java.io.IOException;
13+
import java.util.ArrayList;
14+
import java.util.Collections;
15+
import java.util.IdentityHashMap;
16+
import java.util.Iterator;
17+
import java.util.List;
18+
import java.util.Objects;
19+
import java.util.Queue;
20+
import java.util.Set;
21+
import java.util.function.Supplier;
22+
23+
/**
24+
* A thread-safe pool of {@link Lockable} items backed by a {@link LockableConcurrentQueue}.
25+
* Items are locked on checkout and unlocked on release, ensuring safe reuse across threads.
26+
* <p>
27+
* The pool is created with a supplier that produces new items on demand when the pool
28+
* is empty. Items are tracked in a set for registration checks and iteration.
29+
*
30+
* @param <T> the pooled item type, must implement {@link Lockable}
31+
*/
32+
public final class LockablePool<T extends Lockable> implements Iterable<T>, Closeable {
33+
34+
private final Set<T> items;
35+
private final LockableConcurrentQueue<T> availableItems;
36+
private final Supplier<T> itemSupplier;
37+
private volatile boolean closed;
38+
39+
/**
40+
* Creates a new pool.
41+
*
42+
* @param itemSupplier factory for creating new items when the pool is empty
43+
* @param queueSupplier supplier for the underlying queue instances
44+
* @param concurrency the concurrency level (number of stripes)
45+
*/
46+
public LockablePool(Supplier<T> itemSupplier, Supplier<Queue<T>> queueSupplier, int concurrency) {
47+
this.items = Collections.newSetFromMap(new IdentityHashMap<>());
48+
this.itemSupplier = Objects.requireNonNull(itemSupplier, "itemSupplier must not be null");
49+
this.availableItems = new LockableConcurrentQueue<>(queueSupplier, concurrency);
50+
}
51+
52+
/**
53+
* Locks and polls an item from the pool, or creates a new one if none are available.
54+
*
55+
* @return a locked item
56+
* @throws IllegalStateException if the pool is closed
57+
*/
58+
public T getAndLock() {
59+
ensureOpen();
60+
T item = availableItems.lockAndPoll();
61+
return Objects.requireNonNullElseGet(item, this::fetchItem);
62+
}
63+
64+
private synchronized T fetchItem() {
65+
ensureOpen();
66+
T item = itemSupplier.get();
67+
item.lock();
68+
items.add(item);
69+
return item;
70+
}
71+
72+
/**
73+
* Releases the given item back to this pool for reuse.
74+
*
75+
* @param item the item to release
76+
*/
77+
public void releaseAndUnlock(T item) {
78+
assert isRegistered(item) : "Pool doesn't know about this item";
79+
availableItems.addAndUnlock(item);
80+
}
81+
82+
/**
83+
* Lock and checkout all items from the pool.
84+
*
85+
* @return unmodifiable list of all items locked by current thread
86+
* @throws IllegalStateException if the pool is closed
87+
*/
88+
public List<T> checkoutAll() {
89+
ensureOpen();
90+
List<T> lockedItems = new ArrayList<>();
91+
List<T> checkedOutItems = new ArrayList<>();
92+
for (T item : this) {
93+
item.lock();
94+
lockedItems.add(item);
95+
}
96+
synchronized (this) {
97+
for (T item : lockedItems) {
98+
try {
99+
if (isRegistered(item) && items.remove(item)) {
100+
availableItems.remove(item);
101+
checkedOutItems.add(item);
102+
}
103+
} finally {
104+
item.unlock();
105+
}
106+
}
107+
}
108+
return Collections.unmodifiableList(checkedOutItems);
109+
}
110+
111+
/**
112+
* Check if an item is part of this pool.
113+
*
114+
* @param item item to validate
115+
* @return true if the item is part of this pool
116+
*/
117+
public synchronized boolean isRegistered(T item) {
118+
return items.contains(item);
119+
}
120+
121+
private void ensureOpen() {
122+
if (closed) {
123+
throw new IllegalStateException("LockablePool is already closed");
124+
}
125+
}
126+
127+
@Override
128+
public synchronized Iterator<T> iterator() {
129+
return List.copyOf(items).iterator();
130+
}
131+
132+
@Override
133+
public void close() throws IOException {
134+
this.closed = true;
135+
}
136+
}

0 commit comments

Comments
 (0)