
Avoid unnecessary imports for HLG Layer unpacking and materialization#7381

Merged
jrbourbeau merged 26 commits into dask:main from rjzamora:shuffle-avoid-pd-import
Mar 18, 2021

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Mar 12, 2021

Motivated by #7374

Adds layer_materialize_module and layer_materialize_class properties to highlevelgraph.Layer. These properties are used within HighLevelGraph.__dask_distributed_pack__ so that a different class (usually a light-weight parent class) can be used for unpacking and graph materialization.
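As a rough sketch (the property bodies below are an assumption for illustration, not the PR's actual implementation), such properties might default to the layer's own module and class, with layers defined in heavyweight modules overriding them:

```python
class Layer:
    """Sketch of a graph layer exposing materialization hints."""

    @property
    def layer_materialize_module(self):
        # Default: materialize with the module that defines this class
        return self.__class__.__module__

    @property
    def layer_materialize_class(self):
        # Layers defined in heavyweight modules can override this to name
        # a lightweight parent class for unpacking/materialization
        return self.__class__.__name__
```

The scheduler can then import only the named lightweight module when unpacking, never the subclass's heavyweight home module.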

In order to leverage the new properties for shuffle operations, this PR adds a new dask/layers.py file, where lightweight versions of SimpleShuffleLayer and ShuffleLayer have been defined. The "full" versions of these classes, in dask.dataframe.shuffle, are now subclasses. The new lightweight classes only include the required logic for unpacking and layer materialization, and they only import functions/classes from dask.dask and tlz.

Since we cannot import functions like _concat and shuffle_group in layers.py, we also use run_ext_function (new in this PR) for all materialized tasks so that we can build the graph with serialized functions. Note: If we try to specify a serialized function as a callable object in a task, the stringify_collection_keys pass will misinterpret those tasks as collection keys. I am very open to suggestions on other ways to avoid this problem. However, if we do avoid the stringify issue, we would still need to modify distributed.worker.dumps_task to optionally skip the dumps_func step (since the functions will already need to be serialized).
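A hedged sketch of the serialized-function idea described above (the bytes check and helper body here are assumptions for illustration; the actual run_ext_function appears later in the diff):

```python
import pickle

def run_ext_function(func, *args):
    # func may arrive as pickled bytes because the real callable could
    # not be imported in the module where the graph was built
    if isinstance(func, bytes):
        func = pickle.loads(func)
    return func(*args)
```

Because the task's first element is this plain helper rather than a serialized blob, passes like stringify_collection_keys see an ordinary callable.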

@rjzamora
Member Author

cc @jrbourbeau @ian-r-rose

@ian-r-rose
Collaborator

Thanks for the quick look at this @rjzamora. To me, the extra cognitive overhead of having two different flavors of, e.g., SimpleShuffleLayer is kind of unfortunate. Especially having a layer_materialize_class, for just one more level of indirection. If I understand correctly, you have done this to be able to pass in pickled versions of things like _concat from places that are actually allowed to import them.

I wonder if it would be possible to instead refer to those functions as a string tuple of (module, function). We've already crossed that bridge when it comes to packing/unpacking the layers, so what's the harm of one more :) Then your unpickling in run_ext_function would be replaced by something like an import/getattr. I haven't really thought this through, however, and would be happy to be talked out of it.
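A minimal sketch of this (module, function) string-tuple idea (the helper name `resolve` is hypothetical):

```python
from importlib import import_module

def resolve(spec):
    # spec is a ("module.path", "function_name") pair; the import and
    # getattr only happen when the task is about to run
    module, name = spec
    return getattr(import_module(module), name)
```

For example, `resolve(("operator", "getitem"))` yields `operator.getitem` without the packing side ever importing it.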

@rjzamora
Member Author

To me, the extra cognitive overhead of having two different flavors of, e.g., SimpleShuffleLayer is kind of unfortunate. Especially having a layer_materialize_class, for just one more level of indirection. If I understand correctly, you have done this to be able to pass in pickled versions of things like _concat from places that are actually allowed to import them.

I think you have this mostly right for the particular case of shuffle. We are moving the materialization component of SimpleShuffleLayer and ShuffleLayer into layers.py, because we need to have those methods defined in a class/file with minimal import requirements. We are leaving the rest of the logic in the original dataframe.shuffle.SimpleShuffleLayer and dataframe.shuffle.ShuffleLayer. For this specific case, the initialization, packing, and culling logic could probably be tweaked a bit to allow the entire class to be moved to layers.py; the dask.dataframe.shuffle function would just need a bit more logic to avoid doing things like passing in the pandas metadata.

I can understand the resistance to effectively splitting the class definition between two files. My primary motivation was to keep as much dataframe/shuffle-specific logic as possible in the dataframe.shuffle module itself. It seems to me that we cannot use a dask.dataframe.layers module without importing the requirements for the dask.dataframe module. So, if we want to avoid splitting class definitions, as this PR currently does, it seems that we may want to introduce a dedicated highlevelgraph directory structure to organize the large amount of HLG Layer/materialization logic that we eventually want to live in lightweight modules.

@rjzamora rjzamora marked this pull request as ready for review March 15, 2021 20:05
dask/layers.py Outdated
Comment on lines +9 to +12
from importlib import import_module  # needed for the lazy lookup below

def run_ext_function(func, *args):
    # func may be a {"__module__": ..., "__name__": ...} spec; resolve it lazily
    if isinstance(func, dict):
        func = getattr(import_module(func["__module__"]), func["__name__"])
    return func(*args)
Collaborator

I personally prefer this variety of the function over the pickle version, but curious to know what others think!

Comment on lines +137 to +142
"concat_func": {"__module__": _concat.__module__, "__name__": "_concat"},
"getitem_func": {"__module__": "operator", "__name__": "getitem"},
"shuffle_group_func": {
"__module__": shuffle_group.__module__,
"__name__": "shuffle_group",
},
Collaborator

Should we have a name for this? Indirect import? Quoted import? It's starting to feel like a new flavor of task, perhaps worth formalizing in some way?

It is almost not necessary to have this in dask/dataframe/shuffle at this point, no?

Member Author

@rjzamora rjzamora Mar 15, 2021

Should we have a name for this? Indirect import? Quoted import? It's starting to feel like a new flavor of task, perhaps worth formalizing in some way?

Right - Once we nail down what we actually want this to look like, I agree that we should clearly document and possibly formalize this approach in some way.

It is almost not necessary to have this in dask/dataframe/shuffle at this point, no?

Right. I am still open-minded about the idea of moving everything into layers.py. However, I personally feel that there are a variety of reasons not to do this. In general, I feel that the path forward is likely to be much easier if we only require the unpacking and materialization logic to live in the lightweight (layers.py) module. For shuffle, the only sticking point is the empty pd/cudf.DataFrame metadata that we need within _construct_graph. When the __dask_distributed_pack__ method is defined within the dask.dataframe module, it is easy to simply serialize this object. However, if that definition were forced to live in layers.py, we wouldn't really be able to use the robust DataFrame serialization machinery. I think we could certainly brainstorm ways to get around this, but I suspect that similar challenges would keep popping up.

EDIT: On second thought, I think that metadata-serialization question is moot. We can still import the necessary logic within the __dask_distributed_pack__ method (we only need to worry about top-level imports)

Collaborator

Ooh, good point. That actually opens a lot of doors :)

Member Author

@rjzamora rjzamora Mar 15, 2021

So, to clarify, we can indeed move the shuffle layers entirely into layers.py. Should we go ahead and do this? That file may eventually become quite bloated, but it shouldn't be much work to cross the re-organization bridge later.

Member Author

I went ahead and moved "everything" over to layers.py (this includes ShuffleLayer, SimpleShuffleLayer, BlockwiseCreateArray, and BroadcastJoinLayer). I think you are right about keeping the entire Layer definition in one place :)

Member

+1 thanks @rjzamora!

Member

@jrbourbeau jrbourbeau left a comment

Could you add dask.layers to the set of modules on this line

test_import "" "import dask, dask.base, dask.multiprocessing, dask.threaded, dask.optimization, dask.bag, dask.delayed, dask.graph_manipulation"

That will ensure we can import dask.layers with only Dask's required dependencies. This won't cover non-top-level imports (e.g. importing something inside a __dask_distributed_pack__ method), but it does provide us some good automated import coverage.
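For illustration, a hypothetical way to automate this kind of isolated-import check from Python (shown with a stdlib module so the sketch is self-contained; in practice you would pass "dask.layers" and rely on dask being installed):

```python
import subprocess
import sys

def imports_cleanly(module, banned=("pandas", "numpy")):
    # Import the module in a fresh interpreter so modules already loaded
    # in this process cannot pollute the check
    code = (
        f"import {module}, sys; "
        f"bad = [m for m in sys.modules if m.split('.')[0] in {banned!r}]; "
        "sys.exit(1 if bad else 0)"
    )
    return subprocess.run([sys.executable, "-c", code]).returncode == 0
```

The subprocess is the important part: a plain in-process `import` test would pass trivially if pandas had already been imported by an earlier test.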

@rjzamora rjzamora changed the title Avoid unnecessary imports for (Simple)ShuffleLayer unpacking and materialization Avoid unnecessary imports for HLG Layer unpacking and materialization Mar 16, 2021
Member

@jrbourbeau jrbourbeau left a comment

Thanks for taking this on @rjzamora! Generally I like the approach of moving layers to a separate module where we can test top-level imports.

Because of the large diff would you mind giving a brief high-level summary of the changes? It looks like the Layer classes were moved around and we introduced CallableLazyImport -- was there anything else that changed?

Comment on lines +352 to +354
else:
# Manually remove the target library
sys.modules.pop(mod)
Member

Modifying sys.modules in this way makes me a little uncomfortable -- though maybe this is actually fine

Since the cluster fixture creates a new process with the launched scheduler I wouldn't expect anything that's not a distributed dependency to be imported. What happens if we instead just assert that none of the initial sys.modules start with self.pattern?

Member Author

Sorry - I should have commented about this earlier. I did start off by adding the assertion you suggest above. However, the test was failing for the _array_creation case (pandas is not a problem). It seems that numpy is imported somewhere outside of this test, and so I added the logic to manually "clean" sys.modules (rather than looking into it further) :/
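A sketch of the manual cleaning being described (hypothetical helper; mutating sys.modules like this is generally only safe in test setup/teardown):

```python
import sys

def purge_modules(top_level):
    # Drop a package and all of its submodules from the import cache so a
    # later membership check on sys.modules starts from a clean slate
    for name in [m for m in sys.modules if m.split(".")[0] == top_level]:
        sys.modules.pop(name)
```

A subsequent `import` of the package will re-execute it from scratch, which is exactly what the "was it imported during this test?" assertion needs.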

dask/layers.py Outdated
Comment on lines +22 to +32
from importlib import import_module  # required for the deferred import

class CallableLazyImport:
    """Function Wrapper for Lazy Importing"""

    def __init__(self, function_path):
        # Split "module.path.name" into the module path and attribute name
        split_path = function_path.split(".")
        self.module = ".".join(split_path[:-1])
        self.name = split_path[-1]

    def __call__(self, *args, **kwargs):
        # The import happens here, at task-execution time on the worker
        func = getattr(import_module(self.module), self.name)
        return func(*args, **kwargs)
Member

This looks new. Just to confirm, this is so we can materialize the graph on the scheduler without importing, say, pandas?

Member Author

Right - I am very interested in feedback on this. The idea is to have a "callable" at graph materialization time without requiring the scheduler to actually import anything from something like pandas/numpy/dask.dataframe/dask.array.
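For illustration, a self-contained usage sketch of the lazy-import wrapper (definition condensed from the snippet above; the path-splitting here uses rpartition rather than the diff's split/join):

```python
from importlib import import_module

class CallableLazyImport:
    """Defer importing a function until the wrapper is actually called."""

    def __init__(self, function_path):
        # "operator.getitem" -> module "operator", attribute "getitem"
        self.module, _, self.name = function_path.rpartition(".")

    def __call__(self, *args, **kwargs):
        func = getattr(import_module(self.module), self.name)
        return func(*args, **kwargs)

# Building the wrapper imports nothing; the import runs at call time,
# i.e. on the worker rather than on the scheduler
getitem = CallableLazyImport("operator.getitem")
```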

Member

Gotcha, thanks for confirming. I'll admit I'm not super excited about adding a new wrapper class for this, but I also can't think of a better solution at the moment.

It's a shame we won't always have distributed installed when this code is run -- otherwise we could use import_term

Member Author

Hmm - I actually think we can do that with a small tweak (by adding a deserializing parameter, as we do in Blockwise). I’ll try this out tonight.

Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
@rjzamora
Member Author

Because of the large diff would you mind giving a brief high-level summary of the changes? It looks like the Layer classes were moved around and we introduced CallableLazyImport -- was there anything else that changed?

Thanks for the review @jrbourbeau !

Yes - The high-level overview is that this PR simply moves existing Layer logic into the new layers.py file. For example:

  • dask.dataframe.shuffle.SimpleShuffleLayer -> dask.layers.SimpleShuffleLayer
  • dask.dataframe.shuffle.ShuffleLayer -> dask.layers.ShuffleLayer
  • dask.dataframe.multi.BroadcastJoinLayer -> dask.layers.BroadcastJoinLayer
  • dask.array.blockwise.CreateArrayDeps -> dask.layers.CreateArrayDeps
  • dask.array.blockwise.BlockwiseCreateArray -> dask.layers.BlockwiseCreateArray

I tried to avoid changing the code where possible. However, in order to avoid importing functions like dask.dataframe.shuffle.shuffle_group during graph materialization, I introduced a CallableLazyImport class. The purpose of this class is to delay the import of any specialized functions (from modules that may not be supported on the scheduler) until the task is executed on the worker.

This PR also uses your testing idea in a new test_layers.py module.

dask/layers.py Outdated
Comment on lines +670 to +677
# dsk.update(toolz.valmap(dumps_task, raw))

# dependencies.update(
# {k: keys_in_tasks(dsk, [v], as_list=True) for k, v in raw.items()}
# )

# if state["annotations"]:
# cls.unpack_annotations(annotations, state["annotations"], raw.keys())
Member

If I'm looking at the PR diff correctly, it seems we're also updating BroadcastJoinLayer.__dask_distributed_unpack__ here -- just wanted to check that this is intentional. It looks like maybe things were incorrect before?

Member Author

I think there are a few things going on here, and I'm not sure if the current logic is "wrong." This code was originally copy-pasted (mostly) from the blockwise and/or shuffle code. It seems that recent Layer/annotation work modified the blockwise and shuffle __dask_distributed_unpack__ definitions, but not the broadcast join (I'm not sure whether the broadcast-join PR was merged after those changes).


Comment on lines +90 to +91
"dask.layers",
"CreateArrayDeps",
Member

We don't have to change anything in this PR, but seeing this alongside how we're handling CallableLazyImport makes me wonder if we should always use fully-qualified names (e.g. dask.layers.CreateArrayDeps)

Collaborator

I'm reminded of the syntax for entry points, e.g. dask.layers:CreateArrayDeps. I've never particularly liked the :, but perhaps there is a reason for it that I am unaware of.

Member Author

Yeah - I agree that it is worth the extra split logic to use the fully-qualified names. However, it would probably be less confusing to make that change in a separate PR.
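A sketch of the split logic being discussed (hypothetical helper; handles both the dotted form and the entry-point-style colon form mentioned above):

```python
def split_qualified(path):
    # "dask.layers.CreateArrayDeps" -> ("dask.layers", "CreateArrayDeps")
    # "dask.layers:CreateArrayDeps" -> ("dask.layers", "CreateArrayDeps")
    if ":" in path:
        module, name = path.split(":")
        return module, name
    module, _, name = path.rpartition(".")
    return module, name
```

The colon form is unambiguous when class names contain dots in nested attributes, which is one reason the entry-point syntax uses it.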

@jrbourbeau
Member

Also cc @ian-r-rose if you get a chance to look at this

@rjzamora
Member Author

Okay - the last batch of changes avoids the use of CallableLazyImport unless we know the graph is being materialized on the scheduler. Also, when that wrapper is used, we now take advantage of distributed.utils.import_term.

rjzamora and others added 3 commits March 17, 2021 09:52
Member

@jrbourbeau jrbourbeau left a comment

Thanks for all your work on this @rjzamora!
