Skip to content

cdc: Support Schema Changes with Backfill#30902

Merged
craig[bot] merged 1 commit intocockroachdb:masterfrom
mrtracy:mtracy/cdc_backfill_support
Oct 9, 2018
Merged

cdc: Support Schema Changes with Backfill#30902
craig[bot] merged 1 commit intocockroachdb:masterfrom
mrtracy:mtracy/cdc_backfill_support

Conversation

@mrtracy
Copy link
Copy Markdown
Contributor

@mrtracy mrtracy commented Oct 2, 2018

Changefeeds can now correctly continue when watched tables are altered
in ways that require a backfill operation.

To represent a backfill through the changefeed, a full scan of
the watched tables is performed at the timestamp when the table
descriptor for the finalized schema change is detected (when the
modified columns become public).

Note that, due to the backfill process writing rows in the background,
the changefeed will see numerous "no-op" writes, where each backfilled
row is written with no apparent changes. However, this does not
violate any correctness guarantees, it is just inefficient.

Release note: Changefeeds now continue running when watched tables
are ALTERed in ways that require a backfill.

@mrtracy mrtracy requested review from a team and danhhz October 2, 2018 21:45
@cockroach-teamcity
Copy link
Copy Markdown
Member

This change is Reviewable

@mrtracy mrtracy force-pushed the mtracy/cdc_backfill_support branch from 0aa5031 to cfd45d0 Compare October 2, 2018 21:58
Copy link
Copy Markdown
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking great so far. I'm pleasantly surprised how clean a lot of this is.

I'm a bit confused about why poller is handling this complexity; I thought we'd discussed offline that buffer would be doing it. This mostly matters once we do #28669, which is going to open up the possibility of shutting down RangeFeeds when the sink is slow. This seems similar to me. A backfill scan would also cause the buffer to back up and whatever mechanism is monitoring it would be able to shut down the RangeFeed.

On the other hand, it does seem like the timestamp boundaries are simpler this way since you can adjust the nextHighWater (though that advantage goes away when we switch to RangeFeed).

Update: After thinking about this overnight, I'm more convinced that poller is the right place to put this right now. Once the buffer actually buffers (which means we don't need to do this adjust the high-water dance, can avoid throwing away data, and don't need to restart rangefeeds for schemachanges) we should revisit.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained


pkg/ccl/changefeedccl/buffer.go, line 25 at r1 (raw file):

	// If the value's timestamp is greater than the minimum, the schema at the
	// value timestamp should be used instead.
	minSchemaTimestamp hlc.Timestamp

why is this a greater/lesser relationship? can we have it be an override when not the zero value or am i missing something?


pkg/ccl/changefeedccl/changefeed_test.go, line 433 at r1 (raw file):

// Test schema changes that require a backfill when the backfill option is
// allowed.
func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {

love the tests! 👏


pkg/ccl/changefeedccl/changefeed_test.go, line 499 at r1 (raw file):

			})
		})
	}

can we add a test that checks scanBoundaries queueing up many things (maybe by running many schema changes)


pkg/ccl/changefeedccl/poller.go, line 62 at r1 (raw file):

		// the initial scan of the table when starting a new Changefeed, and when
		// a backfilling schema change is marked as completed.
		scanBoundaries []hlc.Timestamp

does this have to stay sorted? seems like the code below is assuming that


