input kafka_consumer uses old partition leader during Kafka cluster node failure #7263

@dpajin

Description

Relevant telegraf.conf:

[agent]
  interval = "30s"
  round_interval = true
  flush_buffer_when_full = true
  metric_buffer_limit = 10000000
  metric_batch_size = 100000
  flush_interval = "2s"
  flush_jitter = "0s"
  debug = true
  omit_hostname = true

[[outputs.influxdb]]
  urls = ["http://db:8086"]
  database = "kpi"
  username = 
  password = 
  precision = "s"
  retention_policy = "default"
  timeout = "5s"
  skip_database_creation = true

[[inputs.kafka_consumer]]
  brokers = [ "node1:9094", "node2:9094", "node3:9094" ]
  topics = ["kpi"] 
  version = "2.0.0"
  max_message_len = 0
  max_undelivered_messages = 1000
  data_format = "influx"

System info:

Telegraf 1.13.4

Linux 5.3.0-42-generic #34~18.04.

Kafka version 2.4.0 (Commit:77a89fcf8d7fa018)

Docker

Docker Swarm 19.03.7

Steps to reproduce:

I have a Kafka cluster of 3 instances, running in the Docker Swarm environment with 3 nodes.
Each Kafka instance is on one node.

I have 3 Telegraf instances that use the kafka_consumer plugin to read data from the Kafka cluster. They read 1 topic, which has 1 partition and 3 replicas - one on each Kafka node.

When the Kafka instance that is the leader for the partition goes down, the Telegraf kafka_consumer does not switch to another Kafka node (the new partition leader) for consuming metrics; instead, it keeps trying to access the Kafka node (the old partition leader) which is down.
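The failover scenario can be checked from the command line as well. This is a sketch of the reproduction; the broker address, topic, and Swarm service name are placeholders for this environment:

```shell
# Dump cluster metadata to see the current partition leader for "kpi":
kafkacat -L -b node1:9094 -t kpi

# Stop the broker that is currently the partition leader (node2 here);
# the service name is hypothetical:
docker service scale kafka_node2=0

# Dump metadata again to confirm a new leader was elected from the replicas:
kafkacat -L -b node1:9094 -t kpi
```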

Normal situation, everything up:

Kafka metadata - 3 brokers: node1, node2 and node3, partition leader is node2.

Metadata for all topics (from broker 1049: node1:9094/1049):
 3 brokers:
  broker 1049 at node1:9094
  broker 1003 at node3:9094
  broker 1001 at node2:9094
 2 topics:
  topic "kpi" with 1 partitions:
    partition 0, leader 1001, replicas: 1001,1003,1049, isrs: 1049,1003,1001
  topic "__consumer_offsets" with 1 partitions:
    partition 0, leader 1001, replicas: 1001,1003,1049, isrs: 1049,1003,1001

Telegraf log:


2020-04-01T09:29:14Z I! Starting Telegraf 1.13.4
2020-04-01T09:29:14Z I! Using config file: /etc/telegraf/telegraf.conf
2020-04-01T09:29:14Z I! Loaded inputs: kafka_consumer
2020-04-01T09:29:14Z I! Loaded aggregators: 
2020-04-01T09:29:14Z I! Loaded processors: 
2020-04-01T09:29:14Z I! Loaded outputs: influxdb
2020-04-01T09:29:14Z I! Tags enabled: 
2020-04-01T09:29:14Z I! [agent] Config: Interval:30s, Quiet:false, Hostname:"", Flush Interval:2s
2020-04-01T09:29:14Z D! [agent] Initializing plugins
2020-04-01T09:29:14Z D! [agent] Connecting outputs
2020-04-01T09:29:14Z D! [agent] Attempting connection to [outputs.influxdb]
2020-04-01T09:29:14Z D! [agent] Successfully connected to outputs.influxdb
2020-04-01T09:29:14Z D! [agent] Starting service inputs
2020-04-01T09:29:14Z D! [sarama] 
2020-04-01T09:29:14Z D! [sarama] client/metadata fetching metadata for all topics from broker node1:9094
2020-04-01T09:29:14Z D! [sarama] Connected to broker at node1:9094 (unregistered)
2020-04-01T09:29:14Z D! [sarama] client/brokers registered new broker #1049 at node1:9094
2020-04-01T09:29:14Z D! [sarama] client/brokers registered new broker #1003 at node3:9094
2020-04-01T09:29:14Z D! [sarama] client/brokers registered new broker #1001 at node2:9094
2020-04-01T09:29:14Z D! [sarama] 
2020-04-01T09:29:14Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T09:29:14Z D! [sarama] client/coordinator requesting coordinator for consumergroup RMIMH03S from node1:9094
2020-04-01T09:29:14Z D! [sarama] client/coordinator coordinator for consumergroup RMIMH03S is #1003 (node3:9094)
2020-04-01T09:29:14Z D! [sarama] Connected to broker at node3:9094 (registered as #1003)
2020-04-01T09:29:16Z D! [sarama] client/coordinator requesting coordinator for consumergroup RMIMH03S from node1:9094
2020-04-01T09:29:16Z D! [sarama] client/coordinator coordinator for consumergroup RMIMH03S is #1003 (node3:9094)
2020-04-01T09:29:16Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T09:29:16Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T09:29:16Z D! [sarama] consumer/broker/1003 added subscription to kpi/0
2020-04-01T09:29:18Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T09:29:20Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T09:29:22Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T09:29:22Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
...

Kafka partition leader failure (node2):

Kafka metadata - 2 brokers available: node1 and node3, partition leader changed to node3.

Metadata for all topics (from broker -1: 192.168.122.7:9094/bootstrap):
 2 brokers:
  broker 1049 at node1:9094
  broker 1003 at node3:9094
 2 topics:
  topic "kpi" with 1 partitions:
    partition 0, leader 1003, replicas: 1001,1003,1049, isrs: 1049,1003
  topic "__consumer_offsets" with 1 partitions:
    partition 0, leader 1003, replicas: 1001,1003,1049, isrs: 1049,1003

Telegraf keeps trying to access the old partition leader node2, although the log says it is fetching metadata for the Kafka topic ( [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094 ), and that metadata clearly shows there is a new leader:

2020-04-01T10:37:14Z D! [sarama] consumer/broker/1001 disconnecting due to error processing FetchRequest: read tcp 192.168.16.5:46504->192.168.122.5:9094: i/o timeout
2020-04-01T10:37:14Z D! [sarama] Closed connection to broker node2:9094
2020-04-01T10:37:15Z D! [sarama] client/metadata got error from broker -1 while fetching metadata: read tcp 192.168.16.5:46352->192.168.122.5:9094: i/o timeout
2020-04-01T10:37:15Z D! [sarama] client/metadata got error from broker -1 while fetching metadata: read tcp 192.168.16.5:46352->192.168.122.5:9094: i/o timeout
2020-04-01T10:37:15Z D! [sarama] Closed connection to broker node2:9094
2020-04-01T10:37:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:15Z D! [sarama] client/brokers deregistered broker #-1 at node2:9094
2020-04-01T10:37:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:15Z D! [sarama] Connected to broker at node1:9094 (unregistered)
2020-04-01T10:37:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:16Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:37:16Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:18Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:37:20Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:37:21Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
...
2020-04-01T10:37:58Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:37:58Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:00Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:02Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:03Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:03Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:03Z E! [inputs.kafka_consumer] Error in plugin: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:03Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming kpi/0: read tcp 192.168.16.5:46504->192.168.122.5:9094: i/o timeout
2020-04-01T10:38:03Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:04Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:04Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:06Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:06Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:06Z E! [inputs.kafka_consumer] Error in plugin: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:06Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:08Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:09Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:09Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:09Z E! [inputs.kafka_consumer] Error in plugin: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:09Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:10Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:10Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:12Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:12Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:12Z E! [inputs.kafka_consumer] Error in plugin: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:12Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:14Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:15Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
...

Expected behavior:

I expect the kafka_consumer to read the new metadata from the Kafka cluster and connect to the newly elected partition leader during a Kafka cluster node failure.

Actual behavior:

When the partition leader in the Kafka cluster fails and a new leader is elected, the Telegraf kafka_consumer still tries to connect to the old partition leader, which is no longer available. It does not use the new leader information from the metadata to connect to another available Kafka node in the cluster.

Additional info:

From my point of view this looks like a serious issue, as it makes the plugin completely unusable with a Kafka cluster.

I have also tested this with Telegraf 1.14.0 and with version = "2.4.0" in telegraf.conf.
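For comparison, the same scenario can be exercised with a standalone sarama consumer group outside Telegraf. This is a minimal sketch, assuming the sarama v1.x API; the broker list and group ID are taken from the logs above, and the shortened metadata refresh interval is an assumption, not Telegraf's actual setting:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler and just logs each message.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (handler) ConsumeClaim(s sarama.ConsumerGroupSession, c sarama.ConsumerGroupClaim) error {
	for msg := range c.Messages() {
		log.Printf("partition %d offset %d", msg.Partition, msg.Offset)
		s.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_0_0_0
	// Refresh cluster metadata more often than sarama's 10-minute default,
	// so a leadership change should be picked up quickly.
	cfg.Metadata.RefreshFrequency = 30 * time.Second

	group, err := sarama.NewConsumerGroup(
		[]string{"node1:9094", "node2:9094", "node3:9094"}, "RMIMH03S", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		// Consume returns when a rebalance happens; loop to rejoin the group.
		if err := group.Consume(context.Background(), []string{"kpi"}, handler{}); err != nil {
			log.Printf("consume error: %v", err)
		}
	}
}
```

If a client like this does follow the new leader while Telegraf does not, that would point at how the plugin drives sarama rather than at sarama itself.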

Metadata

Assignees: No one assigned
Labels: area/kafka, bug (unexpected problem or unintended behavior)