colexecop: modify Operator.Next to return metadata #157869

craig[bot] merged 1 commit into cockroachdb:master
Conversation
Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.
Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks. It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.
This is RFAL. Note for reviewers: currently the PR will be red since I deliberately pushed only non-trivial changes to make reviewing them easier. I have two more commits that I'll push once we're happy with the non-trivial changes for review (one is trivial production code changes, and another is test-related changes; the CI is green with them included). Eventually all 4 commits will be squashed into one. The main difficulty with adding the ability to return streaming metadata is that we might get a piece of metadata at an "inconvenient" time in the Operator's lifecycle. This forces us to keep more state across Next calls.

In order to test this change better, over in #158029 I'm injecting metadata randomly. UPDATE: that testing change will be included in this PR.
michae2
left a comment
@michae2 reviewed 1 of 1 files at r1, 32 of 32 files at r3.
pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):
```go
	tuplesToBeSet = 1
}
if !c.continueBatch {
```
Random idea that you didn't ask for: it might be simpler to write these operators as push-iterators, and then use iter.Pull2 to convert them into a Next() function that can be called from the parent operator.
By doing that we could write the columnarizer something like:
```go
...
var nRows int
for row, meta, ok := next(); ok; row, meta, ok = next() {
	if meta != nil {
		if meta.Err != nil {
			colexecerror.ExpectedError(meta.Err)
		}
		if !yield(nil, meta) {
			break
		}
		continue
	}
	if row == nil {
		break
	}
	EncDatumRowToColVecs(row, nRows, c.vecs, c.typs, &c.da)
	nRows++
	if c.helper.AccountForSet(nRows) {
		c.batch.SetLength(nRows)
		if !yield(c.batch, nil) {
			break
		}
		nRows = 0
		c.batch, reallocated = c.helper.ResetMaybeReallocate(c.typs, c.batch, tuplesToBeSet)
		...
	}
}
...
```
Without having to manually reify control flow or local variables into state like continueBatch or nRows.
Anyway, I'll set that idea aside and keep reviewing. Just an idea.
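For readers unfamiliar with the suggestion: Go 1.23's `iter.Pull2` converts a push-style iterator into a pull-style `next` function. Here is a minimal, self-contained sketch of that mechanism, using toy `int`/`string` stand-ins for batches and metadata (nothing here is actual CRDB code):

```go
package main

import (
	"fmt"
	"iter"
)

// rowsWithMeta is a toy operator written in push style: it yields rows and,
// at an "inconvenient" moment, a piece of metadata between two rows.
func rowsWithMeta(yield func(row int, meta string) bool) {
	for i := 1; i <= 3; i++ {
		if i == 2 {
			// Metadata shows up between rows, as in the columnarizer case.
			if !yield(0, "progress: 50%") {
				return
			}
		}
		if !yield(i, "") {
			return
		}
	}
}

// collect converts the push iterator into a pull-style next() via iter.Pull2,
// the way a parent operator's Next() could be driven.
func collect() (rows []int, metas []string) {
	next, stop := iter.Pull2(iter.Seq2[int, string](rowsWithMeta))
	defer stop()
	for row, meta, ok := next(); ok; row, meta, ok = next() {
		if meta != "" {
			metas = append(metas, meta)
			continue
		}
		rows = append(rows, row)
	}
	return rows, metas
}

func main() {
	rows, metas := collect()
	fmt.Println("rows:", rows, "metas:", metas)
	// rows: [1 2 3] metas: [progress: 50%]
}
```

The push-style body keeps its control flow in ordinary loops, and `iter.Pull2` reifies the coroutine state instead of the operator doing so by hand.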
mgartner
left a comment
@mgartner reviewed 1 of 1 files at r1, 13 of 32 files at r4.
pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):
Previously, michae2 (Michael Erickson) wrote…
Random idea that you didn't ask for: it might be simpler to write these operators as push-iterators, and then use iter.Pull2 to convert them into a Next() function that can be called from the parent operator.
By doing that we could write the columnarizer something like:
[code example from the previous comment elided]

Without having to manually reify control flow or local variables into state like `continueBatch` or `nRows`. Anyway, I'll set that idea aside and keep reviewing. Just an idea.
I'll pile on another unsolicited idea: What if we passed a callback for handling metadata down into operators so that metadata can be passed up through the operator tree outside the control flow of the loop? I don't have a good grasp of exactly where the metadata originates or terminates, so maybe this a bad idea.
There is some cost in allocating a callback which maybe you were trying to avoid? If the metadata terminates at a single point, we'd only need a single callback, I think. And the metadata would go directly to the termination point, rather than step through each operator in the tree.
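A minimal sketch of what the callback idea might look like, with hypothetical names (`emitMeta`, `scanOp`, `meta`) rather than CockroachDB's real types — metadata bypasses the operator tree entirely and lands at a single termination point:

```go
package main

import "fmt"

// meta is a toy stand-in for execinfrapb.ProducerMetadata.
type meta struct{ progress string }

// emitMeta is the single callback threaded down at plan-setup time; metadata
// skips the operator tree and goes straight to the termination point.
type emitMeta func(*meta)

type scanOp struct {
	rows    int
	emitted int
	onMeta  emitMeta
}

// next returns only rows; progress metadata is reported out of band through
// the callback instead of being woven into the return value.
func (s *scanOp) next() (row int, ok bool) {
	if s.emitted >= s.rows {
		return 0, false
	}
	s.emitted++
	if s.emitted%2 == 0 {
		s.onMeta(&meta{progress: fmt.Sprintf("%d/%d rows", s.emitted, s.rows)})
	}
	return s.emitted, true
}

func main() {
	var received []string
	op := &scanOp{rows: 4, onMeta: func(m *meta) { received = append(received, m.progress) }}
	for row, ok := op.next(); ok; row, ok = op.next() {
		fmt.Println("row:", row)
	}
	fmt.Println("meta:", received)
}
```

The catch discussed in this thread is that the real termination point (DistSQLReceiver) is not thread-safe to push into, so across goroutines the callback would still need a synchronization mechanism.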
yuzefovich
left a comment
pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):
Previously, mgartner (Marcus Gartner) wrote…
I'll pile on another unsolicited idea: What if we passed a callback for handling metadata down into operators so that metadata can be passed up through the operator tree outside the control flow of the loop? I don't have a good grasp of exactly where the metadata originates or terminates, so maybe this a bad idea.
There is some cost in allocating a callback which maybe you were trying to avoid? If the metadata terminates at a single point, we'd only need a single callback, I think. And the metadata would go directly to the termination point, rather than step through each operator in the tree.
Thanks for the suggestions.
I'll think more about the refactor of changing pull iterators into push ones, but intuitively the current approach seems easier to follow / more understandable to me.
Re: callback idea. This actually sounds similar to how I tried to implement this initially in #65586 where I introduced a way to send metadata outside of the main data path. There are a couple of difficulties with that approach, one being that the final destination for both data (rows or batches) and metadata is DistSQLReceiver on the gateway node, and it's not thread-safe to push into it, so we need to have a synchronization mechanism between data and metadata arriving which I addressed by having separate goroutines and making some copies, but that turned out to have non-trivial perf impact (on the order of 10% slowdown on a simple workload).
Another difficulty of the callback approach is that the "true" metadata sink DistSQLReceiver could be outside of the tree of the operator that wants to emit the metadata (which also means running in different goroutines, possibly on different nodes), so we first need to push it to the root of the subtree the operator is in, and then ensure that the root pushes it towards the main subtree (so an example sequence of pushes could be something like ColBatchScan -> Outbox - network hop -> Inbox -> Synchronizer -> FlowCoordinator -> DistSQLReceiver). We either need to have a separate goroutine polling for metadata on some of these steps (which turned out to be expensive), or we'll be buffering up metadata on these intermediate "sinks" that will be flushed only by the main goroutine when there is data to push further out (but that defeats the purpose of "streaming" metadata propagation).
For a bit of historical context, back when we filed #35280 and added the current way of metadata propagation, we only had "trailing metadata" use case in the row-by-row engine, so the approach of calling DrainMeta during the plan shutdown was sufficient. Soon after we added the query progress metadata in the row-by-row engine which happened to just work there (because of explicitly propagating both data and metadata) and didn't work in the vectorized engine. Perhaps if that use case was already implemented before #35280 was addressed, we'd have just mimicked the row-by-row engine right away, but I think we didn't go that route in order to simplify the Next signature and due to performance considerations (having to check whether metadata is non-nil on every Next call), but mostly to avoid the large change that this PR ended up doing.
michae2
left a comment
Looking good so far! (I've made it through topk.) Will keep reading.
@michae2 reviewed 6 of 32 files at r4.
pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):
I'll think more about the refactor of changing pull iterators into push ones, but intuitively the current approach seems easier to follow / more understandable to me.
That's fine. I think the current approach probably also performs better than setting up a bunch of coroutines, each with their own stacks.
pkg/sql/colexec/parallel_unordered_synchronizer.go line 36 at r4 (raw file):
```go
// Note that either a batch or metadata must be sent, but not both. It's also
// possible that both batch and metadata are nil (when then input has been
```
typo: "the" instead of "then"
pkg/sql/colexec/parallel_unordered_synchronizer.go line 341 at r4 (raw file):
```go
if s.metas[inputIdx] != nil {
	msg.meta = metaScratch[:]
	msg.meta[0] = *s.metas[inputIdx]
```
Do we need to make a copy of the metadata?
pkg/sql/colexec/parallel_unordered_synchronizer.go line 361 at r4 (raw file):
```go
}
if meta := execinfra.GetTraceDataAsMetadata(s.flowCtx, span); meta != nil {
	msg.meta = append(msg.meta, *meta)
```
Is msg.meta ever longer than 1? I think I'm not quite understanding when it can be longer.
pkg/sql/colexec/parallel_unordered_synchronizer.go line 378 at r4 (raw file):
```go
}
if sentDrainedMeta {
```
Instead of using sentDrainedMeta could this now be state == parallelUnorderedSynchronizerStateDraining?
yuzefovich
left a comment
pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):
Previously, michae2 (Michael Erickson) wrote…
I'll think more about the refactor of changing pull iterators into push ones, but intuitively the current approach seems easier to follow / more understandable to me.
That's fine. I think the current approach probably also performs better than setting up a bunch of coroutines, each with their own stacks.
Ack, let's keep the current approach then.
pkg/sql/colexec/parallel_unordered_synchronizer.go line 341 at r4 (raw file):
Previously, michae2 (Michael Erickson) wrote…
Do we need to make a copy of the metadata?
I don't think we need to do that (in a sense that is not required), but the signature of DrainMeta is such that we return a slice of values of ProducerMetadata which is the reason we have msg.meta field defined the same way. I left a TODO on DrainMeta interface to look into changing the signature which will allow us to avoid the copy here. (We'll need to audit metadata usage carefully since we do reuse some metadata-related objects in different places.)
pkg/sql/colexec/parallel_unordered_synchronizer.go line 361 at r4 (raw file):
Previously, michae2 (Michael Erickson) wrote…
Is `msg.meta` ever longer than 1? I think I'm not quite understanding when it can be longer.
It could get longer if we have non-empty MetadataSources below that return some drained metadata.
pkg/sql/colexec/parallel_unordered_synchronizer.go line 378 at r4 (raw file):
Previously, michae2 (Michael Erickson) wrote…
Instead of using `sentDrainedMeta` could this now be `state == parallelUnorderedSynchronizerStateDraining`?
No, these two are somewhat different. A single input to the PUS could send drained meta once it's fully exhausted, yet the PUS might still keep on producing more data that is needed by its output - only when the output calls DrainMeta on the PUS does it transition into the draining state (also when one of the inputs encounters an error which triggers the PUS's shutdown).
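The distinction can be illustrated with a toy model (illustrative names only, not the real `parallel_unordered_synchronizer` fields): a single input finishing and sending its drained metadata is per-input state, while the synchronizer-wide draining state only changes on an explicit `DrainMeta` call:

```go
package main

import "fmt"

type syncState int

const (
	stateRunning syncState = iota
	stateDraining
)

// toySync models the distinction discussed above: each input can finish and
// flag its trailing metadata while the synchronizer as a whole keeps serving
// data from the remaining inputs.
type toySync struct {
	state         syncState
	inputsDone    []bool // per-input: drained meta already sent
	remainingRows []int  // rows left per input
}

func (s *toySync) next() (row string, ok bool) {
	for i := range s.remainingRows {
		if s.remainingRows[i] > 0 {
			s.remainingRows[i]--
			if s.remainingRows[i] == 0 {
				// This input is exhausted and sends its drained meta, but the
				// synchronizer stays in stateRunning for the other inputs.
				s.inputsDone[i] = true
			}
			return fmt.Sprintf("row from input %d", i), true
		}
	}
	return "", false
}

// drainMeta is the only transition into the synchronizer-wide draining state
// (in the real code, an input error also triggers shutdown).
func (s *toySync) drainMeta() { s.state = stateDraining }

func main() {
	s := &toySync{inputsDone: make([]bool, 2), remainingRows: []int{1, 2}}
	s.next() // exhausts input 0
	fmt.Println("input 0 done:", s.inputsDone[0], "draining:", s.state == stateDraining)
	s.drainMeta()
	fmt.Println("draining:", s.state == stateDraining)
}
```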
Thanks for the context. I agree this is a good reason to keep metadata and row data in the same path. 👍

@michae2 how swamped are you with things? Is this something that we can get over the line and backport to 26.1, or would that be too much of a stretch?

@DrewKimball perhaps you have some bandwidth to help with reviewing this patch? The final state can be viewed at #158029, but I'm keeping only the first two commits here for now (even though CI is red) to focus on the important bits.
DrewKimball
left a comment
Had a few questions, but overall LGTM
```go
allZero := true
for i := range op.inputs {
	batches[i] = op.inputs[i].Next()
	if op.storedLeftBatch != nil {
```
Maybe a test-only assertion here that i == 0?
Also, do you think it would be useful to inject some metadata randomly during tests to make sure all the control flow is correct?
Maybe a test-only assertion here that i == 0?
Good point, done.
Also, do you think it would be useful to inject some metadata randomly during tests to make sure all the control flow is correct?
That's a good idea, and I have a commit that does that (used to be last in the other WIP PR). I just pushed 3 more commits that I want to squash, and the last one does what you're suggesting.
```go
// willEmit, so here is a good time to see whether any metadata was buffered
// (the code below is only about building the cross-product based on what
// we've already read from the input).
if meta := c.maybeEmitMeta(); meta != nil {
```
nit: for clarity, can we move this before checking willEmit?
Hm, in willEmit we might read a batch (in one case we call readNextLeftBatch), so we might get metadata there. If we move the check above, we'd have to add one more - this is what the comment describes.
yuzefovich
left a comment
Thanks for the review! Just pushed 3 more commits to review, and now the CI should be green.
This commit changes the `Operator.Next` interface to add the ability to return metadata in a "streaming" fashion, similar to how it's done in the row-by-row engine. This will allow us to support features like estimated query progress (based on the number of rows scanned so far). Note that we still preserve the concept of MetadataSources, but it'll become similar to the "trailing metadata" concept in the row-by-row engine, i.e. it'll only handle metadata that is produced when the component is being drained.

The following Operators are affected and have non-trivial state transitions:
- columnarizer - we're buffering multiple rows into a batch, so we need to continue building the current batch if there is some metadata between rows. Note that we don't change the error propagation and still throw it immediately. Additionally, since we no longer buffer the metadata, we can remove some memory accounting.
- ordered synchronizer - similar to the columnarizer: we need to ensure we continue building the current output batch if we get metadata when fetching a new batch from one of the inputs. Additionally, we need to handle the heap initialization carefully if we get a metadata object before any batches from some inputs.
- parallel unordered synchronizer - here we needed to augment the logic so that metadata can be propagated in the streaming fashion. Previously, if an input sent metadata, it meant that the input had been drained, which is no longer true, so we store the last message from one of the inputs and emit the metadata from it until exhausted. This allowed us to simplify some other logic about buffering the metadata, though.
- top K sort - somewhat similar to the ordered synchronizer: it first spills some number of batches to reach the desired K total, and then reads one batch at a time, comparing against the current maximum row (one input row from the batch at a time). Here we needed to adjust the logic to initialize the heap only once as well as maintain some state across Next calls.
- hashBasedPartitioner - if metadata is received from the 2nd input after having read a batch from the 1st input, then that batch must be used on the next iteration.
- hashJoiner - whenever metadata is received from either input, it's propagated immediately, and then the execution flow can simply be resumed (because the hashJoiner already has the necessary state machine).
- crossJoiner and mergeJoiner - these read from inputs at different points in time and have non-trivial state transitions, so we handle them by introducing a helper struct that buffers the metadata; the operator then checks for metadata as soon as it's convenient. In the merge joiner we also need to continue building the output batch without resetting. Additionally, just to be safe, we mark both as MetadataSources to guarantee that all metadata is emitted (I think that only with an ungraceful shutdown can we have some metadata still buffered in these two).
- index joiner - we need to preserve the row count that we've buffered for span generation so far across Next calls, as well as reset the input batch mem size only when we're transitioning to constructing spans.
- hash router - here we forward the metadata to the first stream (similar to what we do in the row-by-row routers). Additionally, each router output is responsible for emitting metadata forwarded to it in its DrainMeta implementation (if unable to emit that forwarded metadata in the streaming fashion).
- inbox - we simply need to ensure that all metadata is returned as soon as possible, or at the very least during DrainMeta. Note that we still panic with an error in the metadata since we're not changing how errors are propagated throughout the vectorized engine.
- outbox - here we send a new message for every metadata object from the input. Note that we could consider buffering up some metadata and sending it together with the next batch, but that's left as a TODO (I doubt the minor increase in DistSQL messages will be noticeable, unless we start emitting metadata very frequently).

This commit also extends the invariantsChecker (an operator that we plan in test-only builds to verify some assumptions) to randomly inject metadata on its Next calls. This improves the test coverage of the streaming metadata in the vectorized engine (and it uncovered one bug in the top K sorter). We inject RowNum metadata that is only used in tests, and we needed to adjust some tests to allow for that type of metadata. The `NextNoMeta` helper method has been adjusted to silently swallow this metadata.

Release note: None
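The contract at the heart of this change — each `Next` call returns either a batch or metadata, never both, and a nil/nil pair signals exhaustion — can be sketched with toy stand-ins for `coldata.Batch` and `execinfrapb.ProducerMetadata` (the real types and signatures differ):

```go
package main

import "fmt"

// Toy stand-ins; illustrative only, not CockroachDB's actual types.
type batch struct{ length int }
type producerMeta struct{ rowsScanned int }

type operator interface {
	Next() (*batch, *producerMeta)
}

// sliceOp emits its batches, interleaving a streaming progress metadata
// object after each one, honoring the batch-XOR-metadata contract.
type sliceOp struct {
	batches []*batch
	idx     int
	scanned int
	pending *producerMeta
}

func (o *sliceOp) Next() (*batch, *producerMeta) {
	if o.pending != nil {
		m := o.pending
		o.pending = nil
		return nil, m // metadata only, no batch
	}
	if o.idx == len(o.batches) {
		return nil, nil // both nil: input exhausted
	}
	b := o.batches[o.idx]
	o.idx++
	o.scanned += b.length
	o.pending = &producerMeta{rowsScanned: o.scanned}
	return b, nil // batch only, no metadata
}

func main() {
	var op operator = &sliceOp{batches: []*batch{{length: 3}, {length: 2}}}
	for {
		b, m := op.Next()
		switch {
		case m != nil:
			fmt.Println("progress:", m.rowsScanned, "rows scanned")
		case b != nil:
			fmt.Println("batch of", b.length)
		default:
			return // nil, nil: done
		}
	}
}
```

The parent's consumption loop has to check for metadata on every call, which is the per-Next cost the commit message alludes to.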
Thanks everyone for the reviews and the feedback! I reduced the probability of injecting metadata to 1% since I think it can noticeably slow down tests; perhaps we'll need to reduce it even further, to be seen.
bors r+ |
157869: colexecop: modify Operator.Next to return metadata r=yuzefovich a=yuzefovich

Fixes: #55758.

Release note: None

160095: sql: use `ExecutorConfig` in inspect logger r=bghal a=bghal

The inspect resumer needs to be able to log issues. The logger can use the `ExecutorConfig` embedded in the context without changing its behavior.

Part of: #155472
Epic: CRDB-55075
Release note: None

160760: execbuilder: fix a stats-related flake in a new test r=yuzefovich a=yuzefovich

Fixes: #160752.
Fixes: #160753.

Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Brendan Gerrity <brendan.gerrity@cockroachlabs.com>
Build failed (retrying...):