Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/io/gcp/gcsfilesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,11 @@ def delete(self, paths):

for path in paths:
if path.endswith('/'):
path_to_use = path + '*'
# This is a directory. Remove all of its contents including
# objects and subdirectories.
# TODO: IMPLEMENT Statuses
self._gcsIO().delete_objects_and_directories(path)
return
else:
path_to_use = path
match_result = self.match([path_to_use])[0]
Expand Down
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/io/gcp/gcsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,21 @@ def delete(self, path):
except NotFound:
return

def delete_objects_and_directories(self, path):
"""
Deletes the objects and directories at the given GCS path.
Args:
path: GCS file path pattern in the form gs://<bucket>/<name>.
"""
bucket_name, dir_name = parse_gcs_path(path)
bucket = self.client.get_bucket(bucket_name)
blobs = bucket.list_blobs(versions=True)
for blob in blobs:
if blob.name.startswith(dir_name):
logging.info("Deleting blob: %s", blob.name)
print("Deleting blob: %s", blob.name)
blob.delete()

def delete_batch(self, paths):
"""Deletes the objects at the given GCS paths.

Expand Down