Properly remove unneeded columns for lazy materialization.#96682
Properly remove unneeded columns for lazy materialization.#96682KochetovNicolai wants to merge 12 commits intomasterfrom
Conversation
|
Workflow [PR], commit [7698acd] Summary: ⏳
AI ReviewSummaryThis PR fixes lazy materialization column propagation by tracking required DAG outputs separately from step-input availability, then splitting Missing context
ClickHouse Rules
Final Verdict
|
…WHERE and parallel replicas" This reverts commit 10f0f02.
ebeb5e2 to
e1ebc73
Compare
There was a problem hiding this comment.
Pull request overview
This PR fixes a Block structure mismatch in stream error caused by lazy materialization returning unnecessary columns. The issue occurred when PREWHERE added extra columns that weren't consumed by subsequent operations, particularly in parallel replica scenarios.
Changes:
- Refactored the lazy materialization optimization to properly track and remove unneeded columns
- Introduced
available_input_positionsto track which columns are actually available after filtering - Added logic to transitively determine required dependencies in the ActionsDAG
Reviewed changes
Copilot reviewed 2 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| tests/queries/0_stateless/03814_lazy_materialization_unneded_columns_bug.sql | Adds regression test for the lazy materialization bug with parallel replicas |
| src/Processors/QueryPlan/Optimizations/optimizeLazyMaterialization.cpp | Refactors column tracking logic to properly handle available vs required columns and removes workaround projection step |
| SELECT length(thread_ids) > 0 | ||
| FROM system.query_log | ||
| WHERE (current_database = currentDatabase()) AND (event_date >= (today() - 1)) AND (lower(query) LIKE '%abcd%') AND (type = 'QueryFinish') | ||
| ORDER BY query_start_time DESC | ||
| LIMIT 1 | ||
| SETTINGS enable_parallel_replicas = 1, query_plan_optimize_lazy_materialization = 1, max_parallel_replicas = 2, cluster_for_parallel_replicas = 'parallel_replicas', parallel_replicas_for_non_replicated_merge_tree = 1 |
There was a problem hiding this comment.
Corrected spelling of 'unneded' to 'unneeded' in filename.
| } | ||
| } | ||
|
|
||
| /// requred_outputs are outputs of ActionsDAG, however required_inputs are inputs corresponding to the step input header. |
There was a problem hiding this comment.
Corrected spelling of 'requred' to 'required'.
| /// requred_outputs are outputs of ActionsDAG, however required_inputs are inputs corresponding to the step input header. | |
| /// required_outputs are outputs of ActionsDAG, however required_inputs are inputs corresponding to the step input header. |
| // std::cerr << "split nodes: " << split_nodes.size() << std::endl; | ||
| // for (const auto * node : split_nodes) | ||
| // std::cerr << " " << node->result_name << std::endl; | ||
|
|
There was a problem hiding this comment.
Remove commented-out debugging code before merging to production.
| // std::cerr << "split nodes: " << split_nodes.size() << std::endl; | |
| // for (const auto * node : split_nodes) | |
| // std::cerr << " " << node->result_name << std::endl; |
| // std::cerr << "::: req columns (" << required_columns.size() << ") ["; | ||
| // for (auto && required_column : required_columns) | ||
| // std::cerr << required_column << " "; | ||
| // std::cerr << "]\n"; |
There was a problem hiding this comment.
Remove commented-out debugging code before merging to production.
| // std::cerr << "::: req columns (" << required_columns.size() << ") ["; | |
| // for (auto && required_column : required_columns) | |
| // std::cerr << required_column << " "; | |
| // std::cerr << "]\n"; |
| // std::cerr << ".. Main header " << read_from_merge_tree->getOutputHeader()->dumpNames() << std::endl; | ||
| // std::cerr << ".. Lazy header " << lazy_reading->getOutputHeader()->dumpNames() << std::endl; |
There was a problem hiding this comment.
Remove commented-out debugging code before merging to production.
| // std::cerr << " req columns (" << required_columns.size() << ") ["; | ||
| // for (auto && required_column : required_columns) | ||
| // std::cerr << required_column << " "; | ||
| // std::cerr << "]\n"; |
There was a problem hiding this comment.
Remove commented-out debugging code before merging to production.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
|
|
||
| // std::cerr << "split nodes: " << split_nodes.size() << std::endl; | ||
| // for (const auto * node : split_nodes) | ||
| // std::cerr << " " << node->result_name << std::endl; |
There was a problem hiding this comment.
Commented-out debug statements left in production code
Low Severity
Multiple new commented-out std::cerr debug statements were added across splitExpressionStep, the forward pass loop, and the reading step modification block. While existing commented debug code was already present in this file, these new instances (lines 267–269, 467–470, 528–529, 537–540) appear to be leftover development/debugging aids that weren't cleaned up before the PR.
Additional Locations (2)
| /// This function transitively adds ActionsDAG::Node into the set, if all the arguments are already in set (or constants). | ||
| /// It's useful because the main branch of lazy materialization can return more columns than actually required. | ||
| /// As an example, for the query `select a from table prewhere b > 0 order by c limit 1`, only columns `c` is required for ORDER BY, | ||
| /// but the column `a` is returned as well (it's need for PREWHERE). |
There was a problem hiding this comment.
Minor typo in the comment: it's need for PREWHERE → it's needed for PREWHERE.
| required_columns = getRequiredHeaderPositions(expr_step->getExpression(), *expr_step->getInputHeaders().front() , std::move(required_columns)); | ||
| { | ||
| const auto & expr = expr_step->getExpression(); | ||
| /// The number of DAG outputs can be less then the number of columns in the header. |
There was a problem hiding this comment.
Typo: less then should be less than in this comment.
Please fix it to keep the code comments clear and consistent.
| @@ -0,0 +1,7 @@ | |||
| SELECT length(thread_ids) > 0 | |||
| FROM system.query_log | |||
| WHERE (current_database = currentDatabase()) AND (event_date >= (today() - 1)) AND (lower(query) LIKE '%abcd%') AND (type = 'QueryFinish') | |||
There was a problem hiding this comment.
Let's execute some deterministic query. It someone will use abcd in test queries, - this one will become flaky
There was a problem hiding this comment.
But we also check for the current database, which is unique.
LLVM Coverage Report
PR changed lines: PR changed-lines coverage: 97.56% (200/205, 2 noise lines excluded) |


Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
Fix
Block structure mismatch in streamerror caused by unnecessary columns returned from Lazy materialization.Fixes #95191
Documentation entry for user-facing changes
Note
Medium Risk
Touches lazy materialization query-plan splitting and required-column propagation, which can affect correctness of query results and headers across filters/expressions. Changes are localized but involve non-trivial DAG dependency and header-position accounting.
Overview
Fixes lazy materialization planning so the main branch only forwards columns that are actually required/available, preventing extra columns (e.g. from
PREWHERE) from polluting downstream headers and causingBlock structure mismatch(notably with parallel replicas).This refactors required-column tracking to distinguish DAG-output requirements vs step-input header positions, adds transitive dependency inclusion (
addRequiredInputDependenciesIntoNodesSet), and updatessplitExpressionStep/splitFilterStepto compute the next step’s available inputs (including handling of removed filter columns) instead of relying on a final projection cleanup. Adds a stateless regression test covering the parallel-replicas failure case.Written by Cursor Bugbot for commit d7f724b. This will update automatically on new commits. Configure here.