Description
Overview
Some Executors have their own first-class CLI commands (now that's hardcoding/coupling!) that set up or modify components related to that Executor, such as Celery workers and Flower, or Kubernetes pods.
Examples
- 5a) Celery Executor commands:
airflow/airflow/cli/cli_parser.py, lines 1689 to 1734 in 27e2101:
CELERY_COMMANDS = (
    ActionCommand(
        name='worker',
        help="Start a Celery worker node",
        func=lazy_load_command('airflow.cli.commands.celery_command.worker'),
        args=(
            ARG_QUEUES,
            ARG_CONCURRENCY,
            ARG_CELERY_HOSTNAME,
            ARG_PID,
            ARG_DAEMON,
            ARG_UMASK,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
            ARG_AUTOSCALE,
            ARG_SKIP_SERVE_LOGS,
            ARG_WITHOUT_MINGLE,
            ARG_WITHOUT_GOSSIP,
        ),
    ),
    ActionCommand(
        name='flower',
        help="Start a Celery Flower",
        func=lazy_load_command('airflow.cli.commands.celery_command.flower'),
        args=(
            ARG_FLOWER_HOSTNAME,
            ARG_FLOWER_PORT,
            ARG_FLOWER_CONF,
            ARG_FLOWER_URL_PREFIX,
            ARG_FLOWER_BASIC_AUTH,
            ARG_BROKER_API,
            ARG_PID,
            ARG_DAEMON,
            ARG_STDOUT,
            ARG_STDERR,
            ARG_LOG_FILE,
        ),
    ),
    ActionCommand(
        name='stop',
        help="Stop the Celery worker gracefully",
        func=lazy_load_command('airflow.cli.commands.celery_command.stop_worker'),
        args=(ARG_PID,),
    ),
)
- 5b) Kubernetes Executor commands:
airflow/airflow/cli/cli_parser.py, lines 1754 to 1771 in 27e2101:
KUBERNETES_COMMANDS = (
    ActionCommand(
        name='cleanup-pods',
        help=(
            "Clean up Kubernetes pods "
            "(created by KubernetesExecutor/KubernetesPodOperator) "
            "in evicted/failed/succeeded/pending states"
        ),
        func=lazy_load_command('airflow.cli.commands.kubernetes_command.cleanup_pods'),
        args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES),
    ),
    ActionCommand(
        name='generate-dag-yaml',
        help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
        "launching into a cluster",
        func=lazy_load_command('airflow.cli.commands.kubernetes_command.generate_pod_yaml'),
        args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH),
    ),
- 5c) Default CLI parser has hardcoded logic for Celery and Kubernetes Executors specifically:
airflow/airflow/cli/cli_parser.py, lines 63 to 99 in 27e2101:
if action.dest == 'subcommand' and value == 'celery':
    executor = conf.get('core', 'EXECUTOR')
    if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
        executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
        classes = ()
        try:
            from airflow.executors.celery_executor import CeleryExecutor

            classes += (CeleryExecutor,)
        except ImportError:
            message = (
                "The celery subcommand requires that you pip install the celery module. "
                "To do it, run: pip install 'apache-airflow[celery]'"
            )
            raise ArgumentError(action, message)
        try:
            from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor

            classes += (CeleryKubernetesExecutor,)
        except ImportError:
            pass
        if not issubclass(executor_cls, classes):
            message = (
                f'celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and '
                f'executors derived from them, your current executor: {executor}, subclassed from: '
                f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}'
            )
            raise ArgumentError(action, message)
if action.dest == 'subcommand' and value == 'kubernetes':
    try:
        import kubernetes.client  # noqa: F401
    except ImportError:
        message = (
            "The kubernetes subcommand requires that you pip install the kubernetes python client. "
            "To do it, run: pip install 'apache-airflow[cncf.kubernetes]'"
        )
        raise ArgumentError(action, message)
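The coupling in 5c can be reduced to a small standalone sketch. Every class below is a hypothetical, empty stand-in (not Airflow's real executors), and the real code also short-circuits on the configured executor name and on missing imports; the sketch keeps only the issubclass allow-list. It shows that a user-defined subclass of CeleryExecutor passes the gate, while any executor outside the list is rejected until someone edits the core parser.

```python
class BaseExecutor:
    """Hypothetical stand-in for Airflow's BaseExecutor."""


class CeleryExecutor(BaseExecutor):
    """Hypothetical stand-in."""


class CeleryKubernetesExecutor(BaseExecutor):
    """Hypothetical stand-in."""


class KubernetesExecutor(BaseExecutor):
    """Hypothetical stand-in."""


class MyCeleryExecutor(CeleryExecutor):
    """A hypothetical user-defined executor derived from CeleryExecutor."""


# The core parser has to enumerate every executor family that "owns" the
# celery subcommand; supporting a new family means editing core code.
CELERY_CAPABLE = (CeleryExecutor, CeleryKubernetesExecutor)


def celery_subcommand_allowed(executor_cls: type) -> bool:
    # issubclass() with a tuple accepts the listed classes and anything derived from them.
    return issubclass(executor_cls, CELERY_CAPABLE)
```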
Proposal
Update the BaseExecutor interface with a pluggable mechanism for vending CLI GroupCommands and parsers. Executor subclasses would implement these methods where applicable, and the Airflow core CLI parser code would call them to fetch the commands and parsers. The existing Executor CLI code would then be migrated from cli_parser.py to the respective Executor classes.
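On the executor side, the proposal could look roughly like the sketch below. It is a minimal illustration under stated assumptions: ActionCommand and GroupCommand are simplified, hypothetical dataclasses (the real ones in cli_parser.py carry more fields, such as args), the method name get_cli_group_commands is taken from the pseudo-code in this issue, and it is a staticmethod because the pseudo-code calls it on the executor class rather than an instance. BaseExecutor returns an empty list by default, so only executors that own CLI commands need to override it.

```python
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class ActionCommand:
    """Hypothetical, simplified stand-in for Airflow's ActionCommand."""
    name: str
    help: str
    func: Callable[..., None]


@dataclass
class GroupCommand:
    """Hypothetical, simplified stand-in for Airflow's GroupCommand."""
    name: str
    help: str
    subcommands: List[ActionCommand] = field(default_factory=list)


class BaseExecutor:
    @staticmethod
    def get_cli_group_commands() -> List[GroupCommand]:
        # Default: an executor vends no CLI commands of its own.
        return []


def _placeholder(args) -> None:
    # Hypothetical placeholder; a real executor would point at its command implementations.
    raise NotImplementedError


class CeleryExecutor(BaseExecutor):
    @staticmethod
    def get_cli_group_commands() -> List[GroupCommand]:
        # The commands formerly hardcoded in CELERY_COMMANDS now live with the executor.
        return [
            GroupCommand(
                name="celery",
                help="Celery components",
                subcommands=[
                    ActionCommand("worker", "Start a Celery worker node", _placeholder),
                    ActionCommand("flower", "Start a Celery Flower", _placeholder),
                    ActionCommand("stop", "Stop the Celery worker gracefully", _placeholder),
                ],
            )
        ]


class LocalExecutor(BaseExecutor):
    """Hypothetical stand-in that inherits the empty default."""
```

A hybrid executor such as CeleryKubernetesExecutor could return both a celery and a kubernetes group from the same method, which is one reason to return a list rather than a single group.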
Pseudo-code example for vending GroupCommands:
# Existing code in cli_parser.py
...
airflow_commands: List[CLICommand] = [
GroupCommand(
name='dags',
help='Manage DAGs',
subcommands=DAGS_COMMANDS,
),
...
]
# New code to add groups vended by executor classes
executor_cls, _ = ExecutorLoader.import_executor_cls(conf.get('core', 'EXECUTOR'))
# get_cli_group_commands() returns a list of GroupCommands, so extend (not append) the list
airflow_commands.extend(executor_cls.get_cli_group_commands())
...
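To see how vended groups could plug into argument parsing, here is a self-contained sketch that uses the standard library's argparse in place of Airflow's own parser classes. The dataclasses and executor classes are simplified, hypothetical stand-ins, and build_parser is an illustrative helper, not an Airflow function. Because the celery group only exists when the configured executor vends it, a hardcoded check like the one in 5c is no longer needed: argparse itself rejects "celery" as an invalid choice under any other executor.

```python
import argparse
from dataclasses import dataclass, field
from typing import Callable, List


@dataclass
class ActionCommand:
    """Hypothetical, simplified stand-in for Airflow's ActionCommand."""
    name: str
    help: str
    func: Callable[[argparse.Namespace], str]


@dataclass
class GroupCommand:
    """Hypothetical, simplified stand-in for Airflow's GroupCommand."""
    name: str
    help: str
    subcommands: List[ActionCommand] = field(default_factory=list)


class BaseExecutor:
    @staticmethod
    def get_cli_group_commands() -> List[GroupCommand]:
        return []  # most executors vend no CLI groups


class CeleryExecutor(BaseExecutor):
    @staticmethod
    def get_cli_group_commands() -> List[GroupCommand]:
        worker = ActionCommand("worker", "Start a Celery worker node", lambda args: "worker started")
        return [GroupCommand("celery", "Celery components", [worker])]


# Groups that always exist, regardless of the configured executor.
CORE_COMMANDS = [
    GroupCommand("dags", "Manage DAGs", [ActionCommand("list", "List all the DAGs", lambda args: "dags listed")]),
]


def build_parser(executor_cls: type) -> argparse.ArgumentParser:
    """Build an airflow-style parser from core groups plus executor-vended groups."""
    commands = list(CORE_COMMANDS)
    # The executor vends a list of groups, so extend rather than append.
    commands.extend(executor_cls.get_cli_group_commands())

    parser = argparse.ArgumentParser(prog="airflow")
    groups = parser.add_subparsers(dest="subcommand", required=True)
    for group in commands:
        group_parser = groups.add_parser(group.name, help=group.help)
        actions = group_parser.add_subparsers(dest="action", required=True)
        for action in group.subcommands:
            action_parser = actions.add_parser(action.name, help=action.help)
            # Store the handler so the caller can dispatch with args.func(args).
            action_parser.set_defaults(func=action.func)
    return parser
```

With CeleryExecutor, parsing ["celery", "worker"] resolves to the worker handler; with BaseExecutor the same arguments fail with argparse's usual invalid-choice error and exit code 2.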