Avoid unnecessary imports for HLG Layer unpacking and materialization #7381

jrbourbeau merged 26 commits into dask:main from …shuffle-avoid-pd-import

Conversation
Thanks for the quick look at this @rjzamora. To me, the extra cognitive overhead of having two different flavors of, e.g.,

I wonder if it would be possible to instead refer to those functions as a string tuple of

I think you have this mostly right for the particular case of shuffle. We are moving the materialization component of

I can understand the resistance to effectively splitting the class definition between two files. My primary motivation was to keep as much dataframe/shuffle-specific logic in the
dask/layers.py (Outdated)

```python
def run_ext_function(func, *args):
    if isinstance(func, dict):
        func = getattr(import_module(func["__module__"]), func["__name__"])
    return func(*args)
```
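To make the snippet above concrete, here is a self-contained sketch of how a task built around such a dict spec would execute (the spec values mirror the shuffle snippet discussed below; the example data is illustrative):

```python
from importlib import import_module

def run_ext_function(func, *args):
    # Resolve a {"__module__": ..., "__name__": ...} spec into a real
    # callable only at execution time; plain callables pass through.
    if isinstance(func, dict):
        func = getattr(import_module(func["__module__"]), func["__name__"])
    return func(*args)

# A materialized task can then carry a string-only function spec, so that
# building/unpacking the graph never imports the target module up front:
task = (run_ext_function, {"__module__": "operator", "__name__": "getitem"}, {"a": 1}, "a")
func, *args = task
assert func(*args) == 1  # the target module is imported only here
```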
I personally prefer this variety of the function over the pickle version, but curious to know what others think!
dask/dataframe/shuffle.py (Outdated)

```python
"concat_func": {"__module__": _concat.__module__, "__name__": "_concat"},
"getitem_func": {"__module__": "operator", "__name__": "getitem"},
"shuffle_group_func": {
    "__module__": shuffle_group.__module__,
    "__name__": "shuffle_group",
},
```
Should we have a name for this? Indirect import? Quoted import? It's starting to feel like a new flavor of task, perhaps worth formalizing in some way?
It is almost not necessary to have this in dask/dataframe/shuffle at this point, no?
Should we have a name for this? Indirect import? Quoted import? It's starting to feel like a new flavor of task, perhaps worth formalizing in some way?
Right - Once we nail down what we actually want this to look like, I agree that we should clearly document and possibly formalize this approach in some way.
It is almost not necessary to have this in dask/dataframe/shuffle at this point, no?
Right. I am still open-minded to the idea of moving everything into layers.py. However, I personally feel that there are a variety of reasons not to do this. In general, I feel that the path forward is likely to be much easier if we only require the unpacking and materialization logic to live in the lightweight (layers.py) module. For shuffle, the only sticking point is the empty pd/cudf.DataFrame metadata that we need within _construct_graph. When the __dask_distributed_pack__ method is defined within the dask.dataframe module, it is easy to simply serialize this object. However, if that definition were forced to live in layers.py, we wouldn't really be able to use the robust DataFrame serialization machinery. I think we could certainly brainstorm ways to get around this, but I suspect that similar challenges will keep popping up.
EDIT: On second thought, I think that metadata-serialization question is moot. We can still import the necessary logic within the __dask_distributed_pack__ method (we only need to worry about top-level imports)
Ooh, good point. That actually opens a lot of doors :)
So, to clarify, we can indeed move the shuffle layers entirely into layers.py. Should we go ahead and do this? That file may eventually become quite bloated, but it shouldn't be much work to cross the re-organization bridge later.
I went ahead and moved "everything" over to layers.py (this includes ShuffleLayer, SimpleShuffleLayer, BlockwiseCreateArray, and BroadcastJoinLayer). I think you are right about keeping the entire Layer definition in one place :)
jrbourbeau
left a comment
Could you add dask.layers to the set of modules on this line
That will ensure we can import dask.layers with only Dask's required dependencies. This won't cover non top-level imports (e.g. importing something inside a __dask_distributed_pack__ method) but it does provide us some good automated import coverage
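The automated import coverage suggested here boils down to importing the module in a fresh interpreter and checking that no optional dependencies end up loaded. A rough, self-contained sketch of that pattern (the `imports_cleanly` helper is hypothetical, not Dask's actual test code):

```python
import subprocess
import sys

def imports_cleanly(module, forbidden):
    """Import `module` in a fresh interpreter and report whether it can be
    loaded without pulling in any of the `forbidden` top-level modules."""
    code = (
        "import sys, importlib; "
        f"importlib.import_module({module!r}); "
        f"print(any(m.split('.')[0] in {tuple(forbidden)!r} for m in sys.modules))"
    )
    out = subprocess.run([sys.executable, "-c", code], capture_output=True, text=True)
    return out.stdout.strip() == "False"

# The real check would target dask.layers with ["pandas", "numpy"]; here we
# demonstrate the pattern with a stdlib module so the sketch is self-contained:
assert imports_cleanly("json", ["pandas", "numpy"])
```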
jrbourbeau
left a comment
Thanks for taking this on @rjzamora! Generally I like the approach of moving layers to a separate module where we can test top-level imports.
Because of the large diff would you mind giving a brief high-level summary of the changes? It looks like the Layer classes were moved around and we introduced CallableLazyImport -- was there anything else that changed?
dask/tests/test_distributed.py (Outdated)

```python
else:
    # Manually remove the target library
    sys.modules.pop(mod)
```
Modifying sys.modules in this way makes me a little uncomfortable -- though maybe this is actually fine
Since the cluster fixture creates a new process with the launched scheduler I wouldn't expect anything that's not a distributed dependency to be imported. What happens if we instead just assert that none of the initial sys.modules start with self.pattern?
Sorry - I should have commented about this earlier. I did start off by adding the assertion you suggest above. However, the test was failing for the _array_creation case (pandas is not a problem). It seems that numpy is imported somewhere outside of this test, and so I added the logic to manually "clean" sys.modules (rather than looking into it further) :/
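The manual "cleaning" described here amounts to popping the already-imported modules out of sys.modules before the test runs, so that a later re-import is detectable. A minimal sketch of that helper (hypothetical name; the real test operates on libraries like numpy):

```python
import sys

def purge_modules(pattern):
    # Remove already-imported modules matching `pattern` so the test can
    # detect whether the code under test imports them again later.
    for mod in list(sys.modules):
        if mod == pattern or mod.startswith(pattern + "."):
            sys.modules.pop(mod)

import json  # make sure the target is loaded first
purge_modules("json")
assert not any(m == "json" or m.startswith("json.") for m in sys.modules)
```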
dask/layers.py (Outdated)

```python
class CallableLazyImport:
    """Function Wrapper for Lazy Importing"""

    def __init__(self, function_path):
        split_path = function_path.split(".")
        self.module = ".".join(split_path[:-1])
        self.name = split_path[-1]

    def __call__(self, *args, **kwargs):
        func = getattr(import_module(self.module), self.name)
        return func(*args, **kwargs)
```
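Restated as a self-contained sketch, with a usage example showing that the wrapped module is only resolved when the task actually runs (i.e. on a worker, not on the scheduler):

```python
from importlib import import_module

class CallableLazyImport:
    """Function wrapper for lazy importing (as in the snippet above)."""

    def __init__(self, function_path):
        split_path = function_path.split(".")
        self.module = ".".join(split_path[:-1])
        self.name = split_path[-1]

    def __call__(self, *args, **kwargs):
        # Import happens here, at call time, not at construction time
        func = getattr(import_module(self.module), self.name)
        return func(*args, **kwargs)

# Materialization can reference "operator.getitem" as a plain string wrapper
# without importing operator until the call:
getitem = CallableLazyImport("operator.getitem")
assert getitem({"x": 42}, "x") == 42
```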
This looks new. Just to confirm, this is so we can materialize the graph on the scheduler without importing, say, pandas?
Right - I am very interested in feedback on this. The idea is to have a "callable" at graph materialization time without requiring the scheduler to actually import anything from something like pandas/numpy/dask.dataframe/dask.array.
Gotcha, thanks for confirming. I'll admit I'm not super excited about adding a new wrapper class for this, but I also can't think of a better solution at the moment.
It's a shame we won't always have distributed installed when this code is run -- otherwise we could use import_term
Hmm - I actually think we can do that with a small tweak (by adding a deserializing parameter, as we do in Blockwise). I’ll try this out tonight.
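A minimal sketch of that idea (function names here are hypothetical; `deserializing` mirrors the Blockwise flag mentioned above): keep the import lazy only when unpacking on the scheduler, and resolve the real function otherwise.

```python
from importlib import import_module

class CallableLazyImport:
    """Minimal lazy-import wrapper, as discussed above."""

    def __init__(self, function_path):
        self.function_path = function_path

    def __call__(self, *args, **kwargs):
        module, _, name = self.function_path.rpartition(".")
        return getattr(import_module(module), name)(*args, **kwargs)

def resolve_func(function_path, deserializing=False):
    # On the scheduler (deserializing=True) avoid the import entirely;
    # elsewhere, hand back the real function object right away.
    if deserializing:
        return CallableLazyImport(function_path)
    module, _, name = function_path.rpartition(".")
    return getattr(import_module(module), name)

import operator
assert resolve_func("operator.add") is operator.add
assert resolve_func("operator.add", deserializing=True)(1, 2) == 3
```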
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Thanks for the review @jrbourbeau! Yes - the high-level overview is that this PR simply moves existing Layer logic into the new dask/layers.py module.

I tried to avoid changing the code where possible. However, in order to avoid importing functions like _concat and shuffle_group at materialization time, the materialized tasks now reference those functions lazily.

This PR also uses your testing idea in a new test.
dask/layers.py (Outdated)

```python
# dsk.update(toolz.valmap(dumps_task, raw))

# dependencies.update(
#     {k: keys_in_tasks(dsk, [v], as_list=True) for k, v in raw.items()}
# )

# if state["annotations"]:
#     cls.unpack_annotations(annotations, state["annotations"], raw.keys())
```
If I'm looking at the PR diff correctly, it seems we're also updating BroadcastJoinLayer.__dask_distributed_unpack__ here -- just wanted to check that this is intentional. It looks like maybe things were incorrect before?
I think there are a few things going on here, and I'm not sure if the current logic is "wrong." This code was originally copy-pasted (mostly) from the blockwise and/or shuffle code. It seems that recent Layer/annotation work modified the blockwise and shuffle __dask_distributed_unpack__ definitions, but not the broadcast join (not sure whether or not the broadcast-join PR was merged after those changes).
| "dask.layers", | ||
| "CreateArrayDeps", |
We don't have to change anything in this PR, but seeing this alongside how we're handling CallableLazyImport makes me wonder if we should always use fully-qualified names (e.g. dask.layers.CreateArrayDeps)
I'm reminded of the syntax for entry points, e.g. dask.layers:CreateArrayDeps. I've never particularly liked the :, but perhaps there is a reason for it that I am unaware of.
Yeah - I agree that it is worth the extra split logic to use the fully-qualified names. However, I agree it would be less confusing to make that change in a separate PR.
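For what it's worth, the extra split logic for either style is small. A sketch of a helper that accepts both the entry-point form (`dask.layers:CreateArrayDeps`) and the dotted form (`dask.layers.CreateArrayDeps`) — note the colon form is unambiguous because it marks exactly where the module ends and the attribute begins:

```python
def split_qualname(path):
    """Split a fully-qualified name into (module, attribute).

    Supports both entry-point style "pkg.mod:attr" and dotted style
    "pkg.mod.attr" (illustrative helper, not existing Dask code).
    """
    if ":" in path:
        module, _, attr = path.partition(":")
    else:
        # Dotted style: assume the last component is the attribute
        module, _, attr = path.rpartition(".")
    return module, attr

assert split_qualname("dask.layers:CreateArrayDeps") == ("dask.layers", "CreateArrayDeps")
assert split_qualname("dask.layers.CreateArrayDeps") == ("dask.layers", "CreateArrayDeps")
```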
Also cc @ian-r-rose if you get a chance to look at this

Okay - the last batch of changes avoids the use of
jrbourbeau
left a comment
Thanks for all your work on this @rjzamora!
Motivated by #7374
Adds `layer_materialize_module` and `layer_materialize_class` properties to `highlevelgraph.Layer`. These properties are used within `HighLevelGraph.__dask_distributed_pack__` so that a different class (usually a light-weight parent class) can be used for unpacking and graph materialization.

In order to leverage the new properties for shuffle operations, this PR adds a new `dask/layers.py` file, where lightweight versions of `SimpleShuffleLayer` and `ShuffleLayer` have been defined. The "full" versions of these classes, in `dask.dataframe.shuffle`, are now subclasses. The new lightweight classes only include the required logic for unpacking and layer materialization, and they only import functions/classes from `dask` and `tlz`.

Since we cannot import functions like `_concat` and `shuffle_group` in `layers.py`, we also use `run_ext_function` (new in this PR) for all materialized tasks so that we can build the graph with serialized functions. Note: if we try to specify a serialized function as a callable object in a task, the `stringify_collection_keys` pass will misinterpret those tasks as collection keys. I am very open to suggestions on other ways to avoid this problem. However, even if we do avoid the stringify issue, we would still need to modify `distributed.worker.dumps_task` to optionally skip the `dumps_func` step (since the functions will already need to be serialized).
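As a rough illustration of the stringify concern (a simplified stand-in, not the actual `stringify_collection_keys` implementation): such a pass typically decides "task vs. collection key" by checking whether a tuple's first element is callable, so a serialized (non-callable) function makes the tuple look like a key.

```python
import operator

def looks_like_task(obj):
    # Simplified version of the kind of check a stringify pass performs:
    # a task is a tuple whose first element is callable.
    return isinstance(obj, tuple) and bool(obj) and callable(obj[0])

assert looks_like_task((operator.getitem, {"a": 1}, "a"))          # real task
assert not looks_like_task((b"serialized-bytes", {"a": 1}, "a"))   # misread as a key
```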