Commit 013c95b

Update Vertex AI system tests (#34364)
* Update Vertex AI system tests
* Update image uri for "Hyperparameter" test
1 parent f93b046 commit 013c95b

20 files changed, with 169 additions and 989 deletions.

airflow/providers/google/cloud/example_dags/example_vertex_ai.py

Lines changed: 0 additions & 754 deletions
This file was deleted.

tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_batch_prediction_job.py

Lines changed: 19 additions & 28 deletions
@@ -26,15 +26,17 @@
 
 import os
 from datetime import datetime
-from pathlib import Path
 
 from google.cloud.aiplatform import schema
 from google.protobuf.json_format import ParseDict
 from google.protobuf.struct_pb2 import Value
 
 from airflow import models
-from airflow.operators.bash import BashOperator
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+    GCSSynchronizeBucketsOperator,
+)
 from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
     CreateAutoMLForecastingTrainingJobOperator,
     DeleteAutoMLTrainingJobOperator,
@@ -48,22 +50,20 @@
     CreateDatasetOperator,
     DeleteDatasetOperator,
 )
-from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "vertex_ai_batch_prediction_job_operations"
+DAG_ID = "example_vertex_ai_batch_prediction_operations"
 REGION = "us-central1"
 
 FORECAST_DISPLAY_NAME = f"auto-ml-forecasting-{ENV_ID}"
 MODEL_DISPLAY_NAME = f"auto-ml-forecasting-model-{ENV_ID}"
 
 JOB_DISPLAY_NAME = f"batch_prediction_job_test_{ENV_ID}"
-DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+DATA_SAMPLE_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
 DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/forecast-dataset.csv"
-FORECAST_ZIP_CSV_FILE_LOCAL_PATH = str(Path(__file__).parent / "resources" / "forecast-dataset.csv.zip")
-FORECAST_CSV_FILE_LOCAL_PATH = "/batch-prediction/forecast-dataset.csv"
 
 FORECAST_DATASET = {
     "display_name": f"forecast-dataset-{ENV_ID}",
@@ -109,18 +109,15 @@
         location=REGION,
     )
 
-    unzip_file = BashOperator(
-        task_id="unzip_csv_data_file",
-        bash_command=f"mkdir -p /batch-prediction && "
-        f"unzip {FORECAST_ZIP_CSV_FILE_LOCAL_PATH} -d /batch-prediction/",
+    move_dataset_file = GCSSynchronizeBucketsOperator(
+        task_id="move_dataset_to_bucket",
+        source_bucket=RESOURCE_DATA_BUCKET,
+        source_object="vertex-ai/datasets",
+        destination_bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
+        destination_object="vertex-ai",
+        recursive=True,
     )
 
-    upload_files = LocalFilesystemToGCSOperator(
-        task_id="upload_file_to_bucket",
-        src=FORECAST_CSV_FILE_LOCAL_PATH,
-        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
-        bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
-    )
     create_forecast_dataset = CreateDatasetOperator(
        task_id="forecast_dataset",
        dataset=FORECAST_DATASET,
@@ -186,7 +183,8 @@
 
     delete_auto_ml_forecasting_training_job = DeleteAutoMLTrainingJobOperator(
         task_id="delete_auto_ml_forecasting_training_job",
-        training_pipeline_id=create_auto_ml_forecasting_training_job.output["training_id"],
+        training_pipeline_id="{{ task_instance.xcom_pull(task_ids='auto_ml_forecasting_task', "
+        "key='training_id') }}",
         region=REGION,
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
@@ -204,16 +202,10 @@
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
-    clear_folder = BashOperator(
-        task_id="clear_folder",
-        bash_command="rm -r /batch-prediction/*",
-    )
-
     (
         # TEST SETUP
         create_bucket
-        >> unzip_file
-        >> upload_files
+        >> move_dataset_file
         >> create_forecast_dataset
         >> create_auto_ml_forecasting_training_job
         # TEST BODY
@@ -224,7 +216,6 @@
         >> delete_auto_ml_forecasting_training_job
         >> delete_forecast_dataset
         >> delete_bucket
-        >> clear_folder
     )

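The setup change above swaps a local unzip-and-upload step (BashOperator plus LocalFilesystemToGCSOperator) for a bucket-to-bucket copy with GCSSynchronizeBucketsOperator. A minimal standalone sketch of that pattern follows; the DAG id and bucket names are placeholders for illustration, not values taken from this commit.

# Minimal sketch of the bucket-to-bucket sync setup used above.
# DAG id and bucket names are placeholders, not values from this commit.
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
    GCSSynchronizeBucketsOperator,
)
from airflow.utils.trigger_rule import TriggerRule

with models.DAG(
    "example_gcs_sync_setup",
    schedule="@once",
    start_date=datetime(2023, 1, 1),
    catchup=False,
):
    create_bucket = GCSCreateBucketOperator(
        task_id="create_bucket",
        bucket_name="my-per-run-test-bucket",
        location="us-central1",
    )

    # Copy shared test data from a long-lived resource bucket into the
    # per-run bucket instead of unzipping and uploading a local archive.
    move_dataset_file = GCSSynchronizeBucketsOperator(
        task_id="move_dataset_to_bucket",
        source_bucket="my-resource-bucket",
        source_object="vertex-ai/datasets",
        destination_bucket="my-per-run-test-bucket",
        destination_object="vertex-ai",
        recursive=True,
    )

    delete_bucket = GCSDeleteBucketOperator(
        task_id="delete_bucket",
        bucket_name="my-per-run-test-bucket",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    create_bucket >> move_dataset_file >> delete_bucket

This keeps the test data in GCS end to end, so the worker needs no local scratch directory and no cleanup task at teardown.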
tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py

Lines changed: 23 additions & 29 deletions
@@ -26,15 +26,17 @@
 
 import os
 from datetime import datetime
-from pathlib import Path
 
 from google.cloud.aiplatform import schema
 from google.protobuf.json_format import ParseDict
 from google.protobuf.struct_pb2 import Value
 
 from airflow import models
-from airflow.operators.bash import BashOperator
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+    GCSSynchronizeBucketsOperator,
+)
 from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
     CreateCustomContainerTrainingJobOperator,
     DeleteCustomTrainingJobOperator,
@@ -43,22 +45,19 @@
     CreateDatasetOperator,
     DeleteDatasetOperator,
 )
-from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
-DAG_ID = "vertex_ai_custom_job_operations"
+DAG_ID = "example_vertex_ai_custom_job_operations"
 REGION = "us-central1"
 CONTAINER_DISPLAY_NAME = f"train-housing-container-{ENV_ID}"
 MODEL_DISPLAY_NAME = f"container-housing-model-{ENV_ID}"
 
-CUSTOM_CONTAINER_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+CUSTOM_CONTAINER_GCS_BUCKET_NAME = f"bucket_cont_{DAG_ID}_{ENV_ID}".replace("_", "-")
 
 DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
-CSV_FILE_LOCAL_PATH = "/custom-job-container/california_housing_train.csv"
-RESOURCES_PATH = Path(__file__).parent / "resources"
-CSV_ZIP_FILE_LOCAL_PATH = str(RESOURCES_PATH / "California-housing-custom-container.zip")
 
 
 def TABULAR_DATASET(bucket_name):
@@ -97,17 +96,16 @@ def TABULAR_DATASET(bucket_name):
         storage_class="REGIONAL",
         location=REGION,
     )
-    unzip_file = BashOperator(
-        task_id="unzip_csv_data_file",
-        bash_command=f"mkdir -p /custom-job-container/ && "
-        f"unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /custom-job-container/",
-    )
-    upload_files = LocalFilesystemToGCSOperator(
-        task_id="upload_file_to_bucket",
-        src=CSV_FILE_LOCAL_PATH,
-        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
-        bucket=CUSTOM_CONTAINER_GCS_BUCKET_NAME,
+
+    move_data_files = GCSSynchronizeBucketsOperator(
+        task_id="move_files_to_bucket",
+        source_bucket=RESOURCE_DATA_BUCKET,
+        source_object="vertex-ai/california-housing-data",
+        destination_bucket=CUSTOM_CONTAINER_GCS_BUCKET_NAME,
+        destination_object="vertex-ai",
+        recursive=True,
     )
+
     create_tabular_dataset = CreateDatasetOperator(
         task_id="tabular_dataset",
         dataset=TABULAR_DATASET(CUSTOM_CONTAINER_GCS_BUCKET_NAME),
@@ -141,8 +139,10 @@ def TABULAR_DATASET(bucket_name):
 
     delete_custom_training_job = DeleteCustomTrainingJobOperator(
         task_id="delete_custom_training_job",
-        training_pipeline_id=create_custom_container_training_job.output["training_id"],
-        custom_job_id=create_custom_container_training_job.output["custom_job_id"],
+        training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_container_task', "
+        "key='training_id') }}",
+        custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_container_task', "
+        "key='custom_job_id') }}",
         region=REGION,
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
@@ -160,24 +160,18 @@ def TABULAR_DATASET(bucket_name):
         bucket_name=CUSTOM_CONTAINER_GCS_BUCKET_NAME,
         trigger_rule=TriggerRule.ALL_DONE,
     )
-    clear_folder = BashOperator(
-        task_id="clear_folder",
-        bash_command="rm -r /custom-job-container/*",
-    )
 
     (
         # TEST SETUP
         create_bucket
-        >> unzip_file
-        >> upload_files
+        >> move_data_files
         >> create_tabular_dataset
         # TEST BODY
         >> create_custom_container_training_job
         # TEST TEARDOWN
         >> delete_custom_training_job
         >> delete_tabular_dataset
         >> delete_bucket
-        >> clear_folder
     )

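The teardown change above replaces references such as create_custom_container_training_job.output["training_id"] with Jinja-templated xcom_pull strings, so the delete operator reads the ids straight from XCom when its arguments are rendered. A short sketch of that templated form is below; the DAG id, task ids, and project value are placeholders patterned on the diff, not a definitive recipe.

# Sketch of pulling specific XCom keys through templated strings, as in the
# teardown tasks above. DAG id, task ids, and project value are placeholders.
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
    DeleteCustomTrainingJobOperator,
)
from airflow.utils.trigger_rule import TriggerRule

with models.DAG(
    "example_vertex_ai_teardown_sketch",
    schedule="@once",
    start_date=datetime(2023, 1, 1),
    catchup=False,
):
    delete_custom_training_job = DeleteCustomTrainingJobOperator(
        task_id="delete_custom_training_job",
        # Both ids are rendered at runtime from the XCom pushed by the training task.
        training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_container_task', "
        "key='training_id') }}",
        custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_container_task', "
        "key='custom_job_id') }}",
        region="us-central1",
        project_id="my-gcp-project",
        trigger_rule=TriggerRule.ALL_DONE,
    )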
tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py

Lines changed: 29 additions & 27 deletions
@@ -26,15 +26,17 @@
 
 import os
 from datetime import datetime
-from pathlib import Path
 
 from google.cloud.aiplatform import schema
 from google.protobuf.json_format import ParseDict
 from google.protobuf.struct_pb2 import Value
 
 from airflow import models
-from airflow.operators.bash import BashOperator
-from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator
+from airflow.providers.google.cloud.operators.gcs import (
+    GCSCreateBucketOperator,
+    GCSDeleteBucketOperator,
+    GCSSynchronizeBucketsOperator,
+)
 from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
     CreateCustomTrainingJobOperator,
     DeleteCustomTrainingJobOperator,
@@ -43,22 +45,20 @@
     CreateDatasetOperator,
     DeleteDatasetOperator,
 )
-from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
+from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
 from airflow.utils.trigger_rule import TriggerRule
 
-ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
-DAG_ID = "vertex_ai_custom_job_operations"
+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default")
+DAG_ID = "example_vertex_ai_custom_job_operations"
 PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT", "default")
 REGION = "us-central1"
 CUSTOM_DISPLAY_NAME = f"train-housing-custom-{ENV_ID}"
 MODEL_DISPLAY_NAME = f"custom-housing-model-{ENV_ID}"
 
-CUSTOM_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}"
+RESOURCE_DATA_BUCKET = "airflow-system-tests-resources"
+CUSTOM_GCS_BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}".replace("_", "-")
 
 DATA_SAMPLE_GCS_OBJECT_NAME = "vertex-ai/california_housing_train.csv"
-RESOURCES_PATH = Path(__file__).parent / "resources"
-CSV_ZIP_FILE_LOCAL_PATH = str(RESOURCES_PATH / "California-housing-custom-job.zip")
-CSV_FILE_LOCAL_PATH = "/custom-job/california_housing_train.csv"
 
 
 def TABULAR_DATASET(bucket_name):
@@ -76,7 +76,7 @@ def TABULAR_DATASET(bucket_name):
 MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
 REPLICA_COUNT = 1
 
-LOCAL_TRAINING_SCRIPT_PATH = "/custom-job/california_housing_training_script.py"
+LOCAL_TRAINING_SCRIPT_PATH = "california_housing_training_script.py"
 
 
 with models.DAG(
@@ -92,16 +92,23 @@ def TABULAR_DATASET(bucket_name):
         storage_class="REGIONAL",
         location=REGION,
     )
-    unzip_file = BashOperator(
-        task_id="unzip_csv_data_file",
-        bash_command=f"mkdir -p /custom-job && unzip {CSV_ZIP_FILE_LOCAL_PATH} -d /custom-job/",
+
+    move_data_files = GCSSynchronizeBucketsOperator(
+        task_id="move_files_to_bucket",
+        source_bucket=RESOURCE_DATA_BUCKET,
+        source_object="vertex-ai/california-housing-data",
+        destination_bucket=CUSTOM_GCS_BUCKET_NAME,
+        destination_object="vertex-ai",
+        recursive=True,
     )
-    upload_files = LocalFilesystemToGCSOperator(
-        task_id="upload_file_to_bucket",
-        src=CSV_FILE_LOCAL_PATH,
-        dst=DATA_SAMPLE_GCS_OBJECT_NAME,
+
+    download_training_script_file = GCSToLocalFilesystemOperator(
+        task_id="download_training_script_file",
+        object_name="vertex-ai/california_housing_training_script.py",
         bucket=CUSTOM_GCS_BUCKET_NAME,
+        filename=LOCAL_TRAINING_SCRIPT_PATH,
     )
+
     create_tabular_dataset = CreateDatasetOperator(
         task_id="tabular_dataset",
         dataset=TABULAR_DATASET(CUSTOM_GCS_BUCKET_NAME),
@@ -132,8 +139,8 @@ def TABULAR_DATASET(bucket_name):
     # [START how_to_cloud_vertex_ai_delete_custom_training_job_operator]
     delete_custom_training_job = DeleteCustomTrainingJobOperator(
         task_id="delete_custom_training_job",
-        training_pipeline_id=create_custom_training_job.output["training_id"],
-        custom_job_id=create_custom_training_job.output["custom_job_id"],
+        training_pipeline_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='training_id') }}",
+        custom_job_id="{{ task_instance.xcom_pull(task_ids='custom_task', key='custom_job_id') }}",
         region=REGION,
         project_id=PROJECT_ID,
         trigger_rule=TriggerRule.ALL_DONE,
@@ -152,24 +159,19 @@ def TABULAR_DATASET(bucket_name):
         bucket_name=CUSTOM_GCS_BUCKET_NAME,
         trigger_rule=TriggerRule.ALL_DONE,
     )
-    clear_folder = BashOperator(
-        task_id="clear_folder",
-        bash_command="rm -r /custom-job/*",
-    )
 
     (
         # TEST SETUP
         create_bucket
-        >> unzip_file
-        >> upload_files
+        >> move_data_files
+        >> download_training_script_file
         >> create_tabular_dataset
        # TEST BODY
        >> create_custom_training_job
        # TEST TEARDOWN
        >> delete_custom_training_job
        >> delete_tabular_dataset
        >> delete_bucket
-        >> clear_folder
    )

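In the custom-job example above, the training script is no longer shipped in a local zip; it is pulled from the test bucket onto the worker with GCSToLocalFilesystemOperator before the training job uses it. A minimal sketch of that download step follows; the DAG id and bucket name are placeholders patterned on the diff.

# Sketch of downloading an object from GCS to the worker's local filesystem,
# mirroring the download_training_script_file task above. Names are placeholders.
from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator

with models.DAG(
    "example_gcs_download_sketch",
    schedule="@once",
    start_date=datetime(2023, 1, 1),
    catchup=False,
):
    download_training_script_file = GCSToLocalFilesystemOperator(
        task_id="download_training_script_file",
        bucket="my-per-run-test-bucket",
        object_name="vertex-ai/california_housing_training_script.py",
        # Local path on the worker; the training operator reads the script from here.
        filename="california_housing_training_script.py",
    )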