
Conversation

@vchiapaikeo (Contributor) commented Dec 23, 2022

closes: #12329

The check for autodetect being falsey (rather than explicitly False) prevents us from using a Job API feature where the BQ table already exists and we don't want to specify schema_fields or a schema_object. Change this check to look for an explicit False, which allows us to pass None to autodetect without schema_fields or a schema_object.
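
For context, here is a minimal sketch of the check this PR changes, paraphrased into a standalone helper (schema_is_missing is a made-up name, not the exact operator code):

# Paraphrased sketch of the check being changed (not the real operator code).
def schema_is_missing(schema_fields, schema_object, autodetect) -> bool:
    # Before: any falsey autodetect (False *or* None) demanded an explicit schema,
    # so autodetect=None could not be used against an existing table.
    # return not schema_fields and not schema_object and not autodetect

    # After: only an explicit autodetect=False demands a schema; autodetect=None
    # is passed through to the load job so BigQuery uses the existing table's schema.
    return not schema_fields and not schema_object and autodetect is False


# With the new check, this combination no longer raises:
assert schema_is_missing(schema_fields=None, schema_object=None, autodetect=None) is False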

Testing Notes

To verify, I tried this on my local setup with a simple DAG:

from airflow import DAG

from etsy.operators.gcs_to_bigquery import GCSToBigQueryOperator

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_patch_gcs_to_bigquery",
    default_args=DEFAULT_TASK_ARGS,
) as dag:

    test_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="test_gcs_to_bigquery",
        create_disposition="CREATE_IF_NEEDED",
        # Need to explicitly set autodetect to None
        autodetect=None,
        write_disposition="WRITE_TRUNCATE",
        destination_project_dataset_table="my-project.vchiapaikeo.test1",
        bucket="my-bucket",
        source_format="CSV",
        source_objects=["vchiapaikeo/file.csv"],
    )

I then created a simple table in BigQuery:

[screenshot]

And ran the DAG:

[screenshots]

Task logs:

[2022-12-23, 20:30:32 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-22T00:00:00+00:00 [queued]>
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1087} INFO - Dependencies all met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-22T00:00:00+00:00 [queued]>
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1283} INFO - 
--------------------------------------------------------------------------------
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1284} INFO - Starting attempt 15 of 16
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1285} INFO - 
--------------------------------------------------------------------------------
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1304} INFO - Executing <Task(GCSToBigQueryOperator): test_gcs_to_bigquery> on 2022-12-22 00:00:00+00:00
[2022-12-23, 20:30:32 UTC] {standard_task_runner.py:55} INFO - Started process 5611 to run task
[2022-12-23, 20:30:32 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_os_patch_gcs_to_bigquery', 'test_gcs_to_bigquery', 'scheduled__2022-12-22T00:00:00+00:00', '--job-id', '17', '--raw', '--subdir', 'DAGS_FOLDER/dataeng/batch/test_os_patch_gcs_to_bigquery.py', '--cfg-path', '/tmp/tmpoxitwl1m']
[2022-12-23, 20:30:32 UTC] {standard_task_runner.py:83} INFO - Job 17: Subtask test_gcs_to_bigquery
[2022-12-23, 20:30:32 UTC] {task_command.py:389} INFO - Running <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-22T00:00:00+00:00 [running]> on host f3b7042f4dc5
[2022-12-23, 20:30:32 UTC] {taskinstance.py:1511} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_patch_gcs_to_bigquery
AIRFLOW_CTX_TASK_ID=test_gcs_to_bigquery
AIRFLOW_CTX_EXECUTION_DATE=2022-12-22T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=15
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-22T00:00:00+00:00
[2022-12-23, 20:30:32 UTC] {metastore.py:45} INFO - Default connection request. Checking conn_id google_cloud_gcp_data_platform
[2022-12-23, 20:30:32 UTC] {connection.py:210} WARNING - Connection schemes (type: google_cloud_platform) shall not contain '_' according to RFC3986.
[2022-12-23, 20:30:32 UTC] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.
[2022-12-23, 20:30:32 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-23, 20:30:32 UTC] {gcs_to_bigquery.py:370} INFO - Using existing BigQuery table for storing data...
[2022-12-23, 20:30:32 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-23, 20:30:34 UTC] {gcs_to_bigquery.py:374} INFO - Executing: {'load': {'autodetect': None, 'createDisposition': 'CREATE_IF_NEEDED', 'destinationTable': {'projectId': 'my-project', 'datasetId': 'vchiapaikeo', 'tableId': 'test1'}, 'sourceFormat': 'CSV', 'sourceUris': ['gs://my-bucket/vchiapaikeo/file.csv'], 'writeDisposition': 'WRITE_TRUNCATE', 'ignoreUnknownValues': False, 'skipLeadingRows': None, 'fieldDelimiter': ',', 'quote': None, 'allowQuotedNewlines': False, 'encoding': 'UTF-8'}}
[2022-12-23, 20:30:34 UTC] {bigquery.py:1539} INFO - Inserting job airflow_test_os_patch_gcs_to_bigquery_test_gcs_to_bigquery_2022_12_22T00_00_00_00_00_8c90b0141a25c185bab829d91cc9a474
[2022-12-23, 20:30:37 UTC] {taskinstance.py:1322} INFO - Marking task as SUCCESS. dag_id=test_os_patch_gcs_to_bigquery, task_id=test_gcs_to_bigquery, execution_date=20221222T000000, start_date=20221223T203032, end_date=20221223T203037
[2022-12-23, 20:30:37 UTC] {connection.py:210} WARNING - Connection schemes (type: datahub_rest) shall not contain '_' according to RFC3986.
[2022-12-23, 20:30:40 UTC] {local_task_job.py:159} INFO - Task exited with return code 0
[2022-12-23, 20:30:40 UTC] {taskinstance.py:2582} INFO - 0 downstream tasks scheduled from follow-on schedule check

^ omitted some redundant log lines

PR: #28564



cc: @eladkal, @VladaZakharova

@eladkal (Contributor) left a comment:

LGTM
@vchiapaikeo can you check the failures in the doc build?

@vchiapaikeo (Contributor, Author) commented:

Agh, sorry about that. Tests are rerunning now.

@vchiapaikeo force-pushed the vchiapaikeo/feature-gcs-to-bigquery-write-dispo-append-v1 branch from 00a3a74 to 569eecd on December 24, 2022 13:13
@vchiapaikeo (Contributor, Author) commented Dec 24, 2022

Also tested this DAG in Breeze locally after adding a google_cloud_default connection, and things seemed to work fine as well:

Dag:

from airflow import DAG

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_patch_gcs_to_bigquery",
    default_args=DEFAULT_TASK_ARGS,
) as dag:

    test_gcs_to_bigquery = GCSToBigQueryOperator(
        task_id="test_gcs_to_bigquery",
        create_disposition="CREATE_IF_NEEDED",
        # Need to explicitly set autodetect to None
        autodetect=None,
        write_disposition="WRITE_TRUNCATE",
        destination_project_dataset_table="my-project.vchiapaikeo.test1",
        bucket="my-bucket",
        source_format="CSV",
        source_objects=["vchiapaikeo/file.csv"],
    )

Task Logs:

*** Reading local file: /root/airflow/logs/dag_id=test_os_patch_gcs_to_bigquery/run_id=scheduled__2022-12-23T00:00:00+00:00/task_id=test_gcs_to_bigquery/attempt=5.log
[2022-12-24, 13:38:59 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-23T00:00:00+00:00 [queued]>
[2022-12-24, 13:38:59 UTC] {taskinstance.py:1084} INFO - Dependencies all met for <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-23T00:00:00+00:00 [queued]>
[2022-12-24, 13:39:00 UTC] {taskinstance.py:1282} INFO - 
--------------------------------------------------------------------------------
[2022-12-24, 13:39:00 UTC] {taskinstance.py:1283} INFO - Starting attempt 5 of 6
[2022-12-24, 13:39:00 UTC] {taskinstance.py:1284} INFO - 
--------------------------------------------------------------------------------
[2022-12-24, 13:39:00 UTC] {taskinstance.py:1303} INFO - Executing <Task(GCSToBigQueryOperator): test_gcs_to_bigquery> on 2022-12-23 00:00:00+00:00
[2022-12-24, 13:39:00 UTC] {standard_task_runner.py:55} INFO - Started process 396 to run task
[2022-12-24, 13:39:00 UTC] {standard_task_runner.py:82} INFO - Running: ['airflow', 'tasks', 'run', 'test_os_patch_gcs_to_bigquery', 'test_gcs_to_bigquery', 'scheduled__2022-12-23T00:00:00+00:00', '--job-id', '11', '--raw', '--subdir', 'DAGS_FOLDER/test.py', '--cfg-path', '/tmp/tmpoc_4ow50']
[2022-12-24, 13:39:00 UTC] {standard_task_runner.py:83} INFO - Job 11: Subtask test_gcs_to_bigquery
[2022-12-24, 13:39:00 UTC] {task_command.py:388} INFO - Running <TaskInstance: test_os_patch_gcs_to_bigquery.test_gcs_to_bigquery scheduled__2022-12-23T00:00:00+00:00 [running]> on host d6d6ca865d2e
[2022-12-24, 13:39:00 UTC] {taskinstance.py:1512} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=gcp-data-platform
AIRFLOW_CTX_DAG_ID=test_os_patch_gcs_to_bigquery
AIRFLOW_CTX_TASK_ID=test_gcs_to_bigquery
AIRFLOW_CTX_EXECUTION_DATE=2022-12-23T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=5
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2022-12-23T00:00:00+00:00
[2022-12-24, 13:39:00 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2022-12-24, 13:39:00 UTC] {gcs_to_bigquery.py:377} INFO - Using existing BigQuery table for storing data...
[2022-12-24, 13:39:00 UTC] {credentials_provider.py:323} INFO - Getting connection using `google.auth.default()` since no key file is defined for hook.
[2022-12-24, 13:39:00 UTC] {_default.py:649} WARNING - No project ID could be determined. Consider running `gcloud config set project` or setting the GOOGLE_CLOUD_PROJECT environment variable
[2022-12-24, 13:39:00 UTC] {gcs_to_bigquery.py:381} INFO - Executing: {'load': {'autodetect': None, 'createDisposition': 'CREATE_IF_NEEDED', 'destinationTable': {'projectId': 'my-project', 'datasetId': 'vchiapaikeo', 'tableId': 'test1'}, 'sourceFormat': 'CSV', 'sourceUris': ['gs://my-bucket/vchiapaikeo/file.csv'], 'writeDisposition': 'WRITE_TRUNCATE', 'ignoreUnknownValues': False, 'skipLeadingRows': None, 'fieldDelimiter': ',', 'quote': None, 'allowQuotedNewlines': False, 'encoding': 'UTF-8'}}
[2022-12-24, 13:39:00 UTC] {bigquery.py:1539} INFO - Inserting job airflow_test_os_patch_gcs_to_bigquery_test_gcs_to_bigquery_2022_12_23T00_00_00_00_00_81a09730bb5999ef34166fdfa7b80799
[2022-12-24, 13:39:06 UTC] {taskinstance.py:1326} INFO - Marking task as SUCCESS. dag_id=test_os_patch_gcs_to_bigquery, task_id=test_gcs_to_bigquery, execution_date=20221223T000000, start_date=20221224T133859, end_date=20221224T133906
[2022-12-24, 13:39:06 UTC] {local_task_job.py:208} INFO - Task exited with return code 0
[2022-12-24, 13:39:06 UTC] {taskinstance.py:2598} INFO - 0 downstream tasks scheduled from follow-on schedule check

Grid View:

[screenshot]

BigQuery Output:
[screenshot]

When the write disposition is changed to WRITE_APPEND, we get the records duplicated after a rerun as expected:
[screenshot]

@eladkal (Contributor) left a comment:

LGTM
thank you @vchiapaikeo

@apallerlamudi commented:

@vchiapaikeo Quick question: did you get a chance to test this out with a JSON file? Looks like the operator is failing for JSON formats.

@vchiapaikeo (Contributor, Author) commented:

Hmm I cannot repro. Can you provide a working repro of the bug?

Here is my test on newline-delimited JSON:

file.json

{"col1": 2, "col2": "a"}
{"col1": 3, "col2": "b"}
{"col1": 5, "col2": "c"}
{"col1": 10, "col2": "d"}

gsutil cp file.json gs://my-bucket/vchiapaikeo/file.json

DAG:

from airflow import DAG

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_patch_gcs_to_bigquery",
    default_args=DEFAULT_TASK_ARGS,
) as dag:

    test_gcs_to_bigquery_allows_autodetect_none_and_infers_schema = GCSToBigQueryOperator(
        task_id="test_gcs_to_bigquery_allows_autodetect_none_and_infers_schema",
        create_disposition="CREATE_IF_NEEDED",
        # Need to explicitly set autodetect to None
        autodetect=None,
        write_disposition="WRITE_TRUNCATE",
        destination_project_dataset_table="my-project.vchiapaikeo.test1",
        bucket="my-bucket",
        source_format="NEWLINE_DELIMITED_JSON",
        source_objects=["vchiapaikeo/file.json"],
    )

Success:

[screenshot]

BQ Results:

[screenshot]

@apallerlamudi commented:

@vchiapaikeo

with DAG('sample', schedule_interval='0 13 * * *', catchup=False, is_paused_upon_creation=True, default_args=default_args) as dag:


    gcs_to_gbq = GCSToBigQueryOperator(
        task_id='gcs_to_gbq',
        bucket=GCS_BUCKET,
        source_objects="landing/sample_{d}.json".format(d=pull_date),
        autodetect=True,
        destination_project_dataset_table='{}.{}.{}'.format(wi_project_id, 'dataset', 'table'),
        source_format='NEWLINE_DELIMITED_JSON',
        allow_quoted_newlines=True,
        encoding='UTF-8',
        write_disposition='WRITE_TRUNCATE',
        dag=dag,
    )

Error:

[2023-01-06, 23:26:20 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 161546 for task gcs_to_gbq (400 POST https://bigquery.googleapis.com/bigquery/v2/projects/jobs?prettyPrint=false: Field "amount": "1"} already exists in schema; 167458)
[2023-01-06, 23:26:20 UTC] {local_task_job.py:156} INFO - Task exited with return code 1
[2023-01-06, 23:26:21 UTC] {local_task_job.py:279} INFO - 0 downstream tasks scheduled from follow-on schedule check


I'm not sure why the operator is throwing a schema error for a WRITE_TRUNCATE operation with autodetect=True.

This never happened before; it worked until last week.

I hope this information helps.

@vchiapaikeo (Contributor, Author) commented Jan 7, 2023

The change in this PR only affects usages where autodetect is None or False, not where autodetect is True. I think you are looking at the wrong commit here, @apallerlamudi.

@apallerlamudi commented:

Got it. Thank you. By any chance, do you know what might be causing the issue?

@vchiapaikeo (Contributor, Author) commented:

There was a big refactor here that may have affected you:

#28284

Can you give us more hints, @apallerlamudi? What does your JSON file look like? What do you think the error (Field "amount": "1"} already exists in schema; 167458) is pointing to? What else do you have set in default_args?
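
As a side note for debugging (my suggestion, not part of the operator): BigQuery's NEWLINE_DELIMITED_JSON format expects exactly one complete JSON object per line, so a quick local check like the hypothetical helper below (check_ndjson is a made-up name) can rule out a file where objects are pretty-printed or split across lines:

import json

# Hypothetical helper: sanity-check a newline-delimited JSON file by verifying
# that every non-empty line parses as a single JSON object.
def check_ndjson(path: str) -> None:
    with open(path) as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError as exc:
                print(f"line {lineno}: not valid JSON ({exc})")
                continue
            if not isinstance(obj, dict):
                print(f"line {lineno}: expected a JSON object, got {type(obj).__name__}")


check_ndjson("file.json")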

@vchiapaikeo (Contributor, Author) commented:

If you are looking for a workaround, maybe specifying the schema explicitly will do the trick for you, either through schema_object or schema_fields?
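
For illustration, a sketch of that workaround; the field list, bucket, and table names below are placeholders rather than values from this thread:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Workaround sketch: provide the schema explicitly instead of relying on autodetect.
gcs_to_gbq_with_schema = GCSToBigQueryOperator(
    task_id="gcs_to_gbq_with_schema",
    bucket="my-bucket",
    source_objects=["landing/sample.json"],
    source_format="NEWLINE_DELIMITED_JSON",
    destination_project_dataset_table="my-project.dataset.table",
    write_disposition="WRITE_TRUNCATE",
    # Placeholder fields - replace with the real table schema.
    schema_fields=[
        {"name": "amount", "type": "STRING", "mode": "NULLABLE"},
        {"name": "col2", "type": "STRING", "mode": "NULLABLE"},
    ],
    # Or point at a JSON schema file stored in GCS instead:
    # schema_object="schemas/sample_schema.json",
)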

@VladaZakharova (Contributor) commented:

Hi Team :)
As for the changes in #28284 - all the changes for autodetect=True were also tested with JSON format files.
I also tried to reproduce the error from @apallerlamudi, but didn't see any errors. The DAG I was using:

load_json = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_json",
    bucket="cloud-samples-data",
    source_objects="bigquery/us-states/us-states.json",
    source_format="NEWLINE_DELIMITED_JSON",
    destination_project_dataset_table="dataset.table",
    write_disposition="WRITE_TRUNCATE",
    allow_quoted_newlines=True,
    encoding="UTF-8",
    autodetect=True,
)

And the result:
[screenshot]

Can you please provide more information about the data you are trying to store? Maybe also check the version of Airflow?

@msievers13 commented:

@VladaZakharova we're using Composer version 2.1.2 and Airflow version 2.3.4, which is the latest version GCP allows us to upgrade to. As @apallerlamudi mentioned above, since we upgraded to this version, the gcs_to_bigquery operator has not worked for us for JSON files. We have several DAGs getting similar errors using this operator. The DAGs we have transferring CSV files are working the same as always.

The API error our JSON dags are getting all say this: "message": "Request is missing required authentication credential. Expected OAuth 2 access token, login cookie or other valid authentication credential. See https://developers.google.com/identity/sign-in/web/devconsole-project."

And the error we see in the DAG logs says something like this: Invalid field name "(some field)". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 300 characters long.

We're setting the gcp_conn_id in the DAG args and it's set the same way on all DAGs. Really not sure what has changed to cause these errors with JSON files only.

@vchiapaikeo (Contributor, Author) commented:

Airflow 2.3.4 wouldn't have this commit or Vlada's, @msievers13. What version of the Google provider are you using? I'd recommend submitting a bug to GCP Cloud Support and having them help triage.
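
For reference, one quick way to check which provider version an environment actually has installed (a standard-library sketch, not an Airflow-specific API):

# Print the installed Google provider version, e.g. from a Python shell
# on the worker or in the Composer environment (requires Python 3.8+).
from importlib.metadata import version

print(version("apache-airflow-providers-google"))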

@vchiapaikeo (Contributor, Author) commented Jan 9, 2023

Aha, version 8.5.0 -->
[screenshot]

So your bug was likely fixed in either 8.6.0 or 8.7.0. I'd revert your upgrade for now until Cloud Composer releases this version of apache-airflow-providers-google.

@msievers13 commented:

thank you @vchiapaikeo!

@msievers13 commented:

Just an FYI if anyone else runs into a similar error: we can't revert our Composer environment to a lower version (GCP won't allow it); however, I was able to override the apache-airflow-providers-google version in the PyPI packages on our environment. I set it to >=8.7.0 and our issue was resolved.

Labels: area:providers, provider:google

Successfully merging this pull request may close these issues: GCSToBigQueryOperator - allow upload to existing table without specifying schema_fields/schema_object