Add support to increase acknowledgment expiry and use in progress check for Dynamo source#5428
Conversation
3505101 to
6c4ed8e
Compare
|
|
||
| private static final Logger LOG = LoggerFactory.getLogger(ShardConsumer.class); | ||
|
|
||
| private static final Duration ACKNOWLEDGMENT_EXPIRY_INCREASE_TIME = Duration.ofMinutes(10); |
There was a problem hiding this comment.
Is this intended to have a relationship to shard_acknowledgment_timeout? Rather than a constant, should it be a function of that value?
There was a problem hiding this comment.
I don't think these necessarily need to be coupled. The only dependence between these two is that the initial acknowledgment set timeout is the shard acknowledgment timeout, which means that we the interval between progress check calls just needs to be lower than the shard acknowledgment timeout. It would still work if this expiry_increase_time value was the same as the shard acknowledgment timeout, but I still don't see any really good reason to couple those
|
|
||
| @Override | ||
| public void increaseExpiry(final Duration expiryIncreaseTime) { | ||
| this.expiryTime = this.expiryTime.plus(expiryIncreaseTime); |
There was a problem hiding this comment.
Should this be now().plus(expiryIncreaseTime)?
There was a problem hiding this comment.
That would make sense yeah
| */ | ||
| public class DataFileLoaderFactory { | ||
|
|
||
| static final Duration ACKNOWLEDGMENT_EXPIRY_INCREASE_TIME = Duration.ofMinutes(10); |
There was a problem hiding this comment.
I have a similar comment as with shards. Should this be a function of s3_data_file_acknowledgment_timeout?
6c4ed8e to
b4e0991
Compare
…ck for Dynamo source Signed-off-by: Taylor Gray <tylgry@amazon.com>
b4e0991 to
c2064a4
Compare
|
|
||
| Instant getExpirationTime(); | ||
|
|
||
| void shutdown(); |
There was a problem hiding this comment.
Maybe cancel would be clearer.
Signed-off-by: Taylor Gray <tylgry@amazon.com>
| } | ||
| } catch (final Exception exc) { | ||
| if (acknowledgementSet != null) { | ||
| acknowledgementSet.cancel(); |
There was a problem hiding this comment.
Do you see any possibilities where the consumer fails, but no exception reaches this point? For example, if there is an infinite/long-running loop. Maybe an exception caught elsewhere?
There was a problem hiding this comment.
the only possibility I see for this case is if any of these lines before the try block throw an exception. Otherwise I don't see this as a possibility. I think it's unlikely that these lines throw an exception, but we can move the try to the start if we want to be sure
@Override
public void run() {
LOG.debug("Shard Consumer start to run...");
// Check should skip processing or not.
if (shouldSkip()) {
shardProgress.increment();
if (acknowledgementSet != null) {
checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout);
acknowledgementSet.complete();
}
return;
}
if (acknowledgementSet != null) {
addProgressCheck(acknowledgementSet);
}
long lastCheckpointTime = System.currentTimeMillis();
String sequenceNumber = "";
int interval;
List<software.amazon.awssdk.services.dynamodb.model.Record> records;
There was a problem hiding this comment.
More than anything, I think it would be ideal to stop updating the expiry time if the progress update is not actually cancelled. Maybe this can follow on later.
There was a problem hiding this comment.
I see. Like something where the acknowledgment set allows a maximum expiry time overall increase and cancels it itself if it's been increased for something like 8 hours, for example?
kkondaka
left a comment
There was a problem hiding this comment.
Is it possible to write a test that verifies that there are no expirations with this change and has expirations without this change?
…ck for Dynamo source (opensearch-project#5428) Signed-off-by: Taylor Gray <tylgry@amazon.com> Signed-off-by: George Chen <qchea@amazon.com>
Description
Fixes acknowledgment sets expiring from Dynamo streams by increasing expiry timeout in progress check
Tested by running a Dynamo pipeline and verified that no acknowledgments were expiring.
Issues Resolved
Resolves #5412
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.