-
Notifications
You must be signed in to change notification settings - Fork 25.8k
Ingest failure metadata fields refer to compound processor #17823
Copy link
Copy link
Closed
Labels
:Distributed/Ingest NodeExecution or management of Ingest PipelinesExecution or management of Ingest Pipelines>bugv5.0.0-alpha3
Description
The compound processor is used internally in ingest when handling failures. As I understand though it should be an internal concept that users don't know about. If you define a processor that fails, which has an on_failure block defined, which fails again, and the pipeline has its own on_failure, the metadata about the failure itself will be referring to the compound processor rather than the one that really failed.
GET _ingest/pipeline/_simulate
{
"pipeline" : {
"processors" : [
{
"convert" : {
"field": "doesnotexist",
"type" : "integer",
"on_failure" : [
{
"fail" : {
"message" : "custom error message"
}
}
]
}
}
],
"on_failure" : [
{
"set" : {
"field" : "failure",
"value" : {
"message" : "{{_ingest.on_failure_message}}",
"processor" : "{{_ingest.on_failure_processor_type}}",
"tag" : "{{_ingest.on_failure_processor_tag}}"
}
}
}
]
},
"docs" : [
{
"_source" : {
"message" : "test"
}
}
]
}
{
"docs": [
{
"doc": {
"_type": "_type",
"_index": "_index",
"_id": "_id",
"_source": {
"message": "test",
"failure": {
"tag": "CompoundProcessor-null-null",
"message": "custom error message",
"processor": "compound"
}
},
"_ingest": {
"timestamp": "2016-04-18T13:20:28.981+0000"
}
}
}
]
}
The tag and the processor that goes in the document as part of the failure object are a bit off, they should be coming from the fail processor, which caused the failure. That information gets lost with the double wrapping.
Here is also a failing unit test to add to CompoundProcessorTests that reproduces the problem:
public void testFail() throws Exception {
TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
FailProcessor failProcessor = new FailProcessor("tag_fail", new TestTemplateService.MockTemplate("custom error message"));
TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {
Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
assertThat(ingestMetadata.size(), equalTo(3));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));
assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
});
CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor),
Collections.singletonList(failProcessor));
CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor),
Collections.singletonList(secondProcessor));
compoundProcessor.execute(ingestDocument);
assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
}
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
:Distributed/Ingest NodeExecution or management of Ingest PipelinesExecution or management of Ingest Pipelines>bugv5.0.0-alpha3
Type
Fields
Give feedbackNo fields configured for issues without a type.