@@ -1033,33 +1033,34 @@ def test_gather_priority(ws):
10331033 ]
10341034
10351035
1036+ @gen_cluster ()
1037+ async def test_clean_log (s , a , b ):
1038+ """Test that brand new workers start with a clean log"""
1039+ assert not a .state .log
1040+ assert not a .state .stimulus_log
1041+
1042+
10361043def test_running_task_in_all_running_tasks (ws_with_running_task ):
10371044 ws = ws_with_running_task
1045+ ws2 = "127.0.0.1:2"
10381046 ts = ws .tasks ["x" ]
10391047 assert ts in ws .all_running_tasks
10401048
1041- ws .handle_stimulus (FreeKeysEvent (keys = ["x" ], stimulus_id = "cancel " ))
1049+ ws .handle_stimulus (FreeKeysEvent (keys = ["x" ], stimulus_id = "s1 " ))
10421050 assert ts .state == "cancelled"
10431051 assert ts in ws .all_running_tasks
10441052
10451053 ws .handle_stimulus (
1046- ComputeTaskEvent .dummy (
1047- key = "y" ,
1048- who_has = {"x" : ["127.0.0.1:1235" ]},
1049- stimulus_id = "compute-y" ,
1050- ),
1054+ ComputeTaskEvent .dummy ("y" , who_has = {"x" : [ws2 ]}, stimulus_id = "s2" )
10511055 )
10521056 assert ts .state == "resumed"
10531057 assert ts in ws .all_running_tasks
10541058
10551059
1056- @pytest .mark .xfail (reason = "distributed#6565" )
1060+ @pytest .mark .xfail (reason = "distributed#6565, distributed#6692 " )
10571061@pytest .mark .parametrize (
10581062 "done_ev_cls,done_status" ,
1059- [
1060- (ExecuteSuccessEvent , "memory" ),
1061- (ExecuteFailureEvent , "error" ),
1062- ],
1063+ [(ExecuteSuccessEvent , "memory" ), (ExecuteFailureEvent , "error" )],
10631064)
10641065def test_done_task_not_in_all_running_tasks (
10651066 ws_with_running_task , done_ev_cls , done_status
@@ -1068,48 +1069,27 @@ def test_done_task_not_in_all_running_tasks(
10681069 ts = ws .tasks ["x" ]
10691070 assert ts in ws .all_running_tasks
10701071
1071- ws .handle_stimulus (
1072- done_ev_cls .dummy (
1073- key = "x" ,
1074- stimulus_id = "success" ,
1075- )
1076- )
1072+ ws .handle_stimulus (done_ev_cls .dummy ("x" , stimulus_id = "s1" ))
10771073 assert ts .state == done_status
10781074 assert ts not in ws .all_running_tasks
10791075
10801076
1081- # @pytest.mark.xfail(reason="distributed#6565")
1077+ @pytest .mark .xfail (reason = "distributed#6565, distributed#6689, distributed#6692 " )
10821078@pytest .mark .parametrize (
10831079 "done_ev_cls,done_status" ,
1084- [
1085- (ExecuteSuccessEvent , "memory" ),
1086- (ExecuteFailureEvent , "error" ),
1087- ],
1080+ [(ExecuteSuccessEvent , "memory" ), (ExecuteFailureEvent , "error" )],
10881081)
1089- def test_done_resumed_running_task_not_in_all_running_tasks (
1082+ def test_done_resumed_task_not_in_all_running_tasks (
10901083 ws_with_running_task , done_ev_cls , done_status
10911084):
10921085 ws = ws_with_running_task
1086+ ws2 = "127.0.0.1:2"
10931087
10941088 ws .handle_stimulus (
1095- FreeKeysEvent (keys = ["x" ], stimulus_id = "cancel" ),
1096- ComputeTaskEvent .dummy (
1097- key = "y" ,
1098- who_has = {"x" : ["127.0.0.1:1235" ]},
1099- stimulus_id = "compute-y" ,
1100- ),
1101- done_ev_cls (
1102- key = "x" ,
1103- stimulus_id = "success" ,
1104- ),
1089+ FreeKeysEvent (keys = ["x" ], stimulus_id = "s1" ),
1090+ ComputeTaskEvent .dummy ("y" , who_has = {"x" : [ws2 ]}, stimulus_id = "s2" ),
1091+ done_ev_cls .dummy ("x" , stimulus_id = "s3" ),
11051092 )
11061093 ts = ws .tasks ["x" ]
11071094 assert ts .state == done_status
11081095 assert ts not in ws .all_running_tasks
1109-
1110-
1111- @gen_cluster ()
1112- async def test_clean_log (s , a , b ):
1113- """Test that brand new workers start with a clean log"""
1114- assert not a .state .log
1115- assert not a .state .stimulus_log
0 commit comments