changefeedccl: fix violation of CDC's ordering guarantees #40696
craig[bot] merged 1 commit into cockroachdb:master
Conversation
ajwerner left a comment
This approach works, but I think there's a simpler one. Instead of making the sink aware of the initial high water mark, let's instead filter any rows which might precede that high water mark. If all rows emitted from the processor come after the initial high water mark, then we'll never create a file which precedes that high water mark using the existing naming rules. We know we don't need to emit those rows because we've already resolved a timestamp above them.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @nvanbenschoten)
Force-pushed from d0ecbf3 to 72c869b
Done.
Force-pushed from 72c869b to dd459ae
ajwerner left a comment
I wonder if we should be doing the filtering at a lower level like in the poller. Let's go forward with this for now and continue to investigate why the poller is emitting rows beneath the closed timestamp in the first place.
Reviewed 4 of 4 files at r1, 1 of 1 files at r2.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15 and @nvanbenschoten)
pkg/ccl/changefeedccl/changefeed.go, line 167 at r1 (raw file):
```go
var scratch bufalloc.ByteAllocator
emitRowFn := func(ctx context.Context, row encodeRow) error {
	// If timestamp of the row is older than the latest resolved timestamp
```
It's fun that this works in the equals case because the ASCII `.` is less than the ASCII `_`. The wording in the docs makes me think this needs to be the condition, but I would love it if we could make it `!initialHighWater.Less(row.updated)`. I haven't convinced myself that that's safe, so this is fine. Maybe add a TODO to consider filtering rows equal to the `initialHighWater`.
pkg/ccl/changefeedccl/changefeed_processors.go, line 124 at r1 (raw file):
```go
ctx = ca.StartInternal(ctx, changeAggregatorProcName)
initialHighWater := hlc.Timestamp{WallTime: -1}
```
does this code movement still need to happen?
Force-pushed from dd459ae to eb844ae
aayushshah15 left a comment
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner and @nvanbenschoten)
pkg/ccl/changefeedccl/changefeed.go, line 167 at r1 (raw file):
Previously, ajwerner wrote…
It's fun that this works in the equals case because the ASCII `.` is less than the ASCII `_`. The wording in the docs makes me think this needs to be the condition but I would love it if we could make it `!initialHighWater.Less(row.updated)`. I haven't convinced myself that that's safe so this is fine. Maybe add a TODO to consider filtering rows equal to the `initialHighWater`
Done.
pkg/ccl/changefeedccl/changefeed_processors.go, line 124 at r1 (raw file):
Previously, ajwerner wrote…
does this code movement still need to happen?
Fixed.
TFTR!
Offline, Nathan raised serious concerns about this approach. I think we originally understood the problem better than we do now. The problem indeed is about the ordering of files and seemingly has nothing to do with resolved timestamps. Are we certain that this fixes the nemesis? If so, isn't that surprising, given that there are no restarts in the above case? I'm convinced that sequence numbers out front and lexically monotonic files from a given change aggregator would be safe, but I'm confused about how this change fixes the ordering of files emitted from a single change aggregator. Our understanding may be flawed, especially given the test not flaking, but I'd really like to understand before we merge this.
It makes sense that the first version of your change fixes the above nemesis failure because it makes the files lexically monotonic for a given instance of the change aggregator. Unfortunately it won't be robust to restarts. Did you run this one with the nemesis? The funny thing here is that we understood the problem reasonably well and then convinced ourselves it was something different.
I did run the nemeses against all versions of this change, so my first reaction was maybe this issue isn't as reproducible as we thought. I switched to …
I find this reproduces almost instantly with …
Force-pushed from eb844ae to 3ac043c
As discussed offline, this was due to me misunderstanding how the testing library works. I can now reproduce the issue on master.
I updated the PR to implement the approach we briefly discussed. The updated PR description gives an overview of the idea. The new approach keeps track of … Additionally, it should be robust to job restarts (even in the absence of a synchronized clock). This is because the system guarantees that …
Force-pushed from 3ac043c to 58b19cb
@ajwerner pointed out that this approach can't work due to unsynchronized clocks between the … One potential way to solve this would be as follows: Have each …
ajwerner left a comment
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)
pkg/ccl/changefeedccl/changefeed_processors.go, line 147 at r13 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
as I mentioned in our meeting, I don't like this. It's so much more obvious to leave the span frontier at zero and then handle a zero frontier in the cloud storage sink
One idea that came up is that if we're going to deal with that logic then I'm supportive of hiding the fact that this minTS is coming from a spanFrontier at all behind an abstraction that can be a conduit for comments. Something like:
```go
type timestampLowerBoundOracle interface {
	inclusiveLowerBoundTS() hlc.Timestamp
}

type changeAggregatorLowerBoundOracle struct {
	sf                         *spanFrontier
	initialInclusiveLowerBound hlc.Timestamp
}

func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp {
	if frontier := o.sf.Frontier(); !frontier.IsEmpty() {
		return frontier.Next()
	}
	return o.initialInclusiveLowerBound
}
```
And then have `getSink` take a `timestampLowerBoundOracle`.
pkg/ccl/changefeedccl/poller.go, line 110 at r13 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
I don't get it. Maybe the comment needs to be expanded? Or maybe this Next would be more clear if it happens in a different place?
If I understood Aayush's explanation, the issue he uncovered is that when we perform a catch-up scan (not an initial scan) we could emit rows which were exactly equal to the previous resolved timestamp. I guess now that I've typed this I don't understand why that's problematic as it should have been okay to just drop those rows. Why did we need to make this change again?
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 262 at r14 (raw file):
```go
// we're back to the case where k = 2 with jobs P and Q. Thus, by induction we
// have the required proof.
```
If we're going to make this in the style of a godoc comment, then add `//` here to connect it to the `cloudStorageSink`.
pkg/jobs/jobspb/jobs.proto, line 209 at r13 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
Comment that this is never persisted to the jobs table. In fact, I'm almost leaning toward saying we should do the session_id stuff inside the `makeCloudStorageSink` constructor and keep it super tightly scoped. The sinks in a single changefeed wouldn't have the same session_id, but they didn't before and I don't really think we care. Then when we have a real session id from the jobs framework in 20.1, we can use that.
Seems reasonable to me.
pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
I added a better comment. I'll paste it here for discussion:
pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):
Previously, aayushshah15 (Aayush Shah) wrote…
Additionally, …
As discussed offline: the problem is with emitting row updates after an …
danhhz left a comment
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)
pkg/ccl/changefeedccl/changefeed_processors.go, line 147 at r13 (raw file):
Previously, ajwerner wrote…
One idea that came up is that if we're going to deal with that logic then I'm supportive of hiding the fact that this minTS is coming from a spanFrontier at all behind an abstraction that can be a conduit for comments. Something like:
```go
type timestampLowerBoundOracle interface {
	inclusiveLowerBoundTS() hlc.Timestamp
}

type changeAggregatorLowerBoundOracle struct {
	sf                         *spanFrontier
	initialInclusiveLowerBound hlc.Timestamp
}

func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp {
	if frontier := o.sf.Frontier(); !frontier.IsEmpty() {
		return frontier.Next()
	}
	return o.initialInclusiveLowerBound
}
```

