Conversation
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
…ne config or not
```java
public UpdatePipelineBaseHandler(final PipelinesProvider pipelinesProvider) {
    this.pipelinesProvider = pipelinesProvider;
    this.s3Client = S3Client.builder().region(Region.US_EAST_1).build();
```
Can we avoid hardcoding the region?
Removed the hardcoded region; it is now expected to be provided by the user in the payload as well.
…t to check for dynamic update feasibility
dlvenable left a comment
This is a nice feature! One thing that is probably missing, though: we cannot start with an S3 object. We only support refreshing from S3.
There should be some support in the data-prepper-config.yaml to load from S3 and not use the pipelines/ path.
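The data-prepper-config.yaml support suggested above could take a shape like the following sketch. All key names here are hypothetical; no such keys exist today, and the actual design would need to be agreed on:

```yaml
# Hypothetical sketch only - these keys would need to be designed and implemented.
pipeline_configuration_source:
  s3:
    region: us-east-1
    paths:
      - s3://my-bucket/pipelines/pipelines.yaml
```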
```java
import java.util.Set;
import java.util.stream.Collectors;

public class DynamicPipelineUpdateUtil {
```
Let's not use Util classes. We support Spring dependency injection, so use that instead to avoid the long-term difficulties of relying on utility classes.
Converted it to a Spring service (named bean).
```java
private final String pluginName;
private final InternalJsonModel innerModel;

public PluginModel(final String pluginName, final Map<String, Object> pluginSettings) {
```
Were these moved within nested classes? Or just shifted in the code?
Just shifted in the code. I have now adjusted my IDE formatter not to touch unmodified code.
```java
}

@Override
public boolean equals(Object o) {
```
Please add unit tests for these new methods.
```java
targetProcessors = targetProcessors == null ? List.of() : targetProcessors;

// Collect single-threaded processors in current and target
Set<String> currentSingleThreaded = currentProcessors.stream()
```
There is a bit of split logic here. We have other places to look for this annotation. Can we make use of that existing code?
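For context, the kind of annotation-based collection being discussed could look like the sketch below. The `@SingleThreaded` annotation and the processor classes here are stand-ins invented for illustration; the real annotation lives in Data Prepper's API package, and the reviewer's point is that this scan should reuse the existing lookup rather than be reimplemented:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for Data Prepper's real single-thread annotation.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface SingleThreaded {
}

@SingleThreaded
class GrokProcessor {
}

class NoOpProcessor {
}

public class SingleThreadedScan {
    // Collect the simple names of all processor classes carrying the annotation.
    public static Set<String> singleThreadedNames(final List<Class<?>> processorClasses) {
        return processorClasses.stream()
                .filter(clazz -> clazz.isAnnotationPresent(SingleThreaded.class))
                .map(Class::getSimpleName)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(singleThreadedNames(
                List.of(GrokProcessor.class, NoOpProcessor.class))); // prints [GrokProcessor]
    }
}
```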
```java
    }
}

public static Set<String> scanForSingleThreadAnnotatedProcessorPlugins() {
```
This is duplicating work from the PluginFactory. We should avoid this logic split.
I don't think the PluginFactory should expose all @SingleThreaded annotations, but maybe it could have a loadPlugin method that takes a Predicate?
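The Predicate-based lookup the reviewer is suggesting might be sketched like this. The class name, the in-memory registry, and the `findPluginNames` method are all hypothetical; they only illustrate the shape of filtering plugin classes with a caller-supplied `Predicate` instead of a dedicated annotation scan:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class PredicatePluginLookup {
    // Hypothetical in-memory registry mapping plugin names to their classes.
    private final Map<String, Class<?>> registry;

    public PredicatePluginLookup(final Map<String, Class<?>> registry) {
        this.registry = registry;
    }

    // Return the names of plugins whose class satisfies the caller's predicate.
    public List<String> findPluginNames(final Predicate<Class<?>> classPredicate) {
        return registry.entrySet().stream()
                .filter(entry -> classPredicate.test(entry.getValue()))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        final PredicatePluginLookup lookup = new PredicatePluginLookup(
                Map.of("grok", String.class, "noop", Object.class));
        System.out.println(lookup.findPluginNames(clazz -> clazz == String.class)); // prints [grok]
    }
}
```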
```java
for (String targetProcessor : targetSingleThreaded) {
    if (!currentSingleThreaded.contains(targetProcessor)) {
        throw new DynamicPipelineConfigUpdateException(
                "Cannot add new single-threaded processor: " + targetProcessor);
```
Why do we care about this? Is it because of the state?
If so, this is insufficient. The aggregate processor retains state, but it does so in a thread-safe way.
Also, grok is currently labeled as @SingleThread because of a small code gap.
Maybe we should have an @Stateful annotation to indicate that state is kept?
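If the concern really is retained state rather than threading, the validation could key off an annotation like the `@Stateful` one proposed above. Everything in this sketch is hypothetical: the annotation does not exist yet, and `DynamicUpdateException` stands in for the PR's `DynamicPipelineConfigUpdateException`:

```java
import java.util.Set;

public class StatefulProcessorCheck {
    // Hypothetical stand-in for the PR's DynamicPipelineConfigUpdateException.
    static class DynamicUpdateException extends RuntimeException {
        DynamicUpdateException(final String message) {
            super(message);
        }
    }

    // Reject updates that introduce a stateful processor absent from the current
    // pipeline, since its in-memory state could not be reconstructed mid-flight.
    public static void validateNoNewStatefulProcessors(final Set<String> currentStateful,
                                                       final Set<String> targetStateful) {
        for (final String processor : targetStateful) {
            if (!currentStateful.contains(processor)) {
                throw new DynamicUpdateException("Cannot add new stateful processor: " + processor);
            }
        }
    }
}
```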
```java
private static final Logger LOG = LoggerFactory.getLogger(UpdatePipelineBaseHandler.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Pattern PIPELINE_NAME_PATTERN = Pattern.compile("/([a-zA-Z0-9-]{1,28})$");
```
This regex appears overly restrictive. I don't think we actually have a pipeline name requirement like this currently.
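To see why the pattern is restrictive, here is the same regex from the diff isolated in a small harness (the `extractName` helper and the example paths are invented for illustration). Any name containing a character outside `[a-zA-Z0-9-]`, such as an underscore, fails to match:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PipelineNamePattern {
    // The pattern under review: a trailing path segment of 1-28 letters, digits, or hyphens.
    private static final Pattern PIPELINE_NAME_PATTERN = Pattern.compile("/([a-zA-Z0-9-]{1,28})$");

    // Hypothetical helper: pull the pipeline name out of the request path, or null if it doesn't match.
    public static String extractName(final String path) {
        final Matcher matcher = PIPELINE_NAME_PATTERN.matcher(path);
        return matcher.find() ? matcher.group(1) : null;
    }

    public static void main(String[] args) {
        System.out.println(extractName("/updatePipelineConfig/testpipeline")); // prints testpipeline
        System.out.println(extractName("/updatePipelineConfig/my_pipeline"));  // prints null (underscore rejected)
    }
}
```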
```java
} catch (final IllegalArgumentException e) {
    LOG.warn("Invalid request parameters: {}", e.getMessage());
    sendErrorResponse(exchange, HttpURLConnection.HTTP_BAD_REQUEST, e.getMessage());
} catch (final SdkClientException e) {
```
We should not add any AWS exceptions here. Let's consolidate these exceptions into the PipelineConfigurationFileReader implementations. That will allow this code to be more flexible for other data sources.
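The consolidation being asked for could look roughly like the sketch below: the reader implementation wraps the source-specific failure in a source-agnostic exception, so the HTTP handler never needs an AWS `catch` block. The exception and reader types here are simplified stand-ins (the SDK failure is simulated, since the real code would call `s3Client.getObject(...)`):

```java
import java.io.InputStream;
import java.util.List;

public class ReaderExceptionConsolidation {
    // Source-agnostic exception the HTTP handler can catch without depending on AWS types.
    static class PipelineConfigurationReadException extends RuntimeException {
        PipelineConfigurationReadException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Simplified stand-in for the pipeline configuration reader abstraction.
    interface PipelineConfigurationReader {
        List<InputStream> getPipelineConfigurationInputStreams();
    }

    // Sketch of an S3-backed reader: the SDK failure (simulated here) is wrapped
    // before it escapes the reader, keeping the handler free of AWS exceptions.
    static class S3Reader implements PipelineConfigurationReader {
        @Override
        public List<InputStream> getPipelineConfigurationInputStreams() {
            try {
                throw new IllegalStateException("simulated SdkClientException");
            } catch (final IllegalStateException sdkException) {
                throw new PipelineConfigurationReadException(
                        "Unable to read pipeline configuration from S3", sdkException);
            }
        }
    }
}
```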
```java
final S3PathRequest s3PathRequest = parseS3PathRequest(requestBody);
PipelinesDataFlowModel targetPipelinesDataFlowModel =
        new PipelinesDataflowModelParser(
                new PipelineConfigurationS3FileReader(s3PathRequest.s3paths, s3PathRequest.s3region)
```
We should support reloading from the local file system as well. This will help with local testing and the additional effort should be light. It just requires a little code clean-up to determine the file path. e.g. s3paths versus filepaths.
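The "determine the file path" clean-up mentioned above could be as small as a scheme check that routes each supplied path to the right reader. The class and enum names here are hypothetical, chosen only to illustrate the dispatch:

```java
public class ConfigPathClassifier {
    enum Source { S3, LOCAL_FILE }

    // Classify a configuration path by scheme so the right reader can be chosen:
    // "s3://..." goes to the S3 reader, anything else to a local file reader.
    public static Source classify(final String path) {
        return path.startsWith("s3://") ? Source.S3 : Source.LOCAL_FILE;
    }

    public static void main(String[] args) {
        System.out.println(classify("s3://my-bucket/pipelines/pipelines.yaml")); // prints S3
        System.out.println(classify("/tmp/pipelines/pipelines.yaml"));           // prints LOCAL_FILE
    }
}
```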
Description
This PR introduces two new Data Prepper core APIs to dynamically update pipeline configuration. The APIs do not actually update the pipeline yet; for now, they only add the additional path mappings and enable a way to add further logic. The whole functionality is being broken into multiple small PRs, so this one is shared without being fully integrated.

- isDynamicallyUpdatablePipelineConfigAPI - validates whether a given new pipeline config YAML is updatable compared to its current state.
- updatePipelineConfigAPI - updates the pipeline state by swapping the processor instances.

A sample API call looks like the one below. The pipeline to be updated is testpipeline, the name passed in the path parameter. The expectation is that it could be part of any one of the S3 paths passed, and the goal is that this API updates only this specific pipeline (if feasible).

Issues Resolved
#5716
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.