
colexecop: modify Operator.Next to return metadata#157869

Merged
craig[bot] merged 1 commit into cockroachdb:master from yuzefovich:vec-meta
Jan 10, 2026

Conversation

@yuzefovich
Member

@yuzefovich yuzefovich commented Nov 15, 2025

This commit changes the Operator.Next interface to add the ability to
return metadata in a "streaming" fashion, similar to how it's done in
the row-by-row engine. This will allow us to support features like
estimated query progress (based on the number of rows scanned so far).
Note that we still preserve the concept of MetadataSources, but it'll
become similar to the "trailing metadata" concept in the row-by-row
engine, i.e. it'll only handle metadata that is produced when the
component is being drained.

The following affected Operators have non-trivial state transitions:

  • columnarizer - we're buffering multiple rows into a batch, so we need
    to continue building the current batch if there is some metadata between
    rows. Note that we don't change the error propagation and still throw it
    immediately. Additionally, since we no longer buffer the metadata, we
    can remove some memory accounting.
  • ordered synchronizer is similar to columnarizer - we need to ensure
    that we continue building the current output batch if we get metadata
    when fetching a new batch from one of the inputs. Additionally, we need
    to handle the heap initialization carefully if we get a metadata object
    before any batches from some inputs.
  • parallel unordered synchronizer - here we needed to augment the logic
    so that metadata can be propagated in the streaming fashion. Previously,
    if an input sent metadata, it meant that the input had been drained;
    that is no longer true, so we store the last message from one of the
    inputs and emit the metadata from it until exhausted. This allowed us
    to simplify some other logic about buffering the metadata, though.
  • top K sort is somewhat similar to the ordered synchronizer - it first
    spills some number of batches to reach the desired K total, and then it
    reads one batch at a time comparing against the current maximum row (one
    input row from the batch at a time). Here we needed to adjust the logic
    to initialize the heap only once as well as maintain some state across
    Next calls.
  • hashBasedPartitioner - if the metadata is received from the 2nd input
    after having read a batch from the 1st input, then that batch must be
    used on the next iteration.
  • hashJoiner - whenever the metadata is received from either input, it's
    propagated immediately, but then the execution flow can just be resumed
    (because the hashJoiner already has the necessary state machine).
  • crossJoiner and mergeJoiner read from inputs at different points in
    time and have non-trivial state transitions, so we handle them by
    introducing a helper struct that buffers the metadata; the operator
    then checks for metadata as soon as it's convenient. In the merge
    joiner we also need to continue building the output batch without
    resetting. Additionally, just to be safe, we mark both as
    MetadataSources to guarantee that all metadata is emitted (I think
    that only with an ungraceful shutdown can we have some metadata still
    buffered in these two).
  • index joiner - we need to preserve the row count that we've buffered
    for span generation so far across Next calls, as well as reset the
    input batch mem size only when we're transitioning to constructing spans.
  • hash router - here we forward the metadata to the first stream
    (similar to what we do in the row-by-row routers). Additionally, each
    router output is responsible for emitting metadata forwarded to it in
    its DrainMeta implementation (if it is unable to emit that forwarded
    metadata in the streaming fashion).
  • inbox - we simply need to ensure that all metadata is returned as soon
    as possible, or at the very least during DrainMeta. Note that we still
    panic with an error in the metadata since we're not changing how errors
    are propagated throughout the vectorized engine.
  • outbox - here we send a new message for every metadata object from the
    input. Note that we could consider buffering up some metadata and
    sending it together with the next batch, but that's left as a TODO (I
    doubt that the minor increase in DistSQL messages will be noticeable
    unless we start emitting metadata very frequently).
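
The buffering-helper idea described for crossJoiner and mergeJoiner could look roughly like this (all names here are invented for the sketch; the real helper lives in the colexec code):

```go
package main

import "fmt"

// ProducerMetadata is a toy stand-in for execinfrapb.ProducerMetadata.
type ProducerMetadata struct{ RowNum int }

// bufferedMeta sketches the helper struct: inputs push metadata into it
// whenever it arrives at an inconvenient point in the operator's state
// machine, and the operator pops it back out at the next convenient one.
type bufferedMeta struct {
	metas []ProducerMetadata
}

func (b *bufferedMeta) push(m ProducerMetadata) {
	b.metas = append(b.metas, m)
}

// maybeEmit returns one buffered metadata object in FIFO order, or nil
// if nothing is buffered.
func (b *bufferedMeta) maybeEmit() *ProducerMetadata {
	if len(b.metas) == 0 {
		return nil
	}
	m := b.metas[0]
	b.metas = b.metas[1:]
	return &m
}

func main() {
	var buf bufferedMeta
	buf.push(ProducerMetadata{RowNum: 7})
	fmt.Println(buf.maybeEmit().RowNum, buf.maybeEmit() == nil)
}
```

Marking the operator as a MetadataSource then guarantees that anything still sitting in the buffer at shutdown is flushed during draining.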

This commit also extends the invariantsChecker (an operator that we plan
in test-only builds to verify some assumptions) to randomly inject
metadata on its Next calls. This improves the test coverage of streaming
metadata in the vectorized engine (and it uncovered one bug in the top K
sorter). We inject RowNum metadata that is only used in tests, and we
adjusted some tests to allow for that type of metadata. The NextNoMeta
helper method has been adjusted to silently swallow this metadata.

Fixes: #55758.

Release note: None

@blathers-crl

blathers-crl bot commented Nov 15, 2025

Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Member

This change is Reviewable

@yuzefovich yuzefovich force-pushed the vec-meta branch 2 times, most recently from 8633889 to da63d25 Compare November 15, 2025 07:02
@cockroach-teamcity cockroach-teamcity added the X-perf-gain Microbenchmarks CI: Added if a performance gain is detected label Nov 15, 2025
@yuzefovich yuzefovich force-pushed the vec-meta branch 7 times, most recently from 624d65a to 5c2b5bd Compare November 18, 2025 07:04
@yuzefovich yuzefovich removed the X-perf-gain Microbenchmarks CI: Added if a performance gain is detected label Nov 18, 2025
@github-actions github-actions bot added the o-AI-Review-Potential-Issue-Detected AI reviewer found potential issue. Never assign manually—auto-applied by GH action only. label Nov 18, 2025
@cockroachdb cockroachdb deleted a comment from github-actions bot Nov 18, 2025
@yuzefovich yuzefovich added O-No-AI-Review Prevents AI Review from running and removed o-AI-Review-Potential-Issue-Detected AI reviewer found potential issue. Never assign manually—auto-applied by GH action only. labels Nov 18, 2025
@yuzefovich yuzefovich force-pushed the vec-meta branch 2 times, most recently from 9462684 to 57cfe66 Compare November 19, 2025 02:16
@yuzefovich yuzefovich added X-perf-check Microbenchmarks CI: Added to a PR if a performance regression is detected and should be checked O-AI-Review and removed O-No-AI-Review Prevents AI Review from running X-perf-check Microbenchmarks CI: Added to a PR if a performance regression is detected and should be checked O-AI-Review labels Nov 19, 2025
@yuzefovich yuzefovich changed the title colexecop: change Next to also return metadata colexecop: modify Operator.Next to return metadata Nov 19, 2025
@yuzefovich yuzefovich force-pushed the vec-meta branch 2 times, most recently from a22bb0b to 9de567f Compare November 19, 2025 04:06
@blathers-crl

blathers-crl bot commented Nov 19, 2025

Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks.

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@yuzefovich
Member Author

This is RFAL.

Note for reviewers: currently the PR will be red since I deliberately pushed only non-trivial changes to make reviewing them easier. I have two more commits that I'll push once we're happy with the non-trivial changes for review (one is trivial production code changes, and another is test-related changes; the CI is green with them included). Eventually all 4 commits will be squashed into one.

The main difficulty with adding the ability to return streaming metadata is that we might get a piece of metadata at an "inconvenient" time in the Operator's lifecycle. This forces us to keep more state across Next calls to be able to resume whatever we were doing before metadata came along. Most non-trivial operators already have an internal state machine set up which makes things mostly just work. In 2nd commit I outlined briefly the changes needed for each non-trivial operator, in the order the files appear in Reviewable.

@yuzefovich
Member Author

yuzefovich commented Nov 20, 2025

In order to test this change better, over in #158029 I'm injecting metadata randomly via invariantsChecker (which we plan in test-only builds). This already exposed a bug in the top K sorter (now fixed), and I probably will merge that improved testing, perhaps after the main patch merges - it'll require changes to colexecop.NextNoMeta usage.

UPDATE: that testing change will be included into this PR.

Collaborator

@michae2 michae2 left a comment


@michae2 reviewed 1 of 1 files at r1, 32 of 32 files at r3.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball)


pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):

		tuplesToBeSet = 1
	}
	if !c.continueBatch {

Random idea that you didn't ask for: it might be simpler to write these operators as push-iterators, and then use iter.Pull2 to convert them into a Next() function that can be called from the parent operator.

By doing that we could write the columnarizer something like:

...
var nRows int
for row, meta, ok := next(); ok; row, meta, ok = next() {
  if meta != nil {
    if meta.Err != nil {
      colexecerror.ExpectedError(meta.Err)
    }
    if !yield(nil, meta) {
      break
    }
    continue
  }
  if row == nil {
    break
  }
  EncDatumRowToColVecs(row, nRows, c.vecs, c.typs, &c.da)
  nRows++
  if c.helper.AccountForSet(nRows) {
    c.batch.SetLength(nRows)
    if !yield(c.batch, nil) {
      break
    }
    nRows = 0
    c.batch, reallocated = c.helper.ResetMaybeReallocate(c.typs, c.batch, tuplesToBeSet)
    ...
  }
}
...

Without having to manually reify control flow or local variables into state like continueBatch or nRows.

Anyway, I'll set that idea aside and keep reviewing. Just an idea.

Contributor

@mgartner mgartner left a comment


@mgartner reviewed 1 of 1 files at r1, 13 of 32 files at r4.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball and @michae2)


pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):

Previously, michae2 (Michael Erickson) wrote…

Random idea that you didn't ask for: it might be simpler to write these operators as push-iterators, and then use iter.Pull2 to convert them into a Next() function that can be called from the parent operator.

By doing that we could write the columnarizer something like:

...
var nRows int
for row, meta, ok := next(); ok; row, meta, ok = next() {
  if meta != nil {
    if meta.Err != nil {
      colexecerror.ExpectedError(meta.Err)
    }
    if !yield(nil, meta) {
      break
    }
    continue
  }
  if row == nil {
    break
  }
  EncDatumRowToColVecs(row, nRows, c.vecs, c.typs, &c.da)
  nRows++
  if c.helper.AccountForSet(nRows) {
    c.batch.SetLength(nRows)
    if !yield(c.batch, nil) {
      break
    }
    nRows = 0
    c.batch, reallocated = c.helper.ResetMaybeReallocate(c.typs, c.batch, tuplesToBeSet)
    ...
  }
}
...

Without having to manually reify control flow or local variables into state like continueBatch or nRows.

Anyway, I'll set that idea aside and keep reviewing. Just an idea.

I'll pile on another unsolicited idea: What if we passed a callback for handling metadata down into operators so that metadata can be passed up through the operator tree outside the control flow of the loop? I don't have a good grasp of exactly where the metadata originates or terminates, so maybe this is a bad idea.

There is some cost in allocating a callback which maybe you were trying to avoid? If the metadata terminates at a single point, we'd only need a single callback, I think. And the metadata would go directly to the termination point, rather than step through each operator in the tree.

Member Author

@yuzefovich yuzefovich left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @mgartner, and @michae2)


pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):

Previously, mgartner (Marcus Gartner) wrote…

I'll pile on another unsolicited idea: What if we passed a callback for handling metadata down into operators so that metadata can be passed up through the operator tree outside the control flow of the loop? I don't have a good grasp of exactly where the metadata originates or terminates, so maybe this a bad idea.

There is some cost in allocating a callback which maybe you were trying to avoid? If the metadata terminates at a single point, we'd only need a single callback, I think. And the metadata would go directly to the termination point, rather than step through each operator in the tree.

Thanks for the suggestions.

I'll think more about the refactor of changing pull iterators into push ones, but intuitively the current approach seems easier to follow / more understandable to me.

Re: callback idea. This actually sounds similar to how I tried to implement this initially in #65586, where I introduced a way to send metadata outside of the main data path. There are a couple of difficulties with that approach. One is that the final destination for both data (rows or batches) and metadata is the DistSQLReceiver on the gateway node, and it's not thread-safe to push into it, so we need a synchronization mechanism between data and metadata arriving. I addressed that by having separate goroutines and making some copies, but that turned out to have a non-trivial perf impact (on the order of a 10% slowdown on a simple workload).

Another difficulty of the callback approach is that the "true" metadata sink DistSQLReceiver could be outside of the tree of the operator that wants to emit the metadata (which also means running in different goroutines, possibly on different nodes), so we first need to push it to the root of the subtree the operator is in, and then ensure that the root pushes it towards the main subtree (so an example sequence of pushes could be something like ColBatchScan -> Outbox - network hop -> Inbox -> Synchronizer -> FlowCoordinator -> DistSQLReceiver). We either need to have a separate goroutine polling for metadata on some of these steps (which turned out to be expensive), or we'll be buffering up metadata on these intermediate "sinks" that will be flushed only by the main goroutine when there is data to push further out (but that defeats the purpose of "streaming" metadata propagation).


For a bit of historical context: back when we filed #35280 and added the current way of metadata propagation, we only had the "trailing metadata" use case in the row-by-row engine, so the approach of calling DrainMeta during the plan shutdown was sufficient. Soon after, we added the query progress metadata in the row-by-row engine, which happened to just work there (because of explicitly propagating both data and metadata) and didn't work in the vectorized engine. Perhaps if that use case had already been implemented before #35280 was addressed, we'd have mimicked the row-by-row engine right away. I think we didn't go that route partly to simplify the Next signature, partly due to performance considerations (having to check whether metadata is non-nil on every Next call), but mostly to avoid the large change that this PR ended up doing.

Collaborator

@michae2 michae2 left a comment


Looking good so far! (I've made it through topk.) Will keep reading.

@michae2 reviewed 6 of 32 files at r4.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @mgartner, and @yuzefovich)


pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):

I'll think more about the refactor of changing pull iterators into push ones, but intuitively the current approach seems easier to follow / more understandable to me.

That's fine. I think the current approach probably also performs better than setting up a bunch of coroutines, each with their own stacks.


pkg/sql/colexec/parallel_unordered_synchronizer.go line 36 at r4 (raw file):

//
// Note that either a batch or metadata must be sent, but not both. It's also
// possible that both batch and metadata are nil (when then input has been

typo: "the" instead of "then"


pkg/sql/colexec/parallel_unordered_synchronizer.go line 341 at r4 (raw file):

					if s.metas[inputIdx] != nil {
						msg.meta = metaScratch[:]
						msg.meta[0] = *s.metas[inputIdx]

Do we need to make a copy of the metadata?


pkg/sql/colexec/parallel_unordered_synchronizer.go line 361 at r4 (raw file):

						}
						if meta := execinfra.GetTraceDataAsMetadata(s.flowCtx, span); meta != nil {
							msg.meta = append(msg.meta, *meta)

Is msg.meta ever longer than 1? I think I'm not quite understanding when it can be longer.


pkg/sql/colexec/parallel_unordered_synchronizer.go line 378 at r4 (raw file):

				}

				if sentDrainedMeta {

Instead of using sentDrainedMeta could this now be state == parallelUnorderedSynchronizerStateDraining?

Member Author

@yuzefovich yuzefovich left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @mgartner, and @michae2)


pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):

Previously, michae2 (Michael Erickson) wrote…

I'll think more about the refactor of changing pull iterators into push ones, but intuitively the current approach seems easier to follow / more understandable to me.

That's fine. I think the current approach probably also performs better than setting up a bunch of coroutines, each with their own stacks.

Ack, let's keep the current approach then.


pkg/sql/colexec/parallel_unordered_synchronizer.go line 341 at r4 (raw file):

Previously, michae2 (Michael Erickson) wrote…

Do we need to make a copy of the metadata?

I don't think we need to do that (in the sense that it's not required), but the signature of DrainMeta is such that we return a slice of ProducerMetadata values, which is why the msg.meta field is defined the same way. I left a TODO on the DrainMeta interface to look into changing the signature, which will allow us to avoid the copy here. (We'll need to audit metadata usage carefully since we do reuse some metadata-related objects in different places.)


pkg/sql/colexec/parallel_unordered_synchronizer.go line 361 at r4 (raw file):

Previously, michae2 (Michael Erickson) wrote…

Is msg.meta ever longer than 1? I think I'm not quite understanding when it can be longer.

It could get longer if we have non-empty MetadataSources below that return some drained metadata.


pkg/sql/colexec/parallel_unordered_synchronizer.go line 378 at r4 (raw file):

Previously, michae2 (Michael Erickson) wrote…

Instead of using sentDrainedMeta could this now be state == parallelUnorderedSynchronizerStateDraining?

No, these two are somewhat different. A single input to the PUS could send drained meta once it's fully exhausted, yet the PUS might still keep on producing more data that is needed by its output - only when the output calls DrainMeta on the PUS does it transition into the draining state (also when one of the inputs encounters an error which triggers the PUS's shutdown).

@mgartner
Contributor

pkg/sql/colexec/columnarizer.go line 235 at r4 (raw file):

the "true" metadata sink DistSQLReceiver could be outside of the tree of the operator that wants to emit the metadata (which also means running in different goroutines, possibly on different nodes)

Thanks for the context. I agree this is a good reason to keep metadata and row data in the same path. 👍

@yuzefovich
Member Author

@michae2 how swamped are you with things? Is this something that we can get over the line and backport to 26.1, or would that be too much of a stretch?

@yuzefovich
Member Author

yuzefovich commented Jan 7, 2026

@DrewKimball perhaps you have some bandwidth to help with reviewing this patch? The final state can be viewed at #158029, but I'm keeping only the first two commits here for now (even though CI is red) to focus on the important bits.

Collaborator

@DrewKimball DrewKimball left a comment


Had a few questions, but overall LGTM

allZero := true
for i := range op.inputs {
batches[i] = op.inputs[i].Next()
if op.storedLeftBatch != nil {
Collaborator


Maybe a test-only assertion here that i == 0?

Also, do you think it would be useful to inject some metadata randomly during tests to make sure all the control flow is correct?

Member Author


Maybe a test-only assertion here that i == 0?

Good point, done.

Also, do you think it would be useful to inject some metadata randomly during tests to make sure all the control flow is correct?

That's a good idea, and I have a commit that does that (used to be last in the other WIP PR). I just pushed 3 more commits that I want to squash, and the last one does what you're suggesting.

// willEmit, so here is a good time to see whether any metadata was buffered
// (the code below is only about building the cross-product based on what
// we've already read from the input).
if meta := c.maybeEmitMeta(); meta != nil {
Collaborator


nit: for clarity, can we move this before checking willEmit?

Member Author


Hm, in willEmit we might read a batch (in one case we call readNextLeftBatch), so we might get metadata there. If we move the check above, we'd have to add one more - this is what the comment describes.

Member Author

@yuzefovich yuzefovich left a comment


Thanks for the review! Just pushed 3 more commits to review, and now the CI should be green.


Collaborator

@DrewKimball DrewKimball left a comment


Nice work! LGTM

@yuzefovich
Member Author

Thanks everyone for the reviews and the feedback!

I reduced the probability of injecting metadata to 1% since I think it can noticeably slow down tests, perhaps we might need to reduce this even further, to be seen.

@yuzefovich
Member Author

bors r+

craig bot pushed a commit that referenced this pull request Jan 10, 2026
157869: colexecop: modify Operator.Next to return metadata r=yuzefovich a=yuzefovich

This commit changes `Operator.Next` interface to add an ability to
return metadata in the "streaming" fashion, similar to how it's done in
the row-by-row engine. This will allow us to support features like
estimated query progress (based on the number of rows scanned so far).
Note that we still preserve the concept of MetadataSources, but it'll
become similar to "trailing metadata" concept in the row-by-row engine,
i.e. it'll only handle metadata that is produced when the component is
being drained.

The following Operators are affected have non-trivial state transitions:
- columnarizer - we're buffering multiple rows into a batch, so we need
to continue building the current batch if there is some metadata between
rows. Note that we don't change the error propagation and still throw it
immediately. Additionally, since we no longer buffer the metadata, we
can remove some memory accounting.
- ordered synchronizer is similar to columnarizer - we need to ensure
continuing building the current output batch if we get a metadata when
fetching the new batch from one of the inputs. Additionaly we need
handle the heap initialization carefully if we get a metadata object
before any batches from some inputs.
- parallel unordered synchronizer - here we needed to augment the logic
so that metadata can be propagated in the streaming fashion. Previously,
if an input sent the metadata, then it meant that the input has been
drained, which is no longer true so we store the last message from one
of the inputs and emit the metadata from it until exhausted. This
allowed us to simplify some other logic about buffering the metadata
though.
- top K sort is somewhat similar to the ordered synchronizer - it first
spills some number of batches to reach the desired K total, and then it
reads one batch at a time comparing against the current maximum row (one
input row from the batch at a time). Here we needed to adjust the logic
to initialize the heap only once as well as maintain some state across
Next calls.
- hashBasedPartitioner - if the metadata is received from the 2nd input
after having read a batch from the 1st input, then that batch must be
used on the next iteration.
- hashJoiner - whenever the metadata is received from either input, it's
propagated immediately, but then the execution flow can just be resumed
(because the hashJoiner already has the necessary state machine)
- crossJoiner and mergeJoiner read from inputs at different points in
time and have non-trivial state transitions, so we handle them by
introducing a helper struct that buffers the metadata, and then the
operator must check for metadata as soon as it's convenient. In the
merge joiner we also need to continue building the output batch without
resetting. Additionally, just to be safe, we mark both as
MetadataSources to guarantee that all metadata is emitted (I think that
only with an ungraceful shutdown can we have some metadata still
buffered in these two).
- index joiner - we need to preserve, across Next calls, the row count
that we've buffered for span generation so far, as well as reset the
input batch memory size only when we're transitioning to constructing
spans.
- hash router - here we'll forward the metadata to the first stream
(similar to what we do in the row-by-row routers). Additionally, each
router output is responsible for emitting the metadata forwarded to it
in its DrainMeta implementation (if it is unable to emit that forwarded
metadata in the streaming fashion).
- inbox - we simply need to ensure that all metadata is returned as soon
as possible, or at the very least during DrainMeta. Note that we still
panic with an error in the metadata since we're not changing how errors
are propagated throughout the vectorized engine.
- outbox - here we send a new message for every metadata object from the
input. Note that we could consider buffering up some metadata and
sending it together with the next batch, but that's left as a TODO (I
doubt that a minor increase in DistSQL messages will be noticeable,
unless we start emitting metadata very frequently).
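
The metadata-buffering helper mentioned above for the cross and merge joiners can be sketched roughly as follows; all identifiers here are illustrative, not the actual CockroachDB code:

```go
package main

import "fmt"

// producerMetadata is a stand-in for execinfrapb.ProducerMetadata.
type producerMetadata struct{ rowNum int64 }

// bufferedMetadata is a hedged sketch of a helper that lets an operator
// with a non-trivial state machine stash metadata received mid-transition
// and emit it at the next convenient point (or during draining).
type bufferedMetadata struct {
	buf []*producerMetadata
}

// push stashes a metadata object received while the operator is in the
// middle of a state transition.
func (b *bufferedMetadata) push(m *producerMetadata) {
	b.buf = append(b.buf, m)
}

// pop returns one buffered metadata object in FIFO order, or nil if none
// is pending; the operator checks this as soon as it reaches a safe point.
func (b *bufferedMetadata) pop() *producerMetadata {
	if len(b.buf) == 0 {
		return nil
	}
	m := b.buf[0]
	b.buf = b.buf[1:]
	return m
}

func main() {
	var b bufferedMetadata
	b.push(&producerMetadata{rowNum: 1})
	b.push(&producerMetadata{rowNum: 2})
	for m := b.pop(); m != nil; m = b.pop() {
		fmt.Println(m.rowNum) // 1, then 2
	}
}
```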

This commit also extends the invariantsChecker (an operator that we
plan in test-only builds to verify some assumptions) to randomly
inject metadata on its Next calls. This improves the test coverage of
the streaming metadata in the vectorized engine (and it uncovered one
bug in the top K sorter). We inject RowNum metadata that is only used
in tests, and we needed to adjust some tests to allow for that type of
metadata. The `NextNoMeta` helper method has been adjusted to silently
swallow this metadata.

Fixes: #55758.

Release note: None

160095: sql: use `ExecutorConfig` in inspect logger r=bghal a=bghal

The inspect resumer needs to be able to log issues. The logger can use
the `ExecutorConfig` embedded in the context without changing its
behavior.

Part of: #155472
Epic: CRDB-55075

Release note: None

160760: execbuilder: fix a stats-related flake in a new test r=yuzefovich a=yuzefovich

Fixes: #160752.
Fixes: #160753.
Release note: None

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Brendan Gerrity <brendan.gerrity@cockroachlabs.com>
@craig
Contributor

craig bot commented Jan 10, 2026

Build failed (retrying...):

@craig
Contributor

craig bot commented Jan 10, 2026

@craig craig bot merged commit 7403428 into cockroachdb:master Jan 10, 2026
27 checks passed
@yuzefovich yuzefovich deleted the vec-meta branch January 10, 2026 02:42


Development

Successfully merging this pull request may close these issues.

colexec: add a way to propagate metadata in a streaming fashion
