AIP-51 - Executor Specific CLI Commands #27932

@o-nikolas

Overview

Some Executors have their own first-class CLI commands (now that’s hardcoding/coupling!) which set up or modify various components related to that Executor.

Examples

  • 5a) Celery Executor commands:
    CELERY_COMMANDS = (
        ActionCommand(
            name='worker',
            help="Start a Celery worker node",
            func=lazy_load_command('airflow.cli.commands.celery_command.worker'),
            args=(
                ARG_QUEUES,
                ARG_CONCURRENCY,
                ARG_CELERY_HOSTNAME,
                ARG_PID,
                ARG_DAEMON,
                ARG_UMASK,
                ARG_STDOUT,
                ARG_STDERR,
                ARG_LOG_FILE,
                ARG_AUTOSCALE,
                ARG_SKIP_SERVE_LOGS,
                ARG_WITHOUT_MINGLE,
                ARG_WITHOUT_GOSSIP,
            ),
        ),
        ActionCommand(
            name='flower',
            help="Start a Celery Flower",
            func=lazy_load_command('airflow.cli.commands.celery_command.flower'),
            args=(
                ARG_FLOWER_HOSTNAME,
                ARG_FLOWER_PORT,
                ARG_FLOWER_CONF,
                ARG_FLOWER_URL_PREFIX,
                ARG_FLOWER_BASIC_AUTH,
                ARG_BROKER_API,
                ARG_PID,
                ARG_DAEMON,
                ARG_STDOUT,
                ARG_STDERR,
                ARG_LOG_FILE,
            ),
        ),
        ActionCommand(
            name='stop',
            help="Stop the Celery worker gracefully",
            func=lazy_load_command('airflow.cli.commands.celery_command.stop_worker'),
            args=(ARG_PID,),
        ),
    )
  • 5b) Kubernetes Executor commands:
    KUBERNETES_COMMANDS = (
        ActionCommand(
            name='cleanup-pods',
            help=(
                "Clean up Kubernetes pods "
                "(created by KubernetesExecutor/KubernetesPodOperator) "
                "in evicted/failed/succeeded/pending states"
            ),
            func=lazy_load_command('airflow.cli.commands.kubernetes_command.cleanup_pods'),
            args=(ARG_NAMESPACE, ARG_MIN_PENDING_MINUTES),
        ),
        ActionCommand(
            name='generate-dag-yaml',
            help="Generate YAML files for all tasks in DAG. Useful for debugging tasks without "
            "launching into a cluster",
            func=lazy_load_command('airflow.cli.commands.kubernetes_command.generate_pod_yaml'),
            args=(ARG_DAG_ID, ARG_EXECUTION_DATE, ARG_SUBDIR, ARG_OUTPUT_PATH),
        ),
    )
  • 5c) Default CLI parser has hardcoded logic for Celery and Kubernetes Executors specifically:
    if action.dest == 'subcommand' and value == 'celery':
        executor = conf.get('core', 'EXECUTOR')
        if executor not in (CELERY_EXECUTOR, CELERY_KUBERNETES_EXECUTOR):
            executor_cls, _ = ExecutorLoader.import_executor_cls(executor)
            classes = ()
            try:
                from airflow.executors.celery_executor import CeleryExecutor
                classes += (CeleryExecutor,)
            except ImportError:
                message = (
                    "The celery subcommand requires that you pip install the celery module. "
                    "To do it, run: pip install 'apache-airflow[celery]'"
                )
                raise ArgumentError(action, message)
            try:
                from airflow.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
                classes += (CeleryKubernetesExecutor,)
            except ImportError:
                pass
            if not issubclass(executor_cls, classes):
                message = (
                    f'celery subcommand works only with CeleryExecutor, CeleryKubernetesExecutor and '
                    f'executors derived from them, your current executor: {executor}, subclassed from: '
                    f'{", ".join([base_cls.__qualname__ for base_cls in executor_cls.__bases__])}'
                )
                raise ArgumentError(action, message)
    if action.dest == 'subcommand' and value == 'kubernetes':
        try:
            import kubernetes.client  # noqa: F401
        except ImportError:
            message = (
                "The kubernetes subcommand requires that you pip install the kubernetes python client. "
                "To do it, run: pip install 'apache-airflow[cncf.kubernetes]'"
            )
            raise ArgumentError(action, message)

Proposal

Update the BaseExecutor interface with a pluggable mechanism to vend CLI GroupCommands and parsers. Executor subclasses would implement these methods where applicable, and the Airflow Core CLI parser code would call them to fetch commands and parsers. We would then migrate the existing Executor CLI code from cli_parser to the respective Executor classes.
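To make the executor side of this concrete, here is a minimal, self-contained sketch of what such a hook might look like. It does not import Airflow: `ActionCommand` and `GroupCommand` are simplified stand-ins for the real CLI config types, `DemoCeleryExecutor` and `_start_worker` are hypothetical names, and `get_cli_group_commands` is the method name used in the pseudo-code in this issue, not a confirmed final API.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Sequence


# Simplified stand-ins for Airflow's CLI config types (assumption: the real
# classes carry more fields, such as description and epilog).
@dataclass(frozen=True)
class ActionCommand:
    name: str
    help: str
    func: Callable[..., object]
    args: Sequence[str] = ()


@dataclass(frozen=True)
class GroupCommand:
    name: str
    help: str
    subcommands: Sequence[ActionCommand]


class BaseExecutor:
    """Executors vend no CLI commands unless they override the hook."""

    @staticmethod
    def get_cli_group_commands() -> list[GroupCommand]:
        return []


def _start_worker(args: object) -> str:
    # Hypothetical handler; a real one would start a Celery worker.
    return "worker started"


class DemoCeleryExecutor(BaseExecutor):
    """Hypothetical executor that owns its own CLI group."""

    @staticmethod
    def get_cli_group_commands() -> list[GroupCommand]:
        return [
            GroupCommand(
                name="celery",
                help="Celery components",
                subcommands=(
                    ActionCommand(
                        name="worker",
                        help="Start a Celery worker node",
                        func=_start_worker,
                        # Argument names are placeholders for the real Arg objects.
                        args=("ARG_QUEUES", "ARG_CONCURRENCY"),
                    ),
                ),
            )
        ]
```

Returning an empty list from the base class means executors without CLI commands need no changes, and the core parser never has to special-case a particular executor.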

Pseudo-code example for vending GroupCommands:

# Existing code in cli_parser.py
...
airflow_commands: List[CLICommand] = [
    GroupCommand(
        name='dags',
        help='Manage DAGs',
        subcommands=DAGS_COMMANDS,
    ),
    ...
]
# New code to add groups vended by executor classes
executor_cls, _ = ExecutorLoader.import_executor_cls(conf.get('core', 'EXECUTOR'))
# extend(), not append(): the executor vends a list of GroupCommands
airflow_commands.extend(executor_cls.get_cli_group_commands())
...
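The parser side of the same idea can be sketched with plain `argparse`. This is an illustration under assumptions, not Airflow's actual parser code: the dataclasses are simplified stand-ins, and `DemoExecutor`, `build_parser`, and the `airflow-sketch` program name are hypothetical. The point is that the parser only iterates over whatever the configured executor vends, so no executor name is hardcoded.

```python
import argparse
from dataclasses import dataclass
from typing import Callable, Sequence


# Simplified stand-ins for the CLI config types (assumption, not Airflow's classes).
@dataclass(frozen=True)
class ActionCommand:
    name: str
    help: str
    func: Callable[[argparse.Namespace], object]


@dataclass(frozen=True)
class GroupCommand:
    name: str
    help: str
    subcommands: Sequence[ActionCommand]


class DemoExecutor:
    """Hypothetical executor vending one CLI group."""

    @staticmethod
    def get_cli_group_commands():
        status = ActionCommand("status", "Report status", lambda args: "demo ok")
        return [GroupCommand("demo", "Demo executor components", (status,))]


def build_parser(executor_cls) -> argparse.ArgumentParser:
    """Register every group the executor vends; the parser names no executor."""
    parser = argparse.ArgumentParser(prog="airflow-sketch")
    groups = parser.add_subparsers(dest="subcommand", required=True)
    for group in executor_cls.get_cli_group_commands():
        group_parser = groups.add_parser(group.name, help=group.help)
        actions = group_parser.add_subparsers(dest="action", required=True)
        for command in group.subcommands:
            action_parser = actions.add_parser(command.name, help=command.help)
            # Store the handler so the caller can dispatch after parsing.
            action_parser.set_defaults(func=command.func)
    return parser


parser = build_parser(DemoExecutor)
args = parser.parse_args(["demo", "status"])
result = args.func(args)
```

With this shape, a subcommand that the configured executor does not vend is simply never registered, so `argparse` rejects it on its own and the hand-written `celery`/`kubernetes` checks become unnecessary.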

Labels: AIP-51 (AIP-51: Remove executor coupling from Core)
