Add SQS sink #5628

Merged
sb2k16 merged 3 commits into opensearch-project:main from kkondaka:sqs-sink
Apr 29, 2025
@kkondaka (Collaborator) commented Apr 18, 2025

Description

Add SQS sink

  • Supports batching of events before sending to SQS
  • Supports only the json output codec
  • Supports sending failed events to a DLQ
  • Added unit tests and integration tests

FIFO queue support will be added in a future PR.

Issues Resolved

Resolves #5634

Check List

  • [X] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [X] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

* SPDX-License-Identifier: Apache-2.0
*/

plugins {
Member

You don't need these lines.


dependencies {
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(path: ':data-prepper-api')
Member

Remove the path: from lines 28-33.

}
}

test {
Member

Remove this section. The parent provides it.


@Override
public void doInitialize() {
sinkInitialized = Boolean.TRUE;
Member

Suggested change
- sinkInitialized = Boolean.TRUE;
+ sinkInitialized = true;

import java.util.function.BiConsumer;

public class SqsSinkBatch {
public static final int MAX_ENTRIES_PER_BATCH = 10;
Member

Suggested change
- public static final int MAX_ENTRIES_PER_BATCH = 10;
+ public static final int MAX_MESSAGES_PER_BATCH = 10;

Let's use precise terms as this can be confusing.
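
The distinction matters because SQS's SendMessageBatch request accepts at most 10 messages, while each message may itself carry several events. A minimal sketch of splitting messages into batches follows; BatchPartitioner is a hypothetical helper for illustration and is not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not code from this PR.
class BatchPartitioner {
    // SQS SendMessageBatch accepts at most 10 messages per request.
    static final int MAX_MESSAGES_PER_BATCH = 10;

    // Splits messages into consecutive batches of at most MAX_MESSAGES_PER_BATCH.
    static <T> List<List<T>> partition(final List<T> messages) {
        final List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < messages.size(); start += MAX_MESSAGES_PER_BATCH) {
            final int end = Math.min(start + MAX_MESSAGES_PER_BATCH, messages.size());
            // Copy the sub-list so each batch is independent of the input list.
            batches.add(new ArrayList<>(messages.subList(start, end)));
        }
        return batches;
    }
}
```

Here the unit being counted is a message, not an event, which is the ambiguity the rename is meant to remove.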

}
}

public abstract void pushFailedObjectsToDlq(Object failedStatus);
Member

These abstract methods should all be protected instead of public.
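
A minimal sketch of what this suggestion could look like, assuming a simplified template class: TemplateSink, RecordingSink, and the send hook are hypothetical names, and only pushFailedObjectsToDlq mirrors a signature shown in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical template class for illustration; not the PR's actual class.
abstract class TemplateSink {
    // Public entry point: the only method external callers are meant to use.
    public final void execute(final List<String> events) {
        final List<String> failed = new ArrayList<>();
        for (final String event : events) {
            if (!send(event)) {
                failed.add(event);
            }
        }
        if (!failed.isEmpty()) {
            pushFailedObjectsToDlq(failed);
        }
    }

    // Hooks are protected: subclasses implement them, but they are not public API.
    protected abstract boolean send(String event);

    protected abstract void pushFailedObjectsToDlq(Object failedStatus);
}

// Hypothetical subclass that records what would be sent to the DLQ.
class RecordingSink extends TemplateSink {
    final List<Object> dlq = new ArrayList<>();

    @Override
    protected boolean send(final String event) {
        // Treat empty events as failures, purely for demonstration.
        return !event.isEmpty();
    }

    @Override
    protected void pushFailedObjectsToDlq(final Object failedStatus) {
        dlq.add(failedStatus);
    }
}
```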

public boolean perform(final List<DlqObject> dlqObjects) {
try {
if (dlqWriter != null && dlqObjects != null && dlqObjects.size() > 0) {
for (DlqObject dlqObject: dlqObjects) {
Member

What is this about?

Collaborator Author

Some leftover code. Removed it.


public class SqsThresholdConfig {
public static final int DEFAULT_MESSAGES_PER_EVENT = 25;
public static final String DEFAULT_MAX_MESSAGE_SIZE = "256kb";
Member

Suggested change
- public static final String DEFAULT_MAX_MESSAGE_SIZE = "256kb";
+ public static final ByteCount DEFAULT_MAX_MESSAGE_SIZE = ByteCount.parse("256kb");

Use ByteCount.

private int maxEventsPerMessage = DEFAULT_MESSAGES_PER_EVENT;

@JsonProperty("max_message_size")
private String maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
Member

Suggested change
- private String maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+ private ByteCount maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;

Use ByteCount instead of String.
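
One benefit of a typed value over a String is that a malformed size is rejected when the configuration is loaded rather than when a message is sent. The sketch below uses SimpleByteCount, a hypothetical stand-in written for this example; it is not Data Prepper's ByteCount, and the binary multiples (1kb = 1024 bytes) are an assumption.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for illustration; Data Prepper's real ByteCount may differ.
final class SimpleByteCount {
    private static final Pattern FORMAT = Pattern.compile("^(\\d+)(b|kb|mb|gb)$");
    private final long bytes;

    private SimpleByteCount(final long bytes) {
        this.bytes = bytes;
    }

    // Parses strings such as "256kb". Assumes binary multiples (1kb = 1024 bytes).
    static SimpleByteCount parse(final String text) {
        final Matcher matcher = FORMAT.matcher(text.trim().toLowerCase(Locale.ROOT));
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid byte count: " + text);
        }
        final long value = Long.parseLong(matcher.group(1));
        switch (matcher.group(2)) {
            case "kb":
                return new SimpleByteCount(value * 1024L);
            case "mb":
                return new SimpleByteCount(value * 1024L * 1024L);
            case "gb":
                return new SimpleByteCount(value * 1024L * 1024L * 1024L);
            default:
                return new SimpleByteCount(value);
        }
    }

    long getBytes() {
        return bytes;
    }
}
```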

private static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis();


public void execute(Collection<Record<Event>> records) {
Member

This is an abstract template design. In general, it is better to favor composition over inheritance. Rather than introduce this highly specific approach, we would benefit from composable constructs that can help with common situations. For example, have strategies for flushing, for retries, etc.
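
As a rough illustration of the composition-based alternative described in this comment, the sketch below wires a sink from separate flushing and retry strategies. Every name in it (FlushStrategy, RetryStrategy, CountOrSizeFlush, FixedAttemptsRetry, ComposedSink) is hypothetical and does not come from the PR or from Data Prepper.

```java
import java.util.function.Supplier;

// Hypothetical interfaces for illustration; none of these names come from the PR.
interface FlushStrategy {
    boolean shouldFlush(int bufferedEvents, long bufferedBytes);
}

interface RetryStrategy {
    // Runs the action until it reports success or attempts are exhausted.
    boolean run(Supplier<Boolean> action);
}

class CountOrSizeFlush implements FlushStrategy {
    private final int maxEvents;
    private final long maxBytes;

    CountOrSizeFlush(final int maxEvents, final long maxBytes) {
        this.maxEvents = maxEvents;
        this.maxBytes = maxBytes;
    }

    @Override
    public boolean shouldFlush(final int bufferedEvents, final long bufferedBytes) {
        return bufferedEvents >= maxEvents || bufferedBytes >= maxBytes;
    }
}

class FixedAttemptsRetry implements RetryStrategy {
    private final int maxAttempts;

    FixedAttemptsRetry(final int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    @Override
    public boolean run(final Supplier<Boolean> action) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (action.get()) {
                return true;
            }
        }
        return false;
    }
}

// The sink is assembled from strategies instead of extending a template class.
class ComposedSink {
    private final FlushStrategy flushStrategy;
    private final RetryStrategy retryStrategy;
    private final Supplier<Boolean> sender;
    private int bufferedEvents;
    private long bufferedBytes;

    ComposedSink(final FlushStrategy flushStrategy, final RetryStrategy retryStrategy,
                 final Supplier<Boolean> sender) {
        this.flushStrategy = flushStrategy;
        this.retryStrategy = retryStrategy;
        this.sender = sender;
    }

    // Buffers one event; returns true only when a flush happened and succeeded.
    boolean add(final long eventBytes) {
        bufferedEvents++;
        bufferedBytes += eventBytes;
        if (!flushStrategy.shouldFlush(bufferedEvents, bufferedBytes)) {
            return false;
        }
        final boolean sent = retryStrategy.run(sender);
        // The buffer is cleared either way; a real sink would route failures to a DLQ.
        bufferedEvents = 0;
        bufferedBytes = 0;
        return sent;
    }
}
```

With this shape, a different flush rule or retry policy can be swapped in without touching the sink class, and each strategy can be unit tested on its own.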

* and resources.
*/
public class AwsConfig {
public static final int DEFAULT_CONNECTION_ATTEMPTS = 5;
Member

What is this required for? It is not being used in any of the below parameters.

*/
@DataPrepperPlugin(name = "json", pluginType = OutputCodec.class, pluginConfigurationType = JsonOutputCodecConfig.class)
public class JsonOutputCodec implements OutputCodec {
private final int OVERHEAD_BYTES = 16;
Member

Could you please explain the reason for this?

Collaborator Author

It is the bytes added by start and complete functions. The number of overhead bytes is always fixed and it is actually slightly less than 16.

Collaborator Author

BTW, this implementation will change soon. David has a PR coming up.
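
The fixed overhead discussed in this thread can be estimated with a short calculation. The sketch below assumes the codec wraps its output as {"events":[ ... ]}; both that wrapper shape and the key name "events" are assumptions made for illustration, and JsonWrapperOverhead is a hypothetical helper, not part of the codec.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; the wrapper shape {"<key>":[ ... ]} is an assumption.
class JsonWrapperOverhead {
    // Bytes written before the first event plus bytes written after the last one.
    static int overheadBytes(final String keyName) {
        final String prefix = "{\"" + keyName + "\":[";
        final String suffix = "]}";
        return prefix.getBytes(StandardCharsets.UTF_8).length
                + suffix.getBytes(StandardCharsets.UTF_8).length;
    }
}
```

Under these assumptions, overheadBytes("events") evaluates to 13, which is consistent with the remark that the real overhead is slightly less than 16.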

violationRules {
rule {
limit {
minimum = 0.90
Member

Should this be 1.0?

Collaborator Author

We follow 100% coverage for API packages but not for source/sinks/processors.

Member

It should be our goal. If we can get 100%, let's do it.

sinkInitialized = false;
final PluginModel codecConfiguration = sqsSinkConfig.getCodec();
final PluginSetting codecPluginSettings;
if (codecConfiguration != null) {
Member

I think we are missing a test for lines 64-73.

Collaborator Author

Lines 64-71 are tested; only the else branch is not. I will add a test case.

}

@Test
void TestBasic() {
Member

Should the test names start with lowercase?

Collaborator Author

I don't think we strictly follow any naming conventions.

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
dlvenable previously approved these changes Apr 29, 2025

@dlvenable (Member) left a comment

@kkondaka, I have a few comments. You can address them now or in a follow-on.

codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(),
codecConfiguration.getPluginSettings());
} else {
codecPluginSettings = new PluginSetting("ndjson", Map.of());
Member

We should also require that the threshold max_events_per_message is set in this case.

This code should then set max_events_per_message to 1 because that is what this would be.
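
One possible reading of this suggestion, sketched under assumptions: when no codec is configured the sink falls back to ndjson, and the effective max_events_per_message becomes 1. CodecDefaults and its methods are hypothetical names and are not part of the PR.

```java
// Hypothetical sketch; class and method names are illustrative, not from the PR.
class CodecDefaults {
    static final String DEFAULT_CODEC = "ndjson";

    // Falls back to ndjson when the pipeline configuration names no codec.
    static String resolveCodecName(final String configuredCodec) {
        return configuredCodec != null ? configuredCodec : DEFAULT_CODEC;
    }

    // With the ndjson fallback, the effective limit is forced to one event per
    // message; otherwise the configured threshold is used unchanged.
    static int resolveMaxEventsPerMessage(final String configuredCodec, final int configuredMax) {
        return configuredCodec == null ? 1 : configuredMax;
    }
}
```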

sb2k16 previously approved these changes Apr 29, 2025
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
@kkondaka dismissed stale reviews from sb2k16 and dlvenable via fc6a7fe April 29, 2025 19:35
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
@sb2k16 merged commit d40a066 into opensearch-project:main Apr 29, 2025
69 of 74 checks passed
@kkondaka added this to the v2.12 milestone Jun 24, 2025
@kkondaka deleted the sqs-sink branch July 1, 2025 17:04

Development

Successfully merging this pull request may close these issues.

Add SQS sink to Data Prepper

3 participants