HighLevelGraphs to the Scheduler #4140

Merged
mrocklin merged 14 commits into dask:master from madsbk:update_graph_hlg
Nov 5, 2020

Conversation

@madsbk
Contributor

@madsbk madsbk commented Oct 1, 2020

This PR implements pack and unpack of high-level graphs, which makes it possible for the Client to send an HLG directly to the scheduler.

@madsbk madsbk mentioned this pull request Oct 1, 2020
@jakirkham
Member

What about having serialize and deserialize methods on these objects that we call instead? That would avoid needing pickling and the alluded to security concerns around it.

@madsbk
Contributor Author

madsbk commented Oct 14, 2020

What about having serialize and deserialize methods on these objects that we call instead? That would avoid needing pickling and the alluded to security concerns around it.

I think this is essentially what we are doing right now, just using __reduce__() instead of serialize()?
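For context, the __reduce__() protocol referenced here is standard Python pickling: an object tells pickle how to reconstruct itself, which amounts to a built-in serialize/deserialize pair. A minimal sketch (ToyLayer is illustrative, not dask's actual Layer class):

```python
import pickle

class ToyLayer:
    """Illustrative stand-in for a graph layer; not dask's actual Layer class."""
    def __init__(self, tasks):
        self.tasks = tasks

    def __reduce__(self):
        # Tell pickle how to rebuild this object: call ToyLayer(self.tasks).
        # This plays the same role an explicit serialize()/deserialize() pair would.
        return (ToyLayer, (self.tasks,))

layer = ToyLayer({("x", 0): ("inc", 1)})
restored = pickle.loads(pickle.dumps(layer))
assert restored.tasks == {("x", 0): ("inc", 1)}
```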

@jrbourbeau
Member

Woo, nice green check marks 🎉

@sjperkins sjperkins mentioned this pull request Oct 26, 2020
@madsbk
Contributor Author

madsbk commented Oct 27, 2020

I have been working on this PR for some time now. Initially, my idea was to implement and use map_tasks() from dask/dask#6689 to handle preprocessing of the high-level graph and use the Layer.__reduce__() methods to handle serialization before sending it to the scheduler. This is a clean, generic approach, but it doesn't work: Distributed requires a very specific protocol when updating the scheduler task graph, which makes a generic implementation very hard (if not impossible).

I have compiled a list of requirements and thoughts:

  • We should support BasicLayer efficiently

  • Converting all keys to strings requires domain knowledge in layers like Blockwise where all keys aren't materialized.

  • A layer needs ALL keys in the HLG in order to calculate dependencies.

  • Dependencies must be calculated before dumps_task() thus dependency calculation must be part of serialization. Some layers can infer key dependencies but consider layers such as Blockwise where kwargs can contain any key. Not to mention BasicLayer, which has no prior dependency knowledge.

  • Before serialization, we need to unpack remote data (Futures). This requires map_tasks() to provide key information, which can be problematic: [WIP] Add optional pass_key argument to map_tasks dask#6761 (comment).

  • Additionally, we need to search for Futures inside tuples, which map_tasks() cannot support, and we need to incorporate the dependencies of the unpacked Futures into the existing key dependencies without having to recalculate anything.

  • Currently, ordering has to be done on the original keys and not after they have been converted to string: Prioritize tasks with their true keys, before stringifying #2006


Because of these requirements, I am exploring a new approach where the serialization and deserialization of layers is specialized. I introduce dumps_highlevelgraph(), which does all the preprocessing (unpacking Futures, converting keys to strings, calculating dependencies that cannot be inferred on the scheduler, and dumping tasks), and loads_highlevelgraph(), which deserializes the required layers and returns a materialized graph and all key dependencies.
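A rough sketch of the shape this pack/unpack pair takes. The function names come from the comment above, but the bodies here are simplified assumptions: the real implementation also unpacks Futures, calls dumps_task(), and specializes per layer type.

```python
def dumps_highlevelgraph(layers, key_dependencies):
    """Pack each layer's tasks plus client-side key dependencies (simplified sketch)."""
    return [
        {
            "tasks": {str(k): v for k, v in layer.items()},
            "deps": {str(k): [str(d) for d in key_dependencies.get(k, ())]
                     for k in layer},
        }
        for layer in layers
    ]

def loads_highlevelgraph(packed):
    """Rebuild a materialized flat graph and its key dependencies (simplified sketch)."""
    dsk, deps = {}, {}
    for layer in packed:
        dsk.update(layer["tasks"])
        for k, d in layer["deps"].items():
            deps[k] = set(d)
    return dsk, deps

layers = [{("x", 0): ("inc", 1)}, {("y", 0): ("add", ("x", 0), 1)}]
dsk, deps = loads_highlevelgraph(
    dumps_highlevelgraph(layers, {("y", 0): {("x", 0)}})
)
assert deps["('y', 0)"] == {"('x', 0)"}
```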

The current implementation does all this in Distributed; however, we should consider moving it to the Layer classes in Dask. But note that these serialization methods will be very Distributed-specific and use a lot of functions and classes from Distributed.

NB: the current push is just a rough implementation that always falls back to BasicLayer serialization.

@mrocklin
Member

@madsbk if you're around and want to have a high bandwidth conversation I would be interested. Some quick comments/questions

Converting all keys to strings requires domain knowledge in layers like Blockwise where all keys aren't materialized.

Materializing all keys might be ok. Ideally we don't have to materialize the entire graph until we get to the scheduler.

We could also convert to strings after we get to the scheduler and have the full graph.

A layer needs ALL keys in the HLG in order to calculate dependencies.

Again, I think that materializing keys will be ok for now (this is, I think, likely to be faster than the graph). In the future I hope that we can avoid cases where we need to calculate dependencies in the common case.

Dependencies must be calculated before dumps_task() thus dependency calculation must be part of serialization. Some layers can infer key dependencies but consider layers such as Blockwise where kwargs can contain any key. Not to mention BasicLayer, which has no prior dependency knowledge.

Hrm, I'm curious about this. I'll need to think about this more.

Before serialization, we need to unpack remote data (Futures). This requires map_tasks() to provide key information, which can be problematic: dask/dask#6761 (comment).

Also curious.

Additionally, we need to search for Futures inside tuples, which map_tasks() cannot support, and we need to incorporate the dependencies of the unpacked Futures into the existing key dependencies without having to recalculate anything.

We can drop this if we need to

Currently, ordering has to be done on the original keys and not after they have been converted to string: #2006

Ah, right. Well, maybe we can change how we stringify in order to keep ordering. Maybe we do both ordering and stringifying on the scheduler after we've moved everything over.
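The ordering hazard here can be shown with plain Python: dask keys are tuples, and their string forms sort differently, so ordering after stringification can disagree with ordering before it (illustrative example, not dask code):

```python
# Tuple keys sort numerically, but their string forms sort lexicographically.
keys = [("x", 2), ("x", 10)]
assert sorted(keys) == [("x", 2), ("x", 10)]                  # numeric: 2 < 10
assert sorted(map(str, keys)) == ["('x', 10)", "('x', 2)"]    # "1" < "2" flips it

# Zero-padding the index restores numeric order in string form -- one way
# to "change how we stringify in order to keep ordering":
padded = ["('x', %04d)" % i for _, i in keys]
assert sorted(padded) == ["('x', 0002)", "('x', 0010)"]
```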

@madsbk
Contributor Author

madsbk commented Oct 29, 2020

Materializing all keys might be ok. Ideally we don't have to materialize the entire graph until we get to the scheduler.

Notice, the keys here also refer to the keys inside tasks. Since the tasks might not exist yet, it requires domain knowledge to stringify them.

We could also convert to strings after we get to the scheduler and have the full graph.

True, it is probably possible to serialize keys using msgpack in most cases, but dumping tasks before stringification means that a dumped task may contain non-stringified keys.
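The nested-key problem being described, sketched with plain dicts (illustrative; the stringify helper is an assumption for this sketch, not distributed's actual code):

```python
dsk = {
    ("x", 0): ("inc", 1),
    ("x", 1): ("inc", 2),
    ("y", 0): ("add", ("x", 0), ("x", 1)),  # keys nested inside a task
}

# Stringifying only the mapping's keys leaves the embedded keys untouched,
# so a dumped task can still contain non-stringified keys:
naive = {str(k): v for k, v in dsk.items()}
assert naive["('y', 0)"] == ("add", ("x", 0), ("x", 1))

# Rewriting nested keys requires knowing which tuples *are* keys,
# i.e. the full key set -- the "domain knowledge" referred to above:
all_keys = set(dsk)

def stringify(task):
    if isinstance(task, tuple) and task in all_keys:
        return str(task)
    if isinstance(task, tuple):
        return tuple(stringify(t) for t in task)
    return task

full = {str(k): stringify(v) for k, v in dsk.items()}
assert full["('y', 0)"] == ("add", "('x', 0)", "('x', 1)")
```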

@madsbk
Contributor Author

madsbk commented Nov 2, 2020

Moved packing of shuffle layers to Dask: dask/dask#6786, so CI will fail until that has been merged.

Comment on lines +78 to +90
layers.append(
    {
        "__module__": None,
        "__name__": None,
        "state": _materialized_layer_pack(
            layer,
            hlg.get_all_external_keys(),
            hlg.key_dependencies,
            allowed_client,
            allows_futures,
        ),
    }
)
Member

What if this was the implementation of Layer.__dask_distributed_pack__? That might simplify things a bit here

Member

We would import the relevant distributed functions within the method in the Dask codebase.

Member

It's not a big deal either way, but it might isolate things a bit.

Contributor Author

I think it is best to have the fallback in Distributed for now. Long-term it would be nice to have it in Layer.__dask_distributed_pack__(), but right now it requires a lot of very Distributed-specific things like key_dependencies, allowed_client, and allows_futures.
Furthermore, my plan is to also incorporate dsk.map_basic_layers() into distributed_pack() in order to remove both map_basic_layers() and map_tasks() from the HLG API.

@mrocklin
Member

mrocklin commented Nov 4, 2020

@madsbk is there anything that I can do or should look at to help here?

@madsbk
Contributor Author

madsbk commented Nov 4, 2020

@madsbk is there anything that I can do or should look at to help here?

The final issue is ordering, which has to be done before converting keys to strings. This is what is failing in CI.

@mrocklin
Member

mrocklin commented Nov 4, 2020

If we do ordering on the scheduler side after stringification I suspect that everything will work the same in almost all cases. I would be willing to take the hit on those cases in order to move forward. Is this an easy option for you?

@madsbk
Contributor Author

madsbk commented Nov 4, 2020 via email

@mrocklin
Member

mrocklin commented Nov 4, 2020

That seems sensible to me

@madsbk
Contributor Author

madsbk commented Nov 5, 2020

@mrocklin, I managed to support ordering but it requires a fancy string comparison in order.order(): dask/dask#6807

@mrocklin
Member

mrocklin commented Nov 5, 2020

Let's xfail the test for now, include a link to the order PR in the reason= keyword, and then merge this.

I want to think a bit about prefixing with zeroes before we commit to it.

@madsbk madsbk marked this pull request as ready for review November 5, 2020 14:24
@madsbk madsbk changed the title [WIP] HighLevelGraphs to the Scheduler HighLevelGraphs to the Scheduler Nov 5, 2020
@madsbk
Contributor Author

madsbk commented Nov 5, 2020

@mrocklin @jrbourbeau, I think this is ready to be merged. It lacks documentation of the pack/unpack API, but let's hold off on that until we settle on an API.

@mrocklin
Member

mrocklin commented Nov 5, 2020

I'll take another look in a bit and then merge in.

@mrocklin
Member

mrocklin commented Nov 5, 2020

This looks slick. Thank you for figuring this out @madsbk

@mrocklin mrocklin merged commit 09d9799 into dask:master Nov 5, 2020
@madsbk madsbk deleted the update_graph_hlg branch November 6, 2020 07:58
@mrocklin
Member

mrocklin commented Nov 6, 2020

@sjperkins this is in. Now would probably be a good time to add in support for annotations. I think that they should go in client.py::Client._graph_to_futures

@sjperkins
Member

@sjperkins this is in. Now would probably be a good time to add in support for annotations. I think that they should go in client.py::Client._graph_to_futures

@mrocklin Thanks for the heads-up. My schedule is busy until next week Thursday. Would it be OK if I picked it up then?

@mrocklin
Member

mrocklin commented Nov 6, 2020 via email

sonicxml added a commit to sonicxml/distributed that referenced this pull request Dec 3, 2020

5 participants