And then have `getSink` take a `timestampLowerBoundOracle`
that wfm
pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):
Previously, aayushshah15 (Aayush Shah) wrote…
Additionally, `Next()`ing when adding the scan boundary might make certain other things a little harder to reason about, since we compare these boundaries with other timestamps in the main poller goroutine (see lines 258 to 303 in `poller.go`). So the reason I'm doing only this when calling `exportScansParallel` is because it seemed like a much less intrusive way of doing it. `exportScansParallel` only uses this timestamp to attach it to the row updates it streams out, and nothing else.
I still don't understand. This explanation seems too in the weeds. Can you back up and give it to me from 10,000 ft? I ask because this seems like a hack, and I'd like to understand what a more principled fix would look like before dismissing it as too invasive.
danhhz left a comment
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)
pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
I still don't understand. This explanation seems too in the weeds. Can you back up and give it to me from 10,000 ft? I ask because this seems like a hack and I'd like to understand what a more principled fix would look like before dismissing it as too invasive
Specifically, one thing I want to know is whether the other sinks are incorrect because of this. If they're fine, why can't we fix it inside the cloud storage sink?
pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
Yes, the other sinks would be incorrect because of this.
To elaborate: The poller uses `exportSpansParallel` to perform its initial scan of the table (when the changefeed starts) and to perform a backfill (for example: when a table that's being watched by the changefeed gets altered to contain a new column with a default value). The poller detects a schema change by keeping track of the corresponding table descriptor's last `ModificationTime`. When this changes, it adds the new `ModificationTime` to `scanBoundaries` to trigger a full scan of the underlying table.
The issue with this approach is that this `ModificationTime` timestamp is often (or always in my testing) the last resolved timestamp that was emitted by the poller. So what ends up happening is that the `changeAggregator` forwards its local frontier using this resolved timestamp, and then the `changeFrontier` sees all these backfill-related row updates at the same timestamp (which is the incorrect part).
danhhz left a comment
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)
pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):
Previously, aayushshah15 (Aayush Shah) wrote…
Yes, the other sinks would be incorrect because of this.
To elaborate:
The poller uses `exportSpansParallel` to perform its initial scan of the table (when the changefeed starts) and to perform a backfill (for example: when a table that's being watched by the changefeed gets altered to contain a new column with a default value). The poller detects a schema change by keeping track of the corresponding table descriptor's last `ModificationTime`. When this changes, it adds the new `ModificationTime` to `scanBoundaries` to trigger a full scan of the underlying table.
The issue with this approach is that this `ModificationTime` timestamp is often (or always in my testing) the last resolved timestamp that was emitted by the poller. So what ends up happening is that the `changeAggregator` forwards its local frontier using this resolved timestamp and then the `changeFrontier` sees all these backfill-related row updates at the same timestamp (which is the incorrect part).
We discussed this offline. This is a new bug in all changefeeds and will have to be backported (great find!), so I encouraged Aayush to pull it into a separate PR to make that easy and to get this cloud sink fix, which is already a huge change, merged earlier.
Also as a result of our discussion, I suspect that the fix is not advancing the scan timestamp, but rather investigating and eliminating the unexpected resolved timestamp. Aayush will investigate exactly what's happening after the cloud sink stuff is done.
Force-pushed from a074720 to 59db726
aayushshah15 left a comment
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)
pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
We discussed this offline. This is a new bug in all changefeeds and will have to be backported (great find!), so I encouraged Aayush to pull it into a separate PR to make that easy and to get this cloud sink fix, which is already a huge change, merged earlier.
Also as a result of our discussion, I suspect that the fix is not advancing the scan timestamp, but rather investigating and eliminating the unexpected resolved timestamp. Aayush will investigate exactly what's happening after the cloud sink stuff is done.
Created issue #41415 to track this.
pkg/ccl/changefeedccl/cdctest/nemeses.go, line 50 at r13 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
nit: we have AS OF SYSTEM TIME support, we'd just have to use it in `sinklessFeed` (which hasn't been done because `sinklessFeed` would itself have to track the frontier, which is annoying)
Ah good to know. I'll update the referenced issue.
Force-pushed from b1766a0 to 2293f4b
danhhz left a comment
I'll take a last pass after you get the tests deflaked, but this seems just about ready to go!
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)
pkg/ccl/changefeedccl/changefeed_processors.go, line 169 at r16 (raw file):
```go
// monotonic fashion.
sf := makeSpanFrontier(spans...)
timestampOracle := &changeAggregatorLowerBoundOracle{
	sf: sf, initialInclusiveLowerBound: ca.spec.Feed.StatementTime,
}
```
super nit: move this line below the for/range that does the watch forwarding
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 55 at r16 (raw file):
```go
type cloudStorageSinkFile struct {
	// `inclusiveLowerBoundTs` is the local spanFrontier's Frontier().Next() timestamp as of
```
this comment is now slightly out of date
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 280 at r16 (raw file):
```go
timestampOracle       timestampLowerBoundOracle
jobSessionID          string
inclusiveLowerBoundTs hlc.Timestamp
```
nit: maybe something like `dataFileTs`?
separate out the fields that are only used in EmitRow/Flush into a block with a comment at the top
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 307 at r16 (raw file):
```go
partitionFormat: defaultPartitionFormat,
timestampOracle: timestampOracle,
jobSessionID:    generateChangefeedSessionID(),
```
put a TODO here to use the job frameworks session id once it has one
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 364 at r16 (raw file):
```go
// careful to bound the size of the memory held by the pool.
inclusiveLowerBoundTs := s.inclusiveLowerBoundTs
file = &cloudStorageSinkFile{inclusiveLowerBoundTs: inclusiveLowerBoundTs}
```
this no longer needs to be kept on the file structs
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 422 at r16 (raw file):
```go
	delete(s.files, key)
}
s.inclusiveLowerBoundTs = s.timestampOracle.inclusiveLowerBoundTS()
```
short comment here about what's going on and a pointer back to the big comment. in fact, it might be clearer to store the formatted string version (and partition) on cloudStorageSink
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 442 at r16 (raw file):
```go
s.fileID++
// Pad file ID to maintain lexical ordering among files from the same sink.
// Pad SchemaID to maintain ordering across schema changes.
```
we're no longer padding schemaID
Force-pushed from 3a68ec7 to 640b25d
danhhz left a comment
great work! let's merge this and backport it to 19.2 \o/
Reviewable status: complete! 2 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 59 at r17 (raw file):
```go
// chosen to preserve CDC's external ordering guarantees. See comment on
// cloudStorageSink below for more details.
// Note that this timestamp is an inclusive lower bound on the timestamps
```
this field is no longer present
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 284 at r17 (raw file):
```go
files           map[cloudStorageSinkKey]*cloudStorageSinkFile
timestampOracle timestampLowerBoundOracle
// TODO(aayush): Use the jobs framework's session ID once that's available.
```
nit: this TODO is probably better placed at the site of the `generateChangefeedSessionID` call
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 454 at r17 (raw file):
```go
s.fileID++
// Pad file ID to maintain lexical ordering among files from the same sink.
// Pad SchemaID to maintain ordering across schema changes.
```
i think reviewable ate this comment, but schemaid is no longer padded
aayushshah15 left a comment
Reviewable status: complete! 2 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 59 at r17 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
this field is no longer present
oops, my apologies, I thought I removed the whole blob.
pkg/ccl/changefeedccl/sink_cloudstorage.go, line 454 at r17 (raw file):
Previously, danhhz (Daniel Harrison) wrote…
i think reviewable ate this comment, but schemaid is no longer padded
Huh, thought I did this too; not sure what happened. It's possible some of my changes were unstaged when I switched to master to test out the roachprod-stressrace thing.
In `cloudStorageSink`, we currently name output files based on the earliest timestamps inside those files. This can sometimes muddle the ordering of output files, leading to a violation of CDC's ordering guarantees.

