Cluster Bus IO offload #3438
This PR is based on #3324, so we need to get that change in first to avoid a lot of refactoring on this PR.
This review is posted by AI
Review: Cluster Bus I/O Offload
Overall the architecture is sound — separating transport I/O from state mutation is the right design, and reusing the shared queues avoids unnecessary complexity. The design document is excellent.
I found a few issues worth addressing, detailed in inline comments below. The two most important are:
- rcvbuf_len mixed atomic/non-atomic access: undefined behavior in C11
- drainIOThreadsQueue deadlock: main thread can deadlock when the MPSC outbox is full
The MPSC reservation concern with pthread_cancel is lower probability, but a cooperative shutdown model is worth considering.
        break;
    }
    /* Real read error. */
    result = CLUSTER_IO_READ_ERROR;
🔴 Thread Safety — rcvbuf_len mixed atomic/non-atomic access is UB
rcvbuf_len is declared atomic_size_t in clusterLink, but here in the I/O thread worker it's modified with a plain +=:
    link->rcvbuf_len += nread;
Meanwhile the main thread reads it with atomic_load_explicit in freeClusterLinkOnBufferLimitReached, and as a plain variable in clusterReadHandler.
In C11, mixing atomic and non-atomic access to the same _Atomic variable is undefined behavior. This may work on x86 but could break on ARM or with aggressive compiler optimizations.
Suggested fix: Use atomic_store_explicit/atomic_load_explicit consistently everywhere rcvbuf_len is accessed, or keep it non-atomic and use rcvbuf_alloc as a conservative upper bound in the buffer limit check when a read job is in flight.
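The first option in the suggested fix could look roughly like the sketch below. It is only an illustration: `atomicDemoLink` and the three helpers are hypothetical names, not code from the PR, and the acquire/release orderings are an assumption about what the surrounding code needs.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

/* Hypothetical stand-in for the relevant part of clusterLink. */
typedef struct atomicDemoLink {
    atomic_size_t rcvbuf_len; /* bytes currently held in the receive buffer */
} atomicDemoLink;

/* I/O thread side: account for nread freshly received bytes. */
static void atomicDemoAddReceived(atomicDemoLink *link, size_t nread) {
    atomic_fetch_add_explicit(&link->rcvbuf_len, nread, memory_order_release);
}

/* Main thread side: read the length, e.g. for a buffer-limit check. */
static size_t atomicDemoReceivedLen(atomicDemoLink *link) {
    return atomic_load_explicit(&link->rcvbuf_len, memory_order_acquire);
}

/* Main thread side: reset once the buffered bytes have been consumed. */
static void atomicDemoResetReceived(atomicDemoLink *link) {
    atomic_store_explicit(&link->rcvbuf_len, 0, memory_order_release);
}
```

Routing every access through helpers like these makes it easy to grep for any remaining direct use of the field.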
#endif
    if (server.io_threads_num == 1) return;
    serverAssert(inMainThread());
🟡 Deadlock — drainIOThreadsQueue doesn't drain the MPSC outbox
This function spins waiting for io_jobs_finished to catch up to io_jobs_submitted. But I/O threads increment io_jobs_finished only after processing a job — and if they can't push their completion result to the full MPSC outbox (io_shared_outbox), they get stuck in flushPendingIOResponses retrying.
Since this function never calls processIOThreadsResponses(), nobody drains the outbox, and we have a classic deadlock: main thread waits for workers, workers wait for outbox space.
Suggested fix:
    void drainIOThreadsQueue(void) {
        serverAssert(inMainThread());
        commitIOJobs();
        while (getPendingIOThreadsJobs()) {
            processIOThreadsResponses(); // drain outbox to unblock workers
            atomic_thread_fence(memory_order_acquire);
        }
    }
Note: updateIOThreads has a partial guard (pending > MPSC_QUEUE_SIZE check), but that doesn't cover all paths that call drainIOThreadsQueue.
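To make the failure mode concrete, here is a small standalone model of the situation, assuming one worker and an outbox reduced to a bounded counter. All names (`drainDemo*`, `DRAIN_DEMO_*`) are hypothetical and the real queues are more involved. The worker can only mark a job finished after its result fits in the outbox, so the waiting loop has to keep consuming results.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>

#define DRAIN_DEMO_OUTBOX_CAP 4 /* deliberately tiny so the worker fills it */
#define DRAIN_DEMO_JOBS 64

static atomic_int drain_demo_outbox;   /* results waiting for the main thread */
static atomic_int drain_demo_finished; /* jobs the worker has fully completed */

/* Worker: a job only counts as finished once its result fits in the outbox. */
static void *drainDemoWorker(void *arg) {
    (void)arg;
    for (int i = 0; i < DRAIN_DEMO_JOBS; i++) {
        for (;;) {
            int n = atomic_load_explicit(&drain_demo_outbox, memory_order_acquire);
            if (n < DRAIN_DEMO_OUTBOX_CAP &&
                atomic_compare_exchange_weak_explicit(&drain_demo_outbox, &n, n + 1,
                                                      memory_order_acq_rel,
                                                      memory_order_acquire))
                break;
            sched_yield(); /* outbox full (or CAS lost): retry */
        }
        atomic_fetch_add_explicit(&drain_demo_finished, 1, memory_order_release);
    }
    return NULL;
}

/* Main thread: consume every queued result; returns how many were taken. */
static int drainDemoProcessResponses(void) {
    return atomic_exchange_explicit(&drain_demo_outbox, 0, memory_order_acq_rel);
}

/* Wait for all jobs while emptying the outbox so the worker can progress. */
static int drainDemoRun(void) {
    pthread_t tid;
    int consumed = 0;
    atomic_store(&drain_demo_outbox, 0);
    atomic_store(&drain_demo_finished, 0);
    if (pthread_create(&tid, NULL, drainDemoWorker, NULL) != 0) return -1;
    while (atomic_load_explicit(&drain_demo_finished, memory_order_acquire) < DRAIN_DEMO_JOBS) {
        consumed += drainDemoProcessResponses();
        sched_yield();
    }
    consumed += drainDemoProcessResponses(); /* pick up the last results */
    pthread_join(tid, NULL);
    return consumed;
}
```

If the `drainDemoProcessResponses()` call inside the wait loop is removed, the worker stalls after `DRAIN_DEMO_OUTBOX_CAP` results and the loop never exits, which is the shape of the deadlock described above.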
It would be good to raise this issue and the one below on #3324, the PR that introduces this change.
    /* Reserve a slot (or use existing reservation) */
    if (!ticket->has_reservation) {
        tail = atomic_fetch_add_explicit(&q->tail, 1, memory_order_relaxed);
    } else {
🟡 MPSC reservation can permanently block dequeue if producer thread is cancelled
When the queue is full, mpscEnqueue reserves a slot (increments tail) but doesn't write data. The ticket saves the index for retry. However, mpscDequeueBatch stops at the first NULL slot:
    if (!data) break;
If a producer thread is killed via pthread_cancel after reserving a slot but before writing data, that NULL slot permanently blocks all subsequent dequeues.
In practice the window is narrow since pthread_cancel fires at cancellation points (top of the I/O loop), not mid-enqueue. But it's a latent risk.
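The blocking effect of a reserved but unwritten slot can be reproduced single-threaded with a simplified queue. This is a sketch under assumptions: `holeDemo*` names are hypothetical, and the two-step reserve/publish split only mirrors the behavior described in this comment, not the PR's actual `mpscEnqueue`.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define HOLE_DEMO_SIZE 8 /* power of two so index masking works */

typedef struct holeDemoQueue {
    _Atomic(void *) slots[HOLE_DEMO_SIZE];
    atomic_size_t tail; /* next slot to reserve (producers) */
    size_t head;        /* next slot to read (single consumer) */
} holeDemoQueue;

static void holeDemoInit(holeDemoQueue *q) {
    for (size_t i = 0; i < HOLE_DEMO_SIZE; i++) atomic_init(&q->slots[i], NULL);
    atomic_init(&q->tail, 0);
    q->head = 0;
}

/* Step 1 of an enqueue: claim a slot index without publishing data yet. */
static size_t holeDemoReserve(holeDemoQueue *q) {
    return atomic_fetch_add_explicit(&q->tail, 1, memory_order_relaxed);
}

/* Step 2 of an enqueue: publish data into a previously reserved slot. */
static void holeDemoPublish(holeDemoQueue *q, size_t ticket, void *data) {
    atomic_store_explicit(&q->slots[ticket & (HOLE_DEMO_SIZE - 1)], data,
                          memory_order_release);
}

/* Consumer: pop in order and stop at the first slot that is still NULL. */
static size_t holeDemoDequeueBatch(holeDemoQueue *q, void **out, size_t max) {
    size_t n = 0;
    while (n < max) {
        _Atomic(void *) *slot = &q->slots[q->head & (HOLE_DEMO_SIZE - 1)];
        void *data = atomic_load_explicit(slot, memory_order_acquire);
        if (!data) break; /* hole: an earlier reservation is not filled yet */
        atomic_store_explicit(slot, NULL, memory_order_relaxed);
        out[n++] = data;
        q->head++;
    }
    return n;
}
```

Reserving slot 0 without publishing, then reserving and publishing slot 1, leaves `holeDemoDequeueBatch` returning 0 until slot 0 is finally filled. A producer that never comes back leaves the consumer stuck behind that slot for good.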
Suggested fix: Consider cooperative shutdown (atomic flag + clean exit) instead of pthread_cancel. This eliminates the problem and is generally safer for lock-free data structures.
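A cooperative shutdown along these lines might look like the following minimal sketch. The `stopDemo*` names and struct layout are hypothetical, and the loop body stands in for real job processing. The point is only that the worker checks the flag between jobs, never in the middle of an enqueue.

```c
#include <assert.h>
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct stopDemoWorker {
    pthread_t tid;
    atomic_bool stop_requested; /* set by the main thread, polled by the worker */
    atomic_int iterations;      /* loop passes completed, for observation only */
    atomic_bool exited_cleanly; /* set by the worker just before it returns */
} stopDemoWorker;

static void *stopDemoMain(void *arg) {
    stopDemoWorker *w = arg;
    while (!atomic_load_explicit(&w->stop_requested, memory_order_acquire)) {
        /* A real worker would take one job here and finish it, including
         * publishing its result, before checking the flag again. */
        atomic_fetch_add_explicit(&w->iterations, 1, memory_order_relaxed);
        sched_yield();
    }
    atomic_store_explicit(&w->exited_cleanly, true, memory_order_release);
    return NULL;
}

/* Returns 0 on success, like pthread_create. */
static int stopDemoStart(stopDemoWorker *w) {
    atomic_init(&w->stop_requested, false);
    atomic_init(&w->iterations, 0);
    atomic_init(&w->exited_cleanly, false);
    return pthread_create(&w->tid, NULL, stopDemoMain, w);
}

/* Ask the worker to stop and wait until it has left its loop. */
static void stopDemoStop(stopDemoWorker *w) {
    atomic_store_explicit(&w->stop_requested, true, memory_order_release);
    pthread_join(w->tid, NULL);
}
```

Because the thread always returns from its own loop, no queue slot can be left reserved but unwritten at shutdown.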
sarthakaggarwal97 left a comment
Automated review found no actionable issues in this pass. It is not approving automatically.
Got unstable merged into this branch.
6e20b2f to 61ddd4f
@coderabbitai review
Actions performed: Review triggered.
Walkthrough
Cluster bus reads, writes, and TLS accepts now offload to I/O threads with snapshot-based receive handling, deferred cluster-link teardown, ownership tracking, and updated cluster stats. Tests and docs were updated to cover the new offload paths and metrics.
Changes
Cluster Bus I/O Offload
Sequence Diagram(s)
Cluster write offload
sequenceDiagram
participant clusterWriteHandler
participant trySendClusterWriteToIOThreads
participant clusterWriteJob
participant processIOThreadsResponses
participant clusterHandleWriteCompletion
clusterWriteHandler->>trySendClusterWriteToIOThreads: enqueue cluster write job
trySendClusterWriteToIOThreads->>clusterWriteJob: JOB_REQ_CLUSTER_WRITE
clusterWriteJob->>processIOThreadsResponses: JOB_RES_CLUSTER_WRITE
processIOThreadsResponses->>clusterHandleWriteCompletion: invoke completion
TLS accept offload
sequenceDiagram
participant clusterAcceptHandler
participant trySendClusterAcceptToIOThreads
participant clusterAcceptJob
participant processIOThreadsResponses
participant clusterHandleAcceptCompletion
participant clusterConnAcceptHandler
clusterAcceptHandler->>trySendClusterAcceptToIOThreads: enqueue cluster accept job
trySendClusterAcceptToIOThreads->>clusterAcceptJob: JOB_REQ_CLUSTER_ACCEPT
clusterAcceptJob->>processIOThreadsResponses: JOB_RES_CLUSTER_ACCEPT
processIOThreadsResponses->>clusterHandleAcceptCompletion: invoke completion
clusterHandleAcceptCompletion->>clusterConnAcceptHandler: finalize accepted cluster link
Estimated code review effort: 5 (Critical), ~90+ minutes
Actionable comments posted: 7
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/cluster_legacy.c`:
- Around line 4725-4756: When clusterSnapshotPackets() encounters a bad trailing
header or length after successfully parsing one or more valid messages, ensure
the function records the parsed prefix before returning: set *snapshot_len =
offset (the byte length of the valid prefix) and leave *packet_count reflecting
parsed packets, then set *result to CLUSTER_IO_BAD_HEADER or
CLUSTER_IO_BAD_LENGTH and return; do not return early without publishing
snapshot_len/packet_count. Update the error branches that currently "return" on
memcmp(hdr->sig...) and totlen < minlen to first assign *snapshot_len = offset
and then set *result appropriately before returning so the synchronous path can
process the complete messages already parsed.
- Around line 8827-8875: clusterWriteJob (and the analogous threaded-write block
around lines 8964-9030) never updates cluster byte counters for successful
writes; update these threaded paths to account for bytes actually written by
calling clusterBusAddNetworkBytesByType() (which updates
cluster_stats_bytes_sent and per-type counters) whenever nwritten > 0,
accounting for partial writes by passing the number of bytes written for that
message fragment and the message's type (use msg/meta from
clusterMsgSendBlock/clusterMsg to determine the bus type). Ensure io_head_offset
and io_nodes_sent semantics remain correct when accounting bytes for partial and
full sends, and preserve existing sendToMainThread(link, JOB_RES_CLUSTER_WRITE)
behavior.
In `@src/unit/test_cluster_io_offload.cpp`:
- Around line 151-157: The helper makeConn() currently sets fc->conn.owner_kind
= CONN_OWNER_CLUSTER_LINK which makes accept-path tests start with a
cluster-owned connection; change makeConn() to accept an optional owner_kind
parameter (or create a separate makePreLinkConn()) and default to the pre-link
owner kind used in production for accept flow tests, then update the accept-path
tests to call the new parameterized helper or use the new makePreLinkConn() so
those tests begin with the correct non-cluster owner state instead of
CONN_OWNER_CLUSTER_LINK.
- Around line 398-406: The test leaves a synthetic accept dispatch referenced by
the shared I/O queue; after asserting the pending flag in
TEST_F(ClusterIOOffloadTest, AcceptDispatchSetsPendingFlag) you must unwind that
in-flight state to avoid a stale connection pointer during TearDown — mirror the
write-dispatch test by clearing the synthetic dispatch before returning: call
the test-only cleanup routine (testOnlyFreeIOThreadQueues()) to remove the
queued reference for the FakeConn created by makeConn() and ensure
CONN_FLAG_ACCEPT_OFFLOAD_PENDING is cleared on fc->conn (or otherwise released)
so owned_conns teardown won’t encounter a stale connection.
In `@tests/unit/cluster/cluster-io-offload.tcl`:
- Around line 17-49: Both tests mutate node 0 config but only restore on the
success path; wrap each test body (the code that calls R 0 config set ...,
wait_for_condition, and wait_for_cluster_state ok) in a try { ... } finally {
... } (or a catch/restore pattern) so the original values captured in oldlimit
and old_threads are always restored via R 0 config set <oldvalue> and cluster
state checked in the finally block; reference the test names ("buffer-limit
enforcement increments stat..." and "disabling io threads on one node causes
sync fallback"), the R 0 config set/get commands, wait_for_condition, and
wait_for_cluster_state ok when placing the restore logic.
In `@tests/unit/io-threads.tcl`:
- Around line 118-123: Remove the lifetime-bound assertion that checks the
cumulative used_active_time against the total sleep window (the assert using
used_active_time < ($sleep_time_ms/1000)); this check is flaky because
initial_active_times can already be large—keep the existing per-thread delta
check that computes used_active_time - $initial_active_times($i) and any
assertions using initial_active_times and getInfoProperty, so only drop the
cumulative assertion and rely on the delta-based assertion to validate
post-sleep activity.
Files selected for processing (12)
- design-docs/cluster-io-offload.md
- src/cluster_legacy.c
- src/cluster_legacy.h
- src/connection.h
- src/io_threads.c
- src/io_threads.h
- src/server.c
- src/server.h
- src/socket.c
- src/unit/test_cluster_io_offload.cpp
- tests/unit/cluster/cluster-io-offload.tcl
- tests/unit/io-threads.tcl
test "cluster_io_accepts_offloaded increments under tls reconnect churn" {
    wait_for_condition 500 10 {
        [getInfoProperty [R 0 info stats] cluster_io_accepts_offloaded] > 0 || \
        [getInfoProperty [R 1 info stats] cluster_io_accepts_offloaded] > 0 || \
        [getInfoProperty [R 2 info stats] cluster_io_accepts_offloaded] > 0
    } else {
        fail "cluster_io_accepts_offloaded did not increase"
    }
}
Make the TLS accept-offload check self-contained.
This test never triggers reconnect churn itself or captures a pre-churn baseline; it just checks whether cluster_io_accepts_offloaded is already non-zero. That can pass because of the previous restart test or even initial cluster boot, so it does not actually verify "under tls reconnect churn". Reset the relevant stats, induce the reconnects in this test, then assert the counter increased from the saved baseline.
As per coding guidelines: "Tests should not depend on execution order (test isolation)" and "Keep tests simple and focused on one scenario (test readability)".
zuiderkwast left a comment
Sorry for the delay. I haven't finished the review yet, so I'm just posting my pending comments now.
Maybe we don't need to add so many INFO fields? INFO is already bloated. I guess you needed them for developing/debugging.
Btw, we already count these reads in io_threaded_reads_processed and in the existing cluster message counters, so maybe we just need to add a "reads in main thread" counter? (And the same for writes)
The main-thread fallback path uses connWrite/connRead, which are non-blocking. "Synchronous" implies blocking I/O, which is not what happens. Per zuiderkwast's review feedback on PR valkey-io#3438.
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## unstable #3438 +/- ##
============================================
+ Coverage 76.73% 76.80% +0.07%
============================================
Files 162 163 +1
Lines 81029 81712 +683
============================================
+ Hits 62175 62759 +584
- Misses 18854 18953 +99
Use I/O threads for cluster bus reads, writes, and TLS accepts, mirroring the client I/O threading model. Splits packet framing from application, adds deferred link teardown with io_refs for safe concurrent I/O, dual send queues, and a framed packet queue on clusterLink. Adds observability counters (CLUSTER INFO).
Signed-off-by: Harkrishn Patro <h_patro@apple.com>
b4fdc03 to 16d11d2
Squashed the changes.
@zuiderkwast The CI is green, if you want to take another pass.
Fixes: #3361
Cluster Bus I/O Offload
Overview
Cluster bus read, write, and TLS accept work can run on the shared I/O thread
pool, while all cluster state mutation stays on the main thread.
Jobs flow through:
- io_shared_inbox (SPMC): main thread -> I/O workers
- io_shared_outbox (MPSC): I/O workers -> main thread
If the pool is inactive or enqueue fails, the caller falls back to synchronous I/O on the main thread and increments stat_cluster_io_sync_fallbacks.
Architecture
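The dispatch-or-fallback rule from the Overview can be sketched as below. This is an illustrative model only: `fallbackDemo*` names, the fixed inbox capacity, and the counter fields are assumptions, with `sync_fallbacks` standing in for stat_cluster_io_sync_fallbacks.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define FALLBACK_DEMO_OK 0
#define FALLBACK_DEMO_ERR -1
#define FALLBACK_DEMO_INBOX_CAP 2 /* tiny capacity to force enqueue failure */

typedef struct fallbackDemoPool {
    bool active;            /* false models an inactive I/O thread pool */
    size_t queued;          /* jobs currently waiting in the inbox */
    size_t sync_fallbacks;  /* models stat_cluster_io_sync_fallbacks */
    size_t main_thread_ios; /* I/O performed directly on the main thread */
} fallbackDemoPool;

/* Try to hand a job to the pool; fails if inactive or the inbox is full. */
static int fallbackDemoTryDispatch(fallbackDemoPool *p) {
    if (!p->active || p->queued >= FALLBACK_DEMO_INBOX_CAP) return FALLBACK_DEMO_ERR;
    p->queued++;
    return FALLBACK_DEMO_OK;
}

/* Event handler: offload when possible, otherwise do the I/O right here. */
static void fallbackDemoHandleEvent(fallbackDemoPool *p) {
    if (fallbackDemoTryDispatch(p) == FALLBACK_DEMO_OK) return;
    p->sync_fallbacks++;
    p->main_thread_ios++; /* stand-in for the main-thread read/write/accept */
}
```

Keeping the fallback in the caller means a failed dispatch never loses an event; it only changes which thread performs the I/O.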
Key Design Decisions
- connRead, connWrite, connAccept, framing, and result publication.
- send_msg_queue plus a snapshot boundary: io_last_send_block, io_head_offset, and io_nodes_sent.
- rcvbuf snapshot boundaries: io_rcvbuf_snapshot_len and io_rcvbuf_snapshot_packets.
- rcvbuf snapshot fully; there is no bounded packet/time budget.
- io_refs and async_close.
- CONN_FLAG_ACCEPT_OFFLOAD_PENDING.
- ConnOwnerKind lets generic connection/TLS code distinguish client-owned and cluster-owned connections safely.
Data Flow: Read Path
If dispatch returns C_ERR, the main thread falls back to clusterReadHandler(conn).
Data Flow: Write Path
Important note:
send_msg_queue, but the worker stops at io_last_send_block, so those new nodes are picked up by a later dispatch.
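The snapshot boundary in this note can be illustrated with a simplified model. It assumes the send queue is a singly linked list and ignores partial writes (io_head_offset). The `sendDemo*` types and functions are hypothetical; only the field names io_last_send_block and io_nodes_sent come from the design notes.

```c
#include <assert.h>
#include <stddef.h>

typedef struct sendDemoNode {
    struct sendDemoNode *next;
    size_t len; /* payload size of this queued message block */
} sendDemoNode;

typedef struct sendDemoLink {
    sendDemoNode *head, *tail;        /* the queue, appended to by the main thread */
    sendDemoNode *io_last_send_block; /* last node the worker may touch */
    size_t io_nodes_sent;             /* nodes the worker fully wrote */
} sendDemoLink;

/* Main thread: queue one more outgoing block. */
static void sendDemoAppend(sendDemoLink *l, sendDemoNode *n) {
    n->next = NULL;
    if (l->tail) l->tail->next = n; else l->head = n;
    l->tail = n;
}

/* Main thread, at dispatch time: freeze the range the worker may send. */
static void sendDemoSnapshot(sendDemoLink *l) {
    l->io_last_send_block = l->tail;
    l->io_nodes_sent = 0;
}

/* Worker: send from head up to and including the snapshot boundary.
 * It breaks before following the boundary node's next pointer, so nodes
 * appended after the snapshot are never read here. */
static size_t sendDemoWorkerWrite(sendDemoLink *l) {
    size_t bytes = 0;
    if (!l->io_last_send_block) return 0;
    for (sendDemoNode *n = l->head; n; n = n->next) {
        bytes += n->len; /* stand-in for writing the block to the socket */
        l->io_nodes_sent++;
        if (n == l->io_last_send_block) break;
    }
    return bytes;
}
```

With two nodes queued before the snapshot and a third appended afterwards, the worker reports two nodes sent and leaves the third for the next dispatch.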
If dispatch returns C_ERR, clusterWriteHandler(conn) falls back to the synchronous connWrite() loop.
Data Flow: Accept Path (TLS)
Important note:
connections are routed back to trySendClusterAcceptToIOThreads, and the pending flag ensures only one accept job is in flight for that connection.
If dispatch returns C_ERR, the main thread falls back to connAccept(conn, clusterConnAcceptHandler).
clusterLink I/O State
Read and write jobs are mutually exclusive per link.
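One way to picture the per-link exclusivity together with deferred teardown is the sketch below. The exact semantics of io_refs and async_close in the PR are not spelled out here, so the behavior shown (teardown deferred until the in-flight job completes) is an assumption, and all `stateDemo*` names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { STATE_DEMO_NONE, STATE_DEMO_READ, STATE_DEMO_WRITE } stateDemoKind;

typedef struct stateDemoLink {
    stateDemoKind inflight; /* at most one offloaded job per link */
    int io_refs;            /* references held by in-flight I/O jobs */
    bool async_close;       /* teardown requested while a job was in flight */
    bool freed;             /* stand-in for actually releasing the link */
} stateDemoLink;

/* Main thread: refuse a new job while another one is in flight or closing. */
static bool stateDemoDispatch(stateDemoLink *l, stateDemoKind kind) {
    if (l->async_close || l->inflight != STATE_DEMO_NONE) return false;
    l->inflight = kind;
    l->io_refs++;
    return true;
}

/* Main thread: free now if idle, otherwise defer until the job completes. */
static void stateDemoFree(stateDemoLink *l) {
    if (l->io_refs > 0) {
        l->async_close = true;
        return;
    }
    l->freed = true;
}

/* Main thread, on job completion: drop the reference, finish deferred close. */
static void stateDemoComplete(stateDemoLink *l) {
    l->inflight = STATE_DEMO_NONE;
    l->io_refs--;
    if (l->async_close && l->io_refs == 0) l->freed = true;
}
```

The deferred path keeps the link memory valid for as long as a worker might still be reading or writing through it.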
Follow-Up
- rcvbuf snapshot in one pass on the main thread and compacts the buffer after each packet. (might be fine for the time being)
- CLUSTER_IO_BAD_HEADER/CLUSTER_IO_BAD_LENGTH is to close the link immediately, even if the same offloaded read also contained a valid packet prefix in the queued snapshot. This is accepted for now as an invalid-peer path, but if we ever need sync/offload parity here, read completion should drain the valid prefix before tearing the link down.
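If sync/offload parity is ever needed, the framing step could publish the valid prefix before reporting the error, roughly as in this sketch. The packet layout is invented for illustration (a 4-byte "DEMO" signature followed by a host-order 32-bit total length), and `framingDemo*` names are hypothetical; the real cluster bus header differs.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define FRAMING_DEMO_HDR_LEN 8 /* 4-byte signature + 4-byte total length */

typedef enum {
    FRAMING_DEMO_OK,
    FRAMING_DEMO_BAD_HEADER,
    FRAMING_DEMO_BAD_LENGTH
} framingDemoResult;

/* Scan buf for complete packets. The byte length and count of the valid
 * prefix are always published, even when a later packet is malformed. */
static framingDemoResult framingDemoSnapshot(const unsigned char *buf, size_t len,
                                             size_t *snapshot_len,
                                             size_t *packet_count) {
    size_t offset = 0, count = 0;
    framingDemoResult result = FRAMING_DEMO_OK;
    while (len - offset >= FRAMING_DEMO_HDR_LEN) {
        uint32_t totlen;
        if (memcmp(buf + offset, "DEMO", 4) != 0) {
            result = FRAMING_DEMO_BAD_HEADER;
            break;
        }
        memcpy(&totlen, buf + offset + 4, sizeof(totlen));
        if (totlen < FRAMING_DEMO_HDR_LEN) {
            result = FRAMING_DEMO_BAD_LENGTH;
            break;
        }
        if (totlen > len - offset) break; /* incomplete packet: wait for more */
        offset += totlen;
        count++;
    }
    *snapshot_len = offset; /* published on every exit path */
    *packet_count = count;
    return result;
}
```

The caller can then apply the `packet_count` complete packets and only afterwards act on a non-OK result, for example by closing the link.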