
Better settings for Kafka #11388

Merged
abyss7 merged 9 commits into ClickHouse:master from filimonov:kafka_better_settings
Jun 6, 2020

Conversation

@filimonov (Contributor) commented Jun 2, 2020

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Support for all format settings in Kafka; expose some settings on the table level; adjust the defaults for better performance.

Detailed description / Documentation draft:
Now it is possible to use any setting related to format parsing during the creation of a Kafka table.
Sample:

CREATE TABLE test.kafka (a UInt64, b String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'issue4116',
kafka_group_name = 'issue4116',
kafka_format = 'CSV',
kafka_row_delimiter = '\\n',
format_csv_delimiter = '|';

These settings were changed/added:

  • kafka_max_block_size: Number of rows collected across poll(s) before flushing data from Kafka. The default changed to min_insert_block_size / kafka_num_consumers (for a single consumer it is 1048576). Before, max_block_size (65536) was used, which was suboptimal because it led to too-frequent commits and too-small insert blocks in the target table. See Fixed reschedule issue in Kafka #11149 (comment) and Kafka fixes part2 #8917 (comment).
  • kafka_poll_max_batch_size: Maximum number of messages to be polled in a single Kafka poll (the default is now min(max_block_size, kafka_max_block_size), normally 65536). Can now be configured separately; before, kafka_max_block_size was used. It is better to do smaller polls, to avoid bigger allocations and to give librdkafka a chance to refill the queue while we process the polled batch.
  • kafka_poll_timeout_ms: Timeout for a single poll from Kafka (the default is taken from stream_poll_timeout_ms = 500 ms). New setting; can now be configured per table (before, stream_poll_timeout_ms was always used).
  • kafka_flush_interval_ms: Timeout for flushing data from Kafka (the default is taken from stream_flush_interval_ms = 7500 ms). New setting; can now be configured per table (before, stream_flush_interval_ms was always used).
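For illustration, the per-table settings above could be combined in a single table definition. This is a sketch only: the broker address, topic, and group names are hypothetical, and the values simply restate the defaults described above.

CREATE TABLE test.kafka_tuned (a UInt64, b String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',       -- hypothetical broker
         kafka_topic_list = 'some_topic',          -- hypothetical topic
         kafka_group_name = 'some_group',          -- hypothetical consumer group
         kafka_format = 'CSV',
         kafka_max_block_size = 1048576,           -- rows per insert block
         kafka_poll_max_batch_size = 65536,        -- messages per single poll
         kafka_poll_timeout_ms = 500,              -- timeout of a single poll
         kafka_flush_interval_ms = 7500;           -- max time to build one block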

To understand the relation between those settings, check the following pseudo-code illustrating how ClickHouse consumes data from Kafka:

insert_block = [];
timer.start();
while (true)
{
    messages = kafka.batch_poll(kafka_poll_max_batch_size, kafka_poll_timeout_ms);
    insert_block.add_rows(parse(messages));
    if (insert_block.rows >= kafka_max_block_size)          // block is big enough
        break;
    if (timer.time_since_start >= kafka_flush_interval_ms)  // waited long enough
        break;
}
put_to_target_tables(insert_block);
kafka.commit();

Librdkafka settings adjusted:

  • client.software.name - now filled as "ClickHouse"
  • client.software.version - now filled with clickhouse version, for example "v20.5.1.1-prestable"
  • queued.min.messages - the default (100000) is increased to kafka_max_block_size (but never decreased), which prevents the librdkafka queue from draining too quickly while a single insert block is being built. Improves performance significantly, but may lead to bigger memory consumption.
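Other librdkafka parameters can still be overridden globally via the server configuration. A sketch, assuming the standard <kafka> section of config.xml (dots in librdkafka option names are replaced by underscores):

```xml
<!-- config.xml fragment: global librdkafka overrides (illustrative values) -->
<kafka>
    <!-- queued.min.messages: raise the prefetch queue explicitly -->
    <queued_min_messages>1048576</queued_min_messages>
</kafka>
```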

Extra:

  • code around Kafka settings cleaned up a bit,
  • got rid of excessive Context copying

Closes #11308
Closes #4116
Closes #8056

@blinkov blinkov added the pr-improvement Pull request with some product improvements label Jun 2, 2020
@qoega qoega added the doc-alert label Jun 3, 2020
@filimonov filimonov force-pushed the kafka_better_settings branch from 3973762 to 94261b9 June 3, 2020 17:07
@filimonov filimonov marked this pull request as ready for review June 3, 2020 17:08
@filimonov filimonov changed the title [wip] Better settings for Kafka Better settings for Kafka Jun 3, 2020
@filimonov (Contributor, Author) commented Jun 4, 2020

It seems like queued.min.messages = kafka_max_block_size allows achieving very good performance.

Also it seems that issue #11216 affects performance (when we drain the librdkafka internal queue, it starts reporting the 'stalled' status).

@abyss7 abyss7 self-assigned this Jun 4, 2020
@filimonov (Contributor, Author) commented Jun 4, 2020

Benchmark results: consume speed (measured by https://github.com/filimonov/ch-kafka-consume-perftest):

This PR (v20.5.1.3632, 26d93fd):

  • all settings default: ~466000 rows/sec
  • kafka_num_consumers=4: ~1243000 rows/sec

Base master commit:

  • all settings default: ~180000 rows/sec
  • kafka_max_block_size=1048576: ~342000 rows/sec
  • kafka_max_block_size=1048576, queued.min.messages=1048576: ~436000 rows/sec
  • kafka_max_block_size=1048576, queued.min.messages=1048576, kafka_num_consumers=4: ~1157000 rows/sec

With default settings: 2.58x faster <-- that was the main goal of changing the defaults.
This PR with default settings vs. tuned master: 5-40% faster (depending on the tuning used).

Comment thread tests/integration/test_storage_kafka/test.py
@abyss7 (Contributor) commented Jun 6, 2020

Yandex check failure is unrelated

Comment thread src/Storages/Kafka/StorageKafka.cpp
