Skip to content

Kds cross account stream#5687

Merged
dlvenable merged 7 commits intoopensearch-project:mainfrom
sb2k16:kds-cross-account-stream
May 13, 2025
Merged

Kds cross account stream#5687
dlvenable merged 7 commits intoopensearch-project:mainfrom
sb2k16:kds-cross-account-stream

Conversation

@sb2k16
Copy link
Copy Markdown
Member

@sb2k16 sb2k16 commented May 8, 2025

Description

This PR is to add support for accessing cross account Kinesis Data streams. This will allow data prepper to securely consume data from streams belonging to different AWS accounts while maintaining proper access controls and performance optimization.

Add Stream ARN Support

  • Add new optional field: stream_arn to allow explicit specification of cross-account stream ARNs
  • Format: arn:aws:kinesis:region:account-id:stream/<stream-name>

Add Consumer ARN support

  • Add new optional field: consumer_arn to allow explicit specification of cross-account stream ARNs
  • Format: arn:aws:kinesis:region:account-id:stream/stream-name/consumer/<consumer_arn>:<consumer epoch time>

Issues Resolved

Resolves #1082

Check List

  • New functionality includes testing.
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @sb2k16 for this new feature! Some refactoring can help isolate the logic.

this.maxRetryCount = maxRetryCount;
}

public StreamIdentifier getStreamIdentifierFromStreamArn(final String streamArnString) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can make this package protected. Remove the public modifier.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on my other comments, I think this code can be refactored to have a single place for handling the stream identification.

return StreamIdentifier.multiStreamInstance(streamIdentifierString);
}

public String getConsumerArnForStream(final String streamArn, final String consumerName) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like this is only used internally, so you can make it private.

try {
Thread.sleep(delayMillis);
} catch (final InterruptedException e){
Thread.currentThread().interrupt();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You reset the interrupt here. But, does anything actually monitor the interrupt status? Maybe you should end the backoff/retry loop.

return streamName.equals(streamIdentifier.streamName());
}
return streamConfig.getName().equals(streamIdentifier.streamName());
}).findAny();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This findAny can return one if there are multiple matching. Should we have some check to ensure there is only one?


@JsonProperty("stream_arn")
@Valid
private String arn;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to getStreamArn to make it clearer what this is.


private KinesisStreamConfig getStreamConfig(final KinesisSourceConfig kinesisSourceConfig) {
final Optional<KinesisStreamConfig> kinesisStreamConfig = kinesisSourceConfig.getStreams().stream().filter(streamConfig -> streamConfig.getName().equals(streamIdentifier.streamName())).findAny();
final Optional<KinesisStreamConfig> kinesisStreamConfig = kinesisSourceConfig.getStreams().stream().filter(streamConfig -> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a bit of split logic with how we ready the stream config and get the stream identifiers. I see other conditions based on the different options above in KinesisClientApiHandler and KinesisMultiStreamTracker.

There should be a good way to have a single class that can handle all the logic for determining the KinesisStreamConfig/StreamConfig for any stream or identifier. Maybe a class that maps all these on initialization?

Then you won't have to have all this split logic.

sbose2k21 added 3 commits May 12, 2025 08:44
Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
dlvenable
dlvenable previously approved these changes May 12, 2025
Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @sb2k16 !

streamArn, ex.getMessage(), attempt + 1);
} else if (cause instanceof com.amazonaws.SdkClientException) {
log.error("AWS SDK client error while describing stream consumer for stream {}: {}. Attempt {}.",
streamArn, ex.getMessage(), attempt + 1);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need two different if/else cases for this? The message will show which exception it is, right?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The `ex.message() will show the exception. However, the line 145 is to have a different message in the logs.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only differene here is the AWS SDK versus Kinesis API. I think @kkondaka is right that we don't need this.

This is the default Throwable::toString. I think this give us what we want.

public String toString() {
        String s = getClass().getName();
        String message = getLocalizedMessage();
        return (message != null) ? (s + ": " + message) : s;
    }

Something like this will work:

            log.error("AWS error while describing stream consumer for stream {}: {}. Attempt {}.",
                    streamArn, ex.toString(), attempt + 1);

try {
Thread.sleep(delayMillis);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is needed.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This marks the current thread as interrupted until the next sleep. But, I really don't think we use that for anything.

} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Thread interrupted while waiting for retry", e);
throw new KinesisRetriesExhaustedException("Thread interrupted while waiting for retry");
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a good idea to throw an error for this? It is a very rare event. In the worst case, it should not count towards an "attempt"

Signed-off-by: Souvik Bose <souvbose@amazon.com>
Signed-off-by: Souvik Bose <souvbose@amazon.com>
@dlvenable dlvenable merged commit 49ef09d into opensearch-project:main May 13, 2025
37 of 47 checks passed
alparish pushed a commit to alparish/data-prepper that referenced this pull request May 22, 2025
Implementation for cross account stream support in KDS

Signed-off-by: Souvik Bose <souvbose@amazon.com>
@dlvenable dlvenable added this to the v2.12 milestone Jun 24, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support AWS Kinesis Data Streams as a Source

4 participants