pkg/ccl/changefeedccl/poller.go, line 578 at r1 (raw file):

		}
		if lastVersion.HasColumnBackfillMutation() && !desc.HasColumnBackfillMutation() {
			p.mu.scanBoundaries = append(p.mu.scanBoundaries, desc.GetModificationTime())

can you add a check that high-water is less than the boundary you're about to queue up? i think this means moving it inside your mu struct


pkg/ccl/changefeedccl/table_history_test.go, line 49 at r1 (raw file):

	// advance
	require.NoError(t, m.IngestDescriptors(context.TODO(), ts(0), ts(1), nil))

nit: these are all noisy. make a ctx variable above. also context.Background() is appropriate in tests

@mrtracy
Copy link
Copy Markdown
Contributor Author

mrtracy commented Oct 3, 2018

Thanks for the prompt feedback! I've been a little sidetracked today looking at an unrelated flaky test issue; I should get this turned around by the end of today.

@mrtracy mrtracy force-pushed the mtracy/cdc_backfill_support branch from cfd45d0 to cda34ee Compare October 3, 2018 21:56
Copy link
Copy Markdown
Contributor Author

@mrtracy mrtracy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of putting this in the poller: I think we're in agreement. In the poll/export world, I think this is obviously the best solution, as the tableHistory is polled on the same interval as the data itself, and we have an explicit maximum timestamp available when calling Export. I don't think it could get much cleaner.

In terms of rangefeeds, I think its going to be trickier than it appears at first glance to buffer their output and multiplexing it with a scan, but it is possible. I'm certain it can be done, but the new component stack at that point might not resemble the current buffer/poller combination, it might be a more considerable refactoring.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained


pkg/ccl/changefeedccl/buffer.go, line 25 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

why is this a greater/lesser relationship? can we have it be an override when not the zero value or am i missing something?

I was thinking this might come up if we had a situation where a scan was returning keys that were created after the scan boundary, but that never happens anymore; i've changed it to a simple override, updated the field name and comment appropriately.


pkg/ccl/changefeedccl/changefeed_test.go, line 433 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

love the tests! 👏

Thanks! But really you wrote these a while ago, i just reorganized them a bit.


pkg/ccl/changefeedccl/changefeed_test.go, line 499 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

can we add a test that checks scanBoundaries queueing up many things (maybe by running many schema changes)

That's a good idea, but i'm going to have to construct a test that's a bit different than the others (I think i'll have to use a hook). I'm going ahead and publishing the rest of the changes you requested, I'll get this one out soon.


pkg/ccl/changefeedccl/poller.go, line 62 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

does this have to stay sorted? seems like the code below is assuming that

Yes, it gets sorted in the only place it is updated, but I will add that information to the comment.


pkg/ccl/changefeedccl/poller.go, line 578 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

can you add a check that high-water is less than the boundary you're about to queue up? i think this means moving it inside your mu struct

Done.


pkg/ccl/changefeedccl/table_history_test.go, line 49 at r1 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: these are all noisy. make a ctx variable above. also context.Background() is appropriate in tests

Done.

Copy link
Copy Markdown
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:lgtm_strong:

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained


pkg/ccl/changefeedccl/buffer.go, line 23 at r2 (raw file):

	resolved *jobspb.ResolvedSpan
	// Timestamp of the schema that should be used to read this KV.
	// If unset (default-valued), the value's timestamp will be used instead.

nit: I feel like I usually see "zero-valued" for this in our codebase


pkg/ccl/changefeedccl/changefeed.go, line 115 at r2 (raw file):

			r.row.deleted = rf.RowIsDeleted()
			r.row.timestamp = kv.Value.Timestamp

i think we also have to use schemaTimestamp here if it's set. otherwise, you'll get two copies of a row with the same timestamp but with different data, which is super confusing. especially in the presence of job restarts, how would the user know which one to keep? intuitively, it makes some sense to me that the backfill would be emitted with the descriptor timestamp. (oh, but we'll have to make sure we use the kv timestamps for the initial scan. that will have to be special cased)

also, this definitely deserves a test. maybe something based on TestChangefeedTimestamps


pkg/ccl/changefeedccl/changefeed_test.go, line 433 at r1 (raw file):

Previously, mrtracy (Matt Tracy) wrote…

Thanks! But really you wrote these a while ago, i just reorganized them a bit.

I liked the way you reorganized them!


pkg/ccl/changefeedccl/changefeed_test.go, line 469 at r2 (raw file):

			})
			sqlDB.Exec(t, `ALTER TABLE add_col_comp ADD COLUMN c INT AS (a + 10) STORED`)
			sqlDB.Exec(t, `INSERT INTO add_col_comp (a) VALUES (3)`)

should we look for this in the output or maybe remove it?


pkg/ccl/changefeedccl/poller.go, line 146 at r2 (raw file):

		// Determine if we are at a scanBoundary, and trigger a full scan if needed.
		isFullScan := false

is isFullScan redundant with lastHighwater == nextHighwater?


