GCSToBigQueryOperator allows autodetect None and infers schema #28564
Conversation
eladkal left a comment:
LGTM
@vchiapaikeo can you check the failures in doc build?
Agh, sorry about that. Tests rerunning now.
Force-pushed 00a3a74 to 569eecd.
eladkal left a comment:
LGTM
thank you @vchiapaikeo
@vchiapaikeo Quick question, did you get a chance to test this out for JSON files? Looks like the operator is failing for JSON formats.
Hmm, I cannot repro. Can you provide a working repro of the bug? Here is my test on newline-delimited JSON:

file.json

gsutil cp file.json gs://my-bucket/vchiapaikeo/file.json

DAG:

from airflow import DAG
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

DEFAULT_TASK_ARGS = {
    "owner": "gcp-data-platform",
    "retries": 1,
    "retry_delay": 10,
    "start_date": "2022-08-01",
}

with DAG(
    max_active_runs=1,
    concurrency=2,
    catchup=False,
    schedule_interval="@daily",
    dag_id="test_os_patch_gcs_to_bigquery",
    default_args=DEFAULT_TASK_ARGS,
) as dag:
    test_gcs_to_bigquery_allows_autodetect_none_and_infers_schema = GCSToBigQueryOperator(
        task_id="test_gcs_to_bigquery_allows_autodetect_none_and_infers_schema",
        create_disposition="CREATE_IF_NEEDED",
        # Need to explicitly set autodetect to None
        autodetect=None,
        write_disposition="WRITE_TRUNCATE",
        destination_project_dataset_table="my-project.vchiapaikeo.test1",
        bucket="my-bucket",
        source_format="NEWLINE_DELIMITED_JSON",
        source_objects=["vchiapaikeo/file.json"],
    )

Success. BQ Results:
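The contents of file.json are not shown above. For context only, newline-delimited JSON means one serialized JSON object per line with no surrounding array; a hypothetical way to produce such a file (field names and values below are made up, not the actual test data):

import json

# Hypothetical rows; any flat JSON objects with consistent field names will do.
rows = [
    {"name": "alice", "amount": 1},
    {"name": "bob", "amount": 2},
]

# One JSON object per line, newline-separated.
with open("file.json", "w") as f:
    for row in rows:
        f.write(json.dumps(row) + "\n")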
Error:

[2023-01-06, 23:26:20 UTC] {standard_task_runner.py:92} ERROR - Failed to execute job 161546 for task gcs_to_gbq (400 POST https://bigquery.googleapis.com/bigquery/v2/projects/jobs?prettyPrint=false: Field "amount": "1"} already exists in schema; 167458)

I'm not sure why the operator is throwing a schema error for a write-truncate operation with autodetect=True. This never happened before; it worked until last week. I hope this information helps.
The change in this PR would only affect usages where autodetect is None or False, not where autodetect is True. I think you are looking at the wrong commit here, @apallerlamudi.
Got it. Thank you. By any chance, do you know what might be causing the issue?
There was a big refactor here that may have affected you. Can you give us more hints, @apallerlamudi? What does your JSON file look like? What do you think the error means?
If you are looking for a workaround, maybe specifying the schema explicitly will do the trick for you, either through schema_object or schema_fields?
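For illustration only, a sketch of that workaround with schema_fields (the bucket, object, table, and field names below are placeholders, not values from this thread):

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Hypothetical task: passing schema_fields (or schema_object) sidesteps
# schema autodetection for the load job.
load_json_with_explicit_schema = GCSToBigQueryOperator(
    task_id="load_json_with_explicit_schema",
    bucket="my-bucket",
    source_objects=["path/to/file.json"],
    source_format="NEWLINE_DELIMITED_JSON",
    destination_project_dataset_table="my-project.my_dataset.my_table",
    write_disposition="WRITE_TRUNCATE",
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "amount", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)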
Hi Team :) Can you please provide more information about the data you are trying to store? Maybe also check the version of Airflow?
@VladaZakharova we're using Composer version 2.1.2 and Airflow version 2.3.4; this is the latest version GCP allows us to upgrade to. As @apallerlamudi mentioned above, since we upgraded to this version, the gcs_to_bigquery operator has not worked for us for JSON files. We have several DAGs getting similar errors with this operator, while the DAGs transferring CSV files are working the same as always. The API errors our JSON DAGs are getting all look the same, and the error we see in the DAG logs says something like this: Invalid field name "(some field)". Fields must contain only letters, numbers, and underscores, start with a letter or underscore, and be at most 300 characters long. We're setting the ...
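As an aside not taken from the thread, the field-name rule quoted above can be checked up front; a hypothetical sketch:

import re

# BigQuery column names: letters, digits, and underscores only, must start
# with a letter or underscore, and be at most 300 characters long.
_BQ_FIELD_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]{0,299}$")

def is_valid_bq_field_name(name: str) -> bool:
    return bool(_BQ_FIELD_NAME.match(name))

assert is_valid_bq_field_name("order_amount")
assert not is_valid_bq_field_name("2nd_field")   # starts with a digit
assert not is_valid_bq_field_name("field name")  # contains a space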
Airflow 2.3.4 wouldn't have this commit or Vlada's, @msievers13. What version of the Google providers are you using? I'd recommend submitting a bug to GCP Cloud Support and having them help triage.
Thank you @vchiapaikeo!
Just an FYI if anyone else runs into a similar error: we can't revert our Composer environment to a lower version (GCP won't allow it); however, I was able to override the apache-airflow-providers-google version in the PyPI packages on our environment. I set it to >=8.7.0 and our issue was resolved.
Description
closes: #12329
A check for autodetect being falsy, rather than for it being None, stopped us from using a Job API feature where a BigQuery table already exists and we don't want to specify schema_fields or a schema_object. This PR changes the check to look for an explicit False, allowing None to be passed as autodetect without schema_fields or a schema_object.
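A minimal sketch of the shape of this change, purely for illustration (the function and variable names here are hypothetical, not the operator's actual code):

# Before (simplified): any falsy autodetect value, including None, triggered the
# "schema required" path when no schema was supplied.
def needs_explicit_schema_old(schema_fields, schema_object, autodetect):
    return schema_fields is None and schema_object is None and not autodetect

# After (simplified): only an explicit autodetect=False requires a schema;
# autodetect=None lets the BigQuery load job infer the schema.
def needs_explicit_schema_new(schema_fields, schema_object, autodetect):
    return schema_fields is None and schema_object is None and autodetect is False

# With no schema supplied and autodetect=None:
assert needs_explicit_schema_old(None, None, None)      # old behavior: rejected
assert not needs_explicit_schema_new(None, None, None)  # new behavior: allowed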
Testing Notes
To verify, I tried this on my local setup with a simple DAG:
I then created a simple table in BigQuery (see the sketch after these notes):
And ran the DAG:
Task logs:
^ omitted some redundant log lines
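The table itself is not shown above; purely as an illustration, a hypothetical way to create a small destination table like the one used in the repro DAG (column names and types are assumptions):

from google.cloud import bigquery

client = bigquery.Client(project="my-project")

# Hypothetical destination table matching the repro DAG; the real columns
# used for the PR's manual test are not shown in this thread.
table = bigquery.Table(
    "my-project.vchiapaikeo.test1",
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("amount", "INTEGER"),
    ],
)
client.create_table(table, exists_ok=True)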
PR: #28564
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

cc: @eladkal, @VladaZakharova