@@ -301,7 +301,7 @@ def _submit_job(
301301 # Submit a new job without waiting for it to complete.
302302 return hook .insert_job (
303303 configuration = self .configuration ,
304- project_id = self .project_id ,
304+ project_id = self .project_id or hook . project_id ,
305305 location = self .location ,
306306 job_id = job_id ,
307307 timeout = self .result_timeout ,
@@ -359,7 +359,7 @@ def execute(self, context: Context):
359359
360360 if self .external_table :
361361 self .log .info ("Creating a new BigQuery table for storing data..." )
362- table_obj_api_repr = self ._create_empty_table ()
362+ table_obj_api_repr = self ._create_external_table ()
363363
364364 BigQueryTableLink .persist (
365365 context = context ,
@@ -381,7 +381,7 @@ def execute(self, context: Context):
381381 except Conflict :
382382 # If the job already exists retrieve it
383383 job = self .hook .get_job (
384- project_id = self .hook .project_id ,
384+ project_id = self .project_id or self . hook .project_id ,
385385 location = self .location ,
386386 job_id = job_id ,
387387 )
@@ -414,12 +414,12 @@ def execute(self, context: Context):
414414 persist_kwargs = {
415415 "context" : context ,
416416 "task_instance" : self ,
417- "project_id" : self .hook .project_id ,
418417 "table_id" : table ,
419418 }
420419 if not isinstance (table , str ):
421420 persist_kwargs ["table_id" ] = table ["tableId" ]
422421 persist_kwargs ["dataset_id" ] = table ["datasetId" ]
422+ persist_kwargs ["project_id" ] = table ["projectId" ]
423423 BigQueryTableLink .persist (** persist_kwargs )
424424
425425 self .job_id = job .job_id
@@ -430,7 +430,7 @@ def execute(self, context: Context):
430430 trigger = BigQueryInsertJobTrigger (
431431 conn_id = self .gcp_conn_id ,
432432 job_id = self .job_id ,
433- project_id = self .hook .project_id ,
433+ project_id = self .project_id or self . hook .project_id ,
434434 ),
435435 method_name = "execute_complete" ,
436436 )
@@ -475,7 +475,9 @@ def _find_max_value_in_column(self):
475475 }
476476 }
477477 try :
478- job_id = hook .insert_job (configuration = self .configuration , project_id = hook .project_id )
478+ job_id = hook .insert_job (
479+ configuration = self .configuration , project_id = self .project_id or hook .project_id
480+ )
479481 rows = list (hook .get_job (job_id = job_id , location = self .location ).result ())
480482 except BadRequest as e :
481483 if "Unrecognized name:" in e .message :
@@ -498,12 +500,7 @@ def _find_max_value_in_column(self):
498500 else :
499501 raise RuntimeError (f"The { select_command } returned no rows!" )
500502
501- def _create_empty_table (self ):
502- self .project_id , dataset_id , table_id = self .hook .split_tablename (
503- table_input = self .destination_project_dataset_table ,
504- default_project_id = self .project_id or self .hook .project_id ,
505- )
506-
503+ def _create_external_table (self ):
507504 external_config_api_repr = {
508505 "autodetect" : self .autodetect ,
509506 "sourceFormat" : self .source_format ,
@@ -549,7 +546,7 @@ def _create_empty_table(self):
549546
550547 # build table definition
551548 table = Table (
552- table_ref = TableReference .from_string (self .destination_project_dataset_table , self .project_id )
549+ table_ref = TableReference .from_string (self .destination_project_dataset_table , self .hook . project_id )
553550 )
554551 table .external_data_configuration = external_config
555552 if self .labels :
@@ -567,17 +564,17 @@ def _create_empty_table(self):
567564 self .log .info ("Creating external table: %s" , self .destination_project_dataset_table )
568565 self .hook .create_empty_table (
569566 table_resource = table_obj_api_repr ,
570- project_id = self .project_id ,
567+ project_id = self .project_id or self . hook . project_id ,
571568 location = self .location ,
572569 exists_ok = True ,
573570 )
574571 self .log .info ("External table created successfully: %s" , self .destination_project_dataset_table )
575572 return table_obj_api_repr
576573
577574 def _use_existing_table (self ):
578- self . project_id , destination_dataset , destination_table = self .hook .split_tablename (
575+ destination_project_id , destination_dataset , destination_table = self .hook .split_tablename (
579576 table_input = self .destination_project_dataset_table ,
580- default_project_id = self .project_id or self . hook .project_id ,
577+ default_project_id = self .hook .project_id ,
581578 var_name = "destination_project_dataset_table" ,
582579 )
583580
@@ -597,7 +594,7 @@ def _use_existing_table(self):
597594 "autodetect" : self .autodetect ,
598595 "createDisposition" : self .create_disposition ,
599596 "destinationTable" : {
600- "projectId" : self . project_id ,
597+ "projectId" : destination_project_id ,
601598 "datasetId" : destination_dataset ,
602599 "tableId" : destination_table ,
603600 },
0 commit comments