changefeedccl: Introduce cdcevent package.#81249
changefeedccl: Introduce cdcevent package.#81249craig[bot] merged 1 commit intocockroachdb:masterfrom
Conversation
HonoreDB
left a comment
There was a problem hiding this comment.
Thank you for taking this on! Couple of minor comments. Also, is there an easy way to verify that introducing this extra layer of abstraction doesn't result in more allocs on the hot path?
Reviewed 3 of 3 files at r1, 2 of 2 files at r2, 3 of 3 files at r3, 1 of 1 files at r4, 20 of 20 files at r5, all commit messages.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/ccl/changefeedccl/cdcevent/event.go line 47 at r5 (raw file):
FamilyName() string // NumFamilies returns number of families in the table. NumFamilies() int
nit: could this be TableHasOtherFamilies() bool? Both because the EventSource always only has one family, and the only information we actually need is whether there are others.
pkg/ccl/changefeedccl/cdcevent/event.go line 380 at r5 (raw file):
return EventRow{}, err } // Copy datums since row fetcher reuses alloc.
nit: can remove this comment
Done.
Done. |
I don't think there is an easy way; I've tried not to do anything too silly. However, this should not prevent from merging (eventually) this pr.. We can improve things if needed. |
c21459c to
feb8dfa
Compare
HonoreDB
left a comment
There was a problem hiding this comment.
Reviewed 11 of 15 files at r6, 8 of 8 files at r7, all commit messages.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @miretskiy)
pkg/ccl/changefeedccl/cdcevent/event.go line 183 at r7 (raw file):
desc catalog.TableDescriptor, family *descpb.ColumnFamilyDescriptor, includeVirtualColumns bool, ) (*eventDescriptor, error) { var inFamily catalog.TableColSet
golf: inFamily := catalog.MakeTableColSet(family.ColumnIDs...)
Nice; thanks |
ajwerner
left a comment
There was a problem hiding this comment.
Reviewed 1 of 15 files at r6, 1 of 1 files at r8.
Reviewable status:complete! 1 of 0 LGTMs obtained (waiting on @miretskiy)
pkg/ccl/changefeedccl/cdcevent/event.go line 244 at r8 (raw file):
// DebugString returns event descriptor debug information. func (d *eventDescriptor) DebugString() string {
if you felt fancy, you could implement SafeFormat or whatever the method name is so that this object and even the row itself can be logged. Might come in handy
Code quote:
// DebugString returns event descriptor debug information.
func (d *eventDescriptor) DebugString() string {
return fmt.Sprintf("eventDescriptor{table: %q(%d) family: %q(%d) pkCols=%v valCols=%v",
d.TableName, d.TableID, d.FamilyName, d.FamilyID, d.keyCols, d.familyCols)
}
added |
Event processing in CDC is a fairly involved procedure, consisting
of multiple stages. A low level KeyValue event is received from
rangefeeds. Then this low level event needs to be decoded into
encoded datums, along with type information describing those datums,
representing the "row" in the table.
Thereafter those datums must be further encoded into appropriate format.
Historically, this pipeline operated on structures representing
fairly low level objects -- such as catalog.Descriptor, EncDatumRow, etc.
This is problematic for multiple reasons:
* A tight coupling to the underlying table/family descriptors makes it
hard to introduce layers into pipeline that may not be directly tied
to those descriptors -- such as projections and predicates.
* Exposing low level descriptor information meant that every encoder
implementation had to have repeated, and very subtle and error prone code,
to process columns in the correct order -- filtering unwanted
families, processing primary index column in the correct order, etc.
* The catalog.Descriptor interfaces are fairly large interfaces,
understandably so. However, CDC does not need access to the
entirety of those interfaces -- mostly, CDC needs to access to very few
methods. It is desirable therefore to pass in the smallest
interface necessary to make CDC work.
This is the purpose of `cdcevent` package. It introduces several
abstractions to facilitate conversion from the low level KeyValue event
into `cdcevent.EventRow` event. The `cdcevent.EventDecoder` interface
defines conversion from low level event into `cdcevent.EventRow`. Event
row has associated event descriptor describing the set of columns needed
to perform CDC activities. The rest of the CDC processing operates
on these higher level interfaces instead of having direct depency on
low level catalog interfaces.
Release Notes: None
|
bors r+ |
|
Build succeeded: |
|
We still need access to e.g. descpb packages; (access to tableID types,
etc).
…On Tue, May 17, 2022 at 4:40 PM Matt Sherman ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In pkg/ccl/changefeedccl/encoder_csv.go
<#81249 (comment)>
:
> "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
- "github.com/cockroachdb/cockroach/pkg/sql/catalog"
- "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Interesting bit after our conversation about the abstraction layer: what
would it take to have zero /sql dependency in the encoders?
—
Reply to this email directly, view it on GitHub
<#81249 (review)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/ANA4FVHUD5Z2YPZ2GKNFAA3VKP75FANCNFSM5V4OFPNA>
.
You are receiving this because you were mentioned.Message ID:
***@***.***>
|
82562: changefeeccl: Projections and Filters in CDC. r=miretskiy a=miretskiy
Add a variant of CHANGEFEED statement that allows specification
of predicates and projections.
```
CREATE CHANGEFEED [INTO 'sink'] [WITH opt=val, ...]
AS SELECT .... FROM t WHERE ...
```
This changefeed variant can target at most 1 table (and 1 column
family) at a time. The expressions used as the projections and
filters can be almost any supported expression with some restrictions:
* Volatile functions not allowed.
* Sub-selects not allowed.
* Aggregate and window functions (i.e. functions operating over many
rows) not allowed.
* Some stable functions, notably functions which return MVCC
timestamp, are overridden to return MVCC timestamp of the event.
In addition, some CDC specific functions are provided:
* cdc_is_delete: returns true if the event is a deletion event.
* cdc_prev: returns JSON representation of the previous row state.
* cdc_updated_timestamp: returns event update timestamp (usually MVCC
timestamp, but can be different if e.g. undergoing schema changes)
Additional CDC specific functions will be added in the follow on PRs.
Few examples:
* Emit all but the deletion events:
```
CREATE CHANGEFEED INTO 'kafka://'
AS SELECT * FROM table
WHERE NOT cdc_is_delete()
```
* Emit all events that modified `important_col` column:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *, cdc_prev() AS previous
FROM important_table
WHERE important_col != cdc_prev()->'important_col'
```
* Emit few colums, as well as computed expresions:
```
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT warehouseID, (totalItems - orderedItems) as itemsAvailable
FROM warehouse
WHERE region='US/east';
```
When filter expression is specified, changefeed will now consult
optimizer so that the set of spans scanned by changefeed can be
restricted based on the predicate.
For example, given the following table and a changefeed:
```
CREATE TABLE warehouse (
region STRING,
warehouseID int,
....
PRIMARY KEY (region, warehouseID)
);
CREATE CHANGEFEED INTO 'kafka://' WITH diff
AS SELECT *
FROM warehouse
WHERE region='US/east';
```
The create changefeed will only scan table spans that contain `US/east`
region (and ignore all other table spans).
---
For foundational work, see:
- #81676
- #81249
- #80499
Addresses:
- #56949
- #31214
---
Release Notes (enterprise):
CHANGEFEED statement now supports general expressions -- predicates and projections.
Projections allow customers to emit only the data that they care about,
including computed columns, while predicates (i.e. filters) allow them
to restrict the data that's emitted only to those events that match the
filter.
```
CREATE CHANGEFEED INTO 'kafka://' AS SELECT * FROM t WHERE NOT cdc_is_delete()
```
Co-authored-by: Yevgeniy Miretskiy <yevgeniy@cockroachlabs.com>
Event processing in CDC is a fairly involved procedure, consisting
of multiple stages. A low level KeyValue event is received from
rangefeeds. Then this low level event needs to be decoded into
encoded datums, along with type information describing those datums,
representing the "row" in the table.
Thereafter those datums must be further encoded into appropriate format.
Historically, this pipeline operated on structures representing
fairly low level objects -- such as catalog.Descriptor, EncDatumRow, etc.
This is problematic for multiple reasons:
hard to introduce layers into pipeline that may not be directly tied
to those descriptors -- such as projections and predicates.
implementation had to have repeated, and very subtle and error prone code,
to process columns in the correct order -- filtering unwanted
families, processing primary index column in the correct order, etc.
understandably so. However, CDC does not need access to the
entirety of those interfaces -- mostly, CDC needs to access to very few
methods. It is desirable therefore to pass in the smallest
interface necessary to make CDC work.
This is the purpose of
cdceventpackage. It introduces severalabstractions to facilitate conversion from the low level KeyValue event
into
cdcevent.EventRowevent. Thecdcevent.EventDecoderinterfacedefines conversion from low level event into
cdcevent.EventRow. Eventrow has associated event descriptor describing the set of columns needed
to perform CDC activities. The rest of the CDC processing operates
on these higher level interfaces instead of having direct depency on
low level catalog interfaces.
Release Notes: None