Skip to content

Commit 5c5a495

Browse files
Add encryption_configuration parameter to BigQuery operators (#40063)
1 parent 279e8b7 commit 5c5a495

File tree

2 files changed

+233
-17
lines changed

2 files changed

+233
-17
lines changed

airflow/providers/google/cloud/operators/bigquery.py

Lines changed: 69 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,12 @@ class _BigQueryOperatorsEncryptionConfigurationMixin:
150150
# annotation of the `self`. Then you can inherit this class in the target operator.
151151
# e.g: BigQueryCheckOperator, BigQueryTableCheckOperator
152152
def include_encryption_configuration( # type:ignore[misc]
153-
self: BigQueryCheckOperator | BigQueryTableCheckOperator,
153+
self: BigQueryCheckOperator
154+
| BigQueryTableCheckOperator
155+
| BigQueryValueCheckOperator
156+
| BigQueryColumnCheckOperator
157+
| BigQueryGetDataOperator
158+
| BigQueryIntervalCheckOperator,
154159
configuration: dict,
155160
config_key: str,
156161
) -> None:
@@ -206,7 +211,7 @@ class BigQueryCheckOperator(
206211
Token Creator IAM role to the directly preceding identity, with first
207212
account from the list granting this role to the originating account. (templated)
208213
:param labels: a dictionary containing labels for the table, passed to BigQuery.
209-
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
214+
:param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
210215
211216
.. code-block:: python
212217
@@ -327,7 +332,9 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
327332
self.log.info("Success.")
328333

329334

330-
class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
335+
class BigQueryValueCheckOperator(
336+
_BigQueryDbHookMixin, SQLValueCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin
337+
):
331338
"""Perform a simple value check using sql code.
332339
333340
.. seealso::
@@ -337,6 +344,13 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
337344
:param sql: SQL to execute.
338345
:param use_legacy_sql: Whether to use legacy SQL (true)
339346
or standard SQL (false).
347+
:param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
348+
349+
.. code-block:: python
350+
351+
encryption_configuration = {
352+
"kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
353+
}
340354
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
341355
:param location: The geographic location of the job. See details at:
342356
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
@@ -371,6 +385,7 @@ def __init__(
371385
sql: str,
372386
pass_value: Any,
373387
tolerance: Any = None,
388+
encryption_configuration: dict | None = None,
374389
gcp_conn_id: str = "google_cloud_default",
375390
use_legacy_sql: bool = True,
376391
location: str | None = None,
@@ -384,6 +399,7 @@ def __init__(
384399
self.location = location
385400
self.gcp_conn_id = gcp_conn_id
386401
self.use_legacy_sql = use_legacy_sql
402+
self.encryption_configuration = encryption_configuration
387403
self.impersonation_chain = impersonation_chain
388404
self.labels = labels
389405
self.deferrable = deferrable
@@ -402,6 +418,8 @@ def _submit_job(
402418
},
403419
}
404420

421+
self.include_encryption_configuration(configuration, "query")
422+
405423
return hook.insert_job(
406424
configuration=configuration,
407425
project_id=hook.project_id,
@@ -461,7 +479,9 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
461479
)
462480

463481

464-
class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperator):
482+
class BigQueryIntervalCheckOperator(
483+
_BigQueryDbHookMixin, SQLIntervalCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin
484+
):
465485
"""
466486
Check that the values of metrics given as SQL expressions are within a tolerance of the older ones.
467487
@@ -482,6 +502,13 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
482502
between the current day, and the prior days_back.
483503
:param use_legacy_sql: Whether to use legacy SQL (true)
484504
or standard SQL (false).
505+
:param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
506+
507+
.. code-block:: python
508+
509+
encryption_configuration = {
510+
"kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
511+
}
485512
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
486513
:param location: The geographic location of the job. See details at:
487514
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
@@ -521,6 +548,7 @@ def __init__(
521548
gcp_conn_id: str = "google_cloud_default",
522549
use_legacy_sql: bool = True,
523550
location: str | None = None,
551+
encryption_configuration: dict | None = None,
524552
impersonation_chain: str | Sequence[str] | None = None,
525553
labels: dict | None = None,
526554
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
@@ -539,6 +567,7 @@ def __init__(
539567
self.gcp_conn_id = gcp_conn_id
540568
self.use_legacy_sql = use_legacy_sql
541569
self.location = location
570+
self.encryption_configuration = encryption_configuration
542571
self.impersonation_chain = impersonation_chain
543572
self.labels = labels
544573
self.project_id = project_id
@@ -553,6 +582,7 @@ def _submit_job(
553582
) -> BigQueryJob:
554583
"""Submit a new job and get the job id for polling the status using Triggerer."""
555584
configuration = {"query": {"query": sql, "useLegacySql": self.use_legacy_sql}}
585+
self.include_encryption_configuration(configuration, "query")
556586
return hook.insert_job(
557587
configuration=configuration,
558588
project_id=self.project_id or hook.project_id,
@@ -609,7 +639,9 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
609639
)
610640

611641

612-
class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
642+
class BigQueryColumnCheckOperator(
643+
_BigQueryDbHookMixin, SQLColumnCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin
644+
):
613645
"""
614646
Subclasses the SQLColumnCheckOperator in order to provide a job id for OpenLineage to parse.
615647
@@ -624,6 +656,13 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
624656
:param partition_clause: a string SQL statement added to a WHERE clause
625657
to partition data
626658
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
659+
:param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
660+
661+
.. code-block:: python
662+
663+
encryption_configuration = {
664+
"kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
665+
}
627666
:param use_legacy_sql: Whether to use legacy SQL (true)
628667
or standard SQL (false).
629668
:param location: The geographic location of the job. See details at:
@@ -651,6 +690,7 @@ def __init__(
651690
partition_clause: str | None = None,
652691
database: str | None = None,
653692
accept_none: bool = True,
693+
encryption_configuration: dict | None = None,
654694
gcp_conn_id: str = "google_cloud_default",
655695
use_legacy_sql: bool = True,
656696
location: str | None = None,
@@ -672,6 +712,7 @@ def __init__(
672712
self.database = database
673713
self.accept_none = accept_none
674714
self.gcp_conn_id = gcp_conn_id
715+
self.encryption_configuration = encryption_configuration
675716
self.use_legacy_sql = use_legacy_sql
676717
self.location = location
677718
self.impersonation_chain = impersonation_chain
@@ -684,7 +725,7 @@ def _submit_job(
684725
) -> BigQueryJob:
685726
"""Submit a new job and get the job id for polling the status using Trigger."""
686727
configuration = {"query": {"query": self.sql, "useLegacySql": self.use_legacy_sql}}
687-
728+
self.include_encryption_configuration(configuration, "query")
688729
return hook.insert_job(
689730
configuration=configuration,
690731
project_id=hook.project_id,
@@ -766,7 +807,7 @@ class BigQueryTableCheckOperator(
766807
Service Account Token Creator IAM role to the directly preceding identity, with first
767808
account from the list granting this role to the originating account (templated).
768809
:param labels: a dictionary containing labels for the table, passed to BigQuery
769-
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
810+
:param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
770811
771812
.. code-block:: python
772813
@@ -852,7 +893,7 @@ def execute(self, context=None):
852893
self.log.info("All tests have passed")
853894

854895

855-
class BigQueryGetDataOperator(GoogleCloudBaseOperator):
896+
class BigQueryGetDataOperator(GoogleCloudBaseOperator, _BigQueryOperatorsEncryptionConfigurationMixin):
856897
"""
857898
Fetch data and return it, either from a BigQuery table, or results of a query job.
858899
@@ -921,6 +962,13 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
921962
from the table. (templated)
922963
:param selected_fields: List of fields to return (comma-separated). If
923964
unspecified, all fields are returned.
965+
:param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
966+
967+
.. code-block:: python
968+
969+
encryption_configuration = {
970+
"kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
971+
}
924972
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
925973
:param location: The location used for the operation.
926974
:param impersonation_chain: Optional service account to impersonate using short-term
@@ -965,6 +1013,7 @@ def __init__(
9651013
selected_fields: str | None = None,
9661014
gcp_conn_id: str = "google_cloud_default",
9671015
location: str | None = None,
1016+
encryption_configuration: dict | None = None,
9681017
impersonation_chain: str | Sequence[str] | None = None,
9691018
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
9701019
poll_interval: float = 4.0,
@@ -984,6 +1033,7 @@ def __init__(
9841033
self.gcp_conn_id = gcp_conn_id
9851034
self.location = location
9861035
self.impersonation_chain = impersonation_chain
1036+
self.encryption_configuration = encryption_configuration
9871037
self.project_id = project_id
9881038
self.deferrable = deferrable
9891039
self.poll_interval = poll_interval
@@ -997,6 +1047,8 @@ def _submit_job(
9971047
) -> BigQueryJob:
9981048
get_query = self.generate_query(hook=hook)
9991049
configuration = {"query": {"query": get_query, "useLegacySql": self.use_legacy_sql}}
1050+
self.include_encryption_configuration(configuration, "query")
1051+
10001052
"""Submit a new job and get the job id for polling the status using Triggerer."""
10011053
return hook.insert_job(
10021054
configuration=configuration,
@@ -1199,7 +1251,7 @@ class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
11991251
:param location: The geographic location of the job. Required except for
12001252
US and EU. See details at
12011253
https://cloud.google.com/bigquery/docs/locations#specifying_your_location
1202-
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
1254+
:param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
12031255
12041256
.. code-block:: python
12051257
@@ -1393,9 +1445,9 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
13931445
13941446
.. seealso::
13951447
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning
1396-
:param gcp_conn_id: [Optional] The connection ID used to connect to Google Cloud and
1448+
:param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud and
13971449
interact with the Bigquery service.
1398-
:param google_cloud_storage_conn_id: [Optional] The connection ID used to connect to Google Cloud.
1450+
:param google_cloud_storage_conn_id: (Optional) The connection ID used to connect to Google Cloud.
13991451
and interact with the Google Cloud Storage service.
14001452
:param labels: a dictionary containing labels for the table, passed to BigQuery
14011453
@@ -1433,21 +1485,21 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
14331485
google_cloud_storage_conn_id="airflow-conn-id",
14341486
)
14351487
1436-
:param view: [Optional] A dictionary containing definition for the view.
1488+
:param view: (Optional) A dictionary containing definition for the view.
14371489
If set, it will create a view instead of a table:
14381490
14391491
.. seealso::
14401492
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
1441-
:param materialized_view: [Optional] The materialized view definition.
1442-
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
1493+
:param materialized_view: (Optional) The materialized view definition.
1494+
:param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
14431495
14441496
.. code-block:: python
14451497
14461498
encryption_configuration = {
14471499
"kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
14481500
}
14491501
:param location: The location used for the operation.
1450-
:param cluster_fields: [Optional] The fields used for clustering.
1502+
:param cluster_fields: (Optional) The fields used for clustering.
14511503
BigQuery supports clustering for both partitioned and
14521504
non-partitioned tables.
14531505
@@ -1645,7 +1697,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
16451697
:param autodetect: Try to detect schema and format options automatically.
16461698
The schema_fields and schema_object options will be honored when specified explicitly.
16471699
https://cloud.google.com/bigquery/docs/schema-detect#schema_auto-detection_for_external_data_sources
1648-
:param compression: [Optional] The compression type of the data source.
1700+
:param compression: (Optional) The compression type of the data source.
16491701
Possible values include GZIP and NONE.
16501702
The default value is NONE.
16511703
This setting is ignored for Google Cloud Bigtable,
@@ -1667,7 +1719,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
16671719
and interact with the Google Cloud Storage service.
16681720
:param src_fmt_configs: configure optional fields specific to the source format
16691721
:param labels: a dictionary containing labels for the table, passed to BigQuery
1670-
:param encryption_configuration: [Optional] Custom encryption configuration (e.g., Cloud KMS keys).
1722+
:param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
16711723
16721724
.. code-block:: python
16731725

0 commit comments

Comments
 (0)