Add support to increase acknowledgment expiry and use in progress check for Dynamo source #5428

Merged
graytaylor0 merged 2 commits into opensearch-project:main from graytaylor0:DdbProgressCheck
Feb 13, 2025

Conversation

Member

@graytaylor0 graytaylor0 commented Feb 10, 2025

Description

Fixes acknowledgment sets expiring for Dynamo streams by increasing the expiry timeout in the progress check.

  • Increase the default s3_data_file_acknowledgment_timeout from 5 minutes to 15 minutes. The previous default was too low and caused issues for users with high buffer usage.
  • Fix another issue where shard partitions would time out on ownership while waiting for export, by increasing the interval between checkpoints and increasing the ownership timeout extension applied at each shard checkpoint.

Tested by running a Dynamo pipeline and verifying that no acknowledgments were expiring.

Issues Resolved

Resolves #5412

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.


private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class);

private static final Duration ACKNOWLEDGMENT_EXPIRY_INCREASE_TIME = Duration.ofMinutes(10);
Member

Is this intended to have a relationship to shard_acknowledgment_timeout? Rather than a constant, should it be a function of that value?

Member Author

I don't think these necessarily need to be coupled. The only dependence between the two is that the initial acknowledgment set timeout is the shard acknowledgment timeout, which means that the interval between progress check calls just needs to be lower than the shard acknowledgment timeout. It would still work if this expiry_increase_time value were the same as the shard acknowledgment timeout, but I still don't see any really good reason to couple them.
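The timing constraint described in this reply can be illustrated with a small model. This is only a sketch under stated assumptions: the class `ProgressCheckTimingSketch`, its method, and the durations used in `main` are hypothetical and are not taken from Data Prepper. Each progress check is assumed to add a fixed increase to the existing expiry, as in the `increaseExpiry` diff in this PR.

```java
import java.time.Duration;

// Hypothetical model, not Data Prepper code: all times are offsets from the
// moment the acknowledgment set is created.
final class ProgressCheckTimingSketch {

    /**
     * Returns true if the modelled acknowledgment set would expire before
     * processing finishes. Each progress check adds the given increase to the
     * current expiry (assumption: additive extension, as in the diff).
     */
    static boolean expiresBeforeCompletion(final Duration initialTimeout,
                                           final Duration checkInterval,
                                           final Duration increase,
                                           final Duration processingTime) {
        if (checkInterval.isZero() || checkInterval.isNegative()) {
            throw new IllegalArgumentException("checkInterval must be positive");
        }
        Duration expiry = initialTimeout;
        Duration nextCheck = checkInterval;
        while (nextCheck.compareTo(processingTime) < 0) {
            if (nextCheck.compareTo(expiry) >= 0) {
                return true; // the set expired before this progress check could run
            }
            expiry = expiry.plus(increase);
            nextCheck = nextCheck.plus(checkInterval);
        }
        return processingTime.compareTo(expiry) >= 0;
    }

    public static void main(final String[] args) {
        // Assumed values for illustration only.
        final Duration increase = Duration.ofMinutes(10);
        final Duration processing = Duration.ofHours(1);
        // Check interval (2 min) below the initial timeout (10 min): the set survives.
        System.out.println(expiresBeforeCompletion(
                Duration.ofMinutes(10), Duration.ofMinutes(2), increase, processing));
        // Check interval (2 min) above the initial timeout (1 min): the set expires.
        System.out.println(expiresBeforeCompletion(
                Duration.ofMinutes(1), Duration.ofMinutes(2), increase, processing));
    }
}
```

In this model the increase and the initial timeout are independent inputs, which matches the point that the two values do not have to be coupled as long as the first progress check runs before the initial timeout elapses.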


@Override
public void increaseExpiry(final Duration expiryIncreaseTime) {
    this.expiryTime = this.expiryTime.plus(expiryIncreaseTime);
Member

Should this be now().plus(expiryIncreaseTime)?

Member Author

That would make sense yeah
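To make the difference between the two options concrete, here is a minimal sketch. The class `ExpiryExtensionSketch`, its method names, and the timings in `main` are hypothetical and exist only for illustration; they are not Data Prepper APIs, and the sketch does not show which variant was finally merged.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not part of Data Prepper: it only contrasts the two
// ways of extending an expiry time discussed in the comments above.
final class ExpiryExtensionSketch {

    // Variant shown in the diff: push the existing deadline further out.
    static Instant extendFromPreviousExpiry(final Instant currentExpiry, final Duration increase) {
        return currentExpiry.plus(increase);
    }

    // Variant suggested in review: restart the deadline from the current time.
    static Instant extendFromNow(final Instant now, final Duration increase) {
        return now.plus(increase);
    }

    public static void main(final String[] args) {
        final Instant start = Instant.parse("2025-02-10T00:00:00Z");
        final Duration increase = Duration.ofMinutes(10);
        // Assumed values for illustration: a 15 minute initial timeout and a
        // progress check that fires every 2 minutes, three times in total.
        Instant additive = start.plus(Duration.ofMinutes(15));
        Instant fromNow = start.plus(Duration.ofMinutes(15));
        for (int check = 1; check <= 3; check++) {
            final Instant now = start.plus(Duration.ofMinutes(2L * check));
            additive = extendFromPreviousExpiry(additive, increase);
            fromNow = extendFromNow(now, increase);
        }
        System.out.println("additive expiry: " + additive);
        System.out.println("from-now expiry: " + fromNow);
    }
}
```

With these assumed timings, three checks move the additive expiry to 45 minutes after the start, while the from-now expiry sits 16 minutes after the start. The additive form accumulates a growing buffer whenever checks run more often than the increase, whereas the from-now form keeps the deadline a fixed distance ahead of the most recent check.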

*/
public class DataFileLoaderFactory {

static final Duration ACKNOWLEDGMENT_EXPIRY_INCREASE_TIME = Duration.ofMinutes(10);
Member

I have a similar comment as with shards. Should this be a function of s3_data_file_acknowledgment_timeout?

…ck for Dynamo source

Signed-off-by: Taylor Gray <tylgry@amazon.com>

Instant getExpirationTime();

void shutdown();
Member

Maybe cancel would be clearer.

Signed-off-by: Taylor Gray <tylgry@amazon.com>
}
} catch (final Exception exc) {
    if (acknowledgementSet != null) {
        acknowledgementSet.cancel();
Member

Do you see any possibilities where the consumer fails, but no exception reaches this point? For example, if there is an infinite/long-running loop. Maybe an exception caught elsewhere?

Member Author

The only possibility I see for this case is if any of the lines before the try block throw an exception. Otherwise I don't see this as a possibility. I think it's unlikely that these lines throw an exception, but we can move the try to the start if we want to be sure.
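The option of moving the try to the start can be sketched as follows. Everything here is a hypothetical stand-in: `CancellableAck` models only the `cancel()` call seen in the diff, and `runConsumer` is not the actual `ShardConsumer.run()` method. The sketch also assumes the exception is swallowed after cancelling, since the handling after `cancel()` is not visible in the snippet above.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not the real ShardConsumer: it shows only that placing
// the setup lines inside the try block lets the catch clause cancel the
// acknowledgment set for failures in setup as well as in processing.
final class CancelOnFailureSketch {

    // Stand-in for the acknowledgment set; only cancel() is modelled.
    interface CancellableAck {
        void cancel();
    }

    static void runConsumer(final CancellableAck ack, final Runnable setup, final Runnable processing) {
        try {
            setup.run();      // lines that previously ran before the try block
            processing.run(); // the main consume loop
        } catch (final Exception exc) {
            // Assumption: the exception is not rethrown in this sketch.
            if (ack != null) {
                ack.cancel();
            }
        }
    }

    public static void main(final String[] args) {
        final AtomicBoolean cancelled = new AtomicBoolean(false);
        runConsumer(
                () -> cancelled.set(true),
                () -> { throw new IllegalStateException("setup failed"); },
                () -> { });
        System.out.println("cancelled after setup failure: " + cancelled.get());
    }
}
```

This covers exceptions only. A consumer stuck in a long-running loop, the other case raised in the question above, would never reach the catch clause and needs a separate safeguard.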

    @Override
    public void run() {
        LOG.debug("Shard Consumer start to run...");
        // Check should skip processing or not.
        if (shouldSkip()) {
            shardProgress.increment();
            if (acknowledgementSet != null) {
                checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout);
                acknowledgementSet.complete();
            }
            return;
        }
        if (acknowledgementSet != null) {
            addProgressCheck(acknowledgementSet);
        }
        long lastCheckpointTime = System.currentTimeMillis();
        String sequenceNumber = "";
        int interval;
        List<software.amazon.awssdk.services.dynamodb.model.Record> records;

Member

More than anything, I think it would be ideal to stop updating the expiry time if the progress update is not actually cancelled. Maybe this can follow on later.

Member Author

I see. Like something where the acknowledgment set allows a maximum expiry time overall increase and cancels it itself if it's been increased for something like 8 hours, for example?
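One way to picture the idea floated here is an acknowledgment set that tracks how much its expiry has been extended in total and cancels itself once a cap is exceeded. The sketch below is an assumption-laden illustration: `CappedExpirySketch`, its fields, and the boolean return value are hypothetical and do not reflect the Data Prepper `AcknowledgementSet` interface or any follow-up change.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch, not Data Prepper code: an expiry that can be extended
// only up to a maximum total amount, after which it cancels itself.
final class CappedExpirySketch {
    private final Duration maxTotalIncrease;
    private Instant expiryTime;
    private Duration totalIncrease = Duration.ZERO;
    private boolean cancelled = false;

    CappedExpirySketch(final Instant initialExpiry, final Duration maxTotalIncrease) {
        this.expiryTime = initialExpiry;
        this.maxTotalIncrease = maxTotalIncrease;
    }

    // Returns true if the expiry was extended, false if the cap would be
    // exceeded (in which case this sketch marks itself cancelled).
    boolean increaseExpiry(final Duration increase) {
        if (cancelled) {
            return false;
        }
        final Duration proposedTotal = totalIncrease.plus(increase);
        if (proposedTotal.compareTo(maxTotalIncrease) > 0) {
            cancelled = true;
            return false;
        }
        totalIncrease = proposedTotal;
        expiryTime = expiryTime.plus(increase);
        return true;
    }

    boolean isCancelled() {
        return cancelled;
    }

    Instant getExpirationTime() {
        return expiryTime;
    }

    public static void main(final String[] args) {
        // Assumed values: the 8 hour cap comes from the example in the comment
        // above, and the 10 minute increase matches the constant in the diff.
        final CappedExpirySketch ack = new CappedExpirySketch(
                Instant.parse("2025-02-10T00:00:00Z"), Duration.ofHours(8));
        int extensions = 0;
        while (ack.increaseExpiry(Duration.ofMinutes(10))) {
            extensions++;
        }
        System.out.println("extensions granted: " + extensions + ", cancelled: " + ack.isCancelled());
    }
}
```

A cap of this kind would bound how long a stalled consumer could keep an acknowledgment set alive, at the cost of also cancelling legitimately slow work once it passes the cap.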

Collaborator

@kkondaka kkondaka left a comment

Is it possible to write a test that verifies there are no expirations with this change, and that expirations do occur without it?

@graytaylor0 graytaylor0 merged commit 850a676 into opensearch-project:main Feb 13, 2025
44 of 47 checks passed
chenqi0805 pushed a commit to chenqi0805/data-prepper that referenced this pull request Apr 2, 2025
…ck for Dynamo source (opensearch-project#5428)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: George Chen <qchea@amazon.com>
Development

Successfully merging this pull request may close these issues.

[BUG] DynamoDB source with acknowledgements expires frequently

3 participants