Skip to content

Commit 01f9942

Browse files
authored
Add download/upload operators for GCS and Google Sheets (#7866)
* Add download/upload operators for GCS and Google Sheets * fixup! Add download/upload operators for GCS and Google Sheets * fixup! fixup! Add download/upload operators for GCS and Google Sheets
1 parent 29423b6 commit 01f9942

File tree

9 files changed

+747
-3
lines changed

9 files changed

+747
-3
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
import os
20+
21+
from airflow import models
22+
from airflow.operators.bash import BashOperator
23+
from airflow.providers.google.suite.operators.sheets import (
24+
GCStoGoogleSheets, GoogleSheetsCreateSpreadsheet, GoogleSheetsToGCSOperator,
25+
)
26+
from airflow.utils.dates import days_ago
27+
28+
GCS_BUCKET = os.environ.get("SHEETS_GCS_BUCKET", "test28397ye")
29+
SPREADSHEET_ID = os.environ.get("SPREADSHEET_ID", "1234567890qwerty")
30+
NEW_SPREADSHEET_ID = os.environ.get("NEW_SPREADSHEET_ID", "1234567890qwerty")
31+
32+
SPREADSHEET = {
33+
"properties": {"title": "Test1"},
34+
"sheets": [{"properties": {"title": "Sheet1"}}],
35+
}
36+
37+
default_args = {"start_date": days_ago(1)}
38+
39+
with models.DAG(
40+
"example_sheets_gcs",
41+
default_args=default_args,
42+
schedule_interval=None, # Override to match your needs
43+
tags=["example"],
44+
) as dag:
45+
# [START upload_sheet_to_gcs]
46+
upload_sheet_to_gcs = GoogleSheetsToGCSOperator(
47+
task_id="upload_sheet_to_gcs",
48+
destination_bucket=GCS_BUCKET,
49+
spreadsheet_id=SPREADSHEET_ID,
50+
)
51+
# [END upload_sheet_to_gcs]
52+
53+
# [START create_spreadsheet]
54+
create_spreadsheet = GoogleSheetsCreateSpreadsheet(
55+
task_id="create_spreadsheet", spreadsheet=SPREADSHEET
56+
)
57+
# [END create_spreadsheet]
58+
59+
# [START print_spreadsheet_url]
60+
print_spreadsheet_url = BashOperator(
61+
task_id="print_spreadsheet_url",
62+
bash_command="echo {{ task_instance.xcom_pull('create_spreadsheet', key='spreadsheet_url') }}",
63+
)
64+
# [END print_spreadsheet_url]
65+
66+
# [START upload_gcs_to_sheet]
67+
upload_gcs_to_sheet = GCStoGoogleSheets(
68+
task_id="upload_gcs_to_sheet",
69+
bucket_name=GCS_BUCKET,
70+
object_name="{{ task_instance.xcom_pull('upload_sheet_to_gcs')[0] }}",
71+
spreadsheet_id=NEW_SPREADSHEET_ID,
72+
)
73+
# [END upload_gcs_to_sheet]
74+
75+
create_spreadsheet >> print_spreadsheet_url
76+
upload_sheet_to_gcs >> upload_gcs_to_sheet

airflow/providers/google/suite/hooks/sheets.py

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ def update_values(
162162
Updates values from Google Sheet from a single range
163163
https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets.values/update
164164
165-
:param spreadsheet_id: The Google Sheet ID to interact with
165+
:param spreadsheet_id: The Google Sheet ID to interact with.
166166
:type spreadsheet_id: str
167167
:param range_: The A1 notation of the values to retrieve.
168168
:type range_: str
@@ -375,3 +375,60 @@ def batch_clear(self, spreadsheet_id: str, ranges: List) -> Dict:
375375
).execute(num_retries=self.num_retries)
376376

377377
return response
378+
379+
def get_spreadsheet(self, spreadsheet_id: str):
380+
"""
381+
Retrieves spreadsheet matching the given id.
382+
383+
:param spreadsheet_id: The spreadsheet id.
384+
:type spreadsheet_id: str
385+
:return: An spreadsheet that matches the sheet filter.
386+
"""
387+
response = (
388+
self.get_conn() # pylint: disable=no-member
389+
.spreadsheets()
390+
.get(spreadsheetId=spreadsheet_id)
391+
.execute(num_retries=self.num_retries)
392+
)
393+
return response
394+
395+
def get_sheet_titles(self, spreadsheet_id: str, sheet_filter: Optional[List[str]] = None):
396+
"""
397+
Retrieves the sheet titles from a spreadsheet matching the given id and sheet filter.
398+
399+
:param spreadsheet_id: The spreadsheet id.
400+
:type spreadsheet_id: str
401+
:param sheet_filter: List of sheet title to retrieve from sheet.
402+
:type sheet_filter: List[str]
403+
:return: An list of sheet titles from the specified sheet that match
404+
the sheet filter.
405+
"""
406+
response = self.get_spreadsheet(spreadsheet_id=spreadsheet_id)
407+
408+
if sheet_filter:
409+
titles = [
410+
sh['properties']['title'] for sh in response['sheets']
411+
if sh['properties']['title'] in sheet_filter
412+
]
413+
else:
414+
titles = [sh['properties']['title'] for sh in response['sheets']]
415+
return titles
416+
417+
def create_spreadsheet(self, spreadsheet: Dict[str, Any]) -> Dict[str, Any]:
418+
"""
419+
Creates a spreadsheet, returning the newly created spreadsheet.
420+
421+
:param spreadsheet: an instance of Spreadsheet
422+
https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets#Spreadsheet
423+
:type spreadsheet: Dict[str, Any]
424+
:return: An spreadsheet object.
425+
"""
426+
self.log.info("Creating spreadsheet: %s", spreadsheet['properties']['title'])
427+
response = (
428+
self.get_conn() # pylint: disable=no-member
429+
.spreadsheets()
430+
.create(body=spreadsheet)
431+
.execute(num_retries=self.num_retries)
432+
)
433+
self.log.info("Spreadsheet: %s created", spreadsheet['properties']['title'])
434+
return response
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
import csv
19+
from tempfile import NamedTemporaryFile
20+
from typing import Any, Dict, List, Optional
21+
22+
from airflow.models import BaseOperator
23+
from airflow.providers.google.cloud.hooks.gcs import GCSHook
24+
from airflow.providers.google.suite.hooks.sheets import GSheetsHook
25+
26+
27+
class GoogleSheetsCreateSpreadsheet(BaseOperator):
28+
"""
29+
Creates a new spreadsheet.
30+
31+
.. seealso::
32+
For more information on how to use this operator, take a look at the guide:
33+
:ref:`howto/operator:GoogleSheetsCreateSpreadsheet`
34+
35+
:param spreadsheet: an instance of Spreadsheet
36+
https://developers.google.com/sheets/api/reference/rest/v4/spreadsheets#Spreadsheet
37+
:type spreadsheet: Dict[str, Any]
38+
:param gcp_conn_id: The connection ID to use when fetching connection info.
39+
:type gcp_conn_id: str
40+
:param delegate_to: The account to impersonate, if any.
41+
:type delegate_to: str
42+
"""
43+
44+
template_fields = ["spreadsheet"]
45+
46+
def __init__(
47+
self,
48+
spreadsheet: Dict[str, Any],
49+
gcp_conn_id: str = "google_cloud_default",
50+
delegate_to: Optional[str] = None,
51+
*args,
52+
**kwargs,
53+
) -> None:
54+
super().__init__(*args, **kwargs)
55+
self.gcp_conn_id = gcp_conn_id
56+
self.spreadsheet = spreadsheet
57+
self.delegate_to = delegate_to
58+
59+
def execute(self, context: Any):
60+
hook = GSheetsHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
61+
spreadsheet = hook.create_spreadsheet(spreadsheet=self.spreadsheet)
62+
self.xcom_push(context, "spreadsheet_id", spreadsheet["spreadsheetId"])
63+
self.xcom_push(context, "spreadsheet_url", spreadsheet["spreadsheetUrl"])
64+
return spreadsheet
65+
66+
67+
class GoogleSheetsToGCSOperator(BaseOperator):
68+
"""
69+
Writes Google Sheet data into Google Cloud Storage.
70+
71+
.. seealso::
72+
For more information on how to use this operator, take a look at the guide:
73+
:ref:`howto/operator:GoogleSheetsToGCSOperator`
74+
75+
:param spreadsheet_id: The Google Sheet ID to interact with.
76+
:type spreadsheet_id: str
77+
:param sheet_filter: Default to None, if provided, Should be an array of the sheet
78+
titles to pull from.
79+
:type sheet_filter: List[str]
80+
:param destination_bucket: The destination Google cloud storage bucket where the
81+
report should be written to. (templated)
82+
:param destination_bucket: str
83+
:param destination_path: The Google cloud storage URI array for the object created by the operator.
84+
For example: ``path/to/my/files``.
85+
:type destination_path: str
86+
:param gcp_conn_id: The connection ID to use when fetching connection info.
87+
:type gcp_conn_id: str
88+
:param delegate_to: The account to impersonate, if any.
89+
:type delegate_to: str
90+
"""
91+
92+
template_fields = ["spreadsheet_id", "destination_bucket", "destination_path", "sheet_filter"]
93+
94+
def __init__(
95+
self,
96+
spreadsheet_id: str,
97+
destination_bucket: str,
98+
sheet_filter: Optional[List[str]] = None,
99+
destination_path: Optional[str] = None,
100+
gcp_conn_id: str = "google_cloud_default",
101+
delegate_to: Optional[str] = None,
102+
*args,
103+
**kwargs,
104+
) -> None:
105+
super().__init__(*args, **kwargs)
106+
self.gcp_conn_id = gcp_conn_id
107+
self.spreadsheet_id = spreadsheet_id
108+
self.sheet_filter = sheet_filter
109+
self.destination_bucket = destination_bucket
110+
self.destination_path = destination_path
111+
self.delegate_to = delegate_to
112+
113+
def _upload_data(
114+
self,
115+
gcs_hook: GCSHook,
116+
hook: GSheetsHook,
117+
sheet_range: str,
118+
sheet_values: List[Any],
119+
) -> str:
120+
# Construct destination file path
121+
sheet = hook.get_spreadsheet(self.spreadsheet_id)
122+
file_name = f"{sheet['properties']['title']}_{sheet_range}.csv".replace(
123+
" ", "_"
124+
)
125+
dest_file_name = (
126+
f"{self.destination_path.strip('/')}/{file_name}"
127+
if self.destination_path
128+
else file_name
129+
)
130+
131+
with NamedTemporaryFile("w+") as temp_file:
132+
# Write data
133+
writer = csv.writer(temp_file)
134+
writer.writerows(sheet_values)
135+
temp_file.flush()
136+
137+
# Upload to GCS
138+
gcs_hook.upload(
139+
bucket_name=self.destination_bucket,
140+
object_name=dest_file_name,
141+
filename=temp_file.name,
142+
)
143+
return dest_file_name
144+
145+
def execute(self, context):
146+
sheet_hook = GSheetsHook(
147+
gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
148+
)
149+
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
150+
151+
# Pull data and upload
152+
destination_array: List[str] = []
153+
sheet_titles = sheet_hook.get_sheet_titles(
154+
spreadsheet_id=self.spreadsheet_id, sheet_filter=self.sheet_filter
155+
)
156+
for sheet_range in sheet_titles:
157+
data = sheet_hook.get_values(
158+
spreadsheet_id=self.spreadsheet_id, range_=sheet_range
159+
)
160+
gcs_path_to_file = self._upload_data(
161+
gcs_hook, sheet_hook, sheet_range, data
162+
)
163+
destination_array.append(gcs_path_to_file)
164+
165+
self.xcom_push(context, "destination_objects", destination_array)
166+
return destination_array
167+
168+
169+
class GCStoGoogleSheets(BaseOperator):
170+
"""
171+
Uploads .csv file from Google Cloud Storage to provided Google Spreadsheet.
172+
173+
.. seealso::
174+
For more information on how to use this operator, take a look at the guide:
175+
:ref:`howto/operator:GCStoGoogleSheets`
176+
177+
:param spreadsheet_id: The Google Sheet ID to interact with.
178+
:type spreadsheet_id: str
179+
:param bucket_name: Name of GCS bucket.:
180+
:type bucket_name: str
181+
:param object_name: Path to the .csv file on the GCS bucket.
182+
:type object_name: str
183+
:param spreadsheet_range: The A1 notation of the values to retrieve.
184+
:type spreadsheet_range: str
185+
:param gcp_conn_id: The connection ID to use when fetching connection info.
186+
:type gcp_conn_id: str
187+
:param delegate_to: The account to impersonate, if any.
188+
:type delegate_to: str
189+
"""
190+
191+
template_fields = [
192+
"spreadsheet_id",
193+
"bucket_name",
194+
"object_name",
195+
"spreadsheet_range",
196+
]
197+
198+
def __init__(
199+
self,
200+
spreadsheet_id: str,
201+
bucket_name: str,
202+
object_name: Optional[str] = None,
203+
spreadsheet_range: str = "Sheet1",
204+
gcp_conn_id: str = "google_cloud_default",
205+
delegate_to: Optional[str] = None,
206+
*args,
207+
**kwargs,
208+
) -> None:
209+
super().__init__(*args, **kwargs)
210+
211+
self.gcp_conn_id = gcp_conn_id
212+
self.spreadsheet_id = spreadsheet_id
213+
self.spreadsheet_range = spreadsheet_range
214+
self.bucket_name = bucket_name
215+
self.object_name = object_name
216+
self.delegate_to = delegate_to
217+
218+
def execute(self, context: Any):
219+
sheet_hook = GSheetsHook(
220+
gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
221+
)
222+
gcs_hook = GCSHook(gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to)
223+
with NamedTemporaryFile("w+") as temp_file:
224+
# Download data
225+
gcs_hook.download(
226+
bucket_name=self.bucket_name,
227+
object_name=self.object_name,
228+
filename=temp_file.name,
229+
)
230+
231+
# Upload data
232+
values = list(csv.reader(temp_file))
233+
sheet_hook.update_values(
234+
spreadsheet_id=self.spreadsheet_id,
235+
range_=self.spreadsheet_range,
236+
values=values,
237+
)

0 commit comments

Comments
 (0)