Skip to content

Commit becf7ba

Browse files
committed
[MOD-12069] [MOD-12695] Add active_io_threads metric (#7530)
* Add multi-threading statistics tracking for active I/O threads * fix comment * add cpp test * fix spelling * address comment * remove unnecessary nre line * test cleanups * Include unistd.h in common.h Added unistd.h header for POSIX compliance. (cherry picked from commit 8bbdafa)
1 parent a4993de commit becf7ba

7 files changed

Lines changed: 172 additions & 1 deletion

File tree

src/coord/rmr/io_runtime_ctx.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "cluster.h"
1717
#include <rmutil/rm_assert.h> // Include the assertion header
1818
#include "../config.h"
19+
#include "info/global_stats.h"
1920

2021
// Atomically exchange the pending topology with a new topology.
2122
// Returns the old pending topology (or NULL if there was no pending topology).
@@ -49,7 +50,9 @@ static void rqAsyncCb(uv_async_t *async) {
4950
}
5051
queueItem *req;
5152
while (NULL != (req = RQ_Pop(io_runtime_ctx->queue, &io_runtime_ctx->uv_runtime.async))) {
53+
GlobalStats_UpdateActiveIoThreads(1);
5254
req->cb(req->privdata);
55+
GlobalStats_UpdateActiveIoThreads(-1);
5356
rm_free(req);
5457
}
5558
}

src/info/global_stats.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,3 +123,22 @@ void QueryErrorsGlobalStats_UpdateError(QueryErrorCode code, int toAdd, bool coo
123123
break;
124124
}
125125
}
126+
127+
// Update the number of active io threads.
128+
void GlobalStats_UpdateActiveIoThreads(int toAdd) {
129+
#ifdef ENABLE_ASSERT
130+
RS_LOG_ASSERT(toAdd != 0, "Attempt to change active_io_threads by 0");
131+
size_t current = READ(RSGlobalStats.totalStats.multi_threading.active_io_threads);
132+
RS_LOG_ASSERT_FMT(toAdd > 0 || current > 0,
133+
"Cannot decrease active_io_threads below 0. toAdd: %d, current: %zu", toAdd, current);
134+
#endif
135+
INCR_BY(RSGlobalStats.totalStats.multi_threading.active_io_threads, toAdd);
136+
}
137+
138+
// Get the number of active io threads.
139+
// Get multiThreadingStats
140+
MultiThreadingStats GlobalStats_GetMultiThreadingStats() {
141+
MultiThreadingStats stats;
142+
stats.active_io_threads = READ(RSGlobalStats.totalStats.multi_threading.active_io_threads);
143+
return stats;
144+
}

src/info/global_stats.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
#include "spec.h"
1212
#include "rs_wall_clock.h"
1313

14+
#ifdef __cplusplus
15+
extern "C" {
16+
#endif
1417
#define DIALECT_OFFSET(d) (1ULL << (d - MIN_DIALECT_VERSION))// offset of the d'th bit. begins at MIN_DIALECT_VERSION (bit 0) up to MAX_DIALECT_VERSION.
1518
#define GET_DIALECT(barr, d) (!!(barr & DIALECT_OFFSET(d))) // return the truth value of the d'th dialect in the dialect bitarray.
1619
#define SET_DIALECT(barr, d) (barr |= DIALECT_OFFSET(d)) // set the d'th dialect in the dialect bitarray to true.
@@ -58,11 +61,16 @@ typedef struct {
5861
QueryErrorsGlobalStats coord_errors; // Coordinator query errors statistics
5962
} QueriesGlobalStats;
6063

64+
typedef struct {
65+
size_t active_io_threads; // number of I/O thread callbacks currently executing
66+
} MultiThreadingStats;
67+
6168
typedef struct {
6269
QueriesGlobalStats queries; // Queries statistics. values should be fetched by calling `TotalGlobalStats_GetQueryStats`, otherwise not safe.
6370
uint_least8_t used_dialects; // bitarray of dialects used by all indices
6471
size_t logically_deleted; // Number of logically deleted documents in all indices
6572
// (i.e., marked with DELETED flag but their memory was not yet cleaned by the GC)
73+
MultiThreadingStats multi_threading;
6674
} TotalGlobalStats;
6775

6876
// The global stats object type
@@ -121,3 +129,13 @@ size_t IndexesGlobalStats_GetLogicallyDeletedDocs();
121129
* `toAdd` can be negative to decrease the counter.
122130
*/
123131
void QueryErrorsGlobalStats_UpdateError(QueryErrorCode error, int toAdd, bool coord);
132+
133+
// Update the number of active io threads.
134+
void GlobalStats_UpdateActiveIoThreads(int toAdd);
135+
136+
// Get multiThreadingStats
137+
MultiThreadingStats GlobalStats_GetMultiThreadingStats();
138+
139+
#ifdef __cplusplus
140+
}
141+
#endif

