Conversation
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| plugins { |
|
|
||
| dependencies { | ||
| implementation project(':data-prepper-plugins:aws-plugin-api') | ||
| implementation project(path: ':data-prepper-api') |
There was a problem hiding this comment.
Remove the path: from lines 28-33
| } | ||
| } | ||
|
|
||
| test { |
There was a problem hiding this comment.
Remove this section. The parent provides it.
|
|
||
| @Override | ||
| public void doInitialize() { | ||
| sinkInitialized = Boolean.TRUE; |
There was a problem hiding this comment.
| sinkInitialized = Boolean.TRUE; | |
| sinkInitialized = true; |
| import java.util.function.BiConsumer; | ||
|
|
||
| public class SqsSinkBatch { | ||
| public static final int MAX_ENTRIES_PER_BATCH = 10; |
There was a problem hiding this comment.
| public static final int MAX_ENTRIES_PER_BATCH = 10; | |
| public static final int MAX_MESSAGES_PER_BATCH = 10; |
Let's use precise terms as this can be confusing.
| } | ||
| } | ||
|
|
||
| public abstract void pushFailedObjectsToDlq(Object failedStatus); |
There was a problem hiding this comment.
These abstract methods should all be protected instead of public.
| public boolean perform(final List<DlqObject> dlqObjects) { | ||
| try { | ||
| if (dlqWriter != null && dlqObjects != null && dlqObjects.size() > 0) { | ||
| for (DlqObject dlqObject: dlqObjects) { |
There was a problem hiding this comment.
Some leftover code. Removed it.
|
|
||
| public class SqsThresholdConfig { | ||
| public static final int DEFAULT_MESSAGES_PER_EVENT = 25; | ||
| public static final String DEFAULT_MAX_MESSAGE_SIZE = "256kb"; |
There was a problem hiding this comment.
| public static final String DEFAULT_MAX_MESSAGE_SIZE = "256kb"; | |
| public static final ByteCount DEFAULT_MAX_MESSAGE_SIZE = ByteCount.parse("256kb"); |
Use ByteCount.
| private int maxEventsPerMessage = DEFAULT_MESSAGES_PER_EVENT; | ||
|
|
||
| @JsonProperty("max_message_size") | ||
| private String maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; |
There was a problem hiding this comment.
| private String maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; | |
| private ByteCount maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE; |
Use ByteCount instead of String.
| private static final long MAXIMUM_DELAY_MS = Duration.ofMinutes(10).toMillis(); | ||
|
|
||
|
|
||
| public void execute(Collection<Record<Event>> records) { |
There was a problem hiding this comment.
This is an abstract template design. In general, it is better to favor composition over inheritance. Rather than introduce this highly specific approach, we would benefit from composable constructs that can help with common situations. For example, have strategies for flushing, for retries, etc.
| * and resources. | ||
| */ | ||
| public class AwsConfig { | ||
| public static final int DEFAULT_CONNECTION_ATTEMPTS = 5; |
There was a problem hiding this comment.
What is this required for? It is not being used in any of the below parameters.
| */ | ||
| @DataPrepperPlugin(name = "json", pluginType = OutputCodec.class, pluginConfigurationType = JsonOutputCodecConfig.class) | ||
| public class JsonOutputCodec implements OutputCodec { | ||
| private final int OVERHEAD_BYTES = 16; |
There was a problem hiding this comment.
could you please explain the reason for this.
There was a problem hiding this comment.
It is the bytes added by start and complete functions. The number of overhead bytes is always fixed and it is actually slightly less than 16.
There was a problem hiding this comment.
BTW, this implementation will change soon. David has a PR coming up
| violationRules { | ||
| rule { | ||
| limit { | ||
| minimum = 0.90 |
There was a problem hiding this comment.
We follow 100% coverage for API packages but not for source/sinks/processors.
There was a problem hiding this comment.
It should be our goal. If we can get 100%, let's do it.
| sinkInitialized = false; | ||
| final PluginModel codecConfiguration = sqsSinkConfig.getCodec(); | ||
| final PluginSetting codecPluginSettings; | ||
| if (codecConfiguration != null) { |
There was a problem hiding this comment.
I think we are missing a test for lines 64-73
There was a problem hiding this comment.
64-71 is being tested. Only the else part is not being tested. Will add a test case.
| } | ||
|
|
||
| @Test | ||
| void TestBasic() { |
There was a problem hiding this comment.
Should the test names start with lowercase?
There was a problem hiding this comment.
I don't think we follow any conventions strictly for naming
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
| violationRules { | ||
| rule { | ||
| limit { | ||
| minimum = 0.90 |
There was a problem hiding this comment.
It should be our goal. If we can get 100%, let's do it.
| codecPluginSettings = new PluginSetting(codecConfiguration.getPluginName(), | ||
| codecConfiguration.getPluginSettings()); | ||
| } else { | ||
| codecPluginSettings = new PluginSetting("ndjson", Map.of()); |
There was a problem hiding this comment.
We should also require that the threshold max_events_per_message is set in this case.
This could should then set the max_events_per_message to 1 because that is what this would be.
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Description
Add SQS sink
FIFO queue support will be added in a future PR
Issues Resolved
Resolves #5634
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.