Commit f7708ac

Fix hive_partition_sensor system test (#40023)

1 parent fc4fbb3 commit f7708ac
2 files changed: 23 additions, 16 deletions


airflow/providers/google/cloud/operators/dataproc_metastore.py

Lines changed: 17 additions & 6 deletions
@@ -431,7 +431,7 @@ def execute(self, context: Context) -> dict:
         hook = DataprocMetastoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        self.log.info("Creating Dataproc Metastore service: %s", self.project_id)
+        self.log.info("Creating Dataproc Metastore service: %s", self.service_id)
         try:
             operation = hook.create_service(
                 region=self.region,
@@ -548,13 +548,24 @@ def execute(self, context: Context) -> None:
 class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
     """Delete a single service.
 
-    :param request: The request object. Request message for
-        [DataprocMetastore.DeleteService][google.cloud.metastore.v1.DataprocMetastore.DeleteService].
+    :param region: Required. The ID of the Google Cloud region that the service belongs to.
     :param project_id: Required. The ID of the Google Cloud project that the service belongs to.
+    :param service_id: Required. The ID of the metastore service, which is used as the final component of
+        the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
+        with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
+        hyphens.
     :param retry: Designation of what errors, if any, should be retried.
     :param timeout: The timeout for this request.
     :param metadata: Strings which should be sent along with the request as metadata.
-    :param gcp_conn_id:
+    :param gcp_conn_id: The connection ID to use connecting to Google Cloud.
+    :param impersonation_chain: Optional service account to impersonate using short-term
+        credentials, or chained list of accounts required to get the access_token
+        of the last account in the list, which will be impersonated in the request.
+        If set as a string, the account must grant the originating account
+        the Service Account Token Creator IAM role.
+        If set as a sequence, the identities from the list must grant
+        Service Account Token Creator IAM role to the directly preceding identity, with first
+        account from the list granting this role to the originating account (templated).
     """
 
     template_fields: Sequence[str] = (
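For orientation, here is a minimal usage sketch of the operator whose docstring is completed above, using only the parameters it documents. The project, region, and service IDs are placeholders, not values taken from this commit:

from airflow.providers.google.cloud.operators.dataproc_metastore import (
    DataprocMetastoreDeleteServiceOperator,
)

# Placeholder IDs for illustration only; use your own project/region/service.
delete_metastore_service = DataprocMetastoreDeleteServiceOperator(
    task_id="delete_metastore_service",
    project_id="my-project",
    region="europe-west1",
    service_id="my-metastore-service",
    gcp_conn_id="google_cloud_default",
)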
@@ -589,7 +600,7 @@ def execute(self, context: Context):
         hook = DataprocMetastoreHook(
             gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
         )
-        self.log.info("Deleting Dataproc Metastore service: %s", self.project_id)
+        self.log.info("Deleting Dataproc Metastore service: %s", self.service_id)
         operation = hook.delete_service(
             region=self.region,
             project_id=self.project_id,
@@ -599,7 +610,7 @@ def execute(self, context: Context):
             metadata=self.metadata,
         )
         hook.wait_for_operation(self.timeout, operation)
-        self.log.info("Service %s deleted successfully", self.project_id)
+        self.log.info("Service %s deleted successfully", self.service_id)
 
 
 class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator):

tests/system/providers/google/cloud/dataproc_metastore/example_dataproc_metastore_hive_partition_sensor.py

Lines changed: 6 additions & 10 deletions
@@ -16,8 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 """
-Example Airflow DAG that show how to check Hive partitions existence
-using Dataproc Metastore Sensor.
+Example Airflow DAG that shows how to check Hive partitions existence with Dataproc Metastore Sensor.
 
 Note that Metastore service must be configured to use gRPC endpoints.
 """
@@ -47,7 +46,7 @@
 DAG_ID = "hive_partition_sensor"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "demo-project")
 ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "demo-env")
-REGION = "us-central1"
+REGION = "europe-west1"
 NETWORK = "default"
 
 METASTORE_SERVICE_ID = f"metastore-{DAG_ID}-{ENV_ID}".replace("_", "-")
@@ -60,7 +59,7 @@
     "network": f"projects/{PROJECT_ID}/global/networks/{NETWORK}",
 }
 METASTORE_SERVICE_QFN = f"projects/{PROJECT_ID}/locations/{REGION}/services/{METASTORE_SERVICE_ID}"
-DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}".replace("_", "-")
+DATAPROC_CLUSTER_NAME = f"cluster-{DAG_ID}-{ENV_ID}".replace("_", "-")
 DATAPROC_CLUSTER_CONFIG = {
     "master_config": {
         "num_instances": 1,
@@ -133,7 +132,7 @@
 
     @task(task_id="get_hive_warehouse_bucket_task")
     def get_hive_warehouse_bucket(**kwargs):
-        """Returns Hive Metastore Warehouse GCS bucket name."""
+        """Return Hive Metastore Warehouse GCS bucket name."""
         ti = kwargs["ti"]
         metastore_service: dict = ti.xcom_pull(task_ids="create_metastore_service")
         config_overrides: dict = metastore_service["hive_metastore_config"]["config_overrides"]
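The hunk ends mid-function; the rest of the helper is untouched by this commit. As a rough sketch of what such a helper typically does next (the override key and XCom key below are assumptions for illustration, not lines from this diff), it would pull the bucket name out of the warehouse directory override and publish it for downstream tasks:

        # Hypothetical continuation: parse a gs://<bucket>/... URI from the
        # "hive.metastore.warehouse.dir" override and push the bucket name via XCom.
        warehouse_dir: str = config_overrides["hive.metastore.warehouse.dir"]
        bucket_name = warehouse_dir.removeprefix("gs://").split("/", 1)[0]
        ti.xcom_push(key="bucket", value=bucket_name)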
@@ -216,19 +215,16 @@ def get_hive_warehouse_bucket(**kwargs):
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    # TEST SETUP
     (
+        # TEST SETUP
         create_metastore_service
         >> create_cluster
         >> get_hive_warehouse_bucket_task
         >> copy_source_data
         >> create_external_table
         >> create_partitioned_table
-        >> partition_data
-    )
-    (
-        create_metastore_service
         # TEST BODY
+        >> partition_data
         >> hive_partition_sensor
         # TEST TEARDOWN
         >> [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket]
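The change above collapses the two separate dependency chains into one, so setup, body, and teardown tasks are ordered in a single pass. For readers less used to the >> operator, the same ordering could equivalently be written with Airflow's chain helper; this is only an illustrative restatement, not code from the commit:

from airflow.models.baseoperator import chain

# Same setup -> body -> teardown ordering as the single >> chain above.
chain(
    create_metastore_service,
    create_cluster,
    get_hive_warehouse_bucket_task,
    copy_source_data,
    create_external_table,
    create_partitioned_table,
    partition_data,
    hive_partition_sensor,
    [delete_dataproc_cluster, delete_metastore_service, delete_warehouse_bucket],
)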
