[AIRFLOW-7117] Honor self.schema in sql_to_gcs as schema to upload #8049
from tempfile import NamedTemporaryFile

import unicodecsv as csv
from six import string_types
We don't use six on the master branch anymore.
Whoops, I will remove that! Good catch.
schema = self.schema or [self.field_to_bigquery(field) for field in cursor.description]

self.log.info('Using schema for %s', self.schema_filename)
self.log.debug("Current schema: %s", schema)
tmp_schema_file_handle = NamedTemporaryFile(delete=True)
tmp_schema_file_handle.write(json.dumps(schema, sort_keys=True).encode('utf-8'))
schema_json = json.dumps(schema, sort_keys=True) if isinstance(schema, list) else schema
tmp_schema_file_handle.write(schema_json.encode('utf-8'))
It looks overly complicated. What do you think about the following code?
if self.schema:
    schema_json = self.schema
    self.log.info("Using user schema")
else:
    self.log.info("Starts generating schema")
    schema = self.schema or [self.field_to_bigquery(field) for field in cursor.description]
    tmp_schema_file_handle = NamedTemporaryFile(delete=True)
    schema_json = json.dumps(schema, sort_keys=True)
self.log.info('Using schema for %s', self.schema_filename)
self.log.debug("Current schema: %s", schema)
tmp_schema_file_handle.write(schema_json.encode('utf-8'))
Thanks for the review! I like the structure.
Just to be clear, self.schema could be either str or list, and we don't want to call json.dumps() on a string. How about this edited version? @mik-laj
if self.schema:
    self.log.info("Using user schema")
    schema = self.schema
else:
    self.log.info("Starts generating schema")
    schema = [self.field_to_bigquery(field) for field in cursor.description]
if isinstance(schema, list):
    schema = json.dumps(schema, sort_keys=True)
self.log.info('Using schema for %s', self.schema_filename)
self.log.debug("Current schema: %s", schema)
tmp_schema_file_handle = NamedTemporaryFile(delete=True)
tmp_schema_file_handle.write(schema.encode('utf-8'))
LGTM
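For reference, the str-or-list handling agreed on above can be sketched as a standalone function. This is only an illustration under assumptions, not the code merged in this PR: `schema_to_bytes` and `write_schema_file` are hypothetical names, `user_schema` stands in for `self.schema`, and `generated_fields` stands in for the list built from `self.field_to_bigquery(field)`.

```python
import json
from tempfile import NamedTemporaryFile


def schema_to_bytes(user_schema, generated_fields):
    """Return the UTF-8 bytes to write into the schema file.

    user_schema: None, a JSON string, or a list of field dicts
        (hypothetical stand-in for self.schema).
    generated_fields: list of field dicts
        (hypothetical stand-in for the field_to_bigquery output).
    """
    # Prefer the user-supplied schema; fall back to the generated one.
    schema = user_schema if user_schema else generated_fields
    # Only serialize lists; a string is assumed to be JSON already.
    if isinstance(schema, list):
        schema = json.dumps(schema, sort_keys=True)
    return schema.encode("utf-8")


def write_schema_file(user_schema, generated_fields):
    """Write the schema to a temporary file and return the open handle."""
    handle = NamedTemporaryFile(delete=True)
    handle.write(schema_to_bytes(user_schema, generated_fields))
    handle.flush()  # make the bytes visible to any later reader/uploader
    return handle
```

Calling `schema_to_bytes` with a JSON string passes it through unchanged, which avoids the double encoding that `json.dumps()` on a string would produce.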
Codecov Report
@@            Coverage Diff             @@
##           master    #8049      +/-   ##
==========================================
- Coverage   87.25%   87.01%   -0.24%
==========================================
  Files         935      937       +2
  Lines       45384    45555     +171
==========================================
+ Hits        39601    39641      +40
- Misses       5783     5914     +131
Issue link: AIRFLOW-7117
Make sure to mark the boxes below before creating PR: [x]
In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.