Skip to content

Commit b192bba

Browse files
committed
kvserver: allow historical reads on subsumed ranges
Currently, we block all requests on the RHS of a merge after it is subsumed. While we need to block all write & admin requests, we need only block "recent" read requests. In particular, if a read request for the RHS is old enough that its freeze timestamp is outside the request's uncertainty window, it is safe to let it through. This makes range merges a little bit less disruptive to foreground traffic. Release note: None
1 parent 71e1113 commit b192bba

9 files changed

Lines changed: 293 additions & 40 deletions

File tree

pkg/kv/kvserver/batcheval/cmd_subsume.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,6 @@ func Subsume(
173173
reply.FreezeStart = cArgs.EvalCtx.Clock().Now()
174174

175175
return result.Result{
176-
Local: result.LocalResult{MaybeWatchForMerge: true},
176+
Local: result.LocalResult{FreezeStart: reply.FreezeStart},
177177
}, nil
178178
}

pkg/kv/kvserver/batcheval/result/result.go

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/util/hlc"
1920
"github.com/cockroachdb/cockroach/pkg/util/log"
2021
"github.com/cockroachdb/errors"
2122
"github.com/kr/pretty"
@@ -63,8 +64,10 @@ type LocalResult struct {
6364
MaybeAddToSplitQueue bool
6465
// Call MaybeGossipNodeLiveness with the specified Span, if set.
6566
MaybeGossipNodeLiveness *roachpb.Span
66-
// Call maybeWatchForMerge.
67-
MaybeWatchForMerge bool
67+
// FreezeStart indicates the high water mark timestamp beyond which the range
68+
// is guaranteed to not have served any requests. This value is only set when
69+
// a range merge is in progress. If set, call maybeWatchForMerge.
70+
FreezeStart hlc.Timestamp
6871

6972
// Metrics contains counters which are to be passed to the
7073
// metrics subsystem.
@@ -84,7 +87,7 @@ func (lResult *LocalResult) IsZero() bool {
8487
!lResult.MaybeGossipSystemConfig &&
8588
!lResult.MaybeGossipSystemConfigIfHaveFailure &&
8689
lResult.MaybeGossipNodeLiveness == nil &&
87-
!lResult.MaybeWatchForMerge &&
90+
lResult.FreezeStart.IsEmpty() &&
8891
lResult.Metrics == nil
8992
}
9093

@@ -97,13 +100,13 @@ func (lResult *LocalResult) String() string {
97100
"#updated txns: %d #end txns: %d, "+
98101
"GossipFirstRange:%t MaybeGossipSystemConfig:%t "+
99102
"MaybeGossipSystemConfigIfHaveFailure:%t MaybeAddToSplitQueue:%t "+
100-
"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t",
103+
"MaybeGossipNodeLiveness:%s FreezeStart:%s",
101104
lResult.Reply,
102105
len(lResult.EncounteredIntents), len(lResult.AcquiredLocks), len(lResult.ResolvedLocks),
103106
len(lResult.UpdatedTxns), len(lResult.EndTxns),
104107
lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig,
105108
lResult.MaybeGossipSystemConfigIfHaveFailure, lResult.MaybeAddToSplitQueue,
106-
lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
109+
lResult.MaybeGossipNodeLiveness, lResult.FreezeStart)
107110
}
108111

