Create peer-recovery retention leases #43190
Conversation
This creates a peer-recovery retention lease for every shard during recovery, ensuring that the replication group retains history for future peer recoveries. It also ensures that leases for active shard copies do not expire, and leases for inactive shard copies expire immediately if the shard is fully allocated. Relates #41536
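The expiry rule in the summary above can be pictured with a minimal sketch. This is a hypothetical simplification for illustration only, not the real ReplicationTracker API:

```java
// Hypothetical sketch of the expiry rule in the PR summary: leases for active
// shard copies never expire, while leases for inactive copies expire immediately
// once the shard is fully allocated. Names and shape are illustrative only.
public class LeaseExpirySketch {

    static boolean isExpired(boolean copyIsActive, boolean shardFullyAllocated) {
        if (copyIsActive) {
            return false; // an active copy keeps its lease indefinitely
        }
        return shardFullyAllocated; // an inactive copy loses its lease once the shard is fully allocated
    }

    public static void main(String[] args) {
        System.out.println(isExpired(true, true));   // active copy: lease retained
        System.out.println(isExpired(false, true));  // inactive, fully allocated: lease expires
        System.out.println(isExpired(false, false)); // inactive, not fully allocated: lease retained
    }
}
```

The point of keeping an inactive copy's lease while the shard is not fully allocated is that the missing copy may still come back and recover from the retained history.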
Pinging @elastic/es-distributed
ywelsch
left a comment
I've done a first pass and left some questions, mainly to get a better understanding of the scope of the change.
final List<?> leases = (List<?>) retentionLeasesStats.get("leases");
assertThat(leases, empty());
for (final Object lease : leases) {
    assertThat(((Map<?, ?>) lease).get("source"), equalTo(ReplicationTracker.PEER_RECOVERY_RETENTION_LEASE_SOURCE));
can we instead assert the absence of CCR leases?
Not as robustly as I'd like, no. We could assert that there are no leases with source "ccr", but that's much weaker than asserting that the only remaining leases are PRRLs, similar to how we previously asserted that there were no leases at all.
Can we use toMapExcludingPeerRecoveryRetentionLeases here?
Not very easily. Here we are on the other side of the high-level REST API, and this doesn't include indices stats, so we don't have access to a RetentionLeases object. It would be quite some work to build one.
ah, I did not realize that it's a rest test.
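For context, the REST-side assertion under discussion boils down to checking the "source" field of each lease in the parsed stats response. A minimal sketch, assuming a response already parsed into List/Map form (all names here are hypothetical):

```java
import java.util.List;
import java.util.Map;

// Sketch of the assertion under discussion, against a (hypothetical) stats
// response parsed into nested Lists and Maps: every remaining lease must have
// source "peer recovery", which is stronger than merely checking that no
// lease has source "ccr".
public class LeaseSourceCheck {

    static final String PEER_RECOVERY_SOURCE = "peer recovery";

    static boolean onlyPeerRecoveryLeases(List<Map<String, Object>> leases) {
        return leases.stream().allMatch(lease -> PEER_RECOVERY_SOURCE.equals(lease.get("source")));
    }

    public static void main(String[] args) {
        List<Map<String, Object>> leases = List.of(
                Map.of("id", "peer_recovery/node-0", "source", PEER_RECOVERY_SOURCE),
                Map.of("id", "leftover-lease", "source", "ccr"));
        System.out.println(onlyPeerRecoveryLeases(leases)); // false: a CCR lease remains
    }
}
```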
server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
is this to catch issues where tests have not been properly set up?
Yes, if this is unset then the crucial assertions are skipped, which is Very Bad™.
if (retentionLeases.get(leaseId) == null) {
    /*
     * We might have got here via a rolling upgrade from an older version that doesn't create peer recovery retention
     * leases for every shard copy, but in this case we do not expect any leases to exist.
this might also be a recovery from store?
What about when we become primary due to a primary relocation? Do we need to do this as well?
This comment is explaining the following if (indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0)). Covering the cases when the index was created in an earlier version is out of scope here.
In a primary relocation the new primary, being a tracked replica, already has a lease.
> Covering the cases when the index was created in an earlier version is out of scope here.
Did you mean there will be another change here? Why don't we do it now ;). The relocating target should not have a lease if the old primary was on an old version.
This change is already a substantial +665/-185, and I think it's unwise to bring BWC into scope at this time. Note that this PR is against a feature branch, not master, so we're ok with missing features for now.
        && shard.indexSettings().getIndexMetaData().getState() != IndexMetaData.State.CLOSE) {
    runUnderPrimaryPermit(() -> {
        try {
            // blindly create the lease. TODO integrate this with the recovery process
I'm not sure what you mean by "blindly" here and what integration you're referring to.
With this change, retention leases have no impact on the recovery process, nor do we make any attempt to restrict a lease to history we have any hope of retaining; e.g. with a file-based recovery we add a lease covering all history.
In due course the recovery process will be made more dependent on leases.
        : routingTable.activeShards() + " vs " + shardAllocationId;
assert replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard));

// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
we don't need a sync, but why not do one anyway? This will persist the leases locally on disk.
Doing a sync on the cluster applier thread isn't possible as things stand because of the reroute phase; it would also mean waiting for the sync to complete, which is something we try to avoid on the applier thread.
We could explicitly persist the leases when calling activatePrimaryMode but I don't think it's necessary to do so.
 * Advance the peer-recovery retention lease for all tracked shard copies, for use in tests until advancing these leases is done
 * properly. TODO remove this.
 */
public synchronized void advancePeerRecoveryRetentionLeasesToGlobalCheckpoints() {
why are we not automatically advancing the leases when the global checkpoints advance? Is it because it breaks some tests right now?
Mainly because I think this change is already large enough without this feature, and we haven't settled definitively on whether these leases should be GCP-based. Advancing the leases is needed in only a few places in tests, and I haven't tried advancing them more eagerly.
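The test-only helper being discussed can be pictured roughly as follows. This is a hypothetical simplification (plain maps instead of the real lease objects), not the actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of advancing peer-recovery retention leases: for each
// tracked copy, bump the retained-from sequence number of its lease to just
// past that copy's global checkpoint, never moving a lease backwards.
// All names here are hypothetical.
public class AdvanceLeasesSketch {

    static void advanceToGlobalCheckpoints(Map<String, Long> leaseRetainedFromSeqNo,
                                           Map<String, Long> globalCheckpoints) {
        // a lease retaining ops strictly above the GCP starts retaining from GCP + 1
        globalCheckpoints.forEach((allocationId, gcp) ->
                leaseRetainedFromSeqNo.merge(allocationId, gcp + 1, Math::max));
    }

    public static void main(String[] args) {
        Map<String, Long> leases = new HashMap<>(Map.of("copy-0", 3L, "copy-1", 10L));
        advanceToGlobalCheckpoints(leases, Map.of("copy-0", 7L, "copy-1", 5L));
        System.out.println(leases.get("copy-0")); // 8: advanced past the global checkpoint
        System.out.println(leases.get("copy-1")); // 10: a lease is never moved backwards
    }
}
```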
DaveCTurner
left a comment
Thanks @ywelsch, I responded.
dnhatn
left a comment
Thanks @DaveCTurner. I left some comments.
/**
 * Source for peer recovery retention leases; see {@link ReplicationTracker#addPeerRecoveryRetentionLease}.
 */
public static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery";
How about moving this constant and two related static methods to RetentionLease class instead?
I don't think RetentionLease should know about this special kind of retention lease.
.stream()
.collect(Collectors.groupingBy(lease -> currentTimeMillis - lease.timestamp() > retentionLeaseMillis));
.collect(Collectors.groupingBy(lease -> {
    if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) {
Can we make this check a method of RetentionLease?
As in #43190 (comment) I don't think RetentionLease should know about this special kind of retention lease.
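The grouping quoted in the diff above can be sketched as a runnable fragment. The Lease record and method shape here are hypothetical stand-ins for the real RetentionLease API, and the sketch deliberately ignores the immediate expiry of leases for inactive copies:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the groupingBy change under review: partition leases
// into expired (true) and retained (false), exempting peer-recovery leases
// from the timestamp-based expiry applied to other sources.
public class LeaseGrouping {

    record Lease(String id, String source, long timestamp) {}

    static final String PEER_RECOVERY_RETENTION_LEASE_SOURCE = "peer recovery";

    static Map<Boolean, List<Lease>> groupByExpiry(List<Lease> leases, long currentTimeMillis, long retentionLeaseMillis) {
        return leases.stream().collect(Collectors.groupingBy(lease -> {
            if (lease.source().equals(PEER_RECOVERY_RETENTION_LEASE_SOURCE)) {
                return false; // peer-recovery leases are not subject to timestamp expiry
            }
            return currentTimeMillis - lease.timestamp() > retentionLeaseMillis;
        }));
    }

    public static void main(String[] args) {
        List<Lease> leases = List.of(
                new Lease("peer_recovery/node-0", PEER_RECOVERY_RETENTION_LEASE_SOURCE, 0L),
                new Lease("stale-lease", "other", 0L));
        Map<Boolean, List<Lease>> grouped = groupByExpiry(leases, 1_000L, 100L);
        System.out.println(grouped.get(true).size());  // 1: the stale non-PRRL lease expired
        System.out.println(grouped.get(false).size()); // 1: the peer-recovery lease is retained
    }
}
```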
 */
static Map<String, RetentionLease> toMap(final RetentionLeases retentionLeases) {
    return retentionLeases.leases;
public static Map<String, RetentionLease> toMapExcludingPeerRecoveryRetentionLeases(final RetentionLeases retentionLeases) {
Can we move this method to the tests? Maybe to the test framework?
 * containing the persistent node ID calculated by {@link ReplicationTracker#getPeerRecoveryRetentionLeaseId}, and retain operations
 * with sequence numbers strictly greater than the given global checkpoint.
 */
public void addPeerRecoveryRetentionLease(String nodeId, long globalCheckpoint, ActionListener<ReplicationResponse> listener) {
Can we remove this method and prepare these parameters in IndexShard instead?
We could, but I think it's appropriate to do this here, since you need it when working with the ReplicationTracker in isolation, e.g. in PeerRecoveryRetentionLeaseExpiryTests.
.flatMap(n -> StreamSupport.stream(getLeaderCluster().getInstance(IndicesService.class, n).spliterator(), false))
.flatMap(n -> StreamSupport.stream(n.spliterator(), false))
.filter(indexShard -> indexShard.shardId().getIndexName().equals("index1"))
.filter(indexShard -> indexShard.routingEntry().primary())
This is used in tests only but we should make it more robust. See #40386 (comment).
This is here to call the temporary advancePeerRecoveryRetentionLeasesToGlobalCheckpoints method, pending implementation of the proper way to advance the leases. Once that happens, it'll be gone. Are you saying that this test sometimes fails? I don't expect the primaries on the leader to move around during this test.
runUnderPrimaryPermit(() -> {
    try {
        // blindly create the lease. TODO integrate this with the recovery process
        shard.addPeerRecoveryRetentionLease(request.targetNode().getId(), startingSeqNo - 1, establishRetentionLeaseStep);
The second parameter of addPeerRecoveryRetentionLease is "global checkpoint" which does not match startingSeqNo - 1.
Indeed, but it will be once we only copy operations that are above the GCP :)
dnhatn
left a comment
LGTM given this PR will go into a feature branch. Thanks @DaveCTurner.