changefeedccl: Rely on optimizer and distSQL when evaluating CDC expressions. #85177

craig[bot] merged 2 commits into cockroachdb:master from
Conversation
Force-pushed 6f538da to 5209457
Force-pushed 909adb9 to bf3a569
Force-pushed 277a987 to 4f20450
yuzefovich
left a comment
Didn't finish reading through the last commit, but pushing what I have so far. Cool stuff!
Reviewed 14 of 14 files at r1, 3 of 13 files at r2, 48 of 48 files at r4, 13 of 13 files at r5, 13 of 40 files at r6, all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @HonoreDB, @miretskiy, and @rytaft)
-- commits line 12 at r1:
nit: leftover in the commit message.
-- commits line 31 at r5:
nit: s/notes/note/.
pkg/ccl/changefeedccl/changefeed_dist.go line 178 at r6 (raw file):
```
// Usually, this is just the primary index span.
// However, if details.Select is not empty, the set of spans returned may be
// restricted to satisfy predicate in the select clause. In that case,
```
nit: I think the last sentence is no longer applicable.
pkg/ccl/changefeedccl/changefeed_dist.go line 249 at r6 (raw file):
```
if schemaTS.IsEmpty() {
	schemaTS = details.StatementTime
```
Is it guaranteed that details.StatementTime is set?
pkg/ccl/changefeedccl/changefeed_stmt.go line 887 at r6 (raw file):
```
statementTime hlc.Timestamp,
) (*cdceval.NormalizedSelectClause, error) {
	// TODO -- DO NOT MERGE -- figure out includeVirtual/keyOnly business.
```
nit: just pointing this out.
pkg/ccl/changefeedccl/event_processing.go line 293 at r6 (raw file):
```
// it would be superfluous to also encode prevRow.
updatedRow, prevRow = projection, cdcevent.Row{}
log.Infof(ctx, "DBG: Got Row: %s", updatedRow.DebugString())
```
nit: this probably needs an adjustment.
pkg/ccl/changefeedccl/event_processing.go line 376 at r6 (raw file):
```
}

// Close is a noop for the kvEventToRowConsumer because it
```
nit: the comment is no longer true.
pkg/ccl/changefeedccl/cdceval/doc.go line 31 at r6 (raw file):
Then, once aggregators start up, they will once again plan the expression (sql.PlanCDCExpression), but this time each incoming KV event will be evaluated
nit: I think something is missing after "will be evaluated".
pkg/ccl/changefeedccl/cdceval/expr_eval.go line 46 at r6 (raw file):
```
fnResolver CDCFunctionResolver

// rowCh receives projection datums (or nil if filtered out)
```
nit: missing period.
pkg/ccl/changefeedccl/cdceval/expr_eval.go line 123 at r6 (raw file):
```
	return cdcevent.Row{}, nil
}
row = row[1:]
```
nit: add a quick comment saying that we need to remove the temporary boolean value so that it is not emitted to the client.
pkg/ccl/changefeedccl/cdceval/expr_eval.go line 138 at r6 (raw file):
```
}

// sameVersion returns true row descriptor versions match this evaluator versions.
```
nit: s/true row/true if row/.
nit: maybe worth mentioning that we don't care whether current and previous versions match between each other.
pkg/ccl/changefeedccl/cdceval/expr_eval.go line 167 at r6 (raw file):
```
func (e *Evaluator) preparePlan(ctx context.Context, plan *sql.CDCExpressionPlan) error {
	return plannerExec(
		ctx, e.execCfg, e.user, e.currDesc.SchemaTS, sessiondatapb.SessionData{},
```
Something that we might want to try is to disable the usage of the vectorized engine since it would still effectively "degrade" to row-at-a-time model due to MustBeStreaming returning true. I'm not sure whether we'd see better performance or better allocations profile, but it might be worthy of a TODO to check.
Update: saw that we disable it in plannerExec.
pkg/ccl/changefeedccl/cdceval/expr_eval.go line 253 at r6 (raw file):
```
input.InitWithNumSenders(inputTypes, 1)

// write sends result of the evaluation into row channel.
```
nit: s/write/writer/.
pkg/ccl/changefeedccl/cdceval/plan.go line 91 at r6 (raw file):
```
case pgcode.UndefinedColumn:
	return errors.WithHintf(err, "column nay not exist in the target column family %q", target.FamilyName)
```
nit: s/nay/may/.
nit: should we add this hint only if the table has multiple column families? Is that information easily available here?
pkg/ccl/changefeedccl/cdceval/plan.go line 170 at r6 (raw file):
```
}

scopedTable := &tree.SelectClause{
```
nit: maybe add an example in the comment here?
pkg/ccl/changefeedccl/cdceval/plan.go line 199 at r6 (raw file):
```
// plannerExec is a helper which invokes provided function inside
// a DescTxn transaction to ensure that descriptors get acquired
```
nit: s/DescTxn/sql.DescsTxn/.
pkg/ccl/changefeedccl/cdceval/plan.go line 215 at r6 (raw file):
```
}
sd.VectorizeMode = sessiondatapb.VectorizeOff
```
Interesting, so we do disable the vectorized engine. Maybe add a comment for why.
pkg/sql/distsql_plan_changefeed.go line 146 at r5 (raw file):
```
planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, DistributionTypeNone)
planCtx.isLocal = true
```
nit: this isn't strictly needed with DistributionTypeNone.
pkg/sql/distsql_plan_changefeed.go line 149 at r5 (raw file):
```
return CDCExpressionPlan{
	Plan: p.curPlan.main,
```
We don't expect to see subqueries, right? It would probably be good to add assertions that all slices in p.curPlan are zero-lengths (i.e. no subqueries, cascades, nor checks).
pkg/sql/distsql_plan_changefeed.go line 208 at r5 (raw file):
```
// cdcValuesNode replaces regular scanNode with cdc specific implementation
// which returns values from the execinfra.RowSource.
```
nit: maybe expand on what the input RowSource is producing. IIRC it would be a source that never ends, right?
pkg/sql/distsql_plan_changefeed.go line 273 at r5 (raw file):
```
// Close implements planNode.
func (n *cdcValuesNode) Close(ctx context.Context) {
}
```
nit: the contract of RowSource says that unless we call Next until Next returns nil, nil, we must call ConsumerDone. I think it'd be cleaner to just do it here even if this might not be strictly necessary.
pkg/sql/distsql_plan_changefeed.go line 295 at r5 (raw file):
```
ds, err := c.newCDCDataSource(desc)
if err != nil {
	return nil, *name, err
```
nit: maybe s/*name/cat.DataSourceName{}/?
pkg/sql/distsql_plan_changefeed.go line 326 at r5 (raw file):
```
desc.Indexes = desc.Indexes[:0]
updated := tabledesc.NewBuilder(desc).BuildImmutableTable()
return newOptTable(updated, c.codec(), nil, emptyZoneConfig)
```
nit: add an inline comment about what nil is.
pkg/sql/plan_node_to_row_source.go line 94 at r5 (raw file):
```
switch p.node.(type) {
case *hookFnNode, *cdcValuesNode:
	// hookFnNode is special because it might be blocked forever if we decide to
```
nit: say something about cdcValuesNode too.
pkg/sql/walk.go line 479 at r5 (raw file):
```
reflect.TypeOf(&zigzagJoinNode{}):       "zigzag join",
reflect.TypeOf(&schemaChangePlanNode{}): "schema change",
reflect.TypeOf(&cdcValuesNode{}):        "wrapped streaming node",
```
nit: let's keep it sorted since it looks like it is already sorted except for the last two lines.
pkg/sql/opt/exec/execbuilder/scalar.go line 727 at r5 (raw file):
```
ef := ref.(exec.Factory)
eb := New(ctx, ef, &o, f.Memo(), b.catalog, newRightSide, b.evalCtx, b.semaCtx, false /* allowAutoCommit */, b.IsANSIDML)
```
Should this change be in the first commit?
pkg/sql/distsql_plan_changefeed_test.go line 93 at r5 (raw file):
```
}

