@@ -224,15 +224,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
224224 ctx , ca .cancel = context .WithCancel (ctx )
225225 ca .Ctx = ctx
226226
227- initialHighWater , needsInitialScan := getKVFeedInitialParameters (ca .spec )
228-
229- frontierHighWater := initialHighWater
230- if needsInitialScan {
231- // The frontier highwater marks the latest timestamp we don't need to emit
232- // spans for, and therefore should be 0 if an initial scan is needed
233- frontierHighWater = hlc.Timestamp {}
234- }
235- spans , err := ca .setupSpansAndFrontier (frontierHighWater )
227+ spans , err := ca .setupSpansAndFrontier ()
236228
237229 endTime := ca .spec .Feed .EndTime
238230
@@ -291,7 +283,18 @@ func (ca *changeAggregator) Start(ctx context.Context) {
291283
292284 ca .sink = & errorWrapperSink {wrapped : ca .sink }
293285
294- ca .eventProducer , err = ca .startKVFeed (ctx , spans , initialHighWater , needsInitialScan , endTime , ca .sliMetrics )
286+ // If the initial scan was disabled, the highwater would've already been forwarded.
287+ needsInitialScan := ca .frontier .Frontier ().IsEmpty ()
288+
289+ // The "HighWater" of the KVFeed is the timestamp it will begin streaming
290+ // change events from. When there's an initial scan, we want the scan to cover
291+ // data up to the StatementTime and change events to begin from that point.
292+ kvFeedHighWater := ca .frontier .Frontier ()
293+ if needsInitialScan {
294+ kvFeedHighWater = ca .spec .Feed .StatementTime
295+ }
296+
297+ ca .eventProducer , err = ca .startKVFeed (ctx , spans , kvFeedHighWater , needsInitialScan , endTime , ca .sliMetrics )
295298 if err != nil {
296299 // Early abort in the case that there is an error creating the sink.
297300 ca .MoveToDraining (err )
@@ -303,7 +306,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
303306 ca .eventConsumer = newNativeKVConsumer (ca .sink )
304307 } else {
305308 ca .eventConsumer = newKVEventToRowConsumer (
306- ctx , ca .flowCtx .Cfg , ca .frontier .SpanFrontier (), initialHighWater ,
309+ ctx , ca .flowCtx .Cfg , ca .frontier .SpanFrontier (), kvFeedHighWater ,
307310 ca .sink , ca .encoder , ca .spec .Feed , ca .knobs , ca .topicNamer )
308311 }
309312}
@@ -428,16 +431,18 @@ func getKVFeedInitialParameters(
428431}
429432
430433// setupSpans is called on start to extract the spans for this changefeed as a
431- // slice and creates a span frontier with the initial resolved timestampsc . This
434+ // slice and creates a span frontier with the initial resolved timestamps . This
432435// SpanFrontier only tracks the spans being watched on this node. There is a
433436// different SpanFrontier elsewhere for the entire changefeed. This object is
434437// used to filter out some previously emitted rows, and by the cloudStorageSink
435438// to name its output files in lexicographically monotonic fashion.
436- func (ca * changeAggregator ) setupSpansAndFrontier (
437- initialHighWater hlc.Timestamp ,
438- ) (spans []roachpb.Span , err error ) {
439+ func (ca * changeAggregator ) setupSpansAndFrontier () (spans []roachpb.Span , err error ) {
440+ var initialHighWater hlc.Timestamp
439441 spans = make ([]roachpb.Span , 0 , len (ca .spec .Watches ))
440442 for _ , watch := range ca .spec .Watches {
443+ if initialHighWater .IsEmpty () || watch .InitialResolved .Less (initialHighWater ) {
444+ initialHighWater = watch .InitialResolved
445+ }
441446 spans = append (spans , watch .Span )
442447 }
443448
@@ -446,17 +451,20 @@ func (ca *changeAggregator) setupSpansAndFrontier(
446451 return nil , err
447452 }
448453
454+ checkpointedSpanTs := ca .spec .Checkpoint .Timestamp
455+
456+ // Checkpoint records from 21.2 were used only for backfills and did not store
457+ // the timestamp, since in a backfill it must either be the StatementTime for
458+ // an initial backfill, or right after the high-water for schema backfills.
459+ if checkpointedSpanTs .IsEmpty () {
460+ if initialHighWater .IsEmpty () {
461+ checkpointedSpanTs = ca .spec .Feed .StatementTime
462+ } else {
463+ checkpointedSpanTs = initialHighWater .Next ()
464+ }
465+ }
449466 // Checkpointed spans are spans that were above the highwater mark, and we
450467 // must preserve that information in the frontier for future checkpointing.
451- // If we don't have a highwater yet (during initial scan) they must at least
452- // be from StatementTime, and given an initial highwater they must all by
453- // definition have been at or after initialHighWater.Next()
454- var checkpointedSpanTs hlc.Timestamp
455- if initialHighWater .IsEmpty () {
456- checkpointedSpanTs = ca .spec .Feed .StatementTime
457- } else {
458- checkpointedSpanTs = initialHighWater .Next ()
459- }
460468 for _ , checkpointedSpan := range ca .spec .Checkpoint .Spans {
461469 if _ , err := ca .frontier .Forward (checkpointedSpan , checkpointedSpanTs ); err != nil {
462470 return nil , err
@@ -578,12 +586,13 @@ func (ca *changeAggregator) noteResolvedSpan(resolved *jobspb.ResolvedSpan) erro
578586 checkpointFrontier := advanced &&
579587 (forceFlush || timeutil .Since (ca .lastFlush ) > ca .flushFrequency )
580588
581- // If backfilling we must also consider the Backfill Checkpointing frequency
582- checkpointBackfill := ca .spec .JobID != 0 && /* enterprise changefeed */
583- resolved .Timestamp .Equal (ca .frontier .BackfillTS ()) &&
584- canCheckpointBackfill (& ca .flowCtx .Cfg .Settings .SV , ca .lastFlush )
589+ // At a lower frequency we checkpoint specific spans in the job progress,
590+ // either during backfills or if the highwater mark is excessively lagging behind.
591+ checkpointSpans := ca .spec .JobID != 0 && /* enterprise changefeed */
592+ (resolved .Timestamp .Equal (ca .frontier .BackfillTS ()) || ca .frontier .hasLaggingSpans (& ca .flowCtx .Cfg .Settings .SV )) &&
593+ canCheckpointSpans (& ca .flowCtx .Cfg .Settings .SV , ca .lastFlush )
585594
586- if checkpointFrontier || checkpointBackfill {
595+ if checkpointFrontier || checkpointSpans {
587596 defer func () {
588597 ca .lastFlush = timeutil .Now ()
589598 }()
@@ -1178,16 +1187,16 @@ func newJobState(
11781187 }
11791188}
11801189
1181- func canCheckpointBackfill (sv * settings.Values , lastCheckpoint time.Time ) bool {
1190+ func canCheckpointSpans (sv * settings.Values , lastCheckpoint time.Time ) bool {
11821191 freq := changefeedbase .FrontierCheckpointFrequency .Get (sv )
11831192 if freq == 0 {
11841193 return false
11851194 }
11861195 return timeutil .Since (lastCheckpoint ) > freq
11871196}
11881197
1189- func (j * jobState ) canCheckpointBackfill () bool {
1190- return canCheckpointBackfill (& j .settings .SV , j .lastProgressUpdate )
1198+ func (j * jobState ) canCheckpointSpans () bool {
1199+ return canCheckpointSpans (& j .settings .SV , j .lastProgressUpdate )
11911200}
11921201
11931202// canCheckpointHighWatermark returns true if we should update job high water mark (i.e. progress).
@@ -1610,21 +1619,26 @@ func (cf *changeFrontier) maybeCheckpointJob(
16101619 // as we receive spans from the scan request at the Backfill Timestamp
16111620 inBackfill := ! frontierChanged && resolvedSpan .Timestamp .Equal (cf .frontier .BackfillTS ())
16121621
1613- // During a backfill we store a checkpoint of completed scans at a throttled rate in the job record
1614- updateCheckpoint := inBackfill && cf .js .canCheckpointBackfill ()
1615-
1616- var checkpoint jobspb.ChangefeedProgress_Checkpoint
1617- if updateCheckpoint {
1618- maxBytes := changefeedbase .FrontierCheckpointMaxBytes .Get (& cf .flowCtx .Cfg .Settings .SV )
1619- checkpoint .Spans = cf .frontier .getCheckpointSpans (maxBytes )
1620- }
1621-
16221622 // If we're not in a backfill, highwater progress and an empty checkpoint will
16231623 // be saved. This is throttled however we always persist progress to a schema
16241624 // boundary.
16251625 updateHighWater :=
16261626 ! inBackfill && (cf .frontier .schemaChangeBoundaryReached () || cf .js .canCheckpointHighWatermark (frontierChanged ))
16271627
1628+ // During backfills or when some problematic spans stop advancing, the
1629+ // highwater mark remains fixed while other spans may significantly outpace
1630+ // it, therefore to avoid losing that progress on changefeed resumption we
1631+ // also store as many of those leading spans as we can in the job progress
1632+ updateCheckpoint :=
1633+ (inBackfill || cf .frontier .hasLaggingSpans (& cf .js .settings .SV )) && cf .js .canCheckpointSpans ()
1634+
1635+ // If the highwater has moved, an empty checkpoint will be saved.
1636+ var checkpoint jobspb.ChangefeedProgress_Checkpoint
1637+ if updateCheckpoint {
1638+ maxBytes := changefeedbase .FrontierCheckpointMaxBytes .Get (& cf .flowCtx .Cfg .Settings .SV )
1639+ checkpoint .Spans , checkpoint .Timestamp = cf .frontier .getCheckpointSpans (maxBytes )
1640+ }
1641+
16281642 if updateCheckpoint || updateHighWater {
16291643 checkpointStart := timeutil .Now ()
16301644 if err := cf .checkpointJobProgress (cf .frontier .Frontier (), checkpoint ); err != nil {
@@ -1960,21 +1974,40 @@ func (f *schemaChangeFrontier) SpanFrontier() *span.Frontier {
19601974}
19611975
19621976// getCheckpointSpans returns as many spans that should be checkpointed (are
1963- // above the highwater mark) as can fit in maxBytes.
1964- func (f * schemaChangeFrontier ) getCheckpointSpans (maxBytes int64 ) (checkpoint []roachpb.Span ) {
1965- var used int64
1977+ // above the highwater mark) as can fit in maxBytes, along with the earliest
1978+ // timestamp of the checkpointed spans. A SpanGroup is used to merge adjacent
1979+ // spans above the high-water mark.
1980+ func (f * schemaChangeFrontier ) getCheckpointSpans (
1981+ maxBytes int64 ,
1982+ ) (spans []roachpb.Span , timestamp hlc.Timestamp ) {
19661983 frontier := f .frontierTimestamp ()
1984+
1985+ // Collect leading spans into a SpanGroup to merge adjacent spans and store
1986+ // the lowest timestamp found
1987+ var checkpointSpanGroup roachpb.SpanGroup
1988+ checkpointFrontier := hlc.Timestamp {}
19671989 f .Entries (func (s roachpb.Span , ts hlc.Timestamp ) span.OpResult {
19681990 if frontier .Less (ts ) {
1969- used += int64 ( len ( s . Key )) + int64 ( len ( s . EndKey ) )
1970- if used > maxBytes {
1971- return span . StopMatch
1991+ checkpointSpanGroup . Add ( s )
1992+ if checkpointFrontier . IsEmpty () || ts . Less ( checkpointFrontier ) {
1993+ checkpointFrontier = ts
19721994 }
1973- checkpoint = append (checkpoint , s )
19741995 }
19751996 return span .ContinueMatch
19761997 })
1977- return checkpoint
1998+
1999+ // Ensure we only return up to maxBytes spans
2000+ var checkpointSpans []roachpb.Span
2001+ var used int64
2002+ for _ , span := range checkpointSpanGroup .Slice () {
2003+ used += int64 (len (span .Key )) + int64 (len (span .EndKey ))
2004+ if used > maxBytes {
2005+ break
2006+ }
2007+ checkpointSpans = append (checkpointSpans , span )
2008+ }
2009+
2010+ return checkpointSpans , checkpointFrontier
19782011}
19792012
19802013// BackfillTS returns the timestamp of the incoming spans for an ongoing
@@ -2010,3 +2043,13 @@ func (f *schemaChangeFrontier) schemaChangeBoundaryReached() (r bool) {
20102043 f .latestTs .Equal (f .boundaryTime ) &&
20112044 f .boundaryType != jobspb .ResolvedSpan_NONE
20122045}
2046+
2047+ // hasLaggingSpans returns true when the time between the earliest and latest
2048+ // resolved spans has exceeded the configured HighwaterLagCheckpointThreshold
2049+ func (f * schemaChangeFrontier ) hasLaggingSpans (sv * settings.Values ) bool {
2050+ lagThreshold := changefeedbase .FrontierHighwaterLagCheckpointThreshold .Get (sv )
2051+ if lagThreshold == 0 {
2052+ return false
2053+ }
2054+ return f .latestTs .GoTime ().Sub (f .Frontier ().GoTime ()) > lagThreshold
2055+ }
0 commit comments