Refactor HighLevelGraph Layers to use ToPickle and reduce boilerplate#8672
Conversation
… to port this Layer in a follow-up PR)
In `dask/highlevelgraph.py`:

```python
attrs = list(self.layer_state.keys()) + ["annotations"]
return (self.__class__, tuple(getattr(self, attr) for attr in attrs))

def __dask_distributed_pack__(self, all_hlg_keys, *args, **kwargs):
```
Is it possible to remove this protocol?
Yes - We should be able to remove this protocol next, since there should only be two different cases now: a "materialized" layer, or an "abstract" layer (requiring a single ToPickle call). However, I would prefer to leave this change for a follow-up PR (mostly to make this PR easier to review).
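The two remaining cases could be sketched roughly as follows. This is a hypothetical illustration, not the actual dask API: `pack_layer` is an invented helper name, and `ToPickle` here is only a minimal stand-in for `distributed.protocol.serialize.ToPickle`.

```python
from collections.abc import Mapping


class ToPickle:
    """Stand-in for distributed.protocol.ToPickle: marks an object so the
    comm layer serializes it with plain pickle instead of dask's protocol."""

    def __init__(self, data):
        self.data = data


def pack_layer(layer: Mapping, materialized: bool):
    # Hypothetical helper showing the two cases described above:
    # a "materialized" layer is already a plain task mapping and is sent
    # as-is, while an "abstract" layer needs only a single ToPickle call.
    if materialized:
        return dict(layer)  # send raw tasks; no scheduler-side pickle needed
    return ToPickle(layer)  # scheduler unpickles and materializes the layer


assert pack_layer({"x-0": 1}, materialized=True) == {"x-0": 1}
assert isinstance(pack_layer({"x-0": 1}, materialized=False), ToPickle)
```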
|
@ian-r-rose @gjoseph92 - Any interest in reviewing this (or know who should)? :)
ian-r-rose
left a comment
Thanks @rjzamora, this is heroic work. I love a PR that has more deletions than additions. I'm still digesting this, but here are some early thoughts.
My biggest concern is around the interface of the base `Layer`. It now makes some fairly strong assumptions about graph and partitioning structure (see, e.g., `output_blocks`, `_keys_to_indices`) which are not always true. An immediate consequence of this is that we need to allow `output_blocks` to be `None` everywhere to handle those cases. Right now that is mostly `MaterializedLayer`, but I could imagine other, weirder things involving `Delayed` layers or something. To me, this indicates that the `Layer` class is trying to do a bit too much. I understand that part of what you are trying to do here is consolidate logic, so that impulse makes a lot of sense! But it feels a bit too far right now.
I'm still thinking about this, and may have misunderstood something fundamental. But my instinct here is to actually have most of your `Layer` implementation be something like a `PartitionedLayer`, which handles a lot of logic around partitioned collections, output blocks, keys -> names -> indices -> keys translations, etc. And both that and `MaterializedLayer` could satisfy an abstract `Layer` interface that tries hard to be as close as possible to a pure `Mapping` interface + `cull`.
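The abstract interface proposed above (a pure `Mapping` plus `cull`) might look something like this minimal sketch. The class bodies here are illustrative only; they are not dask's actual implementation:

```python
from abc import abstractmethod
from collections.abc import Mapping


class Layer(Mapping):
    """Hypothetical minimal Layer interface: a pure Mapping plus cull."""

    @abstractmethod
    def cull(self, keys_to_keep):
        """Return a new Layer containing only tasks needed for keys_to_keep."""


class MaterializedLayer(Layer):
    """A Layer backed by a plain dict of already-materialized tasks."""

    def __init__(self, mapping):
        self.mapping = dict(mapping)

    def __getitem__(self, key):
        return self.mapping[key]

    def __iter__(self):
        return iter(self.mapping)

    def __len__(self):
        return len(self.mapping)

    def cull(self, keys_to_keep):
        # Keep only the tasks whose keys are requested
        return MaterializedLayer(
            {k: v for k, v in self.mapping.items() if k in keys_to_keep}
        )


layer = MaterializedLayer({"a-0": 1, "a-1": 2})
assert dict(layer.cull({"a-0"})) == {"a-0": 1}
```

A `PartitionedLayer` subclass could then layer the output-block and key/index translation logic on top of this, without those assumptions leaking into the base interface.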
```python
return keys_in_tasks(all_hlg_keys, [self[key]])

def __copy__(self):
    """Default shallow copy implementation"""
```
Not sure - This isn't new code. Just moved
|
```python
# If pickle is disabled on the scheduler, all layers must
# be converted to `MaterializedLayer` objects before packing
materialize = not config.get("distributed.scheduler.pickle")
```
It's unfortunate that this config value doesn't really have any bearing on whether pickling is actually enabled on the scheduler. Can we instead add some metadata to the comm or similar and check whether the scheduler has it enabled?
Yeah - I guess it is possible for the config setting to be different on the scheduler and client, but I suspect this will cover most cases.
When the client tries to send pickled data to the scheduler, the scheduler will throw an error. Perhaps we should just update this error in distributed to clarify that the "pickle" config must be True on both the client and scheduler?
> I suspect this will cover most cases
I think this is true for local clusters, but if there is a remote cluster somewhere with pickling disabled, I expect it to fail more often than it succeeds (simply because most users won't have a local config for this).
The error message and documentation may be enough, though I still think a better user experience would be to introspect something on the client (which is an arg here) to determine whether to pickle something. Happy to leave that discussion for a follow-up, however, since it's separate from API design.
> The error message and documentation may be enough, though I still think a better user experience would be to introspect something on the client (which is an arg here) to determine whether to pickle something. Happy to leave that discussion for a follow-up, however, since it's separate from API design.
I agree with the user-experience concern. Even with a clear error message, it will probably be confusing to run into serialization errors when the external cluster has pickling disabled (however rare it may be for people to explicitly disable pickling). I don't personally have any bright ideas about how we can introspect this information on the client. Do you think we need to implement something new to make this information available? I'm hoping we don't need to do something annoying, like ping the scheduler with a pickled test packet.
> Do you think we need to implement something new to make this information available? I'm hoping we don't need to do something annoying, like ping the scheduler with a pickled test packet.
Yeah, we probably would need to implement something new, I don't think this is possible today (at least, not without pickling a test packet as you suggest). But I don't think there is a fundamental reason it couldn't be available. But I'm happy to defer this discussion!
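One way the "something new" could work is for the scheduler to advertise its pickle setting in connection/handshake metadata, so the client can decide how to pack layers without a test round-trip. This is purely a hypothetical sketch: `handshake_info` and the `"pickle-enabled"` field are invented for illustration and do not exist in distributed today.

```python
def choose_packing(handshake_info: dict) -> str:
    """Decide how to pack HighLevelGraph layers based on (hypothetical)
    scheduler handshake metadata.

    'pickle-enabled' is an assumed field, not an existing distributed key.
    Defaulting to False means we fall back to client-side materialization
    whenever the scheduler does not advertise support.
    """
    if handshake_info.get("pickle-enabled", False):
        return "to-pickle"    # send abstract layers wrapped in ToPickle
    return "materialize"      # convert layers to plain task dicts first


assert choose_packing({"pickle-enabled": True}) == "to-pickle"
assert choose_packing({}) == "materialize"
```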
|
Thanks for taking the time to look through this @ian-r-rose! I completely agree with your concerns here about the base `Layer` interface. Although I do want a `PartitionedLayer`, …
Fully agree here -- most of the cases we care about for big data assume some sort of partitioning. I wanted to raise it here for a couple of reasons:
Edit: another take (disagreeing with mine) on whether …
I don't have strong feelings (yet) about whether a …
```python
def __dask_distributed_pack__(
    self,
    all_hlg_keys: Iterable[Hashable],
    known_key_dependencies: Mapping[Hashable, set],
    client,
    client_keys: Iterable[Hashable],
) -> Any:
```
Why is this protocol still necessary? Could we just lift the entire graph and send it to the scheduler? What stops this?
So, to make this concrete, how about the following diff:

```diff
diff --git a/distributed/client.py b/distributed/client.py
index f68570f7..bc3afc75 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -2904,7 +2904,6 @@ class Client(SyncMethodMixin):
         # Pack the high level graph before sending it to the scheduler
         keyset = set(keys)
-        dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)

         # Create futures before sending graph (helps avoid contention)
         futures = {key: Future(key, self, inform=False) for key in keyset}
@@ -2912,7 +2911,7 @@ class Client(SyncMethodMixin):
         self._send_to_scheduler(
             {
                 "op": "update-graph-hlg",
-                "hlg": dsk,
+                "hlg": ToPickle(dsk),
                 "keys": list(map(stringify, keys)),
                 "priority": priority,
                 "submitting_task": getattr(thread_state, "key", None),
```

I suspect that this doesn't actually work today, but I'd be curious why not, and if it's not doable.
> Why is this protocol still necessary? Could we just lift the entire graph and send it to the scheduler? What stops this?
Thanks for looking at this @mrocklin! I'll try to catch up on your various discussions/work from last week as soon as I can.
The short answer is that we absolutely can pickle the entire graph as you are suggesting. Continuing to use __dask_distributed_pack__ at the Layer level for this PR was only meant to be a simpler intermediate step.
My (possibly wrong) impression was that continuing to use the Layer-by-Layer approach makes it a bit easier to materialize layers without losing annotations. For now, I do get the sense that we should preserve the option for the user to disable pickle on the scheduler, because I do know of some cases (like the Dask-SQL CI) where the scheduler has a different Python environment than the client/workers.
Is your suggestion that (1) we should require pickling to be enabled, (2) that we should drop annotations when pickling is disabled, or (3) something else?
Note that I am personally still a bit interested in seeing the scheduler written in another language at some point, so I like the idea of layers being "materialized" individually (whether it is in a dedicated __dask_distributed_pack__ routine or not)
|
I'm inclined to pursue option (1). I think that it is the simplest, and right now I get the sense that development is bound by complexity rather than other factors.
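Under option (1), the client could fail fast with a clear message instead of letting the scheduler raise a confusing serialization error. A minimal sketch (the helper name and message are invented for illustration):

```python
def ensure_pickle_enabled(scheduler_pickle: bool) -> None:
    """Hypothetical client-side guard for option (1): require pickling.

    `scheduler_pickle` stands in for the effective value of the
    `distributed.scheduler.pickle` config option.
    """
    if not scheduler_pickle:
        raise RuntimeError(
            "Sending abstract HighLevelGraph layers requires "
            "'distributed.scheduler.pickle' to be True on both the "
            "client and the scheduler."
        )


ensure_pickle_enabled(True)  # no-op when pickling is allowed
```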
|
This PR refactors `Layer` and its subclasses to simplify serialization and reduce redundant graph-caching and culling logic. These changes were motivated by the serialization change in distributed#5728 (which proposes a new `ToPickle` protocol).

This PR ultimately demonstrates that new `Layer` implementations can be dramatically simplified if we go "all in" on the new `ToPickle` protocol, and just require that `pickle.loads` be allowed on the scheduler for any (non-`MaterializedLayer`) `Layer` instances to be sent to and materialized on the scheduler. That is, for cases where the `"scheduler.pickle"` config option is set to `False`, we will simply convert all HighLevelGraph layers to `MaterializedLayer` instances before communicating them to the scheduler.

TODO:
- `Layer` class to implement `ToPickle` serialization, basic culling, and graph-caching logic
- `Blockwise` to inherit from the centralized serialization/caching logic
- … (without `ToPickle`, but all layers will be converted to `MaterializedLayer`s on the client)
- … (`ToPickle`) should be handled (I am thinking we can just convert all Layers to `MaterializedLayer` objects on the client). [Current Solution: Convert HLG Layers to `MaterializedLayer` objects when `distributed.protocol.serialize.ToPickle` is not available.]
- … `"optimization.fuse.active"` option is confusing. [Current Solution: Use existing `"distributed.scheduler.pickle"` config option. When this is set to `False`, we always convert HLG Layers to `MaterializedLayer` objects before sending to the scheduler.]
__dask_distributed_pack__and__dask_distributed_unpack__from code-base completely (since packing will be identical for all materialized layers, and a singleToPicklecall for unmaterialized layers)layers.py(Maybe?)Layer
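The fallback described in the summary (convert every HLG layer to a `MaterializedLayer` when scheduler-side pickling is disabled) can be sketched roughly as below. `materialize_if_needed` is a hypothetical helper name, with plain dicts standing in for actual `Layer` objects:

```python
def materialize_if_needed(layers: dict, pickle_enabled: bool) -> dict:
    """Hypothetical sketch of the client-side fallback.

    When pickling is enabled on the scheduler, abstract layers pass through
    unchanged (to be wrapped in ToPickle later). Otherwise, every layer is
    materialized into a plain dict of tasks before sending.
    """
    if pickle_enabled:
        return layers
    return {name: dict(layer) for name, layer in layers.items()}


def inc(x):  # simple task function for the example
    return x + 1


hlg = {"inc": {("inc", 0): (inc, 1)}}
out = materialize_if_needed(hlg, pickle_enabled=False)
assert ("inc", 0) in out["inc"]
```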