Skip to content

Commit dcc3edd

Browse files
authored
feat(pubsub): retry batches for ordering key messages forever (googleapis#4722)
The Pub/Sub client libraries support ordered publishing for messages with ordering keys. Publishing for an ordering key is stopped when there is a permanent error. Since the cost of failing on trayient libraries will retry transient errors *indefinitely*. This change overrides the retry policy for batches with ordering keys. There is currently no way to configure this retry policy for the clients, we may wish to add one in the future. For googleapis#4013
1 parent d001e84 commit dcc3edd

3 files changed

Lines changed: 190 additions & 4 deletions

File tree

src/pubsub/src/publisher/actor.rs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ impl SequentialBatchActor {
371371
// a single inflight task at any given time, the use of JoinSet
372372
// simplify the managing the inflight JoinHandle.
373373
let mut inflight: JoinSet<crate::Result<()>> = JoinSet::new();
374-
let mut batch = Batch::new(
374+
let mut batch = Batch::new_with_ordering_key(
375375
self.context.topic.len() as u32,
376376
self.context.batching_options.clone(),
377377
);
@@ -1124,6 +1124,72 @@ mod tests {
11241124
Ok(())
11251125
}
11261126

1127+
#[cfg_attr(
1128+
tokio_unstable,
1129+
tokio::test(
1130+
start_paused = true,
1131+
flavor = "current_thread",
1132+
unhandled_panic = "shutdown_runtime"
1133+
)
1134+
)]
1135+
#[cfg_attr(not(tokio_unstable), tokio::test(start_paused = true))]
1136+
async fn sequential_actor_overrides_retry() -> anyhow::Result<()> {
1137+
let mut mock = MockGapicPublisherWithFuture::new();
1138+
let mut seq = Sequence::new();
1139+
mock.expect_publish()
1140+
.times(1)
1141+
.in_sequence(&mut seq)
1142+
.withf(|_, o| o.retry_policy().is_some())
1143+
.returning(|r, o| Box::pin(async { publish_ok(r, o) }));
1144+
1145+
let (actor_tx, actor_rx) = tokio::sync::mpsc::unbounded_channel();
1146+
tokio::spawn(
1147+
SequentialBatchActor::new(
1148+
TOPIC.to_string(),
1149+
GapicPublisher::from_stub(mock),
1150+
BatchingOptions::default().set_message_count_threshold(1_u32),
1151+
actor_rx,
1152+
)
1153+
.run(),
1154+
);
1155+
1156+
assert_publish_is_ok!(actor_tx, 1);
1157+
Ok(())
1158+
}
1159+
1160+
#[cfg_attr(
1161+
tokio_unstable,
1162+
tokio::test(
1163+
start_paused = true,
1164+
flavor = "current_thread",
1165+
unhandled_panic = "shutdown_runtime"
1166+
)
1167+
)]
1168+
#[cfg_attr(not(tokio_unstable), tokio::test(start_paused = true))]
1169+
async fn concurrent_actor_uses_default_retry() -> anyhow::Result<()> {
1170+
let mut mock = MockGapicPublisherWithFuture::new();
1171+
let mut seq = Sequence::new();
1172+
mock.expect_publish()
1173+
.times(1)
1174+
.in_sequence(&mut seq)
1175+
.withf(|_, o| o.retry_policy().is_none())
1176+
.returning(|r, o| Box::pin(async { publish_ok(r, o) }));
1177+
1178+
let (actor_tx, actor_rx) = tokio::sync::mpsc::unbounded_channel();
1179+
tokio::spawn(
1180+
ConcurrentBatchActor::new(
1181+
TOPIC.to_string(),
1182+
GapicPublisher::from_stub(mock),
1183+
BatchingOptions::default().set_message_count_threshold(1_u32),
1184+
actor_rx,
1185+
)
1186+
.run(),
1187+
);
1188+
1189+
assert_publish_is_ok!(actor_tx, 1);
1190+
Ok(())
1191+
}
1192+
11271193
#[cfg_attr(
11281194
tokio_unstable,
11291195
tokio::test(
@@ -1138,7 +1204,10 @@ mod tests {
11381204
mock.expect_publish()
11391205
.withf(|req, _o| {
11401206
// Recreate the batch from req to calculate the batch size.
1141-
let mut batch = Batch::new(req.topic.len() as u32, BatchingOptions::default());
1207+
let mut batch = Batch::new_with_ordering_key(
1208+
req.topic.len() as u32,
1209+
BatchingOptions::default(),
1210+
);
11421211
req.messages.iter().for_each(|msg| {
11431212
let (tx, _rx) = tokio::sync::oneshot::channel();
11441213
batch.push(BundledMessage {

src/pubsub/src/publisher/batch.rs

Lines changed: 110 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub(crate) struct Batch {
2626
initial_size: u32,
2727
messages_byte_size: u32,
2828
batching_options: BatchingOptions,
29+
has_ordering_key: bool,
2930
}
3031

3132
impl Batch {
@@ -34,7 +35,21 @@ impl Batch {
3435
initial_size,
3536
messages_byte_size: initial_size,
3637
batching_options,
37-
..Batch::default()
38+
has_ordering_key: false,
39+
messages: Vec::new(),
40+
}
41+
}
42+
43+
pub(crate) fn new_with_ordering_key(
44+
initial_size: u32,
45+
batching_options: BatchingOptions,
46+
) -> Self {
47+
Batch {
48+
initial_size,
49+
messages_byte_size: initial_size,
50+
batching_options,
51+
has_ordering_key: true,
52+
messages: Vec::new(),
3853
}
3954
}
4055

@@ -97,6 +112,7 @@ impl Batch {
97112
messages: self.messages.drain(..).collect(),
98113
messages_byte_size: self.messages_byte_size,
99114
batching_options: self.batching_options.clone(),
115+
has_ordering_key: self.has_ordering_key,
100116
};
101117
self.messages_byte_size = self.initial_size;
102118
inflight.spawn(batch_to_send.send(client, topic));
@@ -109,7 +125,12 @@ impl Batch {
109125
.into_iter()
110126
.map(|msg| (msg.msg, msg.tx))
111127
.unzip();
112-
let request = client.publish().set_topic(topic).set_messages(msgs);
128+
let mut request = client.publish().set_topic(topic).set_messages(msgs);
129+
if self.has_ordering_key {
130+
use google_cloud_gax::options::RequestOptionsBuilder;
131+
request =
132+
request.with_retry_policy(super::retry_policy::default_ordering_retry_policy());
133+
}
113134

114135
// Handle the response by extracting the message ID on success.
115136
match request.send().await {
@@ -241,6 +262,93 @@ mod tests {
241262
Ok(())
242263
}
243264

265+
#[cfg_attr(
266+
tokio_unstable,
267+
tokio::test(
268+
start_paused = true,
269+
flavor = "current_thread",
270+
unhandled_panic = "shutdown_runtime"
271+
)
272+
)]
273+
#[cfg_attr(not(tokio_unstable), tokio::test(start_paused = true))]
274+
async fn send_overrides_ordering_retry_policy() {
275+
let mut mock = MockGapicPublisher::new();
276+
mock.expect_publish()
277+
.withf(|r, o| r.topic == "topic" && o.retry_policy().is_some())
278+
.times(2)
279+
.returning(|_, _| Ok(crate::Response::from(PublishResponse::new())));
280+
let client = GapicPublisher::from_stub(mock);
281+
282+
let mut batch =
283+
Batch::new_with_ordering_key("topic".len() as u32, BatchingOptions::default());
284+
assert!(batch.is_empty());
285+
286+
let (message_a, _rx_a) = create_bundled_message_from_bytes("hello");
287+
batch.push(message_a);
288+
assert_eq!(batch.len(), 1);
289+
290+
let mut inflight = JoinSet::new();
291+
batch.flush(client.clone(), "topic".to_string(), &mut inflight);
292+
assert_eq!(batch.len(), 0);
293+
inflight.join_all().await;
294+
295+
let (message_b, _rx_b) = create_bundled_message_from_bytes(", ");
296+
batch.push(message_b);
297+
assert_eq!(batch.len(), 1);
298+
299+
let (message_c, _rx_c) = create_bundled_message_from_bytes("world");
300+
batch.push(message_c);
301+
assert_eq!(batch.len(), 2);
302+
303+
let mut inflight = JoinSet::new();
304+
batch.flush(client, "topic".to_string(), &mut inflight);
305+
assert_eq!(batch.len(), 0);
306+
inflight.join_all().await;
307+
}
308+
309+
#[cfg_attr(
310+
tokio_unstable,
311+
tokio::test(
312+
start_paused = true,
313+
flavor = "current_thread",
314+
unhandled_panic = "shutdown_runtime"
315+
)
316+
)]
317+
#[cfg_attr(not(tokio_unstable), tokio::test(start_paused = true))]
318+
async fn send_uses_default_retry() {
319+
let mut mock = MockGapicPublisher::new();
320+
mock.expect_publish()
321+
.withf(|r, o| r.topic == "topic" && o.retry_policy().is_none())
322+
.times(2)
323+
.returning(|_, _| Ok(crate::Response::from(PublishResponse::new())));
324+
let client = GapicPublisher::from_stub(mock);
325+
326+
let mut batch = Batch::new("topic".len() as u32, BatchingOptions::default());
327+
assert!(batch.is_empty());
328+
329+
let (message_a, _rx_a) = create_bundled_message_from_bytes("hello");
330+
batch.push(message_a);
331+
assert_eq!(batch.len(), 1);
332+
333+
let mut inflight = JoinSet::new();
334+
batch.flush(client.clone(), "topic".to_string(), &mut inflight);
335+
assert_eq!(batch.len(), 0);
336+
inflight.join_all().await;
337+
338+
let (message_b, _rx_b) = create_bundled_message_from_bytes(", ");
339+
batch.push(message_b);
340+
assert_eq!(batch.len(), 1);
341+
342+
let (message_c, _rx_c) = create_bundled_message_from_bytes("world");
343+
batch.push(message_c);
344+
assert_eq!(batch.len(), 2);
345+
346+
let mut inflight = JoinSet::new();
347+
batch.flush(client, "topic".to_string(), &mut inflight);
348+
assert_eq!(batch.len(), 0);
349+
inflight.join_all().await;
350+
}
351+
244352
fn create_bundled_message_from_bytes<T: Into<::bytes::Bytes>>(
245353
data: T,
246354
) -> (

src/pubsub/src/publisher/retry_policy.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ pub(crate) fn default_retry_policy() -> impl RetryPolicy {
4747
RetryableErrors.with_time_limit(Duration::from_secs(600))
4848
}
4949

50+
/// The default retry policy for messages with ordering keys in the Pub/Sub
51+
/// publisher.
52+
///
53+
/// The client will retry all the errors shown as retryable in the service
54+
/// documentation forever.
55+
pub(crate) fn default_ordering_retry_policy() -> impl RetryPolicy {
56+
RetryableErrors
57+
}
58+
5059
/// Follows the retry strategy recommended by the Cloud Pub/Sub guides on
5160
/// [error codes].
5261
///

0 commit comments

Comments
 (0)