Description
Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
apache-airflow-providers-google==8.10.0
Apache Airflow version
2.3.4
Operating System
Ubuntu 18.04.6 LTS
Deployment
Google Cloud Composer
Deployment details
Google Cloud Composer 2.1.2
What happened
GCSToBigQueryOperator does not respect the BigQuery project ID specified in the destination_project_dataset_table argument. Instead, it prioritizes the project ID defined in the Airflow connection.
What you think should happen instead
The project ID specified via destination_project_dataset_table should be respected.
Use case: Suppose our Composer environment and service account (SA) live in project-A, and we want to transfer data into foreign projects B, C, and D. We don't have credentials (and thus don't have Airflow connections defined) for projects B, C, and D. Instead, all transfers are executed by our singular SA in project-A. (Assume this SA has cross-project IAM policies). Thus, we want to use a single SA and single Airflow connection (i.e. gcp_conn_id=google_cloud_default) to send data into 3+ destination projects. I imagine this is a fairly common setup for sending data across GCP projects.
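As an aside, the fully qualified destination string already carries the target project, and the google-cloud-bigquery client that the provider relies on can parse it directly. A minimal illustration (purely to show where the project lives in the string; this is not part of the operator's code):

```python
# Illustration only: the destination string already encodes the target project.
from google.cloud.bigquery import TableReference

ref = TableReference.from_string("project-B.my_dataset.my_table")
print(ref.project)                    # -> 'project-B', the project the load job should run in
print(ref.dataset_id, ref.table_id)   # -> 'my_dataset' 'my_table'
```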
Root cause: I've been studying the source code, and I believe the bug is caused by line 309. Experimentally, I have verified that hook.project_id traces back to the Airflow connection's project ID. If no destination project ID is explicitly specified, then it makes sense to fall back on the connection's project. However, if the destination project is explicitly provided, surely the operator should honor it. I think this bug can be fixed by amending line 309 as follows:

```python
project=passed_in_project or hook.project_id
```

This pattern is already used successfully in many other areas of the repo: example.
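For clarity, here is a self-contained sketch of the fallback I have in mind (the helper name resolve_job_project is mine, not the provider's; the actual fix would live inside _submit_job):

```python
# Hypothetical sketch of the proposed fallback, not the provider's actual code.
def resolve_job_project(destination_project_dataset_table: str, connection_project_id: str) -> str:
    """Prefer the project embedded in 'project.dataset.table'; otherwise fall
    back to the project configured on the Airflow connection."""
    parts = destination_project_dataset_table.split(".")
    passed_in_project = parts[0] if len(parts) == 3 else None
    return passed_in_project or connection_project_id


print(resolve_job_project("project-B.my_dataset.my_table", "project-A"))  # -> project-B
print(resolve_job_project("my_dataset.my_table", "project-A"))            # -> project-A
```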
How to reproduce
Admittedly, this bug is difficult to reproduce because it requires two GCP projects: a service account in project-A, plus inbound GCS files and a destination BigQuery table in project-B. You also need an Airflow deployment with a google_cloud_default connection that points to project-A, like this. Assuming all of that exists, the bug can be reproduced with the following DAG:
```python
from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime

GCS_BUCKET = 'my_bucket'
GCS_PREFIX = 'path/to/*.json'
BQ_PROJECT = 'project-B'
BQ_DATASET = 'my_dataset'
BQ_TABLE = 'my_table'
SERVICE_ACCOUNT = 'my_account@project-A.iam.gserviceaccount.com'

with DAG(
    dag_id='my_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:
    task = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket=GCS_BUCKET,
        source_objects=GCS_PREFIX,
        source_format='NEWLINE_DELIMITED_JSON',
        destination_project_dataset_table='{}.{}.{}'.format(BQ_PROJECT, BQ_DATASET, BQ_TABLE),
        impersonation_chain=SERVICE_ACCOUNT,
    )
```

Stack trace:
```
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
    ti.run(job_id=ti.job_id, **params)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1797, in run
    self._run_raw_task(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1464, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1612, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1673, in _execute_task
    result = execute_callable(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 387, in execute
    job = self._submit_job(self.hook, job_id)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 307, in _submit_job
    return hook.insert_job(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 468, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1549, in insert_job
    job._begin()
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 510, in _begin
    api_response = client._call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 782, in _call_api
    return call()
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 283, in retry_wrapped_func
    return retry_target(
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 190, in retry_target
    return target()
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.Forbidden: 403 POST https://bigquery.googleapis.com/bigquery/v2/projects/{project-A}/jobs?prettyPrint=false: Access Denied: Project {project-A}: User does not have bigquery.jobs.create permission in project {project-A}.
```
From the stack trace, notice that the operator (incorrectly) submits the load job to project-A rather than project-B.
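To confirm where the wrong project comes from, here is a minimal check (run inside the affected Airflow environment, where the google_cloud_default connection is defined) showing that hook.project_id resolves to the connection's project rather than the destination's:

```python
# Minimal check, assuming the google_cloud_default connection exists:
# hook.project_id comes from the connection, i.e. 'project-A' in this setup.
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

hook = BigQueryHook(gcp_conn_id="google_cloud_default")
print(hook.project_id)  # prints the connection's project ('project-A'), not 'project-B'
```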
Anything else
Perhaps out of scope, but the reverse direction suffers from the same problem, i.e. BigQueryToGCSOperator and line 192.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct