Skip to content

Commit 4dc2c40

Browse files
authored
Fix GCSToGoogleDriveOperator and gdrive system tests (#34545)
1 parent b7f532a commit 4dc2c40

File tree

6 files changed

+281
-48
lines changed

6 files changed

+281
-48
lines changed

airflow/providers/google/suite/transfers/gcs_to_gdrive.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ def __init__(
9595
source_bucket: str,
9696
source_object: str,
9797
destination_object: str | None = None,
98-
destination_folder_id: str | None = None,
98+
destination_folder_id: str = "root",
9999
move_object: bool = False,
100100
gcp_conn_id: str = "google_cloud_default",
101101
impersonation_chain: str | Sequence[str] | None = None,

tests/providers/google/suite/transfers/test_gcs_to_gdrive.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def test_should_copy_single_file(self, mock_named_temporary_file, mock_gdrive, m
6666
mock.call().upload_file(
6767
local_location="TMP1",
6868
remote_location="copied_sales/2017/january-backup.avro",
69-
folder_id=None,
69+
folder_id="root",
7070
),
7171
]
7272
)
@@ -156,13 +156,13 @@ def test_should_copy_files(self, mock_named_temporary_file, mock_gdrive, mock_gc
156156
impersonation_chain=IMPERSONATION_CHAIN,
157157
),
158158
mock.call().upload_file(
159-
local_location="TMP1", remote_location="sales/A.avro", folder_id=None
159+
local_location="TMP1", remote_location="sales/A.avro", folder_id="root"
160160
),
161161
mock.call().upload_file(
162-
local_location="TMP2", remote_location="sales/B.avro", folder_id=None
162+
local_location="TMP2", remote_location="sales/B.avro", folder_id="root"
163163
),
164164
mock.call().upload_file(
165-
local_location="TMP3", remote_location="sales/C.avro", folder_id=None
165+
local_location="TMP3", remote_location="sales/C.avro", folder_id="root"
166166
),
167167
]
168168
)
@@ -210,13 +210,13 @@ def test_should_move_files(self, mock_named_temporary_file, mock_gdrive, mock_gc
210210
impersonation_chain=IMPERSONATION_CHAIN,
211211
),
212212
mock.call().upload_file(
213-
local_location="TMP1", remote_location="sales/A.avro", folder_id=None
213+
local_location="TMP1", remote_location="sales/A.avro", folder_id="root"
214214
),
215215
mock.call().upload_file(
216-
local_location="TMP2", remote_location="sales/B.avro", folder_id=None
216+
local_location="TMP2", remote_location="sales/B.avro", folder_id="root"
217217
),
218218
mock.call().upload_file(
219-
local_location="TMP3", remote_location="sales/C.avro", folder_id=None
219+
local_location="TMP3", remote_location="sales/C.avro", folder_id="root"
220220
),
221221
]
222222
)

tests/system/providers/google/cloud/gcs/example_gcs_to_gdrive.py

