
feat(lazer-pusher): add websocket-delivery library#3467

Merged
Dewyer merged 3 commits into main from lazer-pusher/2-websocket-delivery
Feb 20, 2026

Conversation

Dewyer (Contributor) commented Feb 13, 2026

Summary

Adds a websocket delivery library that manages reconnects and exposes some metrics.

Rationale

Reusable component for lazer pushers.

How has this been tested?

  • Current tests cover my changes
  • Added new tests
  • Manually tested the code

vercel bot commented Feb 13, 2026: preview deployments ready for all seven projects (Feb 18, 2026 10:01am UTC).


pub fn is_connected(&self) -> bool {
    self.connected.load(Ordering::Acquire)
}
Contributor:

Are we using acquire/release here to prevent data races? If so, it's worth documenting what the assumptions/requirements are so we don't break things in the future.

Contributor (Author):

Yeah, basically; it's a shared atomic. This is internal to the websocket delivery client, and I don't know whether consumers have to worry about it.

Contributor:

I guess my question should be: what would break if these were relaxed ordering instead? It's not clear to me but I may just be missing something.

Contributor (Author):

Without it, the library wouldn't precisely know which connections are up, which matters for reliably sending data to all the endpoints: when an endpoint is marked as disconnected, send_all doesn't actually send data to it (it couldn't anyway), the call in send_all short-circuits, and consumers see that some endpoints will not receive the message they wanted to send. So it is important that we don't relax the ordering here. It's not that expensive an operation anyway, and more importantly the atomics are all internal to the delivery library, so users of the library don't really have to care about it.
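To illustrate the pattern under discussion, here is a minimal std-only sketch (type and method names are hypothetical, not the PR's actual API): the connection task publishes its state with a Release store, and send paths check it with an Acquire load, so any writes made before the flag flipped are visible to a thread that observes it as connected.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the per-connection shared state.
struct ConnectionState {
    connected: AtomicBool,
}

impl ConnectionState {
    fn new() -> Self {
        Self { connected: AtomicBool::new(false) }
    }

    // Called by the connection task once the websocket is up.
    // Release: everything written before this store is published.
    fn mark_connected(&self) {
        self.connected.store(true, Ordering::Release);
    }

    // Called from send paths to decide whether to attempt a send.
    // Acquire: pairs with the Release store above.
    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::Acquire)
    }
}

fn main() {
    let state = Arc::new(ConnectionState::new());
    let writer = {
        let state = Arc::clone(&state);
        thread::spawn(move || state.mark_connected())
    };
    writer.join().unwrap();
    assert!(state.is_connected());
}
```

With Relaxed ordering the flag itself would still be updated atomically, but the visibility guarantee pairing it with earlier writes would be lost, which is the maintenance hazard being debated.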

Contributor:

Totally agree that this shouldn't matter to the library users, and I don't think there's a correctness issue here either. Also there's no difference on x86 anyway. My only concern is maintenance, because in my experience doing synchronization with atomics is fragile. But given all the above if you think acq/rel is the right thing here, that works for me.

tejasbadadare (Contributor) left a comment:

looks very nice overall! left a couple minor notes.

Comment on lines +57 to +65
pub async fn send_all<B>(&self, build_message: B) -> usize
where
    B: Fn(&Url) -> String,
{
    let mut sent_count = 0;
    for conn_arc in &self.connections {
        let conn = conn_arc.lock().await;
        if conn.is_connected() {
            let msg = build_message(conn.endpoint());
Contributor:

Do we have a use case for customizing the message based on the endpoint URL?

Contributor (Author):

not atm, i can simplify it if we want

Contributor:

That's okay, not much complexity there
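For reference, a synchronous std-only analog of the send_all shape above (FakeConn and the string endpoint are hypothetical test stand-ins, not the PR's types): the per-endpoint closure builds the payload, disconnected endpoints are skipped, and the count of successful sends is returned to the caller.

```rust
// Hypothetical stand-in for a websocket connection, for illustration only.
struct FakeConn {
    endpoint: String,
    connected: bool,
    sent: Vec<String>,
}

// Mirrors the send_all signature: the closure may customize the
// message per endpoint; disconnected connections are short-circuited.
fn send_all<B>(conns: &mut [FakeConn], build_message: B) -> usize
where
    B: Fn(&str) -> String,
{
    let mut sent_count = 0;
    for conn in conns.iter_mut() {
        if conn.connected {
            let msg = build_message(&conn.endpoint);
            conn.sent.push(msg);
            sent_count += 1;
        }
    }
    sent_count
}

fn main() {
    let mut conns = vec![
        FakeConn { endpoint: "wss://a.example".into(), connected: true, sent: vec![] },
        FakeConn { endpoint: "wss://b.example".into(), connected: false, sent: vec![] },
    ];
    // A uniform payload; the closure could instead vary it per URL.
    let n = send_all(&mut conns, |_url| r#"{"type":"update"}"#.to_string());
    assert_eq!(n, 1); // only the connected endpoint is counted
}
```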


// Connect (with backoff on reconnects)
if !is_first_attempt {
    debug!(endpoint = %endpoint_str, delay_ms = reconnect_delay.as_millis(), "attempting reconnection");
Contributor:

Can we upgrade this to info? Good to have in the logs for investigations.

break;
}

while outgoing_rx.try_recv().is_ok() {}
Contributor:

I guess this drains the receiver without doing anything with the received messages. It would be good if you can leave a comment here clarifying the intention (discard the pending outgoing messages before recreating the WS).

Contributor (Author):

Yeah, I'll add a comment. Basically, the kind of application we have here needs all messages to be sent in a timely manner; buffering 5 minutes of updates while we are trying to reconnect for 5 minutes doesn't make sense, so we make sure to only send messages when they would actually be delivered in a timely manner. It's a design choice.
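The drain being discussed can be sketched with a std mpsc channel standing in for the async one (drain_pending is a hypothetical name; the PR drains inline): pending outgoing messages queued while disconnected are discarded before the websocket is recreated, so stale updates are never delivered late after a reconnect.

```rust
use std::sync::mpsc;

// Discard messages that queued up while the connection was down.
// Design choice: a fresh connection should only carry messages
// produced after it is up; delivering minutes-old updates is useless.
fn drain_pending(outgoing_rx: &mpsc::Receiver<String>) -> usize {
    let mut discarded = 0;
    while outgoing_rx.try_recv().is_ok() {
        discarded += 1;
    }
    discarded
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send("stale update 1".to_string()).unwrap();
    tx.send("stale update 2".to_string()).unwrap();
    assert_eq!(drain_pending(&rx), 2);
    assert!(rx.try_recv().is_err()); // queue is now empty
}
```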

Comment on lines +345 to +347
Some(Ok(Message::Close(_))) => {
    return DisconnectReason::ReceivedCloseFrame;
}
Contributor:

Docs say to continue calling read/write/flush to reply to the close frame, after which it's safe to drop the connection: https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocket.html#method.read
Although I'm not sure how necessary it is.

Contributor (Author):

Actually, it is probably better if we don't fully complete the close handshake anyway; we will be trying to reconnect ASAP. If something closes our connection, we shouldn't wait around for a proper WS close sequence.

Contributor:

Yeah makes sense
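The agreed-upon behavior can be sketched with local enums standing in for the tungstenite types (handle_incoming and both enums here are illustrative stand-ins, not the PR's code): on a Close frame, record the disconnect reason and bail out immediately instead of finishing the close handshake, since the reconnect loop will dial again right away.

```rust
// Local stand-ins for tungstenite's Message and the PR's reason enum.
#[derive(Debug, PartialEq)]
enum Message {
    Text(String),
    Close(Option<String>),
}

#[derive(Debug, PartialEq)]
enum DisconnectReason {
    ReceivedCloseFrame,
}

// On Close, don't reply to complete the handshake; just report the
// reason so the caller drops the socket and reconnects immediately.
fn handle_incoming(msg: Option<Result<Message, String>>) -> Option<DisconnectReason> {
    match msg {
        Some(Ok(Message::Close(_))) => Some(DisconnectReason::ReceivedCloseFrame),
        _ => None,
    }
}

fn main() {
    let r = handle_incoming(Some(Ok(Message::Close(None))));
    assert_eq!(r, Some(DisconnectReason::ReceivedCloseFrame));
}
```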

Comment on lines +85 to +92
pub fn record_success(&self, latency_secs: f64) {
    self.messages_sent.with_label_values(&["success"]).inc();
    self.delivery_latency.observe(latency_secs);
}

pub fn record_failure(&self, status: &str) {
    self.messages_sent.with_label_values(&[status]).inc();
}
Contributor:

Is anything keeping us from calling these in connection.rs?

Contributor (Author):

Fixed, nice catch. I think I moved stuff around at the end; we don't actually have delivery latency here, so I removed that and called these.

@Dewyer Dewyer merged commit 5a4555e into main Feb 20, 2026
8 checks passed
@Dewyer Dewyer deleted the lazer-pusher/2-websocket-delivery branch February 20, 2026 11:21