Translog file recovery should not rely on lucene commits #25005
bleskes merged 12 commits into elastic:master from
Conversation
retest this please
When we open a translog, we rely on the `translog.ckp` file to tell us what the maximum generation file should be and on the information stored in the last lucene commit to know the first file we need to recover. This requires coordination and is currently subject to a race condition: if a node dies after a lucene commit is made but before we remove the translog generations that were unneeded by it, the next time we open the translog we will ignore those files and never delete them (I have added tests for this). This PR changes the approach to have the translog store both of those numbers in the `translog.ckp`. This means it's more self-contained and easier to control. This change also decouples the translog recovery logic from the specific commit we're opening. This prepares the ground to fully utilize the deletion policy introduced in elastic#24950, store more translog data that's needed for Lucene, keep multiple lucene commits around, and be free to recover from any of them.
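As a rough illustration of the idea (not code from this PR; class and method names here are hypothetical, and the real `translog.ckp` is a binary file with more fields), a checkpoint that carries both generation numbers lets recovery compute its file range without consulting the last lucene commit:

```java
// Hypothetical sketch: a checkpoint carrying both generation bounds.
final class Checkpoint {
    final long generation;            // the current (maximum) translog generation
    final long minTranslogGeneration; // the first generation needed for recovery

    Checkpoint(long generation, long minTranslogGeneration) {
        assert minTranslogGeneration <= generation;
        this.generation = generation;
        this.minTranslogGeneration = minTranslogGeneration;
    }

    // With both numbers in the checkpoint, the set of files to recover can be
    // computed from the checkpoint alone, in ascending generation order.
    long[] generationsToRecover() {
        final long[] range = new long[(int) (generation - minTranslogGeneration) + 1];
        for (int i = 0; i < range.length; i++) {
            range[i] = minTranslogGeneration + i;
        }
        return range;
    }
}
```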
4f7f33a to 2e4a617 (compare)
jasontedor left a comment:
Let's figure out something better than that disgusting hack. Otherwise it looks good.
public void testSyncedFlush() throws IOException {
    try (Store store = createStore();
         Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
well, it's part of the try() clause and not the body, so I think this is good? (it's also what the auto-formatter does)
try (Store store = createStore();
     Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
         new LogByteSizeMergePolicy(), null))) { // use log MP here; we test some behavior in ESMP
well, it's part of the try() clause and not the body, so I think this is good? (it's also what the auto-formatter does)
FileChannel::open,
TranslogConfig.DEFAULT_BUFFER_SIZE,
() -> globalCheckpoint, () -> generation)) {
}
It looks like it was unnecessary to touch this file?
I need to add the new parameters?
    }
}

// commit hook for testing
It's a matter of taste, but I no longer find these comments to be useful.
I don't mind, just following conventions.
And I'm doing what I can to influence abandonment of it.
well, you're the reviewer for this one and I don't mind - so gone it is.
}

// commit hook for testing
void callCommitOnWriter(IndexWriter writer) throws IOException {
I think it should just be called commit or commitWriter.
I struggled with this name. We already have a commitIndexWriter, so I figured I'd be explicit and call this exactly what it does.
I think we can achieve the same without introducing this method (that has potential to be used in the wrong place) with the following:
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 18fafb6e90..8bbf320e26 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -1778,7 +1778,7 @@ public class InternalEngine extends Engine {
* @param syncId the sync flush ID ({@code null} if not committing a synced flush)
* @throws IOException if an I/O exception occurs committing the specfied writer
*/
- private void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
+ void commitIndexWriter(final IndexWriter writer, final Translog translog, @Nullable final String syncId) throws IOException {
ensureCanFlush();
try {
final long localCheckpoint = seqNoService().getLocalCheckpoint();
@@ -1810,7 +1810,7 @@ public class InternalEngine extends Engine {
return commitData.entrySet().iterator();
});
- callCommitOnWriter(writer);
+ writer.commit();
} catch (final Exception ex) {
try {
failEngine("lucene commit failed", ex);
@@ -1837,11 +1837,6 @@ public class InternalEngine extends Engine {
}
}
- // commit hook for testing
- void callCommitOnWriter(IndexWriter writer) throws IOException {
- writer.commit();
- }
-
private void ensureCanFlush() {
// translog recover happens after the engine is fully constructed
// if we are in this stage we have to prevent flushes from this
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index a767e50d4e..a10381be4a 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -1163,8 +1163,9 @@ public class InternalEngineTests extends ESTestCase {
}
public void testSyncedFlush() throws IOException {
- try (Store store = createStore();
- Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
+ try (
+ Store store = createStore();
+ Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), new LogByteSizeMergePolicy(), null))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
engine.index(indexForDoc(doc));
@@ -2496,8 +2497,8 @@ public class InternalEngineTests extends ESTestCase {
final Path translogPath = createTempDir();
try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null)) {
@Override
- void callCommitOnWriter(IndexWriter writer) throws IOException {
- super.callCommitOnWriter(writer);
+ void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
+ super.commitIndexWriter(writer, translog, syncId);
if (throwErrorOnCommit.get()) {
throw new RuntimeException("power's out");
                    }
What do you think?
final long minGenerationToRecoverFrom;
if (checkpoint.minTranslogGeneration < 0) {
    final Version indexVersionCreated = indexSettings().getIndexVersionCreated();
    assert indexVersionCreated.before(Version.V_6_0_0_alpha2) :
I think this needs to be Version.V_6_0_0_alpha3 now.
Collections.reverse(foundTranslogs);

// when we clean up files, we first update the checkpoint with a new minReferencedTranslog and then delete them
// if we crash just at the wrong moment, it may be that we leave one unreferenced file behind. Delete it if there
. Delete it if there -> so we delete it if they exist
}
Collections.reverse(foundTranslogs);

// when we clean up files, we first update the checkpoint with a new minReferencedTranslog and then delete them
 * Returns the minimum file generation referenced by the translog
 */
long getMinFileGeneration() {
    try (ReleasableLock ignored = readLock.acquire()) {
Let's change the conditional so we can avoid the negative:
if (readers.isEmpty()) {
    return current.getGeneration();
} else {
    return readers.get(0).getGeneration();
}

globalCheckpointSupplier);
globalCheckpointSupplier,
minTranslogGenerationSupplier
);
Can we place this on the end of the previous line?
Thx @jasontedor. I addressed all your comments.
s1monw left a comment:
left some nits, LGTM otherwise
IOUtils.closeWhileHandlingException(unreferencedReader);
IOUtils.deleteFilesIgnoringExceptions(translogPath,
    translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
// update the checkpoint not to reference the removed file
nit: can this comment be more clear, i.e. tell us what we update to make sure we don't ref this file.
"deletion policy requires a minReferenceGen of [" + minReferencedGen + "] which is higher than the current generation ["
    + currentFileGeneration() + "]";

while (readers.isEmpty() == false && readers.get(0).getGeneration() < minReferencedGen) {
can we use an iterator here instead? It would be more clear to me if we'd do that.
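For what it's worth, a minimal sketch of the iterator-based variant being suggested; it uses a plain sorted list of generation numbers instead of the real reader list, which is an assumption for illustration:

```java
import java.util.Iterator;
import java.util.List;

final class ReaderTrim {
    // Remove leading generations below minReferencedGen; the list is assumed
    // to be sorted ascending, mirroring how readers are ordered in the translog.
    static List<Long> trimUnreferenced(List<Long> generations, long minReferencedGen) {
        final Iterator<Long> it = generations.iterator();
        while (it.hasNext()) {
            if (it.next() < minReferencedGen) {
                it.remove(); // iterator removal instead of repeated get(0)/remove(0)
            } else {
                break; // first retained generation reached; the rest are higher
            }
        }
        return generations;
    }
}
```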
IOUtils.deleteFilesIgnoringExceptions(translogPath,
    translogPath.resolveSibling(getCommitCheckpointFileName(unreferencedReader.getGeneration())));
// update the checkpoint not to reference the removed file
current.sync();
should we try to delete in a finally block here? best effort?
If we fail to sync, I think we want to keep the file around because it's still being referenced by the ckp?
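To make the ordering under discussion concrete, here is a toy sketch (file layout, names, and the checkpoint contents are simplified assumptions, and the checkpoint fsync is elided): the checkpoint stops referencing the generation before the file is deleted, so a crash in between leaves at most an unreferenced file that the recovery-time cleanup can remove, never a referenced-but-missing one.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

final class CleanupOrder {
    // Step 1: persist the new minimum referenced generation in the checkpoint.
    // Step 2: only then delete the now-unreferenced translog file.
    // (A real implementation would fsync the checkpoint between the two steps.)
    static void deleteUnreferencedGeneration(Path checkpointFile, Path translogFile,
                                             long newMinGen) throws IOException {
        Files.write(checkpointFile, Long.toString(newMinGen).getBytes(StandardCharsets.UTF_8));
        Files.deleteIfExists(translogFile);
    }
}
```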
assertFalse("translog [" + id + "] still exists", Files.exists(translog.location().resolve(Translog.getFilename(id))));
}

private void assertFilesPresence(Translog translog) {
I think to be ultra-pedantic it should be assertFilePresences.
try (Store store = createStore();
     Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
         new LogByteSizeMergePolicy(), null))) { // use log MP here; we test some behavior in ESMP
…in_checkpoint
# Conflicts:
#	core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
jasontedor left a comment:
I left a few more comments.
589c735 to 440ecc9 (compare)
final Path translogPath = createTempDir();
try (InternalEngine engine = new InternalEngine(config(defaultSettings, store, translogPath, newMergePolicy(), null, null)) {
    @Override
    protected void commitIndexWriter(IndexWriter writer, Translog translog, String syncId) throws IOException {
instead of making this method protected I think we should use MockDirectoryWrapper#failOn(Failure) and pass some failure to it that fails if we commit the indexwriter like this:
Failure fail = new Failure() {
@Override
public void eval(MockDirectoryWrapper dir) throws IOException {
for (StackTraceElement e : Thread.currentThread().getStackTrace()) {
if (doFail && "commit".equals(e.getMethodName())) {
throw new FakeIOException();
}
}
}
};
@s1monw I tried this in many variants and none of them was good. The Failure as stated fails too early (before the segments_N file is written). I tried many other variants, but none of them allow all of the commit logic to complete without triggering any failure handling in the IndexWriter, which also means later on that the new commit's files are cleaned up by a rollback we do when we fail the engine. Even if we do find a way to do this, I think it will be way too brittle and tend to break with changes in Lucene. Bottom line, I prefer to keep the current solution.
…in_checkpoint
# Conflicts:
#	core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
#	core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
Thanks @jasontedor @s1monw for the thorough review.