-
Notifications
You must be signed in to change notification settings - Fork 216
Description
It is possible to call activate_ordering_keys with keys which are not present in messages_on_hold._pending_ordered_messages. This can happen if:
-
A message (or messages) lease exceeds max_lease_duration, it is dropped from maintain_leases
-
We call drop in the dispatcher
def drop( -
If there are no messages remaining in messages_on_hold._pending_ordered_messages, therefore the pending_ordered_messages queue for the ordering key is removed:
https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py#L154 -
The message is acked from the user callback, the ack completes and drop is called in the dispatcher:
self.drop(requests_completed) -
We call _activate_ordering_keys with a key that does not exist in messages_on_hold._pending_ordered_messages:
https://github.com/googleapis/python-pubsub/blob/main/google/cloud/pubsub_v1/subscriber/_protocol/messages_on_hold.py#L122
Reproduction:
- Publish a single message with an ordering key:
- Subscribe with flow control max_lease_duration < time to ack the message, to ensure that the library will drop leasing prior to the message being acked.
Results:
Listening for messages...
Received b'message1'.
Dropping 1 items because they were leased too long.
Acking message with ordering key: key1
AssertionError: A message queue should exist for every ordered message in flight.