This change fixes this violation by instead naming files using the least resolved timestamp as of the time of the last `Flush()` call on the sink object. If `Flush()` hasn't been called yet, we name files using the statement time.

Fixes: cockroachdb#38368

Release justification: Fixes known limitation of changefeeds when using cloud storage sinks.

Release note: None
Force-pushed from 640b25d to 6c84ae8
40696: changefeedccl: fix violation of CDC's ordering guarantees r=aayushshah15 a=aayushshah15

In cloudStorageSink, we currently name output files based on the earliest timestamps inside those files. This can sometimes muddle the ordering of output files, leading to a violation of CDC's ordering properties. This change fixes this violation by instead using the times of the last `EmitRow` and `EmitResolvedTimestamp` calls to order files. This ensures that timestamps within the output files are consumed in the same order that they're emitted in. Fixes: #38368. Release justification: Fixes limitation of changefeeds when using cloud storage sinks. Release note: None

41414: storage: treat non-voter replicas as suspect for replica GC r=ajwerner a=ajwerner

It is not clear that this is actually a problem, but it seems like a good thing to do. We can imagine cases where a replica is removed but never receives the message which removes it from the range. In this case we could wait up to 10 days before removing the range. The VOTER_INCOMING case seems like even less of a problem, but there's not an obvious reason to be conservative here. Release note: None

