@pankajastro
Member

We have a couple of trigger classes where we both yield and return. The return statement should not be required once we yield.


@boring-cyborg boring-cyborg bot added provider:cncf-kubernetes Kubernetes (k8s) provider related issues area:providers provider:databricks provider:google Google (including GCP) related issues labels Jun 4, 2023
@pankajastro pankajastro marked this pull request as ready for review June 4, 2023 20:45
@potiuk potiuk merged commit 86b5ba2 into apache:main Jun 4, 2023
@pankajastro pankajastro deleted the remove_return branch June 5, 2023 07:28
@hussein-awala
Member

I am not an expert in async methods in Python, but as far as I know, the method continues execution when it yields something (it's a generator).

In our case, the trigger is interrupted by Airflow when Airflow calls the execute_complete method. However, between the moment the TriggerEvent is yielded and the moment the trigger is stopped, it continues execution and may yield additional TriggerEvents that are not processed.

This is just a personal hypothesis that could potentially explain the issue #31795, which I believe might be resolved by merging #31803. However, if this is indeed the case, it is important to note that some return statements are necessary to stop the trigger immediately after yielding a TriggerEvent.
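The hypothesis above can be illustrated with a minimal sketch in plain asyncio, without any Airflow code. The `run` generator and the `consume` loop are hypothetical stand-ins for a trigger and the triggerer's `async for` loop. It shows that the generator body does not keep running on its own: it is suspended at each `yield` and resumed when the consumer asks for the next item, at which point it can yield again.

```python
import asyncio

trace = []


async def run():
    # Hypothetical trigger body that yields more than once.
    trace.append("trigger: yielding first event")
    yield "first"
    trace.append("trigger: resumed after first yield")
    yield "second"
    trace.append("trigger: resumed after second yield")


async def consume():
    # Stand-in for the consumer: it keeps asking for events
    # until the generator is exhausted.
    async for event in run():
        trace.append(f"consumer: got {event}")


asyncio.run(consume())
for line in trace:
    print(line)
```

The printed trace interleaves trigger and consumer lines, and the second event is delivered because nothing stopped the generator after the first one.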

WDYT?

@pankajastro
Member Author

Thank you @hussein-awala for the pointer.

I tried to do some quick testing but did not see the behaviour you describe.

But after looking at the triggerer job, I won't rule out that the run method might yield more events than desired.

Currently, we call the run method

async for event in trigger.run():

then use the event

I was expecting that when we call the run method from the triggerer job, it would stop iterating after the first yield, but it might not.

So yes, what you are pointing out might be the case.

But what will change if we add a return statement after the yield?

it would yield => suspend => return

and the return will help us stop the loop.
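As a rough sketch of that sequence, the generator below is driven by hand with `__anext__()` instead of `async for`. The names `run` and `drive` are illustrative only. The first call delivers the event and leaves the generator suspended at the `yield`; the second call resumes it, hits the `return`, and raises `StopAsyncIteration`, which is the signal an `async for` loop uses to end.

```python
import asyncio


async def run():
    yield "event"  # yield => suspend
    return  # on resume => finish the generator


async def drive():
    agen = run()
    first = await agen.__anext__()  # receives "event"
    try:
        await agen.__anext__()  # resumes the generator, which returns
    except StopAsyncIteration:
        finished = True
    else:
        finished = False
    return first, finished


first, finished = asyncio.run(drive())
print(first, finished)
```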

But do we need to suspend once we yield the first event? Not sure if I'm missing something here, but I would expect the run method to emit only one event, so we should not need to suspend once it yields, i.e. we should stop the loop.

Also, if we add return or break to exit from the run method loop, we need it in many places, since we have lots of async operators and corresponding trigger run methods.

I would like to propose changing

async for event in trigger.run():

to

event = await trigger.run().asend(None)

wdyt?
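A minimal sketch of what the proposed `asend(None)` call would do, using a hypothetical `run` generator rather than a real trigger. Calling `asend(None)` on a freshly created async generator behaves like the first `__anext__()` call, so only the first event is retrieved. One assumption made here: the generator is closed explicitly with `aclose()` afterwards, since otherwise it stays suspended at the `yield` and its cleanup code does not run until it is finalized.

```python
import asyncio

state = {"resumed": False, "closed": False}


async def run():
    try:
        yield "first"
        state["resumed"] = True  # only reached if someone asks for more
        yield "second"
    finally:
        state["closed"] = True  # cleanup, e.g. closing a hook


async def first_event_only():
    agen = run()
    try:
        # Take exactly one event instead of iterating with `async for`.
        return await agen.asend(None)
    finally:
        # Assumption of this sketch: close explicitly so cleanup runs now.
        await agen.aclose()


first = asyncio.run(first_event_only())
print(first, state)
```

With this shape the code after the first `yield` never runs, so a trigger could not emit more than one event.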

@hussein-awala
Member

Maybe there are (or will be) some cases where we need to yield multiple events, so IMO we should keep it as it is.

But for now, and after checking the block you mentioned:

    async for event in trigger.run():
        self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
        self.triggers[trigger_id]["events"] += 1
        self.events.append((trigger_id, event))

it seems like we wait for the end of the run method.

But if you check the code of this method in DatabricksExecutionTrigger after your change:

    async def run(self):
        async with self.hook:
            while True:
                run_state = await self.hook.a_get_run_state(self.run_id)
                if run_state.is_terminal:
                    yield TriggerEvent(
                        {
                            "run_id": self.run_id,
                            "run_page_url": self.run_page_url,
                            "run_state": run_state.to_json(),
                        }
                    )
                else:
                    self.log.info(
                        "run-id %s in run state %s. sleeping for %s seconds",
                        self.run_id,
                        run_state,
                        self.polling_period_seconds,
                    )
                    await asyncio.sleep(self.polling_period_seconds)

it will never finish, as it will be stuck in an infinite loop without the return statement (or an exception). That's what I said in my first comment:

it is important to note that some return statements are necessary to stop the trigger immediately after yielding a TriggerEvent.
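The loop shape above can be reproduced in a small, self-contained sketch. `FakeHook` is a hypothetical stand-in for the Databricks hook that always reports a terminal state, and `count_events` stops after a fixed number of events so the example terminates. Without the `return`, every resumption polls again and yields another event; with it, the generator finishes after the first one.

```python
import asyncio


class FakeHook:
    """Hypothetical stand-in for the hook; always reports a terminal state."""

    def __init__(self):
        self.calls = 0

    async def a_get_run_state(self, run_id):
        self.calls += 1
        return "TERMINATED"


async def run(hook, run_id, with_return):
    while True:
        run_state = await hook.a_get_run_state(run_id)
        if run_state == "TERMINATED":
            yield {"run_id": run_id, "run_state": run_state}
            if with_return:
                return  # stop right after the first event
        else:
            await asyncio.sleep(0)


async def count_events(with_return, limit=5):
    hook = FakeHook()
    agen = run(hook, 42, with_return)
    seen = 0
    async for _ in agen:
        seen += 1
        if seen >= limit:  # safety stop so the sketch cannot loop forever
            break
    await agen.aclose()
    return seen, hook.calls


without_return = asyncio.run(count_events(with_return=False))
with_return = asyncio.run(count_events(with_return=True))
print(without_return, with_return)
```

Only the safety limit ends the first run; the second run ends on its own after a single event and a single poll.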

Also, it seems like the next_method is called with a single event and not a list of events, so maybe there is something to fix in that part.

@potiuk @uranusjr I'm interested in what you think about this topic.

@potiuk
Member

potiuk commented Jun 11, 2023

I am not 100% sure either, but maybe @andrewgodwin, with his expert knowledge of async, could chime in here.
