Simplify logic to resolve tasks stuck in queued despite stalled_task_timeout #30108
RNHTTR wants to merge 0 commits into apache:main
Conversation
For backward compatibility and semver reasons, would making the tasks go to failed be considered a breaking change? I know that behavior was different from the comparable k8s executor behavior, and it seems a little askew from expectations, so I suppose it could be considered a bug. And this does cover that case plus more, so that's good.
Would appreciate your feedback @okayhooni & @repl-chris!
airflow/jobs/scheduler_job.py
Outdated
Do you need both of these lines? Don't you only want the one wrapped in timers.call_regular_interval
I was following the precedent set by adopt_or_reset_orphaned_tasks (line 858), which runs on scheduler start up and then at a regular interval.
This probably doesn't need to run at startup. Adoption makes sense to do there because it's pretty likely that another scheduler just shut down if we have a new one starting, but I don't think we have a similar situation here.
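To illustrate the distinction being discussed, here is a toy model of an interval-based check (the class and method names are illustrative, not Airflow's actual `timers.call_regular_interval` internals): the action fires only after the interval has elapsed, never at scheduler startup.

```python
class RegularIntervalTimer:
    """Toy model (assumed names, not Airflow internals) of a check that
    fires at a fixed interval but never at startup."""

    def __init__(self, delay: float, action, now: float = 0.0):
        self.delay = delay
        self.action = action
        self.next_fire = now + delay  # no immediate first run at startup

    def poll(self, now: float) -> None:
        # Fire the action only once a full interval has elapsed.
        if now >= self.next_fire:
            self.action()
            self.next_fire = now + self.delay

calls = []
timer = RegularIntervalTimer(delay=10.0, action=lambda: calls.append("check"), now=0.0)
timer.poll(0.0)   # startup: nothing fires
timer.poll(5.0)   # still inside the first interval
timer.poll(10.0)  # first regular firing
timer.poll(25.0)  # fires again
```

Running the check an extra time at startup (as adoption does) would just mean one additional invocation before the first interval elapses.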
Is this deprecation reflected in the code base by a DeprecationWarning?
Updated to include DeprecationWarning
we should remove it if no longer needed
Would removing it unnecessarily break backward compatibility?
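For context on the deprecation pattern being discussed, here is a minimal sketch of honoring a deprecated option for backward compatibility. The lookup keys and the fallback default are illustrative; this is not Airflow's real config machinery, which handles deprecated options declaratively.

```python
import warnings

def get_task_queued_timeout(conf: dict) -> float:
    """Sketch: prefer the new option, fall back to the deprecated one
    with a DeprecationWarning. Keys and default are hypothetical."""
    new = conf.get(("scheduler", "task_queued_timeout"))
    if new is not None:
        return float(new)
    old = conf.get(("celery", "stalled_task_timeout"))
    if old is not None:
        warnings.warn(
            "celery.stalled_task_timeout is deprecated; "
            "use scheduler.task_queued_timeout instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return float(old)
    return 600.0  # illustrative fallback default
```

Keeping the fallback branch preserves backward compatibility for users who still set the old option; removing it outright would silently ignore their configuration.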
airflow/jobs/scheduler_job.py
Outdated
This would affect all executors, not just celery and we have some other settings in kubernetes for pending tasks etc. WDYT?
IMO this is a general problem that applies to both kubernetes & celery executors. The relevant k8s exec setting is worker_pods_queued_check_interval -- I think this can also be handled in the scheduler. I also think this can probably replace task-adoption-timeout
If you agree, I'll remove those configurations as well.
worker_pods_queued_check_interval is similar, but different in that it won't automatically just reset the TI. It first checks to see if the pod exists.
worker_pods_pending_timeout is essentially this same process though. It should probably be deprecated as well (though, not sure how config handles many -> one).
airflow/jobs/scheduler_job.py
Outdated
You don't have to do this manually, Airflow does it for you when you marked it as a deprecated option.
airflow/jobs/scheduler_job.py
Outdated
I don't see any handling of retry state in this code; we go straight to failed and bypass retry logic (unless I've forgotten how that works?).
Would it be sufficient to call TI.handle_failure()?
It seems like it should be based on my read of TI.handle_failure.
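The semantics under discussion can be sketched with a toy model (this is not Airflow's real TaskInstance, just an illustration of why routing through a handle_failure-style method matters): a failing task with retries remaining moves to up_for_retry rather than straight to failed.

```python
from dataclasses import dataclass

@dataclass
class TaskInstance:
    """Toy model of handle_failure semantics, not Airflow's real class."""
    try_number: int = 1
    retries: int = 0
    state: str = "queued"

    def handle_failure(self, error: str) -> None:
        if self.try_number <= self.retries:
            self.state = "up_for_retry"  # retry logic is respected
        else:
            self.state = "failed"        # retries exhausted

ti = TaskInstance(try_number=1, retries=2)
ti.handle_failure("stuck in queued")         # retries remain

exhausted = TaskInstance(try_number=3, retries=2)
exhausted.handle_failure("stuck in queued")  # retries exhausted
```

Setting `state = "failed"` directly would skip the first branch entirely, which is the bypass the review comment is flagging.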
I'm unclear on why task adoption was removed. It covers a whole class of problems, running tasks whose scheduler died, that doesn't seem to be otherwise addressed by this PR.
Accidentally closed this and nuked my changes. I'll open a new PR or re-open this one.
closes: #28120
closes: #21225
closes: #28943
Tasks occasionally get stuck in queued and aren't resolved by stalled_task_timeout (#28120). This PR moves the logic for handling stalled tasks to the scheduler and simplifies it by marking any task that has been queued for more than scheduler.task_queued_timeout as failed, allowing it to be retried if the task has available retries. This doesn't require an additional scheduler, nor does it allow tasks to get stuck in an infinite loop of scheduled -> queued -> scheduled -> ... -> queued as in #28943.