@@ -1659,12 +1659,14 @@ def _test_active_worker_threads(env, num_queries):
16591659 for i in range (10 ):
16601660 conn .execute_command ('HSET' , f'doc{ i } ' , 'n' , i )
16611661
1662- # Verify active_worker_threads starts at 0
1662+ # Verify active_worker_threads and coord threads start at 0
16631663 multi_threading_section = f'{ SEARCH_PREFIX } multi_threading'
16641664 for i , con in enumerate (env .getOSSMasterNodesConnectionList ()):
16651665 info_dict = info_modules_to_dict (con )
16661666 env .assertEqual (info_dict [multi_threading_section ][f'{ SEARCH_PREFIX } active_worker_threads' ], '0' ,
16671667 message = f"shard { i } : active_worker_threads should be 0 when idle" )
1668+ env .assertEqual (info_dict [multi_threading_section ][f'{ SEARCH_PREFIX } active_coord_threads' ], '0' ,
1669+ message = f"shard { i } : active_coord_threads should be 0 when idle" )
16681670
16691671 # Define callback for testing a specific query type
16701672 def _test_query_type (query_type ):
@@ -1696,6 +1698,12 @@ def _test_query_type(query_type):
16961698 env .assertEqual (info_dict [multi_threading_section ][f'{ SEARCH_PREFIX } active_worker_threads' ], str (num_queries ),
16971699 message = f"shard { i } : { query_type } : active_worker_threads should be { num_queries } when { num_queries } queries are paused" )
16981700
1701+ # If this is cluster, and FT.AGGREGATE, verify active_coord_threads == num_queries
1702+ if env .isCluster () and query_type == 'FT.AGGREGATE' :
1703+ info_dict = info_modules_to_dict (env )
1704+ env .assertEqual (info_dict [multi_threading_section ][f'{ SEARCH_PREFIX } active_coord_threads' ], str (num_queries ),
1705+ message = f"coordinator: { query_type } : active_coord_threads should be { num_queries } when { num_queries } queries are paused" )
1706+
16991707 # Resume all queries
17001708 allShards_setPauseRPResume (env )
17011709
0 commit comments