
sql,colexec: add support for wrapping LocalPlanNode#55909

Merged
craig[bot] merged 3 commits into cockroachdb:master from yuzefovich:vec-wrap-local
Dec 16, 2020

Conversation

@yuzefovich
Member

@yuzefovich yuzefovich commented Oct 23, 2020

tree: unwrap DOidWrapper before casting

When performing a cast from DOidWrapper, we're interested in casting
the wrapped datum, so this commit adds the unwrapping to the top of
performCast.

Release note: None
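The idea in this commit can be sketched as follows. This is a minimal, hedged illustration with hypothetical stand-in types (`Datum`, `DString`, `DOidWrapper`, `performCast` here are simplified; the real `tree` package signatures differ):

```go
package main

import "fmt"

// Datum is a simplified stand-in for tree.Datum.
type Datum interface{ String() string }

// DString is a simplified string datum.
type DString string

func (d DString) String() string { return string(d) }

// DOidWrapper wraps another datum together with a custom Oid
// (simplified; the real type lives in the tree package).
type DOidWrapper struct {
	Wrapped Datum
	Oid     uint32
}

func (d *DOidWrapper) String() string { return d.Wrapped.String() }

// performCast unwraps any DOidWrapper at the top, so the actual cast
// logic only ever sees the underlying datum. The old Oid value is
// intentionally dropped.
func performCast(d Datum) Datum {
	if w, ok := d.(*DOidWrapper); ok {
		d = w.Wrapped
	}
	// ... actual cast logic on the unwrapped datum would go here ...
	return d
}

func main() {
	wrapped := &DOidWrapper{Wrapped: DString("hello"), Oid: 19}
	fmt.Println(performCast(wrapped).String()) // prints "hello"
}
```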

colexec: introduce streaming mode to the columnarizer

Previously, the columnarizer would always buffer up tuples (dynamically,
up to coldata.BatchSize()) before emitting them as output; however,
several processors are of a "streaming" nature, and such a buffering
approach is not compatible with wrapping them. This commit introduces a
streaming mode of operation to the columnarizer which is used when
wrapping a streaming processor (via a newly introduced marker interface
that is currently implemented by the planNodeToRowSource and the
changefeed processors).

Note that there are still some unresolved issues around the changefeed
processors, so they aren't being wrapped into the vectorized flows.

Release note: None
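The difference between the two modes can be illustrated with a simplified sketch. The types here (`Row`, the channel-based input, `nextBatch`) are hypothetical stand-ins; the real colexec.Columnarizer reads from an execinfra.RowSource and produces coldata.Batch values:

```go
package main

import "fmt"

// Row is a simplified stand-in for a row of data.
type Row []int

// ColumnarizerMode selects how eagerly batches are emitted.
type ColumnarizerMode int

const (
	// ColumnarizerBufferingMode fills a batch before emitting it.
	ColumnarizerBufferingMode ColumnarizerMode = iota
	// ColumnarizerStreamingMode emits each row as soon as it arrives.
	ColumnarizerStreamingMode
)

const maxBatchSize = 4 // stand-in for coldata.BatchSize()

// nextBatch drains rows from input according to the mode: buffering
// accumulates up to maxBatchSize rows (blocking until the input is
// closed or the batch is full); streaming returns after the first row.
func nextBatch(input <-chan Row, mode ColumnarizerMode) []Row {
	var batch []Row
	for row := range input {
		batch = append(batch, row)
		if mode == ColumnarizerStreamingMode || len(batch) == maxBatchSize {
			return batch
		}
	}
	return batch // input exhausted
}

func main() {
	input := make(chan Row, 1)
	input <- Row{42}
	// In streaming mode the single row is emitted immediately; a
	// buffering columnarizer would keep waiting for more rows, which a
	// changefeed-like source might never produce.
	fmt.Println(len(nextBatch(input, ColumnarizerStreamingMode))) // prints 1
}
```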

colexec: add support for wrapping LocalPlanNode

This commit adds support for wrapping the LocalPlanNode core. It also
changes the meaning of experimental_always to allow wrapping of the
JoinReader and LocalPlanNode cores. The latter allows us to remove
VectorizeAlwaysException.

A notable change is that the materializers created when wrapping
row-execution processors are no longer added to the set of releasables
at flow cleanup because in some cases they could be released before
being closed. Namely, this would occur if we have a subquery with a
LocalPlanNode core and a materializer is added in order to wrap that
core - what happens is that all releasables are put back into their
pools upon the subquery's flow cleanup, yet the subquery planNode tree
isn't closed yet since its closure is done when the main planNode tree
is being closed.
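The release-before-close hazard described above can be sketched deterministically with a toy free list (a stand-in for the sync.Pool-style reuse in the real code; `materializer` and `freeList` here are hypothetical simplifications):

```go
package main

import "fmt"

// materializer is a simplified stand-in for colexec.Materializer,
// which is pooled and reused across flows.
type materializer struct{ closed bool }

// freeList is a deterministic stand-in for the pool used in the real
// code.
type freeList struct{ free []*materializer }

func (p *freeList) get() *materializer {
	if n := len(p.free); n > 0 {
		m := p.free[n-1]
		p.free = p.free[:n-1]
		m.closed = false // reset for reuse
		return m
	}
	return &materializer{}
}

func (p *freeList) put(m *materializer) { p.free = append(p.free, m) }

func main() {
	var p freeList

	// The subquery's flow creates a materializer and, at flow cleanup,
	// releases it back into the pool...
	m := p.get()
	p.put(m)

	// ...but the subquery's planNode tree still references m: it is only
	// closed later, together with the main planNode tree. Meanwhile
	// another flow can grab the very same object from the pool.
	reused := p.get()
	fmt.Println(m == reused) // prints true: one object, two owners
}
```

This is why the commit stops registering these materializers as releasables at flow cleanup.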

Another change is fixing the tracing in the cFetcher during mutations
(we were accessing public-only columns whereas we should have been
accessing deletable ones - that's what row.Fetcher does in the two
places that needed a fix).

Addresses: #57268.

Release note: None

@cockroach-teamcity
Member

This change is Reviewable

@yuzefovich yuzefovich added the do-not-merge label (bors won't merge a PR with this label) on Oct 23, 2020
@yuzefovich yuzefovich force-pushed the vec-wrap-local branch 6 times, most recently from b160bc6 to d71cca2 on October 25, 2020 02:17
@yuzefovich yuzefovich changed the title from "sql: WIP on changing SupportsVectorized check" to "sql: WIP on wrapping LocalPlanNode" on Oct 25, 2020
@yuzefovich yuzefovich requested a review from a team as a code owner November 13, 2020 07:04
@yuzefovich yuzefovich force-pushed the vec-wrap-local branch 5 times, most recently from 497e4b6 to 8b4a58f on December 3, 2020 03:04
@yuzefovich yuzefovich changed the title from "sql: WIP on wrapping LocalPlanNode" to "sql,colexec: add support for wrapping LocalPlanNode" on Dec 3, 2020
@yuzefovich yuzefovich requested review from a team and asubiotto December 3, 2020 03:06
@yuzefovich
Member Author

I believe I flushed out all of the failures that I've observed on this branch, so I think it is RFAL.

@jordanlewis I've separated out the fourth commit that I'd like you to take a look at (and I'll squash it into the third one). The issue showed up in this build on the orms logic test where we have an array of DOidWrappers that we want to cast to an array of strings with different Oids. I'm not sure whether the current approach will work in all cases, but it seems reasonable to me.

@asubiotto
Contributor

Is this RFAL?

if w, ok := d.(*DOidWrapper); ok {
	// If we're casting a DOidWrapper, then we want to cast the wrapped
	// datum. It is also reasonable to lose the old Oid value too.
	d = w.Wrapped
}
Member

I think you want to use the UnwrapDatum method here, which will unwrap a whole stack of DOidWrappers if there is one.
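The suggestion is to unwrap the whole stack rather than a single layer. A minimal sketch of that loop, using the same hypothetical simplified types as above (the real tree.UnwrapDatum has a different signature and also handles other wrapper datums):

```go
package main

import "fmt"

// Datum and DOidWrapper are simplified stand-ins for the tree package
// types.
type Datum interface{ String() string }

// DInt is a simplified integer datum.
type DInt int

func (d DInt) String() string { return fmt.Sprint(int(d)) }

// DOidWrapper wraps another datum (simplified).
type DOidWrapper struct{ Wrapped Datum }

func (d *DOidWrapper) String() string { return d.Wrapped.String() }

// unwrapDatum peels off an entire stack of DOidWrappers, not just one
// layer.
func unwrapDatum(d Datum) Datum {
	for {
		w, ok := d.(*DOidWrapper)
		if !ok {
			return d
		}
		d = w.Wrapped
	}
}

func main() {
	stacked := &DOidWrapper{Wrapped: &DOidWrapper{Wrapped: DInt(7)}}
	fmt.Println(unwrapDatum(stacked).String()) // prints "7"
}
```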

Member

Then again, we should probably just eagerly do that up top.

@yuzefovich
Member Author

Several new failures seem to have popped up since the threshold was lowered to 0, and I'll need to look into those.

I extracted the first two commits into a separate PR #57644 which is RFAL.

@yuzefovich
Copy link
Copy Markdown
Member Author

The build now seems to be green (apart from a single backup flake), so I think it is RFAL. Note that only the last 3 commits belong to this PR.

Contributor

@asubiotto asubiotto left a comment

A notable change is that the materializers created when wrapping
row-execution processors are no longer added to the set of releasables
at flow cleanup because in some cases they could be released before
being closed.

Is this a problem? What are the effects of this? Is this different from Closers?

Reviewed 57 of 57 files at r5, 1 of 1 files at r6, 13 of 13 files at r7, 8 of 8 files at r8, 9 of 9 files at r9, 9 of 9 files at r10, 1 of 1 files at r11, 11 of 11 files at r12, 48 of 48 files at r13.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)


pkg/sql/conn_executor_internal_test.go, line 259 at r13 (raw file):

	distSQLMetrics := execinfra.MakeDistSQLMetrics(time.Hour /* histogramWindow */)
	gw := gossip.MakeOptionalGossip(nil)
	tempEngine, tempFS, err := storage.NewTempEngine(ctx, base.DefaultTestTempStorageConfig(st), base.DefaultTestStoreSpec)

Is this an in-mem engine? See storage.NewDefaultInMem if not. Also, why did you have to add this?


pkg/sql/colexec/columnarizer.go, line 37 at r12 (raw file):

	// phase.
	ColumnarizerDefaultMode ColumnarizerMode = iota
	// ColumnarizerStreamingMode is the mode of operation in which the

Why does making the columnarizer implement the streaming mode solve any problems? Isn't it an issue if we, say, have another operator reading from the columnarizer and it's a buffering processor? Or is it ok for those types of flows to exist since the equivalent row-execution flow will do the same?


pkg/sql/colexec/columnarizer.go, line 66 at r12 (raw file):

// NewColumnarizer returns a new Columnarizer.
func NewColumnarizer(

Might be nicer to "hide" this mode option to avoid having to always pass in a mode if we mostly want the default behavior:

NewBufferingColumnarizer(opts...) {
    return newColumnarizer(opts, DefaultMode)
}

NewStreamingColumnarizer(opts...) {
    return newColumnarizer(opts, StreamingMode)
}

newColumnarizer(opts..., mode mode) {

}

Also, maybe we should rename ColumnarizerDefaultMode to ColumnarizerBufferingMode since it's more descriptive? (i.e. it avoids the question of what the default mode is)

@yuzefovich yuzefovich force-pushed the vec-wrap-local branch 2 times, most recently from 4fb1f07 to c6dbad4 on December 15, 2020 21:22
Member Author

@yuzefovich yuzefovich left a comment

A notable change is that the materializers created when wrapping
row-execution processors are no longer added to the set of releasables
at flow cleanup because in some cases they could be released before
being closed.

Is this a problem? What are the effects of this? Is this different from Closers?

I don't think it is a problem - the code that closes the planNode tree is a bit broken in the sense that we cannot close the subquery's tree until after the main query runs, and the materializer that wraps a LocalPlanNode is, I think, the only special case that outlives the flow it was created by (as a reminder, we're putting all of the objects back into their pools on flow shutdown). So what happens is that the subquery's flow is cleaned up, yet the planNodes end up holding onto those materializers (via other adapters - planNodeToRowSource).

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)


pkg/sql/conn_executor_internal_test.go, line 259 at r13 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Is this an in-mem engine? See storage.NewDefaultInMem if not. Also, why did you have to add this?

TestPortalsDestroyedOnTxnFinish fails with error setting up flow: FS unset on DiskQueueCfg, so we actually need to set the file system for all vectorized flows given that in vectorizedFlow.Setup we always create a DiskQueueCfg.


pkg/sql/colexec/columnarizer.go, line 37 at r12 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Why does making the columnarizer implement the streaming mode solve any problems? Isn't it an issue if we, say, have another operator reading from the columnarizer and it's a buffering processor? Or is it ok for those types of flows to exist since the equivalent row execution flow will do the same.

I think there might be only one very special planNode that requires this streaming behavior - hookFnNode. In its Next method it waits for the context cancellation or an error for shutting down or for the next row to be sent on the channel. The buffering behavior of the columnarizer can block indefinitely long - for example, in case of the changefeeds.

In TestChangefeedBasics/sinkless, if we make the columnarizer buffering, we observe the following situation: there are 5 total changes on the table, so 5 events are emitted; the columnarizer puts the 1st into the first batch, then the 2nd and 3rd into the second batch, then the 4th and 5th into the third batch of capacity 4, and then the columnarizer asks for more rows, but they never arrive.

At the moment, I believe making a "local" decision whether to make the columnarizer streaming or not works in all cases - AFAIK these hookFnNodes are the only things that could block indefinitely when Next is called, so we simply want to make sure not to buffer anything that is coming out of them. I'm guessing that changefeeds might be the only special case, so we could customize the columnarizer to be streaming only on hookFnNodes or even more specifically on the changefeed hookFnNode, but I think it's worth erring on the safe side and wrapping all LocalPlanNode cores with the streaming columnarizer.

I cannot say that I have 100% confidence in this approach, but I haven't encountered any other special cases apart from the changefeeds. I'm assuming that if there is something that I'm missing, it'll be exposed by the nightly and unit tests, especially since we are now randomizing coldata.BatchSize.

To test my theory about changefeed flow being the only special case, I'll open up a separate PR and see whether anything else fails.


pkg/sql/colexec/columnarizer.go, line 66 at r12 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

Might be nicer to "hide" this mode option to avoid having to always pass in a mode if we mostly want the default behavior:

NewBufferingColumnarizer(opts...) {
    return newColumnarizer(opts, DefaultMode)
}

NewStreamingColumnarizer(opts...) {
    return newColumnarizer(opts, StreamingMode)
}

newColumnarizer(opts..., mode mode) {

}

Also, maybe we should rename ColumnarizerDefaultMode to ColumnarizerBufferingMode since it's more descriptive? (i.e. it avoids the question of what the default mode is)

I initially also had "buffering" and "streaming" modes, but I like this suggestion. Done.


pkg/sql/sem/tree/casts.go, line 814 at r4 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

Then again, we should probably just eagerly do that up top.

Done.

@yuzefovich yuzefovich removed the do-not-merge label (bors won't merge a PR with this label) on Dec 15, 2020
Member Author

@yuzefovich yuzefovich left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)


pkg/sql/colexec/columnarizer.go, line 37 at r12 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

I think there might be only one very special planNode that requires this streaming behavior - hookFnNode. In its Next method it waits for the context cancellation or an error for shutting down or for the next row to be sent on the channel. The buffering behavior of the columnarizer can block indefinitely long - for example, in case of the changefeeds.

In TestChangefeedBasics/sinkless, if we make the columnarizer buffering, we observe the following situation: there are 5 total changes on the table, so 5 events are emitted; the columnarizer puts the 1st into the first batch, then the 2nd and 3rd into the second batch, then the 4th and 5th into the third batch of capacity 4, and then the columnarizer asks for more rows, but they never arrive.

At the moment, I believe making a "local" decision whether to make the columnarizer streaming or not works in all cases - AFAIK these hookFnNodes are the only things that could block indefinitely when Next is called, so we simply want to make sure not to buffer anything that is coming out of them. I'm guessing that changefeeds might be the only special case, so we could customize the columnarizer to be streaming only on hookFnNodes or even more specifically on the changefeed hookFnNode, but I think it's worth erring on the safe side and wrapping all LocalPlanNode cores with the streaming columnarizer.

I cannot say that I have 100% confidence in this approach, but I haven't encountered any other special cases apart from the changefeeds. I'm assuming that if there is something that I'm missing, it'll be exposed by the nightly and unit tests, especially since we are now randomizing coldata.BatchSize.

To test my theory about changefeed flow being the only special case, I'll open up a separate PR and see whether anything else fails.

Looks like my hypothesis might be correct since this build on a testing PR has only a couple of expected flakes (some linter issues and benchmark data that needs to be updated in this PR), so I have a bit more confidence now in the current approach (of making a "local" decision on the mode of the columnarizer).

@yuzefovich
Member Author

I believe I've fixed all remaining places where an update was needed, so it is RFAL.

Contributor

@asubiotto asubiotto left a comment

:lgtm:

Reviewed 57 of 57 files at r14, 11 of 11 files at r15, 1 of 54 files at r16.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, and @yuzefovich)


pkg/sql/conn_executor_internal_test.go, line 259 at r13 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

TestPortalsDestroyedOnTxnFinish fails with error setting up flow: FS unset on DiskQueueCfg, so we actually need to set the file system for all vectorized flows given that in vectorizedFlow.Setup we always create a DiskQueueCfg.

👍 what about the in-mem part?

Member Author

@yuzefovich yuzefovich left a comment


TFTR!

bors r+

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto and @jordanlewis)


pkg/sql/conn_executor_internal_test.go, line 259 at r13 (raw file):

Previously, asubiotto (Alfonso Subiotto Marqués) wrote…

👍 what about the in-mem part?

The current invocation creates an in-memory engine and FS. Using NewDefaultInMem is not sufficient since it only returns the engine and not the FS, but we're only interested in the FS.

@yuzefovich
Member Author

New logic test needs to be updated.

bors r-

@craig
Contributor

craig bot commented Dec 16, 2020

Canceled.

@yuzefovich
Member Author

bors r+

@craig
Contributor

craig bot commented Dec 16, 2020

Build succeeded:

@craig craig bot merged commit 9fd988f into cockroachdb:master Dec 16, 2020
@yuzefovich yuzefovich deleted the vec-wrap-local branch December 16, 2020 16:48