
kvserver: e2e flow control for raft messages #79755

@tbg

Description

Is your feature request related to a problem? Please describe.

The flow control for raft messages is currently overly simplistic. A high-level overview of the current state follows.

Sending

Raft messages originate in a raft.Ready:

if hasReady = raftGroup.HasReady(); hasReady {
	rd = raftGroup.Ready()
}

and are passed to the RaftTransport queues here:

if r.maybeCoalesceHeartbeat(ctx, msg, toReplica, fromReplica, false, nil) {
	return
}
req := newRaftMessageRequest()
*req = kvserverpb.RaftMessageRequest{
	RangeID:       r.RangeID,
	ToReplica:     toReplica,
	FromReplica:   fromReplica,
	Message:       msg,
	RangeStartKey: startKey, // usually nil
}
if !r.sendRaftMessageRequest(ctx, req) {
	if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) {
		r.mu.droppedMessages++
		raftGroup.ReportUnreachable(msg.To)
		return true, nil
	}); err != nil && !errors.Is(err, errRemoved) {
		log.Fatalf(ctx, "%v", err)
	}
}

There is a single queue for messages to each destination store (irrespective of rangeID), which is essentially a 10k-buffered channel:

ch := make(chan *kvserverpb.RaftMessageRequest, raftSendBufferSize)
value, ok = queuesMap.LoadOrStore(int64(nodeID), unsafe.Pointer(&ch))

Batches of messages are then pulled from this queue and put on the wire:

case req := <-ch:
	budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - int64(req.Size())
	batch.Requests = append(batch.Requests, *req)
	releaseRaftMessageRequest(req)
	// Pull off as many queued requests as possible, within reason.
	for budget > 0 {
		select {
		case req = <-ch:
			budget -= int64(req.Size())
			batch.Requests = append(batch.Requests, *req)
			releaseRaftMessageRequest(req)
		default:
			budget = -1
		}
	}
	err := stream.Send(batch)
	if err != nil {
		return err
	}

Receiving

Messages arrive here:

for i := range batch.Requests {
	req := &batch.Requests[i]
	atomic.AddInt64(&stats.serverRecv, 1)
	if pErr := t.handleRaftRequest(ctx, req, stream); pErr != nil {
		atomic.AddInt64(&stats.serverSent, 1)
		if err := stream.Send(newRaftMessageResponse(req, pErr)); err != nil {
			return err
		}
	}
}

and are put on the raft receive queue:

s.metrics.RaftRcvdMessages[req.Message.Type].Inc(1)
value, ok := s.replicaQueues.Load(int64(req.RangeID))
if !ok {
	value, _ = s.replicaQueues.LoadOrStore(int64(req.RangeID), unsafe.Pointer(&raftRequestQueue{}))
}
q := (*raftRequestQueue)(value)
q.Lock()
defer q.Unlock()
if len(q.infos) >= replicaRequestQueueSize {
	// TODO(peter): Return an error indicating the request was dropped. Note
	// that dropping the request is safe. Raft will retry.
	s.metrics.RaftRcvdMsgDropped.Inc(1)
	return false
}
q.infos = append(q.infos, raftRequestInfo{
	req:        req,
	respStream: respStream,
})

The next available raft scheduler goroutine will pick up the nonempty queue in

func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID) bool {
	value, ok := s.replicaQueues.Load(int64(rangeID))
	if !ok {
		return false
	}
	q := (*raftRequestQueue)(value)
	infos, ok := q.drain()
	if !ok {
		return false
	}
	defer q.recycle(infos)
	var hadError bool
	for i := range infos {
		info := &infos[i]
		if pErr := s.withReplicaForRequest(
			ctx, info.req, func(_ context.Context, r *Replica) *roachpb.Error {
				return s.processRaftRequestWithReplica(r.raftCtx, r, info.req)
			},

and hands it to the raft group (which will not perform work yet, but instead stages everything for the next Ready):

if err := r.stepRaftGroup(req); err != nil {
return roachpb.NewError(err)
}

Finally, a scheduler goroutine will actually process the replica for raft ready handling:

func (s *Store) processReady(rangeID roachpb.RangeID) {
	r, ok := s.mu.replicasByRangeID.Load(rangeID)
	if !ok {
		return
	}
	ctx := r.raftCtx
	start := timeutil.Now()
	stats, expl, err := r.handleRaftReady(ctx, noSnap)
	maybeFatalOnRaftReadyErr(ctx, expl, err)

which is where the actual I/O happens and the staged in-memory state is flushed to disk.

Describe the solution you'd like

Introduce a model that more deliberately handles the case in which a follower receives more load than it can handle: control memory usage while prioritizing heartbeats (and, more generally, allowing QoS), without introducing excessive performance cliffs. Integrate with admission control.

See also #79215.

Jira issue: CRDB-15932

Epic CRDB-15069

Labels: A-admission-control, C-enhancement