Skip to content

Commit 03b116f

Browse files
committed
storage: prevent unbounded raft log growth without quorum
Fixes #27772. This change adds safeguards to prevent cases where a raft log would grow without bound during loss of quorum scenarios. It also adds a new test that demonstrates that the raft log does not grow without bound in these cases. There are two cases that need to be handled to prevent the unbounded raft log growth observed in #27772. 1. When the leader proposes a command and cannot establish a quorum. In this case, we know the leader has the entry in its log, so there's no need to refresh it with `reasonTicks`. To avoid this, we no longer refresh pending proposals with `reasonTicks` while we are the leader. 2. When a follower proposes a command that is forwarded to the leader who cannot establish a quorum. In this case, the follower can't be sure (currently) that the leader got the proposal, so it needs to refresh using `reasonTicks`. However, the leader now detects duplicate forwarded proposals and avoids appending redundant entries to its log. It does so by maintaining a set of in-flight forwarded proposals that it has received during its term as leader. This set is reset after every leadership change. Both of these cases are tested against in the new TestLogGrowthWhenRefreshingPendingCommands. Without both of the safeguards introduced in this commit, the test fails. Release note (bug fix): Prevent unbounded growth of the raft log caused by a loss of quorum.
1 parent d08a3ec commit 03b116f

5 files changed

Lines changed: 219 additions & 7 deletions

File tree

pkg/storage/client_raft_test.go

Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ import (
4242
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
4343
"github.com/cockroachdb/cockroach/pkg/testutils"
4444
"github.com/cockroachdb/cockroach/pkg/util/hlc"
45+
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
4546
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
4647
"github.com/cockroachdb/cockroach/pkg/util/log"
4748
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
@@ -1126,6 +1127,144 @@ func TestRefreshPendingCommands(t *testing.T) {
11261127
}
11271128
}
11281129

