CompoundProcessor should also catch exceptions when executing a processor#84838
Merged
martijnvg merged 7 commits intoelastic:7.17from Mar 16, 2022
Merged
CompoundProcessor should also catch exceptions when executing a processor#84838martijnvg merged 7 commits intoelastic:7.17from
martijnvg merged 7 commits intoelastic:7.17from
Conversation
…ssor. Currently, CompoundProcessor does not catch Exception and if a processor throws an error and a method higher in the call stack doesn't catch the exception then pipeline execution stalls and bulk requests may not complete. Usually these exceptions are caught by IngestService#executePipelines(...) method, but when a processor executes async (for example: enrich processor) and the thread that executes enrich is no longer the original write thread then there is no logic that deals with failing pipeline execution and cleaning resources up. This then leads to memory leaks. Closes elastic#84781
…Processor. With the change to CompoundProcessor thrown exceptions are caught and delegated to handler. SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since it assumes that processorResultList contains the result (successful or not successful) of every processor in the pipeline. In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then it just throws an error without updating the processorResultList. This commit addresses that.
Collaborator
|
Pinging @elastic/es-data-management (Team:Data Management) |
Collaborator
|
Hi @martijnvg, I've created a changelog YAML for you. |
jbaiera
approved these changes
Mar 16, 2022
Member
jbaiera
left a comment
There was a problem hiding this comment.
LGTM pending question about metrics accounting
| try { | ||
| processor.execute(ingestDocument, (result, e) -> { | ||
| long ingestTimeInNanos = relativeTimeProvider.getAsLong() - startTimeInNanos; | ||
| metric.postIngest(ingestTimeInNanos); |
Member
There was a problem hiding this comment.
Should we be calling this in case of failure as well?
Member
There was a problem hiding this comment.
specifically metric.postIngest(ingestTimeInNanos);
\me shakes fist at IDEA review tools picking four lines for the comment
Member
Author
There was a problem hiding this comment.
Good point. I think metric.postIngest(ingestTimeInNanos); should be called as well when catch exception at line 151.
(in case an error comes in via this handler (meaning e is not null) then this is already invoked)
martijnvg
added a commit
to martijnvg/elasticsearch
that referenced
this pull request
Mar 16, 2022
…ssor (elastic#84838) Currently, CompoundProcessor does not catch Exception and if a processor throws an error and a method higher in the call stack doesn't catch the exception then pipeline execution stalls and bulk requests may not complete. Usually these exceptions are caught by IngestService#executePipelines(...) method, but when a processor executes async (for example: enrich processor) and the thread that executes enrich is no longer the original write thread then there is no logic that deals with failing pipeline execution and cleaning resources up. This then leads to memory leaks. Closes elastic#84781 Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor. With the change to CompoundProcessor thrown exceptions are caught and delegated to handler. SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since it assumes that processorResultList contains the result (successful or not successful) of every processor in the pipeline. In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then it just throws an error without updating the processorResultList. This commit addresses that.
martijnvg
added a commit
that referenced
this pull request
Mar 16, 2022
…ssor (#84838) (#85035) Currently, CompoundProcessor does not catch Exception and if a processor throws an error and a method higher in the call stack doesn't catch the exception then pipeline execution stalls and bulk requests may not complete. Usually these exceptions are caught by IngestService#executePipelines(...) method, but when a processor executes async (for example: enrich processor) and the thread that executes enrich is no longer the original write thread then there is no logic that deals with failing pipeline execution and cleaning resources up. This then leads to memory leaks. Closes #84781 Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor. With the change to CompoundProcessor thrown exceptions are caught and delegated to handler. SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since it assumes that processorResultList contains the result (successful or not successful) of every processor in the pipeline. In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then it just throws an error without updating the processorResultList. This commit addresses that.
martijnvg
added a commit
that referenced
this pull request
Mar 16, 2022
Forward port #85000 (Revert enrich cache lookup optimisation ) and #84838 (CompoundProcessor should also catch exceptions when executing a processor) to 8.1 branch. Revert enrich cache lookup optimisation (#85028) Forwardporting #85000 to master branch. This PR reverts the optimisation that was added via #77259. This optimisation cleverly ensures no duplicate searches happen if multiple threads concurrently execute the same search. However there are issues with the implementation that cause issues like #84781. The optimisation make use of CompletableFuture and in this case we don't check whether the result has completed exceptionally. Which causes the callback not being invoked and this leads to bulk request not being completed and hanging around. The ingest framework due to its asynchronous nature is already complex and adding CompletableFuture into the mix makes debugging these issues very time consuming. This is the main reason why we like to revert this commit. * CompoundProcessor should also catch exceptions when executing a processor (#84838) (#85035) Currently, CompoundProcessor does not catch Exception and if a processor throws an error and a method higher in the call stack doesn't catch the exception then pipeline execution stalls and bulk requests may not complete. Usually these exceptions are caught by IngestService#executePipelines(...) method, but when a processor executes async (for example: enrich processor) and the thread that executes enrich is no longer the original write thread then there is no logic that deals with failing pipeline execution and cleaning resources up. This then leads to memory leaks. Closes #84781 Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor. With the change to CompoundProcessor thrown exceptions are caught and delegated to handler. SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since it assumes that processorResultList contains the result (successful or not successful) of every processor in the pipeline. In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then it just throws an error without updating the processorResultList. This commit addresses that.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.
Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.
Closes #84781
Note that this PR will be forward ported to 8.1 and master branches.