@@ -150,7 +150,12 @@ class _BigQueryOperatorsEncryptionConfigurationMixin:
150150 # annotation of the `self`. Then you can inherit this class in the target operator.
151151 # e.g: BigQueryCheckOperator, BigQueryTableCheckOperator
152152 def include_encryption_configuration ( # type:ignore[misc]
153- self : BigQueryCheckOperator | BigQueryTableCheckOperator ,
153+ self : BigQueryCheckOperator
154+ | BigQueryTableCheckOperator
155+ | BigQueryValueCheckOperator
156+ | BigQueryColumnCheckOperator
157+ | BigQueryGetDataOperator
158+ | BigQueryIntervalCheckOperator ,
154159 configuration : dict ,
155160 config_key : str ,
156161 ) -> None :
@@ -206,7 +211,7 @@ class BigQueryCheckOperator(
206211 Token Creator IAM role to the directly preceding identity, with first
207212 account from the list granting this role to the originating account. (templated)
208213 :param labels: a dictionary containing labels for the table, passed to BigQuery.
209- :param encryption_configuration: [ Optional] Custom encryption configuration (e.g., Cloud KMS keys).
214+ :param encryption_configuration: ( Optional) Custom encryption configuration (e.g., Cloud KMS keys).
210215
211216 .. code-block:: python
212217
@@ -327,7 +332,9 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
327332 self .log .info ("Success." )
328333
329334
330- class BigQueryValueCheckOperator (_BigQueryDbHookMixin , SQLValueCheckOperator ):
335+ class BigQueryValueCheckOperator (
336+ _BigQueryDbHookMixin , SQLValueCheckOperator , _BigQueryOperatorsEncryptionConfigurationMixin
337+ ):
331338 """Perform a simple value check using sql code.
332339
333340 .. seealso::
@@ -337,6 +344,13 @@ class BigQueryValueCheckOperator(_BigQueryDbHookMixin, SQLValueCheckOperator):
337344 :param sql: SQL to execute.
338345 :param use_legacy_sql: Whether to use legacy SQL (true)
339346 or standard SQL (false).
347+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
348+
349+ .. code-block:: python
350+
351+ encryption_configuration = {
352+ "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
353+ }
340354 :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
341355 :param location: The geographic location of the job. See details at:
342356 https://cloud.google.com/bigquery/docs/locations#specifying_your_location
@@ -371,6 +385,7 @@ def __init__(
371385 sql : str ,
372386 pass_value : Any ,
373387 tolerance : Any = None ,
388+ encryption_configuration : dict | None = None ,
374389 gcp_conn_id : str = "google_cloud_default" ,
375390 use_legacy_sql : bool = True ,
376391 location : str | None = None ,
@@ -384,6 +399,7 @@ def __init__(
384399 self .location = location
385400 self .gcp_conn_id = gcp_conn_id
386401 self .use_legacy_sql = use_legacy_sql
402+ self .encryption_configuration = encryption_configuration
387403 self .impersonation_chain = impersonation_chain
388404 self .labels = labels
389405 self .deferrable = deferrable
@@ -402,6 +418,8 @@ def _submit_job(
402418 },
403419 }
404420
421+ self .include_encryption_configuration (configuration , "query" )
422+
405423 return hook .insert_job (
406424 configuration = configuration ,
407425 project_id = hook .project_id ,
@@ -461,7 +479,9 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
461479 )
462480
463481
464- class BigQueryIntervalCheckOperator (_BigQueryDbHookMixin , SQLIntervalCheckOperator ):
482+ class BigQueryIntervalCheckOperator (
483+ _BigQueryDbHookMixin , SQLIntervalCheckOperator , _BigQueryOperatorsEncryptionConfigurationMixin
484+ ):
465485 """
466486 Check that the values of metrics given as SQL expressions are within a tolerance of the older ones.
467487
@@ -482,6 +502,13 @@ class BigQueryIntervalCheckOperator(_BigQueryDbHookMixin, SQLIntervalCheckOperat
482502 between the current day, and the prior days_back.
483503 :param use_legacy_sql: Whether to use legacy SQL (true)
484504 or standard SQL (false).
505+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
506+
507+ .. code-block:: python
508+
509+ encryption_configuration = {
510+ "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
511+ }
485512 :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
486513 :param location: The geographic location of the job. See details at:
487514 https://cloud.google.com/bigquery/docs/locations#specifying_your_location
@@ -521,6 +548,7 @@ def __init__(
521548 gcp_conn_id : str = "google_cloud_default" ,
522549 use_legacy_sql : bool = True ,
523550 location : str | None = None ,
551+ encryption_configuration : dict | None = None ,
524552 impersonation_chain : str | Sequence [str ] | None = None ,
525553 labels : dict | None = None ,
526554 deferrable : bool = conf .getboolean ("operators" , "default_deferrable" , fallback = False ),
@@ -539,6 +567,7 @@ def __init__(
539567 self .gcp_conn_id = gcp_conn_id
540568 self .use_legacy_sql = use_legacy_sql
541569 self .location = location
570+ self .encryption_configuration = encryption_configuration
542571 self .impersonation_chain = impersonation_chain
543572 self .labels = labels
544573 self .project_id = project_id
@@ -553,6 +582,7 @@ def _submit_job(
553582 ) -> BigQueryJob :
554583 """Submit a new job and get the job id for polling the status using Triggerer."""
555584 configuration = {"query" : {"query" : sql , "useLegacySql" : self .use_legacy_sql }}
585+ self .include_encryption_configuration (configuration , "query" )
556586 return hook .insert_job (
557587 configuration = configuration ,
558588 project_id = self .project_id or hook .project_id ,
@@ -609,7 +639,9 @@ def execute_complete(self, context: Context, event: dict[str, Any]) -> None:
609639 )
610640
611641
612- class BigQueryColumnCheckOperator (_BigQueryDbHookMixin , SQLColumnCheckOperator ):
642+ class BigQueryColumnCheckOperator (
643+ _BigQueryDbHookMixin , SQLColumnCheckOperator , _BigQueryOperatorsEncryptionConfigurationMixin
644+ ):
613645 """
614646 Subclasses the SQLColumnCheckOperator in order to provide a job id for OpenLineage to parse.
615647
@@ -624,6 +656,13 @@ class BigQueryColumnCheckOperator(_BigQueryDbHookMixin, SQLColumnCheckOperator):
624656 :param partition_clause: a string SQL statement added to a WHERE clause
625657 to partition data
626658 :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
659+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
660+
661+ .. code-block:: python
662+
663+ encryption_configuration = {
664+ "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
665+ }
627666 :param use_legacy_sql: Whether to use legacy SQL (true)
628667 or standard SQL (false).
629668 :param location: The geographic location of the job. See details at:
@@ -651,6 +690,7 @@ def __init__(
651690 partition_clause : str | None = None ,
652691 database : str | None = None ,
653692 accept_none : bool = True ,
693+ encryption_configuration : dict | None = None ,
654694 gcp_conn_id : str = "google_cloud_default" ,
655695 use_legacy_sql : bool = True ,
656696 location : str | None = None ,
@@ -672,6 +712,7 @@ def __init__(
672712 self .database = database
673713 self .accept_none = accept_none
674714 self .gcp_conn_id = gcp_conn_id
715+ self .encryption_configuration = encryption_configuration
675716 self .use_legacy_sql = use_legacy_sql
676717 self .location = location
677718 self .impersonation_chain = impersonation_chain
@@ -684,7 +725,7 @@ def _submit_job(
684725 ) -> BigQueryJob :
685726 """Submit a new job and get the job id for polling the status using Trigger."""
686727 configuration = {"query" : {"query" : self .sql , "useLegacySql" : self .use_legacy_sql }}
687-
728+ self . include_encryption_configuration ( configuration , "query" )
688729 return hook .insert_job (
689730 configuration = configuration ,
690731 project_id = hook .project_id ,
@@ -766,7 +807,7 @@ class BigQueryTableCheckOperator(
766807 Service Account Token Creator IAM role to the directly preceding identity, with first
767808 account from the list granting this role to the originating account (templated).
768809 :param labels: a dictionary containing labels for the table, passed to BigQuery
769- :param encryption_configuration: [ Optional] Custom encryption configuration (e.g., Cloud KMS keys).
810+ :param encryption_configuration: ( Optional) Custom encryption configuration (e.g., Cloud KMS keys).
770811
771812 .. code-block:: python
772813
@@ -852,7 +893,7 @@ def execute(self, context=None):
852893 self .log .info ("All tests have passed" )
853894
854895
855- class BigQueryGetDataOperator (GoogleCloudBaseOperator ):
896+ class BigQueryGetDataOperator (GoogleCloudBaseOperator , _BigQueryOperatorsEncryptionConfigurationMixin ):
856897 """
857898 Fetch data and return it, either from a BigQuery table, or results of a query job.
858899
@@ -921,6 +962,13 @@ class BigQueryGetDataOperator(GoogleCloudBaseOperator):
921962 from the table. (templated)
922963 :param selected_fields: List of fields to return (comma-separated). If
923964 unspecified, all fields are returned.
965+ :param encryption_configuration: (Optional) Custom encryption configuration (e.g., Cloud KMS keys).
966+
967+ .. code-block:: python
968+
969+ encryption_configuration = {
970+ "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
971+ }
924972 :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud.
925973 :param location: The location used for the operation.
926974 :param impersonation_chain: Optional service account to impersonate using short-term
@@ -965,6 +1013,7 @@ def __init__(
9651013 selected_fields : str | None = None ,
9661014 gcp_conn_id : str = "google_cloud_default" ,
9671015 location : str | None = None ,
1016+ encryption_configuration : dict | None = None ,
9681017 impersonation_chain : str | Sequence [str ] | None = None ,
9691018 deferrable : bool = conf .getboolean ("operators" , "default_deferrable" , fallback = False ),
9701019 poll_interval : float = 4.0 ,
@@ -984,6 +1033,7 @@ def __init__(
9841033 self .gcp_conn_id = gcp_conn_id
9851034 self .location = location
9861035 self .impersonation_chain = impersonation_chain
1036+ self .encryption_configuration = encryption_configuration
9871037 self .project_id = project_id
9881038 self .deferrable = deferrable
9891039 self .poll_interval = poll_interval
@@ -997,6 +1047,8 @@ def _submit_job(
9971047 ) -> BigQueryJob :
9981048 get_query = self .generate_query (hook = hook )
9991049 configuration = {"query" : {"query" : get_query , "useLegacySql" : self .use_legacy_sql }}
1050+ self .include_encryption_configuration (configuration , "query" )
1051+
10001052 """Submit a new job and get the job id for polling the status using Triggerer."""
10011053 return hook .insert_job (
10021054 configuration = configuration ,
@@ -1199,7 +1251,7 @@ class BigQueryExecuteQueryOperator(GoogleCloudBaseOperator):
11991251 :param location: The geographic location of the job. Required except for
12001252 US and EU. See details at
12011253 https://cloud.google.com/bigquery/docs/locations#specifying_your_location
1202- :param encryption_configuration: [ Optional] Custom encryption configuration (e.g., Cloud KMS keys).
1254+ :param encryption_configuration: ( Optional) Custom encryption configuration (e.g., Cloud KMS keys).
12031255
12041256 .. code-block:: python
12051257
@@ -1393,9 +1445,9 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
13931445
13941446 .. seealso::
13951447 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#timePartitioning
1396- :param gcp_conn_id: [ Optional] The connection ID used to connect to Google Cloud and
1448+ :param gcp_conn_id: ( Optional) The connection ID used to connect to Google Cloud and
13971449 interact with the Bigquery service.
1398- :param google_cloud_storage_conn_id: [ Optional] The connection ID used to connect to Google Cloud.
1450+ :param google_cloud_storage_conn_id: ( Optional) The connection ID used to connect to Google Cloud.
13991451 and interact with the Google Cloud Storage service.
14001452 :param labels: a dictionary containing labels for the table, passed to BigQuery
14011453
@@ -1433,21 +1485,21 @@ class BigQueryCreateEmptyTableOperator(GoogleCloudBaseOperator):
14331485 google_cloud_storage_conn_id="airflow-conn-id",
14341486 )
14351487
1436- :param view: [ Optional] A dictionary containing definition for the view.
1488+ :param view: ( Optional) A dictionary containing definition for the view.
14371489 If set, it will create a view instead of a table:
14381490
14391491 .. seealso::
14401492 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
1441- :param materialized_view: [ Optional] The materialized view definition.
1442- :param encryption_configuration: [ Optional] Custom encryption configuration (e.g., Cloud KMS keys).
1493+ :param materialized_view: ( Optional) The materialized view definition.
1494+ :param encryption_configuration: ( Optional) Custom encryption configuration (e.g., Cloud KMS keys).
14431495
14441496 .. code-block:: python
14451497
14461498 encryption_configuration = {
14471499 "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
14481500 }
14491501 :param location: The location used for the operation.
1450- :param cluster_fields: [ Optional] The fields used for clustering.
1502+ :param cluster_fields: ( Optional) The fields used for clustering.
14511503 BigQuery supports clustering for both partitioned and
14521504 non-partitioned tables.
14531505
@@ -1645,7 +1697,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
16451697 :param autodetect: Try to detect schema and format options automatically.
16461698 The schema_fields and schema_object options will be honored when specified explicitly.
16471699 https://cloud.google.com/bigquery/docs/schema-detect#schema_auto-detection_for_external_data_sources
1648- :param compression: [ Optional] The compression type of the data source.
1700+ :param compression: ( Optional) The compression type of the data source.
16491701 Possible values include GZIP and NONE.
16501702 The default value is NONE.
16511703 This setting is ignored for Google Cloud Bigtable,
@@ -1667,7 +1719,7 @@ class BigQueryCreateExternalTableOperator(GoogleCloudBaseOperator):
16671719 and interact with the Google Cloud Storage service.
16681720 :param src_fmt_configs: configure optional fields specific to the source format
16691721 :param labels: a dictionary containing labels for the table, passed to BigQuery
1670- :param encryption_configuration: [ Optional] Custom encryption configuration (e.g., Cloud KMS keys).
1722+ :param encryption_configuration: ( Optional) Custom encryption configuration (e.g., Cloud KMS keys).
16711723
16721724 .. code-block:: python
16731725
0 commit comments