Airflow ingestion fires N extra DB queries (one per DagRun) to fetch TaskInstances inside yield_pipeline_status #27150

@RajdeepKushwaha5

Description

Affected module
Ingestion Framework — openmetadata-ingestion, Airflow pipeline connector (metadata/ingestion/source/pipeline/airflow/metadata.py)

Describe the bug

Inside AirflowSource.yield_pipeline_status(), every DagRun returned by get_pipeline_status triggers a separate SELECT against the task_instance table:

# Inside "for dag_run in dag_run_list:" — fires once per DagRun
tasks = self.get_task_instances(
    dag_id=dag_run.dag_id,
    run_id=dag_run.run_id,          # ← different run_id each iteration
    serialized_tasks=pipeline_details.tasks,
)

get_pipeline_status returns up to numberOfStatus DagRuns (default 10). For a deployment with 200 DAGs and the default numberOfStatus=10, this produces 2,000 extra round-trips per ingestion run, on top of the 200 DagRun queries. The number of task-instance queries scales as DAGs × numberOfStatus, and is entirely avoidable: all TaskInstances for a DAG can be fetched in a single query using run_id IN (...).
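
For context, the helper invoked in that loop is shaped roughly like this (a simplified reconstruction from the query pattern above; the exact code in metadata/ingestion/source/pipeline/airflow/metadata.py and the session attribute name may differ):

from airflow.models import TaskInstance

def get_task_instances(self, dag_id, run_id, serialized_tasks):
    # Emits one SELECT ... WHERE dag_id = ? AND run_id = ? per call,
    # and the caller invokes it once per DagRun
    task_instances = (
        self.session.query(TaskInstance)
        .filter(TaskInstance.dag_id == dag_id)
        .filter(TaskInstance.run_id == run_id)
        .all()
    )
    ...  # mapping rows to OMTaskInstance elided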

To Reproduce

  1. Connect OpenMetadata ingestion to an Airflow database with ≥ 50 DAGs.
  2. Enable SQL query logging on the Airflow metadata database.
  3. Run the Airflow ingestion pipeline.
  4. Observe one SELECT ... FROM task_instance WHERE dag_id = ? AND run_id = ? per DagRun per DAG, in addition to the paginated dag_run queries.

Expected behavior

TaskInstances for all runs of a given DAG should be fetched in a single bulk query (run_id IN (...)) and grouped in memory, so that yield_pipeline_status issues exactly 1 TaskInstance query per DAG regardless of numberOfStatus.
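
A minimal sketch of that bulk fetch, assuming the connector's SQLAlchemy session and Airflow's TaskInstance model (the helper name get_all_task_instances is illustrative, and the row-to-OMTaskInstance mapping is elided):

from collections import defaultdict
from typing import Dict, List

from airflow.models import TaskInstance

def get_all_task_instances(self, dag_id: str, run_ids: List[str], serialized_tasks) -> Dict[str, list]:
    # One SELECT covering every requested run of this DAG
    rows = (
        self.session.query(TaskInstance)
        .filter(TaskInstance.dag_id == dag_id)
        .filter(TaskInstance.run_id.in_(run_ids))
        .all()
    )
    # Group in memory by run_id so per-run lookups in the loop are O(1)
    tasks_by_run_id = defaultdict(list)
    for row in rows:
        tasks_by_run_id[row.run_id].append(row)  # convert to OMTaskInstance here
    return tasks_by_run_id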

Version

  • Python version: 3.10 / 3.11
  • OpenMetadata version: 1.6.x / 1.7.x
  • OpenMetadata Ingestion package version: openmetadata-ingestion[airflow]

Additional context

The fix is straightforward:

  1. Change get_task_instances(dag_id, run_id, serialized_tasks) to accept run_ids: List[str] instead of a single run_id: str.
  2. Replace .filter(TaskInstance.run_id == run_id) with .filter(TaskInstance.run_id.in_(run_ids)).
  3. Return Dict[str, List[OMTaskInstance]] keyed by run_id (built with defaultdict(list)).
  4. In yield_pipeline_status, collect all run_ids from dag_run_list before the loop, call the bulk method once, and look up results per run with tasks_by_run_id.get(dag_run.run_id, []).

Query count drops from 1 + N_runs per DAG to exactly 2 per DAG (1 for DagRuns, 1 for all TaskInstances).
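
At the call site, the reworked loop would look roughly like this (variable and method names are illustrative, matching the bulk helper sketched under "Expected behavior"):

# Inside yield_pipeline_status — one bulk call before the loop
if dag_run_list:
    run_ids = [dag_run.run_id for dag_run in dag_run_list]
    tasks_by_run_id = self.get_all_task_instances(
        dag_id=dag_run_list[0].dag_id,  # every run in the list shares one dag_id
        run_ids=run_ids,
        serialized_tasks=pipeline_details.tasks,
    )
    for dag_run in dag_run_list:
        tasks = tasks_by_run_id.get(dag_run.run_id, [])
        ...  # build the pipeline status payload exactly as before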
