
Task should be executing upon rejoining. #5882

@Dref360


What happened:

When a task secedes from the threadpool, its state becomes long-running. When the task later rejoins the threadpool, its state stays long-running.

What you expected to happen:

I would expect the task's state to be set back to executing once it has rejoined the threadpool.
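The expected behaviour can be stated as a small set of state transitions. The sketch below is a hypothetical model written only to make the expectation concrete: `EXPECTED_TRANSITIONS` and `apply_events` are illustrative names, not part of distributed, and this is not how the worker's state machine is implemented.

```python
from typing import Dict, List, Tuple

# Hypothetical model of the transitions this issue expects (not distributed's code):
# secede() moves a task from "executing" to "long-running", and rejoin() moves it back.
EXPECTED_TRANSITIONS: Dict[Tuple[str, str], str] = {
    ("executing", "secede"): "long-running",
    ("long-running", "rejoin"): "executing",
    ("executing", "finish"): "memory",
}


def apply_events(events: List[str], start: str = "executing") -> List[str]:
    """Replay events from `start` and return every state visited, in order."""
    state = start
    history = [state]
    for event in events:
        key = (state, event)
        if key not in EXPECTED_TRANSITIONS:
            raise ValueError(f"no transition for {event!r} from state {state!r}")
        state = EXPECTED_TRANSITIONS[key]
        history.append(state)
    return history


print(apply_events(["secede", "rejoin", "finish"]))
# ['executing', 'long-running', 'executing', 'memory']
```

Under this model, a task only reaches memory from executing, which is exactly the step the reported output skips.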

Minimal Complete Verifiable Example:

import time
from pprint import pprint
from typing import Dict

from distributed import Worker, get_client, secede, rejoin, LocalCluster, Client


def get_status_on_worker(dask_worker: Worker) -> Dict[str, str]:
    """Get the status for all tasks on the worker.

    Args:
        dask_worker: The current worker instance

    Returns:
        Status per task

    """
    return {k: v.state for k, v in dask_worker.tasks.items()}


def subfunction_a(a: int):
    # A very complicated subfunction
    time.sleep(3)
    return a * .5


def function_a(a: int):
    # Some work that calls a subfunction, seceding from the threadpool while waiting on it.
    time.sleep(1)
    secede()
    print("Secede")
    client = get_client()
    future = client.submit(subfunction_a, a=a)
    res = future.result()
    print("Rejoin")
    rejoin()
    time.sleep(1)
    print("Returning")
    return res + a


def main():
    cluster = LocalCluster(n_workers=2)
    client = Client(cluster)
    future_a = client.submit(function_a, a=3)
    while not future_a.done():
        status = client.run(get_status_on_worker)
        pprint(status.values())
        time.sleep(0.5)
    status = client.run(get_status_on_worker)
    pprint(status.values())


if __name__ == '__main__':
    main()
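The script prints every poll, so the same state appears many times in a row. A small helper can collapse the polls into the distinct states a task went through, which makes the missing transition easier to see. This is a sketch assuming each poll is the dict returned by `client.run(get_status_on_worker)` (worker address to `{key: state}`) and that the polls are appended to a list in `main()`; `state_history` is a hypothetical name, not a distributed API.

```python
from typing import Dict, Iterable, List


def state_history(snapshots: Iterable[Dict[str, Dict[str, str]]], key_prefix: str) -> List[str]:
    """Return the distinct, consecutive states of tasks whose key starts with key_prefix.

    Assumes the matching task lives on a single worker; if the same key were
    reported by several workers, their states would be interleaved.
    """
    history: List[str] = []
    for snapshot in snapshots:  # one snapshot per client.run(...) call
        for worker_states in snapshot.values():  # {task key: state} per worker
            for key, state in worker_states.items():
                # Record a state only when it differs from the previous one.
                if key.startswith(key_prefix) and (not history or history[-1] != state):
                    history.append(state)
    return history
```

For the output reported below, `state_history(polls, "function_a")` would give `['executing', 'long-running', 'memory']`, whereas the expected sequence is `['executing', 'long-running', 'executing', 'memory']`.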

Anything else we need to know?:

The output of the above script is:

dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'executing'}, {}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'executing'}, {}])
Secede
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'executing'}])
Rejoin
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'memory'}])
dict_values([{'function_a-def6e2e640c732893a96e209971eca45': 'long-running'}, {'subfunction_a-1d2f61ee529bf2a9b1794500a1ef9033': 'memory'}])
Returning
[{'function_a-def6e2e640c732893a96e209971eca45': 'memory'}, {}]

I would expect the state of 'function_a-def6e2e640c732893a96e209971eca45' to be executing again after the "Rejoin" log line.
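The expectation can also be written as a check over the sequence of distinct states a task passed through. This is a minimal sketch; `rejoined_as_executing` is a hypothetical helper and the input is assumed to be a deduplicated list of state names read off the polling output.

```python
from typing import List


def rejoined_as_executing(history: List[str]) -> bool:
    """Return True if the task was "executing" again at some point after becoming "long-running"."""
    if "long-running" not in history:
        return False  # the task never seceded, so there is nothing to check
    after_secede = history[history.index("long-running") + 1:]
    return "executing" in after_secede


observed = ["executing", "long-running", "memory"]  # as in the output above
expected = ["executing", "long-running", "executing", "memory"]
print(rejoined_as_executing(observed), rejoined_as_executing(expected))  # False True
```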

Environment:

  • Dask version: "2021.10.0"
  • Python version: 3.8
  • Operating System: Mac
  • Install method (conda, pip, source): pip

Labels: enhancement (Improve existing functionality or make things work better), feature (Something is missing)