Skip to content

Redesign IO threading communication model#3324

Merged
madolson merged 12 commits into
valkey-io:unstablefrom
akashkgit:io-threads-queues
Apr 13, 2026
Merged

Redesign IO threading communication model#3324
madolson merged 12 commits into
valkey-io:unstablefrom
akashkgit:io-threads-queues

Conversation

@akashkgit

@akashkgit akashkgit commented Mar 5, 2026

Copy link
Copy Markdown
Contributor

This PR supersedes #2909 (part of #2208) to address comments from the reviewers as follow:

  • Refactors pack_job and unpack_job to tagJob and untagJob
  • Adds explicit type casting by refactoring ioThread handler prototypes such as ioThreadReadQueryFromClient, IOThreadPoll to use specific pointers instead of void * as parameter
  • Removes duplicate increment of server.stat_io_writes_processed
  • Switches Incompatible numeric types to the correct ones (last_sample_time and now)
  • Makes the io_queues.h generic and splits it to queues.h and queues.c to also facilitate c++ unit testing
  • Migrates test_io_queues.c to test_queues.cpp
  • Replaces Doxygen comments to use normal text comments
  • Handles Merge conflicts

When merging, just use the commit below this line


Summary

This PR redesigns the IO threading communication model, replacing the inefficient client-list polling approach with a high-performance, lock-free queue architecture. This change improves throughput by 8–17% across various workloads and lays the groundwork for offloading command execution to IO threads in following PRs.

Performance Comparison: Unstable vs New IO Queues

Type Operation Unstable Branch (M TPS) New IO Queues (M TPS) Difference (%)
CME1 SET 1.02 1.19 +16.67%
CME GET 1.30 1.47 +13.08%
CMD2 SET 1.15 1.35 +17.39%
CMD GET 1.52 1.64 +7.89%

1 Amazon terminology for cluster mode
2 Amazon termonology for standalone mode, i.e. config cluster-enabled no

  • Test Configuration: 8 IO threads • 400 clients • 512-byte values • 3M keys

Motivation

The previous IO model had several limitations that created performance bottlenecks:

  • Inefficient Polling: The main thread lacks a direct notification mechanism for completed work. Instead, it must constantly iterate through a list of all pending clients to check their state, wasting significant CPU cycles.
  • Manual Load Balancing: Jobs are assigned to specific threads upfront. This requires the main thread to predict which thread to use, often leaving some threads idle while others are overloaded.
  • Static Scaling: Thread activation relies on a fixed heuristic (e.g., 1 thread per 2 events). This approach fails to adapt to varying workloads, such as TLS connections or differing read/write sizes.

The Solution

To address these inefficiencies, this PR replaces the single SPSC queue used currently with three specialized queues to handle communication and load balancing more effectively.

1. Main > IO: Shared Queue (Single Producer Multi Consumer)

Single queue from the main-thread to IO threads.

  • Automatic Load Balancing: All threads pull from the same source. Busy threads take less work, and idle threads take more, so we don't need to manually select a thread.
  • Adaptive Scaling: We now use the queue depth to decide when to add or remove threads. If the queue is full, we scale up; if it's empty, we scale down.
    • Ignition: To get things started before the queue fills up, we monitor the main thread's CPU. If usage goes over 30%, we wake up the first IO thread.
  • Implementation: To prevent contention among consumers, each item in the ring buffer is padded to reside in its own cache line. Sequence numbers are utilized to indicate whether a cell is empty or populated, allowing threads to safely claim work.

2. IO > Main: The Response Channel (MPSC Queue)

We replaced the old polling loop with a response queue.

  • ** Faster Completion:** IO threads push completed jobs into this queue. The main thread detects new data simply by checking if the queue is not empty, removing the need to scan pending clients.
  • Contention Management: To avoid lock contention, each thread reserves a slot by atomically incrementing the tail index. In the rare event that the queue is full, pending jobs are buffered in a local temporary list until space becomes available.

3. MAIN > IO (Thread-Specific): Private Inbox (SPSC Queue)

We kept the existing Single-Producer Single-Consumer (SPSC) queues for tasks that must happen on a specific thread (like freeing memory allocated by that thread). IO threads always check their private inbox before looking at the shared queue.

Changes Required

  • Async client release
    The main thread no longer busy-waits for IO threads to finish with a client. Since the client must be popped from the multi-producer queue before it can be released, clients with pending IO are now marked for asynchronous closure.

  • eviction clients logic
    Updated evictClients() to account for memory pending release (clients marked close_asap). freeClient() now returns a status code (1 for freed, 0 for async-close) to ensure the eviction loop does not over-evict by ignoring memory that is about to be reclaimed.

  • events-per-io-thread config
    Replaced the events-per-io-thread configuration with io-threads-always-active. as we no longer track events, since this config is use only for tests no backward compatibility issue arises.

  • packed job instead of handlers
    Jobs are now represented as tagged pointers (using lower 3 bits for job type) instead of separate {handler, data} structs. This reduces memory overhead and allows jobs to be passed through the queues as single pointers.

  • head caching in spsc queue
    The SPSC queue now caches the head index on the producer side (head_cache) to avoid frequent atomic loads. The producer only refreshes from the atomic head when the cache indicates the queue might be full, reducing cross-thread cache-line bouncing.

  • deferred commit in SPSC queue.
    spscEnqueue() supports batching via a commit flag. Multiple jobs can be enqueued with commit=false, then flushed with a single spscCommit() call, reducing atomic operations and cache-line bouncing.

  • rollback on fullness check failure
    When spmcEnqueue() fails due to a full queue, the client state is rolled back (e.g., io_write_state reset to CLIENT_IDLE). This rollback approach removes the need to call an expensive isFull check before every enqueue, we just attempt the enqueue and revert if it fails.

  • epoll offloading via SPSC at high thread counts.
    When active_io_threads_num > 9, poll jobs are sent to per-thread SPSC queues (round-robin). Since threads check their private queue first, this ensures poll jobs are processed promptly without waiting behind jobs in the shared SPMC queue.

  • avoid offload write before read comes back
    Added a check if (c->io_read_state == CLIENT_PENDING_IO) return C_OK in trySendWriteToIOThreads(). In the previous per-thread SPSC implementation, we could send consecutive read and write jobs for the same client knowing a single thread would handle them in order. With the shared SPMC queue, different threads may pick up the jobs, so we must wait for the read to complete before sending a write to avoid 2 threads handling the same client.

  • removing pending_read_list_node from client and clients_pending_io_read/write lists from server
    Removed pending_read_list_node from the client struct and clients_pending_io_read/clients_pending_io_write lists from valkeyServer. as the new mpsc eliminates the need for these tracking structures.

  • added inst metrics for pending io jobs
    Added instantaneous_io_pending_jobs metric via STATS_METRIC_IO_WAIT to track average queue depth over time.

  • added stat for current active threads number
    Added active_io_threads_num to the INFO stats output for better visibility.

  • added internal inst metric for main-thread cpu (non apple compliant)
    Added STATS_METRIC_MAIN_THREAD_CPU_SYS to track main thread CPU usage via getrusage(RUSAGE_THREAD). This powers the "ignition" policy, when CPU exceeds 30%, the first IO thread is activated. RUSAGE_THREAD is Linux-specific, so macOS falls back to event-count heuristics.

  • added stat for pending read and writes for io
    Added io_threaded_reads_pending and io_threaded_writes_pending stats to track how many read/write jobs are currently in-flight to IO threads.

  • added volatile for crashed
    Changed server.crashed from int to volatile int to ensure the crash flag is visible across threads immediately, allowing IO threads to detect a crash and stop sending responses back to the main thread to avoid deadlock on crash.

