Revert "IO-Threads redesign cleanup work (#3544)"#3756
Conversation
📝 WalkthroughWalkthroughFixed-size queue infrastructure replaces per-instance sizing with compile-time macros; all queue operations become ChangesQueue Infrastructure and IO Thread Refactoring
Client Eviction with Pending Memory Accounting
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
This reverts commit fdd9039. Signed-off-by: akash kumar <akumdev@amazon.com>
3f55f69 to
a828e50
Compare
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/io_threads.c`:
- Around line 129-141: The IOThreadsBeforeSleep hot path currently calls
getrusage(RUSAGE_THREAD) and trackInstantaneousMetric directly (guarded by
RUSAGE_THREAD and server.active_io_threads_num), which adds syscalls/metrics to
every event-loop iteration; remove that sampling from IOThreadsBeforeSleep and
instead implement a non-hot-path periodic sampler (e.g., a new background
function like SampleMainThreadCpuUsage or ScheduleMainThreadCpuSampler) that
calls getrusage(RUSAGE_THREAD) on a timer, updates cached variables (e.g.,
cached_main_thread_sys_us, cached_main_thread_user_us and
cached_main_thread_ts), and have IOThreadsBeforeSleep simply read those cached
values and call trackInstantaneousMetric (or better, let the sampler call
trackInstantaneousMetric) so the event-loop hot path no longer performs
getrusage or metric collection; alternatively revert IOThreadsBeforeSleep to the
previous non-CPU ignition heuristic until a zero-overhead benchmark is
available.
In `@src/networking.c`:
- Around line 6531-6555: The loop can underflow when subtracting pending_freed
(size_t) from server.stat_clients_type_memory[...] if pending_freed > total;
change the loop condition to compute a safe remaining value first (e.g. size_t
total = server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]; size_t remaining = total >
pending_freed ? total - pending_freed : 0;) and then compare remaining >
client_eviction_limit; update any uses that assumed the subtraction in the
condition so pending_freed and getClientMemoryUsage(c, NULL) and freeClient(c)
logic remain unchanged but protected from unsigned underflow.
In `@src/queues.c`:
- Line 12: The exported queue functions (e.g., mpscInit) are defined as
non-static inline in src/queues.c which does not provide an external symbol for
other translation units; remove the inline keyword from all exported queue
function definitions (such as mpscInit and any other public APIs implemented in
this file) so they become normal out-of-line definitions, or alternatively move
their implementations into src/queues.h as static inline — apply the chosen fix
consistently to every exported queue API in this file.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 8dc6a26f-e158-4467-a912-9bf61e3939c2
📒 Files selected for processing (8)
src/Makefilesrc/io_threads.csrc/io_threads.hsrc/networking.csrc/queues.csrc/queues.hsrc/server.hsrc/unit/test_queues.cpp
| #ifdef RUSAGE_THREAD | ||
| /* If threads are not active track main thread CPU time for ignition decision */ | ||
| if (server.active_io_threads_num == 1) { | ||
| static long long last_measurement_time = 0; | ||
| if (current_time - last_measurement_time < 50000) return; /* Sample once in 50ms */ | ||
| last_measurement_time = current_time; | ||
| trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_ACTIVE_TIME, server.stat_active_time, current_time, 1000000); | ||
| struct rusage ru; | ||
| if (getrusage(RUSAGE_THREAD, &ru) == 0) { | ||
| long long sys_time_us = ru.ru_stime.tv_sec * 1000000LL + ru.ru_stime.tv_usec; | ||
| long long user_time_us = ru.ru_utime.tv_sec * 1000000LL + ru.ru_utime.tv_usec; | ||
| trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS, sys_time_us, current_time, 1000000); | ||
| trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_USER, user_time_us, current_time, 1000000); | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Find the IOThreadsBeforeSleep function
rg -A 20 "IOThreadsBeforeSleep" src/io_threads.c | head -40Repository: valkey-io/valkey
Length of output: 878
🏁 Script executed:
# Check the structure and context of src/io_threads.c around lines 129-141
sed -n '100,160p' src/io_threads.cRepository: valkey-io/valkey
Length of output: 2564
🏁 Script executed:
# Look for DEVELOPMENT_GUIDE.md to understand the exact coding guideline
find . -name "DEVELOPMENT_GUIDE.md" -o -name "DEVELOPMENT.md" | head -5Repository: valkey-io/valkey
Length of output: 83
🏁 Script executed:
# Check git history for recent changes to src/io_threads.c
git log --oneline -20 -- src/io_threads.cRepository: valkey-io/valkey
Length of output: 114
🏁 Script executed:
# Search for other hot-path functions calling this code to understand execution frequency
rg "IOThreadsBeforeSleep" --type cRepository: valkey-io/valkey
Length of output: 250
🏁 Script executed:
# Read DEVELOPMENT_GUIDE.md to find the hot-path metrics guideline
grep -A 3 -B 3 "hot path" DEVELOPMENT_GUIDE.mdRepository: valkey-io/valkey
Length of output: 42
🏁 Script executed:
# Check the event loop in server.c to understand call frequency
rg -B 5 "IOThreadsBeforeSleep" src/server.c | head -30Repository: valkey-io/valkey
Length of output: 257
🏁 Script executed:
# Get more git history context around iothreads work
git log --oneline --all | grep -i "iothreads\|rusage" | head -15Repository: valkey-io/valkey
Length of output: 114
🏁 Script executed:
# Check if there are any benchmark files or performance regression notes in recent commits
git log -5 --name-only | grep -i "benchmark\|perf\|regression"Repository: valkey-io/valkey
Length of output: 424
🏁 Script executed:
# Check if the RUSAGE_THREAD code is new (look at git blame or check when this ifdef block was added)
git log -p --all -- src/io_threads.c | grep -A 10 "RUSAGE_THREAD" | head -50Repository: valkey-io/valkey
Length of output: 42
🏁 Script executed:
# Read DEVELOPMENT_GUIDE.md to find the hot-path metrics guideline
cat DEVELOPMENT_GUIDE.md | grep -A 5 -B 5 "metric"Repository: valkey-io/valkey
Length of output: 42
🏁 Script executed:
# Also check for the exact phrase from the coding guidelines provided
cat DEVELOPMENT_GUIDE.md | grep -i "zero-overhead"Repository: valkey-io/valkey
Length of output: 42
🏁 Script executed:
# Get the full content of DEVELOPMENT_GUIDE.md if it's small enough
wc -l DEVELOPMENT_GUIDE.mdRepository: valkey-io/valkey
Length of output: 84
🏁 Script executed:
# Check when RUSAGE_THREAD support was added to this file
git log --all --pretty=format:"%h %s" -- src/io_threads.c | head -20Repository: valkey-io/valkey
Length of output: 1282
🏁 Script executed:
# Check the current branch and recent work on iothreads
git log --oneline -10Repository: valkey-io/valkey
Length of output: 114
🏁 Script executed:
# Read the full DEVELOPMENT_GUIDE.md
cat DEVELOPMENT_GUIDE.mdRepository: valkey-io/valkey
Length of output: 4863
🏁 Script executed:
# Check if RUSAGE_THREAD is a new addition by looking at the diff in recent commits
git show fdd9039 -- src/io_threads.c | grep -A 20 "RUSAGE_THREAD" | head -40Repository: valkey-io/valkey
Length of output: 1576
🏁 Script executed:
# Check the merge commit context to understand the revert situation
git show 9be2193 --stat | head -30Repository: valkey-io/valkey
Length of output: 1753
🏁 Script executed:
# Check the current state of lines 129-141 in src/io_threads.c
sed -n '120,155p' src/io_threads.cRepository: valkey-io/valkey
Length of output: 1629
🏁 Script executed:
# Also search for server.stat_active_time to see if that's being used instead
rg "stat_active_time" src/io_threads.cRepository: valkey-io/valkey
Length of output: 42
Remove getrusage() CPU sampling from the event-loop hot path.
The code adds a syscall and two metric-tracking calls to IOThreadsBeforeSleep(), which is called on every event-loop iteration. Even with 50ms sampling, this violates the coding guideline: "Do not add new metrics on hot paths without providing zero-overhead proof." Move this sampling to a non-hot-path periodic task and consume the cached result here, or revert to the existing non-CPU ignition heuristic until benchmarks demonstrate zero-overhead impact.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/io_threads.c` around lines 129 - 141, The IOThreadsBeforeSleep hot path
currently calls getrusage(RUSAGE_THREAD) and trackInstantaneousMetric directly
(guarded by RUSAGE_THREAD and server.active_io_threads_num), which adds
syscalls/metrics to every event-loop iteration; remove that sampling from
IOThreadsBeforeSleep and instead implement a non-hot-path periodic sampler
(e.g., a new background function like SampleMainThreadCpuUsage or
ScheduleMainThreadCpuSampler) that calls getrusage(RUSAGE_THREAD) on a timer,
updates cached variables (e.g., cached_main_thread_sys_us,
cached_main_thread_user_us and cached_main_thread_ts), and have
IOThreadsBeforeSleep simply read those cached values and call
trackInstantaneousMetric (or better, let the sampler call
trackInstantaneousMetric) so the event-loop hot path no longer performs
getrusage or metric collection; alternatively revert IOThreadsBeforeSleep to the
previous non-CPU ignition heuristic until a zero-overhead benchmark is
available.
| /* Variable to track memory of clients marked for close but not yet freed */ | ||
| size_t pending_freed = 0; | ||
|
|
||
| while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] + | ||
| server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] > | ||
| server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] - | ||
| pending_freed > | ||
| client_eviction_limit) { | ||
| listNode *ln = listNext(&bucket_iter); | ||
| if (ln) { | ||
| client *c = ln->value; | ||
| if (c->flag.close_asap) { | ||
| /* Already scheduled to close. Count memory as freed and skip. */ | ||
| pending_freed += getClientMemoryUsage(c, NULL); | ||
| continue; | ||
| } | ||
| sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log); | ||
| serverLog(LL_NOTICE, "Evicting client: %s", ci); | ||
| if (freeClient(c)) server.stat_evictedclients++; | ||
| sdsfree(ci); | ||
| server.stat_evictedclients++; | ||
|
|
||
| if (freeClient(c) == 0) { | ||
| /* Protected client (async close). Count memory as freed and skip. */ | ||
| pending_freed += getClientMemoryUsage(c, NULL); | ||
| continue; | ||
| } |
There was a problem hiding this comment.
Avoid unsigned underflow in the pending-memory check.
Line 6534 subtracts pending_freed from a size_t total, but pending_freed is populated from getClientMemoryUsage() while the left-hand side comes from server.stat_clients_type_memory[...]. Those are not guaranteed to stay identical, so pending_freed can exceed the tracked sum and wrap the subtraction, causing over-eviction.
Suggested fix
- size_t pending_freed = 0;
+ size_t pending_freed = 0;
- while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
- server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] -
- pending_freed >
- client_eviction_limit) {
+ while (1) {
+ size_t tracked_mem = server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
+ server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB];
+ size_t effective_mem = tracked_mem > pending_freed ?
+ tracked_mem - pending_freed : 0;
+ if (effective_mem <= client_eviction_limit) break;
+
listNode *ln = listNext(&bucket_iter);
if (ln) {
client *c = ln->value;
if (c->flag.close_asap) {
/* Already scheduled to close. Count memory as freed and skip. */
- pending_freed += getClientMemoryUsage(c, NULL);
+ pending_freed += c->last_memory_usage;
continue;
}
sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log);
serverLog(LL_NOTICE, "Evicting client: %s", ci);
sdsfree(ci);
server.stat_evictedclients++;
if (freeClient(c) == 0) {
/* Protected client (async close). Count memory as freed and skip. */
- pending_freed += getClientMemoryUsage(c, NULL);
+ pending_freed += c->last_memory_usage;
continue;
}
} else {📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| /* Variable to track memory of clients marked for close but not yet freed */ | |
| size_t pending_freed = 0; | |
| while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] + | |
| server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] > | |
| server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] - | |
| pending_freed > | |
| client_eviction_limit) { | |
| listNode *ln = listNext(&bucket_iter); | |
| if (ln) { | |
| client *c = ln->value; | |
| if (c->flag.close_asap) { | |
| /* Already scheduled to close. Count memory as freed and skip. */ | |
| pending_freed += getClientMemoryUsage(c, NULL); | |
| continue; | |
| } | |
| sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log); | |
| serverLog(LL_NOTICE, "Evicting client: %s", ci); | |
| if (freeClient(c)) server.stat_evictedclients++; | |
| sdsfree(ci); | |
| server.stat_evictedclients++; | |
| if (freeClient(c) == 0) { | |
| /* Protected client (async close). Count memory as freed and skip. */ | |
| pending_freed += getClientMemoryUsage(c, NULL); | |
| continue; | |
| } | |
| /* Variable to track memory of clients marked for close but not yet freed */ | |
| size_t pending_freed = 0; | |
| while (1) { | |
| size_t tracked_mem = server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] + | |
| server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]; | |
| size_t effective_mem = tracked_mem > pending_freed ? | |
| tracked_mem - pending_freed : 0; | |
| if (effective_mem <= client_eviction_limit) break; | |
| listNode *ln = listNext(&bucket_iter); | |
| if (ln) { | |
| client *c = ln->value; | |
| if (c->flag.close_asap) { | |
| /* Already scheduled to close. Count memory as freed and skip. */ | |
| pending_freed += c->last_memory_usage; | |
| continue; | |
| } | |
| sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log); | |
| serverLog(LL_NOTICE, "Evicting client: %s", ci); | |
| sdsfree(ci); | |
| server.stat_evictedclients++; | |
| if (freeClient(c) == 0) { | |
| /* Protected client (async close). Count memory as freed and skip. */ | |
| pending_freed += c->last_memory_usage; | |
| continue; | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/networking.c` around lines 6531 - 6555, The loop can underflow when
subtracting pending_freed (size_t) from server.stat_clients_type_memory[...] if
pending_freed > total; change the loop condition to compute a safe remaining
value first (e.g. size_t total =
server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]; size_t remaining = total >
pending_freed ? total - pending_freed : 0;) and then compare remaining >
client_eviction_limit; update any uses that assumed the subtraction in the
condition so pending_freed and getClientMemoryUsage(c, NULL) and freeClient(c)
logic remain unchanged but protected from unsigned underflow.
| assert((queue_size & (queue_size - 1)) == 0); | ||
| q->buffer = (_Atomic(void *) *)zmalloc(sizeof(_Atomic(void *)) * queue_size); | ||
| q->queue_size = queue_size; | ||
| inline void mpscInit(mpscQueue *q) { |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
In C11/GNU11, if a function is declared in a header and defined as non-static inlineonly in a.c file, does that provide the external definition needed by callers in other translation units, or is a separate non-inline definition required?
💡 Result:
In C11 and GNU11, a non-static inline function definition in a.c file (without the extern keyword) is considered an inline definition [1][2]. An inline definition does not provide an external definition for the function [1][2]. If a function is declared inline in a header and defined as non-static inline in a.c file, that definition is not externally visible to other translation units [1][2]. Consequently, if the compiler chooses not to inline the function in other translation units, the linker will fail to find an external definition, resulting in an undefined reference error [3][4]. To provide the necessary external definition, you must ensure that at least one translation unit provides a non-inline, non-static definition of the function [1][2]. This is typically achieved by: 1. Providing a separate, non-inline definition of the function in a.c file (e.g., by removing the inline keyword from the definition in that specific file) [3][5]. 2. Alternatively, using an explicit extern declaration in one translation unit (e.g., extern inline void func(void);) to force the generation of an external definition [1][2]. Note that GNU C (prior to C99/C11 standards compliance) historically treated the inline keyword differently, but modern GCC defaults to standard C99/C11 semantics unless -fgnu89-inline is specified [6]. Under standard C11 rules, the inline specifier is primarily a hint for optimization and does not automatically generate an externally linkable symbol [3][1].
Citations:
- 1: https://en.cppreference.com/c/language/inline
- 2: https://www.humprog.org/~stephen/blog/devel/understanding-c99-inlines.html
- 3: https://stackoverflow.com/questions/16245521/c99-inline-function-in-c-file/16245669
- 4: https://stackoverflow.com/questions/62475308/in-c-should-inline-functions-in-headers-be-externed-in-the-c-file
- 5: https://stackoverflow.com/questions/79752803/inline-non-static-function-in-c-source-file
- 6: https://stackoverflow.com/questions/34937816/inline-static-extern-in-c99
Remove inline from all exported queue function definitions.
These functions are declared in src/queues.h and called from other translation units (e.g., src/unit/test_queues.cpp). Under C11/GNU11 semantics, defining them as non-static inline only in src/queues.c does not provide an external definition to callers in other translation units—the linker will fail with an undefined reference if the compiler chooses not to inline. Either keep them as normal out-of-line definitions (remove inline), or move static inline implementations into src/queues.h.
Example fix
-inline void mpscInit(mpscQueue *q) {
+void mpscInit(mpscQueue *q) {
q->buffer = (_Atomic(void *) *)zmalloc(sizeof(_Atomic(void *)) * MPSC_QUEUE_SIZE);
...
}
-inline bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) {
+bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) {
...
}
-inline void spmcInit(spmcQueue *q) {
+void spmcInit(spmcQueue *q) {
...
}
-inline void spscInit(spscQueue *q) {
+void spscInit(spscQueue *q) {
...
}Apply to all exported queue APIs in this file.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| inline void mpscInit(mpscQueue *q) { | |
| void mpscInit(mpscQueue *q) { |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/queues.c` at line 12, The exported queue functions (e.g., mpscInit) are
defined as non-static inline in src/queues.c which does not provide an external
symbol for other translation units; remove the inline keyword from all exported
queue function definitions (such as mpscInit and any other public APIs
implemented in this file) so they become normal out-of-line definitions, or
alternatively move their implementations into src/queues.h as static inline —
apply the chosen fix consistently to every exported queue API in this file.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## unstable #3756 +/- ##
============================================
- Coverage 76.93% 76.77% -0.17%
============================================
Files 162 162
Lines 80695 80697 +2
============================================
- Hits 62083 61954 -129
- Misses 18612 18743 +131
🚀 New features to boost your workflow:
|
This regression is still present in 9.1 GA as the cherrypick of revert commit was missed during release. Re-applies #3544 (reverted in #3756 due to ~20% SET regression) with the performance fix from #3760. **Root Cause:** The original #3544 changed `tryOffloadFreeObjToIOThreads` to only offload the SDS buffer free to IO threads, freeing the `robj` shell on the main thread. I carried out profiling for the change and it showed that freeing the `robj` shell on the main thread became the prime main-thread hotspot (~10% CPU), while IO threads shifted from doing real `jemalloc` work to spinning idle on `spmcDequeue`. **Fix**: Keep `tryOffloadFreeObjToIOThreads` offloading the entire robj (`decrRefCount`) to the IO thread. Cross-thread `zfree` is safe with `jemalloc`. This PR includes all cleanup work from #3544 so - - `trySendWriteToIOThreads`: defer clearing `last_header` until after successful enqueue - `evictClients`: simplified bookkeeping - Queue sizes as runtime parameters instead of compile-time macros - IO ignition policy using `stat_active_time` instead of `getrusage` - Function renames (`IOThreadFreeArgv` --> `ioThreadFreeArgv`, etc.) and doc comments **Benchmark** on (Graviton4 c8gb.metal-48xl): Config: SET, 128B values, 9 IO threads, pipeline=10, 1600 clients - Same as Valkey official method | Version | Throughput | |---------|-----------| | Unstable + original #3544 | ~1,554K rps | | Unstable + this PR | ~2,116K rps | <details> <summary>Diff vs original #3544 (perf fix)</summary> ```diff diff --git a/src/io_threads.c b/src/io_threads.c --- a/src/io_threads.c +++ b/src/io_threads.c @@ // IO thread handler case JOB_REQ_FREE_OBJ: - zfree(data); + decrRefCount(data); break; @@ // tryOffloadFreeObjToIOThreads - /* We offload only the free of the ptr that may be allocated by the I/O thread. - * The object itself was allocated by the main thread and will be freed by the main thread. */ - void *job = tagJob(sdsAllocPtr(objectGetVal(obj)), JOB_REQ_FREE_OBJ); + void *job = tagJob(obj, JOB_REQ_FREE_OBJ); if (unlikely(spmcEnqueue(&io_shared_inbox, job) == false)) return C_ERR; - objectSetVal(obj, NULL); - decrRefCount(obj); io_jobs_submitted++; ``` </details> --------- Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
This reverts the commit that was merged as part of the PR #3544 due to a performance regression observed here