Skip to content

Address Scale Items for lambda plugin#5032

Merged
kkondaka merged 10 commits intoopensearch-project:mainfrom
srikanthjg:lambda-async-client
Oct 29, 2024
Merged

Address Scale Items for lambda plugin#5032
kkondaka merged 10 commits intoopensearch-project:mainfrom
srikanthjg:lambda-async-client

Conversation

@srikanthjg
Copy link
Copy Markdown
Collaborator

@srikanthjg srikanthjg commented Oct 8, 2024

Description

Following are the changes made:

Address Scale issues:

  1. Added support for lambda async client in lambda processor and sink
    1.1 We will have a asynchronous call to lambda at a batch level, ie, we could send multiple batches to lambda at the
    same time. We will wait for the futures only after the entire set of records that was received by the processors
    are done.
    1.2 Handle metrics and buffer per batch based on futures processing.
  2. Make sdk timeout a user configurable parameter.
  3. Add Codec for request and response from lambda. NOTE: Json codec as input and output is the current default. And lambda response codec always assumes json array as the response.
  4. Removed payload_model and will no more support SINGLE_EVENT, will only support BATCH based calls by default

Acknowledgements:
5. Address Acknowledgements for processor and sink.
For Processors:
5.1. When a batch of N events that is configured by the user (N could be <=pipeline batch) ie, request to lambda contains N events in a batch , the lambda could return back N responses or M responses(N!=M). When N responses are sent as a json array, we resuse the original records and clear the old event data and populate it with the response from lambda, that way the acknowledgement set need not be changed.
5.2 When M responses(N!=M), we create new events and populate them to the original acknowledgement set. The older events are also retained in the ack set but will be released by core later.

Handling Failure:
6. Address failures at process the events, the events in the processor will be tagged and forwarded. This processor will NOT drop events on failure.
7. Lambda sink will send to DLQ on failure and will acknowledge as true. If a dlq is not setup, we will send a negative acknowledgement.

Refactor:
8. Refactor aws lambda plugin to have a class for common methods between processor and sink.

Issues Resolved

Resolves #5031

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@srikanthjg srikanthjg changed the title Add support for lambda async client in lambda processor Address Scale Iterms for lambda plugin Oct 8, 2024
@srikanthjg srikanthjg force-pushed the lambda-async-client branch from 6bd7c07 to 3840423 Compare October 8, 2024 16:29
@srikanthjg srikanthjg changed the title Address Scale Iterms for lambda plugin Address Scale Items for lambda plugin Oct 8, 2024
}
codec.writeEvent(event, currentBuffer.getOutputStream());
int count = currentBuffer.getEventCount() + 1;
LOG.info("CurrentBuffer event count: {}", count);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this get logged for every event? If so it should be debug

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for noticing that, i will make the noisy logs debug level.


