Skip to content

[Ingest] ingest on failure#15565

Merged
talevy merged 1 commit intoelastic:feature/ingestfrom
talevy:ingest_on_failure
Dec 22, 2015
Merged

[Ingest] ingest on failure#15565
talevy merged 1 commit intoelastic:feature/ingestfrom
talevy:ingest_on_failure

Conversation

@talevy
Copy link
Copy Markdown
Contributor

@talevy talevy commented Dec 21, 2015

After some discussion, a sort of consensus around an on_failure block for all processors to have was decided on, instead of a continue_on_failure flag described by this issue.

An example of this is as follows:

{
  description: "my pipeline with on_failure",
  processors: [
    {
      "grok" : {
          ...
          on_failure : [
            { "set" : { "grok.failed" : true } },
            { "meta": { "_index" : "failed-index" } }
          ]
      }
    },
    {
       "date" : {
         ...
       }
    }
  ]
}

an on_failure will also be added to a pipeline as a whole:

{
   "description" : "...",
   "processors" : [ ... ],
   "on_failure" : [ ... ]
}

processors without an on_failure parameter defined will throw an exception and exit the pipeline immediately. processors with on_failure defined will catch the exception and allow for further processors to run. Exceptions within the on_failure block will be treated the same as the top-level.

If a user wishes to handle a failure, and still exit the pipeline immediately, a fail processor will be introduced. This processor will do nothing but throw an exception so that the pipeline exits.

Closes: #14548

@talevy talevy added the :Distributed/Ingest Node Execution or management of Ingest Pipelines label Dec 21, 2015
@talevy
Copy link
Copy Markdown
Contributor Author

talevy commented Dec 21, 2015

@martijnvg @javanna . I am convinced I was thinking of the on_failure all wrong previously. We spoke of it as something that is a feature of a Processor. I have opened a PR for on_failure where I assume it is a feature of a pipeline definition. This way Processors do not need to have knowledge of their failure processors. That knowledge was a rather confusing self-referencing dependency. so I wrap processors that are created within pipelines within HandledProcessor objects. I am not in love with that name, but I can't think of a better one at the moment. Please, let me know what you think.

currently, I am missing an on_failure for a whole pipeline.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that embedding the on failure handling here is good. I've been thinking about how to make this class also usable on the pipeline level:

What I think we can change here is to let HandledProcessor also handle multiple processors, so that then this class can also be used at the pipeline level.

Something like this:

public class HandledProcessor implements Processor {

    // No need to use HandledProcessor as generic type, since that is an implementation detail
    private final List<Processor> processors; 
    private final List<Processor> onFailureProcessors;

    public HandledProcessor(Processor... processor) {
        this(Arrays.asList(processor), Collections.emptyList());
    }
    public HandledProcessor(List<Processor> processors, List<Processor> onFailureProcessors) {
        this.processors = Collections.unmodifiableList(processors);
        this.onFailureProcessors = Collections.unmodifiableList(onFailureProcessors);
    }

    public boolean isHandled() {
        return !onFailureProcessors.isEmpty();
    }

    public List<Processor> getProcessors() {
        return processors;
    }

    @Override
    public void execute(IngestDocument ingestDocument) throws Exception {
        try {
            for (Processor processor : processors) {
                processor.execute(ingestDocument);
            }
        } catch (Exception e) {
            if (isHandled()) {
                executeOnFailure(ingestDocument);
            } else {
                throw e;
            }
        }

    }

    public void executeOnFailure(IngestDocument ingestDocument) throws Exception {
        for (Processor processor : onFailureProcessors) {
            processor.execute(ingestDocument);
        }
    }
}

Then in Pipeline the processors field type can be changed to HandledProcessor.
The the pipeline factory can then benefit from having the same parsing logic for the on_failure on both processor and pipeline level.

However the Processor#getType() method is problematic with this approach, because it doesn't assume a processor is embedding multiple processors. This method is only really used in the simulate api and I think for that reason this method needs to be removed from the Processor interface and the functionality of figuring out what is the processor type should go somewhere else maybe the PipelineStore? We can do this in a followup pr and for now just throw an UnsupportedOperationException in HandledProcessor#getType(). The Pipeline#getProcessors() method then just needs to delegate to HandledProcessor#getProcessors so that the SimulateExecutionService doesn't fail.

If this makes sense, then we should rename HandledProcessor to CompoundProcessor.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, what do you say about having CompoundProcessor's getType return: "compound[processor1,processor2]". so it wraps all the underlying processor types within its type name.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is okay for now. I do like to see Processor#getType() be replaced that just works for the simulate api.

@martijnvg
Copy link
Copy Markdown
Member

@talevy I left an idea on how we should get HandledProcessor to work on both processor and pipeline level.

@talevy talevy force-pushed the ingest_on_failure branch 2 times, most recently from fa2c398 to 76b93a6 Compare December 21, 2015 19:48
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a private helper method for parsing on failure processors that both readProcessor(...) and create(...) use?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah. good idea. I added a readProcessors helper. definitely improves code-reuse.

@talevy talevy force-pushed the ingest_on_failure branch 4 times, most recently from 4084a6f to 03037e5 Compare December 22, 2015 16:42
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

million dollar question: shall we catch Exception or Throwable here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose Throwable would catch more things, as we would allow processors to run arbitrary code in their execute. I will change it to that

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per @martijnvg's note. I will leave this as Exception

@javanna
Copy link
Copy Markdown
Contributor

javanna commented Dec 22, 2015

I left a few minor comments, I like this very much though, the CompoundProcessor looks good ;)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe catch Exception instead. If there is a jvm error (out of memory and such) then we should just bail and not try to execute the onfailure processors.

@talevy talevy force-pushed the ingest_on_failure branch 2 times, most recently from 8b118d9 to 96f99c3 Compare December 22, 2015 18:24
both processors and pipelines now have the ability to define
a separate list of processors to be executed if the original line
of execution throws an Exception.

processors without an on_failure parameter defined will throw an
exception and exit the pipeline immediately. processors with on_failure
defined will catch the exception and allow for further processors to
run. Exceptions within the on_failure block will be treated the same as
the top-level.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Distributed/Ingest Node Execution or management of Ingest Pipelines

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants