@@ -1102,6 +1102,7 @@ def test_execute_query_success(self, mock_hook):
11021102 project_id = TEST_GCP_PROJECT_ID ,
11031103 )
11041104 result = op .execute (context = MagicMock ())
1105+ assert configuration ["labels" ] == {"airflow-dag" : "adhoc_airflow" , "airflow-task" : "insert_query_job" }
11051106
11061107 mock_hook .return_value .insert_job .assert_called_once_with (
11071108 configuration = configuration ,
@@ -1143,6 +1144,7 @@ def test_execute_copy_success(self, mock_hook):
11431144 project_id = TEST_GCP_PROJECT_ID ,
11441145 )
11451146 result = op .execute (context = MagicMock ())
1147+ assert configuration ["labels" ] == {"airflow-dag" : "adhoc_airflow" , "airflow-task" : "copy_query_job" }
11461148
11471149 mock_hook .return_value .insert_job .assert_called_once_with (
11481150 configuration = configuration ,
@@ -1753,6 +1755,138 @@ def test_execute_force_rerun_async(self, mock_hook, create_task_instance_of_oper
17531755 project_id = TEST_GCP_PROJECT_ID ,
17541756 )
17551757
1758+ @mock .patch ("airflow.providers.google.cloud.operators.bigquery.BigQueryHook" )
1759+ def test_execute_adds_to_existing_labels (self , mock_hook ):
1760+ job_id = "123456"
1761+ hash_ = "hash"
1762+ real_job_id = f"{ job_id } _{ hash_ } "
1763+
1764+ configuration = {
1765+ "query" : {
1766+ "query" : "SELECT * FROM any" ,
1767+ "useLegacySql" : False ,
1768+ },
1769+ "labels" : {"foo" : "bar" },
1770+ }
1771+ mock_hook .return_value .insert_job .return_value = MagicMock (job_id = real_job_id , error_result = False )
1772+ mock_hook .return_value .generate_job_id .return_value = real_job_id
1773+
1774+ op = BigQueryInsertJobOperator (
1775+ task_id = "insert_query_job" ,
1776+ configuration = configuration ,
1777+ location = TEST_DATASET_LOCATION ,
1778+ job_id = job_id ,
1779+ project_id = TEST_GCP_PROJECT_ID ,
1780+ )
1781+ op .execute (context = MagicMock ())
1782+ assert configuration ["labels" ] == {
1783+ "foo" : "bar" ,
1784+ "airflow-dag" : "adhoc_airflow" ,
1785+ "airflow-task" : "insert_query_job" ,
1786+ }
1787+
1788+ @mock .patch ("airflow.providers.google.cloud.operators.bigquery.BigQueryHook" )
1789+ def test_execute_respects_explicit_no_labels (self , mock_hook ):
1790+ job_id = "123456"
1791+ hash_ = "hash"
1792+ real_job_id = f"{ job_id } _{ hash_ } "
1793+
1794+ configuration = {
1795+ "query" : {
1796+ "query" : "SELECT * FROM any" ,
1797+ "useLegacySql" : False ,
1798+ },
1799+ "labels" : None ,
1800+ }
1801+ mock_hook .return_value .insert_job .return_value = MagicMock (job_id = real_job_id , error_result = False )
1802+ mock_hook .return_value .generate_job_id .return_value = real_job_id
1803+
1804+ op = BigQueryInsertJobOperator (
1805+ task_id = "insert_query_job" ,
1806+ configuration = configuration ,
1807+ location = TEST_DATASET_LOCATION ,
1808+ job_id = job_id ,
1809+ project_id = TEST_GCP_PROJECT_ID ,
1810+ )
1811+ op .execute (context = MagicMock ())
1812+ assert configuration ["labels" ] is None
1813+
1814+ def test_task_label_too_big (self ):
1815+ configuration = {
1816+ "query" : {
1817+ "query" : "SELECT * FROM any" ,
1818+ "useLegacySql" : False ,
1819+ },
1820+ }
1821+ op = BigQueryInsertJobOperator (
1822+ task_id = "insert_query_job_except_this_task_id_is_really_really_really_really_long" ,
1823+ configuration = configuration ,
1824+ location = TEST_DATASET_LOCATION ,
1825+ project_id = TEST_GCP_PROJECT_ID ,
1826+ )
1827+ op ._add_job_labels ()
1828+ assert "labels" not in configuration
1829+
1830+ def test_dag_label_too_big (self , dag_maker ):
1831+ configuration = {
1832+ "query" : {
1833+ "query" : "SELECT * FROM any" ,
1834+ "useLegacySql" : False ,
1835+ },
1836+ }
1837+ with dag_maker ("adhoc_airflow_except_this_task_id_is_really_really_really_really_long" ):
1838+ op = BigQueryInsertJobOperator (
1839+ task_id = "insert_query_job" ,
1840+ configuration = configuration ,
1841+ location = TEST_DATASET_LOCATION ,
1842+ project_id = TEST_GCP_PROJECT_ID ,
1843+ )
1844+ op ._add_job_labels ()
1845+ assert "labels" not in configuration
1846+
1847+ def test_labels_lowercase (self , dag_maker ):
1848+ configuration = {
1849+ "query" : {
1850+ "query" : "SELECT * FROM any" ,
1851+ "useLegacySql" : False ,
1852+ },
1853+ }
1854+ with dag_maker ("YELLING_DAG_NAME" ):
1855+ op = BigQueryInsertJobOperator (
1856+ task_id = "YELLING_TASK_ID" ,
1857+ configuration = configuration ,
1858+ location = TEST_DATASET_LOCATION ,
1859+ project_id = TEST_GCP_PROJECT_ID ,
1860+ )
1861+ op ._add_job_labels ()
1862+ assert configuration ["labels" ]["airflow-dag" ] == "yelling_dag_name"
1863+ assert configuration ["labels" ]["airflow-task" ] == "yelling_task_id"
1864+
1865+ def test_labels_invalid_names (self , dag_maker ):
1866+ configuration = {
1867+ "query" : {
1868+ "query" : "SELECT * FROM any" ,
1869+ "useLegacySql" : False ,
1870+ },
1871+ }
1872+ op = BigQueryInsertJobOperator (
1873+ task_id = "task.with.dots.is.allowed" ,
1874+ configuration = configuration ,
1875+ location = TEST_DATASET_LOCATION ,
1876+ project_id = TEST_GCP_PROJECT_ID ,
1877+ )
1878+ op ._add_job_labels ()
1879+ assert "labels" not in configuration
1880+
1881+ op = BigQueryInsertJobOperator (
1882+ task_id = "123_task" ,
1883+ configuration = configuration ,
1884+ location = TEST_DATASET_LOCATION ,
1885+ project_id = TEST_GCP_PROJECT_ID ,
1886+ )
1887+ op ._add_job_labels ()
1888+ assert "labels" not in configuration
1889+
17561890
17571891class TestBigQueryIntervalCheckOperator :
17581892 def test_bigquery_interval_check_operator_execute_complete (self ):
0 commit comments