changefeedccl: Rely on optimizer and distSQL when evaluating CDC expressions. #85177

Merged
craig[bot] merged 2 commits into cockroachdb:master from miretskiy:optexpr
Dec 2, 2022

Contributor

@miretskiy miretskiy commented Jul 27, 2022

Previous PRs (#82562) introduced CDC expressions.

This PR builds on that and replaces the majority of the hand-written
evaluation logic with tighter integration with the
optimizer and distSQL processors.

A CDC expression, which is really a simplified SELECT statement,
is now planned by the optimizer (sql.PlanCDCExpression).
The resulting plan is then fed to distSQL to produce
a specialized CDC execution plan (sql.RunCDCEvaluation).

The execution plan is special in that it is guaranteed to be
a local execution plan, and the changefeed is expected to "feed"
the data (an encoded row) directly into the execution pipeline,
with change aggregators consuming the resulting projection.
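
The "feed one row in, read one projection out" model can be sketched as follows. This is only an illustration under stated assumptions: `row`, `evaluator`, `newEvaluator`, and `eval` are hypothetical names, and plain Go channels stand in for the real execution pipeline; none of this is the actual `cdceval` or `sql` API.

```go
package main

import "fmt"

// row is a hypothetical stand-in for a decoded changefeed row.
type row struct {
	key   int
	value string
}

// evaluator is a hypothetical stand-in for a local execution pipeline:
// rows are pushed into in, and projections are read back from out.
type evaluator struct {
	in  chan row
	out chan *string
}

// newEvaluator starts a goroutine that applies filter and project to every
// pushed row. A nil result means the row was filtered out.
func newEvaluator(filter func(row) bool, project func(row) string) *evaluator {
	e := &evaluator{in: make(chan row, 1), out: make(chan *string, 1)}
	go func() {
		defer close(e.out)
		for r := range e.in {
			if !filter(r) {
				e.out <- nil
				continue
			}
			p := project(r)
			e.out <- &p
		}
	}()
	return e
}

// eval pushes exactly one row and reads exactly one result.
func (e *evaluator) eval(r row) (string, bool) {
	e.in <- r
	p := <-e.out
	if p == nil {
		return "", false
	}
	return *p, true
}

// close stops the pipeline goroutine.
func (e *evaluator) close() { close(e.in) }

func main() {
	e := newEvaluator(
		func(r row) bool { return r.key%2 == 0 },
		func(r row) string { return fmt.Sprintf("%d:%s", r.key, r.value) },
	)
	defer e.close()
	for _, r := range []row{{1, "a"}, {2, "b"}} {
		if p, ok := e.eval(r); ok {
			fmt.Println("projection:", p)
		} else {
			fmt.Println("filtered out")
		}
	}
}
```

The caller stays in control of the pipeline: it never pushes a second row before reading the result of the first, which is what keeps the sketch free of ordering concerns.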

The benefit of this approach is that expression optimization
and evaluation are now handled by the optimizer and distSQL.
The responsibility of the CDC evaluation package is to make sure
that the CDC expression is "sane" and to set up CDC-specific functions.

Since the whole expression is not yet fully planned
by distSQL (i.e. we don't have a changefeed operator implemented yet),
the integration between CDC expressions and optimizer/execInfra
is not yet complete. In particular, this
PR does not replace the current distSQL execution for CDC:
we still keep the same execution model using the hand-planned
ChangeFrontierProcessor and ChangeAggregatorProcessor.
It augments the existing model while tightening the integration.

Still, this is an improvement over the previous state. Follow-on work
will complete the integration.

Some changes enabled by this implementation include the replacement
of the cdc_prev() function, which returned a JSONb representation of the
previous row, with a cdc_prev tuple. This makes changefeed
expressions more natural since tuples are strongly typed:

SELECT * FROM tbl WHERE col != cdc_prev.col

In addition, by using a tuple to represent the state of the previous row,
we can now leverage existing SQL functions. For example, to emit the
previous row as JSONb we can do:

SELECT *, row_to_json(cdc_prev.*) AS prevJson FROM tbl

Fixes #90416
Fixes #90714
Fixes #90455
Informs #90442
Informs CRDB-18978
Informs CRDB-17161

Release note (enterprise change): CDC expressions are now planned and
evaluated using the SQL optimizer and distSQL execution. The state
of the previous row is now exposed as a cdc_prev tuple.

Release note (backward incompatible change): The replacement of the
cdc_prev() function with a cdc_prev tuple is an incompatible
change that may break changefeeds that use the old cdc_prev() function.

@miretskiy miretskiy changed the title from "Optexpr" to "changefeedccl: Rely on optimizer and distSQL when evaluating CDC expressions." Jul 27, 2022
@miretskiy miretskiy force-pushed the optexpr branch 5 times, most recently from 6f538da to 5209457 Compare August 1, 2022 16:25
@miretskiy miretskiy force-pushed the optexpr branch 6 times, most recently from 909adb9 to bf3a569 Compare November 16, 2022 15:46
@miretskiy miretskiy marked this pull request as ready for review November 16, 2022 16:35
@miretskiy miretskiy requested review from a team as code owners November 16, 2022 16:35
@miretskiy miretskiy requested a review from a team November 16, 2022 16:35
@miretskiy miretskiy requested a review from a team as a code owner November 16, 2022 16:35
@miretskiy miretskiy requested review from DrewKimball, HonoreDB, rytaft and yuzefovich and removed request for a team November 16, 2022 16:35
@miretskiy miretskiy force-pushed the optexpr branch 2 times, most recently from 277a987 to 4f20450 Compare November 16, 2022 21:58
Member

@yuzefovich yuzefovich left a comment

Didn't finish reading through the last commit, but pushing what I have so far. Cool stuff!

Reviewed 14 of 14 files at r1, 3 of 13 files at r2, 48 of 48 files at r4, 13 of 13 files at r5, 13 of 40 files at r6, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @HonoreDB, @miretskiy, and @rytaft)


-- commits line 12 at r1:
nit: leftover in the commit message.


-- commits line 31 at r5:
nit: s/notes/note/.


pkg/ccl/changefeedccl/changefeed_dist.go line 178 at r6 (raw file):

// Usually, this is just the primary index span.
// However, if details.Select is not empty, the set of spans returned may be
// restricted to satisfy predicate in the select clause.  In that case,

nit: I think the last sentence is no longer applicable.


pkg/ccl/changefeedccl/changefeed_dist.go line 249 at r6 (raw file):

	if schemaTS.IsEmpty() {
		schemaTS = details.StatementTime

Is it guaranteed that details.StatementTime is set?


pkg/ccl/changefeedccl/changefeed_stmt.go line 887 at r6 (raw file):

	statementTime hlc.Timestamp,
) (*cdceval.NormalizedSelectClause, error) {
	// TODO -- DO NOT MERGE -- figure out includeVirtual/keyOnly business.

nit: just pointing this out.


pkg/ccl/changefeedccl/event_processing.go line 293 at r6 (raw file):

		// it would be superfluous to also encode prevRow.
		updatedRow, prevRow = projection, cdcevent.Row{}
		log.Infof(ctx, "DBG: Got Row: %s", updatedRow.DebugString())

nit: this probably needs an adjustment.


pkg/ccl/changefeedccl/event_processing.go line 376 at r6 (raw file):

}

// Close is a noop for the kvEventToRowConsumer because it

nit: the comment is no longer true.


pkg/ccl/changefeedccl/cdceval/doc.go line 31 at r6 (raw file):

Then, once aggregators start up, they will once again plan the expression
(sql.PlanCDCExpression), but this time each incoming KV event will be evaluated

nit: I think something is missing after "will be evaluated".


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 46 at r6 (raw file):

	fnResolver CDCFunctionResolver

	// rowCh receives projection datums (or nil if filtered out)

nit: missing period.


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 123 at r6 (raw file):

			return cdcevent.Row{}, nil
		}
		row = row[1:]

nit: add a quick comment to say that we need to remove the temporary boolean value to not be emitted to the client.
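
A minimal sketch of what that comment would be describing, assuming (for illustration only) that the evaluated row carries the filter result as a leading boolean followed by the projected values. `splitFilterResult` is a hypothetical helper, not code from this PR.

```go
package main

import "fmt"

// splitFilterResult assumes that row[0] holds the temporary boolean produced
// by the predicate and that the remaining values are the projection.
// It reports whether the row passed the filter and, if so, returns the
// projection without the temporary boolean.
func splitFilterResult(row []interface{}) ([]interface{}, bool) {
	if len(row) == 0 {
		return nil, false
	}
	passed, isBool := row[0].(bool)
	if !isBool || !passed {
		return nil, false
	}
	// Drop the temporary boolean so that it is not emitted to the client.
	return row[1:], true
}

func main() {
	fmt.Println(splitFilterResult([]interface{}{true, 1, "hello"}))
	fmt.Println(splitFilterResult([]interface{}{false, 2, "world"}))
}
```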


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 138 at r6 (raw file):

}

// sameVersion returns true row descriptor versions match this evaluator versions.

nit: s/true row/true if row/.

nit: maybe worth mentioning that we don't care whether current and previous versions match between each other.


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 167 at r6 (raw file):

func (e *Evaluator) preparePlan(ctx context.Context, plan *sql.CDCExpressionPlan) error {
	return plannerExec(
		ctx, e.execCfg, e.user, e.currDesc.SchemaTS, sessiondatapb.SessionData{},

Something that we might want to try is to disable the usage of the vectorized engine since it would still effectively "degrade" to row-at-a-time model due to MustBeStreaming returning true. I'm not sure whether we'd see better performance or better allocations profile, but it might be worthy of a TODO to check.

Update: saw that we disable it in plannerExec.


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 253 at r6 (raw file):

	input.InitWithNumSenders(inputTypes, 1)

	// write sends result of the evaluation into row channel.

nit: s/write/writer/.


pkg/ccl/changefeedccl/cdceval/plan.go line 91 at r6 (raw file):

			case pgcode.UndefinedColumn:
				return errors.WithHintf(err,
					"column nay not exist in the target column family %q", target.FamilyName)

nit: s/nay/may/.

nit: should we add this hint only if the table has multiple column families? Is that information easily available here?
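
One way the conditional hint could look, as a sketch only. It uses the standard library's error wrapping rather than `errors.WithHintf` from the code under review, and `maybeAddFamilyHint`, `errUndefinedColumn`, and the `numFamilies` parameter are hypothetical names introduced here.

```go
package main

import (
	"errors"
	"fmt"
)

// errUndefinedColumn is a hypothetical stand-in for the planning error.
var errUndefinedColumn = errors.New(`column "x" does not exist`)

// maybeAddFamilyHint attaches the column family hint only when the table has
// more than one column family; otherwise the hint would be misleading.
func maybeAddFamilyHint(err error, familyName string, numFamilies int) error {
	if numFamilies <= 1 {
		return err
	}
	return fmt.Errorf(
		"%w (hint: column may not exist in the target column family %q)",
		err, familyName,
	)
}

func main() {
	fmt.Println(maybeAddFamilyHint(errUndefinedColumn, "primary", 1))
	hinted := maybeAddFamilyHint(errUndefinedColumn, "f2", 2)
	fmt.Println(hinted)
	// The original error remains reachable through the wrapper.
	fmt.Println(errors.Is(hinted, errUndefinedColumn))
}
```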


pkg/ccl/changefeedccl/cdceval/plan.go line 170 at r6 (raw file):

	}

	scopedTable := &tree.SelectClause{

nit: maybe add an example in the comment here?


pkg/ccl/changefeedccl/cdceval/plan.go line 199 at r6 (raw file):

// plannerExec is a helper which invokes provided function inside
// a DescTxn transaction to ensure that descriptors get acquired

nit: s/DescTxn/sql.DescsTxn/.


pkg/ccl/changefeedccl/cdceval/plan.go line 215 at r6 (raw file):

			}

			sd.VectorizeMode = sessiondatapb.VectorizeOff

Interesting, so we do disable the vectorized engine. Maybe add a comment for why.


pkg/sql/distsql_plan_changefeed.go line 146 at r5 (raw file):

	planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, DistributionTypeNone)
	planCtx.isLocal = true

nit: this isn't strictly needed with DistributionTypeNone.


pkg/sql/distsql_plan_changefeed.go line 149 at r5 (raw file):

	return CDCExpressionPlan{
		Plan:         p.curPlan.main,

We don't expect to see subqueries, right? It would probably be good to add assertions that all slices in p.curPlan are zero-lengths (i.e. no subqueries, cascades, nor checks).
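
A sketch of the suggested assertions, using a hypothetical `simplePlan` struct in place of the planner's real plan type (whose field names are not shown in this thread):

```go
package main

import "fmt"

// simplePlan is a hypothetical stand-in for the parts of a plan that a
// CDC expression is never expected to populate.
type simplePlan struct {
	subqueries []string
	cascades   []string
	checks     []string
}

// assertNoAuxiliaryPlans returns an error if the plan contains anything
// beyond the main plan.
func assertNoAuxiliaryPlans(p simplePlan) error {
	switch {
	case len(p.subqueries) != 0:
		return fmt.Errorf("unexpected %d subqueries in CDC expression plan", len(p.subqueries))
	case len(p.cascades) != 0:
		return fmt.Errorf("unexpected %d cascades in CDC expression plan", len(p.cascades))
	case len(p.checks) != 0:
		return fmt.Errorf("unexpected %d checks in CDC expression plan", len(p.checks))
	}
	return nil
}

func main() {
	fmt.Println(assertNoAuxiliaryPlans(simplePlan{}))
	fmt.Println(assertNoAuxiliaryPlans(simplePlan{subqueries: []string{"sq1"}}))
}
```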


pkg/sql/distsql_plan_changefeed.go line 208 at r5 (raw file):

// cdcValuesNode replaces regular scanNode with cdc specific implementation
// which returns values from the execinfra.RowSource.

nit: maybe expand on what the input RowSource is producing. IIRC it would be a source that never ends, right?


pkg/sql/distsql_plan_changefeed.go line 273 at r5 (raw file):

// Close implements planNode.
func (n *cdcValuesNode) Close(ctx context.Context) {
}

nit: the contract of RowSource says that unless we call Next until Next returns nil, nil, we must call ConsumerDone. I think it'd be cleaner to just do it here even if this might not be strictly necessary.


pkg/sql/distsql_plan_changefeed.go line 295 at r5 (raw file):

	ds, err := c.newCDCDataSource(desc)
	if err != nil {
		return nil, *name, err

nit: maybe s/*name/cat.DataSourceName{}/?


pkg/sql/distsql_plan_changefeed.go line 326 at r5 (raw file):

	desc.Indexes = desc.Indexes[:0]
	updated := tabledesc.NewBuilder(desc).BuildImmutableTable()
	return newOptTable(updated, c.codec(), nil, emptyZoneConfig)

nit: add an inline comment about what nil is.


pkg/sql/plan_node_to_row_source.go line 94 at r5 (raw file):

	switch p.node.(type) {
	case *hookFnNode, *cdcValuesNode:
		// hookFnNode is special because it might be blocked forever if we decide to

nit: say something about cdcValuesNode too.


pkg/sql/walk.go line 479 at r5 (raw file):

	reflect.TypeOf(&zigzagJoinNode{}):                          "zigzag join",
	reflect.TypeOf(&schemaChangePlanNode{}):                    "schema change",
	reflect.TypeOf(&cdcValuesNode{}):                           "wrapped streaming node",

nit: let's keep it sorted since it looks like it is already sorted except for the last two lines.


pkg/sql/opt/exec/execbuilder/scalar.go line 727 at r5 (raw file):

		ef := ref.(exec.Factory)
		eb := New(ctx, ef, &o, f.Memo(), b.catalog, newRightSide,
			b.evalCtx, b.semaCtx, false /* allowAutoCommit */, b.IsANSIDML)

Should this change be in the first commit?


pkg/sql/distsql_plan_changefeed_test.go line 93 at r5 (raw file):

	}

	for _, tc := range []struct {

Nice tests! I think we're missing some cases where multiple different tables are used by the query, right? It'd be good to add a case with two tables when both are existing as well as when one is not found. Or is this checked elsewhere?


pkg/sql/opt/lookupjoin/constraint_builder_test.go line 324 at r1 (raw file):

	execBld := execbuilder.New(
		context.Background(), nil /* execFactory */, nil /* optimizer */, f.Memo(), nil, /* catalog */
		e, evalCtx, nil /*semaCtx */, false, /* allowAutoCommit */

nit: add a space before semaCtx.

Contributor Author

@miretskiy miretskiy left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @DrewKimball, @HonoreDB, @rytaft, and @yuzefovich)


pkg/ccl/changefeedccl/changefeed_dist.go line 249 at r6 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Is it guaranteed that details.StatementTime is set?

Yes; always.


pkg/ccl/changefeedccl/changefeed_stmt.go line 887 at r6 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: just pointing this out.

Thanks.


pkg/ccl/changefeedccl/event_processing.go line 293 at r6 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this probably needs an adjustment.

left over. Thanks.


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 167 at r6 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Something that we might want to try is to disable the usage of the vectorized engine since it would still effectively "degrade" to row-at-a-time model due to MustBeStreaming returning true. I'm not sure whether we'd see better performance or better allocations profile, but it might be worthy of a TODO to check.

Update: saw that we disable it in plannerExec.

Ack.


pkg/ccl/changefeedccl/cdceval/plan.go line 91 at r6 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: s/nay/may/.

nit: should we add this hint only if the table has multiple column families? Is that information easily available here?

Done; good idea.


pkg/ccl/changefeedccl/cdceval/plan.go line 215 at r6 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Interesting, so we do disable the vectorized engine. Maybe add a comment for why.

We are definitely a row-by-row execution mode now. Added comment.


pkg/sql/distsql_plan_changefeed.go line 146 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: this isn't strictly needed with DistributionTypeNone.

removed.


pkg/sql/distsql_plan_changefeed.go line 149 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

We don't expect to see subqueries, right? It would probably be good to add assertions that all slices in p.curPlan are zero-lengths (i.e. no subqueries, cascades, nor checks).

Ack. Good idea re extra checks. Added.


pkg/sql/distsql_plan_changefeed.go line 208 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: maybe expand on what the input RowSource is producing. IIRC it would be a source that never ends, right?

Correct. Added


pkg/sql/walk.go line 479 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: let's keep it sorted since it looks like it is already sorted except for the last two lines.

Done.


pkg/sql/opt/exec/execbuilder/scalar.go line 727 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Should this change be in the first commit?

Done.


pkg/sql/distsql_plan_changefeed_test.go line 93 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Nice tests! I think we're missing some cases where multiple different tables are used by the query, right? It'd be good to add a case with two tables when both are existing as well as when one is not found. Or is this checked elsewhere?

We only support a single table. I check to make sure we only have a single scan operation. Added a test.

@miretskiy miretskiy requested a review from rytaft December 1, 2022 21:40
Contributor Author

@miretskiy miretskiy left a comment

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, @rytaft, and @yuzefovich)


pkg/sql/distsql_plan_changefeed.go line 87 at r10 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

Oh ok -- that's fine then. You could also do false /* allowAutoCommit */, but this is fine as-is

I don't know why .. but I just dislike those inline /* */ style comments -- I think constants
are better.


pkg/sql/distsql_plan_changefeed.go line 320 at r10 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

since I already have to do rewriting on cdc side

I wonder if it's possible in the future to do almost all rewriting in the optimizer. E.g. you could specify your CDC rewrites as normalization rules. But this is definitely not needed for this PR.

Another advantage of showing only target columns is then you could rely on the optimizer for * expansion. But up to you (and feel free to save for the future)

Ack -- agree. That's a very, very interesting idea! Dealing with "*" was part of the complexity I had to tackle.
The idea about a custom function/norm rule is rather neat.


pkg/sql/opt/exec/execbuilder/builder.go line 259 at r12 (raw file):

Previously, rytaft (Rebecca Taft) wrote…

any reason we can't use the catalog function resolver here (like you did when folding)? Then we wouldn't need to plumb the semaCtx

That's a ... good question! I think I did it so long ago... I don't think I realized that solution back then!
This is done now and the first commit to plumb semactx removed.
I had to add custom resolve implementation in cdcOptCatalog (to still provide access to CDC overrides), but no changes in execbuilder other than what you describe above.
Yay.

Collaborator

@rytaft rytaft left a comment

:lgtm_strong:

Reviewed 17 of 17 files at r13, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, and @yuzefovich)


pkg/sql/opt/exec/execbuilder/builder.go line 259 at r12 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

That's a ... good question! I think I did it so long ago... I don't think I realized that solution back then!
This is done now and the first commit to plumb semactx removed.
I had to add custom resolve implementation in cdcOptCatalog (to still provide access to CDC overrides), but no changes in execbuilder other than what you describe above.
Yay.

Hooray! 🥳

Member

@yuzefovich yuzefovich left a comment

Reviewed 21 of 34 files at r8, 1 of 3 files at r10, 2 of 4 files at r11, 1 of 3 files at r12, 6 of 17 files at r13, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, @miretskiy, and @rytaft)


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 109 at r7 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

This used to be the case before I added ConsumerDone call as you requested; now, it seems this
may no longer be the case. Still, I think returning error here is correct -- we will shutdown evaluator, and the whole flow, and then restart the flow (because all errors are now treated as retryable).
I augmented input initialization code with a hopefully useful comment below:

Going to add a test around shutdown as well.

// The row channel created below will have exactly 1 sender (this evaluator).
// The buffer size parameter doesn't matter much, as long as it is greater
// than 0 to make sure that if the main context is cancelled and the flow
// exits, that we can still push data into the row channel without blocking,
// so that we notice cancellation request when we try to read the result of
// the evaluation.
const numSenders = 1
const bufSize = 16
var input execinfra.RowChannel
input.InitWithBufSizeAndNumSenders(inputTypes, bufSize, numSenders)
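
The reasoning in that comment can be sketched with plain Go channels standing in for `execinfra.RowChannel`. This is only an illustration: `evalOne` and `evalAfterShutdown` are hypothetical helpers, and the real row channel has more machinery than a bare `chan`.

```go
package main

import (
	"context"
	"fmt"
)

// evalOne pushes one value into the input channel and then waits for either
// a result or cancellation. The push does not block as long as the buffer
// has room, even if nobody is reading from the channel anymore.
func evalOne(ctx context.Context, in chan<- int, out <-chan int, v int) (int, error) {
	in <- v
	select {
	case res := <-out:
		return res, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// evalAfterShutdown simulates a flow that has already exited: the context is
// cancelled and no goroutine consumes the input channel.
func evalAfterShutdown(bufSize int) error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	in := make(chan int, bufSize)
	out := make(chan int)
	_, err := evalOne(ctx, in, out, 42)
	return err
}

func main() {
	// With a non-zero buffer the push succeeds and cancellation is noticed
	// when reading the result.
	fmt.Println(evalAfterShutdown(16))
}
```

With a buffer size of 0 the push in this sketch would block forever, because nothing is left to receive the value; that is the situation the non-zero buffer is meant to avoid.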

Sorry, I responded below that we should probably remove that ConsumerDone call after all. Interested to hear how things will change once we do remove it.

My initial comment here was focused on whether it is an "assertion" or a "regular" error, I wasn't questioning whether it is an error.


pkg/ccl/changefeedccl/cdceval/validation.go line 238 at r7 (raw file):

Previously, miretskiy (Yevgeniy Miretskiy) wrote…

I reworked the code a bit. No longer have boolean indicating if the expression should be normalized.
Now, when the changefeed is created, the normalization step is done, which rewrites the expression, and the rewritten expression gets written to the jobs record. Thereafter, there is no longer a need to normalize.
I also split the method into 2 helpers -- yes, there is a bit more repetition, but the code is better imo.
Let me know what you think.

Looks good!


pkg/sql/distsql_plan_changefeed.go line 273 at r5 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

nit: the contract of RowSource says that unless we call Next until Next returns nil, nil, we must call ConsumerDone. I think it'd be cleaner to just do it here even if this might not be strictly necessary.

Actually, on a second thought, it might be better to not call ConsumerDone here.

IIUC here is the structure of things: we have kvEventToRowConsumer which takes a single kvevent.Event, decodes KVs into updatedRow. This row is Pushed into RowChannel which is then read by cdcValuesNode. The structure of that flow is: DistSQLReceiver <- (arbitrary DistSQL processors) <- planNodeToRowSource <- cdcValuesNode <- RowChannel. Importantly, RowChannel is added as the "input to drain" by planNodeToRowSource (in SetInput), so planNodeToRowSource will call ConsumerDone or ConsumerClosed (depending on why the flow is being shutdown).

Thus, it is not necessary to do so in cdcValuesNode.Close. Furthermore, previously we had bugs around this behavior (#88964, fixed in 59b6f53) because the processors are shutdown (and possibly released to their pools) at the end of DistSQLPLanner.Run whereas the planNode tree is closed later. I don't think there is a possibility of a bug here because this RowChannel is not reused in any way after the flow is cleaned up, but it might be better to keep things in sync between what cdcValuesNode and rowSourceToPlanNode do (because they are similar). It would probably be good to add a quick comment into cdcValuesNode.Close about this (similar to what we have in rowSourceToPlanNode).


pkg/sql/walk.go line 371 at r13 (raw file):

	reflect.TypeOf(&cancelQueriesNode{}):                       "cancel queries",
	reflect.TypeOf(&cancelSessionsNode{}):                      "cancel sessions",
	reflect.TypeOf(&cdcValuesNode{}):                           "wrapped streaming node",

Is it possible to EXPLAIN the plan for the transformation, before actually creating it?

Contributor Author

@miretskiy miretskiy left a comment

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, @rytaft, and @yuzefovich)


pkg/ccl/changefeedccl/cdceval/expr_eval.go line 109 at r7 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Sorry, I responded below that we should probably remove that ConsumerDone call after all. Interested to hear how things will change once we do remove it.

My initial comment here was focused on whether it is an "assertion" or a "regular" error, I wasn't questioning whether it is an error.

You were right re assertion failure -- I changed that. And I've removed the ConsumerDone call. Everything still worked after I removed this call. I just think that this whole operation is so tightly controlled by cdc eval -- 1 row pushed, 1 row read -- that the shutdown issues are probably not going to happen (famous words :) )


pkg/ccl/changefeedccl/cdceval/validation.go line 238 at r7 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Looks good!

Done.


pkg/sql/distsql_plan_changefeed.go line 273 at r5 (raw file):

RowChannel is added as the "input to drain" by planNodeToRowSource (in SetInput), so planNodeToRowSource will call ConsumerDone or ConsumerClosed (depending on why the flow is being shutdown).

Comment added, and I "stole" a good portion of your excellent explanation above to put that into the comment.


pkg/sql/walk.go line 371 at r13 (raw file):

Previously, yuzefovich (Yahor Yuzefovich) wrote…

Is it possible to EXPLAIN the plan for the transformation, before actually creating it?

I absolutely have this on my todo. Eventually, we do want to support explain changefeed.

@miretskiy miretskiy force-pushed the optexpr branch 4 times, most recently from 17bbde8 to bad437a Compare December 2, 2022 16:07
Collaborator

@rytaft rytaft left a comment

Reviewed 15 of 15 files at r14, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, and @yuzefovich)

"INSERT INTO foo (a, b) VALUES (1, 'hello')",
},
predicate: "SELECT btrim(a) FROM _",
stmt: "SELECT a, b % 'hel' AS trigram FROM foo",
Contributor

The current bug with string % is that it always returns true when the expression is being evaluated in a job, because it depends on a session variable that defaults to 0.3 but isn't being set so it's using a similarity threshold of 0. I think this PR probably doesn't fix this, so I think either remove this test or (if I'm wrong) add a test case where it's false using an enterprise feed like environment.
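
The failure mode described here can be illustrated with a small sketch. The `sessionData` struct, its field, and the `>=` comparison are assumptions made for illustration; they are not the actual session data type or operator implementation.

```go
package main

import "fmt"

// sessionData is a hypothetical stand-in for per-session settings. When it is
// left at its zero value, the threshold is 0 rather than the 0.3 default.
type sessionData struct {
	trigramSimilarityThreshold float64
}

// similar reports whether a similarity score meets the session threshold.
// The >= comparison is an assumption made for this sketch.
func similar(score float64, sd sessionData) bool {
	return score >= sd.trigramSimilarityThreshold
}

func main() {
	unset := sessionData{}
	withDefault := sessionData{trigramSimilarityThreshold: 0.3}
	// A weak match passes when the threshold was never set, but is rejected
	// once the default threshold is applied.
	fmt.Println(similar(0.1, unset), similar(0.1, withDefault))
}
```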

Contributor Author

Ack; going to disable the test.

Member

@yuzefovich yuzefovich left a comment

Reviewed 2 of 34 files at r8, 1 of 4 files at r11, 1 of 3 files at r12, 11 of 17 files at r13, 15 of 15 files at r14, all commit messages.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (and 1 stale) (waiting on @DrewKimball, @HonoreDB, @miretskiy, and @rytaft)

Yevgeniy Miretskiy added 2 commits December 2, 2022 15:09
Prior to this change, CDC expression evaluation was managed
by CDC library (`cdceval`) directly.

This PR introduces a mechanism to plan and execute CDC expressions
using optimizer and distSQL:  `PlanCDCExpression` plans the execution
of CDC expressions using optimizer, and `RunCDCEvaluation`, which
is just a thin wrapper around regular distsql mechanisms (`PlanAndRun`),
can be used to execute this plan.

This library offers significant benefits to CDC expressions:
namely, CDC expressions will be more tightly integrated with the optimizer
and the distSQL execution framework.  This in turn opens up
avenues for additional improvements over time.

Release notes: None
Previous PRs (cockroachdb#82562) introduced CDC expressions.

This PR builds on that and replaces majority of hand written
evaluation logic in favor of tighter integration with
optimizer and dist SQL processors.

CDC expression, which is really a simplified `SELECT` statement,
is now  planned by the optimizer `sql.PlanCDCExpression`.
The resulting plan is then fed to the distSQL, to produce
a specialized CDC execution plan (`sql.RunCDCEvaluation`).

The execution plan is special in that it is guaranteed to be
a local execution plan, and changefeed is expected to "feed"
the data (encoded row) directly into the execution pipeline,
with change aggregators consuming resulting projection.

The benefit of this approach is that expression optimization,
and evaluation is now handled by optimizer and distSQL.
The responsibility of CDC evaluation package is to make sure
that CDC expression is "sane" and to setup CDC specific functions.

Since the whole expression is not yet fully planned
by distSQL (i.e. we don't have changefeed operator implemented yet),
the integration between CDC expressions and optimizer/execInfra
is not yet complete.  In particular, this
PR does not replace current distSQL execution for CDC --
namely, we still keep the same execution model using hand planned
`ChangeFrontierProcessor` and `ChangeAggregatorProcessor`.
It augments existing model, while tightening the integration.

Still, this is an improvement over previous state.  The follow on work
will complete integration.

Some changes enabled by this implementation include the replacement
of `cdc_prev()` function which returned JSONb representation of the
previous row, with a `cdc_prev` tuple.  This makes changefeed
expressions more natural since tuples are strongly typed:

```
SELECT * FROM tbl WHERE col != cdc_prev.col
```

In addition, by using tuple to represent the state of the previous row,
we can now leverage existing SQL functions.  For example, to emit
previous row as JSONb we can do:

```
SELECT *, row_to_json(cdc_prev) AS prevJson FROM tbl
```

Fixes cockroachdb#90416
Fixes cockroachdb#90714
Fixes cockroachdb#90455
Informs cockroachdb#90442
Informs CRDB-18978
Informs CRDB-17161

Release note (enterprise change): CDC expressions are now planned and
evaluated using SQL optimizer and distSQL execution. The state
of the previous row is now exposed as `cdc_prev` tuple.

Release note (backward-incompatible change): The replacement of
cdc_prev() function in favor of a cdc_prev tuple is an incompatible
change that may break changefeeds that use old cdc_prev() function.
@miretskiy
Contributor Author

bors r+

@craig
Contributor

craig bot commented Dec 2, 2022

Build succeeded:

@craig craig bot merged commit 8419223 into cockroachdb:master Dec 2, 2022
miretskiy pushed a commit to miretskiy/cockroach that referenced this pull request Dec 6, 2022
Prior PR cockroachdb#85177 erroneously removed cast when assigning
projection values (we may yet have to bring this back).

This PR allows equivalent types when assigning projection
values.

Release note: none
craig bot pushed a commit that referenced this pull request Dec 6, 2022
92947: kvcoord: include replica info in RangeIterator.Seek into trace r=yuzefovich a=yuzefovich

This commit modifies the existing "info" log message into an "event"
when seeking the range iterator. This makes it so that the result of the
seek (the replica information) is included into the trace. Additionally,
this commit includes the corresponding message to be included into the
KV trace. The original "info" log message was added about five years ago
and probably hasn't been that useful.

Here is an example of the trace event:
```
key: /NamespaceTable/30/1/100/101/"t"/4/1, desc: r32:/NamespaceTable/{30-Max} [(n1,s1):1, next=2, gen=0]
```

Informs: https://github.com/cockroachlabs/support/issues/1933.

Release note: None

93118: changefeedccl: Allow equivalent types in projection r=miretskiy a=miretskiy

Prior PR #85177 erroneously removed cast when assigning projection values (we may yet have to bring this back).

This PR allows equivalent types when assigning projection values.

Epic: none
Release note: none

93137: dev: indicate that beaver hub errors are internal r=rickystewart a=healthy-pod

Release note: None
Epic: none

Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Co-authored-by: healthy-pod <ahmad@cockroachlabs.com>
@shermanCRL
Contributor

Addresses #31214
