feat(pubsub): subscriber stream resumes#4381
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #4381 +/- ##
=======================================
Coverage 94.92% 94.93%
=======================================
Files 191 191
Lines 7258 7261 +3
=======================================
+ Hits 6890 6893 +3
Misses 368 368 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
coryan
left a comment
There was a problem hiding this comment.
I think the code is unclear. Maybe a comment would explain things. But happy to hear that I am wrong.
src/pubsub/src/subscriber/session.rs
Outdated
| if let Err(e) = self.stream_next().await? { | ||
| // TODO(#4097) - support stream resumes. | ||
| return Some(Err(e)); | ||
| match StreamRetryPolicy::is_transient(e) { | ||
| RetryResult::Continue(_) => { | ||
| // The stream failed with a transient error. Reset the stream. |
There was a problem hiding this comment.
So if self.stream_next().await returns Some(Ok(...)) we do nothing? That seems odd and maybe should get a comment?
There was a problem hiding this comment.
I think self.stream_next() was a really bad name. Changed to self.read_from_stream() so hopefully it's more clear.
And documented the member function for read_from_stream() to say that it populates the message pool.
There was a problem hiding this comment.
returns
Some(Ok(...))we do nothing?
It is Some(Ok(())), there is nothing to do.
I think your point is that from reading just this part of the code, the ... type is not obvious. I am open to suggestions here if you have any.
Totally fair. I think the diff also didn't help show context. I added a comment. Also, you should know that I considered this way of writing the logic: pub async fn next(&mut self) -> Option<Result<(PubsubMessage, Handler)>> {
while self.pool.is_empty() {
// We do not have a message to serve. Keep reading responses from
// the stream until we do.
//
// Note that a successful read does not necessarily mean there is a
// message in the pool. The server occasionally sends heartbeats
// (responses with an empty message list). Hence the loop.
if let Err(e) = self.read_from_stream().await? {
// Handle errors opening or reading from the stream.
match StreamRetryPolicy::is_transient(e) {
RetryResult::Continue(_) => {
// The stream failed with a transient error. Reset the stream.
self.stream = None;
continue;
}
RetryResult::Permanent(e) | RetryResult::Exhausted(e) => {
// The stream failed with a permanent error. Return the error.
return Some(Err(e));
}
}
}
}
Some(Ok(self
.pool
.pop_front()
.expect("I literally just told you the pool is not empty")))
}It is the code I would write in C++. The Let me know if you prefer this spelling of the code. My preference is not strong at all. |
coryan
left a comment
There was a problem hiding this comment.
I think the code is unclear. Maybe a comment would explain things. But happy to hear that I am wrong.
Totally fair. I think the diff also didn't help show context. I added a comment.
... snip snip snip ...
It is the code I would write in C++. The
whilemakes it totally clear under what condition we stop reading from the stream. But the fact that I have tounwrap()from the front of the deque, when I know the deque is not empty just felt not Rust-y.Let me know if you prefer this spelling of the code. My preference is not strong at all.
What about:
pub async fn next(&mut self) -> Option<Result<(PubsubMessage, Handler)>> {
let Some((message, handler)) = self.pool.pop_front() else {
return self.populate_pool().await;
};
Some(Ok((message, handler)))
}
async fn populate_pool_with_retry(&mut self) -> Option<Result<(PubsubMessage, Handler)>> {
loop {
if let Some((m, h)) = self.pool.pop_front() {
return Some(Ok((m, h)));
};
let error = match self.populate_pool().await {
None => return None,
Some(Ok(_)) => continue,
Some(e) => e,
};
match StreamPolicy::is_transient(error) {
RetryResult::Continue(_) => {
self.stream = None;
continue;
},
RetryResult::Permanent(e) | RetryResult::Exhausted(e) => return Some(Err(e)),
}
}
// populate the pool and return the first element on success.
}| /// Reads the next response from the stream. | ||
| /// | ||
| /// If we receive a response, we store the messages in `self.pool` and | ||
| /// forward the ack IDs to the lease management task. | ||
| /// | ||
| /// If we receive an error reading from the stream, we return it. | ||
| async fn read_from_stream(&mut self) -> Option<Result<()>> { |
There was a problem hiding this comment.
Maybe this should be called populate_message_pool()?
There was a problem hiding this comment.
Hm. read_from_stream is more "what" we are doing, populate_message_pool() is more "why" we are doing it. Answering "why" seems better than answering "what"...
but we don't always receive a message... so it does not always populate the message pool... so I think I still prefer read_from_stream
But then we would just say... pub async fn next(&mut self) -> Option<Result<(PubsubMessage, Handler)>> {
self.populate_pool_with_retry().await
}and then at that point, why even have |
Fair. |
coryan
left a comment
There was a problem hiding this comment.
I still think that something like populate would be better, or maybe read_into_pool() or something.
Most of the work left for #4097
Resume streams that were once successful and fail with a transient error.