Redesign IO threading communication model#3324
Conversation
Co-authored-by: Dan Touitou <dan.touitou@gmail.com> Signed-off-by: Uri Yagelnik <uriy@amazon.com>
6ac3d49 to
b6dbd28
Compare
|
The diff now contains many commits from unstable. I don't know it happened but I hope you can remove them and then git merge unstable. |
c5b9d5a to
97d6872
Compare
Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
97d6872 to
1e4ea39
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## unstable #3324 +/- ##
============================================
+ Coverage 74.51% 76.44% +1.92%
============================================
Files 130 159 +29
Lines 72748 79512 +6764
============================================
+ Hits 54207 60781 +6574
- Misses 18541 18731 +190
🚀 New features to boost your workflow:
|
|
Can the 32-bit build failure be related to the |
Signed-off-by: akash kumar <akumdev@amazon.com>
zuiderkwast
left a comment
There was a problem hiding this comment.
It's a very large diff. 🤯 I think it's close, though I didn't proof-read the complete diff.
@akashkgit Can you incorporate the full description of #2909 in this PR? Then we can close that one and let only stick to this one instead.
| /* DEADLOCK PREVENTION: | ||
| * Check if the pending workload fits in the return queue. | ||
| * If the number of pending jobs is greater than the capacity of the Global MPSC queue, | ||
| * the worker threads might fill the queue and block. If we enter drainIOThreadsQueue | ||
| * in that state, we will deadlock (Main thread waits for worker, Worker waits for queue space). */ | ||
| size_t pending = getPendingIOResponsesCount(); | ||
|
|
||
| if (pending > MPSC_QUEUE_SIZE) { | ||
| if (err) *err = "Can't update IO threads under load, try again later"; | ||
| return 0; | ||
| } |
There was a problem hiding this comment.
Interesting! OK, "try again later" makes sense in this case.
There was a problem hiding this comment.
I was playing around with this change and then to understand it further, I was trying to leverage the underlying infrastructure (queues) for offloading cluster IO operations. I should be able to create a PR after this gets merged.
Overall, I like the primitives being introduced here. Reminds of Golang channel/routine idea. We should leverage this for future work around IO and maintain this system.
| typedef enum { | ||
| JOB_REQ_READ_CLIENT = 0, | ||
| JOB_REQ_WRITE_CLIENT, | ||
| JOB_REQ_FREE_ARGV, | ||
| JOB_REQ_FREE_OBJ, | ||
| JOB_REQ_POLL, | ||
| JOB_REQ_ACCEPT, | ||
| JOB_REQ_COUNT | ||
| } JobRequest; | ||
| _Static_assert(JOB_REQ_COUNT <= 7, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic"); |
There was a problem hiding this comment.
We might have to move away from pointer tagging approach pretty soon. As other changes are going to leverage this infrastructure and we are going to run out of the bits (3) required to support all the jobs.
We could wrap the private_data with the actual object (client, ?) and type.
| * All rights reserved. | ||
| * SPDX-License-Identifier: BSD-3-Clause | ||
| * | ||
| * Implements different types of queues |
There was a problem hiding this comment.
We should call out these are bounded/buffered queues. Each of these are of different sizes. Did we determine the sizes {SPMC, SPSC -> 4096 and MPSC -> 16384} based on certain workloads ?
There was a problem hiding this comment.
Should the size be parameter of mpscInit ?
There was a problem hiding this comment.
Nobody knows why these sizes were picked. I wasn't written down...
To make them more generic, it seems reasonable to specify the size in init, but we can also do that later, when we want to use these queues for something else.
| /* Pushes an item into the queue and returns true if the queue is not full. | ||
| * Otherwise, a slot index is reserved and saved in the ticket, and returns false. | ||
| * Subsequent retries must pass the same ticket to fill the reserved slot, provided the queue is not full */ | ||
| extern bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket); |
There was a problem hiding this comment.
What's the benefit of this ticket/slot index reservation? I couldn't find a good example for it.
| JOB_REQ_ACCEPT, | ||
| JOB_REQ_COUNT | ||
| } JobRequest; | ||
| _Static_assert(JOB_REQ_COUNT <= 7, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic"); |
There was a problem hiding this comment.
Count won't be used for pointer tagging.
| _Static_assert(JOB_REQ_COUNT <= 7, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic"); | |
| _Static_assert(JOB_REQ_COUNT <= 8, "JOB_REQ_COUNT must not exceed 8 for pointer arithmetic"); |
| JOB_RES_WRITE_CLIENT, | ||
| JOB_RES_COUNT | ||
| } JobResult; | ||
| _Static_assert(JOB_RES_COUNT <= 7, "JOB_RES_COUNT must not exceed 7 for pointer arithmetic"); |
There was a problem hiding this comment.
| _Static_assert(JOB_RES_COUNT <= 7, "JOB_RES_COUNT must not exceed 7 for pointer arithmetic"); | |
| _Static_assert(JOB_RES_COUNT <= 8, "JOB_RES_COUNT must not exceed 8 for pointer arithmetic"); |
The SPMC queue from #3324 needs each `spmcCell` to be cache-line aligned, but plain `zmalloc()` does not guarantee that in all build configurations. This change introduces `zmalloc_cache_aligned()` and uses it for the SPMC queue buffer allocation in `spmcInit()`. Failing CI: https://github.com/valkey-io/valkey/actions/runs/24374139344 --------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
The test was accidentally waking the IO threads while trying to check that they had gone idle. After the recent IO-thread refactor in valkey-io#3324, the [test](https://github.com/valkey-io/valkey/pull/3324/changes#diff-21314ec3a338f739eab1536f91f528d1efe7c6a93935a71b9c02f77a3858f121R112) started forcing `io-threads-always-active`, and its repeated `INFO` polling counted as fresh activity. So instead of just observing the worker threads, the test kept reactivating them and then flaked. --------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> Signed-off-by: Sarthak Aggarwal <25262500+sarthakaggarwal97@users.noreply.github.com> Co-authored-by: Sarthak Aggarwal <25262500+sarthakaggarwal97@users.noreply.github.com>
valkey-io#3324 introduced `BATCH_SIZE` as a const int local variable and used it as an array bound. Clang 17 rejects this with: ``` io_threads.c:305:22: error: variable length array folded to constant array as an extension [-Werror,-Wgnu-folding-constant] 305 | void *batch_jobs[BATCH_SIZE]; | ^~~~~~~~~~ 1 error generated. make[1]: *** [io_threads.o] Error 1 make: *** [all] Error 2 ``` Old Clang versions do not emit this warning, maybe that is why the CI passed. Fix by promoting `BATCH_SIZE` to a file-scope `#define`. Signed-off-by: Yang Zhao <zymy701@gmail.com>
…3504) The SPMC queue from valkey-io#3324 needs each `spmcCell` to be cache-line aligned, but plain `zmalloc()` does not guarantee that in all build configurations. This change introduces `zmalloc_cache_aligned()` and uses it for the SPMC queue buffer allocation in `spmcInit()`. Failing CI: https://github.com/valkey-io/valkey/actions/runs/24374139344 --------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
#### Summary
This PR redesigns the IO threading communication model, replacing the
inefficient client-list polling approach with a high-performance,
lock-free queue architecture. This change improves throughput by
**8–17%** across various workloads and lays the groundwork for
offloading command execution to IO threads in following PRs.
### Performance Comparison: Unstable vs New IO Queues
| Type | Operation | Unstable Branch (M TPS) | New IO Queues (M TPS)|
Difference (%) |
| :--- | :--- | :--- | :--- | :--- |
| **CME**<sup>1</sup> | SET | 1.02 | 1.19 | **+16.67%** |
| **CME** | GET | 1.30 | 1.47 | **+13.08%** |
| **CMD**<sup>2</sup> | SET | 1.15 | 1.35 | **+17.39%** |
| **CMD** | GET | 1.52 | 1.64 | **+7.89%** |
<sup>1</sup> Amazon terminology for cluster mode
<sup>2</sup> Amazon termonology for standalone mode, i.e. config
`cluster-enabled no`
- Test Configuration: 8 IO threads • 400 clients • 512-byte values • 3M
keys
#### Motivation
The previous IO model had several limitations that created performance
bottlenecks:
* **Inefficient Polling:** The main thread lacks a direct notification
mechanism for completed work. Instead, it must constantly iterate
through a list of all pending clients to check their state, wasting
significant CPU cycles.
* **Manual Load Balancing:** Jobs are assigned to specific threads
upfront. This requires the main thread to predict which thread to use,
often leaving some threads idle while others are overloaded.
* **Static Scaling:** Thread activation relies on a fixed heuristic
(e.g., 1 thread per 2 events). This approach fails to adapt to varying
workloads, such as TLS connections or differing read/write sizes.
### The Solution
To address these inefficiencies, this PR replaces the single SPSC queue
used currently with three specialized queues to handle communication and
load balancing more effectively.
#### 1. Main > IO: Shared Queue (Single Producer Multi Consumer)
Single queue from the main-thread to IO threads.
* **Automatic Load Balancing:** All threads pull from the same source.
Busy threads take less work, and idle threads take more, so we don't
need to manually select a thread.
* **Adaptive Scaling:** We now use the queue depth to decide when to add
or remove threads. If the queue is full, we scale up; if it's empty, we
scale down.
* *Ignition:* To get things started before the queue fills up, we
monitor the main thread's CPU. If usage goes over 30%, we wake up the
first IO thread.
* **Implementation:** To prevent contention among consumers, each item
in the ring buffer is padded to reside in its own cache line. Sequence
numbers are utilized to indicate whether a cell is empty or populated,
allowing threads to safely claim work.
#### 2. IO > Main: The Response Channel (MPSC Queue)
We replaced the old polling loop with a response queue.
* ** Faster Completion:** IO threads push completed jobs into this
queue. The main thread detects new data simply by checking if the queue
is not empty, removing the need to scan pending clients.
* **Contention Management:** To avoid lock contention, each thread
reserves a slot by atomically incrementing the tail index. In the rare
event that the queue is full, pending jobs are buffered in a local
temporary list until space becomes available.
#### 3. MAIN > IO (Thread-Specific): Private Inbox (SPSC Queue)
We kept the existing Single-Producer Single-Consumer (SPSC) queues for
tasks that must happen on a specific thread (like freeing memory
allocated by that thread). IO threads always check their private inbox
before looking at the shared queue.
### Changes Required
* **Async client release**
The main thread no longer busy-waits for IO threads to finish with a
client. Since the client must be popped from the multi-producer queue
before it can be released, clients with pending IO are now marked for
asynchronous closure.
* **eviction clients logic**
Updated evictClients() to account for memory pending release (clients
marked close_asap). freeClient() now returns a status code (1 for freed,
0 for async-close) to ensure the eviction loop does not over-evict by
ignoring memory that is about to be reclaimed.
* **events-per-io-thread config**
Replaced the `events-per-io-thread` configuration with
`io-threads-always-active`. as we no longer track events, since this
config is use only for tests no backward compatibility issue arises.
* **packed job instead of handlers**
Jobs are now represented as tagged pointers (using lower 3 bits for job
type) instead of separate `{handler, data}` structs. This reduces memory
overhead and allows jobs to be passed through the queues as single
pointers.
* **head caching in spsc queue**
The SPSC queue now caches the `head` index on the producer side
(`head_cache`) to avoid frequent atomic loads. The producer only
refreshes from the atomic `head` when the cache indicates the queue
might be full, reducing cross-thread cache-line bouncing.
* **deferred commit in SPSC queue**.
`spscEnqueue()` supports batching via a `commit` flag. Multiple jobs can
be enqueued with `commit=false`, then flushed with a single
`spscCommit()` call, reducing atomic operations and cache-line bouncing.
* **rollback on fullness check failure**
When `spmcEnqueue()` fails due to a full queue, the client state is
rolled back (e.g., `io_write_state` reset to `CLIENT_IDLE`). This
rollback approach removes the need to call an expensive `isFull` check
before every enqueue, we just attempt the enqueue and revert if it
fails.
* **epoll offloading via SPSC at high thread counts**.
When `active_io_threads_num > 9`, poll jobs are sent to per-thread SPSC
queues (round-robin). Since threads check their private queue first,
this ensures poll jobs are processed promptly without waiting behind
jobs in the shared SPMC queue.
* **avoid offload write before read comes back**
Added a check `if (c->io_read_state == CLIENT_PENDING_IO) return C_OK`
in `trySendWriteToIOThreads()`. In the previous per-thread SPSC
implementation, we could send consecutive read and write jobs for the
same client knowing a single thread would handle them in order. With the
shared SPMC queue, different threads may pick up the jobs, so we must
wait for the read to complete before sending a write to avoid 2 threads
handling the same client.
* **removing pending_read_list_node from client and
clients_pending_io_read/write lists from server**
Removed `pending_read_list_node` from the `client` struct and
`clients_pending_io_read`/`clients_pending_io_write` lists from
`valkeyServer`. as the new mpsc eliminates the need for these tracking
structures.
* **added inst metrics for pending io jobs**
Added `instantaneous_io_pending_jobs` metric via `STATS_METRIC_IO_WAIT`
to track average queue depth over time.
* **added stat for current active threads number**
Added `active_io_threads_num` to the INFO stats output for better
visibility.
* **added internal inst metric for main-thread cpu (non apple
compliant)**
Added `STATS_METRIC_MAIN_THREAD_CPU_SYS` to track main thread CPU usage
via `getrusage(RUSAGE_THREAD)`. This powers the "ignition" policy, when
CPU exceeds 30%, the first IO thread is activated. `RUSAGE_THREAD` is
Linux-specific, so macOS falls back to event-count heuristics.
* **added stat for pending read and writes for io**
Added `io_threaded_reads_pending` and `io_threaded_writes_pending` stats
to track how many read/write jobs are currently in-flight to IO threads.
* **added volatile for crashed**
Changed `server.crashed` from `int` to `volatile int` to ensure the
crash flag is visible across threads immediately, allowing IO threads to
detect a crash and stop sending responses back to the main thread to
avoid deadlock on crash.
---------
Signed-off-by: Uri Yagelnik <uriy@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
Co-authored-by: Uri Yagelnik <uriy@amazon.com>
Co-authored-by: Dan Touitou <dan.touitou@gmail.com>
The test was accidentally waking the IO threads while trying to check that they had gone idle. After the recent IO-thread refactor in #3324, the [test](https://github.com/valkey-io/valkey/pull/3324/changes#diff-21314ec3a338f739eab1536f91f528d1efe7c6a93935a71b9c02f77a3858f121R112) started forcing `io-threads-always-active`, and its repeated `INFO` polling counted as fresh activity. So instead of just observing the worker threads, the test kept reactivating them and then flaked. --------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> Signed-off-by: Sarthak Aggarwal <25262500+sarthakaggarwal97@users.noreply.github.com> Co-authored-by: Sarthak Aggarwal <25262500+sarthakaggarwal97@users.noreply.github.com>
#3324 introduced `BATCH_SIZE` as a const int local variable and used it as an array bound. Clang 17 rejects this with: ``` io_threads.c:305:22: error: variable length array folded to constant array as an extension [-Werror,-Wgnu-folding-constant] 305 | void *batch_jobs[BATCH_SIZE]; | ^~~~~~~~~~ 1 error generated. make[1]: *** [io_threads.o] Error 1 make: *** [all] Error 2 ``` Old Clang versions do not emit this warning, maybe that is why the CI passed. Fix by promoting `BATCH_SIZE` to a file-scope `#define`. Signed-off-by: Yang Zhao <zymy701@gmail.com>
The SPMC queue from #3324 needs each `spmcCell` to be cache-line aligned, but plain `zmalloc()` does not guarantee that in all build configurations. This change introduces `zmalloc_cache_aligned()` and uses it for the SPMC queue buffer allocation in `spmcInit()`. Failing CI: https://github.com/valkey-io/valkey/actions/runs/24374139344 --------- Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
…sers) Removed valkey-io#3504, valkey-io#3545, valkey-io#3503 - these fix bugs introduced by valkey-io#3324, valkey-io#2936, Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com> valkey-io#3392 respectively, all new in RC2. Users never experienced these bugs.
Follow-ups to the IO-threads redesign (#3324 / #3367). Bundles several independent fixes and refactors: - tryOffloadFreeObjToIOThreads: only offload the free of the sds buffer (which the I/O thread allocated). The robj itself is main-thread-owned and is now freed on the main thread. - trySendWriteToIOThreads: defer clearing last_header until after a successful enqueue, so the main thread can't mutate a header the I/O thread is concurrently reading. - evictClients: drop the pending_freed / close_asap bookkeeping and rely on freeClient's return value to bump stat_evictedclients. The old accounting double-counted protected clients and could under-evict. - Make MPSC/SPMC/SPSC queue sizes a runtime parameter on *Init(q, size) instead of compile-time *_QUEUE_SIZE macros in queues.h. The I/O-threads sizes (16384 / 4096 / 4096) now live as constants in io_threads.c; tests pass their own sizes. - updateIOThreads uses io_shared_outbox.queue_size instead of the macro. - Add function-level doc comments to queues.h. - Rename IOThreadFreeArgv/IOThreadPoll → ioThreadFreeArgv/ioThreadPoll for consistency. - Remove unused job_handler typedef; fix static_assert message (7 → 8). clang-format pass; Makefile tab→spaces nit. - Replace the getrusage(RUSAGE_THREAD) sys/user CPU sampling (and its #ifdef RUSAGE_THREAD fallback) with server.stat_active_time. One portable signal, one threshold (IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT = 30) replaces the dual sys>30% / sys>5%+user>50% rules. - Renames STATS_METRIC_MAIN_THREAD_CPU_{SYS,USER} → STATS_METRIC_MAIN_THREAD_ACTIVE_TIME. --------- Signed-off-by: akash kumar <akumdev@amazon.com> Signed-off-by: Akash Kumar <45854686+akashkgit@users.noreply.github.com> Signed-off-by: Madelyn Olson <madelyneolson@gmail.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Follow-ups to the IO-threads redesign (valkey-io#3324 / valkey-io#3367). Bundles several independent fixes and refactors: - tryOffloadFreeObjToIOThreads: only offload the free of the sds buffer (which the I/O thread allocated). The robj itself is main-thread-owned and is now freed on the main thread. - trySendWriteToIOThreads: defer clearing last_header until after a successful enqueue, so the main thread can't mutate a header the I/O thread is concurrently reading. - evictClients: drop the pending_freed / close_asap bookkeeping and rely on freeClient's return value to bump stat_evictedclients. The old accounting double-counted protected clients and could under-evict. - Make MPSC/SPMC/SPSC queue sizes a runtime parameter on *Init(q, size) instead of compile-time *_QUEUE_SIZE macros in queues.h. The I/O-threads sizes (16384 / 4096 / 4096) now live as constants in io_threads.c; tests pass their own sizes. - updateIOThreads uses io_shared_outbox.queue_size instead of the macro. - Add function-level doc comments to queues.h. - Rename IOThreadFreeArgv/IOThreadPoll → ioThreadFreeArgv/ioThreadPoll for consistency. - Remove unused job_handler typedef; fix static_assert message (7 → 8). clang-format pass; Makefile tab→spaces nit. - Replace the getrusage(RUSAGE_THREAD) sys/user CPU sampling (and its #ifdef RUSAGE_THREAD fallback) with server.stat_active_time. One portable signal, one threshold (IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT = 30) replaces the dual sys>30% / sys>5%+user>50% rules. - Renames STATS_METRIC_MAIN_THREAD_CPU_{SYS,USER} → STATS_METRIC_MAIN_THREAD_ACTIVE_TIME. --------- Signed-off-by: akash kumar <akumdev@amazon.com> Signed-off-by: Akash Kumar <45854686+akashkgit@users.noreply.github.com> Signed-off-by: Madelyn Olson <madelyneolson@gmail.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Follow-ups to the IO-threads redesign (#3324 / #3367). Bundles several independent fixes and refactors: - tryOffloadFreeObjToIOThreads: only offload the free of the sds buffer (which the I/O thread allocated). The robj itself is main-thread-owned and is now freed on the main thread. - trySendWriteToIOThreads: defer clearing last_header until after a successful enqueue, so the main thread can't mutate a header the I/O thread is concurrently reading. - evictClients: drop the pending_freed / close_asap bookkeeping and rely on freeClient's return value to bump stat_evictedclients. The old accounting double-counted protected clients and could under-evict. - Make MPSC/SPMC/SPSC queue sizes a runtime parameter on *Init(q, size) instead of compile-time *_QUEUE_SIZE macros in queues.h. The I/O-threads sizes (16384 / 4096 / 4096) now live as constants in io_threads.c; tests pass their own sizes. - updateIOThreads uses io_shared_outbox.queue_size instead of the macro. - Add function-level doc comments to queues.h. - Rename IOThreadFreeArgv/IOThreadPoll → ioThreadFreeArgv/ioThreadPoll for consistency. - Remove unused job_handler typedef; fix static_assert message (7 → 8). clang-format pass; Makefile tab→spaces nit. - Replace the getrusage(RUSAGE_THREAD) sys/user CPU sampling (and its #ifdef RUSAGE_THREAD fallback) with server.stat_active_time. One portable signal, one threshold (IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT = 30) replaces the dual sys>30% / sys>5%+user>50% rules. - Renames STATS_METRIC_MAIN_THREAD_CPU_{SYS,USER} → STATS_METRIC_MAIN_THREAD_ACTIVE_TIME. --------- Signed-off-by: akash kumar <akumdev@amazon.com> Signed-off-by: Akash Kumar <45854686+akashkgit@users.noreply.github.com> Signed-off-by: Madelyn Olson <madelyneolson@gmail.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
When a replica executes `replicaof no one`, `replicationUnsetPrimary()` sets `server.primary_host = NULL` before calling `freeClient(server.primary)`. Since PR valkey-io#3324 ("Redesign IO threading communication model"), freeClient() on a primary client with pending IO is deferred via freeClientAsync. When it eventually executes, it chains through replicationCachePrimary() -> replicationHandlePrimaryDisconnection() which unconditionally sets `server.repl_state = REPL_STATE_CONNECT`. By that time, replicationUnsetPrimary() has already finalized the state with `repl_state = REPL_STATE_NONE`. The deferred free resurrects REPL_STATE_CONNECT while primary_host is NULL. replicationCron then calls connectWithPrimary() which passes NULL to connTLSConnect(), causing inet_pton(AF_INET, NULL, ...) to SIGSEGV. Fix by conditioning the state transition in replicationHandlePrimaryDisconnection() on primary_host being set. If primary_host is NULL, the disconnection is part of a deliberate unset that has already finalized state, so we set REPL_STATE_NONE instead of REPL_STATE_CONNECT. Additionally: - Add a NULL check for addr in connTLSConnect() as defense in depth. - Add a 10s timeout to the WAITAOF test that blocks indefinitely, preventing the test from hanging if the replica fails to sync. Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
When a replica executes `replicaof no one`, `replicationUnsetPrimary()` sets `server.primary_host = NULL` before calling `freeClient(server.primary)`. Since PR valkey-io#3324 ("Redesign IO threading communication model"), freeClient() on a primary client with pending IO is deferred via freeClientAsync. When it eventually executes, it chains through replicationCachePrimary() -> replicationHandlePrimaryDisconnection() which unconditionally sets `server.repl_state = REPL_STATE_CONNECT`. By that time, replicationUnsetPrimary() has already finalized the state with `repl_state = REPL_STATE_NONE`. The deferred free resurrects REPL_STATE_CONNECT while primary_host is NULL. replicationCron then calls connectWithPrimary() which passes NULL to connTLSConnect(), causing inet_pton(AF_INET, NULL, ...) to SIGSEGV. Fix by conditioning the state transition in replicationHandlePrimaryDisconnection() on primary_host being set. If primary_host is NULL, the disconnection is part of a deliberate unset that has already finalized state, so we set REPL_STATE_NONE instead of REPL_STATE_CONNECT. Additionally: - Add a NULL check for addr in connTLSConnect() as defense in depth. - Add a 10s timeout to the WAITAOF test that blocks indefinitely, preventing the test from hanging if the replica fails to sync. Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
When a replica executes `replicaof no one`, `replicationUnsetPrimary()` sets `server.primary_host = NULL` before calling `freeClient(server.primary)`. Since PR valkey-io#3324 ("Redesign IO threading communication model"), freeClient() on a primary client with pending IO is deferred via freeClientAsync. When it eventually executes, it chains through replicationCachePrimary() -> replicationHandlePrimaryDisconnection() which unconditionally sets `server.repl_state = REPL_STATE_CONNECT`. By that time, replicationUnsetPrimary() has already finalized the state with `repl_state = REPL_STATE_NONE`. The deferred free resurrects REPL_STATE_CONNECT while primary_host is NULL. replicationCron then calls connectWithPrimary() which passes NULL to connTLSConnect(), causing inet_pton(AF_INET, NULL, ...) to SIGSEGV. Fix by conditioning the state transition in replicationHandlePrimaryDisconnection() on primary_host being set. If primary_host is NULL, the disconnection is part of a deliberate unset that has already finalized state, so we set REPL_STATE_NONE instead of REPL_STATE_CONNECT. Additionally: - Add a NULL check for addr in connTLSConnect() as defense in depth. - Add a 10s timeout to the WAITAOF test that blocks indefinitely, preventing the test from hanging if the replica fails to sync. Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
Since PR valkey-io#3324, freeClient() on a primary client with pending IO is deferred via freeClientAsync. The deferred free eventually chains through replicationCachePrimary() -> replicationHandlePrimaryDisconnection(), which unconditionally set repl_state = REPL_STATE_CONNECT. This causes two bugs: 1. REPLICAOF NO ONE: primary_host is NULL when the deferred free runs, so replicationCron calls connectWithPrimary(NULL) -> SIGSEGV in connTLSConnect (inet_pton with NULL addr). 2. REPLICAOF newhost newport: the deferred free clobbers the already- progressed repl_state (CONNECTING) back to CONNECT, causing replicationCron to call connectWithPrimary() again, which overwrites server.repl_transfer_s without closing the previous connection (FD leak). Fix by making replicationHandlePrimaryDisconnection() only transition to REPL_STATE_CONNECT when repl_state is still REPL_STATE_CONNECTED (meaning this is a genuine disconnect, not a stale deferred free). If repl_state has already moved on, the deferred free is stale and should not mutate the state machine. Additionally: - Add NULL check for addr in connTLSConnect() as defense in depth. - Add 10s timeout to the WAITAOF test to prevent indefinite hanging. - Add dedicated tests for the repoint scenario. Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
…3719) ## Summary This PR addresses 2 race conditions where deferred `freeClient` (introduced by #3324) clobbers replication state set by `REPLICAOF` commands. Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI failure in `tests/unit/wait.tcl`. ## Root Cause Since PR #3324 ("Redesign IO threading communication model"), `freeClient()` on a primary client with pending IO is deferred via `freeClientAsync` (gated on `clientHasPendingIO`). When the deferred free eventually executes, it chains through `replicationCachePrimary()` -> `replicationHandlePrimaryDisconnection()`, which unconditionally sets `server.repl_state = REPL_STATE_CONNECT`. This causes two bugs: ### Bug 1: REPLICAOF NO ONE (SIGSEGV) `replicationUnsetPrimary()` sets `primary_host = NULL` before calling `freeClient`. The deferred free runs later, sets `repl_state = REPL_STATE_CONNECT` while `primary_host` is still NULL. `replicationCron` then calls `connectWithPrimary()` which passes NULL to `connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV. ### Bug 2: REPLICAOF newhost newport (connection leak) `replicationSetPrimary()` calls `freeClient(old_primary)` (deferred), then sets `primary_host` to the new IP and progresses `repl_state` to `REPL_STATE_CONNECTING` with a new connection handle in `server.repl_transfer_s`. The deferred free runs later, clobbers `repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls `connectWithPrimary()` again, overwriting `server.repl_transfer_s` without closing the previous connection -- an FD leak. ## Fix Make `replicationHandlePrimaryDisconnection()` only transition to `REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED` and `primary_host` is set. This means the disconnection is genuine and no other state transition has already occurred. If `repl_state` has already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is stale and the function leaves the state untouched. Additionally: - `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in depth). - `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test, and add dedicated tests for the repoint scenario. ## Reproduction Reproduced locally by establishing TLS replication and executing `REPLICAOF NO ONE`: - Without fix: server crashes with signal 11, accessing address 0x0 - With fix: server continues operating normally ## Testing - Full `unit/wait` test suite passes (51/51) with IO threads enabled - New tests "Repoint replica between primaries does not leak connections or crash" and "Rapid repoint does not crash or leak" pass - Crash reproduced locally over TLS (SIGSEGV without fix, graceful handling with fix) --------- Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com> Signed-off-by: Ran Shidlansik <ranshid@amazon.com> Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
…alkey-io#3719) ## Summary This PR addresses 2 race conditions where deferred `freeClient` (introduced by valkey-io#3324) clobbers replication state set by `REPLICAOF` commands. Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI failure in `tests/unit/wait.tcl`. ## Root Cause Since PR valkey-io#3324 ("Redesign IO threading communication model"), `freeClient()` on a primary client with pending IO is deferred via `freeClientAsync` (gated on `clientHasPendingIO`). When the deferred free eventually executes, it chains through `replicationCachePrimary()` -> `replicationHandlePrimaryDisconnection()`, which unconditionally sets `server.repl_state = REPL_STATE_CONNECT`. This causes two bugs: ### Bug 1: REPLICAOF NO ONE (SIGSEGV) `replicationUnsetPrimary()` sets `primary_host = NULL` before calling `freeClient`. The deferred free runs later, sets `repl_state = REPL_STATE_CONNECT` while `primary_host` is still NULL. `replicationCron` then calls `connectWithPrimary()` which passes NULL to `connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV. ### Bug 2: REPLICAOF newhost newport (connection leak) `replicationSetPrimary()` calls `freeClient(old_primary)` (deferred), then sets `primary_host` to the new IP and progresses `repl_state` to `REPL_STATE_CONNECTING` with a new connection handle in `server.repl_transfer_s`. The deferred free runs later, clobbers `repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls `connectWithPrimary()` again, overwriting `server.repl_transfer_s` without closing the previous connection -- an FD leak. ## Fix Make `replicationHandlePrimaryDisconnection()` only transition to `REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED` and `primary_host` is set. This means the disconnection is genuine and no other state transition has already occurred. If `repl_state` has already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is stale and the function leaves the state untouched. Additionally: - `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in depth). - `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test, and add dedicated tests for the repoint scenario. ## Reproduction Reproduced locally by establishing TLS replication and executing `REPLICAOF NO ONE`: - Without fix: server crashes with signal 11, accessing address 0x0 - With fix: server continues operating normally ## Testing - Full `unit/wait` test suite passes (51/51) with IO threads enabled - New tests "Repoint replica between primaries does not leak connections or crash" and "Rapid repoint does not crash or leak" pass - Crash reproduced locally over TLS (SIGSEGV without fix, graceful handling with fix) --------- Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com> Signed-off-by: Ran Shidlansik <ranshid@amazon.com> Co-authored-by: Ran Shidlansik <ranshid@amazon.com> Signed-off-by: Alon Arenberg <alonare@amazon.com>
…3719) ## Summary This PR addresses 2 race conditions where deferred `freeClient` (introduced by #3324) clobbers replication state set by `REPLICAOF` commands. Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI failure in `tests/unit/wait.tcl`. ## Root Cause Since PR #3324 ("Redesign IO threading communication model"), `freeClient()` on a primary client with pending IO is deferred via `freeClientAsync` (gated on `clientHasPendingIO`). When the deferred free eventually executes, it chains through `replicationCachePrimary()` -> `replicationHandlePrimaryDisconnection()`, which unconditionally sets `server.repl_state = REPL_STATE_CONNECT`. This causes two bugs: ### Bug 1: REPLICAOF NO ONE (SIGSEGV) `replicationUnsetPrimary()` sets `primary_host = NULL` before calling `freeClient`. The deferred free runs later, sets `repl_state = REPL_STATE_CONNECT` while `primary_host` is still NULL. `replicationCron` then calls `connectWithPrimary()` which passes NULL to `connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV. ### Bug 2: REPLICAOF newhost newport (connection leak) `replicationSetPrimary()` calls `freeClient(old_primary)` (deferred), then sets `primary_host` to the new IP and progresses `repl_state` to `REPL_STATE_CONNECTING` with a new connection handle in `server.repl_transfer_s`. The deferred free runs later, clobbers `repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls `connectWithPrimary()` again, overwriting `server.repl_transfer_s` without closing the previous connection -- an FD leak. ## Fix Make `replicationHandlePrimaryDisconnection()` only transition to `REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED` and `primary_host` is set. This means the disconnection is genuine and no other state transition has already occurred. If `repl_state` has already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is stale and the function leaves the state untouched. Additionally: - `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in depth). - `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test, and add dedicated tests for the repoint scenario. ## Reproduction Reproduced locally by establishing TLS replication and executing `REPLICAOF NO ONE`: - Without fix: server crashes with signal 11, accessing address 0x0 - With fix: server continues operating normally ## Testing - Full `unit/wait` test suite passes (51/51) with IO threads enabled - New tests "Repoint replica between primaries does not leak connections or crash" and "Rapid repoint does not crash or leak" pass - Crash reproduced locally over TLS (SIGSEGV without fix, graceful handling with fix) --------- Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com> Signed-off-by: Ran Shidlansik <ranshid@amazon.com> Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
…3719) ## Summary This PR addresses 2 race conditions where deferred `freeClient` (introduced by #3324) clobbers replication state set by `REPLICAOF` commands. Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI failure in `tests/unit/wait.tcl`. ## Root Cause Since PR #3324 ("Redesign IO threading communication model"), `freeClient()` on a primary client with pending IO is deferred via `freeClientAsync` (gated on `clientHasPendingIO`). When the deferred free eventually executes, it chains through `replicationCachePrimary()` -> `replicationHandlePrimaryDisconnection()`, which unconditionally sets `server.repl_state = REPL_STATE_CONNECT`. This causes two bugs: ### Bug 1: REPLICAOF NO ONE (SIGSEGV) `replicationUnsetPrimary()` sets `primary_host = NULL` before calling `freeClient`. The deferred free runs later, sets `repl_state = REPL_STATE_CONNECT` while `primary_host` is still NULL. `replicationCron` then calls `connectWithPrimary()` which passes NULL to `connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV. ### Bug 2: REPLICAOF newhost newport (connection leak) `replicationSetPrimary()` calls `freeClient(old_primary)` (deferred), then sets `primary_host` to the new IP and progresses `repl_state` to `REPL_STATE_CONNECTING` with a new connection handle in `server.repl_transfer_s`. The deferred free runs later, clobbers `repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls `connectWithPrimary()` again, overwriting `server.repl_transfer_s` without closing the previous connection -- an FD leak. ## Fix Make `replicationHandlePrimaryDisconnection()` only transition to `REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED` and `primary_host` is set. This means the disconnection is genuine and no other state transition has already occurred. If `repl_state` has already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is stale and the function leaves the state untouched. Additionally: - `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in depth). - `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test, and add dedicated tests for the repoint scenario. ## Reproduction Reproduced locally by establishing TLS replication and executing `REPLICAOF NO ONE`: - Without fix: server crashes with signal 11, accessing address 0x0 - With fix: server continues operating normally ## Testing - Full `unit/wait` test suite passes (51/51) with IO threads enabled - New tests "Repoint replica between primaries does not leak connections or crash" and "Rapid repoint does not crash or leak" pass - Crash reproduced locally over TLS (SIGSEGV without fix, graceful handling with fix) --------- Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com> Signed-off-by: Ran Shidlansik <ranshid@amazon.com> Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
valkey-io/valkey#3324 introduced `BATCH_SIZE` as a const int local variable and used it as an array bound. Clang 17 rejects this with: ``` io_threads.c:305:22: error: variable length array folded to constant array as an extension [-Werror,-Wgnu-folding-constant] 305 | void *batch_jobs[BATCH_SIZE]; | ^~~~~~~~~~ 1 error generated. make[1]: *** [io_threads.o] Error 1 make: *** [all] Error 2 ``` Old Clang versions do not emit this warning, maybe that is why the CI passed. Fix by promoting `BATCH_SIZE` to a file-scope `#define`. Signed-off-by: Yang Zhao <zymy701@gmail.com>
This PR supersedes #2909 (part of #2208) to address comments from the reviewers as follow:
pack_jobandunpack_jobtotagJobanduntagJobioThreadReadQueryFromClient,IOThreadPollto use specific pointers instead of void * as parameterserver.stat_io_writes_processedtest_io_queues.ctotest_queues.cppWhen merging, just use the commit below this line
Summary
This PR redesigns the IO threading communication model, replacing the inefficient client-list polling approach with a high-performance, lock-free queue architecture. This change improves throughput by 8–17% across various workloads and lays the groundwork for offloading command execution to IO threads in following PRs.
Performance Comparison: Unstable vs New IO Queues
1 Amazon terminology for cluster mode
2 Amazon termonology for standalone mode, i.e. config
cluster-enabled noMotivation
The previous IO model had several limitations that created performance bottlenecks:
The Solution
To address these inefficiencies, this PR replaces the single SPSC queue used currently with three specialized queues to handle communication and load balancing more effectively.
1. Main > IO: Shared Queue (Single Producer Multi Consumer)
Single queue from the main-thread to IO threads.
2. IO > Main: The Response Channel (MPSC Queue)
We replaced the old polling loop with a response queue.
3. MAIN > IO (Thread-Specific): Private Inbox (SPSC Queue)
We kept the existing Single-Producer Single-Consumer (SPSC) queues for tasks that must happen on a specific thread (like freeing memory allocated by that thread). IO threads always check their private inbox before looking at the shared queue.
Changes Required
Async client release
The main thread no longer busy-waits for IO threads to finish with a client. Since the client must be popped from the multi-producer queue before it can be released, clients with pending IO are now marked for asynchronous closure.
eviction clients logic
Updated evictClients() to account for memory pending release (clients marked close_asap). freeClient() now returns a status code (1 for freed, 0 for async-close) to ensure the eviction loop does not over-evict by ignoring memory that is about to be reclaimed.
events-per-io-thread config
Replaced the
events-per-io-threadconfiguration withio-threads-always-active. as we no longer track events, since this config is use only for tests no backward compatibility issue arises.packed job instead of handlers
Jobs are now represented as tagged pointers (using lower 3 bits for job type) instead of separate
{handler, data}structs. This reduces memory overhead and allows jobs to be passed through the queues as single pointers.head caching in spsc queue
The SPSC queue now caches the
headindex on the producer side (head_cache) to avoid frequent atomic loads. The producer only refreshes from the atomicheadwhen the cache indicates the queue might be full, reducing cross-thread cache-line bouncing.deferred commit in SPSC queue.
spscEnqueue()supports batching via acommitflag. Multiple jobs can be enqueued withcommit=false, then flushed with a singlespscCommit()call, reducing atomic operations and cache-line bouncing.rollback on fullness check failure
When
spmcEnqueue()fails due to a full queue, the client state is rolled back (e.g.,io_write_statereset toCLIENT_IDLE). This rollback approach removes the need to call an expensiveisFullcheck before every enqueue, we just attempt the enqueue and revert if it fails.epoll offloading via SPSC at high thread counts.
When
active_io_threads_num > 9, poll jobs are sent to per-thread SPSC queues (round-robin). Since threads check their private queue first, this ensures poll jobs are processed promptly without waiting behind jobs in the shared SPMC queue.avoid offload write before read comes back
Added a check
if (c->io_read_state == CLIENT_PENDING_IO) return C_OKintrySendWriteToIOThreads(). In the previous per-thread SPSC implementation, we could send consecutive read and write jobs for the same client knowing a single thread would handle them in order. With the shared SPMC queue, different threads may pick up the jobs, so we must wait for the read to complete before sending a write to avoid 2 threads handling the same client.removing pending_read_list_node from client and clients_pending_io_read/write lists from server
Removed
pending_read_list_nodefrom theclientstruct andclients_pending_io_read/clients_pending_io_writelists fromvalkeyServer. as the new mpsc eliminates the need for these tracking structures.added inst metrics for pending io jobs
Added
instantaneous_io_pending_jobsmetric viaSTATS_METRIC_IO_WAITto track average queue depth over time.added stat for current active threads number
Added
active_io_threads_numto the INFO stats output for better visibility.added internal inst metric for main-thread cpu (non apple compliant)
Added
STATS_METRIC_MAIN_THREAD_CPU_SYSto track main thread CPU usage viagetrusage(RUSAGE_THREAD). This powers the "ignition" policy, when CPU exceeds 30%, the first IO thread is activated.RUSAGE_THREADis Linux-specific, so macOS falls back to event-count heuristics.added stat for pending read and writes for io
Added
io_threaded_reads_pendingandio_threaded_writes_pendingstats to track how many read/write jobs are currently in-flight to IO threads.added volatile for crashed
Changed
server.crashedfrominttovolatile intto ensure the crash flag is visible across threads immediately, allowing IO threads to detect a crash and stop sending responses back to the main thread to avoid deadlock on crash.Co-authored-by: Dan Touitou dan.touitou@gmail.com
Signed-off-by: Uri Yagelnik uriy@amazon.com