@@ -58,6 +58,13 @@ type kafkaConfig struct {
5858 Username string `config:"username"`
5959 Password string `config:"password"`
6060 Codec codec.Config `config:"codec"`
61+ Sasl saslConfig `config:"sasl"`
62+ }
63+
64+ type saslConfig struct {
65+ SaslMechanism string `config:"mechanism"`
66+ //SaslUsername string `config:"username"` //maybe use ssl.username ssl.password instead in future?
67+ //SaslPassword string `config:"password"`
6168}
6269
6370type metaConfig struct {
@@ -83,6 +90,12 @@ var compressionModes = map[string]sarama.CompressionCodec{
8390 "snappy" : sarama .CompressionSnappy ,
8491}
8592
93+ const (
94+ saslTypePlaintext = sarama .SASLTypePlaintext
95+ saslTypeSCRAMSHA256 = sarama .SASLTypeSCRAMSHA256
96+ saslTypeSCRAMSHA512 = sarama .SASLTypeSCRAMSHA512
97+ )
98+
8699func defaultConfig () kafkaConfig {
87100 return kafkaConfig {
88101 Hosts : nil ,
@@ -113,6 +126,32 @@ func defaultConfig() kafkaConfig {
113126 }
114127}
115128
129+ func (c * saslConfig ) configureSarama (config * sarama.Config ) error {
130+ switch strings .ToUpper (c .SaslMechanism ) { // try not to force users to use all upper case
131+ case "" :
132+ // SASL is not enabled
133+ return nil
134+ case saslTypePlaintext :
135+ config .Net .SASL .Mechanism = sarama .SASLMechanism (sarama .SASLTypePlaintext )
136+ case saslTypeSCRAMSHA256 :
137+ config .Net .SASL .Handshake = true
138+ config .Net .SASL .Mechanism = sarama .SASLMechanism (sarama .SASLTypeSCRAMSHA256 )
139+ config .Net .SASL .SCRAMClientGeneratorFunc = func () sarama.SCRAMClient {
140+ return & XDGSCRAMClient {HashGeneratorFcn : SHA256 }
141+ }
142+ case saslTypeSCRAMSHA512 :
143+ config .Net .SASL .Handshake = true
144+ config .Net .SASL .Mechanism = sarama .SASLMechanism (sarama .SASLTypeSCRAMSHA512 )
145+ config .Net .SASL .SCRAMClientGeneratorFunc = func () sarama.SCRAMClient {
146+ return & XDGSCRAMClient {HashGeneratorFcn : SHA512 }
147+ }
148+ default :
149+ return fmt .Errorf ("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256" , c .SaslMechanism )
150+ }
151+
152+ return nil
153+ }
154+
116155func readConfig (cfg * common.Config ) (* kafkaConfig , error ) {
117156 c := defaultConfig ()
118157 if err := cfg .Unpack (& c ); err != nil {
@@ -144,7 +183,6 @@ func (c *kafkaConfig) Validate() error {
144183 return fmt .Errorf ("compression_level must be between 0 and 9" )
145184 }
146185 }
147-
148186 return nil
149187}
150188
@@ -169,6 +207,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
169207 if err != nil {
170208 return nil , err
171209 }
210+
172211 if tls != nil {
173212 k .Net .TLS .Enable = true
174213 k .Net .TLS .Config = tls .BuildModuleConfig ("" )
@@ -178,6 +217,11 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
178217 k .Net .SASL .Enable = true
179218 k .Net .SASL .User = config .Username
180219 k .Net .SASL .Password = config .Password
220+ err = config .Sasl .configureSarama (k )
221+
222+ if err != nil {
223+ return nil , err
224+ }
181225 }
182226
183227 // configure metadata update properties
0 commit comments