Skip to content

Commit 5178559

Browse files
committed
kvserverpb: move quorum safeguard into execChangeReplicasTxn
This used to live in the replicate queue, but there are other entry points to replication changes, notably the store rebalancer which caused cockroachdb#54444. Move the check in the guts of replication changes where it is guaranteed to be invoked. Fixes cockroachdb#50729 Touches cockroachdb#54444 (release-20.2) Release note (bug fix): in rare situations, an automated replication change could result in a loss of quorum. This would require down nodes and a simultaneous change in the replication factor. Note that a change in the replication factor can occur automatically if the cluster is comprised of less than five available nodes. Experimentally the likeli- hood of encountering this issue, even under contrived conditions, was small.
1 parent 677f6f8 commit 5178559

15 files changed

Lines changed: 174 additions & 124 deletions

pkg/kv/kvnemesis/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ go_library(
1919
"//pkg/keys",
2020
"//pkg/kv",
2121
"//pkg/kv/kvclient/kvcoord",
22+
"//pkg/kv/kvserver",
2223
"//pkg/roachpb",
2324
"//pkg/storage",
2425
"//pkg/util/bufalloc",

pkg/kv/kvnemesis/validator.go

Lines changed: 9 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"sort"
1818
"strings"
1919

20+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
2021
"github.com/cockroachdb/cockroach/pkg/roachpb"
2122
"github.com/cockroachdb/cockroach/pkg/storage"
2223
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -328,39 +329,14 @@ func (v *validator) processOp(txnID *string, op Operation) {
328329
v.failIfError(op, t.Result)
329330
}
330331
case *ChangeReplicasOperation:
331-
if resultIsError(t.Result, `unable to add replica .* which is already present in`) {
332-
// Generator created this operations based on data about a range's
333-
// replicas that is now stale (because it raced with some other operation
334-
// created by that Generator): a replica is being added and in the
335-
// meantime, some other operation added the same replica.
336-
} else if resultIsError(t.Result, `unable to add replica .* which is already present as a learner`) {
337-
// Generator created this operations based on data about a range's
338-
// replicas that is now stale (because it raced with some other operation
339-
// created by that Generator): a replica is being added and in the
340-
// meantime, some other operation started (but did not finish) adding the
341-
// same replica.
342-
} else if resultIsError(t.Result, `descriptor changed`) {
343-
// Race between two operations being executed concurrently. Applier grabs
344-
// a range descriptor and then calls AdminChangeReplicas with it, but the
345-
// descriptor is changed by some other operation in between.
346-
} else if resultIsError(t.Result, `received invalid ChangeReplicasTrigger .* to remove self \(leaseholder\)`) {
347-
// Removing the leaseholder is invalid for technical reasons, but
348-
// Generator intentiontally does not try to avoid this so that this edge
349-
// case is exercised.
350-
} else if resultIsError(t.Result, `removing .* which is not in`) {
351-
// Generator created this operations based on data about a range's
352-
// replicas that is now stale (because it raced with some other operation
353-
// created by that Generator): a replica is being removed and in the
354-
// meantime, some other operation removed the same replica.
355-
} else if resultIsError(t.Result, `remote failed to apply snapshot for reason failed to apply snapshot: raft group deleted`) {
356-
// Probably should be transparently retried.
357-
} else if resultIsError(t.Result, `remote failed to apply snapshot for reason failed to apply snapshot: .* cannot add placeholder, have an existing placeholder`) {
358-
// Probably should be transparently retried.
359-
} else if resultIsError(t.Result, `cannot apply snapshot: snapshot intersects existing range`) {
360-
// Probably should be transparently retried.
361-
} else if resultIsError(t.Result, `snapshot of type LEARNER was sent to .* which did not contain it as a replica`) {
362-
// Probably should be transparently retried.
363-
} else {
332+
var ignore bool
333+
if t.Result.Type == ResultType_Error {
334+
ctx := context.Background()
335+
err := errors.DecodeError(ctx, *t.Result.Err)
336+
ignore = kvserver.IsRetriableReplicationChangeError(err) ||
337+
kvserver.IsIllegalReplicationChangeError(err)
338+
}
339+
if !ignore {
364340
v.failIfError(op, t.Result)
365341
}
366342
case *BatchOperation:

pkg/kv/kvserver/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ go_library(
1414
"gc_queue.go",
1515
"lease_history.go",
1616
"log.go",
17+
"markers.go",
1718
"merge_queue.go",
1819
"metrics.go",
1920
"queue.go",

pkg/kv/kvserver/client_merge_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1983,7 +1983,7 @@ func TestStoreRangeMergeAddReplicaRace(t *testing.T) {
19831983

19841984
const acceptableMergeErr = `unexpected value: raw_bytes|ranges not collocated` +
19851985
`|cannot merge range with non-voter replicas`
1986-
if mergeErr == nil && testutils.IsError(addErr, `descriptor changed: \[expected\]`) {
1986+
if mergeErr == nil && kvserver.IsRetriableReplicationChangeError(addErr) {
19871987
// Merge won the race, no add happened.
19881988
require.Len(t, afterDesc.Replicas().Voters(), 1)
19891989
require.Equal(t, origDesc.EndKey, afterDesc.EndKey)
@@ -2030,7 +2030,7 @@ func TestStoreRangeMergeResplitAddReplicaRace(t *testing.T) {
20302030

20312031
_, err := tc.Server(0).DB().AdminChangeReplicas(
20322032
ctx, scratchStartKey, origDesc, roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, tc.Target(1)))
2033-
if !testutils.IsError(err, `descriptor changed`) {
2033+
if !kvserver.IsRetriableReplicationChangeError(err) {
20342034
t.Fatalf(`expected "descriptor changed" error got: %+v`, err)
20352035
}
20362036
}

pkg/kv/kvserver/client_raft_test.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1998,10 +1998,11 @@ func runReplicateRestartAfterTruncation(t *testing.T, removeBeforeTruncateAndReA
19981998

19991999
func testReplicaAddRemove(t *testing.T, addFirst bool) {
20002000
sc := kvserver.TestStoreConfig(nil)
2001-
// We're gonna want to validate the state of the store before and after the
2002-
// replica GC queue does its work, so we disable the replica gc queue here
2003-
// and run it manually when we're ready.
2001+
// We're gonna want to validate the state of the store before and
2002+
// after the replica GC queue does its work, so we disable the
2003+
// replica gc queue here and run it manually when we're ready.
20042004
sc.TestingKnobs.DisableReplicaGCQueue = true
2005+
sc.TestingKnobs.DisableReplicateQueue = true
20052006
sc.TestingKnobs.DisableEagerReplicaRemoval = true
20062007
sc.Clock = nil // manual clock
20072008
mtc := &multiTestContext{
@@ -2852,7 +2853,7 @@ func TestRemovePlaceholderRace(t *testing.T) {
28522853
StoreID: mtc.stores[1].Ident.StoreID,
28532854
})
28542855
if _, err := repl.ChangeReplicas(ctx, repl.Desc(), kvserver.SnapshotRequest_REBALANCE, kvserverpb.ReasonUnknown, "", chgs); err != nil {
2855-
if kvserver.IsSnapshotError(err) {
2856+
if kvserver.IsRetriableReplicationChangeError(err) {
28562857
continue
28572858
} else {
28582859
t.Fatal(err)
@@ -4786,6 +4787,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
47864787
// Newly-started stores (including the "rogue" one) should not GC
47874788
// their replicas. We'll turn this back on when needed.
47884789
sc.TestingKnobs.DisableReplicaGCQueue = true
4790+
sc.TestingKnobs.DisableReplicateQueue = true
47894791
sc.RaftDelaySplitToSuppressSnapshotTicks = 0
47904792
// Make the tick interval short so we don't need to wait too long for the
47914793
// partitioned leader to time out. Also make the
@@ -4959,8 +4961,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
49594961
// and will be rolled back. Nevertheless it will have learned that it
49604962
// has been removed at the old replica ID.
49614963
err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0)
4962-
require.True(t,
4963-
testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err)
4964+
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
49644965

49654966
// Without a partitioned RHS we'll end up always writing a tombstone here because
49664967
// the RHS will be created at the initial replica ID because it will get
@@ -5008,8 +5009,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
50085009
// and will be rolled back. Nevertheless it will have learned that it
50095010
// has been removed at the old replica ID.
50105011
err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0)
5011-
require.True(t,
5012-
testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err)
5012+
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
50135013

50145014
// Without a partitioned RHS we'll end up always writing a tombstone here because
50155015
// the RHS will be created at the initial replica ID because it will get
@@ -5078,8 +5078,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
50785078
// and will be rolled back. Nevertheless it will have learned that it
50795079
// has been removed at the old replica ID.
50805080
err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0)
5081-
require.True(t,
5082-
testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err)
5081+
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
50835082
// Ensure that the replica exists with the higher replica ID.
50845083
repl, err := mtc.Store(0).GetReplica(rhsInfo.Desc.RangeID)
50855084
require.NoError(t, err)
@@ -5135,8 +5134,7 @@ func TestProcessSplitAfterRightHandSideHasBeenRemoved(t *testing.T) {
51355134
// and will be rolled back. Nevertheless it will have learned that it
51365135
// has been removed at the old replica ID.
51375136
err = changeReplicas(t, db, roachpb.ADD_VOTER, keyB, 0)
5138-
require.True(t,
5139-
testutils.IsError(err, "snapshot failed.*cannot apply snapshot: snapshot intersects"), err)
5137+
require.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
51405138
// Ensure that there's no tombstone.
51415139
// The RHS on store 0 never should have heard about its original ID.
51425140
ensureNoTombstone(t, mtc.Store(0), rhsID)

pkg/kv/kvserver/client_replica_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2240,9 +2240,7 @@ func TestRandomConcurrentAdminChangeReplicasRequests(t *testing.T) {
22402240
var gotSuccess bool
22412241
for _, err := range errors {
22422242
if err != nil {
2243-
const exp = "change replicas of .* failed: descriptor changed" +
2244-
"|snapshot failed:"
2245-
assert.True(t, testutils.IsError(err, exp), err)
2243+
assert.True(t, kvserver.IsRetriableReplicationChangeError(err), err)
22462244
} else if gotSuccess {
22472245
t.Error("expected only one success")
22482246
} else {

pkg/kv/kvserver/client_split_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3172,7 +3172,7 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
31723172
})
31733173

31743174
close(blockPromoteCh)
3175-
if err := g.Wait(); !testutils.IsError(err, `descriptor changed`) {
3175+
if err := g.Wait(); !kvserver.IsRetriableReplicationChangeError(err) {
31763176
t.Fatalf(`expected "descriptor changed" error got: %+v`, err)
31773177
}
31783178

@@ -3184,8 +3184,8 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
31843184
// has not heard a raft message addressed to a later replica ID while the
31853185
// "was not found on" error is expected if the store has heard that it has
31863186
// a newer replica ID before receiving the snapshot.
3187-
if !testutils.IsError(err, `snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+`) {
3188-
t.Fatalf(`expected snapshot intersects existing range|r[0-9]+ was not found on s[0-9]+" error got: %+v`, err)
3187+
if !kvserver.IsRetriableReplicationChangeError(err) {
3188+
t.Fatal(err)
31893189
}
31903190
}
31913191
for i := 0; i < 5; i++ {

pkg/kv/kvserver/client_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,10 @@ func (m *multiTestContext) makeStoreConfig(i int) kvserver.StoreConfig {
790790
cfg.TestingKnobs.DisableMergeQueue = true
791791
cfg.TestingKnobs.DisableSplitQueue = true
792792
cfg.TestingKnobs.ReplicateQueueAcceptsUnsplit = true
793+
// The mtc does not populate the allocator's store pool well and so
794+
// the check never sees any live replicas.
795+
cfg.TestingKnobs.AllowDangerousReplicationChanges = true
796+
793797
return cfg
794798
}
795799

@@ -1242,10 +1246,7 @@ func (m *multiTestContext) changeReplicas(
12421246
continue
12431247
}
12441248

1245-
// We can't use storage.IsSnapshotError() because the original error object
1246-
// is lost. We could make a this into a roachpb.Error but it seems overkill
1247-
// for this one usage.
1248-
if testutils.IsError(err, "snapshot failed: .*|descriptor changed") {
1249+
if kvserver.IsRetriableReplicationChangeError(err) {
12491250
log.Infof(ctx, "%v", err)
12501251
continue
12511252
}

pkg/kv/kvserver/markers.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2020 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package kvserver
12+
13+
import (
14+
"github.com/cockroachdb/cockroach/pkg/roachpb"
15+
"github.com/cockroachdb/errors"
16+
)
17+
18+
// NB: don't change the string here; this will cause cross-version issues
19+
// since this singleton is used as a marker.
20+
var errMarkSnapshotError = errors.New("snapshot failed")
21+
22+
// isSnapshotError returns true iff the error indicates that a snapshot failed.
23+
func isSnapshotError(err error) bool {
24+
return errors.Is(err, errMarkSnapshotError)
25+
}
26+
27+
// NB: don't change the string here; this will cause cross-version issues
28+
// since this singleton is used as a marker.
29+
var errMarkCanRetryReplicationChangeWithUpdatedDesc = errors.New("should retry with updated descriptor")
30+
31+
// IsRetriableReplicationChangeError detects whether an error (which is
32+
// assumed to have been emitted by the KV layer during a replication change
33+
// operation) is likely to succeed when retried with an updated descriptor.
34+
func IsRetriableReplicationChangeError(err error) bool {
35+
return errors.Is(err, errMarkCanRetryReplicationChangeWithUpdatedDesc) || isSnapshotError(err)
36+
}
37+
38+
const (
39+
descChangedRangeSubsumedErrorFmt = "descriptor changed: expected %s != [actual] nil (range subsumed)"
40+
descChangedErrorFmt = "descriptor changed: [expected] %s != [actual] %s"
41+
)
42+
43+
func newDescChangedError(desc, actualDesc *roachpb.RangeDescriptor) error {
44+
if actualDesc == nil {
45+
return errors.Mark(errors.Newf(descChangedRangeSubsumedErrorFmt, desc), errMarkCanRetryReplicationChangeWithUpdatedDesc)
46+
}
47+
return errors.Mark(errors.Newf(descChangedErrorFmt, desc, actualDesc), errMarkCanRetryReplicationChangeWithUpdatedDesc)
48+
}
49+
50+
func wrapDescChangedError(err error, desc, actualDesc *roachpb.RangeDescriptor) error {
51+
if actualDesc == nil {
52+
return errors.Mark(errors.Wrapf(err, descChangedRangeSubsumedErrorFmt, desc), errMarkCanRetryReplicationChangeWithUpdatedDesc)
53+
}
54+
return errors.Mark(errors.Wrapf(err, descChangedErrorFmt, desc, actualDesc), errMarkCanRetryReplicationChangeWithUpdatedDesc)
55+
}
56+
57+
// NB: don't change the string here; this will cause cross-version issues
58+
// since this singleton is used as a marker.
59+
var errMarkInvalidReplicationChange = errors.New("invalid replication change")
60+
61+
// IsIllegalReplicationChangeError detects whether an error (assumed to have been emitted
62+
// by a replication change) indicates that the replication change is illegal, meaning
63+
// that it's unlikely to be handled through a retry. Examples of this are attempts to add
64+
// a store that is already a member of the supplied descriptor. A non-example is a change
65+
// detected in the descriptor at the KV layer relative to that supplied as input to the
66+
// replication change, which would likely benefit from a retry.
67+
func IsIllegalReplicationChangeError(err error) bool {
68+
return errors.Is(err, errMarkInvalidReplicationChange)
69+
}

0 commit comments

Comments
 (0)