"Top K" style query with simple aggregation function reads many rows and uses a lot of memory #75098
Description
Company or project name
No response
Describe the situation
Writing queries like

```sql
select trace_id, max(start_time) from traces group by 1 order by 2 desc limit 10 settings max_threads=1
```

or

```sql
select trace_id, start_time from traces order by 2 desc limit 1 by trace_id limit 10 settings max_threads=1
```
causes a full table scan (even though start_time is part of the primary key and has a MinMax index), and uses memory that grows as O(number of rows) rather than O(limit).
I don't think #55518 is likely to solve it since there are no extra columns to delay here -- we need both the trace_id and the start_time to compute the result.
It would be nice if the query execution knew that max is a simple (stateless) aggregation function, so when a trace ID is removed from the top 10 because a more recent trace has been found, it can be dropped from memory.
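To make the idea concrete, here is a minimal Python sketch of the bounded-memory aggregation I have in mind (the function name and row representation are mine, not ClickHouse's). Because max is stateless, a trace that is evicted from the top-K table can safely re-enter later if a larger value for it shows up, so the state never exceeds K entries:

```python
def top_k_by_max(rows, k):
    """Top k trace_ids by max(start_time), using O(k) memory.

    rows: iterable of (trace_id, start_time) pairs, in any order.
    """
    top = {}  # trace_id -> running max(start_time); at most k entries
    for trace_id, t in rows:
        if trace_id in top:
            if t > top[trace_id]:
                top[trace_id] = t
        elif len(top) < k:
            top[trace_id] = t
        else:
            # Evict the current minimum only if the new value beats it.
            # An evicted trace can re-enter later with a larger value,
            # so the final answer is still exact for max().
            victim = min(top, key=top.get)
            if t > top[victim]:
                del top[victim]
                top[trace_id] = t
    return sorted(top.items(), key=lambda kv: -kv[1])
```

This is exact for max (not a heavy-hitters approximation): any trace rejected or evicted is, at that moment, dominated by k traces whose true maxes are at least as large.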
How to reproduce
Server and client are version 24.12.3.47 (official build).
The traces table is created as:

```sql
CREATE TABLE traces
(
    `trace_id` UInt32,
    `start_time` DateTime64
)
ENGINE = MergeTree
ORDER BY start_time
SETTINGS index_granularity = 1
AS SELECT
    number % 111 AS trace_id,
    CAST(number, 'DateTime64') AS start_time
FROM numbers(100_000_000)
```
The grouping select query above reads all 100M rows. The LIMIT 1 BY trace_id query reads only 7M rows, but that still seems like far more than necessary, given that index_granularity is 1, the table is sorted, and there is a MinMax index on start_time.
See also fiddle. I used only 1M rows instead of 100M to ease the load.
Expected performance
My idea of the optimal execution for this query is:
- Start scanning the table in reverse order (since the table ORDER BY is start_time, and the query ORDER BY is start_time DESC).
- As we pass over the table, keep only the top 10 trace IDs and their start times in memory. This is possible because max is a stateless aggregation function, and when a trace drops out of the top-10 table, we can forget about it if/until it returns again.
- Since we're reading the table in reverse order, at some point every remaining granule will have a max(start_time) that is less than the smallest result in the top-10 table. Stop the query there and return the results.
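The steps above can be sketched in a few lines of Python (names are mine, for illustration only). When the rows arrive in descending start_time order, the first occurrence of each trace_id already carries its max, so the scan can stop as soon as the table is full:

```python
def top_k_reverse_scan(rows_desc, k):
    """Top k trace_ids by max(start_time) over a reverse-ordered scan.

    rows_desc: (trace_id, start_time) pairs sorted by start_time
    descending, mimicking a reverse primary-key scan.
    """
    top = {}
    for trace_id, t in rows_desc:
        if len(top) == k:
            # Every remaining row has a smaller start_time than anything
            # already in the table: early termination.
            break
        if trace_id not in top:
            # First sighting in descending order == this trace's max.
            top[trace_id] = t
    return sorted(top.items(), key=lambda kv: -kv[1])
```

The early break is what turns the O(rows) scan into a read of (at most) the first k distinct trace IDs.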
If the table is read with some parallelism, you can also:
- Have each parallel thread keep its own table of the same size (as big as the limit).
- When all readers are done, merge the tables together, very much like a distributed LIMIT query.
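The merge step is straightforward to sketch (again, a hypothetical helper, not ClickHouse code): take the max per trace_id across the per-thread tables, then cut the result back down to the limit. This is exact because a trace's global max is attained in some partition, and if it didn't make that partition's top-k, k traces with larger maxes already did:

```python
def merge_partial_tops(partials, k):
    """Merge per-thread top-k tables into the global top k.

    partials: list of dicts, each mapping trace_id -> partition-local
    max(start_time), each already trimmed to k entries by its reader.
    """
    merged = {}
    for part in partials:
        for trace_id, t in part.items():
            if t > merged.get(trace_id, float("-inf")):
                merged[trace_id] = t  # combine with max(), the same function
    return sorted(merged.items(), key=lambda kv: -kv[1])[:k]
```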
Additional context
In my real example my table is not actually sorted by start_time. The table looks more like:

```sql
CREATE TABLE traces
(
    service LowCardinality(String),
    name LowCardinality(String),
    start_time DateTime64(6)
)
ENGINE = MergeTree
ORDER BY (service, name, toDateTime(start_time), ...)
```
So I understand it isn't as efficient to read this table in reverse order. However, because I'm sorting by toDateTime(start_time) and my service and name columns are low cardinality, it would still be nice if, within each unique (service, name) pair, we scanned at most LIMIT unique trace IDs, stopped reading, and finally merged the results together, as in the parallel-readers explanation above.
I'm eager to learn more about this kind of use case, or if I'm doing something wrong here! I'd like to make this observability platform as efficient as possible, and I'm willing to change the way I create this table if I need to.