pkg/ccl/changefeedccl/poller.go, line 165 at r2 (raw file):

		p.mu.Unlock()

		if isFullScan {

is this backward?


pkg/ccl/changefeedccl/poller.go, line 227 at r2 (raw file):

		if performScan {
			if err := p.exportSpansParallel(
				ctx, spans, lastHighwater, lastHighwater, true, /* fullScan */

it's a little strange that this pops p.mu.scanBoundaries and then uses lastHighwater. i understand that they should match, but maybe there's some way we can make it more explicit


pkg/ccl/changefeedccl/poller.go, line 272 at r2 (raw file):

					switch t := e.GetValue().(type) {
					case *roachpb.RangeFeedValue:
						if err := p.tableHist.WaitForTS(ctx, t.Value.Timestamp); err != nil {

oof, this is not new in your PR, but this is making me notice that we backpressure raft on this. no way that cluster setting is even close to usable until we make buffer actually buffer and move all this stuff later in the changefeed flow

can you stick a TODO in here about it and reference #28669. also mention that we currently have to tear down and then set up all rangefeeds every time there is a backfill


pkg/ccl/changefeedccl/poller.go, line 328 at r2 (raw file):

}

func (p *poller) getSpansToProcess(ctx context.Context) ([]roachpb.Span, error) {

nit: can we make this a function that takes spans and db instead of a method on poller?


pkg/ccl/changefeedccl/poller.go, line 419 at r2 (raw file):

			// When outputting a full scan, we want to use the schema at the scan
			// timestamp, not the schema at the value timestamp.
			var minSchemaTimetamp hlc.Timestamp

s/minSchemaTimetamp/schemaTimestamp/ (remove "min" and correct typo in "timestamp")


pkg/ccl/changefeedccl/poller.go, line 580 at r2 (raw file):

}

func (p *poller) validateTables(ctx context.Context, desc *sqlbase.TableDescriptor) error {

nit: odd that the name is plural but it takes one table


pkg/ccl/changefeedccl/poller.go, line 606 at r2 (raw file):

			// To avoid race conditions with the lease manager, at this point we force
			// the manager to acquire the freshest descriptor of this table from the
			// store.

And the lease manager always returns the newest descriptor it has that matches the requested timestamp? Huh. TIL

@mrtracy mrtracy force-pushed the mtracy/cdc_backfill_support branch from cda34ee to 80a1715 Compare October 4, 2018 21:54
Copy link
Copy Markdown
Contributor Author

@mrtracy mrtracy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained


pkg/ccl/changefeedccl/buffer.go, line 23 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: I feel like I usually see "zero-valued" for this in our codebase

Done.


pkg/ccl/changefeedccl/changefeed.go, line 115 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

i think we also have to use schemaTimestamp here if it's set. otherwise, you'll get two copies of a row with the same timestamp but with different data, which is super confusing. especially in the presence of job restarts, how would the user know which one to keep? intuitively, it makes some sense to me that the backfill would be emitted with the descriptor timestamp. (oh, but we'll have to make sure we use the kv timestamps for the initial scan. that will have to be special cased)

also, this definitely deserves a test. maybe something based on TestChangefeedTimestamps

Why do we need to use the KV timestamps for the initial scan? It seems to me that the original timestamps don't matter in that case either; in fact, we want to use the current timestamp, as we need to pick up any backfilled schema changes that have occurred since the last modification of a row (in fact, I think this means our initial scans are currently wrong).

I've added a TODO here because this breaks a lot of the assumptions of our tests in subtle ways, it needs its own dedicated PR. Is that acceptable, or does it need to be swapped in the scope of this change?


pkg/ccl/changefeedccl/changefeed_test.go, line 499 at r1 (raw file):

Previously, mrtracy (Matt Tracy) wrote…

That's a good idea, but i'm going to have to construct a test that's a bit different than the others (I think i'll have to use a hook). I'm going ahead and publishing the rest of the changes you requested, I'll get this one out soon.

I've added this test, it seems pretty good.


pkg/ccl/changefeedccl/changefeed_test.go, line 469 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

should we look for this in the output or maybe remove it?

Just removed it.


pkg/ccl/changefeedccl/poller.go, line 146 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

is isFullScan redundant with lastHighwater == nextHighwater?

Technically no, its based on detecting a scan boundary, after which nextHighwater is set to lastHightwater. However, it would be impossible to get into that situation otherwise.

This value gets passed to other functions that have different behavior for scans, I feel that its easier to read when its an explicit boolean like this.


pkg/ccl/changefeedccl/poller.go, line 165 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

is this backward?

Yes it is! Fixed, and good catch.


pkg/ccl/changefeedccl/poller.go, line 227 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

it's a little strange that this pops p.mu.scanBoundaries and then uses lastHighwater. i understand that they should match, but maybe there's some way we can make it more explicit

Instead of using the "perfomScan" boolean, I now use a "scanTime" hlc.Timestamp which gets set to the scan boundary if it exists.


pkg/ccl/changefeedccl/poller.go, line 272 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

oof, this is not new in your PR, but this is making me notice that we backpressure raft on this. no way that cluster setting is even close to usable until we make buffer actually buffer and move all this stuff later in the changefeed flow

can you stick a TODO in here about it and reference #28669. also mention that we currently have to tear down and then set up all rangefeeds every time there is a backfill

TODOs added.


pkg/ccl/changefeedccl/poller.go, line 328 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: can we make this a function that takes spans and db instead of a method on poller?

Done.


pkg/ccl/changefeedccl/poller.go, line 419 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

s/minSchemaTimetamp/schemaTimestamp/ (remove "min" and correct typo in "timestamp")

Done.

@mrtracy mrtracy force-pushed the mtracy/cdc_backfill_support branch from 80a1715 to b6ea83c Compare October 4, 2018 21:57
Copy link
Copy Markdown
Contributor Author

@mrtracy mrtracy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained


pkg/ccl/changefeedccl/poller.go, line 580 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: odd that the name is plural but it takes one table

Done.


pkg/ccl/changefeedccl/poller.go, line 606 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

And the lease manager always returns the newest descriptor it has that matches the requested timestamp? Huh. TIL

Yes indeed.

@mrtracy mrtracy force-pushed the mtracy/cdc_backfill_support branch from b6ea83c to c72336d Compare October 6, 2018 07:09
Copy link
Copy Markdown
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry to hold up the pr, but i think its worth hashing out the behavior of timestamps in scans first

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained


pkg/ccl/changefeedccl/changefeed.go, line 115 at r2 (raw file):

Previously, mrtracy (Matt Tracy) wrote…

Why do we need to use the KV timestamps for the initial scan? It seems to me that the original timestamps don't matter in that case either; in fact, we want to use the current timestamp, as we need to pick up any backfilled schema changes that have occurred since the last modification of a row (in fact, I think this means our initial scans are currently wrong).

I've added a TODO here because this breaks a lot of the assumptions of our tests in subtle ways, it needs its own dedicated PR. Is that acceptable, or does it need to be swapped in the scope of this change?

Let's slack/hangout and figure this out today. I have reservations about landing this PR as-is in case the followup ends up not making 2.1.0 for some reason


pkg/ccl/changefeedccl/changefeed_test.go, line 499 at r1 (raw file):

Previously, mrtracy (Matt Tracy) wrote…

I've added this test, it seems pretty good.

yeah, this turned out great. nice use of the hook and waitgroup


pkg/ccl/changefeedccl/poller.go, line 146 at r2 (raw file):

Previously, mrtracy (Matt Tracy) wrote…

Technically no, its based on detecting a scan boundary, after which nextHighwater is set to lastHightwater. However, it would be impossible to get into that situation otherwise.

This value gets passed to other functions that have different behavior for scans, I feel that its easier to read when its an explicit boolean like this.

👍


pkg/ccl/changefeedccl/poller.go, line 227 at r2 (raw file):

Previously, mrtracy (Matt Tracy) wrote…

Instead of using the "perfomScan" boolean, I now use a "scanTime" hlc.Timestamp which gets set to the scan boundary if it exists.

much better!


pkg/ccl/changefeedccl/poller.go, line 606 at r2 (raw file):

Previously, mrtracy (Matt Tracy) wrote…

Yes indeed.

let's stick that in the comment here, too

Copy link
Copy Markdown
Contributor Author

@mrtracy mrtracy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danhhz RFAL, I have made the emitted timestamp change and adjusted tests as per our offline discussion.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained


pkg/ccl/changefeedccl/changefeed.go, line 115 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Let's slack/hangout and figure this out today. I have reservations about landing this PR as-is in case the followup ends up not making 2.1.0 for some reason

Okay, as discussed in our slack, we have made this change because we expect its behavior is correct (and that the previous behavior of initial scans might have been incorrect).

I have modified two tests accordingly: TestValidators has had the "fingerprintValidator" removed until it can be repaired, that is tracked by issue #31110. TestChangefeedTimestamps has been modified to account for the new initial scan timestamps.


pkg/ccl/changefeedccl/helpers_test.go, line 532 at r3 (raw file):

Quoted 16 lines of code…
	t testing.TB, sqlDB *sqlutils.SQLRunner, stmt string, arguments ...interface{},
) {
	sqlDB.Exec(t, stmt, arguments...)
	row := sqlDB.QueryRow(t, "SELECT job_id FROM [SHOW JOBS] ORDER BY created DESC LIMIT 1")
	var jobID string
	row.Scan(&jobID)
 
	testutils.SucceedsSoon(t, func() error {
		row := sqlDB.QueryRow(t, "SELECT status FROM [SHOW JOBS] WHERE job_id = $1", jobID)
		var status string
		row.Scan(&status)
		if status != "succeeded" {
			return fmt.Errorf("Job %s had status %s, wanted 'succeeded'", jobID, status)
		}
		return nil
	})

pkg/ccl/changefeedccl/poller.go, line 606 at r2 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

let's stick that in the comment here, too

Done.

Copy link
Copy Markdown
Contributor

@danhhz danhhz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained


pkg/ccl/changefeedccl/validations_test.go, line 73 at r4 (raw file):

			var numResolved, rowsSinceResolved int

			// TODO(mrtracy): Re-enable Fingerprint validator.

normally, i'd ask that this have the pr that disabled it and some context so you don't have to git blame to dig up the context. but we're planning on fixing this RealSoonNow (tm), so don't eat a teamcity cycle for it

@mrtracy mrtracy force-pushed the mtracy/cdc_backfill_support branch from 38ff199 to 333d404 Compare October 9, 2018 18:58
Copy link
Copy Markdown
Contributor Author

@mrtracy mrtracy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained


pkg/ccl/changefeedccl/validations_test.go, line 73 at r4 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

normally, i'd ask that this have the pr that disabled it and some context so you don't have to git blame to dig up the context. but we're planning on fixing this RealSoonNow (tm), so don't eat a teamcity cycle for it

I ate the roundtrip anyway, done.

@mrtracy
Copy link
Copy Markdown
Contributor Author

mrtracy commented Oct 9, 2018

bors r=danhhz

Changefeeds can now correctly continue when watched tables are altered
in ways that require a backfill operation.

To represent a backfill through the changefeed, a full scan of
the watched tables is performed at the timestamp when the table
descriptor for the finalized schema change is detected (when the
modified columns become public).

Note that, due to the backfill process writing rows in the background,
the changefeed will see numerous "no-op" writes, where each backfilled
row is written with no apparent changes. However, this does not
violate any correctness guarantees, it is just inefficient.

Additionally, note that this commit changes the behavior of initial
scans; previously, an initial scan would return the latest value for
each key at the timestamp of the key. However, in order to capture
schema changes that occurred in between the CHANGEFEED creation and the
last write of each key, we now treat the initial scan as a backfill and
output each key at the statement time. We have temporarily modified one
test to allow this to be committed, re-enabling it is tracked by cockroachdb#31110.

Release note: Changefeeds now continue running when watched tables
are ALTERed in ways that require a backfill.
@mrtracy mrtracy force-pushed the mtracy/cdc_backfill_support branch from 333d404 to 57c5450 Compare October 9, 2018 19:56
@craig
Copy link
Copy Markdown
Contributor

craig bot commented Oct 9, 2018

Canceled

@mrtracy
Copy link
Copy Markdown
Contributor Author

mrtracy commented Oct 9, 2018

bors r=danhhz

craig bot pushed a commit that referenced this pull request Oct 9, 2018
30902: cdc: Support Schema Changes with Backfill r=danhhz a=mrtracy

Changefeeds can now correctly continue when watched tables are altered
in ways that require a backfill operation.

To represent a backfill through the changefeed, a full scan of
the watched tables is performed at the timestamp when the table
descriptor for the finalized schema change is detected (when the
modified columns become public).

Note that, due to the backfill process writing rows in the background,
the changefeed will see numerous "no-op" writes, where each backfilled
row is written with no apparent changes. However, this does not
violate any correctness guarantees, it is just inefficient.

Release note: Changefeeds now continue running when watched tables
are ALTERed in ways that require a backfill.

Co-authored-by: Matt Tracy <matt@cockroachlabs.com>
@craig
Copy link
Copy Markdown
Contributor

craig bot commented Oct 9, 2018

Build succeeded

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants