Load just the first yielded event in the triggerer #31987
Conversation
Signed-off-by: Hussein Awala <hussein@awala.fr>
airflow/jobs/triggerer_job_runner.py
Outdated
self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
self.triggers[trigger_id]["events"] += 1
self.events.append((trigger_id, event))
break
Won't this result in trigger.cleanup() being called before the trigger finishes though? I know we aren't looking at more than 1 event, but are we positive nothing productive happens after the first event?
I can imagine, at least in principle, it being desirable to yield the event before doing some cleanup work in the trigger itself. But ... shortly after the trigger yields its first event, it will be unassigned from the task, and shortly after that it will be canceled and simply stop running. So it's probably not a good idea for a trigger to be designed to do anything after the first event, as it will always be subject to a race condition.
Won't this result in trigger.cleanup() being called before the trigger finishes though?
Yes, that is possible, and there won't be any issues with it. Even if the trigger fails while executing the cleanup process (for example, deleting some resources used by the run method), the triggerer job will ignore the failure and proceed to process only the first yielded event.
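For concreteness, a minimal sketch of the kind of trigger being discussed: one that yields its event and then tries to do cleanup inside run() itself. The class name and module path are made up for illustration; only the BaseTrigger/TriggerEvent interface is the real Airflow API.

```python
import asyncio

from airflow.triggers.base import BaseTrigger, TriggerEvent


class OneShotCleanupTrigger(BaseTrigger):
    """Hypothetical trigger that tries to clean up after yielding its only event."""

    def serialize(self):
        # BaseTrigger contract: (classpath, constructor kwargs)
        return ("example_triggers.OneShotCleanupTrigger", {})

    async def run(self):
        # Wait for the condition we care about (stubbed out here).
        await asyncio.sleep(1)
        yield TriggerEvent({"status": "done"})
        # Anything after the first yield is racy: once the event is consumed,
        # the trigger is unassigned and then cancelled, so this cleanup may
        # never run, and any failure here is ignored by the triggerer.
        await asyncio.sleep(0)
```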
airflow/jobs/triggerer_job_runner.py
Outdated
self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
self.triggers[trigger_id]["events"] += 1
self.events.append((trigger_id, event))
break
Is it possible to also include a test for this in test_triggerer_job.py?
yeah it should be pretty straightforward to test, and probably a good idea @hussein-awala just to document (i.e. through the tests) that execution is never handed back to run after the first event is yielded.
there are also some comments in the base trigger that imply that multi-event triggers are a possibility. perhaps those should be updated too while we're at it.
i'm not sure how we might use multi-event triggers. right now events can only mean one thing: resume the task. curious if others see use cases.
somewhat related, it was assumed in the design that multiple task instances might someday be able to "depend on" the same trigger, but this was never fully realized.
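To illustrate what such a test would document, here is a framework-agnostic sketch of the assertion that control never returns to run() after the first yield. The stand-in generator and names are made up and do not come from test_triggerer_job.py.

```python
import asyncio


async def fake_trigger_run(resumed: list[int]):
    """Stand-in for a trigger's run() that tries to keep working after yielding."""
    yield {"status": "first"}
    resumed.append(1)  # should never execute if the consumer breaks after one event
    yield {"status": "second"}


async def consume_first_event_only():
    resumed: list[int] = []
    events = []
    async for event in fake_trigger_run(resumed):
        events.append(event)
        break  # mirrors the triggerer: stop at the first yielded event
    return events, resumed


def test_run_is_never_resumed_after_first_event():
    events, resumed = asyncio.run(consume_first_event_only())
    assert events == [{"status": "first"}]
    assert resumed == []  # execution was never handed back to run()
```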
If we really want to allow the user to do stuff post-yield, and still accept only one event, it's possible to do:

```python
import asyncio


async def do_loop_with_stuff_after_yield():
    inner_loops = 0
    while True:
        inner_loops += 1
        print(f"{inner_loops}: before")
        yield "hi"
        print(f"{inner_loops}: after")
        return


async def main():
    outer_loops = 0
    async for event in do_loop_with_stuff_after_yield():
        outer_loops += 1
        if outer_loops > 1:
            raise RuntimeError("Yielding more than one event is not supported.")
        print(outer_loops, ',', event)


asyncio.run(main())
```

But were we to do this, I think it would still be possible (though perhaps not super likely) that the trigger in question is canceled before it comes around to resume the iteration, and thus the code between the yield and the return would not run. Though if we really desired this, we could actually guarantee that the code between the yield and the return is executed. So, there doesn't seem to be much value in running code post yield. At the same time, it is a little weird to just not fully consume the generator that the user wrote, and moreover to do so silently. Curious if @uranusjr might have a thought.
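One way to actually guarantee that the code between the yield and the return runs, if that were ever desired, is to resume the generator one extra step after taking the first event. A small standalone sketch (the names are made up; this is not how the triggerer works today):

```python
import asyncio


async def trigger_run():
    yield "event"
    print("post-yield cleanup")  # runs because the consumer resumes us once more
    return


async def consume_single_event(gen):
    # Take the first event.
    event = await gen.__anext__()
    # Resume the generator one extra step so code after the yield executes;
    # a well-behaved single-event trigger then hits its return and raises
    # StopAsyncIteration. A second yield would be a bug.
    try:
        await gen.__anext__()
    except StopAsyncIteration:
        pass
    else:
        raise RuntimeError("Yielding more than one event is not supported.")
    return event


print(asyncio.run(consume_single_event(trigger_run())))
```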
I don't have particular thoughts toward the analyses, but this change actually raises a big question in the interface design. If we always just break after the first yielded event, why do events need to be yielded in the first place anyway? Why is run an async generator at all, rather than a coroutine that simply returns the event?
Very good point. I had not realised why it bothered me, but this sums it up.
I had the same question, but I found the answer in the deferring doc:
I don't know if this plan is still valid or not, but switching from yield event then return to just return event is a good idea. I already tested it when I added a return statement after each yield, but I tried to avoid changing the design.
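To make the proposed change concrete, a sketch of the two shapes of run() being compared: the yield-based interface that exists today and the return-based one being suggested. The return form is hypothetical, not a supported Airflow API, and the class names are made up.

```python
from airflow.triggers.base import BaseTrigger, TriggerEvent


class YieldStyleTrigger(BaseTrigger):
    """Current interface: run() is an async generator that yields the event."""

    def serialize(self):
        return ("example_triggers.YieldStyleTrigger", {})

    async def run(self):
        yield TriggerEvent({"status": "done"})


class ReturnStyleTrigger(BaseTrigger):
    """Hypothetical future interface: run() is a plain coroutine returning the event."""

    def serialize(self):
        return ("example_triggers.ReturnStyleTrigger", {})

    async def run(self):
        return TriggerEvent({"status": "done"})
```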
Yup, I had the same thought: why yield at all if we're just going to accept one event? It's possible that we could support both. Immediately after we call run, we could check whether the returned object is a coroutine or an async generator and handle it accordingly. Example code:

```python
import asyncio
from collections.abc import AsyncGenerator, Coroutine


async def not_gen():
    return "hi"


async def is_gen():
    yield "yo"
    return


async def main():
    f = not_gen()
    g = is_gen()
    assert isinstance(f, Coroutine)
    assert isinstance(g, AsyncGenerator)
    print("getting ready to run things")
    print(await f)
    async for thing in g:
        print(thing)


asyncio.run(main())
```
Looks like classic YAGNI.
Not important, but I don't think we can do exactly this. If we call run, the return value is either a coroutine or an async iterator. If it's a coroutine, we can await it and obtain the return value. If it's an async iterator, we cannot await it but must iterate it. So I think we must either try-await-except-iterate, or inspect the object type and handle it appropriately. Either way, I am OK with evolving the interface to support return and updating the docs as you suggested. But I guess it remains for us to decide whether to deprecate the iterator form, and, if given an iterator, whether to consume up to the return, or just stop at the first event, or consume all events....
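A self-contained sketch of the "inspect the object type and handle appropriately" option, using inspect.isasyncgen on the object returned by calling run(); the function names are illustrative, not part of Airflow.

```python
import asyncio
import inspect


async def run_returns():
    return "event-from-return"


async def run_yields():
    yield "event-from-yield"


async def first_event(run_callable):
    result = run_callable()
    if inspect.isasyncgen(result):
        # Old-style trigger: consume only the first yielded event.
        async for event in result:
            return event
        return None
    # New-style trigger: a coroutine that simply returns the event.
    return await result


async def main():
    print(await first_event(run_returns))
    print(await first_event(run_yields))


asyncio.run(main())
```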
@dstandish We also need to think about triggers in the different providers and how to keep them backwards compatible; should we add an ugly Airflow version check in each trigger to know whether we should yield an event or return it, or should we only change the core triggers (
Yes, valid concern. Future compat is something we must deal with pretty regularly though, nothing terribly unusual here. I.e. it's common that we add some new interface or feature in core and have to wait for the provider's minimum Airflow version to catch up before using it. Related note: I actually have a PR on the shelf to add some amount of automation / alerting to help us with this kind of thing, i.e. reminding us to remove deprecations, or to remove future compat checks, or to update code at X min-airflow-version. I had to drop it to focus on AIP-52 but hope to pick it up again soon.
An
nice |
I experimented with this a bit. It seems like the most difficult part is actually to convince Mypy.
@uranusjr could you check my last commit? I tested it with both types, and it appears to work, and it passes the Mypy pre-commit check.
The static checks fail. Even the generic type didn't fix it:

```python
T = TypeVar("T")


class BaseTrigger(abc.ABC, LoggingMixin, Generic[T]):
    async def run(self) -> T:
        pass


class Trigger1(BaseTrigger[TriggerEvent]):
    async def run(self) -> TriggerEvent:
        return event


class Trigger2(BaseTrigger[AsyncIterator[TriggerEvent]]):
    async def run(self) -> AsyncIterator[TriggerEvent]:
        yield event
```
This is the same problem I had. For some reason Mypy really doesn't like a subclass only implementing one of the unioned return types. Probably a covariance issue, but I'm not sure what the right fix is.
Currently, if the trigger yields multiple events, we load them all into a queue and iterate over them multiple times, but we only use the first one:
In this block we iterate over all the events to collect the trigger ids and add them to a set; since they all share the same trigger id, we don't need every event here:
airflow/airflow/jobs/triggerer_job_runner.py
Lines 658 to 663 in fc0e5a4
Then we handle them using this method:
airflow/airflow/jobs/triggerer_job_runner.py
Lines 378 to 389 in fc0e5a4
But if we check the method Trigger.submit_event, we find that we filter on TaskInstance.state, which we update to scheduled when we handle the first event, so the subsequent SQL queries are useless and will always return an empty list:
airflow/airflow/models/trigger.py
Lines 142 to 160 in 0f1cef2
And if we check the result of the tests I added in #31985, we find that it's possible to have a lot of events generated from the same trigger, so we can avoid increasing memory usage and reduce the SQL queries by just loading the first event.
If there is a need to process all the generated events (which is not the case now), we would need to change our handling method to wait for task termination and provide all these events to the execute_complete method as a list, which is a breaking change; for now, the best solution is just loading the first event.
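For context, the current deferral handoff passes exactly one event payload to the resumed method, which is why handing over a list would break existing operators. A minimal sketch of the usual operator-side pattern, using the real TimeDeltaTrigger; the operator class itself is made up for illustration:

```python
from datetime import timedelta

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.temporal import TimeDeltaTrigger


class WaitFiveMinutesOperator(BaseOperator):
    """Illustrative deferrable operator resumed with a single event payload."""

    def execute(self, context):
        # Hand control to the triggerer; resume in execute_complete once the
        # trigger fires its (first) event.
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(minutes=5)),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # `event` is the payload of the single TriggerEvent; turning this into
        # a list of events would break every existing implementation.
        self.log.info("Trigger fired with %s", event)
        return event
```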