Skip to content

Commit af961be

Browse files
committed
[MOD-12694] [MOD-12069] Add active_coord_threads metric (#7546)
* Add multi-threading statistics tracking for active I/O threads * fix comment * add cpp test * fix spelling * address comment * remove unnecessary new line * add "active_worker_threads" metric * fix comment, improve tests * test cleanups * add test comment about the number of queries, and cleanups * fix declaration * remove coord threads * add active_coord_threads, expose ConcurrentSearchPool_WorkingThreadCount * make the tests run... * add "active_worker_threads" metric * fix declaration * remove coord threads * make the tests run... * introduce workersThreadPool_isInitialized, assert it is initialized in GlobalStats_GetMultiThreadingStats * cleanup * rename workersThreadPool_isCreated * introduce ConcurrentSearchPool_IsCreated * fix test * we don't need workers * remove ConcurrentSearchPool_IsCreated and workersThreadPool_isInitialized * fix merge (cherry picked from commit b0fc4ea)
1 parent 6bbb4e8 commit af961be

9 files changed

Lines changed: 38 additions & 9 deletions

File tree

src/concurrent_ctx.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,14 @@ void ConcurrentSearch_ThreadPoolRun(void (*func)(void *), void *arg, int type) {
5454
redisearch_thpool_add_work(p, func, arg, THPOOL_PRIORITY_HIGH);
5555
}
5656

57+
/* Return the in-progress job count reported by redisearch_thpool_num_jobs_in_progress() for the single ConcurrentSearch pool (threadpools_g[0]); used as the count of currently working threads. Asserts that threadpools_g is set and holds exactly one pool. */
58+
size_t ConcurrentSearchPool_WorkingThreadCount() {
59+
RS_ASSERT(threadpools_g);
60+
// Assert we only have 1 pool: only threadpools_g[0] is queried below
61+
RS_LOG_ASSERT(array_len(threadpools_g) == 1, "assuming 1 ConcurrentSearch pool");
62+
return redisearch_thpool_num_jobs_in_progress(threadpools_g[0]);
63+
}
64+
5765
static void threadHandleCommand(void *p) {
5866
ConcurrentCmdCtx *ctx = p;
5967

src/concurrent_ctx.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@
1414
#include "thpool/thpool.h"
1515
#include "util/references.h"
1616

17+
#ifdef __cplusplus
18+
extern "C" {
19+
#endif
20+
1721
/** Concurrent Search Execution Context.
1822
*/
1923

@@ -28,6 +32,9 @@ int ConcurrentSearch_CreatePool(int numThreads);
2832
/* Run a function on the concurrent thread pool */
2933
void ConcurrentSearch_ThreadPoolRun(void (*func)(void *), void *arg, int type);
3034

35+
/* return number of currently working threads */
36+
size_t ConcurrentSearchPool_WorkingThreadCount();
37+
3138
struct ConcurrentCmdCtx;
3239
typedef void (*ConcurrentCmdHandler)(RedisModuleCtx *, RedisModuleString **, int,
3340
struct ConcurrentCmdCtx *);
@@ -54,5 +61,7 @@ WeakRef ConcurrentCmdCtx_GetWeakRef(struct ConcurrentCmdCtx *cctx);
5461
int ConcurrentSearch_HandleRedisCommandEx(int poolType, ConcurrentCmdHandler handler,
5562
RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
5663
WeakRef spec_ref);
57-
64+
#ifdef __cplusplus
65+
}
5866
#endif
67+
#endif // RS_CONCERRNT_CTX_

src/info/global_stats.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "util/units.h"
1212
#include "rs_wall_clock.h"
1313
#include "util/workers.h"
14+
#include "concurrent_ctx.h"
1415

1516
#define INCR_BY(x,y) __atomic_add_fetch(&(x), (y), __ATOMIC_RELAXED)
1617
#define INCR(x) INCR_BY(x, 1)
@@ -182,7 +183,7 @@ void GlobalStats_UpdateActiveIoThreads(int toAdd) {
182183
MultiThreadingStats GlobalStats_GetMultiThreadingStats() {
183184
MultiThreadingStats stats;
184185
stats.active_io_threads = READ(RSGlobalStats.totalStats.multi_threading.active_io_threads);
185-
RS_ASSERT(workersThreadPool_isCreated()); // In production workers threadpool is created at startup.
186186
stats.active_worker_threads = workersThreadPool_WorkingThreadCount();
187+
stats.active_coord_threads = ConcurrentSearchPool_WorkingThreadCount();
187188
return stats;
188189
}

src/info/global_stats.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ typedef struct {
7373
typedef struct {
7474
size_t active_io_threads; // number of I/O thread callbacks currently executing
7575
size_t active_worker_threads; // number of worker threads currently executing jobs
76+
size_t active_coord_threads; // number of coordinator threads currently executing jobs
7677
} MultiThreadingStats;
7778

7879
typedef struct {

src/info/info_redis/info_redis.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ void AddToInfo_MultiThreading(RedisModuleInfoCtx *ctx, TotalIndexesInfo *total_i
279279
MultiThreadingStats stats = GlobalStats_GetMultiThreadingStats();
280280
RedisModule_InfoAddFieldULongLong(ctx, "active_io_threads", stats.active_io_threads);
281281
RedisModule_InfoAddFieldULongLong(ctx, "active_worker_threads", stats.active_worker_threads);
282+
RedisModule_InfoAddFieldULongLong(ctx, "active_coord_threads", stats.active_coord_threads);
282283
}
283284

284285
void AddToInfo_Dialects(RedisModuleInfoCtx *ctx) {

src/util/workers.c

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,3 @@ void workersThreadPool_wait() {
205205
}
206206
redisearch_thpool_wait(_workers_thpool);
207207
}
208-
209-
bool workersThreadPool_isCreated() {
210-
return _workers_thpool != NULL;
211-
}

src/util/workers.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,5 +63,3 @@ int workersThreadPool_resume();
6363
thpool_stats workersThreadPool_getStats();
6464

6565
void workersThreadPool_wait();
66-
67-
bool workersThreadPool_isCreated();

tests/cpptests/coord_tests/test_cpp_io_runtime_ctx.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "rmutil/rm_assert.h"
1515
#include "redismodule.h"
1616
#include "info/global_stats.h"
17+
#include "concurrent_ctx.h"
1718
#include "common.h"
1819
#include <unistd.h>
1920
#include <atomic>
@@ -194,6 +195,9 @@ TEST_F(IORuntimeCtxCommonTest, ShutdownWithPendingRequests) {
194195
TEST_F(IORuntimeCtxCommonTest, ActiveIoThreadsMetric) {
195196
// Test that the active_io_threads metric is tracked correctly
196197

198+
// Create the ConcurrentSearch pool: GlobalStats_GetMultiThreadingStats asserts that it exists
199+
ConcurrentSearch_CreatePool(1);
200+
197201
// Phase 1: Verify metric starts at 0
198202
MultiThreadingStats stats = GlobalStats_GetMultiThreadingStats();
199203
ASSERT_EQ(stats.active_io_threads, 0) << "active_io_threads should start at 0";
@@ -241,4 +245,7 @@ TEST_F(IORuntimeCtxCommonTest, ActiveIoThreadsMetric) {
241245
});
242246

243247
ASSERT_TRUE(success) << "Timeout waiting for active_io_threads to return to 0, current value: " << stats.active_io_threads;
248+
249+
// Destroy the ConcurrentSearch pool created at the start of this test
250+
ConcurrentSearch_ThreadPoolDestroy();
244251
}

tests/pytests/test_info_modules.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1659,12 +1659,14 @@ def _test_active_worker_threads(env, num_queries):
16591659
for i in range(10):
16601660
conn.execute_command('HSET', f'doc{i}', 'n', i)
16611661

1662-
# Verify active_worker_threads starts at 0
1662+
# Verify active_worker_threads and active_coord_threads both start at 0
16631663
multi_threading_section = f'{SEARCH_PREFIX}multi_threading'
16641664
for i, con in enumerate(env.getOSSMasterNodesConnectionList()):
16651665
info_dict = info_modules_to_dict(con)
16661666
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_worker_threads'], '0',
16671667
message=f"shard {i}: active_worker_threads should be 0 when idle")
1668+
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_coord_threads'], '0',
1669+
message=f"shard {i}: active_coord_threads should be 0 when idle")
16681670

16691671
# Define callback for testing a specific query type
16701672
def _test_query_type(query_type):
@@ -1696,6 +1698,12 @@ def _test_query_type(query_type):
16961698
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_worker_threads'], str(num_queries),
16971699
message=f"shard {i}: {query_type}: active_worker_threads should be {num_queries} when {num_queries} queries are paused")
16981700

1701+
# In cluster mode, for FT.AGGREGATE only, verify active_coord_threads == num_queries
1702+
if env.isCluster() and query_type == 'FT.AGGREGATE':
1703+
info_dict = info_modules_to_dict(env)
1704+
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_coord_threads'], str(num_queries),
1705+
message=f"coordinator: {query_type}: active_coord_threads should be {num_queries} when {num_queries} queries are paused")
1706+
16991707
# Resume all queries
17001708
allShards_setPauseRPResume(env)
17011709

0 commit comments

Comments
 (0)