
Auto spilling join #97813

Open
antaljanosbenjamin wants to merge 50 commits into master from auto-spilling-join

Conversation

antaljanosbenjamin (Member) commented Feb 23, 2026

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Add automatic spilling to hash and parallel hash joins by converting them to grace hash join when memory limit is reached. This behavior is controlled by max_bytes_before_external_join.

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Note

Medium Risk
Introduces a new join implementation that can switch from in-memory hash joins to disk-spilling GraceHashJoin at runtime, affecting core query execution and concurrency paths. Risk is mitigated by being opt-in via max_bytes_before_external_join plus extensive pipeline/test coverage, but regressions could impact join correctness/performance and read-in-order queries.

Overview
Adds opt-in automatic spilling for hash/parallel_hash/default/auto joins via new setting max_bytes_before_external_join; when the right side exceeds the threshold, the join converts to GraceHashJoin to spill to disk (new SpillingHashJoin).

Updates join selection (analyzer + planner) to instantiate SpillingHashJoin, extends IJoin with keepLeftPipelineInOrder() and canProcessNonJoinedBlocksInParallel(), and adjusts pipeline wiring so delayed-join blocks and parallel non-joined streams can coexist safely.

Improves correctness/observability around spilling by enabling per-slot block extraction for ConcurrentHashJoin, filtering released blocks to avoid duplication, adding new join profile events (non-joined/delayed block counts plus a spill count), fixing a couple of dictionary exception messages, and adding/adjusting stateless tests (including a new 03915_spilling_hash_join suite and a read-in-order negative case).
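The trigger described in the overview can be sketched roughly as follows. All names here (SpillTriggerSketch, addRightBlock) are illustrative, not the PR's code; the real SpillingHashJoin hands the collected blocks over to GraceHashJoin rather than just flipping a flag:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical reduction of the spill trigger: the right side is
// accumulated in memory until its total size crosses
// max_bytes_before_external_join (0 disables spilling), at which point
// the join converts to a grace hash join.
class SpillTriggerSketch
{
public:
    explicit SpillTriggerSketch(uint64_t max_bytes_before_external_join_)
        : max_bytes_before_external_join(max_bytes_before_external_join_) {}

    /// Returns true if this block caused the switch to grace hash join.
    bool addRightBlock(uint64_t block_bytes)
    {
        right_side_bytes += block_bytes;
        if (!spilled && max_bytes_before_external_join != 0
            && right_side_bytes > max_bytes_before_external_join)
        {
            spilled = true; // in the PR, this is where GraceHashJoin takes over
            return true;
        }
        return false;
    }

    bool isSpilled() const { return spilled; }

private:
    const uint64_t max_bytes_before_external_join;
    uint64_t right_side_bytes = 0;
    bool spilled = false;
};
```

With the default of 0 the threshold check never fires, which matches the documented behavior that automatic spilling is disabled by default.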

Written by Cursor Bugbot for commit db5f199. This will update automatically on new commits.

clickhouse-gh bot (Contributor) commented Feb 23, 2026

Workflow [PR], commit [1df7b04]

Summary:

  • Stateless tests (amd_debug, flaky check): failure
    03668_shard_join_in_reverse_order: FAIL (cidb, issue)
  • Integration tests (amd_llvm_coverage, 2/5): failure
    test_overcommit_tracker/test.py::test_user_overcommit: FAIL (cidb)
  • Stress test (amd_tsan): failure
    Server died: FAIL (cidb)
    Logical error: Shard number is greater than shard count: shard_num=A shard_count=B cluster=C (STID: 5066-457d): FAIL (cidb)
  • Stress test (amd_msan): failure
    Server died: FAIL (cidb)
    Logical error: Shard number is greater than shard count: shard_num=A shard_count=B cluster=C (STID: 5066-457d): FAIL (cidb)
  • Stress test (arm_msan): failure
    Server died: FAIL (cidb)
    MemorySanitizer: use-of-uninitialized-value (STID: 2410-543a): FAIL (cidb)
  • Stress test (arm_ubsan): failure
    Server died: FAIL (cidb)
    Logical error: Shard number is greater than shard count: shard_num=A shard_count=B cluster=C (STID: 5066-457d): FAIL (cidb)
  • AST fuzzer (amd_tsan): failure
    ThreadSanitizer (STID: 1282-2eb7): FAIL (cidb)

AI Review

Summary

This PR introduces automatic spilling for hash/parallel_hash joins by switching to GraceHashJoin when max_bytes_before_external_join is exceeded. The feature direction is valuable, but there are correctness/concurrency issues in SpillingHashJoin plus PR metadata/docs quality gaps that should be fixed before merge.

Missing context
  • ⚠️ No CI logs or benchmark reports were provided in this review context, so performance/regression validation is limited to code inspection.
Findings

❌ Blockers

  • [src/Interpreters/SpillingHashJoin.cpp:222] onBuildPhaseFinish updates chosen_join/state without synchronizing with switchToGraceHashJoin, which does transition under switch_mutex. This permits a race where a spill transition is overwritten by in-memory promotion, causing inconsistent join state.
    Suggested fix: guard concurrent branch of onBuildPhaseFinish with switch_mutex and re-check state under the lock before writing chosen_join/state.
  • [src/Interpreters/SpillingHashJoin.cpp:123] During COLLECTING, limit checks can be bypassed in underlying hash join insertion path, changing behavior versus regular hash/parallel_hash joins and potentially ignoring max_rows_in_join/max_bytes_in_join semantics before spill happens.
    Suggested fix: preserve equivalent JOIN size-limit checks for the in-memory collecting phase (or explicitly enforce them in SpillingHashJoin).
  • [src/Interpreters/SpillingHashJoin.h:22] The implementation still documents unresolved WITH TOTALS correctness via TODO/workarounds in tests, but selection is not hard-disabled for totals scenarios. This can expose known incorrect behavior when spilling is enabled.
    Suggested fix: reject SpillingHashJoin for totals queries during algorithm selection (or throw explicit NOT_IMPLEMENTED when totals are requested with auto-spill).
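The suggested fix for the first blocker (serialize both transition paths on switch_mutex and re-check the state under the lock) could look roughly like this. The class is a hypothetical reduction of SpillingHashJoin: chosen_join is modeled as a string rather than a join pointer, and the names mirror the review text, not the actual sources:

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <string>
#include <thread>

// Both the spill transition and the build-finish promotion take the same
// mutex and re-check the state under the lock, so exactly one transition
// wins and chosen_join always matches the final state.
class SpillingJoinSketch
{
public:
    enum class State { COLLECTING, IN_MEMORY, SPILLED };

    void switchToGraceHashJoin()
    {
        std::lock_guard lock(switch_mutex);
        if (state.load(std::memory_order_acquire) != State::COLLECTING)
            return; // build phase already finished in memory, nothing to do
        chosen_join = "grace_hash";
        state.store(State::SPILLED, std::memory_order_release);
    }

    void onBuildPhaseFinish()
    {
        std::lock_guard lock(switch_mutex); // same mutex as the spill path
        if (state.load(std::memory_order_acquire) != State::COLLECTING)
            return; // a spill transition already happened, keep it
        chosen_join = "hash";
        state.store(State::IN_MEMORY, std::memory_order_release);
    }

    State currentState() const { return state.load(std::memory_order_acquire); }
    std::string chosenJoin() const { return chosen_join; }

private:
    std::mutex switch_mutex;
    std::atomic<State> state{State::COLLECTING};
    std::string chosen_join;
};
```

Running the two transitions from different threads always leaves chosen_join consistent with the final state, which is exactly the invariant the race breaks in the unsynchronized version.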

💡 Nits

  • [src/Core/Settings.cpp:7513] PR text/changelog references a non-existent setting name (enable_auto_spilling_hash_join) while implementation uses max_bytes_before_external_join.
    Suggested fix: replace changelog wording with the actual setting name.
  • [src/Core/Settings.cpp:7513] New user-facing setting behavior is introduced without corresponding documentation update (Documentation is written remains unchecked).
    Suggested fix: add docs describing semantics, constraints, and migration/rollout guidance for max_bytes_before_external_join.
Tests
  • ⚠️ Add/keep dedicated tests for WITH TOTALS + non-zero max_bytes_before_external_join to verify guarded behavior (either explicit rejection or correct semantics) rather than relying on disabling spilling in test setup.
  • ⚠️ Add a regression test for concurrent spill transition race (onBuildPhaseFinish vs spill switch) to ensure single, synchronized state transition.
  • ⚠️ Add a regression test proving JOIN size limits (max_rows_in_join/max_bytes_in_join with join_overflow_mode) are still enforced on the in-memory path before spill transition.
ClickHouse Rules
  • Deletion logging
  • Serialization versioning
  • Core-area scrutiny: Concurrency/state-transition bug in core join path (SpillingHashJoin).
  • No test removal
  • Experimental gate
  • No magic constants
  • Backward compatibility: ⚠️ JOIN limit semantics can change under auto-spill path if checks are bypassed.
  • SettingsChangesHistory.cpp
  • PR metadata quality: ⚠️ Changelog text references wrong setting name; docs requirement not satisfied.
  • Safe rollout: ⚠️ Known totals limitation is not hard-gated in algorithm selection.
  • Compilation time
Performance & Safety
  • Concurrency safety concern: unsynchronized state transition between build-finish promotion and spill switch can lead to inconsistent join implementation selection at runtime.
  • Safety/behavior concern: in-memory phase can diverge from established JOIN overflow semantics if limits are not equivalently enforced.
User-Lens
  • Users are likely to be confused by changelog/docs mismatch around setting name and by undocumented constraints (especially totals behavior and overflow semantics under auto-spill).
Final Verdict
  • Status: ⚠️ Request changes
  • Minimum required actions:
    • Fix synchronization/state transition race in SpillingHashJoin build-finish path.
    • Ensure JOIN overflow limits are preserved with auto-spilling enabled.
    • Hard-gate or explicitly reject unsupported totals scenarios for SpillingHashJoin.
    • Correct changelog setting name and add user documentation for max_bytes_before_external_join.

@clickhouse-gh clickhouse-gh bot added the pr-improvement Pull request with some product improvements label Feb 23, 2026
throw Exception(ErrorCodes::LOGICAL_ERROR, "QueryPipeline is already completed");
}

static void checkSource(const ProcessorPtr & source, bool can_have_totals)
antaljanosbenjamin (Member, Author):

These functions were simply unused, so I removed them.

alexey-milovidov (Member) left a comment:

Do not merge until investigating and fixing the bug in transactions:

Logical error: 'txn->getState() != MergeTreeTransaction::COMMITTED' (STID: 2508-2b69)

antaljanosbenjamin (Member, Author):

I fixed the issue that you introduced with server-side fuzzing. Now please unblock the PR.

antaljanosbenjamin and others added 5 commits February 25, 2026 14:22
This way we still do the conversion on multiple threads while also keeping all allocated memory in a single place, not shared between two join algorithms (concurrent hash and grace hash).
antaljanosbenjamin (Member, Author):

optimizeJoinLegacy depends on HashJoin; check whether this is a problem.

Comment on lines +360 to +364
M(JoinNonJoinedTransformBlockCount, "Number of blocks emitted by NonJoinedBlocksTransform.", ValueType::Number) \
M(JoinNonJoinedTransformRowCount, "Number of non-joined rows emitted by NonJoinedBlocksTransform.", ValueType::Number) \
M(JoinDelayedJoinedTransformBlockCount, "Number of blocks emitted by DelayedJoinedBlocksWorkerTransform.", ValueType::Number) \
M(JoinDelayedJoinedTransformRowCount, "Number of rows emitted by DelayedJoinedBlocksWorkerTransform.", ValueType::Number) \
M(JoinSpilledToDisk, "Number of times a hash join was switched to GraceHashJoin due to memory limit.", ValueType::Number) \
antaljanosbenjamin (Member, Author):

I am not 100% happy about these names and metrics, but this is the best I could come up with.

(Contributor):

Would it be better to be more explicit about the emitter of the metric, i.e. the SpillingHashJoin? If yes, an alternative could be to use SpillingHashJoinSwitchedToGraceJoin instead of JoinSpilledToDisk. Is this what you are after?

/// The decision should be done at latest in onBuildPhaseFinish, after that the returned value should not change.
/// This is important for SpillingHashJoin, which can change algorithms runtime, and parallel non-joined blocks
/// processing depends on the algorithm used.
virtual bool canProcessNonJoinedBlocksInParallel() const { return supportParallelNonJoinedBlocksProcessing(); }
antaljanosbenjamin (Member, Author):

I'm also not happy about the naming here: "support" vs "can" is not clear.

The first one is supposed to mean whether the corresponding processors (NonJoinedBlocksTransform) should be included in the pipeline at all.

The second one is supposed to mean whether those processors can actually be used during execution. The decision is made at the latest in onBuildPhaseFinish.

(Contributor):

What about adding the term Now or RightNow to make it more clear that it's a runtime decision that may change: canProcessNonJoinedBlocksInParallelNow()

Or differentiating it from supportParallelNonJoinedBlocksProcessing() by emphasizing that it may request enabling/disabling the NonJoinedBlocksTransform at runtime: isParallelNonJoinedProcessingEnabled()
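Whatever names are chosen, the two-level contract discussed in this thread can be sketched with a hypothetical, heavily reduced interface (the real IJoin carries far more members; only the two capability methods are modeled here):

```cpp
#include <cassert>

// Two-level capability contract:
//  - supportParallelNonJoinedBlocksProcessing(): pipeline-construction-time
//    property that decides whether NonJoinedBlocksTransform is wired in.
//  - canProcessNonJoinedBlocksInParallel(): runtime answer, fixed at the
//    latest in onBuildPhaseFinish(); must not change afterwards.
class IJoinSketch
{
public:
    virtual ~IJoinSketch() = default;

    /// Should the parallel non-joined transforms be part of the pipeline?
    virtual bool supportParallelNonJoinedBlocksProcessing() const = 0;

    /// May those transforms actually run in parallel during execution?
    virtual bool canProcessNonJoinedBlocksInParallel() const
    {
        return supportParallelNonJoinedBlocksProcessing();
    }
};

// A spilling join wires the transforms in unconditionally (support == true)
// but only allows parallel use when it stayed in memory.
class SpillingJoinCaps : public IJoinSketch
{
public:
    bool supportParallelNonJoinedBlocksProcessing() const override { return true; }
    bool canProcessNonJoinedBlocksInParallel() const override { return !spilled; }
    void markSpilled() { spilled = true; } // stand-in for the grace-hash switch

private:
    bool spilled = false;
};
```

The split lets the pipeline be built optimistically while the runtime method downgrades to sequential processing after a spill, which is the behavior the naming is trying to convey.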

@antaljanosbenjamin antaljanosbenjamin requested a review from cv4g March 23, 2026 13:42
antaljanosbenjamin (Member, Author):

My attempt to fix the read-buffer-related issue: #100732

if (state.load(std::memory_order_acquire) != State::COLLECTING)
return chosen_join->addBlockToJoin(block, check_limits);

concurrent_join->addBlockToJoin(block, /*check_limits=*/false);
(Contributor):


SpillingHashJoin::addBlockToJoin ignores check_limits while still in COLLECTING mode (ConcurrentHashJoin::addBlockToJoin(..., false) / HashJoin::addBlockToJoin(..., false)).

This changes semantics vs regular hash / parallel_hash: with max_bytes_before_external_join enabled but set high enough that no switch happens, max_rows_in_join / max_bytes_in_join (join_overflow_mode) are no longer enforced during build, so queries that should stop on JOIN size limits can now proceed in-memory.

Please preserve limit checks on the in-memory path (or explicitly apply equivalent checks in SpillingHashJoin) so enabling auto-spill does not silently disable JOIN size-limit enforcement.
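One way to preserve the limit checks on the in-memory path, sketched with illustrative types standing in for ClickHouse's SizeLimits / join_overflow_mode machinery (none of these names are the actual implementation):

```cpp
#include <cassert>
#include <cstdint>

// Simplified stand-in for SizeLimits: 0 means unlimited.
struct SizeLimitsSketch
{
    uint64_t max_rows = 0;
    uint64_t max_bytes = 0;

    /// Returns false when a configured limit is exceeded (the real code
    /// may instead throw, depending on join_overflow_mode).
    bool check(uint64_t rows, uint64_t bytes) const
    {
        if (max_rows && rows > max_rows)
            return false;
        if (max_bytes && bytes > max_bytes)
            return false;
        return true;
    }
};

// The key point: during COLLECTING, check_limits is honored instead of
// being hard-coded to false, so max_rows_in_join / max_bytes_in_join keep
// working even when the spill threshold is never reached.
class CollectingPhaseSketch
{
public:
    explicit CollectingPhaseSketch(SizeLimitsSketch limits_) : limits(limits_) {}

    /// Mirrors addBlockToJoin: returns false when the caller asked for
    /// limit checks and a limit was exceeded.
    bool addBlock(uint64_t rows, uint64_t bytes, bool check_limits)
    {
        total_rows += rows;
        total_bytes += bytes;
        if (check_limits && !limits.check(total_rows, total_bytes))
            return false; // caller stops the build per join_overflow_mode
        return true;
    }

private:
    SizeLimitsSketch limits;
    uint64_t total_rows = 0;
    uint64_t total_bytes = 0;
};
```

Under this shape, enabling auto-spill with a high threshold behaves like plain hash join with respect to size limits, which is the semantics the review asks to keep.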

antaljanosbenjamin (Member, Author):

The flaky test check failed because of a long-running query. I used time ./clickhouse-test 03580_join_runtime_filter --test-runs 1000 -j 30 to measure the test time, and it looked fine:

Master:
real    1m27.366s
user    10m39.690s
sys     5m14.201s

this branch:
real    1m27.177s
user    10m37.015s
sys     5m17.897s

So I think the only problem is the huge number of test runs (1000+) in the flaky test check.

antaljanosbenjamin (Member, Author) commented Mar 31, 2026

Finally, with Claude's help, I managed to figure out why 00163_shard_join_with_empty_table fails. In short, the added Resize can mess up the order of the blocks, which means the problem is also present for grace hash join in master; see the example (it might need to be run a few times to produce a wrong result, as it is basically a data race).

I misread the query: there, LIMIT is applied before the ORDER BY, so the output is only consistent because the normal hash join is single-threaded.

)", 0) \
DECLARE(UInt64, archive_adaptive_buffer_max_size_bytes, 8 * DBMS_DEFAULT_BUFFER_SIZE, R"(
Limits the maximum size of the adaptive buffer used when writing to archive files (for example, tar archives)", 0) \
DECLARE(UInt64, max_bytes_before_external_join, 0, R"(
(Contributor):


The PR description says this feature is controlled by enable_auto_spilling_hash_join, but the implemented setting is max_bytes_before_external_join (this line). Please update the changelog entry to use the correct setting name, otherwise users will try a non-existent setting.

(Contributor), on the same lines:

This introduces a new user-facing join setting (max_bytes_before_external_join), but the PR template still has Documentation is written unchecked and there are no docs updates in this PR. Please add docs for semantics/usage and migration guidance for users enabling this setting.

DECLARE(NonZeroUInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
DECLARE(NonZeroUInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
\
DECLARE(UInt64, max_bytes_before_external_join, 0, "If set to a non-zero value and `join_algorithm` is `hash`, `parallel_hash`, `default`, or `auto`, the hash join will automatically be converted to grace hash join to enable spilling to disk when the right-side data exceeds this many bytes. When set to 0 (default), automatic spilling is disabled.", 0) \
(Member):

We should not forget to provide a reasonable value (e.g. half of the pod's main memory capacity) on the dataplane side.

antaljanosbenjamin (Member, Author):

I am not sure there is any meaningful default value in the current state. In the future I am planning to fine-tune the setting to consider multiple factors, like total memory, query memory limit, and available memory.

@rschu1ze rschu1ze mentioned this pull request Mar 31, 2026
clickhouse-gh bot (Contributor) commented Mar 31, 2026

LLVM Coverage Report

Metric Baseline Current Δ
Lines 84.10% 84.00% -0.10%
Functions 90.90% 90.90% +0.00%
Branches 76.60% 76.60% +0.00%

Changed lines: 82.66% (510/617) · Uncovered code

Full report · Diff report

antaljanosbenjamin (Member, Author):

03668_shard_join_in_reverse_order is fixed by #101456
test_user_overcommit is fixed by #101270

antaljanosbenjamin (Member, Author):

The parallel replicas related issues are fixed here #101154