for _, tc := range []struct {
```
Nice tests! I think we're missing some cases where multiple different tables are used by the query, right? It'd be good to add a case with two tables, both when both exist and when one is not found. Or is this checked elsewhere?
pkg/sql/opt/lookupjoin/constraint_builder_test.go line 324 at r1 (raw file):
```
execBld := execbuilder.New(
	context.Background(), nil /* execFactory */, nil /* optimizer */, f.Memo(), nil, /* catalog */
	e, evalCtx, nil /*semaCtx */, false, /* allowAutoCommit */
```
nit: add a space before semaCtx.
miretskiy
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @HonoreDB, @rytaft, and @yuzefovich)
pkg/ccl/changefeedccl/changefeed_dist.go line 249 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Is it guaranteed that `details.StatementTime` is set?
Yes; always.
pkg/ccl/changefeedccl/changefeed_stmt.go line 887 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: just pointing this out.
Thanks.
pkg/ccl/changefeedccl/event_processing.go line 293 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: this probably needs an adjustment.
left over. Thanks.
pkg/ccl/changefeedccl/cdceval/expr_eval.go line 167 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Something that we might want to try is to disable the usage of the vectorized engine since it would still effectively "degrade" to row-at-a-time model due to `MustBeStreaming` returning `true`. I'm not sure whether we'd see better performance or better allocations profile, but it might be worthy of a TODO to check.

Update: saw that we disable it in `plannerExec`.
Ack.
pkg/ccl/changefeedccl/cdceval/plan.go line 91 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: s/nay/may/.
nit: should we add this hint only if the table has multiple column families? Is that information easily available here?
Done; good idea.
pkg/ccl/changefeedccl/cdceval/plan.go line 215 at r6 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Interesting, so we do disable the vectorized engine. Maybe add a comment for why.
We are definitely a row-by-row execution mode now. Added comment.
pkg/sql/distsql_plan_changefeed.go line 146 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: this isn't strictly needed with `DistributionTypeNone`.
removed.
pkg/sql/distsql_plan_changefeed.go line 149 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
We don't expect to see subqueries, right? It would probably be good to add assertions that all slices in `p.curPlan` are zero-length (i.e. no subqueries, cascades, nor checks).
Ack. Good idea re extra checks. Added.
pkg/sql/distsql_plan_changefeed.go line 208 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: maybe expand on what the input `RowSource` is producing. IIRC it would be a source that never ends, right?
Correct. Added.
pkg/sql/walk.go line 479 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: let's keep it sorted since it looks like it is already sorted except for the last two lines.
Done.
pkg/sql/opt/exec/execbuilder/scalar.go line 727 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Should this change be in the first commit.
Done.
pkg/sql/distsql_plan_changefeed_test.go line 93 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Nice tests! I think we're missing some cases where multiple different tables are used by the query, right? It'd be good to add a case with two tables, both when both exist and when one is not found. Or is this checked elsewhere?
We only support a single table. I check to make sure we only have a single scan operation. Added a test.
miretskiy
left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, @rytaft, and @yuzefovich)
pkg/sql/distsql_plan_changefeed.go line 87 at r10 (raw file):
Previously, rytaft (Rebecca Taft) wrote…
Oh ok -- that's fine then. You could also do `false /* allowAutoCommit */`, but this is fine as-is.
I don't know why, but I just dislike those inline /* */ style comments -- I think constants are better.
pkg/sql/distsql_plan_changefeed.go line 320 at r10 (raw file):
Previously, rytaft (Rebecca Taft) wrote…
since I already have to do rewriting on cdc side
I wonder if it's possible in the future to do almost all rewriting in the optimizer. E.g. you could specify your CDC rewrites as normalization rules. But this is definitely not needed for this PR.
Another advantage of showing only target columns is then you could rely on the optimizer for * expansion. But up to you (and feel free to save for the future)
Ack -- agree. That's a very, very interesting idea! Dealing with "*" was part of the complexity I had to tackle.
The idea about a custom function/norm rule is rather neat.
pkg/sql/opt/exec/execbuilder/builder.go line 259 at r12 (raw file):
Previously, rytaft (Rebecca Taft) wrote…
any reason we can't use the catalog function resolver here (like you did when folding)? Then we wouldn't need to plumb the semaCtx
That's a ... good question! I think I did it so long ago... I don't think I realized that solution back then!
This is done now, and the first commit to plumb semaCtx has been removed.
I had to add custom resolve implementation in cdcOptCatalog (to still provide access to CDC overrides), but no changes in execbuilder other than what you describe above.
Yay.
rytaft
left a comment
Reviewed 17 of 17 files at r13, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, and @yuzefovich)
pkg/sql/opt/exec/execbuilder/builder.go line 259 at r12 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
That's a ... good question! I think I did it so long ago... I don't think I realized that solution back then!
This is done now and the first commit to plumb semactx removed.
I had to add custom resolve implementation in cdcOptCatalog (to still provide access to CDC overrides), but no changes in execbuilder other than what you describe above.
Yay.
Hooray! 🥳
yuzefovich
left a comment
Reviewed 21 of 34 files at r8, 1 of 3 files at r10, 2 of 4 files at r11, 1 of 3 files at r12, 6 of 17 files at r13, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, @miretskiy, and @rytaft)
pkg/ccl/changefeedccl/cdceval/expr_eval.go line 109 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
This used to be the case before I added the `ConsumerDone` call as you requested; now, it seems this may no longer be the case. Still, I think returning an error here is correct -- we will shut down the evaluator, and the whole flow, and then restart the flow (because all errors are now treated as retryable).

I augmented the input initialization code with a hopefully useful comment below. Going to add a test around shutdown as well.
```
// The row channel created below will have exactly 1 sender (this evaluator).
// The buffer size parameter doesn't matter much, as long as it is greater
// than 0 to make sure that if the main context is cancelled and the flow
// exits, that we can still push data into the row channel without blocking,
// so that we notice cancellation request when we try to read the result of
// the evaluation.
const numSenders = 1
const bufSize = 16
var input execinfra.RowChannel
input.InitWithBufSizeAndNumSenders(inputTypes, bufSize, numSenders)
```
Sorry, I responded below that we should probably remove that ConsumerDone call after all. Interested to hear how things will change once we do remove it.
My initial comment here was focused on whether it is an "assertion" or a "regular" error, I wasn't questioning whether it is an error.
pkg/ccl/changefeedccl/cdceval/validation.go line 238 at r7 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
I reworked the code a bit. No longer have boolean indicating if the expression should be normalized.
Now, when the changefeed is created, the normalization step is done -- which rewrites the expression; and the rewritten expression gets written to the jobs record. Thereafter, there is no longer a need to normalize.
I also split the method into 2 helpers -- yes, there is a bit more repetition, but the code is better imo.
Let me know what you think.
Looks good!
pkg/sql/distsql_plan_changefeed.go line 273 at r5 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
nit: the contract of `RowSource` says that unless we call `Next` until `Next` returns `nil, nil`, we must call `ConsumerDone`. I think it'd be cleaner to just do it here even if this might not be strictly necessary.
Actually, on a second thought, it might be better to not call ConsumerDone here.
IIUC here is the structure of things: we have kvEventToRowConsumer which takes a single kvevent.Event, decodes KVs into updatedRow. This row is Pushed into RowChannel which is then read by cdcValuesNode. The structure of that flow is: DistSQLReceiver <- (arbitrary DistSQL processors) <- planNodeToRowSource <- cdcValuesNode <- RowChannel. Importantly, RowChannel is added as the "input to drain" by planNodeToRowSource (in SetInput), so planNodeToRowSource will call ConsumerDone or ConsumerClosed (depending on why the flow is being shutdown).
Thus, it is not necessary to do so in cdcValuesNode.Close. Furthermore, previously we had bugs around this behavior (#88964, fixed in 59b6f53) because the processors are shut down (and possibly released to their pools) at the end of DistSQLPlanner.Run whereas the planNode tree is closed later. I don't think there is a possibility of a bug here because this RowChannel is not reused in any way after the flow is cleaned up, but it might be better to keep things in sync between what cdcValuesNode and rowSourceToPlanNode do (because they are similar). It would probably be good to add a quick comment into cdcValuesNode.Close about this (similar to what we have in rowSourceToPlanNode).
pkg/sql/walk.go line 371 at r13 (raw file):
```
reflect.TypeOf(&cancelQueriesNode{}):  "cancel queries",
reflect.TypeOf(&cancelSessionsNode{}): "cancel sessions",
reflect.TypeOf(&cdcValuesNode{}):      "wrapped streaming node",
```
Is it possible to EXPLAIN the plan for the transformation, before actually creating it?
miretskiy
left a comment
Reviewable status: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, @rytaft, and @yuzefovich)
pkg/ccl/changefeedccl/cdceval/expr_eval.go line 109 at r7 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Sorry, I responded below that we should probably remove that `ConsumerDone` call after all. Interested to hear how things will change once we do remove it.

My initial comment here was focused on whether it is an "assertion" or a "regular" error, I wasn't questioning whether it is an error.
You were right re assertion failure -- I changed that. And I've removed the `ConsumerDone` call. Everything still worked after I removed this call. I just think that this whole operation is so tightly controlled by cdc eval -- 1 row pushed, 1 row read -- that the shutdown issues are probably not going to happen (famous last words :) )
pkg/ccl/changefeedccl/cdceval/validation.go line 238 at r7 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Looks good!
Done.
pkg/sql/distsql_plan_changefeed.go line 273 at r5 (raw file):
RowChannel is added as the "input to drain" by planNodeToRowSource (in SetInput), so planNodeToRowSource will call ConsumerDone or ConsumerClosed (depending on why the flow is being shutdown).
Comment added, and I "stole" a good portion of your excellent explanation above to put that into the comment.
pkg/sql/walk.go line 371 at r13 (raw file):
Previously, yuzefovich (Yahor Yuzefovich) wrote…
Is it possible to `EXPLAIN` the plan for the transformation, before actually creating it?
I absolutely have this on my todo. Eventually, we do want to support explain changefeed.
Force-pushed 17bbde8 to bad437a
rytaft
left a comment
Reviewed 15 of 15 files at r14, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, and @yuzefovich)
| "INSERT INTO foo (a, b) VALUES (1, 'hello')", | ||
| }, | ||
| predicate: "SELECT btrim(a) FROM _", | ||
| stmt: "SELECT a, b % 'hel' AS trigram FROM foo", |
The current bug with string `%` is that it always returns true when the expression is being evaluated in a job, because it depends on a session variable that defaults to 0.3 but isn't being set, so it's using a similarity threshold of 0. I think this PR probably doesn't fix this, so I think either remove this test or (if I'm wrong) add a test case where it's false using an enterprise-feed-like environment.
Ack; going to disable the test.
yuzefovich
left a comment
Reviewed 2 of 34 files at r8, 1 of 4 files at r11, 1 of 3 files at r12, 11 of 17 files at r13, 15 of 15 files at r14, all commit messages.
Reviewable status: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, @miretskiy, and @rytaft)
Prior to this change, CDC expression evaluation was managed by the CDC library (`cdceval`) directly. This PR introduces a mechanism to plan and execute CDC expressions using the optimizer and distSQL: `PlanCDCExpression` plans the execution of CDC expressions using the optimizer, and `RunCDCEvaluation`, which is just a thin wrapper around regular distSQL mechanisms (`PlanAndRun`), can be used to execute this plan.

This offers significant benefits to CDC expressions: namely, they will be more tightly integrated with the optimizer and the distSQL execution framework. This in turn opens up venues for additional improvements over time.

Release note: None
Previous PRs (cockroachdb#82562) introduced CDC expressions. This PR builds on that and replaces the majority of the hand-written evaluation logic in favor of tighter integration with the optimizer and distSQL processors.

A CDC expression, which is really a simplified `SELECT` statement, is now planned by the optimizer (`sql.PlanCDCExpression`). The resulting plan is then fed to distSQL to produce a specialized CDC execution plan (`sql.RunCDCEvaluation`). The execution plan is special in that it is guaranteed to be a local execution plan, and the changefeed is expected to "feed" the data (encoded row) directly into the execution pipeline, with change aggregators consuming the resulting projection.

The benefit of this approach is that expression optimization and evaluation are now handled by the optimizer and distSQL. The responsibility of the CDC evaluation package is to make sure that the CDC expression is "sane" and to set up CDC-specific functions.

Since the whole expression is not yet fully planned by distSQL (i.e. we don't have a changefeed operator implemented yet), the integration between CDC expressions and the optimizer/execinfra is not yet complete. In particular, this PR does not replace the current distSQL execution for CDC -- namely, we still keep the same execution model using the hand-planned `ChangeFrontierProcessor` and `ChangeAggregatorProcessor`. It augments the existing model while tightening the integration. Still, this is an improvement over the previous state. The follow-on work will complete the integration.

Some changes enabled by this implementation include the replacement of the `cdc_prev()` function, which returned a JSONB representation of the previous row, with a `cdc_prev` tuple. This makes changefeed expressions more natural since tuples are strongly typed:

```
SELECT * FROM tbl WHERE col != cdc_prev.col
```

In addition, by using a tuple to represent the state of the previous row, we can now leverage existing SQL functions. For example, to emit the previous row as JSONB we can do:

```
SELECT *, row_to_json(cdc_prev) AS prevJson FROM tbl
```

Fixes cockroachdb#90416
Fixes cockroachdb#90714
Fixes cockroachdb#90455
Informs cockroachdb#90442
Informs CRDB-18978
Informs CRDB-17161

Release note (enterprise change): CDC expressions are now planned and evaluated using the SQL optimizer and distSQL execution. The state of the previous row is now exposed as a `cdc_prev` tuple.

Release note (backward-incompatible change): The replacement of the `cdc_prev()` function in favor of a `cdc_prev` tuple is an incompatible change that may break changefeeds that use the old `cdc_prev()` function.
bors r+

Build succeeded:
Prior PR cockroachdb#85177 erroneously removed a cast when assigning projection values (we may yet have to bring this back). This PR allows equivalent types when assigning projection values.

Release note: None
92947: kvcoord: include replica info in RangeIterator.Seek into trace r=yuzefovich a=yuzefovich
This commit modifies the existing "info" log message into an "event"
when seeking the range iterator. This makes it so that the result of the
seek (the replica information) is included into the trace. Additionally,
this commit includes the corresponding message to be included into the
KV trace. The original "info" log message was added about five years ago
and probably hasn't been that useful.
Here is an example of the trace event:
```
key: /NamespaceTable/30/1/100/101/"t"/4/1, desc: r32:/NamespaceTable/{30-Max} [(n1,s1):1, next=2, gen=0]
```
Informs: https://github.com/cockroachlabs/support/issues/1933.
Release note: None
93118: changefeedccl: Allow equivalent types in projection r=miretskiy a=miretskiy
Prior PR #85177 erroneously removed a cast when assigning projection values (we may yet have to bring this back).
This PR allows equivalent types when assigning projection values.
Epic: none
Release note: none
93137: dev: indicate that beaver hub errors are internal r=rickystewart a=healthy-pod
Release note: None
Epic: none
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Co-authored-by: healthy-pod <ahmad@cockroachlabs.com>
Addresses #31214
Previous PRs (#82562) introduced CDC expressions.
This PR builds on that and replaces majority of hand written
evaluation logic in favor of tighter integration with
optimizer and dist SQL processors.
CDC expression, which is really a simplified
`SELECT` statement, is now planned by the optimizer
(`sql.PlanCDCExpression`). The resulting plan is then fed to distSQL to produce
a specialized CDC execution plan (`sql.RunCDCEvaluation`).
The execution plan is special in that it is guaranteed to be
a local execution plan, and changefeed is expected to "feed"
the data (encoded row) directly into the execution pipeline,
with change aggregators consuming resulting projection.
The benefit of this approach is that expression optimization,
and evaluation is now handled by optimizer and distSQL.
The responsibility of CDC evaluation package is to make sure
that CDC expression is "sane" and to setup CDC specific functions.
Since the whole expression is not yet fully planned
by distSQL (i.e. we don't have changefeed operator implemented yet),
the integration between CDC expressions and optimizer/execInfra
is not yet complete. In particular, this
PR does not replace current distSQL execution for CDC --
namely, we still keep the same execution model using hand planned
`ChangeFrontierProcessor` and `ChangeAggregatorProcessor`. It augments the existing model while tightening the integration.
Still, this is an improvement over previous state. The follow on work
will complete integration.
Some changes enabled by this implementation include the replacement
of the `cdc_prev()` function, which returned a JSONB representation of the
previous row, with a `cdc_prev` tuple. This makes changefeed
expressions more natural since tuples are strongly typed:

```
SELECT * FROM tbl WHERE col != cdc_prev.col
```
In addition, by using a tuple to represent the state of the previous row,
we can now leverage existing SQL functions. For example, to emit the
previous row as JSONB we can do:

```
SELECT *, row_to_json(cdc_prev) AS prevJson FROM tbl
```
Fixes #90416
Fixes #90714
Fixes #90455
Informs #90442
Informs CRDB-18978
Informs CRDB-17161
Release note (enterprise change): CDC expressions are now planned and
evaluated using SQL optimizer and distSQL execution. The state
of the previous row is now exposed as a `cdc_prev` tuple.

Release note (backward-incompatible change): The replacement of
the `cdc_prev()` function in favor of a `cdc_prev` tuple is an incompatible
change that may break changefeeds that use the old `cdc_prev()` function.