Description
The Kafka cluster is configured as follows:
broker.id=1
listeners=SASL_PLAINTEXT://QL110:9092
advertised.listeners=SASL_PLAINTEXT://QL110:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
default.replication.factor=3
log.cleanup.policy=delete
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.91.198.2:2181,10.91.198.1:2181,10.91.198.3:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
security.inter.broker.protocol=SASL_PLAINTEXT
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin;User:tiuser
The broker's registration in ZooKeeper has the following value:
get /brokers/ids/1
{"listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT"},"endpoints":["SASL_PLAINTEXT://QL110:9092"],"jmx_port":9999,"host":null,"timestamp":"1666333001145","port":-1,"version":4}
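When adding this cluster with the ZooKeeper address, bootstrapServers, JMX, and cluster (SASL-related) settings filled in as usual, the web frontend returns the following response (the message "成功" means "success" and "连接Jmx失败" means "failed to connect to JMX"):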
{"message":"成功","code":0,"data":{"jmxPort":9999,"kafkaVersion":null,"errList":[{"message":"连接Jmx失败","code":31,"data":null}],"zookeeper":"xxx.xxx.xxx.xx:2181"}}
If you ignore this error and confirm the save anyway, the backend logs a large number of errors like this:
2023-02-14 06:41:16.205 ERROR 48967 --- [-1-8-thread-484] c.x.k.s.k.c.s.p.i.PartitionServiceImpl : method=batchGetPartitionOffsetFromKafkaAdminClient||clusterPhyId=1||topicName=OGGTOORACLE||offsetSpec=com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec$KSLatestSpec@494c3a83||errMsg=exception!
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Topic authorization failed.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.batchGetPartitionOffsetFromKafkaAdminClient(PartitionServiceImpl.java:393)
at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
at com.xiaojukeji.know.streaming.km.core.service.partition.impl.PartitionServiceImpl.getAllPartitionOffsetFromKafka(PartitionServiceImpl.java:195)
at com.xiaojukeji.know.streaming.km.core.service.cluster.impl.ClusterMetricServiceImpl.getMessageSize(ClusterMetricServiceImpl.java:352)
at com.xiaojukeji.know.streaming.km.core.service.version.impl.VersionControlServiceImpl.doHandler(VersionControlServiceImpl.java:93)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService.doVCHandler(BaseVersionControlService.java:62)
at com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService.doVCHandler(BaseKafkaVersionControlService.java:29)
at com.xiaojukeji.know.streaming.km.core.service.cluster.impl.ClusterMetricServiceImpl.collectClusterMetricsFromKafka(ClusterMetricServiceImpl.java:208)
at com.xiaojukeji.know.streaming.km.collector.metric.kafka.ClusterMetricCollector.lambda$collectKafkaMetrics$0(ClusterMetricCollector.java:57)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Topic authorization failed.
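Solution:
In com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.impl.KafkaZKDAOImpl.class, inside the body of the getBrokerMetadata(String zkAddress) method (between lines 55 and 56), add the call BrokerMetadata.parseAndUpdateBrokerMetadata(metadata);. The logic is: if host is empty, take the value from the endpoints entry (see the method body for details). Then rebuild the package and redeploy.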
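
The fallback described above can be illustrated with a minimal standalone sketch. This is not the actual body of BrokerMetadata.parseAndUpdateBrokerMetadata; the class EndpointFallback, the HostPort holder, and the resolve method are hypothetical names introduced only for illustration, and the sketch assumes the first entry in endpoints has the form PROTOCOL://host:port, as in the ZooKeeper value shown earlier.

```java
import java.util.List;

// Hypothetical illustration of "if host is empty, take it from endpoints".
// Not the KnowStreaming implementation.
public class EndpointFallback {

    /** Simple host/port holder (hypothetical). */
    public static final class HostPort {
        public final String host;
        public final int port;

        public HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /**
     * Returns the registered host/port when they are usable; otherwise
     * parses them from the first endpoint, e.g. "SASL_PLAINTEXT://QL110:9092".
     */
    public static HostPort resolve(String host, int port, List<String> endpoints) {
        boolean registeredOk = host != null && !host.isEmpty() && port > 0;
        if (registeredOk || endpoints == null || endpoints.isEmpty()) {
            return new HostPort(host, port);
        }
        String endpoint = endpoints.get(0);
        // Parse by hand: java.net.URI rejects '_' in the scheme part.
        int schemeEnd = endpoint.indexOf("://");
        String authority = schemeEnd >= 0 ? endpoint.substring(schemeEnd + 3) : endpoint;
        int colon = authority.lastIndexOf(':');
        if (colon < 0) {
            // No port in the endpoint: keep the registered port value.
            return new HostPort(authority, port);
        }
        // Assumes a numeric port; parseInt throws NumberFormatException otherwise.
        int parsedPort = Integer.parseInt(authority.substring(colon + 1));
        return new HostPort(authority.substring(0, colon), parsedPort);
    }

    public static void main(String[] args) {
        // Values taken from the /brokers/ids/1 registration above: host=null, port=-1.
        HostPort hp = resolve(null, -1, List.of("SASL_PLAINTEXT://QL110:9092"));
        System.out.println(hp.host + ":" + hp.port); // prints "QL110:9092"
    }
}
```

With the registration from this issue (host null, port -1), the sketch resolves the broker address from the SASL_PLAINTEXT endpoint instead of leaving the host empty.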