Skip to content

Commit 26ab3ca

Browse files
committed
Merge branch 'main' into cluster-dump-plugin
2 parents cb54986 + ed48736 commit 26ab3ca

21 files changed

Lines changed: 99 additions & 121 deletions

continuous_integration/recipes/distributed/meta.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ outputs:
4747
track_features: # [cython_enabled]
4848
- cythonized-scheduler # [cython_enabled]
4949
entry_points:
50-
- dask-scheduler = distributed.cli.dask_scheduler:go
51-
- dask-ssh = distributed.cli.dask_ssh:go
52-
- dask-worker = distributed.cli.dask_worker:go
50+
- dask-scheduler = distributed.cli.dask_scheduler:main
51+
- dask-ssh = distributed.cli.dask_ssh:main
52+
- dask-worker = distributed.cli.dask_worker:main
5353
requirements:
5454
build:
5555
- {{ compiler('c') }} # [cython_enabled]

distributed/cli/dask_scheduler.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from tornado.ioloop import IOLoop
1111

1212
from distributed import Scheduler
13-
from distributed.cli.utils import check_python_3, install_signal_handlers
13+
from distributed.cli.utils import install_signal_handlers
1414
from distributed.preloading import validate_preload_argv
1515
from distributed.proctitle import (
1616
enable_proctitle_on_children,
@@ -212,10 +212,5 @@ async def run():
212212
logger.info("End scheduler at %r", scheduler.address)
213213

214214

215-
def go():
216-
check_python_3()
217-
main()
218-
219-
220215
if __name__ == "__main__":
221-
go() # pragma: no cover
216+
main() # pragma: no cover

distributed/cli/dask_ssh.py

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import click
77

8-
from distributed.cli.utils import check_python_3
98
from distributed.deploy.old_ssh import SSHCluster
109

1110
logger = logging.getLogger("distributed.dask_ssh")
@@ -223,10 +222,5 @@ def main(
223222
print("[ dask-ssh ]: Remote processes have been terminated. Exiting.")
224223

225224

226-
def go():
227-
check_python_3()
228-
main()
229-
230-
231225
if __name__ == "__main__":
232-
go() # pragma: no cover
226+
main() # pragma: no cover

distributed/cli/dask_worker.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from dask.system import CPU_COUNT
1717

1818
from distributed import Nanny
19-
from distributed.cli.utils import check_python_3, install_signal_handlers
19+
from distributed.cli.utils import install_signal_handlers
2020
from distributed.comm import get_address_host_port
2121
from distributed.deploy.utils import nprocesses_nthreads
2222
from distributed.preloading import validate_preload_argv
@@ -486,10 +486,5 @@ async def run():
486486
logger.info("End worker")
487487

488488

489-
def go():
490-
check_python_3()
491-
main()
492-
493-
494489
if __name__ == "__main__":
495-
go() # pragma: no cover
490+
main() # pragma: no cover

distributed/cli/utils.py

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,5 @@
1-
import click
2-
from packaging.version import parse as parse_version
31
from tornado.ioloop import IOLoop
42

5-
CLICK_VERSION = parse_version(click.__version__)
6-
7-
py3_err_msg = """
8-
Warning: Your terminal does not set locales.
9-
10-
If you use unicode text inputs for command line options then this may cause
11-
undesired behavior. This is rare.
12-
13-
If you don't use unicode characters in command line options then you can safely
14-
ignore this message. This is the common case.
15-
16-
You can support unicode inputs by specifying encoding environment variables,
17-
though exact solutions may depend on your system:
18-
19-
$ export LC_ALL=C.UTF-8
20-
$ export LANG=C.UTF-8
21-
22-
For more information see: http://click.pocoo.org/5/python3/
23-
""".lstrip()
24-
25-
26-
def check_python_3():
27-
"""Ensures that the environment is good for unicode on Python 3."""
28-
# https://github.com/pallets/click/issues/448#issuecomment-246029304
29-
import click.core
30-
31-
# TODO: Remove use of internal click functions
32-
if CLICK_VERSION < parse_version("8.0.0"):
33-
click.core._verify_python3_env = lambda: None
34-
else:
35-
click.core._verify_python_env = lambda: None
36-
37-
try:
38-
from click import _unicodefun
39-
40-
if CLICK_VERSION < parse_version("8.0.0"):
41-
_unicodefun._verify_python3_env()
42-
else:
43-
_unicodefun._verify_python_env()
44-
except (TypeError, RuntimeError):
45-
import click
46-
47-
click.echo(py3_err_msg, err=True)
48-
493

504
def install_signal_handlers(loop=None, cleanup=None):
515
"""

distributed/client.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,10 @@
115115

116116
_current_client = ContextVar("_current_client", default=None)
117117

118-
DEFAULT_EXTENSIONS = [PubSubClientExtension]
118+
DEFAULT_EXTENSIONS = {
119+
"pubsub": PubSubClientExtension,
120+
}
121+
119122
# Placeholder used in the get_dataset function(s)
120123
NO_DEFAULT_PLACEHOLDER = "_no_default_"
121124

@@ -928,8 +931,9 @@ def __init__(
928931
server=self,
929932
)
930933

931-
for ext in extensions:
932-
ext(self)
934+
self.extensions = {
935+
name: extension(self) for name, extension in extensions.items()
936+
}
933937

934938
preload = dask.config.get("distributed.client.preload")
935939
preload_argv = dask.config.get("distributed.client.preload-argv")

distributed/event.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ def __init__(self, scheduler):
5858
}
5959
)
6060

61-
self.scheduler.extensions["events"] = self
62-
6361
async def event_wait(self, name=None, timeout=None):
6462
"""Wait until the event is set to true.
6563
Returns false, when this did not happen in the given time

distributed/lock.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ def __init__(self, scheduler):
3030
{"lock_acquire": self.acquire, "lock_release": self.release}
3131
)
3232

33-
self.scheduler.extensions["locks"] = self
34-
3533
async def acquire(self, name=None, id=None, timeout=None):
3634
with log_errors():
3735
if isinstance(name, list):

distributed/multi_lock.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ def __init__(self, scheduler):
4646
{"multi_lock_acquire": self.acquire, "multi_lock_release": self.release}
4747
)
4848

49-
self.scheduler.extensions["multi_locks"] = self
50-
5149
def _request_locks(self, locks: list[str], id: Hashable, num_locks: int) -> bool:
5250
"""Request locks
5351

distributed/publish.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ def __init__(self, scheduler):
2626
}
2727

2828
self.scheduler.handlers.update(handlers)
29-
self.scheduler.extensions["publish"] = self
3029

3130
def put(self, keys=None, data=None, name=None, override=False, client=None):
3231
with log_errors():

0 commit comments

Comments
 (0)