Skip to content

Commit 2a1857f

Browse files
committed
kvserver: remove snapshot CanDecline flag
kvserver: remove snapshot CanDecline flag. Only preemptive snapshots specified CanDecline, and preemptive snapshots have not been in use since 19.2. This commit also removes the corresponding rejection enum reason `SnapshotResponse_DECLINED` and retires the `server.declined_reservation_timeout` cluster setting.

Release note: None
1 parent 850099a commit 2a1857f

10 files changed

Lines changed: 128 additions & 342 deletions

File tree

pkg/kv/kvserver/client_raft_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1327,9 +1327,8 @@ func TestFailedSnapshotFillsReservation(t *testing.T) {
13271327
rep2Desc, found := desc.GetReplicaDescriptor(2)
13281328
require.True(t, found)
13291329
header := kvserver.SnapshotRequest_Header{
1330-
CanDecline: true,
1331-
RangeSize: 100,
1332-
State: kvserverpb.ReplicaState{Desc: desc},
1330+
RangeSize: 100,
1331+
State: kvserverpb.ReplicaState{Desc: desc},
13331332
RaftMessageRequest: kvserver.RaftMessageRequest{
13341333
RangeID: rep.RangeID,
13351334
FromReplica: repDesc,

pkg/kv/kvserver/raft.pb.go

Lines changed: 81 additions & 123 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/kv/kvserver/raft.proto

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,6 @@ message SnapshotRequest {
153153
}
154154

155155
message Header {
156-
reserved 1;
157-
158156
// The replica state at the time the snapshot was generated. Note
159157
// that ReplicaState.Desc differs from the above range_descriptor
160158
// field which holds the updated descriptor after the new replica
@@ -168,11 +166,6 @@ message SnapshotRequest {
168166
// The estimated size of the range, to be used in reservation decisions.
169167
int64 range_size = 3;
170168

171-
// can_decline is set on preemptive snapshots, but not those generated
172-
// by raft because at that point it is better to queue up the stream
173-
// than to cancel it.
174-
bool can_decline = 4;
175-
176169
// The priority of the snapshot.
177170
Priority priority = 6;
178171

@@ -193,6 +186,8 @@ message SnapshotRequest {
193186
//
194187
// TODO(irfansharif): Remove this in v22.1.
195188
bool deprecated_unreplicated_truncated_state = 8;
189+
190+
reserved 1, 4;
196191
}
197192

198193
Header header = 1;
@@ -214,7 +209,7 @@ message SnapshotResponse {
214209
ACCEPTED = 1;
215210
APPLIED = 2;
216211
ERROR = 3;
217-
DECLINED = 4;
212+
reserved 4;
218213
}
219214
Status status = 1;
220215
string message = 2;

pkg/kv/kvserver/replica_command.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2467,14 +2467,9 @@ func (r *Replica) sendSnapshot(
24672467
},
24682468
},
24692469
RangeSize: r.GetMVCCStats().Total(),
2470-
// Recipients currently cannot choose to decline any snapshots.
2471-
// In 19.2 and earlier versions pre-emptive snapshots could be declined.
2472-
//
2473-
// TODO(ajwerner): Consider removing the CanDecline flag.
2474-
CanDecline: false,
2475-
Priority: priority,
2476-
Strategy: SnapshotRequest_KV_BATCH,
2477-
Type: snapType,
2470+
Priority: priority,
2471+
Strategy: SnapshotRequest_KV_BATCH,
2472+
Type: snapType,
24782473
}
24792474
newBatchFn := func() storage.Batch {
24802475
return r.store.Engine().NewUnindexedBatch(true /* writeOnly */)

pkg/kv/kvserver/store_pool.go

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,6 @@ const (
4141
TestTimeUntilStoreDeadOff = 24 * time.Hour
4242
)
4343

44-
// DeclinedReservationsTimeout specifies a duration during which the local
45-
// replicate queue will not consider stores which have rejected a reservation a
46-
// viable target.
47-
var DeclinedReservationsTimeout = settings.RegisterDurationSetting(
48-
"server.declined_reservation_timeout",
49-
"the amount of time to consider the store throttled for up-replication after a reservation was declined",
50-
1*time.Second,
51-
settings.NonNegativeDuration,
52-
)
53-
5444
// FailedReservationsTimeout specifies a duration during which the local
5545
// replicate queue will not consider stores which have failed a reservation a
5646
// viable target.
@@ -914,7 +904,6 @@ type throttleReason int
914904

915905
const (
916906
_ throttleReason = iota
917-
throttleDeclined
918907
throttleFailed
919908
)
920909

@@ -929,19 +918,10 @@ func (sp *StorePool) throttle(reason throttleReason, why string, storeID roachpb
929918
detail := sp.getStoreDetailLocked(storeID)
930919
detail.throttledBecause = why
931920

932-
// If a snapshot is declined, be it due to an error or because it was
933-
// rejected, we mark the store detail as having been declined so it won't
934-
// be considered as a candidate for new replicas until after the configured
935-
// timeout period has passed.
921+
// If a snapshot is declined, we mark the store detail as having been declined
922+
// so it won't be considered as a candidate for new replicas until after the
923+
// configured timeout period has passed.
936924
switch reason {
937-
case throttleDeclined:
938-
timeout := DeclinedReservationsTimeout.Get(&sp.st.SV)
939-
detail.throttledUntil = sp.clock.PhysicalTime().Add(timeout)
940-
if log.V(2) {
941-
ctx := sp.AnnotateCtx(context.TODO())
942-
log.Infof(ctx, "snapshot declined (%s), s%d will be throttled for %s until %s",
943-
why, storeID, timeout, detail.throttledUntil)
944-
}
945925
case throttleFailed:
946926
timeout := FailedReservationsTimeout.Get(&sp.st.SV)
947927
detail.throttledUntil = sp.clock.PhysicalTime().Add(timeout)
@@ -950,6 +930,8 @@ func (sp *StorePool) throttle(reason throttleReason, why string, storeID roachpb
950930
log.Infof(ctx, "snapshot failed (%s), s%d will be throttled for %s until %s",
951931
why, storeID, timeout, detail.throttledUntil)
952932
}
933+
default:
934+
log.Warningf(sp.AnnotateCtx(context.TODO()), "unknown throttle reason %v", reason)
953935
}
954936
}
955937

pkg/kv/kvserver/store_pool_test.go

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -835,30 +835,15 @@ func TestStorePoolThrottle(t *testing.T) {
835835
sg := gossiputil.NewStoreGossiper(g)
836836
sg.GossipStores(uniqueStore, t)
837837

838-
{
839-
expected := sp.clock.Now().GoTime().Add(DeclinedReservationsTimeout.Get(&sp.st.SV))
840-
sp.throttle(throttleDeclined, "", 1)
841-
842-
sp.detailsMu.Lock()
843-
detail := sp.getStoreDetailLocked(1)
844-
sp.detailsMu.Unlock()
845-
if !detail.throttledUntil.Equal(expected) {
846-
t.Errorf("expected store to have been throttled to %v, found %v",
847-
expected, detail.throttledUntil)
848-
}
849-
}
838+
expected := sp.clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.st.SV))
839+
sp.throttle(throttleFailed, "", 1)
850840

851-
{
852-
expected := sp.clock.Now().GoTime().Add(FailedReservationsTimeout.Get(&sp.st.SV))
853-
sp.throttle(throttleFailed, "", 1)
854-
855-
sp.detailsMu.Lock()
856-
detail := sp.getStoreDetailLocked(1)
857-
sp.detailsMu.Unlock()
858-
if !detail.throttledUntil.Equal(expected) {
859-
t.Errorf("expected store to have been throttled to %v, found %v",
860-
expected, detail.throttledUntil)
861-
}
841+
sp.detailsMu.Lock()
842+
detail := sp.getStoreDetailLocked(1)
843+
sp.detailsMu.Unlock()
844+
if !detail.throttledUntil.Equal(expected) {
845+
t.Errorf("expected store to have been throttled to %v, found %v",
846+
expected, detail.throttledUntil)
862847
}
863848
}
864849

pkg/kv/kvserver/store_raft.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (s *Store) HandleSnapshot(
7474

7575
if s.IsDraining() {
7676
return stream.Send(&SnapshotResponse{
77-
Status: SnapshotResponse_DECLINED,
77+
Status: SnapshotResponse_ERROR,
7878
Message: storeDrainingMsg,
7979
})
8080
}

pkg/kv/kvserver/store_snapshot.go

Lines changed: 13 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ import (
4040

4141
const (
4242
// Messages that provide detail about why a snapshot was rejected.
43-
snapshotStoreTooFullMsg = "store almost out of disk space"
44-
snapshotApplySemBusyMsg = "store busy applying snapshots"
45-
storeDrainingMsg = "store is draining"
43+
storeDrainingMsg = "store is draining"
4644

4745
// IntersectingSnapshotMsg is part of the error message returned from
4846
// canAcceptSnapshotLocked and is exposed here so testing can rely on it.
@@ -500,38 +498,23 @@ func (kvSS *kvBatchSnapshotStrategy) Close(ctx context.Context) {
500498
}
501499

502500
// reserveSnapshot throttles incoming snapshots. The returned closure is used
503-
// to cleanup the reservation and release its resources. A nil cleanup function
504-
// and a non-empty rejectionMessage indicates the reservation was declined.
501+
// to cleanup the reservation and release its resources.
505502
func (s *Store) reserveSnapshot(
506503
ctx context.Context, header *SnapshotRequest_Header,
507-
) (_cleanup func(), _rejectionMsg string, _err error) {
504+
) (_cleanup func(), _err error) {
508505
tBegin := timeutil.Now()
509-
if header.RangeSize == 0 {
510-
// Empty snapshots are exempt from rate limits because they're so cheap to
511-
// apply. This vastly speeds up rebalancing any empty ranges created by a
512-
// RESTORE or manual SPLIT AT, since it prevents these empty snapshots from
513-
// getting stuck behind large snapshots managed by the replicate queue.
514-
} else if header.CanDecline {
515-
storeDesc, ok := s.cfg.StorePool.getStoreDescriptor(s.StoreID())
516-
if ok && (!maxCapacityCheck(storeDesc) || header.RangeSize > storeDesc.Capacity.Available) {
517-
return nil, snapshotStoreTooFullMsg, nil
518-
}
519-
select {
520-
case s.snapshotApplySem <- struct{}{}:
521-
case <-ctx.Done():
522-
return nil, "", ctx.Err()
523-
case <-s.stopper.ShouldQuiesce():
524-
return nil, "", errors.Errorf("stopped")
525-
default:
526-
return nil, snapshotApplySemBusyMsg, nil
527-
}
528-
} else {
506+
507+
// Empty snapshots are exempt from rate limits because they're so cheap to
508+
// apply. This vastly speeds up rebalancing any empty ranges created by a
509+
// RESTORE or manual SPLIT AT, since it prevents these empty snapshots from
510+
// getting stuck behind large snapshots managed by the replicate queue.
511+
if header.RangeSize != 0 {
529512
select {
530513
case s.snapshotApplySem <- struct{}{}:
531514
case <-ctx.Done():
532-
return nil, "", ctx.Err()
515+
return nil, ctx.Err()
533516
case <-s.stopper.ShouldQuiesce():
534-
return nil, "", errors.Errorf("stopped")
517+
return nil, errors.Errorf("stopped")
535518
}
536519
}
537520

@@ -560,7 +543,7 @@ func (s *Store) reserveSnapshot(
560543
if header.RangeSize != 0 {
561544
<-s.snapshotApplySem
562545
}
563-
}, "", nil
546+
}, nil
564547
}
565548

566549
// canAcceptSnapshotLocked returns (_, nil) if the snapshot can be applied to
@@ -716,16 +699,10 @@ func (s *Store) receiveSnapshot(
716699
header.Type, storeID, header.State.Desc.Replicas())
717700
}
718701

719-
cleanup, rejectionMsg, err := s.reserveSnapshot(ctx, header)
702+
cleanup, err := s.reserveSnapshot(ctx, header)
720703
if err != nil {
721704
return err
722705
}
723-
if cleanup == nil {
724-
return stream.Send(&SnapshotResponse{
725-
Status: SnapshotResponse_DECLINED,
726-
Message: rejectionMsg,
727-
})
728-
}
729706
defer cleanup()
730707

731708
// The comment on ReplicaPlaceholder motivates and documents
@@ -995,7 +972,6 @@ func SendEmptySnapshot(
995972
State: state,
996973
RaftMessageRequest: req,
997974
RangeSize: ms.Total(),
998-
CanDecline: false,
999975
Priority: SnapshotRequest_RECOVERY,
1000976
Strategy: SnapshotRequest_KV_BATCH,
1001977
Type: SnapshotRequest_VIA_SNAPSHOT_QUEUE,
@@ -1056,20 +1032,6 @@ func sendSnapshot(
10561032
return err
10571033
}
10581034
switch resp.Status {
1059-
case SnapshotResponse_DECLINED:
1060-
if header.CanDecline {
1061-
declinedMsg := "reservation rejected"
1062-
if len(resp.Message) > 0 {
1063-
declinedMsg = resp.Message
1064-
}
1065-
err := &benignError{errors.Errorf("%s: remote declined %s: %s", to, snap, declinedMsg)}
1066-
storePool.throttle(throttleDeclined, err.Error(), to.StoreID)
1067-
return err
1068-
}
1069-
err := errors.Errorf("%s: programming error: remote declined required %s: %s",
1070-
to, snap, resp.Message)
1071-
storePool.throttle(throttleFailed, err.Error(), to.StoreID)
1072-
return err
10731035
case SnapshotResponse_ERROR:
10741036
storePool.throttle(throttleFailed, resp.Message, to.StoreID)
10751037
return errors.Errorf("%s: remote couldn't accept %s with error: %s",

0 commit comments

Comments (0)