
Add per-part aggregation cache for incremental GROUP BY #101073

Open

sociopate-sss wants to merge 23 commits into ClickHouse:master from sociopate-sss:sociopate-sss/query_cache_for_partial_results

Conversation

@sociopate-sss

Cache intermediate per-part aggregation states in an LRU cache. On repeated GROUP BY queries, reuse cached states for unchanged MergeTree parts and only aggregate new ones. Uses the existing AggregatingProjectionStep mechanism to merge cached and fresh results.

Changelog category

New Feature

Changelog entry

Add per-part aggregation cache for incremental GROUP BY over MergeTree tables.

@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@alexey-milovidov alexey-milovidov added the can be tested Allows running workflows for external contributors label Mar 29, 2026
@clickhouse-gh
Contributor

clickhouse-gh bot commented Mar 29, 2026

Workflow [PR], commit [57f3752]

Summary:

  • Stateless tests (amd_debug, parallel): failure
    • 03668_shard_join_in_reverse_order: FAIL (cidb, issue)
  • Integration tests (amd_asan_ubsan, db disk, old analyzer, 2/6): failure
    • test_backup_restore_s3/test.py::test_backup_to_s3_different_credentials[data_file_name_from_first_file_name-native_multipart]: FAIL (cidb)
  • Stress test (amd_debug): failure
    • Logical error: Shard number is greater than shard count: shard_num=A shard_count=B cluster=C (STID: 5066-564d): FAIL (cidb)
  • Stress test (amd_msan): failure
    • Logical error: No available columns (STID: 3938-33a6): FAIL (cidb)
  • Performance Comparison (arm_release, master_head, 4/6): failure
  • Check Results: failure

AI Review

Summary

This PR introduces an experimental per-part aggregation cache for GROUP BY over MergeTree, plus plan rewrites that mix cached and fresh per-part states. High-level verdict: request changes. The implementation is promising, but there are correctness-critical cache-key issues (already flagged inline by clickhouse-gh[bot]) and one additional PR metadata mismatch: the feature is still explicitly experimental (allow_experimental_part_aggregation_cache) but the PR uses Changelog category: New Feature.

PR Metadata
  • ⚠️ Changelog category is not semantically correct for the current rollout stage.
    • Current: New Feature
    • Recommended replacement: Experimental Feature
  • Changelog entry is present and user-readable: Add per-part aggregation cache for incremental GROUP BY over MergeTree tables.
  • Entry quality is generally good (specific feature + affected scope). No migration guidance needed because this is additive and gated.
Findings

❌ Blockers

  • [src/Interpreters/Cache/PartAggregationCache.cpp:20-23, 25-33, 168-172] query_hash canonicalizes key/aggregate order via sorting, but cached blocks are consumed positionally by AggregatingStep headers. Different key orders can alias to the same hash and reuse incompatible cached layouts, leading to incorrect results.
    Suggested fix: preserve exact aggregation layout (key order + aggregate order) in calculateQueryHash; do not sort these vectors for hashing.

  • [src/Interpreters/Cache/PartAggregationCache.cpp:42-47; src/Processors/QueryPlan/Optimizations/optimizeUsePartAggregationCache.cpp:156-161] Hashing only output names for filter/intermediate DAGs is insufficient; different expressions with identical output names can collide and return wrong cached states.
    Suggested fix: hash full DAG semantics (e.g., ActionsDAG::updateHash) for prewhere/filter/intermediate actions.

💡 Nits

  • [src/Core/Settings.cpp:5388] PR template category should be Experimental Feature (not New Feature) while allow_experimental_part_aggregation_cache remains required.
Tests

⚠️ Add targeted regression coverage for hash-collision scenarios:

  • Queries that differ only by filter expression but keep the same filter output name.
  • Same aggregates with permuted GROUP BY key order to verify cache-key layout safety.
  • Different intermediate ExpressionStep DAGs that preserve output names.
ClickHouse Rules

  • Deletion logging
  • Serialization versioning
  • Core-area scrutiny
  • No test removal
  • Experimental gate
  • No magic constants
  • Backward compatibility: ⚠️ Cache-key collisions can return incorrect historical states for semantically different query shapes.
  • SettingsChangesHistory.cpp
  • PR metadata quality: ⚠️ Category should be Experimental Feature while the feature is behind allow_experimental_part_aggregation_cache.
  • Safe rollout: ⚠️ Correctness depends on fixing cache-key collisions before rollout confidence increases.
  • Compilation time
Performance & Safety
  • Correctness/safety risk is concentrated in cache-key construction: collisions can silently reuse incompatible aggregate states and return wrong query results.
Final Verdict
  • Status: ⚠️ Request changes
  • Minimum required actions:
    1. Fix cache-key hashing to include full semantic identity of filter/intermediate DAGs and preserve aggregation layout order.
    2. Add regression tests for the identified collision classes.
    3. Update PR Changelog category to Experimental Feature.

@clickhouse-gh clickhouse-gh bot added the pr-feature Pull request with new product feature label Mar 29, 2026
{
public:
struct Key
{
Contributor

PartAggregationCache::Key does not include table identity (for example StorageID/UUID), only query_hash + part_name. MergeTree part names are not globally unique across tables (e.g. many tables have all_1_1_0), so the same GROUP BY shape on two tables can produce the same key and return cached states from the wrong table, which can return incorrect results.

Please include table identity in the cache key and in invalidation bookkeeping (e.g. {query_hash, storage_uuid, part_name}).
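A minimal sketch of the suggested composite key, assuming simplified types (the names `CacheKey` and `table_uuid` are illustrative, not the PR's actual `PartAggregationCache::Key`): equality and hashing cover query hash, stable table identity, and part name, so identical part names on different tables no longer alias.

```cpp
#include <cstdint>
#include <functional>
#include <string>

// Hypothetical key shape: {query_hash, storage_uuid, part_name}.
struct CacheKey
{
    uint64_t query_hash;
    std::string table_uuid;  // stable identity, e.g. StorageID::uuid for Atomic databases
    std::string part_name;

    bool operator==(const CacheKey & other) const
    {
        return query_hash == other.query_hash
            && table_uuid == other.table_uuid
            && part_name == other.part_name;
    }
};

struct CacheKeyHash
{
    size_t operator()(const CacheKey & k) const
    {
        // Combine all three components so no field is ignored by lookups.
        size_t h = std::hash<uint64_t>{}(k.query_hash);
        h ^= std::hash<std::string>{}(k.table_uuid) + 0x9e3779b9 + (h << 6) + (h >> 2);
        h ^= std::hash<std::string>{}(k.part_name) + 0x9e3779b9 + (h << 6) + (h >> 2);
        return h;
    }
};
```

With this shape, `all_1_1_0` from two different tables can never hit the same entry even for an identical GROUP BY.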

removeEntry(evict_key);
}

void PartAggregationCache::removeEntry(const Key & key)
Contributor

removeEntry erases from cache and lru_list, but it never removes the corresponding key from part_name_to_keys. Because set always does part_name_to_keys[key.part_name].push_back(key), this auxiliary map grows monotonically even when entries are evicted, so cache metadata memory is unbounded by max_size_in_bytes.

Please delete the key from part_name_to_keys inside removeEntry (and erase empty vectors) so metadata lifetime matches cache entry lifetime.
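The fix can be sketched on a deliberately simplified cache (the `MiniCache` type and `int` payload are stand-ins, not the real `PartAggregationCache`): `removeEntry` erases the key from the auxiliary `part_name_to_keys` map and drops empty vectors, so metadata lifetime tracks entry lifetime.

```cpp
#include <algorithm>
#include <map>
#include <string>
#include <tuple>
#include <vector>

struct Key
{
    std::string query_hash;
    std::string part_name;
    bool operator==(const Key & o) const { return query_hash == o.query_hash && part_name == o.part_name; }
    bool operator<(const Key & o) const { return std::tie(query_hash, part_name) < std::tie(o.query_hash, o.part_name); }
};

struct MiniCache
{
    std::map<Key, int> cache;  // payload type elided for the sketch
    std::map<std::string, std::vector<Key>> part_name_to_keys;

    void set(const Key & key, int value)
    {
        cache[key] = value;
        part_name_to_keys[key.part_name].push_back(key);
    }

    void removeEntry(const Key & key)
    {
        cache.erase(key);
        auto it = part_name_to_keys.find(key.part_name);
        if (it == part_name_to_keys.end())
            return;
        auto & keys = it->second;
        keys.erase(std::remove(keys.begin(), keys.end(), key), keys.end());
        if (keys.empty())
            part_name_to_keys.erase(it);  // keep metadata bounded by live entries
    }
};
```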

prewhere->prewhere_column_name});
}

IASTHash query_hash = PartAggregationCache::calculateQueryHash(
Contributor

query_hash currently ignores intermediate_actions (the ExpressionStep/FilterStep chain between ReadFromMergeTree and AggregatingStep).

So two queries with the same GROUP BY keys/aggregates but different WHERE expressions can map to the same cache key and reuse incompatible per-part aggregate states. That can return incorrect results.

Please include all intermediate transformation/filter logic in the cache key (for example, hash the full action DAGs, not only prewhere outputs).
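A toy illustration of the collision class (the `Filter` struct reduces a DAG to an expression string plus an output name; real code should hash the full ActionsDAG semantics): hashing only output names makes `x > 1` and `x > 2` indistinguishable when both produce a column named `cond`, while hashing the whole expression keeps them apart.

```cpp
#include <functional>
#include <string>

// Stand-in for a filter/intermediate actions DAG.
struct Filter
{
    std::string expression;   // e.g. "x > 1"
    std::string output_name;  // e.g. "cond"
};

// Buggy scheme: only the output name participates in the cache key.
size_t hashOutputNameOnly(const Filter & f)
{
    return std::hash<std::string>{}(f.output_name);
}

// Fixed scheme: the full expression semantics participate too.
size_t hashFullSemantics(const Filter & f)
{
    return std::hash<std::string>{}(f.expression + "|" + f.output_name);
}
```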

Possible values:

- Any string
)", 0) \
Contributor

This introduces a new query-execution optimization path (rewriting aggregation plans to mix cached and fresh per-part states), but the rollout is guarded by a regular setting use_part_aggregation_cache instead of an experimental gate.

For new high-risk execution behavior, ClickHouse usually requires an explicit allow_experimental_* gate until correctness is proven in production-like workloads.

Please gate this with an experimental setting (for example allow_experimental_part_aggregation_cache) and keep use_part_aggregation_cache as an additional opt-in knob behind it.

return false;
break;
}
case Type::CLEAR_PART_AGGREGATION_CACHE:
Contributor

Type::CLEAR_PART_AGGREGATION_CACHE does not parse ON CLUSTER, unlike other SYSTEM CLEAR ... CACHE commands.

Right now this case just breaks, so statements like SYSTEM DROP PART AGGREGATION CACHE ON CLUSTER 'cluster' leave trailing tokens and fail to parse. This also makes behavior inconsistent with getRequiredAccessForDDLOnCluster, which already includes CLEAR_PART_AGGREGATION_CACHE.

Please call parseQueryWithOnCluster(res, pos, expected) here (same pattern as CLEAR_QUERY_CACHE / CLEAR_FILESYSTEM_CACHE) so cluster execution works.


/// Part aggregation cache: caches per-part intermediate aggregation states for partial GROUP BY reuse.
{
size_t part_aggregation_cache_max_size = server_settings[ServerSetting::part_aggregation_cache_max_size_in_bytes];
Contributor

This initialization is good, but there is a config-reload gap: Context::updatePartAggregationCacheConfiguration is added yet never called from the reload path in Server::main (the block that updates other caches around updateQueryResultCacheConfiguration / updateQueryConditionCacheConfiguration).

Impact: changing part_aggregation_cache.max_size_in_bytes in config.xml and reloading config will not apply until restart, unlike neighboring cache settings.

Please add global_context->updatePartAggregationCacheConfiguration(config()); in the reload update block.

query_hash = getSipHash128AsPair(extra_hash);
}

String table_id = reading->getMergeTreeData().getStorageID().getFullTableName();
Contributor

Using getStorageID().getFullTableName() as cache identity is still unsafe across DROP TABLE + CREATE TABLE with the same name.

MergeTree part names restart from all_1_1_0, and this cache is global and not invalidated on table drop, so a recreated table can hit stale cached states from the previous table instance and return incorrect results.

Please key by stable table identity (UUID for Atomic databases, fallback only when UUID is absent), e.g. include StorageID::uuid in PartAggregationCache::Key.


bool enable_writes = settings[Setting::enable_writes_to_part_aggregation_cache];

if (cached_entries.empty() && enable_writes)
Contributor

The cache population is gated by if (cached_entries.empty() && enable_writes), so writes happen only on fully cold cache.

When cache is partially warm (some hits + some misses, e.g. after new parts are inserted), uncached_parts are processed by the query path but never written back to PartAggregationCache. This means those parts stay uncached forever unless the whole cache becomes empty, so the optimization cannot incrementally learn new parts.

Please also populate cache for uncached_parts when enable_writes is enabled (not only when cached_entries.empty()).
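The suggested write-back policy can be sketched with hypothetical names (`fillCache`, `aggregatePart`, a `std::map` standing in for the LRU cache): each miss is aggregated and written back whenever writes are enabled, regardless of whether other parts hit, so the cache learns new parts incrementally.

```cpp
#include <map>
#include <string>
#include <vector>

using PartState = int;  // stand-in for a per-part aggregation state block

// Stand-in for aggregating one part from scratch.
PartState aggregatePart(const std::string & part)
{
    return static_cast<PartState>(part.size());
}

void fillCache(
    std::map<std::string, PartState> & cache,
    const std::vector<std::string> & parts,
    bool enable_writes)
{
    for (const auto & part : parts)
    {
        if (cache.count(part))
            continue;                       // hit: reuse the cached per-part state
        PartState fresh = aggregatePart(part);
        if (enable_writes)
            cache.emplace(part, fresh);     // write back even on a partially warm cache
    }
}
```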

for (const auto & part : parts)
{
PartAggregationCache::Key key{query_hash, table_id, part.data_part->name};
auto entry = cache->get(key);
Contributor

enable_reads_from_part_aggregation_cache is bypassed on cold cache.

When reads are disabled, the first loop correctly does entry = nullptr and marks all parts as uncached. But if writes are enabled, the cold-cache branch populates the cache and then unconditionally reloads entries with cache->get(key) (without checking enable_reads). That rewrites the plan to consume cached states even though reads were explicitly disabled.

This violates setting semantics and can unexpectedly change execution behavior/perf for users who only want warmup writes.

Suggested fix: in the refill loop after populatePartAggregationCache, keep honoring enable_reads (or skip cache-based plan rewrite entirely when reads are disabled).
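A sketch of honoring the read setting after warm-up (names are illustrative): plan rewriting consults the cache only when reads are enabled, even if the write path just populated it.

```cpp
#include <map>
#include <string>
#include <vector>

// Returns the parts whose states may be consumed from cache by the plan rewrite.
std::vector<std::string> partsServedFromCache(
    const std::map<std::string, int> & cache,
    const std::vector<std::string> & parts,
    bool enable_reads)
{
    std::vector<std::string> served;
    if (!enable_reads)
        return served;  // warm-up writes may have filled the cache, but reads stay off
    for (const auto & part : parts)
        if (cache.count(part))
            served.push_back(part);
    return served;
}
```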

auto new_entry = std::make_shared<Entry>(Entry{.block = std::move(block)});
size_t entry_bytes = new_entry->sizeInBytes();

if (entry_bytes > max_size_in_bytes)
Contributor

PartAggregationCache::set reads max_size_in_bytes before taking mutex:

if (entry_bytes > max_size_in_bytes)
    return;

At the same time, updateConfiguration writes max_size_in_bytes under the same mutex. This unsynchronized read/write pair is a data race (UB under TSAN/C++ memory model).

Please move the size check under the lock (or make max_size_in_bytes atomic). Keeping all accesses under mutex is the simplest fix.
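The simplest fix, sketched on a reduced cache type (`SizeLimitedCache` is illustrative, not the PR's class): both `set` and `updateConfiguration` take the same mutex, so the `entry_bytes > max_size_in_bytes` comparison never races with a configuration reload.

```cpp
#include <cstddef>
#include <mutex>

class SizeLimitedCache
{
public:
    // Returns false when the entry is rejected for exceeding the limit.
    bool set(size_t entry_bytes)
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (entry_bytes > max_size_in_bytes)  // read now happens under the same mutex
            return false;
        current_size += entry_bytes;
        return true;
    }

    void updateConfiguration(size_t new_max)
    {
        std::lock_guard<std::mutex> lock(mutex);
        max_size_in_bytes = new_max;
    }

private:
    std::mutex mutex;
    size_t max_size_in_bytes = 0;
    size_t current_size = 0;
};
```

Making `max_size_in_bytes` a `std::atomic<size_t>` would also remove the race, but keeping every access under the existing mutex avoids reasoning about two synchronization mechanisms.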

{
SipHash hash;

Names sorted_keys = keys;
Contributor

calculateQueryHash canonicalizes both GROUP BY keys and aggregate descriptors by sorting them, but cache entries are stored/consumed in the original AggregatingStep column order. This can alias different query shapes into the same cache key.

Concrete failure mode:

  1. Query A: GROUP BY k1, k2 caches blocks in [k1, k2, agg_state...] order.
  2. Query B: GROUP BY k2, k1 gets the same hash (because keys are sorted).
  3. PartAggregationCacheSource returns A's column positions into B's intermediate_header ([k2, k1, ...]), so key columns are swapped positionally.

That can produce wrong results (or hard-to-debug type/position mismatches when key types differ). The cache key needs to preserve the exact aggregation layout contract used by the pipeline (key order + aggregate order), not a sorted canonical form.
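A toy demonstration of why the hash must be order-sensitive (`hashInOrder` is illustrative, not the PR's `calculateQueryHash`): folding keys in declared order keeps `GROUP BY k1, k2` and `GROUP BY k2, k1` distinct, whereas sorting first would alias them.

```cpp
#include <functional>
#include <string>
#include <vector>

// Order-sensitive combine: the position of each key affects the result.
size_t hashInOrder(const std::vector<std::string> & keys)
{
    size_t h = 0;
    for (const auto & k : keys)
        h = h * 1315423911u + std::hash<std::string>{}(k);
    return h;
}
```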

DECLARE(Bool, allow_experimental_part_aggregation_cache, false, R"(
If turned on, intermediate per-part aggregation states for `GROUP BY` queries over `MergeTree` tables are cached.
On subsequent executions of the same query, cached states are reused for parts that still exist, and only new or changed parts need to be aggregated from scratch.
This is useful for time-series workloads where older data is immutable and only recent data changes.
Contributor

allow_experimental_part_aggregation_cache indicates this feature is still experimental. The PR template currently uses Changelog category: New Feature, but this should be Experimental Feature until the rollout is GA.

Please update the PR description category to Experimental Feature.

@clickhouse-gh
Contributor

clickhouse-gh bot commented Mar 29, 2026

LLVM Coverage Report

Metric     Baseline   Current   Δ
Lines      84.10%     84.10%    +0.00%
Functions  90.90%     90.90%    +0.00%
Branches   76.70%     76.70%    +0.00%

Changed lines: 82.48% (513/622) · Uncovered code

