fix(pubsub): permanent errors end stream#4849
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #4849 +/- ##
=======================================
Coverage 94.60% 94.61%
=======================================
Files 208 208
Lines 8143 8148 +5
=======================================
+ Hits 7704 7709 +5
Misses 439 439 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| /// If a stream is not yet open, this method opens the stream. | ||
| async fn mut_stream(&mut self) -> Result<&mut Stream<Transport>> { | ||
| if self.stream.is_none() { | ||
| async fn mut_stream(&mut self) -> Option<Result<&mut Stream<Transport>>> { |
There was a problem hiding this comment.
Not pretty, I think returning &mut Stream<Transport> is causing you grief, maybe you should just have a function that returns Option<Result< Message Type >. Or maybe your StreamState should implement next_message(start: AsyncFn() -> Result<Stream::<Transport>> and you just call that function on it with:
self.stream_state.next_message(async || { Stream::<Transport>::new(self.inner.clone(), self.initial_req.clone()).await })
The lifetimes could get tricky on that, maybe.
There was a problem hiding this comment.
With apologies to the archaeologists, I did some refactoring in this PR.
I took your suggestions and split out (1) opening the stream and (2) receiving the next response from the stream. And also your suggestion on naming from: #4381 (review)
I think the code reads a lot nicer, so thanks.
I was a bit too trigger happy on merging #4849. This is my final answer for how to spell the code.
Fixes #4848
Track whether the stream has failed with a permanent error. If so, stop trying to recreate the stream, if the application asks for more messages.
This is as Rust-y as I could get it, but I still needed an
unreachable!()after the state update. 🤷StreamStatemay grow to includeShutdownin the future. (Or we may just useFailedto meanShutdown, kinda how we useUnstartedto also meanShouldRetry). Open to naming suggestions here.