Skip to content

Commit 6444df5

Browse files
committed
kvserver: deflake TestReplicaTombstone
Like many other tests, this test could flake because we'd sometimes catch a
"cannot remove learner while snapshot is in flight" error. I think the root
cause is that sometimes there are errant Raft snapshots in the system[^1] and
these get mistaken for LEARNERs that are still being caught up by the
replicate queue. I tried to address this general class of issues by making the
check for in-flight learner snapshots not care about *raft* snapshots.

I was able to stress TestReplicaTombstone for 30+ minutes without a failure
using that approach, whereas previously it usually failed within a few
minutes.

```
./dev test --stress pkg/kv/kvserver/ --filter TestReplicaTombstone 2>&1 | tee stress.log
[...]
2461 runs so far, 0 failures, over 35m45s
```

[^1]: #87553

Fixes #98883.

Epic: none

Release note: None
1 parent 4dc10b5 commit 6444df5

6 files changed

Lines changed: 46 additions & 36 deletions

File tree

pkg/kv/kvserver/raft_log_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func newTruncateDecision(ctx context.Context, r *Replica) (truncateDecision, err
268268
raftStatus := r.raftStatusRLocked()
269269

270270
const anyRecipientStore roachpb.StoreID = 0
271-
pendingSnapshotIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore)
271+
_, pendingSnapshotIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore, false /* initialOnly */)
272272
lastIndex := r.mu.lastIndexNotDurable
273273
// NB: raftLogSize above adjusts for pending truncations that have already
274274
// been successfully replicated via raft, but logSizeTrusted does not see if

pkg/kv/kvserver/raft_log_queue_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -648,7 +648,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
648648

649649
r.mu.state.RaftAppliedIndex = index1
650650
// Add first constraint.
651-
_, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
651+
_, cleanup1 := r.addSnapshotLogTruncationConstraint(ctx, id1, false /* initial */, storeID)
652652
exp1 := map[uuid.UUID]snapTruncationInfo{id1: {index: index1}}
653653

654654
// Make sure it registered.
@@ -658,15 +658,15 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
658658
// Add another constraint with the same id. Extremely unlikely in practice
659659
// but we want to make sure it doesn't blow anything up. Collisions are
660660
// handled by ignoring the colliding update.
661-
_, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, storeID)
661+
_, cleanup2 := r.addSnapshotLogTruncationConstraint(ctx, id1, false /* initial */, storeID)
662662
assert.Equal(t, r.mu.snapshotLogTruncationConstraints, exp1)
663663

664664
// Helper that grabs the min constraint index (which can trigger GC as a
665665
// byproduct) and asserts.
666666
assertMin := func(exp uint64, now time.Time) {
667667
t.Helper()
668668
const anyRecipientStore roachpb.StoreID = 0
669-
if maxIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore); maxIndex != exp {
669+
if _, maxIndex := r.getSnapshotLogTruncationConstraintsRLocked(anyRecipientStore, false /* initialOnly */); maxIndex != exp {
670670
t.Fatalf("unexpected max index %d, wanted %d", maxIndex, exp)
671671
}
672672
}
@@ -678,7 +678,7 @@ func TestSnapshotLogTruncationConstraints(t *testing.T) {
678678
r.mu.state.RaftAppliedIndex = index2
679679
// Add another, higher, index. We're not going to notice it's around
680680
// until the lower one disappears.
681-
_, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, storeID)
681+
_, cleanup3 := r.addSnapshotLogTruncationConstraint(ctx, id2, false /* initial */, storeID)
682682

683683
now := timeutil.Now()
684684
// The colliding snapshot comes back. Or the original, we can't tell.

