
Commit 29eb68b

Create guide for Dataproc Operators (#9037)

* added documentation for dataproc
* added more update information for updateMask
* Added link to information about cluster config api request
* Apply naming convention
* Set all dedents from 4 to 0
* Adjust dedent to 4, for operators
* removed dataproc guide from test_missing_guides
1 parent a3fc8be commit 29eb68b

4 files changed (+216 −4 lines changed)

airflow/providers/google/cloud/example_dags/example_dataproc.py

Lines changed: 27 additions & 2 deletions
@@ -42,6 +42,8 @@
 SPARKR_URI = "gs://{}/{}".format(BUCKET, SPARKR_MAIN)
 
 # Cluster definition
+# [START how_to_cloud_dataproc_create_cluster]
+
 CLUSTER = {
     "project_id": PROJECT_ID,
     "cluster_name": CLUSTER_NAME,
@@ -59,8 +61,10 @@
     },
 }
 
+# [END how_to_cloud_dataproc_create_cluster]
 
 # Update options
+# [START how_to_cloud_dataproc_updatemask_cluster_operator]
 CLUSTER_UPDATE = {
     "config": {
         "worker_config": {"num_instances": 3},
@@ -73,23 +77,28 @@
         "config.secondary_worker_config.num_instances",
     ]
 }
+# [END how_to_cloud_dataproc_updatemask_cluster_operator]
 
 TIMEOUT = {"seconds": 1 * 24 * 60 * 60}
 
-
 # Jobs definitions
+# [START how_to_cloud_dataproc_pig_config]
 PIG_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "pig_job": {"query_list": {"queries": ["define sin HiveUDF('sin');"]}},
 }
+# [END how_to_cloud_dataproc_pig_config]
 
+# [START how_to_cloud_dataproc_sparksql_config]
 SPARK_SQL_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "spark_sql_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
 }
+# [END how_to_cloud_dataproc_sparksql_config]
 
+# [START how_to_cloud_dataproc_spark_config]
 SPARK_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
@@ -98,25 +107,33 @@
         "main_class": "org.apache.spark.examples.SparkPi",
     },
 }
+# [END how_to_cloud_dataproc_spark_config]
 
+# [START how_to_cloud_dataproc_pyspark_config]
 PYSPARK_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
 }
+# [END how_to_cloud_dataproc_pyspark_config]
 
+# [START how_to_cloud_dataproc_sparkr_config]
 SPARKR_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "spark_r_job": {"main_r_file_uri": SPARKR_URI},
 }
+# [END how_to_cloud_dataproc_sparkr_config]
 
+# [START how_to_cloud_dataproc_hive_config]
 HIVE_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
     "hive_job": {"query_list": {"queries": ["SHOW DATABASES;"]}},
 }
+# [END how_to_cloud_dataproc_hive_config]
 
+# [START how_to_cloud_dataproc_hadoop_config]
 HADOOP_JOB = {
     "reference": {"project_id": PROJECT_ID},
     "placement": {"cluster_name": CLUSTER_NAME},
@@ -125,16 +142,20 @@
         "args": ["wordcount", "gs://pub/shakespeare/rose.txt", OUTPUT_PATH],
     },
 }
+# [END how_to_cloud_dataproc_hadoop_config]
 
 with models.DAG(
     "example_gcp_dataproc",
     default_args={"start_date": days_ago(1)},
     schedule_interval=None,
 ) as dag:
+    # [START how_to_cloud_dataproc_create_cluster_operator]
     create_cluster = DataprocCreateClusterOperator(
         task_id="create_cluster", project_id=PROJECT_ID, cluster=CLUSTER, region=REGION
     )
+    # [END how_to_cloud_dataproc_create_cluster_operator]
 
+    # [START how_to_cloud_dataproc_update_cluster_operator]
     scale_cluster = DataprocUpdateClusterOperator(
         task_id="scale_cluster",
         cluster_name=CLUSTER_NAME,
@@ -144,11 +165,11 @@
         project_id=PROJECT_ID,
         location=REGION,
     )
+    # [END how_to_cloud_dataproc_update_cluster_operator]
 
     pig_task = DataprocSubmitJobOperator(
         task_id="pig_task", job=PIG_JOB, location=REGION, project_id=PROJECT_ID
     )
-
     spark_sql_task = DataprocSubmitJobOperator(
         task_id="spark_sql_task",
         job=SPARK_SQL_JOB,
@@ -160,9 +181,11 @@
         task_id="spark_task", job=SPARK_JOB, location=REGION, project_id=PROJECT_ID
     )
 
+    # [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
     pyspark_task = DataprocSubmitJobOperator(
         task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
     )
+    # [END how_to_cloud_dataproc_submit_job_to_cluster_operator]
 
     sparkr_task = DataprocSubmitJobOperator(
         task_id="sparkr_task", job=SPARKR_JOB, location=REGION, project_id=PROJECT_ID
@@ -176,12 +199,14 @@
         task_id="hadoop_task", job=HADOOP_JOB, location=REGION, project_id=PROJECT_ID
     )
 
+    # [START how_to_cloud_dataproc_delete_cluster_operator]
     delete_cluster = DataprocDeleteClusterOperator(
         task_id="delete_cluster",
         project_id=PROJECT_ID,
         cluster_name=CLUSTER_NAME,
         region=REGION,
     )
+    # [END how_to_cloud_dataproc_delete_cluster_operator]
 
     create_cluster >> scale_cluster
     scale_cluster >> hive_task >> delete_cluster
docs/howto/operator/gcp/dataproc.rst

Lines changed: 188 additions & 0 deletions
@@ -0,0 +1,188 @@ (new file)
.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership.  The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.

Google Cloud Dataproc Operators
===============================

Dataproc is a managed Apache Spark and Apache Hadoop service that lets you
take advantage of open source data tools for batch processing, querying, streaming and machine learning.
Dataproc automation helps you create clusters quickly, manage them easily, and
save money by turning clusters off when you don't need them.

For more information about the service, visit the `Dataproc product documentation <https://cloud.google.com/dataproc/docs/reference>`__.

.. contents::
  :depth: 1
  :local:

Prerequisite Tasks
------------------

.. include:: _partials/prerequisite_tasks.rst

.. _howto/operator:DataprocCreateClusterOperator:

Create a Cluster
----------------

Before you create a Dataproc cluster you need to define the cluster.
The cluster definition describes the identifying information, config, and status of a cluster of Compute Engine instances.
For more information about the available fields to pass when creating a cluster, visit the `Dataproc create cluster API <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters#Cluster>`__.

A cluster configuration can look as follows:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_create_cluster]
    :end-before: [END how_to_cloud_dataproc_create_cluster]

With this configuration we can create the cluster with
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateClusterOperator`:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_cloud_dataproc_create_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_create_cluster_operator]

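As an illustration only, a complete create-cluster task might look like the sketch
below; the project, region, cluster name and machine types are placeholder
assumptions, not values taken from this commit.

.. code-block:: python

    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocCreateClusterOperator,
    )

    # All identifiers below are hypothetical; substitute your own.
    MY_CLUSTER = {
        "project_id": "my-project",
        "cluster_name": "my-dataproc-cluster",
        "config": {
            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-4"},
            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-4"},
        },
    }

    create_my_cluster = DataprocCreateClusterOperator(
        task_id="create_my_cluster",
        project_id="my-project",
        cluster=MY_CLUSTER,
        region="europe-west1",
    )
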
Update a cluster
----------------

You can scale the cluster up or down by providing a cluster config and an updateMask.
In the updateMask argument you specify the path, relative to ``Cluster``, of the field to update.
For more information on updateMask and other parameters take a look at the `Dataproc update cluster API <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.clusters/patch>`__.

An example of a new cluster config and the updateMask:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_updatemask_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_updatemask_cluster_operator]

To update a cluster you can use
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocUpdateClusterOperator`:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_cloud_dataproc_update_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_update_cluster_operator]

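As a sketch of how the pieces fit together (all values are placeholders, and the
``cluster``, ``update_mask`` and ``graceful_decommission_timeout`` arguments are
assumed to match the example DAG in this commit), scaling the secondary worker
group could look like:

.. code-block:: python

    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocUpdateClusterOperator,
    )

    # Grow the secondary (preemptible) worker group to 4 instances; the update
    # mask lists the config paths, relative to Cluster, that the change applies to.
    MY_CLUSTER_UPDATE = {"config": {"secondary_worker_config": {"num_instances": 4}}}
    MY_UPDATE_MASK = {"paths": ["config.secondary_worker_config.num_instances"]}

    scale_out = DataprocUpdateClusterOperator(
        task_id="scale_out",
        cluster_name="my-dataproc-cluster",  # hypothetical cluster
        cluster=MY_CLUSTER_UPDATE,
        update_mask=MY_UPDATE_MASK,
        graceful_decommission_timeout={"seconds": 600},
        project_id="my-project",  # hypothetical project
        location="europe-west1",  # hypothetical region
    )
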
Deleting a cluster
------------------

To delete a cluster you can use
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocDeleteClusterOperator`:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_cloud_dataproc_delete_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_delete_cluster_operator]

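For illustration, a delete task mirroring the example DAG (the project, cluster
and region values are placeholders):

.. code-block:: python

    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocDeleteClusterOperator,
    )

    # Tear the cluster down once the work upstream of it has finished.
    delete_my_cluster = DataprocDeleteClusterOperator(
        task_id="delete_my_cluster",
        project_id="my-project",
        cluster_name="my-dataproc-cluster",
        region="europe-west1",
    )
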
Submit a job to a cluster
-------------------------

Dataproc supports submitting jobs of different big data components.
The list currently includes Spark, Hadoop, Pig and Hive.
For more information on versions and images take a look at the `Cloud Dataproc Image version list <https://cloud.google.com/dataproc/docs/concepts/versioning/dataproc-versions>`__.

To submit a job to the cluster you need to provide a job source file. The job source file can be in GCS, on the cluster, or on your local
file system. You can specify a ``file:///`` path to refer to a local file on a cluster's master node.

The job configuration can be submitted by using
:class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 4
    :start-after: [START how_to_cloud_dataproc_submit_job_to_cluster_operator]
    :end-before: [END how_to_cloud_dataproc_submit_job_to_cluster_operator]

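As a minimal sketch (the bucket, project and cluster names are placeholder
assumptions), submitting a PySpark job whose source file lives in GCS could
look like:

.. code-block:: python

    from airflow.providers.google.cloud.operators.dataproc import (
        DataprocSubmitJobOperator,
    )

    # The job dict follows the Dataproc Job resource: reference, placement,
    # and one job-type key ("pyspark_job" here).
    MY_PYSPARK_JOB = {
        "reference": {"project_id": "my-project"},
        "placement": {"cluster_name": "my-dataproc-cluster"},
        "pyspark_job": {"main_python_file_uri": "gs://my-bucket/wordcount.py"},
    }

    submit_pyspark = DataprocSubmitJobOperator(
        task_id="submit_pyspark",
        job=MY_PYSPARK_JOB,
        location="europe-west1",
        project_id="my-project",
    )
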
Examples of job configurations to submit
----------------------------------------

We have provided an example for every framework below.
There are more arguments to provide in the jobs than the examples show. For the complete list of arguments take a look at
`DataProc Job arguments <https://cloud.google.com/dataproc/docs/reference/rest/v1/projects.regions.jobs>`__.

Example of the configuration for a PySpark Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_pyspark_config]
    :end-before: [END how_to_cloud_dataproc_pyspark_config]

Example of the configuration for a SparkSQL Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_sparksql_config]
    :end-before: [END how_to_cloud_dataproc_sparksql_config]

Example of the configuration for a Spark Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_spark_config]
    :end-before: [END how_to_cloud_dataproc_spark_config]

Example of the configuration for a Hive Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_hive_config]
    :end-before: [END how_to_cloud_dataproc_hive_config]

Example of the configuration for a Hadoop Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_hadoop_config]
    :end-before: [END how_to_cloud_dataproc_hadoop_config]

Example of the configuration for a Pig Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_pig_config]
    :end-before: [END how_to_cloud_dataproc_pig_config]

Example of the configuration for a SparkR Job:

.. exampleinclude:: ../../../../airflow/providers/google/cloud/example_dags/example_dataproc.py
    :language: python
    :dedent: 0
    :start-after: [START how_to_cloud_dataproc_sparkr_config]
    :end-before: [END how_to_cloud_dataproc_sparkr_config]

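As an illustrative sketch of where additional arguments fit, a Spark job config
can carry ``args`` and ``properties`` alongside the main class; the field names
follow the Dataproc Job resource, while the values below are placeholders:

.. code-block:: python

    # Run SparkPi with an explicit argument and an executor-memory property.
    MY_SPARK_JOB = {
        "reference": {"project_id": "my-project"},
        "placement": {"cluster_name": "my-dataproc-cluster"},
        "spark_job": {
            "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"],
            "main_class": "org.apache.spark.examples.SparkPi",
            "args": ["1000"],
            "properties": {"spark.executor.memory": "2g"},
        },
    }
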
References
^^^^^^^^^^
For further information, take a look at:

* `DataProc API documentation <https://cloud.google.com/dataproc/docs/reference>`__
* `Product documentation <https://cloud.google.com/dataproc/docs/reference>`__

docs/operators-and-hooks-ref.rst

Lines changed: 1 addition & 1 deletion
@@ -697,7 +697,7 @@ These integrations allow you to perform various operations within the Google Clo
   -
 
 * - `Dataproc <https://cloud.google.com/dataproc/>`__
-  -
+  - :doc:`How to use <howto/operator/gcp/dataproc>`
   - :mod:`airflow.providers.google.cloud.hooks.dataproc`
   - :mod:`airflow.providers.google.cloud.operators.dataproc`
   -

tests/test_project_structure.py

Lines changed: 0 additions & 1 deletion
@@ -144,7 +144,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase):
         'bigquery_to_mysql',
         'cassandra_to_gcs',
         'dataflow',
-        'dataproc',
         'datastore',
         'dlp',
         'gcs_to_bigquery',
