Expose whether or not the global checkpoint updated#32659
Expose whether or not the global checkpoint updated#32659jasontedor merged 6 commits intoelastic:masterfrom
Conversation
It will be useful for future efforts to know if the global checkpoint was updated. To this end, we need to expose whether or not the global checkpoint was updated when the state of the replication tracker updates. For this, we add to the tracker a callback that is invoked whenever the global checkpoint is updated. For primaries this will be invoked when the computed global checkpoint is updated based on state changes to the tracker. For replicas this will be invoked when the local knowledge of the global checkpoint is advanced from the primary.
|
Pinging @elastic/es-distributed |
bleskes
left a comment
There was a problem hiding this comment.
I left some initial questions.
| if (cps != null && globalCheckpoint > cps.globalCheckpoint) { | ||
| ifUpdated.accept(cps.globalCheckpoint); | ||
| cps.globalCheckpoint = globalCheckpoint; | ||
| onGlobalCheckpointUpdated.accept(globalCheckpoint); |
There was a problem hiding this comment.
I'm doubting whether this should be called out of lock. I'm tending to say yes. Thoughts?
There was a problem hiding this comment.
Also, can you clarify why the consumer is called when updating the primary's knowledge of the gcp knowledge of a replica? (this method is used there too)
There was a problem hiding this comment.
Good catch. That was not intended. I pushed a477ff4.
There was a problem hiding this comment.
Regarding not invoking the notification under lock, I was tending to avoid complicating these methods (I like that we have synchronized as a method modifier) and avoiding dealing with the fact that some of these updates can occur under nested invocations of synchronized methods. This would mean returning booleans and dropping the synchronized from the method modifiers. In the POC that I have, the callback is "cheap" because it forks invocation of the listeners to another thread (practically to the listener thread pool):
private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
if (listeners != null) {
final List<GlobalCheckpointListener> currentListeners;
synchronized (this) {
currentListeners = listeners;
listeners = null;
}
if (currentListeners != null) {
executor.execute(() -> {
for (final GlobalCheckpointListener listener : currentListeners) {
try {
listener.accept(globalCheckpoint, e);
} catch (final Exception caught) {
if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
logger.warn(
new ParameterizedMessage(
"error notifying global checkpoint listener of updated global checkpoint [{}]",
globalCheckpoint),
caught);
} else {
logger.warn("error notifying global checkpoint listener of closed shard", caught);
}
}
}
});
}
}
}I think this is okay?
There was a problem hiding this comment.
I'm fine if you intend to spawn off to other threads in the outer layers. Indeed the simplicity of the current solution is what was making me doubt towards having it as you did.
|
Thanks @bleskes, that was a good catch. I responded to your feedback. |
bleskes
left a comment
There was a problem hiding this comment.
Production code LGTM. Left some nits on the testing.
| final long globalCheckpoint = tracker.getGlobalCheckpoint(); | ||
| updatedGlobalCheckpoint.set(globalCheckpoint); | ||
| tracker.updateLocalCheckpoint(allocationId, localCheckpoint); | ||
| if (globalCheckpoint == tracker.getGlobalCheckpoint()) { |
There was a problem hiding this comment.
logically this is the same as always checking assertThat(updatedGlobalCheckpoint.get(), equalTo(tracker.getGlobalCheckpoint())) no?
|
|
||
| private void updateLocalCheckpoint(final ReplicationTracker tracker, final String allocationId, final long localCheckpoint) { | ||
| final long globalCheckpoint = tracker.getGlobalCheckpoint(); | ||
| updatedGlobalCheckpoint.set(globalCheckpoint); |
There was a problem hiding this comment.
why do you manually set it here? I think this method is just after checking that the callback is called , which sets updatedGlobalCheckpoint?
|
@bleskes Will you take one last look? |
It will be useful for future efforts to know if the global checkpoint was updated. To this end, we need to expose whether or not the global checkpoint was updated when the state of the replication tracker updates. For this, we add to the tracker a callback that is invoked whenever the global checkpoint is updated. For primaries this will be invoked when the computed global checkpoint is updated based on state changes to the tracker. For replicas this will be invoked when the local knowledge of the global checkpoint is advanced from the primary.
It will be useful for future efforts to know if the global checkpoint was updated. To this end, we need to expose whether or not the global checkpoint was updated when the state of the replication tracker updates. For this, we add to the tracker a callback that is invoked whenever the global checkpoint is updated. For primaries this will be invoked when the computed global checkpoint is updated based on state changes to the tracker. For replicas this will be invoked when the local knowledge of the global checkpoint is advanced from the primary.
Relates #32651