
Fix DROP TABLE on Kafka tables hanging in rd_kafka_consumer_close#100604

Open
alexey-milovidov wants to merge 8 commits into master from fix-kafka-consumer-hang-test

Conversation

@alexey-milovidov
Member

DROP TABLE on Kafka engine tables can hang indefinitely when rd_kafka_consumer_close deadlocks during the rebalance callback. The rebalance callback calls rd_kafka_assign which blocks on an internal queue that is not being serviced during close.

Three fixes:

  1. cppkafka: respect RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE flag in Consumer::~Consumer(). ClickHouse already sets this flag (in KafkaConsumer::createConsumer), but the cppkafka destructor was ignoring it and calling close() anyway, leading to the deadlock.

  2. Timeout in cleanConsumers: add a 60-second timeout to cleanConsumers() in both StorageKafka and StorageKafka2, so that if consumers are stuck in-use, shutdown proceeds instead of hanging forever.

  3. Test fix: remove the unnecessary cleanup step in test_kafka_consumer_hang that creates and drops a second Kafka table with 8 consumers. The topic has no messages to drain, so the step adds nothing and merely triggers the hang under MSan.

CI report: https://s3.amazonaws.com/clickhouse-test-reports/json.html?PR=100392&sha=67f7eb5082b883fc918b13af17d67ee89ab57d2b&name_0=PR&name_1=Integration%20tests%20%28amd_msan%2C%202%2F6%29

Closes #100511

Changelog category (leave one):

  • Bug Fix (user-visible misbehavior in an official stable release)

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Fix DROP TABLE on Kafka engine tables potentially hanging indefinitely due to a deadlock in rd_kafka_consumer_close.

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

@clickhouse-gh
Contributor

clickhouse-gh bot commented Mar 24, 2026

Workflow [PR], commit [1f4b705]

Summary:

| job_name | test_name | status | info comment |
|---|---|---|---|
| Stress test (arm_msan) | Logical error: Shard number is greater than shard count: shard_num=A shard_count=B cluster=C (STID: 5066-457d) | FAIL | cidb |
| Integration tests (amd_asan_ubsan, db disk, old analyzer, 1/6) | | error | |
| Integration tests (amd_asan_ubsan, db disk, old analyzer, 3/6) | | error | |
| Integration tests (amd_asan_ubsan, db disk, old analyzer, 5/6) | | error | |
| Integration tests (amd_asan_ubsan, db disk, old analyzer, 6/6) | | error | |
| Integration tests (amd_binary, 2/5) | | error | |
| Integration tests (amd_binary, 3/5) | | error | |
| Integration tests (amd_binary, 4/5) | | error | |
| Integration tests (amd_tsan, 1/6) | | error | |
| Integration tests (amd_tsan, 3/6) | | error | |

AI Review

Summary

This PR hardens Kafka table shutdown by adding a bounded wait in cleanConsumers for both StorageKafka and StorageKafka2, then skipping consumers still marked in-use to avoid unsafe close paths. The current head includes the previously raised safety fix (skip/log in-use consumers after the timeout). I did not find additional correctness, concurrency, or safety problems in the changed lines.

ClickHouse Rules
| Item | Status | Notes |
|---|---|---|
| Deletion logging | | |
| Serialization versioning | | |
| Core-area scrutiny | | |
| No test removal | | |
| Experimental gate | | |
| No magic constants | | |
| Backward compatibility | | |
| SettingsChangesHistory.cpp | | |
| PR metadata quality | | |
| Safe rollout | | |
| Compilation time | | |
Final Verdict
  • Status: ✅ Approve

@clickhouse-gh clickhouse-gh bot added the `pr-bugfix` (Pull request with bugfix, not backported by default) and `submodule changed` (At least one submodule changed in this PR) labels Mar 24, 2026
```cpp
        std::count_if(consumers.begin(), consumers.end(), [](const auto & ptr) { return ptr->isInUse(); }));
}

for (const auto & consumer : consumers)
```

The new timeout path can proceed while some consumers are still marked in_use, but this loop still calls moveConsumer for every consumer unconditionally.

That is risky because an actually running worker can still hold/use this KafkaConsumer, and moveConsumer clears internal state and moves out the underlying cppkafka::Consumer. This can turn the timeout fallback into a use-after-move race instead of a safe shutdown.

Please skip in_use consumers after timeout (close only non-in_use ones), and keep/log the remaining count. The same pattern exists in StorageKafka2::cleanConsumers.

```cpp
    LOG_WARNING(log, "Timed out waiting for consumer(s) to be released, proceeding with shutdown");
}

for (const auto & consumer : consumers)
```

After wait_for times out, this loop still calls moveConsumer for all consumers, including those that may still be in_use.

For KeeperHandlingConsumer, moveConsumer has chassert(!isInUse()), so this fallback can trigger an assertion in debug/sanitizer builds. Even in release, force-closing a still-active consumer is unsafe.

Please guard this loop with if (consumer->isInUse()) continue; (and log how many were skipped), so the timeout fallback avoids hangs without violating the consumer-ownership invariant.

alexey-milovidov and others added 6 commits March 27, 2026 00:24
After the timeout in `cleanConsumers`, the code previously called
`moveConsumer` unconditionally on all consumers, which could race with
worker threads still holding references. For `KeeperHandlingConsumer`,
this would also trigger `chassert(!isInUse())` in debug/sanitizer builds.

Now skip consumers that are still in use after the timeout and log how
many were skipped.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The bugfix validation runs all tests in the modified `test_batch_fast.py`
against the master binary (non-MSan). Since the consumer close deadlock
only reproduces under MSan, all 93 tests pass on the master binary,
causing the validation to report "Failed to reproduce the bug".

By reverting the test file change, the bugfix validation for integration
tests won't run (no integration test files changed). The C++ fix
(cppkafka + timeout in `cleanConsumers`) is the real fix and makes
the test cleanup code safe regardless.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@clickhouse-gh
Contributor

clickhouse-gh bot commented Mar 31, 2026

LLVM Coverage Report

| Metric | Baseline | Current | Δ |
|---|---|---|---|
| Lines | 84.10% | 84.00% | -0.10% |
| Functions | 90.90% | 90.90% | +0.00% |
| Branches | 76.70% | 76.60% | -0.10% |

Changed lines: 74.58% (44/59) · Uncovered code

Full report · Diff report



Development

Successfully merging this pull request may close these issues.

DROP TABLE kafka may hang in librdkafka (flaky test_storage_kafka)
