Refactor ForEachProcessor to use iteration instead of recursion#51104
Refactor ForEachProcessor to use iteration instead of recursion#51104probakowski merged 5 commits intoelastic:masterfrom
Conversation
This change makes ForEachProcessor iterative and still non-blocking. In case of non-async processors we use single for loop and no recursion at all. In case of async processors we continue work on either current thread or thread started by downstream processor, whichever is slower (usually processor thread). Everything is synchronised by single atomic variable. Relates elastic#50514
|
Pinging @elastic/es-core-features (:Core/Features/Ingest) |
martijnvg
left a comment
There was a problem hiding this comment.
This is a much cleaner and simpler approach 👍
It did took a while before I understood the change, but I think it is neat.
I left a few comments, but that is just for my own understanding.
| void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document, | ||
| BiConsumer<IngestDocument, Exception> handler) { | ||
| for (; index < values.size(); index++) { | ||
| AtomicBoolean shouldContinueHere = new AtomicBoolean(); |
There was a problem hiding this comment.
I like this approach. In the case of inner processor going async; whichever thread sets this atomic boolean last should continue with the next value.
And in the case inner processor doesn't go async then on the second getAndSet(...) the current thread continues with the next value.
I do wonder if this variable can be renamed to describe this beter.
There was a problem hiding this comment.
I wonder about this scenario:
- The current thread sets
shouldContinueHerefirst. - Next value is handled and also manages to set
shouldContinueHerefirst. - Both async call manage to return and update
newValuessimultaneously.
Is this scenario possible? And if so should have protection against this?
There was a problem hiding this comment.
If current thread hits shouldContinueHere first, then it sees false and ends the loop (returns).
If it hits it second that means it's after async call already finished and other thread updated newValues already.
As you mentioned in first comment only thread that checks shouldContinueHere last actually proceeds
Does it answer your question or did I misunderstand the scenario?
There was a problem hiding this comment.
Thanks for clarifying. I misinterpreted the if statement on line 98 when looking at it this morning.
It is not possible that multiple values from the list are handled concurrently.
| if (e != null || result == null) { | ||
| handler.accept(result, e); | ||
| } else if (shouldContinueHere.getAndSet(true)) { | ||
| innerExecute(nextIndex, values, newValues, document, handler); |
There was a problem hiding this comment.
In this case recursion does happen, but we're not risking a SO here, because it always a different thread would get here.
…#51104) (#51322) * Refactor ForEachProcessor to use iteration instead of recursion (#51104) * Refactor ForEachProcessor to use iteration instead of recursion This change makes ForEachProcessor iterative and still non-blocking. In case of non-async processors we use single for loop and no recursion at all. In case of async processors we continue work on either current thread or thread started by downstream processor, whichever is slower (usually processor thread). Everything is synchronised by single atomic variable. Relates #50514 * Update IngestCommonPlugin.java
This change makes ForEachProcessor iterative and still non-blocking.
In case of non-async processors we use single for loop and no recursion at all.
In case of async processors we continue work on either current thread or thread
started by downstream processor, whichever finishes second (usually processor thread).
Everything is synchronized by single atomic variable.
Relates #50514