4343from requests import Session
4444
4545from airflow .exceptions import AirflowException , AirflowProviderDeprecationWarning
46+ from airflow .providers .common .compat .lineage .hook import get_hook_lineage_collector
4647from airflow .providers .google .cloud .utils .helpers import normalize_directory_path
4748from airflow .providers .google .common .consts import CLIENT_INFO
4849from airflow .providers .google .common .hooks .base_google import (
@@ -214,6 +215,16 @@ def copy(
214215 destination_object = source_bucket .copy_blob ( # type: ignore[attr-defined]
215216 blob = source_object , destination_bucket = destination_bucket , new_name = destination_object
216217 )
218+ get_hook_lineage_collector ().add_input_asset (
219+ context = self ,
220+ scheme = "gs" ,
221+ asset_kwargs = {"bucket" : source_bucket .name , "key" : source_object .name }, # type: ignore[attr-defined]
222+ )
223+ get_hook_lineage_collector ().add_output_asset (
224+ context = self ,
225+ scheme = "gs" ,
226+ asset_kwargs = {"bucket" : destination_bucket .name , "key" : destination_object .name }, # type: ignore[union-attr]
227+ )
217228
218229 self .log .info (
219230 "Object %s in bucket %s copied to object %s in bucket %s" ,
@@ -267,6 +278,16 @@ def rewrite(
267278 ).rewrite (source = source_object , token = token )
268279
269280 self .log .info ("Total Bytes: %s | Bytes Written: %s" , total_bytes , bytes_rewritten )
281+ get_hook_lineage_collector ().add_input_asset (
282+ context = self ,
283+ scheme = "gs" ,
284+ asset_kwargs = {"bucket" : source_bucket .name , "key" : source_object .name }, # type: ignore[attr-defined]
285+ )
286+ get_hook_lineage_collector ().add_output_asset (
287+ context = self ,
288+ scheme = "gs" ,
289+ asset_kwargs = {"bucket" : destination_bucket .name , "key" : destination_object }, # type: ignore[attr-defined]
290+ )
270291 self .log .info (
271292 "Object %s in bucket %s rewritten to object %s in bucket %s" ,
272293 source_object .name , # type: ignore[attr-defined]
@@ -345,9 +366,18 @@ def download(
345366
346367 if filename :
347368 blob .download_to_filename (filename , timeout = timeout )
369+ get_hook_lineage_collector ().add_input_asset (
370+ context = self , scheme = "gs" , asset_kwargs = {"bucket" : bucket .name , "key" : blob .name }
371+ )
372+ get_hook_lineage_collector ().add_output_asset (
373+ context = self , scheme = "file" , asset_kwargs = {"path" : filename }
374+ )
348375 self .log .info ("File downloaded to %s" , filename )
349376 return filename
350377 else :
378+ get_hook_lineage_collector ().add_input_asset (
379+ context = self , scheme = "gs" , asset_kwargs = {"bucket" : bucket .name , "key" : blob .name }
380+ )
351381 return blob .download_as_bytes ()
352382
353383 except GoogleCloudError :
@@ -555,6 +585,9 @@ def _call_with_retry(f: Callable[[], None]) -> None:
555585 _call_with_retry (
556586 partial (blob .upload_from_filename , filename = filename , content_type = mime_type , timeout = timeout )
557587 )
588+ get_hook_lineage_collector ().add_input_asset (
589+ context = self , scheme = "file" , asset_kwargs = {"path" : filename }
590+ )
558591
559592 if gzip :
560593 os .remove (filename )
@@ -576,6 +609,10 @@ def _call_with_retry(f: Callable[[], None]) -> None:
576609 else :
577610 raise ValueError ("'filename' and 'data' parameter missing. One is required to upload to gcs." )
578611
612+ get_hook_lineage_collector ().add_output_asset (
613+ context = self , scheme = "gs" , asset_kwargs = {"bucket" : bucket .name , "key" : blob .name }
614+ )
615+
579616 def exists (self , bucket_name : str , object_name : str , retry : Retry = DEFAULT_RETRY ) -> bool :
580617 """
581618 Check for the existence of a file in Google Cloud Storage.
@@ -691,6 +728,9 @@ def delete(self, bucket_name: str, object_name: str) -> None:
691728 bucket = client .bucket (bucket_name )
692729 blob = bucket .blob (blob_name = object_name )
693730 blob .delete ()
731+ get_hook_lineage_collector ().add_input_asset (
732+ context = self , scheme = "gs" , asset_kwargs = {"bucket" : bucket .name , "key" : blob .name }
733+ )
694734
695735 self .log .info ("Blob %s deleted." , object_name )
696736
@@ -1198,9 +1238,17 @@ def compose(self, bucket_name: str, source_objects: List[str], destination_objec
11981238 client = self .get_conn ()
11991239 bucket = client .bucket (bucket_name )
12001240 destination_blob = bucket .blob (destination_object )
1201- destination_blob .compose (
1202- sources = [bucket .blob (blob_name = source_object ) for source_object in source_objects ]
1241+ source_blobs = [bucket .blob (blob_name = source_object ) for source_object in source_objects ]
1242+ destination_blob .compose (sources = source_blobs )
1243+ get_hook_lineage_collector ().add_output_asset (
1244+ context = self , scheme = "gs" , asset_kwargs = {"bucket" : bucket .name , "key" : destination_blob .name }
12031245 )
1246+ for single_source_blob in source_blobs :
1247+ get_hook_lineage_collector ().add_input_asset (
1248+ context = self ,
1249+ scheme = "gs" ,
1250+ asset_kwargs = {"bucket" : bucket .name , "key" : single_source_blob .name },
1251+ )
12041252
12051253 self .log .info ("Completed successfully." )
12061254
0 commit comments