@@ -94,31 +94,6 @@ async def _cleanup_thread_pool():
9494 logger .info ("ThreadPoolExecutor shut down successfully" )
9595
9696
97- # Helper functions for threading (lightweight operations that release GIL)
98- def _process_node_chunk (chunk_data : tuple ) -> dict :
99- """
100- Process a chunk of node data - lightweight CPU operation.
101- Uses simple arithmetic and dict operations that release GIL, perfect for threads.
102- """
103- node_id , params , coeff = chunk_data
104- users_usage = defaultdict (int )
105- for param in params :
106- uid = int (param ["uid" ])
107- value = int (param ["value" ] * coeff )
108- users_usage [uid ] += value
109- return dict (users_usage )
110-
111-
112- def _merge_usage_dicts (dicts : list [dict ]) -> dict :
113- """
114- Merge multiple usage dictionaries.
115- Dict operations release GIL, perfect for ThreadPoolExecutor.
116- """
117- merged = defaultdict (int )
118- for d in dicts :
119- for uid , value in d .items ():
120- merged [uid ] += value
121- return dict (merged )
12297
12398
12499async def get_dialect () -> str :
@@ -567,76 +542,6 @@ async def calculate_admin_usage(users_usage: list) -> tuple[dict, set[int]]:
567542 return admin_usage , set (user_admin_map .keys ())
568543
569544
async def calculate_users_usage(api_params: dict, usage_coefficient: dict) -> list:
    """Calculate aggregated user usage across all nodes with coefficients applied.

    Uses ThreadPoolExecutor for lightweight operations (dict/arithmetic that release GIL).
    ThreadPoolExecutor is faster than ProcessPoolExecutor for these operations due to less overhead.

    Args:
        api_params: mapping of node_id -> list of {"uid": ..., "value": ...} params.
        usage_coefficient: mapping of node_id -> multiplier; nodes absent from
            the mapping default to a coefficient of 1.

    Returns:
        A list of {"uid": int, "value": int} dicts with usage summed per user,
        or [] when there is nothing to aggregate.
    """
    if not api_params:
        return []

    def _process_usage_sync(chunks_data: list[tuple[int, list[dict], float]]):
        """Synchronous fallback used for small batches or on executor failures."""
        users_usage = defaultdict(int)
        for _, params, coeff in chunks_data:
            for param in params:
                uid = int(param["uid"])
                value = int(param["value"] * coeff)
                users_usage[uid] += value
        return [{"uid": uid, "value": value} for uid, value in users_usage.items()]

    # Prepare (node_id, params, coefficient) chunks for parallel processing.
    chunks = [
        (node_id, params, usage_coefficient.get(node_id, 1))
        for node_id, params in api_params.items()
        if params  # Skip empty params
    ]

    if not chunks:
        return []

    # For small datasets, process synchronously to avoid executor overhead.
    # NOTE(review): the 1000-param threshold looks like a tuning heuristic — confirm.
    total_params = sum(len(params) for _, params, _ in chunks)
    if total_params < 1000:
        return _process_usage_sync(chunks)

    # Large dataset - use ThreadPoolExecutor (faster for lightweight operations).
    loop = asyncio.get_running_loop()
    try:
        thread_pool = await _get_thread_pool()
    except Exception:
        # Pool creation failed: degrade gracefully to the in-process path.
        logger.exception("Falling back to synchronous user usage calculation: failed to init thread pool")
        return _process_usage_sync(chunks)

    try:
        # Process chunks in parallel using threads (less overhead than processes).
        tasks = [loop.run_in_executor(thread_pool, _process_node_chunk, chunk) for chunk in chunks]
        chunk_results = await asyncio.gather(*tasks)

        # Merge results - also lightweight, use threads.
        if len(chunk_results) > 4:
            # Split the merge into ~4 partial merges run on the pool, then
            # combine the partials on the event-loop thread.
            chunk_size = max(1, len(chunk_results) // 4)
            merge_chunks = [
                chunk_results[i : i + chunk_size]
                for i in range(0, len(chunk_results), chunk_size)
            ]
            merge_tasks = [
                loop.run_in_executor(thread_pool, _merge_usage_dicts, merge_chunk)
                for merge_chunk in merge_chunks
            ]
            partial_results = await asyncio.gather(*merge_tasks)
            final_result = _merge_usage_dicts(partial_results)
        else:
            final_result = _merge_usage_dicts(chunk_results)

        return [{"uid": uid, "value": value} for uid, value in final_result.items()]
    except Exception:
        # Any executor/gather failure falls back to the synchronous path so
        # usage recording never hard-fails on pool trouble.
        logger.exception("Falling back to synchronous user usage calculation: executor merge failed")
        return _process_usage_sync(chunks)
638-
639-
640545async def _record_user_usages_impl ():
641546 """
642547 Internal implementation of record_user_usages.
@@ -673,7 +578,18 @@ async def _record_user_usages_impl():
673578 else :
674579 api_params [node_id ] = result
675580
676- users_usage = await calculate_users_usage (api_params , usage_coefficient )
581+ # Aggregate user usage across all nodes with coefficients applied
582+ users_usage_dict = defaultdict (int )
583+ for node_id , params in api_params .items ():
584+ if not params :
585+ continue
586+ coeff = usage_coefficient .get (node_id , 1.0 )
587+ for param in params :
588+ uid = int (param ["uid" ])
589+ value = int (param ["value" ] * coeff )
590+ users_usage_dict [uid ] += value
591+
592+ users_usage = [{"uid" : uid , "value" : value } for uid , value in users_usage_dict .items ()]
677593 if not users_usage :
678594 logger .debug ("No user usage to record" )
679595 return
0 commit comments