feat(pubsub): Initial support message ordering in Publisher#4016
feat(pubsub): Initial support message ordering in Publisher#4016PhongChuong merged 6 commits intogoogleapis:mainfrom
Conversation
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #4016 +/- ##
=======================================
Coverage 95.27% 95.27%
=======================================
Files 167 167
Lines 6324 6326 +2
=======================================
+ Hits 6025 6027 +2
Misses 299 299 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
| _ = &mut timer => { | ||
| for (_, outstanding) in pending_batches.iter_mut() { | ||
| if !outstanding.pending_batch.is_empty() { | ||
| outstanding.pending_batch.flush(self.client.clone(), self.topic_name.clone(), &mut inflight); |
There was a problem hiding this comment.
Need to double check with rest of PR, but I don't think this takes into account that there may be a batch already out for that ordering key?
I think this will be easier to do with the refactoring that we talked about where more of the control for flushing batches is moved to the OutstandingPublishes, so I am okay with doing in a follow up, but please add a TODO for it.
There was a problem hiding this comment.
Yep. This does not take into account if there is a batch inflight.
The TODO on worker.rs:156 is to handle this.
| // A dictionary of ordering key to outstanding publish operations. | ||
| // We batch publish operations on the same ordering key together. | ||
| // Publish without ordering keys are treated as having the key "". | ||
| let mut pending_batches: HashMap<String, OutstandingPublishes> = HashMap::new(); |
There was a problem hiding this comment.
One thing to think about, do we ever remove ordering keys from this map? Or will it grow indefinitely?
There was a problem hiding this comment.
It will grow indefinitely, I've added a TODO.
| while flushing.next().await.is_some() {} | ||
| for (_, outstanding) in pending_batches.iter_mut() { | ||
| if !outstanding.pending_batch.is_empty() { | ||
| outstanding.pending_batch.flush(self.client.clone(), self.topic_name.clone(), &mut inflight); |
There was a problem hiding this comment.
I would at least add a comment / TODO here for the behavior of flush when there are ordering keys. This will involve sending multiple batches sequentially since they need to be ordered. So it is not as simple as just flushing the existing messages and waiting on the response.
There was a problem hiding this comment.
Added a TODO here with more explicit callout.
|
Thanks for the review. PTAL. |
suzmue
left a comment
There was a problem hiding this comment.
With the TODOs, I'm fine to submit this to keep making forward progress.
Add initial support for message ordering in Publisher. This feature does not retry and does not respect message ordering in the case of failure. These will be supported in a later PR.
Towards: #4012