|
26 | 26 | from airflow import models |
27 | 27 | from airflow.operators.bash import BashOperator |
28 | 28 | from airflow.providers.google.cloud.operators.bigquery import ( |
29 | | - BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, BigQueryCreateExternalTableOperator, |
30 | | - BigQueryDeleteDatasetOperator, BigQueryDeleteTableOperator, BigQueryExecuteQueryOperator, |
31 | | - BigQueryGetDataOperator, BigQueryGetDatasetOperator, BigQueryGetDatasetTablesOperator, |
32 | | - BigQueryPatchDatasetOperator, BigQueryUpdateDatasetOperator, BigQueryUpsertTableOperator, |
| 29 | + BigQueryCheckOperator, BigQueryCreateEmptyDatasetOperator, BigQueryCreateEmptyTableOperator, |
| 30 | + BigQueryCreateExternalTableOperator, BigQueryDeleteDatasetOperator, BigQueryDeleteTableOperator, |
| 31 | + BigQueryExecuteQueryOperator, BigQueryGetDataOperator, BigQueryGetDatasetOperator, |
| 32 | + BigQueryGetDatasetTablesOperator, BigQueryIntervalCheckOperator, BigQueryPatchDatasetOperator, |
| 33 | + BigQueryUpdateDatasetOperator, BigQueryUpsertTableOperator, BigQueryValueCheckOperator, |
33 | 34 | ) |
34 | 35 | from airflow.providers.google.cloud.operators.bigquery_to_bigquery import BigQueryToBigQueryOperator |
35 | 36 | from airflow.providers.google.cloud.operators.bigquery_to_gcs import BigQueryToGCSOperator |
|
40 | 41 |
|
41 | 42 | default_args = {"start_date": days_ago(1)} |
42 | 43 |
|
| 44 | +# [START howto_operator_bigquery_query] |
43 | 45 | MOST_VALUABLE_INCOMING_TRANSACTIONS = """ |
44 | 46 | SELECT |
45 | 47 | value, to_address |
|
52 | 54 | ORDER BY value DESC |
53 | 55 | LIMIT 1000 |
54 | 56 | """ |
| 57 | +# [END howto_operator_bigquery_query] |
55 | 58 |
|
56 | 59 | MOST_ACTIVE_PLAYERS = """ |
57 | 60 | SELECT |
|
91 | 94 | tags=['example'], |
92 | 95 | ) as dag: |
93 | 96 |
|
| 97 | + # [START howto_operator_bigquery_execute_query] |
94 | 98 | execute_query = BigQueryExecuteQueryOperator( |
95 | 99 | task_id="execute_query", |
96 | 100 | sql=MOST_VALUABLE_INCOMING_TRANSACTIONS, |
|
103 | 107 | } |
104 | 108 | ], |
105 | 109 | ) |
| 110 | + # [END howto_operator_bigquery_execute_query] |
106 | 111 |
|
| 112 | + # [START howto_operator_bigquery_execute_query_list] |
107 | 113 | bigquery_execute_multi_query = BigQueryExecuteQueryOperator( |
108 | 114 | task_id="execute_multi_query", |
109 | 115 | sql=[MOST_VALUABLE_INCOMING_TRANSACTIONS, MOST_ACTIVE_PLAYERS], |
|
116 | 122 | } |
117 | 123 | ], |
118 | 124 | ) |
| 125 | + # [END howto_operator_bigquery_execute_query_list] |
119 | 126 |
|
| 127 | + # [START howto_operator_bigquery_execute_query_save] |
120 | 128 | execute_query_save = BigQueryExecuteQueryOperator( |
121 | 129 | task_id="execute_query_save", |
122 | 130 | sql=MOST_VALUABLE_INCOMING_TRANSACTIONS, |
|
130 | 138 | } |
131 | 139 | ], |
132 | 140 | ) |
| 141 | + # [END howto_operator_bigquery_execute_query_save] |
133 | 142 |
|
| 143 | + # [START howto_operator_bigquery_get_data] |
134 | 144 | get_data = BigQueryGetDataOperator( |
135 | 145 | task_id="get_data", |
136 | 146 | dataset_id=DATASET_NAME, |
137 | 147 | table_id="save_query_result", |
138 | 148 | max_results="10", |
139 | 149 | selected_fields="value,to_address", |
140 | 150 | ) |
| 151 | + # [END howto_operator_bigquery_get_data] |
141 | 152 |
|
142 | 153 | get_data_result = BashOperator( |
143 | 154 | task_id="get_data_result", bash_command="echo \"{{ task_instance.xcom_pull('get_data') }}\"" |
144 | 155 | ) |
145 | 156 |
|
| 157 | + # [START howto_operator_bigquery_create_external_table] |
146 | 158 | create_external_table = BigQueryCreateExternalTableOperator( |
147 | 159 | task_id="create_external_table", |
148 | 160 | bucket=DATA_SAMPLE_GCS_BUCKET_NAME, |
|
151 | 163 | skip_leading_rows=1, |
152 | 164 | schema_fields=[{"name": "name", "type": "STRING"}, {"name": "post_abbr", "type": "STRING"}], |
153 | 165 | ) |
| 166 | + # [END howto_operator_bigquery_create_external_table] |
154 | 167 |
|
155 | 168 | execute_query_external_table = BigQueryExecuteQueryOperator( |
156 | 169 | task_id="execute_query_external_table", |
|
171 | 184 | destination_cloud_storage_uris=["gs://{}/export-bigquery.csv".format(DATA_EXPORT_BUCKET_NAME)], |
172 | 185 | ) |
173 | 186 |
|
| 187 | + # [START howto_operator_bigquery_create_dataset] |
174 | 188 | create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME) |
| 189 | + # [END howto_operator_bigquery_create_dataset] |
175 | 190 |
|
176 | 191 | create_dataset_with_location = BigQueryCreateEmptyDatasetOperator( |
177 | 192 | task_id="create_dataset_with_location", |
178 | 193 | dataset_id=LOCATION_DATASET_NAME, |
179 | 194 | location=BQ_LOCATION |
180 | 195 | ) |
181 | 196 |
|
| 197 | + # [START howto_operator_bigquery_create_table] |
182 | 198 | create_table = BigQueryCreateEmptyTableOperator( |
183 | 199 | task_id="create_table", |
184 | 200 | dataset_id=DATASET_NAME, |
|
188 | 204 | {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, |
189 | 205 | ], |
190 | 206 | ) |
| 207 | + # [END howto_operator_bigquery_create_table] |
191 | 208 |
|
192 | 209 | create_table_with_location = BigQueryCreateEmptyTableOperator( |
193 | 210 | task_id="create_table_with_location", |
|
199 | 216 | ], |
200 | 217 | ) |
201 | 218 |
|
| 219 | + # [START howto_operator_bigquery_create_view] |
202 | 220 | create_view = BigQueryCreateEmptyTableOperator( |
203 | 221 | task_id="create_view", |
204 | 222 | dataset_id=LOCATION_DATASET_NAME, |
|
208 | 226 | "useLegacySql": False |
209 | 227 | } |
210 | 228 | ) |
| 229 | + # [END howto_operator_bigquery_create_view] |
211 | 230 |
|
212 | 231 | get_empty_dataset_tables = BigQueryGetDatasetTablesOperator( |
213 | 232 | task_id="get_empty_dataset_tables", |
214 | 233 | dataset_id=DATASET_NAME |
215 | 234 | ) |
216 | 235 |
|
| 236 | + # [START howto_operator_bigquery_get_dataset_tables] |
217 | 237 | get_dataset_tables = BigQueryGetDatasetTablesOperator( |
218 | 238 | task_id="get_dataset_tables", |
219 | 239 | dataset_id=DATASET_NAME |
220 | 240 | ) |
| 241 | + # [END howto_operator_bigquery_get_dataset_tables] |
221 | 242 |
|
| 243 | + # [START howto_operator_bigquery_delete_view] |
222 | 244 | delete_view = BigQueryDeleteTableOperator( |
223 | 245 | task_id="delete_view", deletion_dataset_table="{}.test_view".format(DATASET_NAME) |
224 | 246 | ) |
| 247 | + # [END howto_operator_bigquery_delete_view] |
225 | 248 |
|
| 249 | + # [START howto_operator_bigquery_delete_table] |
226 | 250 | delete_table = BigQueryDeleteTableOperator( |
227 | 251 | task_id="delete_table", deletion_dataset_table="{}.test_table".format(DATASET_NAME) |
228 | 252 | ) |
| 253 | + # [END howto_operator_bigquery_delete_table] |
229 | 254 |
|
| 255 | + # [START howto_operator_bigquery_get_dataset] |
230 | 256 | get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME) |
| 257 | + # [END howto_operator_bigquery_get_dataset] |
231 | 258 |
|
232 | 259 | get_dataset_result = BashOperator( |
233 | 260 | task_id="get_dataset_result", |
234 | 261 | bash_command="echo \"{{ task_instance.xcom_pull('get-dataset')['id'] }}\"", |
235 | 262 | ) |
236 | 263 |
|
| 264 | + # [START howto_operator_bigquery_patch_dataset] |
237 | 265 | patch_dataset = BigQueryPatchDatasetOperator( |
238 | 266 | task_id="patch_dataset", |
239 | 267 | dataset_id=DATASET_NAME, |
240 | 268 | dataset_resource={"friendlyName": "Patched Dataset", "description": "Patched dataset"}, |
241 | 269 | ) |
| 270 | + # [END howto_operator_bigquery_patch_dataset] |
242 | 271 |
|
| 272 | + # [START howto_operator_bigquery_update_dataset] |
243 | 273 | update_dataset = BigQueryUpdateDatasetOperator( |
244 | 274 | task_id="update_dataset", dataset_id=DATASET_NAME, dataset_resource={"description": "Updated dataset"} |
245 | 275 | ) |
| 276 | + # [END howto_operator_bigquery_update_dataset] |
246 | 277 |
|
| 278 | + # [START howto_operator_bigquery_delete_dataset] |
247 | 279 | delete_dataset = BigQueryDeleteDatasetOperator( |
248 | 280 | task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True |
249 | 281 | ) |
| 282 | + # [END howto_operator_bigquery_delete_dataset] |
250 | 283 |
|
251 | 284 | delete_dataset_with_location = BigQueryDeleteDatasetOperator( |
252 | 285 | task_id="delete_dataset_with_location", |
253 | 286 | dataset_id=LOCATION_DATASET_NAME, |
254 | 287 | delete_contents=True |
255 | 288 | ) |
256 | 289 |
|
| 290 | + # [START howto_operator_bigquery_upsert_table] |
257 | 291 | update_table = BigQueryUpsertTableOperator( |
258 | 292 | task_id="update_table", dataset_id=DATASET_NAME, table_resource={ |
259 | 293 | "tableReference": { |
|
262 | 296 | "expirationTime": (int(time.time()) + 300) * 1000 |
263 | 297 | } |
264 | 298 | ) |
| 299 | + # [END howto_operator_bigquery_upsert_table] |
| 300 | + |
| 301 | + # [START howto_operator_bigquery_check] |
| 302 | + check_count = BigQueryCheckOperator( |
| 303 | + task_id="check_count", |
| 304 | + sql="SELECT COUNT(*) FROM {}.save_query_result".format(DATASET_NAME), |
| 305 | + use_legacy_sql=False, |
| 306 | + ) |
| 307 | + # [END howto_operator_bigquery_check] |
| 308 | + |
| 309 | + # [START howto_operator_bigquery_value_check] |
| 310 | + check_value = BigQueryValueCheckOperator( |
| 311 | + task_id="check_value", |
| 312 | + sql="SELECT COUNT(*) FROM {}.save_query_result".format(DATASET_NAME), |
| 313 | + pass_value=1000, |
| 314 | + use_legacy_sql=False, |
| 315 | + ) |
| 316 | + # [END howto_operator_bigquery_value_check] |
| 317 | + |
| 318 | + # [START howto_operator_bigquery_interval_check] |
| 319 | + check_interval = BigQueryIntervalCheckOperator( |
| 320 | + task_id="check_interval", |
| 321 | + table="{}.save_query_result".format(DATASET_NAME), |
| 322 | + days_back=1, |
| 323 | + metrics_thresholds={'COUNT(*)': 1.5}, |
| 324 | + use_legacy_sql=False, |
| 325 | + ) |
| 326 | + # [END howto_operator_bigquery_interval_check] |
265 | 327 |
|
266 | 328 | create_dataset >> execute_query_save >> delete_dataset |
267 | 329 | create_dataset >> get_empty_dataset_tables >> create_table >> get_dataset_tables >> delete_dataset |
|
275 | 337 | create_table >> create_view >> delete_view >> delete_table >> delete_dataset |
276 | 338 | create_dataset_with_location >> create_table_with_location >> delete_dataset_with_location |
277 | 339 | create_dataset >> create_table >> update_table >> delete_table >> delete_dataset |
| 340 | + create_dataset >> execute_query_save >> check_count >> check_value >> \ |
| 341 | + check_interval >> delete_dataset |
0 commit comments