Skip to content

Commit 4fdf7fc

Browse files
committed
perf: optimize usage recording and node checks with concurrency limits
- Add JOB_SEM semaphore (limit 3) to prevent DB lock storms in record_usages
- Add NODE_CHECK_SEM semaphore (limit 5) to prevent DB/API overload in node_checker
- Reduce time buckets from hourly to 10-minute intervals (6x less hot row contention)
- Reduce max_retries from 5 to 2 to prevent retry amplification
- Add fail-fast logic for SQLite locks (retry once, then fail)
- Implement batched writes for all node stats (single UPSERT instead of per-node)
- Add hard timeout (25s) to prevent scheduler backlog
- Add adaptive scheduling to skip overlapping job runs
- Add comprehensive metrics and logging for monitoring

These changes address the root causes of crashes under high traffic:
- Fan-out concurrency explosion
- Hot row contention from hourly buckets
- Retry storms under load
- Write amplification from per-node operations
- Scheduler backlog without timeouts
1 parent c6547be commit 4fdf7fc

File tree

2 files changed

+387
-210
lines changed

2 files changed

+387
-210
lines changed

app/jobs/node_checker.py

Lines changed: 66 additions & 60 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,10 @@
1818
node_operator = NodeOperation(operator_type=OperatorType.SYSTEM)
1919
logger = get_logger("node-checker")
2020

21+
# Hard-limit concurrency: Prevent DB/API overload during health checks
22+
# Limits concurrent node health check operations
23+
NODE_CHECK_SEM = asyncio.Semaphore(5) # Max 5 concurrent node health checks
24+
2125

2226
async def verify_node_backend_health(node: PasarGuardNode, node_name: str) -> tuple[Health, int | None, str | None]:
2327
"""
@@ -80,74 +84,76 @@ async def process_node_health_check(db_node: Node, node: PasarGuardNode):
8084
if node is None:
8185
return
8286

83-
# Handle hard reset requirement
84-
if node.requires_hard_reset():
85-
async with GetDB() as db:
86-
await node_operator.connect_single_node(db, db_node.id)
87-
return
88-
89-
try:
90-
health, error_code, error_message = await verify_node_backend_health(node, db_node.name)
91-
except asyncio.TimeoutError:
92-
# Record timeout error in database but don't reconnect
93-
logger.warning(f"[{db_node.name}] Health check timed out")
94-
async with GetDB() as db:
95-
await NodeOperation._update_single_node_status(
96-
db, db_node.id, NodeStatus.error, message="Health check timeout"
97-
)
98-
return
99-
except NodeAPIError as e:
100-
# Record error in database
101-
async with GetDB() as db:
102-
await NodeOperation._update_single_node_status(db, db_node.id, NodeStatus.error, message=e.detail)
103-
# For timeout errors (code=-1), don't reconnect - just wait for recovery
104-
if e.code == -1:
105-
logger.warning(f"[{db_node.name}] Health check timed out (NodeAPIError), waiting for recovery")
87+
# Limit concurrent health checks to prevent DB/API overload
88+
async with NODE_CHECK_SEM:
89+
# Handle hard reset requirement
90+
if node.requires_hard_reset():
91+
async with GetDB() as db:
92+
await node_operator.connect_single_node(db, db_node.id)
10693
return
107-
# For other errors, reconnect
108-
async with GetDB() as db:
109-
await node_operator.connect_single_node(db, db_node.id)
110-
return
11194

112-
# Skip nodes that are already healthy and connected
113-
if health == Health.HEALTHY and db_node.status == NodeStatus.connected:
114-
return
95+
try:
96+
health, error_code, error_message = await verify_node_backend_health(node, db_node.name)
97+
except asyncio.TimeoutError:
98+
# Record timeout error in database but don't reconnect
99+
logger.warning(f"[{db_node.name}] Health check timed out")
100+
async with GetDB() as db:
101+
await NodeOperation._update_single_node_status(
102+
db, db_node.id, NodeStatus.error, message="Health check timeout"
103+
)
104+
return
105+
except NodeAPIError as e:
106+
# Record error in database
107+
async with GetDB() as db:
108+
await NodeOperation._update_single_node_status(db, db_node.id, NodeStatus.error, message=e.detail)
109+
# For timeout errors (code=-1), don't reconnect - just wait for recovery
110+
if e.code == -1:
111+
logger.warning(f"[{db_node.name}] Health check timed out (NodeAPIError), waiting for recovery")
112+
return
113+
# For other errors, reconnect
114+
async with GetDB() as db:
115+
await node_operator.connect_single_node(db, db_node.id)
116+
return
115117

116-
if health is Health.INVALID:
117-
logger.warning(f"[{db_node.name}] Node health is INVALID, ignoring...")
118-
return
118+
# Skip nodes that are already healthy and connected
119+
if health == Health.HEALTHY and db_node.status == NodeStatus.connected:
120+
return
119121

120-
# Handle NOT_CONNECTED - reconnect immediately
121-
if health is Health.NOT_CONNECTED:
122-
async with GetDB() as db:
123-
await node_operator.connect_single_node(db, db_node.id)
124-
return
122+
if health is Health.INVALID:
123+
logger.warning(f"[{db_node.name}] Node health is INVALID, ignoring...")
124+
return
125125

126-
# Handle BROKEN health
127-
if health == Health.BROKEN:
128-
# Record actual error in database
129-
async with GetDB() as db:
130-
await NodeOperation._update_single_node_status(db, db_node.id, NodeStatus.error, message=error_message)
131-
# Only reconnect for non-timeout errors (code > -1)
132-
if error_code is not None and error_code > -1:
126+
# Handle NOT_CONNECTED - reconnect immediately
127+
if health is Health.NOT_CONNECTED:
133128
async with GetDB() as db:
134129
await node_operator.connect_single_node(db, db_node.id)
135-
# For timeout (code=-1 or None), just wait - don't reconnect
136-
return
130+
return
137131

138-
# Update status for recovering nodes
139-
if db_node.status in (NodeStatus.connecting, NodeStatus.error) and health == Health.HEALTHY:
140-
async with GetDB() as db:
141-
logger.info(f"Node '{db_node.name}' have been recovered")
142-
node_version, core_version = await node.get_versions()
143-
await NodeOperation._update_single_node_status(
144-
db,
145-
db_node.id,
146-
NodeStatus.connected,
147-
xray_version=core_version,
148-
node_version=node_version,
149-
)
150-
return
132+
# Handle BROKEN health
133+
if health == Health.BROKEN:
134+
# Record actual error in database
135+
async with GetDB() as db:
136+
await NodeOperation._update_single_node_status(db, db_node.id, NodeStatus.error, message=error_message)
137+
# Only reconnect for non-timeout errors (code > -1)
138+
if error_code is not None and error_code > -1:
139+
async with GetDB() as db:
140+
await node_operator.connect_single_node(db, db_node.id)
141+
# For timeout (code=-1 or None), just wait - don't reconnect
142+
return
143+
144+
# Update status for recovering nodes
145+
if db_node.status in (NodeStatus.connecting, NodeStatus.error) and health == Health.HEALTHY:
146+
async with GetDB() as db:
147+
logger.info(f"Node '{db_node.name}' have been recovered")
148+
node_version, core_version = await node.get_versions()
149+
await NodeOperation._update_single_node_status(
150+
db,
151+
db_node.id,
152+
NodeStatus.connected,
153+
xray_version=core_version,
154+
node_version=node_version,
155+
)
156+
return
151157

152158

153159
async def check_node_limits():

0 commit comments

Comments
 (0)