Task mapping against 'params' #24014

@hammerhead

Description

Apache Airflow version

2.3.1 (latest released)

What happened

Importing a DAG using PostgresOperator with expand(params=[...]) fails, claiming params was already specified as a partial argument, even though it wasn't.

What you think should happen instead

The DAG imports successfully.

How to reproduce

import pendulum
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.decorators import dag


@dag(
    start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
)
def test():
    query_values = [{"a": 1}, {"a": 2}]

    PostgresOperator.partial(
        task_id="simple_select",
        sql="SELECT {{ params.a }}",
    ).expand(params=query_values)

test_dag = test()

Exception during import:

Broken DAG: [/usr/local/airflow/dags/test_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 199, in expand
    prevent_duplicates(self.kwargs, mapped_kwargs, fail_reason="mapping already partial")
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 139, in prevent_duplicates
    raise TypeError(f"{fail_reason} argument: {duplicated_keys.pop()}")
TypeError: mapping already partial argument: params
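
The traceback suggests the error comes from a key-overlap check between the partial arguments and the mapped arguments. The sketch below is a simplified stand-in for that check, not Airflow's actual code. It assumes (unconfirmed by this report) that partial() stores a default "params" entry even when the DAG author never passes one, which would explain why expand(params=...) is rejected. The function body and the contents of partial_kwargs are illustrative only.

```python
def prevent_duplicates(partial_kwargs, mapped_kwargs, *, fail_reason):
    """Simplified stand-in: reject any key present in both mappings."""
    duplicated_keys = set(partial_kwargs) & set(mapped_kwargs)
    if duplicated_keys:
        raise TypeError(f"{fail_reason} argument: {sorted(duplicated_keys)[0]}")


# Assumption: partial() fills in a default "params" entry on its own,
# even though the DAG in the reproduction never passed params to it.
partial_kwargs = {
    "task_id": "simple_select",
    "sql": "SELECT {{ params.a }}",
    "params": None,  # hypothetical pre-populated default
}
mapped_kwargs = {"params": [{"a": 1}, {"a": 2}]}

try:
    prevent_duplicates(
        partial_kwargs, mapped_kwargs, fail_reason="mapping already partial"
    )
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(error_message)  # mapping already partial argument: params
```

If that assumption holds, the check would need to tell user-supplied partial arguments apart from pre-populated defaults before rejecting the mapping.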

Operating System

macOS 12.4

Versions of Apache Airflow Providers

apache-airflow-providers-postgres==4.1.0

Deployment

Astronomer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
