changefeedccl: fix violation of CDC's ordering guarantees#40696

Merged
craig[bot] merged 1 commit into cockroachdb:master from aayushshah15:job_lease_sequence
Oct 9, 2019

Conversation

@aayushshah15
Contributor

@aayushshah15 aayushshah15 commented Sep 11, 2019

In cloudStorageSink, we currently name output files based on the earliest
timestamps inside those files. This can sometimes muddle the ordering
of output files, leading to a violation of CDC's ordering properties.

This change fixes this violation by instead using the times of the
last EmitRow and EmitResolvedTimestamp calls to order files. This ensures
that timestamps within the output files are consumed in the same order
that they're emitted in.

Fixes: #38368

Release justification: Fixes limitation of changefeeds when using cloud storage sinks.

Release note: None

@CLAassistant

CLAassistant commented Sep 11, 2019

CLA assistant check
All committers have signed the CLA.


Contributor

@ajwerner ajwerner left a comment


This approach works but I think there's a simpler approach that will work. Instead of making the sink aware of the initial high water mark, let's instead filter any rows which might precede that high water mark. If all rows emitted from the processor come after the initial high water mark then we'll never create a file which precedes that high water mark using the existing naming rules. We know we don't need to emit those rows because we've already resolved a timestamp above them.
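A minimal sketch of the suggested filter, with hypothetical names (`hlcTimestamp` stands in for CockroachDB's `hlc.Timestamp`, and a callback stands in for the real emit path); this is an illustration of the idea, not the actual changefeed code:

```go
package main

import "fmt"

// hlcTimestamp is a simplified stand-in for hlc.Timestamp.
type hlcTimestamp struct {
	WallTime int64
	Logical  int32
}

// Less reports whether t orders strictly before o.
func (t hlcTimestamp) Less(o hlcTimestamp) bool {
	return t.WallTime < o.WallTime ||
		(t.WallTime == o.WallTime && t.Logical < o.Logical)
}

// emitRowFiltered sketches the suggested fix: drop any row whose updated
// timestamp precedes the initial high-water mark, since a resolved
// timestamp at or above it has already been emitted. Returns whether the
// row was emitted.
func emitRowFiltered(initialHighWater, rowUpdated hlcTimestamp, emit func()) bool {
	if rowUpdated.Less(initialHighWater) {
		return false // already covered by a resolved timestamp; skip
	}
	emit()
	return true
}

func main() {
	hw := hlcTimestamp{WallTime: 100}
	emitted := 0
	emitRowFiltered(hw, hlcTimestamp{WallTime: 50}, func() { emitted++ })
	emitRowFiltered(hw, hlcTimestamp{WallTime: 150}, func() { emitted++ })
	fmt.Println(emitted) // only the row above the high-water mark is emitted
}
```

With this filter in place, no file named under the existing rules can carry a timestamp preceding the initial high-water mark.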

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @nvanbenschoten)

@aayushshah15
Contributor Author

This approach works but I think there's a simpler approach that will work. Instead of making the sink aware of the initial high water mark, let's instead filter any rows which might precede that high water mark. If all rows emitted from the processor come after the initial high water mark then we'll never create a file which precedes that high water mark using the existing naming rules. We know we don't need to emit those rows because we've already resolved a timestamp above them.

Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @ajwerner and @nvanbenschoten)

Done

Contributor

@ajwerner ajwerner left a comment


:lgtm:

I wonder if we should be doing the filtering at a lower level like in the poller. Let's go forward with this for now and continue to investigate why the poller is emitting rows beneath the closed timestamp in the first place.

Reviewed 4 of 4 files at r1, 1 of 1 files at r2.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15 and @nvanbenschoten)


pkg/ccl/changefeedccl/changefeed.go, line 167 at r1 (raw file):

	var scratch bufalloc.ByteAllocator
	emitRowFn := func(ctx context.Context, row encodeRow) error {
		// If timestamp of the row is older than the latest resolved timestamp

It's fun that this works in the equals case because ASCII '.' is less than ASCII '_'. The wording in the docs makes me think this needs to be the condition, but I would love it if we could make it !initialHighWater.Less(row.updated). I haven't convinced myself that that's safe, so this is fine. Maybe add a TODO to consider filtering rows equal to the initialHighWater


pkg/ccl/changefeedccl/changefeed_processors.go, line 124 at r1 (raw file):

	ctx = ca.StartInternal(ctx, changeAggregatorProcName)

	initialHighWater := hlc.Timestamp{WallTime: -1}

does this code movement still need to happen?

Contributor Author

@aayushshah15 aayushshah15 left a comment


Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on @ajwerner and @nvanbenschoten)


pkg/ccl/changefeedccl/changefeed.go, line 167 at r1 (raw file):

Previously, ajwerner wrote…

It's fun that this works in the equals case because ASCII '.' is less than ASCII '_'. The wording in the docs makes me think this needs to be the condition, but I would love it if we could make it !initialHighWater.Less(row.updated). I haven't convinced myself that that's safe, so this is fine. Maybe add a TODO to consider filtering rows equal to the initialHighWater

Done.


pkg/ccl/changefeedccl/changefeed_processors.go, line 124 at r1 (raw file):

Previously, ajwerner wrote…

does this code movement still need to happen?

Fixed.

@aayushshah15
Contributor Author

TFTR!

@ajwerner
Contributor

Offline, Nathan raised serious concerns about this approach. I think we originally understood the problem better than we do now. The problem is indeed about the ordering of files and seemingly has nothing to do with resolved timestamps.

Are we certain that this fixes the nemesis? If so, isn't that surprising, given that there are no restarts in the above case? I'm convinced that sequence numbers out front and lexically monotonic files from a given change aggregator would be safe, but I'm confused about how this change fixes the ordering of files emitted from a single change aggregator. Our understanding may be flawed, especially given that the test isn't flaking, but I'd really like to understand before we merge this.

@ajwerner
Contributor

It makes sense that the first version of your change fixes the above nemesis failure, because it makes the file names lexically monotonic for a given instance of the change aggregator. Unfortunately it won’t be robust to restarts. Did you run this one with the nemesis?

The funny thing here is that we understood the problem reasonably well and then convinced ourselves it was something different.

@aayushshah15
Contributor Author

I did run the nemeses against all versions of this change, so my first reaction was that maybe this issue isn't as reproducible as we thought. I switched to master and ran `make roachprod-stressrace PKG=./pkg/ccl/changefeedccl TESTS=TestChangefeedNemeses/cloudstorage CLUSTER=$USER-stress` for about an hour (500K runs) on a 20 node cluster and couldn't reproduce it.

@ajwerner
Contributor

ajwerner commented Sep 13, 2019

I find this reproduces almost instantly with roachprod-stress assuming you unskip the test:

--- a/pkg/ccl/changefeedccl/nemeses_test.go
+++ b/pkg/ccl/changefeedccl/nemeses_test.go
@@ -34,8 +34,5 @@ func TestChangefeedNemeses(t *testing.T) {
        }
        t.Run(`sinkless`, sinklessTest(testFn))
        t.Run(`enterprise`, enterpriseTest(testFn))
-       t.Run(`cloudstorage`, func(t *testing.T) {
-               t.Skip("https://github.com/cockroachdb/cockroach/issues/38368")
-               cloudStorageTest(testFn)
-       })
+       t.Run(`cloudstorage`, cloudStorageTest(testFn))
 }

@aayushshah15
Contributor Author

I find this reproduces almost instantly with roachprod-stress assuming you unskip the test:

--- a/pkg/ccl/changefeedccl/nemeses_test.go
+++ b/pkg/ccl/changefeedccl/nemeses_test.go
@@ -34,8 +34,5 @@ func TestChangefeedNemeses(t *testing.T) {
        }
        t.Run(`sinkless`, sinklessTest(testFn))
        t.Run(`enterprise`, enterpriseTest(testFn))
-       t.Run(`cloudstorage`, func(t *testing.T) {
-               t.Skip("https://github.com/cockroachdb/cockroach/issues/38368")
-               cloudStorageTest(testFn)
-       })
+       t.Run(`cloudstorage`, cloudStorageTest(testFn))
 }

As discussed offline, this was due to my misunderstanding of how the testing library works. I can now reproduce the issue on master.

@aayushshah15
Contributor Author

I updated the PR to implement the approach we briefly discussed. The updated PR description gives an overview of the idea.

The new approach keeps track of when EmitRow was called for each file and uses the timestamp of the last call to order files. This intuitively solves the date boundary issue we faced with previous approaches.

Additionally, it should be robust to job restarts (even in the absence of a synchronized clock). This is because the system guarantees that updated timestamps seen between two resolved timestamps will be resent in case of a failure. Thus, an inversion of our intended ordering here should not violate our user-facing guarantees.
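A toy sketch of the idea described above, with hypothetical names and an integer counter standing in for a real clock (not the actual sink implementation): files are ordered by the time of their last EmitRow call rather than by the earliest row timestamp they contain.

```go
package main

import (
	"fmt"
	"sort"
)

// file is a hypothetical stand-in for a cloud storage sink's buffered file.
type file struct {
	rows       []int64 // row "updated" timestamps, in emit order
	lastEmitTs int64   // tick of the most recent EmitRow call
}

// sink is a toy sink; clock is a counter standing in for a local clock.
type sink struct {
	clock int64
	files []*file
}

func (s *sink) emitRow(f *file, updated int64) {
	s.clock++ // stand-in for reading a monotonic local clock
	f.rows = append(f.rows, updated)
	f.lastEmitTs = s.clock
}

func main() {
	s := &sink{}
	a, b := &file{}, &file{}
	s.files = append(s.files, a, b)
	s.emitRow(a, 10)
	s.emitRow(b, 5) // b holds an earlier row timestamp, but was written later
	s.emitRow(b, 20)
	// Order files by last emit time, not by earliest contained timestamp.
	sort.Slice(s.files, func(i, j int) bool {
		return s.files[i].lastEmitTs < s.files[j].lastEmitTs
	})
	fmt.Println(s.files[0] == a) // a precedes b despite b containing ts 5
}
```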

@aayushshah15
Contributor Author

@ajwerner pointed out that this approach can't work due to unsynchronized clocks between the changeFrontier and changeAggregators. Because of this, %d.RESOLVED files emitted by the changeFrontier can lexicographically precede files emitted by the changeAggregators, which would violate our ordering guarantees. There is also an issue with high contention when using the Clock from flowCtx due to calling Now() for every row.

One potential way to solve this would be as follows: Have each changeAggregator maintain a separate local Clock. When the changeAggregators send their resolved timestamps up to the changeFrontier, they also send their local clock timestamp as of the emitEntries call. The changeFrontier uses this local clock timestamp that it receives from the changeAggregator to name its %d.RESOLVED files. This buys us monotonicity across all (updated, resolved) files emitted by the same changeAggregator. This is admittedly a confusing solution.

Contributor

@ajwerner ajwerner left a comment


Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/changefeed_processors.go, line 147 at r13 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

as I mentioned in our meeting, I don't like this. It's so much more obvious to leave the span frontier at zero and then handle a zero frontier in the cloud storage sink

One idea that came up is that if we're going to deal with that logic then I'm supportive of hiding the fact that this minTS is coming from a spanFrontier at all behind an abstraction that can be a conduit for comments. Something like:

type timestampLowerBoundOracle interface {
    inclusiveLowerBoundTS() hlc.Timestamp
}

type changeAggregatorLowerBoundOracle struct {
   sf *spanFrontier
   initialInclusiveLowerBound hlc.Timestamp
}

func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp {
    if frontier := o.sf.Frontier(); !frontier.IsEmpty() {
        return frontier.Next()
    } 
    return o.initialInclusiveLowerBound
}

And then have getSink take a timestampLowerBoundOracle


pkg/ccl/changefeedccl/poller.go, line 110 at r13 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I don't get it. Maybe the comment needs to be expanded? Or maybe this Next would be more clear if it happens in a different place?

If I understood Aayush's explanation, the issue he uncovered is that when we perform a catch-up scan (not an initial scan) we could emit rows which were exactly equal to the previous resolved timestamp. I guess now that I've typed this I don't understand why that's problematic as it should have been okay to just drop those rows. Why did we need to make this change again?


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 262 at r14 (raw file):

// we're back to the case where k = 2 with jobs P and Q. Thus, by induction we
// have the required proof.

If we're going to make this in the style of a godoc comment then add // here to connect it to the cloudStorageSink


pkg/jobs/jobspb/jobs.proto, line 209 at r13 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

comment that this is never persisted to the jobs table. in fact, i'm almost leaning toward saying we should do the session_id stuff inside the makeCloudStorageSink constructor and keep it super tightly scoped. the sinks in a single changefeed wouldn't have the same session_id, but they didn't before and I don't really think we care

then when we have a real session id from the jobs framework in 20.1, we can use that

Seems reasonable to me.

@aayushshah15
Contributor Author


pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Ditto I don't get this either, maybe the comment needs to be expanded? Also, it seems like doing this when the scanBoundary is added might be a better place for this

I added a better comment. I'll paste it here for discussion:

// This condition means this is a backfill scan. We want to set scanTime
// to the successor of the scan boundary because this boundary is set
// to the last modification timestamp of the altered table's table
// descriptor. This timestamp can be a resolved timestamp which could
// have already been forwarded to the `changeAggregator` (and consequently
// to the `changeFrontier`), and emitted as a globally resolved timestamp.
// Thus, emitting row updates with the scan boundary as is could cause
// a consumption bug (since the client is free to ignore all the timestamps
// less than or equal to a globally resolved timestamp).

@aayushshah15
Contributor Author


pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):

Previously, aayushshah15 (Aayush Shah) wrote…

I added a better comment. I'll paste it here for discussion:

// This condition means this is a backfill scan. We want to set scanTime
// to the successor of the scan boundary because this boundary is set
// to the last modification timestamp of the altered table's table
// descriptor. This timestamp can be a resolved timestamp which could
// have already been forwarded to the `changeAggregator` (and consequently
// to the `changeFrontier`), and emitted as a globally resolved timestamp.
// Thus, emitting row updates with the scan boundary as is could cause
// a consumption bug (since the client is free to ignore all the timestamps
// less than or equal to a globally resolved timestamp).

Additionally, Next()ing when adding the scan boundary might make certain other things a little harder to reason about, since we compare these boundaries with other timestamps in the main poller goroutine (see lines 258 to 303 in poller.go). So the reason I'm only doing this when calling exportScansParallel is that it seemed like a much less intrusive way of doing it. exportScansParallel only uses this timestamp to attach it to the row updates it streams out, and nothing else.

@aayushshah15
Contributor Author

aayushshah15 commented Oct 7, 2019

If I understood Aayush's explanation, the issue he uncovered is that when we perform a catch-up scan (not an initial scan) we could emit rows which were exactly equal to the previous resolved timestamp. I guess now that I've typed this I don't understand why that's problematic as it should have been okay to just drop those rows. Why did we need to make this change again?

As discussed offline: the problem is with emitting row updates after an ALTER TABLE ... WITH DEFAULT statement where an additional column is added with a default value. Please let me know if the updated comment I posted above still seems unclear.

Contributor

@danhhz danhhz left a comment


Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/changefeed_processors.go, line 147 at r13 (raw file):

Previously, ajwerner wrote…

One idea that came up is that if we're going to deal with that logic then I'm supportive of hiding the fact that this minTS is coming from a spanFrontier at all behind an abstraction that can be a conduit for comments. Something like:

type timestampLowerBoundOracle interface {
    inclusiveLowerBoundTS() hlc.Timestamp
}

type changeAggregatorLowerBoundOracle struct {
   sf *spanFrontier
   initialInclusiveLowerBound hlc.Timestamp
}

func (o *changeAggregatorLowerBoundOracle) inclusiveLowerBoundTS() hlc.Timestamp {
    if frontier := o.sf.Frontier(); !frontier.IsEmpty() {
        return frontier.Next()
    } 
    return o.initialInclusiveLowerBound
}

And then have getSink take a timestampLowerBoundOracle

that wfm


pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):

Previously, aayushshah15 (Aayush Shah) wrote…

Additionally, Next()ing when adding the scan boundary might make certain other things a little harder to reason about, since we compare these boundaries with other timestamps in the main poller goroutine (see lines 258 to 303 in poller.go). So the reason I'm only doing this when calling exportScansParallel is that it seemed like a much less intrusive way of doing it. exportScansParallel only uses this timestamp to attach it to the row updates it streams out, and nothing else.

I still don't understand. This explanation seems too in the weeds. Can you back up and give it to me from 10,000 ft? I ask because this seems like a hack and I'd like to understand what a more principled fix would look like before dismissing it as too invasive

Contributor

@danhhz danhhz left a comment


Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

I still don't understand. This explanation seems too in the weeds. Can you back up and give it to me from 10,000 ft? I ask because this seems like a hack and I'd like to understand what a more principled fix would look like before dismissing it as too invasive

Specifically, one thing I want to know is whether the other sinks are incorrect because of this. If they're fine, why can't we fix it inside the cloud storage sink?

@aayushshah15
Contributor Author


pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

Specifically, one thing I want to know is whether the other sinks are incorrect because of this. If they're fine, why can't we fix it inside the cloud storage sink?

Yes, the other sinks would be incorrect because of this.

To elaborate:

The poller uses exportSpansParallel to perform its initial scan of the table (when the changefeed starts) and to perform a backfill (for example, when a table that's being watched by the changefeed gets altered to contain a new column with a default value). The poller detects a schema change by keeping track of the corresponding table descriptor's last ModificationTime. When this changes, it adds the new ModificationTime to scanBoundaries to trigger a full scan of the underlying table.

The issue with this approach is that this ModificationTime timestamp is often (or always in my testing) the last resolved timestamp that was emitted by the poller. So what ends up happening is that the changeAggregator forwards its local frontier using this resolved timestamp and then the changeFrontier sees all these backfill-related row updates at the same timestamp (which is the incorrect part).
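A small sketch of why Next()ing the scan boundary helps, using a simplified stand-in for `hlc.Timestamp` (types and numbers hypothetical; this only illustrates the successor-timestamp idea from the comment quoted above):

```go
package main

import "fmt"

// ts is a minimal stand-in for hlc.Timestamp.
type ts struct {
	WallTime int64
	Logical  int32
}

// Next returns the immediate successor in (WallTime, Logical) order.
func (t ts) Next() ts { return ts{t.WallTime, t.Logical + 1} }

// LessEq reports whether t orders at or before o.
func (t ts) LessEq(o ts) bool {
	return t.WallTime < o.WallTime ||
		(t.WallTime == o.WallTime && t.Logical <= o.Logical)
}

func main() {
	// Hypothetical scenario: the table descriptor's ModificationTime equals
	// the last resolved timestamp the poller emitted.
	resolved := ts{WallTime: 100, Logical: 0}
	scanBoundary := resolved

	// Emitting backfill rows at scanBoundary itself would place them at or
	// below a globally resolved timestamp, which consumers may ignore.
	// Next()ing puts them strictly above it.
	scanTime := scanBoundary.Next()
	fmt.Println(scanTime.LessEq(resolved)) // false: strictly above now
}
```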

Contributor

@danhhz danhhz left a comment


Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):

Previously, aayushshah15 (Aayush Shah) wrote…

Yes, the other sinks would be incorrect because of this.

To elaborate:

The poller uses exportSpansParallel to perform its initial scan of the table (when the changefeed starts) and to perform a backfill (for example, when a table that's being watched by the changefeed gets altered to contain a new column with a default value). The poller detects a schema change by keeping track of the corresponding table descriptor's last ModificationTime. When this changes, it adds the new ModificationTime to scanBoundaries to trigger a full scan of the underlying table.

The issue with this approach is that this ModificationTime timestamp is often (or always in my testing) the last resolved timestamp that was emitted by the poller. So what ends up happening is that the changeAggregator forwards its local frontier using this resolved timestamp and then the changeFrontier sees all these backfill-related row updates at the same timestamp (which is the incorrect part).

We discussed this offline. This is a new bug in all changefeeds and will have to be backported (great find!), so I encouraged Aayush to pull it into a separate PR to make that easy and to get this cloud sink fix, which is already a huge change, merged earlier.

Also as a result of our discussion, I suspect that the fix is not advancing the scan timestamp, but rather investigating and eliminating the unexpected resolved timestamp. Aayush will investigate exactly what's happening after the cloud sink stuff is done.

Contributor Author

@aayushshah15 aayushshah15 left a comment


Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/poller.go, line 162 at r13 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

We discussed this offline. This is a new bug in all changefeeds and will have to be backported (great find!), so I encouraged Aayush to pull it into a separate PR to make that easy and to get this cloud sink fix, which is already a huge change, merged earlier.

Also as a result of our discussion, I suspect that the fix is not advancing the scan timestamp, but rather investigating and eliminating the unexpected resolved timestamp. Aayush will investigate exactly what's happening after the cloud sink stuff is done.

Created issue #41415 to track this.


pkg/ccl/changefeedccl/cdctest/nemeses.go, line 50 at r13 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

nit: we have AS OF SYSTEM TIME support, we'd just have to use it in sinklessFeed (which hasn't been done because sinklessFeed would itself have to track the frontier, which is annoying)

Ah good to know. I'll update the referenced issue.

@aayushshah15 aayushshah15 force-pushed the job_lease_sequence branch 4 times, most recently from b1766a0 to 2293f4b Compare October 8, 2019 22:01
Contributor

@danhhz danhhz left a comment


I'll take a last pass after you get the tests deflaked, but this seems just about ready to go!

Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/changefeed_processors.go, line 169 at r16 (raw file):

	// monotonic fashion.
	sf := makeSpanFrontier(spans...)
	timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf, initialInclusiveLowerBound: ca.spec.Feed.StatementTime}

super nit: move this line below the for/range that does the watch forwarding


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 55 at r16 (raw file):

type cloudStorageSinkFile struct {
	// `inclusiveLowerBoundTs` is the local spanFrontier's Frontier().Next() timestamp as of

this comment is now slightly out of date


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 280 at r16 (raw file):

	timestampOracle       timestampLowerBoundOracle
	jobSessionID          string
	inclusiveLowerBoundTs hlc.Timestamp

nit: maybe something like dataFileTs?

separate out the fields that are only used in EmitRow/Flush into a block with a comment at the top


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 307 at r16 (raw file):

		partitionFormat:   defaultPartitionFormat,
		timestampOracle:   timestampOracle,
		jobSessionID:      generateChangefeedSessionID(),

put a TODO here to use the job frameworks session id once it has one


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 364 at r16 (raw file):

		// careful to bound the size of the memory held by the pool.
		inclusiveLowerBoundTs := s.inclusiveLowerBoundTs
		file = &cloudStorageSinkFile{inclusiveLowerBoundTs: inclusiveLowerBoundTs}

this no longer needs to be kept on the file structs


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 422 at r16 (raw file):

		delete(s.files, key)
	}
	s.inclusiveLowerBoundTs = s.timestampOracle.inclusiveLowerBoundTS()

short comment here about what's going on and a pointer back to the big comment. in fact, it might be clearer to store the formatted string version (and partition) on cloudStorageSink


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 442 at r16 (raw file):

	s.fileID++
	// Pad file ID to maintain lexical ordering among files from the same sink.
	// Pad SchemaID to maintain ordering across schema changes.

we're no longer padding schemaID

@aayushshah15 aayushshah15 force-pushed the job_lease_sequence branch 3 times, most recently from 3a68ec7 to 640b25d Compare October 9, 2019 17:14
Contributor

@danhhz danhhz left a comment


:lgtm_strong: great work! let's merge this and backport it to 19.2 \o/

Reviewable status: :shipit: complete! 2 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 59 at r17 (raw file):

	// chosen to preserve CDC's external ordering guarantees. See comment on
	// cloudStorageSink below for more details.
	// Note that this timestamp is an inclusive lower bound on the timestamps

this field is no longer present


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 284 at r17 (raw file):

	files           map[cloudStorageSinkKey]*cloudStorageSinkFile
	timestampOracle timestampLowerBoundOracle
	// TODO(aayush): Use the jobs framework's session ID once that's available.

nit: this TODO is probably better placed at the site of the generateChangefeedSessionID call


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 454 at r17 (raw file):

	s.fileID++
	// Pad file ID to maintain lexical ordering among files from the same sink.
	// Pad SchemaID to maintain ordering across schema changes.

i think reviewable ate this comment, but schemaid is no longer padded

Contributor Author

@aayushshah15 aayushshah15 left a comment


Reviewable status: :shipit: complete! 2 of 0 LGTMs obtained (waiting on @aayushshah15, @ajwerner, @danhhz, and @nvanbenschoten)


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 59 at r17 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

this field is no longer present

oops, my apologies, I thought I removed the whole blob.


pkg/ccl/changefeedccl/sink_cloudstorage.go, line 454 at r17 (raw file):

Previously, danhhz (Daniel Harrison) wrote…

i think reviewable ate this comment, but schemaid is no longer padded

huh thought I did this too, not sure what happened, it's possible some of my changes were unstaged when I switched to master to test out the roachprod-stressrace thing.

cloudStorageSink

In cloudStorageSink, we currently name output files based on the earliest
timestamps inside those files. This can sometimes muddle the ordering
of output files, leading to a violation of CDC's ordering guarantees.

This change fixes this violation by instead naming files using the
least resolved timestamp as of the time of the last `Flush()` call on
the sink object. If `Flush()` hasn't been called yet, we name files
using the statement time.

Fixes: cockroachdb#38368

Release justification: Fixes known limitation of changefeeds when using
cloud storage sinks.

Release note: None
@aayushshah15
Contributor Author

thanks for spending so much time on this with me! @ajwerner and @danhhz

bors r+

craig bot pushed a commit that referenced this pull request Oct 9, 2019
40696: changefeedccl: fix violation of CDC's ordering guarantees r=aayushshah15 a=aayushshah15

In cloudStorageSink, we currently name output files based on the earliest
timestamps inside those files. This can sometimes muddle the ordering
of output files, leading to a violation of CDC's ordering properties.

This change fixes this violation by instead using the times of the 
last `EmitRow` and `EmitResolvedTimestamp` calls to order files. This ensures
that timestamps within the output files are consumed in the same order
that they're emitted in.

Fixes: #38368

Release justification: Fixes limitation of changefeeds when using cloud storage sinks.

Release note: None

41414: storage: treat non-voter replicas as suspect for replica GC r=ajwerner a=ajwerner

It is not clear that this is actually a problem but it seems like a good thing
to do. We can imagine cases where a replica is removed but never receives the
message which removes it from the range. In this case we could wait up to
10 days before removing the range. The VOTER_INCOMING case seems like even
less of a problem but there's not an obvious reason to be conservative here.

Release note: None

41419: cmd/roachtest: remove some duplication in ORM tests r=yuzefovich a=yuzefovich

We're running several ORM test suites, and each test contains the
copy-pasted logic for parsing the test output and summarizing it.
Now that logic is extracted into a couple of functions which
removes the duplication.

Release note: None

41463: backup,import: remove sticky-bit conditional r=dt a=dt

Sticky-bit is always enabled in 20.1.

Release note: none.

41472: importccl: add nullif option to DELIMITED r=spaskob a=spaskob

This is indeed when an input has a field that needs to
be treated as NULL. Example:
```
import table t022 (
a int, b string
) DELIMITED DATA ('nodelocal:///foo.csv') with nullif = '';
```
This will force all empty string fields to be inserted as
NULL in the table.

Release note (sql): add nullif option to IMPORT ... DELIMITED.

41474: sql: use errors.Is instead of direct error compares r=pbardea a=knz

As suggested in the error handling RFC.

Release justification: Fix to earlier fix for release.

Release note: None

Co-authored-by: Aayush Shah <aayush.shah15@gmail.com>
Co-authored-by: Andrew Werner <ajwerner@cockroachlabs.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: David Taylor <tinystatemachine@gmail.com>
Co-authored-by: Spas Bojanov <spas@cockroachlabs.com>
Co-authored-by: Raphael 'kena' Poss <knz@thaumogen.net>
@craig
Contributor

craig bot commented Oct 9, 2019

Build succeeded

@craig craig bot merged commit 6c84ae8 into cockroachdb:master Oct 9, 2019
@nvb
Contributor

nvb commented Oct 10, 2019

@aayushshah15 @danhhz we should backport this to release-19.2, right?

@danhhz
Contributor

danhhz commented Oct 10, 2019

Yes, absolutely
