Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,15 +258,30 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
// by the previous lease holder.
r.mu.Lock()
r.mu.state.Lease = &newLease
expirationBasedLease := r.requiresExpiringLeaseRLocked()
r.mu.Unlock()

// Gossip the first range whenever its lease is acquired. We check to
// make sure the lease is active so that a trailing replica won't process
// an old lease request and attempt to gossip the first range.
// Gossip the first range whenever its lease is acquired. We check to make
// sure the lease is active so that a trailing replica won't process an old
// lease request and attempt to gossip the first range.
if leaseChangingHands && iAmTheLeaseHolder && r.IsFirstRange() && r.IsLeaseValid(newLease, r.store.Clock().Now()) {
r.gossipFirstRange(ctx)
}

// Whenever we first acquire an expiration-based lease, notify the lease
// renewer worker that we want it to keep proactively renewing the lease
// before it expires.
if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.IsLeaseValid(newLease, r.store.Clock().Now()) {
// It's not worth blocking on a full channel here since the worst that can
// happen is the lease times out and has to be reacquired when needed, but
// log an error since it's pretty unexpected.
select {
case r.store.expirationBasedLeaseChan <- r:
default:
log.Warningf(ctx, "unable to kick off proactive lease renewal; channel full")
}
}

if leaseChangingHands && !iAmTheLeaseHolder {
// Also clear and disable the push transaction queue. Any waiters
// must be redirected to the new lease holder.
Expand Down
74 changes: 74 additions & 0 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,10 @@ type Store struct {
// data destruction.
snapshotApplySem chan struct{}

// Channel of newly-acquired expiration-based leases that we want to
// proactively renew.
expirationBasedLeaseChan chan *Replica

// draining holds a bool which indicates whether this store is draining. See
// SetDraining() for a more detailed explanation of behavior changes.
//
Expand Down Expand Up @@ -880,6 +884,11 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript

s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit)

// The channel size here is arbitrary. We don't want it to fill up, but it
// isn't a disaster if it does and it shouldn't unless a huge number of meta2
// range leases are acquired at once.
s.expirationBasedLeaseChan = make(chan *Replica, 64)

s.limiters.BulkIOWriteRate = rate.NewLimiter(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)), bulkIOWriteBurst)
bulkIOWriteLimit.SetOnChange(&cfg.Settings.SV, func() {
s.limiters.BulkIOWriteRate.SetLimit(rate.Limit(bulkIOWriteLimit.Get(&cfg.Settings.SV)))
Expand Down Expand Up @@ -1433,6 +1442,8 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
log.Event(ctx, "computed initial metrics")
}

s.startLeaseRenewer(ctx)

// Start the storage engine compactor.
if envutil.EnvOrDefaultBool("COCKROACH_ENABLE_COMPACTOR", true) {
s.compactor.Start(s.AnnotateCtx(context.Background()), s.Tracer(), s.stopper)
Expand Down Expand Up @@ -1542,6 +1553,69 @@ func (s *Store) startGossip() {
}
}

// startLeaseRenewer runs an infinite loop in a goroutine which regularly
// checks whether the store has any expiration-based leases that should be
// proactively renewed and attempts to continue renewing them.
//
// This reduces user-visible latency when range lookups are needed to serve a
// request and reduces ping-ponging of r1's lease to different replicas as
// maybeGossipFirstRange is called on each (e.g. #24753).
func (s *Store) startLeaseRenewer(ctx context.Context) {
	// Start a goroutine that watches and proactively renews certain
	// expiration-based leases.
	s.stopper.RunWorker(ctx, func(context.Context) {
		// Set of replicas whose leases this worker is currently keeping
		// renewed. Only touched by this goroutine, so no locking is needed.
		repls := make(map[*Replica]struct{})
		timer := timeutil.NewTimer()
		defer timer.Stop()

		// Determine how frequently to attempt to ensure that we have each lease.
		// The divisor used here is somewhat arbitrary, but needs to be large
		// enough to ensure we'll attempt to renew the lease reasonably early
		// within the RangeLeaseRenewalDuration time window. This means we'll wake
		// up more often than strictly necessary, but it's more maintainable than
		// attempting to accurately determine exactly when each iteration of a
		// lease expires and when we should attempt to renew it as a result.
		renewalDuration := s.cfg.RangeLeaseActiveDuration() / 5
		for {
			// Attempt a renewal for every tracked replica. Deleting from the
			// map during range iteration is safe in Go.
			for repl := range repls {
				annotatedCtx := repl.AnnotateCtx(ctx)
				_, pErr := repl.redirectOnOrAcquireLease(annotatedCtx)
				if pErr != nil {
					// A NotLeaseHolderError means the lease now lives on another
					// replica, which is expected; anything else is worth logging.
					if _, ok := pErr.GetDetail().(*roachpb.NotLeaseHolderError); !ok {
						log.Warningf(annotatedCtx, "failed to proactively renew lease: %s", pErr)
					}
					// Either way, stop tracking this replica. If this store
					// reacquires the lease later, the replica is re-announced
					// over expirationBasedLeaseChan and tracked again.
					delete(repls, repl)
					continue
				}
			}

			// Only arm the timer while there is something to renew; with an
			// empty set we simply block below until a new lease arrives or the
			// stopper fires.
			if len(repls) > 0 {
				timer.Reset(renewalDuration)
			}
			select {
			case repl := <-s.expirationBasedLeaseChan:
				repls[repl] = struct{}{}
				// If we got one entry off the channel, there may be more (e.g. a node
				// holding a bunch of meta2 ranges just failed), so keep pulling until
				// we can't get any more.
			continuepulling:
				for {
					select {
					case repl := <-s.expirationBasedLeaseChan:
						repls[repl] = struct{}{}
					default:
						break continuepulling
					}
				}
			case <-timer.C:
				// timeutil.Timer's contract: mark the channel as read so the
				// next Reset behaves correctly.
				timer.Read = true
			case <-s.stopper.ShouldStop():
				return
			}
		}
	})
}

// systemGossipUpdate is a callback for gossip updates to
// the system config which affect range split boundaries.
func (s *Store) systemGossipUpdate(cfg config.SystemConfig) {
Expand Down