Add primary term to translog header#29227
This commit adds the current primary term to the header of the current translog file. Having a term in a translog file allows us to trim translog operations in that file given the max valid seq# for that term. This commit also updates tests to conform to the primary term invariant, which guarantees that every translog operation in a translog file has a term no greater than the term stored in the translog header.
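The invariant described above can be sketched as a simple check. This is only an illustration: `TranslogTermInvariant` and `holds` are hypothetical names, and a translog file is modeled as nothing more than a header term plus the terms of its operations.

```java
import java.util.List;

// Hypothetical stand-in for a translog file: one header term plus the terms of its operations.
final class TranslogTermInvariant {

    /** Returns true if no operation term exceeds the term recorded in the header. */
    static boolean holds(long headerTerm, List<Long> operationTerms) {
        for (long term : operationTerms) {
            if (term > headerTerm) {
                return false; // an operation from a newer term must not live in this file
            }
        }
        return true;
    }
}
```

Operations from older terms are allowed in the file; only a term above the header term breaks the invariant.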
Pinging @elastic/es-distributed
@bleskes Please let me know if I need to split this into two separate PRs.
s1monw
left a comment
Left some initial comments. Looks good.
| // Write primary term | ||
| out.writeLong(primaryTerm); | ||
| // Checksum header | ||
| out.writeInt((int) out.getChecksum()); |
I think you should call out.flush() here to ensure everything is written to the channel.
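To illustrate why the flush matters, here is a minimal sketch using only JDK streams. It is not the translog's actual writer: `HeaderWriteSketch` is a hypothetical class, and the `BufferedOutputStream` merely stands in for whatever buffering sits between the header writer and the file channel.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;

// Hypothetical sketch: writes a term followed by a checksum of the bytes written so far.
final class HeaderWriteSketch {

    static void writeTermAndChecksum(OutputStream sink, long primaryTerm) throws IOException {
        CRC32 crc = new CRC32();
        // The buffered layer is an assumption; it models bytes that have not reached the channel yet.
        DataOutputStream out = new DataOutputStream(
            new CheckedOutputStream(new BufferedOutputStream(sink), crc));
        out.writeLong(primaryTerm);          // write primary term
        out.writeInt((int) crc.getValue());  // checksum covers the 8 term bytes
        out.flush();                         // without this, the 12 bytes would stay in the buffer
    }

    /** Convenience wrapper that returns the encoded bytes. */
    static byte[] encode(long primaryTerm) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            writeTermAndChecksum(sink, primaryTerm);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }
}
```

In this sketch the stream is deliberately not closed, mirroring the case where the caller keeps the channel open, so the explicit flush is the only thing that pushes the header bytes through.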
|
|
||
| /** | ||
| * Writes this header with the latest format into the file channel | ||
| */ |
Check whether you need to add a SuppressWarnings annotation here, since we don't close the channel.
Yes. I added a SuppressWarnings annotation for both Eclipse and IntelliJ.
| try { | ||
| InputStreamStreamInput headerStream = new InputStreamStreamInput(java.nio.channels.Channels.newInputStream(channel), | ||
| channel.size()); // don't close | ||
| // Lucene's CodecUtil writes a magic number of 0x3FD76C17 with the |
This PR removes the LUCENE_CODEC_HEADER_BYTE check etc. since we don't support it, correct? Maybe we should keep it and fail with a descriptive error message instead.
# Conflicts: # server/src/test/java/org/elasticsearch/index/engine/EngineDiskUtilsTests.java # server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java
@s1monw I've addressed your comments. Would you please take another look? Thank you.
bleskes
left a comment
Looks good. I left some nits.
| public Translog( | ||
| final TranslogConfig config, final String translogUUID, TranslogDeletionPolicy deletionPolicy, | ||
| final LongSupplier globalCheckpointSupplier) throws IOException { | ||
| final LongSupplier globalCheckpointSupplier, final LongSupplier primaryTermSupplier) throws IOException { |
Can you add Javadocs for the term supplier? Also describe what the term means here, i.e., it is sampled when a generation is rolled, and the translog will reject operations with a higher term until it is rolled again.
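The behavior requested for the Javadocs can be sketched as follows. This is an assumption-laden illustration, not the Translog implementation: `TermSampledLog`, `rollGeneration`, and `add` are hypothetical names chosen for this example.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of a log that samples the primary term only when a generation is rolled.
final class TermSampledLog {
    private final LongSupplier primaryTermSupplier;
    private long generationTerm; // term sampled for the current generation

    TermSampledLog(LongSupplier primaryTermSupplier) {
        this.primaryTermSupplier = primaryTermSupplier;
        this.generationTerm = primaryTermSupplier.getAsLong(); // sampled for the first generation
    }

    /** Starts a new generation and re-samples the term from the supplier. */
    void rollGeneration() {
        generationTerm = primaryTermSupplier.getAsLong();
    }

    /** Rejects operations whose term is higher than the term of the current generation. */
    void add(long operationTerm) {
        if (operationTerm > generationTerm) {
            throw new IllegalArgumentException("operation term [" + operationTerm
                + "] is higher than the current generation term [" + generationTerm + "]");
        }
        // a real implementation would append the operation here
    }

    long generationTerm() {
        return generationTerm;
    }
}
```

For example, if the supplier starts returning 2 while the current generation was sampled at 1, `add(2)` fails until `rollGeneration()` is called.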
| TranslogWriter writer = TranslogWriter.create(shardId, translogUUID, 1, location.resolve(getFilename(1)), channelFactory, | ||
| new ByteSizeValue(10), 1, initialGlobalCheckpoint, | ||
| () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); } | ||
| () -> { throw new UnsupportedOperationException(); }, () -> { throw new UnsupportedOperationException(); }, 0L |
Shall we make callers pass the term to this method? If it's difficult, it's not a big deal, but I feel it would be good to have the right term on the file.
| private final long primaryTerm; | ||
| private final int headerSizeInBytes; | ||
|
|
||
| TranslogHeader(String translogUUID, long primaryTerm) { |
Can we add Javadocs about what the UUID and term mean, i.e., what do they enforce?
| return headerSizeInBytes; | ||
| } | ||
|
|
||
| static int defaultSizeInBytes(String translogUUID) { |
Nit: this is used in the constructor, so it's not really a default. Maybe just headerSizeInBytes?
| // 0x00 => version 0 of the translog | ||
| final byte b1 = Channels.readFromFileChannel(channel, 0, 1)[0]; | ||
| if (b1 == 0x3f) { // LUCENE_CODEC_HEADER_BYTE | ||
| throw new TranslogCorruptedException("translog looks like version 1 or later, but has corrupted header. path:" + path); |
Can you clarify what you mean by version 1 or later?
It means that the translog was created in ES 1.4 or later but is corrupted. Moved this from
| assert primaryTerm >= 0 : "Primary term must be non-negative [" + primaryTerm + "]; translog path [" + path + "]"; | ||
| } else { | ||
| assert version == VERSION_CHECKPOINTS : "Unknown header version [" + version + "]"; | ||
| primaryTerm = UNKNOWN_PRIMARY_TERM; |
UNKNOWN_PRIMARY_TERM is -1. I wonder if we should use 0 as a default here. Primary terms are always non-negative.
Yes, we can use 0L as the default. I pushed d50d7ed
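A minimal sketch of the fallback discussed here, assuming the term is read from the header only for versions that carry it. `HeaderTermReader` is a hypothetical class, the numeric version values are illustrative placeholders rather than the real constants, and a `ByteBuffer` stands in for the header stream.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: older header versions carry no term, so a default of 0 is used.
final class HeaderTermReader {
    static final int VERSION_CHECKPOINTS = 2;    // illustrative value, assumed for this sketch
    static final int VERSION_PRIMARY_TERM = 3;   // illustrative value, assumed for this sketch
    static final long UNKNOWN_PRIMARY_TERM = 0L; // default agreed on in the discussion above

    static long readPrimaryTerm(int version, ByteBuffer in) {
        if (version >= VERSION_PRIMARY_TERM) {
            long primaryTerm = in.getLong();
            if (primaryTerm < 0) {
                throw new IllegalStateException("Primary term must be non-negative [" + primaryTerm + "]");
            }
            return primaryTerm;
        }
        // Headers written before the term was added fall back to the default.
        return UNKNOWN_PRIMARY_TERM;
    }
}
```

Using 0 instead of -1 keeps every term returned by the reader non-negative, so callers need no special case for old files.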
| } | ||
| // Verify the checksum | ||
| if (version >= VERSION_PRIMARY_TERM) { | ||
| Translog.verifyChecksum(in); |
@dnhatn This has many, many unrelated changes. I think some merge was messed up?
@dnhatn Never mind, I think GitHub messed something up when I checked what changed.
# Conflicts: # server/src/main/java/org/elasticsearch/index/translog/Translog.java # server/src/test/java/org/elasticsearch/action/resync/ResyncReplicationRequestTests.java # server/src/test/java/org/elasticsearch/index/IndexModuleTests.java # server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java # server/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java # server/src/test/java/org/elasticsearch/index/translog/TranslogTests.java # server/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java
| TranslogWriter.writeHeader(out, translogRef); | ||
| fc.force(true); | ||
| try (FileChannel fc = FileChannel.open(filename, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW)) { | ||
| TranslogHeader header = new TranslogHeader(translogUUID, 0L); |
Shall we use DEFAULT_PRIMARY_TERM?
This change adds the current primary term to the header of the current translog file. Having a term in a translog header is a prerequisite step that allows us to trim translog operations given the max valid seq# for that term. This commit also updates tests to conform to the primary term invariant, which guarantees that every translog operation in a translog file has a term no greater than the term stored in the translog header.