
Kafka Receiver: Data Loss #643

@pastequo

Description

Hello again,

Continuing the (small) discussion here: #642

The client here starts a new goroutine each time an event is pulled. At the end of "invoke" here, Finish() is called. For the Kafka receiver, this calls MarkMessage() here.
This method takes the message offset, adds 1 to it, and finally (asynchronously) informs Kafka that this consumer group has processed all messages before message.offset+1.

Since there is no control over the processing time (the time spent in invoke), the goroutine processing the last Kafka message (for example) could finish first. Its offset is then pushed "first", telling Kafka that this consumer group is up to date with the current log for this partition. If the process is stopped, crashes, etc. before the other goroutines finish, then when it restarts (or when a rebalance hands the partition to another consumer in the same group), only new messages will be delivered. All the other messages that were still being processed by the different goroutines are lost.

For example, I pushed 100 messages into a single-partition topic.
After that, I started a consumer with this configuration:

	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0
	saramaConfig.Consumer.Offsets.AutoCommit.Enable = true
	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
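One detail worth noting (from my reading of sarama, so worth double-checking): with AutoCommit.Enable set, MarkMessage only records the offset in memory, and a background loop flushes marked offsets every AutoCommit.Interval (1 second by default). A fragment making that explicit:

```go
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
saramaConfig.Consumer.Offsets.AutoCommit.Enable = true
// Marked offsets are flushed to Kafka on this interval, not synchronously
// inside MarkMessage (1s is sarama's default).
saramaConfig.Consumer.Offsets.AutoCommit.Interval = time.Second
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
```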

For each message, I apply pseudo-processing with a random duration:

func receive(ctx context.Context, event cloudevents.Event) {
	// Random processing time
	n := rand.Intn(1000)
	time.Sleep(time.Duration(n) * time.Second)

	fmt.Printf("After %vs, %s\n", n, event.DataEncoded)
}

After a few seconds I stop the process using Ctrl-C:
[screenshot: console output showing messages 26, 10 and 42 being consumed]

Looking at the Kafka offsets I see:
[screenshot: the consumer group's current offset for the partition is 100]

Which is unexpected in several ways: I consumed only 3 messages (ids 26, 10 & 42), so the current offset of my consumer group shouldn't be 100; it should be 42+1.

After investigating, there is an issue here: the Finish callback captures the loop variable message, which is reused on each iteration, so by the time the callback runs it can point at a later message. I think it should be something like:

func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		// Copy the loop variable so each Finish callback marks its own message
		tmp := message
		m := NewMessageFromConsumerMessage(tmp)

		r.incoming <- msgErr{
			msg: binding.WithFinish(m, func(err error) {
				// Only mark the offset once the message has been acked
				if protocol.IsACK(err) {
					session.MarkMessage(tmp, "")
				}
			}),
		}
	}
	return nil
}

(I would be happy to make a PR for this)

After applying this fix and replaying my scenario I have:
[screenshot: console output showing messages 9 and 25 being consumed]
and in Kafka:
[screenshot: the consumer group's current offset for the partition is 26]

Which is better but still problematic in my opinion. I have only consumed messages 9 & 25, and my offset is now 26. When my consumer restarts, the messages [0-8] ∪ [10-24] won't be consumed.

I saw the notion of partitioning in CloudEvents; maybe it would be interesting to introduce it in the client (which should stay an abstraction above the different protocols) and consume partition by partition.

I don't plan to use the Kafka receiver myself, as I don't have a use case (I'm more likely to follow @slinkydeveloper's advice and only focus on transforming a sarama message into a CloudEvent and vice versa). I just thought it was important enough to mention here.
