Commit 992f0df

Merge pull request #11149 from filimonov/kafka_reschedule
Fixed reschedule issue in Kafka
2 parents: 3399e57 + c73b837

4 files changed: 64 additions & 9 deletions


src/Storages/Kafka/KafkaBlockInputStream.h

Lines changed: 1 addition & 0 deletions

@@ -25,6 +25,7 @@ class KafkaBlockInputStream : public IBlockInputStream
     void readSuffixImpl() override;

     void commit();
+    bool isStalled() const { return buffer->isStalled(); }

 private:
     StorageKafka & storage;

src/Storages/Kafka/ReadBufferFromKafkaConsumer.h

Lines changed: 1 addition & 0 deletions

@@ -38,6 +38,7 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer

     bool hasMorePolledMessages() const;
     bool polledDataUnusable() const { return (was_stopped || rebalance_happened); }
+    bool isStalled() const { return stalled; }

     void storeLastReadMessageOffset();
     void resetToLastCommitted(const char * msg);
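Both header changes only expose an existing `stalled` flag upward: `KafkaBlockInputStream::isStalled()` delegates to the buffer's `isStalled()`. The consumer's .cpp, where the flag is actually maintained, is not part of this commit, so the following is a minimal hypothetical sketch of the idea — the names `StallAwareConsumer` and `pollBatch` are invented for illustration, not ClickHouse API. A poll that yields nothing within its timeout marks the buffer stalled, so callers can back off instead of spinning.

```cpp
#include <chrono>
#include <string>
#include <vector>

// Hypothetical sketch, not the real ReadBufferFromKafkaConsumer: a poll that
// returns no messages within the timeout sets `stalled`, which isStalled()
// then exposes to the block stream above it.
class StallAwareConsumer
{
public:
    bool isStalled() const { return stalled; }

    bool poll()
    {
        auto messages = pollBatch(poll_timeout);
        if (messages.empty())
        {
            stalled = true; // nothing arrived in time: report a stall upward
            return false;
        }
        stalled = false;
        // ... hand the polled messages to the decoding buffer ...
        return true;
    }

private:
    // Stand-in for the actual librdkafka/cppkafka poll call.
    std::vector<std::string> pollBatch(std::chrono::milliseconds /*timeout*/) { return {}; }

    std::chrono::milliseconds poll_timeout{5000};
    bool stalled = false;
};

int main()
{
    StallAwareConsumer consumer;
    consumer.poll();                     // the stand-in poll yields nothing...
    return consumer.isStalled() ? 0 : 1; // ...so the buffer reports a stall
}
```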

src/Storages/Kafka/StorageKafka.cpp

Lines changed: 23 additions & 8 deletions

@@ -51,6 +51,7 @@ namespace
 {
     const auto RESCHEDULE_MS = 500;
     const auto CLEANUP_TIMEOUT_MS = 3000;
+    const auto MAX_THREAD_WORK_DURATION_MS = 60000; // once per minute leave do reschedule (we can't lock threads in pool forever)

     /// Configuration prefix
     const String CONFIG_PREFIX = "kafka";
@@ -386,6 +387,8 @@ void StorageKafka::threadFunc()
     size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
     if (dependencies_count)
     {
+        auto start_time = std::chrono::steady_clock::now();
+
         // Keep streaming as long as there are attached views and streaming is not cancelled
         while (!stream_cancelled && num_created_consumers > 0)
         {
@@ -394,9 +397,21 @@ void StorageKafka::threadFunc()

             LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);

-            // Reschedule if not limited
-            if (!streamToViews())
+            // Exit the loop & reschedule if some stream stalled
+            auto some_stream_is_stalled = streamToViews();
+            if (some_stream_is_stalled)
+            {
+                LOG_TRACE(log, "Stream(s) stalled. Reschedule.");
+                break;
+            }
+
+            auto ts = std::chrono::steady_clock::now();
+            auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time);
+            if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
+            {
+                LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
                 break;
+            }
         }
     }
 }
@@ -459,15 +474,15 @@ bool StorageKafka::streamToViews()
     // It will be cancelled on underlying layer (kafka buffer)
     std::atomic<bool> stub = {false};
     copyData(*in, *block_io.out, &stub);
+
+    bool some_stream_is_stalled = false;
     for (auto & stream : streams)
+    {
+        some_stream_is_stalled = some_stream_is_stalled || stream->as<KafkaBlockInputStream>()->isStalled();
         stream->as<KafkaBlockInputStream>()->commit();
+    }

-    // Check whether the limits were applied during query execution
-    bool limits_applied = false;
-    const BlockStreamProfileInfo & info = in->getProfileInfo();
-    limits_applied = info.hasAppliedLimit();
-
-    return limits_applied;
+    return some_stream_is_stalled;
 }

 void registerStorageKafka(StorageFactory & factory)
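Taken together, the two break conditions implement a cooperative pattern for background tasks in a shared thread pool: keep streaming while data keeps arriving, but give the thread back either when a consumer stalls or after roughly a minute of continuous work, letting the scheduler re-run the task later. Below is a hedged, self-contained distillation of that loop; `streamOnce()` is a stand-in for `streamToViews()` that merely simulates work, not ClickHouse API.

```cpp
#include <chrono>
#include <iostream>
#include <thread>

// Distilled from the StorageKafka::threadFunc() change above: keep streaming
// while there is work, but exit the loop (so the task gets rescheduled) either
// when a stream stalls or once the thread has held its pool slot longer than
// MAX_THREAD_WORK_DURATION_MS.
constexpr auto MAX_THREAD_WORK_DURATION_MS = 60000;

bool streamOnce()
{
    static int calls = 0;
    std::this_thread::sleep_for(std::chrono::milliseconds(10)); // pretend to stream one block
    return ++calls >= 4; // true means "some stream stalled"
}

int main()
{
    const auto start_time = std::chrono::steady_clock::now();
    while (true)
    {
        if (streamOnce())
        {
            std::cout << "Stream(s) stalled. Reschedule.\n";
            break;
        }
        const auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start_time);
        if (elapsed.count() > MAX_THREAD_WORK_DURATION_MS)
        {
            std::cout << "Thread work duration limit exceeded. Reschedule.\n";
            break;
        }
    }
}
```

Before this change, the loop keyed off whether query limits had been applied during streaming, which — per the test added below — made the task break out and reschedule (a RESCHEDULE_MS = 500 ms pause) between blocks even when more messages were already queued.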

tests/integration/test_storage_kafka/test.py

Lines changed: 39 additions & 1 deletion

@@ -1017,7 +1017,10 @@ def test_kafka_flush_by_block_size(kafka_cluster):

     time.sleep(1)

-    result = instance.query('SELECT count() FROM test.view')
+    # TODO: due to https://github.com/ClickHouse/ClickHouse/issues/11216
+    # second flush happens earlier than expected, so we have 2 parts here instead of one
+    # flush by block size works correctly, so the feature checked by the test is working correctly
+    result = instance.query("SELECT count() FROM test.view WHERE _part='all_1_1_0'")
     # print(result)

     # kafka_cluster.open_bash_shell('instance')
@@ -1390,6 +1393,41 @@ def produce():
     assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(i[0]-1)), 'Missing data!'


+
+@pytest.mark.timeout(120)
+def test_bad_reschedule(kafka_cluster):
+    messages = [json.dumps({'key': j+1, 'value': j+1}) for j in range(20000)]
+    kafka_produce('test_bad_reschedule', messages)
+
+    instance.query('''
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'test_bad_reschedule',
+                     kafka_group_name = 'test_bad_reschedule',
+                     kafka_format = 'JSONEachRow',
+                     kafka_max_block_size = 1000;
+
+        CREATE MATERIALIZED VIEW test.destination Engine=Log AS
+        SELECT
+            key,
+            now() as consume_ts,
+            value,
+            _topic,
+            _key,
+            _offset,
+            _partition,
+            _timestamp
+        FROM test.kafka;
+        ''')
+
+    while int(instance.query("SELECT count() FROM test.destination")) < 20000:
+        print("Waiting for consume")
+        time.sleep(1)
+
+    assert int(instance.query("SELECT max(consume_ts) - min(consume_ts) FROM test.destination")) < 8
+
+
 @pytest.mark.timeout(1200)
 def test_kafka_duplicates_when_commit_failed(kafka_cluster):
     messages = [json.dumps({'key': j+1, 'value': 'x' * 300}) for j in range(22)]
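The `< 8` second bound in the new test's assertion follows from arithmetic over constants visible in this commit. A small illustration (not part of the commit) of why the pre-fix behaviour trips it:

```cpp
#include <iostream>

// Worst-case pre-fix timing for test_bad_reschedule: 20000 messages consumed
// in blocks of kafka_max_block_size = 1000 gives ~20 blocks. If each block is
// followed by a full RESCHEDULE_MS = 500 ms reschedule pause, consumption is
// stretched over at least 10 s, so the spread of consume_ts fails the < 8 s
// assertion before the fix and passes comfortably after it.
int main()
{
    const int messages = 20000;
    const int block_size = 1000;
    const double reschedule_s = 0.5;
    std::cout << "worst-case reschedule overhead: "
              << (messages / block_size) * reschedule_s << " s\n"; // prints 10 s
}
```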
