Skip to content

Commit 4e661f5

Browse files
turbaszekmik-laj
authored andcommitted
[AIRFLOW-5379] Add Google Search Ads 360 operators (#6228)
1 parent b439221 commit 4e661f5

File tree

12 files changed

+924
-0
lines changed

12 files changed

+924
-0
lines changed
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
"""
20+
Example Airflow DAG that shows how to use SearchAds.
21+
"""
22+
import os
23+
24+
from airflow import models
25+
from airflow.providers.google.marketing_platform.operators.search_ads import (
26+
GoogleSearchAdsDownloadReportOperator, GoogleSearchAdsInsertReportOperator,
27+
)
28+
from airflow.providers.google.marketing_platform.sensors.search_ads import GoogleSearchAdsReportSensor
29+
from airflow.utils import dates
30+
31+
# [START howto_search_ads_env_variables]
32+
AGENCY_ID = os.environ.get("GMP_AGENCY_ID")
33+
ADVERTISER_ID = os.environ.get("GMP_ADVERTISER_ID")
34+
GCS_BUCKET = os.environ.get("GMP_GCS_BUCKET", "test-cm-bucket")
35+
36+
REPORT = {
37+
"reportScope": {"agencyId": AGENCY_ID, "advertiserId": ADVERTISER_ID},
38+
"reportType": "account",
39+
"columns": [{"columnName": "agency"}, {"columnName": "lastModifiedTimestamp"}],
40+
"includeRemovedEntities": False,
41+
"statisticsCurrency": "usd",
42+
"maxRowsPerFile": 1000000,
43+
"downloadFormat": "csv",
44+
}
45+
# [END howto_search_ads_env_variables]
46+
47+
default_args = {"start_date": dates.days_ago(1)}
48+
49+
with models.DAG(
50+
"example_search_ads",
51+
default_args=default_args,
52+
schedule_interval=None, # Override to match your needs
53+
) as dag:
54+
# [START howto_search_ads_generate_report_operator]
55+
generate_report = GoogleSearchAdsInsertReportOperator(
56+
report=REPORT, task_id="generate_report"
57+
)
58+
# [END howto_search_ads_generate_report_operator]
59+
60+
# [START howto_search_ads_get_report_id]
61+
report_id = "{{ task_instance.xcom_pull('generate_report', key='report_id') }}"
62+
# [END howto_search_ads_get_report_id]
63+
64+
# [START howto_search_ads_get_report_operator]
65+
wait_for_report = GoogleSearchAdsReportSensor(
66+
report_id=report_id, task_id="wait_for_report"
67+
)
68+
# [END howto_search_ads_get_report_operator]
69+
70+
# [START howto_search_ads_getfile_report_operator]
71+
download_report = GoogleSearchAdsDownloadReportOperator(
72+
report_id=report_id, bucket_name=GCS_BUCKET, task_id="download_report"
73+
)
74+
# [END howto_search_ads_getfile_report_operator]
75+
76+
generate_report >> wait_for_report >> download_report
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
"""
20+
This module contains Google Search Ads 360 hook.
21+
"""
22+
from typing import Any, Dict, Optional
23+
24+
from googleapiclient.discovery import build
25+
26+
from airflow.gcp.hooks.base import GoogleCloudBaseHook
27+
28+
29+
class GoogleSearchAdsHook(GoogleCloudBaseHook):
30+
"""
31+
Hook for Google Search Ads 360.
32+
"""
33+
34+
_conn = None # type: Optional[Any]
35+
36+
def __init__(
37+
self,
38+
api_version: str = "v2",
39+
gcp_conn_id: str = "google_cloud_default",
40+
delegate_to: Optional[str] = None,
41+
) -> None:
42+
super().__init__(gcp_conn_id, delegate_to)
43+
self.api_version = api_version
44+
45+
def get_conn(self):
46+
"""
47+
Retrieves connection to Google SearchAds.
48+
"""
49+
if not self._conn:
50+
http_authorized = self._authorize()
51+
self._conn = build(
52+
"doubleclicksearch",
53+
self.api_version,
54+
http=http_authorized,
55+
cache_discovery=False,
56+
)
57+
return self._conn
58+
59+
def insert_report(self, report: Dict[str, Any]) -> Any:
60+
"""
61+
Inserts a report request into the reporting system.
62+
63+
:param report: Report to be generated.
64+
:type report: Dict[str, Any]
65+
"""
66+
response = (
67+
self.get_conn() # pylint: disable=no-member
68+
.reports()
69+
.request(body=report)
70+
.execute(num_retries=self.num_retries)
71+
)
72+
return response
73+
74+
def get(self, report_id: str) -> Any:
75+
"""
76+
Polls for the status of a report request.
77+
78+
:param report_id: ID of the report request being polled.
79+
:type report_id: str
80+
"""
81+
response = (
82+
self.get_conn() # pylint: disable=no-member
83+
.reports()
84+
.get(reportId=report_id)
85+
.execute(num_retries=self.num_retries)
86+
)
87+
return response
88+
89+
def get_file(self, report_fragment: int, report_id: str) -> Any:
90+
"""
91+
Downloads a report file encoded in UTF-8.
92+
93+
:param report_fragment: The index of the report fragment to download.
94+
:type report_fragment: int
95+
:param report_id: ID of the report.
96+
:type report_id: str
97+
"""
98+
response = (
99+
self.get_conn() # pylint: disable=no-member
100+
.reports()
101+
.getFile(reportFragment=report_fragment, reportId=report_id)
102+
.execute(num_retries=self.num_retries)
103+
)
104+
return response
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
"""
20+
This module contains Google Search Ads operators.
21+
"""
22+
from tempfile import NamedTemporaryFile
23+
from typing import Any, Dict, Optional
24+
25+
from airflow import AirflowException
26+
from airflow.gcp.hooks.gcs import GoogleCloudStorageHook
27+
from airflow.models.baseoperator import BaseOperator
28+
from airflow.providers.google.marketing_platform.hooks.search_ads import GoogleSearchAdsHook
29+
from airflow.utils.decorators import apply_defaults
30+
31+
32+
class GoogleSearchAdsInsertReportOperator(BaseOperator):
33+
"""
34+
Inserts a report request into the reporting system.
35+
36+
.. seealso:
37+
For API documentation check:
38+
https://developers.google.com/search-ads/v2/reference/reports/request
39+
40+
.. seealso::
41+
For more information on how to use this operator, take a look at the guide:
42+
:ref:`howto/operator:GoogleSearchAdsInsertReportOperator`
43+
44+
:param report: Report to be generated
45+
:type report: Dict[str, Any]
46+
:param api_version: The version of the api that will be requested for example 'v3'.
47+
:type api_version: str
48+
:param gcp_conn_id: The connection ID to use when fetching connection info.
49+
:type gcp_conn_id: str
50+
:param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
51+
request must have domain-wide delegation enabled.
52+
:type delegate_to: str
53+
"""
54+
55+
template_fields = ("report",)
56+
template_ext = (".json",)
57+
58+
@apply_defaults
59+
def __init__(
60+
self,
61+
report: Dict[str, Any],
62+
api_version: str = "v2",
63+
gcp_conn_id: str = "google_cloud_default",
64+
delegate_to: Optional[str] = None,
65+
*args,
66+
**kwargs
67+
):
68+
super().__init__(*args, **kwargs)
69+
self.report = report
70+
self.api_version = api_version
71+
self.gcp_conn_id = gcp_conn_id
72+
self.delegate_to = delegate_to
73+
74+
def execute(self, context: Dict):
75+
hook = GoogleSearchAdsHook(
76+
gcp_conn_id=self.gcp_conn_id,
77+
delegate_to=self.delegate_to,
78+
api_version=self.api_version,
79+
)
80+
self.log.info("Generating Search Ads report")
81+
response = hook.insert_report(report=self.report)
82+
report_id = response.get("id")
83+
self.xcom_push(context, key="report_id", value=report_id)
84+
self.log.info("Report generated, id: %s", report_id)
85+
return response
86+
87+
88+
class GoogleSearchAdsDownloadReportOperator(BaseOperator):
89+
"""
90+
Downloads a report to GCS bucket.
91+
92+
.. seealso:
93+
For API documentation check:
94+
https://developers.google.com/search-ads/v2/reference/reports/getFile
95+
96+
.. seealso::
97+
For more information on how to use this operator, take a look at the guide:
98+
:ref:`howto/operator:GoogleSearchAdsGetfileReportOperator`
99+
100+
:param report_id: ID of the report.
101+
:type report_id: str
102+
:param bucket_name: The bucket to upload to.
103+
:type bucket_name: str
104+
:param report_name: The report name to set when uploading the local file. If not provided then
105+
report_id is used.
106+
:type report_name: str
107+
:param gzip: Option to compress local file or file data for upload
108+
:type gzip: bool
109+
:param api_version: The version of the api that will be requested for example 'v3'.
110+
:type api_version: str
111+
:param gcp_conn_id: The connection ID to use when fetching connection info.
112+
:type gcp_conn_id: str
113+
:param delegate_to: The account to impersonate, if any. For this to work, the service accountmaking the
114+
request must have domain-wide delegation enabled.
115+
:type delegate_to: str
116+
"""
117+
118+
template_fields = ("report_name", "report_id", "bucket_name")
119+
120+
@apply_defaults
121+
def __init__(
122+
self,
123+
report_id: str,
124+
bucket_name: str,
125+
report_name: Optional[str] = None,
126+
gzip: bool = True,
127+
chunk_size: int = 10 * 1024 * 1024,
128+
api_version: str = "v2",
129+
gcp_conn_id: str = "google_cloud_default",
130+
delegate_to: Optional[str] = None,
131+
*args,
132+
**kwargs
133+
) -> None:
134+
super().__init__(*args, **kwargs)
135+
self.report_id = report_id
136+
self.api_version = api_version
137+
self.gcp_conn_id = gcp_conn_id
138+
self.delegate_to = delegate_to
139+
self.report_id = report_id
140+
self.chunk_size = chunk_size
141+
self.gzip = gzip
142+
self.bucket_name = self._set_bucket_name(bucket_name)
143+
self.report_name = report_name
144+
145+
def _resolve_file_name(self, name: str) -> str:
146+
csv = ".csv"
147+
gzip = ".gz"
148+
if not name.endswith(csv):
149+
name += csv
150+
if self.gzip:
151+
name += gzip
152+
return name
153+
154+
@staticmethod
155+
def _set_bucket_name(name: str) -> str:
156+
bucket = name if not name.startswith("gs://") else name[5:]
157+
return bucket.strip("/")
158+
159+
@staticmethod
160+
def _handle_report_fragment(fragment: bytes) -> bytes:
161+
fragment_records = fragment.split(b"\n", 1)
162+
if len(fragment_records) > 1:
163+
return fragment_records[1]
164+
return b""
165+
166+
def execute(self, context: Dict):
167+
hook = GoogleSearchAdsHook(
168+
gcp_conn_id=self.gcp_conn_id,
169+
delegate_to=self.delegate_to,
170+
api_version=self.api_version,
171+
)
172+
173+
gcs_hook = GoogleCloudStorageHook(
174+
gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to
175+
)
176+
177+
# Resolve file name of the report
178+
report_name = self.report_name or self.report_id
179+
report_name = self._resolve_file_name(report_name)
180+
181+
response = hook.get(report_id=self.report_id)
182+
if not response['isReportReady']:
183+
raise AirflowException('Report {} is not ready yet'.format(self.report_id))
184+
185+
# Resolve report fragments
186+
fragments_count = len(response["files"])
187+
188+
# Download chunks of report's data
189+
self.log.info("Downloading Search Ads report %s", self.report_id)
190+
with NamedTemporaryFile() as temp_file:
191+
for i in range(fragments_count):
192+
byte_content = hook.get_file(
193+
report_fragment=i, report_id=self.report_id
194+
)
195+
fragment = (
196+
byte_content
197+
if i == 0
198+
else self._handle_report_fragment(byte_content)
199+
)
200+
temp_file.write(fragment)
201+
202+
temp_file.flush()
203+
204+
gcs_hook.upload(
205+
bucket_name=self.bucket_name,
206+
object_name=report_name,
207+
gzip=self.gzip,
208+
filename=temp_file.name,
209+
)
210+
self.xcom_push(context, key="file_name", value=report_name)

0 commit comments

Comments
 (0)