Adding Processor Registry to provision Atomic swapping of Processor instances #5794

Merged
san81 merged 5 commits into opensearch-project:main from san81:pipeline-manager
Jun 20, 2025

Collaborator

@san81 san81 commented Jun 19, 2025

To support dynamic swapping of Processor instances, these changes keep the Processor instances in one place so that we can hot-swap them as an atomic operation.

Issues Resolved

#5327

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…sor list

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Member

@dlvenable dlvenable left a comment

Overall, this looks good. Thanks for adding the registry. I added one comment.

Collection records = recordsReadFromBuffer.getKey();
final CheckpointState checkpointState = recordsReadFromBuffer.getValue();
- records = runProcessorsAndProcessAcknowledgements(processors, records);
+ List<Processor> currentProcessors = pipeline.getProcessorRegistry().getProcessors();
Member

It would be ideal to restructure the classes so that swapProcessors cannot be called from here.

Maybe something like this:

public interface ProcessorProvider {
    List<Processor> getProcessors();
}

public class ProcessorRegistry implements ProcessorProvider

And in Pipeline:

    public ProcessorProvider getProcessorProvider() {
        return processorRegistry;
    }

Then,

List<Processor> currentProcessors = pipeline.getProcessorProvider().getProcessors();

Collaborator Author

Agree. Made this change 👍
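
For illustration, the suggestion in this thread might look roughly like the sketch below. It is not the code merged in this PR: `SketchProcessor` and `SketchPipeline` are hypothetical stand-ins for Data Prepper's `Processor` and `Pipeline`, and the body of `swapProcessors` is an assumption. The point is only that a caller holding a `ProcessorProvider` can read the processor list but has no way to swap it.

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified stand-in for Data Prepper's Processor type (assumption, not the real interface).
interface SketchProcessor extends UnaryOperator<String> {}

// Read-only view: the only operation a worker thread needs.
interface ProcessorProvider {
    List<SketchProcessor> getProcessors();
}

// The registry adds the mutating operation; only management code should hold this type.
final class ProcessorRegistry implements ProcessorProvider {
    private volatile List<SketchProcessor> processors;

    ProcessorRegistry(final List<SketchProcessor> initial) {
        this.processors = List.copyOf(initial);
    }

    @Override
    public List<SketchProcessor> getProcessors() {
        return processors;
    }

    // Assumed implementation: replace the whole list with one reference assignment.
    void swapProcessors(final List<SketchProcessor> replacement) {
        this.processors = List.copyOf(replacement);
    }
}

// Hypothetical pipeline: exposes the narrow interface and keeps the registry private.
final class SketchPipeline {
    private final ProcessorRegistry processorRegistry;

    SketchPipeline(final ProcessorRegistry processorRegistry) {
        this.processorRegistry = processorRegistry;
    }

    ProcessorProvider getProcessorProvider() {
        return processorRegistry;
    }
}
```

With this shape, `pipeline.getProcessorProvider().swapProcessors(...)` does not compile, so the restriction is enforced by the type system rather than by convention.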

san81 added 2 commits June 19, 2025 13:28
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
@san81 san81 marked this pull request as ready for review June 20, 2025 05:57
san81 added 2 commits June 19, 2025 23:01
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
@san81 san81 requested a review from dlvenable June 20, 2025 16:11

@SuppressWarnings({"rawtypes"})
public class ProcessorRegistry implements ProcessorProvider {
private volatile List<Processor> processors;
Collaborator

Isn't synchronized better than volatile?

Collaborator

A synchronized block is better if we plan to modify the processors list in the future.

Member

I don't think we need to synchronize; that would slow down every call. The only cost of volatile is that a worker may run one more iteration against the old processor list after a swap, and I don't think that timing gap is significant.

Collaborator

Agreed. But we should never try to modify the list in place; we should only reassign the reference.
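
As a rough sketch of the trade-off discussed in this thread: a volatile field is enough when the list is only ever replaced as a whole and never mutated in place. `SwappableList` below is a hypothetical class written for this illustration, not something from the PR; it uses `List.copyOf` so that in-place modification fails instead of silently racing with readers.

```java
import java.util.List;

// Hypothetical holder illustrating the "reassign, never mutate" rule from the thread.
final class SwappableList<T> {
    private volatile List<T> current;

    SwappableList(final List<T> initial) {
        this.current = List.copyOf(initial); // defensive, unmodifiable copy
    }

    // A volatile read: returns whichever list was most recently published.
    List<T> get() {
        return current;
    }

    // A single volatile write publishes the complete new list at once.
    void swap(final List<T> replacement) {
        this.current = List.copyOf(replacement);
    }
}
```

A reader that calls `get()` once per batch and iterates over that snapshot never sees a half-updated list. A concurrent `swap` only takes effect on the reader's next call to `get()`, which matches the "one iteration on the old one" remark above.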

// Dynamically swap the pipeline processors
LOG.info("Swapping the pipeline processors");
List<Processor> targetPipelineProcessors = getTargetPipelineProcessors();
dataPrepperTestRunner.swapProcessors(PIPELINE_NAME_IN_YAML, targetPipelineProcessors);
Member

I think it would be better to use the HTTP endpoint itself. You can see another integration test that exercises it here.

final AggregatedHttpResponse response = WebClient.of().execute(RequestHeaders.builder()
                .scheme(SessionProtocol.HTTP)
                .authority("127.0.0.1:4900")
                .method(HttpMethod.GET)
                .path("/list")
                .build())
        .aggregate()
        .join();

Collaborator Author

I will make sure to fix this in a follow-up PR once we have a fully functional API flow that I can use here.

}


private List<Processor> getTargetPipelineProcessors() {
Member

We shouldn't be loading the plugins here. At this point you are only testing a small sliver of the functionality. Again, if we use the HTTP server, this should be unnecessary.

Collaborator Author

I will make sure to fix this in a follow-up PR once we have a fully functional API flow that I can use here.


.thenReturn(futureHelperResult);

- final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+ final ProcessWorker processWorker = new ProcessWorker(buffer, pipeline);
Member

There is something wrong with these tests if they are so tightly coupled to our internal processing. This is probably too big to fix in this PR, but something to point out.

Collaborator Author

Sure, we will revisit in a later PR.

@san81 san81 requested a review from dlvenable June 20, 2025 19:30
Member

@dlvenable dlvenable left a comment

I discussed offline with @san81. We'll improve the integration tests in a follow-on PR. It depends on having #5767 in place, and possibly others. We should also clean up any classes that we exposed and that won't be needed after updating the IT.

@san81 san81 merged commit 844eac1 into opensearch-project:main Jun 20, 2025
46 of 47 checks passed
JonahCalvo pushed a commit to JonahCalvo/os-data-prepper that referenced this pull request Jul 17, 2025
…nstances (opensearch-project#5794)

* Processor Registry class added to provision Atomic swapping of processor list

Signed-off-by: Santhosh Gandhe <1909520+san81@users.noreply.github.com>
Signed-off-by: Jonah Calvo <caljonah@amazon.com>
dlvenable added a commit to dlvenable/data-prepper that referenced this pull request Jul 24, 2025
…cessor instances (opensearch-project#5794)"

This reverts commit 844eac1

Signed-off-by: David Venable <dlv@amazon.com>