Skip to content

Commit 66829ff

Browse files
authored
Add support for SCRAM authentication in kafka metricbeat module (#24810)
1 parent 655984e commit 66829ff

12 files changed

Lines changed: 97 additions & 40 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
944944
- Move IIS module to GA and map fields. {issue}22609[22609] {pull}23024[23024]
945945
- Apache: convert status.total_kbytes to status.total_bytes in fleet mode. {pull}23022[23022]
946946
- Release MSSQL as GA {pull}23146[23146]
947+
- Add support for SASL/SCRAM authentication to the Kafka module. {pull}24810[24810]
947948

948949
*Packetbeat*
949950

libbeat/common/kafka/sasl.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package kafka
19+
20+
import (
21+
"fmt"
22+
"strings"
23+
24+
"github.com/Shopify/sarama"
25+
)
26+
27+
type SaslConfig struct {
28+
SaslMechanism string `config:"mechanism"`
29+
}
30+
31+
const (
32+
saslTypePlaintext = sarama.SASLTypePlaintext
33+
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
34+
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
35+
)
36+
37+
func (c *SaslConfig) ConfigureSarama(config *sarama.Config) {
38+
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
39+
case "":
40+
// SASL is not enabled
41+
return
42+
case saslTypePlaintext:
43+
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
44+
case saslTypeSCRAMSHA256:
45+
config.Net.SASL.Handshake = true
46+
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
47+
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
48+
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
49+
}
50+
case saslTypeSCRAMSHA512:
51+
config.Net.SASL.Handshake = true
52+
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
53+
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
54+
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
55+
}
56+
default:
57+
// This should never happen because `SaslMechanism` is checked on `Validate()`, keeping a panic to detect it earlier if it happens.
58+
panic(fmt.Sprintf("not valid SASL mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism))
59+
}
60+
}
61+
62+
func (c *SaslConfig) Validate() error {
63+
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
64+
case "", saslTypePlaintext, saslTypeSCRAMSHA256, saslTypeSCRAMSHA512:
65+
default:
66+
return fmt.Errorf("not valid SASL mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
67+
}
68+
return nil
69+
}

libbeat/outputs/kafka/config.go

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,10 @@ type kafkaConfig struct {
6868
Username string `config:"username"`
6969
Password string `config:"password"`
7070
Codec codec.Config `config:"codec"`
71-
Sasl saslConfig `config:"sasl"`
71+
Sasl kafka.SaslConfig `config:"sasl"`
7272
EnableFAST bool `config:"enable_krb5_fast"`
7373
}
7474

75-
type saslConfig struct {
76-
SaslMechanism string `config:"mechanism"`
77-
}
78-
7975
type metaConfig struct {
8076
Retry metaRetryConfig `config:"retry"`
8177
RefreshFreq time.Duration `config:"refresh_frequency" validate:"min=0"`
@@ -140,36 +136,6 @@ func defaultConfig() kafkaConfig {
140136
}
141137
}
142138

143-
func (c *saslConfig) configureSarama(config *sarama.Config) error {
144-
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
145-
case "":
146-
// SASL is not enabled
147-
return nil
148-
case saslTypePlaintext:
149-
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
150-
case saslTypeSCRAMSHA256:
151-
cfgwarn.Beta("SCRAM-SHA-256 authentication for Kafka is beta.")
152-
153-
config.Net.SASL.Handshake = true
154-
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
155-
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
156-
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
157-
}
158-
case saslTypeSCRAMSHA512:
159-
cfgwarn.Beta("SCRAM-SHA-512 authentication for Kafka is beta.")
160-
161-
config.Net.SASL.Handshake = true
162-
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
163-
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
164-
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
165-
}
166-
default:
167-
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
168-
}
169-
170-
return nil
171-
}
172-
173139
func readConfig(cfg *common.Config) (*kafkaConfig, error) {
174140
c := defaultConfig()
175141
if err := cfg.Unpack(&c); err != nil {
@@ -252,11 +218,7 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
252218
k.Net.SASL.Enable = true
253219
k.Net.SASL.User = config.Username
254220
k.Net.SASL.Password = config.Password
255-
err = config.Sasl.configureSarama(k)
256-
257-
if err != nil {
258-
return nil, err
259-
}
221+
config.Sasl.ConfigureSarama(k)
260222
}
261223

262224
// configure metadata update properties

metricbeat/docs/modules/kafka.asciidoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,10 @@ metricbeat.modules:
8585
#username: ""
8686
#password: ""
8787
88+
# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
89+
# Defaults to PLAIN when `username` and `password` are configured.
90+
#sasl.mechanism: ''
91+
8892
# Metrics collected from a Kafka broker using Jolokia
8993
#- module: kafka
9094
# metricsets:

metricbeat/metricbeat.reference.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -436,6 +436,10 @@ metricbeat.modules:
436436
#username: ""
437437
#password: ""
438438

439+
# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
440+
# Defaults to PLAIN when `username` and `password` are configured.
441+
#sasl.mechanism: ''
442+
439443
# Metrics collected from a Kafka broker using Jolokia
440444
#- module: kafka
441445
# metricsets:

metricbeat/module/kafka/_meta/config.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@
3030
#username: ""
3131
#password: ""
3232

33+
# SASL authentication mechanism used. Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512.
34+
# Defaults to PLAIN when `username` and `password` are configured.
35+
#sasl.mechanism: ''
36+
3337
# Metrics collected from a Kafka broker using Jolokia
3438
#- module: kafka
3539
# metricsets:

metricbeat/module/kafka/broker.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type BrokerSettings struct {
6060
TLS *tls.Config
6161
Username, Password string
6262
Version kafka.Version
63+
Sasl kafka.SaslConfig
6364
}
6465

6566
type GroupDescription struct {
@@ -91,6 +92,7 @@ func NewBroker(host string, settings BrokerSettings) *Broker {
9192
cfg.Net.SASL.Enable = true
9293
cfg.Net.SASL.User = user
9394
cfg.Net.SASL.Password = settings.Password
95+
settings.Sasl.ConfigureSarama(cfg)
9496
}
9597
cfg.Version, _ = settings.Version.Get()
9698

metricbeat/module/kafka/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"fmt"
2222
"time"
2323

24+
"github.com/elastic/beats/v7/libbeat/common/kafka"
2425
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
2526
)
2627

@@ -31,6 +32,7 @@ type metricsetConfig struct {
3132
Username string `config:"username"`
3233
Password string `config:"password"`
3334
ClientID string `config:"client_id"`
35+
Sasl kafka.SaslConfig `config:"sasl"`
3436
}
3537

3638
var defaultConfig = metricsetConfig{

metricbeat/module/kafka/metricset.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ func NewMetricSet(base mb.BaseMetricSet, options MetricSetOptions) (*MetricSet,
6464
Username: config.Username,
6565
Password: config.Password,
6666
Version: Version(options.Version),
67+
Sasl: config.Sasl,
6768
}
6869

6970
return &MetricSet{

0 commit comments

Comments
 (0)