Adding Processor Registry to provision Atomic swapping of Processor instances #5794
san81 merged 5 commits into opensearch-project:main
Conversation
…sor list Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
dlvenable left a comment
Overall, this looks good. Thanks for adding the registry. I added one comment.
Collection records = recordsReadFromBuffer.getKey();
final CheckpointState checkpointState = recordsReadFromBuffer.getValue();
records = runProcessorsAndProcessAcknowledgements(processors, records);
List<Processor> currentProcessors = pipeline.getProcessorRegistry().getProcessors();
It would be ideal to restructure the classes so that swapProcessors cannot be called from here.
Maybe something like this:
public interface ProcessorProvider {
    List<Processor> getProcessors();
}
public class ProcessorRegistry implements ProcessorProvider
And in Pipeline:
public ProcessorProvider getProcessorProvider() {
    return processorRegistry;
}
Then,
List<Processor> currentProcessors = pipeline.getProcessorProvider().getProcessors();
Agree. Made this change 👍
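For readers following the thread, the suggestion above could be sketched roughly as follows. This is a minimal illustration, not the code merged in the PR: the `Processor` interface and the pared-down `Pipeline` here are hypothetical stand-ins, and only the names `ProcessorProvider`, `ProcessorRegistry`, `getProcessors`, `swapProcessors`, and `getProcessorProvider` come from the conversation.

```java
import java.util.List;

// Hypothetical stand-in for Data Prepper's Processor type, for illustration only.
interface Processor {
    String name();
}

// Read-only view handed to workers: no swap method is reachable through it.
interface ProcessorProvider {
    List<Processor> getProcessors();
}

// The mutating operation lives only on the concrete registry class.
class ProcessorRegistry implements ProcessorProvider {
    private volatile List<Processor> processors;

    ProcessorRegistry(final List<Processor> initial) {
        this.processors = List.copyOf(initial);
    }

    @Override
    public List<Processor> getProcessors() {
        return processors;
    }

    void swapProcessors(final List<Processor> replacement) {
        this.processors = List.copyOf(replacement);
    }
}

// Hypothetical, pared-down Pipeline that exposes only the read-only view.
class Pipeline {
    private final ProcessorRegistry processorRegistry;

    Pipeline(final ProcessorRegistry processorRegistry) {
        this.processorRegistry = processorRegistry;
    }

    ProcessorProvider getProcessorProvider() {
        return processorRegistry;
    }
}
```

With this shape, code that only holds a `ProcessorProvider` (such as a worker) can read the current processors but cannot compile a call to `swapProcessors` without an explicit cast.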
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
@SuppressWarnings({"rawtypes"})
public class ProcessorRegistry implements ProcessorProvider {
private volatile List<Processor> processors;
Isn't synchronized better than volatile?
A synchronized block would be better if we plan to modify the processors list in the future.
I don't think we need to synchronize. That would slow down every call. The only cost is that a worker may run one more iteration on the old list. I think the timing issue is not significant.
Agreed. But we should never modify the list in place; we should only reassign it.
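The trade-off discussed in this thread can be illustrated with a small sketch. It is not the PR's implementation: `SnapshotHolder` and `WorkerIteration` are hypothetical names, and plain strings stand in for processors. It assumes a worker reads the reference once per iteration, so a swap that arrives mid-iteration only takes effect on the next one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder illustrating "reassign, never mutate"; not the PR's actual class.
final class SnapshotHolder<T> {
    // volatile makes a newly assigned reference visible to subsequent reads by other threads.
    private volatile List<T> current;

    SnapshotHolder(final List<T> initial) {
        this.current = List.copyOf(initial);
    }

    List<T> get() {
        return current;
    }

    void swap(final List<T> replacement) {
        // Publish a new unmodifiable list with a single reference write.
        this.current = List.copyOf(replacement);
    }
}

final class WorkerIteration {
    // One iteration of a hypothetical worker loop: read the reference once,
    // then work only on that local snapshot.
    static List<String> runOnce(final SnapshotHolder<String> holder, final Runnable midIteration) {
        final List<String> snapshot = holder.get();
        final List<String> applied = new ArrayList<>();
        for (final String step : snapshot) {
            applied.add(step);
            midIteration.run(); // simulates a swap arriving during the iteration
        }
        return applied;
    }
}
```

Because `List.copyOf` returns an unmodifiable list, an accidental in-place modification fails with `UnsupportedOperationException` rather than silently racing with readers, which is one way to enforce the reassign-only rule.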
// Dynamically swap the pipeline processors
LOG.info("Swapping the pipeline processors");
List<Processor> targetPipelineProcessors = getTargetPipelineProcessors();
dataPrepperTestRunner.swapProcessors(PIPELINE_NAME_IN_YAML, targetPipelineProcessors);
I think this test should use the HTTP endpoint itself. You can add another integration test that exercises it here.
I will make sure to fix this in a follow-up PR once we have a fully functional API flow that I can use here.
}

private List<Processor> getTargetPipelineProcessors() {
We shouldn't be loading the plugins here. At this point you are only testing a small sliver of the functionality. Again, if we use the HTTP server, this should be unnecessary.
I will make sure to fix this in a follow-up PR once we have a fully functional API flow that I can use here.
.thenReturn(futureHelperResult);

final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline);
There is something wrong with these tests if they are so tightly coupled to our internal processing. This is probably too big to fix in this PR, but something to point out.
Sure, we will revisit in a later PR.
…nstances (opensearch-project#5794) * Processor Registry class added to provision Atomic swapping of processor list Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com> Signed-off-by: Jonah Calvo <caljonah@amazon.com>
…cessor instances (opensearch-project#5794)" This reverts commit 844eac1 Signed-off-by: David Venable <dlv@amazon.com>
To support dynamic swapping of Processor instances, these changes keep the Processor instances in one place so that they can be hot-swapped as a single atomic operation.
Issues Resolved
#5327
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.