Description
Currently dask.compute infers whether or not a dask.distributed.Client exists by reading the config value scheduler. If this value is set, it tries to access the default_client and uses this to submit the futures.
See
Lines 1394 to 1395 in f463fcf
if config.get("scheduler", None):
    return get_scheduler(scheduler=config.get("scheduler", None))
Lines 1372 to 1375 in f463fcf
elif scheduler in ("dask.distributed", "distributed"):
    from distributed.worker import get_client
    return get_client().get
This config value is set in the distributed client if the initialized client is supposed to be considered a "default client" or global client, see https://github.com/dask/distributed/blob/3619923a1aec2fa51bf9dcd099560742b61121cc/distributed/client.py#L950-L953
This mechanism is quite brittle: dask.distributed needs relatively complex code to reset this value properly when the client closes, particularly when there are multiple clients (whether default or not).
Apart from code complexity, this poses thread safety problems that are extremely challenging to address. An attempt has been made in dask/distributed#5901 to make this setting/resetting thread safe, but the proposed fix appears to be insufficient.
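To illustrate the brittleness without involving threads at all, here is a toy model of the save-and-restore pattern. It is only a sketch: a plain dict stands in for the global config, and `ToyClient` is a hypothetical stand-in, not the actual distributed.Client implementation.

```python
# Toy model only: a plain dict stands in for the global config, and
# ToyClient is a hypothetical stand-in, not distributed.Client.
config = {}


class ToyClient:
    def __init__(self):
        # Remember whatever was configured before, then claim the global slot.
        self._previous = config.get("scheduler")
        config["scheduler"] = "dask.distributed"

    def close(self):
        # Restore the remembered value, even if it is stale by now.
        if self._previous is None:
            config.pop("scheduler", None)
        else:
            config["scheduler"] = self._previous


def run_out_of_order():
    """Open two clients, close them in opening order, and record the config."""
    config.clear()
    a = ToyClient()
    b = ToyClient()
    a.close()
    after_a = config.get("scheduler")  # b is still open here
    b.close()
    after_b = config.get("scheduler")  # no client is open here
    return after_a, after_b
```

In this model, closing the first client unsets the value while the second client is still open, and closing the second client then restores a stale value although no client remains. Concurrent access from several threads only adds to this.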
I propose changing how we infer whether or not a distributed client should be used: instead of relying on the dask config system, we should use a proper thread-safe API.
Assuming that an existing global client always takes precedence if no explicit scheduler is provided, we could simply do something like:
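def get_scheduler(get=None, scheduler=None, collections=None, cls=None):
    if scheduler is None:
        # No explicit scheduler requested: prefer an existing global client.
        try:
            from distributed.worker import get_client
            return get_client().get
        except (ImportError, ValueError):
            # distributed is not installed, or no global client exists.
            pass
    ...

I would suggest using similar logic to handle the shuffle=tasks setting.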
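As a rough sketch of what a thread-safe lookup could look like on the client side, the following keeps default clients in a lock-protected registry instead of a config value. Everything here (`DefaultClientRegistry`, `registry`, `choose_get`, the `fallback` parameter) is hypothetical and only for illustration; it is not an existing dask or distributed API.

```python
import threading


class DefaultClientRegistry:
    """Hypothetical registry; not an existing dask or distributed API."""

    def __init__(self):
        self._lock = threading.Lock()
        self._clients = []  # the most recently registered client is last

    def register(self, client):
        with self._lock:
            self._clients.append(client)

    def unregister(self, client):
        with self._lock:
            # Remove this exact client wherever it sits, so the order in
            # which clients close does not matter.
            self._clients = [c for c in self._clients if c is not client]

    def current(self):
        with self._lock:
            if not self._clients:
                raise ValueError("no default client registered")
            return self._clients[-1]


registry = DefaultClientRegistry()


def choose_get(scheduler=None, fallback=None):
    """Return the default client's get if no scheduler was requested."""
    if scheduler is None:
        try:
            return registry.current().get
        except ValueError:
            pass
    return fallback
```

Because each client removes only itself, there is no previous value to restore, and clients closing in any order leave the registry consistent.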