Skip to content

Commit 6d6588f

Browse files
turbaszekTobiasz Kędzierski
andauthored
Add Google Cloud Workflows Operators (#13366)
Add Google Cloud Workflows Operators, system test, example and sensor Co-authored-by: Tobiasz Kędzierski <tobiasz.kedzierski@polidea.com>
1 parent 8cb85e8 commit 6d6588f

File tree

12 files changed

+2413
-0
lines changed

12 files changed

+2413
-0
lines changed
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import os
19+
20+
from airflow import DAG
21+
from airflow.providers.google.cloud.operators.workflows import (
22+
WorkflowsCancelExecutionOperator,
23+
WorkflowsCreateExecutionOperator,
24+
WorkflowsCreateWorkflowOperator,
25+
WorkflowsDeleteWorkflowOperator,
26+
WorkflowsGetExecutionOperator,
27+
WorkflowsGetWorkflowOperator,
28+
WorkflowsListExecutionsOperator,
29+
WorkflowsListWorkflowsOperator,
30+
WorkflowsUpdateWorkflowOperator,
31+
)
32+
from airflow.providers.google.cloud.sensors.workflows import WorkflowExecutionSensor
33+
from airflow.utils.dates import days_ago
34+
35+
LOCATION = os.environ.get("GCP_WORKFLOWS_LOCATION", "us-central1")
36+
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
37+
38+
WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_WORKFLOW_ID", "airflow-test-workflow")
39+
40+
# [START how_to_define_workflow]
41+
WORKFLOW_CONTENT = """
42+
- getCurrentTime:
43+
call: http.get
44+
args:
45+
url: https://us-central1-workflowsample.cloudfunctions.net/datetime
46+
result: currentTime
47+
- readWikipedia:
48+
call: http.get
49+
args:
50+
url: https://en.wikipedia.org/w/api.php
51+
query:
52+
action: opensearch
53+
search: ${currentTime.body.dayOfTheWeek}
54+
result: wikiResult
55+
- returnResult:
56+
return: ${wikiResult.body[1]}
57+
"""
58+
59+
WORKFLOW = {
60+
"description": "Test workflow",
61+
"labels": {"airflow-version": "dev"},
62+
"source_contents": WORKFLOW_CONTENT,
63+
}
64+
# [END how_to_define_workflow]
65+
66+
EXECUTION = {"argument": ""}
67+
68+
SLEEP_WORKFLOW_ID = os.getenv("GCP_WORKFLOWS_SLEEP_WORKFLOW_ID", "sleep_workflow")
69+
SLEEP_WORKFLOW_CONTENT = """
70+
- someSleep:
71+
call: sys.sleep
72+
args:
73+
seconds: 120
74+
"""
75+
76+
SLEEP_WORKFLOW = {
77+
"description": "Test workflow",
78+
"labels": {"airflow-version": "dev"},
79+
"source_contents": SLEEP_WORKFLOW_CONTENT,
80+
}
81+
82+
83+
with DAG("example_cloud_workflows", start_date=days_ago(1), schedule_interval=None) as dag:
84+
# [START how_to_create_workflow]
85+
create_workflow = WorkflowsCreateWorkflowOperator(
86+
task_id="create_workflow",
87+
location=LOCATION,
88+
project_id=PROJECT_ID,
89+
workflow=WORKFLOW,
90+
workflow_id=WORKFLOW_ID,
91+
)
92+
# [END how_to_create_workflow]
93+
94+
# [START how_to_update_workflow]
95+
update_workflows = WorkflowsUpdateWorkflowOperator(
96+
task_id="update_workflows",
97+
location=LOCATION,
98+
project_id=PROJECT_ID,
99+
workflow_id=WORKFLOW_ID,
100+
update_mask={"paths": ["name", "description"]},
101+
)
102+
# [END how_to_update_workflow]
103+
104+
# [START how_to_get_workflow]
105+
get_workflow = WorkflowsGetWorkflowOperator(
106+
task_id="get_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
107+
)
108+
# [END how_to_get_workflow]
109+
110+
# [START how_to_list_workflows]
111+
list_workflows = WorkflowsListWorkflowsOperator(
112+
task_id="list_workflows",
113+
location=LOCATION,
114+
project_id=PROJECT_ID,
115+
)
116+
# [END how_to_list_workflows]
117+
118+
# [START how_to_delete_workflow]
119+
delete_workflow = WorkflowsDeleteWorkflowOperator(
120+
task_id="delete_workflow", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
121+
)
122+
# [END how_to_delete_workflow]
123+
124+
# [START how_to_create_execution]
125+
create_execution = WorkflowsCreateExecutionOperator(
126+
task_id="create_execution",
127+
location=LOCATION,
128+
project_id=PROJECT_ID,
129+
execution=EXECUTION,
130+
workflow_id=WORKFLOW_ID,
131+
)
132+
# [END how_to_create_execution]
133+
134+
# [START how_to_wait_for_execution]
135+
wait_for_execution = WorkflowExecutionSensor(
136+
task_id="wait_for_execution",
137+
location=LOCATION,
138+
project_id=PROJECT_ID,
139+
workflow_id=WORKFLOW_ID,
140+
execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
141+
)
142+
# [END how_to_wait_for_execution]
143+
144+
# [START how_to_get_execution]
145+
get_execution = WorkflowsGetExecutionOperator(
146+
task_id="get_execution",
147+
location=LOCATION,
148+
project_id=PROJECT_ID,
149+
workflow_id=WORKFLOW_ID,
150+
execution_id='{{ task_instance.xcom_pull("create_execution", key="execution_id") }}',
151+
)
152+
# [END how_to_get_execution]
153+
154+
# [START how_to_list_executions]
155+
list_executions = WorkflowsListExecutionsOperator(
156+
task_id="list_executions", location=LOCATION, project_id=PROJECT_ID, workflow_id=WORKFLOW_ID
157+
)
158+
# [END how_to_list_executions]
159+
160+
create_workflow_for_cancel = WorkflowsCreateWorkflowOperator(
161+
task_id="create_workflow_for_cancel",
162+
location=LOCATION,
163+
project_id=PROJECT_ID,
164+
workflow=SLEEP_WORKFLOW,
165+
workflow_id=SLEEP_WORKFLOW_ID,
166+
)
167+
168+
create_execution_for_cancel = WorkflowsCreateExecutionOperator(
169+
task_id="create_execution_for_cancel",
170+
location=LOCATION,
171+
project_id=PROJECT_ID,
172+
execution=EXECUTION,
173+
workflow_id=SLEEP_WORKFLOW_ID,
174+
)
175+
176+
# [START how_to_cancel_execution]
177+
cancel_execution = WorkflowsCancelExecutionOperator(
178+
task_id="cancel_execution",
179+
location=LOCATION,
180+
project_id=PROJECT_ID,
181+
workflow_id=SLEEP_WORKFLOW_ID,
182+
execution_id='{{ task_instance.xcom_pull("create_execution_for_cancel", key="execution_id") }}',
183+
)
184+
# [END how_to_cancel_execution]
185+
186+
create_workflow >> update_workflows >> [get_workflow, list_workflows]
187+
update_workflows >> [create_execution, create_execution_for_cancel]
188+
189+
create_execution >> wait_for_execution >> [get_execution, list_executions]
190+
create_workflow_for_cancel >> create_execution_for_cancel >> cancel_execution
191+
192+
[cancel_execution, list_executions] >> delete_workflow
193+
194+
195+
if __name__ == '__main__':
196+
dag.clear(dag_run_state=None)
197+
dag.run()

0 commit comments

Comments
 (0)