Skip to content

Commit 3ad6d6e

Browse files
authored
Simplify InternalEngine#innerIndex (#22721)
Today `InternalEngine#innerIndex` is a pretty big method (> 150 SLoC). This commit merges `#index` and `#innerIndex` and splits the result into smaller contained methods.
1 parent 8028578 commit 3ad6d6e

File tree

1 file changed

+145
-152
lines changed

1 file changed

+145
-152
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 145 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -496,30 +496,6 @@ private long checkDeletedAndGCed(VersionValue versionValue) {
496496
return currentVersion;
497497
}
498498

499-
@Override
500-
public IndexResult index(Index index) throws IOException {
501-
IndexResult result;
502-
try (ReleasableLock ignored = readLock.acquire()) {
503-
ensureOpen();
504-
if (index.origin().isRecovery()) {
505-
// Don't throttle recovery operations
506-
result = innerIndex(index);
507-
} else {
508-
try (Releasable r = throttle.acquireThrottle()) {
509-
result = innerIndex(index);
510-
}
511-
}
512-
} catch (RuntimeException | IOException e) {
513-
try {
514-
maybeFailEngine("index", e);
515-
} catch (Exception inner) {
516-
e.addSuppressed(inner);
517-
}
518-
throw e;
519-
}
520-
return result;
521-
}
522-
523499
private boolean canOptimizeAddDocument(Index index) {
524500
if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
525501
assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
@@ -559,148 +535,165 @@ private boolean assertSequenceNumber(final Engine.Operation.Origin origin, final
559535
return true;
560536
}
561537

562-
private IndexResult innerIndex(Index index) throws IOException {
563-
// TODO we gotta split this method up it's too big!
564-
assert assertSequenceNumber(index.origin(), index.seqNo());
565-
final Translog.Location location;
566-
long seqNo = index.seqNo();
567-
try (Releasable ignored = acquireLock(index.uid())) {
568-
lastWriteNanos = index.startTime();
569-
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
570-
* and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
571-
* Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
572-
* to detect if it has potentially been added before. We use the documents timestamp for this since it's something
573-
* that:
574-
* - doesn't change per document
575-
* - is preserved in the transaction log
576-
* - and is assigned before we start to index / replicate
577-
* NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
578-
* case increasing and can be used in the failure case when we retry and resent documents to establish a happens before relationship.
579-
* for instance:
580-
* - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
581-
* - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
582-
*
583-
* while both docs are in in flight, we disconnect on one node, reconnect and send doc A again
584-
* - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
585-
*
586-
* if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent
587-
* documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
588-
* While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
589-
*
590-
* if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls
591-
* updateDocument.
592-
*/
593-
long currentVersion;
594-
final boolean deleted;
595-
// if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
596-
// lucene index without checking the version map but we still do the version check
597-
final boolean forceUpdateDocument;
598-
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
599-
if (canOptimizeAddDocument) {
600-
long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
601-
if (index.isRetry()) {
602-
forceUpdateDocument = true;
603-
do {
604-
deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
605-
if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
606-
break;
607-
}
608-
} while (maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
609-
index.getAutoGeneratedIdTimestamp()) == false);
610-
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
538+
@Override
539+
public IndexResult index(Index index) throws IOException {
540+
final boolean doThrottle = index.origin().isRecovery() == false;
541+
try (ReleasableLock releasableLock = readLock.acquire()) {
542+
ensureOpen();
543+
assert assertSequenceNumber(index.origin(), index.seqNo());
544+
final Translog.Location location;
545+
long seqNo = index.seqNo();
546+
try (Releasable ignored = acquireLock(index.uid());
547+
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
548+
lastWriteNanos = index.startTime();
549+
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
550+
* and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
551+
* Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
552+
* to detect if it has potentially been added before. We use the documents timestamp for this since it's something
553+
* that:
554+
* - doesn't change per document
555+
* - is preserved in the transaction log
556+
* - and is assigned before we start to index / replicate
557+
* NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
558+
* case increasing and can be used in the failure case when we retry and re-send documents to establish a happens before relationship.
559+
* for instance:
560+
* - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
561+
* - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
562+
*
563+
* while both docs are in flight, we disconnect on one node, reconnect and send doc A again
564+
* - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
565+
*
566+
* if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent
567+
* documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
568+
* While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
569+
*
570+
* if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A' will then just be skipped or calls
571+
* updateDocument.
572+
*/
573+
long currentVersion;
574+
final boolean deleted;
575+
// if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
576+
// lucene index without checking the version map but we still do the version check
577+
final boolean forceUpdateDocument;
578+
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
579+
if (canOptimizeAddDocument) {
580+
forceUpdateDocument = isForceUpdateDocument(index);
581+
currentVersion = Versions.NOT_FOUND;
582+
deleted = true;
611583
} else {
612-
// in this case we force
613-
forceUpdateDocument = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp();
584+
// update the document
585+
forceUpdateDocument = false; // we don't force it - it depends on the version
586+
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
587+
assert incrementVersionLookup();
588+
if (versionValue == null) {
589+
currentVersion = loadCurrentVersionFromIndex(index.uid());
590+
deleted = currentVersion == Versions.NOT_FOUND;
591+
} else {
592+
currentVersion = checkDeletedAndGCed(versionValue);
593+
deleted = versionValue.delete();
594+
}
614595
}
615-
currentVersion = Versions.NOT_FOUND;
616-
deleted = true;
617-
} else {
618-
// update the document
619-
forceUpdateDocument = false; // we don't force it - it depends on the version
620-
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
621-
assert incrementVersionLookup();
622-
if (versionValue == null) {
623-
currentVersion = loadCurrentVersionFromIndex(index.uid());
624-
deleted = currentVersion == Versions.NOT_FOUND;
596+
final long expectedVersion = index.version();
597+
Optional<IndexResult> resultOnVersionConflict;
598+
try {
599+
final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
600+
resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false))
601+
: Optional.empty();
602+
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
603+
resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo()));
604+
}
605+
606+
final IndexResult indexResult;
607+
if (resultOnVersionConflict.isPresent()) {
608+
indexResult = resultOnVersionConflict.get();
625609
} else {
626-
currentVersion = checkDeletedAndGCed(versionValue);
627-
deleted = versionValue.delete();
610+
// no version conflict
611+
if (index.origin() == Operation.Origin.PRIMARY) {
612+
seqNo = seqNoService().generateSeqNo();
613+
}
614+
indexResult = indexIntoLucene(index, seqNo, currentVersion, deleted, forceUpdateDocument, canOptimizeAddDocument, expectedVersion);
615+
}
616+
if (indexResult.hasFailure() == false) {
617+
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
618+
? translog.add(new Translog.Index(index, indexResult))
619+
: null;
620+
indexResult.setTranslogLocation(location);
621+
}
622+
indexResult.setTook(System.nanoTime() - index.startTime());
623+
indexResult.freeze();
624+
return indexResult;
625+
} finally {
626+
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
627+
seqNoService().markSeqNoAsCompleted(seqNo);
628628
}
629629
}
630-
final long expectedVersion = index.version();
631-
Optional<IndexResult> resultOnVersionConflict;
630+
} catch (RuntimeException | IOException e) {
632631
try {
633-
final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
634-
resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false))
635-
: Optional.empty();
636-
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
637-
resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo()));
632+
maybeFailEngine("index", e);
633+
} catch (Exception inner) {
634+
e.addSuppressed(inner);
638635
}
636+
throw e;
637+
}
638+
}
639639