public void flushToLambdaIfNeeded(List<Record<Event>> resultRecords, boolean forceFlush) {

LOG.info("currentBufferEventCount:{}, maxEvents:{}, maxBytes:{}, maxCollectionDuration:{}, isBatch:{}, forceFlush:{} ", currentBuffer.getEventCount(),maxEvents,maxBytes,maxCollectionDuration,isBatchEnabled, forceFlush);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a helpful log, but if it may be noisy as INFO if this is called for every Event

// Handle future
CompletableFuture<Void> processingFuture = future.thenAccept(response -> {
handleLambdaResponse(response);
LOG.info("Successfully flushed {} events", eventCount);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here on this being a noisy log


public void resetBuffer() {
try {
LOG.info("Resetting buffer");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this helpful to have as info or can it be debug?

isBatchEnabled = true;
LOG.info("maxEvents:" + maxEvents + " maxbytes:" + maxBytes + " maxDuration:" + maxCollectionDuration);
} else if(payloadModel.equals(SINGLE_EVENT)) {
LOG.info("Single events");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be debug

Comment on lines +160 to +169
codec,
codecContext,
isBatchEnabled,
whenCondition,
maxEvents,
maxBytes,
maxCollectionDuration,
isSink,
dlqPushHandler,
pluginSetting);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this all into a single config class to consolidate?

return records;
}

LOG.info("Received " + records.size() + "records to lambda Processor" );
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also be debug

}
}

LOG.info("Force Flushing the remaining {} events in the buffer", lambdaCommonHandler.getCurrentBuffer().getEventCount());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, should be debug

@JsonProperty("payload_model")
private String payloadModel = BATCH_EVENT;

@JsonProperty("sdk_timeout")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should add the @JsonPropertyDescription annotation to all of these config parameters.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you call this sdk_timeout? This seems more related to the implementation than what is actually timing out. We should probably look at using socket_timeout or connect_timeout.

lambdaCommonHandler.resetBuffer();
}
}
LOG.info("Force Flushing the remaining {} events in the buffer", lambdaCommonHandler.getCurrentBuffer().getEventCount());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug log


}).exceptionally(throwable -> {
LOG.error(NOISY, "Exception occurred while invoking Lambda. Function: {} | Exception: ", functionName, throwable);
handleFailure(throwable);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need a metric to keep track of failures.

public void handleLambdaResponse(InvokeResponse response) {
int statusCode = response.statusCode();
if (statusCode < 200 || statusCode >= 300) {
LOG.warn("Lambda invocation returned with non-success status code: {}", statusCode);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a metric here.

Thread.sleep(Duration.ofSeconds(10).toMillis());
}
} No newline at end of file
///*
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this file commented out?

/*
* Release events per batch
*/
public void releaseEventHandles(final boolean result, List<EventHandle> bufferedEventHandles) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should not be in common code. Processor doesn't need to explicitly release event handles.

@srikanthjg srikanthjg requested a review from sb2k16 as a code owner October 23, 2024 06:42
private static final int DEFAULT_CONNECTION_RETRIES = 3;
public class LambdaProcessorConfig {
//Ensures 1:1 mapping of events input to lambda and response from lambda
public static final String STRICT = "strict";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JsonProperty("payload_model")
private String payloadModel = BATCH_EVENT;

@JsonProperty("sdk_timeout")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you call this sdk_timeout? This seems more related to the implementation than what is actually timing out. We should probably look at using socket_timeout or connect_timeout.

}
//Reset Buffer
LOG.debug("currentBufferPerBatchEventCount:{}, maxEvents:{}, maxBytes:{}, maxCollectionDuration:{}, forceFlush:{} ", currentBufferPerBatch.getEventCount(),maxEvents,maxBytes,maxCollectionDuration, forceFlush);
if (forceFlush || ThresholdCheck.checkThresholdExceed(currentBufferPerBatch, maxEvents, maxBytes, maxCollectionDuration)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to handle this in common with the sink with some slight refactoring.

Copy link
Copy Markdown
Collaborator Author

@srikanthjg srikanthjg Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flush method looks very similar for both sink and processor and i tried combining that in the earlier PR, but we need to handle release of events differently for sink. For that i introduced isSink flag. But Comment from Krishna was that common code should not have flag like that and i think agree with his point; and now reverted it back to both processor and sink having their own flush methods.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that a flag is not ideal. But, you can have an injectable strategy. We can resolve this later though.

LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, FlushedBuffer size:{}",parsedEvents.size(),flushedBuffer.getEventCount(),flushedBuffer.getSize());
// Check if the response is a JSON array and the codec is JSON
if (isJsonCodec) {
if (parsedEvents.size() == flushedBuffer.getEventCount()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use the strategy pattern to inject code for handling the parsed events based on aggregate or strict. Then you don't need this large conditional.

Something like this:

responseStrategy.handleEvents(parsedEvents, originalRecords);

Also, this will grow with some upcoming changes to support aggregations in end-to-end acknowledgements.


LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, FlushedBuffer size:{}",parsedEvents.size(),flushedBuffer.getEventCount(),flushedBuffer.getSize());
// Check if the response is a JSON array and the codec is JSON
if (isJsonCodec) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition does not belong here. We should verify that the codec is one we support in the constructor. This allows the processor validation to fail fast and give the user quicker feedback.

private Duration sdkTimeout = DEFAULT_SDK_TIMEOUT;
@JsonPropertyDescription("Defines the way Data Prepper treats the response from Lambda")
@JsonProperty("response_cardinality")
private String responseCardinality;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private String responseCardinality;
private ResponseCardinality responseCardinality;

You will need to make other changes.

}
}

// Default value is STRICT
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Default value is STRICT
@JsonCreator

This will work better with schema validation.

this.value = value;
}

public String getValue() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public String getValue() {
@JsonCreator
public String getValue() {

This will allow this to work well with the schema generation as well.

this.awsLambdaValue = awsLambdaValue;
}

public String getUserInputValue() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public String getUserInputValue() {
@JsonValue
public String getUserInputValue() {

This is needed to work with schema generation.

}
}

public static InvocationType fromString(String value) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public static InvocationType fromString(String value) {
@JsonCreator
public static InvocationType fromString(String value) {

This is important for pipeline validation.

}
//Reset Buffer
LOG.debug("currentBufferPerBatchEventCount:{}, maxEvents:{}, maxBytes:{}, maxCollectionDuration:{}, forceFlush:{} ", currentBufferPerBatch.getEventCount(),maxEvents,maxBytes,maxCollectionDuration, forceFlush);
if (forceFlush || ThresholdCheck.checkThresholdExceed(currentBufferPerBatch, maxEvents, maxBytes, maxCollectionDuration)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that a flag is not ideal. But, you can have an injectable strategy. We can resolve this later though.

LOG.debug("Parsed Event Size:{}, FlushedBuffer eventCount:{}, FlushedBuffer size:{}", parsedEvents.size(), flushedBuffer.getEventCount(), flushedBuffer.getSize());

responseStrategy.handleEvents(parsedEvents, originalRecords, resultRecords, flushedBuffer);
// if (parsedEvents.size() == flushedBuffer.getEventCount()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove these comments.

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>

Refactor aws lambda plugin to have a class for common methods between processor and sink

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>

Add support for lambda async client in lambda sink

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
…onse codec

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
…nality enums; Change reponse_processing_mode option to response_cardinality

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>

@JsonCreator
public static InvocationType fromString(String value) {
return INVOCATION_TYPE_MAP.get(value.toLowerCase());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the toLowerCase(). We want to be sure that the values provided by the user are the expected values.


static {
for (InvocationType type : InvocationType.values()) {
INVOCATION_TYPE_MAP.put(type.getUserInputValue().toLowerCase(), type);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove the toLowerCase() here.


static {
for (ResponseCardinality type : ResponseCardinality.values()) {
RESPONSE_CARDINALITY_MAP.put(type.getValue().toLowerCase(), type);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove toLowerCase().

if (value == null) {
return STRICT;
}
return RESPONSE_CARDINALITY_MAP.getOrDefault(value.toLowerCase(), STRICT);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove .toLowerCase().

Also, do not get the default. This will result in this being valid:

response_cardinality: oops!


@JsonCreator
public static ResponseCardinality fromString(String value) {
if (value == null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not return a default value for null here. This can result in odd behavior such as the following being allowed.

response_cardinality: null

Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
dlvenable
dlvenable previously approved these changes Oct 28, 2024
kkondaka
kkondaka previously approved these changes Oct 28, 2024
Signed-off-by: Srikanth Govindarajan <srigovs@amazon.com>
@srikanthjg srikanthjg dismissed stale reviews from kkondaka and dlvenable via f86f679 October 28, 2024 22:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Address Scale Items for Lambda Processor and Sink

4 participants