-
Notifications
You must be signed in to change notification settings - Fork 4.1k
kvserver: e2e flow control for raft messages #79755
Description
Is your feature request related to a problem? Please describe.
The flow control for raft messages is currently overly simplistic. A high-level overview of the current state follows.
Sending
Raft messages originate in a raft.Ready:
cockroach/pkg/kv/kvserver/replica_raft.go
Lines 596 to 598 in 40fd5d3
| if hasReady = raftGroup.HasReady(); hasReady { | |
| rd = raftGroup.Ready() | |
| } |
and are passed to the RaftTransport queues here:
cockroach/pkg/kv/kvserver/replica_raft.go
Lines 1443 to 1465 in 40fd5d3
| if r.maybeCoalesceHeartbeat(ctx, msg, toReplica, fromReplica, false, nil) { | |
| return | |
| } | |
| req := newRaftMessageRequest() | |
| *req = kvserverpb.RaftMessageRequest{ | |
| RangeID: r.RangeID, | |
| ToReplica: toReplica, | |
| FromReplica: fromReplica, | |
| Message: msg, | |
| RangeStartKey: startKey, // usually nil | |
| } | |
| if !r.sendRaftMessageRequest(ctx, req) { | |
| if err := r.withRaftGroup(true, func(raftGroup *raft.RawNode) (bool, error) { | |
| r.mu.droppedMessages++ | |
| raftGroup.ReportUnreachable(msg.To) | |
| return true, nil | |
| }); err != nil && !errors.Is(err, errRemoved) { | |
| log.Fatalf(ctx, "%v", err) | |
| } | |
| } | |
| } | |
There is a single queue for messages to each destination store (irrespective of rangeID), which is essentially a 10k-buffered channel:
cockroach/pkg/kv/kvserver/raft_transport.go
Lines 556 to 557 in c23d1aa
| ch := make(chan *kvserverpb.RaftMessageRequest, raftSendBufferSize) | |
| value, ok = queuesMap.LoadOrStore(int64(nodeID), unsafe.Pointer(&ch)) |
from which batches of messages are put on the wire:
cockroach/pkg/kv/kvserver/raft_transport.go
Lines 515 to 534 in c23d1aa
| case req := <-ch: | |
| budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - int64(req.Size()) | |
| batch.Requests = append(batch.Requests, *req) | |
| releaseRaftMessageRequest(req) | |
| // Pull off as many queued requests as possible, within reason. | |
| for budget > 0 { | |
| select { | |
| case req = <-ch: | |
| budget -= int64(req.Size()) | |
| batch.Requests = append(batch.Requests, *req) | |
| releaseRaftMessageRequest(req) | |
| default: | |
| budget = -1 | |
| } | |
| } | |
| err := stream.Send(batch) | |
| if err != nil { | |
| return err | |
| } |
Receiving
Messages arrive here:
cockroach/pkg/kv/kvserver/raft_transport.go
Lines 386 to 395 in c23d1aa
| for i := range batch.Requests { | |
| req := &batch.Requests[i] | |
| atomic.AddInt64(&stats.serverRecv, 1) | |
| if pErr := t.handleRaftRequest(ctx, req, stream); pErr != nil { | |
| atomic.AddInt64(&stats.serverSent, 1) | |
| if err := stream.Send(newRaftMessageResponse(req, pErr)); err != nil { | |
| return err | |
| } | |
| } | |
| } |
and are put on the raft receive queue:
cockroach/pkg/kv/kvserver/store_raft.go
Lines 167 to 185 in 98a66b5
| s.metrics.RaftRcvdMessages[req.Message.Type].Inc(1) | |
| value, ok := s.replicaQueues.Load(int64(req.RangeID)) | |
| if !ok { | |
| value, _ = s.replicaQueues.LoadOrStore(int64(req.RangeID), unsafe.Pointer(&raftRequestQueue{})) | |
| } | |
| q := (*raftRequestQueue)(value) | |
| q.Lock() | |
| defer q.Unlock() | |
| if len(q.infos) >= replicaRequestQueueSize { | |
| // TODO(peter): Return an error indicating the request was dropped. Note | |
| // that dropping the request is safe. Raft will retry. | |
| s.metrics.RaftRcvdMsgDropped.Inc(1) | |
| return false | |
| } | |
| q.infos = append(q.infos, raftRequestInfo{ | |
| req: req, | |
| respStream: respStream, | |
| }) |
The next available raft scheduler goroutine will pick up the nonempty queue here:
cockroach/pkg/kv/kvserver/store_raft.go
Lines 442 to 460 in 98a66b5
| func (s *Store) processRequestQueue(ctx context.Context, rangeID roachpb.RangeID) bool { | |
| value, ok := s.replicaQueues.Load(int64(rangeID)) | |
| if !ok { | |
| return false | |
| } | |
| q := (*raftRequestQueue)(value) | |
| infos, ok := q.drain() | |
| if !ok { | |
| return false | |
| } | |
| defer q.recycle(infos) | |
| var hadError bool | |
| for i := range infos { | |
| info := &infos[i] | |
| if pErr := s.withReplicaForRequest( | |
| ctx, info.req, func(_ context.Context, r *Replica) *roachpb.Error { | |
| return s.processRaftRequestWithReplica(r.raftCtx, r, info.req) | |
| }, |
and hands it to the raft group (which will not perform work yet, but instead stages everything for the next Ready):
cockroach/pkg/kv/kvserver/store_raft.go
Lines 254 to 256 in 98a66b5
| if err := r.stepRaftGroup(req); err != nil { | |
| return roachpb.NewError(err) | |
| } |
Finally, a scheduler goroutine will actually process the replica for raft ready handling:
cockroach/pkg/kv/kvserver/store_raft.go
Lines 497 to 506 in 98a66b5
| func (s *Store) processReady(rangeID roachpb.RangeID) { | |
| r, ok := s.mu.replicasByRangeID.Load(rangeID) | |
| if !ok { | |
| return | |
| } | |
| ctx := r.raftCtx | |
| start := timeutil.Now() | |
| stats, expl, err := r.handleRaftReady(ctx, noSnap) | |
| maybeFatalOnRaftReadyErr(ctx, expl, err) |
which is where the actual I/O happens and the staged in-memory state is flushed to disk.
Describe the solution you'd like
Introduce a model that more deliberately handles the case in which a follower is receiving more load than it can handle. Control memory usage while prioritizing heartbeats (and more generally allowing QoS, i.e. quality-of-service differentiation), without introducing excessive performance cliffs. Integrate with admission control.
See also #79215.
Jira issue: CRDB-15932
Epic CRDB-15069