Commit e109b4b

Author: Yevgeniy Miretskiy (committed)
changefeedccl: Increase message size limits for kafka sink.
The Sarama library, used by the kafka sink, enforces maximum message size limits locally. When those limits are exceeded, sarama returns a confusing error message that seems to imply the remote kafka server rejected the message, even though the rejection happened locally:

`kafka server: Message was too large, server rejected it to avoid allocation error.`

This PR addresses the problem by increasing the sarama limits to 2GB (max int32).

An alternative approach was to extend `kafka_sink_config` to allow specifying a maximum message size. However, this alternative is less desirable. For one, the user-supplied configuration can run afoul of other limits imposed by the sarama library (e.g. `MaxRequestSize`), so more configuration options would have to be added. In addition, this would expose very low-level implementation details of the sarama library -- something that we probably should not do.

Fixes #76258

Release Notes (enterprise change): The Kafka sink supports larger messages, up to 2GB in size.
1 parent 3651e3c commit e109b4b

1 file changed: pkg/ccl/changefeedccl/sink_kafka.go (13 additions, 0 deletions)
@@ -14,6 +14,7 @@ import (
 	"crypto/x509"
 	"encoding/json"
 	"fmt"
+	"math"
 	"strings"
 	"sync"
 	"time"
@@ -56,6 +57,12 @@ func init() {
 	ctx := context.Background()
 	ctx = logtags.AddTag(ctx, "kafka-producer", nil)
 	sarama.Logger = &kafkaLogAdapter{ctx: ctx}
+
+	// Sarama should not be rejecting messages based on some arbitrary limits.
+	// This sink already manages its resource usage. Sarama should attempt to deliver
+	// messages, no matter their size. Of course, the downstream kafka may reject
+	// those messages, but this rejection should not be done locally.
+	sarama.MaxRequestSize = math.MaxInt32
 }

 // kafkaClient is a small interface restricting the functionality in sarama.Client
@@ -442,6 +449,12 @@ func (j *jsonDuration) UnmarshalJSON(b []byte) error {

 // Apply configures provided kafka configuration struct based on this config.
 func (c *saramaConfig) Apply(kafka *sarama.Config) error {
+	// Sarama limits the size of each message to be MaxMessageSize (1MB) bytes.
+	// This is silly; This sink already manages its memory, and therefore, if we
+	// had enough resources to ingest and process this message, then sarama shouldn't
+	// get in a way. Set this limit to be just a bit under maximum request size.
+	kafka.Producer.MaxMessageBytes = int(sarama.MaxRequestSize - 1)
+
 	kafka.Producer.Flush.Bytes = c.Flush.Bytes
 	kafka.Producer.Flush.Messages = c.Flush.Messages
 	kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency)
