
sql: add hash join algorithm for USING/NATURAL predicates #10618

Merged
irfansharif merged 7 commits into cockroachdb:master from irfansharif:hash-join
Nov 17, 2016

Conversation

@irfansharif
Contributor

@irfansharif irfansharif commented Nov 10, 2016

Implementation of the hash join algorithm, limited to only NATURAL and
USING predicates. ON predicates with explicit equality constraints
(i.e. a.x = b.x), at least for now, fall back to the nested loop join implementation.

There's an intermediary result buffer attached to the join node here;
this cleans up the code compared to an earlier implementation that
maintained state across consecutive calls to Next() and Values(). The
same pattern can be used for other plan nodes where the computation of a
batch of results is simpler to reason about with any 'state maintenance'
pushed down to the buffer itself.

Due to #10534 the current buffer implementation is non-functional;
RowContainer is being extended to support deletes at the front to address this.
(edit: #10623, done)

Addresses #10397.

cc @RaduBerinde @knz



@knz
Contributor

knz commented Nov 11, 2016

Seeing how the join tests needed to be changed to use ORDER BY made me smile; it's exactly what SQL is supposed to be!
Thanks for working on this.

  1. Regarding the first commit, please split further between one commit with the rename and one with the change. Otherwise I can't see the change.

  2. In general avoid moving methods around in the same commit where you change them. This obscures the changes and makes review excessively difficult.

  3. Change ExplainPlan to report the algorithm; then add new EXPLAIN tests that show how different algorithms are used for different join types.

  4. You're being a bit too liberal with RowContainer. I think this can be improved.


Reviewed 3 of 4 files at r1, 2 of 3 files at r2, 11 of 11 files at r3.
Review status: all files reviewed at latest revision, 18 unresolved discussions, all commit checks successful.


pkg/sql/join.go, line 43 at r2 (raw file):

)

// commonColumns returns the names of columns common on the

Just curious, why did you pull this method (commonColumns) up so far from the constructor that uses it?

Oh then, I see in the next commit it disappears from here again.
Could you ensure that this change disappears from the intermediate commit diff? So that anyone inspecting the changes later does not get distracted.


pkg/sql/join.go, line 34 at r3 (raw file):

const (
  joinTypeInner joinType = iota
  joinTypeLeftOuter

If you think it is important to rename these constants, please split in a different commit. Right now it obscures the diff.


pkg/sql/join.go, line 47 at r3 (raw file):

// there was a matching group (with the same group key) in the opposite stream.
type bucket struct {
  rows *RowContainer

Use a single RowContainer for the entire joinNode, then take slices of its row array in each bucket.
Use a single MemoryAccount for the entire joinNode to account for the slice container.

Separating RowContainers is only meaningful if they would have (say) different lists of columns.
You only want to Close() a single container; Close()ing is (relatively) expensive; you don't want to call it thousands of times.


pkg/sql/join.go, line 116 at r3 (raw file):

}

// makeJoin constructs a planDataSource for a JOIN node.

Don't move this function away from where it was originally defined in the same commit where you modify it. Right now in the diff I see a diff for the entire function (move), and the particular changes you've made are obscured by this. This makes it very hard to review.


pkg/sql/join.go, line 313 at r3 (raw file):

              n.rightRows = nil
          }
      } else if n.joinAlgorithm == hashJoin {

Split the code for the different loops in different functions; also use switch not if else.


pkg/sql/join.go, line 390 at r3 (raw file):

      return n.debugNext()
  }
  if n.joinAlgorithm == nestedLoopJoin {

switch


pkg/sql/join.go, line 397 at r3 (raw file):

  }

  return false, errors.New("unsupported JOIN algorithm")

extract the error into a global err variable.


pkg/sql/join.go, line 543 at r3 (raw file):

      // order to use it for RIGHT OUTER joins but if we encounter another
      // NULL row when going through the left stream (probing phase), matching
      // this with the first NULL row would be incorrect.

