
Refactor HighLevelGraph Layers to use ToPickle and reduce boilerplate #8672

Closed
rjzamora wants to merge 48 commits into dask:main from rjzamora:to_pickle

Conversation

@rjzamora (Member) commented Feb 7, 2022

This PR refactors Layer and its subclasses to simplify serialization and reduce redundant graph-caching and culling logic. These changes were motivated by the serialization change in distributed#5728 (which proposes a new ToPickle protocol).

This PR ultimately demonstrates that new Layer implementations can be dramatically simplified if we go "all in" on the new ToPickle protocol, and simply require that pickle.loads be allowed on the scheduler for any (non-MaterializedLayer) Layer instance that is sent to and materialized on the scheduler. That is, for cases where the "distributed.scheduler.pickle" config option is set to False, we will simply convert all HighLevelGraph layers to MaterializedLayer instances before communicating them to the scheduler.

TODO:

  • Update base Layer class to implement ToPickle serialization, basic culling, and graph-caching logic
  • Revise all DataFrame-specific Layers to inherit from the centralized serialization/culling/caching logic
  • Revise Blockwise to inherit from the centralized serialization/caching logic
  • (BLOCKER) Get distributed#5728 merged (This PR will "work" without ToPickle, but all layers will be converted to MaterializedLayers on the client)
  • Roll back temporary CI-config changes to point back to main branch of distributed
  • Establish if/how backward compatibility (pre-ToPickle) should be handled (I am thinking we can just convert all Layers to MaterializedLayer objects on the client). [Current Solution: Convert HLG Layers to MaterializedLayer objects when distributed.protocol.serialize.ToPickle is not available.]
  • Establish clear configuration option for scheduler- vs client-side materialization. The indirect nature of the current "optimization.fuse.active" option is confusing. [Current Solution: Use existing "distributed.scheduler.pickle" config option. When this is set to False, we always convert HLG Layers to MaterializedLayer objects before sending to the scheduler.]
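The fallback described in the last two items can be sketched roughly as follows. This is a toy illustration using simplified stand-in classes (not dask's actual Layer/MaterializedLayer implementations): when pickle is disabled on the scheduler, every abstract layer is converted to a materialized (plain-dict) layer before the graph is sent.

```python
from collections.abc import Mapping


class Layer(Mapping):
    """Simplified stand-in for an abstract dask Layer."""

    def __init__(self, tasks, annotations=None):
        self._tasks = tasks
        self.annotations = annotations

    def __getitem__(self, key):
        return self._tasks[key]

    def __iter__(self):
        return iter(self._tasks)

    def __len__(self):
        return len(self._tasks)


class MaterializedLayer(Layer):
    """Layer backed by a fully materialized task dict."""


def materialize_layers(layers, pickle_allowed):
    """If pickling is disabled on the scheduler, convert every abstract
    layer to a MaterializedLayer; otherwise pass the layers through."""
    if pickle_allowed:
        return layers
    return {
        name: layer
        if isinstance(layer, MaterializedLayer)
        else MaterializedLayer(dict(layer), layer.annotations)
        for name, layer in layers.items()
    }
```

Materializing via `dict(layer)` preserves annotations but gives up the compact abstract representation, which is why this path is only a fallback.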

Follow-up Work:

  • (Try to) Remove __dask_distributed_pack__ and __dask_distributed_unpack__ from the code-base completely (since packing will be identical for all materialized layers, and a single ToPickle call will suffice for unmaterialized layers)
  • Redistribute the collection-specific code defined in layers.py (Maybe?)
  • Add/Revise "how-to" developer documentation for implementing a new Layer

attrs = list(self.layer_state.keys()) + ["annotations"]
return (self.__class__, tuple(getattr(self, attr) for attr in attrs))

def __dask_distributed_pack__(self, all_hlg_keys, *args, **kwargs):
Member

Is it possible to remove this protocol?

Member Author

Yes - We should be able to remove this protocol next, since there should only be two different cases now: a "materialized" layer, or an "abstract" layer (requiring a single ToPickle call). However, I would prefer to leave this change for a follow-up PR (mostly to make this PR easier to review).

@rjzamora rjzamora marked this pull request as ready for review February 24, 2022 17:49
@rjzamora rjzamora added the needs review Needs review from a contributor. label Mar 1, 2022
@rjzamora rjzamora added the highlevelgraph Issues relating to HighLevelGraphs. label Mar 22, 2022
@rjzamora (Member Author)

@ian-r-rose @gjoseph92 - Any interest in reviewing this (or know who should)? :)

@ian-r-rose ian-r-rose self-requested a review March 22, 2022 20:08
@ian-r-rose (Collaborator) left a comment

Thanks @rjzamora, this is heroic work. I love a PR that has more deletions than additions. I'm still digesting this, but here are some early thoughts.

My biggest concern is around the interface of the base Layer. It now makes some fairly strong assumptions about graph and partitioning structure (see, e.g., output_blocks, _keys_to_indices) which are not always true. An immediate consequence of this is that we need to allow output_blocks to be None everywhere to handle those cases. Right now that is mostly MaterializedLayer, but I could imagine other, weirder things involving Delayed layers or something. To me, this indicates that the Layer class is trying to do a bit too much. I understand that part of what you are trying to do here is consolidate logic, so that impulse makes a lot of sense! But it feels a bit too far right now.

I'm still thinking about this, and may have misunderstood something fundamental. But my instinct here is to actually have most of your Layer implementation be something like a PartitionedLayer, which handles a lot of logic around partitioned collections, output blocks, keys -> names -> indices -> keys translations, etc. And both that and MaterializedLayer could satisfy an abstract Layer interface that tries hard to be as close as possible to a pure Mapping interface + cull.
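A rough sketch of the split proposed here, with a minimal abstract Layer (close to a pure Mapping interface plus cull) and the partition-aware logic pushed down into a PartitionedLayer subclass. All names and signatures below are illustrative, not dask's actual API:

```python
from collections.abc import Mapping


class AbstractLayer(Mapping):
    """Minimal interface: a Mapping of keys to tasks, plus cull."""

    def cull(self, keys_to_keep):
        """Return a new layer containing only the tasks needed to
        produce ``keys_to_keep``."""
        raise NotImplementedError


class PartitionedLayer(AbstractLayer):
    """Adds the (name, index) key structure and output-block logic
    shared by partitioned collections (arrays, dataframes)."""

    def __init__(self, name, tasks):
        self.name = name
        self._tasks = tasks

    def __getitem__(self, key):
        return self._tasks[key]

    def __iter__(self):
        return iter(self._tasks)

    def __len__(self):
        return len(self._tasks)

    @staticmethod
    def _key_to_parts(key):
        # Assumes keys look like (name, i[, j, ...]) -- exactly the
        # assumption flagged above as too strong for the base class.
        name, *parts = key
        return name, tuple(parts)

    def cull(self, keys_to_keep):
        kept = {k: v for k, v in self._tasks.items() if k in keys_to_keep}
        return PartitionedLayer(self.name, kept)
```

The point of the split is that only PartitionedLayer bakes in the `(name, index)` key convention; layers with other key structures can still satisfy AbstractLayer.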

return keys_in_tasks(all_hlg_keys, [self[key]])

def __copy__(self):
"""Default shallow copy implementation"""
Collaborator

Who is using this?

Member Author

Not sure - this isn't new code, just moved.


# If pickle is disabled on the scheduler, all layers must
# be converted to `MaterializedLayer` objects before packing
materialize = not config.get("distributed.scheduler.pickle")
Collaborator

It's unfortunate that this config value doesn't really have any bearing on whether pickling is actually enabled on the scheduler. Can we instead add some metadata to the comm or similar and check whether the scheduler has it enabled?

@rjzamora (Member Author) commented Mar 24, 2022

Yeah - I guess it is possible for the config setting to be different on the scheduler and client, but I suspect this will cover most cases.

When the client tries to send pickled data to the scheduler, the scheduler will throw an error. Perhaps we should just update this error in distributed to clarify that the "pickle" config must be True on both the client and scheduler?

Collaborator

> I suspect this will cover most cases

I think this is true for local clusters, but if there is a remote cluster somewhere with pickling disabled, I expect it to fail more often than it succeeds (simply because most users won't have a local config for this).

The error message and documentation may be enough, though I still think a better user experience would be to introspect something on the client (which is an arg here) to determine whether to pickle something. Happy to leave that discussion for a follow-up, however, since it's separate from API design.

Member Author

> The error message and documentation may be enough, though I still think a better user experience would be to introspect something on the client (which is an arg here) to determine whether to pickle something. Happy to leave that discussion for a follow-up, however, since it's separate from API design.

I agree with the user-experience concern. Even with a clear error message, it will probably be confusing to run into serialization errors when the external cluster has pickling disabled (however rare it may be for people to explicitly disable pickling). I don't personally have any bright ideas about how we can introspect this information on the client. Do you think we need to implement something new to make this information available? I'm hoping we don't need to do something annoying, like ping the scheduler with a pickled test packet.

Collaborator

> Do you think we need to implement something new to make this information available? I'm hoping we don't need to do something annoying, like ping the scheduler with a pickled test packet.

Yeah, we would probably need to implement something new; I don't think this is possible today (at least, not without pickling a test packet as you suggest). But there is no fundamental reason the information couldn't be made available. I'm happy to defer this discussion, though!
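One hypothetical shape for the introspection discussed here: if the scheduler advertised its pickle setting in handshake/identity metadata, the client could decide locally whether to materialize layers, without a test packet. The `"pickle-enabled"` field name below is invented purely for illustration:

```python
def should_materialize(scheduler_info, local_config):
    """Decide whether to convert layers to materialized form before
    sending the graph.  Prefer a scheduler-advertised setting (the
    hypothetical "pickle-enabled" field); fall back to local config."""
    remote = scheduler_info.get("pickle-enabled")
    if remote is not None:
        return not remote
    return not local_config.get("distributed.scheduler.pickle", True)
```

With the fallback, a client lacking any local config would still default to sending abstract layers, which is the failure mode described above for remote clusters with pickling disabled.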

@rjzamora (Member Author)

Thanks for taking the time to look through this @ian-r-rose !

I completely agree with your concerns here about the Layer class trying to do too much. For example, it does feel a bit “forced” to have the output_blocks attribute living in the base Layer class. I originally wanted to define a PartitionedLayer class, but ended up putting everything in Layer because it reduced the amount of code.

In hindsight - I do agree that we should distinguish collection-specific layers a bit more clearly. (And so I just pushed some changes to do this)

Although I do want a PartitionedLayer class, I think it is worth considering that most layer definitions probably can support a concept very similar to “blocks”. The "big-data" collections use blocks/partitions literally, while (almost) everything else can be described with the same language if we want. For example, every Delayed object is really just a single-block/partition layer, and from_delayed is just mapping each of these single-block layers onto distinct output blocks. I think the only problem arises when you consider that a raw user-defined graph may have an arbitrary number (and naming scheme) of “output” tasks.

@ian-r-rose (Collaborator) commented Mar 24, 2022

> Although I do want a PartitionedLayer class, I do think it is worth considering that most layer definitions probably can support a concept very similar to “blocks”. The "big-data" collections use blocks/partitions literally, while (almost) everything else can be described with the same language if we want.

Fully agree here -- most of the cases we care about for big data assume some sort of partitioning. I wanted to raise it here for a couple of reasons:

  1. defining abstract base classes is exactly the right time to ask "what is the minimum interface I need to accomplish the goal?"
  2. While most of the important cases assume some partitioning, the current implementation makes a stronger assumption than that about key structure. I'm specifically thinking about _key_to_parts, which assumes that all keys can be unpacked into name, part. Of course, this is satisfied today by array and dataframe collections, but baking that into the API of a graph makes it harder to experiment with other partitioning representations. While I don't necessarily foresee that happening any time soon, I wouldn't want to preclude it without discussion.

Edit: another take (disagreeing with mine) on whether Layers should continue looking like Mappings from @jcrist : #7933 (comment)

@rjzamora (Member Author)

> Edit: another take (disagreeing with mine) on whether Layers should continue looking like Mappings from @jcrist : #7933 (comment)

I don't have strong feelings (yet) about whether a Layer should continue looking like a Mapping, but I do agree that there is no great reason to push the construct_graph/caching approach all the way down to the base Layer definition in this PR. For now, it doesn't really hurt to preserve the current "flexibility", but we can urge most developers to inherit from a simplified class (like PartitionedLayer) if/when they want to implement their own subclass.

Comment on lines +604 to +611
def __dask_distributed_pack__(
self,
all_hlg_keys: Iterable[Hashable],
known_key_dependencies: Mapping[Hashable, set],
client,
client_keys: Iterable[Hashable],
) -> Any:

Member

Why is this protocol still necessary? Could we just lift the entire graph and send it to the scheduler? What stops this?

Member

So, to make this concrete, how about the following diff.

diff --git a/distributed/client.py b/distributed/client.py
index f68570f7..bc3afc75 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -2904,7 +2904,6 @@ class Client(SyncMethodMixin):
 
             # Pack the high level graph before sending it to the scheduler
             keyset = set(keys)
-            dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
 
             # Create futures before sending graph (helps avoid contention)
             futures = {key: Future(key, self, inform=False) for key in keyset}
@@ -2912,7 +2911,7 @@ class Client(SyncMethodMixin):
             self._send_to_scheduler(
                 {
                     "op": "update-graph-hlg",
-                    "hlg": dsk,
+                    "hlg": ToPickle(dsk),
                     "keys": list(map(stringify, keys)),
                     "priority": priority,
                     "submitting_task": getattr(thread_state, "key", None),

I suspect that this doesn't actually work today, but I'd be curious why not, and if it's not doable.
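For context, the ToPickle idea itself is just a marker wrapper: anything wrapped in it is serialized with plain pickle rather than going through dask's per-type serialization handlers. Below is a toy stand-in showing the mechanism, not distributed's actual implementation:

```python
import pickle


class ToPickle:
    """Marker wrapper: serialize the wrapped object with plain pickle."""

    def __init__(self, data):
        self.data = data


def dumps(msg):
    # Serialization layer: ToPickle-wrapped payloads go through pickle
    # wholesale; anything else would take the per-type handler path.
    if isinstance(msg, ToPickle):
        return b"pickle:" + pickle.dumps(msg.data)
    raise NotImplementedError("per-type serialization path")


def loads(frame, allow_pickle=True):
    # Deserialization layer: refuse pickled frames when the receiving
    # side (e.g. the scheduler) has pickle disabled.
    tag, payload = frame.split(b":", 1)
    if tag == b"pickle":
        if not allow_pickle:
            raise RuntimeError("scheduler has pickle disabled")
        return pickle.loads(payload)
    raise NotImplementedError("per-type deserialization path")
```

This is also why the `allow_pickle=False` case matters for the diff above: wrapping the whole graph in ToPickle only works when the scheduler is willing to unpickle it.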

Member Author

> Why is this protocol still necessary? Could we just lift the entire graph and send it to the scheduler? What stops this?

Thanks for looking at this @mrocklin! I'll try to catch up on your various discussions/work from last week as soon as I can.

The short answer is that we absolutely can pickle the entire graph as you are suggesting. Continuing to use __dask_distributed_pack__ at the Layer level for this PR was only meant to be a simpler intermediate step.

My (possibly wrong) impression was that continuing to use the Layer-by-Layer approach makes it a bit easier to materialize layers without losing annotations. For now, I do get the sense that we should preserve the option for the user to disable pickle on the scheduler, because I know of some cases (like the dask-sql CI) where the scheduler has a different Python environment than the client/workers.

Is your suggestion that (1) we should require pickling to be enabled, (2) that we should drop annotations when pickling is disabled, or (3) something else?

Note that I am personally still a bit interested in seeing the scheduler written in another language at some point, so I like the idea of layers being "materialized" individually (whether it is in a dedicated __dask_distributed_pack__ routine or not)

@mrocklin (Member)

mrocklin commented Apr 4, 2022 via email


Labels: dataframe, highlevelgraph, io, needs review

4 participants