@@ -619,13 +619,16 @@ def test_execute_if_cluster_exists_do_not_use(self, mock_hook):
619619 with pytest .raises (AlreadyExists ):
620620 op .execute (context = self .mock_context )
621621
622+ @mock .patch (DATAPROC_PATH .format ("DataprocCreateClusterOperator._wait_for_cluster_in_deleting_state" ))
622623 @mock .patch (DATAPROC_PATH .format ("DataprocHook" ))
623- def test_execute_if_cluster_exists_in_error_state (self , mock_hook ):
624+ def test_execute_if_cluster_exists_in_error_state (self , mock_hook , mock_wait_for_deleting ):
624625 mock_hook .return_value .create_cluster .side_effect = [AlreadyExists ("test" )]
625626 cluster_status = mock_hook .return_value .get_cluster .return_value .status
626627 cluster_status .state = 0
627628 cluster_status .State .ERROR = 0
628629
630+ mock_wait_for_deleting .return_value .get_cluster .side_effect = [NotFound ]
631+
629632 op = DataprocCreateClusterOperator (
630633 task_id = TASK_ID ,
631634 region = GCP_REGION ,
@@ -650,24 +653,30 @@ def test_execute_if_cluster_exists_in_error_state(self, mock_hook):
650653 region = GCP_REGION , project_id = GCP_PROJECT , cluster_name = CLUSTER_NAME
651654 )
652655
656+ @mock .patch (DATAPROC_PATH .format ("Cluster.to_dict" ))
653657 @mock .patch (DATAPROC_PATH .format ("exponential_sleep_generator" ))
654658 @mock .patch (DATAPROC_PATH .format ("DataprocCreateClusterOperator._create_cluster" ))
655659 @mock .patch (DATAPROC_PATH .format ("DataprocCreateClusterOperator._get_cluster" ))
656660 @mock .patch (DATAPROC_PATH .format ("DataprocHook" ))
657661 def test_execute_if_cluster_exists_in_deleting_state (
658- self , mock_hook , mock_get_cluster , mock_create_cluster , mock_generator
662+ self ,
663+ mock_hook ,
664+ mock_get_cluster ,
665+ mock_create_cluster ,
666+ mock_generator ,
667+ to_dict_mock ,
659668 ):
660- cluster = mock .MagicMock ()
661- cluster .status .state = 0
662- cluster .status .State .DELETING = 0
669+ cluster_deleting = mock .MagicMock ()
670+ cluster_deleting .status .state = 0
671+ cluster_deleting .status .State .DELETING = 0
663672
664- cluster2 = mock .MagicMock ()
665- cluster2 .status .state = 0
666- cluster2 .status .State .ERROR = 0
673+ cluster_running = mock .MagicMock ()
674+ cluster_running .status .state = 0
675+ cluster_running .status .State .RUNNING = 0
667676
668- mock_create_cluster .side_effect = [AlreadyExists ("test" ), cluster2 ]
677+ mock_create_cluster .side_effect = [AlreadyExists ("test" ), cluster_running ]
669678 mock_generator .return_value = [0 ]
670- mock_get_cluster .side_effect = [cluster , NotFound ("test" )]
679+ mock_get_cluster .side_effect = [cluster_deleting , NotFound ("test" )]
671680
672681 op = DataprocCreateClusterOperator (
673682 task_id = TASK_ID ,
@@ -679,15 +688,13 @@ def test_execute_if_cluster_exists_in_deleting_state(
679688 delete_on_error = True ,
680689 gcp_conn_id = GCP_CONN_ID ,
681690 )
682- with pytest .raises (AirflowException ):
683- op .execute (context = self .mock_context )
684691
692+ op .execute (context = self .mock_context )
685693 calls = [mock .call (mock_hook .return_value ), mock .call (mock_hook .return_value )]
686694 mock_get_cluster .assert_has_calls (calls )
687695 mock_create_cluster .assert_has_calls (calls )
688- mock_hook .return_value .diagnose_cluster .assert_called_once_with (
689- region = GCP_REGION , project_id = GCP_PROJECT , cluster_name = CLUSTER_NAME
690- )
696+
697+ to_dict_mock .assert_called_once_with (cluster_running )
691698
692699 @mock .patch (DATAPROC_PATH .format ("DataprocHook" ))
693700 @mock .patch (DATAPROC_TRIGGERS_PATH .format ("DataprocAsyncHook" ))
0 commit comments