Extend this explanatory comment to highlight what happens when joining on 2 or more columns, and only one of them contains NULL.


pkg/sql/join.go, line 576 at r3 (raw file):

      if !ok {
          if n.joinType == joinTypeInner {
              scratch = encoded[:0]

Add a comment here "// Failed to match -- no matching row, nothing to do."


pkg/sql/join.go, line 579 at r3 (raw file):

              continue
          }
          // Given we didn't find a matching right row we append an empty

Insert a comment line above here "// Outer join: unmatched rows are padded with NULLs."


pkg/sql/join.go, line 641 at r3 (raw file):

// Values implements the planNode interface.
func (n *joinNode) Values() parser.DTuple {
  if n.joinAlgorithm == nestedLoopJoin {

switch


pkg/sql/join.go, line 663 at r3 (raw file):

// Close implements the planNode interface.
func (n *joinNode) Close() {
  if n.joinAlgorithm == nestedLoopJoin {

switch


pkg/sql/join.go, line 673 at r3 (raw file):

      n.buffer.Close()
      n.buffer = nil
      for _, b := range n.buckets {

See my comment above; this is insane.

As a general rule of thumb, the complexity of Close() for an entire logical plan should be linear in the number of nodes of the logical plan, and fully independent of the data in the database. That's the entire design contract of the monitor/account machinery. If we didn't want that, we may as well skip MemoryAccount entirely and use a loop in RowContainer to subtract consumed memory row by row.


pkg/sql/join_predicate.go, line 37 at r3 (raw file):

  // input data sources.
  prepareRow(result, leftRow, rightRow parser.DTuple)
  encode([]byte, parser.DTuple, string) ([]byte, bool, error)

  1. Add an explanatory comment for this new method.

  2. Don't use a string in the 3rd argument position; instead use an int with aptly named constants in the global scope.


pkg/sql/join_predicate.go, line 206 at r3 (raw file):

func (p *usingPredicate) encode(b []byte, row parser.DTuple, side string) ([]byte, bool, error) {
  var cols []int
  if side == "right" {

Use int for "side"; also use switch not if here.


pkg/sql/join_predicate.go, line 285 at r3 (raw file):

      if parser.ReNormalizeName(col.Name) == colName {
          idx = j
          break

Woah good catch, but not the right fix. There should be a check here before the assignment to idx: if idx != invalidColIdx { return 0, parser.TypeNull, fmt.Errorf("USING: column name %s is ambiguous in %s table", colName, context) }.


pkg/sql/row_buffer.go, line 33 at r3 (raw file):

type RowBuffer struct {
  rows   *RowContainer
  output parser.DTuple

I'd run PopFirst() at the beginning of Next() and avoid output entirely (use only At in Values()).


pkg/sql/distsql/hashjoiner.go, line 147 at r3 (raw file):

              return err
          }
          if row != nil && !h.output.PushRow(row) {

This seems unrelated. Please extract in a separate commit (and probably PR).


Comments from Reviewable

@knz
Contributor

knz commented Nov 11, 2016

Review status: all files reviewed at latest revision, 19 unresolved discussions, all commit checks successful.


pkg/sql/join.go, line 97 at r3 (raw file):

  buckets map[string]bucket
  acc     mon.BoundAccount

Don't use BoundAccount here; use WrappableMemoryAccount. See group.go for examples.


Comments from Reviewable

@irfansharif
Contributor Author

  1. Regarding the first commit, please split further between one commit with the rename and one with the change. Otherwise I can't see the change.

Done.

  2. In general avoid moving methods around in the same commit where you change them. This obscures the changes and makes review excessively difficult.

That's a good tip, thank you! I've restructured and moved commits around to reflect this now.


Review status: 1 of 9 files reviewed at latest revision, 19 unresolved discussions.


pkg/sql/join.go, line 43 at r2 (raw file):

Previously, knz (kena) wrote…

Just curious, why did you pull this method (commonColumns) up so far from the constructor that uses it?

Oh then, I see in the next commit it disappears from here again.
Could you ensure that this change disappears from the intermediate commit diff? So that anyone inspecting the changes later does not get distracted.

Done.

pkg/sql/join.go, line 34 at r3 (raw file):

Previously, knz (kena) wrote…

If you think it is important to rename these constants, please split in a different commit. Right now it obscures the diff.

Done.

pkg/sql/join.go, line 116 at r3 (raw file):

Previously, knz (kena) wrote…

Don't move this function away from where it was originally defined in the same commit where you modify it. Right now in the diff I see a diff for the entire function (move), and the particular changes you've made are obscured by this. This makes it very hard to review.

Done.

pkg/sql/distsql/hashjoiner.go, line 147 at r3 (raw file):

Previously, knz (kena) wrote…

This seems unrelated. Please extract in a separate commit (and probably PR).

Done.

Comments from Reviewable

@irfansharif
Contributor Author

Review status: 1 of 9 files reviewed at latest revision, 19 unresolved discussions, some commit checks failed.


pkg/sql/join.go, line 313 at r3 (raw file):

Previously, knz (kena) wrote…

Split the code for the different loops in different functions; also use switch not if else.

Done.

pkg/sql/join.go, line 390 at r3 (raw file):

Previously, knz (kena) wrote…

switch

Done.

pkg/sql/join.go, line 397 at r3 (raw file):

Previously, knz (kena) wrote…

extract the error into a global err variable.

changed this to a `panic` instead.

pkg/sql/join.go, line 543 at r3 (raw file):

Previously, knz (kena) wrote…

Extend this explanatory comment to highlight what happens when joining on 2 or more columns, and only one of them contains NULL.

Done.

pkg/sql/join.go, line 576 at r3 (raw file):

Previously, knz (kena) wrote…

Add a comment here "// Failed to match -- no matching row, nothing to do."

Done.

pkg/sql/join.go, line 579 at r3 (raw file):

Previously, knz (kena) wrote…

Insert a comment line above here "// Outer join: unmatched rows are padded with NULLs."

Done.

pkg/sql/join.go, line 641 at r3 (raw file):

Previously, knz (kena) wrote…

switch

Done.

pkg/sql/join.go, line 663 at r3 (raw file):

Previously, knz (kena) wrote…

switch

Done.

pkg/sql/row_buffer.go, line 33 at r3 (raw file):

Previously, knz (kena) wrote…

I'd run PopFirst() at the beginning of Next() and avoid output entirely (use only At in Values()).

`PopFirst()` discards the first row from the internal `RowContainer`, so we have to retain a reference to the original discarded row. It makes more intuitive sense to me to _discard_ a row when calling `Next()` as opposed to `Values()`; it fits in better with the rest of the codebase.

pkg/sql/join_predicate.go, line 37 at r3 (raw file):

Previously, knz (kena) wrote…

  1. Add an explanatory comment for this new method.

  2. don't use a string in the 3rd argument position, instead use an int with aptly named constants in the global scope.

Done.

pkg/sql/join_predicate.go, line 206 at r3 (raw file):

Previously, knz (kena) wrote…

Use int for "side"; also use switch not if here.

Done.

pkg/sql/join_predicate.go, line 285 at r3 (raw file):

Previously, knz (kena) wrote…

Woah good catch, but not the right fix. There should be a check here before the assignment to idx: if idx != invalidColIdx { return 0, parser.TypeNull, fmt.Errorf("USING: column name %s is ambiguous in %s table", colName, context) }.

Done.

Comments from Reviewable

@knz
Contributor

knz commented Nov 14, 2016

Thanks for addressing most of the concerns. The topic of the row containers is still open though.


Reviewed 2 of 11 files at r3, 2 of 9 files at r4, 2 of 2 files at r5, 3 of 4 files at r7, 3 of 3 files at r8.
Review status: all files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


pkg/sql/row_buffer.go, line 33 at r3 (raw file):

Previously, irfansharif (irfan sharif) wrote…

PopFirst() discards the first row from the internal RowContainer, we have to retain a reference to the original discarded row. It makes more intuitive sense for me to discard a row when calling Next() as opposed to Values(), fits in better with the rest of the codebase.

Ok.

Comments from Reviewable

@RaduBerinde
Member

For the first commit, did you use `git mv`? It would be important for history purposes. The diffs (both on GitHub and Reviewable) look like they didn't recognize the relationship. You might need to do two commits: one that moves `join` to `index_join`, and a second that moves `table_join` to `join`.


Review status: all files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


pkg/sql/index_join.go, line 241 at r5 (raw file):

      return true, nil
  }
  return false, nil

This refactoring is not correct; the loop is needed - what if there are no rows produced from this batch? (n.table can have a filter).


Comments from Reviewable

@RaduBerinde
Member

Review status: all files reviewed at latest revision, 4 unresolved discussions, some commit checks failed.


pkg/sql/index_join.go, line 241 at r5 (raw file):

Previously, RaduBerinde wrote…

This refactoring is not correct; the loop is needed - what if there are no rows produced from this batch? (n.table can have a filter).

You might be able to trigger with existing tests by temporarily setting indexJoinBatchSize to something small (1?)

Comments from Reviewable

@irfansharif
Contributor Author

@RaduBerinde: I did use `git mv`, but I think `join.go` being common to both operations threw things off. Separated.


Review status: 0 of 12 files reviewed at latest revision, 5 unresolved discussions, some commit checks failed.


pkg/sql/index_join.go, line 241 at r5 (raw file):

Previously, RaduBerinde wrote…

You might be able to trigger with existing tests by temporarily setting indexJoinBatchSize to something small (1?)

ah. Removed.

pkg/sql/join.go, line 47 at r3 (raw file):

Use a single RowContainer for the entire joinNode, then take slices of its row array in each bucket.

This is not implemented currently as per discussions offline.

Use a single MemoryAccount for the entire joinNode to account for the slice container. You only want to Close() a single container; Close()ing is (relatively) expensive; you don't want to call it thousands of times.

Done


pkg/sql/join.go, line 97 at r3 (raw file):

Previously, knz (kena) wrote…

Don't use BoundAccount here; use WrappableMemoryAccount. See group.go for examples.

Done.

pkg/sql/join.go, line 673 at r3 (raw file):

Previously, knz (kena) wrote…

See my comment above; this is insane.

As a general rule of thumb, the complexity of Close() for an entire logical plan should be linear in the number of nodes of the logical plan, and fully independent of the data in the database. That's the entire design contract of the monitor/account machinery. If we didn't want that, we may as well skip MemoryAccount entirely and use a loop in RowContainer to subtract consumed memory row by row.

Done.

Comments from Reviewable

renamed file to match the rest of the package layout.
renamed file to match the rest of the package layout.
refactor out join predicate definitions, implementations into separate
file.
@irfansharif
Contributor Author

@knz: PTAL, the latest revision demonstrates my ideas about the use of a shared RowContainer across all hash join buckets. Quite simply, the RowContainer actually stores the rows; buckets simply store references to the stored copy in the RowContainer.


Review status: 0 of 19 files reviewed at latest revision, 4 unresolved discussions, some commit checks pending.


pkg/sql/join.go, line 47 at r3 (raw file):

Use a single RowContainer for the entire joinNode, then take slices of its row array in each bucket.

Done.


Comments from Reviewable

Implementation of the hash join algorithm, limited to only NATURAL and
USING predicates. ON predicates, at least for now, fall back to the
nested loop join implementation.

There's an intermediary result buffer attached to the join node here;
this cleans up the code compared to an earlier implementation that
maintained state across consecutive calls to Next() and Values(). The
same pattern can be used for other plan nodes where the computation of
a batch of results is simpler to reason about with state maintenance
pushed down to the buffer itself.
@irfansharif
Contributor Author

  1. Change ExplainPlan to report the algorithm; then add new EXPLAIN tests that show how different algorithms are used for different join types.

Done.


Review status: 0 of 19 files reviewed at latest revision, 4 unresolved discussions, some commit checks pending.


Comments from Reviewable

@knz
Contributor

knz commented Nov 16, 2016

I have started the review from scratch. Thanks to you splitting the changes in different commits, the experience was much more pleasant this time!

The overall changes LGTM. The overall architecture of join looks fine this way.

However you did something with the memory accounts which I rather strongly disagree with. A lot of the current complexity in the code was added to avoid interfaces in the monitoring API and their call overhead, and you've just introduced one.

So I'd like us to look at this together and explore alternatives. If you wish I can also attempt to explore this myself and submit a PR towards your branch. Whatever you're most comfortable with.


Reviewed 2 of 11 files at r3, 8 of 9 files at r9, 2 of 2 files at r10, 2 of 2 files at r11, 3 of 3 files at r12, 5 of 5 files at r13.
Review status: 6 of 19 files reviewed at latest revision, 5 unresolved discussions, all commit checks successful.


pkg/sql/join.go, line 162 at r11 (raw file):

      case *parser.UsingJoinCond:
          pred, info, err = p.makeUsingPredicate(leftInfo, rightInfo, t.Cols)
      default:

So why did you remove the default case here?
("here" = makeJoin / switch t := cond.(type))


pkg/sql/row_buffer.go, line 31 at r12 (raw file):

// under the constraints imposed by Next() and Values() under the planNode
// interface.
type RowBuffer struct {

I think if you make RowBuffer inherit from *RowContainer you don't have to implement the two methods Close and AddRow.


pkg/sql/row_container.go, line 31 at r14 (raw file):

// TODO(irfansharif): Push this down to sql/mon. Collate differences between
// WrappedMemoryAccount and mon.BoundMonitor, etc.
type memoryMonitor interface {

No, please don't go there. I do not wish us to pay the price of an interface for this use case. :)
There must be a better way.


Comments from Reviewable

@irfansharif
Contributor Author

I investigated possible alternatives here some more. I want to question what exactly we gain by using a WrappedMemoryAccount as opposed to a mon.BoundAccount that is still txnState-bound. We seem to use the latter in our nested loop join implementation as well (see newContainerValuesNode in values.go), and this retains the original implementation of RowContainer working off a mon.BoundAccount.

Additionally (possibly related): do you foresee the unification of the two (WrappedMemoryAccount and mon.BoundAccount) as used in sql? Currently I see both usages throughout, and it's not immediately apparent to me what benefit we get from having two separate ways to maintain an account.


Review status: 6 of 20 files reviewed at latest revision, 5 unresolved discussions.


pkg/sql/join.go, line 162 at r11 (raw file):

Previously, knz (kena) wrote…

So why did you remove the default case here?
("here" = makeJoin / switch t := cond.(type))

default:
    err = util.UnimplementedWithIssueErrorf(2970, "unsupported JOIN predicate: %T", cond)

It used to be this; my understanding was that we've covered all JOIN predicates and the issue was therefore subsequently closed. Am I missing something?


pkg/sql/row_buffer.go, line 31 at r12 (raw file):

Previously, knz (kena) wrote…

I think if you make RowBuffer inherit from *RowContainer you don't have to implement the two methods Close and AddRow.

Done.

pkg/sql/row_container.go, line 31 at r14 (raw file):

Previously, knz (kena) wrote…

No, please don't go there. I do not wish us to pay the price of an interface for this use case. :)
There must be a better way.

Hmm, I hadn't thought of the cost of indirection here. I need to revise this; see the top-level comment above.

Comments from Reviewable

@irfansharif
Contributor Author

Oh, I see: cleaner transitioning between session-specific and transaction-specific contexts. I'm still not sure WrappedMemoryAccount is a better fit in this case, however (compared to mon.BoundAccount, that is).


Review status: 6 of 20 files reviewed at latest revision, 5 unresolved discussions, all commit checks successful.


Comments from Reviewable

Shared RowContainer between all buckets, separate memory account used
for RowBuffer structure.
Additionally embedded RowContainer into RowBuffer to avoid duplicate
AddRow() and Close() functionality.
@irfansharif
Contributor Author

@knz: PTAL, reverted back to the original use of mon.BoundMonitor. Additionally, please look at the last commit in isolation; it covers all the changes necessary to track memory usage for storing all the parser.DTuple references. This forces me to pass around s *Session, adding to some of the simpler APIs I had originally wanted for buckets, but I have no strong opinions on this.


Review status: 6 of 19 files reviewed at latest revision, 5 unresolved discussions, some commit checks pending.


Comments from Reviewable

@knz
Contributor

knz commented Nov 17, 2016

:lgtm:, modulo a suggestion for readability improvement on the last commit.


Reviewed 1 of 2 files at r10, 14 of 14 files at r15, 2 of 2 files at r16.
Review status: all files reviewed at latest revision, 5 unresolved discussions, all commit checks successful.


pkg/sql/join.go, line 162 at r11 (raw file):

Previously, irfansharif (irfan sharif) wrote…

default:
  err = util.UnimplementedWithIssueErrorf(2970, "unsupported JOIN predicate: %T", cond)

it used to be this, my understanding was we've covered all JOIN predicates and the issue was therefore subsequently closed. am I missing something?

It was a protection against future changes: if someone ever adds a new condition type but forgets to do something about here, it will be caught properly instead of doing something non-sensical in the constructor below. But it's not very important either. Your call.

pkg/sql/join.go, line 76 at r16 (raw file):

}

func (b *buckets) AddRow(s *Session, encoding []byte, row parser.DTuple) error {

  • Add the WrappableMemoryAccount in joinNode.
  • pass WrappedMemoryAccount as argument to buckets.AddRow, and do the Wtxn call in hashJoinStart
  • call the Close method on the account in joinNode's Close method instead of buckets'.

pkg/sql/row_container.go, line 197 at r15 (raw file):

  copy(c.chunks[chunk][pos:pos+c.numCols], row)
  c.numRows++
  return c.chunks[chunk][pos : pos+c.numCols : pos+c.numCols], nil

Nice idea. I like this!


pkg/sql/row_container.go, line 31 at r16 (raw file):

  // targetChunkSize is the target number of Datums in a RowContainer chunk.
  targetChunkSize = 64
  sizeOfDatum     = int64(unsafe.Sizeof(parser.Datum(nil)))

Very nice, thanks for doing this!


Comments from Reviewable

@knz
Contributor

knz commented Nov 17, 2016

Well done, btw. I dig this PR :) 💛


Review status: all files reviewed at latest revision, 2 unresolved discussions, all commit checks successful.


Comments from Reviewable

@irfansharif
Contributor Author

Thank you for letting me work on this, I did give you a lot to review as well so thank you for being so prompt with that!


Review status: 18 of 19 files reviewed at latest revision, 2 unresolved discussions, some commit checks pending.


pkg/sql/join.go, line 76 at r16 (raw file):

Previously, knz (kena) wrote…

  • Add the WrappableMemoryAccount in joinNode.
  • pass WrappedMemoryAccount as argument to buckets.AddRow, and do the Wtxn call in hashJoinStart
  • call the Close method on the account in joinNode's Close method instead of buckets'.

Great suggestion, looks much cleaner! Done.

Comments from Reviewable

@knz
Contributor

knz commented Nov 17, 2016

👍


Reviewed 1 of 2 files at r10, 1 of 1 files at r17.
Review status: all files reviewed at latest revision, 1 unresolved discussion, all commit checks successful.


Comments from Reviewable

@irfansharif irfansharif merged commit 3fab39d into cockroachdb:master Nov 17, 2016
@irfansharif irfansharif deleted the hash-join branch November 17, 2016 16:54
@RaduBerinde
Member

Great stuff!! 👍
