Skip to content

Commit 529db07

Browse files
authored
Improve Google PubSub hook publish method (#7831)
1 parent 4bde99f commit 529db07

File tree

2 files changed

+7
-6
lines changed

2 files changed

+7
-6
lines changed

airflow/providers/google/cloud/example_dags/example_pubsub.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
) as example_sensor_dag:
5353
# [START howto_operator_gcp_pubsub_create_topic]
5454
create_topic = PubSubCreateTopicOperator(
55-
task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID
55+
task_id="create_topic", topic=TOPIC_FOR_SENSOR_DAG, project_id=GCP_PROJECT_ID, fail_if_exists=False
5656
)
5757
# [END howto_operator_gcp_pubsub_create_topic]
5858

@@ -84,7 +84,7 @@
8484
task_id="publish_task",
8585
project_id=GCP_PROJECT_ID,
8686
topic=TOPIC_FOR_SENSOR_DAG,
87-
messages=[MESSAGE, MESSAGE, MESSAGE],
87+
messages=[MESSAGE] * 10,
8888
)
8989
# [END howto_operator_gcp_pubsub_publish]
9090

@@ -102,8 +102,8 @@
102102
)
103103
# [END howto_operator_gcp_pubsub_delete_topic]
104104

105-
create_topic >> subscribe_task >> publish_task
106-
subscribe_task >> pull_messages >> pull_messages_result >> unsubscribe_task >> delete_topic
105+
create_topic >> subscribe_task >> [publish_task, pull_messages]
106+
pull_messages >> pull_messages_result >> unsubscribe_task >> delete_topic
107107

108108

109109
with models.DAG(

airflow/providers/google/cloud/hooks/pubsub.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,14 @@ def publish(
112112
self.log.info("Publish %d messages to topic (path) %s", len(messages), topic_path)
113113
try:
114114
for message in messages:
115-
publisher.publish(
115+
future = publisher.publish(
116116
topic=topic_path,
117117
data=message.get("data", b''),
118118
**message.get('attributes', {})
119119
)
120+
future.result()
120121
except GoogleAPICallError as e:
121-
raise PubSubException('Error publishing to topic {}'.format(topic_path), e)
122+
raise PubSubException(f'Error publishing to topic {topic_path}', e)
122123

123124
self.log.info("Published %d messages to topic (path) %s", len(messages), topic_path)
124125

0 commit comments

Comments
 (0)