Commit b198a1f

Create guide for BigQuery operators (#8276)
1 parent 54d3c9a commit b198a1f

File tree: 4 files changed, +414 -6 lines

airflow/providers/google/cloud/example_dags/example_bigquery.py

Lines changed: 68 additions & 4 deletions
@@ -26,10 +26,11 @@
 from airflow import models
 from airflow.operators.bash import BashOperator
 from airflow.providers.google.cloud.operators.bigquery import (
-    BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryCreateExternalTableOperator,
-    BigQueryDeleteDatasetOperator, BigQueryDeleteTableOperator, BigQueryExecuteQueryOperator,
-    BigQueryGetDataOperator, BigQueryGetDatasetOperator, BigQueryGetDatasetTablesOperator,
-    BigQueryPatchDatasetOperator, BigQueryUpdateDatasetOperator, BigQueryUpsertTableOperator,
+    BigQueryCheckOperator, BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator,
+    BigQueryCreateExternalTableOperator, BigQueryDeleteDatasetOperator, BigQueryDeleteTableOperator,
+    BigQueryExecuteQueryOperator, BigQueryGetDataOperator, BigQueryGetDatasetOperator,
+    BigQueryGetDatasetTablesOperator, BigQueryIntervalCheckOperator, BigQueryPatchDatasetOperator,
+    BigQueryUpdateDatasetOperator, BigQueryUpsertTableOperator, BigQueryValueCheckOperator,
 )
 from airflow.providers.google.cloud.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator
 from airflow.providers.google.cloud.operators.bigquery_to_gcs import BigQueryToGCSOperator
@@ -40,6 +41,7 @@
 
 default_args = {"start_date": days_ago(1)}
 
+# [START howto_operator_bigquery_query]
 MOST_VALUABLE_INCOMING_TRANSACTIONS = """
 SELECT
     value, to_address
@@ -52,6 +54,7 @@
 ORDER BY value DESC
 LIMIT 1000
 """
+# [END howto_operator_bigquery_query]
 
 MOST_ACTIVE_PLAYERS = """
 SELECT
@@ -91,6 +94,7 @@
     tags=['example'],
 ) as dag:
 
+    # [START howto_operator_bigquery_execute_query]
     execute_query = BigQueryExecuteQueryOperator(
         task_id="execute_query",
         sql=MOST_VALUABLE_INCOMING_TRANSACTIONS,
@@ -103,7 +107,9 @@
             }
         ],
     )
+    # [END howto_operator_bigquery_execute_query]
 
+    # [START howto_operator_bigquery_execute_query_list]
     bigquery_execute_multi_query = BigQueryExecuteQueryOperator(
         task_id="execute_multi_query",
         sql=[MOST_VALUABLE_INCOMING_TRANSACTIONS, MOST_ACTIVE_PLAYERS],
@@ -116,7 +122,9 @@
             }
         ],
     )
+    # [END howto_operator_bigquery_execute_query_list]
 
+    # [START howto_operator_bigquery_execute_query_save]
     execute_query_save = BigQueryExecuteQueryOperator(
         task_id="execute_query_save",
         sql=MOST_VALUABLE_INCOMING_TRANSACTIONS,
@@ -130,19 +138,23 @@
             }
         ],
     )
+    # [END howto_operator_bigquery_execute_query_save]
 
+    # [START howto_operator_bigquery_get_data]
     get_data = BigQueryGetDataOperator(
         task_id="get_data",
         dataset_id=DATASET_NAME,
         table_id="save_query_result",
         max_results="10",
         selected_fields="value,to_address",
     )
+    # [END howto_operator_bigquery_get_data]
 
     get_data_result = BashOperator(
         task_id="get_data_result", bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\""
     )
 
+    # [START howto_operator_bigquery_create_external_table]
     create_external_table = BigQueryCreateExternalTableOperator(
         task_id="create_external_table",
         bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
@@ -151,6 +163,7 @@
         skip_leading_rows=1,
         schema_fields=[{"name": "name", "type": "STRING"}, {"name": "post_abbr", "type": "STRING"}],
     )
+    # [END howto_operator_bigquery_create_external_table]
 
     execute_query_external_table = BigQueryExecuteQueryOperator(
         task_id="execute_query_external_table",
@@ -171,14 +184,17 @@
         destination_cloud_storage_uris=["gs://{}/export-bigquery.csv".format(DATA_EXPORT_BUCKET_NAME)],
     )
 
+    # [START howto_operator_bigquery_create_dataset]
     create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)
+    # [END howto_operator_bigquery_create_dataset]
 
     create_dataset_with_location = BigQueryCreateEmptyDatasetOperator(
         task_id="create_dataset_with_location",
         dataset_id=LOCATION_DATASET_NAME,
         location=BQ_LOCATION
     )
 
+    # [START howto_operator_bigquery_create_table]
     create_table = BigQueryCreateEmptyTableOperator(
         task_id="create_table",
         dataset_id=DATASET_NAME,
@@ -188,6 +204,7 @@
             {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
         ],
     )
+    # [END howto_operator_bigquery_create_table]
 
     create_table_with_location = BigQueryCreateEmptyTableOperator(
         task_id="create_table_with_location",
@@ -199,6 +216,7 @@
         ],
     )
 
+    # [START howto_operator_bigquery_create_view]
     create_view = BigQueryCreateEmptyTableOperator(
         task_id="create_view",
         dataset_id=LOCATION_DATASET_NAME,
@@ -208,52 +226,68 @@
             "useLegacySql": False
         }
     )
+    # [END howto_operator_bigquery_create_view]
 
     get_empty_dataset_tables = BigQueryGetDatasetTablesOperator(
         task_id="get_empty_dataset_tables",
         dataset_id=DATASET_NAME
     )
 
+    # [START howto_operator_bigquery_get_dataset_tables]
     get_dataset_tables = BigQueryGetDatasetTablesOperator(
         task_id="get_dataset_tables",
         dataset_id=DATASET_NAME
     )
+    # [END howto_operator_bigquery_get_dataset_tables]
 
+    # [START howto_operator_bigquery_delete_view]
     delete_view = BigQueryDeleteTableOperator(
         task_id="delete_view", deletion_dataset_table="{}.test_view".format(DATASET_NAME)
     )
+    # [END howto_operator_bigquery_delete_view]
 
+    # [START howto_operator_bigquery_delete_table]
     delete_table = BigQueryDeleteTableOperator(
         task_id="delete_table", deletion_dataset_table="{}.test_table".format(DATASET_NAME)
     )
+    # [END howto_operator_bigquery_delete_table]
 
+    # [START howto_operator_bigquery_get_dataset]
     get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
+    # [END howto_operator_bigquery_get_dataset]
 
     get_dataset_result = BashOperator(
         task_id="get_dataset_result",
         bash_command="echo \"{{ task_instance.xcom_pull('get-dataset')['id'] }}\"",
     )
 
+    # [START howto_operator_bigquery_patch_dataset]
     patch_dataset = BigQueryPatchDatasetOperator(
         task_id="patch_dataset",
         dataset_id=DATASET_NAME,
         dataset_resource={"friendlyName": "Patched Dataset", "description": "Patched dataset"},
     )
+    # [END howto_operator_bigquery_patch_dataset]
 
+    # [START howto_operator_bigquery_update_dataset]
     update_dataset = BigQueryUpdateDatasetOperator(
         task_id="update_dataset", dataset_id=DATASET_NAME, dataset_resource={"description": "Updated dataset"}
     )
+    # [END howto_operator_bigquery_update_dataset]
 
+    # [START howto_operator_bigquery_delete_dataset]
     delete_dataset = BigQueryDeleteDatasetOperator(
         task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
     )
+    # [END howto_operator_bigquery_delete_dataset]
 
     delete_dataset_with_location = BigQueryDeleteDatasetOperator(
         task_id="delete_dataset_with_location",
         dataset_id=LOCATION_DATASET_NAME,
         delete_contents=True
     )
 
+    # [START howto_operator_bigquery_upsert_table]
     update_table = BigQueryUpsertTableOperator(
         task_id="update_table", dataset_id=DATASET_NAME, table_resource={
             "tableReference": {
@@ -262,6 +296,34 @@
             "expirationTime": (int(time.time()) + 300) * 1000
         }
     )
+    # [END howto_operator_bigquery_upsert_table]
+
+    # [START howto_operator_bigquery_check]
+    check_count = BigQueryCheckOperator(
+        task_id="check_count",
+        sql="SELECT COUNT(*) FROM {}.save_query_result".format(DATASET_NAME),
+        use_legacy_sql=False,
+    )
+    # [END howto_operator_bigquery_check]
+
+    # [START howto_operator_bigquery_value_check]
+    check_value = BigQueryValueCheckOperator(
+        task_id="check_value",
+        sql="SELECT COUNT(*) FROM {}.save_query_result".format(DATASET_NAME),
+        pass_value=1000,
+        use_legacy_sql=False,
+    )
+    # [END howto_operator_bigquery_value_check]
+
+    # [START howto_operator_bigquery_interval_check]
+    check_interval = BigQueryIntervalCheckOperator(
+        task_id="check_interval",
+        table="{}.save_query_result".format(DATASET_NAME),
+        days_back=1,
+        metrics_thresholds={'COUNT(*)': 1.5},
+        use_legacy_sql=False,
+    )
+    # [END howto_operator_bigquery_interval_check]
 
     create_dataset >> execute_query_save >> delete_dataset
     create_dataset >> get_empty_dataset_tables >> create_table >> get_dataset_tables >> delete_dataset
@@ -275,3 +337,5 @@
     create_table >> create_view >> delete_view >> delete_table >> delete_dataset
     create_dataset_with_location >> create_table_with_location >> delete_dataset_with_location
     create_dataset >> create_table >> update_table >> delete_table >> delete_dataset
+    create_dataset >> execute_query_save >> check_count >> check_value >> \
+        check_interval >> delete_dataset
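
For context, the three data-quality operators introduced in this commit can also be used on their own, outside the example DAG above. Below is a minimal standalone sketch; the DAG id `example_bigquery_checks`, the `airflow_test.results` table, and the schedule are illustrative assumptions, while the operator arguments mirror the snippets added in this diff.

# Minimal sketch of a standalone DAG using the check operators added in this
# commit. DAG id, schedule, and table name are hypothetical placeholders.
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCheckOperator, BigQueryIntervalCheckOperator, BigQueryValueCheckOperator,
)
from airflow.utils.dates import days_ago

with models.DAG(
    "example_bigquery_checks",  # hypothetical DAG id
    start_date=days_ago(1),
    schedule_interval=None,
    tags=["example"],
) as dag:
    # Fails if any value in the first row of the query result is falsy.
    check_count = BigQueryCheckOperator(
        task_id="check_count",
        sql="SELECT COUNT(*) FROM airflow_test.results",  # hypothetical table
        use_legacy_sql=False,
    )

    # Fails if the single returned value does not match pass_value
    # (an optional tolerance can relax the comparison).
    check_value = BigQueryValueCheckOperator(
        task_id="check_value",
        sql="SELECT COUNT(*) FROM airflow_test.results",
        pass_value=1000,
        use_legacy_sql=False,
    )

    # Compares today's metrics against the same metrics days_back ago and
    # fails if the ratio exceeds the given threshold.
    check_interval = BigQueryIntervalCheckOperator(
        task_id="check_interval",
        table="airflow_test.results",
        days_back=1,
        metrics_thresholds={"COUNT(*)": 1.5},
        use_legacy_sql=False,
    )

    check_count >> check_value >> check_interval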
