"Top K" style query with simple aggregation function reads many rows and uses a lot of memory #75098

@EmeraldShift

Description

Company or project name

No response

Describe the situation

Writing queries like

select trace_id, max(start_time) from traces group by 1 order by 2 desc limit 10 settings max_threads=1

or

select trace_id, start_time from traces order by 2 desc limit 1 by trace_id limit 10 settings max_threads=1

causes a full table scan (even if start_time is part of the primary key and has a MinMax index), and uses O(# rows) of memory rather than O(limit).

I don't think #55518 is likely to solve it since there are no extra columns to delay here -- we need both the trace_id and the start_time to compute the result.

It would be nice if the query execution knew that max is a simple (stateless) aggregation function, so when a trace ID is removed from the top 10 because a more recent trace has been found, it can be dropped from memory.

How to reproduce

Server and client are version 24.12.3.47 (official build).

The traces table is created as:

CREATE TABLE traces
(
    `trace_id` UInt32,
    `start_time` DateTime64
)
ENGINE = MergeTree
ORDER BY start_time
SETTINGS index_granularity = 1
AS SELECT
    number % 111 AS trace_id,
    CAST(number, 'DateTime64') AS start_time
FROM numbers(100_000_000)

The grouping SELECT query above reads all 100M rows. The LIMIT 1 BY trace_id query reads only 7M, but that still seems like far more than necessary, given that my index_granularity is 1, the table is sorted, and there is a MinMax index on start_time.

See also fiddle. I used only 1M rows instead of 100M to ease the load.

Expected performance

My idea of the optimal execution for this query is:

  • Start scanning the table in reverse order (since the table ORDER BY is start_time and the query ORDER BY is start_time DESC).
  • While passing over the table, keep only the top 10 trace IDs and their start times in memory. This is possible because max is a stateless aggregation function: when a trace drops out of the top-10 table, we can forget about it until (and unless) it shows up again.
  • Since we're reading the table in reverse order, we will eventually reach a point where every remaining granule has a max(start_time) smaller than the smallest start_time in the top-10 table. Stop the query there and return the results.
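The bookkeeping above can be sketched in Python (a toy model of the scan, not ClickHouse internals; rows_newest_first stands in for the reverse-order read):

```python
def top_k_traces(rows_newest_first, k):
    """rows_newest_first: iterable of (trace_id, start_time) pairs in
    descending start_time order. Memory stays O(k), not O(rows)."""
    best = {}  # trace_id -> max(start_time); never holds more than k entries
    for trace_id, ts in rows_newest_first:
        if trace_id not in best:
            # In a newest-first scan, the first sighting of a trace IS its max.
            best[trace_id] = ts
        if len(best) == k:
            # Every remaining row is older than all k tracked entries,
            # so no evicted or unseen trace can enter the result: stop.
            break
    return sorted(best.items(), key=lambda kv: -kv[1])
```

The early stop plays the role of the granule-level MinMax check: once every remaining granule's max(start_time) is below the smallest tracked entry, reading can end.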

In case there is some parallelism reading the table, you can also

  • Have each parallel thread maintain its own top-K table of the same size (as big as the limit).
  • When all readers are done, merge the per-thread tables together, very similar to a distributed LIMIT query.
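As a sketch of that merge step (hypothetical partial-result shape, assuming each thread produces its own list of (trace_id, max start_time) pairs):

```python
def merge_partials(partials, k):
    """partials: per-thread top-k lists of (trace_id, max_start_time).
    A trace_id may appear in several partials; keep its overall max,
    then cut the merged table back down to k."""
    merged = {}
    for partial in partials:
        for trace_id, ts in partial:
            if trace_id not in merged or ts > merged[trace_id]:
                merged[trace_id] = ts
    return sorted(merged.items(), key=lambda kv: -kv[1])[:k]
```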

Additional context

In my real example my table is not actually sorted by start_time. The table looks more like

CREATE TABLE traces
(
    `service` LowCardinality(String),
    `name` LowCardinality(String),
    `start_time` DateTime64(6)
) ...
ENGINE = MergeTree
ORDER BY (service, name, toDateTime(start_time), ...)

So I understand it isn't as efficient to read the table in reverse order. However, because I'm sorting by toDateTime(start_time) and my service and name columns are low cardinality, it would still be nice if, within each unique (service, name) pair, we scanned only up to LIMIT unique trace IDs, stopped reading, and then merged the results together, as in the parallel-readers idea above.
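That per-pair variant can be sketched in Python (toy model; groups stands in for the per-(service, name) key ranges, each read newest-first within its range):

```python
def top_k_over_groups(groups, k):
    """groups: iterable of per-(service, name) row streams of
    (trace_id, start_time), each sorted newest-first within its group.
    Scan at most k distinct trace IDs per group, then merge.

    This is safe: a trace that never makes a group's local top-k is
    beaten, in the group holding its global max, by k traces whose
    global maxes are at least as large, so it cannot be in the answer.
    """
    merged = {}
    for rows in groups:
        best = {}
        for trace_id, ts in rows:
            if trace_id not in best:
                best[trace_id] = ts  # first sighting is the group max
            if len(best) == k:
                break  # stop reading this group early
        for trace_id, ts in best.items():
            if trace_id not in merged or ts > merged[trace_id]:
                merged[trace_id] = ts
    return sorted(merged.items(), key=lambda kv: -kv[1])[:k]
```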

I'm eager to learn more about this kind of use case, or if I'm doing something wrong here! I'd like to make this observability platform as efficient as possible, and I'm willing to change the way I create this table if I need to.
