-
Notifications
You must be signed in to change notification settings - Fork 16.3k
Remove return statement after yield from triggers class #31703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
We have couple of trigger class where we yield as well return the return statement should not require once we yield
5b36c07 to
82d8b69
Compare
|
I am not an expert in async methods in Python, but as far as I know, the method continues execution when it yields something (it's a generator). In our case, the trigger is interrupted by Airflow when Airflow calls the This is just a personal hypothesis that could potentially explain the issue #31795, which I believe might be resolved by merging #31803. However, if this is indeed the case, it is important to note that some return statements are necessary to stop the trigger immediately after yielding a WDYT? |
|
Thank you @hussein-awala for the pointer. I tried to do some quick testing but did not see the behaviour which you are pointing. But, I'll not rule out that run method might yield more event than desire after looking at the triggerer job. Currently, we call run method airflow/airflow/jobs/triggerer_job_runner.py Line 607 in 2b2b0a7
then use the event I was expecting that when we call run method from triggerer job it should stop the iteration after first yield but it will might not. So, yes what you are pointing might be the case. But what will change if we add return statement after yield it would yield => suspend => return and return will help us stoping the loop But do we need to suspend once we yield first event, not sure if I'm missing something here but I would expect run method would emit only one event and we should not need to suspend once it yield i.e we should stop the loop? Also, if we add return or break to exit from run method loop we need it many places since we have lots of async operator and corresponding trigger run method. I would like to propose to change airflow/airflow/jobs/triggerer_job_runner.py Line 607 in 2b2b0a7
to wdyt? |
|
Maybe there is (or will be) some cases where we need to yield multiple events, so IMO we should keep it like that. But for now, and after checking the block you mentioned: airflow/airflow/jobs/triggerer_job_runner.py Lines 607 to 610 in 2b2b0a7
it seems like we wait the end of the run method. But if you check the code of this method in async def run(self):
async with self.hook:
while True:
run_state = await self.hook.a_get_run_state(self.run_id)
if run_state.is_terminal:
yield TriggerEvent(
{
"run_id": self.run_id,
"run_page_url": self.run_page_url,
"run_state": run_state.to_json(),
}
)
else:
self.log.info(
"run-id %s in run state %s. sleeping for %s seconds",
self.run_id,
run_state,
self.polling_period_seconds,
)
await asyncio.sleep(self.polling_period_seconds)it will never finish, as it will be stuck in an infinite loop without the return statement (or an exception), that's what I said in my first comment:
Also it seems like the @potiuk @uranusjr I'm interested in what you think about this topic. |
|
I am not 100% either but maybe @andrewgodwin with his expert knowledge for async cauld chime-in here. |
We have a couple of trigger classes where we yield as well return the return statement should not require once we yield
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.