fix(kafka): drain the final buffered batch on async producer Stop #1027
The async producer's worker accumulates messages into a local batch buffer
and flushes on size/linger. On graceful Stop() that buffer was silently
dropped: Stop sets shuttingDown before closing the publish channel, and
flushBuffered (including the final drain on channel-close) bailed early on
shuttingDown — so up to a full linger window / batch of messages (txmeta,
blocks-final, rejected-tx, etc.) was lost on every shutdown, across all
services using the producer. recover() can't help: it's a logic drop, not a
panic.
Two compounding causes, both removed:
- flushBuffered returned early when shuttingDown, defeating the final
drain and also discarding any batch the size/linger paths cleared after
a no-op flush during shutdown;
- the worker loop exited the moment shuttingDown was set, which could
also strand messages still queued in the channel.
flushBuffered now gates only on closed (the client is gone). This is safe:
Stop closes the client only after publishWg.Wait — i.e. after the worker
has returned — so producing while shutting-down is always valid, and the
subsequent client.Flush() delivers it. The worker now keeps running until
Stop closes the channel: receives on the closed channel still yield any
messages queued in it, those land in the buffer, and the final drain
produces them.
Extract the worker loop into runProducerWorker (behavior-preserving) and add
a produceHook test seam so the batching loop can be exercised without a live
broker. New regression test TestAsyncProducer_DrainsBufferedBatchOnStop: with
linger/batch set huge so messages only leave via the final drain, asserts all
are produced on Stop (fails on the old code: 0 of 25 produced).
Full util/kafka suite passes under -race.
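
The two gates described above can be modelled in a few lines. This is a minimal sketch, not the code in this PR: the worker type, its fields, the oldGate switch, and drainOnStop are hypothetical names introduced for illustration, and a counter stands in for the real produce call. With batch size and linger set huge, the only way out for a message is the final drain.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// worker is a hypothetical stand-in for the async producer's batching loop.
type worker struct {
	ch           chan string
	shuttingDown atomic.Bool
	closed       atomic.Bool
	oldGate      bool // true models the pre-fix gating described above (assumption)
	produced     int  // written only by the worker, read after wg.Wait
	wg           sync.WaitGroup
}

// blocked reports whether the worker must not produce right now.
func (w *worker) blocked() bool {
	if w.closed.Load() {
		return true // client is gone
	}
	return w.oldGate && w.shuttingDown.Load()
}

// flushBuffered hands the batch to the simulated client and clears it.
func (w *worker) flushBuffered(batch []string) []string {
	if !w.blocked() {
		w.produced += len(batch) // stand-in for the real produce call
	}
	return batch[:0] // cleared either way, so a blocked flush drops the batch
}

func (w *worker) run(batchSize int, linger time.Duration) {
	defer w.wg.Done()
	ticker := time.NewTicker(linger)
	defer ticker.Stop()
	var batch []string
	for {
		if w.oldGate && w.shuttingDown.Load() {
			return // pre-fix: bail out, stranding queued and buffered messages
		}
		select {
		case msg, ok := <-w.ch:
			if !ok {
				w.flushBuffered(batch) // final drain on channel close
				return
			}
			batch = append(batch, msg)
			if len(batch) >= batchSize {
				batch = w.flushBuffered(batch)
			}
		case <-ticker.C:
			batch = w.flushBuffered(batch)
		}
	}
}

// drainOnStop publishes n messages, stops gracefully, and returns how many were produced.
func drainOnStop(n int, oldGate bool) int {
	w := &worker{ch: make(chan string, n), oldGate: oldGate}
	w.wg.Add(1)
	go w.run(1<<20, time.Hour) // huge batch/linger: only the final drain can emit
	for i := 0; i < n; i++ {
		w.ch <- fmt.Sprintf("msg-%d", i)
	}
	w.shuttingDown.Store(true)
	close(w.ch)
	w.wg.Wait()
	w.closed.Store(true)
	return w.produced
}

func main() {
	fmt.Println("pre-fix gate:", drainOnStop(25, true))
	fmt.Println("post-fix gate:", drainOnStop(25, false))
}
```

In this model the pre-fix gate produces 0 of 25 and the post-fix gate produces 25 of 25, because closed only flips after the worker has returned.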
🤖 Claude Code Review
Status: Complete. No issues found. This fix correctly addresses a silent message-loss bug during graceful shutdowns.
Summary: The test coverage is excellent, directly exercising the drain path via the produceHook seam, with assertions that fail on the old code and pass on the fix.
Benchmark Comparison Report
Threshold: >10% with p < 0.05. Generated: 2026-06-03 10:52 UTC.
ordishs left a comment
Approve. Correct fix for a genuine, broadly-impacting data-loss bug on graceful Stop().
The safety argument holds: Stop() orders shuttingDown → close(ch) → publishWg.Wait() → closed.Store(true) → client.Flush()/Close(), so closed is only true after the worker has returned. Narrowing flushBuffered and the worker loop gate from closed || shuttingDown to closed is therefore provably safe — the final drain always produces against a live client and the trailing Flush() guarantees delivery. The runProducerWorker extraction is behaviour-preserving (only the two gate narrowings differ), and the produceHook-based test pins the exact regression (0/25 → 25/25). Verified TestAsyncProducer_DrainsBufferedBatchOnStop passes locally under -race.
Non-blocking: the closed guards in the worker loop and flushBuffered are effectively dead in the single-Stop lifecycle (closed is set only after the worker exits) — harmless defensive guards for a restart race; a one-line note would preempt reader confusion.
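
The ordering argument in this review can be checked with a small model. This is a sketch under stated assumptions, not the producer's actual code: the lifecycle type, its fields, and stopInOrder are hypothetical, and a counter stands in for producing against the live client. The guardHits counter records how often the worker ever sees closed set, which is the guard described above as effectively dead.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lifecycle is a hypothetical reduction of the producer to its shutdown state.
type lifecycle struct {
	ch           chan string
	shuttingDown atomic.Bool
	closed       atomic.Bool
	publishWg    sync.WaitGroup
	produced     int // touched only by the worker until publishWg.Wait returns
	guardHits    int // times the worker observed closed == true
}

func (l *lifecycle) start() {
	l.publishWg.Add(1)
	go func() {
		defer l.publishWg.Done()
		for range l.ch { // keeps receiving until the channel is closed and empty
			if l.closed.Load() {
				l.guardHits++ // defensive guard; never taken in a single-Stop lifecycle
				continue
			}
			l.produced++ // stand-in for producing against the live client
		}
	}()
}

// stop follows the order spelled out in the review.
func (l *lifecycle) stop() {
	l.shuttingDown.Store(true) // 1. publishers would check this and stop sending
	close(l.ch)                // 2. worker drains what is queued, then exits
	l.publishWg.Wait()         // 3. worker has returned
	l.closed.Store(true)       // 4. closed flips only now
	// 5. client.Flush()/Close() would follow here; omitted in this sketch
}

// stopInOrder publishes n messages, stops, and reports what the worker saw.
func stopInOrder(n int) (produced, guardHits int) {
	l := &lifecycle{ch: make(chan string, n)}
	l.start()
	for i := 0; i < n; i++ {
		l.ch <- fmt.Sprintf("msg-%d", i)
	}
	l.stop()
	return l.produced, l.guardHits
}

func main() {
	produced, guardHits := stopInOrder(25)
	fmt.Println("produced:", produced, "guard hits:", guardHits)
}
```

Because step 4 runs only after step 3, the worker in this model never observes closed, so every queued message is produced and guardHits stays at 0.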



Bug
The Kafka async producer's worker accumulates messages into a local batch buffer and flushes on size/linger. On a graceful Stop(), that buffer was silently dropped:
- Stop() sets shuttingDown=true before closing the publish channel.
- flushBuffered, including the final drain triggered on channel-close, bailed early on shuttingDown.
So up to a full linger window / batch of messages (txmeta, blocks-final, rejected-tx, …) was lost on every graceful shutdown, across every service using the producer. It's a logic drop, not a panic, so recover() doesn't help. (Found during a concurrency/lifecycle audit; the send-on-closed race that #1009 patched is a different, already-handled issue. This is the buffered batch behind it.)
Two compounding causes, both removed:
- flushBuffered returned early on shuttingDown, defeating the final drain and also discarding any batch the size/linger paths cleared right after that no-op flush.
- The worker loop broke out the instant shuttingDown was observed, which could also strand messages still queued in the channel.

Fix
flushBuffered now gates only on closed (client gone), not shuttingDown. This is provably safe: Stop closes the client only after publishWg.Wait(), i.e. after the worker has returned, so producing while shutting-down is always valid, and the subsequent client.Flush() delivers it. The worker now keeps draining until Stop closes the channel; after the close, the remaining receives move channel-resident messages into the buffer and the final drain produces them.
Steady-state behaviour is unchanged (shuttingDown is false then, so the removed gate never fired).

Testability
The buffered loop is extracted into runProducerWorker (behaviour-preserving move) and a produceHook test seam captures what the loop emits without a live broker (the in-memory producer path is separate and doesn't reproduce the bug, and project policy is no kafka mocking).
TestAsyncProducer_DrainsBufferedBatchOnStop: linger/batch set huge so messages only ever leave via the final drain; asserts all are produced on Stop.

Verification
- Regression test fails on the old code (0 of 25 produced), passes on the fix.
- util/kafka suite passes under -race (81s).
- go vet, gofmt, gci clean.
Note: the large line count in kafka_producer_async.go is mostly the runProducerWorker extraction (the loop moved out of Start), not new logic.
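
For readers unfamiliar with the test-seam pattern, here is one way a produceHook-style seam could look. This is an illustrative sketch only: the record type, the client interface, the emit method, and captureBatch are hypothetical and are not taken from kafka_producer_async.go; only the name produceHook comes from this PR.

```go
package main

import "fmt"

// record is a hypothetical message type for this sketch.
type record struct{ Topic, Value string }

// client is a hypothetical interface standing in for the real broker client.
type client interface{ Produce(r record) }

type asyncProducer struct {
	client      client
	produceHook func(r record) // test seam: when set, replaces the client call
}

// emit sends each record through the hook if one is installed, else to the client.
func (p *asyncProducer) emit(batch []record) {
	for _, r := range batch {
		if p.produceHook != nil {
			p.produceHook(r)
			continue
		}
		p.client.Produce(r)
	}
}

// captureBatch emits n records through a hook and returns what the hook saw.
func captureBatch(n int) []record {
	var got []record
	p := &asyncProducer{produceHook: func(r record) { got = append(got, r) }}
	batch := make([]record, 0, n)
	for i := 0; i < n; i++ {
		batch = append(batch, record{Topic: "txmeta", Value: fmt.Sprintf("m%d", i)})
	}
	p.emit(batch) // no broker and no client needed while the hook is set
	return got
}

func main() {
	fmt.Println("captured:", len(captureBatch(25)))
}
```

The hook observes exactly what the loop would have handed to the client, so a test can assert on emitted records without a live broker and without mocking the client itself.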