Tasks stuck in queued despite stalled_task_timeout #28120

@RNHTTR

Description

Note: This has cropped up in at least 2.3.x and remains in 2.4.3. The links to Airflow source code are from the 2.3.1 release.

What seems to be happening is that the airflow tasks run <task> command fails on the Celery worker:

airflow.exceptions.AirflowException: Celery command failed on host: <host> with celery_task_id 20ec4a6d-21b4-4838-b7f3-fb5d52c538ee

The Celery task status is set to failed, but the task in Airflow remains in queued for some arbitrary amount of time (often hours):

{scheduler_job.py:599} INFO - Executor reports execution of <task> run_id=scheduled__2022-10-26T23:00:00+00:00 exited with status failed for try_number 1

{scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=<dag id>, task_id=<task id>, run_id=scheduled__2022-10-26T23:00:00+00:00, map_index=-1, run_start_date=None, run_end_date=None, run_duration=None, state=queued, executor_state=failed, try_number=1, max_tries=2, job_id=None, pool=default_pool, queue=default, priority_weight=3, operator=DummyOperator, queued_dttm=2022-10-27 00:09:00.545894+00:00, queued_by_job_id=2664047, pid=None

Note the state=queued and executor_state=failed -- Airflow should be marking the task as failed. When this happens, these tasks also bypass stalled_task_timeout: because the Celery state is STARTED by the time update_task_state is called, self._set_celery_pending_task_timeout(key, None) removes the task from the list of tasks eligible for stalled_task_timeout, and so these tasks remain in queued indefinitely.
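The bypass can be illustrated with a minimal sketch. This is a hypothetical simplification of the 2.3.x-era update_task_state logic, not the real source: stalled_candidates and events are stand-ins for the executor's internal bookkeeping, and only the states relevant here are handled.

```python
class UpdateTaskStateSketch:
    """Hedged sketch of Airflow 2.3.x-era CeleryExecutor.update_task_state."""

    def __init__(self):
        self.stalled_candidates = {}  # key -> deadline checked by stalled_task_timeout
        self.events = []              # (key, state) pairs recorded by success()/fail()

    def _set_celery_pending_task_timeout(self, key, timeout):
        if timeout is None:
            self.stalled_candidates.pop(key, None)  # stop tracking this key
        else:
            self.stalled_candidates[key] = timeout

    def success(self, key, info=None):
        self.events.append((key, "success"))

    def fail(self, key, info=None):
        self.events.append((key, "failed"))

    def update_task_state(self, key, state, info=None):
        if state == "STARTED":
            # The task reached a worker, so it is no longer considered
            # "stalled": drop it from stalled_task_timeout tracking.
            self._set_celery_pending_task_timeout(key, None)
        elif state == "SUCCESS":
            self.success(key, info)
        elif state == "FAILURE":
            self.fail(key, info)
```

Once the STARTED branch has cleared the key, a later failure can no longer be rescued by the stalled-task check.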


Summary of what's happening:

  1. CeleryExecutor's update_task_state method calls fail(), which is a method from BaseExecutor.
  2. BaseExecutor's fail calls CeleryExecutor's change_state method.
  3. CeleryExecutor's change_state method calls BaseExecutor's change_state method via super().
  4. The crux: BaseExecutor's change_state method is as follows:

self.log.debug("Changing state: %s", key)
try:
    self.running.remove(key)
except KeyError:
    self.log.debug("Could not find key: %s", str(key))

Because the airflow tasks run command failed, the task never reached the running state, so self.running.remove(key) raises KeyError; the except KeyError block swallows it and the code continues unabated. Once BaseExecutor's change_state method completes, CeleryExecutor's change_state method completes:

def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
    super().change_state(key, state, info)
    self.tasks.pop(key, None)
    self._set_celery_pending_task_timeout(key, None)

self._set_celery_pending_task_timeout(key, None) removes the task from the list of tasks that stalled_task_timeout checks for, allowing the tasks to remain in queued indefinitely.
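Steps 1-4 can be condensed into a runnable sketch. The class and attribute names mirror the real Airflow 2.3.x executors, but the bodies are heavily trimmed stand-ins (e.g. event_buffer holds plain tuples, tasks is a bare dict), not the actual implementation.

```python
class BaseExecutorSketch:
    def __init__(self):
        self.running = set()    # keys the executor believes are running
        self.event_buffer = {}  # key -> (state, info) reported to the scheduler

    def fail(self, key, info=None):
        # Step 2: BaseExecutor.fail delegates to change_state, which the
        # CeleryExecutor overrides (step 3).
        self.change_state(key, "failed", info)

    def change_state(self, key, state, info=None):
        # Step 4: the task never reached "running", so remove() raises
        # KeyError, which is swallowed and execution continues.
        try:
            self.running.remove(key)
        except KeyError:
            pass
        self.event_buffer[key] = (state, info)


class CeleryExecutorSketch(BaseExecutorSketch):
    def __init__(self):
        super().__init__()
        self.tasks = {}               # key -> Celery AsyncResult (stubbed out here)
        self.stalled_candidates = {}  # keys eligible for stalled_task_timeout

    def _set_celery_pending_task_timeout(self, key, timeout):
        if timeout is None:
            self.stalled_candidates.pop(key, None)
        else:
            self.stalled_candidates[key] = timeout

    def change_state(self, key, state, info=None):
        # Step 3 continued: run the base logic, then drop the key from
        # stalled_task_timeout tracking.
        super().change_state(key, state, info)
        self.tasks.pop(key, None)
        self._set_celery_pending_task_timeout(key, None)


# Demo: a task that was queued but never started running.
executor = CeleryExecutorSketch()
executor._set_celery_pending_task_timeout("ti_key", 300)
executor.fail("ti_key")  # step 1: update_task_state would call this on FAILURE
# "ti_key" is now gone from stalled_candidates even though it never ran,
# so stalled_task_timeout will never rescue it.
```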

Instead, when the airflow tasks run command fails, the Airflow task instance should be failed or retried (if applicable).
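One hedged way to express the desired behaviour is a scheduler-side reconciliation of conflicting states. This is a sketch of the decision only, not Airflow's actual patch; reconcile_executor_event is a hypothetical helper and the state strings are illustrative.

```python
QUEUED, FAILED, UP_FOR_RETRY = "queued", "failed", "up_for_retry"

def reconcile_executor_event(ti_state, executor_state, try_number, max_tries):
    """Decide which state a task instance should move to when the executor
    reports a terminal state that conflicts with the TI's own state."""
    if executor_state == FAILED and ti_state == QUEUED:
        # The airflow tasks run command never started the task, so the TI
        # will never leave queued on its own: retry if tries remain, else fail.
        return UP_FOR_RETRY if try_number <= max_tries else FAILED
    return ti_state
```

For the log line above (state=queued, executor_state=failed, try_number=1, max_tries=2), this would move the task to up_for_retry instead of leaving it queued.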
