Commit b7a0983

docs for DataprocSubmitJobOperator (Presto job) (#32798)
1 parent e934603 commit b7a0983

File tree

docs/apache-airflow-providers-google/operators/cloud/dataproc.rst
tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py

2 files changed: +139, -0 lines

docs/apache-airflow-providers-google/operators/cloud/dataproc.rst

Lines changed: 20 additions & 0 deletions
@@ -75,6 +75,18 @@ With this configuration we can create the cluster:
     :start-after: [START how_to_cloud_dataproc_create_cluster_operator_in_gke]
     :end-before: [END how_to_cloud_dataproc_create_cluster_operator_in_gke]

+You can also create a Dataproc cluster with the optional Presto component.
+To do so, please use the following configuration.
+Note that the default image might not support the chosen optional component.
+If this is the case, please specify the correct ``image_version``, which you can find in the
+`documentation <https://cloud.google.com/dataproc/docs/concepts/components/overview#available_optional_components>`__.
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_create_cluster]
+    :end-before: [END how_to_cloud_dataproc_create_cluster]
+
 You can use deferrable mode for this action in order to run the operator asynchronously:

 .. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py
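
For orientation, deferrable mode amounts to passing the operator's ``deferrable`` flag, so the wait for cluster readiness runs on the triggerer instead of occupying a worker slot. A minimal sketch, assuming the ``deferrable`` parameter available in recent versions of the Google provider:

    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
        deferrable=True,  # poll for cluster readiness asynchronously on the triggerer
    )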
@@ -239,6 +251,14 @@ Example of the configuration for a SparkR:
     :start-after: [START how_to_cloud_dataproc_sparkr_config]
     :end-before: [END how_to_cloud_dataproc_sparkr_config]

+Example of the configuration for a Presto job:
+
+.. exampleinclude:: /../../tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py
+    :language: python
+    :dedent: 0
+    :start-after: [START how_to_cloud_dataproc_presto_config]
+    :end-before: [END how_to_cloud_dataproc_presto_config]
+
 Working with workflows templates
 --------------------------------
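
The ``query_list`` form inlines the SQL directly in the job definition. The Dataproc ``PrestoJob`` API also accepts a ``query_file_uri`` pointing at a SQL file in Cloud Storage, which can be handier for longer scripts. A hedged variant of the config above (the GCS path is a hypothetical placeholder):

    PRESTO_JOB_FROM_FILE = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        # query_file_uri replaces query_list; point it at your own bucket
        "presto_job": {"query_file_uri": "gs://your-bucket/queries/show_catalogs.sql"},
    }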

tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py

Lines changed: 119 additions & 0 deletions
@@ -0,0 +1,119 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Example Airflow DAG for DataprocSubmitJobOperator with a Presto job.
+"""
+from __future__ import annotations
+
+import os
+from datetime import datetime
+
+from airflow import models
+from airflow.providers.google.cloud.operators.dataproc import (
+    DataprocCreateClusterOperator,
+    DataprocDeleteClusterOperator,
+    DataprocSubmitJobOperator,
+)
+from airflow.utils.trigger_rule import TriggerRule
+
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "dataproc_presto"
+PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT")
+
+CLUSTER_NAME = f"cluster-{ENV_ID}-{DAG_ID}".replace("_", "-")
+REGION = "europe-west1"
+
+# Cluster definition
+# [START how_to_cloud_dataproc_create_cluster]
+CLUSTER_CONFIG = {
+    "master_config": {
+        "num_instances": 1,
+        "machine_type_uri": "n1-standard-4",
+        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
+    },
+    "worker_config": {
+        "num_instances": 2,
+        "machine_type_uri": "n1-standard-4",
+        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
+    },
+    "software_config": {
+        "optional_components": [
+            "PRESTO",
+        ],
+        "image_version": "2.0",
+    },
+}
+# [END how_to_cloud_dataproc_create_cluster]
+
+# Job definitions
+# [START how_to_cloud_dataproc_presto_config]
+PRESTO_JOB = {
+    "reference": {"project_id": PROJECT_ID},
+    "placement": {"cluster_name": CLUSTER_NAME},
+    "presto_job": {"query_list": {"queries": ["SHOW CATALOGS"]}},
+}
+# [END how_to_cloud_dataproc_presto_config]
+
+
+with models.DAG(
+    DAG_ID,
+    schedule="@once",
+    start_date=datetime(2021, 1, 1),
+    catchup=False,
+    tags=["example", "dataproc", "presto"],
+) as dag:
+    create_cluster = DataprocCreateClusterOperator(
+        task_id="create_cluster",
+        project_id=PROJECT_ID,
+        cluster_config=CLUSTER_CONFIG,
+        region=REGION,
+        cluster_name=CLUSTER_NAME,
+    )
+
+    presto_task = DataprocSubmitJobOperator(
+        task_id="presto_task", job=PRESTO_JOB, region=REGION, project_id=PROJECT_ID
+    )
+
+    delete_cluster = DataprocDeleteClusterOperator(
+        task_id="delete_cluster",
+        project_id=PROJECT_ID,
+        cluster_name=CLUSTER_NAME,
+        region=REGION,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    (
+        # TEST SETUP
+        create_cluster
+        # TEST BODY
+        >> presto_task
+        # TEST TEARDOWN
+        >> delete_cluster
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "teardown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
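
The submission above blocks until the Presto job finishes. If you prefer to submit and wait separately, the provider also documents an asynchronous pattern: pass ``asynchronous=True`` to ``DataprocSubmitJobOperator`` and poll with ``DataprocJobSensor``. A sketch under those assumptions (task names are hypothetical; check that your provider version exposes both):

    from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor

    presto_task_async = DataprocSubmitJobOperator(
        task_id="presto_task_async",
        job=PRESTO_JOB,
        region=REGION,
        project_id=PROJECT_ID,
        asynchronous=True,  # return right after submission instead of waiting
    )

    wait_for_presto = DataprocJobSensor(
        task_id="wait_for_presto",
        region=REGION,
        project_id=PROJECT_ID,
        # the operator pushes the submitted job id to XCom
        dataproc_job_id="{{ task_instance.xcom_pull(task_ids='presto_task_async') }}",
    )

    presto_task_async >> wait_for_presto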
