Skip to content

Adding streaming support for lambda pluggin#6273

Merged
dlvenable merged 6 commits intoopensearch-project:mainfrom
mananrajotia:lambda-streaming-support
Jan 29, 2026
Merged

Adding streaming support for lambda pluggin#6273
dlvenable merged 6 commits intoopensearch-project:mainfrom
mananrajotia:lambda-streaming-support

Conversation

@mananrajotia
Copy link
Copy Markdown
Contributor

Description

Adds streaming response support for lambda pluggin in data-prepper and enables longer response handling.

Documented in detail in issue - #5973
Documentation issue - opensearch-project/documentation-website#11556

Issues Resolved

Resolves #5973

Check List

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Manan Rajotia <rajotia@amazon.com>
@mananrajotia
Copy link
Copy Markdown
Contributor Author

./gradlew assemble & ./gradlew build have passed. E2E testing with lambda has also been done successfully.

E2E testing has been done using following configurations -

pipelines/pipeline.yaml -

streaming-lambda-pipeline:
  source:
    random:
      wait_delay: "PT5S"
  processor:
    - aws_lambda:
        function_name: "dataprepper-testing-streaming-lambda"
        invocation_type: "streaming-response"
        streaming:
          enabled: true
        aws:
          region: "us-east-1"
  sink:
    - stdout:

with lambda (Runtime : Node.js 22.x) deployed in us-east-1 region with code -

exports.handler = awslambda.streamifyResponse(async (event, responseStream, context) => {
    try {
        console.log('Received event for streaming:', JSON.stringify(event, null, 2));

        // First chunk
        const chunk1 = {
            chunkId: 1,
            message: "First chunk processed",
            eventCount: event.events ? event.events.length : 0,
            timestamp: new Date().toISOString()
        };

        responseStream.write(JSON.stringify([chunk1]) + "\n");

        // Optional delay to simulate streaming async work
        await new Promise(resolve => setTimeout(resolve, 500));

        // Second chunk
        const chunk2 = {
            chunkId: 2,
            message: "Second chunk processed",
            status: "completed",
            timestamp: new Date().toISOString()
        };

        responseStream.write(JSON.stringify([chunk2]) + "\n");

        // End the stream
        responseStream.end();

    } catch (error) {
        console.error('Error in streaming handler:', error);
        responseStream.write(JSON.stringify({ error: error.message }) + "\n");
        responseStream.end();
    }
});

furnishing these logs on bin/data-prepper run -

bin % ./data-prepper
Reading pipelines and data-prepper configuration files from Data Prepper home directory.
/usr/bin/java
Found openjdk version  of 21.0
2025-11-16T18:22:16,111 [main] INFO  org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer - No transformation needed
2025-11-16T18:22:16,404 [main] INFO  org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigExtension - Applying Kafka Cluster Config Extension.
2025-11-16T18:22:16,589 [main] WARN       software.amazon.awssdk.utils.Logger - Ignoring profile 'DEFAULT' on line 1 because it did not start with 'profile ' and it was not 'default'.
2025-11-16T18:22:17,081 [main] WARN       software.amazon.awssdk.utils.Logger - Ignoring profile 'DEFAULT' on line 1 because it did not start with 'profile ' and it was not 'default'.
2025-11-16T18:22:17,227 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.config.DataPrepperServerConfiguration - Creating data prepper server without authentication. This is not secure.
2025-11-16T18:22:17,227 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.config.DataPrepperServerConfiguration - In order to set up Http Basic authentication for the data prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/core_apis.md#authentication
2025-11-16T18:22:17,251 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - Creating Data Prepper server without TLS. This is not secure.
2025-11-16T18:22:17,251 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - In order to set up TLS for the Data Prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/configuration.md#server-configuration
2025-11-16T18:22:20,309 [streaming-lambda-pipeline-processor-worker-1-thread-1] WARN       software.amazon.awssdk.utils.Logger - Ignoring profile 'DEFAULT' on line 1 because it did not start with 'profile ' and it was not 'default'.
2025-11-16T18:22:20,315 [streaming-lambda-pipeline-processor-worker-1-thread-1] WARN       software.amazon.awssdk.utils.Logger - Ignoring profile 'DEFAULT' on line 1 because it did not start with 'profile ' and it was not 'default'.
2025-11-16T18:22:22,107 [aws-java-sdk-NettyEventLoop-0-6] INFO  org.opensearch.dataprepper.plugins.lambda.common.StreamingLambdaHandler - Processed streaming response: 2 records from 215 bytes
{"chunkId":1,"message":"First chunk processed","eventCount":1,"timestamp":"2025-11-16T12:52:21.749Z","streaming_processed":true,"response_size_bytes":215}
{"chunkId":2,"message":"Second chunk processed","status":"completed","timestamp":"2025-11-16T12:52:22.250Z","streaming_processed":true,"response_size_bytes":215}
2025-11-16T18:22:26,013 [aws-java-sdk-NettyEventLoop-0-3] INFO  org.opensearch.dataprepper.plugins.lambda.common.StreamingLambdaHandler - Processed streaming response: 2 records from 215 bytes
{"chunkId":1,"message":"First chunk processed","eventCount":1,"timestamp":"2025-11-16T12:52:25.668Z","streaming_processed":true,"response_size_bytes":215}
{"chunkId":2,"message":"Second chunk processed","status":"completed","timestamp":"2025-11-16T12:52:26.168Z","streaming_processed":true,"response_size_bytes":215}
2025-11-16T18:22:31,019 [aws-java-sdk-NettyEventLoop-0-6] INFO  org.opensearch.dataprepper.plugins.lambda.common.StreamingLambdaHandler - Processed streaming response: 2 records from 215 bytes
{"chunkId":1,"message":"First chunk processed","eventCount":1,"timestamp":"2025-11-16T12:52:30.674Z","streaming_processed":true,"response_size_bytes":215}
{"chunkId":2,"message":"Second chunk processed","status":"completed","timestamp":"2025-11-16T12:52:31.174Z","streaming_processed":true,"response_size_bytes":215}

Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @mananrajotia for this great contribution! This will be a nice addition to the Lambda processor. I left a few comments to help with consistency and to make the code easier to maintain.

