-
-
Notifications
You must be signed in to change notification settings - Fork 757
Closed
Description
Review notes for @madsbk — inline comments annotate the graph-submission code below:
# Fragment of the client-side graph-submission path (presumably the body of
# Client._graph_to_futures — TODO confirm against the enclosing def, which is
# outside this view).  It normalizes per-key annotations (resources, retries,
# actors, restrictions), stringifies graph keys for the scheduler, resolves
# Future dependencies embedded in the graph, computes task priorities, ships
# the graph via an "update-graph" message, and returns Futures for `keys`.

# Reviewer note: we can probably ignore this for now.
# Expand user-supplied worker-resource constraints over all graph keys, then
# re-key them with stringified keys (the scheduler message uses string keys).
if resources:
    resources = self._expand_resources(
        resources, all_keys=itertools.chain(dsk, keys)
    )
    resources = {tokey(k): v for k, v in resources.items()}

# Reviewer note: we can probably ignore this for now.
# Same expansion for per-key retry counts.
if retries:
    retries = self._expand_retries(
        retries, all_keys=itertools.chain(dsk, keys)
    )

# `actors` may be a boolean (applies to everything or nothing) or a
# collection of keys; only the collection form needs expanding.
if actors is not None and actors is not True and actors is not False:
    actors = list(self._expand_key(actors))

keyset = set(keys)

# Reviewer note: this probably only needs to be applied to BasicLayers
# and also extra graphs, like keywords in Blockwise.
# Substitute graph values that are themselves Futures (and are not output
# keys) so tasks reference those futures' keys instead of the objects.
values = {
    k: v
    for k, v in dsk.items()
    if isinstance(v, Future) and k not in keyset
}
if values:
    dsk = subs_multiple(dsk, values)

# Reviewer note: same as above.
# unpack_remotedata maps each key to (task-spec, futures-found-in-task).
d = {k: unpack_remotedata(v, byte_keys=True) for k, v in dsk.items()}
extra_futures = set.union(*[v[1] for v in d.values()]) if d else set()
extra_keys = {tokey(future.key) for future in extra_futures}

# Reviewer note: this is interesting.  The client has keys that can be
# strings, tuples and such, while the scheduler expects all keys to be
# strings.  I think that we will handle this on BasicLayers here and then
# again on the scheduler side when we get there.
dsk2 = str_graph({k: v[0] for k, v in d.items()}, extra_keys)

# Reviewer note: only needs to be done on BasicLayers.
# Drop alias entries where stringification mapped a key onto itself
# (note the identity check `is not`, not equality).
dsk3 = {k: v for k, v in dsk2.items() if k is not v}

# Futures embedded in the graph must belong to this client.
for future in extra_futures:
    if future.client is not self:
        msg = "Inputs contain futures that were created by another client."
        raise ValueError(msg)

# Normalize worker restrictions to {str-key: list-of-constraints}.
if restrictions:
    restrictions = keymap(tokey, restrictions)
    restrictions = valmap(list, restrictions)

if loose_restrictions is not None:
    loose_restrictions = list(map(tokey, loose_restrictions))

# Dependencies contributed by the Futures found inside task definitions.
future_dependencies = {
    tokey(k): {tokey(f.key) for f in v[1]} for k, v in d.items()
}

# A dependency on a future this client no longer tracks cannot be
# satisfied; fail the submission early.
for s in future_dependencies.values():
    for v in s:
        if v not in self.futures:
            raise CancelledError(v)

# Reviewer note: we should call the HLG method here probably.
dependencies = {k: get_dependencies(dsk, k) for k in dsk}

# Reviewer note: this may be an issue short term.  I think that eventually
# we're going to have to handle this on the scheduler side.  I don't know
# of a good workaround until then — we can always send `{k: 0 for k in dsk}`
# for testing, but we'll never be able to merge this.
if priority is None:
    priority = dask.order.order(dsk, dependencies=dependencies)
    priority = keymap(tokey, priority)

# Reviewer note: this is the stringification of keys again; this is
# probably ok as-is.
dependencies = {
    tokey(k): [tokey(dep) for dep in deps]
    for k, deps in dependencies.items()
    if deps
}
# Merge the future-derived dependencies into the graph-derived ones.
for k, deps in future_dependencies.items():
    if deps:
        dependencies[k] = list(set(dependencies.get(k, ())) | deps)

# A scalar retry count applies uniformly to every task in the graph.
if isinstance(retries, Number) and retries > 0:
    retries = {k: retries for k in dsk3}

# Create Futures for the requested output keys before submitting, so they
# exist when scheduler responses start arriving (inform=False: the
# scheduler is told via the update-graph message below, not per-future).
futures = {key: Future(key, self, inform=False) for key in keyset}
self._send_to_scheduler(
    {
        "op": "update-graph",
        # Reviewer note: here is where we're going to have to start
        # thinking about how to serialize high level graph layers.
        "tasks": valmap(dumps_task, dsk3),
        "dependencies": dependencies,
        "keys": list(map(tokey, keys)),
        "restrictions": restrictions or {},
        "loose_restrictions": loose_restrictions,
        "priority": priority,
        "user_priority": user_priority,
        "resources": resources,
        "submitting_task": getattr(thread_state, "key", None),
        "retries": retries,
        "fifo_timeout": fifo_timeout,
        "actors": actors,
    }
)
return futures
Metadata
Metadata
Assignees
Labels
No labels