
Commit 0eda540

samiskin authored and Yevgeniy Miretskiy committed
changefeedccl: checkpoint for lagging high-water
A changefeed's main method of persisting progress is the high-water mark: a timestamp that every tracked span has met or exceeded. This meant that if a small set of spans lagged behind the rest, for example due to nodes becoming transiently unavailable, and the changefeed was restarted, it would consider every span to be at that lagging timestamp and re-emit all the other spans. This was a pain point for high-QPS changefeeds, where restarting even 20 minutes into the past could result in millions of duplicated events being sent.

In addition, when a changefeed starts with a cursor, it performs a catchup scan. These catchup scans can be expensive if the cursor is sufficiently far in the past. Since the KV server limits the number of concurrent catchup scans, some spans will complete their catchup scan and begin emitting regular (rangefeed) events while others are still waiting to perform theirs. Any transient error at this time results in a restart, so a checkpoint covering the spans that were able to begin their rangefeeds is important: it allows the changefeed to make forward progress.

This change extends the per-span checkpointing currently used in backfills to also cover the situation where the high-water mark is sufficiently lagging behind the leading edge of the frontier. Once the high-water mark's delay exceeds the frontier_highwater_lag_checkpoint_threshold cluster setting, checkpoints are stored at the same frontier_checkpoint_frequency as backfills, recording both a set of spans and the minimum timestamp they have advanced to. On changefeed resumption, the frontier advances these spans to that timestamp.

Release note (performance improvement): per-span checkpointing added to cases when the high-water mark lags excessively behind the leading edge of the frontier, in order to avoid re-emitting the majority of spans due to a small minority that is having trouble progressing.

Release Justification: important fix to enable changefeeds to operate on very large tables when performing large catchup scans.
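For a concrete feel of the gating this commit adds, here is a minimal, self-contained Go sketch. It is not the CockroachDB implementation: the constants and signatures below are simplified stand-ins for the two cluster settings and the helpers the diff introduces (hasLaggingSpans, canCheckpointSpans), with time.Time standing in for hlc.Timestamp.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the cluster settings named in the commit message.
const (
	highwaterLagThreshold = 10 * time.Minute // frontier_highwater_lag_checkpoint_threshold
	checkpointFrequency   = 10 * time.Minute // frontier_checkpoint_frequency
)

// hasLaggingSpans reports whether the slowest span (the frontier) trails the
// fastest resolved span (the leading edge) by more than the lag threshold.
// A zero threshold disables lag-based checkpointing entirely.
func hasLaggingSpans(frontier, leadingEdge time.Time) bool {
	if highwaterLagThreshold == 0 {
		return false
	}
	return leadingEdge.Sub(frontier) > highwaterLagThreshold
}

// canCheckpointSpans throttles span checkpoints to the configured frequency.
func canCheckpointSpans(lastCheckpoint time.Time) bool {
	if checkpointFrequency == 0 {
		return false
	}
	return time.Since(lastCheckpoint) > checkpointFrequency
}

func main() {
	now := time.Now()
	frontier := now.Add(-30 * time.Minute)       // a few spans stuck half an hour back
	leadingEdge := now.Add(-5 * time.Second)     // most spans nearly caught up
	lastCheckpoint := now.Add(-15 * time.Minute) // last span checkpoint was overdue

	// Mirrors the commit's decision: checkpoint spans when lagging and due.
	if hasLaggingSpans(frontier, leadingEdge) && canCheckpointSpans(lastCheckpoint) {
		fmt.Println("persist span-level checkpoint")
	}
}
```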
1 parent c57e392 commit 0eda540

6 files changed

Lines changed: 292 additions & 59 deletions

File tree

pkg/ccl/changefeedccl/changefeed_processors.go

Lines changed: 92 additions & 49 deletions
```diff
@@ -224,15 +224,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 	ctx, ca.cancel = context.WithCancel(ctx)
 	ca.Ctx = ctx

-	initialHighWater, needsInitialScan := getKVFeedInitialParameters(ca.spec)
-
-	frontierHighWater := initialHighWater
-	if needsInitialScan {
-		// The frontier highwater marks the latest timestamp we don't need to emit
-		// spans for, and therefore should be 0 if an initial scan is needed
-		frontierHighWater = hlc.Timestamp{}
-	}
-	spans, err := ca.setupSpansAndFrontier(frontierHighWater)
+	spans, err := ca.setupSpansAndFrontier()

 	endTime := ca.spec.Feed.EndTime

```
```diff
@@ -291,7 +283,18 @@ func (ca *changeAggregator) Start(ctx context.Context) {

 	ca.sink = &errorWrapperSink{wrapped: ca.sink}

-	ca.eventProducer, err = ca.startKVFeed(ctx, spans, initialHighWater, needsInitialScan, endTime, ca.sliMetrics)
+	// If the initial scan was disabled the highwater would've already been forwarded
+	needsInitialScan := ca.frontier.Frontier().IsEmpty()
+
+	// The "HighWater" of the KVFeed is the timestamp it will begin streaming
+	// change events from. When there's an initial scan, we want the scan to cover
+	// data up to the StatementTime and change events to begin from that point.
+	kvFeedHighWater := ca.frontier.Frontier()
+	if needsInitialScan {
+		kvFeedHighWater = ca.spec.Feed.StatementTime
+	}
+
+	ca.eventProducer, err = ca.startKVFeed(ctx, spans, kvFeedHighWater, needsInitialScan, endTime, ca.sliMetrics)
 	if err != nil {
 		// Early abort in the case that there is an error creating the sink.
 		ca.MoveToDraining(err)
```
```diff
@@ -303,7 +306,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
 		ca.eventConsumer = newNativeKVConsumer(ca.sink)
 	} else {
 		ca.eventConsumer = newKVEventToRowConsumer(
-			ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), initialHighWater,
+			ctx, ca.flowCtx.Cfg, ca.frontier.SpanFrontier(), kvFeedHighWater,
 			ca.sink, ca.encoder, ca.spec.Feed, ca.knobs, ca.topicNamer)
 	}
 }
```
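The three hunks above collapse the old getKVFeedInitialParameters plumbing into one rule: an empty restored frontier means no progress was ever persisted, so an initial scan is needed and the KV feed starts at StatementTime; otherwise it resumes from the frontier. A condensed sketch of that rule, with time.Time standing in for hlc.Timestamp (a simplification, not the real API):

```go
package main

import (
	"fmt"
	"time"
)

// feedStart mirrors the logic in Start: an empty (zero) frontier timestamp
// means no resolved progress was restored, so an initial scan is required
// and streaming begins at the changefeed's statement time; otherwise the
// feed resumes from wherever the restored frontier left off.
func feedStart(frontier, statementTime time.Time) (start time.Time, needsInitialScan bool) {
	needsInitialScan = frontier.IsZero()
	if needsInitialScan {
		return statementTime, true
	}
	return frontier, false
}

func main() {
	stmtTime := time.Now()

	// Fresh changefeed: nothing restored, scan everything as of stmtTime.
	start, scan := feedStart(time.Time{}, stmtTime)
	fmt.Println(start, scan)

	// Resumed changefeed: pick up from the persisted high-water mark.
	start, scan = feedStart(stmtTime.Add(-20*time.Minute), stmtTime)
	fmt.Println(start, scan)
}
```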
```diff
@@ -428,16 +431,18 @@ func getKVFeedInitialParameters(
 }

 // setupSpans is called on start to extract the spans for this changefeed as a
-// slice and creates a span frontier with the initial resolved timestampsc. This
+// slice and creates a span frontier with the initial resolved timestamps. This
 // SpanFrontier only tracks the spans being watched on this node. There is a
 // different SpanFrontier elsewhere for the entire changefeed. This object is
 // used to filter out some previously emitted rows, and by the cloudStorageSink
 // to name its output files in lexicographically monotonic fashion.
-func (ca *changeAggregator) setupSpansAndFrontier(
-	initialHighWater hlc.Timestamp,
-) (spans []roachpb.Span, err error) {
+func (ca *changeAggregator) setupSpansAndFrontier() (spans []roachpb.Span, err error) {
+	var initialHighWater hlc.Timestamp
 	spans = make([]roachpb.Span, 0, len(ca.spec.Watches))
 	for _, watch := range ca.spec.Watches {
+		if initialHighWater.IsEmpty() || watch.InitialResolved.Less(initialHighWater) {
+			initialHighWater = watch.InitialResolved
+		}
 		spans = append(spans, watch.Span)
 	}

```
```diff
@@ -446,17 +451,20 @@ func (ca *changeAggregator) setupSpansAndFrontier(
 		return nil, err
 	}

+	checkpointedSpanTs := ca.spec.Checkpoint.Timestamp
+
+	// Checkpoint records from 21.2 were used only for backfills and did not store
+	// the timestamp, since in a backfill it must either be the StatementTime for
+	// an initial backfill, or right after the high-water for schema backfills.
+	if checkpointedSpanTs.IsEmpty() {
+		if initialHighWater.IsEmpty() {
+			checkpointedSpanTs = ca.spec.Feed.StatementTime
+		} else {
+			checkpointedSpanTs = initialHighWater.Next()
+		}
+	}
 	// Checkpointed spans are spans that were above the highwater mark, and we
 	// must preserve that information in the frontier for future checkpointing.
-	// If we don't have a highwater yet (during initial scan) they must at least
-	// be from StatementTime, and given an initial highwater they must all by
-	// definition have been at or after initialHighWater.Next()
-	var checkpointedSpanTs hlc.Timestamp
-	if initialHighWater.IsEmpty() {
-		checkpointedSpanTs = ca.spec.Feed.StatementTime
-	} else {
-		checkpointedSpanTs = initialHighWater.Next()
-	}
 	for _, checkpointedSpan := range ca.spec.Checkpoint.Spans {
 		if _, err := ca.frontier.Forward(checkpointedSpan, checkpointedSpanTs); err != nil {
 			return nil, err
```
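On resumption, the checkpointed spans are replayed into the frontier at the checkpoint timestamp, falling back to a reconstructed timestamp for 21.2-era records that omitted it. A rough self-contained sketch of that fallback, with a plain map standing in for the span frontier and time.Time for hlc.Timestamp (both simplifications):

```go
package main

import (
	"fmt"
	"time"
)

// checkpoint is a simplified stand-in for jobspb.ChangefeedProgress_Checkpoint.
type checkpoint struct {
	Spans     []string  // span keys, simplified to strings
	Timestamp time.Time // zero for records written by 21.2, which omitted it
}

// restore forwards each checkpointed span to the checkpoint timestamp,
// reconstructing a timestamp for legacy records exactly as the hunk above:
// StatementTime when there is no high-water yet, else just past the high-water.
func restore(cp checkpoint, highWater, statementTime time.Time, frontier map[string]time.Time) {
	ts := cp.Timestamp
	if ts.IsZero() {
		if highWater.IsZero() {
			ts = statementTime
		} else {
			ts = highWater.Add(time.Nanosecond) // stand-in for hlc.Timestamp.Next()
		}
	}
	for _, sp := range cp.Spans {
		frontier[sp] = ts // stand-in for frontier.Forward(span, ts)
	}
}

func main() {
	frontier := map[string]time.Time{}
	restore(checkpoint{Spans: []string{"/Table/52/1-/Table/52/2"}},
		time.Time{}, time.Now(), frontier)
	fmt.Println(frontier)
}
```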
```diff
@@ -578,12 +586,13 @@ func (ca *changeAggregator) noteResolvedSpan(resolved *jobspb.ResolvedSpan) error
 	checkpointFrontier := advanced &&
 		(forceFlush || timeutil.Since(ca.lastFlush) > ca.flushFrequency)

-	// If backfilling we must also consider the Backfill Checkpointing frequency
-	checkpointBackfill := ca.spec.JobID != 0 && /* enterprise changefeed */
-		resolved.Timestamp.Equal(ca.frontier.BackfillTS()) &&
-		canCheckpointBackfill(&ca.flowCtx.Cfg.Settings.SV, ca.lastFlush)
+	// At a lower frequency we checkpoint specific spans in the job progress
+	// either in backfills or if the highwater mark is excessively lagging behind
+	checkpointSpans := ca.spec.JobID != 0 && /* enterprise changefeed */
+		(resolved.Timestamp.Equal(ca.frontier.BackfillTS()) || ca.frontier.hasLaggingSpans(&ca.flowCtx.Cfg.Settings.SV)) &&
+		canCheckpointSpans(&ca.flowCtx.Cfg.Settings.SV, ca.lastFlush)

-	if checkpointFrontier || checkpointBackfill {
+	if checkpointFrontier || checkpointSpans {
 		defer func() {
 			ca.lastFlush = timeutil.Now()
 		}()
```
```diff
@@ -1178,16 +1187,16 @@ func newJobState(
 	}
 }

-func canCheckpointBackfill(sv *settings.Values, lastCheckpoint time.Time) bool {
+func canCheckpointSpans(sv *settings.Values, lastCheckpoint time.Time) bool {
 	freq := changefeedbase.FrontierCheckpointFrequency.Get(sv)
 	if freq == 0 {
 		return false
 	}
 	return timeutil.Since(lastCheckpoint) > freq
 }

-func (j *jobState) canCheckpointBackfill() bool {
-	return canCheckpointBackfill(&j.settings.SV, j.lastProgressUpdate)
+func (j *jobState) canCheckpointSpans() bool {
+	return canCheckpointSpans(&j.settings.SV, j.lastProgressUpdate)
 }

 // canCheckpointHighWatermark returns true if we should update job high water mark (i.e. progress).
```
```diff
@@ -1610,21 +1619,26 @@ func (cf *changeFrontier) maybeCheckpointJob(
 	// as we receive spans from the scan request at the Backfill Timestamp
 	inBackfill := !frontierChanged && resolvedSpan.Timestamp.Equal(cf.frontier.BackfillTS())

-	// During a backfill we store a checkpoint of completed scans at a throttled rate in the job record
-	updateCheckpoint := inBackfill && cf.js.canCheckpointBackfill()
-
-	var checkpoint jobspb.ChangefeedProgress_Checkpoint
-	if updateCheckpoint {
-		maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&cf.flowCtx.Cfg.Settings.SV)
-		checkpoint.Spans = cf.frontier.getCheckpointSpans(maxBytes)
-	}
-
 	// If we're not in a backfill, highwater progress and an empty checkpoint will
 	// be saved. This is throttled however we always persist progress to a schema
 	// boundary.
 	updateHighWater :=
 		!inBackfill && (cf.frontier.schemaChangeBoundaryReached() || cf.js.canCheckpointHighWatermark(frontierChanged))

+	// During backfills or when some problematic spans stop advancing, the
+	// highwater mark remains fixed while other spans may significantly outpace
+	// it, therefore to avoid losing that progress on changefeed resumption we
+	// also store as many of those leading spans as we can in the job progress
+	updateCheckpoint :=
+		(inBackfill || cf.frontier.hasLaggingSpans(&cf.js.settings.SV)) && cf.js.canCheckpointSpans()
+
+	// If the highwater has moved an empty checkpoint will be saved
+	var checkpoint jobspb.ChangefeedProgress_Checkpoint
+	if updateCheckpoint {
+		maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&cf.flowCtx.Cfg.Settings.SV)
+		checkpoint.Spans, checkpoint.Timestamp = cf.frontier.getCheckpointSpans(maxBytes)
+	}
+
 	if updateCheckpoint || updateHighWater {
 		checkpointStart := timeutil.Now()
 		if err := cf.checkpointJobProgress(cf.frontier.Frontier(), checkpoint); err != nil {
```
```diff
@@ -1960,21 +1974,40 @@ func (f *schemaChangeFrontier) SpanFrontier() *span.Frontier {
 }

 // getCheckpointSpans returns as many spans that should be checkpointed (are
-// above the highwater mark) as can fit in maxBytes.
-func (f *schemaChangeFrontier) getCheckpointSpans(maxBytes int64) (checkpoint []roachpb.Span) {
-	var used int64
+// above the highwater mark) as can fit in maxBytes, along with the earliest
+// timestamp of the checkpointed spans. A SpanGroup is used to merge adjacent
+// spans above the high-water mark.
+func (f *schemaChangeFrontier) getCheckpointSpans(
+	maxBytes int64,
+) (spans []roachpb.Span, timestamp hlc.Timestamp) {
 	frontier := f.frontierTimestamp()
+
+	// Collect leading spans into a SpanGroup to merge adjacent spans and store
+	// the lowest timestamp found
+	var checkpointSpanGroup roachpb.SpanGroup
+	checkpointFrontier := hlc.Timestamp{}
 	f.Entries(func(s roachpb.Span, ts hlc.Timestamp) span.OpResult {
 		if frontier.Less(ts) {
-			used += int64(len(s.Key)) + int64(len(s.EndKey))
-			if used > maxBytes {
-				return span.StopMatch
+			checkpointSpanGroup.Add(s)
+			if checkpointFrontier.IsEmpty() || ts.Less(checkpointFrontier) {
+				checkpointFrontier = ts
 			}
-			checkpoint = append(checkpoint, s)
 		}
 		return span.ContinueMatch
 	})
-	return checkpoint
+
+	// Ensure we only return up to maxBytes spans
+	var checkpointSpans []roachpb.Span
+	var used int64
+	for _, span := range checkpointSpanGroup.Slice() {
+		used += int64(len(span.Key)) + int64(len(span.EndKey))
+		if used > maxBytes {
+			break
+		}
+		checkpointSpans = append(checkpointSpans, span)
+	}
+
+	return checkpointSpans, checkpointFrontier
 }

 // BackfillTS returns the timestamp of the incoming spans for an ongoing
```
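The reworked getCheckpointSpans merges adjacent leading spans first (roachpb.SpanGroup does this in the real code) and only then applies the maxBytes cap, so the byte budget is spent on coalesced spans rather than fragments. A self-contained sketch of that two-phase shape over simplified string-keyed [Key, EndKey) spans:

```go
package main

import (
	"fmt"
	"sort"
)

// span is a simplified stand-in for roachpb.Span: the interval [Key, EndKey).
type span struct{ Key, EndKey string }

// mergeSpans sorts and coalesces adjacent or overlapping spans, the role
// roachpb.SpanGroup plays in the real getCheckpointSpans.
func mergeSpans(spans []span) []span {
	sort.Slice(spans, func(i, j int) bool { return spans[i].Key < spans[j].Key })
	var merged []span
	for _, s := range spans {
		if n := len(merged); n > 0 && s.Key <= merged[n-1].EndKey {
			// Adjacent or overlapping: extend the previous span.
			if merged[n-1].EndKey < s.EndKey {
				merged[n-1].EndKey = s.EndKey
			}
			continue
		}
		merged = append(merged, s)
	}
	return merged
}

// capSpans keeps merged spans until their key bytes exceed maxBytes, matching
// the trimming loop in the hunk above.
func capSpans(merged []span, maxBytes int64) []span {
	var out []span
	var used int64
	for _, s := range merged {
		used += int64(len(s.Key)) + int64(len(s.EndKey))
		if used > maxBytes {
			break
		}
		out = append(out, s)
	}
	return out
}

func main() {
	leading := []span{{"b", "c"}, {"a", "b"}, {"d", "e"}} // spans above the high-water mark
	fmt.Println(capSpans(mergeSpans(leading), 64))        // [{a c} {d e}]
}
```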
```diff
@@ -2010,3 +2043,13 @@ func (f *schemaChangeFrontier) schemaChangeBoundaryReached() (r bool) {
 		f.latestTs.Equal(f.boundaryTime) &&
 		f.boundaryType != jobspb.ResolvedSpan_NONE
 }
+
+// hasLaggingSpans returns true when the time between the earliest and latest
+// resolved spans has exceeded the configured HighwaterLagCheckpointThreshold
+func (f *schemaChangeFrontier) hasLaggingSpans(sv *settings.Values) bool {
+	lagThreshold := changefeedbase.FrontierHighwaterLagCheckpointThreshold.Get(sv)
+	if lagThreshold == 0 {
+		return false
+	}
+	return f.latestTs.GoTime().Sub(f.Frontier().GoTime()) > lagThreshold
+}
```
