Skip to content

Commit e2654ff

Browse files
committed
feat: add max message size calculation based on active users count
- Introduced a new function `calculate_max_message_size` to compute the maximum message size based on the number of active users. - Updated `NodeManager`'s `update_node` method to accept an optional `max_message_size` parameter. - Modified `NodeOperation` to calculate and pass the max message size when updating nodes, ensuring efficient memory usage.
1 parent 1e0c9d2 commit e2654ff

File tree

2 files changed

+53
-9
lines changed

2 files changed

+53
-9
lines changed

app/node/__init__.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,29 @@
1414
}
1515

1616

17+
def calculate_max_message_size(active_users_count: int) -> int:
18+
"""
19+
Calculate max_message_size based on active users count.
20+
21+
Formula: Base 10MB + (active_users_count * 50KB per user)
22+
This ensures sufficient buffer for large node configurations with many users.
23+
24+
Args:
25+
active_users_count: Number of active users in the system
26+
27+
Returns:
28+
int: Max message size in bytes
29+
"""
30+
base_size = 10 * 1024 * 1024 # 10MB base
31+
per_user_size = 50 * 1024 # 50KB per user
32+
calculated_size = base_size + (active_users_count * per_user_size)
33+
34+
# Cap at 100MB to prevent excessive memory usage
35+
max_cap = 100 * 1024 * 1024 # 100MB
36+
37+
return min(calculated_size, max_cap)
38+
39+
1740
class NodeManager:
1841
def __init__(self):
1942
self._nodes: dict[int, PasarGuardNode] = {}
@@ -30,7 +53,7 @@ async def _shutdown_node(self, node: PasarGuardNode | None):
3053
except Exception:
3154
pass
3255

33-
async def update_node(self, node: Node) -> PasarGuardNode:
56+
async def update_node(self, node: Node, max_message_size: int | None = None) -> PasarGuardNode:
3457
async with self._lock.writer_lock:
3558
old_node: PasarGuardNode | None = self._nodes.pop(node.id, None)
3659

@@ -45,6 +68,7 @@ async def update_node(self, node: Node) -> PasarGuardNode:
4568
logger=self.logger,
4669
default_timeout=node.default_timeout,
4770
internal_timeout=node.internal_timeout,
71+
max_message_size=max_message_size,
4872
extra={"id": node.id, "usage_coefficient": node.usage_coefficient},
4973
)
5074

@@ -117,4 +141,4 @@ async def remove_user(self, user: UserResponse):
117141
node_manager: NodeManager = NodeManager()
118142

119143

120-
__all__ = ["core_users", "node_manager"]
144+
__all__ = ["calculate_max_message_size", "core_users", "node_manager"]

app/operation/node.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
reset_node_usage,
2222
update_node_status,
2323
)
24-
from app.db.crud.user import get_user
25-
from app.db.models import Node, NodeStatus
24+
from app.db.crud.user import get_user, get_users_count_by_status
25+
from app.db.models import Node, NodeStatus, UserStatus
2626
from app.models.admin import AdminDetails
2727
from app.models.node import (
2828
NodeCoreUpdate,
@@ -37,7 +37,7 @@
3737
UserIPListAll,
3838
)
3939
from app.models.stats import NodeRealtimeStats, NodeStatsList, NodeUsageStatsList, Period
40-
from app.node import core_users, node_manager
40+
from app.node import calculate_max_message_size, core_users, node_manager
4141
from app.operation import BaseOperation
4242
from app.utils.logger import get_logger
4343

@@ -197,8 +197,13 @@ async def create_node(self, db: AsyncSession, new_node: NodeCreate, admin: Admin
197197
except IntegrityError:
198198
await self.raise_error(message=f'Node "{new_node.name}" already exists', code=409, db=db)
199199

200+
# Calculate max_message_size based on active users count
201+
user_counts = await get_users_count_by_status(db, [UserStatus.active])
202+
active_users_count = user_counts.get(UserStatus.active.value, 0)
203+
max_message_size = calculate_max_message_size(active_users_count)
204+
200205
try:
201-
await node_manager.update_node(db_node)
206+
await node_manager.update_node(db_node, max_message_size=max_message_size)
202207
asyncio.create_task(self.connect_single_node(db, db_node.id))
203208
except NodeAPIError as e:
204209
await self._update_single_node_status(db, db_node.id, NodeStatus.error, message=e.detail)
@@ -226,8 +231,13 @@ async def modify_node(
226231
if db_node.status in (NodeStatus.disabled, NodeStatus.limited):
227232
await self.disconnect_single_node(db_node.id)
228233
else:
234+
# Calculate max_message_size based on active users count
235+
user_counts = await get_users_count_by_status(db, [UserStatus.active])
236+
active_users_count = user_counts.get(UserStatus.active.value, 0)
237+
max_message_size = calculate_max_message_size(active_users_count)
238+
229239
try:
230-
await node_manager.update_node(db_node)
240+
await node_manager.update_node(db_node, max_message_size=max_message_size)
231241
asyncio.create_task(self.connect_single_node(db, db_node.id))
232242
except NodeAPIError as e:
233243
await self._update_single_node_status(db, db_node.id, NodeStatus.error, message=e.detail)
@@ -300,12 +310,17 @@ async def connect_nodes_bulk(
300310
# Fetch users ONCE for all nodes
301311
users = await core_users(db=db)
302312

313+
# Calculate max_message_size based on active users count (once for all nodes)
314+
user_counts = await get_users_count_by_status(db, [UserStatus.active])
315+
active_users_count = user_counts.get(UserStatus.active.value, 0)
316+
max_message_size = calculate_max_message_size(active_users_count)
317+
303318
async def connect_single(node: Node) -> dict | None:
304319
if node is None or node.status in (NodeStatus.disabled, NodeStatus.limited):
305320
return
306321

307322
try:
308-
await node_manager.update_node(node)
323+
await node_manager.update_node(node, max_message_size=max_message_size)
309324
except NodeAPIError as e:
310325
return {
311326
"node_id": node.id,
@@ -376,9 +391,14 @@ async def connect_single_node(self, db: AsyncSession, node_id: int) -> None:
376391
# Get core users once
377392
users = await core_users(db=db)
378393

394+
# Calculate max_message_size based on active users count
395+
user_counts = await get_users_count_by_status(db, [UserStatus.active])
396+
active_users_count = user_counts.get(UserStatus.active.value, 0)
397+
max_message_size = calculate_max_message_size(active_users_count)
398+
379399
# Update node manager
380400
try:
381-
await node_manager.update_node(db_node)
401+
await node_manager.update_node(db_node, max_message_size=max_message_size)
382402
except NodeAPIError as e:
383403
# Update status to error using simple CRUD
384404
await update_node_status(

0 commit comments

Comments
 (0)