Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 87 additions & 29 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"
"unsafe"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -62,6 +63,11 @@ var (
metaDistSenderNotLeaseHolderErrCount = metric.Metadata{
Name: "distsender.errors.notleaseholder",
Help: "Number of NotLeaseHolderErrors encountered"}
metaDistSenderFollowerReadEligibleCount = metric.Metadata{
Name: "distsender.followerread.eligible",
Help: "Number of requests sent to nearest replica due to follower read eligibility"}
// TODO(tschottdorf): should track follower-read-mismatches, where we sent to a
// follower but that follower wasn't able to serve it.
)

var rangeDescriptorCacheSize = settings.RegisterIntSetting(
Expand All @@ -72,22 +78,24 @@ var rangeDescriptorCacheSize = settings.RegisterIntSetting(

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
FollowerReadEligibleCount *metric.Counter
}

func makeDistSenderMetrics() DistSenderMetrics {
return DistSenderMetrics{
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaDistSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaDistSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
FollowerReadEligibleCount: metric.NewCounter(metaDistSenderFollowerReadEligibleCount),
}
}

Expand Down Expand Up @@ -126,12 +134,13 @@ type DistSender struct {
// rangeCache caches replica metadata for key ranges.
rangeCache *RangeDescriptorCache
// leaseHolderCache caches range lease holders by range ID.
leaseHolderCache *LeaseHolderCache
transportFactory TransportFactory
rpcContext *rpc.Context
rpcRetryOptions retry.Options
asyncSenderSem chan struct{}
asyncSenderCount int32
leaseHolderCache *LeaseHolderCache
transportFactory TransportFactory
rpcContext *rpc.Context
rpcRetryOptions retry.Options
asyncSenderSem chan struct{}
asyncSenderCount int32
followerReadInterval time.Duration
}

var _ client.Sender = &DistSender{}
Expand All @@ -151,7 +160,8 @@ type DistSenderConfig struct {
RPCContext *rpc.Context
RangeDescriptorDB RangeDescriptorDB

TestingKnobs ClientTestingKnobs
TestingKnobs ClientTestingKnobs
FollowerReadInterval time.Duration
}

// NewDistSender returns a batch.Sender instance which connects to the
Expand All @@ -160,10 +170,11 @@ type DistSenderConfig struct {
// defaults will be used.
func NewDistSender(cfg DistSenderConfig, g *gossip.Gossip) *DistSender {
ds := &DistSender{
st: cfg.Settings,
clock: cfg.Clock,
gossip: g,
metrics: makeDistSenderMetrics(),
st: cfg.Settings,
clock: cfg.Clock,
gossip: g,
metrics: makeDistSenderMetrics(),
followerReadInterval: cfg.FollowerReadInterval,
}
if ds.st == nil {
ds.st = cluster.MakeTestingClusterSettings()
Expand Down Expand Up @@ -376,6 +387,38 @@ func (ds *DistSender) getDescriptor(
return desc, returnToken, nil
}

// maybeCanReadFromFollower reports whether ba looks eligible to be served by
// a replica other than the leaseholder. That is the case when follower reads
// are enabled (followerReadInterval is positive) and the batch's active
// timestamp is older than the estimated closed timestamp, i.e. the current
// clock reading minus followerReadInterval.
//
// Whenever it returns true it also increments the FollowerReadEligibleCount
// metric. It does not check whether the batch is read-only; that is left to
// the caller.
func (ds *DistSender) maybeCanReadFromFollower(ba roachpb.BatchRequest) bool {
	// A non-positive interval disables follower reads. A negative interval
	// must never be used: subtracting it below would push the estimated
	// closed timestamp into the future and make present-time reads look
	// eligible.
	if ds.followerReadInterval <= 0 {
		return false
	}

	// TODO(tschottdorf): this is simplistic. We need to gracefully handle the
	// case of lagging followers, for example by reading from the leaseholder
	// whenever a follower read fails. With that in, we may still have to
	// track lagging replicas so we avoid them in the first place, though this
	// may not matter in practice for a long long time (or ever). A similar
	// problem exists when resolving an intent during a follower read; on
	// retry the follower may still have the intent around. We should go to
	// the leaseholder in that case as well.
	//
	// There's also a correctness problem here. We treat any read
	// as a possible follower read, but follower reads never exhibit uncertainty
	// restarts. The following can happen:
	// 1. txn1 starts, gets timestamp 100, and does nothing for ~10s.
	// 2. txn1 reads via follower read, so leader's timestamp cache does not see the update.
	// 3. txn1 does something else, who knows, commits.
	// 4. txn2 starts at timestamp 99 and writes a value invalidating the read on the lease
	//    holder.
	// So basically we need to check whether the transaction has an empty uncertainty
	// window (which is how we know it's a historical read).
	estimatedClosedTimestamp := ds.clock.Now().Add(-ds.followerReadInterval.Nanoseconds(), 0)
	if ba.GetActiveTimestamp(ds.clock.Now).Less(estimatedClosedTimestamp) {
		ds.metrics.FollowerReadEligibleCount.Inc(1)
		return true
	}
	return false
}

// sendSingleRange gathers and rearranges the replicas, and makes an RPC call.
func (ds *DistSender) sendSingleRange(
ctx context.Context, ba roachpb.BatchRequest, desc *roachpb.RangeDescriptor,
Expand All @@ -391,12 +434,27 @@ func (ds *DistSender) sendSingleRange(
}
replicas.OptimizeReplicaOrder(ds.getNodeDescriptor(), latencyFn)

// If this request needs to go to a lease holder and we know who that is, move
// it to the front.
if !ba.IsReadOnly() || ba.ReadConsistency.RequiresReadLease() {
if storeID, ok := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); ok {
if i := replicas.FindReplica(storeID); i >= 0 {
replicas.MoveToFront(i)
// If this request needs to go to a lease holder and we know who
// that is, possibly move it to the front.
readOnly := ba.IsReadOnly()
if !readOnly || ba.ReadConsistency.RequiresReadLease() {
// We can avoid sending to the leaseholder if the batch qualifies
// for being served by follower reads.
//
// TODO(tschottdorf): important improvements are to be made here:
// 1. if a follower for whatever reason falls behind, the request will
// bounce back here and we'll choose the same follower again until
// it catches back up. This isn't great; we should limit follower
// read attempts to one per batch, or even blacklist replicas that
// fail repeatedly (though that is lower priority).
// 2. we may want to add some jitter here to spread out load among replicas
// with similar latencies (or even actively prefer followers over lease-
// holders).
if !readOnly || !ds.maybeCanReadFromFollower(ba) {
if storeID, ok := ds.leaseHolderCache.Lookup(ctx, desc.RangeID); ok {
if i := replicas.FindReplica(storeID); i >= 0 {
replicas.MoveToFront(i)
}
}
}
}
Expand Down
44 changes: 26 additions & 18 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,41 @@ import (

//go:generate go run -tags gen-batch gen_batch.go

// SetActiveTimestamp sets the correct timestamp at which the request
// is to be carried out. For transactional requests, ba.Timestamp must
// be zero initially and it will be set to txn.OrigTimestamp (and
// forwarded to txn.SafeTimestamp if non-zero). For non-transactional
// requests, if no timestamp is specified, nowFn is used to create and
// set one.
func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
// GetActiveTimestamp returns the timestamp at which the batch will operate.
// For non-transactional requests, this is `ba.Timestamp` if set and nowFn() otherwise.
//
// For transactional requests, it is the OrigTimestamp forwarded to the RefreshedTimestamp.
func (ba *BatchRequest) GetActiveTimestamp(nowFn func() hlc.Timestamp) hlc.Timestamp {
if txn := ba.Txn; txn != nil {
if ba.Timestamp != (hlc.Timestamp{}) {
return errors.New("transactional request must not set batch timestamp")
}

// Always use the original timestamp for reads and writes, even
// though some intents may be written at higher timestamps in the
// event of a WriteTooOldError.
ba.Timestamp = txn.OrigTimestamp
ts := txn.OrigTimestamp
// If a refreshed timestamp is set for the transaction, forward
// the batch timestamp to it. The refreshed timestamp indicates a
// future timestamp at which the transaction would like to commit
// to safely avoid a serializable transaction restart.
ba.Timestamp.Forward(txn.RefreshedTimestamp)
} else {
// When not transactional, allow empty timestamp and use nowFn instead
if ba.Timestamp == (hlc.Timestamp{}) {
ba.Timestamp = nowFn()
}
ts.Forward(txn.RefreshedTimestamp)
return ts
}
// When not transactional, fall back to nowFn() if no timestamp is set.
if ba.Timestamp == (hlc.Timestamp{}) {
return nowFn()
}
return ba.Timestamp
}

// SetActiveTimestamp stores the batch's active timestamp, as computed by
// GetActiveTimestamp, in ba.Timestamp.
//
// A transactional batch (ba.Txn != nil) must arrive with ba.Timestamp still
// at its zero value; otherwise an error is returned and the batch is left
// unmodified. Because a successful call fills in ba.Timestamp, calling this
// method again on the same transactional batch is expected to fail.
// Non-transactional batches are not subject to this check.
func (ba *BatchRequest) SetActiveTimestamp(nowFn func() hlc.Timestamp) error {
	var zero hlc.Timestamp
	transactional := ba.Txn != nil
	if transactional && ba.Timestamp != zero {
		return errors.New("transactional request must not set batch timestamp")
	}

	active := ba.GetActiveTimestamp(nowFn)
	ba.Timestamp = active
	return nil
}

Expand Down
58 changes: 32 additions & 26 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
s.registry,
)

// The store config is initialized to the default values here in order
// to pass the follower read interval to the distributed sender. It's
// augmented later in this method before sending to the node to create
// stores.
storeCfg := storage.StoreConfig{}
storeCfg.SetDefaults()

// A custom RetryOptions is created which uses stopper.ShouldQuiesce() as
// the Closer. This prevents infinite retry loops from occurring during
// graceful server shutdown
Expand All @@ -260,12 +267,13 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
}
retryOpts.Closer = s.stopper.ShouldQuiesce()
distSenderCfg := kv.DistSenderConfig{
AmbientCtx: s.cfg.AmbientCtx,
Settings: st,
Clock: s.clock,
RPCContext: s.rpcContext,
RPCRetryOptions: &retryOpts,
TestingKnobs: clientTestingKnobs,
AmbientCtx: s.cfg.AmbientCtx,
Settings: st,
Clock: s.clock,
RPCContext: s.rpcContext,
RPCRetryOptions: &retryOpts,
TestingKnobs: clientTestingKnobs,
FollowerReadInterval: storeCfg.FollowerReadInterval(),
}
s.distSender = kv.NewDistSender(distSenderCfg, s.gossip)
s.registry.AddMetricStruct(s.distSender.Metrics())
Expand Down Expand Up @@ -397,27 +405,25 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
var execCfg sql.ExecutorConfig

// TODO(bdarnell): make StoreConfig configurable.
storeCfg := storage.StoreConfig{
Settings: st,
AmbientCtx: s.cfg.AmbientCtx,
RaftConfig: s.cfg.RaftConfig,
Clock: s.clock,
DB: s.db,
Gossip: s.gossip,
NodeLiveness: s.nodeLiveness,
Transport: s.raftTransport,
RPCContext: s.rpcContext,
ScanInterval: s.cfg.ScanInterval,
ScanMaxIdleTime: s.cfg.ScanMaxIdleTime,
TimestampCachePageSize: s.cfg.TimestampCachePageSize,
HistogramWindowInterval: s.cfg.HistogramWindowInterval(),
StorePool: s.storePool,
SQLExecutor: internalExecutor,
LogRangeEvents: s.cfg.EventLogEnabled,
TimeSeriesDataStore: s.tsDB,
storeCfg.Settings = st
storeCfg.AmbientCtx = s.cfg.AmbientCtx
storeCfg.RaftConfig = s.cfg.RaftConfig
storeCfg.Clock = s.clock
storeCfg.DB = s.db
storeCfg.Gossip = s.gossip
storeCfg.NodeLiveness = s.nodeLiveness
storeCfg.Transport = s.raftTransport
storeCfg.RPCContext = s.rpcContext
storeCfg.ScanInterval = s.cfg.ScanInterval
storeCfg.ScanMaxIdleTime = s.cfg.ScanMaxIdleTime
storeCfg.TimestampCachePageSize = s.cfg.TimestampCachePageSize
storeCfg.HistogramWindowInterval = s.cfg.HistogramWindowInterval()
storeCfg.StorePool = s.storePool
storeCfg.SQLExecutor = internalExecutor
storeCfg.LogRangeEvents = s.cfg.EventLogEnabled
storeCfg.TimeSeriesDataStore = s.tsDB
storeCfg.EnableEpochRangeLeases = true

EnableEpochRangeLeases: true,
}
if storeTestingKnobs := s.cfg.TestingKnobs.Store; storeTestingKnobs != nil {
storeCfg.TestingKnobs = *storeTestingKnobs.(*storage.StoreTestingKnobs)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/storage/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestStoreRangeLease(t *testing.T) {

// Allow leases to expire and send commands to ensure we
// re-acquire, then check types again.
mtc.advanceClock(context.TODO())
mtc.expireLeases(context.TODO())
for _, key := range splitKeys {
if _, err := mtc.dbs[0].Inc(context.TODO(), key, 1); err != nil {
t.Fatalf("%s failed to increment: %s", key, err)
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestStoreRangeLeaseSwitcheroo(t *testing.T) {

// Allow leases to expire and send commands to ensure we
// re-acquire, then check types again.
mtc.advanceClock(context.TODO())
mtc.expireLeases(context.TODO())
if _, err := mtc.dbs[0].Inc(context.TODO(), splitKey, 1); err != nil {
t.Fatalf("failed to increment: %s", err)
}
Expand All @@ -134,7 +134,7 @@ func TestStoreRangeLeaseSwitcheroo(t *testing.T) {
sc.EnableEpochRangeLeases = false
mtc.restartStore(0)

mtc.advanceClock(context.TODO())
mtc.expireLeases(context.TODO())
if _, err := mtc.dbs[0].Inc(context.TODO(), splitKey, 1); err != nil {
t.Fatalf("failed to increment: %s", err)
}
Expand All @@ -151,7 +151,7 @@ func TestStoreRangeLeaseSwitcheroo(t *testing.T) {
sc.EnableEpochRangeLeases = true
mtc.restartStore(0)

mtc.advanceClock(context.TODO())
mtc.expireLeases(context.TODO())
if _, err := mtc.dbs[0].Inc(context.TODO(), splitKey, 1); err != nil {
t.Fatalf("failed to increment: %s", err)
}
Expand Down
Loading