Signed-off-by: Manan Rajotia <rajotia@amazon.com>
@mananrajotia mananrajotia force-pushed the lambda-streaming-support branch from 439046d to bdc119b Compare December 22, 2025 09:07
@mananrajotia
Copy link
Copy Markdown
Contributor Author

Added document reconstruction logic in the second commit. Instead of treating each streamed chunk as a separate event and forwarding it to the sink (1 incoming event from source -> 1 lambda invocation -> It could result in multiple events being sent to the sink). That is unwanted for document reconstruction as per design, hence, collated data from multiple streamed chunks of a single lambda invocation to a single event to be forwarded to the sink.

Testing concluded with successful gradle assemble and gradle build.

Modified lambda function's code to -

exports.handler = awslambda.streamifyResponse(async (event, responseStream, context) => {
    try {
        console.log('Received event for streaming:', JSON.stringify(event, null, 2));

        // First chunk
        const chunk1 = {
            chunkId: 1,
            message: "First chunk processed",
            eventCount: event.events ? event.events.length : 0,
            timestamp: new Date().toISOString()
        };

        // Optional delay to simulate streaming async work
        await new Promise(resolve => setTimeout(resolve, 500));

        // Second chunk
        const chunk2 = {
            chunkId: 2,
            message: "Second chunk processed",
            status: "completed",
            timestamp: new Date().toISOString()
        };
        // Start array
        responseStream.write('[');

        // Write chunks
        responseStream.write(JSON.stringify(chunk1));
        responseStream.write(',');
        responseStream.write(JSON.stringify(chunk2));

        // End array
        responseStream.write(']');
        responseStream.end();

    } catch (error) {
        console.error('Error in streaming handler:', error);
        responseStream.write(JSON.stringify({ error: error.message }) + "\n");
        responseStream.end();
    }
});

Ran ./data-prepper with the same pipelines.yaml configuration as present in the overview and got desired results -

