Skip to content

CompoundProcessor should also catch exceptions when executing a processor#85035

Merged
martijnvg merged 2 commits intoelastic:masterfrom
martijnvg:backport_84838
Mar 16, 2022
Merged

CompoundProcessor should also catch exceptions when executing a processor#85035
martijnvg merged 2 commits intoelastic:masterfrom
martijnvg:backport_84838

Conversation

@martijnvg
Copy link
Copy Markdown
Member

Forward porting #84838 to master branch.

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes #84781

Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.

…ssor (elastic#84838)

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes elastic#84781

Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.
@martijnvg martijnvg added >non-issue :Distributed/Ingest Node Execution or management of Ingest Pipelines auto-backport Automatically create backport pull requests when merged v8.2.0 v8.1.2 labels Mar 16, 2022
@elasticmachine elasticmachine added the Team:Data Management (obsolete) DO NOT USE. This team no longer exists. label Mar 16, 2022
@elasticmachine
Copy link
Copy Markdown
Collaborator

Pinging @elastic/es-data-management (Team:Data Management)

@martijnvg martijnvg added >bug :Distributed/Ingest Node Execution or management of Ingest Pipelines v8.2.0 and removed auto-backport Automatically create backport pull requests when merged v8.1.2 >non-issue :Distributed/Ingest Node Execution or management of Ingest Pipelines Team:Data Management (obsolete) DO NOT USE. This team no longer exists. v8.2.0 labels Mar 16, 2022
@martijnvg martijnvg assigned martijnvg and unassigned martijnvg Mar 16, 2022
@martijnvg martijnvg added >non-issue and removed >bug labels Mar 16, 2022
@elasticmachine elasticmachine added the Team:Data Management (obsolete) DO NOT USE. This team no longer exists. label Mar 16, 2022
@martijnvg martijnvg merged commit a0e5772 into elastic:master Mar 16, 2022
martijnvg added a commit that referenced this pull request Mar 16, 2022
Forward port #85000 (Revert enrich cache lookup optimisation ) and #84838 (CompoundProcessor should also catch exceptions when executing a processor) to 8.1 branch.

Revert enrich cache lookup optimisation (#85028)

Forwardporting #85000 to master branch.
This PR reverts the optimisation that was added via #77259.
This optimisation cleverly ensures no duplicate searches happen if multiple threads concurrently execute the same search. However there are issues with the implementation that cause issues like #84781. The optimisation make use of CompletableFuture and in this case we don't check whether the result has completed exceptionally. Which causes the callback not being invoked and this leads to bulk request not being completed and hanging around.

The ingest framework due to its asynchronous nature is already complex and adding CompletableFuture into the mix makes debugging these issues very time consuming. This is the main reason why we like to revert this commit.

* CompoundProcessor should also catch exceptions when executing a processor (#84838) (#85035)

Currently, CompoundProcessor does not catch Exception and
if a processor throws an error and a method higher in the
call stack doesn't catch the exception then pipeline execution
stalls and bulk requests may not complete.

Usually these exceptions are caught by IngestService#executePipelines(...) method,
but when a processor executes async (for example: enrich processor) and the thread
that executes enrich is no longer the original write thread then there is no logic
that deals with failing pipeline execution and cleaning resources up. This then leads
to memory leaks.

Closes #84781

Also change how 'pipeline doesn't exist' error is thrown in TrackingResultProcessor.

With the change to CompoundProcessor thrown exceptions are caught and delegated to handler.
SimulateExecutionService in verbose mode ignores exceptions delegated to its handler, since
it assumes that processorResultList contains the result (successful or not successful) of every
processor in the pipeline.

In case TrackingResultProcessor for PipelineProcessor couldn't find the mentioned pipeline then
it just throws an error without updating the processorResultList. This commit addresses that.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Distributed/Ingest Node Execution or management of Ingest Pipelines >non-issue Team:Data Management (obsolete) DO NOT USE. This team no longer exists. v8.2.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Ingest enrich processor then foreach processor == possible memory leak

2 participants