Co-authored-by: Dan Touitou dan.touitou@gmail.com
Signed-off-by: Uri Yagelnik uriy@amazon.com

Co-authored-by: Dan Touitou <dan.touitou@gmail.com>
Signed-off-by: Uri Yagelnik <uriy@amazon.com>
@akashkgit akashkgit changed the title visuaIo threads queues PR Addressing comments on Redesign IO threading communication model Mar 5, 2026
@akashkgit akashkgit changed the title PR Addressing comments on Redesign IO threading communication model Addressing comments on Redesign IO threading communication model (#2909) Mar 5, 2026
@akashkgit akashkgit changed the title Addressing comments on Redesign IO threading communication model (#2909) Addressing comments on Redesign IO threading communication model Mar 5, 2026
@madolson madolson requested review from madolson and zuiderkwast March 6, 2026 01:50
Comment thread src/io_threads.c
@akashkgit akashkgit force-pushed the io-threads-queues branch from 6ac3d49 to b6dbd28 Compare March 6, 2026 19:30
@zuiderkwast

Copy link
Copy Markdown
Contributor

The diff now contains many commits from unstable. I don't know it happened but I hope you can remove them and then git merge unstable.

@akashkgit akashkgit force-pushed the io-threads-queues branch 2 times, most recently from c5b9d5a to 97d6872 Compare March 9, 2026 06:51
Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
@akashkgit akashkgit force-pushed the io-threads-queues branch from 97d6872 to 1e4ea39 Compare March 9, 2026 07:00
@codecov

codecov Bot commented Mar 9, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 84.04558% with 112 lines in your changes missing coverage. Please review.
✅ Project coverage is 76.44%. Comparing base (07554d3) to head (2efd67d).
⚠️ Report is 68 commits behind head on unstable.

Files with missing lines Patch % Lines
src/io_threads.c 64.82% 102 Missing ⚠️
src/networking.c 82.22% 8 Missing ⚠️
src/queues.c 99.32% 1 Missing ⚠️
src/unit/test_queues.cpp 99.53% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##           unstable    #3324      +/-   ##
============================================
+ Coverage     74.51%   76.44%   +1.92%     
============================================
  Files           130      159      +29     
  Lines         72748    79512    +6764     
============================================
+ Hits          54207    60781    +6574     
- Misses        18541    18731     +190     
Files with missing lines Coverage Δ
src/config.c 78.33% <ø> (+0.62%) ⬆️
src/server.c 89.53% <100.00%> (+0.12%) ⬆️
src/server.h 100.00% <ø> (ø)
src/queues.c 99.32% <99.32%> (ø)
src/unit/test_queues.cpp 99.53% <99.53%> (ø)
src/networking.c 92.07% <82.22%> (+0.90%) ⬆️
src/io_threads.c 70.80% <64.82%> (-11.19%) ⬇️

... and 77 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@zuiderkwast

zuiderkwast commented Mar 9, 2026

Copy link
Copy Markdown
Contributor

Can the 32-bit build failure be related to the _Atomic incompatibility for C++ described here?

Comment thread src/io_threads.c
Signed-off-by: akash kumar <akumdev@amazon.com>

@zuiderkwast zuiderkwast left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a very large diff. 🤯 I think it's close, though I didn't proof-read the complete diff.

@akashkgit Can you incorporate the full description of #2909 in this PR? Then we can close that one and let only stick to this one instead.

Comment thread src/networking.c
Comment thread src/unit/wrappers.h Outdated
Comment thread src/io_threads.c
Comment on lines +454 to +464
/* DEADLOCK PREVENTION:
* Check if the pending workload fits in the return queue.
* If the number of pending jobs is greater than the capacity of the Global MPSC queue,
* the worker threads might fill the queue and block. If we enter drainIOThreadsQueue
* in that state, we will deadlock (Main thread waits for worker, Worker waits for queue space). */
size_t pending = getPendingIOResponsesCount();

if (pending > MPSC_QUEUE_SIZE) {
if (err) *err = "Can't update IO threads under load, try again later";
return 0;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! OK, "try again later" makes sense in this case.

Comment thread src/server.c Outdated
@madolson madolson changed the title Addressing comments on Redesign IO threading communication model Redesign IO threading communication model Mar 11, 2026
Comment thread src/queues.h Outdated

@hpatro hpatro left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was playing around with this change and then to understand it further, I was trying to leverage the underlying infrastructure (queues) for offloading cluster IO operations. I should be able to create a PR after this gets merged.

Overall, I like the primitives being introduced here. Reminds of Golang channel/routine idea. We should leverage this for future work around IO and maintain this system.

Comment thread src/io_threads.h Outdated
Comment on lines +6 to +15
typedef enum {
JOB_REQ_READ_CLIENT = 0,
JOB_REQ_WRITE_CLIENT,
JOB_REQ_FREE_ARGV,
JOB_REQ_FREE_OBJ,
JOB_REQ_POLL,
JOB_REQ_ACCEPT,
JOB_REQ_COUNT
} JobRequest;
_Static_assert(JOB_REQ_COUNT <= 7, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might have to move away from pointer tagging approach pretty soon. As other changes are going to leverage this infrastructure and we are going to run out of the bits (3) required to support all the jobs.

We could wrap the private_data with the actual object (client, ?) and type.

Comment thread src/queues.h
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*
* Implements different types of queues

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should call out these are bounded/buffered queues. Each of these are of different sizes. Did we determine the sizes {SPMC, SPSC -> 4096 and MPSC -> 16384} based on certain workloads ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the size be parameter of mpscInit ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nobody knows why these sizes were picked. I wasn't written down...

To make them more generic, it seems reasonable to specify the size in init, but we can also do that later, when we want to use these queues for something else.

Comment thread src/queues.h Outdated
/* Pushes an item into the queue and returns true if the queue is not full.
* Otherwise, a slot index is reserved and saved in the ticket, and returns false.
* Subsequent retries must pass the same ticket to fill the reserved slot, provided the queue is not full */
extern bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the benefit of this ticket/slot index reservation? I couldn't find a good example for it.

Comment thread src/io_threads.h Outdated
JOB_REQ_ACCEPT,
JOB_REQ_COUNT
} JobRequest;
_Static_assert(JOB_REQ_COUNT <= 7, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Count won't be used for pointer tagging.

Suggested change
_Static_assert(JOB_REQ_COUNT <= 7, "JOB_REQ_COUNT must not exceed 7 for pointer arithmetic");
_Static_assert(JOB_REQ_COUNT <= 8, "JOB_REQ_COUNT must not exceed 8 for pointer arithmetic");

Comment thread src/io_threads.h Outdated
JOB_RES_WRITE_CLIENT,
JOB_RES_COUNT
} JobResult;
_Static_assert(JOB_RES_COUNT <= 7, "JOB_RES_COUNT must not exceed 7 for pointer arithmetic");

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
_Static_assert(JOB_RES_COUNT <= 7, "JOB_RES_COUNT must not exceed 7 for pointer arithmetic");
_Static_assert(JOB_RES_COUNT <= 8, "JOB_RES_COUNT must not exceed 8 for pointer arithmetic");

madolson pushed a commit that referenced this pull request Apr 23, 2026
The SPMC queue from #3324 needs each `spmcCell` to be cache-line
aligned, but plain `zmalloc()` does not guarantee that in all build
configurations.

This change introduces `zmalloc_cache_aligned()` and uses it for the
SPMC queue buffer allocation in `spmcInit()`.

Failing CI: https://github.com/valkey-io/valkey/actions/runs/24374139344

---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
sarthakaggarwal97 added a commit to sarthakaggarwal97/valkey that referenced this pull request Apr 23, 2026
The test was accidentally waking the IO threads while trying to check
that they had gone idle.

After the recent IO-thread refactor in valkey-io#3324, the
[test](https://github.com/valkey-io/valkey/pull/3324/changes#diff-21314ec3a338f739eab1536f91f528d1efe7c6a93935a71b9c02f77a3858f121R112)
started forcing `io-threads-always-active`, and its repeated `INFO`
polling counted as fresh activity. So instead of just observing the
worker threads, the test kept reactivating them and then flaked.

---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <25262500+sarthakaggarwal97@users.noreply.github.com>
Co-authored-by: Sarthak Aggarwal <25262500+sarthakaggarwal97@users.noreply.github.com>
sarthakaggarwal97 pushed a commit to sarthakaggarwal97/valkey that referenced this pull request Apr 23, 2026
valkey-io#3324 introduced `BATCH_SIZE` as
a const int local variable and used it as an array bound. Clang 17
rejects this with:
```
io_threads.c:305:22: error: variable length array folded to constant array as an extension [-Werror,-Wgnu-folding-constant]
  305 |     void *batch_jobs[BATCH_SIZE];
      |                      ^~~~~~~~~~
1 error generated.
make[1]: *** [io_threads.o] Error 1
make: *** [all] Error 2

```
Old Clang versions do not emit this warning, maybe that is why the CI
passed. Fix by promoting `BATCH_SIZE` to a file-scope `#define`.

Signed-off-by: Yang Zhao <zymy701@gmail.com>
sarthakaggarwal97 added a commit to sarthakaggarwal97/valkey that referenced this pull request Apr 23, 2026
…3504)

The SPMC queue from valkey-io#3324 needs each `spmcCell` to be cache-line
aligned, but plain `zmalloc()` does not guarantee that in all build
configurations.

This change introduces `zmalloc_cache_aligned()` and uses it for the
SPMC queue buffer allocation in `spmcInit()`.

Failing CI: https://github.com/valkey-io/valkey/actions/runs/24374139344

---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
madolson pushed a commit that referenced this pull request Apr 27, 2026
#### Summary
This PR redesigns the IO threading communication model, replacing the
inefficient client-list polling approach with a high-performance,
lock-free queue architecture. This change improves throughput by
**8–17%** across various workloads and lays the groundwork for
offloading command execution to IO threads in following PRs.



### Performance Comparison: Unstable vs New IO Queues

| Type | Operation | Unstable Branch (M TPS) | New IO Queues (M TPS)|
Difference (%) |
| :--- | :--- | :--- | :--- | :--- |
| **CME**<sup>1</sup> | SET | 1.02 | 1.19 | **+16.67%** |
| **CME** | GET | 1.30 | 1.47 | **+13.08%** |
| **CMD**<sup>2</sup> | SET | 1.15 | 1.35 | **+17.39%** |
| **CMD** | GET | 1.52 | 1.64 | **+7.89%** |

<sup>1</sup> Amazon terminology for cluster mode
<sup>2</sup> Amazon termonology for standalone mode, i.e. config
`cluster-enabled no`

- Test Configuration: 8 IO threads • 400 clients • 512-byte values • 3M
keys

#### Motivation
The previous IO model had several limitations that created performance
bottlenecks:
* **Inefficient Polling:** The main thread lacks a direct notification
mechanism for completed work. Instead, it must constantly iterate
through a list of all pending clients to check their state, wasting
significant CPU cycles.
* **Manual Load Balancing:** Jobs are assigned to specific threads
upfront. This requires the main thread to predict which thread to use,
often leaving some threads idle while others are overloaded.
* **Static Scaling:** Thread activation relies on a fixed heuristic
(e.g., 1 thread per 2 events). This approach fails to adapt to varying
workloads, such as TLS connections or differing read/write sizes.

### The Solution
To address these inefficiencies, this PR replaces the single SPSC queue
used currently with three specialized queues to handle communication and
load balancing more effectively.

#### 1. Main > IO: Shared Queue (Single Producer Multi Consumer)
Single queue from the main-thread to IO threads.
* **Automatic Load Balancing:** All threads pull from the same source.
Busy threads take less work, and idle threads take more, so we don't
need to manually select a thread.
* **Adaptive Scaling:** We now use the queue depth to decide when to add
or remove threads. If the queue is full, we scale up; if it's empty, we
scale down.
* *Ignition:* To get things started before the queue fills up, we
monitor the main thread's CPU. If usage goes over 30%, we wake up the
first IO thread.
* **Implementation:** To prevent contention among consumers, each item
in the ring buffer is padded to reside in its own cache line. Sequence
numbers are utilized to indicate whether a cell is empty or populated,
allowing threads to safely claim work.

#### 2. IO > Main: The Response Channel (MPSC Queue)
We replaced the old polling loop with a response queue.
* ** Faster Completion:** IO threads push completed jobs into this
queue. The main thread detects new data simply by checking if the queue
is not empty, removing the need to scan pending clients.
* **Contention Management:** To avoid lock contention, each thread
reserves a slot by atomically incrementing the tail index. In the rare
event that the queue is full, pending jobs are buffered in a local
temporary list until space becomes available.

#### 3. MAIN > IO (Thread-Specific): Private Inbox (SPSC Queue)
We kept the existing Single-Producer Single-Consumer (SPSC) queues for
tasks that must happen on a specific thread (like freeing memory
allocated by that thread). IO threads always check their private inbox
before looking at the shared queue.

### Changes Required
* **Async client release**
The main thread no longer busy-waits for IO threads to finish with a
client. Since the client must be popped from the multi-producer queue
before it can be released, clients with pending IO are now marked for
asynchronous closure.
* **eviction clients logic**
Updated evictClients() to account for memory pending release (clients
marked close_asap). freeClient() now returns a status code (1 for freed,
0 for async-close) to ensure the eviction loop does not over-evict by
ignoring memory that is about to be reclaimed.
* **events-per-io-thread config**
Replaced the `events-per-io-thread` configuration with
`io-threads-always-active`. as we no longer track events, since this
config is use only for tests no backward compatibility issue arises.
* **packed job instead of handlers**
Jobs are now represented as tagged pointers (using lower 3 bits for job
type) instead of separate `{handler, data}` structs. This reduces memory
overhead and allows jobs to be passed through the queues as single
pointers.
* **head caching in spsc queue**
The SPSC queue now caches the `head` index on the producer side
(`head_cache`) to avoid frequent atomic loads. The producer only
refreshes from the atomic `head` when the cache indicates the queue
might be full, reducing cross-thread cache-line bouncing.

* **deferred commit in SPSC queue**.   
`spscEnqueue()` supports batching via a `commit` flag. Multiple jobs can
be enqueued with `commit=false`, then flushed with a single
`spscCommit()` call, reducing atomic operations and cache-line bouncing.
* **rollback on fullness check failure**
When `spmcEnqueue()` fails due to a full queue, the client state is
rolled back (e.g., `io_write_state` reset to `CLIENT_IDLE`). This
rollback approach removes the need to call an expensive `isFull` check
before every enqueue, we just attempt the enqueue and revert if it
fails.

* **epoll offloading via SPSC at high thread counts**.  
When `active_io_threads_num > 9`, poll jobs are sent to per-thread SPSC
queues (round-robin). Since threads check their private queue first,
this ensures poll jobs are processed promptly without waiting behind
jobs in the shared SPMC queue.
* **avoid offload write before read comes back**
Added a check `if (c->io_read_state == CLIENT_PENDING_IO) return C_OK`
in `trySendWriteToIOThreads()`. In the previous per-thread SPSC
implementation, we could send consecutive read and write jobs for the
same client knowing a single thread would handle them in order. With the
shared SPMC queue, different threads may pick up the jobs, so we must
wait for the read to complete before sending a write to avoid 2 threads
handling the same client.
* **removing pending_read_list_node from client and
clients_pending_io_read/write lists from server**
Removed `pending_read_list_node` from the `client` struct and
`clients_pending_io_read`/`clients_pending_io_write` lists from
`valkeyServer`. as the new mpsc eliminates the need for these tracking
structures.
* **added inst metrics for pending io jobs**
Added `instantaneous_io_pending_jobs` metric via `STATS_METRIC_IO_WAIT`
to track average queue depth over time.
* **added stat for current active threads number**
Added `active_io_threads_num` to the INFO stats output for better
visibility.
* **added internal inst metric for main-thread cpu (non apple
compliant)**
Added `STATS_METRIC_MAIN_THREAD_CPU_SYS` to track main thread CPU usage
via `getrusage(RUSAGE_THREAD)`. This powers the "ignition" policy, when
CPU exceeds 30%, the first IO thread is activated. `RUSAGE_THREAD` is
Linux-specific, so macOS falls back to event-count heuristics.
* **added stat for pending read and writes for io**
Added `io_threaded_reads_pending` and `io_threaded_writes_pending` stats
to track how many read/write jobs are currently in-flight to IO threads.
* **added volatile for crashed**
Changed `server.crashed` from `int` to `volatile int` to ensure the
crash flag is visible across threads immediately, allowing IO threads to
detect a crash and stop sending responses back to the main thread to
avoid deadlock on crash.

---------

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
Signed-off-by: akash kumar <akumdev@amazon.com>
Co-authored-by: Uri Yagelnik <uriy@amazon.com>
Co-authored-by: Dan Touitou <dan.touitou@gmail.com>
madolson pushed a commit that referenced this pull request Apr 27, 2026
The test was accidentally waking the IO threads while trying to check
that they had gone idle.

After the recent IO-thread refactor in #3324, the
[test](https://github.com/valkey-io/valkey/pull/3324/changes#diff-21314ec3a338f739eab1536f91f528d1efe7c6a93935a71b9c02f77a3858f121R112)
started forcing `io-threads-always-active`, and its repeated `INFO`
polling counted as fresh activity. So instead of just observing the
worker threads, the test kept reactivating them and then flaked.

---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
Signed-off-by: Sarthak Aggarwal <25262500+sarthakaggarwal97@users.noreply.github.com>
Co-authored-by: Sarthak Aggarwal <25262500+sarthakaggarwal97@users.noreply.github.com>
madolson pushed a commit that referenced this pull request Apr 27, 2026
#3324 introduced `BATCH_SIZE` as
a const int local variable and used it as an array bound. Clang 17
rejects this with:
```
io_threads.c:305:22: error: variable length array folded to constant array as an extension [-Werror,-Wgnu-folding-constant]
  305 |     void *batch_jobs[BATCH_SIZE];
      |                      ^~~~~~~~~~
1 error generated.
make[1]: *** [io_threads.o] Error 1
make: *** [all] Error 2

```
Old Clang versions do not emit this warning, maybe that is why the CI
passed. Fix by promoting `BATCH_SIZE` to a file-scope `#define`.

Signed-off-by: Yang Zhao <zymy701@gmail.com>
madolson pushed a commit that referenced this pull request Apr 27, 2026
The SPMC queue from #3324 needs each `spmcCell` to be cache-line
aligned, but plain `zmalloc()` does not guarantee that in all build
configurations.

This change introduces `zmalloc_cache_aligned()` and uses it for the
SPMC queue buffer allocation in `spmcInit()`.

Failing CI: https://github.com/valkey-io/valkey/actions/runs/24374139344

---------

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
sarthakaggarwal97 added a commit to sarthakaggarwal97/valkey that referenced this pull request Apr 27, 2026
…sers)

Removed valkey-io#3504, valkey-io#3545, valkey-io#3503 - these fix bugs introduced by valkey-io#3324, valkey-io#2936,

Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
valkey-io#3392 respectively, all new in RC2. Users never experienced these bugs.
@sarthakaggarwal97 sarthakaggarwal97 added the release-notes This issue should get a line item in the release notes label Apr 28, 2026
madolson added a commit that referenced this pull request May 12, 2026
Follow-ups to the IO-threads redesign (#3324 / #3367). Bundles several
independent fixes and refactors:

- tryOffloadFreeObjToIOThreads: only offload the free of the sds buffer
(which the I/O thread allocated). The robj itself is main-thread-owned
and is now freed on the main thread.
- trySendWriteToIOThreads: defer clearing last_header until after a
successful enqueue, so the main thread can't mutate a header the I/O
thread is concurrently reading.
- evictClients: drop the pending_freed / close_asap bookkeeping and rely
on freeClient's return value to bump stat_evictedclients. The old
accounting double-counted protected clients and could under-evict.
- Make MPSC/SPMC/SPSC queue sizes a runtime parameter on *Init(q, size)
instead of compile-time *_QUEUE_SIZE macros in queues.h. The I/O-threads
sizes (16384 / 4096 / 4096) now live as constants in io_threads.c; tests
pass their own sizes.
- updateIOThreads uses io_shared_outbox.queue_size instead of the macro.
- Add function-level doc comments to queues.h.
- Rename IOThreadFreeArgv/IOThreadPoll → ioThreadFreeArgv/ioThreadPoll
for consistency.
- Remove unused job_handler typedef; fix static_assert message (7 → 8).
clang-format pass; Makefile tab→spaces nit.
- Replace the getrusage(RUSAGE_THREAD) sys/user CPU sampling (and its
#ifdef RUSAGE_THREAD fallback) with server.stat_active_time. One
portable signal, one threshold (IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT =
30) replaces the dual sys>30% / sys>5%+user>50% rules.
- Renames STATS_METRIC_MAIN_THREAD_CPU_{SYS,USER} →
STATS_METRIC_MAIN_THREAD_ACTIVE_TIME.

---------

Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: Akash Kumar <45854686+akashkgit@users.noreply.github.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
sarthakaggarwal97 pushed a commit to sarthakaggarwal97/valkey that referenced this pull request May 12, 2026
Follow-ups to the IO-threads redesign (valkey-io#3324 / valkey-io#3367). Bundles several
independent fixes and refactors:

- tryOffloadFreeObjToIOThreads: only offload the free of the sds buffer
(which the I/O thread allocated). The robj itself is main-thread-owned
and is now freed on the main thread.
- trySendWriteToIOThreads: defer clearing last_header until after a
successful enqueue, so the main thread can't mutate a header the I/O
thread is concurrently reading.
- evictClients: drop the pending_freed / close_asap bookkeeping and rely
on freeClient's return value to bump stat_evictedclients. The old
accounting double-counted protected clients and could under-evict.
- Make MPSC/SPMC/SPSC queue sizes a runtime parameter on *Init(q, size)
instead of compile-time *_QUEUE_SIZE macros in queues.h. The I/O-threads
sizes (16384 / 4096 / 4096) now live as constants in io_threads.c; tests
pass their own sizes.
- updateIOThreads uses io_shared_outbox.queue_size instead of the macro.
- Add function-level doc comments to queues.h.
- Rename IOThreadFreeArgv/IOThreadPoll → ioThreadFreeArgv/ioThreadPoll
for consistency.
- Remove unused job_handler typedef; fix static_assert message (7 → 8).
clang-format pass; Makefile tab→spaces nit.
- Replace the getrusage(RUSAGE_THREAD) sys/user CPU sampling (and its
#ifdef RUSAGE_THREAD fallback) with server.stat_active_time. One
portable signal, one threshold (IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT =
30) replaces the dual sys>30% / sys>5%+user>50% rules.
- Renames STATS_METRIC_MAIN_THREAD_CPU_{SYS,USER} →
STATS_METRIC_MAIN_THREAD_ACTIVE_TIME.

---------

Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: Akash Kumar <45854686+akashkgit@users.noreply.github.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
lucasyonge pushed a commit that referenced this pull request May 12, 2026
Follow-ups to the IO-threads redesign (#3324 / #3367). Bundles several
independent fixes and refactors:

- tryOffloadFreeObjToIOThreads: only offload the free of the sds buffer
(which the I/O thread allocated). The robj itself is main-thread-owned
and is now freed on the main thread.
- trySendWriteToIOThreads: defer clearing last_header until after a
successful enqueue, so the main thread can't mutate a header the I/O
thread is concurrently reading.
- evictClients: drop the pending_freed / close_asap bookkeeping and rely
on freeClient's return value to bump stat_evictedclients. The old
accounting double-counted protected clients and could under-evict.
- Make MPSC/SPMC/SPSC queue sizes a runtime parameter on *Init(q, size)
instead of compile-time *_QUEUE_SIZE macros in queues.h. The I/O-threads
sizes (16384 / 4096 / 4096) now live as constants in io_threads.c; tests
pass their own sizes.
- updateIOThreads uses io_shared_outbox.queue_size instead of the macro.
- Add function-level doc comments to queues.h.
- Rename IOThreadFreeArgv/IOThreadPoll → ioThreadFreeArgv/ioThreadPoll
for consistency.
- Remove unused job_handler typedef; fix static_assert message (7 → 8).
clang-format pass; Makefile tab→spaces nit.
- Replace the getrusage(RUSAGE_THREAD) sys/user CPU sampling (and its
#ifdef RUSAGE_THREAD fallback) with server.stat_active_time. One
portable signal, one threshold (IO_IGNITION_MAIN_THREAD_ACTIVE_PERCENT =
30) replaces the dual sys>30% / sys>5%+user>50% rules.
- Renames STATS_METRIC_MAIN_THREAD_CPU_{SYS,USER} →
STATS_METRIC_MAIN_THREAD_ACTIVE_TIME.

---------

Signed-off-by: akash kumar <akumdev@amazon.com>
Signed-off-by: Akash Kumar <45854686+akashkgit@users.noreply.github.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
yaronsananes added a commit to yaronsananes/valkey that referenced this pull request May 14, 2026
When a replica executes `replicaof no one`, `replicationUnsetPrimary()`
sets `server.primary_host = NULL` before calling `freeClient(server.primary)`.
Since PR valkey-io#3324 ("Redesign IO threading communication model"), freeClient()
on a primary client with pending IO is deferred via freeClientAsync. When it
eventually executes, it chains through replicationCachePrimary() ->
replicationHandlePrimaryDisconnection() which unconditionally sets
`server.repl_state = REPL_STATE_CONNECT`.

By that time, replicationUnsetPrimary() has already finalized the state
with `repl_state = REPL_STATE_NONE`. The deferred free resurrects
REPL_STATE_CONNECT while primary_host is NULL. replicationCron then calls
connectWithPrimary() which passes NULL to connTLSConnect(), causing
inet_pton(AF_INET, NULL, ...) to SIGSEGV.

Fix by conditioning the state transition in replicationHandlePrimaryDisconnection()
on primary_host being set. If primary_host is NULL, the disconnection is
part of a deliberate unset that has already finalized state, so we set
REPL_STATE_NONE instead of REPL_STATE_CONNECT.

Additionally:
- Add a NULL check for addr in connTLSConnect() as defense in depth.
- Add a 10s timeout to the WAITAOF test that blocks indefinitely,
  preventing the test from hanging if the replica fails to sync.

Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
yaronsananes added a commit to yaronsananes/valkey that referenced this pull request May 14, 2026
When a replica executes `replicaof no one`, `replicationUnsetPrimary()`
sets `server.primary_host = NULL` before calling `freeClient(server.primary)`.
Since PR valkey-io#3324 ("Redesign IO threading communication model"), freeClient()
on a primary client with pending IO is deferred via freeClientAsync. When it
eventually executes, it chains through replicationCachePrimary() ->
replicationHandlePrimaryDisconnection() which unconditionally sets
`server.repl_state = REPL_STATE_CONNECT`.

By that time, replicationUnsetPrimary() has already finalized the state
with `repl_state = REPL_STATE_NONE`. The deferred free resurrects
REPL_STATE_CONNECT while primary_host is NULL. replicationCron then calls
connectWithPrimary() which passes NULL to connTLSConnect(), causing
inet_pton(AF_INET, NULL, ...) to SIGSEGV.

Fix by conditioning the state transition in replicationHandlePrimaryDisconnection()
on primary_host being set. If primary_host is NULL, the disconnection is
part of a deliberate unset that has already finalized state, so we set
REPL_STATE_NONE instead of REPL_STATE_CONNECT.

Additionally:
- Add a NULL check for addr in connTLSConnect() as defense in depth.
- Add a 10s timeout to the WAITAOF test that blocks indefinitely,
  preventing the test from hanging if the replica fails to sync.

Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
yaronsananes added a commit to yaronsananes/valkey that referenced this pull request May 14, 2026
When a replica executes `replicaof no one`, `replicationUnsetPrimary()`
sets `server.primary_host = NULL` before calling `freeClient(server.primary)`.
Since PR valkey-io#3324 ("Redesign IO threading communication model"), freeClient()
on a primary client with pending IO is deferred via freeClientAsync. When it
eventually executes, it chains through replicationCachePrimary() ->
replicationHandlePrimaryDisconnection() which unconditionally sets
`server.repl_state = REPL_STATE_CONNECT`.

By that time, replicationUnsetPrimary() has already finalized the state
with `repl_state = REPL_STATE_NONE`. The deferred free resurrects
REPL_STATE_CONNECT while primary_host is NULL. replicationCron then calls
connectWithPrimary() which passes NULL to connTLSConnect(), causing
inet_pton(AF_INET, NULL, ...) to SIGSEGV.

Fix by conditioning the state transition in replicationHandlePrimaryDisconnection()
on primary_host being set. If primary_host is NULL, the disconnection is
part of a deliberate unset that has already finalized state, so we set
REPL_STATE_NONE instead of REPL_STATE_CONNECT.

Additionally:
- Add a NULL check for addr in connTLSConnect() as defense in depth.
- Add a 10s timeout to the WAITAOF test that blocks indefinitely,
  preventing the test from hanging if the replica fails to sync.

Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
yaronsananes added a commit to yaronsananes/valkey that referenced this pull request May 14, 2026
Since PR valkey-io#3324, freeClient() on a primary client with pending IO is
deferred via freeClientAsync. The deferred free eventually chains through
replicationCachePrimary() -> replicationHandlePrimaryDisconnection(),
which unconditionally set repl_state = REPL_STATE_CONNECT.

This causes two bugs:

1. REPLICAOF NO ONE: primary_host is NULL when the deferred free runs,
   so replicationCron calls connectWithPrimary(NULL) -> SIGSEGV in
   connTLSConnect (inet_pton with NULL addr).

2. REPLICAOF newhost newport: the deferred free clobbers the already-
   progressed repl_state (CONNECTING) back to CONNECT, causing
   replicationCron to call connectWithPrimary() again, which overwrites
   server.repl_transfer_s without closing the previous connection (FD leak).

Fix by making replicationHandlePrimaryDisconnection() only transition to
REPL_STATE_CONNECT when repl_state is still REPL_STATE_CONNECTED (meaning
this is a genuine disconnect, not a stale deferred free). If repl_state
has already moved on, the deferred free is stale and should not mutate
the state machine.

Additionally:
- Add NULL check for addr in connTLSConnect() as defense in depth.
- Add 10s timeout to the WAITAOF test to prevent indefinite hanging.
- Add dedicated tests for the repoint scenario.

Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
ranshid added a commit that referenced this pull request May 15, 2026
…3719)

## Summary

This PR addresses 2 race conditions where deferred `freeClient`
(introduced by #3324) clobbers replication state set by `REPLICAOF`
commands.

Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI
failure in `tests/unit/wait.tcl`.

## Root Cause

Since PR #3324 ("Redesign IO threading communication model"),
`freeClient()` on a primary client with pending IO is deferred via
`freeClientAsync` (gated on `clientHasPendingIO`). When the deferred
free eventually executes, it chains through `replicationCachePrimary()`
-> `replicationHandlePrimaryDisconnection()`, which unconditionally sets
`server.repl_state = REPL_STATE_CONNECT`.

This causes two bugs:

### Bug 1: REPLICAOF NO ONE (SIGSEGV)

`replicationUnsetPrimary()` sets `primary_host = NULL` before calling
`freeClient`. The deferred free runs later, sets `repl_state =
REPL_STATE_CONNECT` while `primary_host` is still NULL.
`replicationCron` then calls `connectWithPrimary()` which passes NULL to
`connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV.

### Bug 2: REPLICAOF newhost newport (connection leak)

`replicationSetPrimary()` calls `freeClient(old_primary)` (deferred),
then sets `primary_host` to the new IP and progresses `repl_state` to
`REPL_STATE_CONNECTING` with a new connection handle in
`server.repl_transfer_s`. The deferred free runs later, clobbers
`repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls
`connectWithPrimary()` again, overwriting `server.repl_transfer_s`
without closing the previous connection -- an FD leak.

## Fix

Make `replicationHandlePrimaryDisconnection()` only transition to
`REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED`
and `primary_host` is set. This means the disconnection is genuine and
no other state transition has already occurred. If `repl_state` has
already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is
stale and the function leaves the state untouched.

Additionally:
- `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in
depth).
- `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test,
and add dedicated tests for the repoint scenario.

## Reproduction

Reproduced locally by establishing TLS replication and executing
`REPLICAOF NO ONE`:
- Without fix: server crashes with signal 11, accessing address 0x0
- With fix: server continues operating normally

## Testing

- Full `unit/wait` test suite passes (51/51) with IO threads enabled
- New tests "Repoint replica between primaries does not leak connections
or crash" and "Rapid repoint does not crash or leak" pass
- Crash reproduced locally over TLS (SIGSEGV without fix, graceful
handling with fix)

---------

Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
alon-arenberg pushed a commit to alon-arenberg/valkey that referenced this pull request May 18, 2026
…alkey-io#3719)

## Summary

This PR addresses 2 race conditions where deferred `freeClient`
(introduced by valkey-io#3324) clobbers replication state set by `REPLICAOF`
commands.

Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI
failure in `tests/unit/wait.tcl`.

## Root Cause

Since PR valkey-io#3324 ("Redesign IO threading communication model"),
`freeClient()` on a primary client with pending IO is deferred via
`freeClientAsync` (gated on `clientHasPendingIO`). When the deferred
free eventually executes, it chains through `replicationCachePrimary()`
-> `replicationHandlePrimaryDisconnection()`, which unconditionally sets
`server.repl_state = REPL_STATE_CONNECT`.

This causes two bugs:

### Bug 1: REPLICAOF NO ONE (SIGSEGV)

`replicationUnsetPrimary()` sets `primary_host = NULL` before calling
`freeClient`. The deferred free runs later, sets `repl_state =
REPL_STATE_CONNECT` while `primary_host` is still NULL.
`replicationCron` then calls `connectWithPrimary()` which passes NULL to
`connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV.

### Bug 2: REPLICAOF newhost newport (connection leak)

`replicationSetPrimary()` calls `freeClient(old_primary)` (deferred),
then sets `primary_host` to the new IP and progresses `repl_state` to
`REPL_STATE_CONNECTING` with a new connection handle in
`server.repl_transfer_s`. The deferred free runs later, clobbers
`repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls
`connectWithPrimary()` again, overwriting `server.repl_transfer_s`
without closing the previous connection -- an FD leak.

## Fix

Make `replicationHandlePrimaryDisconnection()` only transition to
`REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED`
and `primary_host` is set. This means the disconnection is genuine and
no other state transition has already occurred. If `repl_state` has
already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is
stale and the function leaves the state untouched.

Additionally:
- `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in
depth).
- `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test,
and add dedicated tests for the repoint scenario.

## Reproduction

Reproduced locally by establishing TLS replication and executing
`REPLICAOF NO ONE`:
- Without fix: server crashes with signal 11, accessing address 0x0
- With fix: server continues operating normally

## Testing

- Full `unit/wait` test suite passes (51/51) with IO threads enabled
- New tests "Repoint replica between primaries does not leak connections
or crash" and "Rapid repoint does not crash or leak" pass
- Crash reproduced locally over TLS (SIGSEGV without fix, graceful
handling with fix)

---------

Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
Signed-off-by: Alon Arenberg <alonare@amazon.com>
valkeyrie-ops Bot pushed a commit that referenced this pull request May 18, 2026
…3719)

## Summary

This PR addresses 2 race conditions where deferred `freeClient`
(introduced by #3324) clobbers replication state set by `REPLICAOF`
commands.

Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI
failure in `tests/unit/wait.tcl`.

## Root Cause

Since PR #3324 ("Redesign IO threading communication model"),
`freeClient()` on a primary client with pending IO is deferred via
`freeClientAsync` (gated on `clientHasPendingIO`). When the deferred
free eventually executes, it chains through `replicationCachePrimary()`
-> `replicationHandlePrimaryDisconnection()`, which unconditionally sets
`server.repl_state = REPL_STATE_CONNECT`.

This causes two bugs:

### Bug 1: REPLICAOF NO ONE (SIGSEGV)

`replicationUnsetPrimary()` sets `primary_host = NULL` before calling
`freeClient`. The deferred free runs later, sets `repl_state =
REPL_STATE_CONNECT` while `primary_host` is still NULL.
`replicationCron` then calls `connectWithPrimary()` which passes NULL to
`connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV.

### Bug 2: REPLICAOF newhost newport (connection leak)

`replicationSetPrimary()` calls `freeClient(old_primary)` (deferred),
then sets `primary_host` to the new IP and progresses `repl_state` to
`REPL_STATE_CONNECTING` with a new connection handle in
`server.repl_transfer_s`. The deferred free runs later, clobbers
`repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls
`connectWithPrimary()` again, overwriting `server.repl_transfer_s`
without closing the previous connection -- an FD leak.

## Fix

Make `replicationHandlePrimaryDisconnection()` only transition to
`REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED`
and `primary_host` is set. This means the disconnection is genuine and
no other state transition has already occurred. If `repl_state` has
already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is
stale and the function leaves the state untouched.

Additionally:
- `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in
depth).
- `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test,
and add dedicated tests for the repoint scenario.

## Reproduction

Reproduced locally by establishing TLS replication and executing
`REPLICAOF NO ONE`:
- Without fix: server crashes with signal 11, accessing address 0x0
- With fix: server continues operating normally

## Testing

- Full `unit/wait` test suite passes (51/51) with IO threads enabled
- New tests "Repoint replica between primaries does not leak connections
or crash" and "Rapid repoint does not crash or leak" pass
- Crash reproduced locally over TLS (SIGSEGV without fix, graceful
handling with fix)

---------

Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
madolson pushed a commit that referenced this pull request May 19, 2026
…3719)

## Summary

This PR addresses 2 race conditions where deferred `freeClient`
(introduced by #3324) clobbers replication state set by `REPLICAOF`
commands.

Found while triaging the recurring `test-ubuntu-tls-io-threads` daily CI
failure in `tests/unit/wait.tcl`.

## Root Cause

Since PR #3324 ("Redesign IO threading communication model"),
`freeClient()` on a primary client with pending IO is deferred via
`freeClientAsync` (gated on `clientHasPendingIO`). When the deferred
free eventually executes, it chains through `replicationCachePrimary()`
-> `replicationHandlePrimaryDisconnection()`, which unconditionally sets
`server.repl_state = REPL_STATE_CONNECT`.

This causes two bugs:

### Bug 1: REPLICAOF NO ONE (SIGSEGV)

`replicationUnsetPrimary()` sets `primary_host = NULL` before calling
`freeClient`. The deferred free runs later, sets `repl_state =
REPL_STATE_CONNECT` while `primary_host` is still NULL.
`replicationCron` then calls `connectWithPrimary()` which passes NULL to
`connTLSConnect()` -> `inet_pton(AF_INET, NULL, ...)` -> SIGSEGV.

### Bug 2: REPLICAOF newhost newport (connection leak)

`replicationSetPrimary()` calls `freeClient(old_primary)` (deferred),
then sets `primary_host` to the new IP and progresses `repl_state` to
`REPL_STATE_CONNECTING` with a new connection handle in
`server.repl_transfer_s`. The deferred free runs later, clobbers
`repl_state` back to `REPL_STATE_CONNECT`. `replicationCron` then calls
`connectWithPrimary()` again, overwriting `server.repl_transfer_s`
without closing the previous connection -- an FD leak.

## Fix

Make `replicationHandlePrimaryDisconnection()` only transition to
`REPL_STATE_CONNECT` when `repl_state` is still `REPL_STATE_CONNECTED`
and `primary_host` is set. This means the disconnection is genuine and
no other state transition has already occurred. If `repl_state` has
already moved on (CONNECT, CONNECTING, NONE, etc.), the deferred free is
stale and the function leaves the state untouched.

Additionally:
- `connTLSConnect()`: Return `C_ERR` if `addr` is NULL (defense in
depth).
- `tests/unit/wait.tcl`: Add 10s timeout to the blocking WAITAOF test,
and add dedicated tests for the repoint scenario.

## Reproduction

Reproduced locally by establishing TLS replication and executing
`REPLICAOF NO ONE`:
- Without fix: server crashes with signal 11, accessing address 0x0
- With fix: server continues operating normally

## Testing

- Full `unit/wait` test suite passes (51/51) with IO threads enabled
- New tests "Repoint replica between primaries does not leak connections
or crash" and "Rapid repoint does not crash or leak" pass
- Crash reproduced locally over TLS (SIGSEGV without fix, graceful
handling with fix)

---------

Signed-off-by: Yaron Sananes <yaron.sananes@gmail.com>
Signed-off-by: Ran Shidlansik <ranshid@amazon.com>
Co-authored-by: Ran Shidlansik <ranshid@amazon.com>
hanzo-dev pushed a commit to hanzoai/kv that referenced this pull request Jun 10, 2026
valkey-io/valkey#3324 introduced `BATCH_SIZE` as
a const int local variable and used it as an array bound. Clang 17
rejects this with:
```
io_threads.c:305:22: error: variable length array folded to constant array as an extension [-Werror,-Wgnu-folding-constant]
  305 |     void *batch_jobs[BATCH_SIZE];
      |                      ^~~~~~~~~~
1 error generated.
make[1]: *** [io_threads.o] Error 1
make: *** [all] Error 2

```
Old Clang versions do not emit this warning, maybe that is why the CI
passed. Fix by promoting `BATCH_SIZE` to a file-scope `#define`.

Signed-off-by: Yang Zhao <zymy701@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

release-notes This issue should get a line item in the release notes run-extra-tests Run extra tests on this PR (Runs all tests from daily except valgrind and RESP)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants