@@ -60,6 +60,7 @@ public class RemoteFsTranslog extends Translog {
6060 private final BooleanSupplier primaryModeSupplier ;
6161 private final RemoteTranslogTransferTracker remoteTranslogTransferTracker ;
6262 private volatile long maxRemoteTranslogGenerationUploaded ;
63+ private final Object uploadMutex = new Object ();
6364
6465 private volatile long minSeqNoToKeep ;
6566
@@ -237,11 +238,20 @@ public static TranslogTransferManager buildTranslogTransferManager(
237238
238239 @ Override
239240 public boolean ensureSynced (Location location ) throws IOException {
240- try (ReleasableLock ignored = writeLock .acquire ()) {
241- assert location .generation <= current .getGeneration ();
242- if (location .generation == current .getGeneration ()) {
243- ensureOpen ();
244- return prepareAndUpload (primaryTermSupplier .getAsLong (), location .generation );
241+ try {
242+ boolean shouldUpload = false ;
243+ try (ReleasableLock ignored = writeLock .acquire ()) {
244+ assert location .generation <= current .getGeneration ();
245+ if (location .generation == current .getGeneration ()) {
246+ ensureOpen ();
247+ if (prepareForUpload (location .generation ) == false ) {
248+ return false ;
249+ }
250+ shouldUpload = true ;
251+ }
252+ }
253+ if (shouldUpload ) {
254+ return performUpload (primaryTermSupplier .getAsLong (), location .generation );
245255 }
246256 } catch (final Exception ex ) {
247257 closeOnTragicEvent (ex );
@@ -256,10 +266,12 @@ public void rollGeneration() throws IOException {
256266 if (current .totalOperations () == 0 && primaryTermSupplier .getAsLong () == current .getPrimaryTerm ()) {
257267 return ;
258268 }
259- prepareAndUpload (primaryTermSupplier .getAsLong (), null );
269+ if (prepareForUpload (null )) {
270+ performUpload (primaryTermSupplier .getAsLong (), null );
271+ }
260272 }
261273
262- private boolean prepareAndUpload ( Long primaryTerm , Long generation ) throws IOException {
274+ private boolean prepareForUpload ( Long generation ) throws IOException {
263275 try (Releasable ignored = writeLock .acquire ()) {
264276 if (generation == null || generation == current .getGeneration ()) {
265277 try {
@@ -275,23 +287,41 @@ private boolean prepareAndUpload(Long primaryTerm, Long generation) throws IOExc
275287 closeOnTragicEvent (e );
276288 throw e ;
277289 }
278- } else if (generation < current .getGeneration ()) {
279- return false ;
280- }
290+ return true ;
291+ } else return generation >= current .getGeneration ();
292+ }
293+ }
281294
282- // Do we need remote writes in sync fashion ?
283- // If we don't , we should swallow FileAlreadyExistsException while writing to remote store
284- // and also verify for same during primary-primary relocation
285- // Writing remote in sync fashion doesn't hurt as global ckp update
286- // is not updated in remote translog except in primary to primary recovery.
287- if (generation == null ) {
288- if (closed .get () == false ) {
289- return upload (primaryTerm , current .getGeneration () - 1 );
295+ /**
296+ * This method does the remote store upload by first acquiring the lock on the uploadMutex monitor. The synchronized
297+ * is required to restrict multiple uploads happening concurrently. The read lock is required to ensure that the
298+ * underlying translog readers are not deleted and the current writer is not converted to a reader at the time of
299+ * upload.
300+ *
301+ * @param primaryTerm current primary term
302+ * @param generation current generation
303+ * @return true if upload is successful
304+ * @throws IOException if the upload fails due to any underlying exceptions.
305+ */
306+ private boolean performUpload (Long primaryTerm , Long generation ) throws IOException {
307+ synchronized (uploadMutex ) {
308+ try (Releasable ignored = readLock .acquire ()) {
309+ // Do we need remote writes in sync fashion ?
310+ // If we don't , we should swallow FileAlreadyExistsException while writing to remote store
311+ // and also verify for same during primary-primary relocation
312+ // Writing remote in sync fashion doesn't hurt as global ckp update
313+ // is not updated in remote translog except in primary to primary recovery.
314+ long generationToUpload ;
315+ if (generation == null ) {
316+ if (closed .get () == false ) {
317+ generationToUpload = current .getGeneration () - 1 ;
318+ } else {
319+ generationToUpload = current .getGeneration ();
320+ }
290321 } else {
291- return upload ( primaryTerm , current . getGeneration ()) ;
322+ generationToUpload = generation ;
292323 }
293- } else {
294- return upload (primaryTerm , generation );
324+ return upload (primaryTerm , generationToUpload );
295325 }
296326 }
297327 }
@@ -343,8 +373,8 @@ private boolean syncToDisk() throws IOException {
343373 @ Override
344374 public void sync () throws IOException {
345375 try {
346- if (syncToDisk () || syncNeeded ()) {
347- prepareAndUpload (primaryTermSupplier .getAsLong (), null );
376+ if (( syncToDisk () || syncNeeded ()) && prepareForUpload ( null )) {
377+ performUpload (primaryTermSupplier .getAsLong (), null );
348378 }
349379 } catch (final Exception e ) {
350380 tragedy .setTragicException (e );
@@ -528,22 +558,23 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
528558
529559 @ Override
530560 public void onUploadComplete (TransferSnapshot transferSnapshot ) throws IOException {
531- transferReleasable .close ();
532- closeFilesIfNoPendingRetentionLocks ();
533561 maxRemoteTranslogGenerationUploaded = generation ;
534562 minRemoteGenReferenced = getMinFileGeneration ();
535563 logger .trace ("uploaded translog for {} {} " , primaryTerm , generation );
536564 }
537565
538566 @ Override
539567 public void onUploadFailed (TransferSnapshot transferSnapshot , Exception ex ) throws IOException {
540- transferReleasable .close ();
541- closeFilesIfNoPendingRetentionLocks ();
542568 if (ex instanceof IOException ) {
543569 throw (IOException ) ex ;
544570 } else {
545571 throw (RuntimeException ) ex ;
546572 }
547573 }
574+
575+ @ Override
576+ public void close () {
577+ transferReleasable .close ();
578+ }
548579 }
549580}
0 commit comments