Relevant telegraf.conf:
[agent]
interval = "30s"
round_interval = true
flush_buffer_when_full = true
metric_buffer_limit = 10000000
metric_batch_size = 100000
flush_interval = "2s"
flush_jitter = "0s"
debug = true
omit_hostname = true
[[outputs.influxdb]]
urls = ["http://db:8086"]
database = "kpi"
username =
password =
precision = "s"
retention_policy = "default"
timeout = "5s"
skip_database_creation = true
[[inputs.kafka_consumer]]
brokers = [ "node1:9094", "node2:9094", "node3:9094" ]
topics = ["kpi"]
version = "2.0.0"
max_message_len = 0
max_undelivered_messages = 1000
data_format = "influx"
System info:
Telegraf 1.13.4
Linux 5.3.0-42-generic #34~18.04.
Kafka version 2.4.0 (Commit:77a89fcf8d7fa018)
Docker
Docker Swarm 19.03.7
Steps to reproduce:
I have a Kafka cluster of 3 instances, running in the Docker Swarm environment with 3 nodes.
Each Kafka instance is on one node.
I have 3 Telegraf instances which use the kafka_consumer plugin to read data from the Kafka cluster. They read 1 topic which has 1 partition and 3 replicas, one on each Kafka node.
When the Kafka instance that is the leader for the partition goes down, the Telegraf kafka_consumer does not switch to another Kafka node (the new partition leader) for consuming metrics; instead it keeps trying to access the old partition leader, which is down.
Normal situation, everything up:
Kafka metadata - 3 brokers: node1, node2 and node3, partition leader is node2.
Metadata for all topics (from broker 1049: node1:9094/1049):
3 brokers:
broker 1049 at node1:9094
broker 1003 at node3:9094
broker 1001 at node2:9094
2 topics:
topic "kpi" with 1 partitions:
partition 0, leader 1001, replicas: 1001,1003,1049, isrs: 1049,1003,1001
topic "__consumer_offsets" with 1 partitions:
partition 0, leader 1001, replicas: 1001,1003,1049, isrs: 1049,1003,1001
Telegraf log:
2020-04-01T09:29:14Z I! Starting Telegraf 1.13.4
2020-04-01T09:29:14Z I! Using config file: /etc/telegraf/telegraf.conf
2020-04-01T09:29:14Z I! Loaded inputs: kafka_consumer
2020-04-01T09:29:14Z I! Loaded aggregators:
2020-04-01T09:29:14Z I! Loaded processors:
2020-04-01T09:29:14Z I! Loaded outputs: influxdb
2020-04-01T09:29:14Z I! Tags enabled:
2020-04-01T09:29:14Z I! [agent] Config: Interval:30s, Quiet:false, Hostname:"", Flush Interval:2s
2020-04-01T09:29:14Z D! [agent] Initializing plugins
2020-04-01T09:29:14Z D! [agent] Connecting outputs
2020-04-01T09:29:14Z D! [agent] Attempting connection to [outputs.influxdb]
2020-04-01T09:29:14Z D! [agent] Successfully connected to outputs.influxdb
2020-04-01T09:29:14Z D! [agent] Starting service inputs
2020-04-01T09:29:14Z D! [sarama]
2020-04-01T09:29:14Z D! [sarama] client/metadata fetching metadata for all topics from broker node1:9094
2020-04-01T09:29:14Z D! [sarama] Connected to broker at node1:9094 (unregistered)
2020-04-01T09:29:14Z D! [sarama] client/brokers registered new broker #1049 at node1:9094
2020-04-01T09:29:14Z D! [sarama] client/brokers registered new broker #1003 at node3:9094
2020-04-01T09:29:14Z D! [sarama] client/brokers registered new broker #1001 at node2:9094
2020-04-01T09:29:14Z D! [sarama]
2020-04-01T09:29:14Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T09:29:14Z D! [sarama] client/coordinator requesting coordinator for consumergroup RMIMH03S from node1:9094
2020-04-01T09:29:14Z D! [sarama] client/coordinator coordinator for consumergroup RMIMH03S is #1003 (node3:9094)
2020-04-01T09:29:14Z D! [sarama] Connected to broker at node3:9094 (registered as #1003)
2020-04-01T09:29:16Z D! [sarama] client/coordinator requesting coordinator for consumergroup RMIMH03S from node1:9094
2020-04-01T09:29:16Z D! [sarama] client/coordinator coordinator for consumergroup RMIMH03S is #1003 (node3:9094)
2020-04-01T09:29:16Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T09:29:16Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T09:29:16Z D! [sarama] consumer/broker/1003 added subscription to kpi/0
2020-04-01T09:29:18Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T09:29:20Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T09:29:22Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T09:29:22Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
...
Kafka partition leader failure (node2):
Kafka metadata - 2 brokers available: node1 and node3, partition leader changed to node3.
Metadata for all topics (from broker -1: 192.168.122.7:9094/bootstrap):
2 brokers:
broker 1049 at node1:9094
broker 1003 at node3:9094
2 topics:
topic "kpi" with 1 partitions:
partition 0, leader 1003, replicas: 1001,1003,1049, isrs: 1049,1003
topic "__consumer_offsets" with 1 partitions:
partition 0, leader 1003, replicas: 1001,1003,1049, isrs: 1049,1003
Telegraf keeps trying to access the old partition leader node2, although the log says it is fetching metadata for the Kafka topic ([sarama] client/metadata fetching metadata for [kpi] from broker node1:9094), and that metadata clearly reports a new leader:
2020-04-01T10:37:14Z D! [sarama] consumer/broker/1001 disconnecting due to error processing FetchRequest: read tcp 192.168.16.5:46504->192.168.122.5:9094: i/o timeout
2020-04-01T10:37:14Z D! [sarama] Closed connection to broker node2:9094
2020-04-01T10:37:15Z D! [sarama] client/metadata got error from broker -1 while fetching metadata: read tcp 192.168.16.5:46352->192.168.122.5:9094: i/o timeout
2020-04-01T10:37:15Z D! [sarama] client/metadata got error from broker -1 while fetching metadata: read tcp 192.168.16.5:46352->192.168.122.5:9094: i/o timeout
2020-04-01T10:37:15Z D! [sarama] Closed connection to broker node2:9094
2020-04-01T10:37:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:15Z D! [sarama] client/brokers deregistered broker #-1 at node2:9094
2020-04-01T10:37:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:15Z D! [sarama] Connected to broker at node1:9094 (unregistered)
2020-04-01T10:37:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:16Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:37:16Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:37:18Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:37:20Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:37:21Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
...
2020-04-01T10:37:58Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:37:58Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:00Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:02Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:03Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:03Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:03Z E! [inputs.kafka_consumer] Error in plugin: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:03Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming kpi/0: read tcp 192.168.16.5:46504->192.168.122.5:9094: i/o timeout
2020-04-01T10:38:03Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:04Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:04Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:06Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:06Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:06Z E! [inputs.kafka_consumer] Error in plugin: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:06Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:08Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:09Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:09Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:09Z E! [inputs.kafka_consumer] Error in plugin: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:09Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:10Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:10Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:12Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:12Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:12Z E! [inputs.kafka_consumer] Error in plugin: dial tcp 192.168.122.5:9094: connect: no route to host
2020-04-01T10:38:12Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:14Z D! [outputs.influxdb] Buffer fullness: 0 / 10000000 metrics
2020-04-01T10:38:15Z D! [sarama] client/metadata fetching metadata for [kpi] from broker node1:9094
2020-04-01T10:38:15Z D! [sarama] Failed to connect to broker node2:9094: dial tcp 192.168.122.5:9094: connect: no route to host
...
Expected behavior:
I expect the kafka_consumer to read fresh metadata from the Kafka cluster and connect to the newly elected partition leader when a Kafka cluster node fails.
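The expected failover can be sketched schematically. This is illustrative Python pseudologic, not sarama's or Telegraf's actual code; the broker IDs and leader change mirror the cluster in this report, and all function names are hypothetical:

```python
# Schematic model of the leader failover a Kafka consumer is expected to
# perform. Broker IDs mirror this report (1001 = node2, 1003 = node3);
# the functions are hypothetical illustrations, not sarama's API.

def fetch_metadata(cluster):
    """Return the current leader for kpi/0 as reported by a live broker."""
    return cluster["leader"]

def consume(cluster, leader):
    """Simulate a FetchRequest; fail if the targeted broker is down."""
    if leader not in cluster["alive"]:
        raise ConnectionError(f"broker {leader} unreachable")
    return f"consumed kpi/0 from broker {leader}"

def consume_with_failover(cluster, leader):
    """On a fetch error, refresh metadata and retry against the new leader."""
    try:
        return consume(cluster, leader)
    except ConnectionError:
        leader = fetch_metadata(cluster)   # pick up the new leader
        return consume(cluster, leader)    # redirect; do not keep retrying the old broker

# Cluster state after node2 (broker 1001) fails and 1003 becomes leader:
cluster = {"alive": {1049, 1003}, "leader": 1003}
print(consume_with_failover(cluster, 1001))  # -> consumed kpi/0 from broker 1003
```

The bug reported here is that the fetch loop never takes the redirect step: the metadata refresh happens (as the log shows), but the consumer keeps dialing broker 1001.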
Actual behavior:
When the partition leader in the Kafka cluster fails and a new leader is elected, the Telegraf kafka_consumer still tries to connect to the old, unavailable partition leader. It does not use the new leader information from the metadata to connect to another available Kafka node in the cluster.
Additional info:
From my point of view this looks like a serious issue, as it makes this plugin completely unusable with a Kafka cluster.
I have also tested this with Telegraf 1.14.0 and with version = "2.4.0" in telegraf.conf.
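For reference, the 1.14.0 test used the same kafka_consumer section with only the protocol version changed, i.e.:

```toml
[[inputs.kafka_consumer]]
brokers = [ "node1:9094", "node2:9094", "node3:9094" ]
topics = ["kpi"]
version = "2.4.0"
max_message_len = 0
max_undelivered_messages = 1000
data_format = "influx"
```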