
feat(pubsub): pause the publisher for an ordering key when there is an error#4286

Merged
PhongChuong merged 13 commits into googleapis:main from PhongChuong:orderingError
Jan 21, 2026

Conversation

@PhongChuong (Collaborator)

Pause the Publisher for an ordering key when we encounter a send error.

When an error is encountered for a pending batch, we:

  1. In BatchWorker, pause publishing and send out errors for pending_msgs.
  2. In the pending batch, send out error for its messages.
  3. New messages in the rx receiver are handled as they are received by the BatchWorker.

A resume operation will be added in a later PR.

This PR also introduces PublishError. Further work is needed to handle error propagation more fully.
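The new error type can be sketched roughly as follows. This is a minimal, dependency-free sketch: the real type appears to use the thiserror derive (judging by the #[error(...)] attribute in the review snippets), and only the variant quoted in the review is reproduced here, with Display written by hand.

```rust
use std::fmt;

// Minimal sketch of the PublishError this PR introduces. Only the variant
// shown in the review snippets is reproduced; the real enum is larger and
// derives its Display impl via thiserror.
#[derive(Debug)]
pub enum PublishError {
    /// Publish is paused for the ordering key.
    OrderingKeyPaused(()),
}

impl fmt::Display for PublishError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PublishError::OrderingKeyPaused(()) => write!(f, "the ordering key was paused"),
        }
    }
}

impl std::error::Error for PublishError {}
```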

product-auto-label bot added the api: pubsub label (Issues related to the Pub/Sub API) on Jan 15, 2026
@codecov

codecov bot commented Jan 15, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 94.87%. Comparing base (b789046) to head (7290eb6).
⚠️ Report is 4 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4286      +/-   ##
==========================================
+ Coverage   94.85%   94.87%   +0.01%     
==========================================
  Files         187      187              
  Lines        7195     7219      +24     
==========================================
+ Hits         6825     6849      +24     
  Misses        370      370              


@PhongChuong PhongChuong marked this pull request as ready for review January 15, 2026 16:35
@PhongChuong PhongChuong requested review from a team and suzmue January 15, 2026 16:35
@dbolduc (Member) left a comment

I didn't get to the changes in worker.rs

Consider doing the error type refactor first, then the unit tests look more like what we want.


/// Publish is paused for the ordering key.
#[error("the ordering key was paused")]
OrderingKeyPaused(()),
Member

Usability question, which is not trivial, so feel free to think about it later:

Publish consumes the application's PubsubMessage. If the operation fails, what should the application do to resend the message? Would they need to hold onto a clone of the message until the operation is complete?

It would be super convenient if we could give them their message back. The plumbing on our end could be brutal, but the application would appreciate it. 🤷

@dbolduc (Member) commented Jan 15, 2026

our GAPICs all have this problem too.... idk if publishing is special. Something seems wrong about just dropping their message without trying to send it. But maybe I am worrying too much about the unhappy case. 🤷

Collaborator Author

Agreed.
Ideally, in the error case we should give the user the message back. IMO, we can do this in 2 ways:

  1. Keep a clone of it internally and pass it back to the user if there is a failure.
  2. Augment generated code/error to pass the message back if there is an error during Send.
    I think it's a bigger discussion that should be left out of this PR.
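Option 1 could look roughly like this. Everything below is a hypothetical illustration: PubsubMessage, SendFailed, and publish are stand-ins, not the crate's actual API.

```rust
// Hypothetical sketch of option 1: the publisher keeps a clone internally
// and hands the message back inside the error on failure. All types here
// are illustrative stand-ins, not the crate's real ones.
#[derive(Clone, Debug, PartialEq)]
struct PubsubMessage {
    data: String,
}

#[derive(Debug)]
struct SendFailed {
    reason: String,
    // The original message, returned so the caller can resend it.
    message: PubsubMessage,
}

fn publish(msg: PubsubMessage, fail: bool) -> Result<String, SendFailed> {
    let keep = msg.clone(); // the internal clone that option 1 describes
    if fail {
        return Err(SendFailed { reason: "send error".into(), message: keep });
    }
    Ok(format!("id-for-{}", msg.data))
}
```

On Err, the caller recovers err.message and can retry without having kept its own copy.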

Collaborator

I'm not totally convinced we should treat this differently than our other GAPICs. Users won't know if the message will fail before sending, and if we only give it back on one of the error cases, they will need to keep a copy of it regardless.

}
}

fn convert_error(e: crate::error::PublishError) -> gax::error::Error {
Member

comment: ah, I think you are hesitating to change Output to a Result<String, PublishError> because then we have to update all the code downstream of this.

Ok.

Member

This is really yucky though. It might have been nicer to change the error type first.

Collaborator Author

I think we should discuss as a group with @suzmue whether returning gax::error::Error or PublishError directly is ideal. We decided to go with gax::error::Error for now as it is consistent with the other clients.

Member

It feels so wrong to me, but we do have a precedent for just throwing something in an Error::io

// TODO(#3626) - reconsider the error kind.
result.map_err(crate::Error::io)

Although in GCS, I think there was a more compelling reason. (We wanted to reuse the ReadObjectResponse type which was in terms of gax::Error.)
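In stdlib terms, the Error::io precedent amounts to type-erasing the domain error into a generic wrapper. A dependency-free sketch of that shape (Box<dyn Error> is only a stand-in for gax::error::Error, which has its own constructors; PublishError is likewise simplified):

```rust
use std::error::Error;
use std::fmt;

// Stand-in for crate::error::PublishError, reduced to a unit struct.
#[derive(Debug)]
struct PublishError;

impl fmt::Display for PublishError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "the ordering key was paused")
    }
}

impl Error for PublishError {}

// Stand-in for convert_error: wrap the domain error into a generic,
// type-erased error, the way Error::io does in the GCS example.
fn convert_error(e: PublishError) -> Box<dyn Error + Send + Sync> {
    Box::new(e)
}

fn publish_result() -> Result<String, Box<dyn Error + Send + Sync>> {
    // Mirrors the `result.map_err(crate::Error::io)` shape.
    Err(PublishError).map_err(convert_error)
}
```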

Collaborator

Let's keep the wrapped error for now. But worth discussing if we'd rather expose a different error type for clarity instead of being consistent.

Having a PublishError type now will make that discussion very concrete :)

@PhongChuong (Collaborator Author) left a comment

Thanks for the review.


@dbolduc (Member) left a comment

Sorry, this is my first time looking at the publisher code, so I did my best.

What happens if there is an ordering key error, the application publishes messages with that ordering key, then calls flush()? It should be paused, right?

(Also, I am too lazy to read the code, do we ever clean up BatchWorkers? like when all batches are flushed and it is empty? If so we would not want to clean up ones that are paused. I didn't see this anywhere, so probably not.)

];

// Assert the first error is caused by the Publish send operation.
let mut got_err = publisher.publish(messages[0].clone()).await.unwrap_err();
Member

nit: here and below, no need to clone the messages.

Collaborator Author

The messages are not Copy, so we would have to either clone() or store the message in a smart pointer.
I'll keep it as clone for now. Let me know if there's another obvious way that I might have missed.

Member

What do we get out of messages? I think nothing. If you get rid of that, we can move the messages into the function (and avoid the clone()s).

Collaborator Author

DONE.
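The move-instead-of-clone point can be illustrated with a dependency-free sketch (PubsubMessage and publish here are stand-ins): once the messages Vec isn't read after publishing, into_iter() moves each message into the call and no clone() is needed, even though the type is not Copy.

```rust
struct PubsubMessage {
    data: String,
}

// Stand-in for the real async publish; takes the message by value.
fn publish(msg: PubsubMessage) -> String {
    msg.data
}

fn publish_all() -> Vec<String> {
    let messages = vec![
        PubsubMessage { data: "m1".into() },
        PubsubMessage { data: "m2".into() },
    ];
    // into_iter() yields owned messages, so each one is moved into
    // publish() and never cloned.
    messages.into_iter().map(publish).collect()
}
```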


let client = GapicPublisher::from_stub(mock);
let publisher = PublisherBuilder::new(client, "my-topic".to_string())
.set_message_count_threshold(1_u32)
Member

We also want a test where there are subsequent messages in this batch.

(Those codecov comments are actually useful!)

Collaborator Author

Updated the test to include this case.

We're still missing the case where there is a JoinError in join_next. I'll have to take a look to see if there is a way to inject tokio errors.

Member

> Updated the test to include this case.

Thanks.

> We're still missing the case where there is a JoinError in join_next. I'll have to take a look to see if there is a way to inject tokio errors.

No worries on this. Some error cases are infeasible to test.


Do we need/want a test checking errors for an ordering key with a batch of size > 1?

Like if we have a batch with m1, m2, where they both have ordering keys, and we get some send error, should we return the send error to the m1 and m2 handles? Or should m1 handle get the send error, and m2 handle error gets the ordering key error?

If either behavior is valid, then we don't need an overly restrictive test that commits to one. WDYT?

Collaborator Author

> Do we need/want a test checking errors for an ordering key with a batch of size > 1?
>
> Like if we have a batch with m1, m2, where they both have ordering keys, and we get some send error, should we return the send error to the m1 and m2 handles? Or should m1 handle get the send error, and m2 handle error gets the ordering key error?
>
> If either behavior is valid, then we don't need an overly restrictive test that commits to one. WDYT?

I don't think this is necessary for now. The current behavior is to return the send error for both m1 and m2 handles, but I can easily be convinced the other behavior is better. Let me know if you have a preference and I can add a unit test.

Collaborator

I think worth having a test that checks that they all get errors, even if not a specific type. Though we should confirm at least the first one gets the real error so we make sure it is propagated.

Go sets the same error for all in the batch: https://github.com/googleapis/google-cloud-go/blob/a80159a787f8e5f381127b0be0d579df6e882ad2/pubsub/v2/publisher.go#L500

msg = self.rx.recv() => {
match msg {
Some(ToBatchWorker::Publish(msg)) => {
if self.paused {
Member

doesn't Flush need this treatment too?

Collaborator Author

Good catch. I completely missed the case where there is an error when Send in Flush.
The PR is updated to handle this.
PTAL.

Member

Ah, ok, so you made sure ordering keys are paused on flush errors. We should have a unit test verifying that.

I was also expecting to see an if self.paused { ... } in the ToBatchWorker::Flush branch, but maybe it is unnecessary and I am just ignorant of the internals.

> When there is an error for an ordering key, it remains paused until the user resumes that specific ordering key. If the application performs the following:
> Publish msg_a -> Flush -> Publish msg_b
> where msg_a and msg_b are both for the ordering key that is paused,
> then we return an error for both msg_a and msg_b.

I hear you say this, and maybe the code says it too, but I am distrustful. If you want to convince me this is the behavior, you should have a unit test for this case. Those are hard to argue with.

This PR is already big. Feel free to cut it off with something like a // TODO(#XXXX) - handle paused batch on flush somewhere in the code.

Collaborator

Thoughts on moving this check for if paused to outside of the select? It might be easier to see what is happening for the paused ordering key (immediately returning errors, and doing nothing on Flush).

Member

> outside of the select?

How? The select! is how we await some new change of state (a new message, or a flush, or the batch timer firing). What does it mean to not await the next action?

Collaborator

I guess it's more splitting the select: we would have two, one that fires when paused, and one for normal operation. They have significantly different behaviors, so this might feel less cluttered.

Collaborator

I believe the paused state might actually be this simple since pause handles flushing out any pending messages. No select needed and when we add resume publish that is likely to be another message on the channel that we handle:

if self.paused {
    let msg = self.rx.recv().await;
    match msg {
        Some(ToBatchWorker::Publish(msg)) => {
            let _ = msg.tx.send(Err(crate::error::PublishError::OrderingKeyPaused(())));
        }
        Some(ToBatchWorker::Flush(tx)) => {
            // Nothing to flush, respond immediately.
            let _ = tx.send(());
        }
        None => break,
    }
}

Collaborator Author

Moved the pause logic out of the select statement as suggested. PTAL.

I also added the Publish error -> Flush -> Publish testcase in test_ordering_error_pause_then_flush. PTAL

@PhongChuong (Collaborator Author) left a comment

Thanks for the review.
PTAL.


@PhongChuong (Collaborator Author) commented Jan 20, 2026

@dbolduc , thanks for the review.

Regarding:

> What happens if there is an ordering key error, the application publishes messages with that ordering key, then calls flush()? It should be paused right?

When there is an error for an ordering key, it remains paused until the user resumes that specific ordering key. If the application performs the following:
Publish msg_a -> Flush -> Publish msg_b
where msg_a and msg_b are both for the ordering key that is paused,
then we return an error for both msg_a and msg_b. Flush does not return an error as Flush is not ordering-key specific. Potentially, we could also return a list of ordering keys that are paused for Flush, but we don't.

> (Also, I am too lazy to read the code, do we ever clean up BatchWorkers? like when all batches are flushed and it is empty? If so we would not want to clean up ones that are paused. I didn't see this anywhere, so probably not.)

We currently do not clean up the BatchWorker. There's a TODO to do so. As for cleaning up paused workers, my current preference is to keep the BatchWorker around when the ordering key is paused so that all the error handling is done in the BatchWorker.


Comment on lines +304 to +324
if let Some(join_next_result) = join_next_option {
    match join_next_result {
        Ok(inflight_result) => {
            match inflight_result {
                Ok(_) => {}
                Err(_) => {
                    // There was a non-retryable error:
                    // 1. We need to pause publishing and send out errors for pending_msgs.
                    // 2. The pending batch should have sent out error for its messages.
                    // 3. The messages in rx will be handled when they are received.
                    self.pause();
                }
            }
        }
        Err(_) => {
            // JoinError.
            // This is unexpected and we should pause the publisher.
            self.pause();
        }
    }
}
Collaborator

Simplified version of this code block. Also see https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=bbe99125025278a238e722ef93a6f33b

Suggested change:

// There was a non-retryable error:
// 1. We need to pause publishing and send out errors for pending_msgs.
// 2. The pending batch should have sent out error for its messages.
// 3. The messages in rx will be handled when they are received.
if let Some(Err(_) | Ok(Err(_))) = join_next_option {
    self.pause();
}

Collaborator Author

Nice.
For some reason, my head still hasn't gotten around the fact that you can have more complex patterns there.
Added.
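The or-pattern in the suggestion works because join_next yields a doubly-nested result: Option<Result<task output, JoinError>>, where the task output here is itself a Result. A stand-alone illustration, with &str standing in for both error types:

```rust
// join_next() on a JoinSet yields Option<Result<T, JoinError>>; in the
// worker, T is itself a Result, so the outcome is doubly nested.
// &str stands in for both error types in this sketch.
type Outcome = Option<Result<Result<(), &'static str>, &'static str>>;

fn should_pause(outcome: Outcome) -> bool {
    // Pause on either a JoinError (outer Err) or a task-level send error
    // (inner Err); Ok(Ok(_)) and None leave the worker running.
    matches!(outcome, Some(Err(_) | Ok(Err(_))))
}
```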


@dbolduc (Member) left a comment

Looks like @suzmue just reviewed this. I like her suggestions. I will publish my review anyway, sorry if it is confusing.

Aside from style nits in the unit tests, I need to see unit tests for any new behavior before I can approve a PR. Mainly, if we are changing ToBatchWorker::Flush, we should call flush() at least once in a unit test.


dbolduc previously approved these changes Jan 20, 2026
Comment on lines +330 to +331
// There should be no pending messages and messages in the pending batch as
// it was already handled when this was paused.
Member

comment: Ahhh. Thanks for this comment. I missed that we don't accept a message if the batch is paused, so there is never anything to flush.

@PhongChuong (Collaborator Author) left a comment

Thanks for the review both.
There's an issue where the GCB coverage test is failing but it seems to pass locally. I'll resolve this before merging.



@dbolduc

dbolduc commented Jan 20, 2026

> GCB coverage test

This might mean there is a timing-based flake. The coverage test is a lot slower because it does a lot of extra things.

I think you should set start_paused = true for the new tests.

@PhongChuong (Collaborator Author)

@dbolduc , adding start_paused = true fixed the issue. Thanks.

@suzmue (Collaborator) left a comment

The only change I would need to see to approve this PR is to put the tokio::select behind an else, because I think that is a bug (ideally with a test case that triggers the bug in the original code).

Otherwise we've had a lot of discussion and we can keep iterating in a future PR.



.build();

// Cause "ordering key with error" ordering key to pause.
publisher.publish(
Collaborator

nit: should we verify this result?

Collaborator Author

Done.

}
}
}
tokio::select! {
Collaborator

I think this select needs to be in an else.

Otherwise if we get two messages in a row, we may process one in the if paused and the other in the select.

I think it might be possible to trigger this in a test by sending two messages that should both get errors, then awaiting on the results (with no flush in between)
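The loop shape being asked for can be sketched with std::sync::mpsc standing in for the tokio channel and select! (a simplification; the real worker also races a batch timer, and ToBatchWorker here is a stand-in). The point is that the paused branch must continue (or sit in an else) so a second queued message cannot fall through to the normal path in the same iteration:

```rust
use std::sync::mpsc;

// Stand-in for the worker's channel message type.
enum ToBatchWorker {
    Publish(String),
    Flush,
}

// Simplified worker loop: while paused, every received message is answered
// with an error (or an immediate flush ack) and we `continue`, so nothing
// ever reaches the normal batching path below in the same iteration.
fn run_worker(rx: mpsc::Receiver<ToBatchWorker>, paused: bool) -> Vec<String> {
    let mut log = Vec::new();
    while let Ok(msg) = rx.recv() {
        if paused {
            match msg {
                ToBatchWorker::Publish(m) => log.push(format!("err({m}): ordering key paused")),
                ToBatchWorker::Flush => log.push("flush: nothing pending".to_string()),
            }
            continue; // never fall through to the normal (select) path
        }
        match msg {
            ToBatchWorker::Publish(m) => log.push(format!("batched {m}")),
            ToBatchWorker::Flush => log.push("flushed".to_string()),
        }
    }
    log
}
```

Sending two messages in a row to a paused worker, as the test idea above suggests, exercises exactly the case where a missing continue/else would let the second message be batched.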

Member

Good catch

Collaborator Author

Good catch.
Fixed with additional validation in test_ordering_error_pause_publisher test.



suzmue previously approved these changes Jan 21, 2026
@dbolduc (Member) left a comment

Feel free to just merge this thing and address nits in a follow up PR

self.move_to_batch();
if self.at_batch_threshold() {
self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
} else {
Member

optional:

if foo {
  // blah blah blah
  continue;
}

// else blah blah blah

saves some nesting

Comment on lines +1100 to +1101
.times(1)
.in_sequence(&mut seq)
Member

nit: seq is adding nothing.

// Assert that new publish messages return errors because the Publisher is paused.
let paused_messages = [
PubsubMessage::new()
.set_data("hello pause 0".to_string())
Member

nit: .to_string() is unnecessary

@PhongChuong PhongChuong merged commit 7653338 into googleapis:main Jan 21, 2026
30 checks passed
PhongChuong added a commit that referenced this pull request Jan 22, 2026
@PhongChuong PhongChuong deleted the orderingError branch January 24, 2026 03:05