sql: introduce hash group-join operator #93483
Conversation
f8aff46 force-pushed to 377ca0f
DrewKimball
left a comment
I'm excited to see how this turns out!
Reviewed 7 of 7 files at r1, 29 of 29 files at r2, 15 of 15 files at r3, 7 of 7 files at r4, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @michae2 and @yuzefovich)
-- commits line 109 at r4:
Could we just reuse the same setting? Since this operator is only planned when a hash aggregate would previously have been planned, anyway.
pkg/sql/distsql_physical_planner.go line 1990 at r2 (raw file):
}
if len(groupCols) == len(orderedGroupCols) {
	// We will plan streaming aggregation.
[nit] We may want a TODO here to consider whether an ordered aggregation is always better when this is supported in the optimizer. Consider a case where the join produces a huge result set that gets grouped down to very few rows - it might be better to avoid materializing the join result, especially in a distributed setting.
pkg/sql/colexec/hash_group_joiner.go line 113 at r4 (raw file):
	return h.hj.ExportBuffered(input)
}
if !h.hjLeftSource.zeroBatchEnqueued {
[nit] not related to this PR, but should we make this check part of the spilling queue interface for convenience?
pkg/sql/colexec/colbuilder/execplan.go line 1360 at r4 (raw file):
// TODO(yuzefovich): think through whether the hash
// group-join needs to maintain the ordering.
execinfrapb.Ordering{}, /* outputOrdering */
Will order-sensitive aggregate functions run into problems here?
pkg/sql/colexec/colexecjoin/crossjoiner.go line 290 at r3 (raw file):
func (c *crossJoiner) Reset(ctx context.Context) {
	if r, ok := c.InputOne.(colexecop.Resetter); ok {
[nit] would it be more convenient to add this logic to the TwoInputInitHelper?
This commit introduces a specification of a hash group-join processor, which combines two operations (a hash join followed by a hash aggregation) into one when the join's equality columns are exactly the same as the aggregation's grouping columns. The hash join cannot have an ON expression, and the PostProcessSpec on top of the hash join is only allowed to have a projection set (i.e. renders, limits, and offsets are prohibited). Currently, there are some additional limitations (e.g. only inner and outer joins are allowed), but these will be lifted in the future. Similarly, support for cross and merge joins as the first part of this "composite" processor will be added later.

Furthermore, there is no optimizer support (neither for costing nor for exec building), so the plan tree is also oblivious of the hash group-join. In particular, the regular `EXPLAIN` now does not represent reality, and `EXPLAIN ANALYZE` now duplicates the same merged stats across two nodes in the tree. This work can (mostly) wait since converting a hash join followed by a hash aggregation into a hash group-join is always beneficial, and the feature is currently experimental and not documented.

The main contribution of this commit is in the physical planning. The planning code for the aggregators has been taught to replace the last stage of the hash joiners with the hash group-join stage. Effectively, we're stitching two processor specs into one while preserving the same distribution. This planning is hidden behind an experimental session variable.

Additionally, this commit introduces naive row-by-row execution support which simply uses the existing processors, with only minor adjustments for `EXPLAIN (VEC)` output and stats collection. Support in the vectorized engine will be added in a separate commit.

Epic: None

Release note: None
This commit extracts the `joinHelper` from the `colexecjoin` package into the `colexecop` package as a two-input init/reset helper. The new struct will be used by the upcoming hash group joiner.

Additionally, this commit extracts a couple of structs that hold arguments to the constructors of the hash joiner and the hash aggregator. It also extracts a couple of helper functions for the vectorized planning code that constructs these two operators. These changes make it easier to implement the naive hash group-join in the follow-up commit.

This commit is effectively a noop, with the only "meaningful" change being that we no longer copy the input types when constructing the join output types, because the function constructing the output types already makes a copy.

Epic: None

Release note: None
377ca0f force-pushed to 46421e8
yuzefovich
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball and @michae2)
Previously, DrewKimball (Drew Kimball) wrote…
Could we just reuse the same setting? Since this operator is only planned when a hash aggregate would previously have been planned, anyway.
Yes, I plan to reuse the same setting in the future, but for now it seems ok not to have the ability to disable the tracking of input tuples from the left input. Once the optimized implementation is fleshed out, we'll add the disabling mechanism - right now it seemed mostly like a waste given that it would have to be effectively rewritten for the optimized implementation anyway.
However, I think that we'll implement "streaming" merge group-join for this use case in the future.
pkg/sql/distsql_physical_planner.go line 1990 at r2 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
[nit] We may want a TODO here to consider whether an ordered aggregation is always better when this is supported in the optimizer. Consider a case where the join produces a huge result set that gets grouped down to very few rows - it might be better to avoid materializing the join result, especially in a distributed setting.
Hm, I believe the hash join is not expected to provide any ordering, so I don't think it is actually possible for us to plan the ordered aggregation on top of the hash join. I expanded the comment.
pkg/sql/colexec/hash_group_joiner.go line 113 at r4 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
[nit] not related to this PR, but should we make this check part of the spilling queue interface for convenience?
The colcontainer.Queue.Enqueue interface requires that the zero batch always be added at the end, and I think that's a fair requirement (so that the queue knows when to flush to disk). However, SpillingQueue is a utility wrapper that could easily do this itself, so I left a TODO.
pkg/sql/colexec/colbuilder/execplan.go line 1360 at r4 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
Will order-sensitive aggregate functions run into problems here?
Are you thinking about aggregate functions that support the ORDER BY clause (i.e. something like array_agg(c1 ORDER BY c2))? That probably shouldn't be affected given that the input is the hash join with arbitrary order (plus such functions are handled internally as window functions).
When I left this comment, I was concerned about a bug that was fixed in #63372 where we had an issue where the external hash aggregator didn't maintain the ordering on the prefix of grouping columns that was already ordered by the input. I believe there shouldn't be a correctness issue on top of the hash join but wanted to remind myself to re-evaluate this later.
pkg/sql/colexec/colexecjoin/crossjoiner.go line 290 at r3 (raw file):
Previously, DrewKimball (Drew Kimball) wrote…
[nit] would it be more convenient to add this logic to the TwoInputInitHelper?
Done. I also removed now-redundant references to the input operators in the merge join.
This commit introduces the naive hash group-join implementation to the vectorized engine, which simply uses the existing hash joiner and hash aggregator directly (similar to what we have in the row-by-row engine). However, unlike in the row engine, a follow-up commit will introduce an optimized implementation of the vectorized hash group joiner.

The main contribution of this commit is support for disk spilling. The memory limit can be exceeded either during the "join" phase (i.e. building the hash table on the right side) or during the "aggregation" phase (i.e. while allocating a new grouping bucket), which means that the disk spilling infrastructure needs to be taught to handle the fallback in several cases. In particular, we need to correctly "export buffered" tuples from both inputs.

Exporting from the right side is easy - the hash joiner buffers all of the tuples from the right into the hash table and can easily export them on demand, so we just delegate the export. However, on the left side things are a bit more difficult. If the memory limit is reached during the "aggregation" phase, then we must have already read some tuples from the left input. The hash aggregator currently cannot spill its intermediate state and doesn't store all tuples in its hash table, so it handles this via a spilling queue which "tracks" all input tuples. We apply similar logic to the hash group-join operation - all tuples from the left input are put into the spilling queue before they are pushed into the hash join. Thus, when exporting from the left input, we can simply dequeue from the spilling queue. In the future, I plan to allow disabling this copying behavior via a cluster setting (similar to what we have for hash aggregation), but for now it is mandatory.

This commit also sets up some sanity unit tests. I additionally ran the CI with the experimental setting enabled by default, and there were no failures.

Epic: None

Release note: None
46421e8 force-pushed to 3fbab32
yuzefovich
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball and @michae2)
pkg/sql/colexec/hash_group_joiner.go line 108 at r7 (raw file):
// perform the export.
func (h *hashGroupJoiner) ExportBuffered(input colexecop.Operator) coldata.Batch {
	if h.ha.ht != nil {
Added minor improvement here to be in sync with hashAggregator.ExportBuffered.
DrewKimball
left a comment
Reviewed 34 of 34 files at r5, 32 of 32 files at r6, 13 of 13 files at r7, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @michae2 and @yuzefovich)
pkg/sql/colexec/colbuilder/execplan.go line 1360 at r4 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Are you thinking about aggregate functions that support the ORDER BY clause (i.e. something like array_agg(c1 ORDER BY c2))? That probably shouldn't be affected given that the input is the hash join with arbitrary order (plus such functions are handled internally as window functions).

When I left this comment, I was concerned about a bug that was fixed in #63372 where the external hash aggregator didn't maintain the ordering on the prefix of grouping columns that was already ordered by the input. I believe there shouldn't be a correctness issue on top of the hash join but wanted to remind myself to re-evaluate this later.
I think you're right about that, good point.
TFTR! bors r+
Build failed (retrying...)
Build failed (retrying...)
Build succeeded
This PR introduces the hash group-join operator to the execution engines: it combines a hash join followed by a hash aggregation into a single operator when the join's equality columns are the same as the aggregation's grouping columns. The optimizer is currently unaware of this new operator; the changes are plumbed only through DistSQL physical planning. Naive implementations (which simply use a hash joiner followed by a hash aggregator) are introduced in both engines, with proper disk spilling. The usage of this new operator is gated behind an experimental session variable.
See each commit for details.
Addresses: #38307.
Epic: None