|
18 | 18 | node_operator = NodeOperation(operator_type=OperatorType.SYSTEM) |
19 | 19 | logger = get_logger("node-checker") |
20 | 20 |
|
| 21 | +# Hard-limit concurrency: Prevent DB/API overload during health checks |
| 22 | +# Limits concurrent node health check operations |
| 23 | +NODE_CHECK_SEM = asyncio.Semaphore(5) # Max 5 concurrent node health checks |
| 24 | + |
21 | 25 |
|
22 | 26 | async def verify_node_backend_health(node: PasarGuardNode, node_name: str) -> tuple[Health, int | None, str | None]: |
23 | 27 | """ |
@@ -80,74 +84,76 @@ async def process_node_health_check(db_node: Node, node: PasarGuardNode): |
80 | 84 | if node is None: |
81 | 85 | return |
82 | 86 |
|
83 | | - # Handle hard reset requirement |
84 | | - if node.requires_hard_reset(): |
85 | | - async with GetDB() as db: |
86 | | - await node_operator.connect_single_node(db, db_node.id) |
87 | | - return |
88 | | - |
89 | | - try: |
90 | | - health, error_code, error_message = await verify_node_backend_health(node, db_node.name) |
91 | | - except asyncio.TimeoutError: |
92 | | - # Record timeout error in database but don't reconnect |
93 | | - logger.warning(f"[{db_node.name}] Health check timed out") |
94 | | - async with GetDB() as db: |
95 | | - await NodeOperation._update_single_node_status( |
96 | | - db, db_node.id, NodeStatus.error, message="Health check timeout" |
97 | | - ) |
98 | | - return |
99 | | - except NodeAPIError as e: |
100 | | - # Record error in database |
101 | | - async with GetDB() as db: |
102 | | - await NodeOperation._update_single_node_status(db, db_node.id, NodeStatus.error, message=e.detail) |
103 | | - # For timeout errors (code=-1), don't reconnect - just wait for recovery |
104 | | - if e.code == -1: |
105 | | - logger.warning(f"[{db_node.name}] Health check timed out (NodeAPIError), waiting for recovery") |
| 87 | + # Limit concurrent health checks to prevent DB/API overload |
| 88 | + async with NODE_CHECK_SEM: |
| 89 | + # Handle hard reset requirement |
| 90 | + if node.requires_hard_reset(): |
| 91 | + async with GetDB() as db: |
| 92 | + await node_operator.connect_single_node(db, db_node.id) |
106 | 93 | return |
107 | | - # For other errors, reconnect |
108 | | - async with GetDB() as db: |
109 | | - await node_operator.connect_single_node(db, db_node.id) |
110 | | - return |
111 | 94 |
|
112 | | - # Skip nodes that are already healthy and connected |
113 | | - if health == Health.HEALTHY and db_node.status == NodeStatus.connected: |
114 | | - return |
| 95 | + try: |
| 96 | + health, error_code, error_message = await verify_node_backend_health(node, db_node.name) |
| 97 | + except asyncio.TimeoutError: |
| 98 | + # Record timeout error in database but don't reconnect |
| 99 | + logger.warning(f"[{db_node.name}] Health check timed out") |
| 100 | + async with GetDB() as db: |
| 101 | + await NodeOperation._update_single_node_status( |
| 102 | + db, db_node.id, NodeStatus.error, message="Health check timeout" |
| 103 | + ) |
| 104 | + return |
| 105 | + except NodeAPIError as e: |
| 106 | + # Record error in database |
| 107 | + async with GetDB() as db: |
| 108 | + await NodeOperation._update_single_node_status(db, db_node.id, NodeStatus.error, message=e.detail) |
| 109 | + # For timeout errors (code=-1), don't reconnect - just wait for recovery |
| 110 | + if e.code == -1: |
| 111 | + logger.warning(f"[{db_node.name}] Health check timed out (NodeAPIError), waiting for recovery") |
| 112 | + return |
| 113 | + # For other errors, reconnect |
| 114 | + async with GetDB() as db: |
| 115 | + await node_operator.connect_single_node(db, db_node.id) |
| 116 | + return |
115 | 117 |
|
116 | | - if health is Health.INVALID: |
117 | | - logger.warning(f"[{db_node.name}] Node health is INVALID, ignoring...") |
118 | | - return |
| 118 | + # Skip nodes that are already healthy and connected |
| 119 | + if health == Health.HEALTHY and db_node.status == NodeStatus.connected: |
| 120 | + return |
119 | 121 |
|
120 | | - # Handle NOT_CONNECTED - reconnect immediately |
121 | | - if health is Health.NOT_CONNECTED: |
122 | | - async with GetDB() as db: |
123 | | - await node_operator.connect_single_node(db, db_node.id) |
124 | | - return |
| 122 | + if health is Health.INVALID: |
| 123 | + logger.warning(f"[{db_node.name}] Node health is INVALID, ignoring...") |
| 124 | + return |
125 | 125 |
|
126 | | - # Handle BROKEN health |
127 | | - if health == Health.BROKEN: |
128 | | - # Record actual error in database |
129 | | - async with GetDB() as db: |
130 | | - await NodeOperation._update_single_node_status(db, db_node.id, NodeStatus.error, message=error_message) |
131 | | - # Only reconnect for non-timeout errors (code > -1) |
132 | | - if error_code is not None and error_code > -1: |
| 126 | + # Handle NOT_CONNECTED - reconnect immediately |
| 127 | + if health is Health.NOT_CONNECTED: |
133 | 128 | async with GetDB() as db: |
134 | 129 | await node_operator.connect_single_node(db, db_node.id) |
135 | | - # For timeout (code=-1 or None), just wait - don't reconnect |
136 | | - return |
| 130 | + return |
137 | 131 |
|
138 | | - # Update status for recovering nodes |
139 | | - if db_node.status in (NodeStatus.connecting, NodeStatus.error) and health == Health.HEALTHY: |
140 | | - async with GetDB() as db: |
141 | | - logger.info(f"Node '{db_node.name}' have been recovered") |
142 | | - node_version, core_version = await node.get_versions() |
143 | | - await NodeOperation._update_single_node_status( |
144 | | - db, |
145 | | - db_node.id, |
146 | | - NodeStatus.connected, |
147 | | - xray_version=core_version, |
148 | | - node_version=node_version, |
149 | | - ) |
150 | | - return |
| 132 | + # Handle BROKEN health |
| 133 | + if health == Health.BROKEN: |
| 134 | + # Record actual error in database |
| 135 | + async with GetDB() as db: |
| 136 | + await NodeOperation._update_single_node_status(db, db_node.id, NodeStatus.error, message=error_message) |
| 137 | + # Only reconnect for non-timeout errors (code > -1) |
| 138 | + if error_code is not None and error_code > -1: |
| 139 | + async with GetDB() as db: |
| 140 | + await node_operator.connect_single_node(db, db_node.id) |
| 141 | + # For timeout (code=-1 or None), just wait - don't reconnect |
| 142 | + return |
| 143 | + |
| 144 | + # Update status for recovering nodes |
| 145 | + if db_node.status in (NodeStatus.connecting, NodeStatus.error) and health == Health.HEALTHY: |
| 146 | + async with GetDB() as db: |
| 147 | + logger.info(f"Node '{db_node.name}' have been recovered") |
| 148 | + node_version, core_version = await node.get_versions() |
| 149 | + await NodeOperation._update_single_node_status( |
| 150 | + db, |
| 151 | + db_node.id, |
| 152 | + NodeStatus.connected, |
| 153 | + xray_version=core_version, |
| 154 | + node_version=node_version, |
| 155 | + ) |
| 156 | + return |
151 | 157 |
|
152 | 158 |
|
153 | 159 | async def check_node_limits(): |
|
0 commit comments