// Test that when a Raft group is not able to establish a quorum, its Raft log
// does not grow without bound. It tests two different scenarios where this used
// to be possible (see #27772):
// 1. The leader proposes a command and cannot establish a quorum. The leader
//    continually re-proposes the command.
// 2. The follower proposes a command and forwards it to the leader, who cannot
//    establish a quorum. The follower continually re-proposes and forwards the
//    command to the leader.
func TestLogGrowthWhenRefreshingPendingCommands(t *testing.T) {
	defer leaktest.AfterTest(t)()

	sc := storage.TestStoreConfig(nil)
	// Drop the raft tick interval so the Raft group is ticked more.
	sc.RaftTickInterval = 10 * time.Millisecond
	// Don't timeout raft leader. We don't want leadership moving.
	sc.RaftElectionTimeoutTicks = 1000000
	// Disable leader transfers during leaseholder changes so that we
	// can easily create leader-not-leaseholder scenarios.
	sc.TestingKnobs.DisableLeaderFollowsLeaseholder = true
	// Refresh pending commands on every Raft group tick instead of
	// every RaftElectionTimeoutTicks. This makes the test exercise the
	// refresh path far more frequently than it would by default.
	sc.TestingKnobs.RefreshReasonTicksPeriod = 1
	// Disable periodic gossip tasks which can move the range 1 lease
	// unexpectedly.
	sc.TestingKnobs.DisablePeriodicGossips = true
	mtc := &multiTestContext{storeConfig: &sc}
	defer mtc.Stop()
	mtc.Start(t, 5)

	const rangeID = roachpb.RangeID(1)
	mtc.replicateRange(rangeID, 1, 2, 3, 4)

	// Raft leadership is kept on node 0.
	leaderRepl, err := mtc.Store(0).GetReplica(rangeID)
	if err != nil {
		t.Fatal(err)
	}

	// Put some data in the range so we'll have something to test for.
	incArgs := incrementArgs([]byte("a"), 5)
	if _, err := client.SendWrapped(context.Background(), mtc.stores[0].TestSender(), incArgs); err != nil {
		t.Fatal(err)
	}

	// Wait for all nodes to catch up.
	mtc.waitForValues(roachpb.Key("a"), []int64{5, 5, 5, 5, 5})

	// Test proposing on leader and proposing on follower. Neither should result
	// in unbounded raft log growth.
	testutils.RunTrueAndFalse(t, "proposeOnFollower", func(t *testing.T, proposeOnFollower bool) {
		// Restart any nodes that are down (the previous subtest stops
		// stores 2-4 and only restarts store 2).
		for _, s := range []int{2, 3, 4} {
			if mtc.Store(s) == nil {
				mtc.restartStore(s)
			}
		}

		// Determine which node to propose on. Transfer lease to that node.
		var propIdx, otherIdx int
		if !proposeOnFollower {
			propIdx, otherIdx = 0, 1
		} else {
			propIdx, otherIdx = 1, 0
		}
		propNode := mtc.stores[propIdx].TestSender()
		mtc.transferLease(context.TODO(), rangeID, otherIdx, propIdx)
		testutils.SucceedsSoon(t, func() error {
			// Lease transfers may not be immediately observed by the new
			// leaseholder. Wait until the new leaseholder is aware.
			repl, err := mtc.Store(propIdx).GetReplica(rangeID)
			if err != nil {
				t.Fatal(err)
			}
			repDesc, err := repl.GetReplicaDescriptor()
			if err != nil {
				t.Fatal(err)
			}
			if lease, _ := repl.GetLease(); lease.Replica != repDesc {
				return errors.Errorf("lease not transferred yet; found %v", lease)
			}
			return nil
		})

		// Stop enough nodes to prevent a quorum.
		for _, s := range []int{2, 3, 4} {
			mtc.stopStore(s)
		}

		// Determine the current raft log size so growth can be measured
		// relative to this baseline.
		initLogSize := leaderRepl.GetRaftLogSize()

		// While a majority of nodes are down, write some data. The write
		// cannot commit, so it will be continually refreshed/re-proposed.
		putRes := make(chan *roachpb.Error)
		go func() {
			putArgs := putArgs([]byte("b"), make([]byte, 8<<10 /* 8 KB */))
			_, err := client.SendWrapped(context.Background(), propNode, putArgs)
			putRes <- err
		}()

		// Wait for a bit and watch for Raft log growth.
		wait := time.After(500 * time.Millisecond)
		ticker := time.Tick(50 * time.Millisecond)
	Loop:
		for {
			select {
			case <-wait:
				break Loop

			case <-ticker:
				// Verify that the leader is node 0.
				status := leaderRepl.RaftStatus()
				if status == nil || status.RaftState != raft.StateLeader {
					t.Fatalf("raft leader should be node 0, but got status %+v", status)
				}

				// Check raft log size. The 64 KB limit leaves lots of
				// headroom over the single 8 KB pending write, but would
				// be exceeded quickly if every refresh appended a new copy
				// of the entry to the log.
				const logSizeLimit = 64 << 10 // 64 KB
				curlogSize := leaderRepl.GetRaftLogSize()
				logSize := curlogSize - initLogSize
				logSizeStr := humanizeutil.IBytes(logSize)
				if logSize > logSizeLimit {
					t.Fatalf("raft log size grew to %s", logSizeStr)
				}
				t.Logf("raft log size grew to %s", logSizeStr)

			case err := <-putRes:
				t.Fatalf("write finished with quorum unavailable; err=%v", err)
			}
		}

		// Start enough nodes to establish a quorum.
		mtc.restartStore(2)

		// The write should now succeed.
		if err := <-putRes; err != nil {
			t.Fatal(err)
		}
	})
}
11291268
// TestStoreRangeUpReplicate verifies that the replication queue will notice
11301269
// under-replicated ranges and replicate them. Also tests that preemptive
11311270
// snapshots which contain sideloaded proposals don't panic the receiving end.

