Kafka Receiver: Data Loss #643
Description
Hello again,
Continuing the (small) discussion here: #642
The client here starts a new goroutine each time an event is pulled. At the end of "invoke" here, Finish() is called. For the Kafka receiver, it calls MarkMessage() here.
This method extracts the message offset, adds 1 to it and finally (asynchronously) informs Kafka that this consumer group has processed all messages before message.Offset+1.
Since there is no control over the processing time (the time spent in invoke), the goroutine processing the last Kafka message, for example, could finish first. The offset of that message is then pushed "first", telling Kafka that this consumer group is up to date with the current log for this partition. If the process is stopped, crashes, etc. before the other goroutines finish, then when the process is restarted (or when a rebalance hands the partition to another consumer in the same group), only new messages will be delivered. All the messages that were still being processed by the other goroutines are lost.
For example, I pushed 100 messages into Kafka in a single-partition topic.
After that, I start a consumer with this configuration:
```go
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
saramaConfig.Consumer.Offsets.AutoCommit.Enable = true
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
```

For each message, I apply a pseudo-processing step with a random duration:
```go
func receive(ctx context.Context, event cloudevents.Event) {
	// Random processing time
	n := rand.Intn(1000)
	time.Sleep(time.Duration(n) * time.Second)
	fmt.Printf("After %vs, %s\n", n, event.DataEncoded)
}
```
After a few seconds I stop the process using Ctrl-C.

Looking at the Kafka offsets I see:

Which is unexpected in many ways. I consumed 3 messages (ids 26, 10 & 42). The current offset of my consumer group shouldn't be 100; it should be 42+1 = 43.
After investigating, there is an issue here
I think it should be something like
```go
func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		tmp := message
		m := NewMessageFromConsumerMessage(tmp)
		r.incoming <- msgErr{
			msg: binding.WithFinish(m, func(err error) {
				if protocol.IsACK(err) {
					session.MarkMessage(tmp, "")
				}
			}),
		}
	}
	return nil
}
```

(I would be happy to make a PR for this.)
After applying this change and replaying my scenario I get:

and in kafka

Which is better but still problematic in my opinion. I have only consumed messages 9 & 25, and my offset is now 26. When my consumer restarts, the messages [0-8] ∪ [10-24] won't be consumed.
I saw the notion of partitioning in CloudEvents; maybe it would be interesting to introduce it in the client (which should stay an abstraction above the different protocols) and consume partition by partition.
I don't plan to use the Kafka Receiver, as I don't have a use-case (I'm more likely to follow @slinkydeveloper's advice and only focus on transforming a sarama message into a CloudEvent and vice-versa). I just thought it was important enough to mention it here.