Skip to content

Commit 8ba5ca5

Browse files
committed
simonw feedback
1 parent b9f5f02 commit 8ba5ca5

2 files changed

Lines changed: 24 additions & 21 deletions

File tree

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,13 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
108108
searcherManager = new SearcherManager(reader, searcherFactory);
109109
if (seqNoStats == null) {
110110
seqNoStats = buildSeqNoStats(config, lastCommittedSegmentInfos);
111-
ensureNoUncommittedOperation(seqNoStats, lastCommittedSegmentInfos);
112111
}
113112
this.seqNoStats = seqNoStats;
114113
this.docsStats = docsStats(lastCommittedSegmentInfos);
115114
this.indexWriterLock = indexWriterLock;
115+
if (obtainLock) {
116+
ensureNoUncommittedOperation(this.seqNoStats, lastCommittedSegmentInfos);
117+
}
116118
success = true;
117119
} finally {
118120
if (success == false) {
@@ -129,8 +131,13 @@ public ReadOnlyEngine(EngineConfig config, SeqNoStats seqNoStats, TranslogStats
129131
* so peer recovery of closed indices can skip phase 2 (i.e., not replaying translog operations) without losing data.
130132
*/
131133
private void ensureNoUncommittedOperation(SeqNoStats seqNoStats, SegmentInfos segmentInfos) throws IOException {
132-
// we can't enforce this check on an old index - should we prevent this engine as a recovery source?
134+
assert indexWriterLock != null;
133135
if (config().getIndexSettings().getIndexVersionCreated().before(Version.V_6_7_0)) {
136+
// we can't enforce this check on an old index
137+
return;
138+
}
139+
if (seqNoStats.getGlobalCheckpoint() == seqNoStats.getMaxSeqNo()) {
140+
// we are good - no need to open translog
134141
return;
135142
}
136143
final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY);
@@ -139,13 +146,13 @@ private void ensureNoUncommittedOperation(SeqNoStats seqNoStats, SegmentInfos se
139146
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(Long.MAX_VALUE, Long.MAX_VALUE);
140147
translogDeletionPolicy.setTranslogGenerationOfLastCommit(recoverTranslogGeneration.translogFileGeneration);
141148
translogDeletionPolicy.setMinTranslogGenerationForRecovery(recoverTranslogGeneration.translogFileGeneration);
142-
final LocalCheckpointTracker localCheckpointTracker;
143-
try (DirectoryReader reader = DirectoryReader.open(indexCommit)) {
144-
localCheckpointTracker = createLocalCheckpointTracker(engineConfig, segmentInfos, logger,
145-
() -> new Searcher("build_checkpoint_tracker", new IndexSearcher(reader), () -> {}), LocalCheckpointTracker::new);
146-
}
147149
try (Translog translog = new Translog(engineConfig.getTranslogConfig(), translogUUID, translogDeletionPolicy,
148150
engineConfig.getGlobalCheckpointSupplier(), engineConfig.getPrimaryTermSupplier())) {
151+
final LocalCheckpointTracker localCheckpointTracker;
152+
try (DirectoryReader reader = DirectoryReader.open(indexCommit)) {
153+
localCheckpointTracker = createLocalCheckpointTracker(engineConfig, segmentInfos, logger,
154+
() -> new Searcher("build_checkpoint_tracker", new IndexSearcher(reader), () -> {}), LocalCheckpointTracker::new);
155+
}
149156
try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(recoverTranslogGeneration, Long.MAX_VALUE)) {
150157
Translog.Operation op;
151158
while ((op = snapshot.next()) != null) {

server/src/test/java/org/elasticsearch/index/engine/ReadOnlyEngineTests.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.elasticsearch.index.mapper.ParsedDocument;
2626
import org.elasticsearch.index.seqno.SeqNoStats;
2727
import org.elasticsearch.index.seqno.SequenceNumbers;
28-
import org.elasticsearch.index.shard.DocsStats;
2928
import org.elasticsearch.index.store.Store;
3029

3130
import java.io.IOException;
@@ -140,29 +139,26 @@ public void testEnsureNoUncommittedOperations() throws Exception {
140139
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
141140
try (Store store = createStore()) {
142141
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), null, null, globalCheckpoint::get);
143-
boolean hasPendingOps = false;
144142
List<Engine.Operation> ops = generateHistoryOnReplica(between(10, 500), randomBoolean(), false, randomBoolean());
145143
Randomness.shuffle(ops);
146-
final DocsStats docStats;
144+
List<Engine.Operation> uncommittedOps = randomSubsetOf(ops);
145+
ops.removeAll(uncommittedOps);
147146
try (InternalEngine engine = createEngine(config)) {
148147
for (Engine.Operation op : ops) {
149-
hasPendingOps = true;
150148
applyOperation(engine, op);
151-
if (rarely()) {
152-
engine.flush(true, true);
153-
hasPendingOps = false;
154-
}
155149
}
156-
engine.refresh("test");
157-
docStats = engine.docStats();
150+
engine.flush(true, true);
151+
globalCheckpoint.set(engine.getLocalCheckpoint());
152+
for (Engine.Operation op : uncommittedOps) {
153+
applyOperation(engine, op);
154+
}
158155
}
159-
if (hasPendingOps) {
156+
if (uncommittedOps.isEmpty() == false) {
160157
AssertionError error = expectThrows(AssertionError.class,
161-
() -> new ReadOnlyEngine(config, null, null, randomBoolean(), Function.identity()));
158+
() -> new ReadOnlyEngine(config, null, null, true, Function.identity()));
162159
assertThat(error.getMessage(), containsString("does not contain operation"));
163160
} else {
164-
try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, randomBoolean(), Function.identity())) {
165-
assertThat(readOnlyEngine.docStats(), equalTo(docStats));
161+
try (ReadOnlyEngine ignored = new ReadOnlyEngine(config, null, null, randomBoolean(), Function.identity())) {
166162
}
167163
}
168164
}

0 commit comments

Comments
 (0)