Conversation

@hussein-awala
Member

Currently, if the trigger yields multiple events, we load them all into a queue and pass through them multiple times, but we use only the first one:
In this block we pass through all the events to collect the trigger ids and add them to a set; since events from the same trigger share a trigger id, we don't need them all here:

known_trigger_ids = (
    running_trigger_ids.union(x[0] for x in self.events)
    .union(self.to_cancel)
    .union(x[0] for x in self.to_create)
    .union(trigger[0] for trigger in self.failed_triggers)
)
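
As a toy illustration (standalone, not Airflow code) of why scanning every queued event is redundant here: events from the same trigger all contribute the same id to the set.

from collections import deque

# three events queued from the same trigger
events = deque([(42, "event-1"), (42, "event-2"), (42, "event-3")])
known_trigger_ids = {x[0] for x in events}
assert known_trigger_ids == {42}  # one id, no matter how many events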

Then we handle them using this method:

def handle_events(self):
    """
    Handles outbound events from triggers - dispatching them into the Trigger
    model where they are then pushed into the relevant task instances.
    """
    while self.trigger_runner.events:
        # Get the event and its trigger ID
        trigger_id, event = self.trigger_runner.events.popleft()
        # Tell the model to wake up its tasks
        Trigger.submit_event(trigger_id=trigger_id, event=event)
        # Emit stat event
        Stats.incr("triggers.succeeded")

But if we check the method Trigger.submit_event, we find that it filters on TaskInstance.state, which we update to SCHEDULED when we handle the first event, so the subsequent SQL queries are useless and will always return an empty list:

@classmethod
@internal_api_call
@provide_session
def submit_event(cls, trigger_id, event, session: Session = NEW_SESSION) -> None:
    """
    Takes an event from an instance of itself, and triggers all dependent
    tasks to resume.
    """
    for task_instance in session.query(TaskInstance).filter(
        TaskInstance.trigger_id == trigger_id, TaskInstance.state == TaskInstanceState.DEFERRED
    ):
        # Add the event's payload into the kwargs for the task
        next_kwargs = task_instance.next_kwargs or {}
        next_kwargs["event"] = event.payload
        task_instance.next_kwargs = next_kwargs
        # Remove ourselves as its trigger
        task_instance.trigger_id = None
        # Finally, mark it as scheduled so it gets re-queued
        task_instance.state = TaskInstanceState.SCHEDULED
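
To see why only the first call does anything, here is a minimal standalone sketch (plain Python, with a list standing in for the database query): the first submit_event flips the state and clears trigger_id, so every later call matches zero rows.

# a single task instance deferred on trigger 42
task_instances = [{"trigger_id": 42, "state": "deferred"}]

def submit_event(trigger_id, event):
    # mimics the DEFERRED filter in Trigger.submit_event above
    matched = [
        ti for ti in task_instances
        if ti["trigger_id"] == trigger_id and ti["state"] == "deferred"
    ]
    for ti in matched:
        ti["trigger_id"] = None    # remove the trigger
        ti["state"] = "scheduled"  # re-queue the task
    return matched

assert len(submit_event(42, "first event")) == 1   # wakes the task
assert len(submit_event(42, "second event")) == 0  # always empty afterwards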

And if we check the results of the tests I added in #31985, we find that a single trigger can generate a lot of events, so we can avoid growing the memory footprint and reduce the SQL queries by loading only the first event.

If there is ever a need to process all the generated events (which is not the case now), we would need to change our handling method to wait for task termination and provide all these events to the execute_complete method as a list, which is a breaking change; but for now, the best solution is to load only the first event.
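
For what it's worth, a hypothetical sketch of what that breaking change could look like on the operator side (the method name is real; the list-typed parameter is the hypothetical part):

from airflow.models import BaseOperator

class MyDeferrableOperator(BaseOperator):
    # hypothetical future signature; today `event` is a single event payload
    def execute_complete(self, context, event: list):
        for e in event:
            ...  # process every event the trigger yielded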



Signed-off-by: Hussein Awala <hussein@awala.fr>
@boring-cyborg boring-cyborg bot added the area:Scheduler including HA (high availability) scheduler label Jun 17, 2023
@hussein-awala hussein-awala added the type:improvement Changelog: Improvements label Jun 17, 2023
@hussein-awala hussein-awala added this to the Airflow 2.6.3 milestone Jun 17, 2023
@uranusjr
Member

cc @andrewgodwin

@pankajkoti pankajkoti requested review from dstandish, pankajastro and uranusjr and removed request for uranusjr August 2, 2023 12:06
self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
self.triggers[trigger_id]["events"] += 1
self.events.append((trigger_id, event))
break
Member

Won't this result in trigger.cleanup() being called before the trigger finishes though? I know we aren't looking at more than 1 event, but are we positive nothing productive happens after the first event?

Contributor

@dstandish dstandish Aug 2, 2023

I can imagine, at least in principle, it being desirable to yield an event before doing some cleanup work in the trigger itself. But shortly after the trigger yields its first event, it will be unassigned from the task, and shortly after that it will be canceled and simply stop running. So it's probably not a good idea for a trigger to be designed to do anything after the first event, as it will always be subject to a race condition.

Member Author

Won't this result in trigger.cleanup() being called before the trigger finishes though?

Yes, that will be possible, and there won't be any issues with it. Even if the trigger fails while executing the cleanup process (for example, deleting some resources used by the run method), the triggerer job will ignore the failure and proceed to process only the first yielded event.

self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
self.triggers[trigger_id]["events"] += 1
self.events.append((trigger_id, event))
break
Contributor

Is it possible to also include a test for this in test_triggerer_job.py?

Contributor

@dstandish dstandish Aug 2, 2023

Yeah, it should be pretty straightforward to test, and probably a good idea @hussein-awala just to document (i.e. through the tests) that execution is never handed back to run after the first event is yielded.

There are also some comments in BaseTrigger that imply multi-event triggers are a possibility. Perhaps those should be updated too while we're at it.

I'm not sure how we might use multi-event triggers. Right now events can only mean one thing: resume the task. Curious if others see use cases.

Somewhat related: it was assumed in the design that multiple task instances might someday be able to "depend on" the same trigger, but this was never fully realized.

@dstandish
Contributor

If we really want to allow users to do stuff post-yield, and still accept only one event, it's possible to do:

import asyncio

async def do_loop_with_stuff_after_yield():
    inner_loops = 0
    while True:
        inner_loops += 1
        print(f"{inner_loops}: before")
        yield "hi"
        print(f"{inner_loops}: after")
        return

async def main():
    outer_loops = 0
    async for event in do_loop_with_stuff_after_yield():
        outer_loops += 1
        if outer_loops > 1:
            raise RuntimeError("Yielding more than one event is not supported.")
        print(outer_loops, ",", event)

asyncio.run(main())

But were we to do this, I think it would still be possible (though perhaps not super likely) that the trigger in question is canceled before it comes around to resume the iteration and thus the code between yield and return would not have been run. As such, it does not seem like something anyone should ever do.

Though if we really desired this, we could actually guarantee that the code between yield and return is always run. The way to do this would be to delay adding the event to the events deque until after the generator exits. But then the event emission would be delayed until the trigger returns, which raises the question: why not just delay yielding the event in the first place?

So, there doesn't seem to be much value in running code post-yield. At the same time, it is a little weird to just not fully consume the generator that the user wrote, and moreover to do so silently. Curious if @uranusjr might have a thought.
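
For concreteness, a minimal standalone sketch of the delayed-emission alternative described above (hypothetical; `events` and `three_event_trigger` stand in for the runner's deque and a user trigger): the generator is consumed to completion, so post-yield code is guaranteed to run, and only then is the first event emitted.

import asyncio
from collections import deque

events: deque = deque()

async def three_event_trigger():
    for i in range(3):
        yield f"event-{i}"
        print(f"post-yield work after event-{i}")  # guaranteed to run

async def consume_then_emit(trigger_id, gen):
    first_event = None
    async for event in gen:
        if first_event is None:
            first_event = event  # remember only the first event
        # keep iterating so the trigger's post-yield code still runs
    # emission is delayed until the generator has returned
    events.append((trigger_id, first_event))

asyncio.run(consume_then_emit(42, three_event_trigger()))
assert events == deque([(42, "event-0")])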

@uranusjr
Member

uranusjr commented Aug 7, 2023

I don’t have particular thoughts toward the analyses, but this change actually raises a big question in the interface design. If we always just break after the first yielded event, why do events need to be yielded in the first place anyway? Why does run on a trigger not just return an event instead? That would resolve the entire post-yield work discussion, since it’s not possible to do things after a return in the first place. It seems to me that if we do this, we should start re-designing the trigger interface for 2.8.

@potiuk
Member

potiuk commented Aug 7, 2023

I don’t have particular thoughts toward the analyses, but this change actually raises a big question in the interface design. If we always just break after the first yielded event, why do events need to be yielded in the first place anyway? Why does run on a trigger not just return an event instead? That would resolve the entire post-yield work discussion, since it’s not possible to do things after a return in the first place. It seems to me that if we do this, we should start re-designing the trigger interface for 2.8.

Very good point. I did not realise why it had bothered me, but this sums it up.

@hussein-awala
Member Author

Why does run on a trigger not just return an event instead? That would resolve the entire post-yield work discussion, since it’s not possible to do things after a return in the first place. It seems to me that if we do this, we should start re-designing the trigger interface for 2.8.

I had the same question, but I found the answer in the deferring doc:

Currently Triggers are only used up to their first event, as they are only used for resuming deferred tasks (which happens on the first event fired). However, we plan to allow DAGs to be launched from triggers in future, which is where multi-event triggers will be more useful.

I don't know if this plan is still valid or not, but switching from yield event then return to just return event is a good idea. I already tested it when I added a return statement after each yield, but I tried to avoid changing the design.

@dstandish
Contributor

dstandish commented Aug 7, 2023

Yup, I had the same thought: why yield at all if we're just going to accept one event? It's possible that we could support both. Immediately after we call run, nothing has happened yet, and we can inspect whether it's a generator or just a coroutine and handle it appropriately. Thus I think we could incrementally expand the interface (by putting it in users' hands that they can use a simple return) while maintaining backcompat by supporting yielding events as well. Or deprecate.

Example code:

import asyncio
from collections.abc import AsyncGenerator, Coroutine

async def not_gen():
    return "hi"

async def is_gen():
    yield "yo"
    return

async def main():
    f = not_gen()
    g = is_gen()
    assert isinstance(f, Coroutine)
    assert isinstance(g, AsyncGenerator)
    print("getting ready to run things")
    print(await f)
    async for thing in g:
        print(thing)

asyncio.run(main())

@potiuk
Member

potiuk commented Aug 7, 2023

Currently Triggers are only used up to their first event, as they are only used for resuming deferred tasks (which happens on the first event fired). However, we plan to allow DAGs to be launched from triggers in future, which is where multi-event triggers will be more useful.

Looks like classic YAGNI

@dstandish
Contributor

We can definitely support both pretty trivially. The simplest way would be to just call run and check if the return value is a TriggerEvent (if not, run is a generator, not a plain function). We can change the docs so most people use the plain function variant.

Not important, but I don't think we can do exactly this. If we call run, the return value is either a coroutine or an async iterator. If it's a coroutine, we can await it and obtain the return value. If it's an async iterator, we cannot await it but must iterate it. So I think we must either try-await-except-iterate, or inspect the object type and handle it appropriately.

Either way, I am OK with evolving the interface to support return and updating the docs as you suggested.

But I guess it remains for us to decide whether to deprecate the iterator, and, if given an iterator, whether to consume up to the return, stop at the first event, or consume all events...

@hussein-awala
Member Author

But I guess it remains for us to decide whether to deprecate the iterator, and, if given an iterator, whether to consume up to the return, stop at the first event, or consume all events...

@dstandish We also need to think about triggers in the different providers and how to keep them backward compatible; should we add an ugly Airflow version check in each trigger to know whether we should yield an event or return it, or should we only change the core triggers (airflow/triggers/*) and wait until the providers' minimum Airflow version is bumped to 2.7.x to make the change?
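
To make the trade-off concrete, a hypothetical sketch of that "ugly version check" (class and serialize path are illustrative; the return-based branch assumes the future interface discussed above, and only the yield branch works on current Airflow):

from airflow import __version__ as airflow_version
from airflow.triggers.base import BaseTrigger, TriggerEvent
from packaging.version import Version

if Version(airflow_version) >= Version("2.7.0"):
    class MyProviderTrigger(BaseTrigger):
        def serialize(self):
            return ("my_provider.MyProviderTrigger", {})

        async def run(self):
            # hypothetical return-based interface
            return TriggerEvent({"status": "done"})
else:
    class MyProviderTrigger(BaseTrigger):
        def serialize(self):
            return ("my_provider.MyProviderTrigger", {})

        async def run(self):
            # current generator-based interface
            yield TriggerEvent({"status": "done"})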

@dstandish
Contributor

@dstandish We also need to think about triggers in the different providers and how to keep them backward compatible; should we add an ugly Airflow version check in each trigger to know whether we should yield an event or return it, or should we only change the core triggers (airflow/triggers/*) and wait until the providers' minimum Airflow version is bumped to 2.7.x to make the change?

Yes, valid concern. Future compat is something we must deal with pretty regularly, though; nothing terribly unusual here. I.e., it's common that we add some new interface or feature in core and have to wait for the provider's minimum Airflow version to catch up before using it. On a related note, I actually have a PR on the shelf to add some amount of automation/alerting to help us with this kind of thing, i.e. reminding us to remove deprecations, remove future-compat checks, or update code at a given min-airflow-version. I had to drop it to focus on AIP-52 but hope to pick it up again soon.

@uranusjr
Member

If it's a coroutine, we can await it and obtain the return value. If it's an async iterator, we cannot await it but must iterate it.

An isawaitable check would cover this pretty easily.
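
Something like this, presumably — a runnable sketch built on inspect.isawaitable (a real stdlib helper; the consume_run_result dispatcher itself is hypothetical):

import asyncio
import inspect

async def returns_event():
    return "event"

async def yields_event():
    yield "event"

async def consume_run_result(run_result):
    # run() returned a coroutine: await it and take its return value
    if inspect.isawaitable(run_result):
        return await run_result
    # otherwise assume an async generator: iterate, stopping at the first event
    async for event in run_result:
        return event

assert asyncio.run(consume_run_result(returns_event())) == "event"
assert asyncio.run(consume_run_result(yields_event())) == "event"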

@dstandish
Contributor

An isawaitable check would cover this pretty easily.

nice

@uranusjr
Member

I experimented with this a bit. It seems like the most difficult part is actually convincing Mypy that run() can be implemented either as an async generator function or as a coroutine function. I want to believe there’s a way but have not been able to find it.

@hussein-awala hussein-awala marked this pull request as draft August 10, 2023 19:17
@hussein-awala
Member Author

I experimented with this a bit. It seems like the most difficult part is actually convincing Mypy that run() can be implemented either as an async generator function or as a coroutine function. I want to believe there’s a way but have not been able to find it.

@uranusjr could you check my last commit? I tested it with both types; it appears to work and passes the Mypy pre-commit check.

@hussein-awala
Member Author

The static checks fail.

Even the generic type didn't fix it:

import abc
from typing import AsyncIterator, Generic, TypeVar

from airflow.triggers.base import TriggerEvent
from airflow.utils.log.logging_mixin import LoggingMixin

T = TypeVar("T")

class BaseTrigger(abc.ABC, LoggingMixin, Generic[T]):
    async def run(self) -> T:
        pass

class Trigger1(BaseTrigger[TriggerEvent]):
    async def run(self) -> TriggerEvent:
        return event  # `event` stands in for some TriggerEvent instance

class Trigger2(BaseTrigger[AsyncIterator[TriggerEvent]]):
    async def run(self) -> AsyncIterator[TriggerEvent]:
        yield event

@uranusjr
Member

This is the same problem I had. For some reason Mypy really doesn’t like a subclass only implementing one of the unioned return types. Probably a covariance issue, but I’m not sure what the right fix is.

@hussein-awala hussein-awala added the pinned Protect from Stalebot auto closing label Sep 10, 2023