@@ -35,10 +35,21 @@ def _create_list_response(messages, token):
3535 return mock .MagicMock (pages = (n for n in [page ]), next_page_token = token )
3636
3737
38+ def _remove_stackdriver_handlers ():
39+ for handler_ref in reversed (logging ._handlerList [:]):
40+ handler = handler_ref ()
41+ if not isinstance (handler , StackdriverTaskHandler ):
42+ continue
43+ logging ._removeHandlerRef (handler_ref )
44+ del handler
45+
46+
3847class TestStackdriverLoggingHandlerStandalone (unittest .TestCase ):
3948 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id' )
4049 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client' )
4150 def test_should_pass_message_to_client (self , mock_client , mock_get_creds_and_project_id ):
51+ self .addCleanup (_remove_stackdriver_handlers )
52+
4253 mock_get_creds_and_project_id .return_value = ('creds' , 'project_id' )
4354
4455 transport_type = mock .MagicMock ()
@@ -69,6 +80,7 @@ def setUp(self) -> None:
6980 self .ti .try_number = 1
7081 self .ti .state = State .RUNNING
7182 self .addCleanup (self .dag .clear )
83+ self .addCleanup (_remove_stackdriver_handlers )
7284
7385 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id' )
7486 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.gcp_logging.Client' )
@@ -128,14 +140,18 @@ def test_should_read_logs_for_all_try(self, mock_client, mock_get_creds_and_proj
128140
129141 logs , metadata = self .stackdriver_task_handler .read (self .ti )
130142 mock_client .return_value .list_entries .assert_called_once_with (
131- filter_ = 'resource.type="global"\n '
132- 'logName="projects/asf-project/logs/airflow"\n '
133- 'labels.task_id="task_for_testing_file_log_handler"\n '
134- 'labels.dag_id="dag_for_testing_file_task_handler"\n '
135- 'labels.execution_date="2016-01-01T00:00:00+00:00"' ,
143+ filter_ = (
144+ 'resource.type="global"\n '
145+ 'logName="projects/asf-project/logs/airflow"\n '
146+ 'labels.task_id="task_for_testing_file_log_handler"\n '
147+ 'labels.dag_id="dag_for_testing_file_task_handler"\n '
148+ 'labels.execution_date="2016-01-01T00:00:00+00:00"'
149+ ),
150+ order_by = 'timestamp asc' ,
151+ page_size = 1000 ,
136152 page_token = None ,
137153 )
138- assert [' MSG1\n MSG2' ] == logs
154+ assert [(( 'default-hostname' , ' MSG1\n MSG2'),) ] == logs
139155 assert [{'end_of_log' : True }] == metadata
140156
141157 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id' )
@@ -149,14 +165,18 @@ def test_should_read_logs_for_task_with_quote(self, mock_client, mock_get_creds_
149165 self .ti .task_id = "K\" OT"
150166 logs , metadata = self .stackdriver_task_handler .read (self .ti )
151167 mock_client .return_value .list_entries .assert_called_once_with (
152- filter_ = 'resource.type="global"\n '
153- 'logName="projects/asf-project/logs/airflow"\n '
154- 'labels.task_id="K\\ "OT"\n '
155- 'labels.dag_id="dag_for_testing_file_task_handler"\n '
156- 'labels.execution_date="2016-01-01T00:00:00+00:00"' ,
168+ filter_ = (
169+ 'resource.type="global"\n '
170+ 'logName="projects/asf-project/logs/airflow"\n '
171+ 'labels.task_id="K\\ "OT"\n '
172+ 'labels.dag_id="dag_for_testing_file_task_handler"\n '
173+ 'labels.execution_date="2016-01-01T00:00:00+00:00"'
174+ ),
175+ order_by = 'timestamp asc' ,
176+ page_size = 1000 ,
157177 page_token = None ,
158178 )
159- assert [' MSG1\n MSG2' ] == logs
179+ assert [(( 'default-hostname' , ' MSG1\n MSG2'),) ] == logs
160180 assert [{'end_of_log' : True }] == metadata
161181
162182 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id' )
@@ -170,15 +190,19 @@ def test_should_read_logs_for_single_try(self, mock_client, mock_get_creds_and_p
170190
171191 logs , metadata = self .stackdriver_task_handler .read (self .ti , 3 )
172192 mock_client .return_value .list_entries .assert_called_once_with (
173- filter_ = 'resource.type="global"\n '
174- 'logName="projects/asf-project/logs/airflow"\n '
175- 'labels.task_id="task_for_testing_file_log_handler"\n '
176- 'labels.dag_id="dag_for_testing_file_task_handler"\n '
177- 'labels.execution_date="2016-01-01T00:00:00+00:00"\n '
178- 'labels.try_number="3"' ,
193+ filter_ = (
194+ 'resource.type="global"\n '
195+ 'logName="projects/asf-project/logs/airflow"\n '
196+ 'labels.task_id="task_for_testing_file_log_handler"\n '
197+ 'labels.dag_id="dag_for_testing_file_task_handler"\n '
198+ 'labels.execution_date="2016-01-01T00:00:00+00:00"\n '
199+ 'labels.try_number="3"'
200+ ),
201+ order_by = 'timestamp asc' ,
202+ page_size = 1000 ,
179203 page_token = None ,
180204 )
181- assert [' MSG1\n MSG2' ] == logs
205+ assert [(( 'default-hostname' , ' MSG1\n MSG2'),) ] == logs
182206 assert [{'end_of_log' : True }] == metadata
183207
184208 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id' )
@@ -190,14 +214,18 @@ def test_should_read_logs_with_pagination(self, mock_client, mock_get_creds_and_
190214 ]
191215 mock_get_creds_and_project_id .return_value = ('creds' , 'project_id' )
192216 logs , metadata1 = self .stackdriver_task_handler .read (self .ti , 3 )
193- mock_client .return_value .list_entries .assert_called_once_with (filter_ = mock .ANY , page_token = None )
194- assert ['MSG1\n MSG2' ] == logs
217+ mock_client .return_value .list_entries .assert_called_once_with (
218+ filter_ = mock .ANY , order_by = 'timestamp asc' , page_size = 1000 , page_token = None
219+ )
220+ assert [(('default-hostname' , 'MSG1\n MSG2' ),)] == logs
195221 assert [{'end_of_log' : False , 'next_page_token' : 'TOKEN1' }] == metadata1
196222
197223 mock_client .return_value .list_entries .return_value .next_page_token = None
198224 logs , metadata2 = self .stackdriver_task_handler .read (self .ti , 3 , metadata1 [0 ])
199- mock_client .return_value .list_entries .assert_called_with (filter_ = mock .ANY , page_token = "TOKEN1" )
200- assert ['MSG3\n MSG4' ] == logs
225+ mock_client .return_value .list_entries .assert_called_with (
226+ filter_ = mock .ANY , order_by = 'timestamp asc' , page_size = 1000 , page_token = "TOKEN1"
227+ )
228+ assert [(('default-hostname' , 'MSG3\n MSG4' ),)] == logs
201229 assert [{'end_of_log' : True }] == metadata2
202230
203231 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id' )
@@ -211,7 +239,7 @@ def test_should_read_logs_with_download(self, mock_client, mock_get_creds_and_pr
211239
212240 logs , metadata1 = self .stackdriver_task_handler .read (self .ti , 3 , {'download_logs' : True })
213241
214- assert [' MSG1\n MSG2\n MSG3\n MSG4' ] == logs
242+ assert [(( 'default-hostname' , ' MSG1\n MSG2\n MSG3\n MSG4'),) ] == logs
215243 assert [{'end_of_log' : True }] == metadata1
216244
217245 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id' )
@@ -240,17 +268,21 @@ def test_should_read_logs_with_custom_resources(self, mock_client, mock_get_cred
240268
241269 logs , metadata = self .stackdriver_task_handler .read (self .ti )
242270 mock_client .return_value .list_entries .assert_called_once_with (
243- filter_ = 'resource.type="cloud_composer_environment"\n '
244- 'logName="projects/asf-project/logs/airflow"\n '
245- 'resource.labels."environment.name"="test-instancce"\n '
246- 'resource.labels.location="europpe-west-3"\n '
247- 'resource.labels.project_id="asf-project"\n '
248- 'labels.task_id="task_for_testing_file_log_handler"\n '
249- 'labels.dag_id="dag_for_testing_file_task_handler"\n '
250- 'labels.execution_date="2016-01-01T00:00:00+00:00"' ,
271+ filter_ = (
272+ 'resource.type="cloud_composer_environment"\n '
273+ 'logName="projects/asf-project/logs/airflow"\n '
274+ 'resource.labels."environment.name"="test-instancce"\n '
275+ 'resource.labels.location="europpe-west-3"\n '
276+ 'resource.labels.project_id="asf-project"\n '
277+ 'labels.task_id="task_for_testing_file_log_handler"\n '
278+ 'labels.dag_id="dag_for_testing_file_task_handler"\n '
279+ 'labels.execution_date="2016-01-01T00:00:00+00:00"'
280+ ),
281+ order_by = 'timestamp asc' ,
282+ page_size = 1000 ,
251283 page_token = None ,
252284 )
253- assert [' TEXT\n TEXT' ] == logs
285+ assert [(( 'default-hostname' , ' TEXT\n TEXT'),) ] == logs
254286 assert [{'end_of_log' : True }] == metadata
255287
256288 @mock .patch ('airflow.providers.google.cloud.log.stackdriver_task_handler.get_credentials_and_project_id' )
0 commit comments