Redesign IO threading communication model #2909
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## unstable #2909 +/- ##
============================================
+ Coverage 72.41% 73.59% +1.17%
============================================
Files 129 126 -3
Lines 70528 69594 -934
============================================
+ Hits 51076 51217 +141
+ Misses 19452 18377 -1075
| while (1) { | ||
| cell = &q->buffer[head & SPMC_QUEUE_MASK]; | ||
| size_t seq = atomic_load_explicit(&cell->sequence, memory_order_acquire); | ||
|
|
||
| intptr_t diff = (intptr_t)seq - (intptr_t)(head + 1); | ||
|
|
||
| if (diff == 0) { | ||
| /* Slot has data. Attempt to claim via CAS on head. */ | ||
| if (atomic_compare_exchange_weak_explicit(&q->head, &head, head + 1, | ||
| memory_order_relaxed, | ||
| memory_order_relaxed)) { | ||
| data = cell->data; | ||
|
|
||
| /* Mark slot empty for next generation (pos + size) */ | ||
| atomic_store_explicit(&cell->sequence, head + SPMC_QUEUE_SIZE, memory_order_release); | ||
| return data; | ||
| } |
What if we do a relaxed load first and only acquire on a successful claim? I believe that would help under high contention.
UPD: Tried locally, was not beneficial for SET load
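For readers following the hunk above, here is a minimal single-file sketch of a bounded queue that uses per-cell sequence numbers and claims a slot with a CAS on head, in the same spirit as the diff. The names (demoQueue, demoEnqueue, demoDequeue), the tiny queue size, and the producer side are assumptions made for illustration; they are not the PR's code. The sketch keeps the acquire load on the sequence, as in the hunk.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_QUEUE_SIZE 8 /* power of two; hypothetical, much smaller than a real queue */
#define DEMO_QUEUE_MASK (DEMO_QUEUE_SIZE - 1)

typedef struct demoCell {
    atomic_size_t sequence; /* == pos: empty for pos; == pos + 1: holds data for pos */
    void *data;
} demoCell;

typedef struct demoQueue {
    demoCell buffer[DEMO_QUEUE_SIZE];
    atomic_size_t head; /* next position to claim, shared by all consumers */
    size_t tail;        /* next position to fill, touched by the single producer only */
} demoQueue;

static void demoInit(demoQueue *q) {
    for (size_t i = 0; i < DEMO_QUEUE_SIZE; i++) {
        atomic_init(&q->buffer[i].sequence, i);
        q->buffer[i].data = NULL;
    }
    atomic_init(&q->head, 0);
    q->tail = 0;
}

/* Single producer. Returns false when the target slot has not been released yet (full). */
static bool demoEnqueue(demoQueue *q, void *data) {
    assert(data != NULL); /* NULL is reserved for "queue empty" in demoDequeue() */
    demoCell *cell = &q->buffer[q->tail & DEMO_QUEUE_MASK];
    size_t seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
    if (seq != q->tail) return false;
    cell->data = data;
    atomic_store_explicit(&cell->sequence, q->tail + 1, memory_order_release);
    q->tail++;
    return true;
}

/* Any consumer. Returns NULL when the queue is empty. */
static void *demoDequeue(demoQueue *q) {
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    while (1) {
        demoCell *cell = &q->buffer[head & DEMO_QUEUE_MASK];
        size_t seq = atomic_load_explicit(&cell->sequence, memory_order_acquire);
        intptr_t diff = (intptr_t)seq - (intptr_t)(head + 1);
        if (diff == 0) {
            /* Slot has data. On CAS failure, head is reloaded with the current value. */
            if (atomic_compare_exchange_weak_explicit(&q->head, &head, head + 1,
                                                      memory_order_relaxed,
                                                      memory_order_relaxed)) {
                void *data = cell->data;
                /* Mark the slot empty for the next generation (pos + size). */
                atomic_store_explicit(&cell->sequence, head + DEMO_QUEUE_SIZE,
                                      memory_order_release);
                return data;
            }
        } else if (diff < 0) {
            return NULL; /* the producer has not published this slot yet */
        } else {
            /* Another consumer already claimed this position; catch up. */
            head = atomic_load_explicit(&q->head, memory_order_relaxed);
        }
    }
}
```

A failed enqueue simply returns false, so a caller can attempt the push and undo its own state on failure instead of checking for fullness first.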
| typedef struct spmcCell { | ||
| _Alignas(CACHE_LINE_SIZE) atomic_size_t sequence; | ||
| void *data; | ||
| } spmcCell; |
Why do we need alignment here? Wouldn't it be better to fit multiple cells in one cache line? Or we could even separate the hot and colder paths here: embed _Alignas(CACHE_LINE_SIZE) atomic_size_t *sequences; into spmcQueue and separately add _Alignas(CACHE_LINE_SIZE) void **data;. WDYT?
My understanding is that the intention behind this was to reduce cache line contention. I can dig in further, but I suspect that reverting this would result in more CPU cache invalidation and degrade performance.
I think your concern is valid: if we are losing 48 bytes per cell to internal fragmentation and the queue size is set to 4096, we are losing nearly 200 KB to internal fragmentation.
If this overhead cost is too high, we can evaluate the performance trade off. The memory is only allocated if the feature is turned on.
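The arithmetic behind the 48 bytes and the "nearly 200 KB" figure can be checked with a small sketch. The struct and macro names are hypothetical, and a 64-byte cache line is assumed; only the field layout follows the hunk under review.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define DEMO_CACHE_LINE_SIZE 64 /* assumption: 64-byte cache lines */
#define DEMO_QUEUE_SIZE 4096    /* queue size quoted in the comment above */

/* One cell per cache line, as in the hunk under review. */
typedef struct paddedCell {
    _Alignas(DEMO_CACHE_LINE_SIZE) atomic_size_t sequence;
    void *data;
} paddedCell;

/* Same fields without the alignment request, so several cells share a line. */
typedef struct packedCell {
    atomic_size_t sequence;
    void *data;
} packedCell;

/* Bytes of padding added across the whole queue by the per-cell alignment. */
static size_t paddingOverheadBytes(void) {
    assert(sizeof(paddedCell) >= sizeof(packedCell));
    return (sizeof(paddedCell) - sizeof(packedCell)) * DEMO_QUEUE_SIZE;
}
```

On a typical 64-bit target where both fields are 8 bytes, each padded cell is 64 bytes against 16 bytes packed, so the padding is 48 bytes per cell and 196,608 bytes (192 KiB) for 4096 cells.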
Co-authored-by: Dan Touitou <dan.touitou@gmail.com> Signed-off-by: Uri Yagelnik <uriy@amazon.com>
Pull request overview
Redesigns the IO-threading communication model to remove main-thread polling and move to a queue-based architecture (SPMC for main→IO, MPSC for IO→main, and per-thread SPSC inboxes), improving throughput and enabling future IO-thread offloads.
Changes:
- Replace pending-client list polling with an IO→main response queue and main→IO shared work queue, plus per-thread private queues for thread-affine jobs.
- Update IO-thread activation/scaling logic (including a new io-threads-always-active config and an “ignition” policy).
- Add unit tests for the new queue primitives and update test harness configs accordingly.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| tests/support/server.tcl | Switch test config from events-per-io-thread to io-threads-always-active. |
| tests/instances.tcl | Same test-config switch for spawned instances. |
| src/unit/test_io_queues.c | Adds unit and stress tests for SPSC/SPMC/MPSC queue implementations. |
| src/unit/test_files.h | Registers new IO queue unit tests into the unit test suite. |
| src/server.h | Updates server/client structs and APIs for the new IO threading model and new stats/metrics. |
| src/server.c | Routes IO completion handling through processIOThreadsResponses() and hooks IO thread policies into before/after sleep. |
| src/networking.c | Updates client unlink/free semantics for async-close with pending IO and refactors IO read/write completion handlers. |
| src/io_threads.h | Introduces job type enums and new IO thread lifecycle/response APIs. |
| src/io_threads.c | Implements shared/private queues, scaling policies, response processing, and tagged-pointer jobs. |
| src/io_queues.h | Adds lock-free queue implementations used by the new IO threading model. |
| src/config.c | Adds io-threads-always-active config and removes events-per-io-thread. |
| for (int i = 0; i < dequeued_count; i++) { | ||
| client *c; | ||
| int job_type; | ||
| unpack_job(jobs[i], (void *)&c, &job_type); |
unpack_job() expects a void ** for the output pointer, but this call passes (void *)&c, which is a void * and will generate an incompatible-pointer-type warning (and is easy to misuse). Pass the address with the correct type (e.g., cast to void **) to make the intent and typing safe.
| unpack_job(jobs[i], (void *)&c, &job_type); | |
| unpack_job(jobs[i], (void **)&c, &job_type); |
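As a hedged illustration of the typing concern, the sketch below packs a job type into the low 3 bits of a pointer and unpacks it through a void * temporary, which avoids any pointer-to-pointer cast at the call site. All names (demoClient, demoPackJob, demoUnpackJob, demoUnpackClient) are hypothetical, and 8-byte alignment of the tagged objects is assumed.

```c
#include <assert.h>
#include <stdint.h>

#define DEMO_TAG_MASK ((uintptr_t)0x7)
#define DEMO_PTR_MASK (~DEMO_TAG_MASK)

/* Hypothetical stand-in for a client; aligned so the low 3 bits are free. */
typedef struct demoClient {
    _Alignas(8) int id;
} demoClient;

/* Store the job type in the low bits of an 8-byte aligned pointer. */
static inline void *demoPackJob(void *ptr, int type) {
    assert(((uintptr_t)ptr & DEMO_TAG_MASK) == 0);
    assert(type >= 0 && (uintptr_t)type <= DEMO_TAG_MASK);
    return (void *)((uintptr_t)ptr | (uintptr_t)type);
}

/* Split a packed job back into its pointer and type. */
static inline void demoUnpackJob(void *job, void **ptr, int *type) {
    *ptr = (void *)((uintptr_t)job & DEMO_PTR_MASK);
    *type = (int)((uintptr_t)job & DEMO_TAG_MASK);
}

/* Unpack into a void * temporary, then convert to the concrete type. */
static inline demoClient *demoUnpackClient(void *job, int *type) {
    void *p;
    demoUnpackJob(job, &p, type);
    return p;
}
```

A typed wrapper of this kind is only one option; the suggested cast to void ** also silences the warning.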
| static size_t last_sample_time = 0; | ||
| static size_t spmc_size_sum = 0; | ||
| static size_t sample_count = 0; | ||
|
|
||
| /* Scaling Up/Down Policy */ | ||
| if (now - last_sample_time < IO_SAMPLE_RATE_MS) return; | ||
| last_sample_time = now; |
now is a long long (milliseconds), but last_sample_time is declared as size_t. The subtraction now - last_sample_time mixes signed/unsigned types and can behave unexpectedly (or at least be compiler-warning-prone) on some platforms. Prefer using long long/mstime_t consistently for timestamps here.
switching datatype of last_sample_time and now to mstime_t
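A small sketch of the sampling gate with a single signed timestamp type is shown below. demo_mstime_t is a stand-in that assumes mstime_t is a signed 64-bit millisecond count, and demoShouldSample is a hypothetical name; the 10 ms interval matches IO_SAMPLE_RATE_MS in the diff.

```c
#include <assert.h>
#include <stdbool.h>

typedef long long demo_mstime_t; /* stand-in for mstime_t (assumed signed, milliseconds) */
#define DEMO_SAMPLE_RATE_MS 10

/* Returns true when at least DEMO_SAMPLE_RATE_MS elapsed since the last sample. */
static bool demoShouldSample(demo_mstime_t now) {
    static demo_mstime_t last_sample_time = 0;
    assert(now >= 0);
    /* Both operands are signed, so a reading slightly behind the last sample
     * gives a small negative difference instead of a huge unsigned value. */
    if (now - last_sample_time < DEMO_SAMPLE_RATE_MS) return false;
    last_sample_time = now;
    return true;
}
```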
madolson
left a comment
Most of it seems right to me. A lot of it feels like black magic, I'm not really sure why every optimization was made (or not). I might take another pass tomorrow just to comment where things don't really make sense.
| STATS_METRIC_IO_WAIT, /* IO queue size */ | ||
| STATS_METRIC_MAIN_THREAD_CPU_SYS, /* Main thread CPU sys time */ | ||
| STATS_METRIC_MAIN_THREAD_CPU_USER, /* Main thread CPU user time */ |
I think there is a broad preference to avoid instantaneous metrics. There was also another PR merged for main thread CPU, so not sure we just need IO queue size.
| * Consumer: IO thread only. | ||
| * ========================================================================== */ | ||
|
|
||
| #define SPSC_QUEUE_SIZE 4096 |
So we now practically have two SPSC queues (mutexQueue and this one). I understand they have slightly different guarantees, but it would be preferable to only have one. @JimB123 thoughts on whether we can just use one?
MutexQueue is MPMC. It's designed to be general purpose.
This queue is custom built SPSC for this purpose only. It's not reusable. This is designed to be fast for this one purpose only. AFAIK, there have been no performance comparisons, but I would expect that this custom implementation would be better for this custom purpose.
I think we have a few options:
- Use mutexQueue instead - it's designed to be multi-purpose. But it may reduce performance for this case.
- Try to re-design this new queue to be a faster lockless queue with its own unique API. This might make it more reusable, but also might impact its usefulness for this specific use case.
- Keep it separate, but ensure that the naming conveys that it's custom built for one purpose. Looking at the naming (io_queues.h) it seems like maybe this is already done.
It's really hard to tell if this is just premature optimization or not. For example, coding it all in a header file doesn't really bring value now that we have LTO. It might just cause code bloat.
The BIO code is also SPSC, so it seems like we might be using the wrong datatype for that then.
This queue is custom built SPSC for this purpose only. It's not reusable. This is designed to be fast for this one purpose only.
It's a bold statement. 😄
In the mutexqueue, all operations are protected by a single mutex. That's what makes it MPMC. It's also resizable. It never rejects an insert.
That design seems optimal for async work like lazyfree and forkless, but suboptimal for io-threads communication where we need low latency through the queue.
I see these io-queues as mostly generic, just fixed size. We can say they're optimized for low latency.
I don't see a problem having both in the repo. (But if we really want, I guess we could use a fixed size queue for lazyfree. When it's full, the main thread could do the freeing by itself. For forkless, the main thread would have to wait a bit if the queue gets full. I don't know if that would be very bad or not. One aspect is that the memory usage gets bounded.)
Fixing the hard-coded sizes of io-queues
It's a header-only implementation, so we can let the file including it define the size before including it:
// bio.c
#define SPSC_QUEUE_SIZE 1024
#include "queues.h"
// queues.h
#ifndef SPSC_QUEUE_SIZE
#define SPSC_QUEUE_SIZE 4096
#endif
| "io_threaded_reads_pending:%lld\r\n", server.stat_io_reads_pending, | ||
| "io_threaded_writes_pending:%lld\r\n", server.stat_io_writes_pending, |
What exactly are end users supposed to do about this metric?
Agree, we probably don't need to expose these.
Discussed in core team. Probably remove the active_io_threads_num, since it changes so much. Then move these to the info_debug section.
| client *c = ln->value; | ||
| if (c->flag.close_asap) { | ||
| /* Already scheduled to close. Count memory as freed and skip. */ | ||
| pending_freed += getClientMemoryUsage(c, NULL); |
This isn't actually the memory usage we're tracking, it's the value we're computing in the cron. I wonder if it would be a bit simpler to just zero out this client's contribution to the memory usage bucket. That would still prevent the looping here.
| /* Push data to the queue. Caller must ensure queue is not full via spscIsFull(). | ||
| * @param commit If true, the tail pointer is updated immediately (visible to consumer). | ||
| * If false, only local index is updated (batching). */ |
| /* Push data to the queue. Caller must ensure queue is not full via spscIsFull(). | |
| * @param commit If true, the tail pointer is updated immediately (visible to consumer). | |
| * If false, only local index is updated (batching). */ | |
| /* Push data to the queue. Caller must ensure queue is not full via spscIsFull(). | |
| * If commit is set to true, the tail pointer is updated immediately (visible to consumer). | |
| * If false, only local index is updated (batching). */ |
Generally we just have text comments, not doxygen style comments.
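To make the commit semantics in the comment above concrete, here is a minimal sketch of an SPSC ring with a producer-local tail, a producer-side cached head, and a deferred commit. The names (demoSpsc, demoSpscEnqueue, and so on) and the tiny size are assumptions for illustration; this is not the PR's implementation.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_SPSC_SIZE 4 /* power of two; hypothetical */
#define DEMO_SPSC_MASK (DEMO_SPSC_SIZE - 1)

typedef struct demoSpsc {
    void *buffer[DEMO_SPSC_SIZE];
    atomic_size_t head; /* advanced by the consumer */
    atomic_size_t tail; /* published by the producer on commit */
    size_t tail_local;  /* producer-only: includes uncommitted pushes */
    size_t head_cache;  /* producer-only: last observed value of head */
} demoSpsc;

static void demoSpscInit(demoSpsc *q) {
    atomic_init(&q->head, 0);
    atomic_init(&q->tail, 0);
    q->tail_local = 0;
    q->head_cache = 0;
}

/* Producer: reload the shared head only when the cached value says "full". */
static bool demoSpscIsFull(demoSpsc *q) {
    if (q->tail_local - q->head_cache < DEMO_SPSC_SIZE) return false;
    q->head_cache = atomic_load_explicit(&q->head, memory_order_acquire);
    return q->tail_local - q->head_cache >= DEMO_SPSC_SIZE;
}

/* Producer: make every push so far visible to the consumer. */
static void demoSpscCommit(demoSpsc *q) {
    atomic_store_explicit(&q->tail, q->tail_local, memory_order_release);
}

/* Producer: caller must ensure the queue is not full via demoSpscIsFull().
 * If commit is false, only the local index moves (batching). */
static void demoSpscEnqueue(demoSpsc *q, void *data, bool commit) {
    assert(q->tail_local - q->head_cache < DEMO_SPSC_SIZE);
    q->buffer[q->tail_local & DEMO_SPSC_MASK] = data;
    q->tail_local++;
    if (commit) demoSpscCommit(q);
}

/* Consumer: returns NULL when nothing has been committed. */
static void *demoSpscDequeue(demoSpsc *q) {
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (head == tail) return NULL;
    void *data = q->buffer[head & DEMO_SPSC_MASK];
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return data;
}
```

In this sketch, pushes made with commit set to false stay invisible to the consumer until demoSpscCommit() runs, so a batch costs one release store instead of one per job.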
| */ | ||
|
|
||
| #include "io_threads.h" | ||
| #include "io_queues.h" |
There is implicit ordering between these two, since io_threads uses inMainThread(). It would be nice to make it so that io_queues was independent so this doesn't break later.
forward declaration in io_queues.h would solve this problem
| #define JOB_TAG_MASK 0x7 | ||
| #define JOB_PTR_MASK (~(uintptr_t)JOB_TAG_MASK) | ||
|
|
||
| static inline void *pack_job(void *ptr, int type) { | ||
| return (void *)((uintptr_t)ptr | type); | ||
| } |
This code is now also in fifo.c, would be nice to move this to a helper. Also by convention it would be packJob. (Although I would prefer tagPointer() and untagPointer()).
Renaming this function to packJob.
Regarding moving this to a helper, it looks like @JimB123's fifo mutexQueue uses union-based pointer tagging to store the first or last element index in the block.
And, this PR uses this function to tag any generic pointer like client, argv, etc.
But, to promote general reusability, this may be moved to any header file that would cause less header bloat.
I'm a big fan of functional units with clean APIs.
Tagging pointers (using the extra bits for flags or other purposes) is a pretty low level operation. My expectation is that this should not be exposed beyond the unit. It should be internal, in the C file, and static.
That's exactly what I see above. A small static routine that's internal only, and not exposed. The same is true for the pointer tagging in mutexQueue.
Given that, I don't feel strongly that there is a need to organize common helper functions.
If we did want to create helper functions, the naming and use can get challenging. For example, names like "packJob" would be inappropriate. Also, the use cases themselves can be different - sometimes individual bits are used as flags. Sometimes bits are used together as an integral value. Combinations, etc.
Having private, internal, routines likely allows for more use-case driven naming - better readability.
| #define IO_COOLDOWN_MS 1000 | ||
| #define IO_SAMPLE_RATE_MS 10 | ||
| #define IO_IGNITION_EVENTS 4 | ||
| #define IO_IGNITION_CPU_SYS 30.0 |
I don't really understand the motivation behind the 30% here. Don't we want to be much closer to 100% to start the second thread? Maybe include some documentation here about how these numbers were picked, I didn't see anything in the top level comment either.
CPU SYS is kernel CPU time I believe. User space and kernel space CPU time is tracked separately.
I posted another comment about this, but I guess if we use the new used_active_time_main_thread metric instead, it can be more reliable for ignition. That one simply measures how long we work vs wait for work in plain monotonic time, so when this one is close to 100% it means the main thread is spending no time waiting for more work, so we can ignite.
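A rough sketch of what an ignition check based on active time (rather than CPU sys time) could look like follows. The function names and the 90% threshold are hypothetical and are not taken from the PR or from the used_active_time_main_thread implementation; the inputs are assumed to be microsecond counts collected over one sampling window.

```c
#include <assert.h>
#include <stdbool.h>

#define DEMO_IGNITION_ACTIVE_PCT 90.0 /* hypothetical threshold */

/* Percentage of a sampling window the main thread spent working rather than
 * waiting for work, both measured in monotonic microseconds. */
static double demoActivePercent(long long active_us, long long window_us) {
    assert(window_us > 0 && active_us >= 0 && active_us <= window_us);
    return 100.0 * (double)active_us / (double)window_us;
}

/* Ignite the first IO thread when the main thread is almost never idle. */
static bool demoShouldIgnite(long long active_us, long long window_us) {
    return demoActivePercent(active_us, window_us) >= DEMO_IGNITION_ACTIVE_PCT;
}
```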
zuiderkwast
left a comment
Hi Uri! It's a great PR. Sorry for the delay.
This is an incomplete review. I had draft comments on this PR that needed to be posted.
| /* If threads are not active track main thread CPU time for ignition decision */ | ||
| if (server.active_io_threads_num == 1) { | ||
| static long long last_measurement_time = 0; | ||
| if (current_time - last_measurement_time < 50000) return; /* Sample once in 50ms */ | ||
| last_measurement_time = current_time; | ||
| struct rusage ru; | ||
| if (getrusage(RUSAGE_THREAD, &ru) == 0) { |
There is a new metric that's just using monotonic time to measure the time the main thread is trying to work vs waiting for work: #2931. When IO threads are disabled, this is essentially the total time minus the time spent in blocking epoll.
We could use that (possibly transformed into an instantaneous metric) instead of CPU usage for ignition. It is cheaper and I guess it can also be more reliable. CPU usage can be low even if the thread is busy, if we're waiting for blocking IO, if there is preemption happening, DEBUG SLEEP and some other situations.
I mean we store it as an instantaneous metric internally if you need it over the last 1.6 seconds, but we don't need to expose a new INFO field for it.
@valkey-io/core-team We discussed the metrics in the core team meeting. We thought the new fields looked OK and the architecture made sense. Others, please take a look at the overall architecture if you have any concerns.
The on-demand benchmark would not work as the branch has conflicts with unstable and we are checking out
Replaced by #3324
Summary
This PR redesigns the IO threading communication model, replacing the inefficient client-list polling approach with a high-performance, lock-free queue architecture. This change improves throughput by 8–17% across various workloads and lays the groundwork for offloading command execution to IO threads in following PRs.
Performance Comparison: Unstable vs New IO Queues
1 Amazon terminology for cluster mode
2 Amazon terminology for standalone mode, i.e. config cluster-enabled no
Motivation
The previous IO model had several limitations that created performance bottlenecks:
The Solution
To address these inefficiencies, this PR replaces the single SPSC queue used currently with three specialized queues to handle communication and load balancing more effectively.
1. Main > IO: Shared Queue (Single Producer Multi Consumer)
Single queue from the main-thread to IO threads.
2. IO > Main: The Response Channel (MPSC Queue)
We replaced the old polling loop with a response queue.
3. MAIN > IO (Thread-Specific): Private Inbox (SPSC Queue)
We kept the existing Single-Producer Single-Consumer (SPSC) queues for tasks that must happen on a specific thread (like freeing memory allocated by that thread). IO threads always check their private inbox before looking at the shared queue.
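The polling order described above (private inbox first, then the shared queue) can be sketched as follows. The FIFO here is a plain single-threaded stand-in used for both queue types, and every name is hypothetical; the real queues are the lock-free structures this PR adds.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_FIFO_CAP 8

/* Plain single-threaded FIFO used as a stand-in for both queue types. */
typedef struct demoFifo {
    void *items[DEMO_FIFO_CAP];
    size_t head, tail;
} demoFifo;

static void demoFifoPush(demoFifo *f, void *job) {
    assert(f->tail - f->head < DEMO_FIFO_CAP);
    f->items[f->tail % DEMO_FIFO_CAP] = job;
    f->tail++;
}

static void *demoFifoPop(demoFifo *f) {
    if (f->head == f->tail) return NULL;
    void *job = f->items[f->head % DEMO_FIFO_CAP];
    f->head++;
    return job;
}

/* Thread-affine jobs in the private inbox are served before shared work. */
static void *demoNextJob(demoFifo *private_inbox, demoFifo *shared_queue) {
    void *job = demoFifoPop(private_inbox);
    if (job != NULL) return job;
    return demoFifoPop(shared_queue);
}
```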
Changes Required
Async client release
The main thread no longer busy-waits for IO threads to finish with a client. Since the client must be popped from the multi-producer queue before it can be released, clients with pending IO are now marked for asynchronous closure.
eviction clients logic
Updated evictClients() to account for memory pending release (clients marked close_asap). freeClient() now returns a status code (1 for freed, 0 for async-close) to ensure the eviction loop does not over-evict by ignoring memory that is about to be reclaimed.
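A simplified sketch of this accounting is given below. The demoClient struct, its fields, and both functions are hypothetical stand-ins; only the idea (count memory of clients already marked close_asap, and use the return value of the free call to tell freed from deferred) follows the description above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct demoClient {
    size_t mem_usage;
    bool close_asap; /* already scheduled for asynchronous close */
    bool pending_io; /* an IO thread still holds this client */
    bool freed;
} demoClient;

/* Returns 1 if the client was freed now, 0 if it was only marked for async close. */
static int demoFreeClient(demoClient *c) {
    if (c->pending_io) {
        c->close_asap = true;
        return 0;
    }
    c->freed = true;
    return 1;
}

/* Evict until used memory, minus memory already pending release, fits the limit.
 * Returns the number of clients freed or newly scheduled for close. */
static size_t demoEvictClients(demoClient *clients, size_t n, size_t used, size_t limit) {
    assert(clients != NULL || n == 0);
    size_t pending_freed = 0, touched = 0;
    for (size_t i = 0; i < n && used - pending_freed > limit; i++) {
        demoClient *c = &clients[i];
        if (c->freed) continue;
        if (c->close_asap) {
            /* Already scheduled to close. Count memory as freed and skip. */
            pending_freed += c->mem_usage;
            continue;
        }
        size_t mem = c->mem_usage;
        if (demoFreeClient(c)) used -= mem;
        else pending_freed += mem;
        touched++;
    }
    return touched;
}
```

Without the pending_freed term, the loop in this sketch would keep evicting further clients even though the deferred ones are about to release enough memory.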
events-per-io-thread config
Replaced the events-per-io-thread configuration with io-threads-always-active, as we no longer track events. Since this config is used only for tests, no backward compatibility issue arises.
packed job instead of handlers
Jobs are now represented as tagged pointers (using the lower 3 bits for the job type) instead of separate {handler, data} structs. This reduces memory overhead and allows jobs to be passed through the queues as single pointers.
head caching in spsc queue
The SPSC queue now caches the head index on the producer side (head_cache) to avoid frequent atomic loads. The producer only refreshes from the atomic head when the cache indicates the queue might be full, reducing cross-thread cache-line bouncing.
deferred commit in SPSC queue
spscEnqueue() supports batching via a commit flag. Multiple jobs can be enqueued with commit=false, then flushed with a single spscCommit() call, reducing atomic operations and cache-line bouncing.
rollback on fullness check failure
When spmcEnqueue() fails due to a full queue, the client state is rolled back (e.g., io_write_state reset to CLIENT_IDLE). This rollback approach removes the need to call an expensive isFull check before every enqueue; we just attempt the enqueue and revert if it fails.
epoll offloading via SPSC at high thread counts
When active_io_threads_num > 9, poll jobs are sent to per-thread SPSC queues (round-robin). Since threads check their private queue first, this ensures poll jobs are processed promptly without waiting behind jobs in the shared SPMC queue.
avoid offload write before read comes back
Added a check if (c->io_read_state == CLIENT_PENDING_IO) return C_OK in trySendWriteToIOThreads(). In the previous per-thread SPSC implementation, we could send consecutive read and write jobs for the same client knowing a single thread would handle them in order. With the shared SPMC queue, different threads may pick up the jobs, so we must wait for the read to complete before sending a write, to avoid two threads handling the same client.
removing pending_read_list_node from client and clients_pending_io_read/write lists from server
Removed pending_read_list_node from the client struct and the clients_pending_io_read/clients_pending_io_write lists from valkeyServer, as the new MPSC queue eliminates the need for these tracking structures.
added inst metrics for pending io jobs
Added the instantaneous_io_pending_jobs metric via STATS_METRIC_IO_WAIT to track average queue depth over time.
added stat for current active threads number
Added active_io_threads_num to the INFO stats output for better visibility.
added internal inst metric for main-thread cpu (non apple compliant)
Added STATS_METRIC_MAIN_THREAD_CPU_SYS to track main thread CPU usage via getrusage(RUSAGE_THREAD). This powers the "ignition" policy: when CPU exceeds 30%, the first IO thread is activated. RUSAGE_THREAD is Linux-specific, so macOS falls back to event-count heuristics.
added stat for pending read and writes for io
Added io_threaded_reads_pending and io_threaded_writes_pending stats to track how many read/write jobs are currently in flight to IO threads.
added volatile for crashed
Changed server.crashed from int to volatile int to ensure the crash flag is visible across threads immediately, allowing IO threads to detect a crash and stop sending responses back to the main thread, which avoids a deadlock on crash.
Co-authored-by: Dan Touitou <dan.touitou@gmail.com>
Signed-off-by: Uri Yagelnik <uriy@amazon.com>