Skip to content

Commit 8fd5ac6

Browse files
champon1020eladkal
andauthored
Fix the logic of checking dataflow job state (#34785)
Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com>
1 parent 3cb0870 commit 8fd5ac6

File tree

2 files changed

+7
-1
lines changed

2 files changed

+7
-1
lines changed

airflow/providers/google/cloud/hooks/dataflow.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,9 @@ def _check_dataflow_job_state(self, job) -> bool:
419419
"JOB_STATE_DRAINED while it is a batch job"
420420
)
421421

422-
if not self._wait_until_finished and current_state == self._expected_terminal_state:
422+
if current_state == self._expected_terminal_state:
423+
if self._expected_terminal_state == DataflowJobStatus.JOB_STATE_RUNNING:
424+
return not self._wait_until_finished
423425
return True
424426

425427
if current_state in DataflowJobStatus.AWAITING_STATES:

tests/providers/google/cloud/hooks/test_dataflow.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,6 +1417,10 @@ def test_check_dataflow_job_state_wait_until_finished(
14171417
@pytest.mark.parametrize(
14181418
"job_state, wait_until_finished, expected_result",
14191419
[
1420+
# DONE
1421+
(DataflowJobStatus.JOB_STATE_DONE, None, True),
1422+
(DataflowJobStatus.JOB_STATE_DONE, True, True),
1423+
(DataflowJobStatus.JOB_STATE_DONE, False, True),
14201424
# RUNNING
14211425
(DataflowJobStatus.JOB_STATE_RUNNING, None, False),
14221426
(DataflowJobStatus.JOB_STATE_RUNNING, True, False),

0 commit comments

Comments
 (0)