Kds cross account stream #5687
Signed-off-by: Souvik Bose <souvbose@amazon.com>
        this.maxRetryCount = maxRetryCount;
    }

    public StreamIdentifier getStreamIdentifierFromStreamArn(final String streamArnString) {
I think you can make this package-private: just remove the public modifier.
Based on my other comments, I think this code can be refactored to have a single place for handling the stream identification.
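As a rough illustration of what that single place might parse, here is a minimal sketch that splits a stream ARN of the form given in the PR description (arn:aws:kinesis:region:account-id:stream/<stream-name>) into its parts. `StreamArnParts` and its fields are hypothetical names, not code from this PR, and the sketch does not build a KCL `StreamIdentifier`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of the PR): splits a Kinesis stream ARN into its parts.
final class StreamArnParts {
    // Expected layout: arn:<partition>:kinesis:<region>:<account-id>:stream/<stream-name>
    private static final Pattern STREAM_ARN = Pattern.compile(
            "^arn:([^:]+):kinesis:([^:]+):(\\d{12}):stream/([A-Za-z0-9_.-]+)$");

    final String region;
    final String accountId;
    final String streamName;

    private StreamArnParts(final String region, final String accountId, final String streamName) {
        this.region = region;
        this.accountId = accountId;
        this.streamName = streamName;
    }

    static StreamArnParts parse(final String streamArn) {
        if (streamArn == null) {
            throw new IllegalArgumentException("Stream ARN must not be null");
        }
        final Matcher matcher = STREAM_ARN.matcher(streamArn);
        if (!matcher.matches()) {
            // Rejects consumer ARNs too, because they carry extra path segments after the stream name.
            throw new IllegalArgumentException("Not a Kinesis stream ARN: " + streamArn);
        }
        return new StreamArnParts(matcher.group(2), matcher.group(3), matcher.group(4));
    }
}
```

Callers could then use `StreamArnParts.parse(arn).streamName` wherever the stream name is currently derived by hand.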
        return StreamIdentifier.multiStreamInstance(streamIdentifierString);
    }

    public String getConsumerArnForStream(final String streamArn, final String consumerName) {
It looks like this is only used internally, so you can make it private.
    try {
        Thread.sleep(delayMillis);
    } catch (final InterruptedException e){
        Thread.currentThread().interrupt();
You reset the interrupt flag here. But does anything actually check the interrupt status afterwards? Maybe you should end the backoff/retry loop instead.
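A minimal sketch of what ending the loop on interrupt could look like. `InterruptAwareRetry` and `callWithRetry` are hypothetical names used only for illustration, and the final `IllegalStateException` stands in for whatever exception the PR's retry code would actually throw.

```java
import java.util.function.Supplier;

// Hypothetical retry helper (not the PR's code): stops retrying as soon as the thread is interrupted.
final class InterruptAwareRetry {
    static <T> T callWithRetry(final Supplier<T> action, final int maxRetryCount, final long delayMillis) {
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt < maxRetryCount; attempt++) {
            try {
                return action.get();
            } catch (final RuntimeException e) {
                lastFailure = e;
            }
            if (attempt + 1 == maxRetryCount) {
                break; // no point sleeping after the final attempt
            }
            try {
                Thread.sleep(delayMillis);
            } catch (final InterruptedException e) {
                // Restore the flag for callers, then leave the loop instead of retrying again.
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw new IllegalStateException("Gave up before the call succeeded", lastFailure);
    }
}
```

With this shape the restored interrupt flag is meaningful: the loop itself reacts to it, and any caller further up the stack can still observe it.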
        return streamName.equals(streamIdentifier.streamName());
    }
    return streamConfig.getName().equals(streamIdentifier.streamName());
}).findAny();
This findAny will return an arbitrary match if multiple stream configs match. Should we add a check to ensure there is only one?
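One possible shape for such a check, as a sketch only. `UniqueMatch.findAtMostOne` is a hypothetical helper, and throwing `IllegalStateException` on ambiguity is an assumption about the desired behavior.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper: like filter(...).findAny(), but fails loudly on ambiguous matches.
final class UniqueMatch {
    static <T> Optional<T> findAtMostOne(final List<T> candidates, final Predicate<T> matches) {
        // limit(2) is enough to tell "exactly one" apart from "more than one".
        final List<T> found = candidates.stream()
                .filter(matches)
                .limit(2)
                .collect(Collectors.toList());
        if (found.size() > 1) {
            throw new IllegalStateException("Expected at most one match but found several");
        }
        return found.stream().findFirst();
    }
}
```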
    @JsonProperty("stream_arn")
    @Valid
    private String arn;
Rename to getStreamArn to make it clearer what this is.
    private KinesisStreamConfig getStreamConfig(final KinesisSourceConfig kinesisSourceConfig) {
        final Optional<KinesisStreamConfig> kinesisStreamConfig = kinesisSourceConfig.getStreams().stream().filter(streamConfig -> streamConfig.getName().equals(streamIdentifier.streamName())).findAny();
        final Optional<KinesisStreamConfig> kinesisStreamConfig = kinesisSourceConfig.getStreams().stream().filter(streamConfig -> {
We have a bit of split logic in how we read the stream config and get the stream identifiers. I see other conditions based on the different options above in KinesisClientApiHandler and KinesisMultiStreamTracker.
There should be a good way to have a single class that can handle all the logic for determining the KinesisStreamConfig/StreamConfig for any stream or identifier. Maybe a class that maps all these on initialization?
Then you won't have to have all this split logic.
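A minimal sketch of that idea, under loud assumptions: `SketchStreamConfig` is a hypothetical stand-in for KinesisStreamConfig with only a name and a stream ARN, `StreamConfigRegistry` is an invented class name, and the map is keyed on the stream name alone, which assumes stream names are unique across the configured accounts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for KinesisStreamConfig: only the fields this sketch needs.
final class SketchStreamConfig {
    final String name;      // may be null when only an ARN is configured
    final String streamArn; // may be null when only a name is configured

    SketchStreamConfig(final String name, final String streamArn) {
        this.name = name;
        this.streamArn = streamArn;
    }
}

// Hypothetical registry: resolves every configured stream once, at initialization.
final class StreamConfigRegistry {
    private final Map<String, SketchStreamConfig> byStreamName = new HashMap<>();

    StreamConfigRegistry(final List<SketchStreamConfig> configs) {
        for (final SketchStreamConfig config : configs) {
            if (config.name == null && config.streamArn == null) {
                throw new IllegalArgumentException("Stream config needs a name or a stream ARN");
            }
            // For an ARN, the stream name is the segment after the last '/'.
            final String streamName = config.streamArn != null
                    ? config.streamArn.substring(config.streamArn.lastIndexOf('/') + 1)
                    : config.name;
            if (byStreamName.putIfAbsent(streamName, config) != null) {
                throw new IllegalArgumentException("Duplicate stream configuration for " + streamName);
            }
        }
    }

    Optional<SketchStreamConfig> forStreamName(final String streamName) {
        return Optional.ofNullable(byStreamName.get(streamName));
    }
}
```

Because duplicates are rejected in the constructor, lookups no longer need a findAny-style filter at each call site. A real version might key on account ID plus stream name instead.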
            streamArn, ex.getMessage(), attempt + 1);
} else if (cause instanceof com.amazonaws.SdkClientException) {
    log.error("AWS SDK client error while describing stream consumer for stream {}: {}. Attempt {}.",
            streamArn, ex.getMessage(), attempt + 1);
Do we need two different if/else cases for this? The message will show which exception it is, right?
`ex.getMessage()` will show the exception. However, line 145 is there to produce a different message in the logs.
The only difference here is the AWS SDK versus the Kinesis API. I think @kkondaka is right that we don't need this.
This is the default Throwable::toString. I think this gives us what we want.
    public String toString() {
        String s = getClass().getName();
        String message = getLocalizedMessage();
        return (message != null) ? (s + ": " + message) : s;
    }
Something like this will work:
    log.error("AWS error while describing stream consumer for stream {}: {}. Attempt {}.",
            streamArn, ex.toString(), attempt + 1);
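To make the point concrete, here is a small dependency-free sketch. It uses `String.format` rather than the logger so it can run on its own, and `ConsumerLookupLogMessage.describeFailure` is a hypothetical name. Because `%s` falls back to `toString()` for a Throwable, the exception class name already ends up in the message, so a single branch is enough to tell SDK errors from Kinesis API errors.

```java
// Hypothetical helper: builds the single log message that would replace the if/else branches.
final class ConsumerLookupLogMessage {
    static String describeFailure(final String streamArn, final Throwable ex, final int attempt) {
        // %s on a Throwable uses Throwable.toString(), i.e. "<class name>: <message>".
        return String.format(
                "AWS error while describing stream consumer for stream %s: %s. Attempt %d.",
                streamArn, ex, attempt + 1);
    }
}
```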
    try {
        Thread.sleep(delayMillis);
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
I don't think this is needed.
This marks the current thread as interrupted until the next sleep. But, I really don't think we use that for anything.
    } catch (final InterruptedException e) {
        Thread.currentThread().interrupt();
        log.error("Thread interrupted while waiting for retry", e);
        throw new KinesisRetriesExhaustedException("Thread interrupted while waiting for retry");
Is it a good idea to throw an error for this? It is a very rare event. In the worst case, it should not count towards an "attempt".
Implementation for cross account stream support in KDS
Signed-off-by: Souvik Bose <souvbose@amazon.com>
Description
This PR adds support for accessing cross-account Kinesis Data Streams. This allows Data Prepper to securely consume data from streams belonging to different AWS accounts while maintaining proper access controls and performance optimization.
Add Stream ARN Support
stream_arn to allow explicit specification of cross-account stream ARNs
arn:aws:kinesis:region:account-id:stream/<stream-name>
Add Consumer ARN support
consumer_arn to allow explicit specification of cross-account stream ARNs
arn:aws:kinesis:region:account-id:stream/stream-name/consumer/<consumer_arn>:<consumer epoch time>
Issues Resolved
Resolves #1082
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.