
sql: introduce hash group-join operator #93483

Merged
craig[bot] merged 3 commits into cockroachdb:master from yuzefovich:group-join on Dec 16, 2022

Conversation

@yuzefovich
Member

@yuzefovich yuzefovich commented Dec 13, 2022

This PR introduces the hash group-join operator to the execution engines. It combines a hash join followed by a hash aggregation into a single operator when the join's equality columns are the same as the aggregation's grouping columns. The optimizer is currently unaware of this new operator; the changes are plumbed in only during DistSQL physical planning. Naive implementations (which simply use a hash joiner followed by a hash aggregator) are introduced to both engines with proper disk spilling. The usage of this new operator is gated behind an experimental session variable.
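As an illustration, a query shaped like the following could be eligible for the fusion, since the join's equality column is also the grouping column (the tables `l` and `r` here are hypothetical, not from the PR):

```sql
-- Hypothetical schema: the equality column k is also the grouping column,
-- so the hash join + hash aggregation pair could be fused into a single
-- hash group-join operator.
SELECT l.k, count(r.v)
FROM l JOIN r ON l.k = r.k
GROUP BY l.k;
```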

See each commit for details.

Addresses: #38307.

Epic: None

@cockroach-teamcity
Member

This change is Reviewable

@yuzefovich yuzefovich force-pushed the group-join branch 12 times, most recently from f8aff46 to 377ca0f on December 14, 2022 22:41
@yuzefovich yuzefovich changed the title from "sql: introduce hash group-join spec" to "sql: introduce hash group-join operator" on Dec 14, 2022
@yuzefovich yuzefovich marked this pull request as ready for review December 14, 2022 22:46
@yuzefovich yuzefovich requested a review from a team as a code owner December 14, 2022 22:46
Collaborator

@DrewKimball DrewKimball left a comment


:lgtm: I'm excited to see how this turns out!

Reviewed 7 of 7 files at r1, 29 of 29 files at r2, 15 of 15 files at r3, 7 of 7 files at r4, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @michae2 and @yuzefovich)


-- commits line 109 at r4:
Could we just reuse the same setting? Since this operator is only planned when a hash aggregate would previously have been planned, anyway.


pkg/sql/distsql_physical_planner.go line 1990 at r2 (raw file):

			}
			if len(groupCols) == len(orderedGroupCols) {
				// We will plan streaming aggregation.

[nit] We may want a TODO here to consider whether an ordered aggregation is always better when this is supported in the optimizer. Consider a case where the join produces a huge result set that gets grouped down to very few rows - it might be better to avoid materializing the join result, especially in a distributed setting.


pkg/sql/colexec/hash_group_joiner.go line 113 at r4 (raw file):

		return h.hj.ExportBuffered(input)
	}
	if !h.hjLeftSource.zeroBatchEnqueued {

[nit] not related to this PR, but should we make this check part of the spilling queue interface for convenience?


pkg/sql/colexec/colbuilder/execplan.go line 1360 at r4 (raw file):

						// TODO(yuzefovich): think through whether the hash
						// group-join needs to maintain the ordering.
						execinfrapb.Ordering{}, /* outputOrdering */

Will order-sensitive aggregate functions run into problems here?


pkg/sql/colexec/colexecjoin/crossjoiner.go line 290 at r3 (raw file):

func (c *crossJoiner) Reset(ctx context.Context) {
	if r, ok := c.InputOne.(colexecop.Resetter); ok {

[nit] would it be more convenient to add this logic to the TwoInputInitHelper?

This commit introduces a specification of a hash group-join processor
which combines two operations (the hash join followed by the hash
aggregation) into one when the join's equality columns are exactly the
same as the aggregation's grouping columns. The hash join cannot have an
ON expression and the PostProcessSpec on top of the hash join is only
allowed to have a projection set (i.e. renders, limits, and offsets are
prohibited). Currently, there are some additional limitations (e.g. only
inner and outer joins are allowed), but these will be lifted in the
future. Similarly, support for cross and merge joins as the first part of
this "composite" processor will be added later.

Furthermore, there is no optimizer support (neither for costing nor for
exec building), so the plan tree is also oblivious to the hash group-join.
In particular, the regular `EXPLAIN` output does not represent reality,
and `EXPLAIN ANALYZE` now duplicates the same merged stats across two
nodes in the tree. This work can (mostly) wait since converting a hash
join followed by a hash aggregation into a hash group-join is always
beneficial, and the feature is currently experimental and not documented.

The main contribution of this commit is in the physical planning. The
planning code for the aggregators has been taught to replace the last
stage of the hash joiners with the hash group-join stage. Effectively,
we're stitching two processor specs into one while preserving the same
distribution. This planning is hidden behind an experimental session
variable.
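The fusion condition described above (the join's equality columns matching the aggregation's grouping columns, no ON expression, and only a projection in the join's post-processing) can be sketched as a standalone helper. This is an illustrative sketch, not the actual planner code; all names below are invented:

```go
package main

import (
	"fmt"
	"sort"
)

// canPlanHashGroupJoin is an illustrative sketch of the eligibility check:
// the hash group-join can replace a hash join + hash aggregation pair only
// when the join's equality columns are exactly the aggregation's grouping
// columns (compared as a set here), the join has no ON expression, and the
// join's post-processing is limited to a projection.
func canPlanHashGroupJoin(eqCols, groupCols []int, hasOnExpr, onlyProjection bool) bool {
	if hasOnExpr || !onlyProjection {
		return false
	}
	if len(eqCols) != len(groupCols) {
		return false
	}
	// Compare the two column sets order-insensitively.
	a := append([]int(nil), eqCols...)
	b := append([]int(nil), groupCols...)
	sort.Ints(a)
	sort.Ints(b)
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(canPlanHashGroupJoin([]int{0, 2}, []int{2, 0}, false, true)) // true
	fmt.Println(canPlanHashGroupJoin([]int{0}, []int{1}, false, true))       // false
}
```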

Additionally, this commit introduces naive row-by-row execution support
which just uses the existing processors with only minor adjustments for
`EXPLAIN (VEC)` output and the stats collection. Support in the
vectorized engine will be added in a separate commit.

Epic: None

Release note: None
This commit extracts the `joinHelper` from the `colexecjoin` package into
the `colexecop` package as a two-input init/reset helper. The new struct
will be used by the upcoming hash group joiner.

Additionally, this commit extracts a couple of structs that hold
arguments to the constructors of the hash joiner and the hash
aggregator. It also extracts a couple of helper functions used by the
vectorized planning code when constructing these two operators. These
changes make it easier to implement the naive hash group-join in the
follow-up commit.

This commit is effectively a no-op, with the only "meaningful" change
being that we no longer make copies of the input types when constructing
the join output types, because the function constructing the output types
already makes a copy.

Epic: None

Release note: None
Member Author

@yuzefovich yuzefovich left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball and @michae2)


-- commits line 109 at r4:

Previously, DrewKimball (Drew Kimball) wrote…

Could we just reuse the same setting? Since this operator is only planned when a hash aggregate would previously have been planned, anyway.

Yes, I plan to reuse the same setting in the future, but for now it seems ok to not have the ability to disable the input tuples tracking from the left input. Once the optimized implementation is fleshed out, then we'll add the disabling mechanism - right now it seemed mostly like a waste given that it would have to be effectively rewritten for the optimized implementation anyway.

However, I think that we'll implement "streaming" merge group-join for this use case in the future.


pkg/sql/distsql_physical_planner.go line 1990 at r2 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

[nit] We may want a TODO here to consider whether an ordered aggregation is always better when this is supported in the optimizer. Consider a case where the join produces a huge result set that gets grouped down to very few rows - it might be better to avoid materializing the join result, especially in a distributed setting.

Hm, I believe the hash join is not expected to provide any ordering, so I don't think it is actually possible for us to plan the ordered aggregation on top of the hash join. I expanded the comment.


pkg/sql/colexec/hash_group_joiner.go line 113 at r4 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

[nit] not related to this PR, but should we make this check part of the spilling queue interface for convenience?

The colcontainer.Queue.Enqueue interface requires the zero-batch to always be added at the end, and I think that's a fair requirement (so that the queue knows when to flush to disk). However, SpillingQueue is a utility wrapper that could easily do this, so I left a TODO.


pkg/sql/colexec/colbuilder/execplan.go line 1360 at r4 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

Will order-sensitive aggregate functions run into problems here?

Are you thinking about aggregate functions that support ORDER BY clause (i.e. something like array_agg(c1 ORDER BY c2))? That probably shouldn't be affected given that the input is the hash join with arbitrary order (plus such functions are handled internally as window functions).

When I left this comment, I was concerned about a bug that was fixed in #63372 where we had an issue where the external hash aggregator didn't maintain the ordering on the prefix of grouping columns that was already ordered by the input. I believe there shouldn't be a correctness issue on top of the hash join but wanted to remind myself to re-evaluate this later.


pkg/sql/colexec/colexecjoin/crossjoiner.go line 290 at r3 (raw file):

Previously, DrewKimball (Drew Kimball) wrote…

[nit] would it be more convenient to add this logic to the TwoInputInitHelper?

Done. I also removed now-redundant references to the input operators in the merge join.

This commit introduces the naive hash group-join implementation to the
vectorized engine, which simply uses the existing hash joiner and hash
aggregator directly (similarly to what we have in the row-by-row
engine). However, unlike in the row engine, a follow-up commit will
introduce an optimized implementation of the vectorized hash group
joiner.

The main contribution of this commit is support for disk spilling.
The memory limit can be exceeded either during the "join" phase (i.e.
while building the hash table on the right side) or during the
"aggregation" phase (i.e. while allocating a new grouping bucket). This
means that the disk-spilling infrastructure needs to be taught to handle
the fallback in several cases. In particular, we need to correctly
"export buffered" tuples from both inputs. Exporting from the right side
is easy: the hash joiner buffers all of the tuples from the right side
into the hash table and can easily export them on demand, so we just
delegate the export.

However, on the left side things are a bit more difficult. If the memory
limit is reached during the "aggregation" phase, then we must have
already read some tuples from the left input. The hash aggregator
currently cannot spill its intermediate state and doesn't store all
tuples in its hash table, so it handles this via a spilling queue that
"tracks" all input tuples. We apply similar logic to the hash group-join
operation: all tuples from the left input are put into the spilling
queue before they are pushed into the hash join. Thus, when exporting
from the left input, we can simply dequeue from the spilling queue. In
the future, I plan to make it possible to disable this copying behavior
via a cluster setting (similar to what we have for the hash
aggregation), but for now it is mandatory.
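The left-input tracking described above can be sketched with a toy queue: every batch read from the left input is recorded before being handed to the join, so that an export on fallback can simply dequeue in order. This is an illustrative stand-in, not the actual `colexec` spilling queue or its API:

```go
package main

import "fmt"

// trackingQueue is a toy stand-in for the spilling queue: it records every
// batch from the left input so that, on a fallback to disk, the buffered
// tuples can be exported by dequeueing in order.
type trackingQueue struct {
	batches [][]int // each "batch" is a slice of tuple IDs
}

// Enqueue copies the batch before passing it through to the downstream join.
func (q *trackingQueue) Enqueue(batch []int) []int {
	q.batches = append(q.batches, append([]int(nil), batch...))
	return batch
}

// ExportBuffered dequeues the tracked batches one at a time; a nil result
// plays the role of the zero-length batch that signals the end of export.
func (q *trackingQueue) ExportBuffered() []int {
	if len(q.batches) == 0 {
		return nil
	}
	b := q.batches[0]
	q.batches = q.batches[1:]
	return b
}

func main() {
	var q trackingQueue
	q.Enqueue([]int{1, 2})
	q.Enqueue([]int{3})
	for b := q.ExportBuffered(); b != nil; b = q.ExportBuffered() {
		fmt.Println(b)
	}
}
```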

This commit then also sets up some sanity unit tests. I additionally ran
the CI with the experimental setting enabled by default, and there were
no failures.

Epic: None

Release note: None
Member Author

@yuzefovich yuzefovich left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball and @michae2)


pkg/sql/colexec/hash_group_joiner.go line 108 at r7 (raw file):

// perform the export.
func (h *hashGroupJoiner) ExportBuffered(input colexecop.Operator) coldata.Batch {
	if h.ha.ht != nil {

Added minor improvement here to be in sync with hashAggregator.ExportBuffered.

Collaborator

@DrewKimball DrewKimball left a comment


:lgtm:

Reviewed 34 of 34 files at r5, 32 of 32 files at r6, 13 of 13 files at r7, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @michae2 and @yuzefovich)


pkg/sql/colexec/colbuilder/execplan.go line 1360 at r4 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Are you thinking about aggregate functions that support ORDER BY clause (i.e. something like array_agg(c1 ORDER BY c2))? That probably shouldn't be affected given that the input is the hash join with arbitrary order (plus such functions are handled internally as window functions).

When I left this comment, I was concerned about a bug that was fixed in #63372 where we had an issue where the external hash aggregator didn't maintain the ordering on the prefix of grouping columns that was already ordered by the input. I believe there shouldn't be a correctness issue on top of the hash join but wanted to remind myself to re-evaluate this later.

I think you're right about that, good point.

@yuzefovich
Member Author

TFTR!

bors r+

@craig
Contributor

craig bot commented Dec 16, 2022

Build failed (retrying...):

@craig
Contributor

craig bot commented Dec 16, 2022

Build failed (retrying...):

@craig
Contributor

craig bot commented Dec 16, 2022

Build succeeded:

@craig craig bot merged commit a72ea76 into cockroachdb:master Dec 16, 2022
@yuzefovich yuzefovich deleted the group-join branch December 16, 2022 05:06