Ingest failure metadata fields refer to compound processor #17823

@javanna

The compound processor is used internally by ingest to handle failures. As I understand it, though, it should remain an internal concept that users never see. Suppose a processor fails, its own on_failure block fails as well, and the pipeline defines a top-level on_failure. In that case the failure metadata refers to the compound processor rather than to the processor that actually failed.

GET _ingest/pipeline/_simulate
{
  "pipeline" : {
    "processors" : [
      {
        "convert" : {
          "field": "doesnotexist",
          "type" : "integer",
          "on_failure" : [
            {
              "fail" : {
                "message" : "custom error message"
              }    
            }
          ]
        }
      }
    ],
    "on_failure" : [
      {
        "set" : {
          "field" : "failure",
          "value" : {
            "message" : "{{_ingest.on_failure_message}}",
            "processor" : "{{_ingest.on_failure_processor_type}}",
            "tag" : "{{_ingest.on_failure_processor_tag}}"
          }
        }
      }
    ]
  },
  "docs" : [
    {
      "_source" : {
        "message" : "test"
      }
    }
  ]
}

{
  "docs": [
    {
      "doc": {
        "_type": "_type",
        "_index": "_index",
        "_id": "_id",
        "_source": {
          "message": "test",
          "failure": {
            "tag": "CompoundProcessor-null-null",
            "message": "custom error message",
            "processor": "compound"
          }
        },
        "_ingest": {
          "timestamp": "2016-04-18T13:20:28.981+0000"
        }
      }
    }
  ]
}

The tag and processor type written into the failure object are wrong: they should come from the fail processor, which caused the failure. That information is lost in the double wrapping.

Here is also a failing unit test to add to CompoundProcessorTests that reproduces the problem:

public void testFail() throws Exception {
    // Always fails, which triggers the inner on_failure block.
    TestProcessor firstProcessor = new TestProcessor("id1", "first", ingestDocument -> {throw new RuntimeException("error");});
    // Inner on_failure handler: fails again with a custom message.
    FailProcessor failProcessor = new FailProcessor("tag_fail", new TestTemplateService.MockTemplate("custom error message"));
    // Outer on_failure handler: the metadata it sees should describe the fail processor.
    TestProcessor secondProcessor = new TestProcessor("id2", "second", ingestDocument -> {
        Map<String, String> ingestMetadata = ingestDocument.getIngestMetadata();
        assertThat(ingestMetadata.size(), equalTo(3));
        assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_MESSAGE_FIELD), equalTo("custom error message"));
        assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TYPE_FIELD), equalTo("fail"));
        assertThat(ingestMetadata.get(CompoundProcessor.ON_FAILURE_PROCESSOR_TAG_FIELD), equalTo("tag_fail"));
    });

    // Processor-level wrapping: firstProcessor with failProcessor as its on_failure.
    CompoundProcessor failCompoundProcessor = new CompoundProcessor(Collections.singletonList(firstProcessor),
            Collections.singletonList(failProcessor));

    // Pipeline-level wrapping: the compound above with secondProcessor as its on_failure.
    CompoundProcessor compoundProcessor = new CompoundProcessor(Collections.singletonList(failCompoundProcessor),
            Collections.singletonList(secondProcessor));
    // ingestDocument here is assumed to be the document field set up by CompoundProcessorTests.
    compoundProcessor.execute(ingestDocument);

    assertThat(firstProcessor.getInvokedCounter(), equalTo(1));
    assertThat(secondProcessor.getInvokedCounter(), equalTo(1));
}
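
To illustrate the double wrapping and one possible way around it, here is a minimal standalone sketch. It does not use the Elasticsearch classes: `Processor`, `Compound`, `ProcessorFailure`, `runAttributed` and the plain-string metadata keys are all hypothetical stand-ins invented for this example. The idea, which is only an assumption about how a fix might look, is to attach the type and tag of the failing processor to the exception at the point of failure, and to leave an already attributed exception untouched when an outer compound catches it.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FailureMetadataSketch {

    /** Hypothetical stand-in for an ingest processor (not the Elasticsearch interface). */
    interface Processor {
        String type();
        String tag();
        void execute(Map<String, String> meta) throws Exception;
    }

    /** Hypothetical body of a simple processor, so lambdas can be used below. */
    interface Body {
        void run(Map<String, String> meta) throws Exception;
    }

    /** Hypothetical exception that records which processor originally failed. */
    static final class ProcessorFailure extends Exception {
        final String type;
        final String tag;

        ProcessorFailure(Processor origin, Exception cause) {
            super(cause.getMessage(), cause);
            this.type = origin.type();
            this.tag = origin.tag();
        }
    }

    static Processor processor(String type, String tag, Body body) {
        return new Processor() {
            public String type() { return type; }
            public String tag() { return tag; }
            public void execute(Map<String, String> meta) throws Exception { body.run(meta); }
        };
    }

    /** Runs one processor and attributes a failure to it unless it is already attributed. */
    static void runAttributed(Processor p, Map<String, String> meta) throws ProcessorFailure {
        try {
            p.execute(meta);
        } catch (ProcessorFailure alreadyAttributed) {
            throw alreadyAttributed; // keep the innermost origin, do not re-wrap
        } catch (Exception e) {
            throw new ProcessorFailure(p, e);
        }
    }

    static final class Compound implements Processor {
        private final List<Processor> processors;
        private final List<Processor> onFailure;

        Compound(List<Processor> processors, List<Processor> onFailure) {
            this.processors = processors;
            this.onFailure = onFailure;
        }

        public String type() { return "compound"; }
        public String tag() { return "CompoundProcessor-null-null"; }

        public void execute(Map<String, String> meta) throws Exception {
            for (Processor p : processors) {
                try {
                    runAttributed(p, meta);
                } catch (ProcessorFailure failure) {
                    if (onFailure.isEmpty()) {
                        throw failure;
                    }
                    // Metadata comes from the exception, not from the wrapper `p`.
                    meta.put("on_failure_message", failure.getMessage());
                    meta.put("on_failure_processor_type", failure.type);
                    meta.put("on_failure_processor_tag", failure.tag);
                    try {
                        for (Processor handler : onFailure) {
                            runAttributed(handler, meta);
                        }
                    } finally {
                        meta.remove("on_failure_message");
                        meta.remove("on_failure_processor_type");
                        meta.remove("on_failure_processor_tag");
                    }
                    return; // stop after the failure has been handled
                }
            }
        }
    }

    /** Rebuilds the scenario from the unit test and returns what the outer handler saw. */
    static Map<String, String> demo() throws Exception {
        Map<String, String> seen = new HashMap<>();
        Processor first = processor("first", "id1", m -> { throw new RuntimeException("error"); });
        Processor fail = processor("fail", "tag_fail", m -> { throw new RuntimeException("custom error message"); });
        Processor second = processor("second", "id2", seen::putAll);
        Compound inner = new Compound(Collections.singletonList(first), Collections.singletonList(fail));
        Compound outer = new Compound(Collections.singletonList(inner), Collections.singletonList(second));
        outer.execute(new HashMap<>());
        return seen;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> seen = demo();
        System.out.println(seen.get("on_failure_processor_type"));
        System.out.println(seen.get("on_failure_processor_tag"));
        System.out.println(seen.get("on_failure_message"));
    }
}
```

In this sketch the outer handler sees the type and tag of the hypothetical `fail` processor instead of those of the compound wrapper. If `runAttributed` always re-wrapped the exception, the outer compound would attribute the failure to the inner compound, which is the behaviour reported above.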
