Skip to content

Commit 87e85eb

Browse files
committed
storage: eagerly carry out chunked GC
The GC queue was already splitting up the set of keys to be deleted into small parts to avoid overloading the Raft machinery. From there, it was only a small step to let it dispatch the corresponding GCRequests eagerly. The upshot is that now we don't have to hold all of the keys to be deleted in memory any more. With appropriate testing, I think this is a change suitable for inclusion in 2.0.1. "Appropriate testing" includes a roachtest which creates a large range (larger than available RAM) and fills it with GC'able data. (A variant of the `DROP` test that increases the range size suitably should do it). Such a test is filed as #24214. Release note (performance improvement): lowered the amount of memory used during garbage collection of old versions.
1 parent 077f7e1 commit 87e85eb

3 files changed

Lines changed: 96 additions & 74 deletions

File tree

pkg/cli/debug.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error {
610610
snap,
611611
hlc.Timestamp{WallTime: timeutil.Now().UnixNano()},
612612
config.GCPolicy{TTLSeconds: 24 * 60 * 60 /* 1 day */},
613-
func(_ context.Context, _ [][]roachpb.GCRequest_GCKey, _ *storage.GCInfo) error { return nil },
613+
storage.NoopGCer{},
614614
func(_ context.Context, _ []roachpb.Intent) error { return nil },
615615
func(_ context.Context, _ *roachpb.Transaction, _ []roachpb.Intent) error { return nil },
616616
)

pkg/storage/gc_queue.go

Lines changed: 94 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"fmt"
2020
"math"
2121
"math/rand"
22+
"sync/atomic"
2223
"time"
2324

2425
"github.com/pkg/errors"
@@ -106,10 +107,6 @@ func newGCQueue(store *Store, gossip *gossip.Gossip) *gcQueue {
106107
return gcq
107108
}
108109

109-
// A gcFunc sends the chunks of GCRequests in the order in which they
110-
// appear. It may but does not have to return on the first error.
111-
type gcFunc func(context.Context, [][]roachpb.GCRequest_GCKey, *GCInfo) error
112-
113110
// A cleanupIntentsFunc synchronously resolves the supplied intents
114111
// (which may be PENDING, in which case they are first pushed) while
115112
// taking care of proper batching.
@@ -513,32 +510,65 @@ func (gcq *gcQueue) process(ctx context.Context, repl *Replica, sysCfg config.Sy
513510
return gcq.processImpl(ctx, repl, sysCfg, now)
514511
}
515512

