GCSToBigQueryOperator does not respect the destination project ID #29958


Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==8.10.0

Apache Airflow version

2.3.4

Operating System

Ubuntu 18.04.6 LTS

Deployment

Google Cloud Composer

Deployment details

Google Cloud Composer 2.1.2

What happened

GCSToBigQueryOperator does not respect the BigQuery project ID specified in the destination_project_dataset_table argument. Instead, it prioritizes the project ID defined in the Airflow connection.

What you think should happen instead

The project ID specified via destination_project_dataset_table should be respected.

Use case: Suppose our Composer environment and service account (SA) live in project-A, and we want to transfer data into foreign projects B, C, and D. We don't have credentials (and thus no Airflow connections defined) for projects B, C, and D; instead, all transfers are executed by our single SA in project-A (assume this SA has been granted the necessary cross-project IAM permissions). Thus, we want to use a single SA and a single Airflow connection (i.e. gcp_conn_id=google_cloud_default) to send data into three or more destination projects. I imagine this is a fairly common setup for sending data across GCP projects.
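
To make the intended pattern concrete, here is a minimal sketch (the bucket, dataset, and table names are illustrative, not from a real deployment): one connection carrying project-A credentials, fanning out to several destination projects.

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# One connection (project-A credentials), many destination projects.
# (Tasks would be created inside a `with DAG(...)` block.)
for project in ("project-B", "project-C", "project-D"):
    GCSToBigQueryOperator(
        task_id=f"load_into_{project}",
        bucket="my_bucket",
        source_objects=["path/to/*.json"],
        source_format="NEWLINE_DELIMITED_JSON",
        destination_project_dataset_table=f"{project}.my_dataset.my_table",
        gcp_conn_id="google_cloud_default",  # single connection, pointing at project-A
    )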

Root cause: I've been studying the source code, and I believe the bug is caused by line 309. Experimentally, I have verified that hook.project_id traces back to the Airflow connection's project ID. If no destination project ID is explicitly specified, falling back on the connection's project makes sense. However, if the destination project is explicitly provided, the operator should surely honor it. I think this bug can be fixed by amending line 309 as follows:

project=passed_in_project or hook.project_id

This pattern is already used successfully in many other areas of the repo.
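
For illustration, a minimal sketch of what the amended _submit_job could look like. The insert_job keyword arguments follow my reading of the 8.10.0 provider source and may differ slightly; passed_in_project is my own name, and TableReference.from_string is the standard BigQuery client helper for parsing project.dataset.table strings:

from google.cloud.bigquery.table import TableReference

def _submit_job(self, hook, job_id):
    # Parse "project.dataset.table"; when no project prefix was given,
    # from_string falls back to the connection's default project.
    table_ref = TableReference.from_string(
        self.destination_project_dataset_table,
        default_project=hook.project_id,
    )
    passed_in_project = table_ref.project

    return hook.insert_job(
        configuration=self.configuration,
        # Proposed change: honor an explicitly passed destination project,
        # falling back to the connection's project only when it is absent.
        project_id=passed_in_project or hook.project_id,
        location=self.location,
        job_id=job_id,
    )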

How to reproduce

Admittedly, this bug is difficult to reproduce, because it requires two GCP projects: a service account in project-A, plus inbound GCS files and a destination BigQuery table in project-B. You also need an Airflow server with a google_cloud_default connection that points to project-A (a sketch of such a connection follows the DAG below). Assuming all of that exists, the bug can be reproduced via the following Airflow DAG:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

GCS_BUCKET = 'my_bucket'
GCS_PREFIX = 'path/to/*.json'
BQ_PROJECT = 'project-B'  # destination project; we hold no credentials for it
BQ_DATASET = 'my_dataset'
BQ_TABLE = 'my_table'
SERVICE_ACCOUNT = 'my_account@project-A.iam.gserviceaccount.com'


with DAG(
    dag_id='my_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
) as dag:

    task = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery',
        bucket=GCS_BUCKET,
        source_objects=[GCS_PREFIX],
        source_format='NEWLINE_DELIMITED_JSON',
        destination_project_dataset_table=f'{BQ_PROJECT}.{BQ_DATASET}.{BQ_TABLE}',
        impersonation_chain=SERVICE_ACCOUNT,
    )
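
For completeness, the google_cloud_default connection referenced above can be pinned to project-A via Airflow's connection-URI environment variable. This is a sketch; the extra__google_cloud_platform__project key follows the Airflow 2.3-era URI convention, so treat the exact key name as an assumption:

import os

# Hypothetical sketch: a google_cloud_default connection whose default
# project is project-A; hook.project_id resolves to this value at runtime.
os.environ["AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT"] = (
    "google-cloud-platform://?extra__google_cloud_platform__project=project-A"
)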

Stack trace:

Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/executors/debug_executor.py", line 79, in _run_task
    ti.run(job_id=ti.job_id, **params)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1797, in run
    self._run_raw_task(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1464, in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1612, in _execute_task_with_callbacks
    result = self._execute_task(context, task_orig)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1673, in _execute_task
    result = execute_callable(context=context)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 387, in execute
    job = self._submit_job(self.hook, job_id)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py", line 307, in _submit_job
    return hook.insert_job(
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/common/hooks/base_google.py", line 468, in inner_wrapper
    return func(self, *args, **kwargs)
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/bigquery.py", line 1549, in insert_job
    job._begin()
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/job/base.py", line 510, in _begin
    api_response = client._call_api(
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/bigquery/client.py", line 782, in _call_api
    return call()
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 283, in retry_wrapped_func
    return retry_target(
  File "/opt/python3.8/lib/python3.8/site-packages/google/api_core/retry.py", line 190, in retry_target
    return target()
  File "/opt/python3.8/lib/python3.8/site-packages/google/cloud/_http/__init__.py", line 494, in api_request
    raise exceptions.from_http_response(response)
google.api_core.exceptions.Forbidden: 403 POST https://bigquery.googleapis.com/bigquery/v2/projects/{project-A}/jobs?prettyPrint=false: Access Denied: Project {project-A}: User does not have bigquery.jobs.create permission in project {project-A}.

From the stack trace, notice the operator is (incorrectly) attempting to create the load job in project-A rather than project-B.

Anything else

Perhaps out of scope, but the inverse direction suffers from the same problem, i.e. BigQueryToGCSOperator and line 192.
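
The same fallback logic would apply there. A minimal, self-contained sketch of the resolution rule (resolve_job_project is a hypothetical helper for illustration, not a provider function):

from google.cloud.bigquery.table import TableReference

def resolve_job_project(source_project_dataset_table: str, connection_project: str) -> str:
    # Prefer the project explicitly encoded in "project.dataset.table";
    # otherwise fall back to the connection's project.
    ref = TableReference.from_string(
        source_project_dataset_table, default_project=connection_project
    )
    return ref.project or connection_project

assert resolve_job_project("project-B.my_dataset.my_table", "project-A") == "project-B"
assert resolve_job_project("my_dataset.my_table", "project-A") == "project-A"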

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
