cdc: Support Schema Changes with Backfill#30902
cdc: Support Schema Changes with Backfill#30902craig[bot] merged 1 commit intocockroachdb:masterfrom
Conversation
0aa5031 to
cfd45d0
Compare
danhhz
left a comment
There was a problem hiding this comment.
Looking great so far. I'm pleasantly surprised how clean a lot of this is.
I'm a bit confused about why poller is handling this complexity; I thought we'd discussed offline that buffer would be doing it. This mostly matters once we do #28669, which is going to open up the possibility of shutting down RangeFeeds when the sink is slow. This seems similar to me. A backfill scan would also cause the buffer to back up and whatever mechanism is monitoring it would be able to shut down the RangeFeed.
On the other hand, it does seem like the timestamp boundaries are simpler this way since you can adjust the nextHighWater (though that advantage goes away when we switch to RangeFeed).
Update: After thinking about this overnight, I'm more convinced that poller is the right place to put this right now. Once the buffer actually buffers (which means we don't need to do this adjust the high-water dance, can avoid throwing away data, and don't need to restart rangefeeds for schemachanges) we should revisit.
Reviewable status:
complete! 0 of 0 LGTMs obtained
pkg/ccl/changefeedccl/buffer.go, line 25 at r1 (raw file):
// If the value's timestamp is greater than the minimum, the schema at the // value timestamp should be used instead. minSchemaTimestamp hlc.Timestamp
why is this a greater/lesser relationship? can we have it be an override when not the zero value or am i missing something?
pkg/ccl/changefeedccl/changefeed_test.go, line 433 at r1 (raw file):
// Test schema changes that require a backfill when the backfill option is // allowed. func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
love the tests! 👏
pkg/ccl/changefeedccl/changefeed_test.go, line 499 at r1 (raw file):
}) }) }
can we add a test that checks scanBoundaries queueing up many things (maybe by running many schema changes)
pkg/ccl/changefeedccl/poller.go, line 62 at r1 (raw file):
// the initial scan of the table when starting a new Changefeed, and when // a backfilling schema change is marked as completed. scanBoundaries []hlc.Timestamp
does this have to stay sorted? seems like the code below is assuming that
pkg/ccl/changefeedccl/poller.go, line 578 at r1 (raw file):
} if lastVersion.HasColumnBackfillMutation() && !desc.HasColumnBackfillMutation() { p.mu.scanBoundaries = append(p.mu.scanBoundaries, desc.GetModificationTime())
can you add a check that high-water is less than the boundary you're about to queue up? i think this means moving it inside your mu struct
pkg/ccl/changefeedccl/table_history_test.go, line 49 at r1 (raw file):
// advance require.NoError(t, m.IngestDescriptors(context.TODO(), ts(0), ts(1), nil))
nit: these are all noisy. make a ctx variable above. also context.Background() is appropriate in tests
|
Thanks for the prompt feedback! I've been a little sidetracked today looking at an unrelated flaky test issue; I should get this turned around by the end of today. |
cfd45d0 to
cda34ee
Compare
mrtracy
left a comment
There was a problem hiding this comment.
In terms of putting this in the poller: I think we're in agreement. In the poll/export world, I think this is obviously the best solution, as the tableHistory is polled on the same interval as the data itself, and we have an explicit maximum timestamp available when calling Export. I don't think it could get much cleaner.
In terms of rangefeeds, I think its going to be trickier than it appears at first glance to buffer their output and multiplexing it with a scan, but it is possible. I'm certain it can be done, but the new component stack at that point might not resemble the current buffer/poller combination, it might be a more considerable refactoring.
Reviewable status:
complete! 0 of 0 LGTMs obtained
pkg/ccl/changefeedccl/buffer.go, line 25 at r1 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
why is this a greater/lesser relationship? can we have it be an override when not the zero value or am i missing something?
I was thinking this might come up if we had a situation where a scan was returning keys that were created after the scan boundary, but that never happens anymore; i've changed it to a simple override, updated the field name and comment appropriately.
pkg/ccl/changefeedccl/changefeed_test.go, line 433 at r1 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
love the tests! 👏
Thanks! But really you wrote these a while ago, i just reorganized them a bit.
pkg/ccl/changefeedccl/changefeed_test.go, line 499 at r1 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
can we add a test that checks scanBoundaries queueing up many things (maybe by running many schema changes)
That's a good idea, but i'm going to have to construct a test that's a bit different than the others (I think i'll have to use a hook). I'm going ahead and publishing the rest of the changes you requested, I'll get this one out soon.
pkg/ccl/changefeedccl/poller.go, line 62 at r1 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
does this have to stay sorted? seems like the code below is assuming that
Yes, it gets sorted in the only place it is updated, but I will add that information to the comment.
pkg/ccl/changefeedccl/poller.go, line 578 at r1 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
can you add a check that high-water is less than the boundary you're about to queue up? i think this means moving it inside your
mu struct
Done.
pkg/ccl/changefeedccl/table_history_test.go, line 49 at r1 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
nit: these are all noisy. make a ctx variable above. also context.Background() is appropriate in tests
Done.
danhhz
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 1 of 0 LGTMs obtained
pkg/ccl/changefeedccl/buffer.go, line 23 at r2 (raw file):
resolved *jobspb.ResolvedSpan // Timestamp of the schema that should be used to read this KV. // If unset (default-valued), the value's timestamp will be used instead.
nit: I feel like I usually see "zero-valued" for this in our codebase
pkg/ccl/changefeedccl/changefeed.go, line 115 at r2 (raw file):
r.row.deleted = rf.RowIsDeleted() r.row.timestamp = kv.Value.Timestamp
i think we also have to use schemaTimestamp here if it's set. otherwise, you'll get two copies of a row with the same timestamp but with different data, which is super confusing. especially in the presence of job restarts, how would the user know which one to keep? intuitively, it makes some sense to me that the backfill would be emitted with the descriptor timestamp. (oh, but we'll have to make sure we use the kv timestamps for the initial scan. that will have to be special cased)
also, this definitely deserves a test. maybe something based on TestChangefeedTimestamps
pkg/ccl/changefeedccl/changefeed_test.go, line 433 at r1 (raw file):
Previously, mrtracy (Matt Tracy) wrote…
Thanks! But really you wrote these a while ago, i just reorganized them a bit.
I liked the way you reorganized them!
pkg/ccl/changefeedccl/changefeed_test.go, line 469 at r2 (raw file):
}) sqlDB.Exec(t, `ALTER TABLE add_col_comp ADD COLUMN c INT AS (a + 10) STORED`) sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (3)`)
should we look for this in the output or maybe remove it?
pkg/ccl/changefeedccl/poller.go, line 146 at r2 (raw file):
// Determine if we are at a scanBoundary, and trigger a full scan if needed. isFullScan := false
is isFullScan redundant with lastHighwater == nextHighwater?
pkg/ccl/changefeedccl/poller.go, line 165 at r2 (raw file):
p.mu.Unlock() if isFullScan {
is this backward?
pkg/ccl/changefeedccl/poller.go, line 227 at r2 (raw file):
if performScan { if err := p.exportSpansParallel( ctx, spans, lastHighwater, lastHighwater, true, /* fullScan */
it's a little strange that this pops p.mu.scanBoundaries and then uses lastHighwater. i understand that they should match, but maybe there's some way we can make it more explicit
pkg/ccl/changefeedccl/poller.go, line 272 at r2 (raw file):
switch t := e.GetValue().(type) { case *roachpb.RangeFeedValue: if err := p.tableHist.WaitForTS(ctx, t.Value.Timestamp); err != nil {
oof, this is not new in your PR, but this is making me notice that we backpressure raft on this. no way that cluster setting is even close to usable until we make buffer actually buffer and move all this stuff later in the changefeed flow
can you stick a TODO in here about it and reference #28669. also mention that we currently have to tear down and then set up all rangefeeds every time there is a backfill
pkg/ccl/changefeedccl/poller.go, line 328 at r2 (raw file):
} func (p *poller) getSpansToProcess(ctx context.Context) ([]roachpb.Span, error) {
nit: can we make this a function that takes spans and db instead of a method on poller?
pkg/ccl/changefeedccl/poller.go, line 419 at r2 (raw file):
// When outputting a full scan, we want to use the schema at the scan // timestamp, not the schema at the value timestamp. var minSchemaTimetamp hlc.Timestamp
s/minSchemaTimetamp/schemaTimestamp/ (remove "min" and correct typo in "timestamp")
pkg/ccl/changefeedccl/poller.go, line 580 at r2 (raw file):
} func (p *poller) validateTables(ctx context.Context, desc *sqlbase.TableDescriptor) error {
nit: odd that the name is plural but it takes one table
pkg/ccl/changefeedccl/poller.go, line 606 at r2 (raw file):
// To avoid race conditions with the lease manager, at this point we force // the manager to acquire the freshest descriptor of this table from the // store.
And the lease manager always returns the newest descriptor it has that matches the requested timestamp? Huh. TIL
cda34ee to
80a1715
Compare
mrtracy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 1 of 0 LGTMs obtained
pkg/ccl/changefeedccl/buffer.go, line 23 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
nit: I feel like I usually see "zero-valued" for this in our codebase
Done.
pkg/ccl/changefeedccl/changefeed.go, line 115 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
i think we also have to use schemaTimestamp here if it's set. otherwise, you'll get two copies of a row with the same timestamp but with different data, which is super confusing. especially in the presence of job restarts, how would the user know which one to keep? intuitively, it makes some sense to me that the backfill would be emitted with the descriptor timestamp. (oh, but we'll have to make sure we use the kv timestamps for the initial scan. that will have to be special cased)
also, this definitely deserves a test. maybe something based on TestChangefeedTimestamps
Why do we need to use the KV timestamps for the initial scan? It seems to me that the original timestamps don't matter in that case either; in fact, we want to use the current timestamp, as we need to pick up any backfilled schema changes that have occurred since the last modification of a row (in fact, I think this means our initial scans are currently wrong).
I've added a TODO here because this breaks a lot of the assumptions of our tests in subtle ways, it needs its own dedicated PR. Is that acceptable, or does it need to be swapped in the scope of this change?
pkg/ccl/changefeedccl/changefeed_test.go, line 499 at r1 (raw file):
Previously, mrtracy (Matt Tracy) wrote…
That's a good idea, but i'm going to have to construct a test that's a bit different than the others (I think i'll have to use a hook). I'm going ahead and publishing the rest of the changes you requested, I'll get this one out soon.
I've added this test, it seems pretty good.
pkg/ccl/changefeedccl/changefeed_test.go, line 469 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
should we look for this in the output or maybe remove it?
Just removed it.
pkg/ccl/changefeedccl/poller.go, line 146 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
is
isFullScanredundant withlastHighwater == nextHighwater?
Technically no, its based on detecting a scan boundary, after which nextHighwater is set to lastHightwater. However, it would be impossible to get into that situation otherwise.
This value gets passed to other functions that have different behavior for scans, I feel that its easier to read when its an explicit boolean like this.
pkg/ccl/changefeedccl/poller.go, line 165 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
is this backward?
Yes it is! Fixed, and good catch.
pkg/ccl/changefeedccl/poller.go, line 227 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
it's a little strange that this pops p.mu.scanBoundaries and then uses lastHighwater. i understand that they should match, but maybe there's some way we can make it more explicit
Instead of using the "perfomScan" boolean, I now use a "scanTime" hlc.Timestamp which gets set to the scan boundary if it exists.
pkg/ccl/changefeedccl/poller.go, line 272 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
oof, this is not new in your PR, but this is making me notice that we backpressure raft on this. no way that cluster setting is even close to usable until we make buffer actually buffer and move all this stuff later in the changefeed flow
can you stick a TODO in here about it and reference #28669. also mention that we currently have to tear down and then set up all rangefeeds every time there is a backfill
TODOs added.
pkg/ccl/changefeedccl/poller.go, line 328 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
nit: can we make this a function that takes spans and db instead of a method on poller?
Done.
pkg/ccl/changefeedccl/poller.go, line 419 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
s/minSchemaTimetamp/schemaTimestamp/ (remove "min" and correct typo in "timestamp")
Done.
80a1715 to
b6ea83c
Compare
mrtracy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 1 of 0 LGTMs obtained
pkg/ccl/changefeedccl/poller.go, line 580 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
nit: odd that the name is plural but it takes one table
Done.
pkg/ccl/changefeedccl/poller.go, line 606 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
And the lease manager always returns the newest descriptor it has that matches the requested timestamp? Huh. TIL
Yes indeed.
b6ea83c to
c72336d
Compare
danhhz
left a comment
There was a problem hiding this comment.
sorry to hold up the pr, but i think its worth hashing out the behavior of timestamps in scans first
Reviewable status:
complete! 1 of 0 LGTMs obtained
pkg/ccl/changefeedccl/changefeed.go, line 115 at r2 (raw file):
Previously, mrtracy (Matt Tracy) wrote…
Why do we need to use the KV timestamps for the initial scan? It seems to me that the original timestamps don't matter in that case either; in fact, we want to use the current timestamp, as we need to pick up any backfilled schema changes that have occurred since the last modification of a row (in fact, I think this means our initial scans are currently wrong).
I've added a TODO here because this breaks a lot of the assumptions of our tests in subtle ways, it needs its own dedicated PR. Is that acceptable, or does it need to be swapped in the scope of this change?
Let's slack/hangout and figure this out today. I have reservations about landing this PR as-is in case the followup ends up not making 2.1.0 for some reason
pkg/ccl/changefeedccl/changefeed_test.go, line 499 at r1 (raw file):
Previously, mrtracy (Matt Tracy) wrote…
I've added this test, it seems pretty good.
yeah, this turned out great. nice use of the hook and waitgroup
pkg/ccl/changefeedccl/poller.go, line 146 at r2 (raw file):
Previously, mrtracy (Matt Tracy) wrote…
Technically no, its based on detecting a scan boundary, after which nextHighwater is set to lastHightwater. However, it would be impossible to get into that situation otherwise.
This value gets passed to other functions that have different behavior for scans, I feel that its easier to read when its an explicit boolean like this.
👍
pkg/ccl/changefeedccl/poller.go, line 227 at r2 (raw file):
Previously, mrtracy (Matt Tracy) wrote…
Instead of using the "perfomScan" boolean, I now use a "scanTime" hlc.Timestamp which gets set to the scan boundary if it exists.
much better!
pkg/ccl/changefeedccl/poller.go, line 606 at r2 (raw file):
Previously, mrtracy (Matt Tracy) wrote…
Yes indeed.
let's stick that in the comment here, too
c72336d to
38ff199
Compare
mrtracy
left a comment
There was a problem hiding this comment.
@danhhz RFAL, I have made the emitted timestamp change and adjusted tests as per our offline discussion.
Reviewable status:
complete! 1 of 0 LGTMs obtained
pkg/ccl/changefeedccl/changefeed.go, line 115 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
Let's slack/hangout and figure this out today. I have reservations about landing this PR as-is in case the followup ends up not making 2.1.0 for some reason
Okay, as discussed in our slack, we have made this change because we expect its behavior is correct (and that the previous behavior of initial scans might have been incorrect).
I have modified two tests accordingly: TestValidators has had the "fingerprintValidator" removed until it can be repaired, that is tracked by issue #31110. TestChangefeedTimestamps has been modified to account for the new initial scan timestamps.
pkg/ccl/changefeedccl/helpers_test.go, line 532 at r3 (raw file):
Quoted 16 lines of code…
t testing.TB, sqlDB *sqlutils.SQLRunner, stmt string, arguments ...interface{}, ) { sqlDB.Exec(t, stmt, arguments...) row := sqlDB.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] ORDER BY created DESC LIMIT 1") var jobID string row.Scan(&jobID) testutils.SucceedsSoon(t, func() error { row := sqlDB.QueryRow(t, "SELECT status FROM [SHOW JOBS] WHERE job_id = $1", jobID) var status string row.Scan(&status) if status != "succeeded" { return fmt.Errorf("Job %s had status %s, wanted 'succeeded'", jobID, status) } return nil })
pkg/ccl/changefeedccl/poller.go, line 606 at r2 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
let's stick that in the comment here, too
Done.
danhhz
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 1 of 0 LGTMs obtained
pkg/ccl/changefeedccl/validations_test.go, line 73 at r4 (raw file):
var numResolved, rowsSinceResolved int // TODO(mrtracy): Re-enable Fingerprint validator.
normally, i'd ask that this have the pr that disabled it and some context so you don't have to git blame to dig up the context. but we're planning on fixing this RealSoonNow (tm), so don't eat a teamcity cycle for it
38ff199 to
333d404
Compare
mrtracy
left a comment
There was a problem hiding this comment.
Reviewable status:
complete! 1 of 0 LGTMs obtained
pkg/ccl/changefeedccl/validations_test.go, line 73 at r4 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
normally, i'd ask that this have the pr that disabled it and some context so you don't have to git blame to dig up the context. but we're planning on fixing this RealSoonNow (tm), so don't eat a teamcity cycle for it
I ate the roundtrip anyway, done.
|
bors r=danhhz |
Changefeeds can now correctly continue when watched tables are altered in ways that require a backfill operation. To represent a backfill through the changefeed, a full scan of the watched tables is performed at the timestamp when the table descriptor for the finalized schema change is detected (when the modified columns become public). Note that, due to the backfill process writing rows in the background, the changefeed will see numerous "no-op" writes, where each backfilled row is written with no apparent changes. However, this does not violate any correctness guarantees, it is just inefficient. Additionally, note that this commit changes the behavior of initial scans; previously, an initial scan would return the latest value for each key at the timestamp of the key. However, in order to capture schema changes that occurred in between the CHANGEFEED creation and the last write of each key, we now treat the initial scan as a backfill and output each key at the statement time. We have temporarily modified one test to allow this to be committed, re-enabling it is tracked by cockroachdb#31110. Release note: Changefeeds now continue running when watched tables are ALTERed in ways that require a backfill.
333d404 to
57c5450
Compare
Canceled |
|
bors r=danhhz |
30902: cdc: Support Schema Changes with Backfill r=danhhz a=mrtracy Changefeeds can now correctly continue when watched tables are altered in ways that require a backfill operation. To represent a backfill through the changefeed, a full scan of the watched tables is performed at the timestamp when the table descriptor for the finalized schema change is detected (when the modified columns become public). Note that, due to the backfill process writing rows in the background, the changefeed will see numerous "no-op" writes, where each backfilled row is written with no apparent changes. However, this does not violate any correctness guarantees, it is just inefficient. Release note: Changefeeds now continue running when watched tables are ALTERed in ways that require a backfill. Co-authored-by: Matt Tracy <matt@cockroachlabs.com>
Build succeeded |
Changefeeds can now correctly continue when watched tables are altered
in ways that require a backfill operation.
To represent a backfill through the changefeed, a full scan of
the watched tables is performed at the timestamp when the table
descriptor for the finalized schema change is detected (when the
modified columns become public).
Note that, due to the backfill process writing rows in the background,
the changefeed will see numerous "no-op" writes, where each backfilled
row is written with no apparent changes. However, this does not
violate any correctness guarantees, it is just inefficient.
Release note: Changefeeds now continue running when watched tables
are ALTERed in ways that require a backfill.