1818"""This module contains Google BigQuery to MSSQL operator."""
1919from __future__ import annotations
2020
21+ import warnings
2122from typing import TYPE_CHECKING , Sequence
2223
23- from airflow .models import BaseOperator
24- from airflow .providers .google .cloud .hooks .bigquery import BigQueryHook
24+ from airflow .exceptions import AirflowProviderDeprecationWarning
2525from airflow .providers .google .cloud .links .bigquery import BigQueryTableLink
26- from airflow .providers .google .cloud .utils . bigquery_get_data import bigquery_get_data
26+ from airflow .providers .google .cloud .transfers . bigquery_to_sql import BigQueryToSqlBaseOperator
2727from airflow .providers .microsoft .mssql .hooks .mssql import MsSqlHook
2828
2929if TYPE_CHECKING :
3030 from airflow .utils .context import Context
3131
3232
33- class BigQueryToMsSqlOperator (BaseOperator ):
33+ class BigQueryToMsSqlOperator (BigQueryToSqlBaseOperator ):
3434 """
3535 Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
3636 and insert that data into a MSSQL table.
@@ -39,86 +39,62 @@ class BigQueryToMsSqlOperator(BaseOperator):
     For more information on how to use this operator, take a look at the guide:
     :ref:`howto/operator:BigQueryToMsSqlOperator`
 
-    .. note::
-        If you pass fields to ``selected_fields`` which are in different order than the
-        order of columns already in
-        BQ table, the data will still be in the order of BQ table.
-        For example if the BQ table has 3 columns as
-        ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
-        the data would still be of the form ``'A,B'`` and passed through this form
-        to MSSQL
-
-    **Example**: ::
-
-        transfer_data = BigQueryToMsSqlOperator(
-            task_id='task_id',
-            source_project_dataset_table='my-project.mydataset.mytable',
-            mssql_table='dest_table_name',
-            replace=True,
-        )
-
     :param source_project_dataset_table: A dotted ``<project>.<dataset>.<table>``:
         the big query table of origin
-    :param selected_fields: List of fields to return (comma-separated). If
-        unspecified, all fields are returned.
-    :param gcp_conn_id: reference to a specific Google Cloud hook.
+    :param mssql_table: target MsSQL table. Deprecated: use ``target_table_name`` instead. (templated)
+    :param target_table_name: target MsSQL table; cannot be set together with the deprecated
+        ``mssql_table``. (templated)
     :param mssql_conn_id: reference to a specific mssql hook
-    :param database: name of database which overwrite defined one in connection
-    :param replace: Whether to replace instead of insert
-    :param batch_size: The number of rows to take in each batch
-    :param location: The location used for the operation.
-    :param impersonation_chain: Optional service account to impersonate using short-term
-        credentials, or chained list of accounts required to get the access_token
-        of the last account in the list, which will be impersonated in the request.
-        If set as a string, the account must grant the originating account
-        the Service Account Token Creator IAM role.
-        If set as a sequence, the identities from the list must grant
-        Service Account Token Creator IAM role to the directly preceding identity, with first
-        account from the list granting this role to the originating account (templated).
     """
 
-    template_fields: Sequence[str] = ("source_project_dataset_table", "mssql_table", "impersonation_chain")
+    template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
+        "source_project_dataset_table",
+    )
     operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
         *,
         source_project_dataset_table: str,
-        mssql_table: str,
-        selected_fields: list[str] | str | None = None,
-        gcp_conn_id: str = "google_cloud_default",
+        mssql_table: str | None = None,
+        target_table_name: str | None = None,
         mssql_conn_id: str = "mssql_default",
-        database: str | None = None,
-        replace: bool = False,
-        batch_size: int = 1000,
-        location: str | None = None,
-        impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-        super().__init__(**kwargs)
-        self.selected_fields = selected_fields
-        self.gcp_conn_id = gcp_conn_id
-        self.mssql_conn_id = mssql_conn_id
-        self.database = database
-        self.mssql_table = mssql_table
-        self.replace = replace
-        self.batch_size = batch_size
-        self.location = location
-        self.impersonation_chain = impersonation_chain
+        if mssql_table is not None:
+            warnings.warn(
+                # fmt: off
+                "The `mssql_table` parameter has been deprecated. "
+                "Use `target_table_name` instead.",
+                # fmt: on
+                AirflowProviderDeprecationWarning,
+            )
+
+            if target_table_name is not None:
+                raise ValueError(
+                    f"Cannot set both arguments: mssql_table={mssql_table!r} and "
+                    f"target_table_name={target_table_name!r}."
+                )
+
+            target_table_name = mssql_table
+
         try:
-            _, self.dataset_id, self.table_id = source_project_dataset_table.split(".")
+            _, dataset_id, table_id = source_project_dataset_table.split(".")
         except ValueError:
             raise ValueError(
                 f"Could not parse {source_project_dataset_table} as <project>.<dataset>.<table>"
             ) from None
+        super().__init__(
+            target_table_name=target_table_name,
+            dataset_table=f"{dataset_id}.{table_id}",
+            **kwargs,
+        )
+        self.mssql_conn_id = mssql_conn_id
         self.source_project_dataset_table = source_project_dataset_table
 
-    def execute(self, context: Context) -> None:
-        big_query_hook = BigQueryHook(
-            gcp_conn_id=self.gcp_conn_id,
-            location=self.location,
-            impersonation_chain=self.impersonation_chain,
-        )
+    def get_sql_hook(self) -> MsSqlHook:
+        return MsSqlHook(schema=self.database, mssql_conn_id=self.mssql_conn_id)
+
+    def persist_links(self, context: Context) -> None:
         project_id, dataset_id, table_id = self.source_project_dataset_table.split(".")
         BigQueryTableLink.persist(
             context=context,
@@ -127,18 +103,3 @@ def execute(self, context: Context) -> None:
             project_id=project_id,
             table_id=table_id,
         )
-        mssql_hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id, schema=self.database)
-        for rows in bigquery_get_data(
-            self.log,
-            self.dataset_id,
-            self.table_id,
-            big_query_hook,
-            self.batch_size,
-            self.selected_fields,
-        ):
-            mssql_hook.insert_rows(
-                table=self.mssql_table,
-                rows=rows,
-                target_fields=self.selected_fields,
-                replace=self.replace,
-            )
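
For reference, a minimal usage sketch of the operator after this change, modelled on the docstring example that the diff removes. The project, dataset, table, and task names are placeholders, and replace is assumed to still be accepted via the keyword arguments forwarded to BigQueryToSqlBaseOperator:

from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator

# Placeholder names throughout; adapt to your own project and connections.
transfer_data = BigQueryToMsSqlOperator(
    task_id="bq_to_mssql",
    source_project_dataset_table="my-project.mydataset.mytable",
    target_table_name="dest_table_name",  # replaces the deprecated mssql_table argument
    mssql_conn_id="mssql_default",
    replace=True,  # assumed to be handled by the base operator via **kwargs
)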
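
A second sketch, covering the deprecation path introduced in __init__: the old mssql_table keyword still works but now emits AirflowProviderDeprecationWarning, and supplying both keywords raises ValueError. Test-style snippet with placeholder values, assuming pytest is installed:

import pytest

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator

# The deprecated keyword is still honoured, but warns.
with pytest.warns(AirflowProviderDeprecationWarning):
    BigQueryToMsSqlOperator(
        task_id="legacy_keyword",
        source_project_dataset_table="my-project.mydataset.mytable",
        mssql_table="dest_table_name",
    )

# Passing both the old and the new keyword is rejected.
with pytest.raises(ValueError):
    BigQueryToMsSqlOperator(
        task_id="conflicting_keywords",
        source_project_dataset_table="my-project.mydataset.mytable",
        mssql_table="dest_table_name",
        target_table_name="dest_table_name",
    )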