Skip to content

Fix Kafka ConsumeClaim loop to Mark right offset#645

Merged
n3wscott merged 1 commit intocloudevents:masterfrom
pastequo:643-fix-kafka-ConsumeClaim-loop
Jan 4, 2021
Merged

Fix Kafka ConsumeClaim loop to Mark right offset#645
n3wscott merged 1 commit intocloudevents:masterfrom
pastequo:643-fix-kafka-ConsumeClaim-loop

Conversation

@pastequo
Copy link
Copy Markdown
Contributor

#643

message is a pointer varying in the for loop, when MarkMessage is called, the pointer points to something potentially different to the current iteration

Signed-off-by: Matthieu Bernardin matt.bernardin@gmail.com

Signed-off-by: Matthieu Bernardin <matt.bernardin@gmail.com>
@slinkydeveloper slinkydeveloper added this to the SDK 2.4 milestone Dec 16, 2020
@slinkydeveloper
Copy link
Copy Markdown
Member

Wanna double check @n3wscott?

Copy link
Copy Markdown
Member

@n3wscott n3wscott left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no go call, so there is no fork and the closure fix does not seem to be required. I do not see how this code is any different than what was before. Add a test?

@slinkydeveloper
Copy link
Copy Markdown
Member

There is no go call,

But the message is pushed inside a channel and used later, i think that's the issue here

@pastequo
Copy link
Copy Markdown
Contributor Author

pastequo commented Dec 16, 2020

I think it is related to the anonymous function:
https://play.golang.org/p/d2dGm_k_omn
whereas
https://play.golang.org/p/FVUHDwNw-dI

Edit (2)
Rethinking about this, the behavior is not surprising at all.
The code is saying:

  1. Allocate a *sarama.ConsumerMessage named "message" and store into it the different iteration (ie change at each iteration)
  2. Then put into a channel something that uses this pointer
    For sure when consuming the output channel, the value of the pointer depends on the number of iteration (ie it reflect the current state of the iteration, not the state when the pointer was added to the output channel)
    This kind of "tricky situation" is not limited to
for plouf := range myChannel {
    go myFunction(&plouf)
}

which is more obvious, but exactly the same, ie we don't control when the function is being called (relatively to the for loop)

https://play.golang.org/p/z5qVb776T-u
Here my output will be "random", because I don't control when the chan is being consumed

@n3wscott
Copy link
Copy Markdown
Member

n3wscott commented Jan 4, 2021

LGTM

@n3wscott n3wscott merged commit 57e1073 into cloudevents:master Jan 4, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

bug Something isn't working component/protocol/kafka

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants