Skip to content

Commit 44e54dc

Browse files
committed
Code review
1 parent 3bacc5f commit 44e54dc

2 files changed

Lines changed: 32 additions & 50 deletions

File tree

distributed/tests/test_worker_state_machine.py

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,33 +1033,34 @@ def test_gather_priority(ws):
10331033
]
10341034

10351035

1036+
@gen_cluster()
1037+
async def test_clean_log(s, a, b):
1038+
"""Test that brand new workers start with a clean log"""
1039+
assert not a.state.log
1040+
assert not a.state.stimulus_log
1041+
1042+
10361043
def test_running_task_in_all_running_tasks(ws_with_running_task):
10371044
ws = ws_with_running_task
1045+
ws2 = "127.0.0.1:2"
10381046
ts = ws.tasks["x"]
10391047
assert ts in ws.all_running_tasks
10401048

1041-
ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="cancel"))
1049+
ws.handle_stimulus(FreeKeysEvent(keys=["x"], stimulus_id="s1"))
10421050
assert ts.state == "cancelled"
10431051
assert ts in ws.all_running_tasks
10441052

10451053
ws.handle_stimulus(
1046-
ComputeTaskEvent.dummy(
1047-
key="y",
1048-
who_has={"x": ["127.0.0.1:1235"]},
1049-
stimulus_id="compute-y",
1050-
),
1054+
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s2")
10511055
)
10521056
assert ts.state == "resumed"
10531057
assert ts in ws.all_running_tasks
10541058

10551059

1056-
@pytest.mark.xfail(reason="distributed#6565")
1060+
@pytest.mark.xfail(reason="distributed#6565, distributed#6692")
10571061
@pytest.mark.parametrize(
10581062
"done_ev_cls,done_status",
1059-
[
1060-
(ExecuteSuccessEvent, "memory"),
1061-
(ExecuteFailureEvent, "error"),
1062-
],
1063+
[(ExecuteSuccessEvent, "memory"), (ExecuteFailureEvent, "error")],
10631064
)
10641065
def test_done_task_not_in_all_running_tasks(
10651066
ws_with_running_task, done_ev_cls, done_status
@@ -1068,48 +1069,27 @@ def test_done_task_not_in_all_running_tasks(
10681069
ts = ws.tasks["x"]
10691070
assert ts in ws.all_running_tasks
10701071

1071-
ws.handle_stimulus(
1072-
done_ev_cls.dummy(
1073-
key="x",
1074-
stimulus_id="success",
1075-
)
1076-
)
1072+
ws.handle_stimulus(done_ev_cls.dummy("x", stimulus_id="s1"))
10771073
assert ts.state == done_status
10781074
assert ts not in ws.all_running_tasks
10791075

10801076

1081-
# @pytest.mark.xfail(reason="distributed#6565")
1077+
@pytest.mark.xfail(reason="distributed#6565, distributed#6689, distributed#6692")
10821078
@pytest.mark.parametrize(
10831079
"done_ev_cls,done_status",
1084-
[
1085-
(ExecuteSuccessEvent, "memory"),
1086-
(ExecuteFailureEvent, "error"),
1087-
],
1080+
[(ExecuteSuccessEvent, "memory"), (ExecuteFailureEvent, "error")],
10881081
)
1089-
def test_done_resumed_running_task_not_in_all_running_tasks(
1082+
def test_done_resumed_task_not_in_all_running_tasks(
10901083
ws_with_running_task, done_ev_cls, done_status
10911084
):
10921085
ws = ws_with_running_task
1086+
ws2 = "127.0.0.1:2"
10931087

10941088
ws.handle_stimulus(
1095-
FreeKeysEvent(keys=["x"], stimulus_id="cancel"),
1096-
ComputeTaskEvent.dummy(
1097-
key="y",
1098-
who_has={"x": ["127.0.0.1:1235"]},
1099-
stimulus_id="compute-y",
1100-
),
1101-
done_ev_cls(
1102-
key="x",
1103-
stimulus_id="success",
1104-
),
1089+
FreeKeysEvent(keys=["x"], stimulus_id="s1"),
1090+
ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s2"),
1091+
done_ev_cls.dummy("x", stimulus_id="s3"),
11051092
)
11061093
ts = ws.tasks["x"]
11071094
assert ts.state == done_status
11081095
assert ts not in ws.all_running_tasks
1109-
1110-
1111-
@gen_cluster()
1112-
async def test_clean_log(s, a, b):
1113-
"""Test that brand new workers start with a clean log"""
1114-
assert not a.state.log
1115-
assert not a.state.stimulus_log

distributed/worker_state_machine.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,6 +1203,7 @@ def handle_stimulus(self, *stims: StateMachineEvent) -> Instructions:
12031203
@property
12041204
def executing_count(self) -> int:
12051205
"""Count of tasks currently executing on this worker.
1206+
Does not include long running (a.k.a. seceded) and cancelled tasks.
12061207
12071208
See also
12081209
--------
@@ -1212,6 +1213,17 @@ def executing_count(self) -> int:
12121213
"""
12131214
return len(self.executing)
12141215

1216+
@property
1217+
def all_running_tasks(self) -> set[TaskState]:
1218+
"""All tasks that are currently occupying a thread.
1219+
These are:
1220+
1221+
- ``ts.status in ("executing", "long-running", "cancelled")``
1222+
- ``ts.status == "resumed" and ts._previous in ("executing", "long-running")``
1223+
"""
1224+
# Note: cancelled and resumed tasks are still in either of these sets
1225+
return self.executing | {self.tasks[key] for key in self.long_running}
1226+
12151227
@property
12161228
def in_flight_tasks_count(self) -> int:
12171229
"""Count of tasks currently being replicated from other workers to this one.
@@ -3160,16 +3172,6 @@ def validate_state(self) -> None:
31603172
for tss in self.data_needed.values():
31613173
assert len({ts.key for ts in tss}) == len(tss)
31623174

3163-
@property
3164-
def all_running_tasks(self) -> set[TaskState]:
3165-
"""All tasks that are currently running.
3166-
These are:
3167-
- ``ts.status`` == ``executing``, ``long-running``, or ``cancelled``
3168-
- ``ts.status` == ``resumed`` and ``ts._previous`` == ``executing`` or ``long-running``
3169-
"""
3170-
# Note: tasks in "cancelled" and "resumed" state are still in either of these sets
3171-
return self.executing | {self.tasks[key] for key in self.long_running}
3172-
31733175

31743176
class BaseWorker(abc.ABC):
31753177
"""Wrapper around the :class:`WorkerState` that implements instructions handling.

0 commit comments

Comments
 (0)