Add per-part aggregation cache for incremental GROUP BY #101073
sociopate-sss wants to merge 23 commits into ClickHouse:master
Conversation
Workflow [PR], commit [57f3752] — Summary: ❌

AI Review Summary: This PR introduces an experimental per-part aggregation cache for incremental GROUP BY over MergeTree tables.

Findings: ❌ Blockers · 💡 Nits · Tests · ClickHouse Rules · Performance & Safety · Final Verdict
    {
    public:
        struct Key
        {
PartAggregationCache::Key does not include table identity (for example StorageID/UUID), only query_hash + part_name. MergeTree part names are not globally unique across tables (e.g. many tables have all_1_1_0), so the same GROUP BY shape on two tables can produce the same key and return cached states from the wrong table, which can return incorrect results.
Please include table identity in the cache key and in invalidation bookkeeping (e.g. {query_hash, storage_uuid, part_name}).
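A minimal sketch of the suggested key shape, using hypothetical stand-in types (the PR's actual `IASTHash`/`StorageID` types are not reproduced here) — the point is that two tables with the same part name and query hash must compare unequal:

```cpp
#include <cassert>
#include <string>
#include <tuple>

// Hypothetical sketch: a cache key that includes stable table identity
// (storage UUID) alongside the query hash and part name.
struct PartAggregationCacheKey
{
    std::string query_hash;   // stand-in for IASTHash
    std::string storage_uuid; // stable table identity (StorageID::uuid)
    std::string part_name;    // e.g. "all_1_1_0", not unique across tables

    bool operator==(const PartAggregationCacheKey & rhs) const
    {
        return std::tie(query_hash, storage_uuid, part_name)
            == std::tie(rhs.query_hash, rhs.storage_uuid, rhs.part_name);
    }
};
```

With this shape, the common collision (`all_1_1_0` existing in many tables) is disambiguated by the UUID component.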
        removeEntry(evict_key);
    }

void PartAggregationCache::removeEntry(const Key & key)
removeEntry erases from cache and lru_list, but it never removes the corresponding key from part_name_to_keys. Because set always does part_name_to_keys[key.part_name].push_back(key), this auxiliary map grows monotonically even when entries are evicted, so cache metadata memory is unbounded by max_size_in_bytes.
Please delete the key from part_name_to_keys inside removeEntry (and erase empty vectors) so metadata lifetime matches cache entry lifetime.
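A toy illustration of the fix, with hypothetical simplified types (`int` payload instead of aggregation blocks): `removeEntry` erases the key from the auxiliary map and drops the vector once it is empty, so metadata lifetime matches entry lifetime:

```cpp
#include <algorithm>
#include <cassert>
#include <list>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Minimal sketch, not the PR's actual class.
struct ToyPartAggregationCache
{
    using Key = std::pair<std::string /*query_hash*/, std::string /*part_name*/>;

    std::map<Key, int> cache;                                  // entries
    std::list<Key> lru_list;                                   // eviction order
    std::map<std::string, std::vector<Key>> part_name_to_keys; // part -> keys

    void set(const Key & key, int value)
    {
        cache[key] = value;
        lru_list.push_back(key);
        part_name_to_keys[key.second].push_back(key);
    }

    void removeEntry(const Key & key)
    {
        cache.erase(key);
        lru_list.remove(key);

        auto it = part_name_to_keys.find(key.second);
        if (it != part_name_to_keys.end())
        {
            auto & keys = it->second;
            keys.erase(std::remove(keys.begin(), keys.end(), key), keys.end());
            if (keys.empty())
                part_name_to_keys.erase(it); // no dangling metadata
        }
    }
};
```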
        prewhere->prewhere_column_name});
    }

    IASTHash query_hash = PartAggregationCache::calculateQueryHash(
query_hash currently ignores intermediate_actions (the ExpressionStep/FilterStep chain between ReadFromMergeTree and AggregatingStep).
So two queries with the same GROUP BY keys/aggregates but different WHERE expressions can map to the same cache key and reuse incompatible per-part aggregate states. That can return incorrect results.
Please include all intermediate transformation/filter logic in the cache key (for example, hash the full action DAGs, not only prewhere outputs).
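A sketch of the idea, using `std::hash` over serialized step dumps as a stand-in for the PR's SipHash over action DAGs (the function name and parameters are illustrative, not ClickHouse API): every intermediate step's serialized actions are folded into the hash, so queries differing only in WHERE get distinct keys:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Illustrative sketch, not ClickHouse's hashing code.
size_t calculateQueryHashSketch(
    const std::string & group_by_shape,
    const std::vector<std::string> & intermediate_action_dumps)
{
    size_t h = std::hash<std::string>{}(group_by_shape);
    // Fold each ExpressionStep/FilterStep dump into the hash.
    for (const auto & dump : intermediate_action_dumps)
        h ^= std::hash<std::string>{}(dump) + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2);
    return h;
}
```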
    Possible values:

    - Any string
    )", 0) \
This introduces a new query-execution optimization path (rewriting aggregation plans to mix cached and fresh per-part states), but the rollout is guarded by a regular setting use_part_aggregation_cache instead of an experimental gate.
For new high-risk execution behavior, ClickHouse usually requires an explicit allow_experimental_* gate until correctness is proven in production-like workloads.
Please gate this with an experimental setting (for example allow_experimental_part_aggregation_cache) and keep use_part_aggregation_cache as an additional opt-in knob behind it.
            return false;
        break;
    }
    case Type::CLEAR_PART_AGGREGATION_CACHE:
Type::CLEAR_PART_AGGREGATION_CACHE does not parse ON CLUSTER, unlike other SYSTEM CLEAR ... CACHE commands.
Right now this case just breaks, so statements like SYSTEM DROP PART AGGREGATION CACHE ON CLUSTER 'cluster' leave trailing tokens and fail to parse. This also makes behavior inconsistent with getRequiredAccessForDDLOnCluster, which already includes CLEAR_PART_AGGREGATION_CACHE.
Please call parseQueryWithOnCluster(res, pos, expected) here (same pattern as CLEAR_QUERY_CACHE / CLEAR_FILESYSTEM_CACHE) so cluster execution works.
…intermediate actions, experimental gate, ON CLUSTER support
programs/server/Server.cpp (Outdated)

    /// Part aggregation cache: caches per-part intermediate aggregation states for partial GROUP BY reuse.
    {
        size_t part_aggregation_cache_max_size = server_settings[ServerSetting::part_aggregation_cache_max_size_in_bytes];
This initialization is good, but there is a config-reload gap: Context::updatePartAggregationCacheConfiguration is added yet never called from the reload path in Server::main (the block that updates other caches around updateQueryResultCacheConfiguration / updateQueryConditionCacheConfiguration).
Impact: changing part_aggregation_cache.max_size_in_bytes in config.xml and reloading config will not apply until restart, unlike neighboring cache settings.
Please add global_context->updatePartAggregationCacheConfiguration(config()); in the reload update block.
        query_hash = getSipHash128AsPair(extra_hash);
    }

    String table_id = reading->getMergeTreeData().getStorageID().getFullTableName();
Using getStorageID().getFullTableName() as cache identity is still unsafe across DROP TABLE + CREATE TABLE with the same name.
MergeTree part names restart from all_1_1_0, and this cache is global plus not invalidated on table drop, so a recreated table can hit stale cached states from the previous table instance and return incorrect results.
Please key by stable table identity (UUID for Atomic databases, fallback only when UUID is absent), e.g. include StorageID::uuid in PartAggregationCache::Key.
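A small sketch of the suggested identity rule, with hypothetical string-based parameters (ClickHouse's actual `StorageID::uuid` is a `UUID` type): prefer the UUID, which survives DROP TABLE + CREATE TABLE with the same name, and fall back to the qualified name only when no UUID exists:

```cpp
#include <cassert>
#include <string>

// Hypothetical helper, not the PR's code.
std::string tableIdentityForCacheKey(
    const std::string & uuid, const std::string & full_table_name)
{
    static const std::string nil_uuid = "00000000-0000-0000-0000-000000000000";
    if (!uuid.empty() && uuid != nil_uuid)
        return uuid; // stable across drop/recreate
    return full_table_name; // fallback: name-based identity, weaker guarantee
}
```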
    bool enable_writes = settings[Setting::enable_writes_to_part_aggregation_cache];

    if (cached_entries.empty() && enable_writes)
The cache population is gated by if (cached_entries.empty() && enable_writes), so writes happen only on fully cold cache.
When cache is partially warm (some hits + some misses, e.g. after new parts are inserted), uncached_parts are processed by the query path but never written back to PartAggregationCache. This means those parts stay uncached forever unless the whole cache becomes empty, so the optimization cannot incrementally learn new parts.
Please also populate cache for uncached_parts when enable_writes is enabled (not only when cached_entries.empty()).
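A toy sketch of the suggested behavior (hypothetical names, a string map standing in for the real cache): population is gated only on `enable_writes`, with no `cached_entries.empty()` pre-condition, so a partially warm cache still learns newly inserted parts:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

using ToyCache = std::map<std::string /*part_name*/, std::string /*state*/>;

// Illustrative fix, not the PR's code.
void populateUncachedParts(
    ToyCache & cache,
    const std::vector<std::string> & uncached_parts,
    bool enable_writes)
{
    if (!enable_writes)
        return;
    // Write back every miss, even when some parts were cache hits.
    for (const auto & part : uncached_parts)
        cache.emplace(part, "aggregated_state_for_" + part);
}
```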
    for (const auto & part : parts)
    {
        PartAggregationCache::Key key{query_hash, table_id, part.data_part->name};
        auto entry = cache->get(key);
enable_reads_from_part_aggregation_cache is bypassed on a cold cache.
When reads are disabled, the first loop correctly sets entry = nullptr and marks all parts as uncached. But if writes are enabled, the cold-cache branch populates the cache and then unconditionally reloads entries with cache->get(key) (without checking enable_reads). That rewrites the plan to consume cached states even though reads were explicitly disabled.
This violates the setting's semantics and can unexpectedly change execution behavior and performance for users who only want warmup writes.
Suggested fix: in the refill loop after populatePartAggregationCache, keep honoring enable_reads (or skip the cache-based plan rewrite entirely when reads are disabled).
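A minimal sketch of the suggested guard (hypothetical names, a string map standing in for the cache): the lookup used for the plan rewrite returns nothing when reads are disabled, regardless of what warmup writes have populated:

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>

using StateCache = std::map<std::string, std::string>;

// Illustrative fix, not the PR's code.
std::optional<std::string> lookupForPlanRewrite(
    const StateCache & cache, const std::string & part_name, bool enable_reads)
{
    // Writes may have warmed the cache, but the plan rewrite must
    // still honor the read setting.
    if (!enable_reads)
        return std::nullopt;
    auto it = cache.find(part_name);
    if (it == cache.end())
        return std::nullopt;
    return it->second;
}
```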
…eload and performance test
    auto new_entry = std::make_shared<Entry>(Entry{.block = std::move(block)});
    size_t entry_bytes = new_entry->sizeInBytes();

    if (entry_bytes > max_size_in_bytes)
PartAggregationCache::set reads max_size_in_bytes before taking the mutex:

    if (entry_bytes > max_size_in_bytes)
        return;

At the same time, updateConfiguration writes max_size_in_bytes under the same mutex. This unsynchronized read/write pair is a data race (UB under TSAN and the C++ memory model).
Please move the size check under the lock (or make max_size_in_bytes atomic). Keeping all accesses under the mutex is the simplest fix.
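A self-contained sketch of the race-free variant (hypothetical toy class, sizes standing in for real entries): every access to `max_size_in_bytes`, including the rejection check in `set`, happens under the same mutex that guards resizing:

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Illustrative sketch, not the PR's class.
class ToySizeBoundedCache
{
public:
    explicit ToySizeBoundedCache(size_t max_bytes) : max_size_in_bytes(max_bytes) {}

    void set(const std::string & key, size_t entry_bytes)
    {
        std::lock_guard lock(mutex); // size check happens under the lock
        if (entry_bytes > max_size_in_bytes)
            return;
        entries[key] = entry_bytes;
    }

    void updateConfiguration(size_t new_max_bytes)
    {
        std::lock_guard lock(mutex); // same mutex as set()
        max_size_in_bytes = new_max_bytes;
    }

    bool contains(const std::string & key) const
    {
        std::lock_guard lock(mutex);
        return entries.count(key) > 0;
    }

private:
    mutable std::mutex mutex;
    size_t max_size_in_bytes;
    std::map<std::string, size_t> entries;
};
```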
    {
        SipHash hash;

        Names sorted_keys = keys;
calculateQueryHash canonicalizes both GROUP BY keys and aggregate descriptors by sorting them, but cache entries are stored/consumed in the original AggregatingStep column order. This can alias different query shapes into the same cache key.
Concrete failure mode:
- Query A: GROUP BY k1, k2 caches blocks in [k1, k2, agg_state...] order.
- Query B: GROUP BY k2, k1 gets the same hash (because keys are sorted). PartAggregationCacheSource returns A's column positions into B's intermediate_header ([k2, k1, ...]), so key columns are swapped positionally.
That can produce wrong results (or hard-to-debug type/position mismatches when key types differ). The cache key needs to preserve the exact aggregation layout contract used by the pipeline (key order + aggregate order), not a sorted canonical form.
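A sketch of the order-preserving alternative, using `std::hash` as a stand-in for the PR's SipHash (function name and constants are illustrative): keys are hashed in pipeline order with no sorting, so `GROUP BY k1, k2` and `GROUP BY k2, k1` get different cache keys, matching the positional layout of the cached blocks:

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Illustrative sketch, not ClickHouse's hashing code.
size_t hashKeysInPipelineOrder(const std::vector<std::string> & keys)
{
    size_t h = keys.size();
    for (const auto & k : keys) // no sorting: order is part of the contract
        h = h * 1000003ULL + std::hash<std::string>{}(k);
    return h;
}
```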
    DECLARE(Bool, allow_experimental_part_aggregation_cache, false, R"(
If turned on, intermediate per-part aggregation states for `GROUP BY` queries over `MergeTree` tables are cached.
On subsequent executions of the same query, cached states are reused for parts that still exist, and only new or changed parts need to be aggregated from scratch.
This is useful for time-series workloads where older data is immutable and only recent data changes.
allow_experimental_part_aggregation_cache indicates this feature is still experimental. The PR template currently uses Changelog category: New Feature, but this should be Experimental Feature until the rollout is GA.
Please update the PR description category to Experimental Feature.
LLVM Coverage Report
Changed lines: 82.48% (513/622) · Uncovered code
Cache intermediate per-part aggregation states in an LRU cache. On repeated GROUP BY queries, reuse cached states for unchanged MergeTree parts and only aggregate new ones. Uses the existing AggregatingProjectionStep mechanism to merge cached and fresh results.

Changelog category: New Feature

Changelog entry: Add per-part aggregation cache for incremental GROUP BY over MergeTree tables.