Revert "IO-Threads redesign cleanup work (#3544)" #3756

Merged
madolson merged 2 commits into valkey-io:unstable from akashkgit:iothreads-revert
May 18, 2026

Conversation

@akashkgit

Contributor

This reverts the commit (fdd9039) that was merged as part of PR #3544, due to a performance regression observed in #3750.

@coderabbitai

coderabbitai Bot commented May 18, 2026

📝 Walkthrough

Fixed-size queue infrastructure replaces per-instance sizing with compile-time macros; all queue operations become inline. IO threads integrate these queues, rename handlers, implement CPU-based ignition via RUSAGE_THREAD, and refine free-object job handling. Client eviction now tracks pending-freed memory to prevent over-eviction. Metrics enum splits main-thread active-time into separate CPU sys/user counters.

Changes

Queue Infrastructure and IO Thread Refactoring

| Layer / File(s) | Summary |
|---|---|
| Queue infrastructure contracts and fixed-size design (`src/queues.h`) | Queue structs remove per-instance `queue_size` fields. MPSC, SPMC, and SPSC queues define compile-time `*_QUEUE_SIZE` and `*_QUEUE_MASK` macros with power-of-two validation. Init function signatures accept only queue pointers. |
| Queue operations implementation with fixed sizing and inlining (`src/queues.c`) | All queue operations become inline. MPSC, SPMC, and SPSC enqueue/dequeue/init/free functions replace runtime `q->queue_size` with fixed `*_QUEUE_SIZE` and `*_QUEUE_MASK` constants. Memory ordering and atomics remain preserved. |
| IO thread header updates: assertions and job handler typedef (`src/io_threads.h`) | Compile-time assertions for `JOB_REQ_COUNT` and `JOB_RES_COUNT` are tightened for pointer arithmetic safety. New `job_handler` function-pointer typedef is added. |
| Main thread CPU metrics for ignition decisions (`src/server.h`) | `instantaneous_metric_type` enum replaces `STATS_METRIC_MAIN_THREAD_ACTIVE_TIME` with separate `STATS_METRIC_MAIN_THREAD_CPU_SYS` and `STATS_METRIC_MAIN_THREAD_CPU_USER` for CPU-based ignition. |
| IO threads: fixed-size queue integration, CPU-based ignition, handler renames (`src/io_threads.c`) | Local queue-size macros are removed. Ignition logic samples CPU sys/user via `getrusage(RUSAGE_THREAD, ...)` when available, falling back to a `numevents` threshold. Handlers are renamed `IOThreadPoll` and `IOThreadFreeArgv`. Free-object jobs enqueue the object pointer directly instead of a separate SDS pointer; the IO thread side calls `decrRefCount` instead of `zfree`. Write-offload reply-buffer header-reset logic is refactored for encoded buffers. |
| Queue tests updated for fixed sizing (`src/unit/test_queues.cpp`) | Test fixtures remove local queue-size constants and call init functions without size arguments, relying on header-defined fixed sizes. |
| Build configuration: queues.o reordering (`src/Makefile`) | `queues.o` is reordered within `ENGINE_SERVER_OBJ` for proper dependency ordering. |
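
The fixed-size design summarized in the queue rows above can be illustrated with a small single-producer/single-consumer ring. This is only a sketch under stated assumptions, not the code in `src/queues.h`: the names `DEMO_QUEUE_SIZE`, `DEMO_QUEUE_MASK`, and `demoSpsc*` are hypothetical, and the slots live in an inline array rather than a heap-allocated buffer. It shows the two ideas the summary names: a compile-time power-of-two check, and index wrapping with a constant mask instead of a per-instance size field.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical compile-time sizing; the real macro names and values may differ. */
#define DEMO_QUEUE_SIZE 8u
#define DEMO_QUEUE_MASK (DEMO_QUEUE_SIZE - 1u)
static_assert((DEMO_QUEUE_SIZE & DEMO_QUEUE_MASK) == 0,
              "queue size must be a power of two");

typedef struct {
    _Atomic size_t head;           /* next slot to read, owned by the consumer */
    _Atomic size_t tail;           /* next slot to write, owned by the producer */
    void *slots[DEMO_QUEUE_SIZE];  /* no per-instance size field is needed */
} demoSpscQueue;

/* Init takes only the queue pointer because the size is a constant. */
static void demoSpscInit(demoSpscQueue *q) {
    atomic_init(&q->head, 0);
    atomic_init(&q->tail, 0);
}

/* Producer side: returns false when the ring is full. */
static bool demoSpscEnqueue(demoSpscQueue *q, void *item) {
    size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&q->head, memory_order_acquire);
    if (tail - head == DEMO_QUEUE_SIZE) return false;
    q->slots[tail & DEMO_QUEUE_MASK] = item; /* constant mask, no modulo */
    atomic_store_explicit(&q->tail, tail + 1, memory_order_release);
    return true;
}

/* Consumer side: returns NULL when the ring is empty. */
static void *demoSpscDequeue(demoSpscQueue *q) {
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (head == tail) return NULL;
    void *item = q->slots[head & DEMO_QUEUE_MASK];
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return item;
}
```

With a constant mask the compiler can fold the wrap into a single AND, which is the usual motivation for this layout; whether that matters in practice depends on the workload and would need a benchmark.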

Client Eviction with Pending Memory Accounting

| Layer / File(s) | Summary |
|---|---|
| Client eviction with pending_freed accumulator (`src/networking.c`) | `evictClients()` introduces `pending_freed` to track client memory scheduled for async closure. The loop condition subtracts pending from active memory before deciding whether to continue evicting. Clients marked `close_asap` are skipped while their memory is added to pending. Async-freed clients also increment pending via `getClientMemoryUsage()` instead of assuming immediate freeing. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 22.86% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title accurately summarizes the main change: reverting a previous IO-Threads redesign cleanup work commit. |
| Description check | ✅ Passed | The description clearly explains the purpose of the revert, referencing the specific commit and issue related to the performance regression. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


This reverts commit fdd9039.

Signed-off-by: akash kumar <akumdev@amazon.com>

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/io_threads.c`:
- Around line 129-141: The IOThreadsBeforeSleep hot path currently calls
getrusage(RUSAGE_THREAD) and trackInstantaneousMetric directly (guarded by
RUSAGE_THREAD and server.active_io_threads_num), which adds syscalls/metrics to
every event-loop iteration; remove that sampling from IOThreadsBeforeSleep and
instead implement a non-hot-path periodic sampler (e.g., a new background
function like SampleMainThreadCpuUsage or ScheduleMainThreadCpuSampler) that
calls getrusage(RUSAGE_THREAD) on a timer, updates cached variables (e.g.,
cached_main_thread_sys_us, cached_main_thread_user_us and
cached_main_thread_ts), and have IOThreadsBeforeSleep simply read those cached
values and call trackInstantaneousMetric (or better, let the sampler call
trackInstantaneousMetric) so the event-loop hot path no longer performs
getrusage or metric collection; alternatively revert IOThreadsBeforeSleep to the
previous non-CPU ignition heuristic until a zero-overhead benchmark is
available.

In `@src/networking.c`:
- Around line 6531-6555: The loop can underflow when subtracting pending_freed
(size_t) from server.stat_clients_type_memory[...] if pending_freed > total;
change the loop condition to compute a safe remaining value first (e.g. size_t
total = server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]; size_t remaining = total >
pending_freed ? total - pending_freed : 0;) and then compare remaining >
client_eviction_limit; update any uses that assumed the subtraction in the
condition so pending_freed and getClientMemoryUsage(c, NULL) and freeClient(c)
logic remain unchanged but protected from unsigned underflow.

In `@src/queues.c`:
- Line 12: The exported queue functions (e.g., mpscInit) are defined as
non-static inline in src/queues.c which does not provide an external symbol for
other translation units; remove the inline keyword from all exported queue
function definitions (such as mpscInit and any other public APIs implemented in
this file) so they become normal out-of-line definitions, or alternatively move
their implementations into src/queues.h as static inline — apply the chosen fix
consistently to every exported queue API in this file.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro Plus

Run ID: 8dc6a26f-e158-4467-a912-9bf61e3939c2

📥 Commits

Reviewing files that changed from the base of the PR and between 2946807 and 9be2193.

📒 Files selected for processing (8)
  • src/Makefile
  • src/io_threads.c
  • src/io_threads.h
  • src/networking.c
  • src/queues.c
  • src/queues.h
  • src/server.h
  • src/unit/test_queues.cpp

Comment thread src/io_threads.c
Comment on lines +129 to +141
+#ifdef RUSAGE_THREAD
     /* If threads are not active track main thread CPU time for ignition decision */
     if (server.active_io_threads_num == 1) {
         static long long last_measurement_time = 0;
         if (current_time - last_measurement_time < 50000) return; /* Sample once in 50ms */
         last_measurement_time = current_time;
-        trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_ACTIVE_TIME, server.stat_active_time, current_time, 1000000);
+        struct rusage ru;
+        if (getrusage(RUSAGE_THREAD, &ru) == 0) {
+            long long sys_time_us = ru.ru_stime.tv_sec * 1000000LL + ru.ru_stime.tv_usec;
+            long long user_time_us = ru.ru_utime.tv_sec * 1000000LL + ru.ru_utime.tv_usec;
+            trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_SYS, sys_time_us, current_time, 1000000);
+            trackInstantaneousMetric(STATS_METRIC_MAIN_THREAD_CPU_USER, user_time_us, current_time, 1000000);
+        }

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

🧩 Analysis chain

🏁 Script executed:

# Find the IOThreadsBeforeSleep function
rg -A 20 "IOThreadsBeforeSleep" src/io_threads.c | head -40

Repository: valkey-io/valkey

Length of output: 878


🏁 Script executed:

# Check the structure and context of src/io_threads.c around lines 129-141
sed -n '100,160p' src/io_threads.c

Repository: valkey-io/valkey

Length of output: 2564


🏁 Script executed:

# Look for DEVELOPMENT_GUIDE.md to understand the exact coding guideline
find . -name "DEVELOPMENT_GUIDE.md" -o -name "DEVELOPMENT.md" | head -5

Repository: valkey-io/valkey

Length of output: 83


🏁 Script executed:

# Check git history for recent changes to src/io_threads.c
git log --oneline -20 -- src/io_threads.c

Repository: valkey-io/valkey

Length of output: 114


🏁 Script executed:

# Search for other hot-path functions calling this code to understand execution frequency
rg "IOThreadsBeforeSleep" --type c

Repository: valkey-io/valkey

Length of output: 250


🏁 Script executed:

# Read DEVELOPMENT_GUIDE.md to find the hot-path metrics guideline
grep -A 3 -B 3 "hot path" DEVELOPMENT_GUIDE.md

Repository: valkey-io/valkey

Length of output: 42


🏁 Script executed:

# Check the event loop in server.c to understand call frequency
rg -B 5 "IOThreadsBeforeSleep" src/server.c | head -30

Repository: valkey-io/valkey

Length of output: 257


🏁 Script executed:

# Get more git history context around iothreads work
git log --oneline --all | grep -i "iothreads\|rusage" | head -15

Repository: valkey-io/valkey

Length of output: 114


🏁 Script executed:

# Check if there are any benchmark files or performance regression notes in recent commits
git log -5 --name-only | grep -i "benchmark\|perf\|regression"

Repository: valkey-io/valkey

Length of output: 424


🏁 Script executed:

# Check if the RUSAGE_THREAD code is new (look at git blame or check when this ifdef block was added)
git log -p --all -- src/io_threads.c | grep -A 10 "RUSAGE_THREAD" | head -50

Repository: valkey-io/valkey

Length of output: 42


🏁 Script executed:

# Read DEVELOPMENT_GUIDE.md to find the hot-path metrics guideline
cat DEVELOPMENT_GUIDE.md | grep -A 5 -B 5 "metric"

Repository: valkey-io/valkey

Length of output: 42


🏁 Script executed:

# Also check for the exact phrase from the coding guidelines provided
cat DEVELOPMENT_GUIDE.md | grep -i "zero-overhead"

Repository: valkey-io/valkey

Length of output: 42


🏁 Script executed:

# Get the full content of DEVELOPMENT_GUIDE.md if it's small enough
wc -l DEVELOPMENT_GUIDE.md

Repository: valkey-io/valkey

Length of output: 84


🏁 Script executed:

# Check when RUSAGE_THREAD support was added to this file
git log --all --pretty=format:"%h %s" -- src/io_threads.c | head -20

Repository: valkey-io/valkey

Length of output: 1282


🏁 Script executed:

# Check the current branch and recent work on iothreads
git log --oneline -10

Repository: valkey-io/valkey

Length of output: 114


🏁 Script executed:

# Read the full DEVELOPMENT_GUIDE.md
cat DEVELOPMENT_GUIDE.md

Repository: valkey-io/valkey

Length of output: 4863


🏁 Script executed:

# Check if RUSAGE_THREAD is a new addition by looking at the diff in recent commits
git show fdd9039 -- src/io_threads.c | grep -A 20 "RUSAGE_THREAD" | head -40

Repository: valkey-io/valkey

Length of output: 1576


🏁 Script executed:

# Check the merge commit context to understand the revert situation
git show 9be2193 --stat | head -30

Repository: valkey-io/valkey

Length of output: 1753


🏁 Script executed:

# Check the current state of lines 129-141 in src/io_threads.c
sed -n '120,155p' src/io_threads.c

Repository: valkey-io/valkey

Length of output: 1629


🏁 Script executed:

# Also search for server.stat_active_time to see if that's being used instead
rg "stat_active_time" src/io_threads.c

Repository: valkey-io/valkey

Length of output: 42


Remove getrusage() CPU sampling from the event-loop hot path.

The code adds a syscall and two metric-tracking calls to IOThreadsBeforeSleep(), which is called on every event-loop iteration. Even with 50ms sampling, this violates the coding guideline: "Do not add new metrics on hot paths without providing zero-overhead proof." Move this sampling to a non-hot-path periodic task and consume the cached result here, or revert to the existing non-CPU ignition heuristic until benchmarks demonstrate zero-overhead impact.
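
To make the cost under discussion concrete, the rate-limited sampling pattern in the snippet can be sketched in isolation. This is a minimal sketch, not Valkey code: `demoCpuSample`, `demoSampleCpuIfDue`, and `DEMO_SAMPLE_INTERVAL_US` are hypothetical names, and the fallback to `RUSAGE_SELF` on platforms without `RUSAGE_THREAD` is an assumption made only so the example compiles everywhere. Between samples the caller pays one subtraction and one comparison; the `getrusage()` syscall runs at most once per interval.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE /* RUSAGE_THREAD is a Linux extension */
#endif
#include <assert.h>
#include <sys/resource.h>
#include <sys/time.h>

#ifdef RUSAGE_THREAD
#define DEMO_RUSAGE_WHO RUSAGE_THREAD
#else
#define DEMO_RUSAGE_WHO RUSAGE_SELF /* assumption: fall back to the whole process */
#endif

#define DEMO_SAMPLE_INTERVAL_US 50000LL /* 50 ms, as in the snippet above */

/* Cached result of the most recent sample (hypothetical struct). */
typedef struct {
    long long last_sample_us; /* caller-supplied timestamp of the last sample */
    long long sys_us;         /* cumulative system CPU time, microseconds */
    long long user_us;        /* cumulative user CPU time, microseconds */
} demoCpuSample;

static long long demoTimevalToUs(struct timeval tv) {
    return (long long)tv.tv_sec * 1000000LL + tv.tv_usec;
}

/* Refreshes the cache at most once per interval.
 * Returns 1 if getrusage() was called and succeeded, 0 otherwise. */
static int demoSampleCpuIfDue(demoCpuSample *s, long long now_us) {
    if (now_us - s->last_sample_us < DEMO_SAMPLE_INTERVAL_US) return 0;
    s->last_sample_us = now_us;
    struct rusage ru;
    if (getrusage(DEMO_RUSAGE_WHO, &ru) != 0) return 0;
    s->sys_us = demoTimevalToUs(ru.ru_stime);
    s->user_us = demoTimevalToUs(ru.ru_utime);
    return 1;
}
```

A caller would invoke `demoSampleCpuIfDue(&sample, now_us)` once per loop iteration and read `sample.sys_us` and `sample.user_us` afterwards. Whether even this gate is acceptable on a hot path is the question the comment raises, and only a benchmark can answer it.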

Comment thread src/networking.c
Comment on lines +6531 to +6555
+    /* Variable to track memory of clients marked for close but not yet freed */
+    size_t pending_freed = 0;
+
     while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
-               server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] >
+               server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] -
+               pending_freed >
            client_eviction_limit) {
         listNode *ln = listNext(&bucket_iter);
         if (ln) {
             client *c = ln->value;
+            if (c->flag.close_asap) {
+                /* Already scheduled to close. Count memory as freed and skip. */
+                pending_freed += getClientMemoryUsage(c, NULL);
+                continue;
+            }
             sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log);
             serverLog(LL_NOTICE, "Evicting client: %s", ci);
-            if (freeClient(c)) server.stat_evictedclients++;
             sdsfree(ci);
+            server.stat_evictedclients++;
+
+            if (freeClient(c) == 0) {
+                /* Protected client (async close). Count memory as freed and skip. */
+                pending_freed += getClientMemoryUsage(c, NULL);
+                continue;
+            }

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Avoid unsigned underflow in the pending-memory check.

Line 6534 subtracts pending_freed from a size_t total, but pending_freed is populated from getClientMemoryUsage() while the left-hand side comes from server.stat_clients_type_memory[...]. Those are not guaranteed to stay identical, so pending_freed can exceed the tracked sum and wrap the subtraction, causing over-eviction.
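
The wrap-around described here is easy to reproduce in isolation. The sketch below uses hypothetical helper names (`demoSaturatingSub`, `demoOverLimitWrapping`, `demoOverLimitSaturating`) and plain numbers rather than Valkey's counters; it only contrasts a raw `size_t` subtraction with a clamped one.

```c
#include <assert.h>
#include <stddef.h>

/* Subtraction clamped at zero, so it can never wrap around. */
static size_t demoSaturatingSub(size_t a, size_t b) {
    return a > b ? a - b : 0;
}

/* Raw form: if pending > tracked, the unsigned result wraps to a huge
 * value and the comparison is spuriously true. */
static int demoOverLimitWrapping(size_t tracked, size_t pending, size_t limit) {
    return tracked - pending > limit;
}

/* Clamped form: pending exceeding tracked means nothing is left to evict. */
static int demoOverLimitSaturating(size_t tracked, size_t pending, size_t limit) {
    return demoSaturatingSub(tracked, pending) > limit;
}
```

For example, with `tracked = 100`, `pending = 150`, and `limit = 1000`, the raw form reports that the limit is exceeded while the clamped form does not.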

Suggested fix
-    size_t pending_freed = 0;
+    size_t pending_freed = 0;

-    while (server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
-               server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB] -
-               pending_freed >
-           client_eviction_limit) {
+    while (1) {
+        size_t tracked_mem = server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
+                             server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB];
+        size_t effective_mem = tracked_mem > pending_freed ?
+                               tracked_mem - pending_freed : 0;
+        if (effective_mem <= client_eviction_limit) break;
+
         listNode *ln = listNext(&bucket_iter);
         if (ln) {
             client *c = ln->value;
             if (c->flag.close_asap) {
                 /* Already scheduled to close. Count memory as freed and skip. */
-                pending_freed += getClientMemoryUsage(c, NULL);
+                pending_freed += c->last_memory_usage;
                 continue;
             }
             sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log);
             serverLog(LL_NOTICE, "Evicting client: %s", ci);
             sdsfree(ci);
             server.stat_evictedclients++;

             if (freeClient(c) == 0) {
                 /* Protected client (async close). Count memory as freed and skip. */
-                pending_freed += getClientMemoryUsage(c, NULL);
+                pending_freed += c->last_memory_usage;
                 continue;
             }
         } else {

Suggested change
/* Variable to track memory of clients marked for close but not yet freed */
size_t pending_freed = 0;
while (1) {
size_t tracked_mem = server.stat_clients_type_memory[CLIENT_TYPE_NORMAL] +
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB];
size_t effective_mem = tracked_mem > pending_freed ?
tracked_mem - pending_freed : 0;
if (effective_mem <= client_eviction_limit) break;
listNode *ln = listNext(&bucket_iter);
if (ln) {
client *c = ln->value;
if (c->flag.close_asap) {
/* Already scheduled to close. Count memory as freed and skip. */
pending_freed += c->last_memory_usage;
continue;
}
sds ci = catClientInfoString(sdsempty(), c, server.hide_user_data_from_log);
serverLog(LL_NOTICE, "Evicting client: %s", ci);
sdsfree(ci);
server.stat_evictedclients++;
if (freeClient(c) == 0) {
/* Protected client (async close). Count memory as freed and skip. */
pending_freed += c->last_memory_usage;
continue;
}

Comment thread src/queues.c
-    assert((queue_size & (queue_size - 1)) == 0);
-    q->buffer = (_Atomic(void *) *)zmalloc(sizeof(_Atomic(void *)) * queue_size);
-    q->queue_size = queue_size;
+inline void mpscInit(mpscQueue *q) {

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

🧩 Analysis chain

🌐 Web query:

In C11/GNU11, if a function is declared in a header and defined as non-static inline only in a .c file, does that provide the external definition needed by callers in other translation units, or is a separate non-inline definition required?

💡 Result:

In C11 and GNU11, a non-static inline function definition in a .c file (without the extern keyword) is considered an inline definition [1][2]. An inline definition does not provide an external definition for the function [1][2]. If a function is declared inline in a header and defined as non-static inline in a .c file, that definition is not externally visible to other translation units [1][2]. Consequently, if the compiler chooses not to inline the function in other translation units, the linker will fail to find an external definition, resulting in an undefined reference error [3][4].

To provide the necessary external definition, you must ensure that at least one translation unit provides a non-inline, non-static definition of the function [1][2]. This is typically achieved by:

1. Providing a separate, non-inline definition of the function in a .c file (e.g., by removing the inline keyword from the definition in that specific file) [3][5].
2. Alternatively, using an explicit extern declaration in one translation unit (e.g., extern inline void func(void);) to force the generation of an external definition [1][2].

Note that GNU C (prior to C99/C11 standards compliance) historically treated the inline keyword differently, but modern GCC defaults to standard C99/C11 semantics unless -fgnu89-inline is specified [6]. Under standard C11 rules, the inline specifier is primarily a hint for optimization and does not automatically generate an externally linkable symbol [3][1].


Remove inline from all exported queue function definitions.

These functions are declared in src/queues.h and called from other translation units (e.g., src/unit/test_queues.cpp). Under C11/GNU11 semantics, defining them as non-static inline only in src/queues.c does not provide an external definition to callers in other translation units—the linker will fail with an undefined reference if the compiler chooses not to inline. Either keep them as normal out-of-line definitions (remove inline), or move static inline implementations into src/queues.h.
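
The C11 rule behind this comment (6.7.4p7) can be shown in a single translation unit. The sketch below uses hypothetical names (`demoSizeA`, `demoSizeB`, `demoMask`, `demoTotal`) and makes no claim about how `src/queues.h` actually declares the queue functions, which is not shown here. Under the standard, a definition is an inline definition only if every file-scope declaration of the function in that translation unit carries `inline` without `extern`; a visible declaration without `inline`, or an added `extern inline` declaration, turns it into an external definition.

```c
#include <assert.h>

/* Pattern A: a prior declaration WITHOUT inline (as a header might provide)
 * makes the later inline-marked definition an external definition. */
int demoSizeA(void);
inline int demoSizeA(void) { return 1024; }

/* Pattern B: an inline definition plus an explicit extern declaration in the
 * same translation unit forces the external definition to be emitted here. */
inline int demoSizeB(void) { return 2048; }
extern inline int demoSizeB(void);

/* Pattern C: static inline gives each translation unit its own private copy,
 * so no external symbol is needed at all. */
static inline int demoMask(int size) { return size - 1; }

/* Links even when the compiler declines to inline, because patterns A and B
 * both provide external definitions. */
int demoTotal(void) { return demoMask(demoSizeA()) + demoMask(demoSizeB()); }
```

If instead the only declarations in scope were `inline int f(void);` and `inline int f(void) { ... }`, the definition would be an inline definition, and a non-inlined call could fail to link, which is the failure mode the comment describes.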

Example fix
-inline void mpscInit(mpscQueue *q) {
+void mpscInit(mpscQueue *q) {
    q->buffer = (_Atomic(void *) *)zmalloc(sizeof(_Atomic(void *)) * MPSC_QUEUE_SIZE);
    ...
 }

-inline bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) {
+bool mpscEnqueue(mpscQueue *q, void *data, mpscTicket *ticket) {
    ...
 }

-inline void spmcInit(spmcQueue *q) {
+void spmcInit(spmcQueue *q) {
    ...
 }

-inline void spscInit(spscQueue *q) {
+void spscInit(spscQueue *q) {
    ...
 }

Apply to all exported queue APIs in this file.


@codecov

codecov Bot commented May 18, 2026

Codecov Report

❌ Patch coverage is 88.57143% with 8 lines in your changes missing coverage. Please review.
✅ Project coverage is 76.77%. Comparing base (2946807) to head (9be2193).

| Files with missing lines | Patch % | Lines |
|---|---|---|
| src/io_threads.c | 83.33% | 4 Missing ⚠️ |
| src/networking.c | 55.55% | 4 Missing ⚠️ |

Additional details and impacted files
@@             Coverage Diff              @@
##           unstable    #3756      +/-   ##
============================================
- Coverage     76.93%   76.77%   -0.17%     
============================================
  Files           162      162              
  Lines         80695    80697       +2     
============================================
- Hits          62083    61954     -129     
- Misses        18612    18743     +131     

| Files with missing lines | Coverage Δ | |
|---|---|---|
| src/queues.c | 99.32% <100.00%> (-0.03%) | ⬇️ |
| src/server.h | 100.00% <ø> (ø) | |
| src/unit/test_queues.cpp | 99.53% <100.00%> (ø) | |
| src/io_threads.c | 75.76% <83.33%> (+0.27%) | ⬆️ |
| src/networking.c | 91.97% <55.55%> (-0.24%) | ⬇️ |

... and 16 files with indirect coverage changes

@madolson merged commit f2fe917 into valkey-io:unstable on May 18, 2026
62 checks passed
@github-project-automation github-project-automation Bot moved this to To be backported in Valkey 9.1 Jun 8, 2026
valkeyrie-ops Bot pushed a commit that referenced this pull request Jun 9, 2026
This reverts the
[commit](fdd9039)
that was merged as part of the PR #3544 due to a performance regression
observed [here](#3750)

Signed-off-by: akash kumar <akumdev@amazon.com>
madolson pushed a commit that referenced this pull request Jun 9, 2026
This regression is still present in 9.1 GA because the cherry-pick of the
revert commit was missed during the release.

Re-applies #3544 (reverted in #3756 due to ~20% SET regression) with the
performance fix from #3760.

**Root Cause:** The original #3544 changed
`tryOffloadFreeObjToIOThreads` to offload only the SDS buffer free to IO
threads and to free the `robj` shell on the main thread. Profiling the
change showed that freeing the `robj` shell became the top main-thread
hotspot (~10% CPU), while IO threads shifted from doing real `jemalloc`
work to spinning idle on `spmcDequeue`.

**Fix**: Keep `tryOffloadFreeObjToIOThreads` offloading the entire robj
(`decrRefCount`) to the IO thread. Cross-thread `zfree` is safe with
`jemalloc`.

This PR includes all cleanup work from #3544:
- `trySendWriteToIOThreads`: defer clearing `last_header` until after
successful enqueue
- `evictClients`: simplified bookkeeping
- Queue sizes as runtime parameters instead of compile-time macros
- IO ignition policy using `stat_active_time` instead of `getrusage`
- Function renames (`IOThreadFreeArgv` --> `ioThreadFreeArgv`, etc.) and
doc comments

**Benchmark** (Graviton4 c8gb.metal-48xl):
Config: SET, 128B values, 9 IO threads, pipeline=10, 1600 clients (the
same as the official Valkey method)
| Version | Throughput |
|---------|-----------|
| Unstable + original #3544 | ~1,554K rps |
| Unstable + this PR | ~2,116K rps |

<details>
<summary>Diff vs original #3544 (perf fix)</summary>

```diff
diff --git a/src/io_threads.c b/src/io_threads.c
--- a/src/io_threads.c
+++ b/src/io_threads.c
@@ // IO thread handler
 case JOB_REQ_FREE_OBJ:
-    zfree(data);
+    decrRefCount(data);
     break;

@@ // tryOffloadFreeObjToIOThreads
-    /* We offload only the free of the ptr that may be allocated by the I/O thread.
-     * The object itself was allocated by the main thread and will be freed by the main thread. */
-    void *job = tagJob(sdsAllocPtr(objectGetVal(obj)), JOB_REQ_FREE_OBJ);
+    void *job = tagJob(obj, JOB_REQ_FREE_OBJ);
     if (unlikely(spmcEnqueue(&io_shared_inbox, job) == false)) return C_ERR;
-    objectSetVal(obj, NULL);
-    decrRefCount(obj);
     io_jobs_submitted++;
```
</details>
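
The shape of the fix in the diff, where the whole object is tagged as a job and the consumer drops the reference, can be sketched outside Valkey. Everything here is hypothetical: `demoObj`, `demoTagJob`, `demoJobType`, `demoJobData`, and `demoHandleJob` are illustrative names, and packing the job type into the low pointer bits is an assumption about how such tagging could work, not a description of Valkey's `tagJob`. The refcount is a plain `int` for brevity, so the sketch is single-threaded.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical job types stored in the low 3 bits of an aligned pointer. */
enum { DEMO_JOB_FREE_OBJ = 1, DEMO_JOB_TAG_MASK = 0x7 };

/* Hypothetical object: a small shell that owns a separately allocated payload. */
typedef struct {
    int refcount;
    char *payload;
} demoObj;

/* Packs a job type into the unused low bits of the pointer. */
static void *demoTagJob(void *ptr, uintptr_t type) {
    assert(((uintptr_t)ptr & DEMO_JOB_TAG_MASK) == 0); /* needs 8-byte alignment */
    return (void *)((uintptr_t)ptr | type);
}

static uintptr_t demoJobType(void *job) {
    return (uintptr_t)job & DEMO_JOB_TAG_MASK;
}

static void *demoJobData(void *job) {
    return (void *)((uintptr_t)job & ~(uintptr_t)DEMO_JOB_TAG_MASK);
}

/* Drops one reference; frees payload and shell together when it reaches zero.
 * Returns 1 if the object was freed. */
static int demoDecrRefCount(demoObj *o) {
    if (--o->refcount > 0) return 0;
    free(o->payload);
    free(o);
    return 1;
}

/* Consumer side: decode the tag and release the whole object in one place,
 * so the producer does no freeing work of its own. */
static int demoHandleJob(void *job) {
    switch (demoJobType(job)) {
    case DEMO_JOB_FREE_OBJ:
        return demoDecrRefCount((demoObj *)demoJobData(job));
    default:
        return 0;
    }
}
```

The producer only builds the tagged pointer and enqueues it; both allocations are released by whichever thread runs `demoHandleJob`.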

---------

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
valkeyrie-ops Bot pushed a commit that referenced this pull request Jun 10, 2026
valkeyrie-ops Bot pushed a commit that referenced this pull request Jun 12, 2026
@sarthakaggarwal97 sarthakaggarwal97 moved this from To be backported to Done in Valkey 9.1 Jun 15, 2026
4 participants