Implement queue mode for MergeTree#59827
Implement queue mode for MergeTree#59827Michicosun wants to merge 57 commits intoClickHouse:masterfrom
Conversation
…/42990/block_row_column
How this is different from |
|
This is an automated comment for commit 11f2a34 with description of existing statuses. It's updated for the latest CI running ❌ Click here to open a full report in a separate page
Successful checks
|
Combination of columns (_queue_block_number, _queue_block_offset) gives a strict order over table rows inside each partition. _block_number by itself only gives strict order over blocks, which is not enough to express the cursor in any case, but if to add offset inside the block as second order column, the combination will. _queue_block_number is semantically equal to _block_number, but:
|
Does it mean that for streaming queries, you need to have specific Each new insert create new part, so in general data with latest |
Yes, the first idea in issue was to create some column like sequence-id, but it does not link to block numbers synchronization, so it is better to use block number in data ordering. Primary key and Ordering key will contain
In streaming queries should be possible to create stream from any offset in queue - it is like to cut some prefix and read all suffix as stream, but without this columns in index it will be complicated. |
|
@UnamedRus, it is for persistent queues, like Kafka, and they have to support efficient reading from an arbitrary offset - that's why the queue mode requires this offset in the primary key. |
…/42990/block_row_column
|
Ok, i see. |
| auto cols = block.block.getColumns(); | ||
| size_t rows = block.block.rows(); | ||
|
|
||
| SipHash hash; | ||
| for (size_t j = 0; j < rows; ++j) | ||
| { | ||
| for (const auto & col : cols) | ||
| col->updateHashWithValue(j, hash); | ||
| } |
There was a problem hiding this comment.
This method does the same
Lines 787 to 792 in c1754d3
There was a problem hiding this comment.
changed the logic of these methods, added a column filter.
| auto& block_number_lock = lock_holder != nullptr ? lock_holder->block_number_lock : current_try_block_number_lock; | ||
| chassert(block_number_lock.has_value()); |
There was a problem hiding this comment.
Seems a bit useless, it is not used anywhere, but assertion was already performed in line 874.
There was a problem hiding this comment.
I think it should be leaved, this check helps to understand that in any execution above, lock is initialized.
Removed assert in 874
…/42990/block_row_column
…/42990/block_row_column
…/42990/block_row_column
…/42990/block_row_column
…/42990/block_row_column
…/42990/block_row_column
| if (!partsContainSameProjections(left, right, disable_reason)) | ||
| return false; | ||
|
|
||
| /// If storage in queue mode then block number allocation is performed not under parts lock. |
There was a problem hiding this comment.
Why only in queue mode? Any block number allocation is performed without parts lock
There was a problem hiding this comment.
For regular MergeTree i think it is not true, as i see final allocation of block number is performed here: https://github.com/ClickHouse/ClickHouse/blob/master/src/Storages/MergeTree/MergeTreeSink.cpp#L174.
|
|
||
| CREATE TABLE queue_mode_test(a UInt64, b UInt64) ENGINE=MergeTree() ORDER BY tuple() PARTITION BY a SETTINGS queue_mode=1, index_granularity=1; | ||
|
|
||
| INSERT INTO queue_mode_test (*) SELECT 0, number FROM numbers(5); -- _queue_block_number = 1 |
There was a problem hiding this comment.
let's add a test with async inserts
…/42990/block_row_column
…/42990/block_row_column
…/42990/block_row_column
…/42990/block_row_column
|
Dear @kssenii, this PR hasn't been updated for a while. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself. |
|
Dear @kssenii, this PR hasn't been updated for a while. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself. |
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Relates to: #42990
Implemented queue feature for [Replicated]MergeTree storage engines. With queue flag enabled 2 materialized columns will be added to table: _queue_block_number, _queue_block_offset, their combination introduces a global order in the rows of the table, which in the future will help to support cursors for streaming queries.
Example:
Documentation entry for user-facing changes