Update Pipeline Config API #5767

Open
san81 wants to merge 15 commits into opensearch-project:main from san81:update-config-api

Conversation

@san81 san81 commented Jun 9, 2025

Description

Introducing two new Data Prepper core APIs to dynamically update pipeline configuration. The APIs do not actually update the pipeline yet; for now, this change only adds the additional path mappings and provides a place to hook in further logic. I am breaking the whole functionality into multiple small PRs, so I am sharing this one before it is fully integrated.

  1. isDynamicallyUpdatablePipelineConfig API - validates whether a given new pipeline configuration YAML can be applied on top of the pipeline's current state.
  2. updatePipelineConfig API - updates the pipeline state by swapping the processor instances.
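The two APIs above can be sketched roughly as follows. This is a hypothetical illustration only: the class name, method parameters, and the in-memory map standing in for real pipeline state are all invented for this sketch, not taken from the PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: "pipeline state" is reduced to a map of
// pipeline name -> processor names, purely for illustration.
class PipelineConfigUpdaterSketch {
    private final Map<String, List<String>> pipelineProcessors = new HashMap<>();

    PipelineConfigUpdaterSketch(final Map<String, List<String>> initialState) {
        pipelineProcessors.putAll(initialState);
    }

    // In this sketch, "updatable" just means the pipeline exists; the real
    // API compares the new YAML against the pipeline's current state.
    boolean isDynamicallyUpdatablePipelineConfig(final String pipelineName) {
        return pipelineProcessors.containsKey(pipelineName);
    }

    // Mirrors the processor-instance swap described in the PR.
    void updatePipelineConfig(final String pipelineName, final List<String> newProcessors) {
        if (!isDynamicallyUpdatablePipelineConfig(pipelineName)) {
            throw new IllegalArgumentException("Unknown pipeline: " + pipelineName);
        }
        pipelineProcessors.put(pipelineName, newProcessors);
    }

    List<String> processorsOf(final String pipelineName) {
        return pipelineProcessors.get(pipelineName);
    }
}
```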

A sample API call looks like the one below. The pipeline to be updated is testpipeline, the name passed as the path parameter. The pipeline may be defined in any one of the S3 paths passed, and the goal is that this API updates only this specific pipeline (if feasible).

curl --location --request PUT 'localhost:4900/updatePipelineConfig/testpipeline' \
--header 'Content-Type: application/json' \
--data '{
    "s3paths": ["s3://pipline-configuration/pipeline-1.yaml",
                "s3://pipline-configuration/pipeline-2.yaml"
        ],
     "s3region": "us-east-1"
}'

Issues Resolved

#5716

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

san81 added 5 commits June 6, 2025 17:27
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
@san81 san81 marked this pull request as ready for review June 9, 2025 21:54
san81 added 3 commits June 9, 2025 15:17
…ne config or not

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>

public UpdatePipelineBaseHandler(final PipelinesProvider pipelinesProvider) {
this.pipelinesProvider = pipelinesProvider;
this.s3Client = S3Client.builder().region(Region.US_EAST_1).build();
Member

can we avoid hardcoding the region.

Collaborator Author

Removed that; the region is now expected to be provided by the user in the payload as well.
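A minimal sketch of what the resulting request model might look like, assuming the s3paths/s3region field names from the sample payload (the class name and validation are illustrative, not the exact code in the PR):

```java
import java.util.List;

// Hypothetical request model: field names follow the sample payload above.
class S3PathRequest {
    final List<String> s3paths;
    final String s3region;

    S3PathRequest(final List<String> s3paths, final String s3region) {
        if (s3paths == null || s3paths.isEmpty()) {
            throw new IllegalArgumentException("s3paths is required");
        }
        if (s3region == null || s3region.isBlank()) {
            // The region is no longer hardcoded; the caller must supply it.
            throw new IllegalArgumentException("s3region is required");
        }
        this.s3paths = s3paths;
        this.s3region = s3region;
    }
}
```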

san81 added 5 commits June 11, 2025 12:23
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
…t to check for dynamic update feasibility

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Member

@dlvenable dlvenable left a comment

This is a nice feature! One thing is probably missing, though: we cannot start with an S3 object. We only support refreshing from S3.

There should be some support in the data-prepper-config.yaml to load from S3 and not use the pipelines/ path.
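One possible shape for that support, purely as a sketch (these keys do not exist in the current data-prepper-config.yaml schema; they are invented here only to illustrate the idea):

```yaml
# Hypothetical sketch only - not the actual data-prepper-config.yaml schema.
# Illustrates configuring an S3 location for pipeline definitions instead of
# the default pipelines/ path.
pipeline_configuration_source:
  s3paths:
    - "s3://pipline-configuration/pipeline-1.yaml"
  s3region: "us-east-1"
```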

import java.util.Set;
import java.util.stream.Collectors;

public class DynamicPipelineUpdateUtil {
Member

Let's not use Util classes. We support Spring dependency injection. So use that instead to avoid the long-term difficulties of relying on utility classes.

Collaborator Author

Converted to Spring Service (named bean)
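The shape of that conversion, sketched without the Spring annotations (in the PR the class would carry a bean annotation such as @Named or @Service; the class and method names here are illustrative):

```java
import java.util.Set;

// Sketch of the Util-to-bean conversion: collaborators arrive through the
// constructor (where Spring would inject them) instead of static lookups.
class DynamicPipelineUpdateService {
    private final Set<String> singleThreadedPlugins;

    DynamicPipelineUpdateService(final Set<String> singleThreadedPlugins) {
        this.singleThreadedPlugins = singleThreadedPlugins;
    }

    boolean isSingleThreaded(final String processorName) {
        return singleThreadedPlugins.contains(processorName);
    }
}
```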

private final String pluginName;
private final InternalJsonModel innerModel;

public PluginModel(final String pluginName, final Map<String, Object> pluginSettings) {
Member

Were these moved within nested classes? Or just shifted in the code?

Collaborator Author

Just shifted in the code. I now adjusted my IDE formatter not to touch unmodified code.

}

@Override
public boolean equals(Object o) {
Member

Please add unit tests for these new methods.

targetProcessors = targetProcessors == null ? List.of() : targetProcessors;

// Collect single-threaded processors in current and target
Set<String> currentSingleThreaded = currentProcessors.stream()
Member

There is a bit of split logic here. We have other places to look for this annotation. Can we make use of that existing code?

}
}

public static Set<String> scanForSingleThreadAnnotatedProcessorPlugins() {
Member

This is duplicating work from the PluginFactory. We should avoid this logic split.

I don't think the PluginFactory should give all @SingleThreaded annotations. But, maybe have a loadPlugin method with a Predicate?
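The Predicate idea could look something like this sketch. Everything here is hypothetical: the annotation scan is stubbed out as a precomputed map, and no such method exists on the PluginFactory today.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical sketch: the factory exposes one filtered lookup instead of the
// caller re-scanning the classpath. The annotation check is stubbed as a
// precomputed plugin-name -> isSingleThreaded map.
class PluginFilterSketch {
    static Set<String> selectPlugins(final Map<String, Boolean> pluginToSingleThreaded,
                                     final Predicate<Map.Entry<String, Boolean>> filter) {
        return pluginToSingleThreaded.entrySet().stream()
                .filter(filter)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }
}
```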

for (String targetProcessor : targetSingleThreaded) {
if (!currentSingleThreaded.contains(targetProcessor)) {
throw new DynamicPipelineConfigUpdateException(
"Cannot add new single-threaded processor: " + targetProcessor);
Member

Why do we care about this? Is it because of the state?

If so, this is insufficient. The aggregate processor retains state, but it is done in a thread-safe way.

Also, grok is currently labeled as @SingleThread because of a small code gap.

Maybe we should have an @Stateful annotation to indicate that state is kept?


private static final Logger LOG = LoggerFactory.getLogger(UpdatePipelineBaseHandler.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Pattern PIPELINE_NAME_PATTERN = Pattern.compile("/([a-zA-Z0-9-]{1,28})$");
Member

This regex appears overly restrictive. I don't think we have a current pipeline name requirement actually.
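A less restrictive alternative would be to take whatever follows the final path separator, for example (the helper name is hypothetical):

```java
// Hypothetical helper: extracts the pipeline name as the final path segment,
// without imposing a character set or length limit.
class PipelineNameSketch {
    static String pipelineNameFromPath(final String path) {
        final int lastSlash = path.lastIndexOf('/');
        return lastSlash >= 0 ? path.substring(lastSlash + 1) : path;
    }
}
```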

} catch (final IllegalArgumentException e) {
LOG.warn("Invalid request parameters: {}", e.getMessage());
sendErrorResponse(exchange, HttpURLConnection.HTTP_BAD_REQUEST, e.getMessage());
} catch (final SdkClientException e) {
Member

We should not add any AWS exceptions here. Let's consolidate these exceptions into the PipelineConfigurationFileReader implementations. That will allow this code to be more flexible for other data sources.
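The consolidation could take the shape of a reader-level exception that wraps source-specific failures, so the HTTP handler never sees AWS types (the exception name is illustrative, not from the PR):

```java
// Hypothetical reader-level exception: the PipelineConfigurationFileReader
// implementation would wrap source-specific failures (e.g. SdkClientException)
// in this type, keeping the HTTP handler source-agnostic.
class PipelineConfigurationReadException extends RuntimeException {
    PipelineConfigurationReadException(final String message, final Throwable cause) {
        super(message, cause);
    }
}
```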

final S3PathRequest s3PathRequest = parseS3PathRequest(requestBody);
PipelinesDataFlowModel targetPipelinesDataFlowModel =
new PipelinesDataflowModelParser(
new PipelineConfigurationS3FileReader(s3PathRequest.s3paths, s3PathRequest.s3region)
Member

We should support reloading from the local file system as well. This will help with local testing and the additional effort should be light. It just requires a little code clean-up to determine the file path. e.g. s3paths versus filepaths.
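Distinguishing the two sources could be as simple as a scheme check, for example (the helper name is hypothetical):

```java
// Hypothetical helper: route a path to the S3 reader or the local file
// reader based on its scheme.
class ConfigPathSketch {
    static boolean isS3Path(final String path) {
        return path != null && path.startsWith("s3://");
    }
}
```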
