Environment details
- OS type and version: Debian Bookworm (Docker slim image)
- Python version: 3.9.18
- pip version: 23.1
- google-cloud-pubsub version: 2.18.4
- grpcio: 1.59.0
Description
Hello there,
We’ve been hitting publishing issues for a while now and have run out of ideas on how to fix them. The behaviour is as follows: we publish messages to a few topics that use an ordering key. Several times a week (sometimes several times a day), publishing fails and the client publisher appears to be stuck: no further messages are published. Google Cloud Monitoring usually reports a ‘deadline exceeded’, and we do see timeouts at those moments, but no exception is raised on the client side. This is surprising, since we don’t publish large loads of messages (just a few messages every second).
Only after the fact (!) did we find in the documentation that this behaviour is expected with ordering keys: once a publish fails for a given ordering key, the client pauses publishing for that key until resume_publish is called.
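To make sure we understand those pause semantics, here is how we picture them, as a tiny plain-Python state machine (no Pub/Sub dependency; the class and method names are ours, chosen to mirror the client API):

```python
class OrderedKeyState:
    """Mimics the documented per-ordering-key pause behaviour."""

    def __init__(self):
        self.paused_keys = set()

    def publish(self, key, ok=True):
        if key in self.paused_keys:
            # Later publishes for a paused key fail fast until resume_publish.
            raise RuntimeError(f"ordering key {key!r} is paused")
        if not ok:
            # A failed publish pauses the key.
            self.paused_keys.add(key)
            raise RuntimeError(f"publish failed; ordering key {key!r} now paused")
        return "published"

    def resume_publish(self, key):
        # Clears the paused state so publishing on this key can continue.
        self.paused_keys.discard(key)
```

If this model is right, any single failed publish silently freezes its key, which would match the "stuck publisher" symptom we see.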
Code example
import functools

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
    BatchSettings,
    LimitExceededBehavior,
    PublisherOptions,
    PublishFlowControl,
)

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = PublisherOptions(
    enable_message_ordering=True,
    flow_control=PublishFlowControl(
        message_limit=2000,
        limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
    ),
)

# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}

batch_settings = BatchSettings(
    max_bytes=1 * 1000 * 1000,  # 1 MB
    max_latency=0.01,  # 10 ms
    max_messages=1000,
)

publisher = pubsub_v1.PublisherClient(
    batch_settings,
    publisher_options=publisher_options,
    client_options=client_options,
)

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)


def resume_if_error(ordering_key, future):
    try:
        print(future.result())
    except RuntimeError:
        # Resume publish on an ordering key that has had unrecoverable errors.
        publisher.resume_publish(topic_path, ordering_key)
        print(f"Resumed publishing with ordering key {ordering_key} to {topic_path}.")


for message, ordering_key in [
    ("message1", "key1"),
    ("message2", "key2"),
    ("message3", "key1"),
    ("message4", "key2"),
]:
    # Data must be a bytestring.
    data = message.encode("utf-8")
    # When you publish a message, the client returns a future. Bind the
    # ordering key to the callback so the right key is resumed on failure.
    future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
    future.add_done_callback(functools.partial(resume_if_error, ordering_key))
We wanted to avoid calling result() in the main thread, since it blocks, and use a callback instead. However, this does not work: we still run into the same issues.
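One idea we considered, to at least surface the stall instead of hanging silently, is to keep the returned futures and wait on them with a bounded timeout in a watchdog. The Pub/Sub publish future exposes a result(timeout=...) API compatible with concurrent.futures, so the pattern is sketched below with concurrent.futures.Future as a stand-in (the helper name wait_with_deadline is ours):

```python
import concurrent.futures


def wait_with_deadline(futures, timeout_s):
    """Split futures into (done_results, stuck_futures) instead of blocking forever."""
    done, stuck = [], []
    for f in futures:
        try:
            # Bounded wait: a hung publish surfaces as a TimeoutError here.
            done.append(f.result(timeout=timeout_s))
        except concurrent.futures.TimeoutError:
            stuck.append(f)
    return done, stuck
```

With the real client, the stuck list would tell us which publishes never completed, so we could log them and call resume_publish for the affected keys.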
Can anyone help?