Skip to content

Conversation

@masenf
Copy link
Collaborator

@masenf masenf commented Oct 24, 2025

Handle updates sent for a token where the sid for the token is connected to a different instance of the app.

Demo app

import asyncio
import os
import time
import uuid

import reflex as rx


class State(rx.State):
    messages: list[str] = []
    current_pid: int = 0

    @rx.event
    def on_load(self):
        self.current_pid = os.getpid()

    @rx.event(background=True)
    async def many_message_stream(self):
        for _ in range(4):
            yield [
                State.message_stream,
                rx.call_script("socket.disconnect(); window.location.reload();"),
            ]
            await asyncio.sleep(1)

    @rx.event(background=True)
    async def message_stream(self):
        """A background stream that appends messages every 5 seconds."""
        task_instance = uuid.uuid4().hex[:6]
        task_pid = os.getpid()
        task_sid = self.router.session.session_id

        for count in range(1, 10):
            async with self:
                current_sid = self.router.session.session_id
                self.messages.append(
                    f"[{time.time()}] [{task_instance} -> {count}] "
                    f"from {task_pid}:{task_sid} (now {self.current_pid}:{current_sid})"
                )
            await asyncio.sleep(5)

    @rx.event
    def clear_messages(self):
        """Clear all messages."""
        self.messages = []


class CounterState(rx.State):
    count: int = 0
    last_toucher: str = ""

    @rx.event(background=True)
    async def increment(self):
        task_sid = self.router.session.session_id
        task_pid = os.getpid()
        for _ in range(1000):
            async with self:
                self.last_toucher = f"{task_pid}:{task_sid}"
                self.count += 1
           # await asyncio.sleep(0.1)


def index() -> rx.Component:
    return rx.container(
        rx.color_mode.button(position="top-right"),
        rx.vstack(
            rx.hstack(
                rx.button("Start Streamers", on_click=State.many_message_stream, color_scheme="green"),
                rx.button("Single Streamer", on_click=State.message_stream, color_scheme="blue"),
                rx.button(rx.icon("eraser"), on_click=State.clear_messages, color_scheme="red"),
                rx.badge(State.current_pid, color_scheme="purple", font_size="0.8em"),
                rx.badge(State.router.session.client_token, color_scheme="orange", font_size="0.8em"),
                rx.badge(State.router.session.session_id, color_scheme="teal", font_size="0.8em"),
            ),
            rx.hstack(
                rx.button("Increment Counter", on_click=CounterState.increment, color_scheme="green"),
                rx.badge(CounterState.count, color_scheme="purple", font_size="0.8em"),
                rx.badge(CounterState.last_toucher, color_scheme="orange", font_size="0.8em"),
            ),
            rx.foreach(
                State.messages.reverse(),
                lambda msg: rx.text(msg),
            ),
        ),
    )


app = rx.App()
app.add_page(index, on_load=State.on_load, title="Background Stream")

When an update is emitted for a token, but the websocket for that token is on
another instance of the app, post it to the lost+found channel where other
instances are listening for updates to send to their clients.
Set the groundwork for being able to broadcast updates to all connected states.
@linear
Copy link

linear bot commented Oct 24, 2025

@codspeed-hq
Copy link

codspeed-hq bot commented Oct 24, 2025

CodSpeed Performance Report

Merging #5927 will not alter performance

Comparing masenf/redis_lost+found (36496dd) with main (c4254ed)

Summary

✅ 8 untouched

@masenf masenf mentioned this pull request Oct 27, 2025
@masenf masenf marked this pull request as ready for review October 27, 2025 23:57
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Overview

Greptile Summary

Implements a distributed "Lost+Found" mechanism for handling StateUpdates when a client's websocket connection belongs to a different app instance in multi-worker deployments.

Key Changes:

  • Replaces simple token→sid mapping with SocketRecord containing instance_id and sid to track socket ownership
  • Adds Redis pub/sub channels for cross-instance communication of StateUpdates
  • Implements enumerate_tokens() to scan all tokens across instances
  • Uses Redis keyspace notifications to keep local caches synchronized
  • Routes updates through lost+found when socket_record.instance_id != self.instance_id

Technical Approach:
The implementation uses two pub/sub mechanisms:

  1. Redis keyspace notifications for tracking socket record changes
  2. Per-instance channels for forwarding StateUpdates to the correct instance

Test Coverage:
Comprehensive test updates including real Redis-backed integration tests and unit tests for the lost+found flow.

Confidence Score: 4/5

  • Safe to merge with minor considerations around error handling in concurrent scenarios
  • Well-architected solution with good test coverage. Redis pub/sub is appropriate for this use case. Error handling is present but some edge cases around concurrent updates could be more robust.
  • reflex/utils/token_manager.py - review the concurrent update scenarios and pub/sub subscription lifecycle

Important Files Changed

File Analysis

Filename Score Overview
reflex/utils/token_manager.py 4/5 Implements Lost+Found mechanism with Redis pub/sub for cross-instance StateUpdate routing. Adds SocketRecord tracking with instance IDs and enumerate_tokens functionality.
reflex/app.py 4/5 Updates emit_update to route StateUpdates through lost+found when socket belongs to different instance. Ensures lost+found task runs on connect.

Sequence Diagram

sequenceDiagram
    participant Client as Client (Token)
    participant Instance1 as App Instance 1
    participant Redis as Redis
    participant Instance2 as App Instance 2
    
    Note over Client,Instance2: Normal Flow - Same Instance
    Client->>Instance1: Connect (token, sid)
    Instance1->>Redis: SET token_manager_socket_record_{token}
    Redis-->>Instance1: OK
    Instance1->>Instance1: Store in token_to_socket
    
    Client->>Instance1: State Event
    Instance1->>Instance1: Process Event
    Instance1->>Instance1: emit_update(StateUpdate, token)
    Instance1->>Instance1: Check: socket_record.instance_id == self.instance_id
    Instance1->>Client: Emit StateUpdate via WebSocket
    
    Note over Client,Instance2: Lost+Found Flow - Different Instance
    Client->>Instance2: Reconnect (token, sid2)
    Instance2->>Redis: GET token_manager_socket_record_{token}
    Redis-->>Instance2: {instance_id: Instance1, sid: sid1}
    Instance2->>Redis: SET token_manager_socket_record_{token} (new sid)
    Redis-->>Instance2: OK
    Instance2->>Redis: Publish keyspace notification
    Redis-->>Instance1: Keyspace event: set
    Instance1->>Redis: GET token_manager_socket_record_{token}
    Redis-->>Instance1: {instance_id: Instance2, sid: sid2}
    Instance1->>Instance1: Update local token_to_socket
    
    Instance1->>Instance1: emit_update(StateUpdate, token)
    Instance1->>Instance1: Check: socket_record.instance_id != self.instance_id
    Instance1->>Instance1: emit_lost_and_found(token, update)
    Instance1->>Redis: GET token owner (Instance2)
    Instance1->>Redis: PUBLISH channel:lost_and_found_Instance2
    Redis-->>Instance2: Lost+Found message
    Instance2->>Client: Emit StateUpdate via WebSocket
Loading

6 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

@masenf masenf merged commit 0976ece into main Oct 29, 2025
62 of 63 checks passed
@masenf masenf deleted the masenf/redis_lost+found branch October 29, 2025 23:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants