
Commit 2d690de

Fix BigQuery transfer operators to respect project_id arguments (#32232)
1 parent b3db4de · commit 2d690de

4 files changed: 86 additions, 55 deletions

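All the changed files apply the same rule: an explicit project_id passed to the operator takes precedence, and the project configured on the hook's GCP connection is only a fallback. A minimal sketch of that resolution order (the helper below is illustrative, not code from the commit; the diffs inline it as `self.project_id or hook.project_id`):

from __future__ import annotations


def resolve_project_id(operator_project_id: str | None, hook_default: str | None) -> str:
    # Illustrative helper, not part of the commit: explicit argument first,
    # then the project configured on the hook's GCP connection.
    project_id = operator_project_id or hook_default
    if project_id is None:
        raise ValueError("No project_id was passed and the connection has no default project.")
    return project_id


# resolve_project_id("job-project-id", "conn-project") -> "job-project-id"
# resolve_project_id(None, "conn-project")             -> "conn-project"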

airflow/providers/google/cloud/transfers/bigquery_to_gcs.py

Lines changed: 3 additions & 3 deletions
@@ -146,7 +146,7 @@ def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
     def _prepare_configuration(self):
         source_project, source_dataset, source_table = self.hook.split_tablename(
             table_input=self.source_project_dataset_table,
-            default_project_id=self.project_id or self.hook.project_id,
+            default_project_id=self.hook.project_id,
             var_name="source_project_dataset_table",
         )

@@ -184,7 +184,7 @@ def _submit_job(

         return hook.insert_job(
             configuration=configuration,
-            project_id=configuration["extract"]["sourceTable"]["projectId"],
+            project_id=self.project_id or hook.project_id,
             location=self.location,
             job_id=job_id,
             timeout=self.result_timeout,
@@ -255,7 +255,7 @@ def execute(self, context: Context):
                 trigger=BigQueryInsertJobTrigger(
                     conn_id=self.gcp_conn_id,
                     job_id=job_id,
-                    project_id=self.hook.project_id,
+                    project_id=self.project_id or self.hook.project_id,
                 ),
                 method_name="execute_complete",
             )
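The practical effect: the extract job can now be submitted to (and billed against) a project other than the one named in the source table reference. A usage sketch under assumed values (task id, table, and bucket names are placeholders, not taken from the commit):

from airflow.providers.google.cloud.transfers.bigquery_to_gcs import BigQueryToGCSOperator

# All literal values below are placeholders for illustration.
export_task = BigQueryToGCSOperator(
    task_id="bq_to_gcs_export",
    source_project_dataset_table="source-project.test-dataset.test-table-id",
    destination_cloud_storage_uris=["gs://example-bucket/export-*.csv"],
    project_id="job-project-id",  # now honored by insert_job; hook default if omitted
)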

airflow/providers/google/cloud/transfers/gcs_to_bigquery.py

Lines changed: 14 additions & 17 deletions
@@ -301,7 +301,7 @@ def _submit_job(
         # Submit a new job without waiting for it to complete.
         return hook.insert_job(
             configuration=self.configuration,
-            project_id=self.project_id,
+            project_id=self.project_id or hook.project_id,
             location=self.location,
             job_id=job_id,
             timeout=self.result_timeout,
@@ -359,7 +359,7 @@ def execute(self, context: Context):

         if self.external_table:
             self.log.info("Creating a new BigQuery table for storing data...")
-            table_obj_api_repr = self._create_empty_table()
+            table_obj_api_repr = self._create_external_table()

             BigQueryTableLink.persist(
                 context=context,
@@ -381,7 +381,7 @@ def execute(self, context: Context):
             except Conflict:
                 # If the job already exists retrieve it
                 job = self.hook.get_job(
-                    project_id=self.hook.project_id,
+                    project_id=self.project_id or self.hook.project_id,
                     location=self.location,
                     job_id=job_id,
                 )
@@ -414,12 +414,12 @@ def execute(self, context: Context):
                 persist_kwargs = {
                     "context": context,
                     "task_instance": self,
-                    "project_id": self.hook.project_id,
                     "table_id": table,
                 }
                 if not isinstance(table, str):
                     persist_kwargs["table_id"] = table["tableId"]
                     persist_kwargs["dataset_id"] = table["datasetId"]
+                    persist_kwargs["project_id"] = table["projectId"]
                 BigQueryTableLink.persist(**persist_kwargs)

         self.job_id = job.job_id
@@ -430,7 +430,7 @@ def execute(self, context: Context):
                 trigger=BigQueryInsertJobTrigger(
                     conn_id=self.gcp_conn_id,
                     job_id=self.job_id,
-                    project_id=self.hook.project_id,
+                    project_id=self.project_id or self.hook.project_id,
                 ),
                 method_name="execute_complete",
             )
@@ -475,7 +475,9 @@ def _find_max_value_in_column(self):
             }
         }
         try:
-            job_id = hook.insert_job(configuration=self.configuration, project_id=hook.project_id)
+            job_id = hook.insert_job(
+                configuration=self.configuration, project_id=self.project_id or hook.project_id
+            )
             rows = list(hook.get_job(job_id=job_id, location=self.location).result())
         except BadRequest as e:
             if "Unrecognized name:" in e.message:
@@ -498,12 +500,7 @@ def _find_max_value_in_column(self):
         else:
             raise RuntimeError(f"The {select_command} returned no rows!")

-    def _create_empty_table(self):
-        self.project_id, dataset_id, table_id = self.hook.split_tablename(
-            table_input=self.destination_project_dataset_table,
-            default_project_id=self.project_id or self.hook.project_id,
-        )
-
+    def _create_external_table(self):
         external_config_api_repr = {
             "autodetect": self.autodetect,
             "sourceFormat": self.source_format,
@@ -549,7 +546,7 @@ def _create_empty_table(self):

         # build table definition
         table = Table(
-            table_ref=TableReference.from_string(self.destination_project_dataset_table, self.project_id)
+            table_ref=TableReference.from_string(self.destination_project_dataset_table, self.hook.project_id)
         )
         table.external_data_configuration = external_config
         if self.labels:
@@ -567,17 +564,17 @@ def _create_empty_table(self):
         self.log.info("Creating external table: %s", self.destination_project_dataset_table)
         self.hook.create_empty_table(
             table_resource=table_obj_api_repr,
-            project_id=self.project_id,
+            project_id=self.project_id or self.hook.project_id,
             location=self.location,
             exists_ok=True,
         )
         self.log.info("External table created successfully: %s", self.destination_project_dataset_table)
         return table_obj_api_repr

     def _use_existing_table(self):
-        self.project_id, destination_dataset, destination_table = self.hook.split_tablename(
+        destination_project_id, destination_dataset, destination_table = self.hook.split_tablename(
             table_input=self.destination_project_dataset_table,
-            default_project_id=self.project_id or self.hook.project_id,
+            default_project_id=self.hook.project_id,
             var_name="destination_project_dataset_table",
         )

@@ -597,7 +594,7 @@ def _use_existing_table(self):
             "autodetect": self.autodetect,
             "createDisposition": self.create_disposition,
             "destinationTable": {
-                "projectId": self.project_id,
+                "projectId": destination_project_id,
                 "datasetId": destination_dataset,
                 "tableId": destination_table,
             },
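Both split_tablename call sites now pass only the hook's project as default_project_id: a project written into the table string should win, while the operator-level project_id is applied separately when submitting the job. A simplified model of split_tablename's behavior (illustrative only; the real hook method also accepts the legacy project:dataset.table form and does more validation):

def split_tablename(table_input: str, default_project_id: str) -> tuple[str, str, str]:
    # Simplified sketch for illustration; not the hook's actual implementation.
    parts = table_input.split(".")
    if len(parts) == 3:
        project, dataset, table = parts  # "project.dataset.table"
    elif len(parts) == 2:
        project = default_project_id  # "dataset.table": fall back to the default
        dataset, table = parts
    else:
        raise ValueError(f"Expected [project.]dataset.table, got {table_input!r}")
    return project, dataset, table

# split_tablename("dest-project.ds.tbl", "conn-project") -> ("dest-project", "ds", "tbl")
# split_tablename("ds.tbl", "conn-project")              -> ("conn-project", "ds", "tbl")

This is why destinationTable.projectId (taken from the table string) can legitimately differ from the project_id the load job runs under.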

tests/providers/google/cloud/transfers/test_bigquery_to_gcs.py

Lines changed: 7 additions & 5 deletions
@@ -31,6 +31,7 @@
 TEST_DATASET = "test-dataset"
 TEST_TABLE_ID = "test-table-id"
 PROJECT_ID = "test-project-id"
+JOB_PROJECT_ID = "job-project-id"


 class TestBigQueryToGCSOperator:
@@ -66,7 +67,7 @@ def test_execute(self, mock_hook):
         mock_hook.return_value.split_tablename.return_value = (PROJECT_ID, TEST_DATASET, TEST_TABLE_ID)
         mock_hook.return_value.generate_job_id.return_value = real_job_id
         mock_hook.return_value.insert_job.return_value = MagicMock(job_id="real_job_id", error_result=False)
-        mock_hook.return_value.project_id = PROJECT_ID
+        mock_hook.return_value.project_id = JOB_PROJECT_ID

         operator = BigQueryToGCSOperator(
             task_id=TASK_ID,
@@ -77,13 +78,14 @@ def test_execute(self, mock_hook):
             field_delimiter=field_delimiter,
             print_header=print_header,
             labels=labels,
+            project_id=JOB_PROJECT_ID,
         )
         operator.execute(context=mock.MagicMock())

         mock_hook.return_value.insert_job.assert_called_once_with(
             job_id="123456_hash",
             configuration=expected_configuration,
-            project_id=PROJECT_ID,
+            project_id=JOB_PROJECT_ID,
             location=None,
             timeout=None,
             retry=DEFAULT_RETRY,
@@ -122,10 +124,10 @@ def test_execute_deferrable_mode(self, mock_hook):
         mock_hook.return_value.split_tablename.return_value = (PROJECT_ID, TEST_DATASET, TEST_TABLE_ID)
         mock_hook.return_value.generate_job_id.return_value = real_job_id
         mock_hook.return_value.insert_job.return_value = MagicMock(job_id="real_job_id", error_result=False)
-        mock_hook.return_value.project_id = PROJECT_ID
+        mock_hook.return_value.project_id = JOB_PROJECT_ID

         operator = BigQueryToGCSOperator(
-            project_id=PROJECT_ID,
+            project_id=JOB_PROJECT_ID,
             task_id=TASK_ID,
             source_project_dataset_table=source_project_dataset_table,
             destination_cloud_storage_uris=destination_cloud_storage_uris,
@@ -146,7 +148,7 @@ def test_execute_deferrable_mode(self, mock_hook):
         mock_hook.return_value.insert_job.assert_called_once_with(
             configuration=expected_configuration,
             job_id="123456_hash",
-            project_id=PROJECT_ID,
+            project_id=JOB_PROJECT_ID,
             location=None,
             timeout=None,
             retry=DEFAULT_RETRY,
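These tests pin the explicit-argument path. A hypothetical companion test for the fallback path (project_id omitted on the operator), written in the same mocking style as the file above; it is a sketch, not part of the commit, and the patch target and URI are assumptions:

@mock.patch("airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryHook")
def test_execute_falls_back_to_hook_project_id(mock_hook):
    # Hypothetical test: with no operator-level project_id, insert_job
    # should receive the hook's default project.
    mock_hook.return_value.split_tablename.return_value = (PROJECT_ID, TEST_DATASET, TEST_TABLE_ID)
    mock_hook.return_value.generate_job_id.return_value = "123456_hash"
    mock_hook.return_value.insert_job.return_value = MagicMock(job_id="real_job_id", error_result=False)
    mock_hook.return_value.project_id = JOB_PROJECT_ID

    operator = BigQueryToGCSOperator(
        task_id=TASK_ID,
        source_project_dataset_table=f"{PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}",
        destination_cloud_storage_uris=["gs://example-bucket/file.csv"],  # placeholder URI
    )
    operator.execute(context=mock.MagicMock())

    assert mock_hook.return_value.insert_job.call_args.kwargs["project_id"] == JOB_PROJECT_ID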
