Skip to content

Commit a544a18

Browse files
committed
[MOD-12069] Add *_pending_jobs metrics (#7556)
* align info/* to active_coord; * add APIs to get queue lengths; * add to info; * fix; * test; * fix test; * catch general error; * rename; * fix moduleArgs; * rename; * rename test_active_worker_threads; * rename to workers (cherry picked from commit ea0476a)
1 parent a8ecd32 commit a544a18

9 files changed

Lines changed: 246 additions & 26 deletions

File tree

deps/thpool/thpool.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,14 @@ size_t redisearch_thpool_get_num_threads(redisearch_thpool_t *thpool_p) {
539539
return thpool_p->n_threads;
540540
}
541541

542+
/* Number of jobs currently waiting in the high priority queue.
 * Lock-free: the queue length is read atomically, so callers get a
 * monitoring snapshot without taking the thread-pool lock. Relaxed
 * ordering is sufficient for a statistics read. */
size_t redisearch_thpool_high_priority_pending_jobs(redisearch_thpool_t *thpool_p) {
  return __atomic_load_n(&thpool_p->jobqueues.high_priority_jobqueue.len, __ATOMIC_RELAXED);
}
545+
546+
/* Number of jobs currently waiting in the low priority queue.
 * Same lock-free, relaxed atomic read as the high priority counterpart —
 * intended for cheap monitoring, not synchronization. */
size_t redisearch_thpool_low_priority_pending_jobs(redisearch_thpool_t *thpool_p) {
  return __atomic_load_n(&thpool_p->jobqueues.low_priority_jobqueue.len, __ATOMIC_RELAXED);
}
549+
542550
thpool_stats redisearch_thpool_get_stats(redisearch_thpool_t *thpool_p) {
543551
/* Locking must be done in the following order to prevent deadlocks. */
544552
redisearch_thpool_lock(thpool_p);

deps/thpool/thpool.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,10 @@ thpool_stats redisearch_thpool_get_stats(redisearch_thpool_t *);
275275

276276
size_t redisearch_thpool_get_num_threads(redisearch_thpool_t *);
277277

278+
size_t redisearch_thpool_high_priority_pending_jobs(redisearch_thpool_t *);
279+
280+
size_t redisearch_thpool_low_priority_pending_jobs(redisearch_thpool_t *);
281+
278282
/**
279283
* @brief Schedule a job to reduce the number of threads in the threadpool in an asynchronous manner.
280284
*

src/info/global_stats.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,14 @@ void GlobalStats_UpdateActiveIoThreads(int toAdd) {
141141
MultiThreadingStats GlobalStats_GetMultiThreadingStats() {
142142
MultiThreadingStats stats;
143143
stats.active_io_threads = READ(RSGlobalStats.totalStats.multi_threading.active_io_threads);
144+
145+
// Workers stats
146+
// We don't use workersThreadPool_getStats here to avoid the overhead of locking the thread pool.
144147
stats.active_worker_threads = workersThreadPool_WorkingThreadCount();
148+
stats.workers_low_priority_pending_jobs = workersThreadPool_LowPriorityPendingJobsCount();
149+
stats.workers_high_priority_pending_jobs = workersThreadPool_HighPriorityPendingJobsCount();
150+
151+
// Coordinator stats
145152
stats.active_coord_threads = ConcurrentSearchPool_WorkingThreadCount();
146153
return stats;
147154
}

src/info/global_stats.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ typedef struct {
6565
size_t active_io_threads; // number of I/O thread callbacks currently executing
6666
size_t active_worker_threads; // number of worker threads currently executing jobs
6767
size_t active_coord_threads; // number of coordinator threads currently executing jobs
68+
size_t workers_low_priority_pending_jobs; // number of low priority jobs waiting to be executed (currently only vecsim background indexing)
69+
size_t workers_high_priority_pending_jobs; // number of high priority jobs waiting to be executed (currently only queries)
6870
} MultiThreadingStats;
6971

7072
typedef struct {

src/info/info_redis/info_redis.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,8 @@ void AddToInfo_MultiThreading(RedisModuleInfoCtx *ctx, TotalIndexesInfo *total_i
269269
RedisModule_InfoAddFieldULongLong(ctx, "active_io_threads", stats.active_io_threads);
270270
RedisModule_InfoAddFieldULongLong(ctx, "active_worker_threads", stats.active_worker_threads);
271271
RedisModule_InfoAddFieldULongLong(ctx, "active_coord_threads", stats.active_coord_threads);
272+
RedisModule_InfoAddFieldULongLong(ctx, "workers_low_priority_pending_jobs", stats.workers_low_priority_pending_jobs);
273+
RedisModule_InfoAddFieldULongLong(ctx, "workers_high_priority_pending_jobs", stats.workers_high_priority_pending_jobs);
272274
}
273275

274276
void AddToInfo_Dialects(RedisModuleInfoCtx *ctx) {

src/util/workers.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,18 @@ size_t workersThreadPool_WorkingThreadCount(void) {
111111
return redisearch_thpool_num_jobs_in_progress(_workers_thpool);
112112
}
113113

114+
// Return the number of low priority jobs waiting to be executed.
// Asserts that the workers thread pool has been initialized.
size_t workersThreadPool_LowPriorityPendingJobsCount(void) {
  RS_ASSERT(_workers_thpool != NULL);
  return redisearch_thpool_low_priority_pending_jobs(_workers_thpool);
}
119+
120+
// Return the number of high priority jobs waiting to be executed.
// Asserts that the workers thread pool has been initialized.
size_t workersThreadPool_HighPriorityPendingJobsCount(void) {
  RS_ASSERT(_workers_thpool != NULL);
  return redisearch_thpool_high_priority_pending_jobs(_workers_thpool);
}
125+
114126
// return n_threads value.
115127
size_t workersThreadPool_NumThreads(void) {
116128
RS_ASSERT(_workers_thpool);

src/util/workers.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ void workersThreadPool_SetNumWorkers(void);
2525
// return number of currently working threads
2626
size_t workersThreadPool_WorkingThreadCount(void);
2727

28+
// Return the number of low priority jobs waiting to be executed.
29+
size_t workersThreadPool_LowPriorityPendingJobsCount(void);
30+
31+
// Return the number of high priority jobs waiting to be executed.
32+
size_t workersThreadPool_HighPriorityPendingJobsCount(void);
33+
2834
// return n_threads value.
2935
size_t workersThreadPool_NumThreads(void);
3036

tests/pytests/common.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,33 @@ def __exit__(self, exc_type, exc_value, traceback):
5252
def handler(self, signum, frame):
5353
raise Exception(f'Timeout: {self.message}')
5454

55+
def wait_for_condition(check_fn, message, timeout=120):
    """
    Poll `check_fn` until it reports success, raising with context on failure.

    Parameters:
    - check_fn: zero-argument callable returning (done: bool, state: dict),
      where `state` is a dict describing the current state (included in the
      failure message for diagnosability)
    - message: message prefix for the exception raised on timeout/error
    - timeout: maximum time to wait, in seconds (default: 120)

    Raises:
    - Exception wrapping the underlying error (timeout or a failure inside
      `check_fn`), annotated with the iteration count and last observed state.
    """
    # `iterations` instead of `iter` — avoid shadowing the builtin.
    iterations = 0
    timeout_msg = {}

    try:
        with TimeLimit(timeout):
            while True:
                done, state = check_fn()
                if done:
                    break
                time.sleep(0.01)
                iterations += 1
                # Record progress so a timeout failure reports what we last saw.
                timeout_msg['iter'] = iterations
                timeout_msg['state'] = state
    except Exception as e:
        # Intentionally broad: wrap any error (TimeLimit timeout or a failure
        # raised by check_fn itself) with the collected progress info.
        log = f"{message}: {timeout_msg}"
        raise Exception(f'Error: {e}, log: {log}')
81+
5582
class DialectEnv(Env):
5683
def __init__(self, *args, **kwargs):
5784
super().__init__(*args, **kwargs)

tests/pytests/test_info_modules.py

Lines changed: 178 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
import redis
44
from inspect import currentframe
55
import numpy as np
6-
6+
from vecsim_utils import (
7+
DEFAULT_FIELD_NAME,
8+
set_up_database_with_vectors,
9+
)
710

811
def info_modules_to_dict(conn):
912
res = conn.execute_command('INFO MODULES')
@@ -881,6 +884,17 @@ def test_errors_and_warnings_init(env):
881884
for field in info_dict[metric]:
882885
env.assertEqual(info_dict[metric][field], '0')
883886

887+
########
888+
# Multi Threaded Stats tests
889+
########
890+
891+
MULTI_THREADING_SECTION = f'{SEARCH_PREFIX}multi_threading'
892+
ACTIVE_IO_THREADS_METRIC = f'{SEARCH_PREFIX}active_io_threads'
893+
ACTIVE_WORKER_THREADS_METRIC = f'{SEARCH_PREFIX}active_worker_threads'
894+
ACTIVE_COORD_THREADS_METRIC = f'{SEARCH_PREFIX}active_coord_threads'
895+
WORKERS_LOW_PRIORITY_PENDING_JOBS_METRIC = f'{SEARCH_PREFIX}workers_low_priority_pending_jobs'
896+
WORKERS_HIGH_PRIORITY_PENDING_JOBS_METRIC = f'{SEARCH_PREFIX}workers_high_priority_pending_jobs'
897+
884898
def test_active_io_threads_stats(env):
885899
conn = getConnectionByEnv(env)
886900
# Setup: Create index with some data
@@ -892,17 +906,16 @@ def test_active_io_threads_stats(env):
892906
info_dict = info_modules_to_dict(env)
893907

894908
# Verify multi_threading section exists
895-
multi_threading_section = f'{SEARCH_PREFIX}multi_threading'
896-
env.assertTrue(multi_threading_section in info_dict,
909+
env.assertTrue(MULTI_THREADING_SECTION in info_dict,
897910
message="multi_threading section should exist in INFO MODULES")
898911

899912
# Verify all expected fields exist
900-
env.assertTrue(f'{SEARCH_PREFIX}active_io_threads' in info_dict[multi_threading_section],
901-
message="active_io_threads field should exist in multi_threading section")
913+
env.assertTrue(ACTIVE_IO_THREADS_METRIC in info_dict[MULTI_THREADING_SECTION],
914+
message=f"{ACTIVE_IO_THREADS_METRIC} field should exist in multi_threading section")
902915

903916
# Verify all fields initialized to 0.
904-
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_io_threads'], '0',
905-
message="active_io_threads should be 0 when idle")
917+
env.assertEqual(info_dict[MULTI_THREADING_SECTION][ACTIVE_IO_THREADS_METRIC], '0',
918+
message=f"{ACTIVE_IO_THREADS_METRIC} should be 0 when idle")
906919
# There's no deterministic way to test active_io_threads increases while a query is running,
907920
# we test it in unit tests.
908921

@@ -929,13 +942,12 @@ def _test_active_worker_threads(env, num_queries):
929942
conn.execute_command('HSET', f'doc{i}', 'n', i)
930943

931944
# Verify active_worker_threads and coord threads start at 0
932-
multi_threading_section = f'{SEARCH_PREFIX}multi_threading'
933945
for i, con in enumerate(env.getOSSMasterNodesConnectionList()):
934946
info_dict = info_modules_to_dict(con)
935-
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_worker_threads'], '0',
936-
message=f"shard {i}: active_worker_threads should be 0 when idle")
937-
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_coord_threads'], '0',
938-
message=f"shard {i}: active_coord_threads should be 0 when idle")
947+
env.assertEqual(info_dict[MULTI_THREADING_SECTION][ACTIVE_WORKER_THREADS_METRIC], '0',
948+
message=f"shard {i}: {ACTIVE_WORKER_THREADS_METRIC} should be 0 when idle")
949+
env.assertEqual(info_dict[MULTI_THREADING_SECTION][ACTIVE_COORD_THREADS_METRIC], '0',
950+
message=f"shard {i}: {ACTIVE_COORD_THREADS_METRIC} should be 0 when idle")
939951

940952
# Define callback for testing a specific query type
941953
def _test_query_type(query_type):
@@ -964,14 +976,14 @@ def _test_query_type(query_type):
964976
# Verify active_worker_threads == num_queries
965977
for i, con in enumerate(env.getOSSMasterNodesConnectionList()):
966978
info_dict = info_modules_to_dict(con)
967-
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_worker_threads'], str(num_queries),
968-
message=f"shard {i}: {query_type}: active_worker_threads should be {num_queries} when {num_queries} queries are paused")
979+
env.assertEqual(info_dict[MULTI_THREADING_SECTION][ACTIVE_WORKER_THREADS_METRIC], str(num_queries),
980+
message=f"shard {i}: {query_type}: {ACTIVE_WORKER_THREADS_METRIC} should be {num_queries} when {num_queries} queries are paused")
969981

970982
# If this is cluster, and FT.AGGREGATE, verify active_coord_threads == num_queries
971983
if env.isCluster() and query_type == 'FT.AGGREGATE':
972984
info_dict = info_modules_to_dict(env)
973-
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_coord_threads'], str(num_queries),
974-
message=f"coordinator: {query_type}: active_coord_threads should be {num_queries} when {num_queries} queries are paused")
985+
env.assertEqual(info_dict[MULTI_THREADING_SECTION][ACTIVE_COORD_THREADS_METRIC], str(num_queries),
986+
message=f"coordinator: {query_type}: {ACTIVE_COORD_THREADS_METRIC} should be {num_queries} when {num_queries} queries are paused")
975987

976988
# Resume all queries
977989
allShards_setPauseRPResume(env)
@@ -986,21 +998,161 @@ def _test_query_type(query_type):
986998
# Verify active_worker_threads returns to 0
987999
for i, con in enumerate(env.getOSSMasterNodesConnectionList()):
9881000
info_dict = info_modules_to_dict(con)
989-
env.assertEqual(info_dict[multi_threading_section][f'{SEARCH_PREFIX}active_worker_threads'], '0',
990-
message=f"shard {i}: {query_type}: active_worker_threads should return to 0 after queries complete")
1001+
env.assertEqual(info_dict[MULTI_THREADING_SECTION][ACTIVE_WORKER_THREADS_METRIC], '0',
1002+
message=f"shard {i}: {query_type}: {ACTIVE_WORKER_THREADS_METRIC} should return to 0 after queries complete")
9911003

9921004
# Test both query types
9931005
_test_query_type('FT.SEARCH')
9941006
_test_query_type('FT.AGGREGATE')
9951007

996-
# --- Test 1: Standalone Mode ---
997-
@skip(cluster=True) # Only run in standalone mode
998-
def test_active_worker_threads_SA(env):
1008+
def test_active_worker_threads(env):
    # A single paused query is enough to observe the active-thread counters.
    _test_active_worker_threads(env, num_queries=1)
10011011

1002-
# --- Test 2: Cluster Mode ---
1003-
@skip(cluster=False) # Only run in cluster mode
1004-
def test_active_worker_threads_cluster(env):
1005-
num_queries = 1
1006-
_test_active_worker_threads(env, num_queries)
1012+
def _test_pending_jobs_metrics(env, command_type):
    """
    Exercise the workers_{low,high}_priority_pending_jobs INFO metrics.

    Parameters:
    - env: Test environment (works for both SA and cluster)
    - command_type: which FT command to queue ('SEARCH' or 'AGGREGATE')
    """

    # --- STEP 1: SETUP ---
    # Enable the workers thread pool (2 workers is enough for this test).
    run_command_on_all_shards(env, config_cmd(), 'SET', 'WORKERS', '2')

    num_vectors = 10 * env.shardsCount  # vectors to index -> low priority jobs
    num_queries = 3                     # queries to run -> high priority jobs
    dim = 4
    vector_field = DEFAULT_FIELD_NAME
    index_name = 'idx'

    def read_pending(shard, metric):
        # Read a pending-jobs metric from a single shard as an int.
        return int(info_modules_to_dict(shard)[MULTI_THREADING_SECTION][metric])

    # --- STEP 2: VERIFY INITIAL STATE (metrics = 0) ---
    for conn in env.getOSSMasterNodesConnectionList():
        info_dict = info_modules_to_dict(conn)
        env.assertEqual(info_dict[MULTI_THREADING_SECTION][WORKERS_LOW_PRIORITY_PENDING_JOBS_METRIC], '0')
        env.assertEqual(info_dict[MULTI_THREADING_SECTION][WORKERS_HIGH_PRIORITY_PENDING_JOBS_METRIC], '0')

    # --- STEP 3: PAUSE WORKERS THREAD POOL ---
    # Paused workers cannot pop jobs, so queued jobs stay visible as pending.
    run_command_on_all_shards(env, debug_cmd(), 'WORKERS', 'PAUSE')

    # --- STEP 4: CREATE INDEX AND INDEX VECTORS (creates workers_low_priority_pending_jobs) ---
    # HNSW background indexing queues one low-priority job per vector.
    set_up_database_with_vectors(env, dim, num_vectors, index_name=index_name,
                                 field_name=vector_field, datatype='FLOAT32',
                                 metric='L2', alg='HNSW')

    def check_indexing_jobs_pending():
        n_shards = env.shardsCount
        ready = [False] * n_shards
        state = {
            'indexing_jobs_pending': [0] * n_shards,
            'expected_indexing_jobs': [0] * n_shards,
        }
        for i, shard in enumerate(env.getOSSMasterNodesConnectionList()):
            # Expect one pending low-priority job per vector on this shard.
            expected = shard.execute_command('DBSIZE')
            pending = read_pending(shard, WORKERS_LOW_PRIORITY_PENDING_JOBS_METRIC)
            ready[i] = (expected == pending)
            state['expected_indexing_jobs'][i] = expected
            state['indexing_jobs_pending'][i] = pending
        return all(ready), state

    wait_for_condition(check_indexing_jobs_pending, "wait_for_workers_low_priority_jobs_pending")

    # --- STEP 5: EXECUTE QUERIES (creates high_priority_pending_jobs) ---
    # Launch queries in background threads; they queue as high-priority jobs
    # but cannot execute while the workers are paused.
    query_threads = []
    query_results = []

    def run_query(query_id):
        conn = getConnectionByEnv(env)
        try:
            result = conn.execute_command(f'FT.{command_type}', index_name, '*')
            query_results.append((query_id, 'success', result))
        except Exception as e:
            query_results.append((query_id, 'error', e))

    for qid in range(num_queries):
        worker = threading.Thread(target=run_query, args=(qid,))
        query_threads.append(worker)
        worker.start()

    # Give threads a moment to start and attempt to queue their queries.
    time.sleep(0.1)

    # Fail fast if any query errored before being queued.
    for query_id, status, result in query_results:
        if status == 'error':
            env.assertTrue(False, message=f"Query {query_id} failed immediately: {result}")

    # --- STEP 6: WAIT FOR THREADPOOL STATS TO UPDATE (jobs queued) ---
    def check_queries_jobs_pending():
        n_shards = env.shardsCount
        ready = [False] * n_shards
        state = {
            'queries_jobs_pending': [0] * n_shards,
            'expected_queries_jobs': [num_queries] * n_shards,
        }
        for i, shard in enumerate(env.getOSSMasterNodesConnectionList()):
            pending = read_pending(shard, WORKERS_HIGH_PRIORITY_PENDING_JOBS_METRIC)
            ready[i] = (num_queries == pending)
            state['queries_jobs_pending'][i] = pending
            state['expected_queries_jobs'][i] = num_queries
        return all(ready), state

    wait_for_condition(check_queries_jobs_pending, "wait_for_high_priority_jobs_pending")

    # --- STEP 7: RESUME WORKERS AND DRAIN ---
    run_command_on_all_shards(env, debug_cmd(), 'WORKERS', 'RESUME')

    # Wait for all query threads to complete.
    for worker in query_threads:
        worker.join(timeout=30)

    # Drain the worker thread pool to ensure all jobs complete.
    run_command_on_all_shards(env, debug_cmd(), 'WORKERS', 'DRAIN')

    # --- STEP 8: VERIFY METRICS RETURN TO 0 ---
    def check_reset_metrics():
        n_shards = env.shardsCount
        ready = [False] * n_shards
        state = {
            'workers_low_priority_jobs_pending': [-1] * n_shards,
            'workers_high_priority_jobs_pending': [-1] * n_shards,
        }
        for i, shard in enumerate(env.getOSSMasterNodesConnectionList()):
            high_pending = read_pending(shard, WORKERS_HIGH_PRIORITY_PENDING_JOBS_METRIC)
            low_pending = read_pending(shard, WORKERS_LOW_PRIORITY_PENDING_JOBS_METRIC)
            ready[i] = (high_pending == 0 and low_pending == 0)
            state['workers_low_priority_jobs_pending'][i] = low_pending
            state['workers_high_priority_jobs_pending'][i] = high_pending
        return all(ready), state

    wait_for_condition(check_reset_metrics, "wait_for_workers_pending_jobs_metric_reset")
1151+
1152+
def test_pending_jobs_metrics_search():
    # Pending-jobs metrics driven by FT.SEARCH queries.
    env = Env(moduleArgs='DEFAULT_DIALECT 2')
    _test_pending_jobs_metrics(env, 'SEARCH')
1155+
1156+
def test_pending_jobs_metrics_aggregate():
    # Pending-jobs metrics driven by FT.AGGREGATE queries.
    env = Env(moduleArgs='DEFAULT_DIALECT 2')
    _test_pending_jobs_metrics(env, 'AGGREGATE')

0 commit comments

Comments
 (0)