Skip to content

Commit f447eb0

Browse files
authored
Merge pull request #7355 from petermattis/pmattis/eager-replication
storage: eagerly replicate split ranges
2 parents ec0727b + 988628a commit f447eb0

7 files changed

Lines changed: 80 additions & 4 deletions

File tree

server/node_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func createAndStartTestNode(addr net.Addr, engines []engine.Engine, gossipBS net
110110
if err := node.start(addr, engines, roachpb.Attributes{}); err != nil {
111111
t.Fatal(err)
112112
}
113-
if err := waitForInitialSplits(node.ctx.DB); err != nil {
113+
if err := WaitForInitialSplits(node.ctx.DB); err != nil {
114114
t.Fatal(err)
115115
}
116116
return grpcServer, addr, node, stopper

server/testserver.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,10 +276,13 @@ func ExpectedInitialRangeCount() int {
276276
// splits at startup. If the expected range count is not reached within a
277277
// configured timeout, an error is returned.
278278
func (ts *TestServer) WaitForInitialSplits() error {
279-
return waitForInitialSplits(ts.DB())
279+
return WaitForInitialSplits(ts.DB())
280280
}
281281

282-
func waitForInitialSplits(db *client.DB) error {
282+
// WaitForInitialSplits waits for the expected number of initial ranges to be
283+
// populated in the meta2 table. If the expected range count is not reached
284+
// within a configured timeout, an error is returned.
285+
func WaitForInitialSplits(db *client.DB) error {
283286
expectedRanges := ExpectedInitialRangeCount()
284287
return util.RetryForDuration(initialSplitsTimeout, func() error {
285288
// Scan all keys in the Meta2Prefix; we only need a count.

storage/helpers_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,18 @@ func (s *Store) LogReplicaChangeTest(txn *client.Txn, changeType roachpb.Replica
110110
return s.logChange(txn, changeType, replica, desc)
111111
}
112112

113+
// ReplicateQueuePurgatoryLength returns the number of replicas in replicate
114+
// queue purgatory.
115+
func (s *Store) ReplicateQueuePurgatoryLength() int {
116+
return s.replicateQueue.PurgatoryLength()
117+
}
118+
119+
// SetReplicaScannerDisabled turns replica scanning off or on as directed. Note
120+
// that while disabled, removals are still processed.
121+
func (s *Store) SetReplicaScannerDisabled(disabled bool) {
122+
s.scanner.SetDisabled(disabled)
123+
}
124+
113125
// GetLastIndex is the same function as LastIndex but it does not require
114126
// that the replica lock is held.
115127
func (r *Replica) GetLastIndex() (uint64, error) {

storage/queue.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,13 @@ func (bq *baseQueue) Length() int {
222222
return bq.mu.priorityQ.Len()
223223
}
224224

225+
// PurgatoryLength returns the current size of purgatory.
226+
func (bq *baseQueue) PurgatoryLength() int {
227+
bq.mu.Lock()
228+
defer bq.mu.Unlock()
229+
return len(bq.mu.purgatory)
230+
}
231+
225232
// SetDisabled turns queue processing off or on as directed.
226233
func (bq *baseQueue) SetDisabled(disabled bool) {
227234
if disabled {

storage/replica_command.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2307,6 +2307,13 @@ func (r *Replica) splitTrigger(
23072307
// Update store stats with difference in stats before and after split.
23082308
r.store.metrics.addMVCCStats(deltaMS)
23092309

2310+
// If the range was not properly replicated before the split, the
2311+
// replicate queue may not have picked it up (due to the need for a
2312+
// split). Enqueue both new halves to speed up a potentially necessary
2313+
// replication. See #7022.
2314+
r.store.replicateQueue.MaybeAdd(r, r.store.Clock().Now())
2315+
r.store.replicateQueue.MaybeAdd(newRng, r.store.Clock().Now())
2316+
23102317
// To avoid leaving the new range unavailable as it waits to elect
23112318
// its leader, one (and only one) of the nodes should start an
23122319
// election as soon as the split is processed.

storage/replicate_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2016 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License.
14+
//
15+
// Author: Peter Mattis (peter@cockroachlabs.com)
16+
17+
package storage_test
18+
19+
import (
20+
"testing"
21+
22+
"github.com/cockroachdb/cockroach/server"
23+
"github.com/cockroachdb/cockroach/util/leaktest"
24+
)
25+
26+
func TestEagerReplication(t *testing.T) {
27+
defer leaktest.AfterTest(t)()
28+
store, stopper, _ := createTestStore(t)
29+
defer stopper.Stop()
30+
31+
// Disable the replica scanner so that we rely on the eager replication code
32+
// path that occurs after splits.
33+
store.SetReplicaScannerDisabled(true)
34+
35+
if err := server.WaitForInitialSplits(store.DB()); err != nil {
36+
t.Fatal(err)
37+
}
38+
39+
// After the initial splits have been performed, all of the resulting ranges
40+
// should be present in replicate queue purgatory (because we only have a
41+
// single store in the test and thus replication cannot succeed).
42+
expected := server.ExpectedInitialRangeCount()
43+
if n := store.ReplicateQueuePurgatoryLength(); expected != n {
44+
t.Fatalf("expected %d replicas in purgatory, but found %d",
45+
expected, n)
46+
}
47+
}

storage/store_pool.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ const (
5050

5151
// defaultDeclinedReservationsTimeout is the amount of time to consider the
5252
// store unavailable for up-replication after a reservation was declined.
53-
defaultDeclinedReservationsTimeout = 1 * time.Second
53+
defaultDeclinedReservationsTimeout = 0 * time.Second
5454

5555
// defaultReserveRPCTimeout is used for the rpc calls to Reserve on other
5656
// nodes. It should be short as this may block calls to ChangeReplicas.

0 commit comments

Comments
 (0)