feat(lazer-pusher): add websocket-delivery library #3467
Conversation
}

pub fn is_connected(&self) -> bool {
    self.connected.load(Ordering::Acquire)
Are we using acquire/release here to prevent data races? If so, it's worth documenting the assumptions/requirements so we don't break things in the future.
Yeah, basically: it's a shared atomic. This is internal to the websocket delivery client, and I don't know whether consumers have to worry about it.
I guess my question should be: what would break if these were relaxed ordering instead? It's not clear to me but I may just be missing something.
The library wouldn't know precisely which connections are up, which is important for reliably sending data to all the endpoints: when an endpoint is marked as disconnected, send_all short-circuits and doesn't attempt to send to it (it couldn't anyway), and consumers see that some endpoints did not receive the message they wanted to send. So it's important that we don't relax the ordering here. It's not an expensive operation anyway, and more importantly, the atomics are all internal to the delivery library, so users of the library don't really have to care about them.
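To make the discussion concrete, here is a minimal, std-only sketch of the pattern being debated: a shared flag that the connection task writes with Release and that the send path reads with Acquire. The `Connection` struct and `mark_connected` name are illustrative assumptions, not the library's actual API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical stand-in for the library's per-connection state.
struct Connection {
    connected: AtomicBool,
}

impl Connection {
    fn mark_connected(&self, up: bool) {
        // Release pairs with the Acquire load below, so writes made
        // before flipping the flag are visible to the reading thread.
        self.connected.store(up, Ordering::Release);
    }

    fn is_connected(&self) -> bool {
        self.connected.load(Ordering::Acquire)
    }
}

fn main() {
    let conn = Connection { connected: AtomicBool::new(false) };
    conn.mark_connected(true);
    assert!(conn.is_connected());
    conn.mark_connected(false);
    // A send_all-style loop would short-circuit here and skip this endpoint.
    assert!(!conn.is_connected());
    println!("ok");
}
```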
Totally agree that this shouldn't matter to the library users, and I don't think there's a correctness issue here either. Also there's no difference on x86 anyway. My only concern is maintenance, because in my experience doing synchronization with atomics is fragile. But given all the above if you think acq/rel is the right thing here, that works for me.
tejasbadadare left a comment
Looks very nice overall! Left a couple minor notes.
pub async fn send_all<B>(&self, build_message: B) -> usize
where
    B: Fn(&Url) -> String,
{
    let mut sent_count = 0;
    for conn_arc in &self.connections {
        let conn = conn_arc.lock().await;
        if conn.is_connected() {
            let msg = build_message(conn.endpoint());
Do we have a use case for customizing the message based on the endpoint URL?
Not at the moment; I can simplify it if we want.
That's okay, not much complexity there
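For illustration, here is how a caller might use the `build_message` closure, either ignoring the endpoint or varying the payload per endpoint. This sketch substitutes `&str` for `&Url` so it stays std-only; the endpoint URLs and payload shapes are made-up examples, not anything from the PR.

```rust
// Hypothetical per-endpoint message builder: older endpoints get a v1
// payload, everyone else gets v2. Purely illustrative.
fn build_for(endpoint: &str) -> String {
    if endpoint.contains("legacy") {
        format!(r#"{{"v":1,"target":"{endpoint}"}}"#)
    } else {
        format!(r#"{{"v":2,"target":"{endpoint}"}}"#)
    }
}

fn main() {
    // With no per-endpoint need, the closure can simply ignore its argument,
    // which is the simplification being discussed above.
    let constant = |_ep: &str| String::from(r#"{"v":2}"#);
    assert_eq!(constant("wss://a.example"), r#"{"v":2}"#);
    assert!(build_for("wss://legacy.example").contains(r#""v":1"#));
    println!("ok");
}
```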
// Connect (with backoff on reconnects)
if !is_first_attempt {
    debug!(endpoint = %endpoint_str, delay_ms = reconnect_delay.as_millis(), "attempting reconnection");
Can we upgrade this to info? It's good to have in the logs for investigations.
    break;
}

while outgoing_rx.try_recv().is_ok() {}
I guess this drains the receiver without doing anything with the received messages. It would be good if you can leave a comment here clarifying the intention (discard the pending outgoing messages before recreating the WS).
Yeah, I'll add a comment. Basically, the kind of application we have here just needs all messages to be sent in a timely manner; buffering five minutes of updates while we're trying to reconnect for five minutes doesn't make sense, so we make sure to only send messages when they would actually be delivered promptly. It's a design choice.
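The drain-before-reconnect design choice can be sketched as follows, using std's `mpsc` in place of the async channel for illustration (the `drain` helper and the channel type are assumptions for this sketch, not the library's code):

```rust
use std::sync::mpsc;

// Discard everything queued while the socket was down: stale updates are
// not worth delivering late once we reconnect.
fn drain<T>(rx: &mpsc::Receiver<T>) -> usize {
    let mut dropped = 0;
    // try_recv never blocks; it returns Err once the queue is empty.
    while rx.try_recv().is_ok() {
        dropped += 1;
    }
    dropped
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..5 {
        tx.send(i).unwrap();
    }
    // Before recreating the websocket, flush the backlog.
    assert_eq!(drain(&rx), 5);
    assert!(rx.try_recv().is_err());
    println!("ok");
}
```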
Some(Ok(Message::Close(_))) => {
    return DisconnectReason::ReceivedCloseFrame;
}
Docs say to continue calling read/write/flush to reply to the close frame, after which it's safe to drop the connection: https://docs.rs/tungstenite/latest/tungstenite/protocol/struct.WebSocket.html#method.read
Although I'm not sure how necessary it is.
Actually, it's probably better if we don't fully complete the close handshake anyway. We'll be trying to reconnect ASAP; if something closes our connection, we shouldn't wait around for a proper WS close sequence.
pub fn record_success(&self, latency_secs: f64) {
    self.messages_sent.with_label_values(&["success"]).inc();
    self.delivery_latency.observe(latency_secs);
}

pub fn record_failure(&self, status: &str) {
    self.messages_sent.with_label_values(&[status]).inc();
}
Is anything keeping us from calling these in connection.rs?
Fixed, nice catch. I think I moved stuff around at the end; we don't actually have delivery latency there, so I removed that and called these.
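The shape of the metrics API above can be illustrated with a toy, std-only stand-in for the labeled counter and histogram, showing how both helpers feed the same `messages_sent` family with a status label. The `Metrics` struct here is a mock for illustration, not the real Prometheus types.

```rust
use std::collections::HashMap;

// Toy stand-in: messages_sent is a counter keyed by status label,
// delivery_latency collects observed latencies.
#[derive(Default)]
struct Metrics {
    messages_sent: HashMap<String, u64>,
    delivery_latency: Vec<f64>,
}

impl Metrics {
    fn record_success(&mut self, latency_secs: f64) {
        *self.messages_sent.entry("success".into()).or_insert(0) += 1;
        self.delivery_latency.push(latency_secs);
    }

    fn record_failure(&mut self, status: &str) {
        // Failures share the counter family, distinguished by label.
        *self.messages_sent.entry(status.into()).or_insert(0) += 1;
    }
}

fn main() {
    let mut m = Metrics::default();
    m.record_success(0.012);
    m.record_failure("timeout");
    assert_eq!(m.messages_sent["success"], 1);
    assert_eq!(m.messages_sent["timeout"], 1);
    assert_eq!(m.delivery_latency.len(), 1);
    println!("ok");
}
```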
Summary
Adds a websocket delivery library that manages reconnects and exposes some metrics.
Rationale
Reusable component for lazer pushers.
How has this been tested?