Address Scale Items for lambda plugin #5032
kkondaka merged 10 commits into opensearch-project:main
Conversation
```java
}
codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
LOG.info("CurrentBuffer event count: {}", count);
```

Will this get logged for every event? If so, it should be debug.

Thanks for noticing that, I will make the noisy logs debug level.
```java
public void flushToLambdaIfNeeded(List<Record<Event>> resultRecords, boolean forceFlush) {
    // ...
    LOG.info("currentBufferEventCount:{}, maxEvents:{}, maxBytes:{}, maxCollectionDuration:{}, isBatch:{}, forceFlush:{} ", currentBuffer.getEventCount(), maxEvents, maxBytes, maxCollectionDuration, isBatchEnabled, forceFlush);
```

This seems like a helpful log, but it may be noisy as INFO if this is called for every Event.
```java
// Handle future
CompletableFuture<Void> processingFuture = future.thenAccept(response -> {
    handleLambdaResponse(response);
    LOG.info("Successfully flushed {} events", eventCount);
```

Same here on this being a noisy log.
```java
public void resetBuffer() {
    try {
        LOG.info("Resetting buffer");
```

Is this helpful to have as info, or can it be debug?
```java
isBatchEnabled = true;
LOG.info("maxEvents:" + maxEvents + " maxbytes:" + maxBytes + " maxDuration:" + maxCollectionDuration);
} else if (payloadModel.equals(SINGLE_EVENT)) {
    LOG.info("Single events");
```
```java
codec,
codecContext,
isBatchEnabled,
whenCondition,
maxEvents,
maxBytes,
maxCollectionDuration,
isSink,
dlqPushHandler,
pluginSetting);
```

Can we put this all into a single config class to consolidate?
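One way the consolidation suggested above could look is a small value class holding the shared threshold/batching settings. This is an illustrative sketch, not the actual plugin code; the field names simply mirror the constructor arguments quoted in the diff.

```java
import java.time.Duration;

// Hypothetical consolidated config for the common Lambda handler.
// Only a subset of the constructor arguments is shown.
class LambdaCommonConfig {
    private final int maxEvents;
    private final long maxBytes;
    private final Duration maxCollectionDuration;
    private final boolean batchEnabled;

    LambdaCommonConfig(int maxEvents, long maxBytes,
                       Duration maxCollectionDuration, boolean batchEnabled) {
        this.maxEvents = maxEvents;
        this.maxBytes = maxBytes;
        this.maxCollectionDuration = maxCollectionDuration;
        this.batchEnabled = batchEnabled;
    }

    int getMaxEvents() { return maxEvents; }
    long getMaxBytes() { return maxBytes; }
    Duration getMaxCollectionDuration() { return maxCollectionDuration; }
    boolean isBatchEnabled() { return batchEnabled; }
}
```

The constructor shown in the diff would then take one `LambdaCommonConfig` argument instead of ten positional parameters.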
```java
    return records;
}
// ...
LOG.info("Received " + records.size() + "records to lambda Processor" );
```
```java
    }
}
// ...
LOG.info("Force Flushing the remaining {} events in the buffer", lambdaCommonHandler.getCurrentBuffer().getEventCount());
```
```java
@JsonProperty("payload_model")
private String payloadModel = BATCH_EVENT;
// ...
@JsonProperty("sdk_timeout")
```

You should add the @JsonPropertyDescription annotation to all of these config parameters.

Why do you call this sdk_timeout? This seems more related to the implementation than what is actually timing out. We should probably look at using socket_timeout or connect_timeout.
```java
        lambdaCommonHandler.resetBuffer();
    }
}
LOG.info("Force Flushing the remaining {} events in the buffer", lambdaCommonHandler.getCurrentBuffer().getEventCount());
```
```java
}).exceptionally(throwable -> {
    LOG.error(NOISY, "Exception occurred while invoking Lambda. Function: {} | Exception: ", functionName, throwable);
    handleFailure(throwable);
```

Need a metric to keep track of failures.
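A failure metric could be incremented inside the `exceptionally` callback shown above. The sketch below is illustrative: Data Prepper plugins would normally register a counter through their metrics facility (Micrometer-backed), but a `LongAdder` is used here as a dependency-free stand-in so the sketch is self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

class LambdaInvocationMetrics {
    // Stand-in for a real metrics counter (e.g. one obtained from the
    // plugin's metrics registry); a LongAdder keeps this self-contained.
    private final LongAdder failedInvocations = new LongAdder();

    // Wraps an in-flight invocation future and counts any failure.
    CompletableFuture<Void> track(CompletableFuture<Void> future) {
        return future.exceptionally(throwable -> {
            failedInvocations.increment(); // one tick per failed invocation
            return null;
        });
    }

    long failedCount() {
        return failedInvocations.sum();
    }
}
```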
```java
public void handleLambdaResponse(InvokeResponse response) {
    int statusCode = response.statusCode();
    if (statusCode < 200 || statusCode >= 300) {
        LOG.warn("Lambda invocation returned with non-success status code: {}", statusCode);
```

```java
        Thread.sleep(Duration.ofSeconds(10).toMillis());
    }
}
///*
```

Why is this file commented out?
```java
/*
 * Release events per batch
 */
public void releaseEventHandles(final boolean result, List<EventHandle> bufferedEventHandles) {
```

I think this should not be in common code. Processor doesn't need to explicitly release event handles.
```java
private static final int DEFAULT_CONNECTION_RETRIES = 3;
public class LambdaProcessorConfig {
    //Ensures 1:1 mapping of events input to lambda and response from lambda
    public static final String STRICT = "strict";
```

Please use enums for these. This is important for correct support for validations and schemas.

Here is an example:
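The reviewer's example was not captured in this excerpt. A minimal sketch of the kind of enum being suggested might look like the following; the type and method names here are illustrative, not the actual plugin code. In the real config class, Jackson's `@JsonCreator` on the factory method and `@JsonValue` on the getter (as suggested elsewhere in this review) would tie the enum into deserialization and schema generation.

```java
import java.util.Arrays;

// Illustrative enum replacing the String constants "strict"/"aggregate".
enum ResponseMatchMode {
    STRICT("strict"),       // 1:1 mapping between input events and responses
    AGGREGATE("aggregate"); // response may carry a different number of events

    private final String optionValue;

    ResponseMatchMode(String optionValue) {
        this.optionValue = optionValue;
    }

    String getOptionValue() {
        return optionValue;
    }

    // Exact-match lookup; rejects unknown values instead of defaulting.
    static ResponseMatchMode fromOptionValue(String value) {
        return Arrays.stream(values())
                .filter(mode -> mode.getOptionValue().equals(value))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unknown value: " + value));
    }
}
```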
```java
}
//Reset Buffer
LOG.debug("currentBufferPerBatchEventCount:{}, maxEvents:{}, maxBytes:{}, maxCollectionDuration:{}, forceFlush:{} ", currentBufferPerBatch.getEventCount(), maxEvents, maxBytes, maxCollectionDuration, forceFlush);
if (forceFlush || ThresholdCheck.checkThresholdExceed(currentBufferPerBatch, maxEvents, maxBytes, maxCollectionDuration)) {
```

We should be able to handle this in common with the sink with some slight refactoring.

The flush method looks very similar for both sink and processor, and I tried combining them in an earlier PR, but we need to handle the release of events differently for the sink. For that I introduced an isSink flag. Krishna's comment was that common code should not have a flag like that, and I agree with his point; I have now reverted it so both the processor and sink have their own flush methods.

I agree that a flag is not ideal. But you can have an injectable strategy. We can resolve this later though.
```java
LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(), flushedBuffer.getSize());
// Check if the response is a JSON array and the codec is JSON
if (isJsonCodec) {
    if (parsedEvents.size() == flushedBuffer.getEventCount()) {
```

We should use the strategy pattern to inject code for handling the parsed events based on aggregate or strict. Then you don't need this large conditional. Something like this:

```java
responseStrategy.handleEvents(parsedEvents, originalRecords);
```

Also, this will grow with some upcoming changes to support aggregations in end-to-end acknowledgements.
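The suggested strategy pattern could be shaped roughly as below. This is a hedged sketch: events are reduced to plain strings so the example stays self-contained, whereas the real code would operate on `Record<Event>` objects and the flushed buffer, and the interface name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Strategy injected into the processor; one implementation per response mode.
interface ResponseStrategy {
    List<String> handleEvents(List<String> parsedEvents, List<String> originalEvents);
}

// strict: the response must contain exactly one event per input event.
class StrictResponseStrategy implements ResponseStrategy {
    @Override
    public List<String> handleEvents(List<String> parsedEvents, List<String> originalEvents) {
        if (parsedEvents.size() != originalEvents.size()) {
            throw new IllegalStateException("strict mode requires a 1:1 response");
        }
        return new ArrayList<>(parsedEvents);
    }
}

// aggregate: accept however many events the response contains.
class AggregateResponseStrategy implements ResponseStrategy {
    @Override
    public List<String> handleEvents(List<String> parsedEvents, List<String> originalEvents) {
        return new ArrayList<>(parsedEvents);
    }
}
```

With this in place, the large conditional collapses to a single `responseStrategy.handleEvents(...)` call, and new modes can be added without touching the processor.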
```java
LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(), flushedBuffer.getSize());
// Check if the response is a JSON array and the codec is JSON
if (isJsonCodec) {
```

This condition does not belong here. We should verify that the codec is one we support in the constructor. This allows the processor validation to fail fast and give the user quicker feedback.
```java
private Duration sdkTimeout = DEFAULT_SDK_TIMEOUT;
@JsonPropertyDescription("Defines the way Data Prepper treats the response from Lambda")
@JsonProperty("response_cardinality")
private String responseCardinality;
```

Suggested change:

```diff
-private String responseCardinality;
+private ResponseCardinality responseCardinality;
```

You will need to make other changes.
```java
    }
}
// ...
// Default value is STRICT
```

Suggested change:

```diff
-// Default value is STRICT
+@JsonCreator
```

This will work better with schema validation.
```java
    this.value = value;
}
// ...
public String getValue() {
```

Suggested change:

```diff
+@JsonCreator
 public String getValue() {
```

This will allow this to work well with the schema generation as well.
```java
    this.awsLambdaValue = awsLambdaValue;
}
// ...
public String getUserInputValue() {
```

Suggested change:

```diff
+@JsonValue
 public String getUserInputValue() {
```

This is needed to work with schema generation.
```java
    }
}
// ...
public static InvocationType fromString(String value) {
```

Suggested change:

```diff
+@JsonCreator
 public static InvocationType fromString(String value) {
```

This is important for pipeline validation.
```java
LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(), flushedBuffer.getSize());
// ...
responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer);
// if (parsedEvents.size() == flushedBuffer.getEventCount()) {
```
Commits (all signed off by Srikanth Govindarajan <srigovs@amazon.com>):

- Refactor aws lambda plugin to have a class for common methods between processor and sink
- Add support for lambda async client in lambda sink
- …onse codec
- …nality enums; Change reponse_processing_mode option to response_cardinality
```java
@JsonCreator
public static InvocationType fromString(String value) {
    return INVOCATION_TYPE_MAP.get(value.toLowerCase());
```

Please remove the toLowerCase(). We want to be sure that the values provided by the user are the expected values.
```java
static {
    for (InvocationType type : InvocationType.values()) {
        INVOCATION_TYPE_MAP.put(type.getUserInputValue().toLowerCase(), type);
```

Please remove the toLowerCase() here.
```java
static {
    for (ResponseCardinality type : ResponseCardinality.values()) {
        RESPONSE_CARDINALITY_MAP.put(type.getValue().toLowerCase(), type);
```

```java
if (value == null) {
    return STRICT;
}
return RESPONSE_CARDINALITY_MAP.getOrDefault(value.toLowerCase(), STRICT);
```

Please remove .toLowerCase(). Also, do not get the default. This will result in this being valid:

```yaml
response_cardinality: oops!
```
```java
@JsonCreator
public static ResponseCardinality fromString(String value) {
    if (value == null) {
```

Do not return a default value for null here. This can result in odd behavior, such as the following being allowed:

```yaml
response_cardinality: null
```
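Taken together, the feedback on this enum amounts to: exact-match lookup (no `toLowerCase()`) and no silent fallback for null or unknown values. A hedged sketch of `fromString` along those lines (illustrative names, not the actual plugin code; `@JsonCreator` is omitted so the sketch compiles without Jackson):

```java
import java.util.Map;

// Illustrative enum with a strict fromString(): exact match only, and
// null/unknown values raise an error instead of defaulting to STRICT,
// so an invalid pipeline config fails fast.
enum Cardinality {
    STRICT("strict"),
    AGGREGATE("aggregate");

    private static final Map<String, Cardinality> BY_VALUE =
            Map.of("strict", STRICT, "aggregate", AGGREGATE);

    private final String value;

    Cardinality(String value) {
        this.value = value;
    }

    static Cardinality fromString(String value) {
        if (value == null) {
            throw new IllegalArgumentException("response_cardinality must not be null");
        }
        Cardinality result = BY_VALUE.get(value); // exact match, no toLowerCase()
        if (result == null) {
            throw new IllegalArgumentException("Unrecognized response_cardinality: " + value);
        }
        return result;
    }
}
```

With this shape, `response_cardinality: oops!` and `response_cardinality: null` are both rejected at configuration time rather than silently treated as `strict`.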
Description
Following are the changes made:
Address Scale issues:
1.1 We will have an asynchronous call to Lambda at the batch level, i.e., we could send multiple batches to Lambda at the same time. We will wait for the futures only after the entire set of records received by the processor is done.
1.2 Handle metrics and buffer per batch based on futures processing.
payload_model will no longer support SINGLE_EVENT; only BATCH-based calls will be supported by default.
Acknowledgements:
5. Address Acknowledgements for processor and sink.
For Processors:
5.1 When a batch of N events is configured by the user (N could be <= pipeline batch size), i.e., the request to Lambda contains N events in a batch, Lambda could return N responses or M responses (N != M). When N responses are sent as a JSON array, we reuse the original records, clear the old event data, and populate it with the response from Lambda; that way, the acknowledgement set need not be changed.
5.2 When there are M responses (N != M), we create new events and add them to the original acknowledgement set. The older events are also retained in the ack set but will be released by core later.
Handling Failure:
6. On failure to process the events, the events in the processor will be tagged and forwarded. This processor will NOT drop events on failure.
7. The Lambda sink will send to the DLQ on failure and will acknowledge as true. If a DLQ is not set up, we will send a negative acknowledgement.
Refactor:
8. Refactor aws lambda plugin to have a class for common methods between processor and sink.
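The asynchronous batch invocation described in 1.1 can be sketched as follows. This is a simplified illustration, not the plugin's actual code: `invokeAsync` stands in for the Lambda async client call, and batches are plain string lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the scaling change: dispatch every batch without blocking,
// then wait once for all in-flight invocations after dispatch completes.
class AsyncBatchDispatcher {
    // Stand-in for the Lambda async client; pretends the response
    // reports how many events the batch contained.
    CompletableFuture<Integer> invokeAsync(List<String> batch) {
        return CompletableFuture.supplyAsync(batch::size);
    }

    int processAll(List<List<String>> batches) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (List<String> batch : batches) {
            futures.add(invokeAsync(batch)); // multiple batches in flight at once
        }
        // Wait for the futures only after the entire set of batches is dispatched.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().mapToInt(CompletableFuture::join).sum();
    }
}
```

The key property is that no batch waits on a previous batch's response; the single `allOf(...).join()` at the end mirrors waiting for all futures after the processor has consumed its input records.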
Issues Resolved
Resolves #5031
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.