Skip to content

Commit b5cc002

Browse files
authored
Optimize read write lock constructs during translog upload to remote store (opensearch-project#9636)
Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent d7aa6dd commit b5cc002

6 files changed

Lines changed: 63 additions & 31 deletions

File tree

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class RemoteFsTranslog extends Translog {
6060
private final BooleanSupplier primaryModeSupplier;
6161
private final RemoteTranslogTransferTracker remoteTranslogTransferTracker;
6262
private volatile long maxRemoteTranslogGenerationUploaded;
63+
private final Object uploadMutex = new Object();
6364

6465
private volatile long minSeqNoToKeep;
6566

@@ -237,11 +238,20 @@ public static TranslogTransferManager buildTranslogTransferManager(
237238

238239
@Override
239240
public boolean ensureSynced(Location location) throws IOException {
240-
try (ReleasableLock ignored = writeLock.acquire()) {
241-
assert location.generation <= current.getGeneration();
242-
if (location.generation == current.getGeneration()) {
243-
ensureOpen();
244-
return prepareAndUpload(primaryTermSupplier.getAsLong(), location.generation);
241+
try {
242+
boolean shouldUpload = false;
243+
try (ReleasableLock ignored = writeLock.acquire()) {
244+
assert location.generation <= current.getGeneration();
245+
if (location.generation == current.getGeneration()) {
246+
ensureOpen();
247+
if (prepareForUpload(location.generation) == false) {
248+
return false;
249+
}
250+
shouldUpload = true;
251+
}
252+
}
253+
if (shouldUpload) {
254+
return performUpload(primaryTermSupplier.getAsLong(), location.generation);
245255
}
246256
} catch (final Exception ex) {
247257
closeOnTragicEvent(ex);
@@ -256,10 +266,12 @@ public void rollGeneration() throws IOException {
256266
if (current.totalOperations() == 0 && primaryTermSupplier.getAsLong() == current.getPrimaryTerm()) {
257267
return;
258268
}
259-
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
269+
if (prepareForUpload(null)) {
270+
performUpload(primaryTermSupplier.getAsLong(), null);
271+
}
260272
}
261273

262-
private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOException {
274+
private boolean prepareForUpload(Long generation) throws IOException {
263275
try (Releasable ignored = writeLock.acquire()) {
264276
if (generation == null || generation == current.getGeneration()) {
265277
try {
@@ -275,23 +287,41 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
275287
closeOnTragicEvent(e);
276288
throw e;
277289
}
278-
} else if (generation < current.getGeneration()) {
279-
return false;
280-
}
290+
return true;
291+
} else return generation >= current.getGeneration();
292+
}
293+
}
281294

282-
// Do we need remote writes in sync fashion ?
283-
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
284-
// and also verify for same during primary-primary relocation
285-
// Writing remote in sync fashion doesn't hurt as global ckp update
286-
// is not updated in remote translog except in primary to primary recovery.
287-
if (generation == null) {
288-
if (closed.get() == false) {
289-
return upload(primaryTerm, current.getGeneration() - 1);
295+
/**
296+
* This method performs the remote store upload after first acquiring the lock on the uploadMutex monitor. The synchronized
297+
* block is required to prevent multiple uploads from happening concurrently. The read lock is required to ensure that the
298+
* underlying translog readers are not deleted and the current writer is not converted to a reader at the time of
299+
* upload.
300+
*
301+
* @param primaryTerm current primary term
302+
* @param generation current generation
303+
* @return true if upload is successful
304+
* @throws IOException if the upload fails due to any underlying exceptions.
305+
*/
306+
private boolean performUpload(Long primaryTerm, Long generation) throws IOException {
307+
synchronized (uploadMutex) {
308+
try (Releasable ignored = readLock.acquire()) {
309+
// Do we need remote writes in sync fashion ?
310+
// If we don't , we should swallow FileAlreadyExistsException while writing to remote store
311+
// and also verify for same during primary-primary relocation
312+
// Writing remote in sync fashion doesn't hurt as global ckp update
313+
// is not updated in remote translog except in primary to primary recovery.
314+
long generationToUpload;
315+
if (generation == null) {
316+
if (closed.get() == false) {
317+
generationToUpload = current.getGeneration() - 1;
318+
} else {
319+
generationToUpload = current.getGeneration();
320+
}
290321
} else {
291-
return upload(primaryTerm, current.getGeneration());
322+
generationToUpload = generation;
292323
}
293-
} else {
294-
return upload(primaryTerm, generation);
324+
return upload(primaryTerm, generationToUpload);
295325
}
296326
}
297327
}
@@ -343,8 +373,8 @@ private boolean syncToDisk() throws IOException {
343373
@Override
344374
public void sync() throws IOException {
345375
try {
346-
if (syncToDisk() || syncNeeded()) {
347-
prepareAndUpload(primaryTermSupplier.getAsLong(), null);
376+
if ((syncToDisk() || syncNeeded()) && prepareForUpload(null)) {
377+
performUpload(primaryTermSupplier.getAsLong(), null);
348378
}
349379
} catch (final Exception e) {
350380
tragedy.setTragicException(e);
@@ -528,22 +558,23 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
528558

529559
@Override
530560
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
531-
transferReleasable.close();
532-
closeFilesIfNoPendingRetentionLocks();
533561
maxRemoteTranslogGenerationUploaded = generation;
534562
minRemoteGenReferenced = getMinFileGeneration();
535563
logger.trace("uploaded translog for {} {} ", primaryTerm, generation);
536564
}
537565

538566
@Override
539567
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) throws IOException {
540-
transferReleasable.close();
541-
closeFilesIfNoPendingRetentionLocks();
542568
if (ex instanceof IOException) {
543569
throw (IOException) ex;
544570
} else {
545571
throw (RuntimeException) ex;
546572
}
547573
}
574+
575+
@Override
576+
public void close() {
577+
transferReleasable.close();
578+
}
548579
}
549580
}

server/src/main/java/org/opensearch/index/translog/transfer/TranslogCheckpointTransferSnapshot.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ public TranslogCheckpointTransferSnapshot build() throws IOException {
164164
translogTransferSnapshot.setMinTranslogGeneration(highestGenMinTranslogGeneration);
165165

166166
assert this.primaryTerm == highestGenPrimaryTerm : "inconsistent primary term";
167-
assert this.generation == highestGeneration : " inconsistent generation ";
168167
final long finalHighestGeneration = highestGeneration;
169168
assert LongStream.iterate(lowestGeneration, i -> i + 1)
170169
.limit(highestGeneration)

server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans
109109
long prevUploadBytesSucceeded = remoteTranslogTransferTracker.getUploadBytesSucceeded();
110110
long prevUploadTimeInMillis = remoteTranslogTransferTracker.getTotalUploadTimeInMillis();
111111

112-
try {
112+
try (translogTransferListener) {
113113
toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots()));
114114
toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots())));
115115
if (toUpload.isEmpty()) {

server/src/main/java/org/opensearch/index/translog/transfer/listener/TranslogTransferListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*
1818
* @opensearch.internal
1919
*/
20-
public interface TranslogTransferListener {
20+
public interface TranslogTransferListener extends AutoCloseable {
2121
/**
2222
* Invoked when the transfer of {@link TransferSnapshot} succeeds
2323
* @param transferSnapshot the transfer snapshot

server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@
108108
import static org.mockito.Mockito.when;
109109

110110
@LuceneTestCase.SuppressFileSystems("ExtrasFS")
111-
112111
public class RemoteFsTranslogTests extends OpenSearchTestCase {
113112

114113
protected final ShardId shardId = new ShardId("index", "_na_", 1);

server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,9 @@ public void onUploadComplete(TransferSnapshot transferSnapshot) {
168168
public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) {
169169
translogTransferFailed.incrementAndGet();
170170
}
171+
172+
@Override
173+
public void close() {}
171174
}));
172175
assertEquals(4, fileTransferSucceeded.get());
173176
assertEquals(0, fileTransferFailed.get());

0 commit comments

Comments
 (0)