Skip to content

Commit a43e860

Browse files
committed
[python] Fix task condition set wrong deps
After #7505 merged. we could use condition task type but our dependent set in the wrong direction, all the condition operators should be upstream of the current task instead of downstream fix: #7649
1 parent 8b29213 commit a43e860

3 files changed

Lines changed: 44 additions & 38 deletions

File tree

dolphinscheduler-python/pydolphinscheduler/examples/task_conditions_example.py

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@
2222
condition have one upstream which we declare explicit with syntax `parent >> condition`, and three downstream
2323
automatically set dependence by condition task by passing parameter `condition`. The graph of this workflow
2424
like:
25-
--> condition_success_1
26-
/
27-
parent -> conditions -> --> condition_success_2
28-
\
29-
--> condition_fail
25+
pre_task_success_1 ->
26+
\
27+
pre_task_success_2 -> --> conditions -> end
28+
/
29+
pre_task_fail ->
3030
.
3131
"""
3232

@@ -35,21 +35,22 @@
3535
from pydolphinscheduler.tasks.shell import Shell
3636

3737
with ProcessDefinition(name="task_conditions_example", tenant="tenant_exists") as pd:
38-
parent = Shell(name="parent", command="echo parent")
39-
condition_success_1 = Shell(
40-
name="condition_success_1", command="echo condition_success_1"
38+
condition_pre_task_1 = Shell(
39+
name="pre_task_success_1", command="echo pre_task_success_1"
4140
)
42-
condition_success_2 = Shell(
43-
name="condition_success_2", command="echo condition_success_2"
41+
condition_pre_task_2 = Shell(
42+
name="pre_task_success_2", command="echo pre_task_success_2"
4443
)
45-
condition_fail = Shell(name="condition_fail", command="echo condition_fail")
44+
condition_pre_task_3 = Shell(name="pre_task_fail", command="echo pre_task_fail")
4645
cond_operator = And(
4746
And(
48-
SUCCESS(condition_success_1, condition_success_2),
49-
FAILURE(condition_fail),
47+
SUCCESS(condition_pre_task_1, condition_pre_task_2),
48+
FAILURE(condition_pre_task_3),
5049
),
5150
)
5251

52+
end = Shell(name="end", command="echo parent")
53+
5354
condition = Conditions(name="conditions", condition=cond_operator)
54-
parent >> condition
55+
condition >> end
5556
pd.submit()

dolphinscheduler-python/pydolphinscheduler/src/pydolphinscheduler/tasks/condition.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,13 @@ def __init__(self, name: str, condition: ConditionOperator, *args, **kwargs):
164164
self._set_dep()
165165

166166
def _set_dep(self) -> None:
167-
"""Set downstream according to parameter `condition`."""
168-
downstream = []
167+
"""Set upstream according to parameter `condition`."""
168+
upstream = []
169169
for cond in self.condition.args:
170170
if isinstance(cond, ConditionOperator):
171171
for status in cond.args:
172-
downstream.extend(list(status.tasks))
173-
self.set_downstream(downstream)
172+
upstream.extend(list(status.tasks))
173+
self.set_upstream(upstream)
174174

175175
@property
176176
def task_params(self, camel_attr: bool = True, custom_attr: set = None) -> Dict:

dolphinscheduler-python/pydolphinscheduler/tests/tasks/test_condition.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -396,44 +396,49 @@ def test_dependent_get_define(mock_condition_code_version, mock_task_code_versio
396396
def test_condition_set_dep_workflow(mock_task_code_version):
397397
"""Test task condition set dependence in workflow level."""
398398
with ProcessDefinition(name="test-condition-set-dep-workflow") as pd:
399-
parent = Task(name="parent", task_type=TEST_TYPE)
400-
condition_success_1 = Task(name="condition_success_1", task_type=TEST_TYPE)
401-
condition_success_2 = Task(name="condition_success_2", task_type=TEST_TYPE)
402-
condition_fail = Task(name="condition_fail", task_type=TEST_TYPE)
399+
condition_pre_task_1 = Task(name="pre_task_success_1", task_type=TEST_TYPE)
400+
condition_pre_task_2 = Task(name="pre_task_success_2", task_type=TEST_TYPE)
401+
condition_pre_task_3 = Task(name="pre_task_fail", task_type=TEST_TYPE)
403402
cond_operator = And(
404403
And(
405-
SUCCESS(condition_success_1, condition_success_2),
406-
FAILURE(condition_fail),
404+
SUCCESS(condition_pre_task_1, condition_pre_task_2),
405+
FAILURE(condition_pre_task_3),
407406
),
408407
)
408+
end = Task(name="end", task_type=TEST_TYPE)
409+
410+
condition = Conditions(name="conditions", condition=cond_operator)
411+
condition >> end
409412

410-
condition = Conditions(name=TEST_NAME, condition=cond_operator)
411-
parent >> condition
412413
# General tasks test
413414
assert len(pd.tasks) == 5
414415
assert sorted(pd.task_list, key=lambda t: t.name) == sorted(
415416
[
416-
parent,
417417
condition,
418-
condition_success_1,
419-
condition_success_2,
420-
condition_fail,
418+
condition_pre_task_1,
419+
condition_pre_task_2,
420+
condition_pre_task_3,
421+
end,
421422
],
422423
key=lambda t: t.name,
423424
)
424425
# Task dep test
425-
assert parent._downstream_task_codes == {condition.code}
426-
assert condition._upstream_task_codes == {parent.code}
426+
assert end._upstream_task_codes == {condition.code}
427+
assert condition._downstream_task_codes == {end.code}
427428

428429
# Condition task dep after ProcessDefinition function get_define called
429-
assert condition._downstream_task_codes == {
430-
condition_success_1.code,
431-
condition_success_2.code,
432-
condition_fail.code,
430+
assert condition._upstream_task_codes == {
431+
condition_pre_task_1.code,
432+
condition_pre_task_2.code,
433+
condition_pre_task_3.code,
433434
}
434435
assert all(
435436
[
436-
child._upstream_task_codes == {condition.code}
437-
for child in [condition_success_1, condition_success_2, condition_fail]
437+
child._downstream_task_codes == {condition.code}
438+
for child in [
439+
condition_pre_task_1,
440+
condition_pre_task_2,
441+
condition_pre_task_3,
442+
]
438443
]
439444
)

0 commit comments

Comments
 (0)