Labels: enhancement (Improve existing functionality or make things work better), feature (Something is missing)
Description
What happened:
When a task secedes from the threadpool, its state becomes `long-running`. When it later rejoins the threadpool, the state stays `long-running`.
What you expected to happen:
I would expect the task's state to be set back to `executing` once it has rejoined the threadpool.
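To make the expected behaviour concrete, here is a toy model of the per-task state sequence. It is only a sketch: `ToyTask` and its methods are hypothetical and are not distributed's actual worker state machine. They simply mirror the three states seen in the output below (`executing`, `long-running`, `memory`) and encode the transition this issue asks for on `rejoin()`.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyTask:
    """Hypothetical stand-in for a worker-side task (not distributed's TaskState)."""

    key: str
    state: str = "executing"
    history: List[str] = field(default_factory=lambda: ["executing"])

    def _set(self, new_state: str) -> None:
        # Record every transition so the full sequence can be inspected.
        self.state = new_state
        self.history.append(new_state)

    def secede(self) -> None:
        # Leaving the threadpool: the task no longer occupies a thread slot.
        if self.state != "executing":
            raise RuntimeError(f"cannot secede from state {self.state!r}")
        self._set("long-running")

    def rejoin(self) -> None:
        # Behaviour requested in this issue: back in the threadpool -> executing.
        if self.state != "long-running":
            raise RuntimeError(f"cannot rejoin from state {self.state!r}")
        self._set("executing")

    def finish(self) -> None:
        # The task returned and its result is held in memory.
        self._set("memory")


task = ToyTask("function_a")
task.secede()
task.rejoin()
task.finish()
print(task.history)  # ['executing', 'long-running', 'executing', 'memory']
```

The sequence observed with distributed 2021.10.0 is `['executing', 'long-running', 'memory']`, i.e. the `rejoin()` step never switches the task back to `executing`.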
Minimal Complete Verifiable Example:
```python
import time
from pprint import pprint
from typing import Dict

from distributed import Worker, get_client, secede, rejoin, LocalCluster, Client


def get_status_on_worker(dask_worker: Worker) -> Dict[str, str]:
    """Get the status for all tasks on the worker.

    Args:
        dask_worker: The current worker instance

    Returns:
        Status per task
    """
    return {k: v.state for k, v in dask_worker.tasks.items()}


def subfunction_a(a: int):
    # A very complicated subfunction
    time.sleep(3)
    return a * .5


def function_a(a: int):
    # Some work that calls a subfunction and secedes to do so.
    time.sleep(1)
    secede()
    print("Secede")
    client = get_client()
    future = client.submit(subfunction_a, a=a)
    res = future.result()
    print("Rejoin")
    rejoin()
    time.sleep(1)
    print("Returning")
    return res + a


def main():
    cluster = LocalCluster(n_workers=2)
    client = Client(cluster)
    future_a = client.submit(function_a, a=3)
    # Poll the task states on every worker until function_a finishes.
    while not future_a.done():
        status = client.run(get_status_on_worker)
        pprint(status.values())
        time.sleep(0.5)
    status = client.run(get_status_on_worker)
    pprint(status.values())


if __name__ == '__main__':
    main()
```

Anything else we need to know?:
The output of the above script is:
```
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'executing'}, {}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'executing'}, {}])
Secede
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
Rejoin
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'memory'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'memory'}])
Returning
[{'function_a-def6e2e640c732893a96e209971eca45': 'memory'}, {}]
```

I would expect that after the "Rejoin" log, the state of 'function_a-def6e2e640c732893a96e209971eca45' would be `executing` again.
Environment:
- Dask version: "2021.10.0"
- Python version: 3.8
- Operating System: Mac
- Install method (conda, pip, source): pip