Commit 2404098

Simplify locks and use IdentityHashMap

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Co-authored-by: Arpit Bandejiya <abandeji@amazon.com>

1 parent c0c040f, commit 2404098

5 files changed
Lines changed: 210 additions & 40 deletions

…/org/opensearch/common/queue/LockablePoolBenchmark.java (new file)

Lines changed: 159 additions & 0 deletions
@@ -0,0 +1,159 @@
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.common.queue;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * JMH benchmark for {@link LockablePool} measuring:
 * <ul>
 * <li>Isolated checkout/return throughput at varying thread counts</li>
 * <li>Mixed workload: concurrent writers + periodic checkoutAll (refresh)</li>
 * </ul>
 * The mixed group benchmark is the most realistic: it models the composite
 * engine's write path where indexing threads hold writers while a refresh
 * thread periodically drains the pool via checkoutAll.
 */
@Fork(2)
@Warmup(iterations = 2, time = 3)
@Measurement(iterations = 3, time = 5)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Benchmark)
@SuppressWarnings("unused")
public class LockablePoolBenchmark {

    @Param({ "4", "8" })
    int concurrency;

    private LockablePool<PoolEntry> pool;

    @Setup(Level.Iteration)
    public void setup() {
        AtomicInteger counter = new AtomicInteger(0);
        pool = new LockablePool<>(() -> new PoolEntry(counter.getAndIncrement()), LinkedList::new, concurrency);
        // Pre-warm the pool
        for (int i = 0; i < concurrency * 2; i++) {
            PoolEntry e = pool.getAndLock();
            pool.releaseAndUnlock(e);
        }
    }

    // ---- Mixed workload: writers + periodic refresh (checkoutAll) ----
    // This is the realistic scenario: multiple indexing threads hold writers
    // while a single refresh thread periodically drains the pool.

    @Benchmark
    @Group("mixed_7w_1r")
    @GroupThreads(7)
    public void writers_7w1r(Blackhole bh) {
        PoolEntry e = pool.getAndLock();
        bh.consume(simulateWork(e));
        pool.releaseAndUnlock(e);
    }

    @Benchmark
    @Group("mixed_7w_1r")
    @GroupThreads(1)
    public List<PoolEntry> refresh_7w1r() throws InterruptedException {
        Thread.sleep(1000); // simulate 1s refresh interval
        return pool.checkoutAll();
    }

    @Benchmark
    @Group("mixed_3w_1r")
    @GroupThreads(3)
    public void writers_3w1r(Blackhole bh) {
        PoolEntry e = pool.getAndLock();
        bh.consume(simulateWork(e));
        pool.releaseAndUnlock(e);
    }

    @Benchmark
    @Group("mixed_3w_1r")
    @GroupThreads(1)
    public List<PoolEntry> refresh_3w1r() throws InterruptedException {
        Thread.sleep(1000); // simulate 1s refresh interval
        return pool.checkoutAll();
    }

    // ---- Isolated: pure writer throughput (no refresh contention) ----

    @Benchmark
    @Group("writers_only_4t")
    @GroupThreads(4)
    public void writersOnly_4t(Blackhole bh) {
        PoolEntry e = pool.getAndLock();
        bh.consume(simulateWork(e));
        pool.releaseAndUnlock(e);
    }

    @Benchmark
    @Group("writers_only_8t")
    @GroupThreads(8)
    public void writersOnly_8t(Blackhole bh) {
        PoolEntry e = pool.getAndLock();
        bh.consume(simulateWork(e));
        pool.releaseAndUnlock(e);
    }

    private static long simulateWork(PoolEntry entry) {
        long result = entry.hashCode();
        for (int i = 0; i < 20; i++) {
            result ^= (result << 13);
            result ^= (result >> 7);
            result ^= (result << 17);
        }
        return result;
    }

    static final class PoolEntry implements Lockable {
        final int id;
        private final ReentrantLock lock = new ReentrantLock();

        PoolEntry(int id) {
            this.id = id;
        }

        @Override
        public void lock() {
            lock.lock();
        }

        @Override
        public boolean tryLock() {
            return lock.tryLock();
        }

        @Override
        public void unlock() {
            lock.unlock();
        }
    }
}
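A note on how the grouped benchmarks execute: in JMH, methods that share a @Group name run concurrently inside one thread group, and @GroupThreads fixes how many of the group's threads each method gets, so mixed_7w_1r pits seven writer threads against one refresh thread that sleeps for a second and then calls checkoutAll(). The launcher below is a hypothetical sketch, not part of the commit; it uses the standard JMH Runner API and assumes JMH and the benchmark class are on the classpath (the repository's benchmarks module may already expose an equivalent Gradle task).

import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import org.opensearch.common.queue.LockablePoolBenchmark;

// Hypothetical launcher (not part of the commit): runs every group declared in the benchmark class.
public class LockablePoolBenchmarkRunner {
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
            .include(LockablePoolBenchmark.class.getSimpleName()) // match benchmarks by class name
            .build();
        new Runner(options).run();
    }
}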

libs/concurrent-queue/src/main/java/org/opensearch/common/queue/LockablePool.java

Lines changed: 32 additions & 21 deletions
@@ -31,9 +31,11 @@
  */
 public final class LockablePool<T extends Lockable> implements Iterable<T>, Closeable {

-    private final Set<T> items;
-    private final LockableConcurrentQueue<T> availableItems;
+    private volatile Set<T> items;
+    private volatile LockableConcurrentQueue<T> availableItems;
     private final Supplier<T> itemSupplier;
+    private final Supplier<Queue<T>> queueSupplier;
+    private final int concurrency;
     private volatile boolean closed;

     /**
@@ -46,6 +48,8 @@ public final class LockablePool<T extends Lockable> implements Iterable<T>, Closeable {
     public LockablePool(Supplier<T> itemSupplier, Supplier<Queue<T>> queueSupplier, int concurrency) {
         this.items = Collections.newSetFromMap(new IdentityHashMap<>());
         this.itemSupplier = Objects.requireNonNull(itemSupplier, "itemSupplier must not be null");
+        this.queueSupplier = Objects.requireNonNull(queueSupplier, "queueSupplier must not be null");
+        this.concurrency = concurrency;
         this.availableItems = new LockableConcurrentQueue<>(queueSupplier, concurrency);
     }

@@ -70,39 +74,46 @@ private synchronized T fetchItem() {
     }

     /**
-     * Releases the given item back to this pool for reuse.
+     * Releases the given item back to this pool for reuse. If the item belongs to a previous
+     * generation (swapped out during {@link #checkoutAll()}), it is silently unlocked and
+     * discarded since the checkout caller owns it.
      *
      * @param item the item to release
      */
     public void releaseAndUnlock(T item) {
-        assert isRegistered(item) : "Pool doesn't know about this item";
+        if (isRegistered(item) == false) {
+            item.unlock();
+            return;
+        }
         availableItems.addAndUnlock(item);
     }

     /**
-     * Lock and checkout all items from the pool.
+     * Atomically swaps the pool's item set and queue with fresh instances, then waits for
+     * any in-flight operations on the old items to complete. This minimizes the time the pool
+     * lock is held: callers of {@link #getAndLock()} see the new empty pool immediately and
+     * can create fresh items without waiting for the checkout to finish.
      *
-     * @return unmodifiable list of all items locked by current thread
+     * @return unmodifiable list of all checked-out items
      * @throws IllegalStateException if the pool is closed
      */
     public List<T> checkoutAll() {
         ensureOpen();
-        List<T> lockedItems = new ArrayList<>();
-        List<T> checkedOutItems = new ArrayList<>();
-        for (T item : this) {
-            item.lock();
-            lockedItems.add(item);
-        }
+
+        Set<T> oldItems;
         synchronized (this) {
-            for (T item : lockedItems) {
-                try {
-                    if (isRegistered(item) && items.remove(item)) {
-                        availableItems.remove(item);
-                        checkedOutItems.add(item);
-                    }
-                } finally {
-                    item.unlock();
-                }
+            oldItems = this.items;
+            this.items = Collections.newSetFromMap(new IdentityHashMap<>());
+            this.availableItems = new LockableConcurrentQueue<>(queueSupplier, concurrency);
+        }
+
+        List<T> checkedOutItems = new ArrayList<>(oldItems.size());
+        for (T item : oldItems) {
+            item.lock();
+            try {
+                checkedOutItems.add(item);
+            } finally {
+                item.unlock();
             }
         }
         return Collections.unmodifiableList(checkedOutItems);
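The rewritten checkoutAll() changes the ownership handshake between writers and refresh: the pool swaps in a fresh generation under its monitor, then briefly locks each old item only to wait out any in-flight user, while releaseAndUnlock() simply unlocks and discards items that belong to a stale generation. The sketch below is hypothetical caller-side code, not part of this commit; it assumes it lives in the org.opensearch.common.queue package so the Lockable and LockablePool types resolve as shown.

package org.opensearch.common.queue;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical caller-side sketch (not part of this commit): one indexing path and one
// refresh path over the generation-swapping checkoutAll().
public class LockablePoolUsageSketch {

    // Minimal Lockable item standing in for a per-thread writer.
    static final class PooledWriter implements Lockable {
        private final ReentrantLock lock = new ReentrantLock();

        @Override
        public void lock() {
            lock.lock();
        }

        @Override
        public boolean tryLock() {
            return lock.tryLock();
        }

        @Override
        public void unlock() {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        LockablePool<PooledWriter> pool = new LockablePool<>(PooledWriter::new, LinkedList::new, 8);

        // Indexing path: check out a writer, use it, hand it back. If a refresh swapped the
        // generation in between, releaseAndUnlock just unlocks and discards the stale item.
        PooledWriter writer = pool.getAndLock();
        try {
            // ... add documents through the writer ...
        } finally {
            pool.releaseAndUnlock(writer);
        }

        // Refresh path: swap in a fresh, empty generation and take ownership of every old
        // item once any in-flight operation on it has completed.
        List<PooledWriter> oldGeneration = pool.checkoutAll();
        for (PooledWriter old : oldGeneration) {
            // ... flush and close the old writer ...
        }
    }
}

Because getAndLock() only ever sees the new, empty generation after the swap, indexing threads can obtain fresh items immediately instead of queueing behind the refresh.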

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeDocumentInput.java

Lines changed: 2 additions & 1 deletion
@@ -13,6 +13,7 @@
 import org.opensearch.index.engine.dataformat.DocumentInput;
 import org.opensearch.index.mapper.MappedFieldType;

+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -47,7 +48,7 @@ public CompositeDocumentInput(
     ) {
         this.primaryFormat = Objects.requireNonNull(primaryFormat, "primaryFormat must not be null");
         this.primaryDocumentInput = Objects.requireNonNull(primaryDocumentInput, "primaryDocumentInput must not be null");
-        this.secondaryDocumentInputs = Map.copyOf(
+        this.secondaryDocumentInputs = Collections.unmodifiableMap(
             Objects.requireNonNull(secondaryDocumentInputs, "secondaryDocumentInputs must not be null")
         );
     }
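The move from Map.copyOf to Collections.unmodifiableMap matters once the incoming map is an IdentityHashMap: Map.copyOf rebuilds the entries into a general-purpose immutable map that looks keys up by equals()/hashCode() (and throws IllegalArgumentException if two keys are equal but not identical), whereas unmodifiableMap merely wraps the backing map and so preserves its reference-identity lookup. A small illustration, using a hypothetical record key rather than DataFormat:

import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Map;

// Illustration only; the Key record is hypothetical and stands in for a map key type.
public class IdentityMapCopyDemo {

    // Two instances are equal by equals()/hashCode() but distinct by reference.
    record Key(String name) {}

    public static void main(String[] args) {
        Key a = new Key("fmt");
        Key b = new Key("fmt"); // a.equals(b) is true, a == b is false

        Map<Key, String> identity = new IdentityHashMap<>();
        identity.put(a, "A");

        Map<Key, String> wrapped = Collections.unmodifiableMap(identity);
        System.out.println(wrapped.get(a)); // A    -> identity-based lookup preserved
        System.out.println(wrapped.get(b)); // null -> different instance, no match

        Map<Key, String> copied = Map.copyOf(identity);
        System.out.println(copied.get(b)); // A -> equals-based lookup, identity semantics lost
    }
}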

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java

Lines changed: 2 additions & 2 deletions
@@ -31,7 +31,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
+import java.util.IdentityHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -275,7 +275,7 @@ public void deleteFiles(Map<String, Collection<String>> filesToDelete) throws IOException {
     @Override
     public CompositeDocumentInput newDocumentInput() {
         DocumentInput<?> primaryInput = primaryEngine.newDocumentInput();
-        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new LinkedHashMap<>();
+        Map<DataFormat, DocumentInput<?>> secondaryInputMap = new IdentityHashMap<>();
         for (IndexingExecutionEngine<?, ?> engine : secondaryEngines) {
             secondaryInputMap.put(engine.getDataFormat(), engine.newDocumentInput());
         }
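One consequence of swapping LinkedHashMap for IdentityHashMap, an observation rather than something stated in the commit: IdentityHashMap compares keys by reference and gives no iteration-order guarantee, so the change assumes DataFormat instances are canonical (one object per format) and that nothing relies on visiting secondary inputs in registration order. A tiny sketch of the ordering difference:

import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Small ordering sketch (illustration only; keys are plain strings rather than DataFormat).
public class MapOrderDemo {
    public static void main(String[] args) {
        Map<String, Integer> linked = new LinkedHashMap<>();
        Map<String, Integer> identity = new IdentityHashMap<>();
        for (String key : new String[] { "text", "vector", "columnar" }) {
            linked.put(key, key.length());
            identity.put(key, key.length());
        }
        System.out.println(linked.keySet());   // insertion order: [text, vector, columnar]
        System.out.println(identity.keySet()); // unspecified order, based on reference hashes
    }
}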

sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeWriter.java

Lines changed: 15 additions & 16 deletions
@@ -21,8 +21,8 @@
 import org.opensearch.index.engine.exec.WriterFileSet;

 import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.LinkedHashMap;
+import java.util.Collections;
+import java.util.IdentityHashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
@@ -44,7 +44,8 @@ class CompositeWriter implements Writer<CompositeDocumentInput>, Lockable {

     private static final Logger logger = LogManager.getLogger(CompositeWriter.class);

-    private final Map.Entry<DataFormat, Writer<DocumentInput<?>>> primaryWriter;
+    private final DataFormat primaryFormat;
+    private final Writer<DocumentInput<?>> primaryWriter;
     private final Map<DataFormat, Writer<DocumentInput<?>>> secondaryWritersByFormat;
     private final ReentrantLock lock;
     private final long writerGeneration;
@@ -87,16 +88,14 @@ enum WriterState {
         this.writerGeneration = writerGeneration;

         IndexingExecutionEngine<?, ?> primaryDelegate = engine.getPrimaryDelegate();
-        this.primaryWriter = new AbstractMap.SimpleImmutableEntry<>(
-            primaryDelegate.getDataFormat(),
-            (Writer<DocumentInput<?>>) primaryDelegate.createWriter(writerGeneration)
-        );
+        this.primaryFormat = primaryDelegate.getDataFormat();
+        this.primaryWriter = (Writer<DocumentInput<?>>) primaryDelegate.createWriter(writerGeneration);

-        Map<DataFormat, Writer<DocumentInput<?>>> secondaries = new LinkedHashMap<>();
+        Map<DataFormat, Writer<DocumentInput<?>>> secondaries = new IdentityHashMap<>();
         for (IndexingExecutionEngine<?, ?> delegate : engine.getSecondaryDelegates()) {
             secondaries.put(delegate.getDataFormat(), (Writer<DocumentInput<?>>) delegate.createWriter(writerGeneration));
         }
-        this.secondaryWritersByFormat = Map.copyOf(secondaries);
+        this.secondaryWritersByFormat = Collections.unmodifiableMap(secondaries);
         this.rowIdGenerator = new RowIdGenerator(CompositeWriter.class.getName());
     }

@@ -106,11 +105,11 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
             throw new IllegalStateException("Cannot add document to writer in state " + state.get());
         }
         // Write to primary first
-        WriteResult primaryResult = primaryWriter.getValue().addDoc(doc.getPrimaryInput());
+        WriteResult primaryResult = primaryWriter.addDoc(doc.getPrimaryInput());
         switch (primaryResult) {
-            case WriteResult.Success s -> logger.trace("Successfully added document in primary format [{}]", primaryWriter.getKey().name());
+            case WriteResult.Success s -> logger.trace("Successfully added document in primary format [{}]", primaryFormat.name());
             case WriteResult.Failure f -> {
-                logger.debug("Failed to add document in primary format [{}]", primaryWriter.getKey().name());
+                logger.debug("Failed to add document in primary format [{}]", primaryFormat.name());
                 return primaryResult;
             }
         }
@@ -141,8 +140,8 @@ public WriteResult addDoc(CompositeDocumentInput doc) throws IOException {
     public FileInfos flush() throws IOException {
         FileInfos.Builder builder = FileInfos.builder();
         // Flush primary
-        Optional<WriterFileSet> primaryWfs = primaryWriter.getValue().flush().getWriterFileSet(primaryWriter.getKey());
-        primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryWriter.getKey(), writerFileSet));
+        Optional<WriterFileSet> primaryWfs = primaryWriter.flush().getWriterFileSet(primaryFormat);
+        primaryWfs.ifPresent(writerFileSet -> builder.putWriterFileSet(primaryFormat, writerFileSet));
         // Flush secondaries
         for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
             FileInfos fileInfos = writer.flush();
@@ -156,15 +155,15 @@ public FileInfos flush() throws IOException {

     @Override
     public void sync() throws IOException {
-        primaryWriter.getValue().sync();
+        primaryWriter.sync();
         for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
             writer.sync();
         }
     }

     @Override
     public void close() throws IOException {
-        primaryWriter.getValue().close();
+        primaryWriter.close();
         for (Writer<DocumentInput<?>> writer : secondaryWritersByFormat.values()) {
             writer.close();
         }
