Remove TranslogRecoveryPerformer #24858
Force-pushed 945a7d7 to b0ad489
Force-pushed b0ad489 to ca5b9f9
/**
 * Returns statistics object for the translog. Used during translog recovery, see also {@link Engine#recoverFromTranslog()}
 */
public RecoveryState.Translog getTranslogStats() {
nit: call this (and derivatives) getRecoveryTranslogStats?
for (Translog.Operation op : operations) {
    Engine.Operation engineOp = indexShard().convertToEngineOp(op, Engine.Operation.Origin.PEER_RECOVERY);
    if (engineOp instanceof Engine.Index && ((Engine.Index) engineOp).parsedDoc().dynamicMappingsUpdate() != null) {
        translog.decrementRecoveredOperations(completedOps); // clean-up stats
I think this is a tricky place to put this - it doesn't really know what the retry semantics are (that we always retry a full batch). This is why we had the BatchOperationException. If we want to remove it from the "official exception list" (+1 on that), we can still make BatchOperationException a dedicated non-ElasticsearchException by always rethrowing its cause.
can we also have mapping updates for deletes? I wonder if that is the case now that we allow type introduction for deletes too?!
I think the mapping is tricky too, I am not sure if we can hit it because of a broken mapping or anything in which case we should fail the recovery? Maybe we can use DelayRecoveryException for this purpose instead? it's really nothing else but a delay?
I've changed the flow of this method to first do the conversion for all the operations in the batch and then only proceed with the actual indexing once we've confirmed that there were no mapping updates. This makes the BatchOperationException obsolete.
@s1monw yes, the method TransportShardBulkAction.executeDeleteRequestOnReplica currently has other mapping conditions than our recovery code here. The main motivation for this PR was to change some of the internal APIs to address the divergence between recovery and replication code. We can fix the actual divergences in a follow-up.
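The revised two-pass flow described in this thread (convert and validate the whole batch before applying anything, so a partial batch never needs rollback bookkeeping) can be sketched as follows. This is a minimal, self-contained illustration with hypothetical stand-in types, not the actual Elasticsearch classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for Translog.Operation / Engine.Operation.
record TranslogOp(String id, boolean needsMappingUpdate) {}
record EngineOp(String id) {}

class MappingRequiredException extends RuntimeException {
    MappingRequiredException(String id) {
        super("mapping update required for " + id);
    }
}

public class TwoPassBatch {

    // Pass 1: convert the whole batch up front; abort before indexing anything
    // if any operation would require a dynamic mapping update. Because nothing
    // has been applied yet, no per-batch rollback of stats is needed.
    static int indexBatch(List<TranslogOp> ops) {
        List<EngineOp> engineOps = new ArrayList<>();
        for (TranslogOp op : ops) {
            if (op.needsMappingUpdate()) {
                throw new MappingRequiredException(op.id());
            }
            engineOps.add(new EngineOp(op.id()));
        }
        // Pass 2: apply the already-validated operations.
        int applied = 0;
        for (EngineOp engineOp : engineOps) {
            applied++; // stand-in for engine.index(engineOp)
        }
        return applied;
    }

    public static void main(String[] args) {
        System.out.println(indexBatch(List.of(
                new TranslogOp("a", false), new TranslogOp("b", false)))); // prints 2
    }
}
```

If any operation in the batch needs a mapping update, the exception escapes before a single operation reaches the engine, which is what makes a dedicated batch-level exception type unnecessary.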
…he recovery logic from a snapshot
the latest commit pushed me over to +1...
s1monw left a comment
first I got excited, I thought @ywelsch found a way to encapsulate the translog recovery entirely inside the engine... Well, I am still excited since it's moving things into the right places IMO. I also like getting rid of another exception. I left some suggestions, thanks @ywelsch for cleaning things up
@Override
-   public long indexTranslogOperations(
-       List<Translog.Operation> operations, int totalTranslogOps) throws TranslogRecoveryPerformer.BatchOperationException {
+   public long indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps) throws MapperException, IOException {
any chance we can get a unittest for this?
There is already extensive test coverage for this (e.g. test subclasses of ESIndexLevelReplicationTestCase).
    exception);
final RecoveryState.Translog translog = recoveryTarget.state().getTranslog();
translog.decrementRecoveredOperations(exception.completedOperations()); // do the maintenance and roll back completed ops
logger.trace("delaying recovery due to missing mapping changes", exception);
I am leaning towards making this a debug or remove it entirely. I really think it's worth a debug statement
ok, I've changed it to "debug" level in 6535937
    return mapperService.documentMapperWithAutoCreate(type); // protected for testing
}

public Engine.Operation convertToEngineOp(Translog.Operation operation, Engine.Operation.Origin origin) {
this class seems simple enough to be unittested? Maybe we can add a test based on IndexShardTestCase
    return translogOpToEngineOpConverter.convertToEngineOp(operation, origin);
}

private int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot) throws IOException {
any chance we can add a test for this to IndexShardTests
    return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
}

public Engine.Result applyOperation(Engine.Operation operation) throws IOException {
please add a javadoc to this.
…eryperformer-out-of-engine-config
    exception);
final RecoveryState.Translog translog = recoveryTarget.state().getTranslog();
translog.decrementRecoveredOperations(exception.completedOperations()); // do the maintenance and roll back completed ops
logger.debug("delaying recovery due to missing mapping changes", exception);
unrelated, but can this still happen today? do we want to assert here?
I think this can still happen (although very rarely). In theory, we could avoid this by doing something similar to calling recoveryTarget.ensureClusterStateVersion(currentClusterStateVersion) before phase2. The tricky bit is that the current primary might have indexed something based on a mapping change that has not been fully applied yet (i.e. it is being applied, but not yet available under ClusterService.state(); in this case we would rather want to know about the pre-applied state).
    throw new IndexShardNotRecoveringException(shardId, indexShard().state());
}
// first convert all translog operations to engine operations to check for mapping updates
List<Engine.Operation> engineOps = operations.stream().map(
Splits TranslogRecoveryPerformer into three parts:
This makes it possible for peer recovery to use the same IndexShard interface as bulk shard requests (i.e. Engine operations instead of Translog operations). It also pushes the "fail on bad mapping" logic outside of IndexShard. Future pull requests could unify the BulkShard and peer recovery path even more.
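As a rough sketch of the direction this description points at (hypothetical, heavily simplified stand-in types, not the real IndexShard API): peer recovery first converts translog operations into engine operations and then funnels them through the same apply entry point that bulk shard requests use.

```java
import java.util.List;

// Hypothetical, heavily simplified stand-ins for the real classes.
record TranslogOp(String uid, String source) {}
record EngineOp(String uid, String source, String origin) {}

class Shard {
    int applied = 0;

    // Single entry point shared by the replication (bulk) path and peer recovery.
    void applyOperation(EngineOp op) {
        applied++; // stand-in for the engine actually indexing the operation
    }

    // Peer recovery converts translog operations to engine operations
    // and then reuses the same applyOperation entry point.
    EngineOp convertToEngineOp(TranslogOp op, String origin) {
        return new EngineOp(op.uid(), op.source(), origin);
    }

    int recoverFromTranslog(List<TranslogOp> ops) {
        for (TranslogOp op : ops) {
            applyOperation(convertToEngineOp(op, "PEER_RECOVERY"));
        }
        return applied;
    }
}

public class ShardSketch {
    public static void main(String[] args) {
        Shard shard = new Shard();
        System.out.println(shard.recoverFromTranslog(
                List.of(new TranslogOp("1", "{}"), new TranslogOp("2", "{}")))); // prints 2
    }
}
```

The point of the shape above is that the "fail on bad mapping" decision no longer lives inside the shard: both callers hand the shard already-converted engine operations, so replication and recovery can share one code path.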