Introduce checkpointing per shard to DDB source, via ShardAcknowledgementManager class #5818
Merged
graytaylor0 merged 14 commits into opensearch-project:main on Jul 30, 2025
Conversation
graytaylor0
reviewed
Jun 30, 2025
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
Outdated
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
Outdated
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
Outdated
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
Outdated
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
Outdated
...e/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
Outdated
...e/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
Outdated
...e/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
...src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
Outdated
...e/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
Outdated
graytaylor0
reviewed
Jun 30, 2025
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
Member
DCO is failing as well.
dlvenable
reviewed
Jun 30, 2025
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
Outdated
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
Outdated
...e/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
Outdated
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
Outdated
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
efd7991 to
7f0ae3b
graytaylor0
reviewed
Jul 18, 2025
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
...e/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
Outdated
...src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
Outdated
...e/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
graytaylor0
reviewed
Jul 21, 2025
    }

    public void giveUpPartition(final StreamPartition streamPartition) {
        sourceCoordinator.saveProgressStateForPartition(streamPartition, dynamoDBSourceConfig.getShardAcknowledgmentTimeout());
Member
I think the best approach here is to treat this signal as a failure and handle it the same way you would handle any other failure (i.e., do some checkpointing, stop tracking the partition for the next loop, and then give it up).
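To make the suggestion concrete, here is a minimal sketch of routing the give-up signal through the same path as a failure. Every name in it (`CoordinatorSketch`, `ShardTrackerSketch`, `handleFailure`, the string shard IDs) is hypothetical and chosen for illustration; it is not the Data Prepper coordinator API or the code in this PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the source coordinator (NOT the Data Prepper interface).
interface CoordinatorSketch {
    void saveProgress(String shardId, String sequenceNumber);
    void giveUp(String shardId);
}

// Hand-rolled fake that records coordinator calls in order.
class RecordingCoordinatorSketch implements CoordinatorSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    public void saveProgress(String shardId, String sequenceNumber) {
        calls.add("save:" + shardId + ":" + sequenceNumber);
    }

    @Override
    public void giveUp(String shardId) {
        calls.add("giveUp:" + shardId);
    }
}

// Hypothetical tracker: shardId -> last acknowledged sequence number.
class ShardTrackerSketch {
    private final CoordinatorSketch coordinator;
    private final Map<String, String> lastAcked = new HashMap<>();

    ShardTrackerSketch(CoordinatorSketch coordinator) {
        this.coordinator = coordinator;
    }

    void track(String shardId, String sequenceNumber) {
        lastAcked.put(shardId, sequenceNumber);
    }

    boolean isTracking(String shardId) {
        return lastAcked.containsKey(shardId);
    }

    // Single failure path: checkpoint, stop tracking, then give the shard up.
    void handleFailure(String shardId) {
        String sequenceNumber = lastAcked.get(shardId);
        if (sequenceNumber == null) {
            return; // shard is not tracked, nothing to do
        }
        coordinator.saveProgress(shardId, sequenceNumber); // checkpoint
        lastAcked.remove(shardId);                         // skip it in the next loop
        coordinator.giveUp(shardId);                       // release ownership
    }

    // The give-up signal simply reuses the failure path.
    void giveUpPartition(String shardId) {
        handleFailure(shardId);
    }
}
```

Keeping one code path means a give-up cannot skip the checkpoint or leave the shard in the tracking map by accident.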
...src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/StreamScheduler.java
Outdated
4130fbf to
07e9cb6
graytaylor0
previously approved these changes
Jul 22, 2025
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
dlvenable
requested changes
Jul 23, 2025
...a/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManager.java
...g/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java
...ain/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java
...ain/java/org/opensearch/dataprepper/plugins/source/dynamodb/model/ShardCheckpointStatus.java
Outdated
graytaylor0
reviewed
Jul 25, 2025
        return globalPartition.isPresent();
    }

    public void startUpdatingOwnershipForShard(final StreamPartition streamPartition) {
Member
I would add a unit test where we make this call to ShardAcknowledgementManager, then run the loop and verify that the ownership update is called for that partition.
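The shape of such a test could look roughly like the sketch below. It uses a hand-rolled fake instead of a mocking library so that it stays self-contained, and all names (`OwnershipCoordinatorSketch`, `OwnershipLoopSketch`, `runOnce`, `renewOwnership`) are hypothetical stand-ins, not the actual classes or methods in this PR. The real test would presumably use the project's existing test framework.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical coordinator surface; only the ownership-renewal call matters here.
interface OwnershipCoordinatorSketch {
    void renewOwnership(String partitionKey);
}

// Hypothetical manager: remembers which shards to keep alive and renews them each loop pass.
class OwnershipLoopSketch {
    private final OwnershipCoordinatorSketch coordinator;
    private final Set<String> ownedPartitions = ConcurrentHashMap.newKeySet();

    OwnershipLoopSketch(OwnershipCoordinatorSketch coordinator) {
        this.coordinator = coordinator;
    }

    void startUpdatingOwnershipForShard(String partitionKey) {
        ownedPartitions.add(partitionKey);
    }

    // One iteration of the monitoring loop, exposed so a test can drive it directly.
    void runOnce() {
        for (String partitionKey : ownedPartitions) {
            coordinator.renewOwnership(partitionKey);
        }
    }
}

// Test-style helper: register a shard, run one pass, report what was renewed.
class OwnershipLoopSketchCheck {
    static List<String> renewedAfterOneLoop(String partitionKey) {
        List<String> renewed = new ArrayList<>();
        // The fake coordinator just records every partition it is asked to renew.
        OwnershipLoopSketch loop = new OwnershipLoopSketch(renewed::add);
        loop.startUpdatingOwnershipForShard(partitionKey);
        loop.runOnce();
        return renewed;
    }
}
```

Exposing a single loop iteration as its own method is the design choice that makes this verifiable without threads or sleeps.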
...e/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java
Outdated
dlvenable
reviewed
Jul 29, 2025
...g/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardAcknowledgementManagerTest.java
Outdated
...c/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java
Outdated
added 5 commits
July 30, 2025 11:30
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
added 8 commits
July 30, 2025 11:30
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
73b29bc to
d8ec169
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
d8ec169 to
860e78b
dlvenable
approved these changes
Jul 30, 2025
graytaylor0
approved these changes
Jul 30, 2025
        shardProgress.increment();
        AcknowledgementSet acknowledgementSet = null;
        if (shardAcknowledgementManager != null) {
            acknowledgementSet = shardAcknowledgementManager.createAcknowledgmentSet(streamPartition, sequenceNumber, shardIterator == null);
Member
I know this is harder to verify, but let's keep an eye on whether shardIterator is actually null only once we reach the end of a shard.
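For context, a DynamoDB Streams GetRecords response carries a next shard iterator that is null once the shard is closed and fully read, and that last response may contain no records at all. The sketch below illustrates the invariant being watched: only the last page of a shard should be flagged as final. `PageSketch` and `EndOfShardSketch` are hypothetical types that model a response page; they are not AWS SDK or Data Prepper classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one GetRecords-style page (NOT an AWS SDK type).
final class PageSketch {
    final List<String> sequenceNumbers;
    final String nextShardIterator; // null models "shard closed and fully read"

    PageSketch(List<String> sequenceNumbers, String nextShardIterator) {
        this.sequenceNumbers = sequenceNumbers;
        this.nextShardIterator = nextShardIterator;
    }
}

class EndOfShardSketch {
    // For each page consumed, report whether it would be flagged as the shard's final batch.
    static List<Boolean> finalFlags(List<PageSketch> pages) {
        List<Boolean> flags = new ArrayList<>();
        for (PageSketch page : pages) {
            String shardIterator = page.nextShardIterator;
            boolean isFinalBatch = (shardIterator == null);
            flags.add(isFinalBatch);
            if (isFinalBatch) {
                break; // nothing left to read from this shard
            }
        }
        return flags;
    }
}
```

An empty page with a non-null iterator is not an end-of-shard signal in this model, so the flag stays false for it.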
graytaylor0 added a commit to graytaylor0/data-prepper that referenced this pull request on Aug 5, 2025
…nowledgementManager class (opensearch-project#5818)"
This reverts commit dfc3c70.
Signed-off-by: Taylor Gray <tylgry@amazon.com>
graytaylor0 added a commit that referenced this pull request on Aug 5, 2025
* Revert "Ensure shards are completed when last getRecords call has no records and no shardIterator (#5958)"
  This reverts commit 13049d3.
  Signed-off-by: Taylor Gray <tylgry@amazon.com>
* Revert "Introduce checkpointing per shard to DDB source, via ShardAcknowledgementManager class (#5818)"
  This reverts commit dfc3c70.
  Signed-off-by: Taylor Gray <tylgry@amazon.com>
---------
Signed-off-by: Taylor Gray <tylgry@amazon.com>
Description
Introduce checkpointing per shard to DDB source, via ShardAcknowledgementManager class
Issues Resolved
Resolves #4764
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.