Skip to content

Commit e935b26

Browse files
authored
Add "mechanism" in output.kafka to support SCRAM-SHA-512 and SCRAM-SHA-256 (#12867)
1 parent 755227d commit e935b26

2 files changed

Lines changed: 99 additions & 1 deletion

File tree

libbeat/outputs/kafka/config.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,13 @@ type kafkaConfig struct {
5858
Username string `config:"username"`
5959
Password string `config:"password"`
6060
Codec codec.Config `config:"codec"`
61+
Sasl saslConfig `config:"sasl"`
62+
}
63+
64+
type saslConfig struct {
65+
SaslMechanism string `config:"mechanism"`
66+
//SaslUsername string `config:"username"` //maybe use ssl.username ssl.password instead in future?
67+
//SaslPassword string `config:"password"`
6168
}
6269

6370
type metaConfig struct {
@@ -83,6 +90,12 @@ var compressionModes = map[string]sarama.CompressionCodec{
8390
"snappy": sarama.CompressionSnappy,
8491
}
8592

93+
const (
94+
saslTypePlaintext = sarama.SASLTypePlaintext
95+
saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256
96+
saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512
97+
)
98+
8699
func defaultConfig() kafkaConfig {
87100
return kafkaConfig{
88101
Hosts: nil,
@@ -113,6 +126,32 @@ func defaultConfig() kafkaConfig {
113126
}
114127
}
115128

129+
func (c *saslConfig) configureSarama(config *sarama.Config) error {
130+
switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case
131+
case "":
132+
// SASL is not enabled
133+
return nil
134+
case saslTypePlaintext:
135+
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext)
136+
case saslTypeSCRAMSHA256:
137+
config.Net.SASL.Handshake = true
138+
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256)
139+
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
140+
return &XDGSCRAMClient{HashGeneratorFcn: SHA256}
141+
}
142+
case saslTypeSCRAMSHA512:
143+
config.Net.SASL.Handshake = true
144+
config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512)
145+
config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
146+
return &XDGSCRAMClient{HashGeneratorFcn: SHA512}
147+
}
148+
default:
149+
return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism)
150+
}
151+
152+
return nil
153+
}
154+
116155
func readConfig(cfg *common.Config) (*kafkaConfig, error) {
117156
c := defaultConfig()
118157
if err := cfg.Unpack(&c); err != nil {
@@ -144,7 +183,6 @@ func (c *kafkaConfig) Validate() error {
144183
return fmt.Errorf("compression_level must be between 0 and 9")
145184
}
146185
}
147-
148186
return nil
149187
}
150188

@@ -169,6 +207,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
169207
if err != nil {
170208
return nil, err
171209
}
210+
172211
if tls != nil {
173212
k.Net.TLS.Enable = true
174213
k.Net.TLS.Config = tls.BuildModuleConfig("")
@@ -178,6 +217,11 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
178217
k.Net.SASL.Enable = true
179218
k.Net.SASL.User = config.Username
180219
k.Net.SASL.Password = config.Password
220+
err = config.Sasl.configureSarama(k)
221+
222+
if err != nil {
223+
return nil, err
224+
}
181225
}
182226

183227
// configure metadata update properties

libbeat/outputs/kafka/scram.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go
19+
package kafka
20+
21+
import (
22+
"crypto/sha256"
23+
"crypto/sha512"
24+
"hash"
25+
26+
"github.com/xdg/scram"
27+
)
28+
29+
var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() }
30+
var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() }
31+
32+
type XDGSCRAMClient struct {
33+
*scram.Client
34+
*scram.ClientConversation
35+
scram.HashGeneratorFcn
36+
}
37+
38+
func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
39+
x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
40+
if err != nil {
41+
return err
42+
}
43+
x.ClientConversation = x.Client.NewConversation()
44+
return nil
45+
}
46+
47+
func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) {
48+
response, err = x.ClientConversation.Step(challenge)
49+
return
50+
}
51+
52+
func (x *XDGSCRAMClient) Done() bool {
53+
return x.ClientConversation.Done()
54+
}

0 commit comments

Comments
 (0)