Support rewrite to optimize order by limit #82478
acking-you wants to merge 11 commits into ClickHouse:master
Conversation
(force-pushed from 7c4a818 to db759d5)
Could you please provide a test for it?
Okay, I will do it later.
Another alternative is to use … Also, it makes sense to wrap it in the indexHint function, to only use it in the index lookup.
Thank you very much for your suggestion! In fact, this optimization is not only suitable for large data volumes but also performs well with small data volumes (e.g., limit 10).
done @novikd
@UnamedRus I don't think it's feasible. We still need row offsets to retrieve the top-K rows, and index analysis for …
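The role of row offsets in this rewrite can be illustrated with a small sketch (plain Python, not ClickHouse internals; all names are illustrative): the first pass scans only the ORDER BY column plus a per-row offset to find the top-K offsets, and the second pass fetches the remaining wide columns for just those K rows instead of materializing them for the whole table.

```python
import heapq

def top_k_offsets(sort_column, k):
    # smallest k values of the sort column; return their row offsets
    return [off for _, off in heapq.nsmallest(k, ((v, i) for i, v in enumerate(sort_column)))]

def fetch_rows(table, offsets):
    # gather every column only at the selected offsets
    return [{name: col[off] for name, col in table.items()} for off in offsets]

table = {
    "B": [5, 1, 4, 2, 3],
    "C": ["v5", "v1", "v4", "v2", "v3"],  # stands in for the wide columns
}
offs = top_k_offsets(table["B"], 2)
print(offs)                    # [1, 3]
print(fetch_rows(table, offs))
```

This is the same shape as the rewritten query: the subquery produces the offsets, and the outer `WHERE ... IN` does the narrow fetch.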
Workflow [PR], commit [eb995ef] Summary: ❌
Could it be useful in an inverted projection index to efficiently store granules to check? Maybe there are cases where there would be too much data if you store the individual offsets, and you're willing to spend more time re-checking the indexed predicate upon reading the data rather than getting it from the index? Something like where …
It's more like: if we are going to read most of a granule anyway, wouldn't it be simpler to read it as a whole and just sort more rows? It's a bit of a stretch, but here is an example where this happens (plus, the second subquery is more "expensive" to run).
(force-pushed from 75d4f55 to d257ad7)
Now, by modifying the manual invocation of rewrite in …
But you'll still need to apply filters again. While there might be edge cases that benefit from this approach, ClickHouse already handles IN sets with around a million elements quite efficiently. I'm skeptical there's a real use case for doing top-N over billions :)
The main benefits of the projection index (row-level index) are twofold:
If we only track granules, we can only support the first benefit, and its actual impact can vary significantly depending on workload and data distribution in real-world scenarios.
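The cost gap between a row-level index (exact offsets) and tracking only granules can be sketched as follows. The granule size of 8192 is ClickHouse's default `index_granularity`; the rest is an illustrative model, not the PR's code.

```python
GRANULE = 8192  # ClickHouse default index_granularity

def rows_to_read_row_level(offsets):
    # exact offsets: read only the matching rows
    return len(offsets)

def rows_to_read_granule_level(offsets):
    # one whole granule must be read (and re-filtered) per distinct granule hit
    return len({off // GRANULE for off in offsets}) * GRANULE

hits = [3, 10_000, 10_005, 9_000_000]    # matching row offsets
print(rows_to_read_row_level(hits))       # 4
print(rows_to_read_granule_level(hits))   # 3 granules -> 24576 rows
```

When matches are sparse, the granule-level variant reads orders of magnitude more rows; when matches cluster densely inside a few granules, the two converge, which is the trade-off discussed above.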
(force-pushed from c0e9198 to eb995ef)
An additional question: does it support pushing optimize_read_in_order into a projection? Or will it?
Great point! I think that whether to support the pushdown of … Here is my testing process:

```sql
CREATE TABLE mydata
(
    `A` Int64,
    `B` Int64,
    `C` String,
    projection p (select A, B, _part_offset order by B)
)
ENGINE = MergeTree ORDER BY A
AS SELECT number AS A, 999999 - number AS B,
   if(number between 1000 and 2000, 'x', toString(number)) AS C
FROM numbers(1000000);

insert into mydata SELECT * from mydata where B > 14 limit 20;
insert into mydata values (1000001, -100, 'a');

-- where can be optimized by the projection
select * from mydata where _part_offset + _part_starting_offset in (
    select _part_offset + _part_starting_offset from mydata where B < 10 order by B limit 10
);
-- 10 rows in set. Elapsed: 0.038 sec. Processed 8.77 thousand rows, 88.13 KB (231.96 thousand rows/s., 2.33 MB/s.)
-- Peak memory usage: 472.89 KiB.

-- optimize_read_in_order can be applied to the non-projection sort key
select * from mydata where _part_offset + _part_starting_offset in (
    select _part_offset + _part_starting_offset from mydata order by A limit 10
) settings optimize_read_in_order = 1;
-- 10 rows in set. Elapsed: 0.027 sec. Processed 16.41 thousand rows, 430.40 KB (607.41 thousand rows/s., 15.94 MB/s.)
-- Peak memory usage: 448.11 KiB.

-- Cannot be applied to the projection
select * from mydata where _part_offset + _part_starting_offset in (
    select _part_offset + _part_starting_offset from mydata order by B limit 10
) settings optimize_read_in_order = 1;
```

I understand that as long as the original read process in … The potential concern is: will the read process of …
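What `optimize_read_in_order` exploits in the test above can be sketched outside ClickHouse: when each part is already sorted by the ORDER BY key, `ORDER BY key LIMIT k` becomes a k-way merge that stops after k rows, instead of a full sort of every row. A minimal sketch:

```python
import heapq
from itertools import islice

def top_k_in_order(sorted_parts, k):
    # sorted_parts: iterables, each already sorted ascending by the ORDER BY key;
    # heapq.merge streams them lazily, so we stop after emitting k rows
    return list(islice(heapq.merge(*sorted_parts), k))

parts = [[1, 4, 7, 10], [2, 3, 8], [5, 6, 9]]
print(top_k_in_order(parts, 4))  # [1, 2, 3, 4]
```

This is why ordering by `A` (the table's sort key) benefits while ordering by `B` inside the projection does not yet: the merge only works when the storage order matches the requested order.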
Dear @novikd, this PR hasn't been updated for a while. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself. |
I manually tried this rewrite for a use case where the right-hand side of …
Yes, this is expected. But I understand that doing so is similar to splitting a single SQL query into two executions. The second execution already knows exactly which data needs to be scanned, which is very effective in scenarios where a full table scan would otherwise be required by default, such as the "order by limit" issue resolved in this PR. If you need to avoid involving the CreatingSets operator, the previous ColumnLazy was a good solution. However, it also requires a secondary table-rewriting operation when data is actually read, and it is difficult to make full use of the existing parallel reading pipeline. I haven't thought of a better solution for now.
Does anyone know why this is so much faster than lazy materialization? I'm looking into a logging use case with search queries like this: … At a limit of 500, should I expect this feature to still be ~100x faster (!!!) than ColumnLazy? What are the trade-offs? Also, in the meantime, without this PR merged yet, is there any way to get the correct behavior with a …
When I tested before, this gap was mainly because ColumnLazy was unable to read in parallel when performing the final materialized read. I don't know if the latest version of ClickHouse has solved this problem.
I think if the data itself is ordered by time, there should be no advantage in using delayed materialization, including rewriting the query statement. I think you can try turning off delayed materialization.
I'm curious, can you say more about how this optimization works for Distributed tables? Does it perform a local …
The current implementation for distributed tables is very simple: essentially just a rewrite over each local table. Your guess is correct: it performs a local …
We're implementing a two-phase approach to achieve a proper global ORDER BY with LIMIT.

Phase 1: Collect globally-sorted RowIDs.

```sql
-- Rewritten internally
INSERT INTO _rowid_collector()
SELECT _rowid FROM distributed_table
WHERE condition
ORDER BY sort_col
LIMIT 100 -- This is a GLOBAL limit
```

Here, …

Phase 2: Fetch the exact rows.

```sql
-- Using collected RowIDs
SELECT cols FROM distributed_table
WHERE _rowid IN (...100 exact IDs...)
```

Key difference from a per-shard LIMIT: Phase 1 performs global sorting across all shards (not just locally per shard), ensuring correct top-100 results.

Bonus: once RowIDs are stored in memory, we can support asynchronous fetching for simple queries that can use these RowIDs directly (though not for aggregations that would require re-scanning data).
The improved implementation of lazy materialization in 25.12 shows me about the same speedup.
great job!❤️ |
Summary
Relevant issue: #79645
This PR optimizes `order by limit` based on `where (xx) in subquery`, with the specific description as follows:
For SQL statements with a large LIMIT value, such as `order by limit 100000`, significant performance improvements can be achieved. Below are the test results on the hits dataset, showing a performance increase of nearly 100 times compared to ColumnLazy.
The test version is 25.7.1.1, and the following query results are all hot queries (execute three times and take the best result). The performance comparison is as follows:
- `SELECT * FROM hits ORDER BY EventTime LIMIT 100000`: memory limit exceeded
- `SET query_plan_max_limit...` + original query
- `SELECT * FROM hits WHERE ... IN (subquery)`: 71.017 s vs. 0.784 s (71.017 ÷ 0.784 ≈ 90x)
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
- `RewriteOrderByLimitPass` in Analyzer for pattern recognition and rewriting
- `query_plan_rewrite_order_by_limit`
- `query_plan_max_limit_for_rewrite_order_by_limit`
- `query_plan_min_columns_to_use_rewrite_order_by_limit`

Documentation entry for user-facing changes
Add the capability to rewrite "order by limit" into a subquery based on rowid.
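A hedged sketch of how the three settings listed above might gate the rewrite. The exact semantics here are an assumption inferred only from the setting names, not taken from the PR's code: an on/off switch, a cap on the LIMIT value, and a minimum number of selected columns before the rewrite pays off.

```python
def should_rewrite(enabled, limit, num_selected_columns, max_limit, min_columns):
    # assumed gating: the rewrite helps when many wide columns would
    # otherwise be sorted, and is capped so the IN-set of row ids stays bounded
    return enabled and limit <= max_limit and num_selected_columns >= min_columns

# wide SELECT with a large (but capped) limit: rewrite applies
print(should_rewrite(True, 100_000, 10, max_limit=1_000_000, min_columns=3))  # True
# narrow SELECT: sorting it directly is already cheap, skip the rewrite
print(should_rewrite(True, 100_000, 1, max_limit=1_000_000, min_columns=3))   # False
```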
Known Issues
This optimization is only applicable to non-distributed tables for the following reasons:
- Using `(part_starting_offset + part_offset)` or `(part, part_offset)` to identify a row is only valid in the case of non-distributed tables.

Possible solutions are:
- … `createLocalPlan`.
- Use something like (`ip`, `shard_name`, …) to identify a row in the case of distributed tables.