
Commit cf1e26b

jbbqqf and eladkal authored
Add BigQueryToPostgresOperator (#30658)
Co-authored-by: eladkal <45845474+eladkal@users.noreply.github.com>
1 parent d9f70df commit cf1e26b

File tree

14 files changed: +579 -180 lines changed

airflow/providers/google/cloud/transfers/bigquery_to_mssql.py

Lines changed: 39 additions & 78 deletions
@@ -18,19 +18,19 @@
 """This module contains Google BigQuery to MSSQL operator."""
 from __future__ import annotations
 
+import warnings
 from typing import TYPE_CHECKING, Sequence
 
-from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
+from airflow.exceptions import AirflowProviderDeprecationWarning
 from airflow.providers.google.cloud.links.bigquery import BigQueryTableLink
-from airflow.providers.google.cloud.utils.bigquery_get_data import bigquery_get_data
+from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
 from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
 
 if TYPE_CHECKING:
     from airflow.utils.context import Context
 
 
-class BigQueryToMsSqlOperator(BaseOperator):
+class BigQueryToMsSqlOperator(BigQueryToSqlBaseOperator):
     """
     Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
     and insert that data into a MSSQL table.
@@ -39,86 +39,62 @@ class BigQueryToMsSqlOperator(BaseOperator):
     For more information on how to use this operator, take a look at the guide:
     :ref:`howto/operator:BigQueryToMsSqlOperator`
 
-    .. note::
-        If you pass fields to ``selected_fields`` which are in different order than the
-        order of columns already in
-        BQ table, the data will still be in the order of BQ table.
-        For example if the BQ table has 3 columns as
-        ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
-        the data would still be of the form ``'A,B'`` and passed through this form
-        to MSSQL
-
-    **Example**: ::
-
-        transfer_data = BigQueryToMsSqlOperator(
-            task_id='task_id',
-            source_project_dataset_table='my-project.mydataset.mytable',
-            mssql_table='dest_table_name',
-            replace=True,
-        )
-
     :param source_project_dataset_table: A dotted ``<project>.<dataset>.<table>``:
         the big query table of origin
-    :param selected_fields: List of fields to return (comma-separated). If
-        unspecified, all fields are returned.
-    :param gcp_conn_id: reference to a specific Google Cloud hook.
+    :param mssql_table: target MsSQL table. It is deprecated: use target_table_name instead. (templated)
+    :param target_table_name: target MsSQL table. It takes precedence over mssql_table. (templated)
     :param mssql_conn_id: reference to a specific mssql hook
-    :param database: name of database which overwrite defined one in connection
-    :param replace: Whether to replace instead of insert
-    :param batch_size: The number of rows to take in each batch
-    :param location: The location used for the operation.
-    :param impersonation_chain: Optional service account to impersonate using short-term
-        credentials, or chained list of accounts required to get the access_token
-        of the last account in the list, which will be impersonated in the request.
-        If set as a string, the account must grant the originating account
-        the Service Account Token Creator IAM role.
-        If set as a sequence, the identities from the list must grant
-        Service Account Token Creator IAM role to the directly preceding identity, with first
-        account from the list granting this role to the originating account (templated).
     """
 
-    template_fields: Sequence[str] = ("source_project_dataset_table", "mssql_table", "impersonation_chain")
+    template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
+        "source_project_dataset_table",
+    )
     operator_extra_links = (BigQueryTableLink(),)
 
     def __init__(
         self,
         *,
         source_project_dataset_table: str,
-        mssql_table: str,
-        selected_fields: list[str] | str | None = None,
-        gcp_conn_id: str = "google_cloud_default",
+        mssql_table: str | None = None,
+        target_table_name: str | None = None,
         mssql_conn_id: str = "mssql_default",
-        database: str | None = None,
-        replace: bool = False,
-        batch_size: int = 1000,
-        location: str | None = None,
-        impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-        super().__init__(**kwargs)
-        self.selected_fields = selected_fields
-        self.gcp_conn_id = gcp_conn_id
-        self.mssql_conn_id = mssql_conn_id
-        self.database = database
-        self.mssql_table = mssql_table
-        self.replace = replace
-        self.batch_size = batch_size
-        self.location = location
-        self.impersonation_chain = impersonation_chain
+        if mssql_table is not None:
+            warnings.warn(
+                # fmt: off
+                "The `mssql_table` parameter has been deprecated. "
+                "Use `target_table_name` instead.",
+                # fmt: on
+                AirflowProviderDeprecationWarning,
+            )
+
+            if target_table_name is not None:
+                raise ValueError(
+                    f"Cannot set both arguments: mssql_table={mssql_table!r} and "
+                    f"target_table_name={target_table_name!r}."
+                )
+
+            target_table_name = mssql_table
+
         try:
-            _, self.dataset_id, self.table_id = source_project_dataset_table.split(".")
+            _, dataset_id, table_id = source_project_dataset_table.split(".")
         except ValueError:
             raise ValueError(
                 f"Could not parse {source_project_dataset_table} as <project>.<dataset>.<table>"
             ) from None
+        super().__init__(
+            target_table_name=target_table_name,
+            dataset_table=f"{dataset_id}.{table_id}",
+            **kwargs,
+        )
+        self.mssql_conn_id = mssql_conn_id
         self.source_project_dataset_table = source_project_dataset_table
 
-    def execute(self, context: Context) -> None:
-        big_query_hook = BigQueryHook(
-            gcp_conn_id=self.gcp_conn_id,
-            location=self.location,
-            impersonation_chain=self.impersonation_chain,
-        )
+    def get_sql_hook(self) -> MsSqlHook:
+        return MsSqlHook(schema=self.database, mysql_conn_id=self.mssql_conn_id)
+
+    def persist_links(self, context: Context) -> None:
         project_id, dataset_id, table_id = self.source_project_dataset_table.split(".")
         BigQueryTableLink.persist(
             context=context,
@@ -127,18 +103,3 @@ def execute(self, context: Context) -> None:
             project_id=project_id,
             table_id=table_id,
         )
-        mssql_hook = MsSqlHook(mssql_conn_id=self.mssql_conn_id, schema=self.database)
-        for rows in bigquery_get_data(
-            self.log,
-            self.dataset_id,
-            self.table_id,
-            big_query_hook,
-            self.batch_size,
-            self.selected_fields,
-        ):
-            mssql_hook.insert_rows(
-                table=self.mssql_table,
-                rows=rows,
-                target_fields=self.selected_fields,
-                replace=self.replace,
-            )
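
After this refactor, the MSSQL operator keeps only its dialect-specific arguments; selected_fields, replace, batch_size and the other removed parameters are assumed to pass through **kwargs to BigQueryToSqlBaseOperator. A minimal usage sketch under that assumption (task id, table, and connection names are illustrative, not from this commit):

from airflow.providers.google.cloud.transfers.bigquery_to_mssql import BigQueryToMsSqlOperator

# Hypothetical task; replace/batch_size are assumed to be handled by the base operator.
transfer_data = BigQueryToMsSqlOperator(
    task_id="bq_to_mssql",
    source_project_dataset_table="my-project.my_dataset.my_table",
    target_table_name="dest_table_name",  # new name for the deprecated mssql_table
    mssql_conn_id="mssql_default",
    replace=True,
)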

airflow/providers/google/cloud/transfers/bigquery_to_mysql.py

Lines changed: 28 additions & 90 deletions
@@ -18,18 +18,15 @@
 """This module contains Google BigQuery to MySQL operator."""
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Sequence
+import warnings
+from typing import Sequence
 
-from airflow.models import BaseOperator
-from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
-from airflow.providers.google.cloud.utils.bigquery_get_data import bigquery_get_data
+from airflow.exceptions import AirflowProviderDeprecationWarning
+from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
 from airflow.providers.mysql.hooks.mysql import MySqlHook
 
-if TYPE_CHECKING:
-    from airflow.utils.context import Context
 
-
-class BigQueryToMySqlOperator(BaseOperator):
+class BigQueryToMySqlOperator(BigQueryToSqlBaseOperator):
     """
     Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
     and insert that data into a MySQL table.
@@ -38,100 +35,41 @@ class BigQueryToMySqlOperator(BaseOperator):
     For more information on how to use this operator, take a look at the guide:
     :ref:`howto/operator:BigQueryToMySqlOperator`
 
-    .. note::
-        If you pass fields to ``selected_fields`` which are in different order than the
-        order of columns already in
-        BQ table, the data will still be in the order of BQ table.
-        For example if the BQ table has 3 columns as
-        ``[A,B,C]`` and you pass 'B,A' in the ``selected_fields``
-        the data would still be of the form ``'A,B'`` and passed through this form
-        to MySQL
-
-    **Example**: ::
-
-        # [START howto_operator_bigquery_to_mysql]
-        transfer_data = BigQueryToMySqlOperator(
-            task_id='task_id',
-            dataset_table='origin_bq_table',
-            mysql_table='dest_table_name',
-            replace=True,
-        )
-        # [END howto_operator_bigquery_to_mysql]
-
-    :param dataset_table: A dotted ``<dataset>.<table>``: the big query table of origin
-    :param selected_fields: List of fields to return (comma-separated). If
-        unspecified, all fields are returned.
-    :param gcp_conn_id: reference to a specific Google Cloud hook.
+    :param mysql_table: target MySQL table, use dot notation to target a
+        specific database. It is deprecated: use target_table_name instead. (templated)
+    :param target_table_name: target MySQL table. It takes precedence over mysql_table. (templated)
     :param mysql_conn_id: Reference to :ref:`mysql connection id <howto/connection:mysql>`.
-    :param database: name of database which overwrite defined one in connection
-    :param replace: Whether to replace instead of insert
-    :param batch_size: The number of rows to take in each batch
-    :param location: The location used for the operation.
-    :param impersonation_chain: Optional service account to impersonate using short-term
-        credentials, or chained list of accounts required to get the access_token
-        of the last account in the list, which will be impersonated in the request.
-        If set as a string, the account must grant the originating account
-        the Service Account Token Creator IAM role.
-        If set as a sequence, the identities from the list must grant
-        Service Account Token Creator IAM role to the directly preceding identity, with first
-        account from the list granting this role to the originating account (templated).
     """
 
-    template_fields: Sequence[str] = (
+    template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
         "dataset_id",
         "table_id",
-        "mysql_table",
-        "impersonation_chain",
     )
 
     def __init__(
         self,
         *,
-        dataset_table: str,
-        mysql_table: str,
-        selected_fields: list[str] | str | None = None,
-        gcp_conn_id: str = "google_cloud_default",
+        mysql_table: str | None = None,
+        target_table_name: str | None = None,
         mysql_conn_id: str = "mysql_default",
-        database: str | None = None,
-        replace: bool = False,
-        batch_size: int = 1000,
-        location: str | None = None,
-        impersonation_chain: str | Sequence[str] | None = None,
         **kwargs,
     ) -> None:
-        super().__init__(**kwargs)
-        self.selected_fields = selected_fields
-        self.gcp_conn_id = gcp_conn_id
+        if mysql_table is not None:
+            warnings.warn(
+                "The `mysql_table` parameter has been deprecated. Use `target_table_name` instead.",
+                AirflowProviderDeprecationWarning,
+            )
+
+            if target_table_name is not None:
+                raise ValueError(
+                    f"Cannot set both arguments: mysql_table={mysql_table!r} and "
+                    f"target_table_name={target_table_name!r}."
+                )
+
+            target_table_name = mysql_table
+
+        super().__init__(target_table_name=target_table_name, **kwargs)
         self.mysql_conn_id = mysql_conn_id
-        self.database = database
-        self.mysql_table = mysql_table
-        self.replace = replace
-        self.batch_size = batch_size
-        self.location = location
-        self.impersonation_chain = impersonation_chain
-        try:
-            self.dataset_id, self.table_id = dataset_table.split(".")
-        except ValueError:
-            raise ValueError(f"Could not parse {dataset_table} as <dataset>.<table>") from None
 
-    def execute(self, context: Context) -> None:
-        big_query_hook = BigQueryHook(
-            gcp_conn_id=self.gcp_conn_id,
-            location=self.location,
-            impersonation_chain=self.impersonation_chain,
-        )
-        mysql_hook = MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
-        for rows in bigquery_get_data(
-            self.log,
-            self.dataset_id,
-            self.table_id,
-            big_query_hook,
-            self.batch_size,
-            self.selected_fields,
-        ):
-            mysql_hook.insert_rows(
-                table=self.mysql_table,
-                rows=rows,
-                target_fields=self.selected_fields,
-                replace=self.replace,
-            )
+    def get_sql_hook(self) -> MySqlHook:
+        return MySqlHook(schema=self.database, mysql_conn_id=self.mysql_conn_id)
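
The shim keeps existing DAGs running: the deprecated mysql_table still works but emits AirflowProviderDeprecationWarning, and passing both spellings raises ValueError. A sketch of that behavior (values illustrative; dataset_table is assumed to be parsed by the new base class, as the MSSQL diff above suggests):

import warnings

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.providers.google.cloud.transfers.bigquery_to_mysql import BigQueryToMySqlOperator

# New spelling: no warning is raised.
new_style = BigQueryToMySqlOperator(
    task_id="bq_to_mysql_new",
    dataset_table="my_dataset.my_table",
    target_table_name="dest_table_name",
)

# Old spelling: still constructs, but warns about the deprecation.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    old_style = BigQueryToMySqlOperator(
        task_id="bq_to_mysql_old",
        dataset_table="my_dataset.my_table",
        mysql_table="dest_table_name",
    )
assert any(issubclass(w.category, AirflowProviderDeprecationWarning) for w in caught)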
airflow/providers/google/cloud/transfers/bigquery_to_postgres.py

Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""This module contains Google BigQuery to PostgreSQL operator."""
+from __future__ import annotations
+
+from typing import Sequence
+
+from airflow.providers.google.cloud.transfers.bigquery_to_sql import BigQueryToSqlBaseOperator
+from airflow.providers.postgres.hooks.postgres import PostgresHook
+
+
+class BigQueryToPostgresOperator(BigQueryToSqlBaseOperator):
+    """
+    Fetches the data from a BigQuery table (alternatively fetch data for selected columns)
+    and insert that data into a PostgreSQL table.
+
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:BigQueryToPostgresOperator`
+
+    :param target_table_name: target Postgres table (templated)
+    :param postgres_conn_id: Reference to :ref:`postgres connection id <howto/connection:postgres>`.
+    """
+
+    template_fields: Sequence[str] = tuple(BigQueryToSqlBaseOperator.template_fields) + (
+        "dataset_id",
+        "table_id",
+    )
+
+    def __init__(
+        self,
+        *,
+        target_table_name: str,
+        postgres_conn_id: str = "postgres_default",
+        **kwargs,
+    ) -> None:
+        super().__init__(target_table_name=target_table_name, **kwargs)
+        self.postgres_conn_id = postgres_conn_id
+
+    def get_sql_hook(self) -> PostgresHook:
+        return PostgresHook(schema=self.database, postgres_conn_id=self.postgres_conn_id)
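
The new operator is then a thin subclass: the transfer mechanics live in BigQueryToSqlBaseOperator, and only the hook factory and connection id are dialect-specific. A minimal DAG sketch using it (dag id, dataset, and table names are placeholders, not from this commit):

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.transfers.bigquery_to_postgres import BigQueryToPostgresOperator

with DAG(dag_id="example_bigquery_to_postgres", start_date=datetime(2023, 1, 1), schedule=None) as dag:
    # dataset_table is assumed to be accepted by the base operator, as in the sibling operators.
    transfer = BigQueryToPostgresOperator(
        task_id="bq_to_postgres",
        dataset_table="my_dataset.my_table",
        target_table_name="dest_table_name",
        postgres_conn_id="postgres_default",
    )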

0 commit comments