109112
// DetachEncounteredIntents returns (and removes) those encountered
@@ -187,32 +190,32 @@ func coalesceBool(lhs *bool, rhs *bool) {
187190
func (p *Result) MergeAndDestroy(q Result) error {
188191
if q.Replicated.State != nil {
189192
if q.Replicated.State.RaftAppliedIndex != 0 {
190-
return errors.New("must not specify RaftApplyIndex")
193+
return errors.AssertionFailedf("must not specify RaftApplyIndex")
191194
}
192195
if q.Replicated.State.LeaseAppliedIndex != 0 {
193-
return errors.New("must not specify RaftApplyIndex")
196+
return errors.AssertionFailedf("must not specify RaftApplyIndex")
194197
}
195198
if p.Replicated.State == nil {
196199
p.Replicated.State = &kvserverpb.ReplicaState{}
197200
}
198201
if p.Replicated.State.Desc == nil {
199202
p.Replicated.State.Desc = q.Replicated.State.Desc
200203
} else if q.Replicated.State.Desc != nil {
201-
return errors.New("conflicting RangeDescriptor")
204+
return errors.AssertionFailedf("conflicting RangeDescriptor")
202205
}
203206
q.Replicated.State.Desc = nil
204207

205208
if p.Replicated.State.Lease == nil {
206209
p.Replicated.State.Lease = q.Replicated.State.Lease
207210
} else if q.Replicated.State.Lease != nil {
208-
return errors.New("conflicting Lease")
211+
return errors.AssertionFailedf("conflicting Lease")
209212
}
210213
q.Replicated.State.Lease = nil
211214

212215
if p.Replicated.State.TruncatedState == nil {
213216
p.Replicated.State.TruncatedState = q.Replicated.State.TruncatedState
214217
} else if q.Replicated.State.TruncatedState != nil {
215-
return errors.New("conflicting TruncatedState")
218+
return errors.AssertionFailedf("conflicting TruncatedState")
216219
}
217220
q.Replicated.State.TruncatedState = nil
218221

@@ -226,7 +229,7 @@ func (p *Result) MergeAndDestroy(q Result) error {
226229
}
227230

228231
if q.Replicated.State.Stats != nil {
229-
return errors.New("must not specify Stats")
232+
return errors.AssertionFailedf("must not specify Stats")
230233
}
231234
if (*q.Replicated.State != kvserverpb.ReplicaState{}) {
232235
log.Fatalf(context.TODO(), "unhandled EvalResult: %s",
@@ -238,42 +241,42 @@ func (p *Result) MergeAndDestroy(q Result) error {
238241
if p.Replicated.Split == nil {
239242
p.Replicated.Split = q.Replicated.Split
240243
} else if q.Replicated.Split != nil {
241-
return errors.New("conflicting Split")
244+
return errors.AssertionFailedf("conflicting Split")
242245
}
243246
q.Replicated.Split = nil
244247

245248
if p.Replicated.Merge == nil {
246249
p.Replicated.Merge = q.Replicated.Merge
247250
} else if q.Replicated.Merge != nil {
248-
return errors.New("conflicting Merge")
251+
return errors.AssertionFailedf("conflicting Merge")
249252
}
250253
q.Replicated.Merge = nil
251254

252255
if p.Replicated.ChangeReplicas == nil {
253256
p.Replicated.ChangeReplicas = q.Replicated.ChangeReplicas
254257
} else if q.Replicated.ChangeReplicas != nil {
255-
return errors.New("conflicting ChangeReplicas")
258+
return errors.AssertionFailedf("conflicting ChangeReplicas")
256259
}
257260
q.Replicated.ChangeReplicas = nil
258261

259262
if p.Replicated.ComputeChecksum == nil {
260263
p.Replicated.ComputeChecksum = q.Replicated.ComputeChecksum
261264
} else if q.Replicated.ComputeChecksum != nil {
262-
return errors.New("conflicting ComputeChecksum")
265+
return errors.AssertionFailedf("conflicting ComputeChecksum")
263266
}
264267
q.Replicated.ComputeChecksum = nil
265268

266269
if p.Replicated.RaftLogDelta == 0 {
267270
p.Replicated.RaftLogDelta = q.Replicated.RaftLogDelta
268271
} else if q.Replicated.RaftLogDelta != 0 {
269-
return errors.New("conflicting RaftLogDelta")
272+
return errors.AssertionFailedf("conflicting RaftLogDelta")
270273
}
271274
q.Replicated.RaftLogDelta = 0
272275

273276
if p.Replicated.AddSSTable == nil {
274277
p.Replicated.AddSSTable = q.Replicated.AddSSTable
275278
} else if q.Replicated.AddSSTable != nil {
276-
return errors.New("conflicting AddSSTable")
279+
return errors.AssertionFailedf("conflicting AddSSTable")
277280
}
278281
q.Replicated.AddSSTable = nil
279282

@@ -289,7 +292,7 @@ func (p *Result) MergeAndDestroy(q Result) error {
289292
if p.Replicated.PrevLeaseProposal == nil {
290293
p.Replicated.PrevLeaseProposal = q.Replicated.PrevLeaseProposal
291294
} else if q.Replicated.PrevLeaseProposal != nil {
292-
return errors.New("conflicting lease expiration")
295+
return errors.AssertionFailedf("conflicting lease expiration")
293296
}
294297
q.Replicated.PrevLeaseProposal = nil
295298

@@ -331,15 +334,21 @@ func (p *Result) MergeAndDestroy(q Result) error {
331334
if p.Local.MaybeGossipNodeLiveness == nil {
332335
p.Local.MaybeGossipNodeLiveness = q.Local.MaybeGossipNodeLiveness
333336
} else if q.Local.MaybeGossipNodeLiveness != nil {
334-
return errors.New("conflicting MaybeGossipNodeLiveness")
337+
return errors.AssertionFailedf("conflicting MaybeGossipNodeLiveness")
335338
}
336339
q.Local.MaybeGossipNodeLiveness = nil
337340

341+
if p.Local.FreezeStart.IsEmpty() {
342+
p.Local.FreezeStart = q.Local.FreezeStart
343+
} else if !q.Local.FreezeStart.IsEmpty() {
344+
return errors.AssertionFailedf("conflicting FreezeStart")
345+
}
346+
q.Local.FreezeStart = hlc.Timestamp{}
347+
338348
coalesceBool(&p.Local.GossipFirstRange, &q.Local.GossipFirstRange)
339349
coalesceBool(&p.Local.MaybeGossipSystemConfig, &q.Local.MaybeGossipSystemConfig)
340350
coalesceBool(&p.Local.MaybeGossipSystemConfigIfHaveFailure, &q.Local.MaybeGossipSystemConfigIfHaveFailure)
341351
coalesceBool(&p.Local.MaybeAddToSplitQueue, &q.Local.MaybeAddToSplitQueue)
342-
coalesceBool(&p.Local.MaybeWatchForMerge, &q.Local.MaybeWatchForMerge)
343352

344353
if p.Local.Metrics == nil {
345354
p.Local.Metrics = q.Local.Metrics

pkg/kv/kvserver/client_merge_test.go

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3676,6 +3676,175 @@ func TestInvalidSubsumeRequest(t *testing.T) {
36763676
}
36773677
}
36783678

3679+
// TestHistoricalReadsAfterSubsume tests that a subsumed right hand side range
3680+
// can only serve read-only traffic for timestamps that precede the subsumption
3681+
// time, but don't contain the subsumption time in their uncertainty interval.
3682+
func TestHistoricalReadsAfterSubsume(t *testing.T) {
3683+
defer leaktest.AfterTest(t)()
3684+
ctx := context.Background()
3685+
3686+
maxOffset := 100 * time.Millisecond
3687+
send := func(store *kvserver.Store,
3688+
desc *roachpb.RangeDescriptor,
3689+
ts hlc.Timestamp,
3690+
args roachpb.Request) error {
3691+
txn := roachpb.MakeTransaction("test txn", desc.StartKey.AsRawKey(),
3692+
0, ts, maxOffset.Nanoseconds())
3693+
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), roachpb.Header{Txn: &txn}, args)
3694+
return pErr.GoError()
3695+
}
3696+
checkRangeNotFound := func(err error) error {
3697+
if err == nil {
3698+
return errors.Newf("expected RangeNotFoundError, got nil")
3699+
}
3700+
if errors.HasType(err, (*roachpb.RangeNotFoundError)(nil)) {
3701+
return nil
3702+
}
3703+
return err
3704+
}
3705+
preUncertaintyTs := func(ts hlc.Timestamp) hlc.Timestamp {
3706+
return hlc.Timestamp{
3707+
WallTime: ts.GoTime().Add(-maxOffset).UnixNano() - 1,
3708+
Logical: ts.Logical,
3709+
}
3710+
}
3711+
3712+
type testCase struct {
3713+
name string
3714+
queryTsFunc func(freezeStart hlc.Timestamp) hlc.Timestamp
3715+
queryArgsFunc func(key roachpb.Key) roachpb.Request
3716+
shouldBlock bool
3717+
}
3718+
3719+
tests := []testCase{
3720+
// Ensure that a read query for a timestamp older than freezeStart-MaxOffset
3721+
// is let through.
3722+
{
3723+
name: "historical read",
3724+
queryTsFunc: preUncertaintyTs,
3725+
queryArgsFunc: func(key roachpb.Key) roachpb.Request {
3726+
return getArgs(key)
3727+
},
3728+
shouldBlock: false,
3729+
},
3730+
// Write queries for the same historical timestamp should block (and then
3731+
// eventually fail because the range no longer exists).
3732+
{
3733+
name: "historical write",
3734+
queryTsFunc: preUncertaintyTs,
3735+
queryArgsFunc: func(key roachpb.Key) roachpb.Request {
3736+
return putArgs(key, []byte(`test value`))
3737+
},
3738+
shouldBlock: true,
3739+
},
3740+
// Read queries that contain the subsumption time in their uncertainty interval
3741+
// should block and eventually fail.
3742+
{
3743+
name: "historical read with uncertainty",
3744+
queryTsFunc: func(freezeStart hlc.Timestamp) hlc.Timestamp {
3745+
return freezeStart.Prev()
3746+
},
3747+
queryArgsFunc: func(key roachpb.Key) roachpb.Request {
3748+
return getArgs(key)
3749+
},
3750+
shouldBlock: true,
3751+
},
3752+
}
3753+
3754+
for _, test := range tests {
3755+
t.Run(test.name, func(t *testing.T) {
3756+
tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc :=
3757+
setupClusterWithSubsumedRange(ctx, t, maxOffset)
3758+
defer tc.Stopper().Stop(ctx)
3759+
errCh := make(chan error)
3760+
go func() {
3761+
errCh <- send(store, rhsDesc, test.queryTsFunc(freezeStart),
3762+
test.queryArgsFunc(rhsDesc.StartKey.AsRawKey()))
3763+
}()
3764+
if test.shouldBlock {
3765+
waitForBlocked()
3766+
// Requests that wait for the merge must fail with a RangeNotFoundError
3767+
// because the RHS should cease to exist once the merge completes.
3768+
cleanupFunc()
3769+
require.NoError(t, checkRangeNotFound(<-errCh))
3770+
} else {
3771+
require.NoError(t, <-errCh)
3772+
// We clean up *after* the non-blocking read request succeeds to prevent
3773+
// it from racing with the merge commit trigger.
3774+
cleanupFunc()
3775+
}
3776+
})
3777+
}
3778+
}
3779+
3780+
// setupClusterWithSubsumedRange returns a TestCluster during an ongoing merge
3781+
// transaction, such that the merge has been suspended right before the merge
3782+
// trigger is evaluated. This leaves the right hand side range of the merge in
3783+
// its subsumed state. It is the responsibility of the caller to call
3784+
// `cleanupFunc` to unblock the merge and Stop() the tc's Stopper when done.
3785+
func setupClusterWithSubsumedRange(
3786+
ctx context.Context, t *testing.T, testMaxOffset time.Duration,
3787+
) (
3788+
tc serverutils.TestClusterInterface,
3789+
store *kvserver.Store,
3790+
rhsDesc *roachpb.RangeDescriptor,
3791+
freezeStart hlc.Timestamp,
3792+
waitForBlocked func(),
3793+
cleanupFunc func(),
3794+
) {
3795+
state := mergeFilterState{
3796+
blockMergeTrigger: make(chan hlc.Timestamp),
3797+
finishMergeTxn: make(chan struct{}),
3798+
}
3799+
var blockedRequestCount int32
3800+
clusterArgs := base.TestClusterArgs{
3801+
ServerArgs: base.TestServerArgs{
3802+
Knobs: base.TestingKnobs{
3803+
Store: &kvserver.StoreTestingKnobs{
3804+
DisableMergeQueue: true,
3805+
MaxOffset: testMaxOffset,
3806+
TestingRequestFilter: state.suspendMergeTrigger,
3807+
TestingConcurrencyRetryFilter: func(
3808+
ctx context.Context, ba roachpb.BatchRequest, pErr *roachpb.Error,
3809+
) {
3810+
if _, ok := pErr.GetDetail().(*roachpb.MergeInProgressError); ok {
3811+
atomic.AddInt32(&blockedRequestCount, 1)
3812+
}
3813+
},
3814+
},
3815+
},
3816+
},
3817+
}
3818+
tc = serverutils.StartTestCluster(t, 1, clusterArgs)
3819+
ts := tc.Server(0)
3820+
stores, _ := ts.GetStores().(*kvserver.Stores)
3821+
store, err := stores.GetStore(ts.GetFirstStoreID())
3822+
require.NoError(t, err)
3823+
lhsDesc, rhsDesc, err := createSplitRanges(ctx, store)
3824+
require.NoError(t, err)
3825+
mergeArgs := adminMergeArgs(lhsDesc.StartKey.AsRawKey())
3826+
errCh := make(chan error)
3827+
go func() {
3828+
_, err := kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
3829+
errCh <- err.GoError()
3830+
}()
3831+
freezeStart = <-state.blockMergeTrigger
3832+
cleanupFunc = func() {
3833+
// Let the merge commit.
3834+
close(state.finishMergeTxn)
3835+
require.NoError(t, <-errCh)
3836+
}
3837+
waitForBlocked = func() {
3838+
testutils.SucceedsSoon(t, func() error {
3839+
if actualBlocked := atomic.LoadInt32(&blockedRequestCount); actualBlocked != 1 {
3840+
return errors.Newf("expected 1 blocked request but found %d", actualBlocked)
3841+
}
3842+
return nil
3843+
})
3844+
}
3845+
return tc, store, rhsDesc, freezeStart, waitForBlocked, cleanupFunc
3846+
}
3847+
36793848
func BenchmarkStoreRangeMerge(b *testing.B) {
36803849
ctx := context.Background()
36813850
var mtc multiTestContext

pkg/kv/kvserver/kvserverbase/base.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ func (f *FilterArgs) InRaftCmd() bool {
8080
// processing or non-nil to terminate processing with the returned error.
8181
type ReplicaRequestFilter func(context.Context, roachpb.BatchRequest) *roachpb.Error
8282

83+
// ReplicaConcurrencyRetryFilter can be used to examine a concurrency retry
84+
// error before it is handled and its batch is re-evaluated.
85+
type ReplicaConcurrencyRetryFilter func(context.Context, roachpb.BatchRequest, *roachpb.Error)
86+
8387
// ReplicaCommandFilter may be used in tests through the StoreTestingKnobs to
8488
// intercept the handling of commands and artificially generate errors. Return
8589
// nil to continue with regular processing or non-nil to terminate processing

0 commit comments

Comments
 (0)