
Conversation

@hussein-awala
Member

related: #31703

In #31703, we decided to remove certain return statements under the assumption that they were unnecessary after yielding an event. However, upon testing this strategy, it appears that in certain cases (specifically when the triggerer service is overloaded), we end up generating a significant number of events. As a result, we continue with unnecessary processing, which further burdens the triggerer.

To test this, I created a new module, airflow/tests.py:

from __future__ import annotations

from airflow.models import BaseOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent


class TestTrigger(BaseTrigger):
    def serialize(self) -> tuple[str, dict]:
        return ("airflow.tests.TestTrigger", {} )

    async def run(self):
        ind = 0
        while True:
            yield TriggerEvent(
                {
                    "message": f"Hello, world! {ind}",
                }
            )
            ind += 1
            # return  (commented out: this is the variant without a return)


class TestOperator(BaseOperator):
    def execute(self, context):
        self.defer(
            trigger=TestTrigger(),
            method_name="execute_complete",
        )

    def execute_complete(self, context: dict | None, event: dict):
        print(event)

And this small DAG:

from datetime import datetime

from airflow.models import DAG
from airflow.tests import TestOperator

with DAG(
    dag_id="test_trigger",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    TestOperator(task_id="test_trigger")

which I tested in breeze.

When I tested it without a return, I got:

[2023-06-17, 21:33:05 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:33:05.186752+00:00 [queued]>
[2023-06-17, 21:33:05 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:33:05.186752+00:00 [queued]>
[2023-06-17, 21:33:05 UTC] {taskinstance.py:1338} INFO - Starting attempt 1 of 1
[2023-06-17, 21:33:05 UTC] {taskinstance.py:1359} INFO - Executing <Task(TestOperator): test_trigger> on 2023-06-17 21:33:05.186752+00:00
[2023-06-17, 21:33:05 UTC] {standard_task_runner.py:57} INFO - Started process 4802 to run task
[2023-06-17, 21:33:05 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_trigger', 'test_trigger', 'manual__2023-06-17T21:33:05.186752+00:00', '--job-id', '137', '--raw', '--subdir', 'DAGS_FOLDER/trigger.py', '--cfg-path', '/tmp/tmppovv3qxe']
[2023-06-17, 21:33:05 UTC] {standard_task_runner.py:85} INFO - Job 137: Subtask test_trigger
[2023-06-17, 21:33:06 UTC] {task_command.py:410} INFO - Running <TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:33:05.186752+00:00 [running]> on host d3e0d78cfce9
[2023-06-17, 21:33:06 UTC] {taskinstance.py:1637} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test_trigger' AIRFLOW_CTX_TASK_ID='test_trigger' AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T21:33:05.186752+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T21:33:05.186752+00:00'
[2023-06-17, 21:33:06 UTC] {taskinstance.py:1505} INFO - Pausing task as DEFERRED. dag_id=test_trigger, task_id=test_trigger, execution_date=20230617T213305, start_date=20230617T213305
[2023-06-17, 21:33:06 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 0'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 1'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 2'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 3'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 4'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 5'}>
[2023-06-17, 21:33:08 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 6'}>
...
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26651'}>
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26652'}>
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26653'}>
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26654'}>
[2023-06-17, 21:33:09 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:33:05.186752+00:00/test_trigger/-1/1 (ID 64) fired: TriggerEvent<{'message': 'Hello, world! 26655'}>
[2023-06-17, 21:33:09 UTC] {logging_mixin.py:152} INFO - {'message': 'Hello, world! 0'}
[2023-06-17, 21:33:09 UTC] {taskinstance.py:1377} INFO - Marking task as SUCCESS. dag_id=test_trigger, task_id=test_trigger, execution_date=20230617T213305, start_date=20230617T213305, end_date=20230617T213309
[2023-06-17, 21:33:09 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-17, 21:33:09 UTC] {taskinstance.py:2752} INFO - 0 downstream tasks scheduled from follow-on schedule check

and even more than 26k events in other runs, when the triggerer was overloaded.

However, when using a return statement, I consistently observed a single event being sent. Moreover, this approach consumes less memory, since fewer events are added to the events deque:

[2023-06-17, 21:32:45 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
[2023-06-17, 21:32:45 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
[2023-06-17, 21:32:45 UTC] {taskinstance.py:1338} INFO - Starting attempt 1 of 1
[2023-06-17, 21:32:45 UTC] {taskinstance.py:1359} INFO - Executing <Task(TestOperator): test_trigger> on 2023-06-17 21:32:45.026487+00:00
[2023-06-17, 21:32:45 UTC] {standard_task_runner.py:57} INFO - Started process 4788 to run task
[2023-06-17, 21:32:45 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_trigger', 'test_trigger', 'manual__2023-06-17T21:32:45.026487+00:00', '--job-id', '131', '--raw', '--subdir', 'DAGS_FOLDER/trigger.py', '--cfg-path', '/tmp/tmpikm55djk']
[2023-06-17, 21:32:45 UTC] {standard_task_runner.py:85} INFO - Job 131: Subtask test_trigger
[2023-06-17, 21:32:46 UTC] {task_command.py:410} INFO - Running <TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [running]> on host d3e0d78cfce9
[2023-06-17, 21:32:46 UTC] {taskinstance.py:1637} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='test_trigger' AIRFLOW_CTX_TASK_ID='test_trigger' AIRFLOW_CTX_EXECUTION_DATE='2023-06-17T21:32:45.026487+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2023-06-17T21:32:45.026487+00:00'
[2023-06-17, 21:32:46 UTC] {taskinstance.py:1505} INFO - Pausing task as DEFERRED. dag_id=test_trigger, task_id=test_trigger, execution_date=20230617T213245, start_date=20230617T213245
[2023-06-17, 21:32:46 UTC] {local_task_job_runner.py:222} INFO - Task exited with return code 100 (task deferral)
[2023-06-17, 21:32:47 UTC] {triggerer_job_runner.py:608} INFO - Trigger test_trigger/manual__2023-06-17T21:32:45.026487+00:00/test_trigger/-1/1 (ID 63) fired: TriggerEvent<{'message': 'Hello, world! 0'}>
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1135} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [queued]>
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1336} INFO - Resuming after deferral
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1359} INFO - Executing <Task(TestOperator): test_trigger> on 2023-06-17 21:32:45.026487+00:00
[2023-06-17, 21:32:48 UTC] {standard_task_runner.py:57} INFO - Started process 4796 to run task
[2023-06-17, 21:32:48 UTC] {standard_task_runner.py:84} INFO - Running: ['***', 'tasks', 'run', 'test_trigger', 'test_trigger', 'manual__2023-06-17T21:32:45.026487+00:00', '--job-id', '134', '--raw', '--subdir', 'DAGS_FOLDER/trigger.py', '--cfg-path', '/tmp/tmpbtzk4o5s']
[2023-06-17, 21:32:48 UTC] {standard_task_runner.py:85} INFO - Job 134: Subtask test_trigger
[2023-06-17, 21:32:48 UTC] {task_command.py:410} INFO - Running <TaskInstance: test_trigger.test_trigger manual__2023-06-17T21:32:45.026487+00:00 [running]> on host d3e0d78cfce9
[2023-06-17, 21:32:48 UTC] {logging_mixin.py:152} INFO - {'message': 'Hello, world! 0'}
[2023-06-17, 21:32:48 UTC] {taskinstance.py:1377} INFO - Marking task as SUCCESS. dag_id=test_trigger, task_id=test_trigger, execution_date=20230617T213245, start_date=20230617T213245, end_date=20230617T213248
[2023-06-17, 21:32:48 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-17, 21:32:48 UTC] {taskinstance.py:2752} INFO - 0 downstream tasks scheduled from follow-on schedule check
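
For reference, the run() used in this second test is the same method with the return reinstated where the comment marks it:

    async def run(self):
        ind = 0
        while True:
            yield TriggerEvent(
                {
                    "message": f"Hello, world! {ind}",
                }
            )
            ind += 1
            return  # stop the generator right after the first event is yielded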


Signed-off-by: Hussein Awala <hussein@awala.fr>
@boring-cyborg bot added the provider:cncf-kubernetes, area:providers, provider:databricks and provider:google labels on Jun 17, 2023
@hussein-awala
Member Author

@eladkal I think we should merge this one before releasing the new providers wave

@hussein-awala
Member Author

@pankajastro @potiuk could you please review this PR?

@potiuk
Member

potiuk left a comment

Yeah. Thought about it and it makes perfect sense (maybe @andrewgodwin can also confirm it). As I understand it now (though I do not have huge experience in how async works), the main event loop could, generally, cause the yield to happen several times. I think what is important is to see when the trigger actually gets removed from the trigger list being served by the Triggerer. How I understand it works (again, partly assumption and partly based on the observation above) is that triggers are not removed eagerly when the trigger event is emitted, but only when it is actually processed.

The second part (processing the TriggerEvent) must naturally happen outside of the main event loop: code executed inside the event loop must be very fast and should never block on any kind of I/O, whereas trigger event processing, as I understand it, necessarily involves a database operation. The running trigger is (I believe) only removed from the trigger loop after that DB operation. This means that if there are not many triggers in the loop, it might well happen that the loop gets re-executed and new events are yielded in the meantime.

Simply speaking, there is no other mechanism to signal that the event has already been emitted.

I am not sure how recovery is implemented in the case where the event is emitted but the processing does not manage to complete; I assume this can only happen if the Triggerer crashes, in which case the Trigger will be restarted, since the database operation saving its state will not have happened. So this case should also be handled properly, and there is generally no need to continue the loop anyway.

So yeah. After a bit of consideration it seems that those returns are actually needed to prevent (not even that improbable) race conditions.
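
A minimal asyncio sketch (not Airflow code, all names are made up) of the race described above: the consumer keeps iterating the async generator until it is explicitly cancelled, so a run() that never returns keeps yielding events while the first one is still waiting to be processed.

import asyncio
from collections import deque


async def run_without_return():
    """A trigger-like async generator that never returns after yielding."""
    ind = 0
    while True:
        yield {"message": f"Hello, world! {ind}"}
        ind += 1
        await asyncio.sleep(0)  # hand control back to the loop, like real I/O would


async def main():
    events: deque = deque()

    async def consume(gen):
        # stand-in for the triggerer main loop: it keeps collecting yielded events
        async for event in gen:
            events.append(event)

    task = asyncio.create_task(consume(run_without_return()))
    # stand-in for the slow, out-of-loop event processing (DB write, scheduling the task)
    await asyncio.sleep(0.1)
    task.cancel()  # the trigger is only cancelled once processing has completed
    try:
        await task
    except asyncio.CancelledError:
        pass
    print(f"{len(events)} events were yielded before the trigger was cancelled")


asyncio.run(main())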

@potiuk
Member

potiuk commented Jun 18, 2023

And I agree with @eladkal: this one should be merged before releasing the new provider wave.

@hussein-awala
Copy link
Member Author

I am not sure how recovery is implemented in the case where the event is emitted but the processing does not manage to complete; I assume this can only happen if the Triggerer crashes, in which case the Trigger will be restarted, since the database operation saving its state will not have happened. So this case should also be handled properly, and there is generally no need to continue the loop anyway.

@potiuk I agree. I have also opened and tested #31987, which can effectively prevent the problem on the triggerer side. However, even if we merge that PR, we still need the return statements added in the current PR to avoid executing unnecessary checks, such as the call to the Kubernetes cluster for state verification in KubernetesPodTrigger.
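
To illustrate that last point with a hypothetical polling trigger (not the real KubernetesPodTrigger, just a sketch of the same pattern): without the return, the next loop iteration would hit the external API again even though a terminal event has already been yielded.

from __future__ import annotations

import asyncio

from airflow.triggers.base import BaseTrigger, TriggerEvent


class PollingTrigger(BaseTrigger):
    """Hypothetical trigger that polls an external system for a terminal state."""

    def __init__(self, poll_interval: float = 10.0):
        super().__init__()
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict]:
        return ("airflow.tests.PollingTrigger", {"poll_interval": self.poll_interval})

    async def _get_remote_state(self) -> str:
        # placeholder for an expensive external call (e.g. querying a Kubernetes cluster)
        return "done"

    async def run(self):
        while True:
            state = await self._get_remote_state()
            if state in ("done", "failed"):
                yield TriggerEvent({"status": state})
                return  # without this, the next iteration repeats the external check
            await asyncio.sleep(self.poll_interval)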
