sql,colexec: add support for wrapping LocalPlanNode#55909
craig[bot] merged 3 commits into cockroachdb:master
Conversation
Force-pushed from b160bc6 to d71cca2.
Force-pushed from d71cca2 to 5ed5cf5.
Force-pushed from 5ed5cf5 to f37e04f.
Force-pushed from f37e04f to a16a401.
Force-pushed from a16a401 to 99ff340.
Force-pushed from 497e4b6 to 8b4a58f.
I believe I flushed out all of the failures that I've observed on this branch, so I think it is RFAL. @jordanlewis I currently separated out the fourth commit that I'd like you to take a look at (and I'll squash it into the third one). The issue showed up in this build.
Force-pushed from 8b4a58f to 5d3dcbc.
Is this RFAL?
pkg/sql/sem/tree/casts.go
Outdated
    if w, ok := d.(*DOidWrapper); ok {
        // If we're casting a DOidWrapper, then we want to cast the wrapped
        // datum. It is also reasonable to lose the old Oid value too.
        d = w.Wrapped
I think you want to use the UnwrapDatum method here, which will unwrap a whole stack of DOidWrappers if there is one.
Then again, we should probably just eagerly do that up top.
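For illustration, the loop that this suggestion amounts to can be sketched as follows. The `Datum`, `DInt`, and `DOidWrapper` types below are simplified stand-ins for the real `tree` types (only what's needed to show the unwrapping is included), and `unwrapDatum` is a hypothetical name for the behavior `tree.UnwrapDatum` provides:

```go
package main

import "fmt"

// Datum, DInt, and DOidWrapper are simplified stand-ins for the real
// tree types; only what's needed to show the unwrapping loop is here.
type Datum interface{ String() string }

type DInt int

func (d DInt) String() string { return fmt.Sprintf("%d", int(d)) }

// DOidWrapper wraps another datum and carries an Oid alongside it; wrappers
// can be stacked, which is why a single type assertion isn't enough.
type DOidWrapper struct {
	Wrapped Datum
	Oid     uint32
}

func (w *DOidWrapper) String() string { return w.Wrapped.String() }

// unwrapDatum peels off the whole stack of DOidWrappers, not just the
// outermost one.
func unwrapDatum(d Datum) Datum {
	for {
		w, ok := d.(*DOidWrapper)
		if !ok {
			return d
		}
		d = w.Wrapped
	}
}

func main() {
	d := &DOidWrapper{Wrapped: &DOidWrapper{Wrapped: DInt(42)}}
	fmt.Println(unwrapDatum(d)) // prints 42
}
```

Doing this once at the top of the cast function (as suggested next) means none of the per-type cast cases need to worry about wrappers.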
Several new failures seem to have popped up since the threshold was lowered to 0, and I'll need to look into those. I extracted the first two commits into a separate PR #57644, which is RFAL.
Force-pushed from 5d3dcbc to 5deb7c9.
The build now seems to be green (apart from a single backup flake), so I think it is RFAL. Note that only the last 3 commits belong to this PR.
Force-pushed from afea7a7 to 347be11.
Force-pushed from e5cc4e2 to d816265.
asubiotto
left a comment
Notable change is that now the materializer created when wrapping
row-execution processors are no longer added to the set of releasables
at the flow cleanup because in some cases it could be released before
being closed.
Is this a problem? What are the effects of this? Is this different from Closers?
Reviewed 57 of 57 files at r5, 1 of 1 files at r6, 13 of 13 files at r7, 8 of 8 files at r8, 9 of 9 files at r9, 9 of 9 files at r10, 1 of 1 files at r11, 11 of 11 files at r12, 48 of 48 files at r13.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @yuzefovich)
pkg/sql/conn_executor_internal_test.go, line 259 at r13 (raw file):
    distSQLMetrics := execinfra.MakeDistSQLMetrics(time.Hour /* histogramWindow */)
    gw := gossip.MakeOptionalGossip(nil)
    tempEngine, tempFS, err := storage.NewTempEngine(ctx, base.DefaultTestTempStorageConfig(st), base.DefaultTestStoreSpec)
Is this an in-mem engine? See storage.NewDefaultInMem if not. Also, why did you have to add this?
pkg/sql/colexec/columnarizer.go, line 37 at r12 (raw file):
    // phase.
    ColumnarizerDefaultMode ColumnarizerMode = iota
    // ColumnarizerStreamingMode is the mode of operation in which the
Why does making the columnarizer implement the streaming mode solve any problems? Isn't it an issue if we, say, have another operator reading from the columnarizer and it's a buffering processor? Or is it ok for those types of flows to exist since the equivalent row execution flow will do the same.
pkg/sql/colexec/columnarizer.go, line 66 at r12 (raw file):
    // NewColumnarizer returns a new Columnarizer.
    func NewColumnarizer(
Might be nicer to "hide" this mode option to avoid having to always pass in a mode when we mostly want the default behavior:
    NewBufferingColumnarizer(opts...) {
        return newColumnarizer(opts, DefaultMode)
    }
    NewStreamingColumnarizer(opts...) {
        return newColumnarizer(opts, StreamingMode)
    }
    newColumnarizer(opts..., mode mode) {
    }
Also, maybe we should rename ColumnarizerDefaultMode to ColumnarizerBufferingMode since it's more descriptive? (i.e. it avoids the question of what the default mode is.)
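The constructor-hiding suggestion above can be sketched as a self-contained toy. The signatures here are invented for illustration and omit the real allocator/flowCtx/input arguments; the mode constants use the renamed "buffering"/"streaming" spelling:

```go
package main

import "fmt"

// ColumnarizerMode distinguishes the two ways the columnarizer can emit
// batches; the constants use the renamed (more descriptive) spelling.
type ColumnarizerMode int

const (
	ColumnarizerBufferingMode ColumnarizerMode = iota
	ColumnarizerStreamingMode
)

type Columnarizer struct{ mode ColumnarizerMode }

// newColumnarizer is unexported, so every caller has to go through one of
// the two wrappers below, and the mode choice is explicit at the call site.
func newColumnarizer(mode ColumnarizerMode) *Columnarizer {
	return &Columnarizer{mode: mode}
}

// NewBufferingColumnarizer returns a columnarizer that buffers up a full
// batch before emitting it.
func NewBufferingColumnarizer() *Columnarizer {
	return newColumnarizer(ColumnarizerBufferingMode)
}

// NewStreamingColumnarizer returns a columnarizer that emits a batch as
// soon as a single row arrives.
func NewStreamingColumnarizer() *Columnarizer {
	return newColumnarizer(ColumnarizerStreamingMode)
}

func main() {
	fmt.Println(NewBufferingColumnarizer().mode == ColumnarizerBufferingMode)
	fmt.Println(NewStreamingColumnarizer().mode == ColumnarizerStreamingMode)
}
```

The benefit of the pattern is that grepping for the streaming constructor finds exactly the call sites that opted into the special behavior.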
Force-pushed from 4fb1f07 to c6dbad4.
Notable change is that now the materializer created when wrapping
row-execution processors are no longer added to the set of releasables
at the flow cleanup because in some cases it could be released before
being closed.
Is this a problem? What are the effects of this? Is this different from
Closers?
I don't think it is a problem. The code that closes the planNode tree is a bit broken, in the sense that we cannot close the subquery's tree until after the main query runs, and the materializer that wraps a LocalPlanNode is - I think - the only special case of an object that outlives the flow it was created by (as a reminder, we put all of the objects back into their pools on flow shutdown). So what happens is that the subquery's flow is cleaned up, yet the planNodes end up holding onto those materializers (via other adapters - planNodeToRowSource).
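The hazard being avoided here can be sketched deterministically with a simple free list standing in for the real object pools; all names below (`materializer`, `get`, `release`) are hypothetical simplifications, not CockroachDB code:

```go
package main

import "fmt"

// materializer is a hypothetical stand-in for the pooled object.
type materializer struct{ closed bool }

// A deterministic free list standing in for the real pools, so the
// problematic reuse is guaranteed to happen in this sketch.
var freeList []*materializer

func get() *materializer {
	if n := len(freeList); n > 0 {
		m := freeList[n-1]
		freeList = freeList[:n-1]
		return m
	}
	return &materializer{}
}

func release(m *materializer) { freeList = append(freeList, m) }

func main() {
	// The subquery's flow creates a materializer wrapping a LocalPlanNode.
	m := get()

	// Bug being fixed: flow cleanup releases it while the planNode tree
	// still holds it (the tree is only closed after the main query runs).
	release(m)

	// Another flow reuses the "free" object...
	other := get()

	// ...and the late planNode Close() now mutates state owned elsewhere.
	m.closed = true
	fmt.Println(other == m, other.closed) // prints: true true
}
```

Not adding the materializer to the releasables set at flow cleanup sidesteps exactly this: the object stays owned by the planNode tree until it is actually closed.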
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)
pkg/sql/conn_executor_internal_test.go, line 259 at r13 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Is this an in-mem engine? See storage.NewDefaultInMem if not. Also, why did you have to add this?
TestPortalsDestroyedOnTxnFinish fails with error setting up flow: FS unset on DiskQueueCfg, so we actually need to set the file system for all vectorized flows given that in vectorizedFlow.Setup we always create a DiskQueueCfg.
pkg/sql/colexec/columnarizer.go, line 37 at r12 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Why does making the columnarizer implement the streaming mode solve any problems? Isn't it an issue if we, say, have another operator reading from the columnarizer and it's a buffering processor? Or is it ok for those types of flows to exist since the equivalent row execution flow will do the same.
I think there might be only one very special planNode that requires this streaming behavior - hookFnNode. In its Next method it waits for the context cancellation or an error for shutting down or for the next row to be sent on the channel. The buffering behavior of the columnarizer can block indefinitely long - for example, in case of the changefeeds.
In TestChangefeedBasics/sinkless, if we make the columnarizer buffering, we observe the following situation: there are 5 total changes on the table, so 5 events are emitted; the columnarizer puts the 1st into the first batch, then the 2nd and 3rd into the second batch, then the 4th and 5th into the third batch (with a batch capacity of 4), and then the columnarizer asks for more rows, but they never arrive.
At the moment, I believe making a "local" decision whether to make the columnarizer streaming or not works in all cases - AFAIK these hookFnNodes are the only things that could block indefinitely when Next is called, so we simply wanna make sure to not buffer anything that is coming out of them. I'm guessing that changefeeds might be the only special case, so we could customize the columnarizer to be streaming only on hookFnNodes or even more specifically on changefeed hookFnNode, but I think it's worth being on the safer side and make all LocalPlanNode cores wrapped with the streaming columnarizer.
I cannot say that I have 100% confidence in this approach, but I haven't encountered any other special cases apart from the changefeeds. I'm assuming that if there is something that I'm missing, it'll be exposed by the nightly and unit tests, especially since we are now randomizing coldata.BatchSize.
To test my theory about changefeed flow being the only special case, I'll open up a separate PR and see whether anything else fails.
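The buffering-vs-streaming distinction above can be sketched as follows; `batchSize`, `source`, and the two `Next` variants are invented simplifications of the columnarizer's behavior, not the actual implementation:

```go
package main

import "fmt"

const batchSize = 4

// source yields rows from a finite input; a changefeed-like source would
// instead block forever waiting for the 6th row, which is the hazard.
type source struct{ rows, emitted int }

func (s *source) next() (int, bool) {
	if s.emitted == s.rows {
		return 0, false // a changefeed's hookFnNode would block here
	}
	s.emitted++
	return s.emitted, true
}

// bufferingNext mimics the buffering mode: fill a whole batch before
// emitting. With a blocking source, the loop never reaches the break.
func bufferingNext(s *source) []int {
	batch := make([]int, 0, batchSize)
	for len(batch) < batchSize {
		row, ok := s.next()
		if !ok {
			break
		}
		batch = append(batch, row)
	}
	return batch
}

// streamingNext mimics the streaming mode: emit as soon as one row exists,
// so a slow or blocking source still makes forward progress per row.
func streamingNext(s *source) []int {
	if row, ok := s.next(); ok {
		return []int{row}
	}
	return nil
}

func main() {
	fmt.Println(bufferingNext(&source{rows: 5})) // prints [1 2 3 4]
	fmt.Println(streamingNext(&source{rows: 5})) // prints [1]
}
```

In the changefeed scenario, the buffering variant would get rows 4 and 5 into a half-full batch and then call `next` again, waiting forever; the streaming variant hands each row downstream immediately.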
pkg/sql/colexec/columnarizer.go, line 66 at r12 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
Might be nicer to "hide" this mode option to avoid having to always pass in a mode if we mostly want default behavior: NewBufferingColumnarizer and NewStreamingColumnarizer wrappers around an unexported newColumnarizer(opts..., mode). Also, maybe we should rename ColumnarizerDefaultMode to ColumnarizerBufferingMode since it's more descriptive? (i.e. it avoids the question of what the default mode is.)
I initially also had "buffering" and "streaming" modes, but I like this suggestion. Done.
pkg/sql/sem/tree/casts.go, line 814 at r4 (raw file):
Previously, jordanlewis (Jordan Lewis) wrote…
Then again, we should probably just eagerly do that up top.
Done.
Force-pushed from c6dbad4 to e7d5ac6.
Force-pushed from e7d5ac6 to 12d81ba.
yuzefovich
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @asubiotto and @jordanlewis)
pkg/sql/colexec/columnarizer.go, line 37 at r12 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
I think there might be only one very special planNode that requires this streaming behavior - hookFnNode. In its Next method it waits for the context cancellation or an error for shutting down, or for the next row to be sent on the channel. The buffering behavior of the columnarizer can block indefinitely long - for example, in the case of the changefeeds.
In TestChangefeedBasics/sinkless, if we make the columnarizer buffering, we observe the following situation: there are 5 total changes on the table, so 5 events are emitted; the columnarizer puts the 1st into the first batch, then the 2nd and 3rd into the second batch, then the 4th and 5th into the third batch (with a batch capacity of 4), and the columnarizer asks for more rows, but they never arrive.
At the moment, I believe making a "local" decision whether to make the columnarizer streaming or not works in all cases - AFAIK these hookFnNodes are the only things that could block indefinitely when Next is called, so we simply want to make sure not to buffer anything that is coming out of them. I'm guessing that changefeeds might be the only special case, so we could customize the columnarizer to be streaming only on hookFnNodes or even more specifically on the changefeed hookFnNode, but I think it's worth being on the safer side and making all LocalPlanNode cores wrapped with the streaming columnarizer.
I cannot say that I have 100% confidence in this approach, but I haven't encountered any other special cases apart from the changefeeds. I'm assuming that if there is something that I'm missing, it'll be exposed by the nightly and unit tests, especially since we are now randomizing coldata.BatchSize.
To test my theory about the changefeed flow being the only special case, I'll open up a separate PR and see whether anything else fails.
Looks like my hypothesis might be correct, since this build on a testing PR has only a couple of expected flakes (some linter issues and benchmark data that needs to be updated in this PR), so I have a bit more confidence now in the current approach (of making a "local" decision on the mode of the columnarizer).
Force-pushed from 12d81ba to b137126.
I believe I've fixed up all remaining places where the update was needed, so it is RFAL.
asubiotto
left a comment
Reviewed 57 of 57 files at r14, 11 of 11 files at r15, 1 of 54 files at r16.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @asubiotto, @jordanlewis, and @yuzefovich)
pkg/sql/conn_executor_internal_test.go, line 259 at r13 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
TestPortalsDestroyedOnTxnFinish fails with error setting up flow: FS unset on DiskQueueCfg, so we actually need to set the file system for all vectorized flows given that in vectorizedFlow.Setup we always create a DiskQueueCfg.
👍 what about the in-mem part?
Force-pushed from b137126 to 610a011.
yuzefovich
left a comment
TFTR!
bors r+
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @asubiotto and @jordanlewis)
pkg/sql/conn_executor_internal_test.go, line 259 at r13 (raw file):
Previously, asubiotto (Alfonso Subiotto Marqués) wrote…
👍 what about the in-mem part?
The current invocation creates an in-memory engine and FS. Using NewDefaultInMem is not sufficient since it only returns the engine and not the FS, but we're only interested in the FS.
The new logic test needs to be updated. bors r-
Canceled.
Force-pushed from 610a011 to 999533f.
bors r+
Build succeeded.
tree: unwrap DOidWrapper before casting
When performing a cast from DOidWrapper, we're interested in casting the wrapped datum, so this commit adds the unwrapping to the top of performCast.
Release note: None
colexec: introduce streaming mode to the columnarizer
Previously, the columnarizer would always buffer up tuples (dynamically, up to coldata.BatchSize()) before emitting them as output; however, there are several processors which are of a "streaming" nature, and such a buffering approach is not compatible with wrapping them. This commit introduces a streaming mode of operation to the columnarizer which is used when wrapping a streaming processor (a newly introduced marker interface that currently is implemented by planNodeToRowSource and the changefeed processors).
Note that there are still some unresolved issues around the changefeed
processors, so they aren't being wrapped into the vectorized flows.
Release note: None
colexec: add support for wrapping LocalPlanNode
This commit adds support for wrapping the LocalPlanNode core. It also changes the meaning of experimental_always to allow wrapping of the JoinReader and LocalPlanNode cores. The latter allows us to remove VectorizeAlwaysException.
A notable change is that the materializers created when wrapping row-execution processors are no longer added to the set of releasables at flow cleanup, because in some cases such a materializer could be released before being closed. Namely, this would occur if we have a subquery with a LocalPlanNode core and a materializer is added in order to wrap that core: all releasables are put back into their pools upon the subquery's flow cleanup, yet the subquery's planNode tree isn't closed yet, since its closure is done when the main planNode tree is being closed.
Another change is fixing the tracing in the cFetcher during mutations (we were accessing only public columns whereas we should have been accessing deletable ones - that's what row.Fetcher does in the two places that needed a fix).
Addresses: #57268.
Release note: None