
rfc: Change Data Capture#25229

Merged
craig[bot] merged 1 commit into cockroachdb:master from danhhz:cdc_rfc
Jun 11, 2018

Conversation

@danhhz
Contributor

@danhhz danhhz commented May 2, 2018

Change Data Capture (CDC) provides efficient, distributed, row-level
change subscriptions.

CockroachDB is an excellent system of record, but no technology exists
in a vacuum. Users would like to keep their data mirrored in full-text
indexes, analytics engines, and big data pipelines, among others. A pull
model can currently be used to do this, but it’s inefficient, overly
manual, and doesn’t scale; the push model described in this RFC
addresses these limitations.

@danhhz danhhz added the do-not-merge label (bors won't merge a PR with this label) on May 2, 2018
@danhhz danhhz requested review from nvb and tbg May 2, 2018 01:29
@danhhz danhhz requested a review from a team as a code owner May 2, 2018 01:29
@cockroach-teamcity
Member

This change is Reviewable

@danhhz
Contributor Author

danhhz commented May 2, 2018

There are a number of open questions and a good amount of polish left, but this is ready for initial feedback. I'm particularly looking for feedback on the "Guide-level explanation", since that's what we need to nail down first for the prototype.

@knz
Contributor

knz commented May 2, 2018

Thanks for writing this up. I like the fact that the proposal is comprehensive yet not more complex than strictly necessary for the given scope. I do not see major remaining points of discussion (although I'm not yet primed on the technical topics enough to be a suitable in-depth reviewer). From my POV, it's a good proposal already as-is.

I have looked at the proposed SQL syntax only superficially. I decided to refrain from scrutinizing it and engaging in a fine-tuning discussion about the syntax, because this is a new feature with unclear precedent in other SQL engines (i.e. no clear example to follow), and the first iteration will give us experience with how to improve the UX in later iterations. Meanwhile, I didn't find anything sticking out that would compel immediate discussion anyway. I would just suggest avoiding new ways to specify values, and sticking to the existing scalar expression format if at all possible; see my suggestion below.

Also one unanswered question, see below.


Reviewed 1 of 1 files at r1.
Review status: all files reviewed at latest revision, all discussions resolved.


docs/RFCS/20180501_change_data_capture.md, line 34 at r1 (raw file):

configurable format (initially JSON or Avro) to a configurable sink (initially
Kafka). Changefeeds are [long running jobs] and can be paused, unpaused, and
cancelled.

I'll restate my opinion here that what you are defining in this RFC is not jobs but DB-level background services (or "tasks" or "processes").

A job conceptually has a finite, initially bounded amount of work to do, and thus a predictable end time. Changefeeds do not have any of this. There's to my knowledge no historical precedent for extending the term "job" in that way and I predict it will cause concerns. As always, terminology is important. Conflating many notions behind the same words means that it becomes hard to understand what we're talking about in conversations.


docs/RFCS/20180501_change_data_capture.md, line 56 at r1 (raw file):

## Row Ordering

Changes to a row are always emitted in order and rows are sharded between Kafka

Can you clarify in which order here? I think it's in order of mvcc timestamps.

Also if you do in mvcc order, and multiple kv pairs have the same timestamp, do you want to guarantee the partial order? like mvcc ts, then key?
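For illustration, the partial order being asked about here (mvcc timestamp first, key as tiebreaker) would amount to a sort like the following; the event tuples are made up for the example:

```python
# Hypothetical change events as (mvcc_ts, key, value) tuples; this is
# not CockroachDB's wire format, just an illustration of the ordering.
events = [
    (100, "/tweets/2", "v2"),
    (99, "/tweets/1", "v1"),
    (100, "/tweets/1", "v3"),
]

# Order by mvcc timestamp, breaking ties on key: a total order that
# refines the per-key timestamp order the RFC already guarantees.
ordered = sorted(events, key=lambda e: (e[0], e[1]))
```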


docs/RFCS/20180501_change_data_capture.md, line 103 at r1 (raw file):

TODO: Should we allow changefeeds to be named? Tables and indexes are named, but
system jobs are currently not. It may be a nice shorthand for the job id in
PAUSE/RESUME JOB, etc.

I'd suggest that if you want to give names to jobs, that this becomes a separate, orthogonal project (would also benefit other types of jobs).


docs/RFCS/20180501_change_data_capture.md, line 135 at r1 (raw file):

- `WITH confluent_schema_registry=<address>` The address of a schema registry
  instance. Only allowed with `format=avro`. When unspecified, no schema
  registry is used.

I'd recommend the value part of each option/value pair to follow SQL expression syntax. For examples quote words like SQL string literals. This will allow users to compute the values from SQL expressions (and possibly later placeholders).


docs/RFCS/20180501_change_data_capture.md, line 154 at r1 (raw file):

TODO: This could use more detail.

A user creates an avro changefeed with no envelope. A kafka consumer group tails

The RFC could provide a super-short intro to Avro to provide a little more context to the example.


docs/RFCS/20180501_change_data_capture.md, line 236 at r1 (raw file):

sets up a long-running [DistSQL] flow with some new processors described below.
The current 0-100% progress tracking doesn’t match a continuous stream of
updates. It will be replaced by the minimum of all closed timestamps.

This change is a symptom of the "job" concept being ill-suited for this. The price of breaking the abstraction is not trivial, because this change also requires updates to the web UI and possibly other tools.


docs/RFCS/20180501_change_data_capture.md, line 385 at r1 (raw file):

truncated. Oracle GoldenGate operates as SQL statements and forwards the
truncate as a statement. The Debezium MySQL connector stores truncates in the
schema change topic, if it’s used.

I'd suggest having truncate trigger a function in the changefeed that will create changefeed events for all the rows currently in the truncated table. This can be done fully asynchronously since these rows will not change in KV after TRUNCATE completes (the truncate generates a new table ID).


docs/RFCS/20180501_change_data_capture.md, line 473 at r1 (raw file):

- Flow control/throttling of the ChangeFeed streams
- Lots of small changefeeds will require some coalescing

How to dump/restore or backup/restore all currently configured changefeeds.

I'd argue that changefeeds are much more database objects than jobs. I'd imagine them to have descriptors in the schema, and be subject to a shared code path as other descriptors. Including backup/restore, support in cockroach dump and SHOW CREATE.


Comments from Reviewable

@knz
Contributor

knz commented May 3, 2018

Review status: all files reviewed at latest revision, 8 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 94 at r1 (raw file):

```sql
CREATE CHANGEFEED EMIT TABLE tweets
```

Summary of our discussion this morning:

  1. If you opt into the CREATE + ALTER pattern, then you're also opting into the symmetry with tables/views/sequences, and that mandates offering DROP CHANGEFEED and SHOW CREATE CHANGEFEED as well. You can probably desugar DROP CHANGEFEED to CANCEL JOB in the parser in your implementation, but we'll want to document the former.

     We touched in that discussion on whether to offer PAUSE CHANGEFEED in addition to PAUSE JOB. I don't have a firm opinion on that. We probably want to, again for consistency in docs. For this too you can desugar in the parser.

  2. In the first version I would personally recommend that there be exactly one EMIT target per changefeed, either EMIT TABLE or EMIT DATABASE. If a user wants changefeeds over multiple specific tables, they can create multiple changefeeds. We can now cancel/resume multiple jobs using queries, so the administration would not be too complicated. Only if there's demand for more could you extend the implementation to support patterns.

  3. To select specific columns on a given table, make the syntax EMIT TABLE foo, with an optional column list: EMIT TABLE foo (col1, col2, col3, ...). To select columns in tables selected implicitly by EMIT DATABASE, extend the table descriptors themselves (not the changefeed config) with per-column attributes (which you'd configure with ALTER TABLE ... ALTER COLUMN ... SET NONPUBLIC).



@bdarnell
Contributor

bdarnell commented May 3, 2018

Reviewed 1 of 1 files at r1.
Review status: all files reviewed at latest revision, 9 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 62 at r1 (raw file):

will work as expected.

While changes to a row are emitted in order, we offer “at least once” semantics,

This doesn't meet my definition of "in order". What good is this guarantee if you still have to remember that you've seen V3 so you don't overwrite it with the replayed V2?
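A minimal sketch of what at-least-once delivery pushes onto the consumer (names and event shapes are hypothetical, not from the RFC): remember the newest timestamp applied per key and drop anything that isn't strictly newer, so a replayed V2 can't clobber V3:

```python
# Sketch only: consumer-side deduplication for at-least-once delivery.
last_applied = {}  # key -> mvcc timestamp of the last applied change

def apply_if_new(key, ts, value, sink):
    """Apply a change only if it is newer than what we've seen for key."""
    if ts <= last_applied.get(key, -1):
        return False  # replayed duplicate (e.g. V2 arriving after V3)
    last_applied[key] = ts
    sink[key] = value
    return True

sink = {}
apply_if_new("row1", 2, "V2", sink)
apply_if_new("row1", 3, "V3", sink)
# A replay of V2 after V3 must not overwrite the newer value:
apply_if_new("row1", 2, "V2", sink)
```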


docs/RFCS/20180501_change_data_capture.md, line 77 at r1 (raw file):

doesn’t do this ordering itself.)

Each emitted record contains the transaction timestamp the most recent change

You mean the timestamp of the version of the row, or could the record emitted for row version T1 contain some later timestamp T2 because it is known that another version has committed after T2?


docs/RFCS/20180501_change_data_capture.md, line 81 at r1 (raw file):

which causes the changefeed to periodically emit timestamp close notifications
on every kafka partition. The timestamp included in this record is a guarantee
that every record emitted to that partition afterward will have a higher

How does this interact with the at-least-once replays? Can we guarantee that records will never be replayed across a timestamp closure boundary, or do we just guarantee that any record T1 that is received after a closed timestamp T2 is a duplicate of some record that was emitted before the timestamp was closed?
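One way a consumer could combine the two guarantees, sketched under the assumption that any record arriving after a close notification for T has a timestamp greater than T (class and method names are hypothetical):

```python
# Sketch only: a consumer that wants totally ordered output per
# partition buffers records and flushes them, sorted by timestamp,
# whenever a close notification arrives. The RFC's guarantee that every
# later record has a higher timestamp makes the flushed prefix final.
class OrderedConsumer:
    def __init__(self):
        self.buffer = []   # (ts, key, value) records not yet final
        self.output = []   # records released in timestamp order

    def on_record(self, ts, key, value):
        self.buffer.append((ts, key, value))

    def on_close_notification(self, closed_ts):
        ready = sorted(r for r in self.buffer if r[0] <= closed_ts)
        self.buffer = [r for r in self.buffer if r[0] > closed_ts]
        self.output.extend(ready)

c = OrderedConsumer()
c.on_record(5, "a", "v1")
c.on_record(3, "b", "v2")
c.on_close_notification(5)   # releases ts 3, then ts 5
c.on_record(7, "a", "v3")    # guaranteed to have ts > 5
```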


docs/RFCS/20180501_change_data_capture.md, line 91 at r1 (raw file):

## Syntax Examples

TODO: This syntax is a placeholder, I'd love to hear your thoughts.

Is this syntax your own invention or is it following the example of some existing system? Postgres has CREATE PUBLICATION FOR TABLE $1 WITH (...). It's not exactly the same thing (the analogous setup for postgres would be creating a publication on the DB and then you'd consume that publication from some other process that passes it along to kafka), so I'm not sure if it's better to reuse CREATE PUBLICATION or make our own syntax, but either way we should be explicit about the decision. And even if we go with CREATE CHANGEFEED, it would be nice to be consistent in the little things of the syntax like FOR TABLE instead of EMIT TABLE.


docs/RFCS/20180501_change_data_capture.md, line 127 at r1 (raw file):

    format. The record value is an Avro record, mapping column names to column
    values, serialized in the binary format.
- `WITH sink=kafka` Kafka is currently the only sink option.

For ease of testing, we may want to consider a table sink to write a log of the raw changefeed data into a CRDB table. We may want this as a full-fledged feature in the future but for now I'm just thinking of an experimental/testing version to allow us to test changefeeds without external dependencies.


docs/RFCS/20180501_change_data_capture.md, line 172 at r1 (raw file):

Only paused changefeed jobs may be altered, so the user runs `PAUSE JOB
<jobid>`. (TODO: Should we error or pause the job for them if they try to alter a

ALTER shouldn't leave the job in a paused state. If we can't alter a running job then that should either be an error or it should do a PAUSE/ALTER/UNPAUSE sequence.


docs/RFCS/20180501_change_data_capture.md, line 190 at r1 (raw file):

A user has geo-partitioned their data for GDPR compliance and cannot let EU data
out of the EU. One changefeed is created for EU partitions, pointing at a Kafka

The syntax above uses FROM x TO y syntax. It would be nice to have syntactic sugar for creating a changefeed for a partition instead of having to duplicate the partition definition there.


docs/RFCS/20180501_change_data_capture.md, line 385 at r1 (raw file):

Previously, knz (kena) wrote…

I'd suggest having truncate trigger a function in the changefeed that will create changefeed events for all the rows currently in the truncated table. This can be done fully asynchronously since these rows will not change in KV after TRUNCATE completes (the truncate generates a new table ID).

This needs user feedback; emitting a changefeed event for every existing row is very expensive and I'm not sure that's ever what someone would want. I'd expect the user to truncate the Elasticsearch table (for example) at the same time if they're going to truncate. I'd prefer the SQL Server behavior (disallow truncation of tables with changefeeds) unless we have more confirmation of what people expect here.


docs/RFCS/20180501_change_data_capture.md, line 391 at r1 (raw file):

To avoid ambiguity and surprise if SELECT permissions are revoked from a user
that created an active changefeed, changefeeds may only be created by admin.

I'm fine with making changefeeds admin-only for v1 since they have a potentially large impact on the cluster, but eventually we should allow non-admin users to create changefeeds, and admins should be able to revoke those users' permissions (breaking/cancelling the feeds)


docs/RFCS/20180501_change_data_capture.md, line 405 at r1 (raw file):

subscribe to the diff of a changed row. Connectors in the Confluent platform and
other CDC implementations also seem to have a large number of configuration
options, so maybe this is just unavoidable.

We could keep the configuration complexity out of the cockroach binary by following the postgres example of publications. The knowledge of kafka would only exist in a separate binary which would pull a feed from cockroach. I don't know that that's a good tradeoff overall. Just for kafka, I'm OK with having everything in the main binary, but I wouldn't like to have a proliferation of these.


docs/RFCS/20180501_change_data_capture.md, line 470 at r1 (raw file):

# Unresolved Questions

- What happens if a row is changed twice in the same transaction?

Only the final value should be visible.



@danhhz
Contributor Author

danhhz commented May 10, 2018

@jordanlewis and/or @arjunravinarayan can you look at the distsql part of this please? I made it intentionally very simple for this first version of cdc but I’d still like more eyes to make sure I didn’t miss something.

Thanks for the reviews so far! I’ll do a big pass to address comments after all the initial feedback is in.

@nvb
Contributor

nvb commented May 10, 2018

Review status: all files reviewed at latest revision, 19 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 56 at r1 (raw file):

I think it's in order of mvcc timestamps

Yes, that's correct.

Also if you do in mvcc order, and multiple kv pairs have the same timestamp, do you want to guarantee the partial order? like mvcc ts, then key?

Dan can correct me if I'm wrong, but I think the idea is that no two mvcc values will ever have the same timestamp at the same key. If two values do have the same timestamp then they necessarily will be at different keys. In this situation, no ordering is defined.

I think this is what is implied by "Cross-row or cross-table order guarantees are not given."


docs/RFCS/20180501_change_data_capture.md, line 77 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

You mean the timestamp of the version of the row, or could the record emitted for row version T1 contain some later timestamp T2 because it is known that another version has committed after T2?

The former. The record will contain the timestamp of the corresponding MVCC value.


docs/RFCS/20180501_change_data_capture.md, line 103 at r1 (raw file):

Previously, knz (kena) wrote…

I'd suggest that if you want to give names to jobs, that this becomes a separate, orthogonal project (would also benefit other types of jobs).

I agree that it should be a separate discussion. I also agree that it's something we should do.


docs/RFCS/20180501_change_data_capture.md, line 154 at r1 (raw file):

Previously, knz (kena) wrote…

The RFC could provide a super-short intro to Avro to provide a little more context to the example.

+1, I think that would be helpful.


docs/RFCS/20180501_change_data_capture.md, line 222 at r1 (raw file):

and keys.)

If the sink is underprovisioned and cannot keep up, the changefeed will

We'll want to test that this case gracefully fails without affecting foreground traffic.

Can you also explain what happens if a sink falls so far behind that it can no longer spill anything else to disk? This may not necessarily imply that it has fallen behind by more than the gc threshold, so it isn't necessarily a failure state. However, there won't be any backpressure, so instead, we'll need to fall back to lots of catch-up scans after employing some kind of FIFO policy for on-disk buffered records. I think this means we'll want to tear down our change feed until the sink is able to catch back up.


docs/RFCS/20180501_change_data_capture.md, line 295 at r1 (raw file):

Note that a ChangeFeed can be thought of as a slightly lighter weight additional replica.

I would change replica to Raft follower.


docs/RFCS/20180501_change_data_capture.md, line 311 at r1 (raw file):

Importantly, this
follower could be the slowest one, increasing commit-to-kafka changefeed
latency.

Raft snapshots will also make this a complicated case to handle.



@tbg
Member

tbg commented May 12, 2018

Reviewed 1 of 1 files at r1.
Review status: all files reviewed at latest revision, 22 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 39 at r1 (raw file):

impacted as little as possible.

By default, when a new changefeed is created, an initial “catch-up” (TODO better

Maybe say that a current timestamp is chosen and that then the values at that timestamp are emitted.


docs/RFCS/20180501_change_data_capture.md, line 56 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I think it's in order of mvcc timestamps

Yes, that's correct.

Also if you do in mvcc order, and multiple kv pairs have the same timestamp, do you want to guarantee the partial order? like mvcc ts, then key?

Dan can correct me if I'm wrong, but I think the idea is that no two mvcc values will ever have the same timestamp at the same key. If two values do have the same timestamp then they necessarily will be at different keys. In this situation, no ordering is defined.

I think this is what is implied by "Cross-row or cross-table order guarantees are not given."

Rows are spread out over multiple KV pairs potentially, when there are column families. Say you have a row and first a write to /row/1 at ts=99 and then one to /row/0 at ts=100. If both are intents and are resolved out of order, then you are going to report first the value at 100 and then that at 99. They are for the same row. So unless there's something later in the doc that I haven't read yet, your guarantee is that you won't see changes to a single column family out of order.


docs/RFCS/20180501_change_data_capture.md, line 62 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

This doesn't meet my definition of "in order". What good is this guarantee if you still have to remember that you've seen V3 so you don't overwrite it with the replayed V2?

I'm also curious whether these semantics are workable for the consumers.


docs/RFCS/20180501_change_data_capture.md, line 68 at r1 (raw file):

this.

Cross-row or cross-table order guarantees are not given.

May need to be updated to Cross-column family.


docs/RFCS/20180501_change_data_capture.md, line 97 at r1 (raw file):

```sql
    WITH sink=kafka, kafka_bootstrap_server=<address>
CREATE CHANGEFEED EMIT TABLE tweets VALUES FROM (1) TO (2) WITH <...>
CREATE CHANGEFEED EMIT DATABASE db TO KAFKA WITH <...>
```

What if we used a URL here?

TO 'kafka://host:port?column_whitelist=foo,bar&envelope=...'

That would avoid a bunch of Kafka-specific configuration becoming SQL syntax.
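A rough sketch of how such a sink URL could decompose into a sink type plus options, using only a standard URL parser; the parameter names follow the hypothetical example above:

```python
# Sketch only: split a sink URL into (scheme, address, options).
from urllib.parse import urlparse, parse_qs

def parse_sink(url):
    u = urlparse(url)
    # parse_qs returns lists; take the first value for each option.
    opts = {k: v[0] for k, v in parse_qs(u.query).items()}
    return u.scheme, u.netloc, opts

scheme, addr, opts = parse_sink(
    "kafka://host:9092?column_whitelist=foo,bar&envelope=key_only")
```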


docs/RFCS/20180501_change_data_capture.md, line 127 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

For ease of testing, we may want to consider a table sink to write a log of the raw changefeed data into a CRDB table. We may want this as a full-fledged feature in the future but for now I'm just thinking of an experimental/testing version to allow us to test changefeeds without external dependencies.

I like this idea.


docs/RFCS/20180501_change_data_capture.md, line 140 at r1 (raw file):

## Full-text index example: Elasticsearch

A user creates a json changefeed with no envelope and kafka_topic_prefix set to

The _ looks like it needs escaping.


docs/RFCS/20180501_change_data_capture.md, line 275 at r1 (raw file):

```proto
// ChangeAggregatorSpec is the specification for a processor that watches for
// changes in a set of spans. Each span may cross mutiple ranges.
```

multiple


docs/RFCS/20180501_change_data_capture.md, line 289 at r1 (raw file):

span. The changes are returned exactly as they are [proposed and sent through
raft]: in RocksDB WriteBatch format, possibly with unrelated data (parts of the
range’s keyspace not being watched by the ChangeFeed and range-local keys).

GDPR, though? The processor may already be outside of the required jurisdiction. I think this rules out the option of filtering on the columns you mention above.


docs/RFCS/20180501_change_data_capture.md, line 311 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Importantly, this
follower could be the slowest one, increasing commit-to-kafka changefeed
latency.

Raft snapshots will also make this a complicated case to handle.

More so than now? The leaseholder may not be the Raft leader and in particular may also receive a snapshot.


docs/RFCS/20180501_change_data_capture.md, line 323 at r1 (raw file):

Before `StreamState`, keys can be emitted out of order, so the ChangeAggregator
must buffer them (TODO mem vs disk). After `StreamState`, individual keys are

Ranges are 64mb right now, so I assume your aggregator will have some limit on the parallelism and will chug through over time. But if we support larger ranges (which sounds like the plan) you may run into the situation in which you're buffering significant amounts of Raft log before you get to send it.

Also, consider expanding on the format in which the catch-up read is carried out. Using the KV interface naively would be pretty inefficient. You're going to want to stream out an SST or something like that.

BACKUP/RESTORE will have many of these same problems once range sizes become too large to fit into ram comfortably. The way I think this is reasonably addressed at the moment is to change the command that creates an SSTable to a streaming one, without changing the KV API. Instead, the response returns an endpoint at which the replica can be reached to stream out the SST.


docs/RFCS/20180501_change_data_capture.md, line 325 at r1 (raw file):

must buffer them (TODO mem vs disk). After `StreamState`, individual keys are
guaranteed to be emitted in timestamp order, which means no buffering is needed
and ChangeAggregator can immediately emit them as rows.

May need update w.r.t column families.


docs/RFCS/20180501_change_data_capture.md, line 328 at r1 (raw file):

After `StreamState`, the ChangeFeed will also begin emitting timestamp close
notifications, which are messages that indicate that no kvs with a lower

What do the close notifications guarantee in this PR? If an intent is written at time T, can the timestamp be closed out before it is resolved? Those are the follower reads style close notifications. If you want to make sure that no intents are below the closed out timestamp, you need something that doesn't exist today. I get the impression that you don't want to build something new here, which I support. But I don't see how consumers would use this without building that state-massaging layer that takes into account the close notifications and open intents themselves, individually. That also seems unfortunate. The PR should treat the interplay between intents, records, and closed timestamps in more detail.


docs/RFCS/20180501_change_data_capture.md, line 329 at r1 (raw file):

After `StreamState`, the ChangeFeed will also begin emitting timestamp close
notifications, which are messages that indicate that no kvs with a lower
timestamp (TODO what’s the < vs <= here?) than the one in the close

It's probably going to be <= but it doesn't really matter.

Escaping problem in the markdown here (< needs escape).


docs/RFCS/20180501_change_data_capture.md, line 330 at r1 (raw file):

notifications, which are messages that indicate that no kvs with a lower
timestamp (TODO what’s the < vs <= here?) than the one in the close
notification. These are passed on by ChangeAggregator to be used by downstream

... will be emitted.


docs/RFCS/20180501_change_data_capture.md, line 385 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

This needs user feedback; emitting a changefeed event for every existing row is very expensive and I'm not sure that's ever what someone would want. I'd expect the user to truncate the Elasticsearch table (for example) at the same time if they're going to truncate. I'd prefer the SQL Server behavior (disallow truncation of tables with changefeeds) unless we have more confirmation of what people expect here.

What Ben said.



@jordanlewis
Member

Review status: all files reviewed at latest revision, 33 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 97 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

What if we used a URL here?

TO 'kafka://host:port?column_whitelist=foo,bar&envelope=...'

That alleviates that there's a bunch of Kafka specific stuff that would otherwise become SQL (?).

+1, I'm into this idea.


docs/RFCS/20180501_change_data_capture.md, line 203 at r1 (raw file):

necessary. One table failing will not affect other tables.

If this buffer gets larger than the `TODO` cluster setting, the changefeed will

We might want to consider making this variable based on total disk space as well. If the cluster setting were set to 10 GB but there was only 1 GB left, it would be better to stall the changefeed than to let this disk buffer fill the disks completely.
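A sketch of that suggestion (function and parameter names are hypothetical): cap the spill buffer at the smaller of the cluster setting and a fraction of the currently free disk space:

```python
# Sketch only: a disk-aware cap on the on-disk spill buffer.
import shutil

def effective_buffer_limit(setting_bytes, path=".", free_fraction=0.5):
    """Return the smaller of the configured limit and a fraction of the
    free space on the store's disk, so the buffer can't fill the disk."""
    free = shutil.disk_usage(path).free
    return min(setting_bytes, int(free * free_fraction))

# e.g. a 10 GB setting is honored only while the disk has headroom.
limit = effective_buffer_limit(10 * 2**30)
```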


docs/RFCS/20180501_change_data_capture.md, line 248 at r1 (raw file):

configured changefeed options. Progress information in the form of `(span,
timestamp)` pairs is passed back to the coordinator via DistSQL metadata. (TODO:
Is the metadata the right place to do this?).

Why 2 processors, if they're always going to be 1:1? It's more overhead (not much, but some) - I think you might be better served by a single processor and some helper functions. I think that would also fix your progress information pass-back problem?

Although, what's this coordinator you speak of? Is that something above the DistSQL flow, or in it?


docs/RFCS/20180501_change_data_capture.md, line 282 at r1 (raw file):

```proto
  }
  repeated Watch watches = 1;
  bool incremental = 2;
```

What's this? I don't see any mention of it elsewhere.


docs/RFCS/20180501_change_data_capture.md, line 385 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

What Ben said.

We also have TRUNCATE vs TRUNCATE CASCADE to play with. I'd expect that running TRUNCATE on a table with a changefeed would fail, with a message that you should run TRUNCATE CASCADE instead - that should lessen user confusion.



@a-robinson
Contributor

Review status: all files reviewed at latest revision, 36 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 127 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

I like this idea.

+1


docs/RFCS/20180501_change_data_capture.md, line 203 at r1 (raw file):

necessary. One table failing will not affect other tables.

If this buffer gets larger than the `TODO` cluster setting, the changefeed will

Is the setting for the memory buffer size or the disk buffer size? If the former, like most of our other memory-related settings, we'll probably want this to be a command-line flag so it can be customized for nodes running on different hardware. If the latter, it may need to be a command-line parameter on each store.


docs/RFCS/20180501_change_data_capture.md, line 222 at r1 (raw file):

and keys.)

If the sink is underprovisioned and cannot keep up, the changefeed will

This situation seems under-specified. Before we hit the gc threshold, what do we do when the in-memory/on-disk buffers fill up? It's different from the case above of an unavailable sink in that we can continue making progress so we wouldn't want to just stall.


docs/RFCS/20180501_change_data_capture.md, line 367 at r1 (raw file):

For easy development and testing, a 1 partition topic will be auto-created on
demand when missing. For production deployments, the user should set the Kafka
option to disallow auto-creation of topics and manually create the topic with

Does this allow for manual re-partitioning as well? If so, how does changing the partitioning scheme of a live changefeed affect cockroach?



@a-robinson
Contributor

Reviewed 1 of 1 files at r1.
Review status: all files reviewed at latest revision, 39 unresolved discussions.



@danhhz
Contributor Author

danhhz commented May 15, 2018

Thanks for the comments everyone! This doesn't address everything, but I'm flying today and wanted to get out what I had.


Review status: 0 of 1 files reviewed at latest revision, 39 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 34 at r1 (raw file):

Previously, knz (kena) wrote…

I'll restate my opinion here that what you are defining in this RFC is not jobs but DB-level background services (or "tasks" or "processes").

A job conceptually has a finite, initially bounded amount of work to do, and thus a predictable end time. Changefeeds do not have any of this. There's to my knowledge no historical precedent for extending the term "job" in that way and I predict it will cause concerns. As always, terminology is important. Conflating many notions behind the same words means that it becomes hard to understand what we're talking about in conversations.

Done. I avoided picking a name for the category because I think it's important but I don't yet have an opinion between services vs tasks vs processes vs something else. Do you?


docs/RFCS/20180501_change_data_capture.md, line 39 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Maybe say that a current timestamp is chosen and that then the values at that timestamp are emitted.

Done.


docs/RFCS/20180501_change_data_capture.md, line 56 at r1 (raw file):

do you want to guarantee the partial order? like mvcc ts, then key?

As nathan mentions, this is intentionally not done to reduce our buffering.

Rows are spread out over multiple KV pairs potentially, when there are column families.

Good point. We need the per-row ordering to make the "easy case easy" (e.g. Elasticsearch with no buffering), so I think we have to buffer in the ChangeAggregator processor when a multi-family table is watched. We'll already have to build this for the pre-StreamState phase, so it's a performance hit but shouldn't be much more complexity. Added a note.
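A sketch of the buffering described here, with assumed event shapes: the aggregator holds a row's column-family writes and releases them in timestamp order once a timestamp is known closed, which repairs the out-of-order intent resolution in tbg's example:

```python
# Sketch only: per-row buffering of column-family KV events so a row's
# changes are emitted in timestamp order even if intents resolve out
# of order.
from collections import defaultdict

class RowBuffer:
    def __init__(self):
        self.pending = defaultdict(list)  # row key -> [(ts, family, value)]

    def add(self, row, ts, family, value):
        self.pending[row].append((ts, family, value))

    def flush(self, row, closed_ts):
        """Release this row's buffered writes up to closed_ts, sorted."""
        ready = sorted(e for e in self.pending[row] if e[0] <= closed_ts)
        self.pending[row] = [e for e in self.pending[row] if e[0] > closed_ts]
        return ready

buf = RowBuffer()
# tbg's example: family 1 written at ts=99 and family 0 at ts=100,
# observed in the opposite order because intents resolved out of order.
buf.add("/row", 100, 0, "b")
buf.add("/row", 99, 1, "a")
```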


docs/RFCS/20180501_change_data_capture.md, line 62 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

I'm also curious whether these semantics are workable for the consumers.

Agreed that this is a confused usage of ordering, but we're pretty limited by what the sinks support. They tend to have limited exactly-once guarantees on the consumer side, so my current thought is there's not much we'd gain by doing the extra work to make it exactly-once on the producer side, when that's even possible. I think I see a way to do it using Kafka transactions, but a) I'll have to run some experiments to see what the perf overhead is and b) dunno how it will work on other sinks, on some of them it may be impossible to do exactly-once.


docs/RFCS/20180501_change_data_capture.md, line 68 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

May need to be updated to Cross-column family.

I think we need the row guarantees for usability (added this above), so this stays the same.


docs/RFCS/20180501_change_data_capture.md, line 77 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

The former. The record will contain the timestamp of the corresponding MVCC value.

Yeah, the former. Don't know what I was getting at with the previous wording


docs/RFCS/20180501_change_data_capture.md, line 81 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

How does this interact with the at-least-once replays? Can we guarantee that records will never be replayed across a timestamp closure boundary, or do we just guarantee that any record T1 that is received after a closed timestamp T2 is a duplicate of some record that was emitted before the timestamp was closed?

We can guarantee it if we synchronously update the job table before emitting a close notification. I added a note to the implementation section.


docs/RFCS/20180501_change_data_capture.md, line 91 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

Is this syntax your own invention or is it following the example of some existing system? Postgres has CREATE PUBLICATION FOR TABLE $1 WITH (...). It's not exactly the same thing (the analogous setup for postgres would be creating a publication on the DB and then you'd consume that publication from some other process that passes it along to kafka), so I'm not sure if it's better to reuse CREATE PUBLICATION or make our own syntax, but either way we should be explicit about the decision. And even if we go with CREATE CHANGEFEED, it would be nice to be consistent in the little things of the syntax like FOR TABLE instead of EMIT TABLE.

I hadn't even looked at the logical decoding syntax, since it is such a different model. The PUBLICATION/SUBSCRIPTION division could be a nice way to give users control over how to coalesce a large number of feeds, which is something I mention in the unresolved questions.

I dislike inventing our own syntax, and it does seem like we could plug into CREATE PUBLICATION/CREATE SUBSCRIPTION as-is. My biggest hesitation is that we'd have a different conninfo in the subscription and both would have a different set of options, so we wouldn't be a drop-in replacement. Thoughts?


docs/RFCS/20180501_change_data_capture.md, line 97 at r1 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

+1, I'm into this idea.

I like this too, though I think we should only use query parameters in the URI for things specific to the sink (so envelope and column_whitelist would remain WITH options)
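The proposed split can be sketched with a hypothetical helper (names and the `topic_prefix` parameter are illustrative, not a confirmed API): everything in the URI query string is taken as sink-specific configuration, while feed-level options like `envelope` and `column_whitelist` stay in `WITH`.

```python
from urllib.parse import urlparse, parse_qsl

def split_sink_uri(uri):
    """Sketch of the option split discussed above: the query string carries
    sink-specific knobs; the scheme/host/path identify the sink endpoint.
    Feed-level options would be handled separately via WITH."""
    parsed = urlparse(uri)
    sink_opts = dict(parse_qsl(parsed.query))
    endpoint = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
    return endpoint, sink_opts
```

One nice property of this split is that adding a new sink parameter never requires touching the SQL grammar.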


docs/RFCS/20180501_change_data_capture.md, line 127 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

+1

I also like this, the way I've written the unit tests in the prototype is brittle. Done.


docs/RFCS/20180501_change_data_capture.md, line 135 at r1 (raw file):

Previously, knz (kena) wrote…

I'd recommend the value part of each option/value pair to follow SQL expression syntax. For examples quote words like SQL string literals. This will allow users to compute the values from SQL expressions (and possibly later placeholders).

Ah, yes. I intended this, just constructed the examples wrong.


docs/RFCS/20180501_change_data_capture.md, line 140 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

The _ looks like it needs escaping.

Done.


docs/RFCS/20180501_change_data_capture.md, line 154 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

+1, I think that would be helpful.

Done.


docs/RFCS/20180501_change_data_capture.md, line 172 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

ALTER shouldn't leave the job in a paused state. If we can't alter a running job then that should either be an error or it should do a PAUSE/ALTER/UNPAUSE sequence.

Unless it was paused before, in which case it's surprising for ALTER to also unpause, etc. I think we should do this, but there are enough UX edge cases that it's probably too low-level to detail in the RFC.


docs/RFCS/20180501_change_data_capture.md, line 190 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

The syntax above uses FROM x TO y syntax. It would be nice to have syntactic sugar for creating a changefeed for a partition instead of having to duplicate the partition definition there.

Oops. I intended for there to be one, but omitted it.


docs/RFCS/20180501_change_data_capture.md, line 236 at r1 (raw file):

Previously, knz (kena) wrote…

This change is a symptom of the "job" concept being ill-suited for this. This price of breaking the abstraction is not trivial, because this change also requires updates to the web UI and possibly other tools.

We don't want to duplicate the system.jobs infrastructure but changed this to stop calling it a "job".


docs/RFCS/20180501_change_data_capture.md, line 248 at r1 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

Why 2 processors, if they're always going to be 1:1? It's more overhead (not much, but some) - I think you might be better served by a single processor and some helper functions. I think that would also fix your progress information pass-back problem?

Although, what's this coordinator you speak of? Is that something above the DistSQL flow, or in it?

Cool, this sort of thing is exactly why we looped you in; I'd been assuming that the overhead is low. I was doing this in the interest of composable processors. ChangeAggregator, for example, is exactly what Naiad would need, and KafkaEmitter could make it possible to SELECT into Kafka (if we ever decided we wanted to build that).

As for the progress, IMPORT has taught us that it's better to do it from one place if possible.

The coordinator is the job coordinator, it sets up the distsql flow and restarts it from the last checkpoint on error or node death.


docs/RFCS/20180501_change_data_capture.md, line 275 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

multiple

Done.


docs/RFCS/20180501_change_data_capture.md, line 282 at r1 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

What's this? I don't see any mention of it elsewhere.

Renamed to match the terminology I've been using elsewhere.


docs/RFCS/20180501_change_data_capture.md, line 289 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

GDPR, though? The processor may already be outside of the required jurisdiction? I think this rules out the option on filtering on the columns you mention above.

We currently don't have anything that causes a DistSQL flow to place processors in a certain region. Added a note above to mention there is work to be done here once DistSQL supports it.


docs/RFCS/20180501_change_data_capture.md, line 295 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

Note that a ChangeFeed can be thought of as a slightly lighter weight additional replica.

I would change replica to Raft follower.

Done.


docs/RFCS/20180501_change_data_capture.md, line 311 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

More so than now? The leaseholder may not be the Raft leader and in particular may also receive a snapshot.

+1 I think we have to solve this regardless


docs/RFCS/20180501_change_data_capture.md, line 325 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

May need update w.r.t column families.

Done


docs/RFCS/20180501_change_data_capture.md, line 328 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

What do the close notifications guarantee in this PR? If an intent is written at time T, can the timestamp be closed out before it is resolved? Those are the follower reads style close notifications. If you want to make sure that no intents are below the closed out timestamp, you need something that doesn't exist today. I get the impression that you don't want to build something new here, which I support. But I don't see how consumers would use this without building that state-massaging layer that takes into account the close notifications and open intents themselves, individually. That also seems unfortunate. The PR should treat the interplay between intents, records, and closed timestamps in more detail.

I've written this assuming that the Checkpoint described in the change feeds RFC handles the intent closure tracking; is that not the case?


docs/RFCS/20180501_change_data_capture.md, line 329 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

It's probably going to be <= but it doesn't really matter.

Escaping problem in the markdown here (< needs escape).

Done.


docs/RFCS/20180501_change_data_capture.md, line 330 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

... will be emitted.

Done.


docs/RFCS/20180501_change_data_capture.md, line 367 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

Does this allow for manual re-partitioning as well? If so, how does changing the partitioning scheme of a live changefeed affect cockroach?

Done.


docs/RFCS/20180501_change_data_capture.md, line 385 at r1 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

We also have TRUNCATE vs TRUNCATE CASCADE to play with. I'd expect that running TRUNCATE on a table with a changefeed would fail, with a message that you should run TRUNCATE CASCADE instead - that should lessen user confusion.

I was initially hesitant for the presence of a changefeed on a table to affect which DDL statements can be run on it, but after more thought nothing else seems to make sense. Because of our row ordering guarantee, the delete backfill could cause a tremendous amount of buffering. And relying on a schema change topic is fiddly and error-prone. In the absence of customer feedback otherwise, I'm taking Ben's suggestion and modeling our behavior after SQL Server.


docs/RFCS/20180501_change_data_capture.md, line 391 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

I'm fine with making changefeeds admin-only for v1 since they have a potentially large impact on the cluster, but eventually we should allow non-admin users to create changefeeds, and admins should be able to revoke those users' permissions (breaking/cancelling the feeds)

Done.


docs/RFCS/20180501_change_data_capture.md, line 405 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

We could keep the configuration complexity out of the cockroach binary by following the postgres example of publications. The knowledge of kafka would only exist in a separate binary which would pull a feed from cockroach. I don't know that that's a good tradeoff overall. Just for kafka, I'm OK with having everything in the main binary, but I wouldn't like to have a proliferation of these.

This is really an artifact of us having the equivalent of the "connector" inside cockroach, which we're forced to do because of our distributed nature.


docs/RFCS/20180501_change_data_capture.md, line 470 at r1 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

Only the final value should be visible.

Done.


Comments from Reviewable

@knz
Contributor

knz commented May 15, 2018

No further comment at this time. Nice RFC so far.


Reviewed 1 of 1 files at r2.
Review status: all files reviewed at latest revision, 36 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 34 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Done. I avoided picking a name for the category because I think it's important but I don't yet have an opinion between services vs tasks vs processes vs something else. Do you?

No opinion yet.


docs/RFCS/20180501_change_data_capture.md, line 209 at r2 (raw file):

watching multiple tables. The `key` and `value` are encoded using the `format`
specified by the changefeed. `message_id`s are only comparable between rows with
equal `key`s.

For UX and ease of analyzing/testing/troubleshooting I would recommend splitting the key column into a table key prefix and per-table key.



@rjnn
Contributor

rjnn commented May 15, 2018

docs/RFCS/20180501_change_data_capture.md, line 248 at r2 (raw file):

If the sink is underprovisioned and cannot keep up, the changefeed will
eventually fall behind by more than the gc threshold and will fail. Additional

How difficult would it be to stall the gc in such a case? cc @tschottdorf



@rjnn
Contributor

rjnn commented May 15, 2018

Review status: all files reviewed at latest revision, 37 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 248 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Cool, this sort of thing is exactly why we looped you in, I'd been assuming that the overhead is low. I was doing this in the interest of composable processors. ChangeAggregator, for example, is exactly what naiad would need and KafkaEmitter could make it possible to select into kafka (if we ever decided we wanted to build that).

As for the progress, IMPORT has taught us that it's better to do it from one place if possible.

The coordinator is the job coordinator, it sets up the distsql flow and restarts it from the last checkpoint on error or node death.

I don't believe the overhead is very much, but also refactoring processors is not difficult, so I wouldn't preemptively start with two processors. We can always split them out into several processors later when we work on materialized views. You should feel free to keep this part as simple as possible, which to me sounds like 1 processor for now.


docs/RFCS/20180501_change_data_capture.md, line 260 at r2 (raw file):

A new page is used in the AdminUI to show the status of changefeeds, distinct
from the "Jobs" page because changefeeds don't have a finite lifespan.
Incrementally materialized views, for example, would also appear on this page.

nit: incrementally updated materialized views.


docs/RFCS/20180501_change_data_capture.md, line 300 at r2 (raw file):

and the amount of work duplicated when the flow is restarted should be
relatively small, so it’s not as bad as it first sounds. If/when DistSQL's
capabilities change, this will be revisited.

It would be great if you could provide some additional insight as to how much better your life would be if DistSQL had fault tolerance and resilience built in, as that would help us organize our longer term roadmap. From this reading it sounds like you can work around it without too much trouble, which means we can continue to deprioritize it and kick that can down the road. Is that true?


docs/RFCS/20180501_change_data_capture.md, line 313 at r2 (raw file):

manually pausing and resuming the job. This restarting of the flow is costly in
proportion to the size of the changefeed and will affect tail latencies, so
something should be done here in future work.

One thing that comes up here is that lineage-based fault tolerance in DistSQL can be used to provide this (if you detect that you are now a remote processor, you can ask your parent to recreate their subflow from scratch, and then decommission you). This movement use-case also ties into the comment above on DistSQL fault tolerance.


docs/RFCS/20180501_change_data_capture.md, line 316 at r2 (raw file):

GDPR will also require that processors be scheduled in a certain region, once
that's supported by DistSQL.

@jordanlewis I believe this should not be that hard. Do we have an issue for tracking this piece of work? It might make for a good intern project (it's got the trifecta of not too difficult, interesting, and not currently on the critical path for the next release).


docs/RFCS/20180501_change_data_capture.md, line 483 at r2 (raw file):

these other ones), which will not always work.

As a result, we’ve decided to give CockroachDB users the information necessary

What is the information necessary to reconstruct transactions? Is it just the hlc timestamp? Or is it the hlc timestamp along with the node on which the transaction master intent was?



@jordanlewis
Member

Review status: all files reviewed at latest revision, 41 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 316 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

@jordanlewis I believe this should not be that hard. Do we have an issue for tracking this piece of work? It might make for a good intern project (it's got the trifecta of not too difficult, interesting, and not currently on the critical path for the next release).

Seems like a good idea to me. I don't think this is tracked. We probably need this for partitioned data as well, although maybe it happens automatically already by virtue of data locality?



@nvb
Contributor

nvb commented May 21, 2018

Review status: all files reviewed at latest revision, 40 unresolved discussions.


docs/RFCS/20180501_change_data_capture.md, line 311 at r1 (raw file):

The leaseholder may not be the Raft leader and in particular may also receive a snapshot.

Yeah, that's a good point. When we see a raft snapshot, we won't know what changed because we don't have access to the individual raft entries that constitute the delta between the raft index before the snapshot and after. I don't think we can do much more than return an error on the change feed and force another catch-up scan. The downstream receiver can then use its most recent closed timestamp to bound this catch-up scan.
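A minimal sketch of that recovery behavior, with invented names (`SnapshotError`, `open_feed`): the downstream receiver remembers the latest closed timestamp it has seen and, after a snapshot forces a reconnect, uses it to bound the catch-up scan instead of rescanning from the beginning.

```python
class SnapshotError(Exception):
    """Raised when a replica receives a Raft snapshot and the feed loses
    the intervening deltas (name invented for this sketch)."""

def consume(open_feed, apply_record, start_ts):
    """Sketch of the downstream receiver: on a snapshot error, reconnect
    with a catch-up scan starting at the most recent closed timestamp."""
    closed_ts = start_ts
    while True:
        try:
            for kind, ts, payload in open_feed(closed_ts):
                if kind == "closed":
                    closed_ts = max(closed_ts, ts)
                else:
                    apply_record(ts, payload)
            return closed_ts
        except SnapshotError:
            continue  # retry; the scan starts at closed_ts, not start_ts
```

The key point is that the cost of a snapshot is bounded by the data since the last close notification, not the lifetime of the feed.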


docs/RFCS/20180501_change_data_capture.md, line 328 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I've written this assuming that the Checkpoint described in the change feeds rfc handles the intent closure tracking, is that not the case?

I talked to Dan about this in-person. I was under the impression that we still needed this "state-massaging layer" that buffered intents and only output committed KVs. The layer would also be in charge of translating follower reads style close timestamps to a "resolved timestamp".

You say you're in support of not building anything new here, but it's not clear whether you think there's a way to avoid building this kind of translation/buffering layer.


docs/RFCS/20180501_change_data_capture.md, line 248 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

How difficult would it be to stall the gc in such a case? cc @tschottdorf

This is something that we could do without too much issue, but I don't think we should actually expose this option. A general theme in this RFC is that CDC performance should not affect live traffic. With an option like this, a single CDC sink going down could have dramatic effects on the health of a user's entire database. On top of this, I don't think the dependency would be easy to explain, so I doubt many users would understand the risks until it started causing issues in Cockroach. We will want to document that if a user doesn't want to lose data in a CDC stream that's down for over 24 hours, they'll want to increase their GC TTL, but I don't think we should do anything more fancy than that.

TL;DR: I think it would make it way too easy for users to shoot themselves in the foot.



@danhhz
Contributor Author

danhhz commented May 24, 2018

Thanks for all the reviews, everyone! I think this is pretty much ready to go. PTAL


Review status: 0 of 1 files reviewed at latest revision, 40 unresolved discussions, some commit checks pending.


docs/RFCS/20180501_change_data_capture.md, line 62 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Agreed that this is a confused usage of ordering, but we're pretty limited by what the sinks support. They tend to have limited exactly-once guarantees on the consumer side, so my current thought is there's not much we'd gain by doing the extra work to make it exactly-once on the producer side, when that's even possible. I think I see a way to do it using Kafka transactions, but a) I'll have to run some experiments to see what the perf overhead is and b) dunno how it will work on other sinks, on some of them it may be impossible to do exactly-once.

I had a conversation with Nathan about this and realized that it's not possible to do what I was initially thinking. I've updated this with the new ordering guarantee


docs/RFCS/20180501_change_data_capture.md, line 81 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

We can guarantee it if we synchronously update the job table before emitting a close notification. I added a note to the implementation section.

Turns out we can't guarantee that without Kafka transactions. Updated this to what we can actually guarantee.
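Under the weaker guarantee, a consumer can still deduplicate with a simple rule; here is a sketch with invented names: any record at or below the latest resolved timestamp is a replay of something already received and can be dropped, and per-key timestamps give last-writer-wins within a key.

```python
class DedupConsumer:
    """Sketch of a consumer for an at-least-once feed: replays can cross a
    closure boundary, but any record at or below the latest resolved
    timestamp is guaranteed to be a duplicate."""

    def __init__(self):
        self.resolved = 0     # highest resolved/closed timestamp seen
        self.state = {}       # key -> (ts, value), last-writer-wins per key

    def on_record(self, key, value, ts):
        if ts <= self.resolved:
            return False      # guaranteed duplicate; safe to drop
        prev = self.state.get(key)
        if prev is None or ts >= prev[0]:
            self.state[key] = (ts, value)
        return True

    def on_resolved(self, ts):
        self.resolved = max(self.resolved, ts)
```

This is the sense in which the sink ends up exactly-once even though the producer is only at-least-once: applying records is idempotent given the per-key timestamps.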


docs/RFCS/20180501_change_data_capture.md, line 91 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I hadn't even looked at the logical decoding syntax, since it is such a different model. The PUBLICATION/SUBSCRIPTION division could be a nice way to give users control over how to coalesce a large number of feeds, which is something I mention in the unresolved questions.

I dislike inventing our own syntax, and it does seem like we could plug into CREATE PUBLICATION/CREATE SUBSCRIPTION as-is. My biggest hesitation is that we'd have a different conninfo in the subscription and both would have a different set of options, so we wouldn't be a drop-in replacement. Thoughts?

I have gone on a journey and discovered that logical decoding and logical replication are different things! I've updated this with a discussion of why we invented our own syntax, though I don't have strong feelings on this, so if anyone has input I'm all ears. Also switched EMIT TABLE to FOR TABLE


docs/RFCS/20180501_change_data_capture.md, line 94 at r1 (raw file):

Previously, knz (kena) wrote…

Summary of our discussion this morning:

  1. if you opt into the CREATE + ALTER pattern, then you're also opting into the symmetry with tables/views/sequences, and that mandates offering DROP CHANGEFEED and SHOW CREATE CHANGEFEED as well. You can probably desugar DROP CHANGEFEED to CANCEL JOB in the parser in your implementation, but we'll want to document the former.

We touched in that discussion on whether to offer PAUSE CHANGEFEED in addition to PAUSE JOB. I don't have a firm opinion on that. We probably want to, again for consistency in docs. For this too you can desugar in the parser.

  2. in the first version I would personally recommend that there is exactly one EMIT target per changefeed, either EMIT TABLE or EMIT DATABASE. If a user wants changefeeds over multiple specific tables, they can create multiple changefeeds. We can now cancel/resume multiple jobs using queries so the administration would not be too complicated. Only if there's demand for more could you extend the implementation to support patterns.

  3. to select specific columns on a given table, make the syntax EMIT TABLE foo with optional EMIT TABLE foo (col1, col2, col3,...). To select columns in tables selected implicitly by EMIT DATABASE, extend the table descriptors themselves (not the changefeed config) with per-column attributes (which you'd configure with ALTER TABLE ... ALTER COLUMN ... SET NONPUBLIC)

Done, except I've moved the column whitelist to the unresolved section because we don't yet have the DistSQL infrastructure to make it useful for GDPR, and I need to cut scope where I can.


docs/RFCS/20180501_change_data_capture.md, line 103 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I agree that it should be a separate discussion. I also agree that its something we should do.

Cool. In that case, I'm going to remove this TODO


docs/RFCS/20180501_change_data_capture.md, line 203 at r1 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

We might want to consider making this variable based on total disk space, as well. if the cluster setting was set to 10 GB, but there was only 1 GB left, it would be better to stall the changefeed than let this disk buffer fill the disks completely.

That's a cool idea, though I'm inclined to follow precedent of the existing --max-sql-memory and --max-disk-temp-storage flags which don't seem to do this (I think)


docs/RFCS/20180501_change_data_capture.md, line 203 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

Is the setting for the memory buffer size or the disk buffer size? If the former, like most of our other memory-related settings, we'll probably want this to be a command-line flag so it can be customized for nodes running on different hardware. If the latter, it may need to be a command-line parameter on each store.

I added both to mirror the ones we have for sql processing, but perhaps this is overkill.


docs/RFCS/20180501_change_data_capture.md, line 222 at r1 (raw file):

Previously, a-robinson (Alex Robinson) wrote…

This situation seems under-specified. Before we hit the gc threshold, what do we do when the in-memory/on-disk buffers fill up? It's different from the case above of an unavailable sink in that we can continue making progress so we wouldn't want to just stall.

Reworked this (and split the details into the reference-level explanation). I still don't feel like the story here is totally figured out tbh


docs/RFCS/20180501_change_data_capture.md, line 222 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

We'll want to test that this case gracefully fails without affecting foreground traffic.

Can you also explain what happens if a sink falls so far behind that it can no longer spill anything else to disk? This may not necessarily imply that it has fallen behind by more than the gc threshold, so it isn't necessarily a failure state. However, there won't be any backpressure, so instead, we'll need to fall back to lots of catch-up scans after employing some kind of FIFO policy for on-disk buffered records. I think this means we'll want to tear down our change feed until the sink is able to catch back up.

Reworked this (and split the details into the reference-level explanation). I still don't feel like the story here is totally figured out tbh


docs/RFCS/20180501_change_data_capture.md, line 248 at r1 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

I don't believe the overhead is very much, but also refactoring processors is not difficult, so I wouldn't preemptively start with two processors. We can always split them out into several processors later when we work on materialized views. You should feel free to keep this part as simple as possible, which to me sounds like 1 processor for now.

"refactoring processors is not difficult" seems to understate the overhead and technical debt of backwards compatibility of changing the processor spec protos. However, if I don't get this exactly right the first time, we end up in the same place, anyway. I defer to y'alls judgement; 1 processor it is.


docs/RFCS/20180501_change_data_capture.md, line 311 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

The leaseholder may not be the Raft leader and in particular may also receive a snapshot.

Yeah, that's a good point. When we see a raft snapshot, we won't know what changed because we don't have access to the individual raft entries that constitute the delta between the raft index before the snapshot and after. I don't think we can do much more than return an error on the change feed and force another catch-up scan. The downstream receiver can then use its most recent closed timestamp to bound this catch-up scan.

Done.


docs/RFCS/20180501_change_data_capture.md, line 323 at r1 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Ranges are 64mb right now, so I assume your aggregator will have some limit on the parallelism and will chug through over time. But if we support larger ranges (which sounds like the plan) you may run into the situation in which you're buffering significant amounts of Raft log before you get to send it.

Also, consider harping on the format in which the catch-up read is carried out. Using the KV interface naively would be pretty inefficient. You're going to want to stream out an SST or something like that.

BACKUP/RESTORE will have many of these same problems once range sizes become too large to fit into RAM comfortably. The way I think this is reasonably addressed at the moment is to change the command that creates an SSTable to a streaming one, without changing the KV API. Instead, the response returns an endpoint at which the replica can be reached to stream out the SST.

I added some detail about the catch-up scan. Mentioned larger range sizes, but not sure that anything more complicated than reusing ExportRequest is going to make it in 2.1


docs/RFCS/20180501_change_data_capture.md, line 328 at r1 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

I talked to Dan about this in-person. I was under the impression that we still needed this "state-massaging layer" that buffered intents and only output committed KVs. The layer would also be in charge of translating follower reads style close timestamps to a "resolved timestamp".

You say you're in support of not building anything new here, but it's not clear whether you think there's a way to avoid building this kind of translation/buffering layer.

Nathan explained offline that our latest thinking has been to push the open intent tracking and the catch-up scan downstream of the ChangeFeed primitive, which in this case means into the CDC DistSQL flow. Updated.
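A rough sketch (invented names) of what such a tracking layer inside the flow might look like: the feed's resolved timestamp can only advance past a timestamp once the closed timestamp has passed it and no open intent remains at or below it.

```python
class ResolvedTracker:
    """Sketch of an intent-tracking layer: translate follower-reads-style
    closed timestamps into a resolved timestamp that is also held back by
    the earliest still-open intent."""

    def __init__(self):
        self.open_intents = {}   # txn_id -> earliest intent timestamp
        self.closed_ts = 0

    def on_intent(self, txn_id, ts):
        cur = self.open_intents.get(txn_id)
        self.open_intents[txn_id] = ts if cur is None else min(cur, ts)

    def on_resolve(self, txn_id):
        # Commit or abort: the intent no longer holds back the frontier.
        self.open_intents.pop(txn_id, None)

    def on_closed(self, ts):
        self.closed_ts = max(self.closed_ts, ts)

    def resolved(self):
        # No committed value at or below this timestamp can still appear.
        if not self.open_intents:
            return self.closed_ts
        return min(self.closed_ts, min(self.open_intents.values()) - 1)
```

This is only an illustration of the translation being discussed; where buffered intents themselves are stored, and how catch-up scans interact with them, is left to the implementation.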


docs/RFCS/20180501_change_data_capture.md, line 473 at r1 (raw file):

Previously, knz (kena) wrote…

How to dump/restore or backup/restore all currently configured changefeeds.

I'd argue that changefeeds are much more database objects than jobs. I'd imagine them to have descriptors in the schema, and be subject to a shared code path as other descriptors. Including backup/restore, support in cockroach dump and SHOW CREATE.

It's not clear to me yet how backup/restore of changefeeds should work. I've left this in unresolved questions


docs/RFCS/20180501_change_data_capture.md, line 209 at r2 (raw file):

Previously, knz (kena) wrote…

For UX and ease of analyzing/testing/troubleshooting I would recommend splitting the key column into a table key prefix and per-table key.

This key/value is the encoded key/value (with JSON or Avro), not our raw KV pairs. Good call on the table name; that seems useful.


docs/RFCS/20180501_change_data_capture.md, line 248 at r2 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

This is something that we could do without too much issue, but I don't think we should actually expose this option. A general theme in this RFC is that CDC performance should not affect live traffic. With an option like this, a single CDC sink going down could have dramatic effects on the health of a user's entire database. On top of this, I don't think the dependency would be easy to explain, so I doubt many users would understand the risks until it started causing issues in Cockroach. We will want to document that if a user doesn't want to lose data in a CDC stream that's down for over 24 hours, they'll want to increase their GC TTL, but I don't think we should do anything more fancy than that.

TL;DR: I think it would make it way too easy for users to shoot themselves in the foot.

Agreed. It's way too easy for this to surprise the user in a bad way. I do think it's reasonable for this to be an option in the user's production runbook if they won't be able to bring the sink back in time (and we should document that)


docs/RFCS/20180501_change_data_capture.md, line 260 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

nit: incrementally updated materialized views.

Done.


docs/RFCS/20180501_change_data_capture.md, line 300 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

It would be great if you could provide some additional insight as to how much better your life would be if DistSQL had fault tolerance and resilience built in, as that would help us organize our longer term roadmap. From this reading it sounds like you can work around it without too much trouble, which means we can continue to deprioritize it and kick that can down the road. Is that true?

Happy to talk about this in detail offline, but the summary is that we can work around it, though we'll have bad tail latencies until it's fixed. This is certainly not the only thing affecting tail latencies, so I don't know yet whether it will be the bottleneck. So far I don't think there's DistSQL work to be done for CDC in 2.1, but we'll probably want to improve this at some point.


docs/RFCS/20180501_change_data_capture.md, line 313 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

One thing that comes up here is that lineage-based fault tolerance in DistSQL can be used to provide this (if you detect that you are now a remote processor, you can ask your parent to recreate their subflow from scratch, and then decommission you). This movement use-case also ties into the comment above on DistSQL fault tolerance.

Yeah, seems related. Is that something you're planning on building?


docs/RFCS/20180501_change_data_capture.md, line 316 at r2 (raw file):

Previously, jordanlewis (Jordan Lewis) wrote…

Seems like a good idea to me. I don't think this is tracked. We probably need this for partitioned data as well, although maybe it happens automatically already by virtue of data locality?

Yeah, it probably happens to do the right thing now, but we'll need better guarantees (and tests) if we want users to rely on this


docs/RFCS/20180501_change_data_capture.md, line 483 at r2 (raw file):

Previously, arjunravinarayan (Arjun Narayan) wrote…

What is the information necessary to reconstruct transactions? Is it just the hlc timestamp? Or is it the hlc timestamp along with the node on which the transaction master intent was?

Timestamp plus close notifications. Check out the guide-level explanation and let me know if it doesn't make sense.


Comments from Reviewable

@danhhz danhhz removed the do-not-merge bors won't merge a PR with this label. label May 24, 2018
@knz
Contributor

knz commented May 24, 2018

Reviewed 1 of 1 files at r3.
Review status: all files reviewed at latest revision, 37 unresolved discussions, all commit checks successful.


docs/RFCS/20180501_change_data_capture.md, line 40 at r3 (raw file):

By default, when a new changefeed is created, an initial timestamp is chosen and
the current value of each watched row as of that timestamp is emitted (similar
to `SELECT ... AS OF SYSTEM TIME`). After this "initial scan", updates to

There's a practical consideration here. Say a user enables changefeeds over a very, very large table. So there are, say, millions of rows at that timestamp, and the CDC sink is not particularly fast.

This initial scan may take a long time, perhaps more than a day.

In the meantime, the database is perhaps active with lots of new data but also data deletions.

How do you propose to track these changes while the backscan at the initial timestamp is running? In particular, how do you ensure that none of these intermediate events are lost when the GC window for them expires?


docs/RFCS/20180501_change_data_capture.md, line 161 at r3 (raw file):

command and can be used view the changefeed definition.

Tthe following `ALTER CHANGEFEED` commands can be used to change the

"The"


docs/RFCS/20180501_change_data_capture.md, line 291 at r3 (raw file):

DistSQL is currently optimized for short-running queries and doesn’t yet have a
facility for resilience to processor errors. Instead, it shuts down the whole
flow. This is unfortunate for CDC, but failures should be relatively infrequent

"failures should be relatively infrequent" - I'd like to challenge this: spencer has been harping on the "node failure should be considered a common occurrence" story line sufficiently long that it ostensibly clashes with the assumption here.


docs/RFCS/20180501_change_data_capture.md, line 559 at r3 (raw file):

- A column whitelist, which could be useful for GDPR compliance or performance
- Backup/restore of changefeeds
- Details of recovering a failed changefeed using `revision_history` backups

I'm hesitant to say "this RFC is good to go" when there are unresolved questions still :)
I would be more comfortable if you would outline your opinion about those things, and at least bin them into different categories upfront: "will be figured out during initial implementation", "will be investigated for v2", "can remain unsupported, because a workaround/alternative exists", etc.



@bdarnell
Contributor

:lgtm:


Review status: all files reviewed at latest revision, 33 unresolved discussions, all commit checks successful.


docs/RFCS/20180501_change_data_capture.md, line 107 at r3 (raw file):

```sql
CREATE CHANGEFEED <name> FOR TABLE tweets EMIT TO 'kafka://host:port' WITH <...>
```

This introduces two new (non-reserved, I assume) keywords. I'm fine with the new CHANGEFEED keyword. Is there an existing keyword we could adopt here instead of EMIT? (or maybe even omit it completely: CREATE CHANGEFEED FOR TABLE t TO 'kafka:...'. This would be analogous to the use of TO url in BACKUP, but using TO for that might be inviting too much yacc complexity since it also appears in other clauses)

OUTPUT TO might be a possibility - OUTPUT is a reserved word in sql-92, but it was un-reserved in newer versions of the spec and it's not reserved in postgres. The risk of name collisions here is probably too high. I'm OK with EMIT if no other alternatives present themselves.


docs/RFCS/20180501_change_data_capture.md, line 192 at r3 (raw file):

## Data warehouse example: Amazon Redshift

TODO: This could use more detail.

Remove this TODO unless you're planning to do it.



@knz
Contributor

knz commented May 28, 2018

Review status: all files reviewed at latest revision, 35 unresolved discussions, all commit checks successful.


docs/RFCS/20180501_change_data_capture.md, line 107 at r3 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

This introduces two new (non-reserved, I assume) keywords. I'm fine with the new CHANGEFEED keyword. Is there an existing keyword we could adopt here instead of EMIT? (or maybe even omit it completely: CREATE CHANGEFEED FOR TABLE t TO 'kafka:...'. This would be analogous to the use of TO url in BACKUP, but using TO for that might be inviting too much yacc complexity since it also appears in other clauses)

OUTPUT TO might be a possibility - OUTPUT is a reserved word in sql-92, but it was un-reserved in newer versions of the spec and it's not reserved in postgres. The risk of name collisions here is probably too high. I'm OK with EMIT if no other alternatives present themselves.

This reminds me that SQL has a keyword that's already dedicated to this pattern: INTO



tbg added a commit to tbg/cockroach that referenced this pull request Jun 4, 2018
Follower reads are consistent reads at historical timestamps from follower
replicas. They make the non-leader replicas in a range suitable sources for
historical reads.

The key enabling technology is the propagation of **closed timestamp
heartbeats** from the range leaseholder to followers. The closed timestamp
heartbeat (CT heartbeat) is more than just a timestamp. It is a set of
conditions that, if met, guarantee that a follower replica has all state
necessary to satisfy reads at or before the CT.

Consistent historical reads are useful for analytics queries and in particular
allow such queries to be carried out more efficiently and, with appropriate
configuration, away from foreground traffic. But historical reads are also key
to a proposal for [reference-like
tables](cockroachdb#26301) aimed at cutting
down on foreign key check latencies particularly in geo-distributed clusters;
they help recover a reasonably recent consistent snapshot of a cluster after a
loss of quorum; and they are one of the ingredients for [Change Data
Capture](cockroachdb#25229).

Release note: None
@tbg tbg mentioned this pull request Jun 4, 2018
@danhhz danhhz requested a review from a team June 6, 2018 20:10
@danhhz
Contributor Author

danhhz commented Jun 6, 2018

I think I've responded to all the comments, but Reviewable is acting up again, so please ping me if I missed one. @nvanbenschoten and @tschottdorf, mind taking another pass? I was planning on waiting for your LGTMs too before merging

The user facing timestamp is a "resolved" timestamp under our new nomenclature, so I changed it here to avoid confusing our future selves. I also removed the use of AS OF SYSTEM TIME and made its functionality into a WITH option since it made the ALTER really awkward.


Review status: 0 of 1 files reviewed at latest revision, 35 unresolved discussions, some commit checks pending.


docs/RFCS/20180501_change_data_capture.md, line 40 at r3 (raw file):

Previously, knz (kena) wrote…

There's a practical consideration here. Say a user enables changefeeds over a very, very large table. So there are, say, millions of rows at that timestamp, and the CDC sink is not particularly fast.

This initial scan may take a long time, perhaps more than a day.

In the meantime, the database is perhaps active with lots of new data but also data deletions.

How do you propose to track these changes while the backscan at the initial timestamp is running? In particular ensure that none of this intermediate events are lost when the GC window for them expires?

There's not much we can do besides buffer them, which brings its own problems. I've added a note about this in the Failures section.
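To make the buffering concern concrete, here is a toy sketch (our own simplification in Python; none of the names come from the RFC): the backfill streams values as of the scan timestamp while live updates above that timestamp are buffered and drained afterwards. If the buffer fills before the backfill finishes, the changefeed has to give up and restart.

```python
# Hypothetical event shape: (hlc_timestamp, key, value).
def run_changefeed(initial_scan_rows, live_updates, scan_ts, emit, buffer_limit=1000):
    buffered = []
    for update in live_updates:          # in reality these arrive concurrently
        if update[0] > scan_ts:
            if len(buffered) >= buffer_limit:
                raise RuntimeError("buffer exhausted; changefeed must restart")
            buffered.append(update)
    for row in initial_scan_rows:        # backfill: values as of scan_ts
        emit(row)
    for update in sorted(buffered):      # then the buffered live changes
        emit(update)

out = []
run_changefeed(
    initial_scan_rows=[(10.0, "k1", "v1")],
    live_updates=[(11.0, "k1", "v2"), (9.0, "k0", "old")],  # the old one is ignored
    scan_ts=10.0,
    emit=out.append,
)
assert out == [(10.0, "k1", "v1"), (11.0, "k1", "v2")]
```

The sketch also shows why a very slow sink is dangerous: the buffer grows for the entire duration of the backfill.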


docs/RFCS/20180501_change_data_capture.md, line 107 at r3 (raw file):

Previously, knz (kena) wrote…

This reminds me that SQL has a keyword that's already dedicated to this pattern: INTO

INTO sgtm


docs/RFCS/20180501_change_data_capture.md, line 161 at r3 (raw file):

Previously, knz (kena) wrote…

"The"

Done.


docs/RFCS/20180501_change_data_capture.md, line 192 at r3 (raw file):

Previously, bdarnell (Ben Darnell) wrote…

Remove this TODO unless you're planning to do it.

Done.


docs/RFCS/20180501_change_data_capture.md, line 291 at r3 (raw file):

Previously, knz (kena) wrote…

"failures should be relatively infrequent" - I'd like to challenge this: spencer has been harping on the "node failure should be considered a common occurrence" story line sufficiently long that it ostensibly clashes with the assumption here.

I think the conclusion here is still okay, but I reworded it a bit.


docs/RFCS/20180501_change_data_capture.md, line 559 at r3 (raw file):

Previously, knz (kena) wrote…

I'm hestitant to say "this RFC is good to go" when there are unresolved questions still :)
I would be more comfortable if you would outline your opinion about those things, and at least bin them in different categories upfront: "will be figured out during initial implementation", "will be investigated for v2", "can remain unsupported, because workaround/alternative exists", etc.

All of these fall under "not initially necessary and in the interest of cutting scope, we aren't thinking about them until at least 2.1"



@tbg
Member

tbg commented Jun 7, 2018

:lgtm_strong:


Reviewed 1 of 1 files at r4.
Review status: all files reviewed at latest revision, 26 unresolved discussions, all commit checks successful.


docs/RFCS/20180501_change_data_capture.md, line 248 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Agreed. It's way too easy for this to surprise the user in a bad way. I do think it's reasonable for this to be an option in the user's production runbook if they won't be able to bring the sink back in time (and we should document that)

I share Nathan and Dan's opinions: we could stall GC, but let's definitely not.


docs/RFCS/20180501_change_data_capture.md, line 483 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

timestamp plus close notifications. check out the guide-level explanation and let me know if it doesn't make sense

You need the transaction IDs in all committed values or you're already toast during the catch-up scan (which may also be required during regular operation of a change feed).


docs/RFCS/20180501_change_data_capture.md, line 43 at r4 (raw file):

watched rows are emitted. The `WITH cursor=<timestamp>` syntax can be used to
start a new changefeed that skips the initial scan and only emits all changes
after the user-given timestamp.

Perhaps clarify to "at or after".
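As a tiny illustrative sketch (ours, not the RFC's code) of the `WITH cursor=<timestamp>` semantics being clarified here: the initial scan is skipped and only changes at or after the cursor are emitted.

```python
# Hypothetical change shape: (hlc_timestamp, payload).
def with_cursor(changes, cursor_ts):
    """Emit only changes at or after the user-given cursor timestamp."""
    return [c for c in changes if c[0] >= cursor_ts]

changes = [(1.0, "a"), (2.0, "b"), (3.0, "c")]
assert with_cursor(changes, 2.0) == [(2.0, "b"), (3.0, "c")]  # "at or after"
```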


docs/RFCS/20180501_change_data_capture.md, line 108 at r4 (raw file):

CREATE CHANGEFEED <name> FOR TABLE tweets INTO 'kafka://host:port' WITH <...>
CREATE CHANGEFEED <name> FOR TABLE tweets VALUES FROM (1) TO (2) INTO <...>
CREATE CHANGEFEED <name> FOR TABLE tweets PARTITION north_america INTO <...>

Probably unrelated question, but north_america doesn't need to be quoted or anything?


docs/RFCS/20180501_change_data_capture.md, line 126 at r4 (raw file):

    - TODO options
- `WITH <...>`
  - `WITH envelope=<...>` Kafka records have a key and a value. The key is

This shouldn't be Kafka specific (since otherwise it belongs in the query string). Probably just s/Kafka//?


docs/RFCS/20180501_change_data_capture.md, line 157 at r4 (raw file):

and unpause changefeeds.

`DROP CHANGEFEED <name>` can be used to stop a changefeed.

s/stop/remove/, stop sounds undoable.


docs/RFCS/20180501_change_data_capture.md, line 179 at r4 (raw file):

## Full-text index example: Elasticsearch

MovR is a ride sharing company. Their users are quite particular about which

Link to the repo (unless this is a different toy example called MovR).


docs/RFCS/20180501_change_data_capture.md, line 185 at r4 (raw file):

current availability in [ElasticSearch].

The authoratative store for vehicles is a table in CockroachDB:

authoritative


docs/RFCS/20180501_change_data_capture.md, line 237 at r4 (raw file):

and the Confluent Schema Registry are used to deal with schema changes.

MovR is popular, so the rides don't fit in one Kafka partition are sharded

"and are sharded"


docs/RFCS/20180501_change_data_capture.md, line 343 at r4 (raw file):

Each flow is made of a set of `ChangeAggregator` processors. Progress
information in the form of `(span, timestamp)` pairs is passed back to the job
coordinator via DistSQL metadata. (TODO: Is the metadata the right place to do

Not to let that hold back the merge, but that TODO should be figured out.


docs/RFCS/20180501_change_data_capture.md, line 412 at r4 (raw file):

geography-aware scheduling; this would have to be built.

A new `ChangeFeed` first registers itself to emit any relevant raft commands as

RangeFeed here and elsewhere in this section?


docs/RFCS/20180501_change_data_capture.md, line 420 at r4 (raw file):

raft hook was registered. These partially overlap the `ChangeFeed` output and
must be deduplicated with it. `ExportRequest` already uses the time-bounded
iterator to minimize the data read, but support for `READ_UNCOMMITTED` is added

The range feed will simply stream intents upstream (which means these intents may or may not commit or move or whatever), and the distributed architecture has to deduplicate/track/resolve them, correct? I feel that this should be made really clear at the beginning of the section to set the stage (even though an individual section is dedicated to it below). Apologies if I missed it somewhere up there.


docs/RFCS/20180501_change_data_capture.md, line 444 at r4 (raw file):

[follower reads]) which are a guarantee that no new intents or transactions will
be written below a timestamp. Notably, this does not imply that all the intents
below it have been resolved. An in-memory structure is bootstraped with all open

bootstrapped
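To make the mechanics under discussion concrete, here is a toy model (our own names and structure, not CockroachDB's implementation): a close notification promises no *new* writes below its timestamp but says nothing about intents that are still unresolved, so the resolved timestamp is the minimum of the latest close notification and the oldest still-open intent.

```python
import heapq

class ResolvedTracker:
    """Toy model of resolved-timestamp computation; all names are illustrative."""

    def __init__(self):
        self.closed_ts = 0.0
        self.open_intents = []      # min-heap of (ts, txn_id)
        self.resolved_txns = set()

    def on_intent(self, ts, txn_id):
        heapq.heappush(self.open_intents, (ts, txn_id))

    def on_intent_resolved(self, txn_id):
        self.resolved_txns.add(txn_id)

    def on_close(self, ts):
        self.closed_ts = max(self.closed_ts, ts)

    def resolved(self):
        # Drop intents that have since been resolved off the top of the heap.
        while self.open_intents and self.open_intents[0][1] in self.resolved_txns:
            heapq.heappop(self.open_intents)
        if self.open_intents:
            return min(self.closed_ts, self.open_intents[0][0])
        return self.closed_ts

t = ResolvedTracker()
t.on_intent(5.0, "txn-a")
t.on_close(10.0)
assert t.resolved() == 5.0    # held back by the open intent
t.on_intent_resolved("txn-a")
assert t.resolved() == 10.0   # now the close notification governs
```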



@nvb
Contributor

nvb commented Jun 11, 2018

:lgtm: merge it!


Review status: :shipit: complete! 1 of 0 LGTMs obtained (and 2 stale)


docs/RFCS/20180501_change_data_capture.md, line 412 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

RangeFeed here and elsewhere in this section?

+1, I'm making this change in https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20170613_change_feeds_storage_primitive.md as well, so they won't get out of sync.


docs/RFCS/20180501_change_data_capture.md, line 420 at r4 (raw file):

The range feed will simply stream intents upstream (which means these intents may or may not commit or move or whatever), and the distributed architecture has to deduplicate/track/resolve them, correct?

Yes, that's correct.



Change Data Capture (CDC) provides efficient, distributed, row-level
change subscriptions.

CockroachDB is an excellent system of record, but no technology exists
in a vacuum. Users would like to keep their data mirrored in full-text
indexes, analytics engines, and big data pipelines, among others. A pull
model can currently be used to do this, but it’s inefficient, overly
manual, and doesn’t scale; the push model described in this RFC
addresses these limitations.

Release note: None
@danhhz
Contributor Author

danhhz commented Jun 11, 2018

Thanks for the reviews, everyone! Merging on green 🙌


Review status: :shipit: complete! 1 of 0 LGTMs obtained (and 2 stale)


docs/RFCS/20180501_change_data_capture.md, line 483 at r2 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

You need the transaction IDs in all committed values or you're already toast during the catch-up scan (which may also be required during regular operation of a change feed).

This is talking about reconstructing them from the user's perspective, which is just grouping by timestamp. (This is not perfectly accurate if two transactions have exactly the same HLC timestamp.)
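As a concrete illustration of the grouping described here (a sketch in Python with made-up row shapes, not the RFC's wire format):

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical emitted rows: (hlc_timestamp, key, value).
rows = [
    (1528400000.0, "users/1", "alice"),
    (1528400000.0, "users/2", "bob"),     # same transaction (same HLC ts)
    (1528400005.5, "users/1", "alice2"),
]

def group_into_transactions(rows):
    """Group emitted rows by HLC timestamp once that timestamp is resolved.

    Caveat from the discussion: two distinct transactions that commit at
    exactly the same HLC timestamp are indistinguishable here.
    """
    rows = sorted(rows, key=itemgetter(0))
    return [(ts, [r[1:] for r in grp]) for ts, grp in groupby(rows, key=itemgetter(0))]

txns = group_into_transactions(rows)
assert len(txns) == 2
assert len(txns[0][1]) == 2  # two rows share the first timestamp
```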


docs/RFCS/20180501_change_data_capture.md, line 43 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Perhaps clarify to "at or after".

Done.


docs/RFCS/20180501_change_data_capture.md, line 108 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Probably unrelated question, but north_america doesn't need to be quoted or anything?

tbh I'm never entirely sure until I make the grammar change and try it, but I don't think so. they don't need to be quoted when making the partition


docs/RFCS/20180501_change_data_capture.md, line 126 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

This shouldn't be Kafka specific (since otherwise it belongs in the query string). Probably just s/Kafka//?

Done.


docs/RFCS/20180501_change_data_capture.md, line 157 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

s/stop/remove/, stop sounds undoable.

Done.


docs/RFCS/20180501_change_data_capture.md, line 179 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Link to the repo (unless this is a different toy example called MovR).

looks like the repo isn't public yet. in the meantime, added "fictional"


docs/RFCS/20180501_change_data_capture.md, line 185 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

authoritative

Done.


docs/RFCS/20180501_change_data_capture.md, line 237 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

"and are sharded"

Done.


docs/RFCS/20180501_change_data_capture.md, line 343 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

Not to let that hold back the merge, but that TODO should be figured out.

Oops. I did check on this and just forgot to remove the TODO


docs/RFCS/20180501_change_data_capture.md, line 412 at r4 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

+1, I'm making this change in https://github.com/cockroachdb/cockroach/blob/master/docs/RFCS/20170613_change_feeds_storage_primitive.md as well, so they won't get out of sync.

Good call. Done


docs/RFCS/20180501_change_data_capture.md, line 420 at r4 (raw file):

Previously, nvanbenschoten (Nathan VanBenschoten) wrote…

The range feed will simply stream intents upstream (which means these intents may or may not commit or move or whatever), and the distributed architecture has to deduplicate/track/resolve them, correct?

Yes, that's correct.

Done.


docs/RFCS/20180501_change_data_capture.md, line 444 at r4 (raw file):

Previously, tschottdorf (Tobias Schottdorf) wrote…

bootstrapped

Done.



@danhhz
Contributor Author

danhhz commented Jun 11, 2018

bors r=bdarnell,nvanbenschoten,tschottdorf

craig bot pushed a commit that referenced this pull request Jun 11, 2018
25229: rfc: Change Data Capture r=bdarnell,nvanbenschoten,tschottdorf a=danhhz

Change Data Capture (CDC) provides efficient, distributed, row-level
change subscriptions.

CockroachDB is an excellent system of record, but no technology exists
in a vacuum. Users would like to keep their data mirrored in full-text
indexes, analytics engines, and big data pipelines, among others. A pull
model can currently be used to do this, but it’s inefficient, overly
manual, and doesn’t scale; the push model described in this RFC
addresses these limitations.

Co-authored-by: Daniel Harrison <daniel.harrison@gmail.com>
@craig
Contributor

craig bot commented Jun 11, 2018

Build succeeded

@craig craig bot merged commit 35a1def into cockroachdb:master Jun 11, 2018
@danhhz danhhz deleted the cdc_rfc branch June 11, 2018 21:44
@tbg
Member

tbg commented Jun 12, 2018

Review status: :shipit: complete! 1 of 0 LGTMs obtained (and 2 stale)


docs/RFCS/20180501_change_data_capture.md, line 483 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

This is talking about reconstructing them from the user's perspective, which is just grouping by timestamp (which is not perfectly accurate if two transactions have exactly the same hlc timestamp)

Ack. But you're reconstructing sets of transactional writes; if that feature ever becomes relevant, I'd make sure to call out the distinction, because users will likely need to worry about the difference in some use cases.



@glerchundi

glerchundi commented Jun 20, 2018

@danhhz overall looks good, although since we don't have Kafka in our stack it's not useful for us until other sinks are implemented.

I have a question regarding multi-provider deployments. We started with a one-region k8s cluster (US - GCP GKE) with CockroachDB on it, but we plan to extend this to another region/provider (EU - AWS EKS). Is it possible to capture changes of one partition and propagate them to a service located as close as possible? I mean, where is the change data capturer located? Is the process executed close to the region so that traffic does not cross between regions?

In this case it would be EU - AWS with Amazon SQS as a sink and US - GCP GKE with Google Pub Sub as a sink (our use case does not require ordering just distinguish between old and new values).

Would this be enough to achieve what I explained?

```sql
CREATE CHANGEFEED tweets-us FOR TABLE tweets PARTITION us INTO 'sqs://...'
CREATE CHANGEFEED tweets-eu FOR TABLE tweets PARTITION eu INTO 'googlepubsub://...'
```

Thanks for making this happen!

/cc @kannanlakshmi

tbg added a commit to tbg/cockroach that referenced this pull request Jun 28, 2018
Follower reads are consistent reads at historical timestamps from follower
replicas. They make the non-leader replicas in a range suitable sources for
historical reads.

The key enabling technology is the propagation of **closed timestamp
heartbeats** from the range leaseholder to followers. The closed timestamp
heartbeat (CT heartbeat) is more than just a timestamp. It is a set of
conditions that, if met, guarantee that a follower replica has all state
necessary to satisfy reads at or before the CT.

Consistent historical reads are useful for analytics queries and in particular
allow such queries to be carried out more efficiently and, with appropriate
configuration, away from foreground traffic. But historical reads are also key
to a proposal for [reference-like
tables](cockroachdb#26301) aimed at cutting
down on foreign key check latencies particularly in geo-distributed clusters;
they help recover a reasonably recent consistent snapshot of a cluster after a
loss of quorum; and they are one of the ingredients for [Change Data
Capture](cockroachdb#25229).

Release note: None
craig bot pushed a commit that referenced this pull request Jul 18, 2018
26362: RFC: follower reads r=bdarnell,nvanbenschoten a=tschottdorf

NB: this is extracted from #21056; please don't add new commentary on the
tech note there.

----

Follower reads are consistent reads at historical timestamps from follower
replicas. They make the non-leader replicas in a range suitable sources for
historical reads.

The key enabling technology is the propagation of **closed timestamp
heartbeats** from the range leaseholder to followers. The closed timestamp
heartbeat (CT heartbeat) is more than just a timestamp. It is a set of
conditions that, if met, guarantee that a follower replica has all state
necessary to satisfy reads at or before the CT.

Consistent historical reads are useful for analytics queries and in particular
allow such queries to be carried out more efficiently and, with appropriate
configuration, away from foreground traffic. But historical reads are also key
to a proposal for [reference-like tables](#26301) aimed at cutting
down on foreign key check latencies particularly in geo-distributed clusters;
they help recover a reasonably recent consistent snapshot of a cluster after a
loss of quorum; and they are one of the ingredients for [Change Data
Capture](#25229).

Release note: None

27699: storage: fix stopper race in compactor r=petermattis a=tschottdorf

Starting workers without a surrounding task is unfortunately often not
the right thing to do when the worker accesses other state that might
become invalidated once the stopper begins to stop. In this particular
case, the compactor might end up accessing the engine even though it
had already been closed.

I wasn't able to repro this failure in the first place, but pretty sure
this:
Fixes #27232.

Release note: None

27704: issues: fix email fallback r=petermattis a=tschottdorf

This was not my email address.

Release note: None

Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

10 participants