-
Notifications
You must be signed in to change notification settings - Fork 1.7k
ENG-8089: Implement Lost+Found with RedisTokenManager #5927
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
When an update is emitted for a token, but the websocket for that token is on another instance of the app, post it to the lost+found channel where other instances are listening for updates to send to their clients.
Set the groundwork for being able to broadcast updates to all connected states.
CodSpeed Performance ReportMerging #5927 will not alter performanceComparing Summary
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Greptile Overview
Greptile Summary
Implements a distributed "Lost+Found" mechanism for handling StateUpdates when a client's websocket connection belongs to a different app instance in multi-worker deployments.
Key Changes:
- Replaces simple token→sid mapping with
SocketRecordcontaininginstance_idandsidto track socket ownership - Adds Redis pub/sub channels for cross-instance communication of StateUpdates
- Implements
enumerate_tokens()to scan all tokens across instances - Uses Redis keyspace notifications to keep local caches synchronized
- Routes updates through lost+found when
socket_record.instance_id != self.instance_id
Technical Approach:
The implementation uses two pub/sub mechanisms:
- Redis keyspace notifications for tracking socket record changes
- Per-instance channels for forwarding StateUpdates to the correct instance
Test Coverage:
Comprehensive test updates including real Redis-backed integration tests and unit tests for the lost+found flow.
Confidence Score: 4/5
- Safe to merge with minor considerations around error handling in concurrent scenarios
- Well-architected solution with good test coverage. Redis pub/sub is appropriate for this use case. Error handling is present but some edge cases around concurrent updates could be more robust.
- reflex/utils/token_manager.py - review the concurrent update scenarios and pub/sub subscription lifecycle
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| reflex/utils/token_manager.py | 4/5 | Implements Lost+Found mechanism with Redis pub/sub for cross-instance StateUpdate routing. Adds SocketRecord tracking with instance IDs and enumerate_tokens functionality. |
| reflex/app.py | 4/5 | Updates emit_update to route StateUpdates through lost+found when socket belongs to different instance. Ensures lost+found task runs on connect. |
Sequence Diagram
sequenceDiagram
participant Client as Client (Token)
participant Instance1 as App Instance 1
participant Redis as Redis
participant Instance2 as App Instance 2
Note over Client,Instance2: Normal Flow - Same Instance
Client->>Instance1: Connect (token, sid)
Instance1->>Redis: SET token_manager_socket_record_{token}
Redis-->>Instance1: OK
Instance1->>Instance1: Store in token_to_socket
Client->>Instance1: State Event
Instance1->>Instance1: Process Event
Instance1->>Instance1: emit_update(StateUpdate, token)
Instance1->>Instance1: Check: socket_record.instance_id == self.instance_id
Instance1->>Client: Emit StateUpdate via WebSocket
Note over Client,Instance2: Lost+Found Flow - Different Instance
Client->>Instance2: Reconnect (token, sid2)
Instance2->>Redis: GET token_manager_socket_record_{token}
Redis-->>Instance2: {instance_id: Instance1, sid: sid1}
Instance2->>Redis: SET token_manager_socket_record_{token} (new sid)
Redis-->>Instance2: OK
Instance2->>Redis: Publish keyspace notification
Redis-->>Instance1: Keyspace event: set
Instance1->>Redis: GET token_manager_socket_record_{token}
Redis-->>Instance1: {instance_id: Instance2, sid: sid2}
Instance1->>Instance1: Update local token_to_socket
Instance1->>Instance1: emit_update(StateUpdate, token)
Instance1->>Instance1: Check: socket_record.instance_id != self.instance_id
Instance1->>Instance1: emit_lost_and_found(token, update)
Instance1->>Redis: GET token owner (Instance2)
Instance1->>Redis: PUBLISH channel:lost_and_found_Instance2
Redis-->>Instance2: Lost+Found message
Instance2->>Client: Emit StateUpdate via WebSocket
6 files reviewed, no comments
Handle updates sent for a token where the sid for the token is connected to a different instance of the app.
Demo app