Description
Note: This has cropped up in at least 2.3.x and remains in 2.4.3. The links to Airflow source code are from the 2.3.1 release.
It seems what's happening is that the `airflow tasks run <task>` command is failing on the Celery worker:
```
airflow.exceptions.AirflowException: Celery command failed on host: <host> with celery_task_id 20ec4a6d-21b4-4838-b7f3-fb5d52c538ee
```
The Celery status is set to `failed`, but the task in Airflow remains in `queued` for some arbitrary amount of time (often hours):
```
{scheduler_job.py:599} INFO - Executor reports execution of <task> run_id=scheduled__2022-10-26T23:00:00+00:00 exited with status failed for try_number 1
{scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=<dag id>, task_id=<task id>, run_id=scheduled__2022-10-26T23:00:00+00:00, map_index=-1, run_start_date=None, run_end_date=None, run_duration=None, state=queued, executor_state=failed, try_number=1, max_tries=2, job_id=None, pool=default_pool, queue=default, priority_weight=3, operator=DummyOperator, queued_dttm=2022-10-27 00:09:00.545894+00:00, queued_by_job_id=2664047, pid=None
```
Note the `state=queued` and `executor_state=failed` -- Airflow should be marking the task as failed. When this happens, these tasks also bypass `stalled_task_timeout`, because when `update_task_state` is called, the Celery state is `STARTED`. `self._set_celery_pending_task_timeout(key, None)` removes the task from the list of tasks eligible for `stalled_task_timeout`, and so these tasks remain in `queued` indefinitely.
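The `stalled_task_timeout` bypass can be illustrated with a minimal stand-in (a simplified sketch, not Airflow's actual implementation; the method names mirror the real ones but the bodies are illustrative):

```python
# Simplified sketch of how a STARTED report from Celery disarms the
# stalled-task bookkeeping for a task. Not Airflow's real code.

STARTED = "STARTED"  # stand-in for celery_states.STARTED


class ExecutorSketch:
    def __init__(self):
        # key -> timeout; this is what the stalled_task_timeout check scans
        self._pending_timeouts = {}

    def _set_celery_pending_task_timeout(self, key, timeout):
        # Passing None drops the task from the stalled-task bookkeeping
        if timeout is None:
            self._pending_timeouts.pop(key, None)
        else:
            self._pending_timeouts[key] = timeout

    def update_task_state(self, key, celery_state):
        if celery_state == STARTED:
            # Celery claims the task started, so it no longer looks stalled
            self._set_celery_pending_task_timeout(key, None)


executor = ExecutorSketch()
key = ("example_dag", "example_task")
executor._set_celery_pending_task_timeout(key, 600)
executor.update_task_state(key, STARTED)
# The task is now invisible to the stalled-task check, even though the
# scheduler-side TaskInstance is still sitting in "queued".
assert key not in executor._pending_timeouts
```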
Summary of what's happening:
- CeleryExecutor's `update_task_state` method calls `fail()`, which is a method from BaseExecutor.
- BaseExecutor's `fail` calls CeleryExecutor's `change_state` method.
- CeleryExecutor's `change_state` method calls BaseExecutor's `change_state` method via `super()`.
- The crux: BaseExecutor's `change_state` method is as follows:
```python
self.log.debug("Changing state: %s", key)
try:
    self.running.remove(key)
except KeyError:
    self.log.debug('Could not find key: %s', str(key))
```
Because the `airflow tasks run` command failed, the task was never set to the running state, so `self.running.remove(key)` raises. The `except KeyError` block allows the code to continue unabated. Once BaseExecutor's `change_state` method returns, CeleryExecutor's `change_state` method finishes:
```python
def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
    super().change_state(key, state, info)
    self.tasks.pop(key, None)
    self._set_celery_pending_task_timeout(key, None)
```
`self._set_celery_pending_task_timeout(key, None)` removes the task from the list of tasks that `stalled_task_timeout` checks, allowing the task to remain in `queued` indefinitely.
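Putting the pieces together, the whole sequence can be sketched end to end (simplified stand-ins, not Airflow's real classes):

```python
# End-to-end sketch of the chain described above: fail() -> change_state() ->
# all executor-side bookkeeping is dropped, while nothing ever marks the
# TaskInstance itself as failed. Not Airflow's real code.


class CeleryExecutorSketch:
    def __init__(self):
        self.running = set()         # BaseExecutor's view of running tasks
        self.tasks = {}              # key -> Celery AsyncResult (stubbed)
        self._pending_timeouts = {}  # what stalled_task_timeout scans

    def queue_task(self, key):
        self.tasks[key] = object()   # stand-in for the real AsyncResult
        self._pending_timeouts[key] = 600
        # NOTE: the task never reaches self.running, because
        # `airflow tasks run` failed on the worker.

    def fail(self, key):
        # BaseExecutor.fail delegates to change_state with state "failed"
        self.change_state(key, "failed")

    def change_state(self, key, state):
        try:
            self.running.remove(key)
        except KeyError:
            pass  # the swallowed KeyError from BaseExecutor.change_state
        self.tasks.pop(key, None)
        # stand-in for _set_celery_pending_task_timeout(key, None):
        self._pending_timeouts.pop(key, None)


key = ("example_dag", "example_task", 1)
executor = CeleryExecutorSketch()
executor.queue_task(key)
executor.fail(key)
# The executor has forgotten the task entirely, so stalled_task_timeout can
# never rescue it -- yet the TaskInstance row in the database is still "queued".
assert key not in executor.tasks and key not in executor._pending_timeouts
```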
Instead, when the `airflow tasks run` command fails, the Airflow task instance should be marked failed or retried (if applicable).
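To make "failed or retried (if applicable)" concrete, here is a hypothetical reconciliation helper (`reconcile_state` is illustrative only, not an Airflow API) showing the outcome one would expect when the executor reports a terminal failure for a still-queued task:

```python
# Hypothetical helper, not part of Airflow: if the executor reports failure
# while the scheduler still sees the task as queued, the TaskInstance should
# move to up_for_retry while retries remain, and to failed otherwise.

QUEUED, FAILED, UP_FOR_RETRY = "queued", "failed", "up_for_retry"


def reconcile_state(ti_state, executor_state, try_number, max_tries):
    if ti_state == QUEUED and executor_state == FAILED:
        # Mirror the usual retry decision: retry while tries remain
        return UP_FOR_RETRY if try_number <= max_tries else FAILED
    return ti_state


# Values from the log line above: try_number=1, max_tries=2 -> the task
# would be retried rather than left in "queued" indefinitely.
assert reconcile_state(QUEUED, FAILED, 1, 2) == UP_FOR_RETRY
assert reconcile_state(QUEUED, FAILED, 3, 2) == FAILED
```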