pkg/kv/kvserver/raft_snapshot_queue.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,9 +116,10 @@ func (rq *raftSnapshotQueue) processRaftSnapshot(
116116
if fn := repl.store.cfg.TestingKnobs.RaftSnapshotQueueSkipReplica; fn != nil && fn() {
117117
return false, nil
118118
}
119-
if repl.hasOutstandingSnapshotInFlightToStore(repDesc.StoreID) {
120-
// There is a snapshot being transferred. It's probably an INITIAL snap,
121-
// so bail for now and try again later.
119+
// NB: we could pass `false` for initialOnly as well, but we are the "other"
120+
// possible sender.
121+
if _, ok := repl.hasOutstandingSnapshotInFlightToStore(repDesc.StoreID, true /* initialOnly */); ok {
122+
// There is an INITIAL snapshot being transferred, so bail for now and try again later.
122123
err := errors.Errorf(
123124
"skipping snapshot; replica is likely a %s in the process of being added: %s",
124125
typ,

pkg/kv/kvserver/replica.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2314,7 +2314,7 @@ func (r *Replica) GetLoadStatsForTesting() *load.ReplicaLoad {
23142314
// HasOutstandingLearnerSnapshotInFlightForTesting is for use only by tests to
23152315
// gather whether there are in-flight snapshots to learner replicas.
23162316
func (r *Replica) HasOutstandingLearnerSnapshotInFlightForTesting() bool {
2317-
return r.hasOutstandingLearnerSnapshotInFlight()
2317+
return r.errOnOutstandingLearnerSnapshotInflight() != nil
23182318
}
23192319

23202320
// ReadProtectedTimestampsForTesting is for use only by tests to read and update

pkg/kv/kvserver/replica_command.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1334,9 +1334,9 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
13341334
// periods of time on a single range without making progress, which can stall
13351335
// other operations that they are expected to perform (see
13361336
// https://github.com/cockroachdb/cockroach/issues/79249 for example).
1337-
if r.hasOutstandingLearnerSnapshotInFlight() {
1337+
if err := r.errOnOutstandingLearnerSnapshotInflight(); err != nil {
13381338
return nil /* desc */, 0, /* learnersRemoved */
1339-
errCannotRemoveLearnerWhileSnapshotInFlight
1339+
errors.WithSecondaryError(errCannotRemoveLearnerWhileSnapshotInFlight, err)
13401340
}
13411341

13421342
if fn := r.store.TestingKnobs().BeforeRemovingDemotedLearner; fn != nil {
@@ -1839,7 +1839,7 @@ func (r *Replica) lockLearnerSnapshot(
18391839
var cleanups []func()
18401840
for _, addition := range additions {
18411841
lockUUID := uuid.MakeV4()
1842-
_, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, addition.StoreID)
1842+
_, cleanup := r.addSnapshotLogTruncationConstraint(ctx, lockUUID, true /* initial */, addition.StoreID)
18431843
cleanups = append(cleanups, cleanup)
18441844
}
18451845
return func() {
@@ -2793,7 +2793,7 @@ func (r *Replica) sendSnapshotUsingDelegate(
27932793
senderQueuePriority = 0
27942794
}
27952795
snapUUID := uuid.MakeV4()
2796-
appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, recipient.StoreID)
2796+
appliedIndex, cleanup := r.addSnapshotLogTruncationConstraint(ctx, snapUUID, snapType == kvserverpb.SnapshotRequest_INITIAL, recipient.StoreID)
27972797
// The cleanup function needs to be called regardless of success or failure of
27982798
// sending to release the log truncation constraint.
27992799
defer cleanup()

pkg/kv/kvserver/replica_raft.go

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1756,6 +1756,7 @@ func (r *Replica) reportSnapshotStatus(ctx context.Context, to roachpb.ReplicaID
17561756
type snapTruncationInfo struct {
17571757
index uint64
17581758
recipientStore roachpb.StoreID
1759+
initial bool
17591760
}
17601761

17611762
// addSnapshotLogTruncation creates a log truncation record which will prevent
@@ -1771,8 +1772,10 @@ type snapTruncationInfo struct {
17711772
// a possibly stale value here is harmless since the values increases
17721773
// monotonically. The actual snapshot index, may preserve more from a log
17731774
// truncation perspective.
1775+
// If initial is true, the snapshot is marked as being sent by the replicate
1776+
// queue to a new replica; some callers only care about these snapshots.
17741777
func (r *Replica) addSnapshotLogTruncationConstraint(
1775-
ctx context.Context, snapUUID uuid.UUID, recipientStore roachpb.StoreID,
1778+
ctx context.Context, snapUUID uuid.UUID, initial bool, recipientStore roachpb.StoreID,
17761779
) (uint64, func()) {
17771780
r.mu.Lock()
17781781
defer r.mu.Unlock()
@@ -1794,6 +1797,7 @@ func (r *Replica) addSnapshotLogTruncationConstraint(
17941797
r.mu.snapshotLogTruncationConstraints[snapUUID] = snapTruncationInfo{
17951798
index: appliedIndex,
17961799
recipientStore: recipientStore,
1800+
initial: initial,
17971801
}
17981802

17991803
return appliedIndex, func() {
@@ -1814,48 +1818,53 @@ func (r *Replica) addSnapshotLogTruncationConstraint(
18141818
}
18151819
}
18161820

1817-
// getSnapshotLogTruncationConstraints returns the minimum index of any
1821+
// getSnapshotLogTruncationConstraintsRLocked returns the minimum index of any
18181822
// currently outstanding snapshot being sent from this replica to the specified
18191823
// recipient or 0 if there isn't one. Passing 0 for recipientStore means any
1820-
// recipient.
1821-
func (r *Replica) getSnapshotLogTruncationConstraints(
1822-
recipientStore roachpb.StoreID,
1823-
) (minSnapIndex uint64) {
1824-
r.mu.RLock()
1825-
defer r.mu.RUnlock()
1826-
return r.getSnapshotLogTruncationConstraintsRLocked(recipientStore)
1827-
}
1828-
1824+
// recipient. If initialOnly is set, only snapshots sent by the replicate queue
1825+
// to new replicas are considered.
18291826
func (r *Replica) getSnapshotLogTruncationConstraintsRLocked(
1830-
recipientStore roachpb.StoreID,
1831-
) (minSnapIndex uint64) {
1827+
recipientStore roachpb.StoreID, initialOnly bool,
1828+
) (_ []snapTruncationInfo, minSnapIndex uint64) {
1829+
var sl []snapTruncationInfo
18321830
for _, item := range r.mu.snapshotLogTruncationConstraints {
1831+
if initialOnly && !item.initial {
1832+
continue
1833+
}
18331834
if recipientStore != 0 && item.recipientStore != recipientStore {
18341835
continue
18351836
}
18361837
if minSnapIndex == 0 || minSnapIndex > item.index {
18371838
minSnapIndex = item.index
18381839
}
1840+
sl = append(sl, item)
18391841
}
1840-
return minSnapIndex
1842+
return sl, minSnapIndex
18411843
}
18421844

1843-
// hasOutstandingLearnerSnapshotInFlight returns true if there is a snapshot in
1844-
// progress from this replica to a learner replica for this range.
1845-
func (r *Replica) hasOutstandingLearnerSnapshotInFlight() bool {
1845+
// errOnOutstandingLearnerSnapshotInflight returns an error if there is a
1846+
// snapshot in progress from this replica to a learner replica for this range.
1847+
func (r *Replica) errOnOutstandingLearnerSnapshotInflight() error {
18461848
learners := r.Desc().Replicas().LearnerDescriptors()
18471849
for _, repl := range learners {
1848-
if r.hasOutstandingSnapshotInFlightToStore(repl.StoreID) {
1849-
return true
1850+
sl, _ := r.hasOutstandingSnapshotInFlightToStore(repl.StoreID, true /* initialOnly */)
1851+
if len(sl) > 0 {
1852+
return errors.Errorf("INITIAL snapshots in flight to s%d: %v", repl.StoreID, sl)
18501853
}
18511854
}
1852-
return false
1855+
return nil
18531856
}
18541857

18551858
// hasOutstandingSnapshotInFlightToStore returns true if there is a snapshot in
1856-
// flight from this replica to the store with the given ID.
1857-
func (r *Replica) hasOutstandingSnapshotInFlightToStore(storeID roachpb.StoreID) bool {
1858-
return r.getSnapshotLogTruncationConstraints(storeID) > 0
1859+
// flight from this replica to the store with the given ID. If initialOnly is
1860+
// true, only snapshots sent by the replicate queue to new replicas are considered.
1861+
func (r *Replica) hasOutstandingSnapshotInFlightToStore(
1862+
storeID roachpb.StoreID, initialOnly bool,
1863+
) ([]snapTruncationInfo, bool) {
1864+
r.mu.RLock()
1865+
defer r.mu.RUnlock()
1866+
sl, idx := r.getSnapshotLogTruncationConstraintsRLocked(storeID, initialOnly)
1867+
return sl, idx > 0
18591868
}
18601869

18611870
// HasRaftLeader returns true if the raft group has a raft leader currently.

0 commit comments

Comments
 (0)