516-
// chunkGCRequest chunks the supplied gcKeys (which are consumed by this method) into
517-
// multiple batches which must be executed in order by the caller.
518-
func chunkGCRequest(
519-
desc *roachpb.RangeDescriptor, info *GCInfo, gcKeys [][]roachpb.GCRequest_GCKey,
520-
) []roachpb.GCRequest {
513+
// NoopGCer implements GCer by doing nothing.
514+
type NoopGCer struct{}
515+
516+
var _ GCer = NoopGCer{}
517+
518+
// SetGCThreshold implements storage.GCer.
519+
func (NoopGCer) SetGCThreshold(context.Context, GCThreshold) error { return nil }
520+
521+
// GC implements storage.GCer.
522+
func (NoopGCer) GC(context.Context, []roachpb.GCRequest_GCKey) error { return nil }
523+
524+
type replicaGCer struct {
525+
repl *Replica
526+
count int32 // update atomically
527+
}
528+
529+
var _ GCer = &replicaGCer{}
530+
531+
func (r *replicaGCer) template() roachpb.GCRequest {
532+
desc := r.repl.Desc()
521533
var template roachpb.GCRequest
522534
template.Key = desc.StartKey.AsRawKey()
523535
template.EndKey = desc.EndKey.AsRawKey()
524-
ret := make([]roachpb.GCRequest, 0, len(gcKeys)+1)
525536

526-
gc1 := template
527-
gc1.Threshold = info.Threshold
528-
gc1.TxnSpanGCThreshold = info.TxnSpanGCThreshold
529-
// The first request is intentionally kept very small since it
530-
// updates the Range-wide GCThresholds, and thus must block all
531-
// reads and writes while it is being applied.
537+
return template
538+
}
539+
540+
func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error {
541+
n := atomic.AddInt32(&r.count, 1)
542+
log.Eventf(ctx, "sending batch %d (%d keys)", n, len(req.Keys))
532543

533-
ret = append(ret, gc1)
544+
var ba roachpb.BatchRequest
534545

535-
for _, keys := range gcKeys {
536-
gc := template
537-
gc.Keys = keys
538-
ret = append(ret, gc)
546+
// Technically not needed since we're talking directly to the Replica.
547+
ba.RangeID = r.repl.Desc().RangeID
548+
ba.Timestamp = r.repl.Clock().Now()
549+
ba.Add(&req)
550+
551+
if _, pErr := r.repl.Send(ctx, ba); pErr != nil {
552+
log.VErrEvent(ctx, 2, pErr.String())
553+
return pErr.GoError()
539554
}
555+
return nil
556+
}
540557

541-
return ret
558+
func (r *replicaGCer) SetGCThreshold(ctx context.Context, thresh GCThreshold) error {
559+
req := r.template()
560+
req.Threshold = thresh.Key
561+
req.TxnSpanGCThreshold = thresh.Txn
562+
return r.send(ctx, req)
563+
}
564+
565+
func (r *replicaGCer) GC(ctx context.Context, keys []roachpb.GCRequest_GCKey) error {
566+
if len(keys) == 0 {
567+
return nil
568+
}
569+
req := r.template()
570+
req.Keys = keys
571+
return r.send(ctx, req)
542572
}
543573

544574
func (gcq *gcQueue) processImpl(
@@ -554,32 +584,7 @@ func (gcq *gcQueue) processImpl(
554584
return errors.Errorf("could not find zone config for range %s: %s", repl, err)
555585
}
556586

557-
info, err := RunGC(ctx, desc, snap, now, zone.GC,
558-
func(ctx context.Context, gcKeys [][]roachpb.GCRequest_GCKey, info *GCInfo) error {
559-
// Chunk the keys into multiple GC requests to interleave more
560-
// gracefully with other Raft traffic.
561-
batches := chunkGCRequest(desc, info, gcKeys)
562-
563-
for i, gcArgs := range batches {
564-
var ba roachpb.BatchRequest
565-
566-
// Technically not needed since we're talking directly to the Range.
567-
ba.RangeID = desc.RangeID
568-
ba.Timestamp = now
569-
570-
// TODO(tschottdorf): This is one of these instances in which we want
571-
// to be more careful that the request ends up on the correct Replica,
572-
// and we might have to worry about mixing range-local and global keys
573-
// in a batch which might end up spanning Ranges by the time it executes.
574-
ba.Add(&gcArgs)
575-
log.Eventf(ctx, "sending batch %d of %d", i+1, len(batches))
576-
if _, pErr := repl.Send(ctx, ba); pErr != nil {
577-
log.VErrEvent(ctx, 2, pErr.String())
578-
return pErr.GoError()
579-
}
580-
}
581-
return nil
582-
},
587+
info, err := RunGC(ctx, desc, snap, now, zone.GC, &replicaGCer{repl: repl},
583588
func(ctx context.Context, intents []roachpb.Intent) error {
584589
intentCount, err := repl.store.intentResolver.cleanupIntents(ctx, intents, now, roachpb.PUSH_ABORT)
585590
if err == nil {
@@ -670,6 +675,18 @@ type lockableGCInfo struct {
670675
GCInfo
671676
}
672677

678+
// GCThreshold holds the key and txn span GC thresholds, respectively.
679+
type GCThreshold struct {
680+
Key hlc.Timestamp
681+
Txn hlc.Timestamp
682+
}
683+
684+
// A GCer is an abstraction used by the GC queue to carry out chunked deletions.
685+
type GCer interface {
686+
SetGCThreshold(context.Context, GCThreshold) error
687+
GC(context.Context, []roachpb.GCRequest_GCKey) error
688+
}
689+
673690
// RunGC runs garbage collection for the specified descriptor on the
674691
// provided Engine (which is not mutated). It uses the provided gcFn
675692
// to run garbage collection once on all implicated spans,
@@ -682,7 +699,7 @@ func RunGC(
682699
snap engine.Reader,
683700
now hlc.Timestamp,
684701
policy config.GCPolicy,
685-
gcFn gcFunc,
702+
gcer GCer,
686703
cleanupIntentsFn cleanupIntentsFunc,
687704
cleanupTxnIntentsAsyncFn cleanupTxnIntentsAsyncFunc,
688705
) (GCInfo, error) {
@@ -702,7 +719,13 @@ func RunGC(
702719
infoMu.Threshold = gc.Threshold
703720
infoMu.TxnSpanGCThreshold = txnExp
704721

705-
var gcKeys [][]roachpb.GCRequest_GCKey
722+
if err := gcer.SetGCThreshold(ctx, GCThreshold{
723+
Key: gc.Threshold,
724+
Txn: txnExp,
725+
}); err != nil {
726+
return GCInfo{}, errors.Wrap(err, "failed to set GC thresholds")
727+
}
728+
706729
var batchGCKeys []roachpb.GCRequest_GCKey
707730
var batchGCKeysBytes int64
708731
var expBaseKey roachpb.Key
@@ -777,8 +800,17 @@ func RunGC(
777800
// chunk and start a new one.
778801
if batchGCKeysBytes >= gcKeyVersionChunkBytes {
779802
batchGCKeys = append(batchGCKeys, roachpb.GCRequest_GCKey{Key: expBaseKey, Timestamp: keys[i].Timestamp})
780-
gcKeys = append(gcKeys, batchGCKeys)
781-
batchGCKeys = []roachpb.GCRequest_GCKey{}
803+
if err := gcer.GC(ctx, batchGCKeys); err != nil {
804+
// Even though we are batching the GC process, it's
805+
// safe to continue because we bumped the GC
806+
// thresholds. We may leave some inconsistent history
807+
// behind, but nobody can read it.
808+
log.Warning(ctx, err)
809+
return
810+
}
811+
// Allow releasing the memory backing batchGCKeys.
812+
iter.ResetAllocator()
813+
batchGCKeys = nil
782814
batchGCKeysBytes = 0
783815
}
784816
}
@@ -823,42 +855,34 @@ func RunGC(
823855
// Handle last collected set of keys/vals.
824856
processKeysAndValues()
825857
if len(batchGCKeys) > 0 {
826-
gcKeys = append(gcKeys, batchGCKeys)
858+
if err := gcer.GC(ctx, batchGCKeys); err != nil {
859+
return GCInfo{}, err
860+
}
827861
}
828862

863+
// From now on, all newly added keys are range-local.
864+
829865
// Process local range key entries (txn records, queue last processed times).
830866
localRangeKeys, err := processLocalKeyRange(ctx, snap, desc, txnExp, &infoMu, cleanupTxnIntentsAsyncFn)
831867
if err != nil {
832868
return GCInfo{}, err
833869
}
834870

835-
// From now on, all newly added keys are range-local.
836-
// TODO(tschottdorf): Might need to use two requests at some point since we
837-
// hard-coded the full non-local key range in the header, but that does
838-
// not take into account the range-local keys. It will be OK as long as
839-
// we send directly to the Replica, though.
840-
if len(localRangeKeys) > 0 {
841-
gcKeys = append(gcKeys, localRangeKeys)
871+
if err := gcer.GC(ctx, localRangeKeys); err != nil {
872+
return GCInfo{}, err
842873
}
843874

844875
// Clean up the AbortSpan.
845876
log.Event(ctx, "processing AbortSpan")
846877
abortSpanKeys := processAbortSpan(ctx, snap, desc.RangeID, txnExp, &infoMu)
847-
if len(abortSpanKeys) > 0 {
848-
gcKeys = append(gcKeys, abortSpanKeys)
878+
if err := gcer.GC(ctx, abortSpanKeys); err != nil {
879+
return GCInfo{}, err
849880
}
850881

851882
infoMu.Lock()
852-
log.Eventf(ctx, "assembled GC keys, now proceeding to GC; stats so far %+v", infoMu.GCInfo)
883+
log.Eventf(ctx, "GC'ed keys; stats %+v", infoMu.GCInfo)
853884
infoMu.Unlock()
854885

855-
// Process the keys before beginning to push transactions and
856-
// resolve intents so that we don't lose all of the work we've done
857-
// thus far gathering GC'able keys.
858-
if err := gcFn(ctx, gcKeys, &infoMu.GCInfo); err != nil {
859-
return GCInfo{}, err
860-
}
861-
862886
// Push transactions (if pending) and resolve intents.
863887
var intents []roachpb.Intent
864888
for txnID, txn := range txnMap {

pkg/storage/gc_queue_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -506,9 +506,7 @@ func TestGCQueueProcess(t *testing.T) {
506506
ctx := context.Background()
507507
now := tc.Clock().Now()
508508
return RunGC(ctx, desc, snap, now, zone.GC,
509-
func(ctx context.Context, gcKeys [][]roachpb.GCRequest_GCKey, info *GCInfo) error {
510-
return nil
511-
},
509+
NoopGCer{},
512510
func(ctx context.Context, intents []roachpb.Intent) error {
513511
return nil
514512
},

0 commit comments

Comments (0)