pkg/storage/replica.go

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,12 @@ type Replica struct {
382382
// map must only be referenced while Replica.mu is held, except if the
383383
// element is removed from the map first. The notable exception is the
384384
// contained RaftCommand, which we treat as immutable.
385-
proposals map[storagebase.CmdIDKey]*ProposalData
385+
proposals map[storagebase.CmdIDKey]*ProposalData
386+
// remoteProposals is maintained by Raft leaders and stores in-flight
387+
// commands that were forwarded to the leader during its current term.
388+
// The set allows leaders to detect duplicate forwarded commands and
389+
// avoid re-proposing the same forwarded command multiple times.
390+
remoteProposals map[storagebase.CmdIDKey]struct{}
386391
internalRaftGroup *raft.RawNode
387392
// The ID of the replica within the Raft group. May be 0 if the replica has
388393
// been created from a preemptive snapshot (i.e. before being added to the
@@ -851,6 +856,7 @@ func (r *Replica) cancelPendingCommandsLocked() {
851856
r.cleanupFailedProposalLocked(p)
852857
p.finishApplication(pr)
853858
}
859+
r.mu.remoteProposals = nil
854860
}
855861

856862
// cleanupFailedProposalLocked cleans up after a proposal that has failed. It
@@ -3747,6 +3753,39 @@ func (r *Replica) stepRaftGroup(req *RaftMessageRequest) error {
37473753
// other replica is not quiesced, so we don't need to wake the leader.
37483754
r.unquiesceLocked()
37493755
r.refreshLastUpdateTimeForReplicaLocked(req.FromReplica.ReplicaID)
3756+
if req.Message.Type == raftpb.MsgProp {
3757+
// A proposal was forwarded to this replica.
3758+
if r.mu.replicaID == r.mu.leaderID {
3759+
// This replica is the leader. Record that the proposal
3760+
// was seen and drop the proposal if it was already seen.
3761+
// This prevents duplicate forwarded proposals from each
3762+
// being appended to a leader's raft log.
3763+
allSeen := true
3764+
for _, e := range req.Message.Entries {
3765+
switch e.Type {
3766+
case raftpb.EntryNormal:
3767+
cmdID, _ := DecodeRaftCommand(e.Data)
3768+
if r.mu.remoteProposals == nil {
3769+
r.mu.remoteProposals = map[storagebase.CmdIDKey]struct{}{}
3770+
}
3771+
if _, ok := r.mu.remoteProposals[cmdID]; !ok {
3772+
r.mu.remoteProposals[cmdID] = struct{}{}
3773+
allSeen = false
3774+
}
3775+
case raftpb.EntryConfChange:
3776+
// We could peek into the EntryConfChange to find the
3777+
// command ID, but we don't expect follower-initiated
3778+
// conf changes.
3779+
allSeen = false
3780+
default:
3781+
log.Fatalf(context.TODO(), "unexpected Raft entry: %v", e)
3782+
}
3783+
}
3784+
if allSeen {
3785+
return false /* unquiesceAndWakeLeader */, nil
3786+
}
3787+
}
3788+
}
37503789
err := raftGroup.Step(req.Message)
37513790
if err == raft.ErrProposalDropped {
37523791
// A proposal was forwarded to this replica but we couldn't propose it.
@@ -3855,6 +3894,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked(
38553894
if !r.store.TestingKnobs().DisableRefreshReasonNewLeader {
38563895
refreshReason = reasonNewLeader
38573896
}
3897+
// Clear the remote proposal set. Would have been nil already if not
3898+
// previously the leader.
3899+
r.mu.remoteProposals = nil
38583900
leaderID = roachpb.ReplicaID(rd.SoftState.Lead)
38593901
}
38603902

@@ -4212,17 +4254,28 @@ func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
42124254

42134255
r.mu.ticks++
42144256
r.mu.internalRaftGroup.Tick()
4257+
4258+
refreshAtDelta := r.store.cfg.RaftElectionTimeoutTicks
4259+
if knob := r.store.TestingKnobs().RefreshReasonTicksPeriod; knob > 0 {
4260+
refreshAtDelta = knob
4261+
}
42154262
if !r.store.TestingKnobs().DisableRefreshReasonTicks &&
4216-
r.mu.ticks%r.store.cfg.RaftElectionTimeoutTicks == 0 {
4263+
r.mu.replicaID != r.mu.leaderID &&
4264+
r.mu.ticks%refreshAtDelta == 0 {
42174265
// RaftElectionTimeoutTicks is a reasonable approximation of how long we
42184266
// should wait before deciding that our previous proposal didn't go
42194267
// through. Note that the combination of the above condition and passing
42204268
// RaftElectionTimeoutTicks to refreshProposalsLocked means that commands
42214269
// will be refreshed when they have been pending for 1 to 2 election
42224270
// cycles.
4223-
r.refreshProposalsLocked(
4224-
r.store.cfg.RaftElectionTimeoutTicks, reasonTicks,
4225-
)
4271+
//
4272+
// However, we don't refresh proposals if we are the leader because
4273+
// doing so would be useless. The commands tracked by a leader replica
4274+
// were either all proposed when the replica was a leader or were
4275+
// re-proposed when the replica became a leader. Either way, they are
4276+
// guaranteed to be in the leader's Raft log so re-proposing won't do
4277+
// anything.
4278+
r.refreshProposalsLocked(refreshAtDelta, reasonTicks)
42264279
}
42274280
return true, nil
42284281
}
@@ -4314,6 +4367,9 @@ func (r *Replica) maybeTransferRaftLeader(
43144367
if !r.isLeaseValidRLocked(l, now) {
43154368
return
43164369
}
4370+
if r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
4371+
return
4372+
}
43174373
if pr, ok := status.Progress[uint64(l.Replica.ReplicaID)]; ok && pr.Match >= status.Commit {
43184374
log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", l.Replica.ReplicaID)
43194375
r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
@@ -5024,6 +5080,9 @@ func (r *Replica) processRaftCommand(
50245080
delete(r.mu.proposals, idKey)
50255081
}
50265082

5083+
// Delete the entry for a forwarded proposal set.
5084+
delete(r.mu.remoteProposals, idKey)
5085+
50275086
leaseIndex, proposalRetry, forcedErr := r.checkForcedErrLocked(ctx, idKey, raftCmd, proposal, proposedLocally)
50285087

50295088
r.mu.Unlock()

pkg/storage/replica_proposal.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,8 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease) {
310310
r.txnWaitQueue.Clear(true /* disable */)
311311
}
312312

313-
if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) {
313+
if !iAmTheLeaseHolder && r.IsLeaseValid(newLease, r.store.Clock().Now()) &&
314+
!r.store.TestingKnobs().DisableLeaderFollowsLeaseholder {
314315
// If this replica is the raft leader but it is not the new lease holder,
315316
// then try to transfer the raft leadership to match the lease. We like it
316317
// when leases and raft leadership are collocated because that facilitates

pkg/storage/replica_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8141,8 +8141,14 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
81418141
t.Fatal(err)
81428142
}
81438143

8144-
electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
8144+
// Only followers refresh pending commands during tick events. Change the
8145+
// replica that the range thinks is the leader so that the replica thinks
8146+
// it's a follower.
8147+
r.mu.Lock()
8148+
r.mu.leaderID = 2
8149+
r.mu.Unlock()
81458150

8151+
electionTicks := tc.store.cfg.RaftElectionTimeoutTicks
81468152
{
81478153
// The verifications of the reproposal counts below rely on r.mu.ticks
81488154
// starting with a value of 0 (modulo electionTicks). Move the replica into

pkg/storage/store.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,9 @@ type StoreTestingKnobs struct {
759759
DisableScanner bool
760760
// DisablePeriodicGossips disables periodic gossiping.
761761
DisablePeriodicGossips bool
762+
// DisableLeaderFollowsLeaseholder disables attempts to transfer raft
763+
// leadership when it diverges from the range's leaseholder.
764+
DisableLeaderFollowsLeaseholder bool
762765
// DisableRefreshReasonTicks disables refreshing pending commands when a new
763766
// leader is discovered.
764767
DisableRefreshReasonNewLeader bool
@@ -768,6 +771,10 @@ type StoreTestingKnobs struct {
768771
// DisableRefreshReasonTicks disables refreshing pending commands
769772
// periodically.
770773
DisableRefreshReasonTicks bool
774+
// RefreshReasonTicksPeriod overrides the default period over which
775+
// pending commands are refreshed. The period is specified as a multiple
776+
// of Raft group ticks.
777+
RefreshReasonTicksPeriod int
771778
// DisableProcessRaft disables the process raft loop.
772779
DisableProcessRaft bool
773780
// DisableLastProcessedCheck disables checking on replica queue last processed times.

0 commit comments

Comments
 (0)