
Commit fb51e04

Migrate Google firestore example to new design AIP-47 (#24830)
related: #22447, #22430
1 parent f5cd2c3 commit fb51e04

File tree

5 files changed: 52 additions & 89 deletions

airflow/providers/google/firebase/example_dags/__init__.py

Lines changed: 0 additions & 16 deletions
This file was deleted.

docs/apache-airflow-providers-google/example-dags.rst

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ You can learn how to use Google integrations by analyzing the source code of the
 * `Google Ads <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/ads/example_dags>`__
 * `Google Cloud (legacy) <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/cloud/example_dags>`__
 * `Google Cloud <https://github.com/apache/airflow/tree/providers-google/8.0.0/tests/system/providers/google/cloud>`__
-* `Google Firebase <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/firebase/example_dags>`__
+* `Google Firebase <https://github.com/apache/airflow/tree/providers-google/8.1.0/tests/system/providers/google/firebase>`__
 * `Google Marketing Platform <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/marketing_platform/example_dags>`__
 * `Google Workplace <https://github.com/apache/airflow/tree/providers-google/8.0.0/airflow/providers/google/suite/example_dags>`__ (formerly Google Suite)
 * `Google LevelDB <https://github.com/apache/airflow/tree/providers-google/8.0.0/tests/system/providers/google/leveldb>`__

docs/apache-airflow-providers-google/operators/firebase/firestore.rst

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@ Export database
 Exports a copy of all or a subset of documents from Google Cloud Firestore to Google Cloud Storage is performed with the
 :class:`~airflow.providers.google.firebase.operators.firestore.CloudFirestoreExportDatabaseOperator` operator.

-.. exampleinclude:: /../../airflow/providers/google/firebase/example_dags/example_firestore.py
+.. exampleinclude:: /../../tests/system/providers/google/cloud/gcs/example_firestore.py
     :language: python
     :dedent: 4
     :start-after: [START howto_operator_export_database_to_gcs]
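
For orientation, the region that ``exampleinclude`` pulls in boils down to the operator call below. This is a sketch using the example's documented default placeholder values; the bucket URL is a hypothetical stand-in, not a runnable export against a real project:

from airflow.providers.google.firebase.operators.firestore import CloudFirestoreExportDatabaseOperator

# Placeholder values for illustration only; the migrated example derives the
# real bucket and dataset names from the SYSTEM_TESTS_ENV_ID environment variable.
export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
    task_id="export_database_to_gcs",
    project_id="example-firebase-project",
    body={
        "outputUriPrefix": "gs://example-bucket/namespace/",
        "collectionIds": ["firestore_collection_id"],
    },
)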

tests/providers/google/firebase/operators/test_firestore_system.py

Lines changed: 0 additions & 48 deletions
This file was deleted.

airflow/providers/google/firebase/example_dags/example_firestore.py renamed to tests/system/providers/google/cloud/gcs/example_firestore.py

Lines changed: 50 additions & 23 deletions
@@ -48,43 +48,39 @@
 from urllib.parse import urlparse

 from airflow import models
-from airflow.models.baseoperator import chain
 from airflow.providers.google.cloud.operators.bigquery import (
     BigQueryCreateEmptyDatasetOperator,
     BigQueryCreateExternalTableOperator,
     BigQueryDeleteDatasetOperator,
     BigQueryInsertJobOperator,
 )
+from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
 from airflow.providers.google.firebase.operators.firestore import CloudFirestoreExportDatabaseOperator
+from airflow.utils.trigger_rule import TriggerRule

+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_google_firestore"
 GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-gcp-project")
 FIRESTORE_PROJECT_ID = os.environ.get("G_FIRESTORE_PROJECT_ID", "example-firebase-project")

+BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+DATASET_NAME = f"dataset_{DAG_ID}_{ENV_ID}"
 EXPORT_DESTINATION_URL = os.environ.get("GCP_FIRESTORE_ARCHIVE_URL", "gs://INVALID BUCKET NAME/namespace/")
-BUCKET_NAME = urlparse(EXPORT_DESTINATION_URL).hostname
 EXPORT_PREFIX = urlparse(EXPORT_DESTINATION_URL).path
-
 EXPORT_COLLECTION_ID = os.environ.get("GCP_FIRESTORE_COLLECTION_ID", "firestore_collection_id")
-DATASET_NAME = os.environ.get("GCP_FIRESTORE_DATASET_NAME", "test_firestore_export")
 DATASET_LOCATION = os.environ.get("GCP_FIRESTORE_DATASET_LOCATION", "EU")

 if BUCKET_NAME is None:
     raise ValueError("Bucket name is required. Please set GCP_FIRESTORE_ARCHIVE_URL env variable.")

 with models.DAG(
-    "example_google_firestore",
+    DAG_ID,
     start_date=datetime(2021, 1, 1),
     schedule_interval='@once',
     catchup=False,
-    tags=["example"],
+    tags=["example", "firestore"],
 ) as dag:
-    # [START howto_operator_export_database_to_gcs]
-    export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
-        task_id="export_database_to_gcs",
-        project_id=FIRESTORE_PROJECT_ID,
-        body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
-    )
-    # [END howto_operator_export_database_to_gcs]
+    create_bucket = GCSCreateBucketOperator(task_id="create_bucket", bucket_name=BUCKET_NAME)

     create_dataset = BigQueryCreateEmptyDatasetOperator(
         task_id="create_dataset",
@@ -93,9 +89,13 @@
         project_id=GCP_PROJECT_ID,
     )

-    delete_dataset = BigQueryDeleteDatasetOperator(
-        task_id="delete_dataset", dataset_id=DATASET_NAME, project_id=GCP_PROJECT_ID, delete_contents=True
+    # [START howto_operator_export_database_to_gcs]
+    export_database_to_gcs = CloudFirestoreExportDatabaseOperator(
+        task_id="export_database_to_gcs",
+        project_id=FIRESTORE_PROJECT_ID,
+        body={"outputUriPrefix": EXPORT_DESTINATION_URL, "collectionIds": [EXPORT_COLLECTION_ID]},
     )
+    # [END howto_operator_export_database_to_gcs]

     # [START howto_operator_create_external_table_multiple_types]
     create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
@@ -132,12 +132,39 @@
         },
     )

-    chain(
-        # Firestore
-        export_database_to_gcs,
-        # BigQuery
-        create_dataset,
-        create_external_table_multiple_types,
-        read_data_from_gcs_multiple_types,
-        delete_dataset,
+    delete_dataset = BigQueryDeleteDatasetOperator(
+        task_id="delete_dataset",
+        dataset_id=DATASET_NAME,
+        project_id=GCP_PROJECT_ID,
+        delete_contents=True,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
+
+    delete_bucket = GCSDeleteBucketOperator(
+        task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
+    )
+
+    (
+        # TEST SETUP
+        create_bucket
+        >> create_dataset
+        # TEST BODY
+        >> export_database_to_gcs
+        >> create_external_table_multiple_types
+        >> read_data_from_gcs_multiple_types
+        # TEST TEARDOWN
+        >> delete_dataset
+        >> delete_bucket
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
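
The migrated file follows the AIP-47 system-test layout: setup and teardown tasks bracket the documented test body, the teardowns run under trigger_rule=TriggerRule.ALL_DONE so cleanup happens even when the body fails, and the watcher task keeps a succeeding teardown (a leaf task) from masking that failure. Below is a minimal sketch of the same pattern, assuming the airflow repository layout (tests.system.utils is only importable from inside the repo) and Airflow 2.3+'s EmptyOperator standing in for real work; the DAG id and task names are hypothetical:

from datetime import datetime

from airflow import models
from airflow.operators.empty import EmptyOperator
from airflow.utils.trigger_rule import TriggerRule

with models.DAG(
    "example_watcher_pattern",  # hypothetical DAG id, for illustration only
    start_date=datetime(2021, 1, 1),
    schedule_interval="@once",
    catchup=False,
) as dag:
    setup = EmptyOperator(task_id="setup")
    body = EmptyOperator(task_id="body")
    # ALL_DONE makes teardown run whether or not "body" succeeded; as the
    # final leaf task it would otherwise mark a failed run as successful.
    teardown = EmptyOperator(task_id="teardown", trigger_rule=TriggerRule.ALL_DONE)

    setup >> body >> teardown

    from tests.system.utils.watcher import watcher

    # list(dag.tasks) is evaluated before watcher() is instantiated, so the
    # watcher becomes downstream of every other task and propagates any task
    # failure to the DAG run result.
    list(dag.tasks) >> watcher()

from tests.system.utils import get_test_run  # noqa: E402

# Exposes the DAG so pytest can run it (see tests/system/README.md#run_via_pytest).
test_run = get_test_run(dag)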