41419: cmd/roachtest: remove some duplication in ORM tests r=yuzefovich a=yuzefovich

We're running several ORM test suites, and each test contains the copy-pasted logic for parsing the test output and summarizing it. Now that logic is extracted into a couple of functions, which removes the duplication. Release note: None

41463: backup,import: remove sticky-bit conditional r=dt a=dt

Sticky-bit is always enabled in 20.1. Release note: none.

41472: importccl: add nullif option to DELIMITED r=spaskob a=spaskob

This is needed when an input has a field that needs to be treated as NULL. Example:

```
import table t022 (
  a int,
  b string
) DELIMITED DATA ('nodelocal:///foo.csv') with nullif = '';
```

This will force all empty string fields to be inserted as NULL in the table. Release note (sql): add nullif option to IMPORT ... DELIMITED.

41474: sql: use errors.Is instead of direct error compares r=pbardea a=knz

As suggested in the error handling RFC. Release justification: Fix to earlier fix for release. Release note: None

Co-authored-by: Aayush Shah <aayush.shah15@gmail.com>
Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Spas Bojanov <spas@cockroachlabs.com>
Co-authored-by: Raphael 'kena' Poss <knz@thaumogen.net>
Build succeeded
@aayushshah15 @danhhz we should backport this to …
Yes, absolutely |
In cloudStorageSink, we currently name output files based on the earliest timestamps inside those files. This can sometimes muddle the ordering of output files, leading to a violation of CDC's ordering properties.

This change fixes this violation by instead using the times of the last `EmitRow` and `EmitResolvedTimestamp` calls to order files. This ensures that timestamps within the output files are consumed in the same order that they're emitted in.

Fixes: #38368

Release justification: Fixes limitation of changefeeds when using cloud storage sinks.

Release note: None