From ec4808163ff0f00f33e847738436fcd9c9c5dde9 Mon Sep 17 00:00:00 2001 From: sinzhang Date: Thu, 11 Jul 2019 21:02:46 +0800 Subject: [PATCH 1/6] Add mechanism configuration to kafka output, to support SCRAM-SHA-512 and SCRAM-SHA-256 mechanism. How to use it: ``` output.kafka: codec.format: string: '%{[@timestamp]} %{[message]}' hosts: ["localhost:9092"] topic: 'mytopic' partition.round_robin: reachable_only: false required_acks: 1 compression: gzip max_message_bytes: 1000000 username: user password: pass mechanism: SCRAM-SHA-512 ``` --- libbeat/outputs/kafka/config.go | 60 +++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index e96f96ef7a5e..bf2c6976ba0e 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -18,8 +18,11 @@ package kafka import ( + "crypto/sha256" + "crypto/sha512" "errors" "fmt" + "hash" "strings" "time" @@ -34,8 +37,36 @@ import ( "github.com/elastic/beats/libbeat/monitoring/adapter" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/codec" + "github.com/xdg/scram" ) +var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } +var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} + type kafkaConfig struct { Hosts []string `config:"hosts" validate:"required"` TLS *tlscommon.Config `config:"ssl"` @@ -58,6 +89,7 @@ type kafkaConfig struct { Username string `config:"username"` Password string `config:"password"` Codec codec.Config `config:"codec"` + Mechanism string `config:"mechanism"` } type metaConfig struct { @@ -80,6 +112,12 @@ var compressionModes = map[string]sarama.CompressionCodec{ "snappy": sarama.CompressionSnappy, } +var mechanismModes = map[string]sarama.SASLMechanism{ + "PLAIN": sarama.SASLTypePlaintext, + "SCRAM-SHA-512": sarama.SASLTypeSCRAMSHA512, + "SCRAM-SHA-256": sarama.SASLTypeSCRAMSHA256, +} + func defaultConfig() kafkaConfig { return kafkaConfig{ Hosts: nil, @@ -107,6 +145,7 @@ func defaultConfig() kafkaConfig { ChanBufferSize: 256, Username: "", Password: "", + Mechanism: "PLAIN", } } @@ -135,6 +174,10 @@ func (c *kafkaConfig) Validate() error { return fmt.Errorf("password must be set when username is configured") } + if _, ok := mechanismModes[c.Mechanism]; !ok { + return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.Mechanism) + } + if c.Compression == "gzip" { lvl := c.CompressionLevel if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) { @@ -166,6 +209,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { if err != nil { return nil, err } + if tls != nil { k.Net.TLS.Enable = true k.Net.TLS.Config = tls.BuildModuleConfig("") @@ -177,6 +221,22 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { k.Net.SASL.Password = config.Password } + mechanism := config.Mechanism + + // SCRAM-SHA-512 mechanism + if mechanism == "SCRAM-SHA-512" { + k.Net.SASL.Handshake = true + k.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } + k.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) + } + + // SCRAM-SHA-256 mechanism + if mechanism == "SCRAM-SHA-256" { + k.Net.SASL.Handshake = true + k.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } + k.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) + } + // configure metadata update properties k.Metadata.Retry.Max = config.Metadata.Retry.Max k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff From abac3f1f74f029125c431c433dbb68aa119e9298 Mon Sep 17 00:00:00 2001 From: sinzhang Date: Fri, 6 Sep 2019 17:25:53 +0800 Subject: [PATCH 2/6] Allow unconfigured SASL. Move some codes to separate file. --- libbeat/outputs/kafka/config.go | 101 +++++++++++++------------------- libbeat/outputs/kafka/scram.go | 37 ++++++++++++ 2 files changed, 77 insertions(+), 61 deletions(-) create mode 100644 libbeat/outputs/kafka/scram.go diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index bf2c6976ba0e..59ccbdb9b9fd 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -18,11 +18,8 @@ package kafka import ( - "crypto/sha256" - "crypto/sha512" "errors" "fmt" - "hash" "strings" "time" @@ -37,36 +34,8 @@ import ( "github.com/elastic/beats/libbeat/monitoring/adapter" "github.com/elastic/beats/libbeat/outputs" "github.com/elastic/beats/libbeat/outputs/codec" - "github.com/xdg/scram" ) -var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } -var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } - -type XDGSCRAMClient struct { - *scram.Client - *scram.ClientConversation - scram.HashGeneratorFcn -} - -func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { - x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) - if err != nil { - return err - } - x.ClientConversation = x.Client.NewConversation() - return nil -} - -func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { - response, err = x.ClientConversation.Step(challenge) - return -} - -func (x *XDGSCRAMClient) Done() bool { - return x.ClientConversation.Done() -} - type kafkaConfig struct { Hosts []string `config:"hosts" validate:"required"` TLS *tlscommon.Config `config:"ssl"` @@ -89,7 +58,13 @@ type kafkaConfig struct { Username string `config:"username"` Password string `config:"password"` Codec codec.Config `config:"codec"` - Mechanism string `config:"mechanism"` + Sasl saslConfig `config:"sasl"` +} + +type saslConfig struct { + SaslMechanism string `config:"mechanism"` + //SaslUsername string `config:"username"` //maybe use ssl.username ssl.password instead in future? + //SaslPassword string `config:"password"` } type metaConfig struct { @@ -112,11 +87,11 @@ var compressionModes = map[string]sarama.CompressionCodec{ "snappy": sarama.CompressionSnappy, } -var mechanismModes = map[string]sarama.SASLMechanism{ - "PLAIN": sarama.SASLTypePlaintext, - "SCRAM-SHA-512": sarama.SASLTypeSCRAMSHA512, - "SCRAM-SHA-256": sarama.SASLTypeSCRAMSHA256, -} +const ( + saslTypePlaintext = sarama.SASLTypePlaintext + saslTypeSCRAMSHA256 = sarama.SASLTypeSCRAMSHA256 + saslTypeSCRAMSHA512 = sarama.SASLTypeSCRAMSHA512 +) func defaultConfig() kafkaConfig { return kafkaConfig{ @@ -145,10 +120,33 @@ func defaultConfig() kafkaConfig { ChanBufferSize: 256, Username: "", Password: "", - Mechanism: "PLAIN", } } +func (c *saslConfig) configureSarama(config *sarama.Config) error { + switch strings.ToUpper(c.SaslMechanism) { // try not to force users to use all upper case + case "": + // SASL is not enabled + return nil + case saslTypePlaintext: + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext) + case saslTypeSCRAMSHA256: + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA256} + } + case saslTypeSCRAMSHA512: + config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) + config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { + return &XDGSCRAMClient{HashGeneratorFcn: SHA512} + } + default: + return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.SaslMechanism) + } + + return nil +} + func readConfig(cfg *common.Config) (*kafkaConfig, error) { c := defaultConfig() if err := cfg.Unpack(&c); err != nil { @@ -174,17 +172,12 @@ func (c *kafkaConfig) Validate() error { return fmt.Errorf("password must be set when username is configured") } - if _, ok := mechanismModes[c.Mechanism]; !ok { - return fmt.Errorf("not valid mechanism '%v', only supported with PLAIN|SCRAM-SHA-512|SCRAM-SHA-256", c.Mechanism) - } - if c.Compression == "gzip" { lvl := c.CompressionLevel if lvl != sarama.CompressionLevelDefault && !(0 <= lvl && lvl <= 9) { return fmt.Errorf("compression_level must be between 0 and 9") } } - return nil } @@ -219,22 +212,10 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { k.Net.SASL.Enable = true k.Net.SASL.User = config.Username k.Net.SASL.Password = config.Password - } - - mechanism := config.Mechanism - - // SCRAM-SHA-512 mechanism - if mechanism == "SCRAM-SHA-512" { - k.Net.SASL.Handshake = true - k.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } - k.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) - } - - // SCRAM-SHA-256 mechanism - if mechanism == "SCRAM-SHA-256" { - k.Net.SASL.Handshake = true - k.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } - k.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) + err = config.Sasl.configureSarama(k) + if err != nil { + return nil, err + } } // configure metadata update properties @@ -286,8 +267,6 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { } k.Version = version - k.MetricRegistry = kafkaMetricsRegistry() - k.Producer.Partitioner = partitioner k.MetricRegistry = adapter.GetGoMetrics( monitoring.Default, diff --git a/libbeat/outputs/kafka/scram.go b/libbeat/outputs/kafka/scram.go new file mode 100644 index 000000000000..ad8c8bbf5484 --- /dev/null +++ b/libbeat/outputs/kafka/scram.go @@ -0,0 +1,37 @@ +// https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go +package kafka + +import ( + "crypto/sha256" + "crypto/sha512" + "hash" + + "github.com/xdg/scram" +) + +var SHA256 scram.HashGeneratorFcn = func() hash.Hash { return sha256.New() } +var SHA512 scram.HashGeneratorFcn = func() hash.Hash { return sha512.New() } + +type XDGSCRAMClient struct { + *scram.Client + *scram.ClientConversation + scram.HashGeneratorFcn +} + +func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) { + x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID) + if err != nil { + return err + } + x.ClientConversation = x.Client.NewConversation() + return nil +} + +func (x *XDGSCRAMClient) Step(challenge string) (response string, err error) { + response, err = x.ClientConversation.Step(challenge) + return +} + +func (x *XDGSCRAMClient) Done() bool { + return x.ClientConversation.Done() +} From f000b1c30c33533dc0b66e064f837269ba2448f2 Mon Sep 17 00:00:00 2001 From: zvictorino Date: Fri, 22 Nov 2019 21:53:38 +0800 Subject: [PATCH 3/6] Add docs to sasl.mechanism. --- libbeat/docs/outputs/output-kafka.asciidoc | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/libbeat/docs/outputs/output-kafka.asciidoc b/libbeat/docs/outputs/output-kafka.asciidoc index ce09e019ad8d..bd766d16eeb0 100644 --- a/libbeat/docs/outputs/output-kafka.asciidoc +++ b/libbeat/docs/outputs/output-kafka.asciidoc @@ -68,6 +68,11 @@ must be configured as well. Only SASL/PLAIN is supported. The password for connecting to Kafka. +===== `sasl.mechanism` + +The sasl.mechanism for connecting to Kafka. +It can be one of "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512". Default is PLAIN. + [[topic-option-kafka]] ===== `topic` From 2f8029926db7a369d230236e79151d6307035bb1 Mon Sep 17 00:00:00 2001 From: zvictorino Date: Mon, 25 Nov 2019 08:40:46 +0800 Subject: [PATCH 4/6] Add license headers --- libbeat/outputs/kafka/scram.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/libbeat/outputs/kafka/scram.go b/libbeat/outputs/kafka/scram.go index ad8c8bbf5484..3f70959ffeda 100644 --- a/libbeat/outputs/kafka/scram.go +++ b/libbeat/outputs/kafka/scram.go @@ -1,3 +1,20 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + // https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/scram_client.go package kafka From 08b7eb666e8c9533ddec2b774ddd99b7b31273f2 Mon Sep 17 00:00:00 2001 From: zvictorino Date: Fri, 6 Dec 2019 15:21:02 +0800 Subject: [PATCH 5/6] Fix SCRAM-SHA-512/SCRAM-SHA-256 --- libbeat/outputs/kafka/config.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 59ccbdb9b9fd..4057150c5b2c 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -131,11 +131,13 @@ func (c *saslConfig) configureSarama(config *sarama.Config) error { case saslTypePlaintext: config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypePlaintext) case saslTypeSCRAMSHA256: + config.Net.SASL.Handshake = true config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA256) config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } case saslTypeSCRAMSHA512: + config.Net.SASL.Handshake = true config.Net.SASL.Mechanism = sarama.SASLMechanism(sarama.SASLTypeSCRAMSHA512) config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} @@ -213,6 +215,8 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { k.Net.SASL.User = config.Username k.Net.SASL.Password = config.Password err = config.Sasl.configureSarama(k) + fmt.Println("-------------------") + fmt.Println(k.Net.SASL) if err != nil { return nil, err } From ec6d58e2e3342274afbe35017c29aec40887cd3b Mon Sep 17 00:00:00 2001 From: zvictorino Date: Wed, 26 Feb 2020 16:43:58 +0800 Subject: [PATCH 6/6] Remove output-kafka.asciidoc --- libbeat/docs/outputs/output-kafka.asciidoc | 278 --------------------- libbeat/outputs/kafka/config.go | 3 +- 2 files changed, 1 insertion(+), 280 deletions(-) delete mode 100644 libbeat/docs/outputs/output-kafka.asciidoc diff --git a/libbeat/docs/outputs/output-kafka.asciidoc b/libbeat/docs/outputs/output-kafka.asciidoc deleted file mode 100644 index bd766d16eeb0..000000000000 --- a/libbeat/docs/outputs/output-kafka.asciidoc +++ /dev/null @@ -1,278 +0,0 @@ -[[kafka-output]] -=== Configure the Kafka output - -++++ -Kafka -++++ - -The Kafka output sends the events to Apache Kafka. - -Example configuration: - -[source,yaml] ------------------------------------------------------------------------------- -output.kafka: - # initial brokers for reading cluster metadata - hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"] - - # message topic selection + partitioning - topic: '%{[fields.log_topic]}' - partition.round_robin: - reachable_only: false - - required_acks: 1 - compression: gzip - max_message_bytes: 1000000 ------------------------------------------------------------------------------- - -NOTE: Events bigger than <> will be dropped. To avoid this problem, make sure {beatname_uc} does not generate events bigger than <>. - -[[kafka-compatibility]] -==== Compatibility - -This output works with all Kafka versions in between 0.11 and 2.1.0. Older versions -might work as well, but are not supported. - -==== Configuration options - -You can specify the following options in the `kafka` section of the +{beatname_lc}.yml+ config file: - -===== `enabled` - -The `enabled` config is a boolean setting to enable or disable the output. If set -to false, the output is disabled. - -The default value is true. - -===== `hosts` - -The list of Kafka broker addresses from where to fetch the cluster metadata. -The cluster metadata contain the actual Kafka brokers events are published to. - -===== `version` - -Kafka version {beatname_lc} is assumed to run against. Defaults to 1.0.0. - -Event timestamps will be added, if version 0.10.0.0+ is enabled. - -Valid values are all kafka releases in between `0.8.2.0` and `2.0.0`. - -See <> for information on supported versions. - -===== `username` - -The username for connecting to Kafka. If username is configured, the password -must be configured as well. Only SASL/PLAIN is supported. - -===== `password` - -The password for connecting to Kafka. - -===== `sasl.mechanism` - -The sasl.mechanism for connecting to Kafka. -It can be one of "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512". Default is PLAIN. - -[[topic-option-kafka]] -===== `topic` - -The Kafka topic used for produced events. - -You can set the topic dynamically by using a format string to access any -event field. For example, this configuration uses a custom field, -`fields.log_topic`, to set the topic for each event: - -[source,yaml] ------ -topic: '%{[fields.log_topic]}' ------ - -TIP: To learn how to add custom fields to events, see the -<> option. - -See the <> setting for other ways to set the -topic dynamically. - -[[topics-option-kafka]] -===== `topics` - -An array of topic selector rules. Each rule specifies the `topic` to use for -events that match the rule. During publishing, {beatname_uc} sets the `topic` -for each event based on the first matching rule in the array. Rules -can contain conditionals, format string-based fields, and name mappings. If the -`topics` setting is missing or no rule matches, the -<> field is used. - -Rule settings: - -*`topic`*:: The topic format string to use. If this string contains field -references, such as `%{[fields.name]}`, the fields must exist, or the rule -fails. - -*`mappings`*:: A dictionary that takes the value returned by `topic` and maps it -to a new name. - -*`default`*:: The default string value to use if `mappings` does not find a -match. - -*`when`*:: A condition that must succeed in order to execute the current rule. -ifndef::no-processors[] -All the <> supported by processors are also supported -here. -endif::no-processors[] - -The following example sets the topic based on whether the message field contains -the specified string: - -["source","yaml",subs="attributes"] ------------------------------------------------------------------------------- -output.kafka: - hosts: ["localhost:9092"] - topic: "logs-%{[beat.version]}" - topics: - - topic: "critical-%{[beat.version]}" - when.contains: - message: "CRITICAL" - - topic: "error-%{[beat.version]}" - when.contains: - message: "ERR" ------------------------------------------------------------------------------- - - -This configuration results in topics named +critical-{version}+, -+error-{version}+, and +logs-{version}+. - -===== `key` - -Optional formatted string specifying the Kafka event key. If configured, the -event key can be extracted from the event using a format string. - -See the Kafka documentation for the implications of a particular choice of key; -by default, the key is chosen by the Kafka cluster. - -===== `partition` - -Kafka output broker event partitioning strategy. Must be one of `random`, -`round_robin`, or `hash`. By default the `hash` partitioner is used. - -*`random.group_events`*: Sets the number of events to be published to the same - partition, before the partitioner selects a new partition by random. The - default value is 1 meaning after each event a new partition is picked randomly. - -*`round_robin.group_events`*: Sets the number of events to be published to the - same partition, before the partitioner selects the next partition. The default - value is 1 meaning after each event the next partition will be selected. - -*`hash.hash`*: List of fields used to compute the partitioning hash value from. - If no field is configured, the events `key` value will be used. - -*`hash.random`*: Randomly distribute events if no hash or key value can be computed. - -All partitioners will try to publish events to all partitions by default. If a -partition's leader becomes unreachable for the beat, the output might block. All -partitioners support setting `reachable_only` to overwrite this -behavior. If `reachable_only` is set to `true`, events will be published to -available partitions only. - -NOTE: Publishing to a subset of available partitions potentially increases resource usage because events may become unevenly distributed. - -===== `client_id` - -The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats". - -===== `worker` - -The number of concurrent load-balanced Kafka output workers. - -===== `codec` - -Output codec configuration. If the `codec` section is missing, events will be json encoded. - -See <> for more information. - -===== `metadata` - -Kafka metadata update settings. The metadata do contain information about -brokers, topics, partition, and active leaders to use for publishing. - -*`refresh_frequency`*:: Metadata refresh interval. Defaults to 10 minutes. - -*`full`*:: Strategy to use when fetching metadata, when this option is `true`, the client will maintain -a full set of metadata for all the available topics, if the this option is set to `false` it will only refresh the -metadata for the configured topics. The default is false. - -*`retry.max`*:: Total number of metadata update retries when cluster is in middle of leader election. The default is 3. - -*`retry.backoff`*:: Waiting time between retries during leader elections. Default is 250ms. - -===== `max_retries` - -ifdef::ignores_max_retries[] -{beatname_uc} ignores the `max_retries` setting and retries indefinitely. -endif::[] - -ifndef::ignores_max_retries[] -The number of times to retry publishing an event after a publishing failure. -After the specified number of retries, the events are typically dropped. - -Set `max_retries` to a value less than 0 to retry until all events are published. - -The default is 3. -endif::[] - -===== `bulk_max_size` - -The maximum number of events to bulk in a single Kafka request. The default is 2048. - -===== `bulk_flush_frequency` - -Duration to wait before sending bulk Kafka request. 0 is no delay. The default is 0. - -===== `timeout` - -The number of seconds to wait for responses from the Kafka brokers before timing -out. The default is 30 (seconds). - -===== `broker_timeout` - -The maximum duration a broker will wait for number of required ACKs. The default is 10s. - -===== `channel_buffer_size` - -Per Kafka broker number of messages buffered in output pipeline. The default is 256. - -===== `keep_alive` - -The keep-alive period for an active network connection. If 0s, keep-alives are disabled. The default is 0 seconds. - -===== `compression` - -Sets the output compression codec. Must be one of `none`, `snappy`, `lz4` and `gzip`. The default is `gzip`. - -===== `compression_level` - -Sets the compression level used by gzip. Setting this value to 0 disables compression. -The compression level must be in the range of 1 (best speed) to 9 (best compression). - -Increasing the compression level will reduce the network usage but will increase the cpu usage. - -The default value is 4. - -[[kafka-max_message_bytes]] -===== `max_message_bytes` - -The maximum permitted size of JSON-encoded messages. Bigger messages will be dropped. The default value is 1000000 (bytes). This value should be equal to or less than the broker's `message.max.bytes`. - -===== `required_acks` - -The ACK reliability level required from broker. 0=no response, 1=wait for local commit, -1=wait for all replicas to commit. The default is 1. - -Note: If set to 0, no ACKs are returned by Kafka. Messages might be lost silently on error. - -===== `ssl` - -Configuration options for SSL parameters like the root CA for Kafka connections. - The Kafka host keystore should be created with the -`-keyalg RSA` argument to ensure it uses a cipher supported by -https://github.com/Shopify/sarama/wiki/Frequently-Asked-Questions#why-cant-sarama-connect-to-my-kafka-cluster-using-ssl[Filebeat's Kafka library]. -See <> for more information. diff --git a/libbeat/outputs/kafka/config.go b/libbeat/outputs/kafka/config.go index 4057150c5b2c..266994b98e4a 100644 --- a/libbeat/outputs/kafka/config.go +++ b/libbeat/outputs/kafka/config.go @@ -215,8 +215,7 @@ func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) { k.Net.SASL.User = config.Username k.Net.SASL.Password = config.Password err = config.Sasl.configureSarama(k) - fmt.Println("-------------------") - fmt.Println(k.Net.SASL) + if err != nil { return nil, err }