Conversation
Workflow [PR], commit 1df7b04 — Summary: ❌

AI Review Summary: This PR introduces automatic spilling for …

Sections: Missing context · Findings (❌ Blockers, 💡 Nits) · Tests · ClickHouse Rules · Performance & Safety · User-Lens · Final Verdict
    throw Exception(ErrorCodes::LOGICAL_ERROR, "QueryPipeline is already completed");
}

static void checkSource(const ProcessorPtr & source, bool can_have_totals)
These functions were simply unused, so I removed them.
alexey-milovidov left a comment:
Do not merge until investigating and fixing the bug in transactions:
Logical error: 'txn->getState() != MergeTreeTransaction::COMMITTED' (STID: 2508-2b69)
I fixed the issue that you found with server-side fuzzing. Now please unblock the PR.
This way we still do the conversion on multiple threads while also keeping all allocated memory in a single place, not shared between the two join algorithms (concurrent hash and grace hash).
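The idea described above — one owner for the collected blocks, several threads each converting a disjoint slice — can be sketched roughly as follows. This is only an illustrative sketch, not ClickHouse's actual code; `Block` here is a stand-in struct and `convertBlocksInParallel` is a hypothetical name:

```cpp
#include <cassert>
#include <cstdint>
#include <thread>
#include <vector>

// Stand-in for a real data block; only the row count matters for the sketch.
struct Block { uint64_t rows = 0; };

// The blocks live in a single container owned by one place; worker threads
// take strided, disjoint slices, so no block is touched by two threads and
// no memory has to be shared between two join implementations.
uint64_t convertBlocksInParallel(std::vector<Block> & blocks, unsigned num_threads)
{
    std::vector<uint64_t> per_thread(num_threads, 0);
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < num_threads; ++t)
    {
        workers.emplace_back([&, t]
        {
            // Thread t handles blocks t, t + N, t + 2N, ...
            for (size_t i = t; i < blocks.size(); i += num_threads)
                per_thread[t] += blocks[i].rows;  // stand-in for per-block re-bucketing work
        });
    }
    for (auto & w : workers)
        w.join();

    uint64_t total = 0;
    for (uint64_t r : per_thread)
        total += r;
    return total;
}
```

The per-thread accumulators avoid any locking; results are merged only after all workers have joined.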
src/Common/ProfileEvents.cpp
Outdated
M(JoinNonJoinedTransformBlockCount, "Number of blocks emitted by NonJoinedBlocksTransform.", ValueType::Number) \
M(JoinNonJoinedTransformRowCount, "Number of non-joined rows emitted by NonJoinedBlocksTransform.", ValueType::Number) \
M(JoinDelayedJoinedTransformBlockCount, "Number of blocks emitted by DelayedJoinedBlocksWorkerTransform.", ValueType::Number) \
M(JoinDelayedJoinedTransformRowCount, "Number of rows emitted by DelayedJoinedBlocksWorkerTransform.", ValueType::Number) \
M(JoinSpilledToDisk, "Number of times a hash join was switched to GraceHashJoin due to memory limit.", ValueType::Number) \
I am not 100% happy about these names and metrics, but this is the best I could come up with.
Would it be better to be more explicit about the emitter of the metric, i.e. the SpillingHashJoin? If yes, an alternative could be to use SpillingHashJoinSwitchedToGraceJoin instead of JoinSpilledToDisk. Is this what you are after?
src/Interpreters/IJoin.h
Outdated
/// The decision should be done at latest in onBuildPhaseFinish, after that the returned value should not change.
/// This is important for SpillingHashJoin, which can change algorithms runtime, and parallel non-joined blocks
/// processing depends on the algorithm used.
virtual bool canProcessNonJoinedBlocksInParallel() const { return supportParallelNonJoinedBlocksProcessing(); }
Also not happy about the naming here: support vs. can is not clear.
The first one is supposed to mean whether the corresponding processors (NonJoinedBlocksTransform) should be included in the pipeline or not.
The second one is supposed to mean whether those processors can actually be used during execution. The decision is made at the latest in onBuildPhaseFinished.
What about adding the term Now or RightNow to make it clearer that this is a runtime decision that may change: canProcessNonJoinedBlocksInParallelNow()?
Or differentiating it from supportParallelNonJoinedBlocksProcessing() by emphasizing that enabling/disabling of the NonJoinedBlocksTransform may be requested at runtime: isParallelNonJoinedProcessingEnabled()?
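The two-level contract discussed in this thread — a static capability checked when wiring the pipeline, and a runtime decision frozen by the end of the build phase — can be sketched as below. This is a hedged sketch, not ClickHouse's actual IJoin; only the method names come from the diff above, and IJoinSketch/SpillingJoinSketch are made-up names:

```cpp
#include <cassert>

// Sketch of the interface split: pipeline-construction-time capability vs.
// execution-time decision.
struct IJoinSketch
{
    virtual ~IJoinSketch() = default;

    // Should NonJoinedBlocksTransform-style processors be wired into the
    // pipeline at all? Checked when the pipeline is built.
    virtual bool supportParallelNonJoinedBlocksProcessing() const { return false; }

    // May those processors actually run in parallel? Must not change after
    // the build phase finishes.
    virtual bool canProcessNonJoinedBlocksInParallel() const
    {
        return supportParallelNonJoinedBlocksProcessing();
    }
};

// A join that can switch algorithms at runtime: it advertises the capability
// so the processors are wired in, but withdraws parallel processing once it
// has switched to a grace-hash strategy during the build phase.
struct SpillingJoinSketch : IJoinSketch
{
    bool spilled = false;

    bool supportParallelNonJoinedBlocksProcessing() const override { return true; }

    bool canProcessNonJoinedBlocksInParallel() const override { return !spilled; }

    // Latest point at which the runtime decision is fixed.
    void onBuildPhaseFinish(bool switched_to_grace) { spilled = switched_to_grace; }
};
```

The key design point is that the second method may return a narrower answer than the first, but never a wider one, and it must be stable once the build phase has finished.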
My attempt to fix the read-buffer-related issue #100732.
if (state.load(std::memory_order_acquire) != State::COLLECTING)
    return chosen_join->addBlockToJoin(block, check_limits);

concurrent_join->addBlockToJoin(block, /*check_limits=*/false);
SpillingHashJoin::addBlockToJoin ignores check_limits while still in COLLECTING mode (it calls ConcurrentHashJoin::addBlockToJoin(..., false) / HashJoin::addBlockToJoin(..., false)).
This changes semantics vs. regular hash/parallel_hash: with max_bytes_before_external_join enabled but set high enough that no switch happens, max_rows_in_join/max_bytes_in_join (join_overflow_mode) are no longer enforced during the build, so queries that should stop on JOIN size limits can now proceed in memory.
Please preserve limit checks on the in-memory path (or explicitly apply equivalent checks in SpillingHashJoin) so that enabling auto-spill does not silently disable JOIN size-limit enforcement.
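A minimal sketch of the kind of equivalent check the comment asks for. All names here (JoinLimits, OverflowMode, checkJoinSizeLimits) are hypothetical stand-ins, not ClickHouse's actual size-limit machinery; the point is only that when delegating with check_limits=false, the caller must apply max_rows/max_bytes and the overflow mode itself:

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for join_overflow_mode: Throw raises an exception, Break stops
// adding rows and continues with what was collected.
enum class OverflowMode { Throw, Break };

struct JoinLimits
{
    uint64_t max_rows = 0;   // 0 means unlimited
    uint64_t max_bytes = 0;  // 0 means unlimited
    OverflowMode mode = OverflowMode::Throw;
};

// Returns true if the build side may keep growing. On a limit violation it
// returns false; would_throw tells the caller whether the overflow mode
// demands an exception rather than a silent stop.
bool checkJoinSizeLimits(const JoinLimits & limits, uint64_t rows, uint64_t bytes, bool & would_throw)
{
    would_throw = false;
    const bool exceeded = (limits.max_rows && rows > limits.max_rows)
                       || (limits.max_bytes && bytes > limits.max_bytes);
    if (!exceeded)
        return true;
    if (limits.mode == OverflowMode::Throw)
        would_throw = true;
    return false;
}
```

In a SpillingHashJoin-style wrapper this check would run on every addBlockToJoin call while in COLLECTING mode, before delegating to the inner join with check_limits=false.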
The flaky test check failed because of a long-running query. So I think the only problem is the huge number of test runs (1000+) in the flaky test check.

I misread the query: the LIMIT is applied before the ORDER BY, so the output is only consistent because the normal hash join is single-threaded.
)", 0) \
DECLARE(UInt64, archive_adaptive_buffer_max_size_bytes, 8 * DBMS_DEFAULT_BUFFER_SIZE, R"(
Limits the maximum size of the adaptive buffer used when writing to archive files (for example, tar archives)", 0) \
DECLARE(UInt64, max_bytes_before_external_join, 0, R"(
The PR description says this feature is controlled by enable_auto_spilling_hash_join, but the implemented setting is max_bytes_before_external_join (this line). Please update the changelog entry to use the correct setting name; otherwise users will try a non-existent setting.
)", 0) \
DECLARE(UInt64, archive_adaptive_buffer_max_size_bytes, 8 * DBMS_DEFAULT_BUFFER_SIZE, R"(
Limits the maximum size of the adaptive buffer used when writing to archive files (for example, tar archives)", 0) \
DECLARE(UInt64, max_bytes_before_external_join, 0, R"(
This introduces a new user-facing join setting (max_bytes_before_external_join), but the PR template still has "Documentation is written" unchecked and there are no docs updates in this PR. Please add docs covering semantics/usage and migration guidance for users enabling this setting.
DECLARE(NonZeroUInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
DECLARE(NonZeroUInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
\
DECLARE(UInt64, max_bytes_before_external_join, 0, "If set to a non-zero value and `join_algorithm` is `hash`, `parallel_hash`, `default`, or `auto`, the hash join will automatically be converted to grace hash join to enable spilling to disk when the right-side data exceeds this many bytes. When set to 0 (default), automatic spilling is disabled.", 0) \
We should not forget to provide a reasonable value (e.g. half of the pod's main memory capacity) on the dataplane side.
I am not sure there is any meaningful default value in the current state. In the future I am planning to fine-tune the setting to consider multiple factors, like total memory, query memory limit, available memory, etc.
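The decision the setting description implies can be stated in a few lines. This is an illustrative sketch only; the function name is hypothetical, and the real trigger in the PR may consider more state than a single byte counter:

```cpp
#include <cassert>
#include <cstdint>

// Sketch of the spill trigger described by max_bytes_before_external_join:
// 0 keeps the historical behavior (automatic spilling disabled); any non-zero
// value converts the in-memory hash join to a grace hash join once the
// right-side data exceeds the threshold.
bool shouldSpillToGraceHashJoin(uint64_t max_bytes_before_external_join,
                                uint64_t right_side_bytes)
{
    if (max_bytes_before_external_join == 0)
        return false;  // feature is opt-in; disabled by default
    return right_side_bytes > max_bytes_before_external_join;
}
```

A future heuristic default, as mentioned above, would replace the raw threshold with something derived from total memory, the query memory limit, and currently available memory.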
LLVM Coverage Report
Changed lines: 82.66% (510/617)
The parallel-replicas-related issues are fixed in #101154.
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Add automatic spilling to hash and parallel hash joins by converting them to grace hash join when the memory limit is reached. This behavior is controlled by max_bytes_before_external_join.

Documentation entry for user-facing changes
Note

Medium Risk

Introduces a new join implementation that can switch from in-memory hash joins to disk-spilling GraceHashJoin at runtime, affecting core query execution and concurrency paths. Risk is mitigated by being opt-in via max_bytes_before_external_join plus extensive pipeline/test coverage, but regressions could impact join correctness/performance and read-in-order queries.

Overview

Adds opt-in automatic spilling for hash/parallel_hash/default/auto joins via the new setting max_bytes_before_external_join; when the right side exceeds the threshold, the join converts to GraceHashJoin to spill to disk (new SpillingHashJoin).

Updates join selection (analyzer + planner) to instantiate SpillingHashJoin, extends IJoin with keepLeftPipelineInOrder() and canProcessNonJoinedBlocksInParallel(), and adjusts pipeline wiring so delayed-join blocks and parallel non-joined streams can coexist safely.

Improves correctness/observability around spilling by enabling per-slot block extraction for ConcurrentHashJoin, filtering released blocks to avoid duplication, adding new join profile events (non-joined/delayed block counts + spill count), fixing a couple of dictionary exception messages, and adding/adjusting stateless tests (including a new 03915_spilling_hash_join suite and a read-in-order negative case).

Written by Cursor Bugbot for commit db5f199. This will update automatically on new commits.