Skip to content

feat(pubsub): Initial support message ordering in Publisher#4016

Merged
PhongChuong merged 6 commits intogoogleapis:mainfrom
PhongChuong:ordering
Dec 11, 2025
Merged

feat(pubsub): Initial support message ordering in Publisher#4016
PhongChuong merged 6 commits intogoogleapis:mainfrom
PhongChuong:ordering

Conversation

@PhongChuong
Copy link
Copy Markdown
Collaborator

Add initial support for message ordering in Publisher. This feature does not retry and does not respect message ordering in the case of failure. These will be supported in a later PR.

Towards: #4012

@product-auto-label product-auto-label bot added the api: pubsub Issues related to the Pub/Sub API. label Dec 5, 2025
@codecov
Copy link
Copy Markdown

codecov bot commented Dec 5, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 95.27%. Comparing base (41015c9) to head (9bd06ac).
⚠️ Report is 41 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #4016   +/-   ##
=======================================
  Coverage   95.27%   95.27%           
=======================================
  Files         167      167           
  Lines        6324     6326    +2     
=======================================
+ Hits         6025     6027    +2     
  Misses        299      299           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@PhongChuong PhongChuong marked this pull request as ready for review December 8, 2025 18:00
@PhongChuong PhongChuong requested review from a team and suzmue December 8, 2025 18:00
_ = &mut timer => {
for (_, outstanding) in pending_batches.iter_mut() {
if !outstanding.pending_batch.is_empty() {
outstanding.pending_batch.flush(self.client.clone(), self.topic_name.clone(), &mut inflight);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to double check with rest of PR, but I don't think this takes into account that there may be a batch already out for that ordering key?

I think this will be easier to do with the refactoring that we talked about where more of the control for flushing batches is moved to the OutstandingPublishes, so I am okay with doing in a follow up, but please add a TODO for it.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. This does not take into account if there is a batch inflight.
The TODO on worker.rs:156 is to handle this.

// A dictionary of ordering key to outstanding publish operations.
// We batch publish operations on the same ordering key together.
// Publish without ordering keys are treated as having the key "".
let mut pending_batches: HashMap<String, OutstandingPublishes> = HashMap::new();
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to think about, do we ever remove ordering keys from this map? Or will it grow indefinitely?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will grow indefinitely, I've added a TODO.

while flushing.next().await.is_some() {}
for (_, outstanding) in pending_batches.iter_mut() {
if !outstanding.pending_batch.is_empty() {
outstanding.pending_batch.flush(self.client.clone(), self.topic_name.clone(), &mut inflight);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would at least add a comment / TODO here for the behavior of flush when there are ordering keys. This will involve sending multiple batches sequentially since they need to be ordered. So it is not as simple as just flushing the existing messages and waiting on the response.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a TODO here with more explicit callout.

@PhongChuong PhongChuong changed the title feat(pubsub): Support message ordering in Publisher feat(pubsub): Initial support message ordering in Publisher Dec 11, 2025
@PhongChuong
Copy link
Copy Markdown
Collaborator Author

Thanks for the review. PTAL.

Copy link
Copy Markdown
Collaborator

@suzmue suzmue left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the TODOs, I'm fine to submit this to keep making forward progress.

@PhongChuong PhongChuong merged commit 07c6f2c into googleapis:main Dec 11, 2025
29 checks passed
@PhongChuong PhongChuong deleted the ordering branch January 24, 2026 03:06
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: pubsub Issues related to the Pub/Sub API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants