
Commit 439796f

chore: update pasarguard-node-bridge to version 0.4.1 and adjust related dependencies
Parent: 25e3649

5 files changed: +21, -115 lines

app/jobs/record_usages.py

Lines changed: 12 additions & 96 deletions
@@ -94,31 +94,6 @@ async def _cleanup_thread_pool():
     logger.info("ThreadPoolExecutor shut down successfully")
 
 
-# Helper functions for threading (lightweight operations that release GIL)
-def _process_node_chunk(chunk_data: tuple) -> dict:
-    """
-    Process a chunk of node data - lightweight CPU operation.
-    Uses simple arithmetic and dict operations that release GIL, perfect for threads.
-    """
-    node_id, params, coeff = chunk_data
-    users_usage = defaultdict(int)
-    for param in params:
-        uid = int(param["uid"])
-        value = int(param["value"] * coeff)
-        users_usage[uid] += value
-    return dict(users_usage)
-
-
-def _merge_usage_dicts(dicts: list[dict]) -> dict:
-    """
-    Merge multiple usage dictionaries.
-    Dict operations release GIL, perfect for ThreadPoolExecutor.
-    """
-    merged = defaultdict(int)
-    for d in dicts:
-        for uid, value in d.items():
-            merged[uid] += value
-    return dict(merged)
 
 
 async def get_dialect() -> str:

@@ -567,76 +542,6 @@ async def calculate_admin_usage(users_usage: list) -> tuple[dict, set[int]]:
     return admin_usage, set(user_admin_map.keys())
 
 
-async def calculate_users_usage(api_params: dict, usage_coefficient: dict) -> list:
-    """Calculate aggregated user usage across all nodes with coefficients applied.
-
-    Uses ThreadPoolExecutor for lightweight operations (dict/arithmetic that release GIL).
-    ThreadPoolExecutor is faster than ProcessPoolExecutor for these operations due to less overhead.
-    """
-    if not api_params:
-        return []
-
-    def _process_usage_sync(chunks_data: list[tuple[int, list[dict], float]]):
-        """Synchronous fallback used for small batches or on executor failures."""
-        users_usage = defaultdict(int)
-        for _, params, coeff in chunks_data:
-            for param in params:
-                uid = int(param["uid"])
-                value = int(param["value"] * coeff)
-                users_usage[uid] += value
-        return [{"uid": uid, "value": value} for uid, value in users_usage.items()]
-
-    # Prepare chunks for parallel processing
-    chunks = [
-        (node_id, params, usage_coefficient.get(node_id, 1))
-        for node_id, params in api_params.items()
-        if params  # Skip empty params
-    ]
-
-    if not chunks:
-        return []
-
-    # For small datasets, process synchronously to avoid overhead
-    total_params = sum(len(params) for _, params, _ in chunks)
-    if total_params < 1000:
-        return _process_usage_sync(chunks)
-
-    # Large dataset - use ThreadPoolExecutor (faster for lightweight operations)
-    loop = asyncio.get_running_loop()
-    try:
-        thread_pool = await _get_thread_pool()
-    except Exception:
-        logger.exception("Falling back to synchronous user usage calculation: failed to init thread pool")
-        return _process_usage_sync(chunks)
-
-    try:
-        # Process chunks in parallel using threads (less overhead than processes)
-        tasks = [loop.run_in_executor(thread_pool, _process_node_chunk, chunk) for chunk in chunks]
-        chunk_results = await asyncio.gather(*tasks)
-
-        # Merge results - also lightweight, use threads
-        if len(chunk_results) > 4:
-            # Split merge operation into smaller chunks
-            chunk_size = max(1, len(chunk_results) // 4)
-            merge_chunks = [
-                chunk_results[i : i + chunk_size]
-                for i in range(0, len(chunk_results), chunk_size)
-            ]
-            merge_tasks = [
-                loop.run_in_executor(thread_pool, _merge_usage_dicts, merge_chunk)
-                for merge_chunk in merge_chunks
-            ]
-            partial_results = await asyncio.gather(*merge_tasks)
-            final_result = _merge_usage_dicts(partial_results)
-        else:
-            final_result = _merge_usage_dicts(chunk_results)
-
-        return [{"uid": uid, "value": value} for uid, value in final_result.items()]
-    except Exception:
-        logger.exception("Falling back to synchronous user usage calculation: executor merge failed")
-        return _process_usage_sync(chunks)
-
-
 async def _record_user_usages_impl():
     """
     Internal implementation of record_user_usages.

@@ -673,7 +578,18 @@ async def _record_user_usages_impl():
         else:
             api_params[node_id] = result
 
-    users_usage = await calculate_users_usage(api_params, usage_coefficient)
+    # Aggregate user usage across all nodes with coefficients applied
+    users_usage_dict = defaultdict(int)
+    for node_id, params in api_params.items():
+        if not params:
+            continue
+        coeff = usage_coefficient.get(node_id, 1.0)
+        for param in params:
+            uid = int(param["uid"])
+            value = int(param["value"] * coeff)
+            users_usage_dict[uid] += value
+
+    users_usage = [{"uid": uid, "value": value} for uid, value in users_usage_dict.items()]
     if not users_usage:
         logger.debug("No user usage to record")
         return
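
The replacement collapses the whole chunk/scatter/merge pipeline into a single pass over api_params: each node's coefficient (defaulting to 1.0) is applied as its records are read, and per-user totals accumulate in one defaultdict. A minimal, self-contained sketch of that aggregation follows; the node IDs, coefficients, and usage values are invented for illustration only:

    from collections import defaultdict

    # Invented sample data mirroring the shapes in the diff:
    # api_params maps node_id -> list of {"uid", "value"} records;
    # usage_coefficient maps node_id -> a per-node multiplier.
    api_params = {
        1: [{"uid": 10, "value": 500}, {"uid": 11, "value": 300}],
        2: [{"uid": 10, "value": 200}],
        3: [],  # nodes that returned no records are skipped
    }
    usage_coefficient = {2: 2.0}  # nodes 1 and 3 fall back to 1.0

    users_usage_dict = defaultdict(int)
    for node_id, params in api_params.items():
        if not params:
            continue
        coeff = usage_coefficient.get(node_id, 1.0)
        for param in params:
            users_usage_dict[int(param["uid"])] += int(param["value"] * coeff)

    users_usage = [{"uid": uid, "value": value} for uid, value in users_usage_dict.items()]
    print(users_usage)  # [{'uid': 10, 'value': 900}, {'uid': 11, 'value': 300}]

Note that, contrary to the removed docstrings, plain-Python dict and arithmetic operations hold the GIL, so the executor fan-out bought no real parallelism here; that is presumably why this commit trades it for the single-pass loop, which produces the same totals without the task-scheduling overhead.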

app/node/__init__.py

Lines changed: 3 additions & 13 deletions
@@ -16,25 +16,15 @@
 
 def calculate_max_message_size(active_users_count: int) -> int:
     """
-    Calculate max_message_size based on active users count.
-
-    Formula: Base 5MB + (active_users_count * 1KB per user)
-    This ensures sufficient buffer for large node configurations with many users.
+    max_message_size to 64MB.
 
     Args:
         active_users_count: Number of active users in the system
 
    Returns:
-        int: Max message size in bytes
+        int: Max message size in bytes (64MB)
     """
-    base_size = 5 * 1024 * 1024  # 5MB base
-    per_user_size = 1024  # 1KB per user
-    calculated_size = base_size + (active_users_count * per_user_size)
-
-    # Cap at 100MB to prevent excessive memory usage
-    max_cap = 100 * 1024 * 1024  # 100MB
-
-    return min(calculated_size, max_cap)
+    return 64 * 1024 * 1024  # 64MB
 
 
 class NodeManager:
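
calculate_max_message_size keeps its active_users_count parameter, so existing call sites are unchanged, but the sizing heuristic is gone: a flat 64MB replaces the 5MB-plus-1KB-per-user formula and its 100MB cap. If the value ultimately feeds a gRPC channel, as the name suggests, the wiring would look roughly like the sketch below; the channel target is invented and the pasarguard-node-bridge integration point is an assumption, though the grpc option keys themselves are standard grpc-python channel arguments:

    import grpc

    from app.node import calculate_max_message_size

    # Hypothetical wiring: pass the fixed 64MB limit as standard
    # grpc-python channel options. "node.example:62050" is an invented target.
    max_size = calculate_max_message_size(active_users_count=0)  # argument now ignored

    channel = grpc.insecure_channel(
        "node.example:62050",
        options=[
            ("grpc.max_send_message_length", max_size),
            ("grpc.max_receive_message_length", max_size),
        ],
    )

The flat value removes the risk of an undersized buffer for large node configurations at the cost of a higher fixed ceiling, still below the old 100MB cap.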

dashboard/package.json

Lines changed: 1 addition & 1 deletion
@@ -1,7 +1,7 @@
 {
   "name": "pasarguard-dashboard",
   "private": true,
-  "version": "1.10.0",
+  "version": "1.10.2",
   "scripts": {
     "dev": "vite dev",
     "build": "vite build",

pyproject.toml

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ dependencies = [
     "uvloop>=0.22.1; sys_platform != 'win32'",
     "aiorwlock>=1.5.0",
     "typer>=0.20.0",
-    "pasarguard-node-bridge>=0.4.0",
+    "pasarguard-node-bridge>=0.4.1",
     "pip-system-certs>=5.3",
 ]
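
After re-syncing the environment against the raised floor, the resolved version can be sanity-checked from Python. This is a generic importlib.metadata lookup, with the distribution name taken from the dependency list above:

    from importlib.metadata import version

    # Distribution name as it appears in pyproject.toml's dependencies.
    print(version("pasarguard-node-bridge"))  # expected to print 0.4.1 or newer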

uv.lock

Lines changed: 4 additions & 4 deletions
Generated lockfile; contents are not rendered in the diff view.
