Only retain reasonable history for peer recoveries #45208
Conversation
Today if a shard is not fully allocated we maintain a retention lease for a lost peer for up to 12 hours, retaining all operations that occur in that time period so that we can recover this replica using an operations-based recovery if it returns. However it is not always reasonable to perform an operations-based recovery on such a replica: if the replica is a very long way behind the rest of the replication group then it can be much quicker to perform a file-based recovery instead.

This commit introduces a notion of "reasonable" recoveries. If an operations-based recovery would involve copying only a small number of operations, but the index is large, then an operations-based recovery is reasonable; on the other hand if there are many operations to copy across and the index itself is relatively small then it makes more sense to perform a file-based recovery. We measure the size of the index by computing its number of documents (including deleted documents) in all segments belonging to the current safe commit, and compare this to the number of operations a lease is retaining below the local checkpoint of the safe commit. We consider an operations-based recovery to be reasonable iff it would involve replaying at most 10% of the documents in the index.

The mechanism for this feature is to expire peer-recovery retention leases early if they are retaining so much history that an operations-based recovery using that lease would be unreasonable.

Relates elastic#41536
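The 10% rule described above can be modelled as a small sketch. This is a hypothetical illustration, not the PR's actual API: the class and method names are invented, but the decision rule matches the description (an operations-based recovery is reasonable iff it would replay at most the threshold proportion of the documents in the safe commit).

```java
// Hypothetical model of the "reasonable recovery" rule described above;
// names do not match the actual Elasticsearch classes.
public class ReasonableRecovery {

    // Default threshold: replaying at most 10% of the documents in the index.
    public static final double FILE_BASED_THRESHOLD = 0.1;

    /**
     * An operations-based recovery is reasonable iff the number of operations
     * it would replay is at most the threshold proportion of the documents
     * (including deleted documents) in the safe commit.
     */
    public static boolean isOpsBasedRecoveryReasonable(long operationsToReplay, long docCountOfSafeCommit) {
        return operationsToReplay <= FILE_BASED_THRESHOLD * docCountOfSafeCommit;
    }

    public static void main(String[] args) {
        // 50 ops against a 1000-doc index: replaying 5% is reasonable.
        System.out.println(isOpsBasedRecoveryReasonable(50, 1000));  // true
        // 500 ops against a 1000-doc index: prefer a file-based recovery.
        System.out.println(isOpsBasedRecoveryReasonable(500, 1000)); // false
    }
}
```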
Pinging @elastic/es-distributed
dnhatn
left a comment
Looks good. I left some comments.
    * Defaults to retaining history for up to 10% of the documents in the shard. This can only be changed in tests, since this setting is
    * intentionally unregistered.
    */
    public static final Setting<Double> REASONABLE_OPERATIONS_BASED_RECOVERY_PROPORTION_SETTING
Should we put this setting in IndexSettings.java instead? Also, if this setting is dynamic, we should invalidate the cached value when it is updated.
The Property.Dynamic property was left over from an earlier iteration, removed in f806892, thanks. Yes, you're right: if we were consuming updates then we should also be invalidating the cache.
I'm not sure where we should declare this setting. It's basically a constant (except for tests) and this is where I'd naturally declare a constant.
Moved into IndexSettings in c220530, since that's about the most sensible place for it in light of that change.
    public synchronized long getAsLong() {
        try (Engine.IndexCommitRef safeCommitRef = getEngine().acquireSafeIndexCommit()) {
            final IndexCommit safeCommit = safeCommitRef.getIndexCommit();
            final long generation = safeCommit.getGeneration();
Perhaps use SegmentInfos#getId instead of the generation of the commit as the cache key. We can have the same generation for different index commits if we perform a file-based recovery.
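The caching hazard dnhatn points out — a commit generation can repeat after a file-based recovery, so it is not a safe cache key — can be sketched with a dependency-free model. `CommitIdSource` is a hypothetical stand-in for reading `SegmentInfos#getId` from the safe commit; it is not an Elasticsearch or Lucene type.

```java
import java.util.Objects;

// Sketch of caching an expensive per-commit value keyed by the commit's
// unique id rather than its generation. CommitIdSource is a hypothetical
// stand-in for reading SegmentInfos#getId from the safe commit.
public class DocCountCache {

    public interface CommitIdSource {
        String currentCommitId(); // stand-in for SegmentInfos#getId
        long computeDocCount();   // the expensive computation to cache
    }

    private String cachedCommitId;
    private long cachedDocCount;

    public synchronized long getDocCount(CommitIdSource source) {
        final String commitId = source.currentCommitId();
        if (!Objects.equals(commitId, cachedCommitId)) {
            cachedDocCount = source.computeDocCount(); // recompute only for a new commit
            cachedCommitId = commitId;
        }
        return cachedDocCount;
    }
}
```

Keying by the unique id means two distinct commits that happen to share a generation can never alias each other in the cache.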
    @Override
    public synchronized long getAsLong() {
        try (Engine.IndexCommitRef safeCommitRef = getEngine().acquireSafeIndexCommit()) {
Releasing a commit is quite expensive. Maybe expose a new method that returns the safe commit without acquiring. I think it's fine to use the last commit (we already exposed it) instead of the safe commit here.
Please ignore this as it is not correct. We won't revisit the deletion policy on releasing a commit unless the safe commit advances.
I too am a little uneasy with the costs of this. With this implementation each call to getAsLong() is synchronized and obtains a new commit ref, which might occasionally be expensive to release. It only does this on replication groups that aren't fully-assigned, of course. An alternative would be to recalculate this while advancing the safe commit, then this could just become a volatile read. I couldn't determine a neat way to hook this calculation in the right place. Any ideas?
I tried moving the calculation to CombinedDeletionPolicy#updateRetentionPolicy in c220530. I think it makes sense there. WDYT?
    final long localCheckpoint = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
    final long totalDocs = StreamSupport.stream(
Maybe use Lucene.readSegmentInfos(safeCommit).totalMaxDoc()?
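Lucene's `SegmentInfos#totalMaxDoc` sums `maxDoc` (live plus deleted documents) across all segments of a commit, which is exactly the "number of documents including deleted documents" the PR description talks about. A dependency-free model of that sum, with a hypothetical `Segment` record standing in for Lucene's per-segment metadata:

```java
import java.util.List;

// Dependency-free model of what Lucene's SegmentInfos#totalMaxDoc computes:
// the sum of maxDoc (live plus deleted documents) over all segments of a commit.
public class TotalMaxDoc {

    // Hypothetical stand-in for a segment's metadata.
    public record Segment(String name, int maxDoc) {}

    public static long totalMaxDoc(List<Segment> segments) {
        return segments.stream().mapToLong(Segment::maxDoc).sum();
    }
}
```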
    final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
    softDeletesPolicy.setLocalCheckpointOfSafeCommit(localCheckpointOfSafeCommit);
    final long docCountOfSafeCommit = getDocCountOfSafeCommit();
Drive-by comment: I think this will do IO while holding the monitor, which would prevent concurrent acquireIndexCommit and releaseIndexCommit calls. A quick scrape of usages did not find any where this would hurt, but I would still suggest doing the calculation of minimumReasonableRetainedSeqNo outside the monitor.
Yes, it will. We're already in a method that throws IOException, but you're right, it doesn't look like it really does much IO as it currently stands. Moved outside the mutex in 5860c41.
@elasticmachine please test this
ywelsch
left a comment
I've left mainly comments on naming and code structure, looking good o.w.
server/src/main/java/org/elasticsearch/index/IndexSettings.java
    * intentionally unregistered.
    */
    public static final Setting<Double> REASONABLE_OPERATIONS_BASED_RECOVERY_PROPORTION_SETTING
        = Setting.doubleSetting("index.recovery.reasonable_operations_based_recovery_proportion", 0.1, 0.0, Setting.Property.IndexScope);
I wonder if this should be called index.recovery.full_recovery_threshold or index.recovery.file_based_threshold.
It's essentially a threshold beyond which full file-based recoveries are preferred.
Same goes for the variable names throughout the rest of the PR
Ok, changed to index.recovery.file_based_threshold in 13a4c38.
server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java
    final IndexCommit safeCommit = updateCommitsAndRetentionPolicy(commits);
    final long localCheckpointOfSafeCommit = Long.parseLong(safeCommit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
    final long docCountOfSafeCommit = getDocCountOfSafeCommit();
    minimumReasonableRetainedSeqNo = localCheckpointOfSafeCommit + 1
what was the reason to compute this value here and not in ReplicationTracker? This class could e.g. return a pre-computed safeCommitInfo that contains local checkpoint + number of docs.
The actual computation could then be done in replication tracker, where I think that logic fits more naturally (and avoids the slightly odd minimumReasonableRetainedSeqNo naming that we have here).
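The split ywelsch proposes — the deletion policy publishes a pre-computed carrier with the safe commit's local checkpoint and doc count, and the replication tracker applies the threshold — can be sketched as follows. The names and the exact method shape here are illustrative, not the PR's final code.

```java
// Illustrative sketch of the suggested split: a pre-computed SafeCommitInfo
// carrier, with the threshold logic living in the replication tracker.
// Names are not the actual Elasticsearch classes' final shape.
public class SafeCommitInfoSketch {

    public record SafeCommitInfo(long localCheckpoint, long docCount) {
        public static final SafeCommitInfo EMPTY = new SafeCommitInfo(-1, 0);
    }

    /**
     * Decide whether a lease retaining history from retainedSeqNo is worth
     * keeping: the operations it retains below the safe commit's local
     * checkpoint must not exceed the threshold proportion of the documents.
     */
    public static boolean isLeaseWorthKeeping(SafeCommitInfo info, long retainedSeqNo, double fileBasedThreshold) {
        final long operationsRetained = info.localCheckpoint() + 1 - retainedSeqNo;
        return operationsRetained <= fileBasedThreshold * info.docCount();
    }
}
```

With this shape the "slightly odd" minimumReasonableRetainedSeqNo name disappears: the tracker simply compares retained operations against the threshold.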
It didn't really seem worth the extra plumbing to move this single expression, particularly since CombinedDeletionPolicy also has some bearing on history retention. It didn't seem any more natural to have it in ReplicationTracker. But I've done this in 13a4c38.
@elasticmachine please run elasticsearch-ci/packaging-sample
    private volatile IndexCommit safeCommit; // the most recent safe commit point: its max_seqno is at most the persisted global checkpoint
    private volatile IndexCommit lastCommit; // the most recent commit point
    private volatile SafeCommitInfo safeCommitInfo = SafeCommitInfo.EMPTY;
    private final Object onCommitMutex = new Object();
Do we really need this separate mutex?
I am not sure, because it depends on whether onCommit is called concurrently or not. Is it? If it is then we could end up with stale data in safeCommitInfo.
Before this change, onCommit was a synchronized method. Is it okay if we leave it as is?
Henning asked to avoid running the IO needed to compute the safeCommitInfo under this mutex here: #45208 (comment).
Ah, ok. Although it's not an issue, I think we should avoid having two lock orderings: this -> onCommitMutex in onInit and onCommitMutex -> this in onCommit. How about implementing onCommit in two steps using a single mutex?

    final IndexCommit safeCommit;
    synchronized (this) {
        // update the policy
        safeCommit = this.safeCommit;
    }
    final SafeCommitInfo safeCommitInfo = ..
    synchronized (this) {
        if (safeCommit == this.safeCommit) {
            this.safeCommitInfo = safeCommitInfo;
        }
    }
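The two-step pattern suggested above — read the current safe commit under the lock, run the expensive computation outside the lock, then publish the result under the lock only if the safe commit is unchanged — can be fleshed out as a self-contained sketch. The `Commit` and `Info` types are stand-ins for the real IndexCommit and SafeCommitInfo.

```java
// Self-contained sketch of the two-step publish pattern: expensive work
// happens outside the lock, and a stale result is silently discarded if a
// newer safe commit arrived meanwhile. Types are stand-ins for the actual
// Elasticsearch classes.
public class TwoStepPublish {

    public record Commit(long generation) {}
    public record Info(long docCount) {}

    private Commit safeCommit;
    private volatile Info safeCommitInfo = new Info(0);

    public void onCommit(Commit newSafeCommit, java.util.function.Function<Commit, Info> expensiveCompute) {
        final Commit commit;
        synchronized (this) {
            this.safeCommit = newSafeCommit; // step 1: update the policy under the lock
            commit = this.safeCommit;
        }
        final Info info = expensiveCompute.apply(commit); // IO happens outside the lock
        synchronized (this) {
            if (commit == this.safeCommit) { // step 2: publish only if still current
                this.safeCommitInfo = info;
            }
        }
    }

    public Info getSafeCommitInfo() {
        return safeCommitInfo; // plain volatile read on the hot path
    }
}
```

Using a single mutex (`this`) in both steps avoids the two lock orderings dnhatn flags, and the identity check makes a late-arriving stale computation harmless.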
dnhatn
left a comment
LGTM. I left a comment around the mutex in the deletion policy for discussion.
Ah yes, you're right, I missed that.
@elasticmachine please run elasticsearch-ci/default-distro
ywelsch
left a comment
Left one more comment, o.w. looking good
server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
henningandersen
left a comment
Left a couple of comments, looking good otherwise.
server/src/main/java/org/elasticsearch/index/engine/CombinedDeletionPolicy.java
server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
@elasticmachine please run elasticsearch-ci/1
Merging + back-porting this for @DaveCTurner as discussed with him.