Skip to content

Commit 17e60b0

Browse files
authored
fix: Use prefixes instead of all file paths for OpenLineage datasets in GCSDeleteObjectsOperator (#39059)
Signed-off-by: Kacper Muda <mudakacper@gmail.com>
1 parent 8890470 commit 17e60b0

File tree

2 files changed

+58
-37
lines changed
  • airflow/providers/google/cloud/operators
  • tests/providers/google/cloud/operators

2 files changed

+58
-37
lines changed

airflow/providers/google/cloud/operators/gcs.py

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ def __init__(
297297
*,
298298
bucket_name: str,
299299
objects: list[str] | None = None,
300-
prefix: str | None = None,
300+
prefix: str | list[str] | None = None,
301301
gcp_conn_id: str = "google_cloud_default",
302302
impersonation_chain: str | Sequence[str] | None = None,
303303
**kwargs,
@@ -309,12 +309,14 @@ def __init__(
309309
self.impersonation_chain = impersonation_chain
310310

311311
if objects is None and prefix is None:
312-
err_message = "(Task {task_id}) Either object or prefix should be set. Both are None.".format(
312+
err_message = "(Task {task_id}) Either objects or prefix should be set. Both are None.".format(
313313
**kwargs
314314
)
315315
raise ValueError(err_message)
316+
if objects is not None and prefix is not None:
317+
err_message = "(Task {task_id}) Objects or prefix should be set. Both provided.".format(**kwargs)
318+
raise ValueError(err_message)
316319

317-
self._objects: list[str] = []
318320
super().__init__(**kwargs)
319321

320322
def execute(self, context: Context) -> None:
@@ -324,15 +326,14 @@ def execute(self, context: Context) -> None:
324326
)
325327

326328
if self.objects is not None:
327-
self._objects = self.objects
329+
objects = self.objects
328330
else:
329-
self._objects = hook.list(bucket_name=self.bucket_name, prefix=self.prefix)
330-
self.log.info("Deleting %s objects from %s", len(self._objects), self.bucket_name)
331-
for object_name in self._objects:
331+
objects = hook.list(bucket_name=self.bucket_name, prefix=self.prefix)
332+
self.log.info("Deleting %s objects from %s", len(objects), self.bucket_name)
333+
for object_name in objects:
332334
hook.delete(bucket_name=self.bucket_name, object_name=object_name)
333335

334-
def get_openlineage_facets_on_complete(self, task_instance):
335-
"""Implement on_complete as execute() resolves object names."""
336+
def get_openlineage_facets_on_start(self):
336337
from openlineage.client.facet import (
337338
LifecycleStateChange,
338339
LifecycleStateChangeDatasetFacet,
@@ -342,8 +343,17 @@ def get_openlineage_facets_on_complete(self, task_instance):
342343

343344
from airflow.providers.openlineage.extractors import OperatorLineage
344345

345-
if not self._objects:
346-
return OperatorLineage()
346+
objects = []
347+
if self.objects is not None:
348+
objects = self.objects
349+
elif self.prefix is not None:
350+
prefixes = [self.prefix] if isinstance(self.prefix, str) else self.prefix
351+
for pref in prefixes:
352+
# Use parent if not a file (dot not in name) and not a dir (ends with slash)
353+
if "." not in pref.split("/")[-1] and not pref.endswith("/"):
354+
pref = Path(pref).parent.as_posix()
355+
pref = "/" if pref in (".", "", "/") else pref.rstrip("/")
356+
objects.append(pref)
347357

348358
bucket_url = f"gs://{self.bucket_name}"
349359
input_datasets = [
@@ -360,7 +370,7 @@ def get_openlineage_facets_on_complete(self, task_instance):
360370
)
361371
},
362372
)
363-
for object_name in self._objects
373+
for object_name in objects
364374
]
365375

366376
return OperatorLineage(inputs=input_datasets)

tests/providers/google/cloud/operators/test_gcs.py

Lines changed: 36 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -172,48 +172,59 @@ def test_delete_prefix_as_empty_string(self, mock_hook):
172172
any_order=True,
173173
)
174174

175-
@mock.patch("airflow.providers.google.cloud.operators.gcs.GCSHook")
176-
def test_get_openlineage_facets_on_complete(self, mock_hook):
175+
@pytest.mark.parametrize(
176+
("objects", "prefix", "inputs"),
177+
(
178+
(["folder/a.txt", "b.json"], None, ["folder/a.txt", "b.json"]),
179+
(["folder/a.txt", "folder/b.json"], None, ["folder/a.txt", "folder/b.json"]),
180+
(None, ["folder/a.txt", "b.json"], ["folder/a.txt", "b.json"]),
181+
(None, "dir/pre", ["dir"]),
182+
(None, ["dir/"], ["dir"]),
183+
(None, "", ["/"]),
184+
(None, "/", ["/"]),
185+
(None, "pre", ["/"]),
186+
(None, "dir/pre*", ["dir"]),
187+
(None, "*", ["/"]),
188+
),
189+
ids=(
190+
"objects",
191+
"multiple objects in the same dir",
192+
"objects as prefixes",
193+
"directory with prefix",
194+
"directory",
195+
"empty prefix",
196+
"slash as prefix",
197+
"prefix with no ending slash",
198+
"directory with prefix with wildcard",
199+
"just wildcard",
200+
),
201+
)
202+
def test_get_openlineage_facets_on_start(self, objects, prefix, inputs):
177203
bucket_url = f"gs://{TEST_BUCKET}"
178204
expected_inputs = [
179205
Dataset(
180206
namespace=bucket_url,
181-
name="folder/a.txt",
207+
name=name,
182208
facets={
183209
"lifecycleStateChange": LifecycleStateChangeDatasetFacet(
184210
lifecycleStateChange=LifecycleStateChange.DROP.value,
185211
previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
186212
namespace=bucket_url,
187-
name="folder/a.txt",
213+
name=name,
188214
),
189215
)
190216
},
191-
),
192-
Dataset(
193-
namespace=bucket_url,
194-
name="b.txt",
195-
facets={
196-
"lifecycleStateChange": LifecycleStateChangeDatasetFacet(
197-
lifecycleStateChange=LifecycleStateChange.DROP.value,
198-
previousIdentifier=LifecycleStateChangeDatasetFacetPreviousIdentifier(
199-
namespace=bucket_url,
200-
name="b.txt",
201-
),
202-
)
203-
},
204-
),
217+
)
218+
for name in inputs
205219
]
206220

207221
operator = GCSDeleteObjectsOperator(
208-
task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=["folder/a.txt", "b.txt"]
222+
task_id=TASK_ID, bucket_name=TEST_BUCKET, objects=objects, prefix=prefix
209223
)
210-
211-
operator.execute(None)
212-
213-
lineage = operator.get_openlineage_facets_on_complete(None)
214-
assert len(lineage.inputs) == 2
224+
lineage = operator.get_openlineage_facets_on_start()
225+
assert len(lineage.inputs) == len(inputs)
215226
assert len(lineage.outputs) == 0
216-
assert lineage.inputs == expected_inputs
227+
assert sorted(lineage.inputs) == sorted(expected_inputs)
217228

218229

219230
class TestGoogleCloudStorageListOperator:

0 commit comments

Comments
 (0)