feat(pubsub): pause the publisher for an ordering key when there is an error #4286
PhongChuong merged 13 commits into googleapis:main
Conversation
Codecov Report ✅ All modified and coverable lines are covered by tests.

```
@@ Coverage Diff @@
##             main    #4286   +/-   ##
==========================================
+ Coverage   94.85%   94.87%   +0.01%
==========================================
  Files         187      187
  Lines        7195     7219      +24
==========================================
+ Hits         6825     6849      +24
  Misses        370      370
```
dbolduc
left a comment
I didn't get to the changes in worker.rs
Consider doing the error type refactor first, then the unit tests look more like what we want.
```rust
/// Publish is paused for the ordering key.
#[error("the ordering key was paused")]
OrderingKeyPaused(()),
```
Usability question, which is not trivial, so feel free to think about it later:
Publish consumes the application's PubsubMessage. If the operation fails, what should the application do to resend the message? Would they need to hold onto a clone of the message until the operation is complete?
It would be super convenient if we could give them their message back. The plumbing on our end could be brutal, but the application would appreciate it. 🤷
our GAPICs all have this problem too.... idk if publishing is special. Something seems wrong about just dropping their message without trying to send it. But maybe I am worrying too much about the unhappy case. 🤷
Agreed.
Ideally, in the error case we should give the user the message back. IMO, we can do this in 2 ways:
- Keep a clone of it internally and pass it back to the user if there is a failure.
- Augment generated code/error to pass the message back if there is an error during Send.
I think it's a bigger discussion that should be left out of this PR.
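A hypothetical sketch of the second option (these are NOT the crate's real types; `PubsubMessage`, `PublishError`, and `publish` are simplified stand-ins): the error variant carries the rejected message back, so the caller can retry without holding a clone.

```rust
// Hypothetical sketch only: simplified stand-ins for the crate's types.
#[derive(Debug, Clone, PartialEq)]
pub struct PubsubMessage {
    pub data: String,
    pub ordering_key: String,
}

#[derive(Debug)]
pub enum PublishError {
    /// The ordering key is paused; ownership of the message is returned.
    OrderingKeyPaused(PubsubMessage),
}

pub fn publish(msg: PubsubMessage, paused: bool) -> Result<String, PublishError> {
    if paused {
        // Hand the message back instead of dropping it.
        return Err(PublishError::OrderingKeyPaused(msg));
    }
    Ok(format!("message-id-for-{}", msg.data))
}

fn main() {
    let msg = PubsubMessage { data: "m1".into(), ordering_key: "k".into() };
    // On failure the application gets its message back and can resend it.
    match publish(msg, true) {
        Err(PublishError::OrderingKeyPaused(recovered)) => assert_eq!(recovered.data, "m1"),
        _ => unreachable!(),
    }
}
```

The plumbing cost mentioned above is that every internal layer that consumes the message would have to thread it back out on the error path.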
I'm not totally convinced we should treat this differently than our other GAPICs. Users won't know if the message will fail before sending, and if we only give it back on one of the error cases, they will need to keep a copy of it regardless.
```rust
    }
}

fn convert_error(e: crate::error::PublishError) -> gax::error::Error {
```
comment: ah, I think you are hesitating to change Output to a Result<String, PublishError> because then we have to update all the code downstream of this.
Ok.
This is really yucky though. It might have been nicer to change the error type first.
I think we should discuss as a group with @suzmue whether returning gax::error::Error or PublishError directly is ideal. We decided to go with gax::error::Error for now as it is consistent with the other clients.
It feels so wrong to me, but we do have a precedent for just throwing something in an Error::io:
google-cloud-rust/src/storage/src/storage/bidi/worker.rs
Lines 148 to 149 in d51e1ca
Although in GCS, I think there was a more compelling reason. (We wanted to reuse the ReadObjectResponse type which was in terms of gax::Error.)
Let's keep the wrapped error for now. But worth discussing if we'd rather expose a different error type for clarity instead of being consistent.
Having a PublishError type now will make that discussion very concrete :)
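A sketch of the Error::io-style wrapping discussed here, using `std::io::Error` as a stand-in for gax::error::Error (which is assumed and not available in this snippet). The point is that callers can still recover the domain error by downcasting the wrapped source:

```rust
use std::error::Error as StdError;
use std::fmt;
use std::io;

// Stand-in domain error; the real PublishError has more variants.
#[derive(Debug)]
pub enum PublishError {
    OrderingKeyPaused,
}

impl fmt::Display for PublishError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "the ordering key was paused")
    }
}

impl StdError for PublishError {}

// Wrap the domain error into the generic transport-level error type.
pub fn convert_error(e: PublishError) -> io::Error {
    io::Error::new(io::ErrorKind::Other, e)
}

// Callers recover the original error by inspecting the wrapped source.
pub fn is_paused(err: &io::Error) -> bool {
    err.get_ref().map_or(false, |inner| inner.is::<PublishError>())
}

fn main() {
    let wrapped = convert_error(PublishError::OrderingKeyPaused);
    assert!(is_paused(&wrapped));
    assert!(!is_paused(&io::Error::new(io::ErrorKind::Other, "unrelated")));
}
```

The downside, as noted above, is that the wrapping is invisible in the signature; a dedicated PublishError return type would make the failure modes explicit.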
PhongChuong
left a comment
Thanks for the review.
dbolduc
left a comment
Sorry, this is my first time looking at the publisher code, so I did my best.
What happens if there is an ordering key error, the application publishes messages with that ordering key, then calls flush()? It should be paused, right?
(Also, I am too lazy to read the code: do we ever clean up BatchWorkers, like when all batches are flushed and it is empty? If so, we would not want to clean up ones that are paused. I didn't see this anywhere, so probably not.)
```rust
];

// Assert the first error is caused by the Publish send operation.
let mut got_err = publisher.publish(messages[0].clone()).await.unwrap_err();
```
nit: here and below, no need to clone the messages.
The messages are not Copy, so we would have to either clone() or store the message in a smart pointer.
I'll keep it as clone for now. Let me know if there's another obvious way that I might have missed.
What do we get out of `messages`? I think nothing. If you get rid of that, we can move the messages into the function (and avoid the clone()s).
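A sketch of this suggestion (with `publish` as a hypothetical synchronous stand-in for the real async call): constructing each message at its use site lets it be moved into the call, so no clone() is needed.

```rust
// Hypothetical stand-in for the publisher call; it consumes its message.
pub fn publish(msg: String) -> Result<String, String> {
    // Simulate the paused publisher rejecting every message.
    Err(format!("ordering key paused: {msg}"))
}

pub fn run() -> Vec<String> {
    let mut errors = Vec::new();
    // No `messages` array to index into: each message is moved into publish().
    for msg in ["hello 0".to_string(), "hello 1".to_string()] {
        if let Err(e) = publish(msg) {
            errors.push(e);
        }
    }
    errors
}

fn main() {
    assert_eq!(run().len(), 2);
}
```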
```rust
let client = GapicPublisher::from_stub(mock);
let publisher = PublisherBuilder::new(client, "my-topic".to_string())
    .set_message_count_threshold(1_u32)
```
We also want a test where there are subsequent messages in this batch.
(Those codecov comments are actually useful!)
Updated the test to include this case.
We're still missing the case where there is a JoinError in join_next. I'll have to take a look to see if there is way to inject tokio errors.
Updated the test to include this case.
Thanks.
We're still missing the case where there is a JoinError in join_next. I'll have to take a look to see if there is way to inject tokio errors.
No worries on this. Some error cases are infeasible to test.
Do we need/want a test checking errors for an ordering key with a batch of size > 1?
Like if we have a batch with m1 and m2, where they both have ordering keys, and we get some send error: should we return the send error to both the m1 and m2 handles? Or should the m1 handle get the send error, and the m2 handle get the ordering key error?
If either behavior is valid, then we don't need an overly restrictive test that commits to one. WDYT?
Do we need/want a test checking errors for an ordering key with a batch of size > 1?
Like if we have a batch with m1 and m2, where they both have ordering keys, and we get some send error: should we return the send error to both the m1 and m2 handles? Or should the m1 handle get the send error, and the m2 handle get the ordering key error?
If either behavior is valid, then we don't need an overly restrictive test that commits to one. WDYT?
I don't think this is necessary for now. The current behavior is to return the send error for both the m1 and m2 handles, but I can easily be convinced the other behavior is better. Let me know if you have a preference and I can add a unit test.
I think worth having a test that checks that they all get errors, even if not a specific type. Though we should confirm at least the first one gets the real error so we make sure it is propagated.
Go sets the same error for all in the batch: https://github.com/googleapis/google-cloud-go/blob/a80159a787f8e5f381127b0be0d579df6e882ad2/pubsub/v2/publisher.go#L500
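The fan-out behavior Go uses can be sketched with stdlib channels (mpsc channels here are simplified stand-ins for the crate's per-message result channels): on a batch send failure, every handle in the batch receives the same error.

```rust
use std::sync::mpsc;

// Fan the same send error out to every message handle in the failed batch.
pub fn fail_batch(handles: Vec<mpsc::Sender<Result<String, String>>>, err: &str) {
    for tx in handles {
        // Every handle receives the same error.
        let _ = tx.send(Err(err.to_string()));
    }
}

pub fn run() -> Vec<Result<String, String>> {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    fail_batch(vec![tx1, tx2], "send error");
    vec![rx1.recv().unwrap(), rx2.recv().unwrap()]
}

fn main() {
    let results = run();
    // All handles got an error, and it is the real send error.
    assert!(results.iter().all(|r| r == &Err("send error".to_string())));
}
```

A test along these lines checks that all handles get errors without over-committing to which variant each one sees.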
src/pubsub/src/publisher/worker.rs
Outdated
```rust
msg = self.rx.recv() => {
    match msg {
        Some(ToBatchWorker::Publish(msg)) => {
            if self.paused {
```
doesn't Flush need this treatment too?
Good catch. I completely missed the case where there is an error from Send during Flush.
The PR is updated to handle this.
PTAL.
Ah, ok, so you made sure ordering keys are paused on flush errors. We should have a unit test verifying that.
I was also expecting to see an if self.paused { ...} in the ToBatchWorker::Flush branch, but maybe it is unnecessary and I am just ignorant of the internals.
When there is an error for an ordering key, it remains paused until the user resumes that specific ordering key. If the application performs the following:
Publish msg_a -> Flush -> Publish msg_b
where msg_a and msg_b are both for the ordering key that is paused,
then we return an error for both msg_a and msg_b.
I hear you say this, and maybe the code says it too, but I am distrustful. If you want to convince me this is the behavior, you should have a unit test for this case. Those are hard to argue with.
This PR is already big. Feel free to cut it off with like a // TODO(#XXXX) - handle paused batch on flush somewhere in the code.
Thoughts on moving this check for if paused to outside of the select? It might be easier to see what is happening for the paused ordering key (immediately returning errors, and doing nothing on Flush).
outside of the select?
How? The select! is how we await some new change of state (a new message, or a flush, or the batch timer firing). What does it mean to not await the next action?
I guess it's more about splitting the select:
so we would have two, one that fires when paused, and one for normal operation. They have significantly different behaviors and it might feel less cluttered.
I believe the paused state might actually be this simple, since pause handles flushing out any pending messages. No select needed, and when we add resume publish, that is likely to be another message on the channel that we handle:

```rust
if self.paused {
    let msg = self.rx.recv().await;
    match msg {
        Some(ToBatchWorker::Publish(msg)) => {
            let _ = msg.tx.send(Err(crate::error::PublishError::OrderingKeyPaused(())));
        }
        Some(ToBatchWorker::Flush(tx)) => {
            // Nothing to flush, respond immediately.
            let _ = tx.send(());
        }
        None => break,
    }
}
```
Moved the pause logic out of the select statement as suggested. PTAL.
I also added the Publish error -> Flush -> Publish test case in test_ordering_error_pause_then_flush. PTAL.
PhongChuong
left a comment
Thanks for the review.
PTAL.
@dbolduc , thanks for the review. Regarding:
When there is an error for an ordering key, it remains paused until the user resumes that specific ordering key.
We currently do not clean up the BatchWorker. There's a TODO to do so. As for cleaning up paused workers, my current preference is to keep the BatchWorker around when the ordering key is paused so that all the error handling is done in the BatchWorker.
src/pubsub/src/publisher/worker.rs
Outdated
```rust
if let Some(join_next_result) = join_next_option {
    match join_next_result {
        Ok(inflight_result) => {
            match inflight_result {
                Ok(_) => {}
                Err(_) => {
                    // There was a non-retryable error:
                    // 1. We need to pause publishing and send out errors for pending_msgs.
                    // 2. The pending batch should have sent out error for its messages.
                    // 3. The messages in rx will be handled when they are received.
                    self.pause();
                }
            }
        }
        Err(_) => {
            // JoinError.
            // This is unexpected and we should pause the publisher.
            self.pause();
        }
    }
}
```
Simplified version of this code block. Also see https://play.rust-lang.org/?version=stable&mode=debug&edition=2024&gist=bbe99125025278a238e722ef93a6f33b
```rust
// There was a non-retryable error:
// 1. We need to pause publishing and send out errors for pending_msgs.
// 2. The pending batch should have sent out error for its messages.
// 3. The messages in rx will be handled when they are received.
if let Some(Err(_) | Ok(Err(_))) = join_next_option {
    publisher.pause();
}
```
Nice.
For some reason, my head still hasn't gotten around to the fact that you can have more complex patterns there.
Added.
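The nested or-pattern from the suggestion can be demonstrated standalone (the `JoinResult` alias is a simplified stand-in for `Result<Result<..>, tokio::task::JoinError>`): pause when join_next reports either a JoinError (outer Err) or a task that completed with a publish error (Ok(Err)).

```rust
// Simplified stand-in for the join_next result type.
type JoinResult = Result<Result<(), &'static str>, &'static str>;

pub fn should_pause(join_next_option: Option<JoinResult>) -> bool {
    // Or-patterns nest: this matches both the JoinError and the task-error case.
    matches!(join_next_option, Some(Err(_) | Ok(Err(_))))
}

fn main() {
    assert!(should_pause(Some(Err("join error"))));
    assert!(should_pause(Some(Ok(Err("send error")))));
    assert!(!should_pause(Some(Ok(Ok(())))));
    assert!(!should_pause(None));
}
```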
dbolduc
left a comment
Looks like @suzmue just reviewed this. I like her suggestions. I will publish my review anyway, sorry if it is confusing.
Aside from style nits in the unit tests, I need to see unit tests for any new behavior before I can approve a PR. Mainly, if we are changing ToBatchWorker::Flush, we should call flush() at least once in a unit test.
```rust
// There should be no pending messages and messages in the pending batch as
// it was already handled when this was paused.
```
comment: Ahhh. Thanks for this comment. I missed that we don't accept a message if the batch is paused, so there is never anything to flush.
PhongChuong
left a comment
Thanks for the review both.
There's an issue where the GCB coverage test is failing but it seems to pass locally. I'll resolve this before merging.
This might mean there is a timing-based flake. The coverage test is a lot slower because it does a lot of extra things. I think you should
@dbolduc , adding
suzmue
left a comment
The only change I would need to see to approve this PR is to put the tokio::select behind an else, because I think that is a bug (ideally with a test case that triggers the bug in the original code).
Otherwise we've had a lot of discussion and we can keep iterating in a future PR.
```rust
.build();

// Cause "ordering key with error" ordering key to pause.
publisher.publish(
```
nit: should we verify this result?
src/pubsub/src/publisher/worker.rs
Outdated
```rust
            }
        }
    }
    tokio::select! {
```
I think this select needs to be in an else.
Otherwise if we get two messages in a row, we may process one in the if paused and the other in the select.
I think it might be possible to trigger this in a test by sending two messages that should both get errors, then awaiting on the results (with no flush in between).
Good catch.
Fixed with additional validation in test_ordering_error_pause_publisher test.
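A synchronous sketch of the fixed control flow (the enum and `drain` helper are simplified stand-ins; the real worker uses tokio::select! in the non-paused arm): every queued message is handled by the paused branch while paused, so two messages in a row cannot split between the paused branch and normal processing.

```rust
use std::sync::mpsc;

// Simplified stand-in for the worker's channel message type.
pub enum ToBatchWorker {
    Publish(String),
    Flush,
}

pub fn drain(rx: mpsc::Receiver<ToBatchWorker>, paused: bool) -> Vec<String> {
    let mut log = Vec::new();
    while let Ok(msg) = rx.try_recv() {
        if paused {
            match msg {
                // Immediately report the ordering-key-paused error.
                ToBatchWorker::Publish(m) => log.push(format!("error for {m}: paused")),
                // Nothing is pending while paused; respond right away.
                ToBatchWorker::Flush => log.push("flush: nothing to do".to_string()),
            }
        } else {
            match msg {
                ToBatchWorker::Publish(m) => log.push(format!("batched {m}")),
                ToBatchWorker::Flush => log.push("flushed".to_string()),
            }
        }
    }
    log
}

fn main() {
    let (tx, rx) = mpsc::channel();
    tx.send(ToBatchWorker::Publish("m1".into())).unwrap();
    tx.send(ToBatchWorker::Publish("m2".into())).unwrap();
    drop(tx);
    // Both back-to-back messages get the paused error.
    let log = drain(rx, true);
    assert_eq!(log, vec!["error for m1: paused", "error for m2: paused"]);
}
```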
dbolduc
left a comment
Feel free to just merge this thing and address nits in a follow up PR
```rust
self.move_to_batch();
if self.at_batch_threshold() {
    self.pending_batch.flush(self.client.clone(), self.topic.clone(), &mut inflight);
} else {
```
optional:

```rust
if foo {
    // blah blah blah
    continue;
}
// else blah blah blah
```

saves some nesting.
```rust
.times(1)
.in_sequence(&mut seq)
```

```rust
// Assert that new publish messages return errors because the Publisher is paused.
let paused_messages = [
    PubsubMessage::new()
        .set_data("hello pause 0".to_string())
```
nit: .to_string() is unnecessary
Followup to @dbolduc 's [comments](#4286 (review)) in #4286.
Pause the publisher for an ordering key when there is a send error.
When an error is encountered for a pending batch, we:
A resume operation will be added in a later PR.
This PR also introduces PublishError. Further work is needed to handle error propagation more fully.