Closed
Description
Apache Airflow version
2.3.1 (latest released)
What happened
Importing a DAG using PostgresOperator with expand(params=[...]) fails, claiming params was already specified as a partial argument, even though it wasn't.
What you think should happen instead
The DAG imports successfully.
How to reproduce
from pathlib import Path

import pendulum

from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.decorators import dag, task


@dag(
    start_date=pendulum.datetime(2021, 11, 19, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
)
def test():
    query_values = [{"a": 1}, {"a": 2}]

    PostgresOperator.partial(
        task_id="simple_select",
        sql="SELECT {{ params.a }}",
    ).expand(params=query_values)


test_dag = test()

Exception during import:
Broken DAG: [/usr/local/airflow/dags/test_dag.py] Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 199, in expand
    prevent_duplicates(self.kwargs, mapped_kwargs, fail_reason="mapping already partial")
  File "/usr/local/lib/python3.9/site-packages/airflow/models/mappedoperator.py", line 139, in prevent_duplicates
    raise TypeError(f"{fail_reason} argument: {duplicated_keys.pop()}")
TypeError: mapping already partial argument: params
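To illustrate how a check like the one in the traceback could fire even though params was never passed to partial(), here is a minimal standalone sketch that does not import Airflow. It is not the Airflow source: prevent_duplicates below is a simplified stand-in named after the function in the traceback, and fake_partial and try_expand are hypothetical helpers. The sketch assumes that the partial step stores a default params entry in its stored kwargs; that assumption is a guess at the cause, not something confirmed by this report.

```python
from typing import Any, Dict


def prevent_duplicates(
    kwargs1: Dict[str, Any], kwargs2: Dict[str, Any], *, fail_reason: str
) -> None:
    """Simplified stand-in: raise TypeError if both dicts share a key."""
    duplicated_keys = set(kwargs1).intersection(kwargs2)
    if not duplicated_keys:
        return
    if len(duplicated_keys) == 1:
        raise TypeError(f"{fail_reason} argument: {duplicated_keys.pop()}")
    joined = ", ".join(sorted(duplicated_keys))
    raise TypeError(f"{fail_reason} arguments: {joined}")


def fake_partial(**user_kwargs: Any) -> Dict[str, Any]:
    """Hypothetical partial(): collects the caller's kwargs.

    ASSUMPTION: a default "params" entry is added even when the caller
    did not supply one. This is what would make the later check misfire.
    """
    partial_kwargs = dict(user_kwargs)
    partial_kwargs.setdefault("params", {})
    return partial_kwargs


def try_expand(partial_kwargs: Dict[str, Any], **mapped_kwargs: Any) -> str:
    """Hypothetical expand(): return the error message, or "ok" if none."""
    try:
        prevent_duplicates(
            partial_kwargs, mapped_kwargs, fail_reason="mapping already partial"
        )
    except TypeError as exc:
        return str(exc)
    return "ok"


if __name__ == "__main__":
    stored = fake_partial(task_id="simple_select", sql="SELECT {{ params.a }}")
    # The caller never passed params to fake_partial, yet the check trips.
    print(try_expand(stored, params=[{"a": 1}, {"a": 2}]))
```

Under that assumption, the duplicate check cannot distinguish a key the user supplied from one filled in as a default, which would match the message reported above.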
Operating System
macOS 12.4
Versions of Apache Airflow Providers
apache-airflow-providers-postgres==4.1.0
Deployment
Astronomer
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct