
Commit f039f4c

fix: jobs import error
1 parent c4bdb7e

4 files changed: +54 −42 lines

app/jobs/__init__.py

Lines changed: 7 additions & 2 deletions
@@ -1,5 +1,6 @@
 import glob
 import importlib.util
+import sys
 from os.path import basename, dirname, join
 
 modules = glob.glob(join(dirname(__file__), "*.py"))
@@ -9,5 +10,9 @@
     if name.startswith("_"):
         continue
 
-    spec = importlib.util.spec_from_file_location(name, file)
-    spec.loader.exec_module(importlib.util.module_from_spec(spec))
+    # Load job modules under the proper package path so multiprocessing pickling works
+    module_name = f"{__name__}.{name}"
+    spec = importlib.util.spec_from_file_location(module_name, file)
+    module = importlib.util.module_from_spec(spec)
+    sys.modules[module_name] = module
+    spec.loader.exec_module(module)
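
For context, a minimal standalone sketch of why the package-qualified name and the sys.modules registration matter: functions submitted to a process pool are pickled by reference to their module's name, and the worker process resolves that name through the import machinery, so a job module loaded under a bare, unregistered name cannot be found there. This is an illustration only; the helper name and parameters below are hypothetical, not taken from the repository.

# Hypothetical sketch (not part of the commit): loading a job file the way
# app/jobs/__init__.py now does, so functions defined in it survive pickling.
import importlib.util
import sys

def load_job_module(path: str, package: str, name: str):
    module_name = f"{package}.{name}"  # e.g. a package-qualified name like "app.jobs.<name>"
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    # Registering under the package-qualified name lets pickle (used by the
    # process pool) resolve func.__module__ again in the worker; skipping this
    # step is a typical cause of job import / pickling errors.
    sys.modules[module_name] = module
    spec.loader.exec_module(module)
    return module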

app/jobs/record_usages.py

Lines changed: 42 additions & 35 deletions
@@ -437,6 +437,16 @@ async def calculate_users_usage(api_params: dict, usage_coefficient: dict) -> li
     if not api_params:
         return []
 
+    def _process_usage_sync(chunks_data: list[tuple[int, list[dict], float]]):
+        """Synchronous fallback used for small batches or on executor failures."""
+        users_usage = defaultdict(int)
+        for _, params, coeff in chunks_data:
+            for param in params:
+                uid = int(param["uid"])
+                value = int(param["value"] * coeff)
+                users_usage[uid] += value
+        return [{"uid": uid, "value": value} for uid, value in users_usage.items()]
+
     # Prepare chunks for parallel processing
     chunks = [
         (node_id, params, usage_coefficient.get(node_id, 1))
@@ -450,45 +460,42 @@ async def calculate_users_usage(api_params: dict, usage_coefficient: dict) -> li
     # For small datasets, process synchronously to avoid overhead
     total_params = sum(len(params) for _, params, _ in chunks)
     if total_params < 1000:
-        # Small dataset - process synchronously
-        users_usage = defaultdict(int)
-        for node_id, params, coeff in chunks:
-            for param in params:
-                uid = int(param["uid"])
-                value = int(param["value"] * coeff)
-                users_usage[uid] += value
-        return [{"uid": uid, "value": value} for uid, value in users_usage.items()]
+        return _process_usage_sync(chunks)
 
     # Large dataset - use multiprocessing
     loop = asyncio.get_running_loop()
-    process_pool = await _get_process_pool()
-
-    # Process chunks in parallel
-    tasks = [
-        loop.run_in_executor(process_pool, _process_node_chunk, chunk)
-        for chunk in chunks
-    ]
-
-    chunk_results = await asyncio.gather(*tasks)
-
-    # Merge results - this is also CPU-bound, so parallelize if many chunks
-    if len(chunk_results) > 4:
-        # Split merge operation into smaller chunks
-        chunk_size = max(1, len(chunk_results) // 4)
-        merge_chunks = [
-            chunk_results[i:i + chunk_size]
-            for i in range(0, len(chunk_results), chunk_size)
-        ]
-        merge_tasks = [
-            loop.run_in_executor(process_pool, _merge_usage_dicts, merge_chunk)
-            for merge_chunk in merge_chunks
-        ]
-        partial_results = await asyncio.gather(*merge_tasks)
-        final_result = _merge_usage_dicts(partial_results)
-    else:
-        final_result = _merge_usage_dicts(chunk_results)
+    try:
+        process_pool = await _get_process_pool()
+    except Exception:
+        logger.exception("Falling back to synchronous user usage calculation: failed to init process pool")
+        return _process_usage_sync(chunks)
 
-    return [{"uid": uid, "value": value} for uid, value in final_result.items()]
+    try:
+        # Process chunks in parallel
+        tasks = [loop.run_in_executor(process_pool, _process_node_chunk, chunk) for chunk in chunks]
+        chunk_results = await asyncio.gather(*tasks)
+
+        # Merge results - this is also CPU-bound, so parallelize if many chunks
+        if len(chunk_results) > 4:
+            # Split merge operation into smaller chunks
+            chunk_size = max(1, len(chunk_results) // 4)
+            merge_chunks = [
+                chunk_results[i : i + chunk_size]
+                for i in range(0, len(chunk_results), chunk_size)
+            ]
+            merge_tasks = [
+                loop.run_in_executor(process_pool, _merge_usage_dicts, merge_chunk)
+                for merge_chunk in merge_chunks
+            ]
+            partial_results = await asyncio.gather(*merge_tasks)
+            final_result = _merge_usage_dicts(partial_results)
+        else:
+            final_result = _merge_usage_dicts(chunk_results)
+
+        return [{"uid": uid, "value": value} for uid, value in final_result.items()]
+    except Exception:
+        logger.exception("Falling back to synchronous user usage calculation: executor merge failed")
+        return _process_usage_sync(chunks)
 
 
 async def record_user_usages():
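
As a quick illustration of the aggregation that both the synchronous fallback and the process-pool path are meant to produce, here is a self-contained sketch. The chunk layout (node_id, params, coefficient) mirrors the diff above; the sample values and the standalone function name are invented for illustration.

# Hypothetical sketch (not part of the commit): per-user usage summed across
# node chunks, with each node's values scaled by its coefficient.
from collections import defaultdict

def process_usage_sync(chunks_data):
    users_usage = defaultdict(int)
    for _, params, coeff in chunks_data:
        for param in params:
            users_usage[int(param["uid"])] += int(param["value"] * coeff)
    return [{"uid": uid, "value": value} for uid, value in users_usage.items()]

chunks = [
    (1, [{"uid": 10, "value": 500}, {"uid": 11, "value": 200}], 1.0),
    (2, [{"uid": 10, "value": 300}], 2.0),  # node 2 counted at double rate
]
print(process_usage_sync(chunks))
# [{'uid': 10, 'value': 1100}, {'uid': 11, 'value': 200}]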

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ dependencies = [
     "uvloop>=0.22.1; sys_platform != 'win32'",
     "aiorwlock>=1.5.0",
     "typer>=0.20.0",
-    "pasarguard-node-bridge>=0.3.10",
+    "pasarguard-node-bridge>=0.3.11",
     "pip-system-certs>=5.3",
 ]

uv.lock

Lines changed: 4 additions & 4 deletions
Generated lockfile; diff not rendered.
