Add Lambda Synchronous processor support #4700

dlvenable merged 7 commits into opensearch-project:main from
Conversation
| testImplementation project(':data-prepper-plugins:parse-json-processor')
| testImplementation 'org.powermock:powermock-module-junit4:2.0.9'
| testImplementation 'org.powermock:powermock-api-mockito2:2.0.9'
| testImplementation 'junit:junit:4.13.2'
You shouldn't need any of these four lines. They are provided by the root project.
| import software.amazon.awssdk.core.retry.RetryPolicy;
| import software.amazon.awssdk.services.lambda.LambdaClient;
|
| public final class LambdaClientFactory {
Did you explore the possibility of using one LambdaClientFactory class? I see one class with that name in the lambda sink directory.

Sure, I can merge the two and move it to common.
| if (mode != null && mode.equalsIgnoreCase(LambdaProcessorConfig.SYNCHRONOUS_MODE)) {
|     invocationType = SYNC_INVOCATION_TYPE;
| } else {
|     throw new RuntimeException("mode has to be synchronous or asynchronous");
Something like "Unsupported mode {}", mode is a better message here.
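A hedged sketch of what the suggested message could look like in context (the constant mirrors the diff above; the wrapper method and class name are illustrative, not the PR's actual code):

```java
// Illustrative sketch only: report the offending value instead of the fixed
// "mode has to be synchronous or asynchronous" message.
public final class InvocationModeSketch {
    public static final String SYNCHRONOUS_MODE = "RequestResponse";

    public static String resolveInvocationType(final String mode) {
        if (mode != null && mode.equalsIgnoreCase(SYNCHRONOUS_MODE)) {
            return SYNCHRONOUS_MODE;
        }
        // Including the actual value makes misconfigurations easy to spot in logs.
        throw new IllegalArgumentException("Unsupported mode " + mode);
    }
}
```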
@srikanthjg, the WhiteSource check is failing. I am ready to approve this.
| @JsonProperty("max_retries")
| private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;
|
| @JsonProperty("mode")
The term "mode" is quite ambiguous. I think we can borrow the term "invocation_type" from AWS Lambda itself.
https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html#API_Invoke_RequestSyntax
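Borrowing the AWS term, a pipeline configuration might then read as follows (a hypothetical fragment; the function name is a placeholder, and the option name follows the later diffs in this PR):

```yaml
processor:
  - aws_lambda:
      function_name: my-function          # placeholder
      invocation_type: RequestResponse    # or Event, matching the Lambda Invoke API
```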
| throw new RuntimeException("Unsupported mode " + mode);
| }
|
| codec = new LambdaJsonCodec(batchKey);
This approach is very restrictive. It assumes that the body fits the format we ask. We should follow the same pattern used elsewhere in Data Prepper by allowing for a pluggable output codec.

LambdaJsonCodec is an implementation of OutputCodec (link). I needed this specifically for batch processing, when more than one event needs to be mapped to JSON. JsonOutputCodec handles only one event at a time. Maybe I can change the name to something more generic like BulkJsonOutputCodec or BatchJsonOutputCodec? It will be used for all dial-out processors. In the S3 sink we use BufferedCodec, but the only implementation is currently Parquet, and the way we want to implement this for Lambda is different.

All OutputCodecs are made for batches. Customers can use either JsonOutputCodec or NdjsonOutputCodec. The JSON codec already supports a configurable key name; this would replace the batch_key, which you don't need:

codec:
  json:
    key_name: myKey
So the customer can have two options:

- JSON:

  codec:
    json:
      key_name: myKey

  Yields:

  {
    "myKey" : [
      { ...event1... },
      { ...event2... },
      { ...event3... }
    ]
  }

- ND-JSON:

  codec:
    ndjson:

  Yields:

  { ...event1... }
  { ...event2... }
  { ...event3... }
Bulk will always need a key, as it will be considered one payload, so I guess ndjson cannot be used.

There is also a difference when it comes to handling a single event without a batch. In this case, I still want to convert the Data Prepper event to JSON, but I don't want a key; I want to pass the user's data as-is to Lambda as the payload. The current output codec forces me to have a key. To address that, I either need to add new behaviour to the JSON writeEvent method to convert the event directly to JSON, or write a new codec (which is what I did). The behaviour I want seems to be a combination of the two codecs: json behaviour for bulk and ndjson behaviour for a single event.
> Bulk will always need a key, as it will be considered one payload, so I guess ndjson cannot be used.

Yes, this makes sense. The payload needs to be JSON, and ND-JSON with multiple events becomes non-JSON.

> There is also a difference when it comes to handling a single event without a batch.

Actually, ndjson writing a single event gives you exactly what you want in this case. It is exactly the same output.

I also see that you are trying to support the concept of calling a Lambda for each event. Improving the configuration can help make this clearer. Right now there are multiple configurations which the user needs to set carefully to get the desired output. This is a simpler way to configure it:

- To have a single invocation per event, add a boolean flag. The user need not make any more decisions.

  aws_lambda:
    function_name: MyFunction
    invocation_per_event: true

- The default should be to batch, and this can keep the existing defaults. You can probably keep this configuration the same, though rename batch_key to key_name for consistency with the other APIs. Also, disallow setting an event size of 1, as this is not the goal of this approach.

  aws_lambda:
    function_name: MyFunction

Second, you can still use the existing codecs. When invocation_per_event is set to true, you can use the NdjsonOutputCodec internally. Otherwise, use the JsonOutputCodec and provide the batch key_name as the keyName in the codec configuration.
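To make the two payload shapes concrete, here is a minimal sketch (plain string assembly for illustration, not the real JsonOutputCodec/NdjsonOutputCodec implementations; the class and method names are assumptions) of what each codec produces for events that are already serialized as JSON objects:

```java
import java.util.List;

// Illustrative only: mimics the two output shapes discussed above.
public final class PayloadShapeSketch {
    // json codec shape: one JSON payload with all events under a key
    public static String jsonWithKey(final String keyName, final List<String> events) {
        return "{\"" + keyName + "\":[" + String.join(",", events) + "]}";
    }

    // ndjson codec shape: one JSON object per line; for a single event this
    // is exactly the raw event, which is what per-event invocation wants
    public static String ndjson(final List<String> events) {
        return String.join("\n", events);
    }
}
```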
Regarding the configuration, I already have invocation_type as a configuration; this allows setting per-event invocation or batch invocation (RequestResponse or Event).

If we implement this internally, I cannot use parse-json-processor as a plugin but will have to take a dependency on it. Is it OK for one processor to take a dependency on another? I wanted to avoid this, hence I went with implementing a custom codec. But I think this codec can also be used by other dial-out processors eventually; I can make it generic.
| public class LambdaProcessorConfig {
|
| public static final String SYNCHRONOUS_MODE = "RequestResponse";
We should stick with Data Prepper naming conventions: request_response.
| public class LambdaProcessorConfig {
|
| public static final String SYNCHRONOUS_MODE = "RequestResponse";
| public static final String ASYNCHRONOUS_MODE = "Event";
| public class LambdaProcessorConfig {
|
| public static final String REQUEST_RESPONSE = "RequestResponse";
Let's use request-response to match our existing naming conventions.
| public class LambdaProcessorConfig {
|
| public static final String REQUEST_RESPONSE = "RequestResponse";
| public static final String EVENT = "Event";
Let's use event to match our existing naming conventions.
| public class LambdaSinkConfig {
|
| public static final String REQUEST_RESPONSE = "RequestResponse";
Let's consolidate these constant values with the LambdaProcessorConfig so that they don't diverge.
| public static final String EVENT = "Event";
| public static final String BATCH_EVENT = "batch_event";
| public static final String SINGLE_EVENT = "single_event";
| public static final String REQUEST_RESPONSE = "request-response";
Perhaps make a CommonLambdaConfig class that has these constants. We should avoid duplicating these, or we may have future mismatches.
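A sketch of the suggested consolidation (a LambdaCommonConfig class is referenced in later diffs of this PR; the exact constant values shown here are assumptions based on the surrounding discussion):

```java
// Shared constants for the Lambda sink and processor so the two configs
// cannot drift apart. Values are illustrative, not confirmed by the PR.
public final class LambdaCommonConfig {
    public static final String REQUEST_RESPONSE = "request-response";
    public static final String EVENT = "event";
    public static final String BATCH_EVENT = "batch_event";
    public static final String SINGLE_EVENT = "single_event";

    private LambdaCommonConfig() {
        // constants only; no instances
    }
}
```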
dlvenable left a comment:

Thank you @srikanthjg for this contribution!
| maximum_size: 3mb
| ```
|
| `invocation_type` as RequestResponse will be used when the response from aws lambda comes back to dataprepper.
nit:

`invocation_type` as RequestResponse is used when Data Prepper needs to process the response from AWS Lambda.
`invocation_type` as Event is used when the response from AWS Lambda is sent to an S3 bucket.
| In batch options, an implicit batch threshold option is that if events size is 3mb, we flush it.
| `payload_model` this is used to define how the payload should be constructed from a dataprepper event.
| `payload_model` as batch_event is used when the output needs to be formed as a batch of multiple events,
Are there other values for payload_model?
| @ParameterizedTest
| @ValueSource(ints = {1,3})
| void verify_records_to_lambda_success(final int recordCount) throws Exception {
Consider adding a test for InvocationType Event.

Invocation type Event will be disabled for now; we will release the Event type with asynchronous support, which requires additional infra changes. I have disabled it in the verification for now and will fix the README.
| return LambdaClient.builder()
|     .region(lambdaSinkConfig.getAwsAuthenticationOptions().getAwsRegion())
|     .region(awsAuthenticationOptions.getAwsRegion())
Consider enabling SDK metrics to track the number of requests, timeouts, throttles, etc.
| codec = new NdjsonOutputCodec(ndjsonOutputCodecConfig);
| isBatchEnabled = false;
| } else {
|     throw new RuntimeException("invalid payload_model option");
Can this validation be part of LambdaProcessorConfig?
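One way to move the check, sketched under the assumption that the config class validates in its constructor (the real Data Prepper configs use Jackson binding and validation annotations, so this plain-Java version is only illustrative):

```java
import java.util.Set;

// Illustrative: reject an invalid payload_model when the config is built,
// so the processor constructor no longer needs the runtime check.
public class LambdaProcessorConfigSketch {
    private static final Set<String> VALID_PAYLOAD_MODELS = Set.of("batch_event", "single_event");

    private final String payloadModel;

    public LambdaProcessorConfigSketch(final String payloadModel) {
        if (!VALID_PAYLOAD_MODELS.contains(payloadModel)) {
            throw new IllegalArgumentException("invalid payload_model option: " + payloadModel);
        }
        this.payloadModel = payloadModel;
    }

    public String getPayloadModel() {
        return payloadModel;
    }
}
```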
| if (!lambdaProcessorConfig.getInvocationType().equals(LambdaCommonConfig.EVENT) &&
|     !lambdaProcessorConfig.getInvocationType().equals(LambdaCommonConfig.REQUEST_RESPONSE)) {
|     throw new RuntimeException("Unsupported invocation type " + lambdaProcessorConfig.getInvocationType());
|
| if (currentBuffer.getEventCount() == 0) {
|     codec.start(currentBuffer.getOutputStream(), event, codecContext);
| }
| codec.writeEvent(event, currentBuffer.getOutputStream());
Yes, this is running in the context of a processor.
| }
|
| void flushToLambdaIfNeeded(List<Record<Event>> resultRecords) throws InterruptedException, IOException {
|
| LOG.info("Flush to Lambda check: currentBuffer.size={}, currentBuffer.events={}, currentBuffer.duration={}", currentBuffer.getSize(), currentBuffer.getEventCount(), currentBuffer.getDuration());
Is there excessive logging in this method?

Sure, I will reduce them.
| }
| }
|
| LambdaResult retryFlushToLambda(Buffer currentBuffer, final AtomicReference<String> errorMsgObj) throws InterruptedException {
|
| return lambdaResult;
| }
|
| Event convertLambdaResponseToEvent(InvokeResponse lambdaResponse) {
|
| Map<String, String> invocationTypeMap = Map.of(
|     LambdaCommonConfig.EVENT, EVENT_LAMBDA
| );
|
| this.bufferFactory = new InMemoryBufferFactory();
| try {
|     currentBuffer = this.bufferFactory.getBuffer(lambdaClient, functionName, invocationType);
The term "buffer" is overloaded in Data Prepper. This looks like it is not just a buffer but is tightly coupled with Lambda. We should consider renaming this class and interface to be clearer.

I am handling it the same way we would in the sink; I can address the refactor in another PR.
| } catch (AwsServiceException | SdkClientException e) {
|     errorMsgObj.set(e.getMessage());
|     LOG.error("Exception occurred while uploading records to lambda. Retry countdown : {} | exception:", retryCount, e);
|     --retryCount;
Is this retry on top of the Lambda client's own retry? Any reason we need this?
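For context on the question: the loop above is an application-level retry wrapped around a client that may already retry internally, so the total attempt count multiplies. A generic sketch of that pattern (class and method names are illustrative, not the PR's actual code):

```java
import java.util.function.Supplier;

// Illustrative bounded-retry loop. If the underlying AWS SDK client is also
// configured with its own RetryPolicy, each iteration here can itself retry,
// multiplying the total number of requests made.
public final class RetrySketch {
    public static <T> T withRetries(final Supplier<T> call, int retryCount) {
        RuntimeException lastFailure = new IllegalStateException("retryCount must be positive");
        while (retryCount > 0) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                lastFailure = e;   // remember the failure, count down, try again
                --retryCount;
            }
        }
        throw lastFailure;
    }
}
```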
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
| private String invocationType = REQUEST_RESPONSE;
|
| @JsonProperty("payload_model")
| private String payloadModel = BATCH_EVENT;
We should make a Java enum for this. I'm OK with doing this in a follow-on PR.
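A hedged sketch of such an enum (the values mirror the constants in this PR; the fromValue lookup stands in for what would be a Jackson @JsonCreator in the real config):

```java
import java.util.Arrays;

// Illustrative enum for payload_model. In the actual config class, fromValue
// would carry Jackson's @JsonCreator so YAML strings bind to the enum.
public enum PayloadModel {
    BATCH_EVENT("batch_event"),
    SINGLE_EVENT("single_event");

    private final String value;

    PayloadModel(final String value) {
        this.value = value;
    }

    public String getValue() {
        return value;
    }

    public static PayloadModel fromValue(final String value) {
        return Arrays.stream(values())
                .filter(model -> model.value.equals(value))
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("Unsupported payload_model: " + value));
    }
}
```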
| private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;
|
| @JsonProperty("invocation_type")
| private String invocationType = REQUEST_RESPONSE;
We should make this an enum as well.
| private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES;
|
| @JsonProperty("invocation_type")
| private String invocationType = EVENT;
|
| @JsonProperty("payload_model")
| private String payloadModel = BATCH_EVENT;
Did you look into moving some of the null-check validations from the plugin into the config? Other than this, the changes look good to me.
dlvenable left a comment:

@srikanthjg, thank you for this contribution. I have a few other changes we should try to get in to improve it. But let's follow up in another PR.
| public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_SUCCESS = "lambdaProcessorObjectsEventsSucceeded";
| public static final String NUMBER_OF_RECORDS_FLUSHED_TO_LAMBDA_FAILED = "lambdaProcessorObjectsEventsFailed";
| public static final String LAMBDA_LATENCY_METRIC = "lambdaProcessorLatency";
We can rename this to simply requestLatency. Or you could call it lambdaRequestLatency, but that seems unnecessary. As I read the code, this is the time taken to make the request to Lambda, regardless of whether it is request-response or event.
Add Lambda Processor Synchronous Mode support

Make LambdaClientFactory common to sink and processor

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
Description
Adds AWS Lambda as a remote processor for Data Prepper.
Further details mentioned in #4699
Issues Resolved
Resolves #4699
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.