src/info/info_redis/info_redis.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ static inline void AddToInfo_Cursors(RedisModuleInfoCtx *ctx);
2828
static inline void AddToInfo_GC(RedisModuleInfoCtx *ctx, TotalIndexesInfo *total_info);
2929
static inline void AddToInfo_Queries(RedisModuleInfoCtx *ctx, TotalIndexesInfo *total_info);
3030
static inline void AddToInfo_ErrorsAndWarnings(RedisModuleInfoCtx *ctx, TotalIndexesInfo *total_info);
31+
static inline void AddToInfo_MultiThreading(RedisModuleInfoCtx *ctx, TotalIndexesInfo *total_info);
3132
static inline void AddToInfo_Dialects(RedisModuleInfoCtx *ctx);
3233
static inline void AddToInfo_RSConfig(RedisModuleInfoCtx *ctx);
3334
static inline void AddToInfo_BlockedQueries(RedisModuleInfoCtx *ctx);
@@ -74,6 +75,9 @@ void RS_moduleInfoFunc(RedisModuleInfoCtx *ctx, int for_crash_report) {
7475
// Errors statistics
7576
AddToInfo_ErrorsAndWarnings(ctx, &total_info);
7677

78+
// Multi threading statistics
79+
AddToInfo_MultiThreading(ctx, &total_info);
80+
7781
// Dialect statistics
7882
AddToInfo_Dialects(ctx);
7983

@@ -259,6 +263,12 @@ void AddToInfo_ErrorsAndWarnings(RedisModuleInfoCtx *ctx, TotalIndexesInfo *tota
259263
RedisModule_InfoAddFieldULongLong(ctx, "coord_total_query_errors_arguments", stats.coord_errors.arguments);
260264
}
261265

266+
void AddToInfo_MultiThreading(RedisModuleInfoCtx *ctx, TotalIndexesInfo *total_info) {
267+
RedisModule_InfoAddSection(ctx, "multi_threading");
268+
MultiThreadingStats stats = GlobalStats_GetMultiThreadingStats();
269+
RedisModule_InfoAddFieldULongLong(ctx, "active_io_threads", stats.active_io_threads);
270+
}
271+
262272
void AddToInfo_Dialects(RedisModuleInfoCtx *ctx) {
263273
RedisModule_InfoAddSection(ctx, "dialect_statistics");
264274
for (int dialect = MIN_DIALECT_VERSION; dialect <= MAX_DIALECT_VERSION; ++dialect) {

tests/cpptests/common.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@
1414
#include "spec.h"
1515
#include "document.h"
1616

17+
#ifdef __cplusplus
18+
#include <chrono>
19+
#include <functional>
20+
#include <sstream>
21+
#include <unistd.h>
22+
#endif
23+
1724
#define __ignore__(X) \
1825
do { \
1926
int rc = (X); \
@@ -73,4 +80,38 @@ IndexSpec *createIndex(RedisModuleCtx *ctx, const char *name, Ts... args) {
7380
std::vector<std::string> search(RSIndex *index, RSQueryNode *qn);
7481
std::vector<std::string> search(RSIndex *index, const char *s);
7582

83+
/**
84+
* @brief Wait for a condition to become true with a timeout.
85+
*
86+
* This function polls the condition at regular intervals until it becomes true
87+
* or the timeout expires.
88+
*
89+
* @tparam Condition A callable that returns bool (e.g., lambda, function pointer)
90+
* @param condition The condition to wait for (should return true when satisfied)
91+
* @param timeout_s Timeout in seconds (default: 30s)
92+
* @param poll_interval_us Polling interval in microseconds (default: 100us)
93+
* @return true if condition became true before timeout, false if timeout expired
94+
*
95+
* Example usage:
96+
* bool success = WaitForCondition([&]() { return counter == 0; }, 300);
97+
* ASSERT_TRUE(success) << "Timeout waiting for counter to reach 0";
98+
*
99+
*/
100+
template<typename Condition>
101+
bool WaitForCondition(Condition condition,
102+
int timeout_s = 30,
103+
int poll_interval_us = 100) {
104+
auto start = std::chrono::steady_clock::now();
105+
auto timeout = std::chrono::seconds(timeout_s);
106+
107+
while (!condition()) {
108+
auto elapsed = std::chrono::steady_clock::now() - start;
109+
if (elapsed > timeout) {
110+
return false; // Timeout
111+
}
112+
usleep(poll_interval_us);
113+
}
114+
return true; // Success
115+
}
116+
76117
} // namespace RS

tests/cpptests/coord_tests/test_cpp_io_runtime_ctx.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
#include "rmutil/alloc.h"
1414
#include "rmutil/rm_assert.h"
1515
#include "redismodule.h"
16+
#include "info/global_stats.h"
17+
#include "common.h"
1618
#include <unistd.h>
19+
#include <atomic>
1720

1821
// Test callback for queue operations
1922
static void testCallback(void *privdata) {
@@ -187,3 +190,55 @@ TEST_F(IORuntimeCtxCommonTest, ShutdownWithPendingRequests) {
187190
// Verify all requests were processed despite shutdown
188191
ASSERT_EQ(counter, 11);
189192
}
193+
194+
TEST_F(IORuntimeCtxCommonTest, ActiveIoThreadsMetric) {
195+
// Test that the active_io_threads metric is tracked correctly
196+
197+
// Phase 1: Verify metric starts at 0
198+
MultiThreadingStats stats = GlobalStats_GetMultiThreadingStats();
199+
ASSERT_EQ(stats.active_io_threads, 0) << "active_io_threads should start at 0";
200+
201+
// Phase 2: Schedule a callback that sleeps, and verify metric increases
202+
struct CallbackFlags {
203+
std::atomic<bool> started{false};
204+
std::atomic<bool> should_finish{false};
205+
};
206+
207+
CallbackFlags flags;
208+
209+
auto slowCallback = [](void *privdata) {
210+
auto *flags = (CallbackFlags *)privdata;
211+
flags->started.store(true);
212+
213+
// Wait until test tells us to finish
214+
while (!flags->should_finish.load()) {
215+
usleep(100); // 100us
216+
}
217+
};
218+
219+
// Schedule the slow callback - this will start the IO runtime automatically
220+
IORuntimeCtx_Schedule(ctx, slowCallback, &flags);
221+
222+
// Mark the IO runtime as ready to process callbacks
223+
ctx->uv_runtime.loop_th_ready = true;
224+
225+
// Wait for callback to start
226+
while (!flags.started.load()) {
227+
usleep(100); // 100us
228+
}
229+
230+
// Now the callback is executing - check that active_io_threads > 0
231+
stats = GlobalStats_GetMultiThreadingStats();
232+
ASSERT_EQ(stats.active_io_threads, 1) << "active_io_threads should be > 0 while callback is executing";
233+
234+
// Tell callback to finish
235+
flags.should_finish.store(true);
236+
237+
// Phase 3: Wait for metric to return to 0 with timeout
238+
bool success = RS::WaitForCondition([&]() {
239+
stats = GlobalStats_GetMultiThreadingStats();
240+
return stats.active_io_threads == 0;
241+
});
242+
243+
ASSERT_TRUE(success) << "Timeout waiting for active_io_threads to return to 0, current value: " << stats.active_io_threads;
244+
}

tests/pytests/test_info_modules.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
from inspect import currentframe
55
import numpy as np
66

7-
87
def info_modules_to_dict(conn):
98
res = conn.execute_command('INFO MODULES')
109
info = dict()
@@ -947,3 +946,29 @@ def test_errors_and_warnings_init(env):
947946
for metric in [WARN_ERR_SECTION, COORD_WARN_ERR_SECTION]:
948947
for field in info_dict[metric]:
949948
env.assertEqual(info_dict[metric][field], '0')
949+
950+
# @skip(cluster=False)
951+
def test_active_io_threads_stats(env):
952+
conn = getConnectionByEnv(env)
953+
# Setup: Create index with some data
954+
env.expect('FT.CREATE', 'idx', 'SCHEMA', 'name', 'TEXT', 'age', 'NUMERIC').ok()
955+
for i in range(10):
956+
conn.execute_command('HSET', f'doc{i}', 'name', f'name{i}', 'age', i)
957+
958+
# Phase 1: Verify multi_threading section exists and active_io_threads starts at 0
959+
info_dict = info_modules_to_dict(env)
960+
961+
# Verify multi_threading section exists
962+
multi_threading_section = f'{SEARCH_PREFIX}multi_threading'
963+
env.assertTrue(multi_threading_section in info_dict,
964+
message="multi_threading section should exist in INFO MODULES")
965+
966+
# Verify all expected fields exist
967+
env.assertTrue(f'{SEARCH_PREFIX}active_io_threads' in info_dict[multi_threading_section],
968+
message="active_io_threads field should exist in multi_threading section")
969+
970+
# Verify all fields initialized to 0.
971+
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_io_threads'], '0',
972+
message="active_io_threads should be 0 when idle")
973+
# There's no deterministic way to test active_io_threads increases while a query is running,
974+
# we test it in unit tests.

0 commit comments

Comments
 (0)