@@ -88,16 +88,19 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
8888 "message" : "Job completed" ,
8989 }
9090 )
91+ return
9192 elif response_from_hook == "pending" :
9293 self .log .info ("Query is still running..." )
9394 self .log .info ("Sleeping for %s seconds." , self .poll_interval )
9495 await asyncio .sleep (self .poll_interval )
9596 else :
9697 yield TriggerEvent ({"status" : "error" , "message" : response_from_hook })
98+ return
9799
98100 except Exception as e :
99101 self .log .exception ("Exception occurred while checking for query completion" )
100102 yield TriggerEvent ({"status" : "error" , "message" : str (e )})
103+ return
101104
102105 def _get_async_hook (self ) -> BigQueryAsyncHook :
103106 return BigQueryAsyncHook (gcp_conn_id = self .conn_id )
@@ -140,6 +143,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
140143 "records" : None ,
141144 }
142145 )
146+ return
143147 else :
144148 # Extract only first record from the query results
145149 first_record = records .pop (0 )
@@ -149,16 +153,19 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
149153 "records" : first_record ,
150154 }
151155 )
156+ return
152157
153158 elif response_from_hook == "pending" :
154159 self .log .info ("Query is still running..." )
155160 self .log .info ("Sleeping for %s seconds." , self .poll_interval )
156161 await asyncio .sleep (self .poll_interval )
157162 else :
158163 yield TriggerEvent ({"status" : "error" , "message" : response_from_hook })
164+ return
159165 except Exception as e :
160166 self .log .exception ("Exception occurred while checking for query completion" )
161167 yield TriggerEvent ({"status" : "error" , "message" : str (e )})
168+ return
162169
163170
164171class BigQueryGetDataTrigger (BigQueryInsertJobTrigger ):
@@ -206,15 +213,18 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
206213 "records" : records ,
207214 }
208215 )
216+ return
209217 elif response_from_hook == "pending" :
210218 self .log .info ("Query is still running..." )
211219 self .log .info ("Sleeping for %s seconds." , self .poll_interval )
212220 await asyncio .sleep (self .poll_interval )
213221 else :
214222 yield TriggerEvent ({"status" : "error" , "message" : response_from_hook })
223+ return
215224 except Exception as e :
216225 self .log .exception ("Exception occurred while checking for query completion" )
217226 yield TriggerEvent ({"status" : "error" , "message" : str (e )})
227+ return
218228
219229
220230class BigQueryIntervalCheckTrigger (BigQueryInsertJobTrigger ):
@@ -345,6 +355,7 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
345355 "second_row_data" : second_job_row ,
346356 }
347357 )
358+ return
348359 elif first_job_response_from_hook == "pending" or second_job_response_from_hook == "pending" :
349360 self .log .info ("Query is still running..." )
350361 self .log .info ("Sleeping for %s seconds." , self .poll_interval )
@@ -353,10 +364,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
353364 yield TriggerEvent (
354365 {"status" : "error" , "message" : second_job_response_from_hook , "data" : None }
355366 )
367+ return
356368
357369 except Exception as e :
358370 self .log .exception ("Exception occurred while checking for query completion" )
359371 yield TriggerEvent ({"status" : "error" , "message" : str (e )})
372+ return
360373
361374
362375class BigQueryValueCheckTrigger (BigQueryInsertJobTrigger ):
@@ -428,16 +441,18 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
428441 records = records .pop (0 ) if records else None
429442 hook .value_check (self .sql , self .pass_value , records , self .tolerance )
430443 yield TriggerEvent ({"status" : "success" , "message" : "Job completed" , "records" : records })
444+ return
431445 elif response_from_hook == "pending" :
432446 self .log .info ("Query is still running..." )
433447 self .log .info ("Sleeping for %s seconds." , self .poll_interval )
434448 await asyncio .sleep (self .poll_interval )
435449 else :
436450 yield TriggerEvent ({"status" : "error" , "message" : response_from_hook , "records" : None })
437-
451+ return
438452 except Exception as e :
439453 self .log .exception ("Exception occurred while checking for query completion" )
440454 yield TriggerEvent ({"status" : "error" , "message" : str (e )})
455+ return
441456
442457
443458class BigQueryTableExistenceTrigger (BaseTrigger ):
@@ -495,10 +510,12 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
495510 )
496511 if response :
497512 yield TriggerEvent ({"status" : "success" , "message" : "success" })
513+ return
498514 await asyncio .sleep (self .poll_interval )
499515 except Exception as e :
500516 self .log .exception ("Exception occurred while checking for Table existence" )
501517 yield TriggerEvent ({"status" : "error" , "message" : str (e )})
518+ return
502519
503520 async def _table_exists (
504521 self , hook : BigQueryTableAsyncHook , dataset : str , table_id : str , project_id : str
@@ -577,9 +594,11 @@ async def run(self) -> AsyncIterator[TriggerEvent]: # type: ignore[override]
577594 "message" : f"Partition: { self .partition_id } in table: { self .table_id } " ,
578595 }
579596 )
597+ return
580598 job_id = None
581599 elif status == "error" :
582600 yield TriggerEvent ({"status" : "error" , "message" : status })
601+ return
583602 self .log .info ("Sleeping for %s seconds." , self .poll_interval )
584603 await asyncio .sleep (self .poll_interval )
585604
0 commit comments