Skip to content

Commit 123b656

Browse files
authored
Fix GCSSynchronizeBucketsOperator timeout error (#37237)
Update comment to be more clear
1 parent 5ad1e78 commit 123b656

File tree

2 files changed

+27
-25
lines changed
  • airflow/providers/google/cloud/hooks
  • tests/providers/google/cloud/hooks

2 files changed

+27
-25
lines changed

airflow/providers/google/cloud/hooks/gcs.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1213,15 +1213,19 @@ def sync(
12131213
:return: none
12141214
"""
12151215
client = self.get_conn()
1216+
12161217
# Create bucket object
12171218
source_bucket_obj = client.bucket(source_bucket)
12181219
destination_bucket_obj = client.bucket(destination_bucket)
1220+
12191221
# Normalize parameters when they are passed
12201222
source_object = normalize_directory_path(source_object)
12211223
destination_object = normalize_directory_path(destination_object)
1224+
12221225
# Calculate the number of characters that remove from the name, because they contain information
12231226
# about the parent's path
12241227
source_object_prefix_len = len(source_object) if source_object else 0
1228+
12251229
# Prepare synchronization plan
12261230
to_copy_blobs, to_delete_blobs, to_rewrite_blobs = self._prepare_sync_plan(
12271231
source_bucket=source_bucket_obj,
@@ -1246,13 +1250,14 @@ def sync(
12461250
dst_object = self._calculate_sync_destination_path(
12471251
blob, destination_object, source_object_prefix_len
12481252
)
1249-
self.copy(
1253+
self.rewrite(
12501254
source_bucket=source_bucket_obj.name,
12511255
source_object=blob.name,
12521256
destination_bucket=destination_bucket_obj.name,
12531257
destination_object=dst_object,
12541258
)
12551259
self.log.info("Blobs copied.")
1260+
12561261
# Delete redundant files
12571262
if not to_delete_blobs:
12581263
self.log.info("Skipped blobs deleting.")
@@ -1297,37 +1302,46 @@ def _prepare_sync_plan(
12971302
destination_object: str | None,
12981303
recursive: bool,
12991304
) -> tuple[set[storage.Blob], set[storage.Blob], set[storage.Blob]]:
1300-
# Calculate the number of characters that remove from the name, because they contain information
1305+
# Calculate the number of characters that are removed from the name, because they contain information
13011306
# about the parent's path
13021307
source_object_prefix_len = len(source_object) if source_object else 0
13031308
destination_object_prefix_len = len(destination_object) if destination_object else 0
13041309
delimiter = "/" if not recursive else None
1310+
13051311
# Fetch blobs list
13061312
source_blobs = list(source_bucket.list_blobs(prefix=source_object, delimiter=delimiter))
13071313
destination_blobs = list(
13081314
destination_bucket.list_blobs(prefix=destination_object, delimiter=delimiter)
13091315
)
1316+
13101317
# Create indexes that allow you to identify blobs based on their name
13111318
source_names_index = {a.name[source_object_prefix_len:]: a for a in source_blobs}
13121319
destination_names_index = {a.name[destination_object_prefix_len:]: a for a in destination_blobs}
1320+
13131321
# Create sets with names without parent object name
13141322
source_names = set(source_names_index.keys())
1323+
# Discards empty string from source set that creates an empty subdirectory in
1324+
# destination bucket with source subdirectory name
1325+
source_names.discard("")
13151326
destination_names = set(destination_names_index.keys())
1327+
13161328
# Determine objects to copy and delete
13171329
to_copy = source_names - destination_names
13181330
to_delete = destination_names - source_names
13191331
to_copy_blobs: set[storage.Blob] = {source_names_index[a] for a in to_copy}
13201332
to_delete_blobs: set[storage.Blob] = {destination_names_index[a] for a in to_delete}
1333+
13211334
# Find names that are in both buckets
13221335
names_to_check = source_names.intersection(destination_names)
13231336
to_rewrite_blobs: set[storage.Blob] = set()
13241337
# Compare objects based on crc32
13251338
for current_name in names_to_check:
13261339
source_blob = source_names_index[current_name]
13271340
destination_blob = destination_names_index[current_name]
1328-
# if the objects are different, save it
1341+
# If the objects are different, save it
13291342
if source_blob.crc32c != destination_blob.crc32c:
13301343
to_rewrite_blobs.add(source_blob)
1344+
13311345
return to_copy_blobs, to_delete_blobs, to_rewrite_blobs
13321346

13331347

tests/providers/google/cloud/hooks/test_gcs.py

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1081,7 +1081,6 @@ def setup_method(self):
10811081
def test_should_do_nothing_when_buckets_is_empty(
10821082
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
10831083
):
1084-
# mock_get_conn.return_value =
10851084
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
10861085
source_bucket.list_blobs.return_value = []
10871086
destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1104,7 +1103,6 @@ def test_should_do_nothing_when_buckets_is_empty(
11041103
def test_should_append_slash_to_object_if_missing(
11051104
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
11061105
):
1107-
# mock_get_conn.return_value =
11081106
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
11091107
source_bucket.list_blobs.return_value = []
11101108
destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1124,7 +1122,6 @@ def test_should_append_slash_to_object_if_missing(
11241122
@mock.patch(GCS_STRING.format("GCSHook.delete"))
11251123
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
11261124
def test_should_copy_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
1127-
# mock_get_conn.return_value =
11281125
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
11291126
source_bucket.list_blobs.return_value = [
11301127
self._create_blob("FILE_A", "C1"),
@@ -1135,31 +1132,30 @@ def test_should_copy_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_
11351132
mock_get_conn.return_value.bucket.side_effect = [source_bucket, destination_bucket]
11361133
self.gcs_hook.sync(source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET")
11371134
mock_delete.assert_not_called()
1138-
mock_rewrite.assert_not_called()
1139-
mock_copy.assert_has_calls(
1135+
mock_rewrite.assert_has_calls(
11401136
[
11411137
mock.call(
1142-
destination_bucket="DEST_BUCKET",
1143-
destination_object="FILE_A",
11441138
source_bucket="SOURCE_BUCKET",
11451139
source_object="FILE_A",
1140+
destination_bucket="DEST_BUCKET",
1141+
destination_object="FILE_A",
11461142
),
11471143
mock.call(
1148-
destination_bucket="DEST_BUCKET",
1149-
destination_object="FILE_B",
11501144
source_bucket="SOURCE_BUCKET",
11511145
source_object="FILE_B",
1146+
destination_bucket="DEST_BUCKET",
1147+
destination_object="FILE_B",
11521148
),
11531149
],
11541150
any_order=True,
11551151
)
1152+
mock_copy.assert_not_called()
11561153

11571154
@mock.patch(GCS_STRING.format("GCSHook.copy"))
11581155
@mock.patch(GCS_STRING.format("GCSHook.rewrite"))
11591156
@mock.patch(GCS_STRING.format("GCSHook.delete"))
11601157
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
11611158
def test_should_copy_files_non_recursive(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
1162-
# mock_get_conn.return_value =
11631159
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
11641160
source_bucket.list_blobs.return_value = [
11651161
self._create_blob("FILE_A", "C1"),
@@ -1177,7 +1173,6 @@ def test_should_copy_files_non_recursive(self, mock_get_conn, mock_delete, mock_
11771173
@mock.patch(GCS_STRING.format("GCSHook.delete"))
11781174
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
11791175
def test_should_copy_files_to_subdirectory(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
1180-
# mock_get_conn.return_value =
11811176
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
11821177
source_bucket.list_blobs.return_value = [
11831178
self._create_blob("FILE_A", "C1"),
@@ -1190,8 +1185,7 @@ def test_should_copy_files_to_subdirectory(self, mock_get_conn, mock_delete, moc
11901185
source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET", destination_object="DEST_OBJ/"
11911186
)
11921187
mock_delete.assert_not_called()
1193-
mock_rewrite.assert_not_called()
1194-
mock_copy.assert_has_calls(
1188+
mock_rewrite.assert_has_calls(
11951189
[
11961190
mock.call(
11971191
source_bucket="SOURCE_BUCKET",
@@ -1208,13 +1202,13 @@ def test_should_copy_files_to_subdirectory(self, mock_get_conn, mock_delete, moc
12081202
],
12091203
any_order=True,
12101204
)
1205+
mock_copy.assert_not_called()
12111206

12121207
@mock.patch(GCS_STRING.format("GCSHook.copy"))
12131208
@mock.patch(GCS_STRING.format("GCSHook.rewrite"))
12141209
@mock.patch(GCS_STRING.format("GCSHook.delete"))
12151210
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
12161211
def test_should_copy_files_from_subdirectory(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
1217-
# mock_get_conn.return_value =
12181212
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
12191213
source_bucket.list_blobs.return_value = [
12201214
self._create_blob("SRC_OBJ/FILE_A", "C1"),
@@ -1227,8 +1221,7 @@ def test_should_copy_files_from_subdirectory(self, mock_get_conn, mock_delete, m
12271221
source_bucket="SOURCE_BUCKET", destination_bucket="DEST_BUCKET", source_object="SRC_OBJ/"
12281222
)
12291223
mock_delete.assert_not_called()
1230-
mock_rewrite.assert_not_called()
1231-
mock_copy.assert_has_calls(
1224+
mock_rewrite.assert_has_calls(
12321225
[
12331226
mock.call(
12341227
source_bucket="SOURCE_BUCKET",
@@ -1245,13 +1238,13 @@ def test_should_copy_files_from_subdirectory(self, mock_get_conn, mock_delete, m
12451238
],
12461239
any_order=True,
12471240
)
1241+
mock_copy.assert_not_called()
12481242

12491243
@mock.patch(GCS_STRING.format("GCSHook.copy"))
12501244
@mock.patch(GCS_STRING.format("GCSHook.rewrite"))
12511245
@mock.patch(GCS_STRING.format("GCSHook.delete"))
12521246
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
12531247
def test_should_overwrite_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
1254-
# mock_get_conn.return_value =
12551248
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
12561249
source_bucket.list_blobs.return_value = [
12571250
self._create_blob("FILE_A", "C1"),
@@ -1293,7 +1286,6 @@ def test_should_overwrite_files(self, mock_get_conn, mock_delete, mock_rewrite,
12931286
def test_should_overwrite_files_to_subdirectory(
12941287
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
12951288
):
1296-
# mock_get_conn.return_value =
12971289
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
12981290
source_bucket.list_blobs.return_value = [
12991291
self._create_blob("FILE_A", "C1"),
@@ -1338,7 +1330,6 @@ def test_should_overwrite_files_to_subdirectory(
13381330
def test_should_overwrite_files_from_subdirectory(
13391331
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
13401332
):
1341-
# mock_get_conn.return_value =
13421333
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
13431334
source_bucket.list_blobs.return_value = [
13441335
self._create_blob("SRC_OBJ/FILE_A", "C1"),
@@ -1381,7 +1372,6 @@ def test_should_overwrite_files_from_subdirectory(
13811372
@mock.patch(GCS_STRING.format("GCSHook.delete"))
13821373
@mock.patch(GCS_STRING.format("GCSHook.get_conn"))
13831374
def test_should_delete_extra_files(self, mock_get_conn, mock_delete, mock_rewrite, mock_copy):
1384-
# mock_get_conn.return_value =
13851375
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
13861376
source_bucket.list_blobs.return_value = []
13871377
destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1407,7 +1397,6 @@ def test_should_delete_extra_files(self, mock_get_conn, mock_delete, mock_rewrit
14071397
def test_should_not_delete_extra_files_when_delete_extra_files_is_disabled(
14081398
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
14091399
):
1410-
# mock_get_conn.return_value =
14111400
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
14121401
source_bucket.list_blobs.return_value = []
14131402
destination_bucket = self._create_bucket(name="DEST_BUCKET")
@@ -1430,7 +1419,6 @@ def test_should_not_delete_extra_files_when_delete_extra_files_is_disabled(
14301419
def test_should_not_overwrite_when_overwrite_is_disabled(
14311420
self, mock_get_conn, mock_delete, mock_rewrite, mock_copy
14321421
):
1433-
# mock_get_conn.return_value =
14341422
source_bucket = self._create_bucket(name="SOURCE_BUCKET")
14351423
source_bucket.list_blobs.return_value = [
14361424
self._create_blob("SRC_OBJ/FILE_A", "C1", source_bucket),

0 commit comments

Comments
 (0)