Skip to content

Pass HighLevelGraphs through graph_to_futures #4119

@mrocklin

Description

@mrocklin

Notes for @madsbk

# We can probably ignore this for now
            if resources:
                resources = self._expand_resources(
                    resources, all_keys=itertools.chain(dsk, keys)
                )
                resources = {tokey(k): v for k, v in resources.items()}

# We can probably ignore this for now
            if retries:
                retries = self._expand_retries(
                    retries, all_keys=itertools.chain(dsk, keys)
                )

            if actors is not None and actors is not True and actors is not False:
                actors = list(self._expand_key(actors))

            keyset = set(keys)

# This probably only needs to be applied to BasicLayers,
# and also to extra graphs, such as the keywords in Blockwise.
            values = {
                k: v
                for k, v in dsk.items()
                if isinstance(v, Future) and k not in keyset
            }
            if values:
                dsk = subs_multiple(dsk, values)

# Same as above
            d = {k: unpack_remotedata(v, byte_keys=True) for k, v in dsk.items()}
            extra_futures = set.union(*[v[1] for v in d.values()]) if d else set()
            extra_keys = {tokey(future.key) for future in extra_futures}

# This is interesting. On the client, keys can be strings, tuples, and so on,
# while the scheduler expects all keys to be strings.
# I think we will handle this for BasicLayers here,
# and then again on the scheduler side when we get there.
            dsk2 = str_graph({k: v[0] for k, v in d.items()}, extra_keys)

# This only needs to be done on BasicLayers.
            dsk3 = {k: v for k, v in dsk2.items() if k is not v}
            for future in extra_futures:
                if future.client is not self:
                    msg = "Inputs contain futures that were created by another client."
                    raise ValueError(msg)

            if restrictions:
                restrictions = keymap(tokey, restrictions)
                restrictions = valmap(list, restrictions)

            if loose_restrictions is not None:
                loose_restrictions = list(map(tokey, loose_restrictions))

            future_dependencies = {
                tokey(k): {tokey(f.key) for f in v[1]} for k, v in d.items()
            }

            for s in future_dependencies.values():
                for v in s:
                    if v not in self.futures:
                        raise CancelledError(v)

# We should probably call the HighLevelGraph method here.
            dependencies = {k: get_dependencies(dsk, k) for k in dsk}

# This may be an issue in the short term.
# I think that eventually we will have to handle this on the scheduler side,
# but I don't know of a good workaround until then.
# We can always send `{k: 0 for k in dsk}` for testing, but we'll never be able to merge that.
            if priority is None:
                priority = dask.order.order(dsk, dependencies=dependencies)
                priority = keymap(tokey, priority)

# This is the key stringification again; it is probably fine as-is.
            dependencies = {
                tokey(k): [tokey(dep) for dep in deps]
                for k, deps in dependencies.items()
                if deps
            }
            for k, deps in future_dependencies.items():
                if deps:
                    dependencies[k] = list(set(dependencies.get(k, ())) | deps)

            if isinstance(retries, Number) and retries > 0:
                retries = {k: retries for k in dsk3}

            futures = {key: Future(key, self, inform=False) for key in keyset}
            self._send_to_scheduler(
                {
                    "op": "update-graph",
# This is where we will have to start thinking about how to
# serialize high-level graph layers.
                    "tasks": valmap(dumps_task, dsk3),
                    "dependencies": dependencies,
                    "keys": list(map(tokey, keys)),
                    "restrictions": restrictions or {},
                    "loose_restrictions": loose_restrictions,
                    "priority": priority,
                    "user_priority": user_priority,
                    "resources": resources,
                    "submitting_task": getattr(thread_state, "key", None),
                    "retries": retries,
                    "fifo_timeout": fifo_timeout,
                    "actors": actors,
                }
            )
            return futures

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions