Skip to content

Commit e13b159

Browse files
Wojciech JanuszekWojciech Januszek
andauthored
Workflows assets & system tests migration (AIP-47) (#24105)
* Workflows assets & system tests migration (AIP-47) Co-authored-by: Wojciech Januszek <januszek@google.com>
1 parent b5218de commit e13b159

File tree

8 files changed

+272
-70
lines changed

8 files changed

+272
-70
lines changed
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
"""This module contains Google Workflows links."""
19+
from typing import TYPE_CHECKING, Optional
20+
21+
from airflow.models import BaseOperator
22+
from airflow.providers.google.cloud.links.base import BaseGoogleLink
23+
24+
if TYPE_CHECKING:
25+
from airflow.utils.context import Context
26+
27+
WORKFLOWS_BASE_LINK = "https://console.cloud.google.com/workflows"
28+
WORKFLOW_LINK = WORKFLOWS_BASE_LINK + "/workflow/{location_id}/{workflow_id}/executions?project={project_id}"
29+
WORKFLOWS_LINK = WORKFLOWS_BASE_LINK + "?project={project_id}"
30+
EXECUTION_LINK = (
31+
WORKFLOWS_BASE_LINK
32+
+ "/workflow/{location_id}/{workflow_id}/execution/{execution_id}?project={project_id}"
33+
)
34+
35+
36+
class WorkflowsWorkflowDetailsLink(BaseGoogleLink):
37+
"""Helper class for constructing Workflow details Link"""
38+
39+
name = "Workflow details"
40+
key = "workflow_details"
41+
format_str = WORKFLOW_LINK
42+
43+
@staticmethod
44+
def persist(
45+
context: "Context",
46+
task_instance: BaseOperator,
47+
location_id: str,
48+
workflow_id: str,
49+
project_id: Optional[str],
50+
):
51+
task_instance.xcom_push(
52+
context,
53+
key=WorkflowsWorkflowDetailsLink.key,
54+
value={"location_id": location_id, "workflow_id": workflow_id, "project_id": project_id},
55+
)
56+
57+
58+
class WorkflowsListOfWorkflowsLink(BaseGoogleLink):
59+
"""Helper class for constructing list of Workflows Link"""
60+
61+
name = "List of workflows"
62+
key = "list_of_workflows"
63+
format_str = WORKFLOWS_LINK
64+
65+
@staticmethod
66+
def persist(
67+
context: "Context",
68+
task_instance: BaseOperator,
69+
project_id: Optional[str],
70+
):
71+
task_instance.xcom_push(
72+
context,
73+
key=WorkflowsListOfWorkflowsLink.key,
74+
value={"project_id": project_id},
75+
)
76+
77+
78+
class WorkflowsExecutionLink(BaseGoogleLink):
79+
"""Helper class for constructing Workflows Execution Link"""
80+
81+
name = "Workflow Execution"
82+
key = "workflow_execution"
83+
format_str = EXECUTION_LINK
84+
85+
@staticmethod
86+
def persist(
87+
context: "Context",
88+
task_instance: BaseOperator,
89+
location_id: str,
90+
workflow_id: str,
91+
execution_id: str,
92+
project_id: Optional[str],
93+
):
94+
task_instance.xcom_push(
95+
context,
96+
key=WorkflowsExecutionLink.key,
97+
value={
98+
"location_id": location_id,
99+
"workflow_id": workflow_id,
100+
"execution_id": execution_id,
101+
"project_id": project_id,
102+
},
103+
)

airflow/providers/google/cloud/operators/workflows.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@
3131

3232
from airflow.models import BaseOperator
3333
from airflow.providers.google.cloud.hooks.workflows import WorkflowsHook
34+
from airflow.providers.google.cloud.links.workflows import (
35+
WorkflowsExecutionLink,
36+
WorkflowsListOfWorkflowsLink,
37+
WorkflowsWorkflowDetailsLink,
38+
)
3439

3540
if TYPE_CHECKING:
3641
from airflow.utils.context import Context
@@ -60,6 +65,7 @@ class WorkflowsCreateWorkflowOperator(BaseOperator):
6065

6166
template_fields: Sequence[str] = ("location", "workflow", "workflow_id")
6267
template_fields_renderers = {"workflow": "json"}
68+
operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
6369

6470
def __init__(
6571
self,
@@ -132,6 +138,15 @@ def execute(self, context: 'Context'):
132138
timeout=self.timeout,
133139
metadata=self.metadata,
134140
)
141+
142+
WorkflowsWorkflowDetailsLink.persist(
143+
context=context,
144+
task_instance=self,
145+
location_id=self.location,
146+
workflow_id=self.workflow_id,
147+
project_id=self.project_id or hook.project_id,
148+
)
149+
135150
return Workflow.to_dict(workflow)
136151

137152

@@ -162,6 +177,7 @@ class WorkflowsUpdateWorkflowOperator(BaseOperator):
162177

163178
template_fields: Sequence[str] = ("workflow_id", "update_mask")
164179
template_fields_renderers = {"update_mask": "json"}
180+
operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
165181

166182
def __init__(
167183
self,
@@ -209,6 +225,15 @@ def execute(self, context: 'Context'):
209225
metadata=self.metadata,
210226
)
211227
workflow = operation.result()
228+
229+
WorkflowsWorkflowDetailsLink.persist(
230+
context=context,
231+
task_instance=self,
232+
location_id=self.location,
233+
workflow_id=self.workflow_id,
234+
project_id=self.project_id or hook.project_id,
235+
)
236+
212237
return Workflow.to_dict(workflow)
213238

214239

@@ -296,6 +321,7 @@ class WorkflowsListWorkflowsOperator(BaseOperator):
296321
"""
297322

298323
template_fields: Sequence[str] = ("location", "order_by", "filter_")
324+
operator_extra_links = (WorkflowsListOfWorkflowsLink(),)
299325

300326
def __init__(
301327
self,
@@ -335,6 +361,13 @@ def execute(self, context: 'Context'):
335361
timeout=self.timeout,
336362
metadata=self.metadata,
337363
)
364+
365+
WorkflowsListOfWorkflowsLink.persist(
366+
context=context,
367+
task_instance=self,
368+
project_id=self.project_id or hook.project_id,
369+
)
370+
338371
return [Workflow.to_dict(w) for w in workflows_iter]
339372

340373

@@ -357,6 +390,7 @@ class WorkflowsGetWorkflowOperator(BaseOperator):
357390
"""
358391

359392
template_fields: Sequence[str] = ("location", "workflow_id")
393+
operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
360394

361395
def __init__(
362396
self,
@@ -393,6 +427,15 @@ def execute(self, context: 'Context'):
393427
timeout=self.timeout,
394428
metadata=self.metadata,
395429
)
430+
431+
WorkflowsWorkflowDetailsLink.persist(
432+
context=context,
433+
task_instance=self,
434+
location_id=self.location,
435+
workflow_id=self.workflow_id,
436+
project_id=self.project_id or hook.project_id,
437+
)
438+
396439
return Workflow.to_dict(workflow)
397440

398441

@@ -418,6 +461,7 @@ class WorkflowsCreateExecutionOperator(BaseOperator):
418461

419462
template_fields: Sequence[str] = ("location", "workflow_id", "execution")
420463
template_fields_renderers = {"execution": "json"}
464+
operator_extra_links = (WorkflowsExecutionLink(),)
421465

422466
def __init__(
423467
self,
@@ -459,6 +503,16 @@ def execute(self, context: 'Context'):
459503
)
460504
execution_id = execution.name.split("/")[-1]
461505
self.xcom_push(context, key="execution_id", value=execution_id)
506+
507+
WorkflowsExecutionLink.persist(
508+
context=context,
509+
task_instance=self,
510+
location_id=self.location,
511+
workflow_id=self.workflow_id,
512+
execution_id=execution_id,
513+
project_id=self.project_id or hook.project_id,
514+
)
515+
462516
return Execution.to_dict(execution)
463517

464518

@@ -482,6 +536,7 @@ class WorkflowsCancelExecutionOperator(BaseOperator):
482536
"""
483537

484538
template_fields: Sequence[str] = ("location", "workflow_id", "execution_id")
539+
operator_extra_links = (WorkflowsExecutionLink(),)
485540

486541
def __init__(
487542
self,
@@ -521,6 +576,16 @@ def execute(self, context: 'Context'):
521576
timeout=self.timeout,
522577
metadata=self.metadata,
523578
)
579+
580+
WorkflowsExecutionLink.persist(
581+
context=context,
582+
task_instance=self,
583+
location_id=self.location,
584+
workflow_id=self.workflow_id,
585+
execution_id=self.execution_id,
586+
project_id=self.project_id or hook.project_id,
587+
)
588+
524589
return Execution.to_dict(execution)
525590

526591

@@ -549,6 +614,7 @@ class WorkflowsListExecutionsOperator(BaseOperator):
549614
"""
550615

551616
template_fields: Sequence[str] = ("location", "workflow_id")
617+
operator_extra_links = (WorkflowsWorkflowDetailsLink(),)
552618

553619
def __init__(
554620
self,
@@ -588,6 +654,14 @@ def execute(self, context: 'Context'):
588654
metadata=self.metadata,
589655
)
590656

657+
WorkflowsWorkflowDetailsLink.persist(
658+
context=context,
659+
task_instance=self,
660+
location_id=self.location,
661+
workflow_id=self.workflow_id,
662+
project_id=self.project_id or hook.project_id,
663+
)
664+
591665
return [Execution.to_dict(e) for e in execution_iter if e.start_time > self.start_date_filter]
592666

593667

@@ -611,6 +685,7 @@ class WorkflowsGetExecutionOperator(BaseOperator):
611685
"""
612686

613687
template_fields: Sequence[str] = ("location", "workflow_id", "execution_id")
688+
operator_extra_links = (WorkflowsExecutionLink(),)
614689

615690
def __init__(
616691
self,
@@ -650,4 +725,14 @@ def execute(self, context: 'Context'):
650725
timeout=self.timeout,
651726
metadata=self.metadata,
652727
)
728+
729+
WorkflowsExecutionLink.persist(
730+
context=context,
731+
task_instance=self,
732+
location_id=self.location,
733+
workflow_id=self.workflow_id,
734+
execution_id=self.execution_id,
735+
project_id=self.project_id or hook.project_id,
736+
)
737+
653738
return Execution.to_dict(execution)

airflow/providers/google/cloud/sensors/workflows.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def __init__(
5656
workflow_id: str,
5757
execution_id: str,
5858
location: str,
59-
project_id: str,
59+
project_id: Optional[str] = None,
6060
success_states: Optional[Set[Execution.State]] = None,
6161
failure_states: Optional[Set[Execution.State]] = None,
6262
retry: Union[Retry, _MethodDefault] = DEFAULT,

airflow/providers/google/provider.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,9 @@ extra-links:
908908
- airflow.providers.google.cloud.links.vertex_ai.VertexAIBatchPredictionJobListLink
909909
- airflow.providers.google.cloud.links.vertex_ai.VertexAIEndpointLink
910910
- airflow.providers.google.cloud.links.vertex_ai.VertexAIEndpointListLink
911+
- airflow.providers.google.cloud.links.workflows.WorkflowsWorkflowDetailsLink
912+
- airflow.providers.google.cloud.links.workflows.WorkflowsListOfWorkflowsLink
913+
- airflow.providers.google.cloud.links.workflows.WorkflowsExecutionLink
911914
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentLink
912915
- airflow.providers.google.cloud.operators.cloud_composer.CloudComposerEnvironmentsLink
913916
- airflow.providers.google.cloud.links.dataflow.DataflowJobLink

0 commit comments

Comments
 (0)