Affected module
Ingestion Framework — `openmetadata-ingestion`, Airflow pipeline connector (`metadata/ingestion/source/pipeline/airflow/metadata.py`)
Describe the bug
Inside `AirflowSource.yield_pipeline_status()`, every DagRun returned by `get_pipeline_status` triggers a separate `SELECT` against the `task_instance` table:
```python
# Inside "for dag_run in dag_run_list:" -- fires once per DagRun
tasks = self.get_task_instances(
    dag_id=dag_run.dag_id,
    run_id=dag_run.run_id,  # different run_id each iteration
    serialized_tasks=pipeline_details.tasks,
)
```
`get_pipeline_status` returns up to `numberOfStatus` DagRuns (default 10). For a deployment with 200 DAGs and the default `numberOfStatus=10`, this produces 2,000 extra round-trips per ingestion run, on top of the 200 DagRun queries. The number of task-instance queries scales as DAGs × `numberOfStatus` and is entirely avoidable: all TaskInstances for a DAG can be fetched in a single query using `run_id IN (...)`.
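The difference between the N+1 pattern and the bulk alternative can be illustrated with a stdlib `sqlite3` sketch (the table and column names mirror Airflow's `task_instance` schema, but the data and query counter are illustrative, not taken from the connector):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (dag_id TEXT, run_id TEXT, task_id TEXT)")
# Seed 10 runs of one DAG, 5 tasks each -> 50 rows
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?)",
    [("my_dag", f"run_{r}", f"task_{t}") for r in range(10) for t in range(5)],
)

run_ids = [f"run_{r}" for r in range(10)]

# Current behavior: one SELECT per DagRun (10 round-trips)
per_run_queries = 0
per_run_rows = []
for run_id in run_ids:
    per_run_queries += 1
    per_run_rows += conn.execute(
        "SELECT task_id FROM task_instance WHERE dag_id = ? AND run_id = ?",
        ("my_dag", run_id),
    ).fetchall()

# Proposed behavior: one SELECT with run_id IN (...) (1 round-trip)
placeholders = ",".join("?" * len(run_ids))
bulk_rows = conn.execute(
    f"SELECT task_id FROM task_instance"
    f" WHERE dag_id = ? AND run_id IN ({placeholders})",
    ["my_dag", *run_ids],
).fetchall()

assert per_run_queries == 10
assert sorted(per_run_rows) == sorted(bulk_rows)  # same data, 1 query instead of 10
```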
To Reproduce
- Connect OpenMetadata ingestion to an Airflow database with ≥ 50 DAGs.
- Enable SQL query logging on the Airflow metadata database.
- Run the Airflow ingestion pipeline.
- Observe one `SELECT ... FROM task_instance WHERE dag_id = ? AND run_id = ?` per DagRun per DAG, in addition to the paginated `dag_run` queries.
Expected behavior
TaskInstances for all runs of a given DAG should be fetched in a single bulk query (`run_id IN (...)`) and grouped in memory, so that `yield_pipeline_status` issues exactly one TaskInstance query per DAG regardless of `numberOfStatus`.
Version:
- Python version: 3.10 / 3.11
- OpenMetadata version: 1.6.x / 1.7.x
- OpenMetadata Ingestion package version: `openmetadata-ingestion[airflow]`
Additional context
The fix is straightforward:
- Change `get_task_instances(dag_id, run_id, serialized_tasks)` to accept `run_ids: List[str]` instead of a single `run_id: str`.
- Replace `.filter(TaskInstance.run_id == run_id)` with `.filter(TaskInstance.run_id.in_(run_ids))`.
- Return `Dict[str, List[OMTaskInstance]]` keyed by `run_id` (built with `defaultdict(list)`).
- In `yield_pipeline_status`, collect all `run_id`s from `dag_run_list` before the loop, call the bulk method once, and look up results per run with `tasks_by_run_id.get(dag_run.run_id, [])`.
Query count drops from `1 + N_runs` per DAG to exactly 2 per DAG (1 for DagRuns, 1 for all TaskInstances).
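The grouping step of the proposed fix can be sketched in plain Python (a minimal illustration, assuming the ORM query with `run_id.in_(run_ids)` has already produced the rows; `OMTaskInstance` is stood in for by a bare task-id string, and `get_task_instances_bulk` is a hypothetical name for the reshaped helper):

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Stand-in for a row returned by the single bulk query with
# .filter(TaskInstance.run_id.in_(run_ids)) applied.
Row = Tuple[str, str]  # (run_id, task_id)

def get_task_instances_bulk(rows: List[Row]) -> Dict[str, List[str]]:
    """Group TaskInstances by run_id in memory: one query, one pass."""
    tasks_by_run_id: Dict[str, List[str]] = defaultdict(list)
    for run_id, task_id in rows:
        tasks_by_run_id[run_id].append(task_id)
    return tasks_by_run_id

rows = [("run_1", "extract"), ("run_1", "load"), ("run_2", "extract")]
tasks_by_run_id = get_task_instances_bulk(rows)

# Per-run lookup inside yield_pipeline_status; missing runs yield []
assert tasks_by_run_id.get("run_1", []) == ["extract", "load"]
assert tasks_by_run_id.get("run_3", []) == []
```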