Add support for lambda sink#4292
Conversation
| private final ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory().enable(YAMLGenerator.Feature.USE_PLATFORM_LINE_BREAKS)); | ||
| private String stsRoleArn; | ||
|
|
||
| final String LAMBDA_SINK_CONFIG_YAML = |
There was a problem hiding this comment.
Can we make this runtime input instead of hardcoded value?
| import static org.mockito.Mockito.verify; | ||
|
|
||
| @ExtendWith(MockitoExtension.class) | ||
| class LambdaSinkServiceIT { |
There was a problem hiding this comment.
How to run this test should be included in README but can come in a separate PR.
| private int maxConnectionRetries = DEFAULT_CONNECTION_RETRIES; | ||
|
|
||
| @JsonProperty("sync") | ||
| private Boolean sync = DEFAULT_INVOCATION; |
There was a problem hiding this comment.
QUES: is there reason we want async invocation as default?
There was a problem hiding this comment.
async provides a better way to communicate to external services
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| /** | ||
| * A buffer can hold in memory data and flushing it to S3. |
| * Upload accumulated data to s3 bucket. | ||
| */ | ||
| @Override | ||
| public void flushToLambda() { |
There was a problem hiding this comment.
e2e acknowledgment seems missing in this PR.
There was a problem hiding this comment.
e2e ack is handled. refer releaseEventHandles in the pr.
| } | ||
|
|
||
| /** | ||
| * Upload accumulated data to s3 bucket. |
| //reset buffer after flush | ||
| currentBuffer = bufferFactory.getBuffer(lambdaClient,functionName,invocationType); | ||
| } catch (final IOException e) { | ||
| LOG.error("Exception while completing codec", e); |
There was a problem hiding this comment.
You should do releaseEventHandles(false) in case of exceptionl
| private int maxRetries = 0; | ||
| private final Counter numberOfRecordsSuccessCounter; | ||
| private final Counter numberOfRecordsFailedCounter; | ||
| private final String SYNC_INVOCATION_TYPE = "RequestResponse"; |
There was a problem hiding this comment.
I think we discussed that sync doesn't make sense in case of lambda as sink. So, invocation type is not needed.
| dlqPushHandler.perform(pluginSetting,new LambdaSinkFailedDlqData(payload,errorMsgObj.get(),0)); | ||
| } | ||
| //release even if failed | ||
| releaseEventHandles(true); |
There was a problem hiding this comment.
Should be released with false if failed.
There was a problem hiding this comment.
I think it is considered successful if it is uploaded to dlq?
There was a problem hiding this comment.
But what if DLQ is not configured?
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
There was a problem hiding this comment.
Maybe we should move this to common directory so that other sinks can use this future?
There was a problem hiding this comment.
can refactor be part of another PR?
| import java.util.Map; | ||
| import java.util.Objects; | ||
|
|
||
| public class JsonCodec implements OutputCodec { |
There was a problem hiding this comment.
As discussed, let's rename it to be more specific to lambda
c63dd00 to
dddb205
Compare
| aws: | ||
| region: "us-east-1" | ||
| sts_role_arn: "<arn>" | ||
| function_name: "uploadToS3Lambda" |
There was a problem hiding this comment.
Did we not discuss about passing arguments to lambda function? Do you not see any need for that option?
| reentrantLock.lock(); | ||
| try { | ||
| for (Record<Event> record : records) { | ||
| final Event event = record.getData(); |
There was a problem hiding this comment.
We probably need to support sending event metadata as well?
There was a problem hiding this comment.
yes we could, we can add this incrementally?
| dlqPushHandler.perform(pluginSetting,new LambdaSinkFailedDlqData(payload,errorMsgObj.get(),0)); | ||
| } | ||
| //release even if failed | ||
| releaseEventHandles(true); |
There was a problem hiding this comment.
But what if DLQ is not configured?
| * A buffer can hold in memory data and flushing it. | ||
| */ | ||
| public class InMemoryBuffer implements Buffer { | ||
|
|
There was a problem hiding this comment.
Is it not possible to use existing BufferAccumulator ?
There was a problem hiding this comment.
Interface definitions are different. Will not be able to use the existing one
| import java.util.Objects; | ||
|
|
||
| public class LambdaJsonCodec implements OutputCodec { | ||
| private final ObjectMapper objectMapper = new ObjectMapper(); |
There was a problem hiding this comment.
How is this different from Json Output Codec we already have? Can we re-use it somehow?
There was a problem hiding this comment.
i tried to use existing one but it is not possible without making a whole bunch of changes specific to lambda, so i chose to have a new one instead.
Signed-off-by: srigovs <srigovs@amazon.com>
Signed-off-by: Srikanth Govindarajan <srikanthjg123@gmail.com>
4e4aeb8 to
48a17fb
Compare
|
Hi @srikanthjg, I know that the chance of catching you on this merged PR are slim, but I wanted to ask if you had a pointer to any documentation of the lambda event type and return type that's used for a lambda sink. https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/aws-lambda/#aws-lambda-processor-configuration doesn't really convey the shape of the lambda event or response unless I'm missing something. If there's something that can be pointed at, I'd be happy to ask for it to be included in https://github.com/DefinitelyTyped/DefinitelyTyped/tree/master/types/aws-lambda/trigger to help those of us using TS for these. Thanks much for any help, Dave |
Description
Add support for lambda sink
Issues Resolved
Resolves #4170
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.