Add functionality to fast forward local processed checkpoints [segment replication] (#2576) #2883

Merged
Poojita-Raj merged 2 commits into opensearch-project:main from Poojita-Raj:main
Apr 13, 2022

@Poojita-Raj
Contributor

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

Description

With segment replication, documents are indexed only on the primary; the replica does no indexing of its own and just receives the segment files from the primary.

A side effect is that the replica's local processed checkpoint (which marks the point up to which all sequence numbers are guaranteed to be processed) does not get updated.

This change adds a fast-forward function, fastForwardProcessedSeqNo, that updates the local processed checkpoint when the replica receives the checkpoint from the primary. It safely omits the check that all sequence numbers up to that point have been processed consecutively, since no indexing takes place on the replica. The change includes unit tests.
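To make the difference concrete, here is a minimal sketch of the idea, not the actual OpenSearch implementation. `SketchCheckpointTracker`, `markProcessed`, and `fastForward` are hypothetical names, and the "only move forward" guard is an assumption standing in for whatever check the real tracker applies. The normal path advances the checkpoint only across a gap-free run of sequence numbers; the fast-forward path jumps straight to the value received from the primary.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified stand-in for a local checkpoint tracker. */
final class SketchCheckpointTracker {
    private static final long NO_OPS_PERFORMED = -1L;

    private final AtomicLong processedCheckpoint = new AtomicLong(NO_OPS_PERFORMED);
    private final AtomicLong maxSeqNo = new AtomicLong(NO_OPS_PERFORMED);
    // Sequence numbers processed out of order, waiting for the gap below them to fill.
    private final Set<Long> pending = new HashSet<>();

    /** Indexing path: the checkpoint advances only while the next seq no has been processed. */
    synchronized void markProcessed(long seqNo) {
        maxSeqNo.accumulateAndGet(seqNo, Math::max);
        if (seqNo <= processedCheckpoint.get()) {
            return; // already covered by the checkpoint
        }
        pending.add(seqNo);
        long next = processedCheckpoint.get() + 1;
        while (pending.remove(next)) {
            processedCheckpoint.set(next);
            next++;
        }
    }

    /** Segment replication path: jump to seqNo without requiring a gap-free history. */
    synchronized void fastForward(long seqNo) {
        maxSeqNo.accumulateAndGet(seqNo, Math::max);
        if (seqNo <= processedCheckpoint.get()) {
            return; // assumption: the checkpoint never moves backwards
        }
        processedCheckpoint.set(seqNo);
        pending.removeIf(s -> s <= seqNo); // these are now covered by the checkpoint
    }

    long getProcessedCheckpoint() {
        return processedCheckpoint.get();
    }

    long getMaxSeqNo() {
        return maxSeqNo.get();
    }
}
```

In this sketch, marking 0 and 2 as processed leaves the checkpoint at 0 because 1 is missing, while a subsequent `fastForward(5)` moves it directly to 5.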

Check List

  • New functionality includes testing.
    • All tests pass
  • New functionality has been documented.
    • New functionality has javadoc added
  • Commits are signed per the DCO using --signoff

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

* fix local processed checkpoint update

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* separated tests + wrapper function

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
@Poojita-Raj Poojita-Raj requested a review from a team as a code owner April 12, 2022 22:10
@opensearch-ci-bot
Collaborator

✅   Gradle Check success db57264
Log 4423

Reports 4423

     */
    public synchronized void fastForwardProcessedSeqNo(final long seqNo) {
        advanceMaxSeqNo(seqNo);
        if (shouldUpdateSeqNo(seqNo, processedCheckpoint, persistedCheckpoint) == false) {

Member

It's a little weird to mix a synchronized method with AtomicLongs, but I think the way you'd write this to be safe is:

final long currentProcessedCheckpoint = processedCheckpoint.get();
if (shouldUpdateSeqNo(seqNo, currentProcessedCheckpoint, persistedCheckpoint) == false) {
    return;
}
processedCheckpoint.compareAndSet(currentProcessedCheckpoint, seqNo);

Maybe the synchronized prevents it in practice (which raises the question of why an AtomicLong is being used...), but in theory the value of processedCheckpoint could change in between the shouldUpdateSeqNo check and the compareAndSet, which would defeat the purpose of the compare-and-set check.
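As a standalone illustration of the pattern described above, the sketch below reads the `AtomicLong` once, makes the decision against that snapshot, and passes the same snapshot to `compareAndSet`. `CasFastForward` and both method names are hypothetical, and the plain `seqNo <= current` comparison is an assumption standing in for `shouldUpdateSeqNo`. The second method is an optional variant, assuming a lost race should still end with the larger value; the snippet in the review comment does not retry.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper showing the snapshot-then-CAS pattern. */
final class CasFastForward {

    /**
     * Tries once to move the checkpoint forward to seqNo.
     * Returns true only if this call performed the update.
     */
    static boolean fastForward(AtomicLong checkpoint, long seqNo) {
        final long current = checkpoint.get(); // single read, used for both check and CAS
        if (seqNo <= current) {
            return false; // nothing to do, checkpoint is already at or past seqNo
        }
        // Fails (returns false) if another thread changed the value after the read above.
        return checkpoint.compareAndSet(current, seqNo);
    }

    /**
     * Variant that never loses an update: atomically stores max(current, seqNo)
     * and returns the resulting value.
     */
    static long fastForwardToMax(AtomicLong checkpoint, long seqNo) {
        return checkpoint.accumulateAndGet(seqNo, Math::max);
    }
}
```

With a single read, a concurrent change between the check and the update makes the compare-and-set fail instead of silently overwriting a newer value.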

Contributor Author

You mean because the method is synchronized, it's redundant to use AtomicLong since it's already thread safe? I was trying to keep to the format markSeqNo followed (similarly called within a synchronized method with atomic parameters).

I'll make the change as suggested, thanks. It makes sense, especially in the context of compareAndSet.

Member

> You mean because the method is synchronized, it's redundant to use AtomicLong since it's already thread safe?

Yeah, they are both mechanisms for handling concurrency, so it is a bit weird to mix them. You generally use the Atomic* variants when you need limited functionality (like compare-and-set or increment-and-get) because they can give you much better performance than the more coarse-grained locking of a synchronized method.

A cursory look suggests to me that all mutations of processedCheckpoint happen in synchronized methods so it could likely be replaced with a volatile long. I'm definitely not suggesting you refactor the existing code though.
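For illustration only, this is roughly what the `volatile long` alternative could look like if every write really does go through a synchronized method. `VolatileCheckpoint` is a hypothetical class, not a proposed change to the tracker, and the forward-only guard is an assumption.

```java
/** Hypothetical sketch: synchronized writers, lock-free volatile reads. */
final class VolatileCheckpoint {
    // volatile makes each write visible to readers that do not take the lock.
    private volatile long processedCheckpoint = -1L;

    /** All mutations hold the monitor, so check-then-write is atomic among writers. */
    synchronized void fastForward(long seqNo) {
        if (seqNo > processedCheckpoint) {
            processedCheckpoint = seqNo;
        }
    }

    /** Readers do not need the lock; a volatile long read is atomic and up to date. */
    long getProcessedCheckpoint() {
        return processedCheckpoint;
    }
}
```

The trade-off is that correctness now depends on no unsynchronized writer ever being added, which an `AtomicLong` with compare-and-set does not require.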


import static org.hamcrest.Matchers.equalTo;

public class SegmentReplicationLocalCheckpointTrackerTests extends OpenSearchTestCase {

Member

Why not add these tests to LocalCheckpointTrackerTests instead of creating a new class?

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
@Poojita-Raj Poojita-Raj requested a review from andrross April 13, 2022 20:20
@opensearch-ci-bot
Collaborator

✅   Gradle Check success bae5f5e
Log 4449

Reports 4449

@Poojita-Raj Poojita-Raj merged commit c4b684d into opensearch-project:main Apr 13, 2022
4 participants