|
29 | 29 | import java.util.Map; |
30 | 30 | import java.util.concurrent.ConcurrentHashMap; |
31 | 31 | import java.util.concurrent.atomic.AtomicBoolean; |
| 32 | +import java.util.function.LongSupplier; |
32 | 33 |
|
33 | 34 | public class TranslogDeletionPolicy { |
34 | 35 |
|
@@ -65,10 +66,14 @@ public void assertNoOpenTranslogRefs() { |
65 | 66 |
|
66 | 67 | private int retentionTotalFiles; |
67 | 68 |
|
68 | | - public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles) { |
| 69 | + private final LongSupplier retainingSeqNoSupplier; |
| 70 | + |
| 71 | + public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis, int retentionTotalFiles, |
| 72 | + LongSupplier retainingSeqNoSupplier) { |
69 | 73 | this.retentionSizeInBytes = retentionSizeInBytes; |
70 | 74 | this.retentionAgeInMillis = retentionAgeInMillis; |
71 | 75 | this.retentionTotalFiles = retentionTotalFiles; |
| 76 | + this.retainingSeqNoSupplier = retainingSeqNoSupplier; |
72 | 77 | if (Assertions.ENABLED) { |
73 | 78 | openTranslogRef = new ConcurrentHashMap<>(); |
74 | 79 | } else { |
@@ -172,7 +177,9 @@ synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogW |
172 | 177 | minByAgeAndSize = Math.max(minByAge, minBySize); |
173 | 178 | } |
174 | 179 | long minByNumFiles = getMinTranslogGenByTotalFiles(readers, writer, retentionTotalFiles); |
175 | | - return Math.min(Math.max(minByAgeAndSize, minByNumFiles), Math.min(minByLocks, minTranslogGenerationForRecovery)); |
| 180 | + long minByPolicy = Math.max(minByAgeAndSize, minByNumFiles); |
| 181 | + long minByRetainingSeqNo = getMinTranslogGenByRetainingSeqNo(readers, writer, retainingSeqNoSupplier.getAsLong()); |
| 182 | + return Math.min(Math.max(minByPolicy, minByRetainingSeqNo), Math.min(minByLocks, minTranslogGenerationForRecovery)); |
176 | 183 | } |
177 | 184 |
|
178 | 185 | static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) { |
@@ -214,6 +221,15 @@ static long getMinTranslogGenByTotalFiles(List<TranslogReader> readers, Translog |
214 | 221 | return minGen; |
215 | 222 | } |
216 | 223 |
|
| 224 | + static long getMinTranslogGenByRetainingSeqNo(List<TranslogReader> readers, TranslogWriter writer, long seqNo) { |
| 225 | + for (TranslogReader reader : readers) { |
| 226 | + if (reader.getCheckpoint().maxEffectiveSeqNo() >= seqNo) { |
| 227 | + return reader.generation; |
| 228 | + } |
| 229 | + } |
| 230 | + return writer.generation; |
| 231 | + } |
| 232 | + |
217 | 233 | protected long currentTime() { |
218 | 234 | return System.currentTimeMillis(); |
219 | 235 | } |
|
0 commit comments