@@ -496,30 +496,6 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
496496 return currentVersion ;
497497 }
498498
499- @ Override
500- public IndexResult index (Index index ) throws IOException {
501- IndexResult result ;
502- try (ReleasableLock ignored = readLock .acquire ()) {
503- ensureOpen ();
504- if (index .origin ().isRecovery ()) {
505- // Don't throttle recovery operations
506- result = innerIndex (index );
507- } else {
508- try (Releasable r = throttle .acquireThrottle ()) {
509- result = innerIndex (index );
510- }
511- }
512- } catch (RuntimeException | IOException e ) {
513- try {
514- maybeFailEngine ("index" , e );
515- } catch (Exception inner ) {
516- e .addSuppressed (inner );
517- }
518- throw e ;
519- }
520- return result ;
521- }
522-
523499 private boolean canOptimizeAddDocument (Index index ) {
524500 if (index .getAutoGeneratedIdTimestamp () != IndexRequest .UNSET_AUTO_GENERATED_TIMESTAMP ) {
525501 assert index .getAutoGeneratedIdTimestamp () >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
@@ -559,148 +535,165 @@ private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final
559535 return true ;
560536 }
561537
562- private IndexResult innerIndex (Index index ) throws IOException {
563- // TODO we gotta split this method up it's too big!
564- assert assertSequenceNumber (index .origin (), index .seqNo ());
565- final Translog .Location location ;
566- long seqNo = index .seqNo ();
567- try (Releasable ignored = acquireLock (index .uid ())) {
568- lastWriteNanos = index .startTime ();
569- /* if we have an autoGeneratedID that comes into the engine we can potentially optimize
570- * and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
571- * Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
572- * to detect if it has potentially been added before. We use the documents timestamp for this since it's something
573- * that:
574- * - doesn't change per document
575- * - is preserved in the transaction log
576- * - and is assigned before we start to index / replicate
577- * NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
578- * case increasing and can be used in the failure case when we retry and resent documents to establish a happens before relationship.
579- * for instance:
580- * - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
581- * - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
582- *
583- * while both docs are in in flight, we disconnect on one node, reconnect and send doc A again
584- * - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
585- *
586- * if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent
587- * documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
588- * While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
589- *
590- * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls
591- * updateDocument.
592- */
593- long currentVersion ;
594- final boolean deleted ;
595- // if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
596- // lucene index without checking the version map but we still do the version check
597- final boolean forceUpdateDocument ;
598- final boolean canOptimizeAddDocument = canOptimizeAddDocument (index );
599- if (canOptimizeAddDocument ) {
600- long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp .get ();
601- if (index .isRetry ()) {
602- forceUpdateDocument = true ;
603- do {
604- deOptimizeTimestamp = maxUnsafeAutoIdTimestamp .get ();
605- if (deOptimizeTimestamp >= index .getAutoGeneratedIdTimestamp ()) {
606- break ;
607- }
608- } while (maxUnsafeAutoIdTimestamp .compareAndSet (deOptimizeTimestamp ,
609- index .getAutoGeneratedIdTimestamp ()) == false );
610- assert maxUnsafeAutoIdTimestamp .get () >= index .getAutoGeneratedIdTimestamp ();
538+ @ Override
539+ public IndexResult index (Index index ) throws IOException {
540+ final boolean doThrottle = index .origin ().isRecovery () == false ;
541+ try (ReleasableLock releasableLock = readLock .acquire ()) {
542+ ensureOpen ();
543+ assert assertSequenceNumber (index .origin (), index .seqNo ());
544+ final Translog .Location location ;
545+ long seqNo = index .seqNo ();
546+ try (Releasable ignored = acquireLock (index .uid ());
547+ Releasable indexThrottle = doThrottle ? () -> {} : throttle .acquireThrottle ()) {
548+ lastWriteNanos = index .startTime ();
549+ /* if we have an autoGeneratedID that comes into the engine we can potentially optimize
550+ * and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
551+ * Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
552+ * to detect if it has potentially been added before. We use the documents timestamp for this since it's something
553+ * that:
554+ * - doesn't change per document
555+ * - is preserved in the transaction log
556+ * - and is assigned before we start to index / replicate
557+ * NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
558+ * case increasing and can be used in the failure case when we retry and resent documents to establish a happens before relationship.
559+ * for instance:
560+ * - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
561+ * - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
562+ *
563+ * while both docs are in in flight, we disconnect on one node, reconnect and send doc A again
564+ * - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
565+ *
566+ * if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent
567+ * documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
568+ * While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
569+ *
570+ * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls
571+ * updateDocument.
572+ */
573+ long currentVersion ;
574+ final boolean deleted ;
575+ // if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
576+ // lucene index without checking the version map but we still do the version check
577+ final boolean forceUpdateDocument ;
578+ final boolean canOptimizeAddDocument = canOptimizeAddDocument (index );
579+ if (canOptimizeAddDocument ) {
580+ forceUpdateDocument = isForceUpdateDocument (index );
581+ currentVersion = Versions .NOT_FOUND ;
582+ deleted = true ;
611583 } else {
612- // in this case we force
613- forceUpdateDocument = deOptimizeTimestamp >= index .getAutoGeneratedIdTimestamp ();
584+ // update the document
585+ forceUpdateDocument = false ; // we don't force it - it depends on the version
586+ final VersionValue versionValue = versionMap .getUnderLock (index .uid ());
587+ assert incrementVersionLookup ();
588+ if (versionValue == null ) {
589+ currentVersion = loadCurrentVersionFromIndex (index .uid ());
590+ deleted = currentVersion == Versions .NOT_FOUND ;
591+ } else {
592+ currentVersion = checkDeletedAndGCed (versionValue );
593+ deleted = versionValue .delete ();
594+ }
614595 }
615- currentVersion = Versions .NOT_FOUND ;
616- deleted = true ;
617- } else {
618- // update the document
619- forceUpdateDocument = false ; // we don't force it - it depends on the version
620- final VersionValue versionValue = versionMap .getUnderLock (index .uid ());
621- assert incrementVersionLookup ();
622- if (versionValue == null ) {
623- currentVersion = loadCurrentVersionFromIndex (index .uid ());
624- deleted = currentVersion == Versions .NOT_FOUND ;
596+ final long expectedVersion = index .version ();
597+ Optional <IndexResult > resultOnVersionConflict ;
598+ try {
599+ final boolean isVersionConflict = checkVersionConflict (index , currentVersion , expectedVersion , deleted );
600+ resultOnVersionConflict = isVersionConflict ? Optional .of (new IndexResult (currentVersion , index .seqNo (), false ))
601+ : Optional .empty ();
602+ } catch (IllegalArgumentException | VersionConflictEngineException ex ) {
603+ resultOnVersionConflict = Optional .of (new IndexResult (ex , currentVersion , index .seqNo ()));
604+ }
605+
606+ final IndexResult indexResult ;
607+ if (resultOnVersionConflict .isPresent ()) {
608+ indexResult = resultOnVersionConflict .get ();
625609 } else {
626- currentVersion = checkDeletedAndGCed (versionValue );
627- deleted = versionValue .delete ();
610+ // no version conflict
611+ if (index .origin () == Operation .Origin .PRIMARY ) {
612+ seqNo = seqNoService ().generateSeqNo ();
613+ }
614+ indexResult = indexIntoLucene (index , seqNo , currentVersion , deleted , forceUpdateDocument , canOptimizeAddDocument , expectedVersion );
615+ }
616+ if (indexResult .hasFailure () == false ) {
617+ location = index .origin () != Operation .Origin .LOCAL_TRANSLOG_RECOVERY
618+ ? translog .add (new Translog .Index (index , indexResult ))
619+ : null ;
620+ indexResult .setTranslogLocation (location );
621+ }
622+ indexResult .setTook (System .nanoTime () - index .startTime ());
623+ indexResult .freeze ();
624+ return indexResult ;
625+ } finally {
626+ if (seqNo != SequenceNumbersService .UNASSIGNED_SEQ_NO ) {
627+ seqNoService ().markSeqNoAsCompleted (seqNo );
628628 }
629629 }
630- final long expectedVersion = index .version ();
631- Optional <IndexResult > resultOnVersionConflict ;
630+ } catch (RuntimeException | IOException e ) {
632631 try {
633- final boolean isVersionConflict = checkVersionConflict (index , currentVersion , expectedVersion , deleted );
634- resultOnVersionConflict = isVersionConflict ? Optional .of (new IndexResult (currentVersion , index .seqNo (), false ))
635- : Optional .empty ();
636- } catch (IllegalArgumentException | VersionConflictEngineException ex ) {
637- resultOnVersionConflict = Optional .of (new IndexResult (ex , currentVersion , index .seqNo ()));
632+ maybeFailEngine ("index" , e );
633+ } catch (Exception inner ) {
634+ e .addSuppressed (inner );
638635 }
636+ throw e ;
637+ }
638+ }
639639
640- final IndexResult indexResult ;
641- if (resultOnVersionConflict .isPresent ()) {
642- indexResult = resultOnVersionConflict .get ();
640+ private IndexResult indexIntoLucene (Index index , long seqNo , long currentVersion , boolean deleted , boolean forceUpdateDocument , boolean canOptimizeAddDocument , long expectedVersion ) throws IOException {
641+ /* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
642+ * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
643+ * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
644+ */
645+ index .parsedDoc ().updateSeqID (seqNo , index .primaryTerm ());
646+ final long updatedVersion = index .versionType ().updateVersion (currentVersion , expectedVersion );
647+ index .parsedDoc ().version ().setLongValue (updatedVersion );
648+ try {
649+ if (currentVersion == Versions .NOT_FOUND && forceUpdateDocument == false ) {
650+ // document does not exists, we can optimize for create, but double check if assertions are running
651+ assert assertDocDoesNotExist (index , canOptimizeAddDocument == false );
652+ index (index .docs (), indexWriter );
643653 } else {
644- // no version conflict
645- if (index .origin () == Operation .Origin .PRIMARY ) {
646- seqNo = seqNoService ().generateSeqNo ();
647- }
648-
649- /*
650- * Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
651- * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
652- * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
653- */
654- index .parsedDoc ().updateSeqID (seqNo , index .primaryTerm ());
655- final long updatedVersion = index .versionType ().updateVersion (currentVersion , expectedVersion );
656- index .parsedDoc ().version ().setLongValue (updatedVersion );
657- IndexResult innerIndexResult ;
658- try {
659- if (currentVersion == Versions .NOT_FOUND && forceUpdateDocument == false ) {
660- // document does not exists, we can optimize for create, but double check if assertions are running
661- assert assertDocDoesNotExist (index , canOptimizeAddDocument == false );
662- index (index .docs (), indexWriter );
663- } else {
664- update (index .uid (), index .docs (), indexWriter );
665- }
666- versionMap .putUnderLock (index .uid ().bytes (), new VersionValue (updatedVersion ));
667- innerIndexResult = new IndexResult (updatedVersion , seqNo , deleted );
668- } catch (Exception ex ) {
669- if (indexWriter .getTragicException () == null ) {
670- /* There is no tragic event recorded so this must be a document failure.
671- *
672- * The handling inside IW doesn't guarantee that an tragic / aborting exception
673- * will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
674- * only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
675- * we can potentially handle the exception before the engine is failed.
676- * Bottom line is that we can only rely on the fact that if it's a document failure then
677- * `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
678- * non-document failure
679- */
680- innerIndexResult = new IndexResult (ex , currentVersion , index .seqNo ());
681- } else {
682- throw ex ;
683- }
684- }
685- assert innerIndexResult != null ;
686- indexResult = innerIndexResult ;
654+ update (index .uid (), index .docs (), indexWriter );
687655 }
688- if (!indexResult .hasFailure ()) {
689- location = index .origin () != Operation .Origin .LOCAL_TRANSLOG_RECOVERY
690- ? translog .add (new Translog .Index (index , indexResult ))
691- : null ;
692- indexResult .setTranslogLocation (location );
693- }
694- indexResult .setTook (System .nanoTime () - index .startTime ());
695- indexResult .freeze ();
696- return indexResult ;
697- } finally {
698- if (seqNo != SequenceNumbersService .UNASSIGNED_SEQ_NO ) {
699- seqNoService ().markSeqNoAsCompleted (seqNo );
656+ versionMap .putUnderLock (index .uid ().bytes (), new VersionValue (updatedVersion ));
657+ return new IndexResult (updatedVersion , seqNo , deleted );
658+ } catch (Exception ex ) {
659+ if (indexWriter .getTragicException () == null ) {
660+ /* There is no tragic event recorded so this must be a document failure.
661+ *
662+ * The handling inside IW doesn't guarantee that an tragic / aborting exception
663+ * will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
664+ * only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
665+ * we can potentially handle the exception before the engine is failed.
666+ * Bottom line is that we can only rely on the fact that if it's a document failure then
667+ * `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
668+ * non-document failure
669+ */
670+ return new IndexResult (ex , currentVersion , index .seqNo ());
671+ } else {
672+ throw ex ;
700673 }
701674 }
702675 }
703676
677+ private boolean isForceUpdateDocument (Index index ) {
678+ boolean forceUpdateDocument ;
679+ long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp .get ();
680+ if (index .isRetry ()) {
681+ forceUpdateDocument = true ;
682+ do {
683+ deOptimizeTimestamp = maxUnsafeAutoIdTimestamp .get ();
684+ if (deOptimizeTimestamp >= index .getAutoGeneratedIdTimestamp ()) {
685+ break ;
686+ }
687+ } while (maxUnsafeAutoIdTimestamp .compareAndSet (deOptimizeTimestamp ,
688+ index .getAutoGeneratedIdTimestamp ()) == false );
689+ assert maxUnsafeAutoIdTimestamp .get () >= index .getAutoGeneratedIdTimestamp ();
690+ } else {
691+ // in this case we force
692+ forceUpdateDocument = deOptimizeTimestamp >= index .getAutoGeneratedIdTimestamp ();
693+ }
694+ return forceUpdateDocument ;
695+ }
696+
704697 private static void index (final List <ParseContext .Document > docs , final IndexWriter indexWriter ) throws IOException {
705698 if (docs .size () > 1 ) {
706699 indexWriter .addDocuments (docs );
0 commit comments