Motivation
Ensure RPC and Network Variable (NV) delivery is not delayed by high-rate RoomTransform processing under load.
Reduce lock contention in receive/periodic paths by moving heavy transform work outside self._rooms_lock.
Limit transform CPU and bandwidth impact so control-plane messages remain responsive even with many clients.
Preserve latest-only semantics for transforms while avoiding control-plane starvation.
Description
Prioritize control messages by adding a dedicated control queue self._pub_queue_ctrl and draining it first in _publisher_loop with a CTRL_DRAIN_BATCH limit and backlog watermark CTRL_BACKLOG_WATERMARK.
Add transform shaping using a token-bucket style budget (TRANSFORM_BUDGET_BYTES_PER_SEC) and send coalesced transforms with zmq.DONTWAIT in _try_send_transform to avoid blocking the publisher on slow SUBs.
Move NV flush before transform generation in _periodic_loop and skip transform generation when control backlog is high via _control_backlog_exceeded.
Reduce lock hold time by snapshotting room clients under self._rooms_lock, extracting per-client transform body bytes at receive time (saved in self.client_transform_body_cache), and serializing room transforms from these snapshots using bytearray + struct.pack_into in _serialize_room_transform outside the rooms lock.
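The control-first drain described above can be sketched as follows. CTRL_DRAIN_BATCH mirrors the PR's constant, but the loop body is an illustrative reduction, not the actual _publisher_loop:

```python
import queue

CTRL_DRAIN_BATCH = 256  # illustrative: max control messages drained per iteration

def drain_once(ctrl_q, transform_q, send):
    """One publisher iteration: drain control first (bounded), then one transform."""
    drained = 0
    while drained < CTRL_DRAIN_BATCH:
        try:
            msg = ctrl_q.get_nowait()
        except queue.Empty:
            break
        send(msg)
        drained += 1
    # Transforms get a slot only after the control batch is drained.
    try:
        send(transform_q.get_nowait())
    except queue.Empty:
        pass
    return drained
```

The batch limit bounds how long transforms can be deferred, so control priority never turns into transform starvation.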
Testing
No automated tests were executed as part of this change (no pytest run).
This PR introduces significant improvements to the server's traffic handling architecture by prioritizing control messages (RPC/NV) over transforms and adding rate limiting to prevent control-plane starvation. The implementation is generally solid, but I've identified several areas that need attention.
Critical Issues
1. Potential Deadlock in _serialize_room_transform (High Priority)
Location: server.py:1545-1575
The _serialize_room_transform method is called from _broadcast_room (line 1538), which is invoked from _adaptive_broadcast_all_rooms (line 1525) outside the _rooms_lock. However, it calls binary_serializer._serialize_client_data_short(buffer, transform_data) which may access shared state without proper locking.
Concern: If _serialize_client_data_short or _pack_string access any shared server state, this could cause race conditions since the snapshot is taken under lock but serialization happens outside.
Recommendation:
Verify that binary_serializer._serialize_client_data_short and binary_serializer._pack_string are pure functions that don't access server state
Add documentation comments confirming the lock-free nature of serialization
Consider adding assertions in debug mode to detect any unexpected state access
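For reference, the snapshot-then-serialize pattern under review looks roughly like this — an illustrative reduction using bytearray + struct.pack_into; the real method also calls binary_serializer helpers not shown here, which is exactly the part that needs the purity check:

```python
import struct
import threading

def serialize_room_transform(rooms_lock, body_cache):
    """Serialize cached per-client transform bodies into one room payload."""
    # Phase 1: snapshot shared state while holding the lock.
    with rooms_lock:
        snapshot = list(body_cache.items())
    # Phase 2: serialize from the snapshot only -- no shared state is touched,
    # so this part is safe to run outside the lock.
    size = 2 + sum(4 + len(body) for _, body in snapshot)
    buf = bytearray(size)
    struct.pack_into("<H", buf, 0, len(snapshot))  # client count header
    offset = 2
    for client_no, body in snapshot:
        struct.pack_into("<HH", buf, offset, client_no, len(body))
        offset += 4
        buf[offset:offset + len(body)] = body
        offset += len(body)
    return bytes(buf)
```

The safety argument rests entirely on Phase 2 reading only the snapshot; any helper that reaches back into server state breaks it.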
2. Token Bucket Refund Logic May Under-Refund (Medium Priority)
Location: server.py:438-447
When send_multipart fails with zmq.Again, the code refunds the full cost. However, the cost is calculated as len(message_bytes) * sub_count (line 421). If the SUB count changes between the cost calculation and the refund, you might refund the wrong amount.
```python
sub_count = max(1, self._get_sub_connection_count())  # Line 420
cost = len(message_bytes) * sub_count                 # Line 421
# ... later ...
self._transform_tokens += cost                        # Line 440 (refund)
```
Recommendation: Capture and refund the exact amount that was deducted, not recalculate:
```python
deducted_cost = cost  # Save the actual deducted amount
# ... in error handlers ...
self._transform_tokens += deducted_cost
```
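Pulled out of context, the capture-and-refund discipline looks like this. This is a self-contained sketch; TransformBudget and its fields are illustrative names, not the server's actual attributes:

```python
import time

class TransformBudget:
    """Token bucket in bytes/sec; refunds the exact deducted cost on send failure."""

    def __init__(self, rate_bytes_per_sec, capacity):
        self.rate = rate_bytes_per_sec
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def _refill(self):
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now

    def try_send(self, message, sub_count, send):
        self._refill()
        cost = len(message) * max(1, sub_count)  # fanout-weighted cost
        if self.tokens < cost:
            return False           # over budget: skip, nothing was deducted
        self.tokens -= cost
        deducted = cost            # capture the amount actually deducted
        try:
            send(message)
        except Exception:
            self.tokens += deducted  # refund exactly what was taken
            return False
        return True
```

Because `deducted` is captured at deduction time, a SUB count change between deduction and refund cannot skew the balance.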
3. Client Transform Body Cache Leak Risk (Medium Priority)
Location: server.py:978, 991, 1598
The client_transform_body_cache is cleaned up when clients timeout (line 1598), but if a client_no is reused (via _find_reusable_client_no), the old cached body might still be present. This could cause stale transform data to be broadcast.
Recommendation: Clear the cache entry when reusing a client_no in _find_reusable_client_no.
4. NV Flush Runs Before the Backlog Check (Medium Priority)
The NV flush happens before the control backlog check:
```python
# Flush Network Variables before other categories
# ... NV flush code ...
if not self._control_backlog_exceeded():
    self._adaptive_broadcast_all_rooms(current_time)
```
Problem: If the NV flush itself enqueues many control messages via _enqueue_pub, it could push the backlog over CTRL_BACKLOG_WATERMARK, but transforms would still be skipped even though the backlog was caused by the flush, not incoming traffic.
Recommendation: Check backlog before NV flush as well, or use separate watermarks for "skip transforms" vs "skip NV flush".
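Concretely, sampling the backlog once up front and comparing it against two thresholds would look like this (watermark names and values are illustrative):

```python
NV_FLUSH_BACKLOG_WATERMARK = 2048   # illustrative: skip NV flush only when severe
TRANSFORM_BACKLOG_WATERMARK = 512   # illustrative: skip transforms much earlier

def periodic_tick(backlog_len, flush_nv, broadcast_transforms):
    """Sample the backlog once, before the flush can inflate it."""
    if backlog_len < NV_FLUSH_BACKLOG_WATERMARK:
        flush_nv()
    if backlog_len < TRANSFORM_BACKLOG_WATERMARK:
        broadcast_transforms()
```

Reading `backlog_len` once before the flush means messages enqueued by the flush itself cannot retroactively suppress transform generation in the same tick.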
5. Unbounded Coalesce Buffer (Medium Priority)
The _coalesce_latest dictionary has no size limit. With many active rooms, it could grow unbounded if transforms are generated faster than the publisher can drain them.
Recommendation: Add a watermark check in _enqueue_pub_latest:
```python
def _enqueue_pub_latest(self, topic_bytes: bytes, message_bytes: bytes):
    with self._coalesce_lock:
        if len(self._coalesce_latest) > 1000:  # Or make this a constant
            logger.warning("Coalesce buffer overflow, dropping oldest")
            # Drop oldest entry (FIFO)
            self._coalesce_latest.pop(next(iter(self._coalesce_latest)))
        self._coalesce_latest[topic_bytes] = message_bytes
```
6. Inconsistent Lock Acquisition Pattern (Low Priority)
Some methods acquire _rooms_lock at the top level (e.g., _get_or_assign_client_no:549), while others acquire it mid-function (e.g., _adaptive_broadcast_all_rooms:1483). This makes it harder to reason about lock ordering.
Recommendation: Establish and document a consistent pattern. Consider using context managers with named scopes.
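One possible shape for such a named-scope helper (a sketch, not code from this PR):

```python
import contextlib
import threading
import time

@contextlib.contextmanager
def lock_scope(lock, name, warn_after_s=0.010):
    """Acquire `lock` for a named scope and report unusually long holds."""
    lock.acquire()
    start = time.monotonic()
    try:
        yield
    finally:
        held = time.monotonic() - start
        lock.release()
        if held > warn_after_s:
            print(f"lock scope {name!r} held {held * 1000:.1f} ms")
```

Call sites then read `with lock_scope(self._rooms_lock, "snapshot-room-clients"):`, which both documents intent at each acquisition site and surfaces long holds during load testing.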
7. Unexplained Magic Constant (Low Priority)
CTRL_DRAIN_BATCH = 256 - This value appears arbitrary. Add a comment explaining the rationale (e.g., "Balance between latency and throughput based on benchmarks").
8. Potential Type Inconsistency in _adaptive_broadcast_all_rooms (Low Priority)
Uses 0.0 as the default, but previously used 0 (line 1413). While functionally equivalent, using 0.0 consistently improves type clarity.
Status: Actually, this is now consistent with line 1383. Good fix!
Performance Observations
9. Good: Lock Hold Time Reduction ✓
The snapshot-based approach in _adaptive_broadcast_all_rooms (lines 1508-1518) successfully reduces lock hold time by extracting client data under lock, then releasing before serialization. This is excellent for concurrency.
10. Good: Control Traffic Prioritization ✓
The two-phase drain in _publisher_loop (lines 477-508) properly prioritizes control messages. The CTRL_DRAIN_BATCH limit prevents RPC/NV from completely starving transforms.
11. Good: Token Bucket Rate Limiting ✓
The token bucket implementation (lines 398-447) correctly implements refill logic and considers fanout cost (sub_count). This should effectively prevent bandwidth exhaustion.
Testing Gaps
12. Missing Test Coverage (High Priority)
The PR description states: "No automated tests were executed as part of this change (no pytest run)."
Critical tests needed:
Control message priority under high transform load
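Such a priority test can start as a pure queue-level simulation before being wired to the real server; the drain policy below is an assumed reduction of _publisher_loop, not the actual server code:

```python
import queue

def test_control_drained_before_transforms():
    """A late RPC must be published before a backlog of queued transforms."""
    ctrl = queue.Queue()
    transforms = queue.Queue()
    sent = []

    for _ in range(100):          # simulate heavy transform load
        transforms.put(b"transform")
    ctrl.put(b"rpc")              # control message arrives last

    # Assumed drain policy: empty the control queue before any transform.
    while True:
        try:
            sent.append(ctrl.get_nowait())
        except queue.Empty:
            break
    sent.append(transforms.get_nowait())

    assert sent[0] == b"rpc"
```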
This PR makes meaningful improvements to server scalability and responsiveness. The core design is sound, but testing is required before merge, and the token bucket refund logic needs a fix. With those addressed, this will be a solid enhancement.
Recommendation: Request changes for items #2, #3, and #12, then approve after fixes.