Retain soft-deleted documents for rollback #31846
dnhatn wants to merge 23 commits into elastic:ccr
Conversation
An operation whose seqno is greater than the global checkpoint is subject to being undone when the primary fails over. If that operation updates or deletes existing documents in Lucene, those documents are also subject to being undone. Thus, we need to retain them during merges until they are no longer subject to rollback.
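The retention rule can be sketched as a small predicate. This is an illustrative Python model of the behavior described in this PR, not the actual Lucene/Elasticsearch code; the `updated_by_seqno` name mirrors the field this PR introduces, and the function name is made up:

```python
# Model of the retention rule (an illustrative sketch, not engine code).
# A soft-deleted document must survive merges while the operation that
# superseded it can still be rolled back, i.e. while that operation's
# seqno is above the global checkpoint.

def should_retain(doc_seqno, updated_by_seqno, min_retained_seqno, global_checkpoint):
    """updated_by_seqno: seqno of the op that superseded this doc, or None."""
    if doc_seqno >= min_retained_seqno:
        return True  # inside the normal soft-deletes retention window
    if updated_by_seqno is not None and updated_by_seqno > global_checkpoint:
        return True  # the superseding op may still be undone on failover
    return False
```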
Pinging @elastic/es-distributed
I looked at this and I was a bit unhappy that we had to make the resolving methods more complicated by introducing another wrapper class. There are some bwc complications that we agreed to address in a backport PR only.
@bleskes I've updated the resolveDocSeqNo method. Can you have a look? Thank you.
```java
} else {
    plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
        index.seqNo(), index.version());
    plan = IndexingStrategy.processAsStaleOp(softDeleteEnabled, index.seqNo(), index.version(), prevSeqNo.getAsLong());
```
can you add an assertion that loads the primary term and checks equality?
I added this assertion but it was violated. I added a TODO in the resolveDocSeqNo method.
```java
this.indexIntoLucene = indexIntoLucene;
this.addStaleOpToLucene = addStaleOpToLucene;
this.seqNoOfNewerVersion = seqNoOfNewerVersion;
assert addStaleOpToLucene == false || seqNoOfNewerVersion >= 0 : "stale op [" + seqNoForIndexing + "] with invalid newer seqno";
```
also assert it's higher than the seqNoForIndexing?
also assert that if addStaleOpToLucene is true, seqNoOfNewerVersion is unassigned? (move the assertion next to the other ones please)
```java
final long versionForIndexing;
final boolean indexIntoLucene;
final boolean addStaleOpToLucene;
final long seqNoOfNewerVersion; // the seqno of the newer copy of this _uid if it exists; otherwise -1
```
maybe call this seqNoOfNewerDocIfStale? (trying to avoid the word version as it's confusing)
```java
} else {
    plan = DeletionStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND,
        delete.seqNo(), delete.version());
    plan = DeletionStrategy.processAsStaleOp(softDeleteEnabled, false, delete.seqNo(), delete.version());
```
Same comment about adding an assertion that loads the term when the seq no is equal.
We have to wait for the actual rollback to add the assertion here. I added a TODO in resolveDocSeqNo.
jpountz left a comment:
The parts that I understand look good; @bleskes kindly provided context so that I better understand this change. I just left a note that the new retention query might be slow. There are potential ways we could reduce the slowdown, such as adding a small cache of the max value of the UPDATED_BY_SEQNO_NAME field as a follow-up.
```java
.add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getMinRetainedSeqNo(), Long.MAX_VALUE), BooleanClause.Occur.SHOULD)
.add(NumericDocValuesField.newSlowRangeQuery(SeqNoFieldMapper.UPDATED_BY_SEQNO_NAME,
    globalCheckpointSupplier.getAsLong() + 1, Long.MAX_VALUE), BooleanClause.Occur.SHOULD)
.build();
```
adding a doc-value query as a SHOULD clause might mean we need to do a linear scan to find matches of this query. We don't have much of a choice, so I would still let the change in and benchmark, but we might still want to add a comment here.
@bleskes The retention is tested in Engine#testForceMergeWithSoftDeletesRetention.
```java
Map<Long, Long> seqNos = readUpdatedBySeqNos.apply(engine);
assertThat(seqNos, hasEntry(0L, 1L)); // 0 -> 1
assertThat(seqNos, hasEntry(1L, 4L)); // 1 -> 3 -> 4
assertThat(seqNos, hasEntry(2L, 3L)); // 2 -> 3 (stale)
```
+1. Can we also have stale delete operations?
why don't we have an entry for seq#3? Also check that the map size is 3?
```java
engine.index(replicaIndexForDoc(createParsedDoc("1", null), 5, 4, false));
Map<Long, Long> seqNos = readUpdatedBySeqNos.apply(engine);
assertThat(seqNos, hasEntry(0L, 1L)); // 0 -> 1
assertThat(seqNos, hasEntry(1L, 4L)); // 1 -> 3 -> 4
```
```diff
@@ -1314,7 +1297,8 @@ private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
 if (plan.addStaleOpToLucene || plan.currentlyDeleted) {
```
do we need UPDATED_BY_SEQNO_NAME to be updated here too?
Good question. A regular delete tombstone is soft-deleted before it is indexed into Lucene, so the softUpdateDocument method won't affect that document. I think we should not add the updated_by_seqno field to stale deletes, to keep delete tombstones consistent (i.e., no delete tombstone carries an updated_by_seqno value).
However, I found a case that might be an issue if we remove the safe commit. It happens as follows:
- Index a doc (version=1) (seq#0)
- Delete that doc (seq#1)
- Index that doc (version=2) (seq#2)
Suppose the global checkpoint is 1 and seq#2 is in the danger zone, and we trigger a force merge. The problem is that seq#2 only updates the updated_by_seqno of seq#0, because the delete is invisible while seq#0 is still visible (until a refresh). A force merge will reclaim the delete and retain seq#0 and seq#2. Rolling back seq#2 would then restore seq#0, which is incorrect.
We can avoid this issue if a document is invisible immediately after it's soft-deleted.
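This failure mode can be reproduced with a small Python model. This is an illustrative sketch of the semantics discussed above, not engine code; the function name and tuple encoding are made up. Seq#2 marks only the visible copy (seq#0), while the invisible delete tombstone (seq#1) never receives updated_by_seqno:

```python
# Simulate which docs a force merge would keep under the retention rule
# "keep if seqno >= global_checkpoint + 1, or updated_by_seqno > global_checkpoint".

def surviving_seqnos(docs, global_checkpoint):
    # docs: list of (seqno, updated_by_seqno or None)
    min_retained = global_checkpoint + 1
    return [s for s, upd in docs
            if s >= min_retained or (upd is not None and upd > global_checkpoint)]

docs = [
    (0, 2),     # index v1: the only copy visible to the searcher, marked by seq#2
    (1, None),  # delete tombstone: indexed pre-soft-deleted, never marked
    (2, None),  # index v2: in the danger zone
]
kept = surviving_seqnos(docs, global_checkpoint=1)
assert kept == [0, 2]  # the delete is reclaimed while seq#0 survives, so
                       # rolling back seq#2 would wrongly restore seq#0
```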
We discussed this yesterday, and it has to do with the fact that Lucene treats a document that is indexed as soft-deleted (i.e., the tombstone) differently than a doc that is indexed and later soft-deleted while it is still in the indexing buffer. As a workaround we decided to detect this case and force a refresh to maintain semantics. This will allow this work to continue while we work on a solution in Lucene.
```java
engine.refresh("test");
Map<Long, Long> seqNos = readUpdatedBySeqNos.apply(engine);
assertThat(seqNos, hasEntry(0L, 2L)); // 0 -> 1 -> 2
assertThat(seqNos, hasEntry(1L, 2L)); // 1 -> 2
```
There was a problem hiding this comment.
assert that the total size is what we expect?
Sorry, I have no idea why I missed the tests. I left some comments there.
I have another approach that requires some changes:
Suppose we have three operations on a single document id, index-1 (i1), delete-2 (d2), and index-3 (i3), with the following processing orders:

- a. i1, d2, i3 -> updated_by_seqno { 2, _, _ }
- c. d2, i1, i3 -> updated_by_seqno { 2, _, _ }
- e. i3, i1, d2 -> updated_by_seqno { 3, 3, _ }
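The orderings above follow a simple marking rule, sketched here in Python. This is a model inferred from the listed examples, not the actual implementation; `updated_by_marks` and the op encoding are hypothetical. An op that supersedes the current copy marks it (unless the current copy is a regular delete tombstone, which never carries the field), and a stale op is marked with the newest seqno seen so far:

```python
def updated_by_marks(ops):
    """ops: (seqno, kind) pairs for one doc id in processing order,
    kind in {'index', 'delete'}. Returns {seqno: updated_by_seqno}."""
    marks = {}
    newest_seqno, newest_kind = None, None
    for seqno, kind in ops:
        if newest_seqno is None or seqno > newest_seqno:
            if newest_kind == 'index':
                marks[newest_seqno] = seqno  # supersede the visible copy
            # a regular delete tombstone never receives the field, so no mark
            newest_seqno, newest_kind = seqno, kind
        else:
            marks[seqno] = newest_seqno  # stale op, marked on arrival
    return marks

i1, d2, i3 = (1, 'index'), (2, 'delete'), (3, 'index')
assert updated_by_marks([i1, d2, i3]) == {1: 2}        # case a
assert updated_by_marks([d2, i1, i3]) == {1: 2}        # case c
assert updated_by_marks([i3, i1, d2]) == {1: 3, 2: 3}  # case e
```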
I think we are good now with these changes. WDYT?
Discussed with Boaz, we agreed not to proceed with this approach because:
We will instead go with Simon's approach which re-adds a delete tombstone with
@bleskes It's ready for another go. Can you please have a look? Thank you!
@dnhatn and I discussed the current approach and sadly it still has some bugs. Consider the following indexing order for the same doc: index v1 (seq 2), delete (seq 4), and index (seq 10). With the current approach, which loads the current doc version from Lucene with a normal searcher, when index (seq 10) comes in it will not see the delete (seq 4), and will not add an extra tombstone doc with update_by_seq_no set to 10. We can fix this by using a reader that exposes tombstone docs, but then we have another problem: if the global checkpoint is 6, the merge policy is free to reclaim the delete tombstone (it's not marked with update by seq no yet) while leaving index v1 (seq 2) in the index. This will trick the rollback into thinking index v1 is the rollback doc.

We've had some ideas on other approaches, but it has become clear that our testing is lacking, as we keep finding issues during reviews rather than through failing tests. Nhat and I had an idea on how to write a test that should cover all these edge cases, and that's going to be the first goal now (i.e., having a test that fails for this issue and also for the problems we found with previous iterations). We're going to give this a day or two, and if we fail to write a test we feel confident about, we're going to invest in a TLA+ model before proceeding. We chose to wait with the TLA+ model as it will take some time to write and will be based on our current model of Lucene (which may be flawed; we are still working on mapping it out) rather than using Lucene itself.
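The seq 2 / seq 4 / seq 10 scenario above can be checked numerically with a small model. This is an illustrative sketch of the described semantics only, with made-up names; it is not engine code. Because the normal searcher hides the delete at seq 4, only index v1 (seq 2) gets marked with 10, and with global checkpoint 6 the merge may drop the tombstone while keeping v1:

```python
def retained_after_merge(docs, global_checkpoint):
    # docs: (seqno, updated_by_seqno or None); keep a doc if it is above the
    # retention floor, or was superseded by an op that may still be rolled back
    floor = global_checkpoint + 1
    return [s for s, upd in docs
            if s >= floor or (upd is not None and upd > global_checkpoint)]

docs = [
    (2, 10),    # index v1: marked, as the only copy the normal searcher sees
    (4, None),  # delete: hidden from the normal searcher, never marked
    (10, None), # index v2
]
assert retained_after_merge(docs, global_checkpoint=6) == [2, 10]
# The seq-4 tombstone is reclaimable while v1 (seq 2) survives, so a
# rollback of seq 10 would mistake v1 for the doc to restore.
```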
@bleskes I've added a test which simulates Lucene merge in f6487e1. This test is able to detect the current issue. Below is an example that it found. Can you please have a look? (/cc @ywelsch)
```java
List<Engine.Operation> operations = new ArrayList<>();
int numOps = scaledRandomIntBetween(10, 200);
for (int seqNo = 0; seqNo < numOps; seqNo++) {
    String id = Integer.toString(between(1, 5));
```
why not use the random history generator? maybe we should extend it if we need to?
I adjusted the history generator method and used it here.
```java
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true);
final IndexMetaData indexMetaData = IndexMetaData.builder(defaultSettings.getIndexMetaData()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetaData);
realisticShuffleOperations(operations);
```
why be realistic here? Pure random is a better test, no?
I ran several hundred iterations with pure random shuffling but got no failure. The reason is that the operations were shuffled so much that the local checkpoint did not advance enough to allow us to reclaim documents or tombstones. I then introduced this shuffle method.
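A bounded shuffle of this kind can be sketched as follows. This is a hypothetical stand-in for realisticShuffleOperations, written in Python for brevity; the window size is an assumption. Operations are shuffled only within small consecutive blocks, so seqnos stay roughly in order and the local checkpoint can keep advancing:

```python
import random

def realistic_shuffle(ops, window=5):
    """Shuffle only within consecutive windows of `window` operations."""
    result = list(ops)
    for start in range(0, len(result), window):
        block = result[start:start + window]
        random.shuffle(block)
        result[start:start + window] = block
    return result
```

Because no operation moves more than `window - 1` positions, enough low seqnos are processed early for the local checkpoint to advance, which is what makes documents and tombstones reclaimable during the test.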
```java
}
processedOps.put(op.seqNo(), op);
if (between(1, 20) == 1) {
    assertDocumentsForRollback(engine, globalCheckpoint, processedOps);
```
An interesting case is when the updated_by_seqno is updated twice. This method needs to refresh, and if we refresh after every operation, that case will disappear.
```java
if (between(1, 5) == 1) {
    engine.maybePruneDeletes();
}
if (between(1, 20) == 1) {
```
@dnhatn I think that test is good. I would prefer to change it a bit to not rely on the engine at all in its expectations: it knows the series of indexing operations it has performed and thus can figure out what the "rollback" version should be. I also left some questions about things that weren't clear to me.
Sorry, I got confused: the test already keeps track of indexed ops on its own and uses that to compute expected rollback targets.
@dnhatn I think that we can close this PR for now?
I am closing this as we are going to implement a commit-based rollback. |