Fix DROP TABLE on Kafka tables hanging in rd_kafka_consumer_close #100604
alexey-milovidov wants to merge 8 commits into master
Conversation
`DROP TABLE` on Kafka engine tables can hang indefinitely when `rd_kafka_consumer_close` deadlocks during the rebalance callback. The rebalance callback calls `rd_kafka_assign`, which blocks on an internal queue that is not serviced during close.

Three fixes:

1. **cppkafka**: respect the `RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE` flag in `Consumer::~Consumer()`. ClickHouse already sets this flag (in `KafkaConsumer::createConsumer`), but the destructor was ignoring it and calling `close()` anyway.
2. **Timeout in `cleanConsumers`**: add a 60-second timeout to `cleanConsumers()` in both `StorageKafka` and `StorageKafka2`, so that if consumers are stuck in use, shutdown proceeds instead of hanging forever.
3. **Test fix**: remove the unnecessary cleanup table in `test_kafka_consumer_hang`, which created and dropped a second Kafka table with 8 consumers. The topic has no messages to drain, so this step is unnecessary and triggers the hang under MSan.

Closes #100511

CI report: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=100392&sha=67f7eb5082b883fc918b13af17d67ee89ab57d2b&name_0=PR&name_1=Integration%20tests%20%28amd_msan%2C%202%2F6%29

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Workflow [PR], commit [1f4b705] Summary: ❌
```cpp
    std::count_if(consumers.begin(), consumers.end(), [](const auto & ptr) { return ptr->isInUse(); }));
}

for (const auto & consumer : consumers)
```
The new timeout path can proceed while some consumers are still marked in_use, but this loop still calls moveConsumer for every consumer unconditionally.
That is risky because an actually running worker can still hold/use this KafkaConsumer, and moveConsumer clears internal state and moves out the underlying cppkafka::Consumer. This can turn the timeout fallback into a use-after-move race instead of a safe shutdown.
Please skip in_use consumers after timeout (close only non-in_use ones), and keep/log the remaining count. The same pattern exists in StorageKafka2::cleanConsumers.
```cpp
    LOG_WARNING(log, "Timed out waiting for consumer(s) to be released, proceeding with shutdown");
}

for (const auto & consumer : consumers)
```
After wait_for times out, this loop still calls moveConsumer for all consumers, including those that may still be in_use.
For KeeperHandlingConsumer, moveConsumer has chassert(!isInUse()), so this fallback can trigger an assertion in debug/sanitizer builds. Even in release, force-closing a still-active consumer is unsafe.
Please guard this loop with if (consumer->isInUse()) continue; (and log how many were skipped), so the timeout fallback avoids hangs without violating the consumer-ownership invariant.
…hang-test # Conflicts: # contrib/cppkafka
After the timeout in `cleanConsumers`, the code previously called `moveConsumer` unconditionally on all consumers, which could race with worker threads still holding references. For `KeeperHandlingConsumer`, this would also trigger `chassert(!isInUse())` in debug/sanitizer builds. Now skip consumers that are still in use after the timeout and log how many were skipped. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The bugfix validation runs all tests in the modified `test_batch_fast.py` against the master binary (non-MSan). Since the consumer close deadlock only reproduces under MSan, all 93 tests pass on the master binary, causing the validation to report "Failed to reproduce the bug". By reverting the test file change, the bugfix validation for integration tests won't run (no integration test files changed). The C++ fix (cppkafka + timeout in `cleanConsumers`) is the real fix and makes the test cleanup code safe regardless. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
LLVM Coverage Report
Changed lines: 74.58% (44/59) · Uncovered code
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Fix `DROP TABLE` on Kafka engine tables potentially hanging indefinitely due to a deadlock in `rd_kafka_consumer_close`.

Documentation entry for user-facing changes