Skip to content

feat(pubsub): subscriber stream resumes#4381

Merged
dbolduc merged 3 commits intogoogleapis:mainfrom
dbolduc:feat-pubsub-stream-resumes
Jan 25, 2026
Merged

feat(pubsub): subscriber stream resumes#4381
dbolduc merged 3 commits intogoogleapis:mainfrom
dbolduc:feat-pubsub-stream-resumes

Conversation

@dbolduc
Copy link
Copy Markdown
Member

@dbolduc dbolduc commented Jan 25, 2026

Most of the work left for #4097

Resume streams that were once successful and fail with a transient error.

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Jan 25, 2026
@codecov
Copy link
Copy Markdown

codecov bot commented Jan 25, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 94.93%. Comparing base (cdbb7de) to head (453e240).
⚠️ Report is 6 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #4381   +/-   ##
=======================================
  Coverage   94.92%   94.93%           
=======================================
  Files         191      191           
  Lines        7258     7261    +3     
=======================================
+ Hits         6890     6893    +3     
  Misses        368      368           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@dbolduc dbolduc marked this pull request as ready for review January 25, 2026 01:52
@dbolduc dbolduc requested a review from a team January 25, 2026 01:52
Copy link
Copy Markdown
Collaborator

@coryan coryan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code is unclear. Maybe a comment would explain things. But happy to hear that I am wrong.

Comment on lines +161 to +164
if let Err(e) = self.stream_next().await? {
// TODO(#4097) - support stream resumes.
return Some(Err(e));
match StreamRetryPolicy::is_transient(e) {
RetryResult::Continue(_) => {
// The stream failed with a transient error. Reset the stream.
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if self.stream_next().await returns Some(Ok(...)) we do nothing? That seems odd and maybe should get a comment?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think self.stream_next() was a really bad name. Changed to self.read_from_stream() so hopefully it's more clear.

And documented the member function for read_from_stream() to say that it populates the message pool.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

returns Some(Ok(...)) we do nothing?

It is Some(Ok(())), there is nothing to do.

I think your point is that from reading just this part of the code, the ... type is not obvious. I am open to suggestions here if you have any.

@dbolduc
Copy link
Copy Markdown
Member Author

dbolduc commented Jan 25, 2026

I think the code is unclear. Maybe a comment would explain things. But happy to hear that I am wrong.

Totally fair. I think the diff also didn't help show context. I added a comment.

Also, you should know that I considered this way of writing the logic:

    pub async fn next(&mut self) -> Option<Result<(PubsubMessage, Handler)>> {
        while self.pool.is_empty() {
            // We do not have a message to serve. Keep reading responses from
            // the stream until we do.
            //
            // Note that a successful read does not necessarily mean there is a
            // message in the pool. The server occasionally sends heartbeats
            // (responses with an empty message list). Hence the loop.
            if let Err(e) = self.read_from_stream().await? {
                // Handle errors opening or reading from the stream.
                match StreamRetryPolicy::is_transient(e) {
                    RetryResult::Continue(_) => {
                        // The stream failed with a transient error. Reset the stream.
                        self.stream = None;
                        continue;
                    }
                    RetryResult::Permanent(e) | RetryResult::Exhausted(e) => {
                        // The stream failed with a permanent error. Return the error.
                        return Some(Err(e));
                    }
                }
            }
        }
        Some(Ok(self
            .pool
            .pop_front()
            .expect("I literally just told you the pool is not empty")))
    }

It is the code I would write in C++. The while makes it totally clear under what condition we stop reading from the stream. But the fact that I have to unwrap() from the front of the deque, when I know the deque is not empty just felt not Rust-y.

Let me know if you prefer this spelling of the code. My preference is not strong at all.

Copy link
Copy Markdown
Collaborator

@coryan coryan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code is unclear. Maybe a comment would explain things. But happy to hear that I am wrong.

Totally fair. I think the diff also didn't help show context. I added a comment.

... snip snip snip ...

It is the code I would write in C++. The while makes it totally clear under what condition we stop reading from the stream. But the fact that I have to unwrap() from the front of the deque, when I know the deque is not empty just felt not Rust-y.

Let me know if you prefer this spelling of the code. My preference is not strong at all.

What about:

    pub async fn next(&mut self) -> Option<Result<(PubsubMessage, Handler)>> {
        let Some((message, handler)) = self.pool.pop_front() else {
            return self.populate_pool().await;
        };
        Some(Ok((message, handler)))
    }

    async fn populate_pool_with_retry(&mut self) -> Option<Result<(PubsubMessage, Handler)>> {
        loop {
            if let Some((m, h)) = self.pool.pop_front() {
                return Some(Ok((m, h)));
            };
            let error = match self.populate_pool().await {
                None => return None,
                Some(Ok(_)) => continue,
                Some(e) => e,
            };
            match StreamPolicy::is_transient(error) {
                RetryResult::Continue(_) => {
                    self.stream = None; 
                    continue;
                },
               RetryResult::Permanent(e) | RetryResult::Exhausted(e) => return Some(Err(e)),
            }
        }
        // populate the pool and return the first element on success.
  }

Comment on lines +199 to +205
/// Reads the next response from the stream.
///
/// If we receive a response, we store the messages in `self.pool` and
/// forward the ack IDs to the lease management task.
///
/// If we receive an error reading from the stream, we return it.
async fn read_from_stream(&mut self) -> Option<Result<()>> {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this should be called populate_message_pool()?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. read_from_stream is more "what" we are doing, populate_message_pool() is more "why" we are doing it. Answering "why" seems better than answering "what"...

but we don't always receive a message... so it does not always populate the message pool... so I think I still prefer read_from_stream

@dbolduc
Copy link
Copy Markdown
Member Author

dbolduc commented Jan 25, 2026

What about:

    pub async fn next(&mut self) -> Option<Result<(PubsubMessage, Handler)>> {
       let Some((message, handler)) = self.pool.pop_front() else {
           return self.populate_pool_with_retry().await;
       };
       Some(Ok((message, handler)))
   }

But then we would just say...

     pub async fn next(&mut self) -> Option<Result<(PubsubMessage, Handler)>> {
        self.populate_pool_with_retry().await
    }

and then at that point, why even have populate_pool_with_retry()?

@coryan
Copy link
Copy Markdown
Collaborator

coryan commented Jan 25, 2026

and then at that point, why even have populate_pool_with_retry()?

Fair.

Copy link
Copy Markdown
Collaborator

@coryan coryan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think that something like populate would be better, or maybe read_into_pool() or something.

@dbolduc dbolduc merged commit bfe1f11 into googleapis:main Jan 25, 2026
30 checks passed
@dbolduc dbolduc deleted the feat-pubsub-stream-resumes branch January 25, 2026 21:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: pubsub Issues related to the Pub/Sub API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants