3131
3232from airflow .models import BaseOperator
3333from airflow .providers .google .cloud .hooks .workflows import WorkflowsHook
34+ from airflow .providers .google .cloud .links .workflows import (
35+ WorkflowsExecutionLink ,
36+ WorkflowsListOfWorkflowsLink ,
37+ WorkflowsWorkflowDetailsLink ,
38+ )
3439
3540if TYPE_CHECKING :
3641 from airflow .utils .context import Context
@@ -60,6 +65,7 @@ class WorkflowsCreateWorkflowOperator(BaseOperator):
6065
6166 template_fields : Sequence [str ] = ("location" , "workflow" , "workflow_id" )
6267 template_fields_renderers = {"workflow" : "json" }
68+ operator_extra_links = (WorkflowsWorkflowDetailsLink (),)
6369
6470 def __init__ (
6571 self ,
@@ -132,6 +138,15 @@ def execute(self, context: 'Context'):
132138 timeout = self .timeout ,
133139 metadata = self .metadata ,
134140 )
141+
142+ WorkflowsWorkflowDetailsLink .persist (
143+ context = context ,
144+ task_instance = self ,
145+ location_id = self .location ,
146+ workflow_id = self .workflow_id ,
147+ project_id = self .project_id or hook .project_id ,
148+ )
149+
135150 return Workflow .to_dict (workflow )
136151
137152
@@ -162,6 +177,7 @@ class WorkflowsUpdateWorkflowOperator(BaseOperator):
162177
163178 template_fields : Sequence [str ] = ("workflow_id" , "update_mask" )
164179 template_fields_renderers = {"update_mask" : "json" }
180+ operator_extra_links = (WorkflowsWorkflowDetailsLink (),)
165181
166182 def __init__ (
167183 self ,
@@ -209,6 +225,15 @@ def execute(self, context: 'Context'):
209225 metadata = self .metadata ,
210226 )
211227 workflow = operation .result ()
228+
229+ WorkflowsWorkflowDetailsLink .persist (
230+ context = context ,
231+ task_instance = self ,
232+ location_id = self .location ,
233+ workflow_id = self .workflow_id ,
234+ project_id = self .project_id or hook .project_id ,
235+ )
236+
212237 return Workflow .to_dict (workflow )
213238
214239
@@ -296,6 +321,7 @@ class WorkflowsListWorkflowsOperator(BaseOperator):
296321 """
297322
298323 template_fields : Sequence [str ] = ("location" , "order_by" , "filter_" )
324+ operator_extra_links = (WorkflowsListOfWorkflowsLink (),)
299325
300326 def __init__ (
301327 self ,
@@ -335,6 +361,13 @@ def execute(self, context: 'Context'):
335361 timeout = self .timeout ,
336362 metadata = self .metadata ,
337363 )
364+
365+ WorkflowsListOfWorkflowsLink .persist (
366+ context = context ,
367+ task_instance = self ,
368+ project_id = self .project_id or hook .project_id ,
369+ )
370+
338371 return [Workflow .to_dict (w ) for w in workflows_iter ]
339372
340373
@@ -357,6 +390,7 @@ class WorkflowsGetWorkflowOperator(BaseOperator):
357390 """
358391
359392 template_fields : Sequence [str ] = ("location" , "workflow_id" )
393+ operator_extra_links = (WorkflowsWorkflowDetailsLink (),)
360394
361395 def __init__ (
362396 self ,
@@ -393,6 +427,15 @@ def execute(self, context: 'Context'):
393427 timeout = self .timeout ,
394428 metadata = self .metadata ,
395429 )
430+
431+ WorkflowsWorkflowDetailsLink .persist (
432+ context = context ,
433+ task_instance = self ,
434+ location_id = self .location ,
435+ workflow_id = self .workflow_id ,
436+ project_id = self .project_id or hook .project_id ,
437+ )
438+
396439 return Workflow .to_dict (workflow )
397440
398441
@@ -418,6 +461,7 @@ class WorkflowsCreateExecutionOperator(BaseOperator):
418461
419462 template_fields : Sequence [str ] = ("location" , "workflow_id" , "execution" )
420463 template_fields_renderers = {"execution" : "json" }
464+ operator_extra_links = (WorkflowsExecutionLink (),)
421465
422466 def __init__ (
423467 self ,
@@ -459,6 +503,16 @@ def execute(self, context: 'Context'):
459503 )
460504 execution_id = execution .name .split ("/" )[- 1 ]
461505 self .xcom_push (context , key = "execution_id" , value = execution_id )
506+
507+ WorkflowsExecutionLink .persist (
508+ context = context ,
509+ task_instance = self ,
510+ location_id = self .location ,
511+ workflow_id = self .workflow_id ,
512+ execution_id = execution_id ,
513+ project_id = self .project_id or hook .project_id ,
514+ )
515+
462516 return Execution .to_dict (execution )
463517
464518
@@ -482,6 +536,7 @@ class WorkflowsCancelExecutionOperator(BaseOperator):
482536 """
483537
484538 template_fields : Sequence [str ] = ("location" , "workflow_id" , "execution_id" )
539+ operator_extra_links = (WorkflowsExecutionLink (),)
485540
486541 def __init__ (
487542 self ,
@@ -521,6 +576,16 @@ def execute(self, context: 'Context'):
521576 timeout = self .timeout ,
522577 metadata = self .metadata ,
523578 )
579+
580+ WorkflowsExecutionLink .persist (
581+ context = context ,
582+ task_instance = self ,
583+ location_id = self .location ,
584+ workflow_id = self .workflow_id ,
585+ execution_id = self .execution_id ,
586+ project_id = self .project_id or hook .project_id ,
587+ )
588+
524589 return Execution .to_dict (execution )
525590
526591
@@ -549,6 +614,7 @@ class WorkflowsListExecutionsOperator(BaseOperator):
549614 """
550615
551616 template_fields : Sequence [str ] = ("location" , "workflow_id" )
617+ operator_extra_links = (WorkflowsWorkflowDetailsLink (),)
552618
553619 def __init__ (
554620 self ,
@@ -588,6 +654,14 @@ def execute(self, context: 'Context'):
588654 metadata = self .metadata ,
589655 )
590656
657+ WorkflowsWorkflowDetailsLink .persist (
658+ context = context ,
659+ task_instance = self ,
660+ location_id = self .location ,
661+ workflow_id = self .workflow_id ,
662+ project_id = self .project_id or hook .project_id ,
663+ )
664+
591665 return [Execution .to_dict (e ) for e in execution_iter if e .start_time > self .start_date_filter ]
592666
593667
@@ -611,6 +685,7 @@ class WorkflowsGetExecutionOperator(BaseOperator):
611685 """
612686
613687 template_fields : Sequence [str ] = ("location" , "workflow_id" , "execution_id" )
688+ operator_extra_links = (WorkflowsExecutionLink (),)
614689
615690 def __init__ (
616691 self ,
@@ -650,4 +725,14 @@ def execute(self, context: 'Context'):
650725 timeout = self .timeout ,
651726 metadata = self .metadata ,
652727 )
728+
729+ WorkflowsExecutionLink .persist (
730+ context = context ,
731+ task_instance = self ,
732+ location_id = self .location ,
733+ workflow_id = self .workflow_id ,
734+ execution_id = self .execution_id ,
735+ project_id = self .project_id or hook .project_id ,
736+ )
737+
653738 return Execution .to_dict (execution )
0 commit comments