Skip to content

Commit 208ef91

Browse files
authored
feat: Support Customer TLS certificates (#20879)
1 parent 04e5176 commit 208ef91

File tree

4 files changed

+85
-1
lines changed

4 files changed

+85
-1
lines changed

plugins/destination/kafka/client/client.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package client
33
import (
44
"context"
55
"crypto/tls"
6+
"crypto/x509"
67
"encoding/json"
78
"fmt"
9+
"os"
810
"time"
911

1012
"github.com/IBM/sarama"
@@ -27,6 +29,29 @@ type Client struct {
2729
*filetypes.Client
2830
}
2931

32+
func createTLSConfiguration(userSpec *spec.Spec) (*tls.Config, error) {
33+
t := &tls.Config{
34+
InsecureSkipVerify: !userSpec.EnforceTLSVerification,
35+
}
36+
if userSpec.TlsDetails != nil && *userSpec.TlsDetails.CertFile != "" && *userSpec.TlsDetails.KeyFile != "" && *userSpec.TlsDetails.CaFile != "" {
37+
cert, err := tls.LoadX509KeyPair(*userSpec.TlsDetails.CertFile, *userSpec.TlsDetails.KeyFile)
38+
if err != nil {
39+
return nil, fmt.Errorf("failed to load X509 key pair: %w", err)
40+
}
41+
42+
caCert, err := os.ReadFile(*userSpec.TlsDetails.CaFile)
43+
if err != nil {
44+
return nil, fmt.Errorf("failed to read CA file: %w", err)
45+
}
46+
47+
caCertPool := x509.NewCertPool()
48+
caCertPool.AppendCertsFromPEM(caCert)
49+
t.Certificates = []tls.Certificate{cert}
50+
t.RootCAs = caCertPool
51+
}
52+
return t, nil
53+
}
54+
3055
func New(_ context.Context, logger zerolog.Logger, s []byte, opts plugin.NewClientOptions) (plugin.Client, error) {
3156
c := &Client{
3257
logger: logger.With().Str("module", "dest-kafka").Logger(),
@@ -58,11 +83,16 @@ func New(_ context.Context, logger zerolog.Logger, s []byte, opts plugin.NewClie
5883
c.conf.ClientID = `cwc|1c04a227-aef8-47a9-9353-e20bbb6a9616|cq-destination-kafka|` + internalPlugin.Version
5984

6085
if c.spec.SASLUsername != "" {
86+
tlsConfig, err := createTLSConfiguration(c.spec)
87+
if err != nil {
88+
return nil, fmt.Errorf("failed to create TLS configuration: %w", err)
89+
}
90+
6191
c.conf.Net.SASL.Enable = true
6292
c.conf.Net.SASL.User = c.spec.SASLUsername
6393
c.conf.Net.SASL.Password = c.spec.SASLPassword
6494
c.conf.Net.TLS.Enable = true
65-
c.conf.Net.TLS.Config = &tls.Config{InsecureSkipVerify: !c.spec.EnforceTLSVerification}
95+
c.conf.Net.TLS.Config = tlsConfig
6696
c.conf.Net.SASL.Handshake = true
6797
}
6898

plugins/destination/kafka/client/spec/schema.json

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/destination/kafka/client/spec/spec.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@ type topicDetails struct {
1313
ReplicationFactor int `json:"replication_factor,omitempty" jsonschema:"minimum=1,default=1"`
1414
}
1515

16+
type TlsDetails struct {
17+
// Path to the certificate file for client authentication
18+
CertFile *string `json:"cert_file,omitempty"`
19+
// Path to the key file for client authentication
20+
KeyFile *string `json:"key_file,omitempty"`
21+
// Path to the certificate authority file for TLS client authentication
22+
CaFile *string `json:"ca_file,omitempty"`
23+
}
24+
1625
type Spec struct {
1726
filetypes.FileSpec
1827

@@ -35,6 +44,9 @@ type Spec struct {
3544
// Enforce TLS Verification when configuring a username to connect to Kafka.
3645
EnforceTLSVerification bool `json:"enforce_tls_verification,omitempty"`
3746

47+
// TLS details for client authentication.
48+
TlsDetails *TlsDetails `json:"tls_details,omitempty"`
49+
3850
// Number of records to write before starting a new object.
3951
BatchSize int64 `json:"batch_size" jsonschema:"minimum=1,default=1000"`
4052

plugins/destination/kafka/docs/overview.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ This is the (nested) plugin spec
6161

6262
Optional parameters to set topic details.
6363

64+
- `tls_details` ([tls_details](#tls_details)) (optional)
65+
66+
Optional parameters to set TLS details for the Kafka connection.
67+
6468

6569
### format_spec
6670

@@ -107,3 +111,19 @@ Reserved for future use.
107111

108112
Replication factor for the topic.
109113

114+
115+
116+
### tls_details
117+
- `ca_file_path` (`string`) (optional) (default: empty)
118+
119+
Path to the certificate authority file for TLS client authentication
120+
121+
- `cert_file_path` (`string`) (optional) (default: empty)
122+
123+
Path to the certificate file for client authentication
124+
125+
- `key_file_path` (`string`) (optional) (default: empty)
126+
127+
Path to the key file for client authentication.
128+
129+

0 commit comments

Comments
 (0)