Skip to content

Fixes a regression in core where @SingleThread annotated processors are only running the last instance.#5902

Merged
dlvenable merged 3 commits intoopensearch-project:mainfrom
dlvenable:5901-process-worker-fix-for-singlethread
Jul 25, 2025
Merged

Fixes a regression in core where @SingleThread annotated processors are only running the last instance.#5902
dlvenable merged 3 commits intoopensearch-project:mainfrom
dlvenable:5901-process-worker-fix-for-singlethread

Conversation

@dlvenable
Copy link
Copy Markdown
Member

@dlvenable dlvenable commented Jul 25, 2025

Description

This reworks the recent code for the ProcessorProvider to create a provider per processor thread.

With this fix, the processor swaps will not work correctly, but this is not complete anyway.

New integration test

This includes an integration test which fails on main.

The test exactly shows what I expected to see, which is that the last processor instance is used by all threads.

Expected: <1>
     but: was <0>
		at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
		at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
		at org.opensearch.dataprepper.integration.ProcessorValidationIT.lambda$verifySingleThreadUsage$4(ProcessorValidationIT.java:217)
		at org.junit.jupiter.api.AssertAll.lambda$assertAll$0(AssertAll.java:68)
		at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
		at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
		at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
		at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
		at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
		at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
		at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
		at org.junit.jupiter.api.AssertAll.assertAll(AssertAll.java:77)
		... 133 more
	Suppressed: java.lang.AssertionError: 
Expected: <1>
     but: was <0>
		at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
		at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
		at org.opensearch.dataprepper.integration.ProcessorValidationIT.lambda$verifySingleThreadUsage$5(ProcessorValidationIT.java:218)
		at org.junit.jupiter.api.AssertAll.lambda$assertAll$0(AssertAll.java:68)
		at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
		at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
		at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
		at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
		at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
		at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
		at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
		at org.junit.jupiter.api.AssertAll.assertAll(AssertAll.java:77)
		... 133 more
	Suppressed: java.lang.AssertionError: 
Expected: <1>
     but: was <0>
		at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
		at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
		at org.opensearch.dataprepper.integration.ProcessorValidationIT.lambda$verifySingleThreadUsage$6(ProcessorValidationIT.java:219)
		at org.junit.jupiter.api.AssertAll.lambda$assertAll$0(AssertAll.java:68)
		at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
		at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
		at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
		at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
		at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
		at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
		at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
		at org.junit.jupiter.api.AssertAll.assertAll(AssertAll.java:77)
		... 133 more
	Suppressed: java.lang.AssertionError: 
Expected: <1>
     but: was <4>
		at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
		at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
		at org.opensearch.dataprepper.integration.ProcessorValidationIT.lambda$verifySingleThreadUsage$7(ProcessorValidationIT.java:220)
		at org.junit.jupiter.api.AssertAll.lambda$assertAll$0(AssertAll.java:68)
		at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
		at java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
		at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
		at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
		at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
		at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
		at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
		at org.junit.jupiter.api.AssertAll.assertAll(AssertAll.java:77)
		... 133 more

Issues Resolved

Resolves #5901

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…re only running the last instance.

Fixes opensearch-project#5901

Signed-off-by: David Venable <dlv@amazon.com>

public BasicEventsTrackingTestProcessor() {
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP);
super(PLUGIN_NAME, PROCESSED_EVENTS_MAP, -1);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why -1?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't track it for this processor. I made this a static final with a name to add clarity.

}
}
).collect(Collectors.toList());
final ProcessorRegistry processorRegistry = new ProcessorRegistry(processors);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why same name as the class member?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call. I renamed this as well as some other fields to try to add clarity.

Signed-off-by: David Venable <dlv@amazon.com>
graytaylor0
graytaylor0 previously approved these changes Jul 25, 2025
…t completed.

Signed-off-by: David Venable <dlv@amazon.com>
@dlvenable dlvenable merged commit 53f16d7 into opensearch-project:main Jul 25, 2025
61 of 67 checks passed
opensearch-trigger-bot bot pushed a commit that referenced this pull request Jul 25, 2025
…re only running the last instance. (#5902)

Fixes a regression in core where @SingleThread annotated processors are only running the last instance. Also, disable the ProcessorSwapPipelineIT test since this feature is not yet completed.

Fixes #5901

Signed-off-by: David Venable <dlv@amazon.com>
(cherry picked from commit 53f16d7)
dlvenable added a commit that referenced this pull request Jul 28, 2025
…re only running the last instance. (#5902) (#5904)

Fixes a regression in core where @SingleThread annotated processors are only running the last instance. Also, disable the ProcessorSwapPipelineIT test since this feature is not yet completed.

Fixes #5901


(cherry picked from commit 53f16d7)

Signed-off-by: David Venable <dlv@amazon.com>
Co-authored-by: David Venable <dlv@amazon.com>
@dlvenable dlvenable deleted the 5901-process-worker-fix-for-singlethread branch July 31, 2025 15:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] Service Map does not rotate with multiple workers

3 participants