@@ -2580,13 +2580,13 @@ func (g *groupConsumer) commitOffsetsSync(
25802580 onDone = func (* Client , * kmsg.OffsetCommitRequest , * kmsg.OffsetCommitResponse , error ) {}
25812581 }
25822582
2583- g .syncCommitMu .Lock () // block all other concurrent commits until our OnDone is done.
2584-
25852583 if err := g .waitJoinSyncMu (ctx ); err != nil {
25862584 onDone (g .cl , kmsg .NewPtrOffsetCommitRequest (), kmsg .NewPtrOffsetCommitResponse (), err )
25872585 close (done )
25882586 return
25892587 }
2588+
2589+ g .syncCommitMu .Lock () // block all other concurrent commits until our OnDone is done.
25902590 unblockCommits := func (cl * Client , req * kmsg.OffsetCommitRequest , resp * kmsg.OffsetCommitResponse , err error ) {
25912591 g .noCommitDuringJoinAndSync .RUnlock ()
25922592 defer close (done )
@@ -2663,19 +2663,16 @@ func (cl *Client) CommitOffsets(
26632663 return
26642664 }
26652665
2666- g .syncCommitMu .RLock () // block sync commit, but allow other concurrent Commit to cancel us
2667- unblockSyncCommit := func (cl * Client , req * kmsg.OffsetCommitRequest , resp * kmsg.OffsetCommitResponse , err error ) {
2668- defer g .syncCommitMu .RUnlock ()
2669- onDone (cl , req , resp , err )
2670- }
2671-
26722666 if err := g .waitJoinSyncMu (ctx ); err != nil {
26732667 onDone (g .cl , kmsg .NewPtrOffsetCommitRequest (), kmsg .NewPtrOffsetCommitResponse (), err )
26742668 return
26752669 }
2670+
2671+ g .syncCommitMu .RLock () // block sync commit, but allow other concurrent Commit to cancel us
26762672 unblockJoinSync := func (cl * Client , req * kmsg.OffsetCommitRequest , resp * kmsg.OffsetCommitResponse , err error ) {
26772673 g .noCommitDuringJoinAndSync .RUnlock ()
2678- unblockSyncCommit (cl , req , resp , err )
2674+ defer g .syncCommitMu .RUnlock ()
2675+ onDone (cl , req , resp , err )
26792676 }
26802677
26812678 g .mu .Lock ()
0 commit comments