@@ -19,6 +19,7 @@ import (
1919 "fmt"
2020 "math"
2121 "math/rand"
22+ "sync/atomic"
2223 "time"
2324
2425 "github.com/pkg/errors"
@@ -106,10 +107,6 @@ func newGCQueue(store *Store, gossip *gossip.Gossip) *gcQueue {
106107 return gcq
107108}
108109
109- // A gcFunc sends the chunks of GCRequests in the order in which they
110- // appear. It may but does not have to return on the first error.
111- type gcFunc func (context.Context , [][]roachpb.GCRequest_GCKey , * GCInfo ) error
112-
113110// A cleanupIntentsFunc synchronously resolves the supplied intents
114111// (which may be PENDING, in which case they are first pushed) while
115112// taking care of proper batching.
@@ -513,32 +510,65 @@ func (gcq *gcQueue) process(ctx context.Context, repl *Replica, sysCfg config.Sy
513510 return gcq .processImpl (ctx , repl , sysCfg , now )
514511}
515512
516- // chunkGCRequest chunks the supplied gcKeys (which are consumed by this method) into
517- // multiple batches which must be executed in order by the caller.
518- func chunkGCRequest (
519- desc * roachpb.RangeDescriptor , info * GCInfo , gcKeys [][]roachpb.GCRequest_GCKey ,
520- ) []roachpb.GCRequest {
513+ // NoopGCer implements GCer by doing nothing.
514+ type NoopGCer struct {}
515+
516+ var _ GCer = NoopGCer {}
517+
518+ // SetGCThreshold implements storage.GCer.
519+ func (NoopGCer ) SetGCThreshold (context.Context , GCThreshold ) error { return nil }
520+
521+ // GC implements storage.GCer.
522+ func (NoopGCer ) GC (context.Context , []roachpb.GCRequest_GCKey ) error { return nil }
523+
524+ type replicaGCer struct {
525+ repl * Replica
526+ count int32 // update atomically
527+ }
528+
529+ var _ GCer = & replicaGCer {}
530+
531+ func (r * replicaGCer ) template () roachpb.GCRequest {
532+ desc := r .repl .Desc ()
521533 var template roachpb.GCRequest
522534 template .Key = desc .StartKey .AsRawKey ()
523535 template .EndKey = desc .EndKey .AsRawKey ()
524- ret := make ([]roachpb.GCRequest , 0 , len (gcKeys )+ 1 )
525536
526- gc1 := template
527- gc1 . Threshold = info . Threshold
528- gc1 . TxnSpanGCThreshold = info . TxnSpanGCThreshold
529- // The first request is intentionally kept very small since it
530- // updates the Range-wide GCThresholds, and thus must block all
531- // reads and writes while it is being applied.
537+ return template
538+ }
539+
540+ func ( r * replicaGCer ) send ( ctx context. Context , req roachpb. GCRequest ) error {
541+ n := atomic . AddInt32 ( & r . count , 1 )
542+ log . Eventf ( ctx , "sending batch %d (%d keys)" , n , len ( req . Keys ))
532543
533- ret = append ( ret , gc1 )
544+ var ba roachpb. BatchRequest
534545
535- for _ , keys := range gcKeys {
536- gc := template
537- gc .Keys = keys
538- ret = append (ret , gc )
546+ // Technically not needed since we're talking directly to the Replica.
547+ ba .RangeID = r .repl .Desc ().RangeID
548+ ba .Timestamp = r .repl .Clock ().Now ()
549+ ba .Add (& req )
550+
551+ if _ , pErr := r .repl .Send (ctx , ba ); pErr != nil {
552+ log .VErrEvent (ctx , 2 , pErr .String ())
553+ return pErr .GoError ()
539554 }
555+ return nil
556+ }
540557
541- return ret
558+ func (r * replicaGCer ) SetGCThreshold (ctx context.Context , thresh GCThreshold ) error {
559+ req := r .template ()
560+ req .Threshold = thresh .Key
561+ req .TxnSpanGCThreshold = thresh .Txn
562+ return r .send (ctx , req )
563+ }
564+
565+ func (r * replicaGCer ) GC (ctx context.Context , keys []roachpb.GCRequest_GCKey ) error {
566+ if len (keys ) == 0 {
567+ return nil
568+ }
569+ req := r .template ()
570+ req .Keys = keys
571+ return r .send (ctx , req )
542572}
543573
544574func (gcq * gcQueue ) processImpl (
@@ -554,32 +584,7 @@ func (gcq *gcQueue) processImpl(
554584 return errors .Errorf ("could not find zone config for range %s: %s" , repl , err )
555585 }
556586
557- info , err := RunGC (ctx , desc , snap , now , zone .GC ,
558- func (ctx context.Context , gcKeys [][]roachpb.GCRequest_GCKey , info * GCInfo ) error {
559- // Chunk the keys into multiple GC requests to interleave more
560- // gracefully with other Raft traffic.
561- batches := chunkGCRequest (desc , info , gcKeys )
562-
563- for i , gcArgs := range batches {
564- var ba roachpb.BatchRequest
565-
566- // Technically not needed since we're talking directly to the Range.
567- ba .RangeID = desc .RangeID
568- ba .Timestamp = now
569-
570- // TODO(tschottdorf): This is one of these instances in which we want
571- // to be more careful that the request ends up on the correct Replica,
572- // and we might have to worry about mixing range-local and global keys
573- // in a batch which might end up spanning Ranges by the time it executes.
574- ba .Add (& gcArgs )
575- log .Eventf (ctx , "sending batch %d of %d" , i + 1 , len (batches ))
576- if _ , pErr := repl .Send (ctx , ba ); pErr != nil {
577- log .VErrEvent (ctx , 2 , pErr .String ())
578- return pErr .GoError ()
579- }
580- }
581- return nil
582- },
587+ info , err := RunGC (ctx , desc , snap , now , zone .GC , & replicaGCer {repl : repl },
583588 func (ctx context.Context , intents []roachpb.Intent ) error {
584589 intentCount , err := repl .store .intentResolver .cleanupIntents (ctx , intents , now , roachpb .PUSH_ABORT )
585590 if err == nil {
@@ -670,6 +675,18 @@ type lockableGCInfo struct {
670675 GCInfo
671676}
672677
678+ // GCThreshold holds the key and txn span GC thresholds, respectively.
679+ type GCThreshold struct {
680+ Key hlc.Timestamp
681+ Txn hlc.Timestamp
682+ }
683+
684+ // A GCer is an abstraction used by the GC queue to carry out chunked deletions.
685+ type GCer interface {
686+ SetGCThreshold (context.Context , GCThreshold ) error
687+ GC (context.Context , []roachpb.GCRequest_GCKey ) error
688+ }
689+
673690// RunGC runs garbage collection for the specified descriptor on the
674691// provided Engine (which is not mutated). It uses the provided gcer
675692// to run garbage collection once on all implicated spans,
@@ -682,7 +699,7 @@ func RunGC(
682699 snap engine.Reader ,
683700 now hlc.Timestamp ,
684701 policy config.GCPolicy ,
685- gcFn gcFunc ,
702+ gcer GCer ,
686703 cleanupIntentsFn cleanupIntentsFunc ,
687704 cleanupTxnIntentsAsyncFn cleanupTxnIntentsAsyncFunc ,
688705) (GCInfo , error ) {
@@ -702,7 +719,13 @@ func RunGC(
702719 infoMu .Threshold = gc .Threshold
703720 infoMu .TxnSpanGCThreshold = txnExp
704721
705- var gcKeys [][]roachpb.GCRequest_GCKey
722+ if err := gcer .SetGCThreshold (ctx , GCThreshold {
723+ Key : gc .Threshold ,
724+ Txn : txnExp ,
725+ }); err != nil {
726+ return GCInfo {}, errors .Wrap (err , "failed to set GC thresholds" )
727+ }
728+
706729 var batchGCKeys []roachpb.GCRequest_GCKey
707730 var batchGCKeysBytes int64
708731 var expBaseKey roachpb.Key
@@ -777,8 +800,17 @@ func RunGC(
777800 // chunk and start a new one.
778801 if batchGCKeysBytes >= gcKeyVersionChunkBytes {
779802 batchGCKeys = append (batchGCKeys , roachpb.GCRequest_GCKey {Key : expBaseKey , Timestamp : keys [i ].Timestamp })
780- gcKeys = append (gcKeys , batchGCKeys )
781- batchGCKeys = []roachpb.GCRequest_GCKey {}
803+ if err := gcer .GC (ctx , batchGCKeys ); err != nil {
804+ // Even though we are batching the GC process, it's
805+ // safe to continue because we bumped the GC
806+ // thresholds. We may leave some inconsistent history
807+ // behind, but nobody can read it.
808+ log .Warning (ctx , err )
809+ return
810+ }
811+ // Allow releasing the memory backing batchGCKeys.
812+ iter .ResetAllocator ()
813+ batchGCKeys = nil
782814 batchGCKeysBytes = 0
783815 }
784816 }
@@ -823,42 +855,34 @@ func RunGC(
823855 // Handle last collected set of keys/vals.
824856 processKeysAndValues ()
825857 if len (batchGCKeys ) > 0 {
826- gcKeys = append (gcKeys , batchGCKeys )
858+ if err := gcer .GC (ctx , batchGCKeys ); err != nil {
859+ return GCInfo {}, err
860+ }
827861 }
828862
863+ // From now on, all keys handed to the GCer are range-local.
864+
829865 // Process local range key entries (txn records, queue last processed times).
830866 localRangeKeys , err := processLocalKeyRange (ctx , snap , desc , txnExp , & infoMu , cleanupTxnIntentsAsyncFn )
831867 if err != nil {
832868 return GCInfo {}, err
833869 }
834870
835- // From now on, all newly added keys are range-local.
836- // TODO(tschottdorf): Might need to use two requests at some point since we
837- // hard-coded the full non-local key range in the header, but that does
838- // not take into account the range-local keys. It will be OK as long as
839- // we send directly to the Replica, though.
840- if len (localRangeKeys ) > 0 {
841- gcKeys = append (gcKeys , localRangeKeys )
871+ if err := gcer .GC (ctx , localRangeKeys ); err != nil {
872+ return GCInfo {}, err
842873 }
843874
844875 // Clean up the AbortSpan.
845876 log .Event (ctx , "processing AbortSpan" )
846877 abortSpanKeys := processAbortSpan (ctx , snap , desc .RangeID , txnExp , & infoMu )
847- if len ( abortSpanKeys ) > 0 {
848- gcKeys = append ( gcKeys , abortSpanKeys )
878+ if err := gcer . GC ( ctx , abortSpanKeys ); err != nil {
879+ return GCInfo {}, err
849880 }
850881
851882 infoMu .Lock ()
852- log .Eventf (ctx , "assembled GC keys, now proceeding to GC ; stats so far %+v" , infoMu .GCInfo )
883+ log .Eventf (ctx , "GC'ed keys; stats %+v" , infoMu .GCInfo )
853884 infoMu .Unlock ()
854885
855- // Process the keys before beginning to push transactions and
856- // resolve intents so that we don't lose all of the work we've done
857- // thus far gathering GC'able keys.
858- if err := gcFn (ctx , gcKeys , & infoMu .GCInfo ); err != nil {
859- return GCInfo {}, err
860- }
861-
862886 // Push transactions (if pending) and resolve intents.
863887 var intents []roachpb.Intent
864888 for txnID , txn := range txnMap {
0 commit comments