[Ingest] ingest on failure #15565
@martijnvg @javanna I am convinced I was thinking of on_failure all wrong previously. We spoke of it as a feature of a Processor. I have opened a PR for on_failure where I assume it is a feature of a pipeline definition. This way Processors do not need to have knowledge of their failure processors. That knowledge was a rather confusing self-referencing dependency. So I wrap processors that are created within pipelines in HandledProcessor objects. I am not in love with that name, but I can't think of a better one at the moment. Please let me know what you think. Currently, I am missing an
I think that embedding the on failure handling here is good. I've been thinking about how to make this class also usable on the pipeline level:
What I think we can change here is to let HandledProcessor also handle multiple processors, so that then this class can also be used at the pipeline level.
Something like this:
```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class HandledProcessor implements Processor {

    // No need to use HandledProcessor as generic type, since that is an implementation detail
    private final List<Processor> processors;
    private final List<Processor> onFailureProcessors;

    public HandledProcessor(Processor... processor) {
        this(Arrays.asList(processor), Collections.emptyList());
    }

    public HandledProcessor(List<Processor> processors, List<Processor> onFailureProcessors) {
        this.processors = Collections.unmodifiableList(processors);
        this.onFailureProcessors = Collections.unmodifiableList(onFailureProcessors);
    }

    // A processor counts as "handled" when it has at least one on-failure processor.
    public boolean isHandled() {
        return !onFailureProcessors.isEmpty();
    }

    public List<Processor> getProcessors() {
        return processors;
    }

    @Override
    public void execute(IngestDocument ingestDocument) throws Exception {
        try {
            for (Processor processor : processors) {
                processor.execute(ingestDocument);
            }
        } catch (Exception e) {
            if (isHandled()) {
                executeOnFailure(ingestDocument);
            } else {
                // No on-failure processors: let the exception propagate to the caller.
                throw e;
            }
        }
    }

    public void executeOnFailure(IngestDocument ingestDocument) throws Exception {
        for (Processor processor : onFailureProcessors) {
            processor.execute(ingestDocument);
        }
    }
}
```
Then in Pipeline the processors field type can be changed to HandledProcessor.
The pipeline factory can then benefit from having the same parsing logic for on_failure at both the processor and the pipeline level.
However, the Processor#getType() method is problematic with this approach, because it doesn't assume a processor embeds multiple processors. This method is only really used in the simulate API, so I think it needs to be removed from the Processor interface, and the logic for figuring out the processor type should go somewhere else, maybe the PipelineStore? We can do this in a follow-up PR and for now just throw an UnsupportedOperationException in HandledProcessor#getType(). The Pipeline#getProcessors() method then just needs to delegate to HandledProcessor#getProcessors() so that the SimulateExecutionService doesn't fail.
If this makes sense, then we should rename HandledProcessor to CompoundProcessor.
For now, what do you say about having CompoundProcessor's getType() return "compound[processor1,processor2]", so that it wraps all the underlying processor types within its type name?
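A minimal sketch of how that type name could be assembled, assuming each wrapped processor exposes a getType(). TypedProcessor and CompoundTypeSketch are hypothetical stand-ins for illustration, not classes from this PR.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the ingest Processor interface; only getType() matters here.
interface TypedProcessor {
    String getType();
}

final class CompoundTypeSketch {

    // Joins the wrapped processors' types into "compound[type1,type2]".
    static String compoundType(List<TypedProcessor> processors) {
        return processors.stream()
                .map(TypedProcessor::getType)
                .collect(Collectors.joining(",", "compound[", "]"));
    }

    public static void main(String[] args) {
        List<TypedProcessor> processors = Arrays.asList(() -> "rename", () -> "set");
        System.out.println(compoundType(processors)); // prints "compound[rename,set]"
    }
}
```

An empty processor list yields "compound[]" with this approach, which may or may not be what the simulate API wants to display.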
I think that is okay for now. I would like to see Processor#getType() replaced by something that just works for the simulate API.
@talevy I left an idea on how we should get
fa2c398 to 76b93a6
maybe add a private helper method for parsing on failure processors that both readProcessor(...) and create(...) use?
yeah. good idea. I added a readProcessors helper. definitely improves code-reuse.
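To illustrate the kind of reuse such a helper gives, here is a rough sketch. The config shape (a list of single-key maps from processor type to options, with an optional nested "on_failure" list) and the names Parsed and ReadProcessorsSketch are assumptions for illustration, not the actual factory code in this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ReadProcessorsSketch {

    // Hypothetical parsed form: a processor type plus its own on_failure chain.
    static final class Parsed {
        final String type;
        final List<Parsed> onFailure;

        Parsed(String type, List<Parsed> onFailure) {
            this.type = type;
            this.onFailure = onFailure;
        }
    }

    // Shared helper: usable for a pipeline's processors and for any on_failure list.
    static List<Parsed> readProcessors(List<Map<String, Map<String, Object>>> configs) {
        if (configs == null) {
            return Collections.emptyList();
        }
        List<Parsed> result = new ArrayList<>();
        for (Map<String, Map<String, Object>> config : configs) {
            for (Map.Entry<String, Map<String, Object>> entry : config.entrySet()) {
                result.add(readProcessor(entry.getKey(), entry.getValue()));
            }
        }
        return result;
    }

    // Reads one processor and recurses into its optional "on_failure" list.
    @SuppressWarnings("unchecked")
    static Parsed readProcessor(String type, Map<String, Object> options) {
        List<Map<String, Map<String, Object>>> onFailure =
                (List<Map<String, Map<String, Object>>>) options.get("on_failure");
        return new Parsed(type, readProcessors(onFailure));
    }

    public static void main(String[] args) {
        Map<String, Object> renameOptions = new HashMap<>();
        renameOptions.put("field", "foo");
        renameOptions.put("on_failure", Collections.singletonList(
                Collections.singletonMap("set", new HashMap<String, Object>())));

        List<Parsed> parsed = readProcessors(Collections.singletonList(
                Collections.singletonMap("rename", renameOptions)));
        Parsed rename = parsed.get(0);
        System.out.println(rename.type + " -> " + rename.onFailure.get(0).type); // prints "rename -> set"
    }
}
```

A pipeline-level create(...) could call the same readProcessors(...) once for its processors and once for its own on_failure list.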
4084a6f to 03037e5
million dollar question: shall we catch Exception or Throwable here?
I suppose Throwable would catch more things, as we would allow processors to run arbitrary code in their execute. I will change it to that
per @martijnvg's note. I will leave this as Exception
I left a few minor comments, I like this very much though, the CompoundProcessor looks good ;)
03037e5 to ecd3a1e
Maybe catch Exception instead. If there is a JVM error (out of memory and such) then we should just bail and not try to execute the on_failure processors.
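A small sketch of the difference, for reference: a catch (Exception e) block does not intercept an Error such as OutOfMemoryError, so JVM-level failures skip the fallback and propagate. Step and runWithFallback are hypothetical names, and the OutOfMemoryError below is constructed by hand purely to simulate the case.

```java
final class CatchScopeSketch {

    interface Step {
        void run() throws Exception;
    }

    // Mirrors the try/catch shape discussed above: only Exception triggers the fallback.
    static String runWithFallback(Step step) {
        try {
            step.run();
            return "ok";
        } catch (Exception e) {
            return "handled: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // An ordinary exception is caught and handled.
        System.out.println(runWithFallback(() -> {
            throw new IllegalStateException("bad field");
        })); // prints "handled: bad field"

        // An Error is not an Exception, so it bypasses the catch block entirely.
        try {
            runWithFallback(() -> {
                throw new OutOfMemoryError("simulated JVM error");
            });
        } catch (OutOfMemoryError err) {
            System.out.println("propagated: " + err.getMessage()); // prints "propagated: simulated JVM error"
        }
    }
}
```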
8b118d9 to 96f99c3
Both processors and pipelines now have the ability to define a separate list of processors to be executed if the original line of execution throws an Exception. Processors without an on_failure parameter defined will throw an exception and exit the pipeline immediately. Processors with on_failure defined will catch the exception and allow further processors to run. Exceptions within the on_failure block will be treated the same as those at the top level.
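The three behaviours described above can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the merged code: a plain Map stands in for IngestDocument, and Proc, CompoundSketch and OnFailureSemanticsSketch are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Processor; a Map plays the role of IngestDocument.
interface Proc {
    void execute(Map<String, Object> doc) throws Exception;
}

final class CompoundSketch implements Proc {
    private final List<Proc> processors;
    private final List<Proc> onFailure;

    CompoundSketch(List<Proc> processors, List<Proc> onFailure) {
        this.processors = processors;
        this.onFailure = onFailure;
    }

    @Override
    public void execute(Map<String, Object> doc) throws Exception {
        try {
            for (Proc p : processors) {
                p.execute(doc);
            }
        } catch (Exception e) {
            if (onFailure.isEmpty()) {
                throw e; // no on_failure: exit immediately
            }
            for (Proc p : onFailure) {
                p.execute(doc); // an exception here propagates like any other
            }
        }
    }
}

final class OnFailureSemanticsSketch {
    static final Proc FAILING = doc -> {
        throw new IllegalArgumentException("boom");
    };

    static Proc set(String key) {
        return doc -> doc.put(key, true);
    }

    public static void main(String[] args) throws Exception {
        // 1. Handled: the failing processor has on_failure, so later processors still run.
        Map<String, Object> doc = new HashMap<>();
        Proc handled = new CompoundSketch(Arrays.asList(FAILING), Arrays.asList(set("error")));
        new CompoundSketch(Arrays.asList(handled, set("done")), Collections.emptyList()).execute(doc);
        System.out.println(doc.containsKey("error") + " " + doc.containsKey("done")); // prints "true true"

        // 2. Unhandled: no on_failure anywhere, so the pipeline exits before "done" is set.
        Map<String, Object> doc2 = new HashMap<>();
        try {
            new CompoundSketch(Arrays.asList(FAILING, set("done")), Collections.emptyList()).execute(doc2);
        } catch (IllegalArgumentException e) {
            System.out.println("exited, done set: " + doc2.containsKey("done")); // prints "exited, done set: false"
        }

        // 3. A failure inside on_failure is not swallowed.
        try {
            new CompoundSketch(Arrays.asList(FAILING), Arrays.asList(FAILING)).execute(new HashMap<>());
        } catch (IllegalArgumentException e) {
            System.out.println("on_failure error propagated: " + e.getMessage()); // prints "on_failure error propagated: boom"
        }
    }
}
```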
After some discussion, a rough consensus was reached around an on_failure block for all processors, instead of the continue_on_failure flag described by this issue.
An example of this is as follows:
An on_failure will also be added to a pipeline as a whole:
Processors without an on_failure parameter defined will throw an exception and exit the pipeline immediately. Processors with on_failure defined will catch the exception and allow further processors to run. Exceptions within the on_failure block will be treated the same as those at the top level.
If a user wishes to handle a failure and still exit the pipeline immediately, a fail processor will be introduced. This processor will do nothing but throw an exception so that the pipeline exits.
Closes: #14548
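As a rough illustration of the fail processor idea: the on_failure chain first records the error on the document and then a fail step throws, so the pipeline still exits. Everything here (Proc, FailException, fail, runWithOnFailure, the Map used as the document) is a hypothetical stand-in, not the processor that was eventually implemented.

```java
import java.util.HashMap;
import java.util.Map;

final class FailProcessorSketch {

    interface Proc {
        void execute(Map<String, Object> doc) throws Exception;
    }

    // Hypothetical exception type raised by the fail step.
    static final class FailException extends RuntimeException {
        FailException(String message) {
            super(message);
        }
    }

    // A processor that does nothing but throw, so the pipeline exits.
    static Proc fail(String message) {
        return doc -> {
            throw new FailException(message);
        };
    }

    // Runs the processor; if it throws an Exception, runs the on_failure steps in order.
    static void runWithOnFailure(Proc processor, Map<String, Object> doc, Proc... onFailure) throws Exception {
        try {
            processor.execute(doc);
        } catch (Exception e) {
            for (Proc p : onFailure) {
                p.execute(doc);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> doc = new HashMap<>();
        Proc broken = d -> {
            throw new IllegalStateException("missing field");
        };
        try {
            // Handle the failure (record it), then exit the pipeline via fail.
            runWithOnFailure(broken, doc, d -> d.put("error", "missing field"), fail("aborting pipeline"));
        } catch (FailException e) {
            System.out.println(doc.get("error") + " / " + e.getMessage()); // prints "missing field / aborting pipeline"
        }
    }
}
```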