Skip to content

fix(pubsub): update Flush to not block the Publisher#4823

Merged
PhongChuong merged 1 commit intogoogleapis:mainfrom
PhongChuong:pubsubFlushNonBlocking
Feb 26, 2026
Merged

fix(pubsub): update Flush to not block the Publisher#4823
PhongChuong merged 1 commit intogoogleapis:mainfrom
PhongChuong:pubsubFlushNonBlocking

Conversation

@PhongChuong
Copy link
Copy Markdown
Collaborator

@PhongChuong PhongChuong commented Feb 25, 2026

The Publisher currently blocks on a Flush operation to complete across all ordering keys before additional operations can be processed. For example during a Flush operation, when ordering key A Flush is complete and ordering key B is still pending, the Publisher will not process new operations for A until after B's Flush is complete.

This PR updates the Publisher to spawn a task to await on all the Flush operations instead of blocking the main Dispatcher loop. This allows the Publisher to dispatch new operations to the ordering key actors without awaiting for all keys Flush to complete. In the example above, new operations for A will make process. We are guarantee correct ordering because the ToBatchActor::Flush has been sent to each batch actors.

Fixes #4505

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Feb 25, 2026
@PhongChuong PhongChuong changed the title fix(pubsub): update Flush to be non-blocking fix(pubsub): update Flush to not block all Publishing operations Feb 25, 2026
@PhongChuong PhongChuong changed the title fix(pubsub): update Flush to not block all Publishing operations fix(pubsub): update Flush to not block the Publisher Feb 25, 2026
@PhongChuong PhongChuong marked this pull request as ready for review February 25, 2026 19:36
@PhongChuong PhongChuong requested a review from a team as a code owner February 25, 2026 19:36
@codecov
Copy link
Copy Markdown

codecov bot commented Feb 25, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 94.96%. Comparing base (7ec259c) to head (e1b6b51).
⚠️ Report is 12 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4823      +/-   ##
==========================================
- Coverage   94.97%   94.96%   -0.02%     
==========================================
  Files         205      205              
  Lines        8059     8059              
==========================================
- Hits         7654     7653       -1     
- Misses        405      406       +1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Collaborator

@suzmue suzmue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit on description, should be fixes

Does this fix the bug? I would think there would need to be changes to the batch actors as well to make it "not block".

@PhongChuong
Copy link
Copy Markdown
Collaborator Author

@suzmue , I've updated the PR description. See if that explains scenario better. If not, can you describe the scenario that you have in mind?

@suzmue
Copy link
Copy Markdown
Collaborator

suzmue commented Feb 26, 2026

I've updated the PR description. See if that explains scenario better. If not, can you describe the scenario that you have in mind?

Ok sounds good.

The scenario I am thinking of is when we are doing concurrent publishes (no ordering keys), the batcher is blocked waiting for those flush operations to complete:

Some(ToBatchActor::Flush(tx)) => {
self.flush(&mut inflight, &mut batch);
inflight.join_all().await;
inflight = JoinSet::new();
let _ = tx.send(());
},

@PhongChuong
Copy link
Copy Markdown
Collaborator Author

For your described scenario, we first need to discuss and clarify the behavior for Flush as defined [here] (

/// Flushes all outstanding messages.
///
/// This method sends any messages that have been published but not yet sent,
/// regardless of the configured batching options (`delay_threshold`, etc.).
///
/// This method is `async` and returns only after all publish attempts for the
/// messages in the snapshot have completed. A "completed" attempt means the
/// message has either been successfully sent, or has failed permanently after
/// exhausting any applicable retry policies.
///
/// After flush()` returns, the final result of each individual publish
/// operation (i.e., a success with a message ID or a terminal error) will
/// be available on its corresponding [PublishFuture](crate::publisher::PublishFuture).
///
/// Messages published after `flush()` is called will be buffered for a
/// subsequent batch and are not included in this flush operation.
). It is not clear to me when Flush completes for the following scenario (no ordering key):

  1. let publish_1 = Publish(msg_1);
  2. let flush_1 = Flush();
  3. let publish_2 = Publish(msg_2);
  4. let flush_2 = Flush();

Does flush_2 returning guarantee that all previous publish are complete (both publish_[1, 2])? Or just up to the previous Flush (just publish_2)?

I'll add a tracking issue and/or fix it in a following PR after the discussion.

@PhongChuong PhongChuong merged commit 58f2e41 into googleapis:main Feb 26, 2026
35 checks passed
@PhongChuong PhongChuong deleted the pubsubFlushNonBlocking branch February 27, 2026 16:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: pubsub Issues related to the Pub/Sub API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Pubsub Publisher flush operation blocks until completion

3 participants