Skip to content

Commit bda80cd

Browse files
committed
policy with retaining seqno
1 parent 84d9c01 commit bda80cd

2 files changed

Lines changed: 36 additions & 7 deletions

File tree

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
import org.elasticsearch.index.merge.MergeStats;
8888
import org.elasticsearch.index.merge.OnGoingMerge;
8989
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
90+
import org.elasticsearch.index.seqno.RetentionLease;
9091
import org.elasticsearch.index.seqno.SeqNoStats;
9192
import org.elasticsearch.index.seqno.SequenceNumbers;
9293
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
@@ -189,11 +190,7 @@ public InternalEngine(EngineConfig engineConfig) {
189190
final EngineConfig engineConfig,
190191
final BiFunction<Long, Long, LocalCheckpointTracker> localCheckpointTrackerSupplier) {
191192
super(engineConfig);
192-
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
193-
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
194-
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis(),
195-
engineConfig.getIndexSettings().getTranslogRetentionTotalFiles()
196-
);
193+
final TranslogDeletionPolicy translogDeletionPolicy = newTranslogDeletionPolicy(engineConfig);
197194
store.incRef();
198195
IndexWriter writer = null;
199196
Translog translog = null;
@@ -488,6 +485,22 @@ private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecovery
488485
translog.trimUnreferencedReaders();
489486
}
490487

488+
private static TranslogDeletionPolicy newTranslogDeletionPolicy(EngineConfig config) {
489+
final LongSupplier retainingSeqNo;
490+
if (config.getIndexSettings().isSoftDeleteEnabled()) {
491+
retainingSeqNo = () -> Long.MAX_VALUE;
492+
} else {
493+
retainingSeqNo = () -> config.retentionLeasesSupplier().get().leases().stream()
494+
.mapToLong(RetentionLease::retainingSequenceNumber)
495+
.max().orElse(Long.MAX_VALUE);
496+
};
497+
return new TranslogDeletionPolicy(
498+
config.getIndexSettings().getTranslogRetentionSize().getBytes(),
499+
config.getIndexSettings().getTranslogRetentionAge().getMillis(),
500+
config.getIndexSettings().getTranslogRetentionTotalFiles(),
501+
retainingSeqNo);
502+
}
503+
491504
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy,
492505
LongSupplier globalCheckpointSupplier, LongConsumer persistedSequenceNumberConsumer) throws IOException {
493506

server/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.concurrent.ConcurrentHashMap;
3131
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.function.LongSupplier;
3233

3334
public class TranslogDeletionPolicy {
3435

@@ -65,10 +66,14 @@ public void assertNoOpenTranslogRefs() {
6566

6667
private int retentionTotalFiles;
6768

68-
public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) {
69+
private final LongSupplier retainingSeqNoSupplier;
70+
71+
public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles,
72+
LongSupplier retainingSeqNoSupplier) {
6973
this.retentionSizeInBytes = retentionSizeInBytes;
7074
this.retentionAgeInMillis = retentionAgeInMillis;
7175
this.retentionTotalFiles = retentionTotalFiles;
76+
this.retainingSeqNoSupplier = retainingSeqNoSupplier;
7277
if (Assertions.ENABLED) {
7378
openTranslogRef = new ConcurrentHashMap<>();
7479
} else {
@@ -172,7 +177,9 @@ synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogW
172177
minByAgeAndSize = Math.max(minByAge, minBySize);
173178
}
174179
long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles);
175-
return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery));
180+
long minByPolicy = Math.max(minByAgeAndSize, minByNumFiles);
181+
long minByRetainingSeqNo = getMinTranslogGenByRetainingSeqNo(readers, writer, retainingSeqNoSupplier.getAsLong());
182+
return Math.min(Math.max(minByPolicy, minByRetainingSeqNo), Math.min(minByLocks, minTranslogGenerationForRecovery));
176183
}
177184

178185
static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
@@ -214,6 +221,15 @@ static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, Translog
214221
return minGen;
215222
}
216223

224+
static long getMinTranslogGenByRetainingSeqNo(List<TranslogReader> readers, TranslogWriter writer, long seqNo) {
225+
for (TranslogReader reader : readers) {
226+
if (reader.getCheckpoint().maxEffectiveSeqNo() >= seqNo) {
227+
return reader.generation;
228+
}
229+
}
230+
return writer.generation;
231+
}
232+
217233
protected long currentTime() {
218234
return System.currentTimeMillis();
219235
}

0 commit comments

Comments
 (0)