Skip to content

Conversation

@whynick1
Copy link
Contributor

@whynick1 whynick1 commented Apr 2, 2020


Issue link: AIRFLOW-7117

Make sure to mark the boxes below before creating PR: [x]


In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.

@boring-cyborg boring-cyborg bot added the provider:google Google (including GCP) related issues label Apr 2, 2020
@whynick1 whynick1 changed the title Mysql to gcs [AIRFLOW-7117] Honor self.schema in sql_to_gcs as schema to upload Apr 2, 2020
from tempfile import NamedTemporaryFile

import unicodecsv as csv
from six import string_types
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't use six on master branch anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woops, I will remove that! Good catch.

Comment on lines 262 to 268
schema = self.schema or [self.field_to_bigquery(field) for field in cursor.description]

self.log.info('Using schema for %s', self.schema_filename)
self.log.debug("Current schema: %s", schema)
tmp_schema_file_handle = NamedTemporaryFile(delete=True)
tmp_schema_file_handle.write(json.dumps(schema, sort_keys=True).encode('utf-8'))
schema_json = json.dumps(schema, sort_keys=True) if isinstance(schema, list) else schema
tmp_schema_file_handle.write(schema_json.encode('utf-8'))
Copy link
Member

@mik-laj mik-laj Apr 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks overly complicated. What do you think about the following code?

        if self.shcema:
            schema_json = self.schema
            self.log.info("Using user schema")
        else:
            self.log.info("Starts generating schema")
            schema = self.schema or [self.field_to_bigquery(field) for field in cursor.description]            
            tmp_schema_file_handle = NamedTemporaryFile(delete=True)
            schema_json = json.dumps(schema, sort_keys=True)

        self.log.info('Using schema for %s', self.schema_filename)
        self.log.debug("Current schema: %s", schema)

        tmp_schema_file_handle.write(schema_json.encode('utf-8'))

Copy link
Contributor Author

@whynick1 whynick1 Apr 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review! I like the structure.
Just to be clear, self.schema could be either str or list, and we don't want to call json.dumps() on a string. How about this edited version? @mik-laj

        if self.shcema:
            self.log.info("Using user schema")
            schema = self.schema
        else:
            self.log.info("Starts generating schema")
            schema = [self.field_to_bigquery(field) for field in cursor.description]            
        
        if isinstance(schema, list):
            schema = json.dumps(schema, sort_keys=True)

        self.log.info('Using schema for %s', self.schema_filename)
        self.log.debug("Current schema: %s", schema)

        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
        tmp_schema_file_handle.write(schema.encode('utf-8'))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@codecov-io
Copy link

codecov-io commented Apr 3, 2020

Codecov Report

Merging #8049 into master will decrease coverage by 0.23%.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #8049      +/-   ##
==========================================
- Coverage   87.25%   87.01%   -0.24%     
==========================================
  Files         935      937       +2     
  Lines       45384    45555     +171     
==========================================
+ Hits        39601    39641      +40     
- Misses       5783     5914     +131     
Impacted Files Coverage Δ
...low/providers/google/cloud/operators/sql_to_gcs.py 93.91% <100.00%> (+1.25%) ⬆️
airflow/kubernetes/volume_mount.py 44.44% <0.00%> (-55.56%) ⬇️
airflow/kubernetes/volume.py 52.94% <0.00%> (-47.06%) ⬇️
airflow/kubernetes/pod_launcher.py 47.18% <0.00%> (-45.08%) ⬇️
...viders/cncf/kubernetes/operators/kubernetes_pod.py 69.69% <0.00%> (-25.26%) ⬇️
airflow/kubernetes/refresh_config.py 50.98% <0.00%> (-23.53%) ⬇️
airflow/providers/google/ads/hooks/ads.py 71.79% <0.00%> (-6.78%) ⬇️
airflow/models/renderedtifields.py 92.85% <0.00%> (-2.98%) ⬇️
airflow/www/app.py 92.40% <0.00%> (-1.89%) ⬇️
airflow/www/utils.py 80.85% <0.00%> (-1.02%) ⬇️
... and 15 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 340e947...ec32f57. Read the comment docs.

@mik-laj mik-laj merged commit 7ef75d2 into apache:master Apr 3, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

provider:google Google (including GCP) related issues

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants