Allow to trim all ops above a certain seq# with a term lower than X #30176
vladimirdolzhenko merged 54 commits into elastic:master
Conversation
Pinging @elastic/es-distributed
bleskes left a comment
Thanks @vladimirdolzhenko . The basics look good. I did a sweep of the production code and left some comments. I will do the tests once these have been addressed.
public final class ResyncReplicationRequest extends ReplicatedWriteRequest<ResyncReplicationRequest> {

    private long belowTermId;
    private long trimmedAboveSeqNo;
I think this can be removed. When we execute a replication operation, we are guaranteed that the replica is on the same term as the primary that generated the request. The trim semantics can be trimOperationOfPreviousPrimaryTerms(long minSeqNoToTrim).
public void readFrom(final StreamInput in) throws IOException {
    assert Version.CURRENT.major <= 7;
-   if (in.getVersion().equals(Version.V_6_0_0)) {
+   if (in.getVersion().onOrBefore(Version.V_6_0_0)) {
        throw new IllegalStateException("resync replication request serialization is broken in 6.0.0");
    }
    super.readFrom(in);
+   if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
You need to start with 7.0.0. You can only push this down to 6.4 once this has been backported.
It's a bit unclear how it has to be done. Should it be merged into master first with V_7_0_0, then backported to the 6.x branch, and after that adjusted in master to V_6_X?
There is a way to do it. First you build it with 7.0.0. Before backporting to 6.x you disable the bwc tests on master, push your 6.x code, and then re-enable the bwc tests on master. I'll show you how to do it once we're ready.
super.readFrom(in);
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
    belowTermId = in.readVLong();
    trimmedAboveSeqNo = in.readVLong();
UNASSIGNED_SEQ_NO is -2 and NO_OPS_PERFORMED is -1. Since readVLong can't take negative numbers, we use readZLong.
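The reason readZLong works where readVLong doesn't is zig-zag encoding, which maps small negative values to small positive ones before the variable-length encoding is applied. A minimal standalone sketch of the mapping (not the actual StreamInput/StreamOutput implementation):

```java
class ZigZag {
    // Zig-zag encode: maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ...
    static long encode(long v) {
        return (v << 1) ^ (v >> 63); // arithmetic shift spreads the sign bit
    }

    // Inverse mapping back to a signed long
    static long decode(long z) {
        return (z >>> 1) ^ -(z & 1);
    }

    public static void main(String[] args) {
        final long NO_OPS_PERFORMED = -1;
        final long UNASSIGNED_SEQ_NO = -2;
        // both sentinels stay tiny after encoding, so they fit in a single vlong byte
        System.out.println(encode(NO_OPS_PERFORMED));  // 1
        System.out.println(encode(UNASSIGNED_SEQ_NO)); // 3
    }
}
```

With plain vlong encoding, -1 would be reinterpreted as a huge unsigned value; zig-zag keeps both sentinels one byte long on the wire.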
/**
 * Trims translog (effectively moves max visible seq# {@link Checkpoint#trimmedAboveSeqNo}) for terms below <code>belowTerm</code>
 * and seq# above <code>aboveSeqNo</code>
Can you expand a bit on the fact that these operations will not be exposed in snapshots?
    return;
}

final ChannelFactory channelFactory = FileChannel::open;
nit: how much does this buy us? I think we can just inline it and have better readability?
IOUtils.fsync(checkpointFile, false);
IOUtils.fsync(checkpointFile.getParent(), true);

newReader = reader.withNewCheckpoint(newCheckpoint);
This is super dangerous. If we have an exception here (or in previous iterations) we will leak a file channel, as no one closes newReader. I think you need testing for this; see TranslogTests#testWithRandomException to see how this can be tested.
agreed, going to fix it
/**
 * Create a new reader with a new checkpoint that shares resources with the current one
 */
TranslogReader withNewCheckpoint(final Checkpoint newCheckpoint) {
    return new TranslogReader(newCheckpoint, channel, path, header, closed);
There are a couple of problems with this:
- A closed translog should never be trimmed.
- There should be a clear transfer of ownership of the underlying channel between the old reader and the new - i.e. the old reader should be closed and the new reader should be open. You can see how to do this kind of ownership transfer by looking at TranslogWriter.closeIntoReader.
Instead of splitting the trimming logic of a reader across two places (Translog.trim and here), I think we should move it all here. Call the method closeIntoTrimmedReader and do all the checkpoint + state juggling here.
agreed, going to fix it
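The ownership-transfer idea can be sketched in isolation (hypothetical simplified names; the real TranslogWriter.closeIntoReader also juggles checkpoints and fsyncs): the old owner flips its closed flag with a CAS first, and only the winner of that race hands the shared channel to the new owner, so there is never a moment with two open owners or zero owners.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified sketch of channel-ownership transfer between two reader objects.
// `Object` stands in for the real FileChannel.
class Reader {
    final AtomicBoolean closed = new AtomicBoolean(false);
    final Object channel; // shared resource, owned by exactly one open reader

    Reader(Object channel) {
        this.channel = channel;
    }

    // Close this reader and hand the channel to a fresh reader atomically.
    Reader closeIntoNewReader() {
        if (closed.compareAndSet(false, true)) {
            // we won the race: this reader is now closed, the new one owns the channel
            return new Reader(channel);
        }
        throw new IllegalStateException("already closed");
    }

    boolean isOpen() {
        return closed.get() == false;
    }
}
```

A second closeIntoNewReader call on the same instance fails instead of producing a duplicate owner, which is exactly the property that prevents double-close or leaked channels.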
    return null;
while (readOperations < totalOperations) {
    final Translog.Operation operation = readOperation();
    if (operation.seqNo() <= checkpoint.trimmedAboveSeqNo || checkpoint.trimmedAboveSeqNo < 0) {
Can we hard-check trimmedAboveSeqNo against UNASSIGNED_SEQ_NO? I don't like the implicit lower-than-zero comparison. It's hard to grep for usages like that.
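The grep-ability point can be illustrated with a standalone filter (sentinel value inlined; not the actual TranslogSnapshot code): comparing against the named constant instead of `< 0` makes every use of the "no trimming" state searchable by name.

```java
import java.util.List;
import java.util.stream.Collectors;

class TrimFilter {
    static final long UNASSIGNED_SEQ_NO = -2; // "no trimming requested" sentinel

    // Keep only operations visible under the trim point. The explicit sentinel
    // comparison (instead of `trimmedAboveSeqNo < 0`) is easy to grep for.
    static List<Long> visibleSeqNos(List<Long> seqNos, long trimmedAboveSeqNo) {
        return seqNos.stream()
            .filter(seqNo -> trimmedAboveSeqNo == UNASSIGNED_SEQ_NO || seqNo <= trimmedAboveSeqNo)
            .collect(Collectors.toList());
    }
}
```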
}

/**
 * Trims translog (effectively moves max visible seq# {@link Checkpoint#trimmedAboveSeqNo}) for terms below <code>belowTerm</code>
Also, we should be clear that we're talking about the primary terms of the files, not of the operations.
… fixes on PR#2 Relates to elastic#10708
… fixes on PR elastic#3 Relates to elastic#10708
… fixes on PR elastic#4 Relates to elastic#10708
/**
 * The number of operations that have been skipped (overridden or trimmed) in the snapshot so far.
 */
default int skippedOperations() {
… fixes on PR elastic#5 Relates to elastic#10708
… fixes for the case of no entities Relates to elastic#10708
bleskes left a comment
Thanks @vladimirdolzhenko . Good iteration. I left some more comments.
    }
}
if (request.getTrimAboveSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
    replica.trimOperationOfPreviousPrimaryTerms(replica.getPrimaryTerm(), request.getTrimAboveSeqNo());
there is no need to pass the primary term to the replica - it already knows it.
public abstract boolean isThrottled();

/**
 * Trims translog for terms below <code>belowTerm</code> and seq# above <code>aboveSeqNo</code>
 * Trims translog for terms below <code>belowTerm</code> and seq# above <code>aboveSeqNo</code>
 */
public void trimTranslog(long belowTerm, long aboveSeqNo) throws IOException {
    getTranslog().trim(belowTerm, aboveSeqNo);
Wrap this with exception handling to fail the engine if it fails. Also, I think you want to hold the read lock, just for sanity. See InternalEngine#rollTranslogGeneration for an example.
    assert currentEngineReference.get() == null;
}

public void trimOperationOfPreviousPrimaryTerms(long belowTerm, long aboveSeqNo) throws IOException {
See the comment about removing the belowTerm parameter and using the shard's term.
}

if (!operations.isEmpty()) {
    final long trimmedAboveSeqNo = firstMessage.get() && maxSeqNo > 0 ? maxSeqNo : SequenceNumbers.UNASSIGNED_SEQ_NO;
I think you want maxSeqNo >= 0 (0 is a valid sequence number). Semantically, I think it's nicer to say maxSeqNo != SequenceNumbers.NO_OPS_PERFORMED. Also, double-check that your tests cover this case (only one operation on the primary and two ops on the replica). It would be good to understand why this wasn't caught.
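The off-by-one is easy to demonstrate in isolation: sequence numbers start at 0, so `maxSeqNo > 0` silently drops the single-operation case. A sketch with hypothetical helper names and the real sentinel values (-1 for NO_OPS_PERFORMED, -2 for UNASSIGNED_SEQ_NO):

```java
class SeqNoCheck {
    static final long NO_OPS_PERFORMED = -1;
    static final long UNASSIGNED_SEQ_NO = -2;

    // buggy: treats a shard whose only operation has seq# 0 as "no ops performed"
    static long trimPointBuggy(long maxSeqNo) {
        return maxSeqNo > 0 ? maxSeqNo : UNASSIGNED_SEQ_NO;
    }

    // intended: only the NO_OPS_PERFORMED sentinel means "nothing to trim above"
    static long trimPoint(long maxSeqNo) {
        return maxSeqNo != NO_OPS_PERFORMED ? maxSeqNo : UNASSIGNED_SEQ_NO;
    }
}
```

The two versions agree everywhere except maxSeqNo == 0, which is exactly the one-op-on-primary scenario the comment asks the tests to cover.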
}

failableTLog.rollGeneration();
fail.failAlways();
I don't think you need to fail always; rather fail randomly. This is important to test failures that can come at any step, not just the first.
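A failure switch of the kind suggested could be sketched as follows (a hypothetical simplified class, seeded for reproducibility; the real tests use a fail-switch helper in TranslogTests):

```java
import java.util.Random;

// Failure switch that can always fail or fail with some probability, so an
// injected IOException can strike at any step of the test, not just the first.
class FailSwitch {
    private final Random random;
    private double failureRate; // 0.0 = never fail, 1.0 = always fail

    FailSwitch(long seed) {
        this.random = new Random(seed);
    }

    void failAlways()           { failureRate = 1.0; }
    void failNever()            { failureRate = 0.0; }
    void failRandomly(double p) { failureRate = p; }

    // consulted by the faulty channel/filesystem before each operation
    boolean shouldFail() {
        return random.nextDouble() < failureRate;
    }
}
```

With failAlways the very first I/O operation fails, so later steps are never exercised; failRandomly spreads the failure over all steps of the trim, which is the extra coverage being asked for.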
    failableTLog.add(operation);
}

failableTLog.rollGeneration();
As in the previous test - you need way more evilness: more roll generations, more terms, more variance.
    operations.add(operation);
}
// shuffle a bit - move several first items to the end
for (int i = 0, len = randomIntBetween(5, 10); i < len; i++) {
see previous comments about shuffle.
}

try {
    failableTLog.newSnapshot();
} catch (MockDirectoryWrapper.FakeIOException ex) {
    // all is fine
}
Please make sure all files are closed and no file is leaked.
Do we really need to manually check that all files are closed, when that check is provided by ESTestCase?
If that's the case, we don't need to - this is just making sure :D Where do you see it's done in ESTestCase? (I didn't check myself.)
TestRuleTemporaryFilesCleanup takes care of it (in case I don't close the file handles):
java.lang.RuntimeException: file handle leaks: [FileChannel(/private/var/folders/dz/750jvg8j31j21s52xyhzcj4c0000gn/T/org.elasticsearch.index.translog.TranslogTests_FBDFE9B843A061D2-001/tempDir-003/translog-1.tlog), FileChannel(/private/var/folders/dz/750jvg8j31j21s52xyhzcj4c0000gn/T/org.elasticsearch.index.translog.TranslogTests_FBDFE9B843A061D2-001/tempDir-003/translog-2.tlog), FileChannel(/private/var/folders/dz/750jvg8j31j21s52xyhzcj4c0000gn/T/org.elasticsearch.index.translog.TranslogTests_FBDFE9B843A061D2-001/tempDir-003/translog-3.tlog)]
at __randomizedtesting.SeedInfo.seed([FBDFE9B843A061D2]:0)
at org.apache.lucene.mockfile.LeakFS.onClose(LeakFS.java:63)
at org.apache.lucene.mockfile.FilterFileSystem.close(FilterFileSystem.java:77)
at org.apache.lucene.mockfile.FilterFileSystem.close(FilterFileSystem.java:78)
at org.apache.lucene.util.TestRuleTemporaryFilesCleanup.afterAlways(TestRuleTemporaryFilesCleanup.java:228)
at com.carrotsearch.randomizedtesting.rules.TestRuleAdapter$1.afterAlways(TestRuleAdapter.java:31)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:43)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at org.apache.lucene.util.TestRuleAssertionsRequired$1.evaluate(TestRuleAssertionsRequired.java:53)
at org.apache.lucene.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:47)
at org.apache.lucene.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:64)
at org.apache.lucene.util.TestRuleIgnoreTestSuites$1.evaluate(TestRuleIgnoreTestSuites.java:54)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:368)
at java.base/java.lang.Thread.run(Thread.java:844)
Caused by: java.lang.Exception
at org.apache.lucene.mockfile.LeakFS.onOpen(LeakFS.java:46)
at org.apache.lucene.mockfile.HandleTrackingFS.callOpenHook(HandleTrackingFS.java:81)
at org.apache.lucene.mockfile.HandleTrackingFS.newFileChannel(HandleTrackingFS.java:197)
at org.apache.lucene.mockfile.HandleTrackingFS.newFileChannel(HandleTrackingFS.java:166)
at java.base/java.nio.channels.FileChannel.open(FileChannel.java:292)
at java.base/java.nio.channels.FileChannel.open(FileChannel.java:340)
at org.elasticsearch.index.translog.Translog.openReader(Translog.java:279)
at org.elasticsearch.index.translog.Translog.recoverFromFiles(Translog.java:225)
at org.elasticsearch.index.translog.Translog.<init>(Translog.java:177)
at org.elasticsearch.index.translog.TranslogTests$4.<init>(TranslogTests.java:2128)
at org.elasticsearch.index.translog.TranslogTests.getFailableTranslog(TranslogTests.java:2128)
at org.elasticsearch.index.translog.TranslogTests.testExceptionOnTrimAboveSeqNo(TranslogTests.java:1552)
Sorry - that rule is defined in LuceneTestCase rather than in ESTestCase.
… fixes for PR#6 Relates to elastic#10708
bleskes left a comment
Thanks @vladimirdolzhenko . I did another round. Getting close.
IOUtils.closeWhileHandlingException(newReaders);
IOUtils.closeWhileHandlingException(current);
IOUtils.closeWhileHandlingException(readers);
close();
Please wrap this with try/catch and add any exception as a suppressed exception to e.
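The requested pattern is Java's standard addSuppressed idiom: if cleanup after a failure itself throws, the cleanup exception is attached to the original instead of masking it. A minimal sketch with a hypothetical helper name:

```java
class Suppression {
    // Close the given resources after a failure `e`; any exception thrown while
    // closing is recorded as suppressed on `e`, then `e` itself is rethrown.
    static void failAndClose(Exception e, AutoCloseable... resources) throws Exception {
        for (AutoCloseable r : resources) {
            try {
                if (r != null) {
                    r.close();
                }
            } catch (Exception closeEx) {
                e.addSuppressed(closeEx); // don't mask the original failure
            }
        }
        throw e;
    }
}
```

The caller still sees the original exception, and the close failures show up in its stack trace under "Suppressed:".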
final long globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
-   final long minTranslogGeneration = -1L;
+   final long minTranslogGeneration = SequenceNumbers.UNASSIGNED_SEQ_NO;
return new Checkpoint(in.readLong(), in.readInt(), in.readLong(), minSeqNo, maxSeqNo, globalCheckpoint, minTranslogGeneration);
?? this is not a sequence number??
 */
TranslogReader closeIntoTrimmedReader(long belowTerm, long aboveSeqNo, ChannelFactory channelFactory) throws IOException {
    ensureOpen();
    if (getPrimaryTerm() < belowTerm && aboveSeqNo < checkpoint.maxSeqNo) {
See #30176 (comment), which I think you missed. This method is called close and I think it should always close. Term-based checking can be done at the translog layer.
Nope, the check is in Translog - agreed that the ensureOpen check in TranslogReader can go.
@TestLogging("org.elasticsearch.index.shard:TRACE,org.elasticsearch.action.resync:TRACE")
public void testResyncAfterPrimaryPromotion() throws Exception {
    // TODO: check translog trimming functionality once it's implemented
    assertThat(task.getResyncedOperations(), greaterThanOrEqualTo(extraDocs));
}
shards.assertAllEqual(initialDocs + extraDocs);
List<IndexShard> replicas = shards.getReplicas();
Translog.Operation next;
while ((next = snapshot.next()) != null) {
    translogOperations++;
    assertTrue("unexpected op: " + next, next.seqNo() < initialDocs + extraDocs);
Can we strengthen this and also look at the doc ids, to confirm they have the same seq# + term as on the primary? (As it stands, you can have a seq# that happens to be fine but with the wrong doc.)
}

primaryTerm.incrementAndGet();
translog.rollGeneration();
Sorry, I thought it related to testRandomExceptionsOnTrimOperations.
final Translog failableTLog =
    getFailableTranslog(fail, config, randomBoolean(), false, null, createTranslogDeletionPolicy(), fileChannels);

expectThrows(IOException.class,
This enforces that the exception is thrown, but the current code doesn't always throw one.
expectThrows(IOException.class,
    () -> {
        int translogOperations = 0;
        int maxAttempts = randomIntBetween(5, 10);
Just fix a number; I don't think randomizing this adds much.
fileChannels.stream().filter(f -> f.isOpen()).findFirst().isPresent(), is(false));

final AlreadyClosedException alreadyClosedException = expectThrows(AlreadyClosedException.class, () -> failableTLog.newSnapshot());
assertThat(alreadyClosedException.getMessage(),
Can you check that you can reopen the translog and that you can at least read all operations that weren't trimmed? This should not result in translog corruption.
… Relates to elastic#10708 added TODO note for test regarding Lucene rollback
… Relates to elastic#10708 dropped extra debug info of replicas seq# stats
… Relates to elastic#10708 check source of a doc after trimming
… Relates to elastic#10708 minTranslogGeneration has NOT to refer to SeqNumbers
@elasticmachine test this please
bleskes left a comment
LGTM. I left one suggestion that I don't feel needs another cycle. Thanks @vladimirdolzhenko for all the iterations.
operationsList
    .stream()
    // handle all except the last one - it has `current` ops
    .limit(operationsList.size() - 1)
"Current" ops should never be trimmed due to our assumptions. That said, I think it's good not to model that here, but rather to make sure that the way we generate ops conforms to those assumptions.
thanks @bleskes for the comments - indeed it's better to keep current operations and other operations apart, could you please have a look?
@elasticmachine test this please
@vladimirdolzhenko I think there's some misunderstanding (I'm happy to discuss). What I meant in my comment is that we shouldn't make any distinction between current and the rest in the competing mock translog. That one should be just a dumb array of operations where trim simply removes the matching operations.
@bleskes thank you 👍 Indeed - there should be no assumption about what to trim, all or all-except-current - current must not contain any op with a lower term.
@elasticmachine test this please
-   return IntStream.range(0, size + 1)
-       // current + reverse traverse of operations
-       .mapToObj(i -> i == 0 ? currentOperations : operationsList.get(size - i))
+   return IntStream.range(0, size)
Why not just use operationsList.stream()?
I'm sorry, I got confused. Never mind.
That said, instead of being fancy here, we can do this on add: override existing ops if they have the same seq# but a lower term. That will improve the test.
@bleskes thanks, it makes it way simpler
void rollGeneration() {
    operationsList.add(new LinkedList<>());

final Translog.Operation old = operations.put(operation.seqNo(), operation);
assert old == null || old.primaryTerm() <= operation.primaryTerm();
thanks a lot @bleskes for the review and comments
* master:
  Move default location of dependencies report (#31228)
  Remove dependencies report task dependencies (#31227)
  Add recognition of MPL 2.0 (#31226)
  Fix unknown licenses (#31223)
  Remove version from license file name for GCS SDK (#31221)
  Fully encapsulate LocalCheckpointTracker inside of the engine (#31213)
  [DOCS] Added 'fail_on_unsupported_field' param to MLT. Closes #28008 (#31160)
  Add licenses for transport-nio (#31218)
  Remove DocumentFieldMappers#simpleMatchToFullName. (#31041)
  Allow to trim all ops above a certain seq# with a term lower than X, post backport fix (#31211)
  Compliant SAML Response destination check (#31175)
  Remove DocumentFieldMappers#smartNameFieldMapper, as it is no longer needed. (#31018)
  Remove extraneous references to 'tokenized' in the mapper code. (#31010)
  Allow to trim all ops above a certain seq# with a term lower than X (#30176)
  SQL: Make a single JDBC driver jar (#31012)
  Enhance license detection for various licenses (#31198)
  [DOCS] Add note about long-lived idle connections (#30990)
  Move number of language analyzers to analysis-common module (#31143)
  Default max concurrent search req. numNodes * 5 (#31171)
  flush job to ensure all results have been written (#31187)
…ecker
* elastic/master: (309 commits)
  [test] add fix for rare virtualbox error (elastic#31212)
  Move default location of dependencies report (elastic#31228)
  Remove dependencies report task dependencies (elastic#31227)
  Add recognition of MPL 2.0 (elastic#31226)
  Fix unknown licenses (elastic#31223)
  Remove version from license file name for GCS SDK (elastic#31221)
  Fully encapsulate LocalCheckpointTracker inside of the engine (elastic#31213)
  [DOCS] Added 'fail_on_unsupported_field' param to MLT. Closes elastic#28008 (elastic#31160)
  Add licenses for transport-nio (elastic#31218)
  Remove DocumentFieldMappers#simpleMatchToFullName. (elastic#31041)
  Allow to trim all ops above a certain seq# with a term lower than X, post backport fix (elastic#31211)
  Compliant SAML Response destination check (elastic#31175)
  Remove DocumentFieldMappers#smartNameFieldMapper, as it is no longer needed. (elastic#31018)
  Remove extraneous references to 'tokenized' in the mapper code. (elastic#31010)
  Allow to trim all ops above a certain seq# with a term lower than X (elastic#30176)
  SQL: Make a single JDBC driver jar (elastic#31012)
  Enhance license detection for various licenses (elastic#31198)
  [DOCS] Add note about long-lived idle connections (elastic#30990)
  Move number of language analyzers to analysis-common module (elastic#31143)
  Default max concurrent search req. numNodes * 5 (elastic#31171)
  ...
Allow to trim all ops above a certain seq# with a term lower than X
Relates to #10708