1919from tempfile import NamedTemporaryFile
2020from unittest import TestCase , mock
2121
22+ from parameterized import parameterized
23+
24+ from airflow .models import DAG , TaskInstance as TI
2225from airflow .providers .google .marketing_platform .operators .campaign_manager import (
2326 GoogleCampaignManagerBatchInsertConversionsOperator ,
2427 GoogleCampaignManagerBatchUpdateConversionsOperator ,
2730 GoogleCampaignManagerInsertReportOperator ,
2831 GoogleCampaignManagerRunReportOperator ,
2932)
33+ from airflow .utils import timezone
34+ from airflow .utils .session import create_session
3035
3136API_VERSION = "api_version"
3237GCP_CONN_ID = "google_cloud_default"
4651 ],
4752}
4853
54+ DEFAULT_DATE = timezone .datetime (2021 , 1 , 1 )
55+ PROFILE_ID = "profile_id"
56+ REPORT_ID = "report_id"
57+ FILE_ID = "file_id"
58+ BUCKET_NAME = "test_bucket"
59+ REPORT_NAME = "test_report.csv"
60+ TEMP_FILE_NAME = "test"
61+
4962
5063class TestGoogleCampaignManagerDeleteReportOperator (TestCase ):
5164 @mock .patch (
5265 "airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerHook"
5366 )
5467 @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator" )
5568 def test_execute (self , mock_base_op , hook_mock ):
56- profile_id = "PROFILE_ID"
57- report_id = "REPORT_ID"
5869 op = GoogleCampaignManagerDeleteReportOperator (
59- profile_id = profile_id ,
60- report_id = report_id ,
70+ profile_id = PROFILE_ID ,
71+ report_id = REPORT_ID ,
6172 api_version = API_VERSION ,
6273 task_id = "test_task" ,
6374 )
@@ -69,11 +80,19 @@ def test_execute(self, mock_base_op, hook_mock):
6980 impersonation_chain = None ,
7081 )
7182 hook_mock .return_value .delete_report .assert_called_once_with (
72- profile_id = profile_id , report_id = report_id
83+ profile_id = PROFILE_ID , report_id = REPORT_ID
7384 )
7485
7586
76- class TestGoogleCampaignManagerGetReportOperator (TestCase ):
87+ class TestGoogleCampaignManagerDownloadReportOperator (TestCase ):
88+ def setUp (self ):
89+ with create_session () as session :
90+ session .query (TI ).delete ()
91+
92+ def tearDown (self ):
93+ with create_session () as session :
94+ session .query (TI ).delete ()
95+
7796 @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.http" )
7897 @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile" )
7998 @mock .patch (
@@ -94,24 +113,17 @@ def test_execute(
94113 tempfile_mock ,
95114 http_mock ,
96115 ):
97- profile_id = "PROFILE_ID"
98- report_id = "REPORT_ID"
99- file_id = "FILE_ID"
100- bucket_name = "test_bucket"
101- report_name = "test_report.csv"
102- temp_file_name = "TEST"
103-
104116 http_mock .MediaIoBaseDownload .return_value .next_chunk .return_value = (
105117 None ,
106118 True ,
107119 )
108- tempfile_mock .NamedTemporaryFile .return_value .__enter__ .return_value .name = temp_file_name
120+ tempfile_mock .NamedTemporaryFile .return_value .__enter__ .return_value .name = TEMP_FILE_NAME
109121 op = GoogleCampaignManagerDownloadReportOperator (
110- profile_id = profile_id ,
111- report_id = report_id ,
112- file_id = file_id ,
113- bucket_name = bucket_name ,
114- report_name = report_name ,
122+ profile_id = PROFILE_ID ,
123+ report_id = REPORT_ID ,
124+ file_id = FILE_ID ,
125+ bucket_name = BUCKET_NAME ,
126+ report_name = REPORT_NAME ,
115127 api_version = API_VERSION ,
116128 task_id = "test_task" ,
117129 )
@@ -123,21 +135,78 @@ def test_execute(
123135 impersonation_chain = None ,
124136 )
125137 hook_mock .return_value .get_report_file .assert_called_once_with (
126- profile_id = profile_id , report_id = report_id , file_id = file_id
138+ profile_id = PROFILE_ID , report_id = REPORT_ID , file_id = FILE_ID
127139 )
128140 gcs_hook_mock .assert_called_once_with (
129141 gcp_conn_id = GCP_CONN_ID ,
130142 delegate_to = None ,
131143 impersonation_chain = None ,
132144 )
133145 gcs_hook_mock .return_value .upload .assert_called_once_with (
134- bucket_name = bucket_name ,
135- object_name = report_name + ".gz" ,
146+ bucket_name = BUCKET_NAME ,
147+ object_name = REPORT_NAME + ".gz" ,
148+ gzip = True ,
149+ filename = TEMP_FILE_NAME ,
150+ mime_type = "text/csv" ,
151+ )
152+ xcom_mock .assert_called_once_with (None , key = "report_name" , value = REPORT_NAME + ".gz" )
153+
154+ @parameterized .expand ([BUCKET_NAME , f"gs://{ BUCKET_NAME } " , "XComArg" , "{{ ti.xcom_pull(task_ids='f') }}" ])
155+ @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.http" )
156+ @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.tempfile" )
157+ @mock .patch (
158+ "airflow.providers.google.marketing_platform.operators.campaign_manager.GoogleCampaignManagerHook"
159+ )
160+ @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.GCSHook" )
161+ def test_set_bucket_name (
162+ self ,
163+ test_bucket_name ,
164+ gcs_hook_mock ,
165+ hook_mock ,
166+ tempfile_mock ,
167+ http_mock ,
168+ ):
169+ http_mock .MediaIoBaseDownload .return_value .next_chunk .return_value = (
170+ None ,
171+ True ,
172+ )
173+ tempfile_mock .NamedTemporaryFile .return_value .__enter__ .return_value .name = TEMP_FILE_NAME
174+
175+ dag = DAG (
176+ dag_id = "test_set_bucket_name" ,
177+ start_date = DEFAULT_DATE ,
178+ schedule_interval = None ,
179+ catchup = False ,
180+ )
181+
182+ if BUCKET_NAME not in test_bucket_name :
183+
184+ @dag .task
185+ def f ():
186+ return BUCKET_NAME
187+
188+ taskflow_op = f ()
189+ taskflow_op .operator .run (start_date = DEFAULT_DATE , end_date = DEFAULT_DATE )
190+
191+ op = GoogleCampaignManagerDownloadReportOperator (
192+ profile_id = PROFILE_ID ,
193+ report_id = REPORT_ID ,
194+ file_id = FILE_ID ,
195+ bucket_name = test_bucket_name if test_bucket_name != "XComArg" else taskflow_op ,
196+ report_name = REPORT_NAME ,
197+ api_version = API_VERSION ,
198+ task_id = "test_task" ,
199+ dag = dag ,
200+ )
201+ op .run (start_date = DEFAULT_DATE , end_date = DEFAULT_DATE )
202+
203+ gcs_hook_mock .return_value .upload .assert_called_once_with (
204+ bucket_name = BUCKET_NAME ,
205+ object_name = REPORT_NAME + ".gz" ,
136206 gzip = True ,
137- filename = temp_file_name ,
207+ filename = TEMP_FILE_NAME ,
138208 mime_type = "text/csv" ,
139209 )
140- xcom_mock .assert_called_once_with (None , key = "report_name" , value = report_name + ".gz" )
141210
142211
143212class TestGoogleCampaignManagerInsertReportOperator (TestCase ):
@@ -150,14 +219,12 @@ class TestGoogleCampaignManagerInsertReportOperator(TestCase):
150219 "campaign_manager.GoogleCampaignManagerInsertReportOperator.xcom_push"
151220 )
152221 def test_execute (self , xcom_mock , mock_base_op , hook_mock ):
153- profile_id = "PROFILE_ID"
154222 report = {"report" : "test" }
155- report_id = "test"
156223
157- hook_mock .return_value .insert_report .return_value = {"id" : report_id }
224+ hook_mock .return_value .insert_report .return_value = {"id" : REPORT_ID }
158225
159226 op = GoogleCampaignManagerInsertReportOperator (
160- profile_id = profile_id ,
227+ profile_id = PROFILE_ID ,
161228 report = report ,
162229 api_version = API_VERSION ,
163230 task_id = "test_task" ,
@@ -169,17 +236,16 @@ def test_execute(self, xcom_mock, mock_base_op, hook_mock):
169236 api_version = API_VERSION ,
170237 impersonation_chain = None ,
171238 )
172- hook_mock .return_value .insert_report .assert_called_once_with (profile_id = profile_id , report = report )
173- xcom_mock .assert_called_once_with (None , key = "report_id" , value = report_id )
239+ hook_mock .return_value .insert_report .assert_called_once_with (profile_id = PROFILE_ID , report = report )
240+ xcom_mock .assert_called_once_with (None , key = "report_id" , value = REPORT_ID )
174241
175242 def test_prepare_template (self ):
176- profile_id = "PROFILE_ID"
177243 report = {"key" : "value" }
178244 with NamedTemporaryFile ("w+" , suffix = ".json" ) as f :
179245 f .write (json .dumps (report ))
180246 f .flush ()
181247 op = GoogleCampaignManagerInsertReportOperator (
182- profile_id = profile_id ,
248+ profile_id = PROFILE_ID ,
183249 report = f .name ,
184250 api_version = API_VERSION ,
185251 task_id = "test_task" ,
@@ -200,16 +266,13 @@ class TestGoogleCampaignManagerRunReportOperator(TestCase):
200266 "campaign_manager.GoogleCampaignManagerRunReportOperator.xcom_push"
201267 )
202268 def test_execute (self , xcom_mock , mock_base_op , hook_mock ):
203- profile_id = "PROFILE_ID"
204- report_id = "REPORT_ID"
205- file_id = "FILE_ID"
206269 synchronous = True
207270
208- hook_mock .return_value .run_report .return_value = {"id" : file_id }
271+ hook_mock .return_value .run_report .return_value = {"id" : FILE_ID }
209272
210273 op = GoogleCampaignManagerRunReportOperator (
211- profile_id = profile_id ,
212- report_id = report_id ,
274+ profile_id = PROFILE_ID ,
275+ report_id = REPORT_ID ,
213276 synchronous = synchronous ,
214277 api_version = API_VERSION ,
215278 task_id = "test_task" ,
@@ -222,9 +285,9 @@ def test_execute(self, xcom_mock, mock_base_op, hook_mock):
222285 impersonation_chain = None ,
223286 )
224287 hook_mock .return_value .run_report .assert_called_once_with (
225- profile_id = profile_id , report_id = report_id , synchronous = synchronous
288+ profile_id = PROFILE_ID , report_id = REPORT_ID , synchronous = synchronous
226289 )
227- xcom_mock .assert_called_once_with (None , key = "file_id" , value = file_id )
290+ xcom_mock .assert_called_once_with (None , key = "file_id" , value = FILE_ID )
228291
229292
230293class TestGoogleCampaignManagerBatchInsertConversionsOperator (TestCase ):
@@ -233,18 +296,17 @@ class TestGoogleCampaignManagerBatchInsertConversionsOperator(TestCase):
233296 )
234297 @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator" )
235298 def test_execute (self , mock_base_op , hook_mock ):
236- profile_id = "PROFILE_ID"
237299 op = GoogleCampaignManagerBatchInsertConversionsOperator (
238300 task_id = "insert_conversion" ,
239- profile_id = profile_id ,
301+ profile_id = PROFILE_ID ,
240302 conversions = [CONVERSION ],
241303 encryption_source = "AD_SERVING" ,
242304 encryption_entity_type = "DCM_ADVERTISER" ,
243305 encryption_entity_id = 123456789 ,
244306 )
245307 op .execute (None )
246308 hook_mock .return_value .conversions_batch_insert .assert_called_once_with (
247- profile_id = profile_id ,
309+ profile_id = PROFILE_ID ,
248310 conversions = [CONVERSION ],
249311 encryption_source = "AD_SERVING" ,
250312 encryption_entity_type = "DCM_ADVERTISER" ,
@@ -259,18 +321,17 @@ class TestGoogleCampaignManagerBatchUpdateConversionOperator(TestCase):
259321 )
260322 @mock .patch ("airflow.providers.google.marketing_platform.operators.campaign_manager.BaseOperator" )
261323 def test_execute (self , mock_base_op , hook_mock ):
262- profile_id = "PROFILE_ID"
263324 op = GoogleCampaignManagerBatchUpdateConversionsOperator (
264325 task_id = "update_conversion" ,
265- profile_id = profile_id ,
326+ profile_id = PROFILE_ID ,
266327 conversions = [CONVERSION ],
267328 encryption_source = "AD_SERVING" ,
268329 encryption_entity_type = "DCM_ADVERTISER" ,
269330 encryption_entity_id = 123456789 ,
270331 )
271332 op .execute (None )
272333 hook_mock .return_value .conversions_batch_update .assert_called_once_with (
273- profile_id = profile_id ,
334+ profile_id = PROFILE_ID ,
274335 conversions = [CONVERSION ],
275336 encryption_source = "AD_SERVING" ,
276337 encryption_entity_type = "DCM_ADVERTISER" ,
0 commit comments