ObjectStorage for GCS unable to use GCP connection credential #37834

@pankajastro

Description

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

The DAG below is unable to use credentials from the Google Cloud connection. I tried adding the service account key JSON to the connection directly, as well as its file path. The DAG works when I instead set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the service account key JSON path.
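
For context, a Google Cloud connection carrying a service account key path can be defined roughly like this (a sketch, not the exact configuration used here; the key path is a placeholder, and the JSON connection format requires Airflow 2.3+):

# Hypothetical definition of the gcp_conn_id connection; key_path points
# at the service account key file on the worker.
export AIRFLOW_CONN_GCP_CONN_ID='{
    "conn_type": "google_cloud_platform",
    "extra": {"key_path": "/path/to/service-account-key.json"}
}'

The DAG itself: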

import pendulum

from airflow.decorators import dag, task
from airflow.io.path import ObjectStoragePath

# Base path backed by the Google Cloud connection "gcp_conn_id".
base = ObjectStoragePath("gs://airflow-tutorial-data1/", conn_id="gcp_conn_id")


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def gcs_objectstorage():
    @task
    def store_data() -> ObjectStoragePath:
        import pandas as pd

        logical_date = pd.Timestamp.now()
        formatted_date = logical_date.strftime("%Y%m%d")
        path = base / f"air_quality_{formatted_date}.parquet"

        aq_fields = {
            "calories": "int32",
            "duration": "int32",
        }

        data = {
            "calories": 420,
            "duration": 50,
        }

        df = pd.DataFrame(data, index=[0]).astype(aq_fields)
        with path.open("wb") as file:
            df.to_parquet(file)
        return path

    store_data()


gcs_objectstorage()
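
For comparison, the same DAG succeeds when the key file path is exported directly for Application Default Credentials (path is a placeholder):

# Workaround that makes the DAG pass: point ADC at the key file directly.
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json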

Without GOOGLE_APPLICATION_CREDENTIALS set, I get the error below:

c434c7303a41
*** Found local files:
***   * /usr/local/airflow/logs/dag_id=gcs_objectstorage/run_id=manual__2024-03-01T17:17:58.472629+00:00/task_id=store_data/attempt=1.log
[2024-03-01, 17:17:59 UTC] {taskinstance.py:1997} INFO - Dependencies all met for dep_context=non-requeueable deps ti=
[2024-03-01, 17:17:59 UTC] {taskinstance.py:1997} INFO - Dependencies all met for dep_context=requeueable deps ti=
[2024-03-01, 17:17:59 UTC] {taskinstance.py:2211} INFO - Starting attempt 1 of 1
[2024-03-01, 17:17:59 UTC] {taskinstance.py:2232} INFO - Executing  on 2024-03-01 17:17:58.472629+00:00
[2024-03-01, 17:17:59 UTC] {standard_task_runner.py:60} INFO - Started process 194 to run task
[2024-03-01, 17:17:59 UTC] {standard_task_runner.py:87} INFO - Running: ['airflow', 'tasks', 'run', 'gcs_objectstorage', 'store_data', 'manual__2024-03-01T17:17:58.472629+00:00', '--job-id', '150', '--raw', '--subdir', 'DAGS_FOLDER/obj_storage1.py', '--cfg-path', '/tmp/tmpfmrrthnh']
[2024-03-01, 17:17:59 UTC] {standard_task_runner.py:88} INFO - Job 150: Subtask store_data
[2024-03-01, 17:17:59 UTC] {task_command.py:424} INFO - Running  on host c434c7303a41
[2024-03-01, 17:17:59 UTC] {taskinstance.py:2537} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='gcs_objectstorage' AIRFLOW_CTX_TASK_ID='store_data' AIRFLOW_CTX_EXECUTION_DATE='2024-03-01T17:17:58.472629+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-03-01T17:17:58.472629+00:00'
[2024-03-01, 17:18:00 UTC] {connection.py:269} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2024-03-01, 17:18:00 UTC] {base.py:83} INFO - Using connection ID 'gcp_conn_id' for task execution.
[2024-03-01, 17:18:03 UTC] {_metadata.py:139} WARNING - Compute Engine Metadata server unavailable on attempt 1 of 3. Reason: timed out
[2024-03-01, 17:18:06 UTC] {_metadata.py:139} WARNING - Compute Engine Metadata server unavailable on attempt 2 of 3. Reason: timed out
[2024-03-01, 17:18:06 UTC] {_metadata.py:139} WARNING - Compute Engine Metadata server unavailable on attempt 3 of 3. Reason: [Errno 111] Connection refused
[2024-03-01, 17:18:06 UTC] {_default.py:338} WARNING - Authentication failed using Compute Engine authentication due to unavailable metadata server.
[2024-03-01, 17:18:06 UTC] {_metadata.py:208} WARNING - Compute Engine Metadata server unavailable on attempt 1 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NameResolutionError(": Failed to resolve 'metadata.google.internal' ([Errno -2] Name or service not known)"))
[2024-03-01, 17:18:06 UTC] {_metadata.py:208} WARNING - Compute Engine Metadata server unavailable on attempt 2 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NameResolutionError(": Failed to resolve 'metadata.google.internal' ([Errno -2] Name or service not known)"))
[2024-03-01, 17:18:06 UTC] {_metadata.py:208} WARNING - Compute Engine Metadata server unavailable on attempt 3 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NameResolutionError(": Failed to resolve 'metadata.google.internal' ([Errno -2] Name or service not known)"))
[2024-03-01, 17:18:06 UTC] {_metadata.py:208} WARNING - Compute Engine Metadata server unavailable on attempt 4 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NameResolutionError(": Failed to resolve 'metadata.google.internal' ([Errno -2] Name or service not known)"))
[2024-03-01, 17:18:06 UTC] {_metadata.py:208} WARNING - Compute Engine Metadata server unavailable on attempt 5 of 5. Reason: HTTPConnectionPool(host='metadata.google.internal', port=80): Max retries exceeded with url: /computeMetadata/v1/instance/service-accounts/default/?recursive=true (Caused by NameResolutionError(": Failed to resolve 'metadata.google.internal' ([Errno -2] Name or service not known)"))
[2024-03-01, 17:18:06 UTC] {retry.py:157} ERROR - _request non-retriable exception: Anonymous caller does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist)., 401
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/gcsfs/retry.py", line 123, in retry_request
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/gcsfs/core.py", line 430, in _request
    validate_response(status, contents, path, args)
  File "/usr/local/lib/python3.11/site-packages/gcsfs/retry.py", line 110, in validate_response
    raise HttpError(error)
gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist)., 401
[2024-03-01, 17:18:06 UTC] {taskinstance.py:2774} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 447, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 417, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/decorators/base.py", line 238, in execute
    return_value = super().execute(context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/operators/python.py", line 200, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/operators/python.py", line 217, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/obj_storage1.py", line 36, in store_data
    with path.open("wb") as file:
  File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 1965, in __exit__
    self.close()
  File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 1932, in close
    self.flush(force=True)
  File "/usr/local/lib/python3.11/site-packages/fsspec/spec.py", line 1798, in flush
    self._initiate_upload()
  File "/usr/local/lib/python3.11/site-packages/gcsfs/core.py", line 1799, in _initiate_upload
    self.location = sync(
                    ^^^^^
  File "/usr/local/lib/python3.11/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/usr/local/lib/python3.11/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
                ^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/gcsfs/core.py", line 1916, in initiate_upload
    headers, _ = await fs._call(
                 ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/gcsfs/core.py", line 437, in _call
    status, headers, info, contents = await self._request(
                                      ^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/decorator.py", line 221, in fun
    return await caller(func, *(extras + args), **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/gcsfs/retry.py", line 158, in retry_request
    raise e
  File "/usr/local/lib/python3.11/site-packages/gcsfs/retry.py", line 123, in retry_request
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/gcsfs/core.py", line 430, in _request
    validate_response(status, contents, path, args)
  File "/usr/local/lib/python3.11/site-packages/gcsfs/retry.py", line 110, in validate_response
    raise HttpError(error)
gcsfs.retry.HttpError: Anonymous caller does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist)., 401
[2024-03-01, 17:18:06 UTC] {taskinstance.py:1168} INFO - Marking task as FAILED. dag_id=gcs_objectstorage, task_id=store_data, execution_date=20240301T171758, start_date=20240301T171759, end_date=20240301T171806
[2024-03-01, 17:18:06 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 150 for task store_data (Anonymous caller does not have storage.objects.create access to the Google Cloud Storage object. Permission 'storage.objects.create' denied on resource (or it may not exist)., 401; 194)
[2024-03-01, 17:18:06 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-03-01, 17:18:06 UTC] {taskinstance.py:3357} INFO - 0 downstream tasks scheduled from follow-on schedule check

What you think should happen instead?

The above DAG should be able to use the credentials from the Airflow connection; it should not require exporting the GOOGLE_APPLICATION_CREDENTIALS environment variable.
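
As a sanity check, the connection credentials can be exercised through the provider hook, which builds a storage client from the same connection (a minimal sketch, assuming apache-airflow-providers-google is installed; bucket name as in the DAG above):

from airflow.providers.google.cloud.hooks.gcs import GCSHook

# Build a google.cloud.storage.Client from the same Airflow connection.
hook = GCSHook(gcp_conn_id="gcp_conn_id")
client = hook.get_conn()

# If this succeeds, the connection itself carries usable credentials, so the
# ObjectStoragePath/gcsfs code path above should be able to use them as well.
print(list(client.list_blobs("airflow-tutorial-data1", max_results=1)))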

How to reproduce

Run the above DAG with the gcp_conn_id Google Cloud connection configured and without GOOGLE_APPLICATION_CREDENTIALS set.

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Docker-Compose

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
