2828from typing import Any , ClassVar , Literal , NamedTuple , TypedDict
2929
3030from packaging .version import parse as parse_version
31- from tlz import first , groupby , keymap , merge , partition_all , valmap
31+ from tlz import first , groupby , merge , partition_all , valmap
3232
3333import dask
3434from dask .base import collections_to_dsk , normalize_token , tokenize
4141 format_bytes ,
4242 funcname ,
4343 parse_timedelta ,
44- stringify ,
4544 typename ,
4645)
4746from dask .widgets import get_template
4847
4948from distributed .core import ErrorMessage
5049from distributed .protocol .serialize import _is_dumpable
51- from distributed .utils import Deadline , wait_for
50+ from distributed .utils import Deadline , validate_key , wait_for
5251
5352try :
5453 from dask .delayed import single_key
@@ -211,7 +210,6 @@ class Future(WrappedKey):
211210 def __init__ (self , key , client = None , inform = True , state = None ):
212211 self .key = key
213212 self ._cleared = False
214- self ._tkey = stringify (key )
215213 self ._client = client
216214 self ._input_state = state
217215 self ._inform = inform
@@ -231,19 +229,19 @@ def _bind_late(self):
231229 client = None
232230 self ._client = client
233231 if self ._client and not self ._state :
234- self ._client ._inc_ref (self ._tkey )
232+ self ._client ._inc_ref (self .key )
235233 self ._generation = self ._client .generation
236234
237- if self ._tkey in self ._client .futures :
238- self ._state = self ._client .futures [self ._tkey ]
235+ if self .key in self ._client .futures :
236+ self ._state = self ._client .futures [self .key ]
239237 else :
240- self ._state = self ._client .futures [self ._tkey ] = FutureState ()
238+ self ._state = self ._client .futures [self .key ] = FutureState ()
241239
242240 if self ._inform :
243241 self ._client ._send_to_scheduler (
244242 {
245243 "op" : "client-desires-keys" ,
246- "keys" : [self ._tkey ],
244+ "keys" : [self .key ],
247245 "client" : self ._client .id ,
248246 }
249247 )
@@ -503,7 +501,7 @@ def release(self):
503501 if not self ._cleared and self .client .generation == self ._generation :
504502 self ._cleared = True
505503 try :
506- self .client .loop .add_callback (self .client ._dec_ref , stringify ( self .key ) )
504+ self .client .loop .add_callback (self .client ._dec_ref , self .key )
507505 except TypeError : # pragma: no cover
508506 pass # Shutting down, add_callback may be None
509507
@@ -1963,10 +1961,8 @@ def submit(
19631961 else :
19641962 key = funcname (func ) + "-" + str (uuid .uuid4 ())
19651963
1966- skey = stringify (key )
1967-
19681964 with self ._refcount_lock :
1969- if skey in self .futures :
1965+ if key in self .futures :
19701966 return Future (key , self , inform = False )
19711967
19721968 if allow_other_workers and workers is None :
@@ -1976,16 +1972,16 @@ def submit(
19761972 workers = [workers ]
19771973
19781974 if kwargs :
1979- dsk = {skey : (apply , func , list (args ), kwargs )}
1975+ dsk = {key : (apply , func , list (args ), kwargs )}
19801976 else :
1981- dsk = {skey : (func ,) + tuple (args )}
1977+ dsk = {key : (func ,) + tuple (args )}
19821978
19831979 futures = self ._graph_to_futures (
19841980 dsk ,
1985- [skey ],
1981+ [key ],
19861982 workers = workers ,
19871983 allow_other_workers = allow_other_workers ,
1988- internal_priority = {skey : 0 },
1984+ internal_priority = {key : 0 },
19891985 user_priority = priority ,
19901986 resources = resources ,
19911987 retries = retries ,
@@ -1995,7 +1991,7 @@ def submit(
19951991
19961992 logger .debug ("Submit %s(...), %s" , funcname (func ), key )
19971993
1998- return futures [skey ]
1994+ return futures [key ]
19991995
20001996 def map (
20011997 self ,
@@ -2200,7 +2196,7 @@ def map(
22002196 )
22012197 logger .debug ("map(%s, ...)" , funcname (func ))
22022198
2203- return [futures [stringify ( k ) ] for k in keys ]
2199+ return [futures [k ] for k in keys ]
22042200
22052201 async def _gather (self , futures , errors = "raise" , direct = None , local_worker = None ):
22062202 unpacked , future_set = unpack_remotedata (futures , byte_keys = True )
@@ -2212,7 +2208,7 @@ async def _gather(self, futures, errors="raise", direct=None, local_worker=None)
22122208 f"mismatched Futures and their client IDs (this client is { self .id } ): "
22132209 f"{ {f : f .client .id for f in mismatched_futures } } "
22142210 )
2215- keys = [stringify ( future .key ) for future in future_set ]
2211+ keys = [future .key for future in future_set ]
22162212 bad_data = dict ()
22172213 data = {}
22182214
@@ -2423,11 +2419,6 @@ async def _scatter(
24232419 timeout = self ._timeout
24242420 if isinstance (workers , (str , Number )):
24252421 workers = [workers ]
2426- if isinstance (data , dict ) and not all (
2427- isinstance (k , (bytes , str )) for k in data
2428- ):
2429- d = await self ._scatter (keymap (stringify , data ), workers , broadcast )
2430- return {k : d [stringify (k )] for k in data }
24312422
24322423 if isinstance (data , type (range (0 ))):
24332424 data = list (data )
@@ -2639,7 +2630,7 @@ def scatter(
26392630
26402631 async def _cancel (self , futures , force = False ):
26412632 # FIXME: This method is asynchronous since interacting with the FutureState below requires an event loop.
2642- keys = list ({stringify ( f .key ) for f in futures_of (futures )})
2633+ keys = list ({f .key for f in futures_of (futures )})
26432634 self ._send_to_scheduler ({"op" : "cancel-keys" , "keys" : keys , "force" : force })
26442635 for k in keys :
26452636 st = self .futures .pop (k , None )
@@ -2665,7 +2656,7 @@ def cancel(self, futures, asynchronous=None, force=False):
26652656 return self .sync (self ._cancel , futures , asynchronous = asynchronous , force = force )
26662657
26672658 async def _retry (self , futures ):
2668- keys = list ({stringify ( f .key ) for f in futures_of (futures )})
2659+ keys = list ({f .key for f in futures_of (futures )})
26692660 response = await self .scheduler .retry (keys = keys , client = self .id )
26702661 for key in response :
26712662 st = self .futures [key ]
@@ -2689,7 +2680,7 @@ async def _publish_dataset(self, *args, name=None, override=False, **kwargs):
26892680 coroutines = []
26902681
26912682 def add_coro (name , data ):
2692- keys = [stringify ( f .key ) for f in futures_of (data )]
2683+ keys = [f .key for f in futures_of (data )]
26932684 coroutines .append (
26942685 self .scheduler .publish_put (
26952686 keys = keys ,
@@ -3148,6 +3139,10 @@ def _graph_to_futures(
31483139 # Pack the high level graph before sending it to the scheduler
31493140 keyset = set (keys )
31503141
3142+ # Validate keys
3143+ for key in keyset :
3144+ validate_key (key )
3145+
31513146 # Create futures before sending graph (helps avoid contention)
31523147 futures = {key : Future (key , self , inform = False ) for key in keyset }
31533148 # Circular import
@@ -3171,7 +3166,7 @@ def _graph_to_futures(
31713166 "op" : "update-graph" ,
31723167 "graph_header" : header ,
31733168 "graph_frames" : frames ,
3174- "keys" : list (map ( stringify , keys ) ),
3169+ "keys" : list (keys ),
31753170 "internal_priority" : internal_priority ,
31763171 "submitting_task" : getattr (thread_state , "key" , None ),
31773172 "fifo_timeout" : fifo_timeout ,
@@ -3297,7 +3292,7 @@ def _optimize_insert_futures(self, dsk, keys):
32973292 with self ._refcount_lock :
32983293 changed = False
32993294 for key in list (dsk ):
3300- if stringify ( key ) in self .futures :
3295+ if key in self .futures :
33013296 if not changed :
33023297 changed = True
33033298 dsk = ensure_dict (dsk )
@@ -3805,7 +3800,7 @@ async def _():
38053800 async def _rebalance (self , futures = None , workers = None ):
38063801 if futures is not None :
38073802 await _wait (futures )
3808- keys = list ({stringify ( f .key ) for f in self .futures_of (futures )})
3803+ keys = list ({f .key for f in self .futures_of (futures )})
38093804 else :
38103805 keys = None
38113806 result = await self .scheduler .rebalance (keys = keys , workers = workers )
@@ -3841,7 +3836,7 @@ def rebalance(self, futures=None, workers=None, **kwargs):
38413836 async def _replicate (self , futures , n = None , workers = None , branching_factor = 2 ):
38423837 futures = self .futures_of (futures )
38433838 await _wait (futures )
3844- keys = {stringify ( f .key ) for f in futures }
3839+ keys = {f .key for f in futures }
38453840 await self .scheduler .replicate (
38463841 keys = list (keys ), n = n , workers = workers , branching_factor = branching_factor
38473842 )
@@ -3962,7 +3957,7 @@ def who_has(self, futures=None, **kwargs):
39623957 """
39633958 if futures is not None :
39643959 futures = self .futures_of (futures )
3965- keys = list (map ( stringify , {f .key for f in futures }) )
3960+ keys = list ({f .key for f in futures })
39663961 else :
39673962 keys = None
39683963
@@ -4098,7 +4093,7 @@ def call_stack(self, futures=None, keys=None):
40984093 keys = keys or []
40994094 if futures is not None :
41004095 futures = self .futures_of (futures )
4101- keys += list (map ( stringify , {f .key for f in futures }) )
4096+ keys += list ({f .key for f in futures })
41024097 return self .sync (self .scheduler .call_stack , keys = keys or None )
41034098
41044099 def profile (
@@ -4682,10 +4677,9 @@ def _expand_key(cls, k):
46824677 k = (k ,)
46834678 for kk in k :
46844679 if dask .is_dask_collection (kk ):
4685- for kkk in kk .__dask_keys__ ():
4686- yield stringify (kkk )
4680+ yield from kk .__dask_keys__ ()
46874681 else :
4688- yield stringify ( kk )
4682+ yield kk
46894683
46904684 @staticmethod
46914685 def collections_to_dsk (collections , * args , ** kwargs ):
@@ -5732,7 +5726,7 @@ def fire_and_forget(obj):
57325726 future .client ._send_to_scheduler (
57335727 {
57345728 "op" : "client-desires-keys" ,
5735- "keys" : [stringify ( future .key ) ],
5729+ "keys" : [future .key ],
57365730 "client" : "fire-and-forget" ,
57375731 }
57385732 )
0 commit comments