640-
final IndexResult indexResult;
641-
if (resultOnVersionConflict.isPresent()) {
642-
indexResult = resultOnVersionConflict.get();
640+
private IndexResult indexIntoLucene(Index index, long seqNo, long currentVersion, boolean deleted, boolean forceUpdateDocument, boolean canOptimizeAddDocument, long expectedVersion) throws IOException {
641+
/* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
642+
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
643+
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
644+
*/
645+
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
646+
final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
647+
index.parsedDoc().version().setLongValue(updatedVersion);
648+
try {
649+
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
650+
// document does not exist, we can optimize for create, but double check if assertions are running
651+
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
652+
index(index.docs(), indexWriter);
643653
} else {
644-
// no version conflict
645-
if (index.origin() == Operation.Origin.PRIMARY) {
646-
seqNo = seqNoService().generateSeqNo();
647-
}
648-
649-
/*
650-
* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
651-
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
652-
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
653-
*/
654-
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
655-
final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
656-
index.parsedDoc().version().setLongValue(updatedVersion);
657-
IndexResult innerIndexResult;
658-
try {
659-
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
660-
// document does not exists, we can optimize for create, but double check if assertions are running
661-
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
662-
index(index.docs(), indexWriter);
663-
} else {
664-
update(index.uid(), index.docs(), indexWriter);
665-
}
666-
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
667-
innerIndexResult = new IndexResult(updatedVersion, seqNo, deleted);
668-
} catch (Exception ex) {
669-
if (indexWriter.getTragicException() == null) {
670-
/* There is no tragic event recorded so this must be a document failure.
671-
*
672-
* The handling inside IW doesn't guarantee that an tragic / aborting exception
673-
* will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
674-
* only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
675-
* we can potentially handle the exception before the engine is failed.
676-
* Bottom line is that we can only rely on the fact that if it's a document failure then
677-
* `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
678-
* non-document failure
679-
*/
680-
innerIndexResult = new IndexResult(ex, currentVersion, index.seqNo());
681-
} else {
682-
throw ex;
683-
}
684-
}
685-
assert innerIndexResult != null;
686-
indexResult = innerIndexResult;
654+
update(index.uid(), index.docs(), indexWriter);
687655
}
688-
if (!indexResult.hasFailure()) {
689-
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
690-
? translog.add(new Translog.Index(index, indexResult))
691-
: null;
692-
indexResult.setTranslogLocation(location);
693-
}
694-
indexResult.setTook(System.nanoTime() - index.startTime());
695-
indexResult.freeze();
696-
return indexResult;
697-
} finally {
698-
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
699-
seqNoService().markSeqNoAsCompleted(seqNo);
656+
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
657+
return new IndexResult(updatedVersion, seqNo, deleted);
658+
} catch (Exception ex) {
659+
if (indexWriter.getTragicException() == null) {
660+
/* There is no tragic event recorded so this must be a document failure.
661+
*
662+
* The handling inside IW doesn't guarantee that a tragic / aborting exception
663+
* will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
664+
* only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
665+
* we can potentially handle the exception before the engine is failed.
666+
* Bottom line is that we can only rely on the fact that if it's a document failure then
667+
* `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
668+
* non-document failure
669+
*/
670+
return new IndexResult(ex, currentVersion, index.seqNo());
671+
} else {
672+
throw ex;
700673
}
701674
}
702675
}
703676

677+
private boolean isForceUpdateDocument(Index index) {
678+
boolean forceUpdateDocument;
679+
long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
680+
if (index.isRetry()) {
681+
forceUpdateDocument = true;
682+
do {
683+
deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
684+
if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
685+
break;
686+
}
687+
} while (maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
688+
index.getAutoGeneratedIdTimestamp()) == false);
689+
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
690+
} else {
691+
// in this case we force
692+
forceUpdateDocument = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp();
693+
}
694+
return forceUpdateDocument;
695+
}
696+
704697
private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
705698
if (docs.size() > 1) {
706699
indexWriter.addDocuments(docs);

0 commit comments

Comments
 (0)