./data-prepper
Reading pipelines and data-prepper configuration files from Data Prepper home directory.
/usr/bin/java
Found openjdk version  of 21.0
2025-12-22T12:55:55,400 [main] INFO  org.opensearch.dataprepper.pipeline.parser.transformer.DynamicConfigTransformer - No transformation needed
2025-12-22T12:55:55,642 [main] INFO  org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigExtension - Applying Kafka Cluster Config Extension.
2025-12-22T12:55:55,827 [main] WARN       software.amazon.awssdk.utils.Logger - Ignoring profile 'DEFAULT' on line 1 because it did not start with 'profile ' and it was not 'default'.
2025-12-22T12:55:56,570 [main] WARN       software.amazon.awssdk.utils.Logger - Ignoring profile 'DEFAULT' on line 1 because it did not start with 'profile ' and it was not 'default'.
2025-12-22T12:55:56,742 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.config.DataPrepperServerConfiguration - Creating data prepper server without authentication. This is not secure.
2025-12-22T12:55:56,742 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.config.DataPrepperServerConfiguration - In order to set up Http Basic authentication for the data prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/core_apis.md#authentication
2025-12-22T12:55:56,771 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - Creating Data Prepper server without TLS. This is not secure.
2025-12-22T12:55:56,771 [main] WARN  org.opensearch.dataprepper.core.pipeline.server.HttpServerProvider - In order to set up TLS for the Data Prepper server, go here: https://github.com/opensearch-project/data-prepper/blob/main/docs/configuration.md#server-configuration
2025-12-22T12:55:59,828 [streaming-lambda-pipeline-processor-worker-1-thread-1] WARN       software.amazon.awssdk.utils.Logger - Ignoring profile 'DEFAULT' on line 1 because it did not start with 'profile ' and it was not 'default'.
2025-12-22T12:55:59,835 [streaming-lambda-pipeline-processor-worker-1-thread-1] WARN       software.amazon.awssdk.utils.Logger - Ignoring profile 'DEFAULT' on line 1 because it did not start with 'profile ' and it was not 'default'.
2025-12-22T12:56:01,677 [aws-java-sdk-NettyEventLoop-0-6] INFO  org.opensearch.dataprepper.plugins.lambda.common.StreamingLambdaHandler - Processed streaming response: 2 records from 212 bytes
2025-12-22T12:56:01,678 [aws-java-sdk-NettyEventLoop-0-6] INFO  org.opensearch.dataprepper.plugins.lambda.common.StreamingLambdaHandler - Reconstructed 2 chunks into 1 document(s)
{"message":"Second chunk processed","chunkId":2,"eventCount":1,"timestamp":"2025-12-22T07:26:01.843Z","status":"completed"}
2025-12-22T12:56:05,551 [aws-java-sdk-NettyEventLoop-0-3] INFO  org.opensearch.dataprepper.plugins.lambda.common.StreamingLambdaHandler - Processed streaming response: 2 records from 212 bytes
2025-12-22T12:56:05,552 [aws-java-sdk-NettyEventLoop-0-3] INFO  org.opensearch.dataprepper.plugins.lambda.common.StreamingLambdaHandler - Reconstructed 2 chunks into 1 document(s)
{"message":"Second chunk processed","chunkId":2,"eventCount":1,"timestamp":"2025-12-22T07:26:05.752Z","status":"completed"}

Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @mananrajotia ! I have a few more comments and questions.

…cing exception handling - PR comment addressal

Signed-off-by: Manan Rajotia <rajotia@amazon.com>
…dressal

Signed-off-by: Manan Rajotia <rajotia@amazon.com>
srikanthjg
srikanthjg previously approved these changes Jan 23, 2026
@dlvenable
Copy link
Copy Markdown
Member

@mananrajotia , We recently added a license header check and it is failing:

  body: '{"body":"## ⚠️ License Header Violations Found\\n\\nThe following newly added files are missing required license headers:\\n\\n- `data-prepper-plugins/aws-lambda/src/test/java/org/opensearch/dataprepper/plugins/lambda/common/config/ResponseHandlingTest.java`\\n- `data-prepper-plugins/aws-lambda/src/test/resources/lambda-processor-streaming-config.yaml`\\n\\nPlease add the appropriate license header to each file and push your changes.\\n\\n**See the license header requirements:** https://github.com/opensearch-project/data-prepper/blob/main/CONTRIBUTING.md#license-headers"}',

It looks like this new file has the old license header: ResponseHandlingTest.java.

I'm running the build to make sure it is ok. If those pass and license headers are fixed, I'm good to approve and merge.

Signed-off-by: Manan Rajotia <rajotia@amazon.com>
Signed-off-by: Manan Rajotia <rajotia@amazon.com>
Copy link
Copy Markdown
Member

@dlvenable dlvenable left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @mananrajotia !

@dlvenable dlvenable merged commit ce3cff3 into opensearch-project:main Jan 29, 2026
70 of 72 checks passed
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
Streaming response support for lambda plugin

Signed-off-by: Manan Rajotia <rajotia@amazon.com>
Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
Streaming response support for lambda plugin

Signed-off-by: Manan Rajotia <rajotia@amazon.com>
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
Streaming response support for lambda plugin

Signed-off-by: Manan Rajotia <rajotia@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Streaming Lambda response support in the OSI (Data Prepper) Lambda processor

3 participants