Skip to content

Commit 3e2bfb8

Browse files
authored
Set job labels for traceability in BigQuery jobs (#37736)
* Set job labels for traceability
* Fixup
* Ensure the automatic labels will be valid; if not valid they won't be added
* Fixup
* Merge length + regex check
* Simplify the label checking logic
1 parent 654038a commit 3e2bfb8

File tree

2 files changed

+151
-0
lines changed

2 files changed

+151
-0
lines changed

airflow/providers/google/cloud/operators/bigquery.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import enum
2222
import json
23+
import re
2324
import warnings
2425
from functools import cached_property
2526
from typing import TYPE_CHECKING, Any, Iterable, Sequence, SupportsAbs
@@ -64,6 +65,8 @@
6465

6566
BIGQUERY_JOB_DETAILS_LINK_FMT = "https://console.cloud.google.com/bigquery?j={job_id}"
6667

68+
# Pattern a value must satisfy to be usable as an automatic BigQuery job label:
# it must start with a lowercase letter and may contain only word characters
# and dashes. BigQuery limits label keys/values to 63 characters, so the tail
# is capped at 62 characters ({0,62}) after the mandatory first character —
# the original {0,63} admitted 64-character values that BigQuery rejects.
# Inputs are lowercased before matching, so \w never sees ASCII uppercase.
LABEL_REGEX = re.compile(r"^[a-z][\w-]{0,62}$")
69+
6770

6871
class BigQueryUIColors(enum.Enum):
6972
"""Hex colors for BigQuery operators."""
@@ -2768,11 +2771,25 @@ def prepare_template(self) -> None:
27682771
with open(self.configuration) as file:
27692772
self.configuration = json.loads(file.read())
27702773

2774+
def _add_job_labels(self) -> None:
    """Attach ``airflow-dag`` / ``airflow-task`` labels to the job configuration.

    The lowercased dag id and task id are used as label values. Nothing is
    added unless both are valid per ``LABEL_REGEX``. An existing ``labels``
    dict is merged into; an explicit non-dict value (e.g. ``labels: None``)
    set by the caller is respected and left untouched.
    """
    dag_id_label = self.dag_id.lower()
    task_id_label = self.task_id.lower()

    if not (LABEL_REGEX.match(dag_id_label) and LABEL_REGEX.match(task_id_label)):
        # Invalid names would make the whole job submission fail, so skip.
        return

    extra_labels = {"airflow-dag": dag_id_label, "airflow-task": task_id_label}
    existing = self.configuration.get("labels")
    if isinstance(existing, dict):
        existing.update(extra_labels)
    elif "labels" not in self.configuration:
        self.configuration["labels"] = extra_labels
2784+
27712785
def _submit_job(
27722786
self,
27732787
hook: BigQueryHook,
27742788
job_id: str,
27752789
) -> BigQueryJob:
2790+
# Annotate the job with dag and task id labels
2791+
self._add_job_labels()
2792+
27762793
# Submit a new job without waiting for it to complete.
27772794
return hook.insert_job(
27782795
configuration=self.configuration,

tests/providers/google/cloud/operators/test_bigquery.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,6 +1102,7 @@ def test_execute_query_success(self, mock_hook):
11021102
project_id=TEST_GCP_PROJECT_ID,
11031103
)
11041104
result = op.execute(context=MagicMock())
1105+
assert configuration["labels"] == {"airflow-dag": "adhoc_airflow", "airflow-task": "insert_query_job"}
11051106

11061107
mock_hook.return_value.insert_job.assert_called_once_with(
11071108
configuration=configuration,
@@ -1143,6 +1144,7 @@ def test_execute_copy_success(self, mock_hook):
11431144
project_id=TEST_GCP_PROJECT_ID,
11441145
)
11451146
result = op.execute(context=MagicMock())
1147+
assert configuration["labels"] == {"airflow-dag": "adhoc_airflow", "airflow-task": "copy_query_job"}
11461148

11471149
mock_hook.return_value.insert_job.assert_called_once_with(
11481150
configuration=configuration,
@@ -1753,6 +1755,138 @@ def test_execute_force_rerun_async(self, mock_hook, create_task_instance_of_oper
17531755
project_id=TEST_GCP_PROJECT_ID,
17541756
)
17551757

1758+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_adds_to_existing_labels(self, mock_hook):
    """User-provided labels are kept and the automatic ones merged in."""
    base_job_id = "123456"
    suffix = "hash"
    full_job_id = f"{base_job_id}_{suffix}"

    config = {
        "query": {
            "query": "SELECT * FROM any",
            "useLegacySql": False,
        },
        "labels": {"foo": "bar"},
    }
    mock_hook.return_value.insert_job.return_value = MagicMock(job_id=full_job_id, error_result=False)
    mock_hook.return_value.generate_job_id.return_value = full_job_id

    op = BigQueryInsertJobOperator(
        task_id="insert_query_job",
        configuration=config,
        location=TEST_DATASET_LOCATION,
        job_id=base_job_id,
        project_id=TEST_GCP_PROJECT_ID,
    )
    op.execute(context=MagicMock())

    # The pre-existing label survives alongside the automatic ones.
    assert config["labels"] == {
        "foo": "bar",
        "airflow-dag": "adhoc_airflow",
        "airflow-task": "insert_query_job",
    }
1787+
1788+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_respects_explicit_no_labels(self, mock_hook):
    """An explicit ``labels: None`` opts the job out of automatic labelling."""
    base_job_id = "123456"
    suffix = "hash"
    full_job_id = f"{base_job_id}_{suffix}"

    config = {
        "query": {
            "query": "SELECT * FROM any",
            "useLegacySql": False,
        },
        "labels": None,
    }
    mock_hook.return_value.insert_job.return_value = MagicMock(job_id=full_job_id, error_result=False)
    mock_hook.return_value.generate_job_id.return_value = full_job_id

    op = BigQueryInsertJobOperator(
        task_id="insert_query_job",
        configuration=config,
        location=TEST_DATASET_LOCATION,
        job_id=base_job_id,
        project_id=TEST_GCP_PROJECT_ID,
    )
    op.execute(context=MagicMock())

    # The caller's explicit None must not be overwritten.
    assert config["labels"] is None
1813+
1814+
def test_task_label_too_big(self):
    """A task id exceeding the BigQuery label length limit adds no labels."""
    config = {
        "query": {
            "query": "SELECT * FROM any",
            "useLegacySql": False,
        },
    }
    op = BigQueryInsertJobOperator(
        task_id="insert_query_job_except_this_task_id_is_really_really_really_really_long",
        configuration=config,
        location=TEST_DATASET_LOCATION,
        project_id=TEST_GCP_PROJECT_ID,
    )

    op._add_job_labels()

    assert "labels" not in config
1829+
1830+
def test_dag_label_too_big(self, dag_maker):
    """A dag id exceeding the BigQuery label length limit adds no labels."""
    config = {
        "query": {
            "query": "SELECT * FROM any",
            "useLegacySql": False,
        },
    }
    with dag_maker("adhoc_airflow_except_this_task_id_is_really_really_really_really_long"):
        op = BigQueryInsertJobOperator(
            task_id="insert_query_job",
            configuration=config,
            location=TEST_DATASET_LOCATION,
            project_id=TEST_GCP_PROJECT_ID,
        )

    op._add_job_labels()

    assert "labels" not in config
1846+
1847+
def test_labels_lowercase(self, dag_maker):
    """Uppercase dag/task ids are lowercased before being used as labels."""
    config = {
        "query": {
            "query": "SELECT * FROM any",
            "useLegacySql": False,
        },
    }
    with dag_maker("YELLING_DAG_NAME"):
        op = BigQueryInsertJobOperator(
            task_id="YELLING_TASK_ID",
            configuration=config,
            location=TEST_DATASET_LOCATION,
            project_id=TEST_GCP_PROJECT_ID,
        )

    op._add_job_labels()

    assert config["labels"]["airflow-dag"] == "yelling_dag_name"
    assert config["labels"]["airflow-task"] == "yelling_task_id"
1864+
1865+
def test_labels_invalid_names(self, dag_maker):
    """Task ids that are invalid as label values (dots, leading digit) add no labels."""
    config = {
        "query": {
            "query": "SELECT * FROM any",
            "useLegacySql": False,
        },
    }

    # Dots are valid in task ids but not in BigQuery labels; labels may not
    # start with a digit either. In both cases labelling is skipped entirely.
    for invalid_task_id in ("task.with.dots.is.allowed", "123_task"):
        op = BigQueryInsertJobOperator(
            task_id=invalid_task_id,
            configuration=config,
            location=TEST_DATASET_LOCATION,
            project_id=TEST_GCP_PROJECT_ID,
        )
        op._add_job_labels()
        assert "labels" not in config
1889+
17561890

17571891
class TestBigQueryIntervalCheckOperator:
17581892
def test_bigquery_interval_check_operator_execute_complete(self):

0 commit comments

Comments
 (0)