@@ -437,6 +437,16 @@ async def calculate_users_usage(api_params: dict, usage_coefficient: dict) -> li
     if not api_params:
         return []
 
+    def _process_usage_sync(chunks_data: list[tuple[int, list[dict], float]]):
+        """Synchronous fallback used for small batches or on executor failures."""
+        users_usage = defaultdict(int)
+        for _, params, coeff in chunks_data:
+            for param in params:
+                uid = int(param["uid"])
+                value = int(param["value"] * coeff)
+                users_usage[uid] += value
+        return [{"uid": uid, "value": value} for uid, value in users_usage.items()]
+
     # Prepare chunks for parallel processing
     chunks = [
         (node_id, params, usage_coefficient.get(node_id, 1))
@@ -450,45 +460,42 @@ async def calculate_users_usage(api_params: dict, usage_coefficient: dict) -> li
     # For small datasets, process synchronously to avoid overhead
     total_params = sum(len(params) for _, params, _ in chunks)
     if total_params < 1000:
-        # Small dataset - process synchronously
-        users_usage = defaultdict(int)
-        for node_id, params, coeff in chunks:
-            for param in params:
-                uid = int(param["uid"])
-                value = int(param["value"] * coeff)
-                users_usage[uid] += value
-        return [{"uid": uid, "value": value} for uid, value in users_usage.items()]
+        return _process_usage_sync(chunks)
 
     # Large dataset - use multiprocessing
     loop = asyncio.get_running_loop()
-    process_pool = await _get_process_pool()
-
-    # Process chunks in parallel
-    tasks = [
-        loop.run_in_executor(process_pool, _process_node_chunk, chunk)
-        for chunk in chunks
-    ]
-
-    chunk_results = await asyncio.gather(*tasks)
-
-    # Merge results - this is also CPU-bound, so parallelize if many chunks
-    if len(chunk_results) > 4:
-        # Split merge operation into smaller chunks
-        chunk_size = max(1, len(chunk_results) // 4)
-        merge_chunks = [
-            chunk_results[i:i + chunk_size]
-            for i in range(0, len(chunk_results), chunk_size)
-        ]
-        merge_tasks = [
-            loop.run_in_executor(process_pool, _merge_usage_dicts, merge_chunk)
-            for merge_chunk in merge_chunks
-        ]
-        partial_results = await asyncio.gather(*merge_tasks)
-        final_result = _merge_usage_dicts(partial_results)
-    else:
-        final_result = _merge_usage_dicts(chunk_results)
+    try:
+        process_pool = await _get_process_pool()
+    except Exception:
+        logger.exception("Falling back to synchronous user usage calculation: failed to init process pool")
+        return _process_usage_sync(chunks)
 
-    return [{"uid": uid, "value": value} for uid, value in final_result.items()]
+    try:
+        # Process chunks in parallel
+        tasks = [loop.run_in_executor(process_pool, _process_node_chunk, chunk) for chunk in chunks]
+        chunk_results = await asyncio.gather(*tasks)
+
+        # Merge results - this is also CPU-bound, so parallelize if many chunks
+        if len(chunk_results) > 4:
+            # Split merge operation into smaller chunks
+            chunk_size = max(1, len(chunk_results) // 4)
+            merge_chunks = [
+                chunk_results[i : i + chunk_size]
+                for i in range(0, len(chunk_results), chunk_size)
+            ]
+            merge_tasks = [
+                loop.run_in_executor(process_pool, _merge_usage_dicts, merge_chunk)
+                for merge_chunk in merge_chunks
+            ]
+            partial_results = await asyncio.gather(*merge_tasks)
+            final_result = _merge_usage_dicts(partial_results)
+        else:
+            final_result = _merge_usage_dicts(chunk_results)
+
+        return [{"uid": uid, "value": value} for uid, value in final_result.items()]
+    except Exception:
+        logger.exception("Falling back to synchronous user usage calculation: executor merge failed")
+        return _process_usage_sync(chunks)
 
 
 async def record_user_usages():
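
For readers seeing this hunk in isolation: `_get_process_pool`, `_process_node_chunk`, `_merge_usage_dicts`, and `logger` are defined elsewhere in the module and are not part of this diff. Below is a minimal sketch of what they could look like, assuming the pool is a lazily created `ProcessPoolExecutor` and each chunk is a `(node_id, params, coefficient)` tuple as built above; it is only an illustration consistent with how the new code calls them, not the project's actual implementation.

```python
import logging
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor

logger = logging.getLogger(__name__)

_process_pool: ProcessPoolExecutor | None = None


async def _get_process_pool() -> ProcessPoolExecutor:
    """Lazily create and cache a shared ProcessPoolExecutor (sketch)."""
    global _process_pool
    if _process_pool is None:
        _process_pool = ProcessPoolExecutor()
    return _process_pool


def _process_node_chunk(chunk: tuple[int, list[dict], float]) -> dict[int, int]:
    """Aggregate usage for one node's params, applying its coefficient (sketch)."""
    _, params, coeff = chunk
    usage: dict[int, int] = defaultdict(int)
    for param in params:
        usage[int(param["uid"])] += int(param["value"] * coeff)
    return dict(usage)


def _merge_usage_dicts(dicts) -> dict[int, int]:
    """Sum per-uid usage across an iterable of partial result dicts (sketch)."""
    merged: dict[int, int] = defaultdict(int)
    for partial in dicts:
        for uid, value in partial.items():
            merged[uid] += value
    return dict(merged)
```

Whatever the real implementations look like, anything dispatched through `run_in_executor` with a process pool must be picklable, so these helpers have to live at module level; the nested `_process_usage_sync` fallback is fine as-is because it only ever runs in the current process.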
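
As a rough usage example, with input shapes inferred from how the function reads them (`api_params` maps a node id to a list of `{"uid", "value"}` records, and `usage_coefficient` maps a node id to a weight that defaults to 1); the concrete values are made up:

```python
import asyncio

api_params = {
    1: [{"uid": 10, "value": 1_000}, {"uid": 11, "value": 2_500}],
    2: [{"uid": 10, "value": 4_000}],
}
usage_coefficient = {2: 0.5}  # node 2 traffic weighted at half

usage = asyncio.run(calculate_users_usage(api_params, usage_coefficient))
# Expected: [{"uid": 10, "value": 3000}, {"uid": 11, "value": 2500}]
```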