|
17 | 17 | # under the License. |
18 | 18 | from __future__ import annotations |
19 | 19 |
|
| 20 | +import json |
20 | 21 | import unittest |
21 | 22 | from unittest import mock |
22 | 23 | from unittest.mock import MagicMock, call |
|
51 | 52 | {"name": "id", "type": "INTEGER", "mode": "NULLABLE"}, |
52 | 53 | {"name": "name", "type": "STRING", "mode": "NULLABLE"}, |
53 | 54 | ] |
| 55 | +SCHEMA_BUCKET = "test-schema-bucket" |
| 56 | +SCHEMA_OBJECT = "test/schema/schema.json" |
54 | 57 | TEST_SOURCE_OBJECTS = ["test/objects/test.csv"] |
55 | 58 | TEST_SOURCE_OBJECTS_AS_STRING = "test/objects/test.csv" |
56 | 59 | LABELS = {"k1": "v1"} |
@@ -675,6 +678,117 @@ def test_source_objs_as_string_without_external_table_should_execute_successfull |
675 | 678 |
|
676 | 679 | hook.return_value.insert_job.assert_has_calls(calls) |
677 | 680 |
|
| 681 | + @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") |
| 682 | + @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") |
| 683 | + def test_schema_obj_external_table_should_execute_successfully(self, bq_hook, gcs_hook): |
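| | + # A schema stored as a separate GCS object should be downloaded and embedded in the external table definition.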
| 684 | + bq_hook.return_value.insert_job.side_effect = [ |
| 685 | + MagicMock(job_id=pytest.real_job_id, error_result=False), |
| 686 | + pytest.real_job_id, |
| 687 | + ] |
| 688 | + bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id |
| 689 | + bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) |
| 690 | + gcs_hook.return_value.download.return_value = json.dumps(SCHEMA_FIELDS).encode("utf-8")
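| | + # The mocked download returns the schema file as JSON bytes, which the operator is expected to parse into schema fields.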
| 691 | + operator = GCSToBigQueryOperator( |
| 692 | + task_id=TASK_ID, |
| 693 | + bucket=TEST_BUCKET, |
| 694 | + source_objects=TEST_SOURCE_OBJECTS, |
| 695 | + schema_object_bucket=SCHEMA_BUCKET, |
| 696 | + schema_object=SCHEMA_OBJECT, |
| 697 | + write_disposition=WRITE_DISPOSITION, |
| 698 | + destination_project_dataset_table=TEST_EXPLICIT_DEST, |
| 699 | + external_table=True, |
| 700 | + ) |
| 701 | + |
| 702 | + operator.execute(context=MagicMock()) |
| 703 | + |
| 704 | + bq_hook.return_value.create_empty_table.assert_called_once_with( |
| 705 | + table_resource={ |
| 706 | + "tableReference": {"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, |
| 707 | + "labels": None, |
| 708 | + "description": None, |
| 709 | + "externalDataConfiguration": { |
| 710 | + "source_uris": [f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], |
| 711 | + "source_format": "CSV", |
| 712 | + "maxBadRecords": 0, |
| 713 | + "autodetect": True, |
| 714 | + "compression": "NONE", |
| 715 | + "csvOptions": { |
| 716 | + "fieldDelimeter": ",", |
| 717 | + "skipLeadingRows": None, |
| 718 | + "quote": None, |
| 719 | + "allowQuotedNewlines": False, |
| 720 | + "allowJaggedRows": False, |
| 721 | + }, |
| 722 | + }, |
| 723 | + "location": None, |
| 724 | + "encryptionConfiguration": None, |
| 725 | + "schema": {"fields": SCHEMA_FIELDS}, |
| 726 | + } |
| 727 | + ) |
| 728 | + gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET, SCHEMA_OBJECT) |
| 729 | + |
| 730 | + @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSHook") |
| 731 | + @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") |
| 732 | + def test_schema_obj_without_external_table_should_execute_successfully(self, bq_hook, gcs_hook): |
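| | + # Same schema-object flow as above, but asserting the load-job path rather than an external table.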
| 733 | + bq_hook.return_value.insert_job.side_effect = [ |
| 734 | + MagicMock(job_id=pytest.real_job_id, error_result=False), |
| 735 | + pytest.real_job_id, |
| 736 | + ] |
| 737 | + bq_hook.return_value.generate_job_id.return_value = pytest.real_job_id |
| 738 | + bq_hook.return_value.split_tablename.return_value = (PROJECT_ID, DATASET, TABLE) |
| 739 | + gcs_hook.return_value.download.return_value = json.dumps(SCHEMA_FIELDS).encode("utf-8")
| 740 | + |
| 741 | + operator = GCSToBigQueryOperator( |
| 742 | + task_id=TASK_ID, |
| 743 | + bucket=TEST_BUCKET, |
| 744 | + source_objects=TEST_SOURCE_OBJECTS, |
| 745 | + schema_object_bucket=SCHEMA_BUCKET, |
| 746 | + schema_object=SCHEMA_OBJECT, |
| 747 | + destination_project_dataset_table=TEST_EXPLICIT_DEST, |
| 748 | + write_disposition=WRITE_DISPOSITION, |
| 749 | + external_table=False, |
| 750 | + ) |
| 751 | + |
| 752 | + operator.execute(context=MagicMock()) |
| 753 | + |
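| | + # Expected load configuration: the schema fields must match the JSON downloaded from the schema object.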
| 754 | + calls = [ |
| 755 | + call( |
| 756 | + configuration={ |
| 757 | + "load": dict( |
| 758 | + autodetect=True, |
| 759 | + createDisposition="CREATE_IF_NEEDED", |
| 760 | + destinationTable={"projectId": PROJECT_ID, "datasetId": DATASET, "tableId": TABLE}, |
| 761 | + destinationTableProperties={ |
| 762 | + "description": None, |
| 763 | + "labels": None, |
| 764 | + }, |
| 765 | + sourceFormat="CSV", |
| 766 | + skipLeadingRows=None, |
| 767 | + sourceUris=[f"gs://{TEST_BUCKET}/{TEST_SOURCE_OBJECTS_AS_STRING}"], |
| 768 | + writeDisposition=WRITE_DISPOSITION, |
| 769 | + ignoreUnknownValues=False, |
| 770 | + allowQuotedNewlines=False, |
| 771 | + encoding="UTF-8", |
| 772 | + schema={"fields": SCHEMA_FIELDS}, |
| 773 | + allowJaggedRows=False, |
| 774 | + fieldDelimiter=",", |
| 775 | + maxBadRecords=0, |
| 776 | + quote=None, |
| 777 | + schemaUpdateOptions=(), |
| 778 | + ), |
| 779 | + }, |
| 780 | + project_id=bq_hook.return_value.project_id, |
| 781 | + location=None, |
| 782 | + job_id=pytest.real_job_id, |
| 783 | + timeout=None, |
| 784 | + retry=DEFAULT_RETRY, |
| 785 | + nowait=True, |
| 786 | + ), |
| 787 | + ] |
| 788 | + |
| 789 | + bq_hook.return_value.insert_job.assert_has_calls(calls) |
| 790 | + gcs_hook.return_value.download.assert_called_once_with(SCHEMA_BUCKET, SCHEMA_OBJECT) |
| 791 | + |
678 | 792 | @mock.patch("airflow.providers.google.cloud.transfers.gcs_to_bigquery.BigQueryHook") |
679 | 793 | def test_all_fields_should_be_present(self, hook): |
680 | 794 | hook.return_value.insert_job.side_effect = [ |
|