changefeedccl: Freeze table name to the (optionally fully qualified) statement time name#59258
Conversation
94a608f to
cfa8fdf
Compare
(optionally fully qualified) statement time name Previously, Kafka topics and top-level keys were always derived from the table name in changefeeds. If the table name changed, the feed eventually failed, and if the table name was non-unique across databases, collisions were unavoidable. This PR adds a WITH full_table_name option to changefeeds, and honors it by serializing movr.public.drivers as the statement time name and relying on that. There are probably more things that need to change downstream. Release note (sql change): Added "WITH full_table_name" option to create a changefeed on "movr.public.drivers" instead of "drivers".
miretskiy
left a comment
There was a problem hiding this comment.
Reviewed 11 of 12 files at r1, 1 of 1 files at r2.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @HonoreDB)
pkg/ccl/changefeedccl/avro.go, line 392 at r1 (raw file):
// record schema. The fields are kept in the same order as columns in the index. func indexToAvroSchema( tableDesc catalog.TableDescriptor, indexDesc *descpb.IndexDescriptor, name string,
Could you either rename name to something more descriptive (e.g. statementTimeTableName), or at least
add a comment to that effect.
pkg/ccl/changefeedccl/changefeed_stmt.go, line 203 at r2 (raw file):
if table, isTable := desc.(catalog.TableDescriptor); isTable { _, qualified := opts[changefeedbase.OptFullTableName] name, err := getMaybeQualifiedTableName(ctx, table, *p.ExecCfg(), p.Txn(), qualified)
nit: would something like getChangefeedTargetName be a better name?
pkg/ccl/changefeedccl/changefeed_stmt.go, line 687 at r2 (raw file):
// or view represented by the provided descriptor. It is a sort of // reverse of the Resolve() functions.
nit: I would drop "it's sort of ..." part
pkg/ccl/changefeedccl/changefeed_stmt.go, line 717 at r2 (raw file):
) (string, error) { if qualified { tbl, err := getQualifiedTableName(ctx, execCfg, txn, desc)
Perhaps change getQualifiedTableName signature to return "string, error" so that you can make this if block
return getQualifiedTableName(....)
pkg/ccl/changefeedccl/changefeed_test.go, line 214 at r1 (raw file):
} //TODO(zinger): No reason in principle this shouldn't be honored here //t.Run(`sinkless`, sinklessTest(testFn))
why doesn't this work?
nit: Also, if possible phrase todos as action items.
pkg/ccl/changefeedccl/encoder.go, line 277 at r1 (raw file):
type confluentAvroEncoder struct { registryURL string updatedField, beforeField, keyOnly, fullTableName bool
nit: maybe rename to useFullTableName?
pkg/ccl/changefeedccl/encoder.go, line 329 at r1 (raw file):
changefeedbase.OptKeyInValue, changefeedbase.OptFormat, changefeedbase.OptFormatAvro) } _, e.fullTableName = opts[changefeedbase.OptFullTableName]
I'm having hard time finding the uses of this boolean field. Shouldn't we pass a different name
when calling indexToAvroSchema if this option is true (in the EncodeKey)?
pkg/ccl/changefeedccl/encoder_test.go, line 444 at r2 (raw file):
} //t.Run(`sinkless`, sinklessTest(testFn))
drop comment?
pkg/ccl/changefeedccl/encoder_test.go, line 445 at r2 (raw file):
//t.Run(`sinkless`, sinklessTest(testFn)) t.Run(`enterprise`, enterpriseTest(testFn))
nice test.
pkg/ccl/changefeedccl/sink.go, line 334 at r2 (raw file):
} func (s *kafkaSink) setTargets(targets jobspb.ChangefeedTargets) {
nit: perhaps setTargetNames?
On the other hand: the sql sink initializes similar things directly inside makeSqlSink.
I wonder if it's better to do the same thing here. If anything, this may seem like it's
okay to call setTargets multiple times, which probably isn't true.
|
pkg/ccl/changefeedccl/changefeed_stmt.go, line 717 at r2 (raw file): Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Done. |
|
pkg/ccl/changefeedccl/changefeed_test.go, line 214 at r1 (raw file): Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Done. |
|
pkg/ccl/changefeedccl/encoder.go, line 329 at r1 (raw file): Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Thanks for this catch, I'd forgotten this part wasn't done (and doesn't get tested because we don't have real Kafka testing). Need to make sure locally that this actually works now. |
|
pkg/ccl/changefeedccl/encoder_test.go, line 444 at r2 (raw file): Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Done. |
|
pkg/ccl/changefeedccl/sink.go, line 334 at r2 (raw file): Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Yeah, this refactor is awkward and more for testing. This is doing a little more than setting names, the descriptor IDs in the keys are also used. |
HonoreDB
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @adityamaru, @HonoreDB, and @miretskiy)
pkg/ccl/changefeedccl/avro.go, line 392 at r1 (raw file):
Previously, miretskiy (Yevgeniy Miretskiy) wrote…
Could you either rename
nameto something more descriptive (e.g.statementTimeTableName), or at least
add a comment to that effect.
Done.
miretskiy
left a comment
There was a problem hiding this comment.
Reviewed 5 of 5 files at r3.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @adityamaru and @HonoreDB)
Release note: None
|
bors r=[miretskiy] |
|
Build succeeded: |
Previously, Kafka topics and top-level keys were always derived from the
table name in changefeeds. If the table name changed, the feed
eventually failed, and if the table name was non-unique across
databases, collisions were unavoidable. This PR adds a WITH
full_table_name option to changefeeds, and honors it by serializing
movr.public.drivers as the statement time name and relying on that.
There are probably more things that need to change downstream.
Release note (sql change): Added "WITH full_table_name" option to create
a changefeed on "movr.public.drivers" instead of
"drivers".