|
18 | 18 | from datetime import datetime, timedelta |
19 | 19 |
|
20 | 20 | from airflow import DAG |
| 21 | +from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator, GCSDeleteBucketOperator |
21 | 22 | from airflow.providers.google.cloud.transfers.azure_fileshare_to_gcs import AzureFileShareToGCSOperator |
| 23 | +from airflow.utils.trigger_rule import TriggerRule |
22 | 24 |
|
23 | | -DEST_GCS_BUCKET = os.environ.get('GCP_GCS_BUCKET', 'gs://INVALID BUCKET NAME') |
| 25 | +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") |
| 26 | +PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") |
| 27 | +DAG_ID = "azure_fileshare_to_gcs_example"
| 28 | + |
| 29 | +BUCKET_NAME = f"bucket_{DAG_ID}_{ENV_ID}" |
24 | 30 | AZURE_SHARE_NAME = os.environ.get('AZURE_SHARE_NAME', 'test-azure-share') |
25 | 31 | AZURE_DIRECTORY_NAME = "test-azure-dir" |
26 | 32 |
|
27 | | - |
28 | 33 | with DAG( |
29 | | - dag_id='azure_fileshare_to_gcs_example', |
| 34 | + dag_id=DAG_ID, |
30 | 35 | default_args={ |
31 | 36 | 'owner': 'airflow', |
32 | 37 | 'depends_on_past': False, |
|
39 | 44 | schedule_interval='@once', |
40 | 45 | start_date=datetime(2021, 1, 1), |
41 | 46 | catchup=False, |
42 | | - tags=['example'], |
| 47 | + tags=['example', 'azure'], |
43 | 48 | ) as dag: |
| 49 | + create_bucket = GCSCreateBucketOperator( |
| 50 | + task_id="create_bucket", bucket_name=BUCKET_NAME, project_id=PROJECT_ID |
| 51 | + ) |
| 52 | + |
44 | 53 | # [START howto_operator_azure_fileshare_to_gcs_basic] |
45 | 54 | sync_azure_files_with_gcs = AzureFileShareToGCSOperator( |
46 | 55 | task_id='sync_azure_files_with_gcs', |
47 | 56 | share_name=AZURE_SHARE_NAME, |
48 | | - dest_gcs=DEST_GCS_BUCKET, |
| 57 | + dest_gcs=f"gs://{BUCKET_NAME}/",
49 | 58 | directory_name=AZURE_DIRECTORY_NAME, |
50 | 59 | replace=False, |
51 | 60 | gzip=True, |
52 | 61 | google_impersonation_chain=None, |
53 | 62 | ) |
54 | 63 | # [END howto_operator_azure_fileshare_to_gcs_basic] |
| 64 | + |
| 65 | + delete_bucket = GCSDeleteBucketOperator( |
| 66 | + task_id="delete_bucket", bucket_name=BUCKET_NAME, trigger_rule=TriggerRule.ALL_DONE |
| 67 | + ) |
| 68 | + |
| 69 | + ( |
| 70 | + # TEST SETUP |
| 71 | + create_bucket |
| 72 | + # TEST BODY |
| 73 | + >> sync_azure_files_with_gcs |
| 74 | + # TEST TEARDOWN |
| 75 | + >> delete_bucket |
| 76 | + ) |
| 77 | + |
| 78 | + from tests.system.utils.watcher import watcher |
| 79 | + |
| 80 | + # This test needs the watcher task in order to properly mark success/failure
| 81 | + # when the "tearDown" task with a trigger rule is part of the DAG
| 82 | + list(dag.tasks) >> watcher() |
| 83 | + |
| 84 | + |
| 85 | +from tests.system.utils import get_test_run # noqa: E402 |
| 86 | + |
| 87 | +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) |
| 88 | +test_run = get_test_run(dag) |
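
For context on the watcher pattern used above: because `delete_bucket` runs with `TriggerRule.ALL_DONE`, it executes (and can succeed) even when `sync_azure_files_with_gcs` fails, which would otherwise leave the DAG run marked successful. Wiring every task into `watcher()` lets a downstream task fail the run instead. A minimal sketch of what `tests/system/utils/watcher.py` provides, assuming it is essentially a `ONE_FAILED`-triggered task (the exact message is illustrative):

```python
from airflow.decorators import task
from airflow.exceptions import AirflowException
from airflow.utils.trigger_rule import TriggerRule


# Runs only if at least one upstream task failed, and then fails itself,
# so the overall DAG run is marked as failed.
@task(trigger_rule=TriggerRule.ONE_FAILED, retries=0)
def watcher():
    raise AirflowException("Failing task because one or more upstream tasks failed.")
```

With `test_run = get_test_run(dag)` at module level, the whole DAG can then be executed as a single test via plain pytest, per tests/system/README.md#run_via_pytest.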