Skip to content

Commit d361761

Browse files
Early delete a Dataproc cluster if started in the ERROR state. (#33668)
* Early delete a Dataproc cluster if started in the ERROR state. Update airflow/providers/google/cloud/operators/dataproc.py Co-authored-by: Alex Cazacu <2285684+acazacu@users.noreply.github.com> * fixing up logging of exceptions --------- Co-authored-by: Alex Cazacu <2285684+acazacu@users.noreply.github.com>
1 parent 075afe5 commit d361761

File tree

2 files changed

+43
-16
lines changed

2 files changed

+43
-16
lines changed

airflow/providers/google/cloud/operators/dataproc.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -595,13 +595,17 @@ def _handle_error_state(self, hook: DataprocHook, cluster: Cluster) -> None:
595595
if cluster.status.state != cluster.status.State.ERROR:
596596
return
597597
self.log.info("Cluster is in ERROR state")
598+
self.log.info("Gathering diagnostic information.")
598599
gcs_uri = hook.diagnose_cluster(
599600
region=self.region, cluster_name=self.cluster_name, project_id=self.project_id
600601
)
601602
self.log.info("Diagnostic information for cluster %s available at: %s", self.cluster_name, gcs_uri)
602603
if self.delete_on_error:
603604
self._delete_cluster(hook)
604-
raise AirflowException("Cluster was created but was in ERROR state.")
605+
# The delete op is asynchronous and can cause further failure if the cluster finishes
606+
# deleting between catching AlreadyExists and checking state
607+
self._wait_for_cluster_in_deleting_state(hook)
608+
raise AirflowException("Cluster was created in an ERROR state then deleted.")
605609
raise AirflowException("Cluster was created but is in ERROR state")
606610

607611
def _wait_for_cluster_in_deleting_state(self, hook: DataprocHook) -> None:
@@ -668,6 +672,22 @@ def execute(self, context: Context) -> dict:
668672
raise
669673
self.log.info("Cluster already exists.")
670674
cluster = self._get_cluster(hook)
675+
except AirflowException as ae:
676+
# There still could be a cluster created here in an ERROR state which
677+
# should be deleted immediately rather than consuming another retry attempt
678+
# (assuming delete_on_error is true (default))
679+
# This reduces overall the number of task attempts from 3 to 2 to successful cluster creation
680+
# assuming the underlying GCE issues have resolved within that window. Users can configure
681+
# a higher number of retry attempts in powers of two with 30s-60s wait interval
682+
try:
683+
cluster = self._get_cluster(hook)
684+
self._handle_error_state(hook, cluster)
685+
except AirflowException as ae_inner:
686+
# We could get any number of failures here, including cluster not found and we
687+
# can just ignore to ensure we surface the original cluster create failure
688+
self.log.error(ae_inner, exc_info=True)
689+
finally:
690+
raise ae
671691

672692
# Check if cluster is not in ERROR state
673693
self._handle_error_state(hook, cluster)

tests/providers/google/cloud/operators/test_dataproc.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -619,13 +619,16 @@ def test_execute_if_cluster_exists_do_not_use(self, mock_hook):
619619
with pytest.raises(AlreadyExists):
620620
op.execute(context=self.mock_context)
621621

622+
@mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._wait_for_cluster_in_deleting_state"))
622623
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
623-
def test_execute_if_cluster_exists_in_error_state(self, mock_hook):
624+
def test_execute_if_cluster_exists_in_error_state(self, mock_hook, mock_wait_for_deleting):
624625
mock_hook.return_value.create_cluster.side_effect = [AlreadyExists("test")]
625626
cluster_status = mock_hook.return_value.get_cluster.return_value.status
626627
cluster_status.state = 0
627628
cluster_status.State.ERROR = 0
628629

630+
mock_wait_for_deleting.return_value.get_cluster.side_effect = [NotFound]
631+
629632
op = DataprocCreateClusterOperator(
630633
task_id=TASK_ID,
631634
region=GCP_REGION,
@@ -650,24 +653,30 @@ def test_execute_if_cluster_exists_in_error_state(self, mock_hook):
650653
region=GCP_REGION, project_id=GCP_PROJECT, cluster_name=CLUSTER_NAME
651654
)
652655

656+
@mock.patch(DATAPROC_PATH.format("Cluster.to_dict"))
653657
@mock.patch(DATAPROC_PATH.format("exponential_sleep_generator"))
654658
@mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._create_cluster"))
655659
@mock.patch(DATAPROC_PATH.format("DataprocCreateClusterOperator._get_cluster"))
656660
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
657661
def test_execute_if_cluster_exists_in_deleting_state(
658-
self, mock_hook, mock_get_cluster, mock_create_cluster, mock_generator
662+
self,
663+
mock_hook,
664+
mock_get_cluster,
665+
mock_create_cluster,
666+
mock_generator,
667+
to_dict_mock,
659668
):
660-
cluster = mock.MagicMock()
661-
cluster.status.state = 0
662-
cluster.status.State.DELETING = 0
669+
cluster_deleting = mock.MagicMock()
670+
cluster_deleting.status.state = 0
671+
cluster_deleting.status.State.DELETING = 0
663672

664-
cluster2 = mock.MagicMock()
665-
cluster2.status.state = 0
666-
cluster2.status.State.ERROR = 0
673+
cluster_running = mock.MagicMock()
674+
cluster_running.status.state = 0
675+
cluster_running.status.State.RUNNING = 0
667676

668-
mock_create_cluster.side_effect = [AlreadyExists("test"), cluster2]
677+
mock_create_cluster.side_effect = [AlreadyExists("test"), cluster_running]
669678
mock_generator.return_value = [0]
670-
mock_get_cluster.side_effect = [cluster, NotFound("test")]
679+
mock_get_cluster.side_effect = [cluster_deleting, NotFound("test")]
671680

672681
op = DataprocCreateClusterOperator(
673682
task_id=TASK_ID,
@@ -679,15 +688,13 @@ def test_execute_if_cluster_exists_in_deleting_state(
679688
delete_on_error=True,
680689
gcp_conn_id=GCP_CONN_ID,
681690
)
682-
with pytest.raises(AirflowException):
683-
op.execute(context=self.mock_context)
684691

692+
op.execute(context=self.mock_context)
685693
calls = [mock.call(mock_hook.return_value), mock.call(mock_hook.return_value)]
686694
mock_get_cluster.assert_has_calls(calls)
687695
mock_create_cluster.assert_has_calls(calls)
688-
mock_hook.return_value.diagnose_cluster.assert_called_once_with(
689-
region=GCP_REGION, project_id=GCP_PROJECT, cluster_name=CLUSTER_NAME
690-
)
696+
697+
to_dict_mock.assert_called_once_with(cluster_running)
691698

692699
@mock.patch(DATAPROC_PATH.format("DataprocHook"))
693700
@mock.patch(DATAPROC_TRIGGERS_PATH.format("DataprocAsyncHook"))

0 commit comments

Comments
 (0)