33import redis
44from inspect import currentframe
55import numpy as np
6-
6+ from vecsim_utils import (
7+ DEFAULT_FIELD_NAME ,
8+ set_up_database_with_vectors ,
9+ )
710
811def info_modules_to_dict (conn ):
912 res = conn .execute_command ('INFO MODULES' )
@@ -881,6 +884,17 @@ def test_errors_and_warnings_init(env):
881884 for field in info_dict [metric ]:
882885 env .assertEqual (info_dict [metric ][field ], '0' )
883886
########
# Multi Threaded Stats tests
########

# Section / field names as reported by `INFO MODULES` (see info_modules_to_dict).
# NOTE: no whitespace inside the f-string braces — these must match the raw
# metric key strings byte-for-byte, or dict lookups below will KeyError.
MULTI_THREADING_SECTION = f'{SEARCH_PREFIX}multi_threading'
ACTIVE_IO_THREADS_METRIC = f'{SEARCH_PREFIX}active_io_threads'
ACTIVE_WORKER_THREADS_METRIC = f'{SEARCH_PREFIX}active_worker_threads'
ACTIVE_COORD_THREADS_METRIC = f'{SEARCH_PREFIX}active_coord_threads'
WORKERS_LOW_PRIORITY_PENDING_JOBS_METRIC = f'{SEARCH_PREFIX}workers_low_priority_pending_jobs'
WORKERS_HIGH_PRIORITY_PENDING_JOBS_METRIC = f'{SEARCH_PREFIX}workers_high_priority_pending_jobs'
884898def test_active_io_threads_stats (env ):
885899 conn = getConnectionByEnv (env )
886900 # Setup: Create index with some data
@@ -892,17 +906,16 @@ def test_active_io_threads_stats(env):
892906 info_dict = info_modules_to_dict (env )
893907
894908 # Verify multi_threading section exists
895- multi_threading_section = f'{ SEARCH_PREFIX } multi_threading'
896- env .assertTrue (multi_threading_section in info_dict ,
909+ env .assertTrue (MULTI_THREADING_SECTION in info_dict ,
897910 message = "multi_threading section should exist in INFO MODULES" )
898911
899912 # Verify all expected fields exist
900- env .assertTrue (f' { SEARCH_PREFIX } active_io_threads' in info_dict [multi_threading_section ],
901- message = "active_io_threads field should exist in multi_threading section" )
913+ env .assertTrue (ACTIVE_IO_THREADS_METRIC in info_dict [MULTI_THREADING_SECTION ],
914+ message = f" { ACTIVE_IO_THREADS_METRIC } field should exist in multi_threading section" )
902915
903916 # Verify all fields initialized to 0.
904- env .assertEqual (info_dict [multi_threading_section ][ f' { SEARCH_PREFIX } active_io_threads' ], '0' ,
905- message = "active_io_threads should be 0 when idle" )
917+ env .assertEqual (info_dict [MULTI_THREADING_SECTION ][ ACTIVE_IO_THREADS_METRIC ], '0' ,
918+ message = f" { ACTIVE_IO_THREADS_METRIC } should be 0 when idle" )
906919 # There's no deterministic way to test active_io_threads increases while a query is running,
907920 # we test it in unit tests.
908921
@@ -929,13 +942,12 @@ def _test_active_worker_threads(env, num_queries):
929942 conn .execute_command ('HSET' , f'doc{ i } ' , 'n' , i )
930943
931944 # Verify active_worker_threads and coord threads start at 0
932- multi_threading_section = f'{ SEARCH_PREFIX } multi_threading'
933945 for i , con in enumerate (env .getOSSMasterNodesConnectionList ()):
934946 info_dict = info_modules_to_dict (con )
935- env .assertEqual (info_dict [multi_threading_section ][ f' { SEARCH_PREFIX } active_worker_threads' ], '0' ,
936- message = f"shard { i } : active_worker_threads should be 0 when idle" )
937- env .assertEqual (info_dict [multi_threading_section ][ f' { SEARCH_PREFIX } active_coord_threads' ], '0' ,
938- message = f"shard { i } : active_coord_threads should be 0 when idle" )
947+ env .assertEqual (info_dict [MULTI_THREADING_SECTION ][ ACTIVE_WORKER_THREADS_METRIC ], '0' ,
948+ message = f"shard { i } : { ACTIVE_WORKER_THREADS_METRIC } should be 0 when idle" )
949+ env .assertEqual (info_dict [MULTI_THREADING_SECTION ][ ACTIVE_COORD_THREADS_METRIC ], '0' ,
950+ message = f"shard { i } : { ACTIVE_COORD_THREADS_METRIC } should be 0 when idle" )
939951
940952 # Define callback for testing a specific query type
941953 def _test_query_type (query_type ):
@@ -964,14 +976,14 @@ def _test_query_type(query_type):
964976 # Verify active_worker_threads == num_queries
965977 for i , con in enumerate (env .getOSSMasterNodesConnectionList ()):
966978 info_dict = info_modules_to_dict (con )
967- env .assertEqual (info_dict [multi_threading_section ][ f' { SEARCH_PREFIX } active_worker_threads' ], str (num_queries ),
968- message = f"shard { i } : { query_type } : active_worker_threads should be { num_queries } when { num_queries } queries are paused" )
979+ env .assertEqual (info_dict [MULTI_THREADING_SECTION ][ ACTIVE_WORKER_THREADS_METRIC ], str (num_queries ),
980+ message = f"shard { i } : { query_type } : { ACTIVE_WORKER_THREADS_METRIC } should be { num_queries } when { num_queries } queries are paused" )
969981
970982 # If this is cluster, and FT.AGGREGATE, verify active_coord_threads == num_queries
971983 if env .isCluster () and query_type == 'FT.AGGREGATE' :
972984 info_dict = info_modules_to_dict (env )
973- env .assertEqual (info_dict [multi_threading_section ][ f' { SEARCH_PREFIX } active_coord_threads' ], str (num_queries ),
974- message = f"coordinator: { query_type } : active_coord_threads should be { num_queries } when { num_queries } queries are paused" )
985+ env .assertEqual (info_dict [MULTI_THREADING_SECTION ][ ACTIVE_COORD_THREADS_METRIC ], str (num_queries ),
986+ message = f"coordinator: { query_type } : { ACTIVE_COORD_THREADS_METRIC } should be { num_queries } when { num_queries } queries are paused" )
975987
976988 # Resume all queries
977989 allShards_setPauseRPResume (env )
@@ -986,21 +998,161 @@ def _test_query_type(query_type):
986998 # Verify active_worker_threads returns to 0
987999 for i , con in enumerate (env .getOSSMasterNodesConnectionList ()):
9881000 info_dict = info_modules_to_dict (con )
989- env .assertEqual (info_dict [multi_threading_section ][ f' { SEARCH_PREFIX } active_worker_threads' ], '0' ,
990- message = f"shard { i } : { query_type } : active_worker_threads should return to 0 after queries complete" )
1001+ env .assertEqual (info_dict [MULTI_THREADING_SECTION ][ ACTIVE_WORKER_THREADS_METRIC ], '0' ,
1002+ message = f"shard { i } : { query_type } : { ACTIVE_WORKER_THREADS_METRIC } should return to 0 after queries complete" )
9911003
9921004 # Test both query types
9931005 _test_query_type ('FT.SEARCH' )
9941006 _test_query_type ('FT.AGGREGATE' )
9951007
def test_active_worker_threads(env):
    """Exercise active_worker_threads / active_coord_threads accounting.

    Runs in both standalone and cluster modes (replaces the former
    mode-specific test_active_worker_threads_SA / _cluster duplicates);
    _test_active_worker_threads handles the cluster-only coordinator checks
    internally.
    """
    num_queries = 1
    _test_active_worker_threads(env, num_queries)
def _test_pending_jobs_metrics(env, command_type):
    """
    Verify the workers_{low,high}_priority_pending_jobs INFO MODULES metrics.

    Flow: pause the workers thread pool, enqueue jobs of both priorities
    (HNSW background indexing -> low priority, FT queries -> high priority),
    verify the metrics reflect the queued jobs on every shard, then resume
    the pool and verify both metrics drain back to 0.

    Parameters:
    - env: Test environment (works for both SA and cluster)
    - command_type: 'SEARCH' or 'AGGREGATE' - which FT command to issue
    """

    # --- STEP 1: SETUP ---
    # Configure WORKERS (we just need workers enabled, e.g., 2)
    run_command_on_all_shards(env, config_cmd(), 'SET', 'WORKERS', '2')

    # Define variables
    num_vectors = 10 * env.shardsCount  # vectors to index (creates low priority jobs)
    num_queries = 3                     # queries to execute (creates high priority jobs)
    dim = 4
    vector_field = DEFAULT_FIELD_NAME
    index_name = 'idx'

    # --- STEP 2: VERIFY INITIAL STATE (metrics = 0) ---
    for conn in env.getOSSMasterNodesConnectionList():
        info_dict = info_modules_to_dict(conn)
        env.assertEqual(info_dict[MULTI_THREADING_SECTION][WORKERS_LOW_PRIORITY_PENDING_JOBS_METRIC], '0')
        env.assertEqual(info_dict[MULTI_THREADING_SECTION][WORKERS_HIGH_PRIORITY_PENDING_JOBS_METRIC], '0')

    # --- STEP 3: PAUSE WORKERS THREAD POOL ---
    # Pause workers to prevent jobs from executing
    run_command_on_all_shards(env, debug_cmd(), 'WORKERS', 'PAUSE')

    # --- STEP 4: CREATE INDEX AND INDEX VECTORS (creates workers_low_priority_pending_jobs) ---
    # HNSW creates background indexing jobs, which are low priority.
    set_up_database_with_vectors(env, dim, num_vectors, index_name=index_name,
                                 field_name=vector_field, datatype='FLOAT32',
                                 metric='L2', alg='HNSW')

    def check_indexing_jobs_pending():
        # One pending low-priority job is expected per document on each shard
        # (DBSIZE == number of vectors routed to that shard).
        num_shards = env.shardsCount
        all_shards_ready = [False] * num_shards
        state = {
            'indexing_jobs_pending': [0] * num_shards,
            'expected_indexing_jobs': [0] * num_shards,
        }
        for i, con in enumerate(env.getOSSMasterNodesConnectionList()):
            expected_indexing_jobs = con.execute_command('DBSIZE')
            shard_stats = info_modules_to_dict(con)
            indexing_jobs_pending = int(shard_stats[MULTI_THREADING_SECTION][WORKERS_LOW_PRIORITY_PENDING_JOBS_METRIC])
            all_shards_ready[i] = (expected_indexing_jobs == indexing_jobs_pending)
            state['expected_indexing_jobs'][i] = expected_indexing_jobs
            state['indexing_jobs_pending'][i] = indexing_jobs_pending
        return all(all_shards_ready), state

    wait_for_condition(check_indexing_jobs_pending, "wait_for_workers_low_priority_jobs_pending")

    # --- STEP 5: EXECUTE QUERIES (creates high_priority_pending_jobs) ---
    # Launch num_queries queries in background threads; they will be queued
    # as high-priority jobs but not executed while the workers are paused.
    query_threads = []
    query_results = []

    def run_query(query_id):
        conn = getConnectionByEnv(env)
        try:
            result = conn.execute_command(f'FT.{command_type}', index_name, '*')
            query_results.append((query_id, 'success', result))
        except Exception as e:
            query_results.append((query_id, 'error', e))

    for i in range(num_queries):
        t = threading.Thread(target=run_query, args=(i,))
        query_threads.append(t)
        t.start()

    # Give threads a moment to start and attempt to queue their queries
    time.sleep(0.1)

    # Check if any queries failed immediately (before being queued).
    # Snapshot the list: background threads may still append to it.
    for query_id, status, result in list(query_results):
        if status == 'error':
            env.assertTrue(False, message=f"Query {query_id} failed immediately: {result}")

    # --- STEP 6: WAIT FOR THREADPOOL STATS TO UPDATE (jobs queued) ---
    def check_queries_jobs_pending():
        num_shards = env.shardsCount
        all_shards_ready = [False] * num_shards
        expected_queries_jobs = num_queries
        state = {
            'queries_jobs_pending': [0] * num_shards,
            'expected_queries_jobs': [expected_queries_jobs] * num_shards,
        }
        for i, con in enumerate(env.getOSSMasterNodesConnectionList()):
            shard_stats = info_modules_to_dict(con)
            queries_pending_jobs = int(shard_stats[MULTI_THREADING_SECTION][WORKERS_HIGH_PRIORITY_PENDING_JOBS_METRIC])
            all_shards_ready[i] = (expected_queries_jobs == queries_pending_jobs)
            state['queries_jobs_pending'][i] = queries_pending_jobs
        return all(all_shards_ready), state

    wait_for_condition(check_queries_jobs_pending, "wait_for_high_priority_jobs_pending")

    # --- STEP 7: RESUME WORKERS AND DRAIN ---
    run_command_on_all_shards(env, debug_cmd(), 'WORKERS', 'RESUME')

    # Wait for all query threads to complete. A join timeout alone is silent,
    # so explicitly fail if any thread is still alive (query hung).
    for t in query_threads:
        t.join(timeout=30)
        env.assertFalse(t.is_alive(), message="query thread did not complete within 30s after RESUME")

    # Drain worker thread pool to ensure all jobs complete:
    run_command_on_all_shards(env, debug_cmd(), 'WORKERS', 'DRAIN')

    # --- STEP 8: VERIFY METRICS RETURN TO 0 ---
    # Wait for metrics to return to 0 (job callback may finish before the
    # stats are updated, hence the polling).
    def check_reset_metrics():
        num_shards = env.shardsCount
        all_shards_ready = [False] * num_shards
        state = {
            'workers_low_priority_jobs_pending': [-1] * num_shards,
            'workers_high_priority_jobs_pending': [-1] * num_shards,
        }
        for i, con in enumerate(env.getOSSMasterNodesConnectionList()):
            shard_stats = info_modules_to_dict(con)
            queries_jobs_pending = int(shard_stats[MULTI_THREADING_SECTION][WORKERS_HIGH_PRIORITY_PENDING_JOBS_METRIC])
            background_indexing_jobs_pending = int(shard_stats[MULTI_THREADING_SECTION][WORKERS_LOW_PRIORITY_PENDING_JOBS_METRIC])
            all_shards_ready[i] = (queries_jobs_pending == 0 and background_indexing_jobs_pending == 0)
            state['workers_low_priority_jobs_pending'][i] = background_indexing_jobs_pending
            state['workers_high_priority_jobs_pending'][i] = queries_jobs_pending
        return all(all_shards_ready), state

    wait_for_condition(check_reset_metrics, "wait_for_workers_pending_jobs_metric_reset")
1151+
def test_pending_jobs_metrics_search():
    """Pending-jobs metrics flow driven through FT.SEARCH."""
    _test_pending_jobs_metrics(Env(moduleArgs='DEFAULT_DIALECT 2'), 'SEARCH')
1155+
def test_pending_jobs_metrics_aggregate():
    """Pending-jobs metrics flow driven through FT.AGGREGATE."""
    _test_pending_jobs_metrics(Env(moduleArgs='DEFAULT_DIALECT 2'), 'AGGREGATE')
0 commit comments