
Commit 22eb33a

fjetter, hendrikmakait, and crusaderky authored
Remove stringification (#8083)
Co-authored-by: Hendrik Makait <hendrik@coiled.io>
Co-authored-by: crusaderky <crusaderky@gmail.com>
1 parent 03ea2e1 commit 22eb33a

27 files changed

Lines changed: 165 additions & 157 deletions
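
The theme of the diff: the client used to flatten every task key with dask.utils.stringify before talking to the scheduler; after this commit, keys travel in their native form (str, bytes, int, float, or tuples of these). A minimal sketch of the old behavior being removed, relying only on the documented behavior of dask.utils.stringify:

    from dask.utils import stringify

    key = ("x", 0, 1)
    # Old wire format: tuple keys were flattened to a string before being
    # sent to the scheduler.
    print(stringify(key))  # "('x', 0, 1)"
    # After this commit, the tuple ("x", 0, 1) is sent and tracked as-is.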

continuous_integration/environment-mindeps.yaml

Lines changed: 1 addition & 1 deletion
@@ -18,7 +18,7 @@ dependencies:
   - toolz=0.10.0
   - tornado=6.0.4
   - urllib3=1.24.3
-  - zict=2.2.0
+  - zict=3.0.0
   # Distributed depends on the latest version of Dask
   - pip
   - pip:

continuous_integration/recipes/distributed/meta.yaml

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ requirements:
   - toolz >=0.10.0
   - tornado >=6.0.4
   - urllib3 >=1.24.3
-  - zict >=2.2.0
+  - zict >=3.0.0
   run_constrained:
   - openssl !=1.1.1e

distributed/client.py

Lines changed: 32 additions & 38 deletions
@@ -28,7 +28,7 @@
 from typing import Any, ClassVar, Literal, NamedTuple, TypedDict

 from packaging.version import parse as parse_version
-from tlz import first, groupby, keymap, merge, partition_all, valmap
+from tlz import first, groupby, merge, partition_all, valmap

 import dask
 from dask.base import collections_to_dsk, normalize_token, tokenize
@@ -41,14 +41,13 @@
     format_bytes,
     funcname,
     parse_timedelta,
-    stringify,
     typename,
 )
 from dask.widgets import get_template

 from distributed.core import ErrorMessage
 from distributed.protocol.serialize import _is_dumpable
-from distributed.utils import Deadline, wait_for
+from distributed.utils import Deadline, validate_key, wait_for

 try:
     from dask.delayed import single_key
@@ -211,7 +210,6 @@ class Future(WrappedKey):
     def __init__(self, key, client=None, inform=True, state=None):
         self.key = key
         self._cleared = False
-        self._tkey = stringify(key)
         self._client = client
         self._input_state = state
         self._inform = inform
@@ -231,19 +229,19 @@ def _bind_late(self):
                 client = None
             self._client = client
         if self._client and not self._state:
-            self._client._inc_ref(self._tkey)
+            self._client._inc_ref(self.key)
             self._generation = self._client.generation

-            if self._tkey in self._client.futures:
-                self._state = self._client.futures[self._tkey]
+            if self.key in self._client.futures:
+                self._state = self._client.futures[self.key]
             else:
-                self._state = self._client.futures[self._tkey] = FutureState()
+                self._state = self._client.futures[self.key] = FutureState()

             if self._inform:
                 self._client._send_to_scheduler(
                     {
                         "op": "client-desires-keys",
-                        "keys": [self._tkey],
+                        "keys": [self.key],
                         "client": self._client.id,
                     }
                 )
@@ -503,7 +501,7 @@ def release(self):
         if not self._cleared and self.client.generation == self._generation:
             self._cleared = True
             try:
-                self.client.loop.add_callback(self.client._dec_ref, stringify(self.key))
+                self.client.loop.add_callback(self.client._dec_ref, self.key)
             except TypeError:  # pragma: no cover
                 pass  # Shutting down, add_callback may be None

@@ -1963,10 +1961,8 @@ def submit(
         else:
             key = funcname(func) + "-" + str(uuid.uuid4())

-        skey = stringify(key)
-
         with self._refcount_lock:
-            if skey in self.futures:
+            if key in self.futures:
                 return Future(key, self, inform=False)

         if allow_other_workers and workers is None:
@@ -1976,16 +1972,16 @@
             workers = [workers]

         if kwargs:
-            dsk = {skey: (apply, func, list(args), kwargs)}
+            dsk = {key: (apply, func, list(args), kwargs)}
         else:
-            dsk = {skey: (func,) + tuple(args)}
+            dsk = {key: (func,) + tuple(args)}

         futures = self._graph_to_futures(
             dsk,
-            [skey],
+            [key],
             workers=workers,
             allow_other_workers=allow_other_workers,
-            internal_priority={skey: 0},
+            internal_priority={key: 0},
             user_priority=priority,
             resources=resources,
             retries=retries,
@@ -1995,7 +1991,7 @@ def submit(

         logger.debug("Submit %s(...), %s", funcname(func), key)

-        return futures[skey]
+        return futures[key]

     def map(
         self,
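
With skey gone, the same key object is used for the task graph, the internal priority map, and the futures lookup. A hedged usage sketch (the local cluster, lambda, and key name are illustrative, not part of the commit):

    from distributed import Client

    client = Client(processes=False)  # assumption: small in-process cluster
    fut = client.submit(lambda x: x + 1, 1, key=("inc", 0))
    # The tuple key is no longer flattened to "('inc', 0)" on its way to the
    # scheduler; it round-trips unchanged.
    assert fut.key == ("inc", 0)
    print(fut.result())  # 2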
@@ -2200,7 +2196,7 @@ def map(
         )
         logger.debug("map(%s, ...)", funcname(func))

-        return [futures[stringify(k)] for k in keys]
+        return [futures[k] for k in keys]

     async def _gather(self, futures, errors="raise", direct=None, local_worker=None):
         unpacked, future_set = unpack_remotedata(futures, byte_keys=True)
@@ -2212,7 +2208,7 @@ async def _gather(self, futures, errors="raise", direct=None, local_worker=None)
22122208
f"mismatched Futures and their client IDs (this client is {self.id}): "
22132209
f"{ {f: f.client.id for f in mismatched_futures} }"
22142210
)
2215-
keys = [stringify(future.key) for future in future_set]
2211+
keys = [future.key for future in future_set]
22162212
bad_data = dict()
22172213
data = {}
22182214

@@ -2423,11 +2419,6 @@ async def _scatter(
             timeout = self._timeout
         if isinstance(workers, (str, Number)):
             workers = [workers]
-        if isinstance(data, dict) and not all(
-            isinstance(k, (bytes, str)) for k in data
-        ):
-            d = await self._scatter(keymap(stringify, data), workers, broadcast)
-            return {k: d[stringify(k)] for k in data}

         if isinstance(data, type(range(0))):
             data = list(data)
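
The deleted branch existed only to round-trip non-string dict keys through stringify and back. A short sketch of what should now work directly (hypothetical data, client as in the sketch above):

    data = {("block", 0): b"payload"}
    futures = client.scatter(data)  # returns a dict keyed by the original keys
    assert ("block", 0) in futures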
@@ -2639,7 +2630,7 @@ def scatter(

     async def _cancel(self, futures, force=False):
         # FIXME: This method is asynchronous since interacting with the FutureState below requires an event loop.
-        keys = list({stringify(f.key) for f in futures_of(futures)})
+        keys = list({f.key for f in futures_of(futures)})
         self._send_to_scheduler({"op": "cancel-keys", "keys": keys, "force": force})
         for k in keys:
             st = self.futures.pop(k, None)
@@ -2665,7 +2656,7 @@ def cancel(self, futures, asynchronous=None, force=False):
         return self.sync(self._cancel, futures, asynchronous=asynchronous, force=force)

     async def _retry(self, futures):
-        keys = list({stringify(f.key) for f in futures_of(futures)})
+        keys = list({f.key for f in futures_of(futures)})
         response = await self.scheduler.retry(keys=keys, client=self.id)
         for key in response:
             st = self.futures[key]
@@ -2689,7 +2680,7 @@ async def _publish_dataset(self, *args, name=None, override=False, **kwargs):
         coroutines = []

         def add_coro(name, data):
-            keys = [stringify(f.key) for f in futures_of(data)]
+            keys = [f.key for f in futures_of(data)]
             coroutines.append(
                 self.scheduler.publish_put(
                     keys=keys,
@@ -3148,6 +3139,10 @@ def _graph_to_futures(
         # Pack the high level graph before sending it to the scheduler
         keyset = set(keys)

+        # Validate keys
+        for key in keyset:
+            validate_key(key)
+
         # Create futures before sending graph (helps avoid contention)
         futures = {key: Future(key, self, inform=False) for key in keyset}
         # Circular import
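
Rather than coercing keys, the client now rejects illegal ones up front with validate_key from distributed.utils. The diff doesn't show that function; a plausible sketch, assuming valid keys are str, bytes, int, float, or (nested) tuples of these:

    def validate_key(k: object) -> None:
        # Sketch only, not the actual distributed.utils implementation.
        if isinstance(k, tuple):
            for part in k:
                validate_key(part)
        elif not isinstance(k, (str, bytes, int, float)):
            raise TypeError(f"Unexpected key type {type(k)} (value: {k!r})")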
@@ -3171,7 +3166,7 @@ def _graph_to_futures(
                 "op": "update-graph",
                 "graph_header": header,
                 "graph_frames": frames,
-                "keys": list(map(stringify, keys)),
+                "keys": list(keys),
                 "internal_priority": internal_priority,
                 "submitting_task": getattr(thread_state, "key", None),
                 "fifo_timeout": fifo_timeout,
@@ -3297,7 +3292,7 @@ def _optimize_insert_futures(self, dsk, keys):
         with self._refcount_lock:
             changed = False
             for key in list(dsk):
-                if stringify(key) in self.futures:
+                if key in self.futures:
                     if not changed:
                         changed = True
                         dsk = ensure_dict(dsk)
@@ -3805,7 +3800,7 @@ async def _():
    async def _rebalance(self, futures=None, workers=None):
         if futures is not None:
             await _wait(futures)
-            keys = list({stringify(f.key) for f in self.futures_of(futures)})
+            keys = list({f.key for f in self.futures_of(futures)})
         else:
             keys = None
         result = await self.scheduler.rebalance(keys=keys, workers=workers)
@@ -3841,7 +3836,7 @@ def rebalance(self, futures=None, workers=None, **kwargs):
     async def _replicate(self, futures, n=None, workers=None, branching_factor=2):
         futures = self.futures_of(futures)
         await _wait(futures)
-        keys = {stringify(f.key) for f in futures}
+        keys = {f.key for f in futures}
         await self.scheduler.replicate(
             keys=list(keys), n=n, workers=workers, branching_factor=branching_factor
         )
@@ -3962,7 +3957,7 @@ def who_has(self, futures=None, **kwargs):
         """
         if futures is not None:
             futures = self.futures_of(futures)
-            keys = list(map(stringify, {f.key for f in futures}))
+            keys = list({f.key for f in futures})
         else:
             keys = None

@@ -4098,7 +4093,7 @@ def call_stack(self, futures=None, keys=None):
         keys = keys or []
         if futures is not None:
             futures = self.futures_of(futures)
-            keys += list(map(stringify, {f.key for f in futures}))
+            keys += list({f.key for f in futures})
         return self.sync(self.scheduler.call_stack, keys=keys or None)

     def profile(
@@ -4682,10 +4677,9 @@ def _expand_key(cls, k):
             k = (k,)
         for kk in k:
             if dask.is_dask_collection(kk):
-                for kkk in kk.__dask_keys__():
-                    yield stringify(kkk)
+                yield from kk.__dask_keys__()
             else:
-                yield stringify(kk)
+                yield kk

     @staticmethod
     def collections_to_dsk(collections, *args, **kwargs):
@@ -5732,7 +5726,7 @@ def fire_and_forget(obj):
         future.client._send_to_scheduler(
             {
                 "op": "client-desires-keys",
-                "keys": [stringify(future.key)],
+                "keys": [future.key],
                 "client": "fire-and-forget",
             }
         )

distributed/dashboard/components/scheduler.py

Lines changed: 1 addition & 1 deletion
@@ -2371,7 +2371,7 @@ def add_new_nodes_edges(self, new, new_edges, update=False):
                 continue
             xx = x[key]
             yy = y[key]
-            node_key.append(escape.url_escape(key))
+            node_key.append(escape.url_escape(str(key)))
             node_x.append(xx)
             node_y.append(yy)
             node_state.append(task.state)

distributed/dashboard/tests/test_scheduler_bokeh.py

Lines changed: 1 addition & 2 deletions
@@ -15,7 +15,6 @@

 import dask
 from dask.core import flatten
-from dask.utils import stringify

 from distributed import Event
 from distributed.client import wait
@@ -910,7 +909,7 @@ async def test_TaskGraph_complex(c, s, a, b):
     gp.update()
     assert set(gp.layout.index.values()) == set(range(len(gp.layout.index)))
     visible = gp.node_source.data["visible"]
-    keys = list(map(stringify, flatten(y.__dask_keys__())))
+    keys = list(flatten(y.__dask_keys__()))
     assert all(visible[gp.layout.index[key]] == "True" for key in keys)
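
For context, __dask_keys__ yields (possibly nested) lists of tuple keys, which the dashboard's layout index can now be keyed by directly. A small illustration (exact key names depend on the dask version):

    import dask.array as da
    from dask.core import flatten

    y = da.ones((4,), chunks=2)
    # e.g. [("ones-<token>", 0), ("ones-<token>", 1)] -- tuples, not strings
    print(list(flatten(y.__dask_keys__())))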

distributed/diagnostics/progress.py

Lines changed: 2 additions & 2 deletions
@@ -10,7 +10,7 @@
 from tlz import groupby, valmap

 from dask.base import tokenize
-from dask.utils import key_split, stringify
+from dask.utils import key_split

 from distributed.diagnostics.plugin import SchedulerPlugin
 from distributed.metrics import time
@@ -68,7 +68,7 @@ class Progress(SchedulerPlugin):
     def __init__(self, keys, scheduler, minimum=0, dt=0.1, complete=False, name=None):
         self.name = name or f"progress-{tokenize(keys, minimum, dt, complete)}"
         self.keys = {k.key if hasattr(k, "key") else k for k in keys}
-        self.keys = {stringify(k) for k in self.keys}
+        self.keys = {k for k in self.keys}
         self.scheduler = scheduler
         self.complete = complete
         self._minimum = minimum

distributed/protocol/numpy.py

Lines changed: 7 additions & 8 deletions
@@ -137,14 +137,13 @@ def deserialize_numpy_ndarray(header, frames):
     elif not x.flags.writeable:
         # This should exclusively happen when the underlying buffer is read-only, e.g.
         # a read-only mmap.mmap or a bytes object.
-        # Specifically, these are the known use cases:
-        # 1. decompression with a library that does not support output to bytearray
-        #    (lz4 does; snappy, zlib, and zstd don't).
-        #    Note that this only applies to buffers whose uncompressed size was small
-        #    enough that they weren't sharded (distributed.comm.shard); for larger
-        #    buffers the decompressed output is deep-copied beforehand into a bytearray
-        #    in order to merge it.
-        # 2. unspill with zict <2.3.0 (https://github.com/dask/zict/pull/74)
+        # The only known case is:
+        # decompression with a library that does not support output to
+        # bytearray (lz4 does; snappy, zlib, and zstd don't). Note that this
+        # only applies to buffers whose uncompressed size was small enough
+        # that they weren't sharded (distributed.comm.shard); for larger
+        # buffers the decompressed output is deep-copied beforehand into a
+        # bytearray in order to merge it.
         x = np.require(x, requirements=["W"])

     return x
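
The comment shrinks because the zict <2.3.0 unspill case disappears along with the zict 3.0.0 minimum bump above. The np.require call itself is untouched; a minimal illustration of what it does with a read-only buffer:

    import numpy as np

    buf = bytes(8)  # bytes objects are read-only
    x = np.frombuffer(buf, dtype="u1")
    assert not x.flags.writeable
    x = np.require(x, requirements=["W"])  # copies so the result is writeable
    assert x.flags.writeable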

distributed/protocol/tests/test_highlevelgraph.py

Lines changed: 1 addition & 4 deletions
@@ -1,7 +1,5 @@
 from __future__ import annotations

-import ast
-
 import pytest

 np = pytest.importorskip("numpy")
@@ -110,8 +108,7 @@ def update_graph(self, scheduler, dsk=None, keys=None, restrictions=None, **kwargs):

         if "priority" in annots:
             self.priority_matches = sum(
-                int(self.priority_fn(ast.literal_eval(k)) == p)
-                for k, p in annots["priority"].items()
+                int(self.priority_fn(k) == p) for k, p in annots["priority"].items()
             )

         if "qux" in annots:

distributed/queues.py

Lines changed: 2 additions & 2 deletions
@@ -5,7 +5,7 @@
 import uuid
 from collections import defaultdict

-from dask.utils import parse_timedelta, stringify
+from dask.utils import parse_timedelta

 from distributed.client import Future
 from distributed.utils import wait_for
@@ -214,7 +214,7 @@ async def _():
     async def _put(self, value, timeout=None):
         if isinstance(value, Future):
             await self.client.scheduler.queue_put(
-                key=stringify(value.key), timeout=timeout, name=self.name
+                key=value.key, timeout=timeout, name=self.name
             )
         else:
             await self.client.scheduler.queue_put(
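
From the caller's side nothing changes; the Future's key is simply forwarded untouched. A hedged sketch (assumes a running client, which distributed.Queue requires, and reuses fut from the submit sketch above):

    from distributed import Queue

    q = Queue("work")  # hypothetical queue name
    q.put(fut)         # fut.key is sent as-is; no stringify round-trip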