Lines changed: 78 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,109 +23,172 @@
2323
"""
2424
from __future__ import annotations
2525

26+
import json
27+
import logging
2628
import os
2729
from datetime import datetime
2830
from pathlib import Path
2931

32+
from airflow.decorators import task
33+
from airflow.models import Connection
3034
from airflow.models.dag import DAG
35+
from airflow.operators.bash import BashOperator
3136
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
3237
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
38+
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
3339
from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
40+
from airflow.settings import Session
3441
from airflow.utils.trigger_rule import TriggerRule
3542

3643
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
3744
PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
38-
FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", "abcd1234")
45+
FOLDER_ID = os.environ.get("GCP_GDRIVE_FOLDER_ID", None)
3946

4047
DAG_ID = "example_gcs_to_gdrive"
4148

4249
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
50+
CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
4351

4452
TMP_PATH = "tmp"
45-
53+
WORK_DIR = f"folder_{DAG_ID}_{ENV_ID}".replace("-", "_")
4654
CURRENT_FOLDER = Path(__file__).parent
4755
LOCAL_PATH = str(Path(CURRENT_FOLDER) / "resources")
48-
4956
FILE_LOCAL_PATH = str(Path(LOCAL_PATH))
5057
FILE_NAME = "example_upload.txt"
5158

59+
log = logging.getLogger(__name__)
60+
5261

5362
with DAG(
5463
DAG_ID,
5564
schedule="@once",
5665
start_date=datetime(2021, 1, 1),
5766
catchup=False,
58-
tags=["example", "gcs"],
67+
tags=["example", "gcs", "gdrive"],
5968
) as dag:
69+
70+
@task
def create_temp_gcp_connection():
    """Create a temporary GCP connection with Drive + Cloud Platform scopes.

    If a connection with ``CONNECTION_ID`` already exists in the metadata DB,
    log a warning and leave it untouched.
    """
    conn = Connection(
        conn_id=CONNECTION_ID,
        conn_type="google_cloud_platform",
    )
    # The transfer needs both the Drive scope (to write files) and the
    # Cloud Platform scope (to read from GCS).
    conn_extra_json = json.dumps(
        {
            "scope": "https://www.googleapis.com/auth/drive,"
            "https://www.googleapis.com/auth/cloud-platform"
        }
    )
    conn.set_extra(conn_extra_json)

    session = Session()
    try:
        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
            log.warning("Connection %s already exists", CONNECTION_ID)
            return None
        session.add(conn)
        session.commit()
    finally:
        # Always release the DB session, even if the query or commit fails.
        session.close()

create_temp_gcp_connection_task = create_temp_gcp_connection()
92+
6093
create_bucket = GCSCreateBucketOperator(
6194
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
6295
)
6396

64-
upload_file = LocalFilesystemToGCSOperator(
65-
task_id="upload_file",
97+
upload_file_1 = LocalFilesystemToGCSOperator(
98+
task_id="upload_file_1",
6699
src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
67100
dst=f"{TMP_PATH}/{FILE_NAME}",
68101
bucket=BUCKET_NAME,
69102
)
70103

71104
upload_file_2 = LocalFilesystemToGCSOperator(
72-
task_id="upload_fil_2",
105+
task_id="upload_file_2",
73106
src=f"{FILE_LOCAL_PATH}/{FILE_NAME}",
74107
dst=f"{TMP_PATH}/2_{FILE_NAME}",
75108
bucket=BUCKET_NAME,
76109
)
77110
# [START howto_operator_gcs_to_gdrive_copy_single_file]
78111
copy_single_file = GCSToGoogleDriveOperator(
79112
task_id="copy_single_file",
113+
gcp_conn_id=CONNECTION_ID,
80114
source_bucket=BUCKET_NAME,
81115
source_object=f"{TMP_PATH}/{FILE_NAME}",
82-
destination_object=f"copied_tmp/copied_{FILE_NAME}",
116+
destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
83117
)
84118
# [END howto_operator_gcs_to_gdrive_copy_single_file]
85119

86120
# [START howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
87121
copy_single_file_into_folder = GCSToGoogleDriveOperator(
88122
task_id="copy_single_file_into_folder",
123+
gcp_conn_id=CONNECTION_ID,
89124
source_bucket=BUCKET_NAME,
90125
source_object=f"{TMP_PATH}/{FILE_NAME}",
91-
destination_object=f"copied_tmp/copied_{FILE_NAME}",
126+
destination_object=f"{WORK_DIR}/copied_{FILE_NAME}",
92127
destination_folder_id=FOLDER_ID,
93128
)
94129
# [END howto_operator_gcs_to_gdrive_copy_single_file_into_folder]
95130

96131
# [START howto_operator_gcs_to_gdrive_copy_files]
97132
copy_files = GCSToGoogleDriveOperator(
98133
task_id="copy_files",
134+
gcp_conn_id=CONNECTION_ID,
99135
source_bucket=BUCKET_NAME,
100136
source_object=f"{TMP_PATH}/*",
101-
destination_object="copied_tmp/",
137+
destination_object=f"{WORK_DIR}/",
102138
)
103139
# [END howto_operator_gcs_to_gdrive_copy_files]
104140

105141
# [START howto_operator_gcs_to_gdrive_move_files]
106142
move_files = GCSToGoogleDriveOperator(
107143
task_id="move_files",
144+
gcp_conn_id=CONNECTION_ID,
108145
source_bucket=BUCKET_NAME,
109146
source_object=f"{TMP_PATH}/*.txt",
147+
destination_object=f"{WORK_DIR}/",
110148
move_object=True,
111149
)
112150
# [END howto_operator_gcs_to_gdrive_move_files]
113151

152+
@task(trigger_rule=TriggerRule.ALL_DONE)
def remove_files_from_drive():
    """Teardown: delete the Drive folder(s) named ``WORK_DIR`` created by the test.

    Runs regardless of upstream task state (trigger rule ALL_DONE) so the
    Drive workspace is cleaned up even after a failed test body.
    """
    service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
    root_path = (
        service.files()
        .list(q=f"name = '{WORK_DIR}' and mimeType = 'application/vnd.google-apps.folder'")
        .execute()
    )
    if files := root_path["files"]:
        # Batch the delete requests so all matches are removed in one round trip.
        batch = service.new_batch_http_request()
        for file in files:
            # Use %-style lazy formatting: stdlib logging does not interpret
            # "{}" placeholders, so the previous "{}" printed literally.
            log.info("Preparing to remove file: %s", file)
            batch.add(service.files().delete(fileId=file["id"]))
        batch.execute()
    log.info("Selected files removed.")

remove_files_from_drive_task = remove_files_from_drive()
169+
114170
delete_bucket = GCSDeleteBucketOperator(
115171
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
116172
)
117173

174+
delete_temp_gcp_connection_task = BashOperator(
175+
task_id="delete_temp_gcp_connection",
176+
bash_command=f"airflow connections delete {CONNECTION_ID}",
177+
trigger_rule=TriggerRule.ALL_DONE,
178+
)
179+
180+
# TEST SETUP
181+
create_bucket >> [upload_file_1, upload_file_2]
118182
(
119-
# TEST SETUP
120-
create_bucket
121-
>> upload_file
122-
>> upload_file_2
183+
[upload_file_1, upload_file_2, create_temp_gcp_connection_task]
123184
# TEST BODY
124185
>> copy_single_file
186+
>> copy_single_file_into_folder
125187
>> copy_files
126188
>> move_files
127189
# TEST TEARDOWN
128-
>> delete_bucket
190+
>> remove_files_from_drive_task
191+
>> [delete_bucket, delete_temp_gcp_connection_task]
129192
)
130193

131194
from tests.system.utils.watcher import watcher

tests/system/providers/google/cloud/gcs/example_gdrive_to_gcs.py

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,23 @@
1717
# under the License.
1818
from __future__ import annotations
1919

20+
import json
21+
import logging
2022
import os
2123
from datetime import datetime
2224
from pathlib import Path
2325

26+
from airflow.decorators import task
27+
from airflow.models import Connection
2428
from airflow.models.dag import DAG
29+
from airflow.operators.bash import BashOperator
2530
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
2631
from airflow.providers.google.cloud.transfers.gdrive_to_gcs import GoogleDriveToGCSOperator
2732
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
33+
from airflow.providers.google.suite.hooks.drive import GoogleDriveHook
2834
from airflow.providers.google.suite.sensors.drive import GoogleDriveFileExistenceSensor
2935
from airflow.providers.google.suite.transfers.gcs_to_gdrive import GCSToGoogleDriveOperator
36+
from airflow.settings import Session
3037
from airflow.utils.trigger_rule import TriggerRule
3138

3239
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
@@ -35,20 +42,48 @@
3542
DAG_ID = "example_gdrive_to_gcs_with_gdrive_sensor"
3643

3744
BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
45+
CONNECTION_ID = f"connection_{DAG_ID}_{ENV_ID}"
3846

3947
OBJECT = "abc123xyz"
4048
FOLDER_ID = ""
4149
FILE_NAME = "example_upload.txt"
50+
DRIVE_FILE_NAME = f"example_upload_{DAG_ID}_{ENV_ID}.txt"
4251
LOCAL_PATH = str(Path(__file__).parent / "resources" / FILE_NAME)
4352

53+
log = logging.getLogger(__name__)
54+
55+
4456
with DAG(
4557
DAG_ID,
4658
schedule="@once",
4759
start_date=datetime(2021, 1, 1),
4860
catchup=False,
49-
tags=["example"],
61+
tags=["example", "gcs", "gdrive"],
5062
) as dag:
5163

64+
@task
def create_temp_gcp_connection():
    """Create a temporary GCP connection with Drive + Cloud Platform scopes.

    If a connection with ``CONNECTION_ID`` already exists in the metadata DB,
    log a warning and leave it untouched.
    """
    conn = Connection(
        conn_id=CONNECTION_ID,
        conn_type="google_cloud_platform",
    )
    # Both scopes are required: Drive for the sensor/transfer source,
    # Cloud Platform for the GCS destination.
    conn_extra_json = json.dumps(
        {
            "scope": "https://www.googleapis.com/auth/drive,"
            "https://www.googleapis.com/auth/cloud-platform"
        }
    )
    conn.set_extra(conn_extra_json)

    session = Session()
    try:
        if session.query(Connection).filter(Connection.conn_id == CONNECTION_ID).first():
            log.warning("Connection %s already exists", CONNECTION_ID)
            return None
        session.add(conn)
        session.commit()
    finally:
        # Always release the DB session, even if the query or commit fails.
        session.close()

create_temp_gcp_connection_task = create_temp_gcp_connection()
86+
5287
create_bucket = GCSCreateBucketOperator(
5388
task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID
5489
)
@@ -62,41 +97,62 @@
6297

6398
copy_single_file = GCSToGoogleDriveOperator(
6499
task_id="copy_single_file",
100+
gcp_conn_id=CONNECTION_ID,
65101
source_bucket=BUCKET_NAME,
66102
source_object=FILE_NAME,
67-
destination_object=FILE_NAME,
103+
destination_object=DRIVE_FILE_NAME,
68104
)
69105

70106
# [START detect_file]
71107
detect_file = GoogleDriveFileExistenceSensor(
72-
task_id="detect_file", folder_id=FOLDER_ID, file_name=FILE_NAME
108+
task_id="detect_file",
109+
folder_id=FOLDER_ID,
110+
file_name=DRIVE_FILE_NAME,
111+
gcp_conn_id=CONNECTION_ID,
73112
)
74113
# [END detect_file]
75114

76115
# [START upload_gdrive_to_gcs]
77116
upload_gdrive_to_gcs = GoogleDriveToGCSOperator(
78117
task_id="upload_gdrive_object_to_gcs",
118+
gcp_conn_id=CONNECTION_ID,
79119
folder_id=FOLDER_ID,
80-
file_name=FILE_NAME,
120+
file_name=DRIVE_FILE_NAME,
81121
bucket_name=BUCKET_NAME,
82122
object_name=OBJECT,
83123
)
84124
# [END upload_gdrive_to_gcs]
85125

126+
@task(trigger_rule=TriggerRule.ALL_DONE)
def remove_files_from_drive():
    """Teardown: delete the uploaded ``DRIVE_FILE_NAME`` from Google Drive.

    Runs regardless of upstream task state (trigger rule ALL_DONE) so the
    Drive file is cleaned up even after a failed test body.
    """
    service = GoogleDriveHook(gcp_conn_id=CONNECTION_ID).get_conn()
    response = service.files().list(q=f"name = '{DRIVE_FILE_NAME}'").execute()
    if files := response["files"]:
        file = files[0]
        # Use %-style lazy formatting: stdlib logging does not interpret
        # "{}" placeholders, so the previous "{}" printed literally.
        log.info("Deleting file %s...", file)
        # googleapiclient builds a lazy HttpRequest; without .execute() the
        # delete was never actually sent and the file was left behind.
        service.files().delete(fileId=file["id"]).execute()
        log.info("Done.")

remove_files_from_drive_task = remove_files_from_drive()
137+
86138
delete_bucket = GCSDeleteBucketOperator(
87139
task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE
88140
)
89141

142+
delete_temp_gcp_connection_task = BashOperator(
143+
task_id="delete_temp_gcp_connection",
144+
bash_command=f"airflow connections delete {CONNECTION_ID}",
145+
trigger_rule=TriggerRule.ALL_DONE,
146+
)
147+
90148
(
91-
# TEST SETUP
92-
create_bucket
93-
>> upload_file
94-
>> copy_single_file
149+
[create_bucket >> upload_file >> copy_single_file, create_temp_gcp_connection_task]
95150
# TEST BODY
96151
>> detect_file
97152
>> upload_gdrive_to_gcs
98153
# TEST TEARDOWN
99-
>> delete_bucket
154+
>> remove_files_from_drive_task
155+
>> [delete_bucket, delete_temp_gcp_connection_task]
100156
)
101157

102158
from tests.system.utils.watcher import watcher

0 commit comments

Comments
 (0)