Commit 83128c9
storage: Don't transfer leases to behind replicas in StoreRebalancer
Transferring a lease to a replica that's behind can cause all requests to the range to stall: the old leaseholder thinks it's no longer the leaseholder, but the new leaseholder doesn't yet know that it is. This change avoids creating such scenarios in the StoreRebalancer.

Release note (bug fix): Avoids an edge case in load-based rebalancing where we could transfer the lease for a range to a replica that isn't keeping up with the other replicas, causing brief periods where no replica thinks it's the leaseholder for the range and thus no requests for the range can be processed.
1 parent e5644aa commit 83128c9
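To see why such a transfer stalls the range, here is a toy model of the failure window; all types and numbers here are invented for illustration. The outgoing leaseholder stops serving as soon as it proposes the transfer, while the target can't serve until it has applied the raft entry carrying the new lease:

```go
package main

import "fmt"

// replica is an invented stand-in for a range replica in this toy model.
type replica struct {
	name    string
	applied uint64 // highest raft log index this replica has applied
}

// canServe sketches the lease-transfer window: the old leaseholder revokes
// its own lease when it proposes the transfer, and the target only becomes
// leaseholder once it has applied the raft entry containing the new lease.
func canServe(r replica, leaseEntryIndex uint64, isOldLeaseholder bool) bool {
	if isOldLeaseholder {
		return false // gave up its lease when proposing the transfer
	}
	return r.applied >= leaseEntryIndex
}

func main() {
	const leaseEntryIndex = 100
	old := replica{name: "s1", applied: 100}
	target := replica{name: "s5", applied: 42} // far behind on its raft log

	fmt.Println(canServe(old, leaseEntryIndex, true))     // false
	fmt.Println(canServe(target, leaseEntryIndex, false)) // false: range stalls until s5 catches up
}
```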

3 files changed: 174 additions & 14 deletions
pkg/storage/allocator.go

Lines changed: 19 additions & 10 deletions
```diff
@@ -1158,23 +1158,32 @@ func filterBehindReplicas(
 		// Raft leader. This is rare enough not to matter.
 		return nil
 	}
+	candidates := make([]roachpb.ReplicaDescriptor, 0, len(replicas))
+	for _, r := range replicas {
+		if !replicaIsBehind(raftStatus, r.ReplicaID) || r.ReplicaID == brandNewReplicaID {
+			candidates = append(candidates, r)
+		}
+	}
+	return candidates
+}
+
+func replicaIsBehind(raftStatus *raft.Status, replicaID roachpb.ReplicaID) bool {
+	if raftStatus == nil || len(raftStatus.Progress) == 0 {
+		return true
+	}
 	// NB: We use raftStatus.Commit instead of getQuorumIndex() because the
 	// latter can return a value that is less than the commit index. This is
 	// useful for Raft log truncation which sometimes wishes to keep those
 	// earlier indexes, but not appropriate for determining which nodes are
 	// behind the actual commit index of the range.
-	candidates := make([]roachpb.ReplicaDescriptor, 0, len(replicas))
-	for _, r := range replicas {
-		if progress, ok := raftStatus.Progress[uint64(r.ReplicaID)]; ok {
-			if uint64(r.ReplicaID) == raftStatus.Lead ||
-				r.ReplicaID == brandNewReplicaID ||
-				(progress.State == raft.ProgressStateReplicate &&
-					progress.Match >= raftStatus.Commit) {
-				candidates = append(candidates, r)
-			}
+	if progress, ok := raftStatus.Progress[uint64(replicaID)]; ok {
+		if uint64(replicaID) == raftStatus.Lead ||
+			(progress.State == raft.ProgressStateReplicate &&
+				progress.Match >= raftStatus.Commit) {
+			return false
 		}
 	}
-	return candidates
+	return true
 }
 
 func simulateFilterUnremovableReplicas(
```
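To make the new predicate's behavior concrete, the following standalone sketch copies replicaIsBehind's logic from the diff above (with bare uint64 IDs in place of roachpb.ReplicaID) and runs it against an invented raft.Status. The import path matches the etcd raft package this commit uses; the sample indexes are made up for illustration:

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/raft"
)

// isBehind copies the logic of replicaIsBehind from the diff above. A replica
// counts as caught up only if it is the raft leader, or it is in
// ProgressStateReplicate with a Match index at or past the commit index.
func isBehind(st *raft.Status, replicaID uint64) bool {
	if st == nil || len(st.Progress) == 0 {
		// Progress is only populated on the raft leader; without it, we
		// conservatively treat every replica as behind.
		return true
	}
	if progress, ok := st.Progress[replicaID]; ok {
		if replicaID == st.Lead ||
			(progress.State == raft.ProgressStateReplicate &&
				progress.Match >= st.Commit) {
			return false
		}
	}
	return true
}

func main() {
	st := &raft.Status{
		Progress: map[uint64]raft.Progress{
			1: {Match: 10, State: raft.ProgressStateReplicate},
			2: {Match: 10, State: raft.ProgressStateReplicate},
			3: {Match: 4, State: raft.ProgressStateReplicate},
		},
	}
	st.Lead = 1    // replica 1 is the leader
	st.Commit = 10 // the range's commit index

	fmt.Println(isBehind(st, 1)) // false: the leader itself
	fmt.Println(isBehind(st, 2)) // false: replicating and Match >= Commit
	fmt.Println(isBehind(st, 3)) // true: Match 4 < Commit 10
	fmt.Println(isBehind(st, 4)) // true: no progress entry at all
}
```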

pkg/storage/store_rebalancer.go

Lines changed: 38 additions & 4 deletions
```diff
@@ -27,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"go.etcd.io/etcd/raft"
 )
 
 const (
@@ -123,10 +124,11 @@
 // will best accomplish the store-level goals.
 type StoreRebalancer struct {
 	log.AmbientContext
-	metrics      StoreRebalancerMetrics
-	st           *cluster.Settings
-	rq           *replicateQueue
-	replRankings *replicaRankings
+	metrics         StoreRebalancerMetrics
+	st              *cluster.Settings
+	rq              *replicateQueue
+	replRankings    *replicaRankings
+	getRaftStatusFn func(replica *Replica) *raft.Status
 }
 
 // NewStoreRebalancer creates a StoreRebalancer to work in tandem with the
@@ -143,6 +145,9 @@ func NewStoreRebalancer(
 		st:           st,
 		rq:           rq,
 		replRankings: replRankings,
+		getRaftStatusFn: func(replica *Replica) *raft.Status {
+			return replica.RaftStatus()
+		},
 	}
 	sr.AddLogTag("store-rebalancer", nil)
 	sr.rq.store.metrics.registry.AddMetricStruct(&sr.metrics)
@@ -395,6 +400,8 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
 		return iQPS < jQPS
 	})
 
+	var raftStatus *raft.Status
+
 	for _, candidate := range replicas {
 		if candidate.StoreID == localDesc.StoreID {
 			continue
@@ -405,6 +412,15 @@
 			continue
 		}
 
+		if raftStatus == nil {
+			raftStatus = sr.getRaftStatusFn(replWithStats.repl)
+		}
+		if replicaIsBehind(raftStatus, candidate.ReplicaID) {
+			log.VEventf(ctx, 3, "%v is behind or this store isn't the raft leader; raftStatus: %v",
+				candidate, raftStatus)
+			continue
+		}
+
 		preferred := sr.rq.allocator.preferredLeaseholders(zone, desc.Replicas)
 		if len(preferred) > 0 && !storeHasReplica(candidate.StoreID, preferred) {
 			log.VEventf(ctx, 3, "s%d not a preferred leaseholder; preferred: %v", candidate.StoreID, preferred)
@@ -566,7 +582,25 @@ func (sr *StoreRebalancer) chooseReplicaToRebalance(
 	// RelocateRange transfers the lease to the first provided target.
 	newLeaseIdx := 0
 	newLeaseQPS := math.MaxFloat64
+	var raftStatus *raft.Status
 	for i := 0; i < len(targets); i++ {
+		// Ensure we don't transfer the lease to an existing replica that is behind
+		// in processing its raft log.
+		var replicaID roachpb.ReplicaID
+		for _, replica := range desc.Replicas {
+			if replica.StoreID == targets[i].StoreID {
+				replicaID = replica.ReplicaID
+			}
+		}
+		if replicaID != 0 {
+			if raftStatus == nil {
+				raftStatus = sr.getRaftStatusFn(replWithStats.repl)
+			}
+			if replicaIsBehind(raftStatus, replicaID) {
+				continue
+			}
+		}
+
 		storeDesc, ok := storeMap[targets[i].StoreID]
 		if ok && storeDesc.Capacity.QueriesPerSecond < newLeaseQPS {
 			newLeaseIdx = i
```
pkg/storage/store_rebalancer_test.go

Lines changed: 117 additions & 0 deletions
```diff
@@ -27,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"go.etcd.io/etcd/raft"
 )
 
 var (
@@ -133,6 +134,24 @@ func TestChooseLeaseToTransfer(t *testing.T) {
 
 	sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr)
 
+	// Rather than trying to populate every Replica with a real raft group in
+	// order to pass replicaIsBehind checks, fake out the function for getting
+	// raft status with one that always returns all replicas as up to date.
+	sr.getRaftStatusFn = func(r *Replica) *raft.Status {
+		status := &raft.Status{
+			Progress: make(map[uint64]raft.Progress),
+		}
+		status.Lead = uint64(r.ReplicaID())
+		status.Commit = 1
+		for _, replica := range r.Desc().Replicas {
+			status.Progress[uint64(replica.ReplicaID)] = raft.Progress{
+				Match: 1,
+				State: raft.ProgressStateReplicate,
+			}
+		}
+		return status
+	}
+
 	testCases := []struct {
 		storeIDs []roachpb.StoreID
 		qps      float64
@@ -198,6 +217,24 @@ func TestChooseReplicaToRebalance(t *testing.T) {
 
 	sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr)
 
+	// Rather than trying to populate every Replica with a real raft group in
+	// order to pass replicaIsBehind checks, fake out the function for getting
+	// raft status with one that always returns all replicas as up to date.
+	sr.getRaftStatusFn = func(r *Replica) *raft.Status {
+		status := &raft.Status{
+			Progress: make(map[uint64]raft.Progress),
+		}
+		status.Lead = uint64(r.ReplicaID())
+		status.Commit = 1
+		for _, replica := range r.Desc().Replicas {
+			status.Progress[uint64(replica.ReplicaID)] = raft.Progress{
+				Match: 1,
+				State: raft.ProgressStateReplicate,
+			}
+		}
+		return status
+	}
+
 	testCases := []struct {
 		storeIDs []roachpb.StoreID
 		qps      float64
@@ -262,3 +299,83 @@
 		})
 	}
 }
+
+func TestNoLeaseTransferToBehindReplicas(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	// Lots of setup boilerplate.
+
+	ctx := context.Background()
+	stopper := stop.NewStopper()
+	defer stopper.Stop(ctx)
+
+	stopper, g, _, a, _ := createTestAllocator( /* deterministic */ false)
+	defer stopper.Stop(context.Background())
+	gossiputil.NewStoreGossiper(g).GossipStores(noLocalityStores, t)
+	storeList, _, _ := a.storePool.getStoreList(firstRange, storeFilterThrottled)
+	storeMap := storeListToMap(storeList)
+
+	const minQPS = 800
+	const maxQPS = 1200
+
+	localDesc := *noLocalityStores[0]
+	cfg := TestStoreConfig(nil)
+	s := createTestStoreWithoutStart(t, stopper, &cfg)
+	s.Ident = &roachpb.StoreIdent{StoreID: localDesc.StoreID}
+	rq := newReplicateQueue(s, g, a)
+	rr := newReplicaRankings()
+
+	sr := NewStoreRebalancer(cfg.AmbientCtx, cfg.Settings, rq, rr)
+
+	// Load in a range with replicas on an overfull node, a slightly underfull
+	// node, and a very underfull node.
+	loadRanges(rr, s, []testRange{{storeIDs: []roachpb.StoreID{1, 4, 5}, qps: 100}})
+	hottestRanges := rr.topQPS()
+	repl := hottestRanges[0].repl
+
+	// Set up a fake RaftStatus that indicates s5 is behind (but all other stores
+	// are caught up). We thus shouldn't transfer a lease to s5.
+	sr.getRaftStatusFn = func(r *Replica) *raft.Status {
+		status := &raft.Status{
+			Progress: make(map[uint64]raft.Progress),
+		}
+		status.Lead = uint64(r.ReplicaID())
+		status.Commit = 1
+		for _, replica := range r.Desc().Replicas {
+			match := uint64(1)
+			if replica.StoreID == roachpb.StoreID(5) {
+				match = 0
+			}
+			status.Progress[uint64(replica.ReplicaID)] = raft.Progress{
+				Match: match,
+				State: raft.ProgressStateReplicate,
+			}
+		}
+		return status
+	}
+
+	_, target, _ := sr.chooseLeaseToTransfer(
+		ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS)
+	expectTarget := roachpb.StoreID(4)
+	if target.StoreID != expectTarget {
+		t.Errorf("got target store s%d for range with RaftStatus %v; want s%d",
+			target.StoreID, sr.getRaftStatusFn(repl), expectTarget)
+	}
+
+	// Then do the same, but for replica rebalancing. Make s5 an existing replica
+	// that's behind, and see how a new replica is preferred as the leaseholder
+	// over it.
+	loadRanges(rr, s, []testRange{{storeIDs: []roachpb.StoreID{1, 3, 5}, qps: 100}})
+	hottestRanges = rr.topQPS()
+	repl = hottestRanges[0].repl
+
+	_, targets := sr.chooseReplicaToRebalance(
+		ctx, &hottestRanges, &localDesc, storeList, storeMap, minQPS, maxQPS)
+	expectTargets := []roachpb.ReplicationTarget{
+		{NodeID: 4, StoreID: 4}, {NodeID: 5, StoreID: 5}, {NodeID: 3, StoreID: 3},
+	}
+	if !reflect.DeepEqual(targets, expectTargets) {
+		t.Errorf("got targets %v for range with RaftStatus %v; want %v",
+			targets, sr.getRaftStatusFn(repl), expectTargets)
+	}
+}
```
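The tests rely on the getRaftStatusFn seam introduced above: production wiring assigns replica.RaftStatus() in NewStoreRebalancer, and each test overwrites the field with a fake so no real raft group has to be stood up. Below is a minimal, self-contained sketch of this dependency-injection pattern; all names in it are invented for the example:

```go
package main

import "fmt"

// status is a stand-in for *raft.Status in this illustration.
type status struct{ lead uint64 }

// rebalancer reaches raft state only through a function field, so tests can
// swap in a canned implementation without constructing a real raft group.
type rebalancer struct {
	getStatus func() *status
}

func newRebalancer() *rebalancer {
	return &rebalancer{
		getStatus: func() *status {
			// Production wiring: ask the real replica for its status.
			return &status{lead: 1}
		},
	}
}

func main() {
	r := newRebalancer()
	fmt.Println(r.getStatus().lead) // 1 (production path)

	// Test wiring: override the seam with a fake.
	r.getStatus = func() *status { return &status{lead: 42} }
	fmt.Println(r.getStatus().lead) // 42 (fake path)
}
```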
