Introduce primary/replica mode for GlobalCheckPointTracker #25468
jasontedor merged 15 commits into elastic:master
…at block GCP advancement
…balcheckpoint-tracker
bleskes
left a comment
I left a bunch of nits around assertion messages. There is one important test coverage ask for GlobalCheckpointTracker#initializeWithPrimaryContext, which is why I marked this as request changes. All the rest LGTM. Thanks @ywelsch
| /** | ||
| * during relocation handoff there are no entries blocking global checkpoint advancement | ||
| */ | ||
| assert !handOffInProgress || pendingInSync.isEmpty(); |
can we add a message that says what the pending in sync aIds are?
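For illustration, a minimal sketch of what such a message could look like, using Java's `assert condition : detail` form. The class `PendingInSyncAssertion` and the helper `handOffMessage` are hypothetical stand-ins, not code from this PR; only the field names `handOffInProgress` and `pendingInSync` come from the diff above.

```java
import java.util.Set;
import java.util.TreeSet;

public class PendingInSyncAssertion {

    // Hypothetical helper: builds the detail message listing the pending in-sync allocation ids.
    static String handOffMessage(Set<String> pendingInSync) {
        return "entries blocking global checkpoint advancement during relocation handoff: " + pendingInSync;
    }

    // Mirrors the shape of the invariant in the diff; the message expression
    // is only evaluated when the assertion actually fails.
    static boolean invariant(boolean handOffInProgress, Set<String> pendingInSync) {
        assert !handOffInProgress || pendingInSync.isEmpty() : handOffMessage(pendingInSync);
        return true;
    }

    public static void main(String[] args) {
        Set<String> pending = new TreeSet<>();
        pending.add("aId-1");
        pending.add("aId-2");
        // No handoff in progress, so the invariant holds even with pending entries.
        System.out.println(invariant(false, pending));
        System.out.println(handOffMessage(pending));
    }
}
```

Because the detail expression is evaluated lazily, building the string costs nothing on the happy path.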
| /** | ||
| * the computed global checkpoint is always up-to-date | ||
| */ | ||
| assert !primaryMode || globalCheckpoint == computeGlobalCheckPoint(pendingInSync, localCheckpoints.values(), globalCheckpoint); |
can we add a message with the globalCheckpoint and the result of the computation?
| /** | ||
| * blocking global checkpoint advancement only happens for shards that are not in-sync | ||
| */ | ||
| assert !pendingInSync.contains(entry.getKey()) || !entry.getValue().inSync; |
can we add a message that indicates which aID it is?
| * Notifies the service of the current allocation ids in the cluster state. This method trims any shards that have been removed. | ||
| * Initializes the global checkpoint tracker in primary mode (see {@link #primaryMode}. Called on primary activation or promotion. | ||
| */ | ||
| public synchronized void initializeAsPrimary(final String allocationId, final long localCheckpoint) { |
how would you feel about naming this method (and its counterpart) activatePrimaryMode? I was confused a couple of times, as the initialize and primary terms are already used in the IndexShard context (a primary relocation target is a primary shard and is already initializing long before the method is called).
| public synchronized void initializeAsPrimary(final String allocationId, final long localCheckpoint) { | ||
| assert invariant(); | ||
| assert primaryMode == false; | ||
| assert localCheckpoints.get(allocationId) != null && localCheckpoints.get(allocationId).inSync && |
can we add a message with localCheckpoints.get(allocationId) and allocationId ?
| persistMetadata(path, indexSettings, newRouting, currentRouting, logger); | ||
|
|
||
| if (shardRouting.primary()) { | ||
| assert Thread.holdsLock(mutex); |
this can go away, since you inlined the method
| } | ||
| } | ||
|
|
||
| cancellableThreads.execute(() -> runUnderOperationPermit(() -> shard.initiateTracking(request.targetAllocationId()))); |
question, why did you choose to do it after phase1 ?
It only becomes relevant that we're properly tracking the target shard when starting the engine on the target shard. With this we ensure that we don't miss any local checkpoint updates from the target shard.
| final Matcher<Long> globalCheckpointMatcher; | ||
| if (shardRouting.primary()) { | ||
| globalCheckpointMatcher = numDocs == 0 ? equalTo(unassignedSeqNo) : equalTo(numDocs - 1L); | ||
| globalCheckpointMatcher = numDocs == 0 ? equalTo(SequenceNumbersService.NO_OPS_PERFORMED) : equalTo(numDocs - 1L); |
++. It is good that this is fixed and we start with no ops performed.
| * than we have been using above ensures that we can not collide with a previous allocation ID | ||
| */ | ||
| newActiveAllocationIds.add(randomAlphaOfLength(32)); | ||
| // TODO: fix this: newActiveAllocationIds.add(initializingIds.iterator().next()); |
I've removed this line, as it was an illegal operation (adding a fresh in-sync allocation id while the tracker was in primary mode).
| assertThat(tracker.getGlobalCheckpoint(), equalTo((long) nextActiveLocalCheckpoint)); | ||
| } | ||
|
|
||
| public void testPrimaryContextOlderThanAppliedClusterState() { |
I think we need equivalent tests of initializeWithPrimaryContext and its relation to appliedClusterStateVersion?
|
@bleskes I've addressed all comments. Have another look. |
bleskes
left a comment
LGTM. Thanks for the thorough test.
|
|
||
| activatePrimary(clusterState, oldPrimary); | ||
|
|
||
| for (int i = 0; i < randomInt(10); i++) { |
nit: this samples the randomInt again and again.. much less likely to get to 10.
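To make the nit concrete, here is a small sketch comparing the two loop shapes. It uses `java.util.Random#nextInt` as a stand-in for the test framework's `randomInt(10)` (an assumption made so the example is self-contained), and the class and method names are hypothetical.

```java
import java.util.Random;

public class LoopBound {

    // Re-evaluates the random bound on every iteration, like the loop under review.
    // The loop stops as soon as any draw is <= i, so high counts are rare.
    static int resampled(Random random, int max) {
        int iterations = 0;
        for (int i = 0; i < random.nextInt(max + 1); i++) {
            iterations++;
        }
        return iterations;
    }

    // Draws the bound once up front, so every count in [0, max] is equally likely.
    static int sampledOnce(Random random, int max) {
        final int bound = random.nextInt(max + 1);
        int iterations = 0;
        for (int i = 0; i < bound; i++) {
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        Random random = new Random();
        long resampledTotal = 0;
        long onceTotal = 0;
        final int trials = 10_000;
        for (int t = 0; t < trials; t++) {
            resampledTotal += resampled(random, 10);
            onceTotal += sampledOnce(random, 10);
        }
        // The resampled variant averages noticeably fewer iterations per trial.
        System.out.println("resampled average:   " + (double) resampledTotal / trials);
        System.out.println("sampled-once average: " + (double) onceTotal / trials);
    }
}
```

Hoisting the draw into a local variable before the loop is the usual fix when a uniform iteration count is intended.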
This PR refactors the GlobalCheckPointTracker to make it more resilient. The main idea is to make it more explicit what state is actually captured and how that state is updated through replication, cluster state updates, etc. It also fixes the issue where the local checkpoint information is not updated when a shard becomes primary. The primary relocation handoff becomes very simple too: we can just copy over the internal state verbatim.
The PR is still missing some tests, which I will address soon. The main reason for opening it as is is to get initial feedback.