Commit 67caae0

Add system test for gcs_to_bigquery (#8556)
1 parent d8cb0b5

4 files changed: +58 -17 lines changed

airflow/providers/google/cloud/example_dags/example_gcs_to_bq.py renamed to airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py

Lines changed: 24 additions & 13 deletions
@@ -15,45 +15,56 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 """
 Example DAG using GCSToBigQueryOperator.
 """
+
+import os
+
 from airflow import models
-from airflow.operators.bash import BashOperator
+from airflow.providers.google.cloud.operators.bigquery import (
+    BigQueryCreateEmptyDatasetOperator, BigQueryDeleteDatasetOperator,
+)
 from airflow.providers.google.cloud.operators.gcs_to_bigquery import GCSToBigQueryOperator
 from airflow.utils.dates import days_ago
 
+DATASET_NAME = os.environ.get("GCP_DATASET_NAME", 'airflow_test')
+TABLE_NAME = os.environ.get("GCP_TABLE_NAME", 'gcs_to_bq_table')
+
 args = {
-    'owner': 'airflow',
     'start_date': days_ago(2)
 }
 
 dag = models.DAG(
-    dag_id='example_gcs_to_bq_operator', default_args=args,
+    dag_id='example_gcs_to_bigquery_operator', default_args=args,
     schedule_interval=None, tags=['example'])
 
-create_test_dataset = BashOperator(
+create_test_dataset = BigQueryCreateEmptyDatasetOperator(
     task_id='create_airflow_test_dataset',
-    bash_command='bq mk airflow_test',
-    dag=dag)
+    dataset_id=DATASET_NAME,
+    dag=dag
+)
 
-# [START howto_operator_gcs_to_bq]
+# [START howto_operator_gcs_to_bigquery]
 load_csv = GCSToBigQueryOperator(
-    task_id='gcs_to_bq_example',
+    task_id='gcs_to_bigquery_example',
     bucket='cloud-samples-data',
     source_objects=['bigquery/us-states/us-states.csv'],
-    destination_project_dataset_table='airflow_test.gcs_to_bq_table',
+    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
     schema_fields=[
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE'},
     ],
     write_disposition='WRITE_TRUNCATE',
     dag=dag)
-# [END howto_operator_gcs_to_bq]
+# [END howto_operator_gcs_to_bigquery]
 
-delete_test_dataset = BashOperator(
+delete_test_dataset = BigQueryDeleteDatasetOperator(
     task_id='delete_airflow_test_dataset',
-    bash_command='bq rm -rf airflow_test',
-    dag=dag)
+    dataset_id=DATASET_NAME,
+    delete_contents=True,
+    dag=dag
+)
 
 create_test_dataset >> load_csv >> delete_test_dataset
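
The switch from hard-coded bq shell commands to BigQueryCreateEmptyDatasetOperator/BigQueryDeleteDatasetOperator plus environment-variable defaults is what makes this DAG reusable as a system test: CI can point it at a scratch dataset without editing the file. A minimal sketch of that parse-time lookup (the variable names GCP_DATASET_NAME and GCP_TABLE_NAME come from the diff above; the override values are hypothetical):

import os

# Hypothetical overrides: export these before Airflow parses the DAG file
# and the example loads into your scratch dataset instead of the defaults.
os.environ["GCP_DATASET_NAME"] = "ci_scratch_dataset"
os.environ["GCP_TABLE_NAME"] = "us_states"

# The same lookup the example DAG performs at parse time.
DATASET_NAME = os.environ.get("GCP_DATASET_NAME", "airflow_test")
TABLE_NAME = os.environ.get("GCP_TABLE_NAME", "gcs_to_bq_table")

print(f"destination: {DATASET_NAME}.{TABLE_NAME}")  # ci_scratch_dataset.us_states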

docs/howto/operator/gcp/gcs.rst

Lines changed: 3 additions & 3 deletions
@@ -38,10 +38,10 @@ Use the
 :class:`~airflow.providers.google.cloud.operators.gcs_to_bigquery.GCSToBigQueryOperator`
 to execute a BigQuery load job.
 
-.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_bq.py
+.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_gcs_to_bigquery.py
     :language: python
-    :start-after: [START howto_operator_gcs_to_bq]
-    :end-before: [END howto_operator_gcs_to_bq]
+    :start-after: [START howto_operator_gcs_to_bigquery]
+    :end-before: [END howto_operator_gcs_to_bigquery]
 
 .. _howto/operator:GCSBucketCreateAclEntryOperator:
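
The docs line being edited here says the operator is used "to execute a BigQuery load job". For context, a minimal standalone sketch of an equivalent load using the google-cloud-bigquery client; the project ID is a placeholder, and the source URI, schema, and write disposition mirror the example DAG above:

from google.cloud import bigquery

client = bigquery.Client(project="my-gcp-project")  # placeholder project

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
        bigquery.SchemaField("post_abbr", "STRING", mode="NULLABLE"),
    ],
    source_format=bigquery.SourceFormat.CSV,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

# Same source object the example DAG loads.
load_job = client.load_table_from_uri(
    "gs://cloud-samples-data/bigquery/us-states/us-states.csv",
    "airflow_test.gcs_to_bq_table",
    job_config=job_config,
)
load_job.result()  # block until the load job completes

The operator layers connection handling, templating, and retries on top of such a load job; the sketch only shows the shape of the underlying request.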

Lines changed: 31 additions & 0 deletions (new file)
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.providers.google.cloud.utils.gcp_authenticator import GCP_BIGQUERY_KEY
+from tests.test_utils.gcp_system_helpers import CLOUD_DAG_FOLDER, GoogleSystemTest, provide_gcp_context
+
+
+@pytest.mark.backend("mysql", "postgres")
+@pytest.mark.credential_file(GCP_BIGQUERY_KEY)
+class TestGoogleCloudStorageToBigQueryExample(GoogleSystemTest):
+
+    @provide_gcp_context(GCP_BIGQUERY_KEY)
+    def test_run_example_dag_gcs_to_bigquery_operator(self):
+        self.run_dag('example_gcs_to_bigquery_operator', CLOUD_DAG_FOLDER)
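
GoogleSystemTest.run_dag does the heavy lifting in this test; its real implementation lives in tests/test_utils/gcp_system_helpers.py and is not part of this diff. As a rough sketch of what such a helper plausibly does (an assumption about its shape, not the actual code):

from airflow.models import DagBag

def run_dag(dag_id: str, dag_folder: str) -> None:
    # Parse only the given folder, skipping Airflow's bundled examples.
    dag_bag = DagBag(dag_folder=dag_folder, include_examples=False)
    dag = dag_bag.get_dag(dag_id)
    assert dag is not None, f"DAG {dag_id} not found in {dag_folder}"
    dag.clear()  # drop task instances left over from earlier runs
    dag.run()    # execute the DAG end to end; raises if a task fails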

tests/test_project_structure.py

Lines changed: 0 additions & 1 deletion
@@ -130,7 +130,6 @@ def test_keep_missing_test_files_update(self):
 class TestGoogleProviderProjectStructure(unittest.TestCase):
     MISSING_EXAMPLE_DAGS = {
         ('cloud', 'text_to_speech'),
-        ('cloud', 'gcs_to_bigquery'),
         ('cloud', 'adls_to_gcs'),
         ('cloud', 'sql_to_gcs'),
         ('cloud', 's3_to_gcs'),
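
Removing ('cloud', 'gcs_to_bigquery') matters because MISSING_EXAMPLE_DAGS is an exemption list: the project-structure test fails for any operator module lacking a matching example DAG unless the module is listed. A rough sketch of that kind of check, with hypothetical names (the real test's logic is not shown in this diff):

import os

# Hypothetical re-implementation of the structure check.
EXAMPLE_DAG_DIR = "airflow/providers/google/cloud/example_dags"
MISSING_EXAMPLE_DAGS = {("cloud", "text_to_speech"), ("cloud", "adls_to_gcs")}

def assert_example_dag_exists(provider: str, module: str) -> None:
    if (provider, module) in MISSING_EXAMPLE_DAGS:
        return  # explicitly exempted, no example DAG required yet
    path = os.path.join(EXAMPLE_DAG_DIR, f"example_{module}.py")
    assert os.path.exists(path), f"missing example DAG: {path}"

# Run from an Airflow checkout: enforced now that the example DAG exists.
assert_example_dag_exists("cloud", "gcs_to_bigquery")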
