Skip to content

Commit 6aa2742

Browse files
committed
context.Done() may never reach if waiting on r.incoming <- msgErr
Signed-off-by: nbajaj90 <nbajaj90@gmail.com>
1 parent 754a26b commit 6aa2742

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed

protocol/kafka_sarama/v2/receiver.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,15 +66,23 @@ func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
6666
return nil
6767
}
6868
m := NewMessageFromConsumerMessage(msg)
69-
70-
r.incoming <- msgErr{
69+
msgErrObj := msgErr{
7170
msg: binding.WithFinish(m, func(err error) {
7271
if protocol.IsACK(err) {
7372
session.MarkMessage(msg, "")
7473
}
7574
}),
7675
}
7776

77+
// Need to use select clause here, otherwise r.incoming <- msgErrObj can become a blocking operation,
78+
// resulting in never reaching outside block's case <-session.Context().Done()
79+
select {
80+
case r.incoming <- msgErrObj:
81+
// do nothing
82+
case <-session.Context().Done():
83+
return nil
84+
}
85+
7886
// Should return when `session.Context()` is done.
7987
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
8088
// https://github.com/Shopify/sarama/issues/1192

0 commit comments

Comments
 (0)