@@ -48,43 +48,39 @@
 from urllib.parse import urlparse
 
 from airflow import models
-from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateExternalTableOperator,
     BigQueryDeleteDatasetOperator,
     BigQueryInsertJobOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.firebase.operators.firestore import CloudFirestoreExportDatabaseOperator
+from airflow.utils.trigger_rule import TriggerRule
 
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_google_firestore"
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-gcp-project")
 FIRESTORE_PROJECT_ID = os.environ.get("G_FIRESTORE_PROJECT_ID", "example-firebase-project")
 
+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
 EXPORT_DESTINATION_URL = os.environ.get("GCP_FIRESTORE_ARCHIVE_URL", "gs://INVALID BUCKET NAME/namespace/")
-BUCKET_NAME = urlparse(EXPORT_DESTINATION_URL).hostname
 EXPORT_PREFIX = urlparse(EXPORT_DESTINATION_URL).path
-
 EXPORT_COLLECTION_ID = os.environ.get("GCP_FIRESTORE_COLLECTION_ID", "firestore_collection_id")
-DATASET_NAME = os.environ.get("GCP_FIRESTORE_DATASET_NAME", "test_firestore_export")
 DATASET_LOCATION = os.environ.get("GCP_FIRESTORE_DATASET_LOCATION", "EU")
 
 if BUCKET_NAME is None:
     raise ValueError("Bucket name is required. Please set GCP_FIRESTORE_ARCHIVE_URL env variable.")
 
 with models.DAG(
-    "example_google_firestore",
+    DAG_ID,
     start_date=datetime(2021, 1, 1),
     schedule_interval='@once',
     catchup=False,
-    tags=["example"],
+    tags=["example", "firestore"],
 ) as dag:
-    # [START howto_operator_export_database_to_gcs]
-    export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
-        task_id="export_database_to_gcs",
-        project_id=FIRESTORE_PROJECT_ID,
-        body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
-    )
-    # [END howto_operator_export_database_to_gcs]
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)
 
     create_dataset = BigQueryCreateEmptyDatasetOperator(
         task_id="create_dataset",
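A note on the dependency style: the chain helper import removed in this hunk and the >> expressions added at the end of the file are two equivalent ways of declaring the same ordering. A minimal sketch of the equivalence, with hypothetical task names and assuming Airflow 2.3+ for EmptyOperator (older versions would use DummyOperator instead):

from datetime import datetime

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator  # placeholder tasks, for illustration only

with DAG("deps_sketch", start_date=datetime(2021, 1, 1), schedule_interval=None, catchup=False):
    a, b, c = (EmptyOperator(task_id=t) for t in ("a", "b", "c"))

    # Either of the lines below declares the same linear ordering a -> b -> c;
    # the refactor simply swaps the helper call for the bit-shift syntax.
    chain(a, b, c)
    # a >> b >> c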
@@ -93,9 +89,13 @@
         project_id=GCP_PROJECT_ID,
     )
 
-    delete_dataset = BigQueryDeleteDatasetOperator(
-        task_id="delete_dataset", dataset_id=DATASET_NAME, project_id=GCP_PROJECT_ID, delete_contents=True
+    # [START howto_operator_export_database_to_gcs]
+    export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
+        task_id="export_database_to_gcs",
+        project_id=FIRESTORE_PROJECT_ID,
+        body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
     )
+    # [END howto_operator_export_database_to_gcs]
 
     # [START howto_operator_create_external_table_multiple_types]
     create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
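The teardown tasks introduced in the next hunk carry trigger_rule=TriggerRule.ALL_DONE, so the dataset and bucket are deleted even when an upstream task fails. A small self-contained sketch of that behaviour, using hypothetical task ids that are not part of the example DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule


def _always_fails():
    raise RuntimeError("simulated test-body failure")


with DAG("teardown_sketch", start_date=datetime(2021, 1, 1), schedule_interval=None, catchup=False):
    body = PythonOperator(task_id="body", python_callable=_always_fails)
    # With the default ALL_SUCCESS rule this task would never run after the failure;
    # ALL_DONE lets it run as soon as its upstream tasks have finished in any state.
    cleanup = EmptyOperator(task_id="cleanup", trigger_rule=TriggerRule.ALL_DONE)

    body >> cleanup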
@@ -132,12 +132,39 @@
         },
     )
 
-    chain(
-        # Firestore
-        export_database_to_gcs,
-        # BigQuery
-        create_dataset,
-        create_external_table_multiple_types,
-        read_data_from_gcs_multiple_types,
-        delete_dataset,
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset",
+        dataset_id=DATASET_NAME,
+        project_id=GCP_PROJECT_ID,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> create_dataset
+        # TEST BODY
+        >> export_database_to_gcs
+        >> create_external_table_multiple_types
+        >> read_data_from_gcs_multiple_types
+        # TEST TEARDOWN
+        >> delete_dataset
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
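The two tests.system.utils imports added at the bottom come from Airflow's shared system-test helpers rather than from provider code. Roughly speaking, the watcher is a task that runs with TriggerRule.ONE_FAILED and raises, so a failure in the test body still fails the whole DAG run even though the ALL_DONE teardown tasks finish green, while get_test_run(dag) wraps the DAG so pytest can collect and execute it. An illustrative sketch of the watcher idea, not the project's actual implementation:

from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule


@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    # Runs only when at least one upstream task has failed; raising here marks the
    # DAG run itself as failed instead of letting a successful teardown hide the error.
    raise AirflowException("Failing the DAG run because an upstream task failed.")

With test_run = get_test_run(dag) in place, the example doubles as a system test that can be launched by running pytest against this file, as described in tests/system/README.md#run_via_pytest.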