Skip to content

fix(kafka): drain the final buffered batch on async producer Stop#1027

Merged
liam merged 1 commit into
bsv-blockchain:mainfrom
liam:liam/kafka-drain-on-stop
Jun 3, 2026
Merged

fix(kafka): drain the final buffered batch on async producer Stop#1027
liam merged 1 commit into
bsv-blockchain:mainfrom
liam:liam/kafka-drain-on-stop

Conversation

@liam

@liam liam commented Jun 3, 2026

Copy link
Copy Markdown
Collaborator

Bug

The Kafka async producer's worker accumulates messages into a local batch buffer and flushes on size/linger. On a graceful Stop(), that buffer was silently dropped:

  • Stop() sets shuttingDown=true before closing the publish channel.
  • flushBuffered — including the final drain triggered on channel-close — bailed early on shuttingDown.

So up to a full linger window / batch of messages (txmeta, blocks-final, rejected-tx, …) was lost on every graceful shutdown, across every service using the producer. It's a logic drop, not a panic, so recover() doesn't help. (Found during a concurrency/lifecycle audit; the send-on-closed race that #1009 patched is a different, already-handled issue — this is the buffered batch behind it.)

Two compounding causes, both removed:

  1. flushBuffered returned early on shuttingDown, defeating the final drain — and also discarding any batch the size/linger paths cleared right after that no-op flush.
  2. The worker loop breaked the instant shuttingDown was observed, which could also strand messages still queued in the channel.

Fix

flushBuffered now gates only on closed (client gone), not shuttingDown. This is provably safe: Stop closes the client only after publishWg.Wait() — i.e. after the worker has returned — so producing while shutting-down is always valid, and the subsequent client.Flush() delivers it. The worker now keeps draining until Stop closes the channel; the close drains channel-resident messages into the buffer and the final drain produces them.

Steady-state behaviour is unchanged (shuttingDown is false then, so the removed gate never fired).

Testability

The buffered loop is extracted into runProducerWorker (behaviour-preserving move) and a produceHook test seam captures what the loop emits without a live broker (the in-memory producer path is separate and doesn't reproduce the bug, and project policy is no kafka mocking).

TestAsyncProducer_DrainsBufferedBatchOnStop: linger/batch set huge so messages only ever leave via the final drain; asserts all are produced on Stop.

Verification

  • New test fails on the old code (0 of 25 produced), passes on the fix.
  • Full util/kafka suite passes under -race (81s).
  • go vet, gofmt, gci clean.

Note: the large line count in kafka_producer_async.go is mostly the runProducerWorker extraction (the loop moved out of Start), not new logic.

The async producer's worker accumulates messages into a local batch buffer
and flushes on size/linger. On graceful Stop() that buffer was silently
dropped: Stop sets shuttingDown before closing the publish channel, and
flushBuffered (including the final drain on channel-close) bailed early on
shuttingDown — so up to a full linger window / batch of messages (txmeta,
blocks-final, rejected-tx, etc.) was lost on every shutdown, across all
services using the producer. recover() can't help: it's a logic drop, not a
panic.

Two compounding causes, both removed:
  - flushBuffered returned early when shuttingDown, defeating the final
    drain and also discarding any batch the size/linger paths cleared after
    a no-op flush during shutdown;
  - the worker loop broke out of the loop the moment shuttingDown was set,
    which could also strand messages still queued in the channel.

flushBuffered now gates only on closed (the client is gone). This is safe:
Stop closes the client only after publishWg.Wait — i.e. after the worker
has returned — so producing while shutting-down is always valid, and the
subsequent client.Flush() delivers it. The worker now drains until Stop
closes the channel: the close drains channel-resident messages into the
buffer and the final drain produces them.

Extract the worker loop into runProducerWorker (behavior-preserving) and add
a produceHook test seam so the batching loop can be exercised without a live
broker. New regression test TestAsyncProducer_DrainsBufferedBatchOnStop: with
linger/batch set huge so messages only leave via the final drain, asserts all
are produced on Stop (fails on the old code: 0 of 25 produced).

Full util/kafka suite passes under -race.
@github-actions

github-actions Bot commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

🤖 Claude Code Review

Status: Complete

No issues found. This fix correctly addresses a silent message-loss bug during graceful shutdowns.

Summary:
The PR fixes a critical bug where buffered messages were silently dropped during graceful shutdown. The fix is sound: by removing the premature shuttingDown gate from flushBuffered, the final drain now executes correctly. The worker loop keeps draining until the channel closes, and Stop correctly sequences: close channel, wait for worker, then close client. This ensures all buffered and channel-resident messages are produced before shutdown completes.

The test coverage is excellent, directly exercising the drain path via the produceHook seam, with assertions that fail on the old code and pass on the fix.

@sonarqubecloud

sonarqubecloud Bot commented Jun 3, 2026

Copy link
Copy Markdown

@github-actions

github-actions Bot commented Jun 3, 2026

Copy link
Copy Markdown
Contributor

Benchmark Comparison Report

Baseline: main (unknown)

Current: PR-1027 (af177ad)

Summary

  • Regressions: 0
  • Improvements: 0
  • Unchanged: 144
  • Significance level: p < 0.05
All benchmark results (sec/op)
Benchmark Baseline Current Change p-value
_NewBlockFromBytes-4 1.803µ 1.771µ ~ 0.100
SplitSyncedParentMap_SetIfNotExists/256_buckets-4 59.49n 59.78n ~ 1.000
SplitSyncedParentMap_SetIfNotExists/16_buckets-4 59.34n 59.33n ~ 0.700
SplitSyncedParentMap_SetIfNotExists/1_bucket-4 59.26n 59.24n ~ 1.000
SplitSyncedParentMap_ConcurrentSetIfNotExists/256_buckets... 35.18n 34.28n ~ 0.200
SplitSyncedParentMap_ConcurrentSetIfNotExists/16_buckets_... 62.11n 57.11n ~ 0.100
SplitSyncedParentMap_ConcurrentSetIfNotExists/1_bucket_pa... 165.3n 149.6n ~ 0.100
MiningCandidate_Stringify_Short-4 260.9n 256.0n ~ 0.100
MiningCandidate_Stringify_Long-4 1.830µ 1.814µ ~ 0.100
MiningSolution_Stringify-4 947.7n 940.7n ~ 0.100
BlockInfo_MarshalJSON-4 1.824µ 1.804µ ~ 0.200
NewFromBytes-4 161.9n 130.1n ~ 0.100
AddTxBatchColumnar_Validation-4 2.455µ 2.463µ ~ 1.000
OffsetValidationLoop-4 637.4n 644.4n ~ 0.200
Mine_EasyDifficulty-4 67.20µ 66.95µ ~ 0.700
Mine_WithAddress-4 7.188µ 7.577µ ~ 1.000
BlockAssembler_AddTx-4 0.02646n 0.03138n ~ 0.400
AddNode-4 11.12 10.97 ~ 1.000
AddNodeWithMap-4 12.11 12.16 ~ 1.000
DiskTxMap_SetIfNotExists-4 3.522µ 3.708µ ~ 0.100
DiskTxMap_SetIfNotExists_Parallel-4 3.316µ 3.319µ ~ 1.000
DiskTxMap_ExistenceOnly-4 314.1n 332.7n ~ 0.100
Queue-4 188.7n 188.3n ~ 0.600
AtomicPointer-4 4.695n 4.202n ~ 0.700
ReorgOptimizations/DedupFilterPipeline/Old/10K-4 888.1µ 893.3µ ~ 0.400
ReorgOptimizations/DedupFilterPipeline/New/10K-4 803.8µ 823.6µ ~ 0.400
ReorgOptimizations/AllMarkFalse/Old/10K-4 105.6µ 108.8µ ~ 0.700
ReorgOptimizations/AllMarkFalse/New/10K-4 61.78µ 63.02µ ~ 0.100
ReorgOptimizations/HashSlicePool/Old/10K-4 58.45µ 65.64µ ~ 0.700
ReorgOptimizations/HashSlicePool/New/10K-4 12.40µ 11.87µ ~ 0.100
ReorgOptimizations/NodeFlags/Old/10K-4 4.774µ 4.779µ ~ 0.700
ReorgOptimizations/NodeFlags/New/10K-4 1.607µ 1.597µ ~ 0.800
ReorgOptimizations/DedupFilterPipeline/Old/100K-4 9.793m 9.651m ~ 0.700
ReorgOptimizations/DedupFilterPipeline/New/100K-4 10.53m 10.98m ~ 0.200
ReorgOptimizations/AllMarkFalse/Old/100K-4 1.218m 1.172m ~ 0.100
ReorgOptimizations/AllMarkFalse/New/100K-4 691.6µ 687.8µ ~ 0.100
ReorgOptimizations/HashSlicePool/Old/100K-4 642.4µ 558.8µ ~ 0.700
ReorgOptimizations/HashSlicePool/New/100K-4 297.7µ 292.6µ ~ 0.700
ReorgOptimizations/NodeFlags/Old/100K-4 51.01µ 51.31µ ~ 1.000
ReorgOptimizations/NodeFlags/New/100K-4 18.42µ 17.98µ ~ 0.100
TxMapSetIfNotExists-4 52.64n 52.90n ~ 0.100
TxMapSetIfNotExistsDuplicate-4 39.91n 39.99n ~ 0.400
ChannelSendReceive-4 631.5n 632.2n ~ 1.000
DirectSubtreeAdd/4_per_subtree-4 56.23n 57.77n ~ 1.000
DirectSubtreeAdd/64_per_subtree-4 29.28n 29.49n ~ 0.700
DirectSubtreeAdd/256_per_subtree-4 28.16n 28.75n ~ 0.400
DirectSubtreeAdd/1024_per_subtree-4 26.83n 26.87n ~ 1.000
DirectSubtreeAdd/2048_per_subtree-4 26.33n 26.44n ~ 0.200
SubtreeProcessorAdd/4_per_subtree-4 296.8n 307.7n ~ 0.100
SubtreeProcessorAdd/64_per_subtree-4 286.4n 293.9n ~ 0.100
SubtreeProcessorAdd/256_per_subtree-4 287.2n 293.3n ~ 0.400
SubtreeProcessorAdd/1024_per_subtree-4 279.7n 280.9n ~ 0.400
SubtreeProcessorAdd/2048_per_subtree-4 277.9n 282.2n ~ 0.100
SubtreeProcessorRotate/4_per_subtree-4 285.0n 287.4n ~ 0.400
SubtreeProcessorRotate/64_per_subtree-4 289.4n 285.2n ~ 0.200
SubtreeProcessorRotate/256_per_subtree-4 293.2n 287.0n ~ 0.100
SubtreeProcessorRotate/1024_per_subtree-4 288.8n 288.9n ~ 0.700
SubtreeNodeAddOnly/4_per_subtree-4 55.75n 56.31n ~ 0.400
SubtreeNodeAddOnly/64_per_subtree-4 36.49n 36.47n ~ 1.000
SubtreeNodeAddOnly/256_per_subtree-4 35.25n 35.49n ~ 0.200
SubtreeNodeAddOnly/1024_per_subtree-4 34.68n 34.73n ~ 0.400
SubtreeCreationOnly/4_per_subtree-4 112.9n 111.9n ~ 0.300
SubtreeCreationOnly/64_per_subtree-4 359.8n 363.5n ~ 0.400
SubtreeCreationOnly/256_per_subtree-4 1.260µ 1.258µ ~ 1.000
SubtreeCreationOnly/1024_per_subtree-4 3.909µ 3.985µ ~ 0.400
SubtreeCreationOnly/2048_per_subtree-4 7.172µ 7.188µ ~ 1.000
SubtreeProcessorOverheadBreakdown/64_per_subtree-4 280.6n 284.5n ~ 0.300
SubtreeProcessorOverheadBreakdown/1024_per_subtree-4 283.1n 287.5n ~ 0.400
ParallelGetAndSetIfNotExists/1k_nodes-4 2.012m 2.023m ~ 0.400
ParallelGetAndSetIfNotExists/10k_nodes-4 5.167m 5.316m ~ 0.100
ParallelGetAndSetIfNotExists/50k_nodes-4 7.193m 7.493m ~ 0.100
ParallelGetAndSetIfNotExists/100k_nodes-4 9.851m 10.645m ~ 0.100
SequentialGetAndSetIfNotExists/1k_nodes-4 1.799m 1.806m ~ 0.400
SequentialGetAndSetIfNotExists/10k_nodes-4 4.512m 4.628m ~ 0.100
SequentialGetAndSetIfNotExists/50k_nodes-4 14.03m 14.14m ~ 0.700
SequentialGetAndSetIfNotExists/100k_nodes-4 25.49m 27.01m ~ 0.100
ProcessOwnBlockSubtreeNodesParallel/1k_nodes-4 2.069m 2.081m ~ 0.400
ProcessOwnBlockSubtreeNodesParallel/10k_nodes-4 8.419m 8.568m ~ 1.000
ProcessOwnBlockSubtreeNodesParallel/100k_nodes-4 13.60m 13.82m ~ 0.400
ProcessOwnBlockSubtreeNodesSequential/1k_nodes-4 1.833m 1.809m ~ 0.400
ProcessOwnBlockSubtreeNodesSequential/10k_nodes-4 8.185m 8.354m ~ 0.700
ProcessOwnBlockSubtreeNodesSequential/100k_nodes-4 44.05m 43.83m ~ 0.100
CalcBlockWork-4 508.2n 504.7n ~ 0.100
CalculateWork-4 693.6n 694.7n ~ 0.700
BuildBlockLocatorString_Helpers/Size_10-4 1.372µ 1.368µ ~ 0.700
BuildBlockLocatorString_Helpers/Size_100-4 15.87µ 13.14µ ~ 1.000
BuildBlockLocatorString_Helpers/Size_1000-4 128.9µ 130.5µ ~ 0.200
CatchupWithHeaderCache-4 104.7m 104.6m ~ 0.400
_BufferPoolAllocation/16KB-4 3.977µ 3.976µ ~ 1.000
_BufferPoolAllocation/32KB-4 11.66µ 11.30µ ~ 0.700
_BufferPoolAllocation/64KB-4 18.26µ 16.58µ ~ 0.100
_BufferPoolAllocation/128KB-4 32.88µ 34.23µ ~ 0.700
_BufferPoolAllocation/512KB-4 119.5µ 124.4µ ~ 0.200
_BufferPoolConcurrent/32KB-4 20.28µ 19.49µ ~ 0.200
_BufferPoolConcurrent/64KB-4 31.64µ 32.15µ ~ 0.400
_BufferPoolConcurrent/512KB-4 148.7µ 150.4µ ~ 0.700
_SubtreeDeserializationWithBufferSizes/16KB-4 634.8µ 675.3µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/32KB-4 639.3µ 720.6µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/64KB-4 635.1µ 746.0µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/128KB-4 647.8µ 720.2µ ~ 0.100
_SubtreeDeserializationWithBufferSizes/512KB-4 648.0µ 661.5µ ~ 1.000
_SubtreeDataDeserializationWithBufferSizes/16KB-4 36.94m 36.78m ~ 0.400
_SubtreeDataDeserializationWithBufferSizes/32KB-4 36.73m 36.44m ~ 0.100
_SubtreeDataDeserializationWithBufferSizes/64KB-4 36.53m 36.89m ~ 0.700
_SubtreeDataDeserializationWithBufferSizes/128KB-4 36.72m 36.31m ~ 0.100
_SubtreeDataDeserializationWithBufferSizes/512KB-4 36.61m 36.09m ~ 0.100
_PooledVsNonPooled/Pooled-4 655.2n 741.6n ~ 0.100
_PooledVsNonPooled/NonPooled-4 8.157µ 8.207µ ~ 0.700
_MemoryFootprint/Current_512KB_32concurrent-4 6.631µ 6.552µ ~ 0.200
_MemoryFootprint/Proposed_32KB_32concurrent-4 9.782µ 11.573µ ~ 0.100
_MemoryFootprint/Alternative_64KB_32concurrent-4 9.205µ 10.346µ ~ 0.100
_prepareTxsPerLevel-4 345.4m 334.4m ~ 0.400
_prepareTxsPerLevelOrdered-4 3.411m 3.136m ~ 0.200
_prepareTxsPerLevel_Comparison/Original-4 322.9m 311.5m ~ 0.100
_prepareTxsPerLevel_Comparison/Optimized-4 3.108m 2.767m ~ 0.400
SubtreeSizes/10k_tx_4_per_subtree-4 1.256m 1.261m ~ 1.000
SubtreeSizes/10k_tx_16_per_subtree-4 296.6µ 301.7µ ~ 0.700
SubtreeSizes/10k_tx_64_per_subtree-4 70.74µ 72.28µ ~ 0.400
SubtreeSizes/10k_tx_256_per_subtree-4 17.92µ 17.74µ ~ 0.100
SubtreeSizes/10k_tx_512_per_subtree-4 8.791µ 8.897µ ~ 0.200
SubtreeSizes/10k_tx_1024_per_subtree-4 4.400µ 4.385µ ~ 0.100
SubtreeSizes/10k_tx_2k_per_subtree-4 2.179µ 2.186µ ~ 1.000
BlockSizeScaling/10k_tx_64_per_subtree-4 68.81µ 70.01µ ~ 0.400
BlockSizeScaling/10k_tx_256_per_subtree-4 17.42µ 17.62µ ~ 0.700
BlockSizeScaling/10k_tx_1024_per_subtree-4 4.344µ 4.383µ ~ 1.000
BlockSizeScaling/50k_tx_64_per_subtree-4 366.3µ 370.4µ ~ 1.000
BlockSizeScaling/50k_tx_256_per_subtree-4 88.12µ 87.04µ ~ 0.200
BlockSizeScaling/50k_tx_1024_per_subtree-4 21.53µ 21.65µ ~ 1.000
SubtreeAllocations/small_subtrees_exists_check-4 149.3µ 148.6µ ~ 1.000
SubtreeAllocations/small_subtrees_data_fetch-4 158.2µ 162.9µ ~ 0.100
SubtreeAllocations/small_subtrees_full_validation-4 308.4µ 309.9µ ~ 0.400
SubtreeAllocations/medium_subtrees_exists_check-4 8.827µ 8.835µ ~ 0.700
SubtreeAllocations/medium_subtrees_data_fetch-4 9.450µ 9.287µ ~ 0.100
SubtreeAllocations/medium_subtrees_full_validation-4 17.75µ 17.21µ ~ 0.200
SubtreeAllocations/large_subtrees_exists_check-4 2.080µ 2.075µ ~ 0.700
SubtreeAllocations/large_subtrees_data_fetch-4 2.251µ 2.219µ ~ 0.100
SubtreeAllocations/large_subtrees_full_validation-4 4.353µ 4.295µ ~ 0.700
StoreBlock_Sequential/BelowCSVHeight-4 335.8µ 336.8µ ~ 1.000
StoreBlock_Sequential/AboveCSVHeight-4 338.9µ 337.1µ ~ 0.200
GetUtxoHashes-4 266.5n 268.9n ~ 0.700
GetUtxoHashes_ManyOutputs-4 45.36µ 43.26µ ~ 0.100
_NewMetaDataFromBytes-4 228.1n 229.0n ~ 1.000
_Bytes-4 401.3n 399.4n ~ 1.000
_MetaBytes-4 136.5n 137.4n ~ 0.200

Threshold: >10% with p < 0.05 | Generated: 2026-06-03 10:52 UTC

@liam liam requested review from freemans13, ordishs and sugh01 June 3, 2026 14:49

@ordishs ordishs left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approve. Correct fix for a genuine, broadly-impacting data-loss bug on graceful Stop().

The safety argument holds: Stop() orders shuttingDown → close(ch) → publishWg.Wait() → closed.Store(true) → client.Flush()/Close(), so closed is only true after the worker has returned. Narrowing flushBuffered and the worker loop gate from closed || shuttingDown to closed is therefore provably safe — the final drain always produces against a live client and the trailing Flush() guarantees delivery. The runProducerWorker extraction is behaviour-preserving (only the two gate narrowings differ), and the produceHook-based test pins the exact regression (0/25 → 25/25). Verified TestAsyncProducer_DrainsBufferedBatchOnStop passes locally under -race.

Non-blocking: the closed guards in the worker loop and flushBuffered are effectively dead in the single-Stop lifecycle (closed is set only after the worker exits) — harmless defensive guards for a restart race; a one-line note would preempt reader confusion.

@liam liam merged commit 72e4a17 into bsv-blockchain:main Jun 3, 2026
46 of 47 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants