2525import uuid
2626import warnings
2727from datetime import datetime
28- from typing import Any , Dict , Iterable , List , Optional , Sequence , Set , SupportsAbs , Union
28+ from typing import Any , Dict , Iterable , List , Optional , Sequence , Set , SupportsAbs , Union , cast
2929
3030import attr
3131from google .api_core .exceptions import Conflict
@@ -91,7 +91,7 @@ def get_link(self, operator: BaseOperator, dttm: datetime):
9191
9292
9393class _BigQueryDbHookMixin :
94- def get_db_hook (self ) -> BigQueryHook :
94+ def get_db_hook (self : 'BigQueryCheckOperator' ) -> BigQueryHook : # type:ignore[misc]
9595 """Get BigQuery DB Hook"""
9696 return BigQueryHook (
9797 gcp_conn_id = self .gcp_conn_id ,
@@ -948,7 +948,8 @@ def execute(self, context) -> None:
948948 delegate_to = self .delegate_to ,
949949 impersonation_chain = self .impersonation_chain ,
950950 )
951- schema_fields = json .loads (gcs_hook .download (gcs_bucket , gcs_object ).decode ("utf-8" ))
951+ schema_fields_string = gcs_hook .download_as_byte_array (gcs_bucket , gcs_object ).decode ("utf-8" )
952+ schema_fields = json .loads (schema_fields_string )
952953 else :
953954 schema_fields = self .schema_fields
954955
@@ -1090,8 +1091,8 @@ def __init__(
10901091 self ,
10911092 * ,
10921093 bucket : Optional [str ] = None ,
1093- source_objects : Optional [List ] = None ,
1094- destination_project_dataset_table : str = None ,
1094+ source_objects : Optional [List [ str ] ] = None ,
1095+ destination_project_dataset_table : Optional [ str ] = None ,
10951096 table_resource : Optional [Dict [str , Any ]] = None ,
10961097 schema_fields : Optional [List ] = None ,
10971098 schema_object : Optional [str ] = None ,
@@ -1115,11 +1116,6 @@ def __init__(
11151116 ) -> None :
11161117 super ().__init__ (** kwargs )
11171118
1118- # GCS config
1119- self .bucket = bucket
1120- self .source_objects = source_objects
1121- self .schema_object = schema_object
1122-
11231119 # BQ config
11241120 kwargs_passed = any (
11251121 [
@@ -1158,22 +1154,30 @@ def __init__(
11581154 skip_leading_rows = 0
11591155 if not field_delimiter :
11601156 field_delimiter = ","
1157+ if not destination_project_dataset_table :
1158+ raise ValueError (
1159+ "`destination_project_dataset_table` is required when not using `table_resource`."
1160+ )
1161+ self .bucket = bucket
1162+ self .source_objects = source_objects
1163+ self .schema_object = schema_object
1164+ self .destination_project_dataset_table = destination_project_dataset_table
1165+ self .schema_fields = schema_fields
1166+ self .source_format = source_format
1167+ self .compression = compression
1168+ self .skip_leading_rows = skip_leading_rows
1169+ self .field_delimiter = field_delimiter
1170+ self .table_resource = None
1171+ else :
1172+ self .table_resource = table_resource
11611173
11621174 if table_resource and kwargs_passed :
11631175 raise ValueError ("You provided both `table_resource` and exclusive keywords arguments." )
11641176
1165- self .table_resource = table_resource
1166- self .destination_project_dataset_table = destination_project_dataset_table
1167- self .schema_fields = schema_fields
1168- self .source_format = source_format
1169- self .compression = compression
1170- self .skip_leading_rows = skip_leading_rows
1171- self .field_delimiter = field_delimiter
11721177 self .max_bad_records = max_bad_records
11731178 self .quote_character = quote_character
11741179 self .allow_quoted_newlines = allow_quoted_newlines
11751180 self .allow_jagged_rows = allow_jagged_rows
1176-
11771181 self .bigquery_conn_id = bigquery_conn_id
11781182 self .google_cloud_storage_conn_id = google_cloud_storage_conn_id
11791183 self .delegate_to = delegate_to
@@ -1203,7 +1207,10 @@ def execute(self, context) -> None:
12031207 delegate_to = self .delegate_to ,
12041208 impersonation_chain = self .impersonation_chain ,
12051209 )
1206- schema_fields = json .loads (gcs_hook .download (self .bucket , self .schema_object ).decode ("utf-8" ))
1210+ schema_fields_bytes_or_string = gcs_hook .download (self .bucket , self .schema_object )
1211+ if hasattr (schema_fields_bytes_or_string , 'decode' ):
1212+ schema_fields_bytes_or_string = cast (bytes , schema_fields_bytes_or_string ).decode ("utf-8" )
1213+ schema_fields = json .loads (schema_fields_bytes_or_string )
12071214 else :
12081215 schema_fields = self .schema_fields
12091216
0 commit comments