
feat: implement priority-based sending and control message handling in NetSync client#318

Merged
from2001 merged 2 commits into develop from refactor/simulator-with-recent-pr on Jan 28, 2026

Conversation


@from2001 from2001 commented Jan 28, 2026

This Python client / simulator refactoring is intended to introduce the features that were implemented in the Unity clients in the following two PRs.

#316
#311


This pull request introduces a major refactor and enhancement to the NetSync Python client, focusing on robust, priority-based message sending, improved backpressure handling, and more efficient message processing. The changes closely mirror recent improvements in the Unity client, aligning the Python implementation for feature parity, reliability, and performance.

The most important changes are:

Priority-based Outbound Messaging and Backpressure Handling:

  • Introduced OutboundPacket, OutboundLane, SendStatus, and SendOutcome abstractions to support priority-based outbound queues, TTL expiration for control messages, and explicit backpressure/failure handling. Control messages (RPC, Network Variables) are now prioritized over transform updates, and only the latest transform is retained for sending (latest-wins).
  • Outgoing control messages are queued with FIFO ordering and a TTL; if the queue is full, messages are dropped with logging and stats. Transform updates are not queued but replaced if a new one arrives before sending.
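
The lane/packet model described above can be sketched as follows. The names OutboundPacket and OutboundLane come from the PR; the bodies, the Outbox helper, and the maxsize value are illustrative assumptions, not the actual client.py code:

```python
import time
from dataclasses import dataclass, field
from enum import Enum
from queue import Full, Queue
from typing import Optional


class OutboundLane(Enum):
    CONTROL = "control"      # RPC / network-variable updates: FIFO with TTL
    TRANSFORM = "transform"  # transform updates: latest-wins, never queued


@dataclass
class OutboundPacket:
    lane: OutboundLane
    room_id: str
    payload: bytes
    enqueued_at: float = field(default_factory=time.monotonic)

    def expired(self, ttl: float) -> bool:
        # Control messages past their TTL are dropped rather than sent late.
        return (time.monotonic() - self.enqueued_at) > ttl


class Outbox:
    """Bounded FIFO for control packets plus one latest-wins transform slot."""

    def __init__(self, maxsize: int = 64):
        self._ctrl: "Queue[OutboundPacket]" = Queue(maxsize=maxsize)
        self.latest_transform: Optional[OutboundPacket] = None
        self.ctrl_queue_drops = 0

    def enqueue_control(self, packet: OutboundPacket) -> bool:
        try:
            self._ctrl.put_nowait(packet)
            return True
        except Full:
            self.ctrl_queue_drops += 1  # dropped with logging/stats in the PR
            return False

    def set_transform(self, packet: OutboundPacket) -> None:
        # Latest-wins: an unsent transform is replaced, not queued behind others.
        self.latest_transform = packet
```

The network thread would drain the control FIFO first (skipping expired packets) and only then send latest_transform, which is what gives control messages priority over transforms.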

Efficient Socket and Queue Management:

  • Sockets are now configured with low HWM (high-water-mark) values to detect and respond to backpressure quickly, and to prefer recent messages over stale ones. This applies to both the main client and the simulator.
  • Added logic to clear outgoing buffers on shutdown and to handle cleanup more robustly.
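
With pyzmq, the configuration described here looks roughly like this; the SNDHWM=10, LINGER=0, and SUB RCVHWM=2 values are the ones quoted elsewhere in this thread, and the rest is a sketch rather than the actual client.py setup:

```python
import zmq

ctx = zmq.Context.instance()

# DEALER: shallow send queue so zmq.Again surfaces quickly under load
dealer = ctx.socket(zmq.DEALER)
dealer.setsockopt(zmq.LINGER, 0)   # discard unsent frames on close
dealer.setsockopt(zmq.SNDHWM, 10)  # low send HWM -> fast backpressure signal
dealer.setsockopt(zmq.RCVHWM, 10)  # bound the control-message receive queue

# SUB: tiny receive HWM so stale transform snapshots are shed, not buffered
sub = ctx.socket(zmq.SUB)
sub.setsockopt(zmq.RCVHWM, 2)
```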

Optimized Receive Loop and CPU Usage:

  • The receive loop now drains all available SUB messages, processing only the last one ("drain and keep last"), which reduces CPU usage and ensures only the latest state is processed. DEALER socket now also handles control messages from the server. Idle waiting is minimized to avoid busy-waiting.
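
The "drain and keep last" pattern can be shown with a self-contained sketch; here Again stands in for zmq.Again, and recv_nowait for a non-blocking recv_multipart call, so this illustrates the pattern rather than the client's actual loop:

```python
class Again(Exception):
    """Stand-in for zmq.Again: no message is available right now."""


def drain_keep_last(recv_nowait):
    """Drain every pending message and return only the newest (or None).

    Older snapshots are discarded unprocessed, so the application only
    pays to deserialize and apply the latest state.
    """
    last = None
    while True:
        try:
            last = recv_nowait()
        except Again:
            break  # queue empty: return whatever arrived last
    return last
```

In the real loop the callable would be `lambda: sub.recv_multipart(flags=zmq.NOBLOCK)`, and a separate, bounded drain handles DEALER control messages so they are processed individually rather than collapsed to the latest.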

API Changes and Documentation:

  • All sending APIs (send_transform, send_stealth_handshake, rpc, set_global_variable, set_client_variable) now enqueue messages for the network thread instead of sending immediately, with clear docstrings explaining the new semantics.
  • Added and updated docstrings and comments throughout for clarity, including a summary of key features at the top of the client file.

Statistics and Diagnostics:

  • Extended statistics tracking for dropped frames, queue drops, and backpressure events, aiding diagnostics and performance monitoring.
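
Since these counters are updated from both the network thread and API callers, a lock-guarded wrapper like the following keeps them consistent; the counter names come from the stats dict shown later in this thread, while the Stats class itself is a hypothetical sketch (the client reportedly guards a plain dict with self._lock):

```python
import threading


class Stats:
    """Thread-safe counters shared by the network thread and callers."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counts = {
            "would_block_count": 0,
            "dropped_transform_frames": 0,
            "ctrl_queue_drops": 0,
        }

    def incr(self, key: str, n: int = 1) -> None:
        # `+= 1` on a dict entry is not atomic in Python; guard with a lock.
        with self._lock:
            self._counts[key] += n

    def snapshot(self) -> dict:
        # Return a copy so callers never see a half-updated view.
        with self._lock:
            return dict(self._counts)
```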

These changes significantly improve the reliability, scalability, and maintainability of the NetSync Python client, especially under high load or network congestion.

@from2001
Collaborator Author

@claude Review this PR.

Contributor

Copilot AI left a comment

Pull request overview

This PR introduces priority-based message sending, improved backpressure handling, and enhanced message processing to the NetSync Python client, mirroring improvements made to the Unity client. The implementation aims to improve reliability and performance under high load by prioritizing control messages over transforms, implementing latest-wins semantics for transforms, and draining only the most recent messages from receive queues.

Changes:

  • Implemented priority-based outbound messaging with separate queues for control messages (RPC, network variables) and transforms (latest-wins)
  • Added TTL expiration for control messages and backpressure detection via low HWM socket configuration
  • Modified receive loop to drain SUB socket and process only the last message, and added DEALER socket polling for control messages from server

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 11 comments.

  • client.py: Implements priority-based sending with OutboundPacket/OutboundLane abstractions, adds a TTL-based control queue, modifies the receive loop to drain and process only the latest SUB messages, adds DEALER control message receive, updates all sending APIs to use queuing, and configures low HWM for backpressure detection.
  • client_simulator.py: Adds a recv_dealer_control method for receiving control messages via the DEALER socket, updates socket configuration with low HWM values, refactors _poll_broadcasts to handle both DEALER and SUB sources, updates the SharedSubscriber documentation to reflect ROUTER unicast for ID mappings, and extracts a _process_mapping_message helper.


Comment on lines +128 to +131
attempts: int = 0
"""Number of send attempts made for this packet."""


Copilot AI Jan 28, 2026

The OutboundPacket.attempts field is defined but never incremented or used. Either remove this unused field from the dataclass, or implement retry logic that actually uses it. Currently it just accumulates technical debt without providing any value.

Suggested change
attempts: int = 0
"""Number of send attempts made for this packet."""

Comment on lines +429 to +437
dealer_room_id = message_parts[0].decode("utf-8")
dealer_payload = message_parts[1]

# Only process messages for our room
if dealer_room_id == self._room:
self._process_message(dealer_payload)
received_any = True
except zmq.Again:
break
Copilot AI Jan 28, 2026

The DEALER receive loop (lines 423-437) does not catch exceptions other than zmq.Again. If recv_multipart or decode raises an exception (e.g., UnicodeDecodeError from line 429), it will propagate to the outer exception handler at line 445, which only logs the error but continues the loop. However, the inner while True loop has no protection, meaning any non-zmq.Again exception will break out of the DEALER drain loop. This is inconsistent with the SUB receive loop which also doesn't catch exceptions inside its drain loop. Consider adding exception handling similar to the simulator's recv_dealer_control method (lines 738-740, 752-759) to handle malformed messages gracefully and continue draining.

Suggested change
dealer_room_id = message_parts[0].decode("utf-8")
dealer_payload = message_parts[1]
# Only process messages for our room
if dealer_room_id == self._room:
self._process_message(dealer_payload)
received_any = True
except zmq.Again:
break
try:
dealer_room_id = message_parts[0].decode("utf-8")
except UnicodeDecodeError as e:
# Malformed room ID in control message; skip this frame
logger.warning(
"Failed to decode DEALER room id: %s", e
)
continue
dealer_payload = message_parts[1]
# Only process messages for our room
if dealer_room_id == self._room:
try:
self._process_message(dealer_payload)
received_any = True
except Exception as e:
# Malformed or unexpected control payload; skip
logger.warning(
"Error processing DEALER control message: %s",
e,
)
continue
except zmq.Again:
break
except Exception as e:
# Unexpected error while draining DEALER; log and continue
logger.warning(
"Error in DEALER receive drain loop: %s", e
)
continue

Comment on lines +407 to +408
except zmq.Again:
break
Copilot AI Jan 28, 2026

The SUB socket drain loop (lines 396-408) does not catch exceptions other than zmq.Again. If recv_multipart raises an exception (e.g., context terminated, socket closed), it will propagate to the outer exception handler and break the drain loop. This means that if one message in the queue causes an error, remaining messages in the SUB socket buffer won't be drained. Consider adding exception handling inside the while loop to skip malformed messages and continue draining, similar to how the simulator's recv_broadcast handles exceptions.

Suggested change
except zmq.Again:
break
except zmq.Again:
# No more messages available on SUB socket
break
except zmq.ZMQError as e:
# Handle ZMQ-specific errors without breaking outer loop
# Terminal errors: stop draining this socket
if e.errno in (zmq.ETERM, zmq.ENOTSOCK):
if self._running:
logger.error(
f"ZMQ error while draining SUB socket, stopping drain: {e}"
)
break
if self._running:
logger.warning(
f"Non-fatal ZMQ error while draining SUB socket, skipping message: {e}"
)
continue
except Exception as e:
# Unexpected error: log and skip this message, keep draining
if self._running:
logger.warning(
f"Unexpected error while draining SUB socket, skipping message: {e}"
)
continue

Comment on lines +787 to +793
IMPORTANT: As of PR #316, the server sends ID mappings via ROUTER unicast
instead of PUB/SUB. This means SharedSubscriber will NOT receive ID mapping
messages. Each SimulatedClient must poll its DEALER socket for control
messages (including ID mappings) via _poll_dealer_control().

SharedSubscriber is kept for backward compatibility and may still receive
transform broadcasts via SUB, but ID mapping functionality is deprecated.
Copilot AI Jan 28, 2026

The documentation states that "As of PR #316, the server sends ID mappings via ROUTER unicast instead of PUB/SUB" but this contradicts the code's behavior. The _poll_broadcasts method still processes MSG_DEVICE_ID_MAPPING from both DEALER (lines 1130-1146) and SUB (lines 1148-1166), and both call _process_mapping_message. This suggests the server may still send ID mappings via both channels, or the migration is incomplete. Clarify the documentation to accurately reflect whether ID mappings can arrive via SUB, DEALER, or both, to avoid confusion for future maintainers.

Suggested change
IMPORTANT: As of PR #316, the server sends ID mappings via ROUTER unicast
instead of PUB/SUB. This means SharedSubscriber will NOT receive ID mapping
messages. Each SimulatedClient must poll its DEALER socket for control
messages (including ID mappings) via _poll_dealer_control().
SharedSubscriber is kept for backward compatibility and may still receive
transform broadcasts via SUB, but ID mapping functionality is deprecated.
IMPORTANT: As of PR #316, the intended primary path for ID mappings is
ROUTER/DEALER unicast instead of PUB/SUB. Each SimulatedClient must poll
its DEALER socket for control messages (including ID mappings) via
_poll_dealer_control().
However, for backwards compatibility with older servers and mixed
deployments, ID mapping messages (MSG_DEVICE_ID_MAPPING) may still be
delivered via PUB/SUB, and this simulator continues to support mappings
received on the shared SUB socket. In practice, ID mappings can therefore
arrive via DEALER, SUB, or both, depending on server behavior.
SharedSubscriber is kept for backward compatibility and may still receive
transform broadcasts via SUB; new features should prefer the DEALER-based
control/mapping channel.

Comment on lines +423 to +437
while True:
try:
message_parts = self._dealer_socket.recv_multipart(
flags=zmq.NOBLOCK
)
if len(message_parts) >= 2:
dealer_room_id = message_parts[0].decode("utf-8")
dealer_payload = message_parts[1]

# Only process messages for our room
if dealer_room_id == self._room:
self._process_message(dealer_payload)
received_any = True
except zmq.Again:
break
Copilot AI Jan 28, 2026

The DEALER socket is draining all available control messages without a limit, which could cause starvation issues. The SUB socket drains with an implicit limit (breaks when no more messages available), but the DEALER loop at lines 423-437 continues draining indefinitely. This can cause the receive loop to spend excessive time processing DEALER messages while neglecting SUB messages or outgoing sends. Consider adding a max_drain limit similar to how SUB draining works in the client_simulator (e.g., drain up to 10 control messages per iteration).

Comment on lines +733 to +761
for _ in range(max_drain):
try:
parts = self.socket.recv_multipart(flags=zmq.NOBLOCK)
except zmq.Again:
return None
except Exception as e:
self.logger.debug(f"DEALER receive error: {e}")
return None

if len(parts) != 2:
continue

dealer_room_id, payload = parts
room_id_str = dealer_room_id.decode("utf-8", errors="ignore")

# Only process messages for our room
if room_id_str != self.room_id:
continue

try:
if not payload:
continue
msg_type = payload[0]
_, data, _ = deserialize(payload)
return msg_type, data
except Exception:
continue

return None
Copilot AI Jan 28, 2026

The recv_dealer_control method returns only the first relevant message and exits early (line 757) when a message is successfully parsed, but continues the loop when exceptions occur (line 759). This creates inconsistent behavior where the method returns after finding one good message but continues searching through all max_drain messages if it encounters errors. This could lead to processing at most one message even if multiple valid messages are available. Consider either: (1) returning immediately on the first message (good or bad), or (2) collecting all valid messages up to max_drain and returning the first one (but this changes the semantics). The current implementation appears to be intended to return the first valid message, so it should return None after draining max_drain messages without finding a valid one.

Comment on lines 222 to 231
self._stats = {
"messages_received": 0,
"transforms_received": 0,
"rpc_received": 0,
"nv_updates": 0,
"last_snapshot_time": 0.0,
"would_block_count": 0,
"dropped_transform_frames": 0,
"ctrl_queue_drops": 0,
}
Copilot AI Jan 28, 2026

Statistics dictionary is modified without thread synchronization. The _stats dictionary is updated from both the receive loop thread and the main thread (via _enqueue_control). Dictionary operations like += 1 are not atomic in Python, and concurrent modifications can lead to race conditions and lost updates. Consider protecting _stats modifications with the existing _lock or a separate statistics lock to ensure thread-safe updates.

return True
except Full:
self._stats["ctrl_queue_drops"] += 1
logger.warning("Control outbox full, dropping message")
Copilot AI Jan 28, 2026

The warning message when control outbox is full does not include context about which type of message was dropped (RPC, network variable, or stealth handshake). This makes debugging difficult when investigating message loss. Consider including the message type or operation name in the warning to help diagnose which operations are being affected by backpressure.

self._dealer_socket.setsockopt(zmq.LINGER, 0)
self._dealer_socket.setsockopt(
zmq.SNDHWM, 10
) # Low HWM for backpressure
Copilot AI Jan 28, 2026

DEALER socket is missing RCVHWM configuration. The SUB socket has RCVHWM set to 2 (line 298) to prefer recent updates, but the DEALER socket only has SNDHWM configured (line 288). Since DEALER now receives control messages from the server (RPC, NV, ID mapping), it should also have a RCVHWM configured to handle potential message buildup. Without this, the DEALER socket may accumulate stale control messages in memory if processing is slow. Consider setting a reasonable RCVHWM for the DEALER socket, consistent with the client_simulator's configuration or based on expected control message volume.

Suggested change
) # Low HWM for backpressure
) # Low HWM for backpressure
self._dealer_socket.setsockopt(
zmq.RCVHWM, 10
) # Bound receive queue for control messages

Comment on lines +498 to +500
# Backpressure - increment counter and stop draining
# The packet is DROPPED on backpressure (not retried)
self._stats["would_block_count"] += 1
Copilot AI Jan 28, 2026

Control messages are dropped on backpressure without being re-queued. This means critical control messages (RPCs, network variable updates) can be silently lost during periods of backpressure, which differs from the behavior for transforms (which are retried). Consider re-queuing control messages that hit backpressure, or at minimum add more prominent logging and statistics tracking to make this message loss visible to users. The current warning log at line 500 only tracks the backpressure count, but doesn't indicate which specific message types or operations were dropped.

Suggested change
# Backpressure - increment counter and stop draining
# The packet is DROPPED on backpressure (not retried)
self._stats["would_block_count"] += 1
# Backpressure - increment counter and attempt to re-queue the packet
self._stats["would_block_count"] += 1
try:
# Re-queue the control packet so it can be retried later
self._ctrl_outbox.put_nowait(packet)
except Full:
# Queue is full: this control packet is dropped; log prominently
self._stats.setdefault("control_drop_count", 0)
self._stats["control_drop_count"] += 1
logger.warning(
"Control packet dropped due to backpressure and full outbox queue; "
"would_block_count=%s, control_drop_count=%s",
self._stats.get("would_block_count"),
self._stats.get("control_drop_count"),
)
# Stop draining on backpressure to respect socket backpressure signal


claude bot commented Jan 28, 2026

Claude finished @from2001's task in 1m 35s


Code Review for PR #318

Tasks

  • Read repository guidelines (CLAUDE.md)
  • Review client.py changes
  • Review client_simulator.py changes
  • Analyze thread safety and error handling
  • Provide comprehensive feedback

Overview

This PR introduces significant architectural improvements to the Python client, mirroring the Unity client's priority-based sending and backpressure handling improvements. The implementation is well-structured and addresses important performance and reliability concerns for XR applications with continuous transform updates.

Critical Issues

1. Thread Safety: Statistics Dictionary Not Protected ⚠️

Location: client.py:231

The _stats dictionary is modified from multiple threads without synchronization:

  • Modified in receive loop thread (lines 413-415, 500, 530, 564, 610, etc.)
  • Potentially accessed from main thread via get_stats() (line 1042)

Dictionary operations like += 1 are not atomic in Python. This can cause race conditions and lost updates.

Recommendation: Protect _stats updates with self._lock or a dedicated statistics lock.

# Example fix
with self._lock:
    self._stats["would_block_count"] += 1


2. Unused Field: OutboundPacket.attempts 🔍

Location: client.py:128

The attempts field is defined but never incremented or used anywhere in the codebase. This suggests incomplete retry logic or technical debt.

Recommendation: Either remove the unused field or implement retry logic that uses it.

3. Missing RCVHWM on DEALER Socket ⚠️

Location: client.py:289

The DEALER socket configures SNDHWM (line 288) but lacks RCVHWM configuration. Since PR #316, the DEALER socket receives control messages (RPC, NV, ID mappings) from the server. Without RCVHWM, stale control messages can accumulate in memory during slow processing.

Recommendation: Add RCVHWM configuration consistent with the simulator (line 556 shows SNDHWM=10):

self._dealer_socket.setsockopt(zmq.RCVHWM, 10)  # Bound receive queue


High Priority Issues

4. Incomplete Error Handling in SUB Drain Loop 🔍

Location: client.py:396-408

The SUB socket drain loop only catches zmq.Again. Other exceptions (e.g., context terminated, socket closed, malformed messages) will break the drain loop and propagate to the outer handler, preventing remaining messages from being drained.

Recommendation: Add exception handling inside the while loop to skip malformed messages and continue draining:

except zmq.Again:
    break
except zmq.ZMQError as e:
    if e.errno in (zmq.ETERM, zmq.ENOTSOCK):
        break
    if self._running:
        logger.warning(f"Non-fatal ZMQ error while draining SUB: {e}")
    continue
except Exception as e:
    if self._running:
        logger.warning(f"Unexpected error draining SUB, skipping message: {e}")
    continue

5. Incomplete Error Handling in DEALER Drain Loop 🔍

Location: client.py:423-437

Similar to issue #4, the DEALER receive loop lacks comprehensive error handling. Malformed messages or decode errors will break the drain loop.

Recommendation: Add error handling for UnicodeDecodeError (line 429) and general exceptions within the loop, similar to the simulator's approach (lines 738-759).

6. Control Messages Dropped on Backpressure Without Retry ⚠️

Location: client.py:497-501

When backpressure occurs, control messages (RPCs, network variables) are silently dropped without retry. This differs from transform behavior (which retries) and can lead to critical message loss.

The current implementation:

elif outcome.is_backpressure:
    # Backpressure - increment counter and stop draining
    # The packet is DROPPED on backpressure (not retried)
    self._stats["would_block_count"] += 1
    break

Recommendation: Consider re-queuing control packets on backpressure, or at minimum, add more prominent logging:

elif outcome.is_backpressure:
    self._stats["would_block_count"] += 1
    logger.warning(
        "Control packet dropped due to backpressure (type: %s), "
        "consider increasing HWM or processing speed",
        packet.lane.value
    )
    break

7. DEALER Drain Without Limit Can Cause Starvation 🔍

Location: client.py:423-437

The DEALER socket drains indefinitely (while True) without a message count limit. This can starve SUB message processing and outgoing sends if many control messages arrive.

Recommendation: Add a max drain limit (e.g., 10-64 messages) similar to _ctrl_drain_batch:

dealer_drained = 0
while dealer_drained < 64:  # Limit drain iterations
    try:
        # ... drain logic ...
        dealer_drained += 1
    except zmq.Again:
        break

8. Stealth Handshake Lacks Delivery Confirmation 🔍

Location: client.py:792-807

Stealth handshakes are critical for establishing client state but are sent via the control queue without delivery confirmation. If the queue is full, send_stealth_handshake() returns True even though the message was dropped.

Recommendation: Return the actual result from _enqueue_control():

def send_stealth_handshake(self) -> bool:
    # ...
    return self._enqueue_control(self._room, message)  # Already returns False on full queue

Medium Priority Issues

9. Insufficient Context in Control Queue Full Warning 📝

Location: client.py:882

The warning message doesn't indicate which type of control message was dropped (RPC, network variable, stealth handshake), making debugging difficult.

Recommendation: Include message type context:

logger.warning(
    "Control outbox full, dropping message (type: %s, queue_drops: %d)",
    "control",  # Could track packet type in OutboundPacket
    self._stats["ctrl_queue_drops"]
)

10. Documentation Inconsistency in Simulator 📝

Location: client_simulator.py:787-793

The SharedSubscriber docstring states ID mappings are sent via ROUTER unicast (PR #316), but the code still processes ID mappings from both DEALER (lines 1130-1146) and SUB (lines 1148-1166). This suggests incomplete migration or mixed deployment support.

Recommendation: Clarify documentation to reflect actual behavior:

"""Single SUB socket that receives broadcasts and shares mappings.

IMPORTANT: As of PR #316, the primary path for ID mappings is ROUTER/DEALER 
unicast. However, for backward compatibility with older servers, ID mappings 
may still arrive via PUB/SUB. This simulator supports both channels.
"""

11. Simulator recv_dealer_control Early Exit Inconsistency 🔍

Location: client_simulator.py:716-761

The method returns immediately on first valid message (line 757) but continues looping on exceptions (line 759). This means it may process fewer messages than max_drain if errors occur, creating inconsistent behavior.

Recommendation: Decide on consistent behavior - either return immediately always, or collect all valid messages up to max_drain.

Positive Aspects ✅

  1. Well-Documented Architecture: The file header clearly explains key features and design decisions
  2. Strong Type Safety: Proper use of dataclasses and enums for type safety
  3. Clean Separation of Concerns: Priority lanes, TTL handling, and backpressure detection are well-separated
  4. Consistent with Unity Client: The implementation successfully mirrors Unity's ConnectionManager improvements
  5. Comprehensive Statistics: Good diagnostic capabilities with detailed stats tracking
  6. Resource Management: Proper socket cleanup and thread management

Testing Recommendations

Before merging, consider testing:

  1. Concurrency: Multi-threaded stats access under load
  2. Backpressure: Control message behavior when HWM is reached
  3. Error Recovery: Malformed message handling in drain loops
  4. Edge Cases: DEALER drain starvation with many control messages
  5. Integration: Compatibility with Unity clients and server

Backward Compatibility

No concerns: Per CLAUDE.md, network protocol changes do not require backward compatibility. This is clearly a protocol enhancement.

Summary

This is a solid implementation that brings important improvements to the Python client. The architectural decisions are sound and well-aligned with the XR use case. However, the thread safety and critical error handling issues listed above should be addressed before merging.
Overall assessment: Approve with changes requested for thread safety and critical error handling issues.


@from2001
Collaborator Author

@claude Most of the issues have been fixed. Review the latest code.


claude bot commented Jan 28, 2026

Claude finished @from2001's task in 1m 15s


Re-reviewing PR #318

Tasks

  • Read latest client.py
  • Read latest client_simulator.py
  • Verify previous issues are fixed
  • Check for new issues
  • Provide comprehensive feedback

Review Summary

I've reviewed the latest code changes and can confirm that most of the critical issues from the previous review have been successfully addressed. The implementation is now in much better shape. Here's the detailed analysis:

✅ Issues Successfully Fixed

1. Thread Safety for Statistics (Issue #1) - FIXED ✅

Location: client.py:557-558, 564-565, 602-603

The _stats dictionary modifications are now properly protected with self._lock:

with self._lock:
    self._stats["would_block_count"] += 1

This resolves the race condition concern from the previous review.

2. RCVHWM on DEALER Socket (Issue #3) - FIXED ✅

Location: client.py:287-289

The DEALER socket now has both SNDHWM and RCVHWM configured:

self._dealer_socket.setsockopt(zmq.SNDHWM, 10)  # Low HWM for backpressure
self._dealer_socket.setsockopt(zmq.RCVHWM, 10)  # Bound receive queue for control messages

This prevents control message accumulation.

3. Enhanced Error Handling in SUB Drain Loop (Issue #4) - FIXED ✅

Location: client.py:410-430

Comprehensive error handling has been added:

except zmq.ZMQError as e:
    if e.errno in (zmq.ETERM, zmq.ENOTSOCK):
        if self._running:
            logger.error(f"ZMQ error while draining SUB socket: {e}")
        break
    if self._running:
        logger.warning(f"Non-fatal ZMQ error draining SUB, skipping: {e}")
    continue
except Exception as e:
    if self._running:
        logger.warning(f"Unexpected error draining SUB, skipping: {e}")
    continue

4. Enhanced Error Handling in DEALER Drain Loop (Issue #5) - FIXED ✅

Location: client.py:456-476, 479-495

Similar comprehensive error handling added for DEALER socket with proper UnicodeDecodeError handling and exception recovery.

5. Control Message Retry on Backpressure (Issue #6) - FIXED ✅

Location: client.py:555-573

Control messages are now re-queued on backpressure:

elif outcome.is_backpressure:
    with self._lock:
        self._stats["would_block_count"] += 1
    try:
        # Re-queue the control packet so it can be retried later
        self._ctrl_outbox.put_nowait(packet)
    except Full:
        # Queue is full: this control packet is dropped; log prominently
        with self._lock:
            self._stats["ctrl_queue_drops"] += 1
        logger.warning(
            "Control packet dropped due to backpressure and full queue; "
            "would_block=%s, queue_drops=%s",
            self._stats.get("would_block_count"),
            self._stats.get("ctrl_queue_drops"),
        )
    break

6. DEALER Drain Limit (Issue #7) - FIXED ✅

Location: client.py:447-449

A maximum drain limit has been added:

dealer_drained = 0
max_dealer_drain = 64  # Limit drain iterations
while dealer_drained < max_dealer_drain:

7. Stealth Handshake Delivery Confirmation (Issue #8) - FIXED ✅

Location: client.py:882-884

The method now returns the actual result from _enqueue_control():

return self._enqueue_control(
    self._room, message, msg_type="stealth_handshake"
)

8. Enhanced Logging Context (Issue #9) - FIXED ✅

Location: client.py:974-978

Control message drops now include message type:

logger.warning(
    "Control outbox full, dropping %s message (queue_drops=%s)",
    msg_type,
    self._stats.get("ctrl_queue_drops"),
)

9. Documentation Clarity (Issue #10) - FIXED ✅

Location: client_simulator.py:800-814

The SharedSubscriber documentation has been significantly improved:

"""Single SUB socket that receives broadcasts and shares mappings.

IMPORTANT: As of PR #316, the intended primary path for ID mappings is
ROUTER/DEALER unicast instead of PUB/SUB. Each SimulatedClient must poll
its DEALER socket for control messages (including ID mappings) via
_poll_dealer_control().

However, for backwards compatibility with older servers and mixed
deployments, ID mapping messages (MSG_DEVICE_ID_MAPPING) may still be
delivered via PUB/SUB, and this simulator continues to support mappings
received on the shared SUB socket. In practice, ID mappings can therefore
arrive via DEALER, SUB, or both, depending on server behavior.

10. Simulator recv_dealer_control Behavior (Issue #11) - IMPROVED ✅

Location: client_simulator.py:748-751

The method now continues draining on exception rather than returning early:

except Exception as e:
    self.logger.debug(f"DEALER receive error: {e}")
    # Continue to next iteration rather than returning early
    continue

🔍 Remaining Minor Issue

Unused Field: OutboundPacket.attempts (Issue #2) - STILL PRESENT

Location: client.py:126

The attempts field in the OutboundPacket dataclass is still defined but never used. This is a minor code cleanliness issue, not a functional bug.

Recommendation:

  • Remove the unused field if retry logic is not planned
  • Or implement retry counting if it's intended for future use
@dataclass
class OutboundPacket:
    """Outbound packet waiting to be sent with TTL support."""
    
    lane: OutboundLane
    room_id: str
    payload: bytes
    enqueued_at: float = field(default_factory=time.monotonic)
    # Note: No attempts field if unused


🎯 Overall Assessment

Status: ✅ APPROVED - Excellent work!

The implementation has addressed 10 out of 11 issues from the previous review, with only one minor code cleanliness issue remaining (unused field). All critical thread safety, error handling, and backpressure concerns have been properly resolved.

Key Improvements Summary:

  • ✅ Thread-safe statistics tracking
  • ✅ Proper socket HWM configuration
  • ✅ Comprehensive error handling in drain loops
  • ✅ Control message retry on backpressure
  • ✅ Drain starvation prevention
  • ✅ Improved logging and diagnostics
  • ✅ Clear documentation of behavior

Code Quality:

  • Well-structured and maintainable
  • Properly documented with clear docstrings
  • Follows Python best practices
  • Thread-safe by design
  • Robust error recovery

The only remaining issue is cosmetic (unused field) and does not impact functionality. This PR is ready to merge.

Recommendation: Merge after addressing or documenting the decision on the unused attempts field.


@from2001 from2001 marked this pull request as ready for review January 28, 2026 13:47
@from2001 from2001 merged commit 7314499 into develop Jan 28, 2026
@from2001 from2001 deleted the refactor/simulator-with-recent-pr branch January 28, 2026 13:47