Introduce checkpointing per shard to DDB source, via ShardAcknowledgementManager class#5818

Merged
graytaylor0 merged 14 commits intoopensearch-project:mainfrom
JonahCalvo:data-loss-bug
Jul 30, 2025

Conversation

@JonahCalvo
Contributor

Description

Introduce checkpointing per shard to DDB source, via ShardAcknowledgementManager class

Issues Resolved

Resolves #4764

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@graytaylor0
Member

DCO is failing as well.

@JonahCalvo JonahCalvo force-pushed the data-loss-bug branch 2 times, most recently from efd7991 to 7f0ae3b Compare July 17, 2025 23:51
}

public void giveUpPartition(final StreamPartition streamPartition) {
sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
Member

I think the best approach here is to treat this signal as a failure and handle it the same way you would handle a failure (i.e., do some checkpointing, stop tracking the partition for the next loop, and then give it up).
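
A minimal sketch of that suggestion, assuming an in-memory stand-in rather than the PR's actual classes: `GiveUpAsFailureSketch`, `track`, `handleFailure`, and the recorded `coordinatorCalls` strings are all hypothetical names used only for illustration. It shows the give-up signal routed through the same path as a failure: checkpoint the last acknowledged position, stop tracking the partition, then give it up.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in; not the Data Prepper classes from this PR.
class GiveUpAsFailureSketch {
    // Records the calls a real source coordinator would receive, in order.
    final List<String> coordinatorCalls = new ArrayList<>();
    // Partition id -> last acknowledged sequence number (assumed bookkeeping).
    final Map<String, String> tracked = new LinkedHashMap<>();

    void track(final String partitionId, final String lastAckedSequenceNumber) {
        tracked.put(partitionId, lastAckedSequenceNumber);
    }

    // Single path used for real failures and for the give-up signal.
    void handleFailure(final String partitionId) {
        final String lastAcked = tracked.get(partitionId);
        if (lastAcked == null) {
            return; // not tracked, nothing to do
        }
        // 1. Checkpoint the last acknowledged position.
        coordinatorCalls.add("checkpoint:" + partitionId + "@" + lastAcked);
        // 2. Stop tracking so the next loop iteration skips this partition.
        tracked.remove(partitionId);
        // 3. Release the partition so another worker can pick it up.
        coordinatorCalls.add("giveUp:" + partitionId);
    }

    // The give-up signal simply delegates to the failure path.
    void giveUpPartition(final String partitionId) {
        handleFailure(partitionId);
    }
}
```

With one code path, any later fix to failure handling would also cover the give-up case.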

return globalPartition.isPresent();
}

public void startUpdatingOwnershipForShard(final StreamPartition streamPartition) {
Member

I would add a unit test where we make this call to ShardAcknowledgementManager, then run the loop and verify that the ownership update is called for that partition.
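
The shape of such a test could look roughly like the sketch below. It uses a hand-written recording fake so that it stays self-contained; a test in the project itself would presumably use its existing JUnit and Mockito setup instead. `Coordinator`, `updateOwnership`, `RecordingCoordinator`, `Manager`, and `runLoopOnce` are hypothetical names, and only `startUpdatingOwnershipForShard` is taken from the diff above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins; not the classes under review.
class OwnershipLoopSketch {
    interface Coordinator {
        void updateOwnership(String partitionId);
    }

    // Fake coordinator that remembers which partitions were renewed.
    static class RecordingCoordinator implements Coordinator {
        final List<String> updated = new ArrayList<>();

        @Override
        public void updateOwnership(final String partitionId) {
            updated.add(partitionId);
        }
    }

    static class Manager {
        private final Coordinator coordinator;
        private final Set<String> owned = new LinkedHashSet<>();

        Manager(final Coordinator coordinator) {
            this.coordinator = coordinator;
        }

        // Register a partition whose ownership the loop should keep renewing.
        void startUpdatingOwnershipForShard(final String partitionId) {
            owned.add(partitionId);
        }

        // One iteration of the (assumed) background loop.
        void runLoopOnce() {
            for (final String partitionId : owned) {
                coordinator.updateOwnership(partitionId);
            }
        }
    }

    // Test body: register, run the loop once, return what the fake observed.
    static List<String> registerAndRunOnce(final String partitionId) {
        final RecordingCoordinator coordinator = new RecordingCoordinator();
        final Manager manager = new Manager(coordinator);
        manager.startUpdatingOwnershipForShard(partitionId);
        manager.runLoopOnce();
        return coordinator.updated;
    }
}
```

Exposing a single loop iteration (here `runLoopOnce`) is one way to keep the test deterministic without starting a thread.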

Jonah Calvo added 5 commits July 30, 2025 11:30
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
Jonah Calvo added 8 commits July 30, 2025 11:30
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
shardProgress.increment();
AcknowledgementSet acknowledgementSet = null;
if (shardAcknowledgementManager != null) {
acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null);
Member

I know this is harder to verify, but let's keep an eye on whether shardIterator is actually null only once we reach the end of a shard.
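
One way to check that assumption in isolation is sketched below, with hypothetical stand-ins: `ShardEndSketch`, `Page`, and `finalFlags` are illustrative names, not part of the PR. It relies on the DynamoDB Streams behavior that GetRecords returns no next shard iterator once a closed shard has been fully read, and it includes a last page with no records and no iterator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of paging through one shard; not the PR's consumer code.
class ShardEndSketch {
    // Stand-in for one GetRecords response.
    static class Page {
        final List<String> sequenceNumbers;
        final String nextShardIterator; // null once a closed shard is fully read

        Page(final List<String> sequenceNumbers, final String nextShardIterator) {
            this.sequenceNumbers = sequenceNumbers;
            this.nextShardIterator = nextShardIterator;
        }
    }

    // Returns, for each page consumed, the "final" flag that would be passed
    // as the third argument (shardIterator == null) when creating the
    // acknowledgment set for that page.
    static List<Boolean> finalFlags(final List<Page> pages) {
        final List<Boolean> flags = new ArrayList<>();
        for (final Page page : pages) {
            final String shardIterator = page.nextShardIterator;
            flags.add(shardIterator == null);
            if (shardIterator == null) {
                break; // end of shard: nothing more to read
            }
        }
        return flags;
    }
}
```

In this model the flag is true exactly once, on the last page consumed, even when that page carries no records.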

@graytaylor0 graytaylor0 merged commit dfc3c70 into opensearch-project:main Jul 30, 2025
54 of 59 checks passed
graytaylor0 added a commit to graytaylor0/data-prepper that referenced this pull request Aug 5, 2025
…nowledgementManager class (opensearch-project#5818)"

This reverts commit dfc3c70.

Signed-off-by: Taylor Gray <tylgry@amazon.com>
graytaylor0 added a commit that referenced this pull request Aug 5, 2025
* Revert "Ensure shards are completed when last getRecords call has no records and no shardIterator (#5958)"

This reverts commit 13049d3.

Signed-off-by: Taylor Gray <tylgry@amazon.com>

* Revert "Introduce checkpointing per shard to DDB source, via ShardAcknowledgementManager class (#5818)"

This reverts commit dfc3c70.

Signed-off-by: Taylor Gray <tylgry@amazon.com>

---------

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Development

Successfully merging this pull request may close these issues.

Checkpoint acknowledgments for DynamoDB pipelines

3 participants