Moving high level graph pack/unpack Dask #7179

Merged
jrbourbeau merged 25 commits into dask:master from madsbk:hlg_pack_move_to_dask
Feb 26, 2021

Conversation

@madsbk (Contributor) commented Feb 5, 2021

Important: remove CI hack before merge!

This PR moves all packing and unpacking of high level graphs from Distributed to Dask. This work is based on the discussion in dask/distributed#4406.
Besides the move, unpack no longer modifies the input graph and dependencies; instead, the unpacked graph and dependencies are returned, which makes it possible to delegate the pack and unpack of annotations to the base class.
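As a rough illustration of the super()-delegation pattern described here, consider the following minimal sketch. The class names, state keys, and method bodies are illustrative only, not the actual Dask implementation:

```python
# Minimal sketch (not the real Dask code): the subclass calls super()
# so the base class handles packing/unpacking of annotations, and
# unpack returns new state rather than mutating its input.


class Layer:
    annotations = None

    def __dask_distributed_pack__(self):
        # The base class packs state common to all layers (annotations).
        return {"annotations": self.annotations}

    @classmethod
    def __dask_distributed_unpack__(cls, state):
        # Return the unpacked pieces instead of modifying an input graph.
        return {"annotations": state["annotations"]}


class MaterializedLayer(Layer):
    def __init__(self, mapping, annotations=None):
        self.mapping = mapping
        self.annotations = annotations

    def __dask_distributed_pack__(self):
        state = super().__dask_distributed_pack__()  # annotations via base class
        state["dsk"] = self.mapping  # the materialized graph (a plain dict)
        return state

    @classmethod
    def __dask_distributed_unpack__(cls, state):
        unpacked = super().__dask_distributed_unpack__(state)
        unpacked["dsk"] = state["dsk"]
        return unpacked


layer = MaterializedLayer({"x": 1}, annotations={"priority": 10})
state = layer.__dask_distributed_pack__()
unpacked = MaterializedLayer.__dask_distributed_unpack__(state)
assert unpacked == {"annotations": {"priority": 10}, "dsk": {"x": 1}}
```

Because unpack returns a fresh dict, the caller's graph is never mutated, which is what lets a subclass hand the annotation handling off to `super()` without worrying about shared state.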

Notice, this PR includes a hack that makes CI install dask/distributed#4489 before testing. This should be removed before merging.

  • Tests added / passed
  • Passes black dask / flake8 dask
  • Remove CI hack

cc. @sjperkins, @ian-r-rose, @jrbourbeau

@madsbk force-pushed the hlg_pack_move_to_dask branch from 8a31de8 to 38413e6 (February 5, 2021 14:41)
@madsbk force-pushed the hlg_pack_move_to_dask branch 3 times, most recently from 96f4709 to 07af9a2 (February 8, 2021 11:41)
@madsbk force-pushed the hlg_pack_move_to_dask branch from 3827be1 to 0f4d5d6 (February 8, 2021 13:16)
@madsbk marked this pull request as ready for review (February 8, 2021 13:33)
@madsbk changed the title from "[WIP] Moving high level graph pack/unpack Dask" to "[REVIEW] Moving high level graph pack/unpack Dask" (Feb 8, 2021)
@ian-r-rose (Collaborator) left a comment

Thanks for taking this on @madsbk, this looks good to me

@madsbk requested a review from ian-r-rose (February 12, 2021 08:47)
@sjperkins (Member) commented:

Thanks for working on this @madsbk. I aim to provide feedback later today.

madsbk and others added 3 commits February 12, 2021 12:27
Comment on lines +208 to +219
```python
            expanded[a] = v
        else:
            if not keys_stringified:
                keys = [stringify(k) for k in keys]
                keys_stringified = True

            expanded[a] = dict.fromkeys(keys, v)

# Merge the expanded annotations with the existing annotations mapping
for k, v in expanded.items():
    v.update(annotations.get(k, {}))
annotations.update(expanded)
```
@crusaderky (Collaborator) commented Feb 12, 2021
Suggested change:

```diff
-            expanded[a] = v
-        else:
-            if not keys_stringified:
-                keys = [stringify(k) for k in keys]
-                keys_stringified = True
-            expanded[a] = dict.fromkeys(keys, v)
-# Merge the expanded annotations with the existing annotations mapping
-for k, v in expanded.items():
-    v.update(annotations.get(k, {}))
-annotations.update(expanded)
+        else:
+            if not keys_stringified:
+                keys = [stringify(k) for k in keys]
+                keys_stringified = True
+            v = dict.fromkeys(keys, v)
+        try:
+            v.update(annotations[a])
+        except KeyError:
+            pass
+        annotations[a] = v
```

Collaborator:

In case of collision, the original annotations silently win; is it deliberate?

Member:

I think this may be a case of the original annotations (i.e. supplied as kwargs to Client.{submit, persist}) overriding layer annotations. @ian-r-rose may know more here.

Contributor (Author):

> I think this may be a case of the original annotations (i.e. supplied as kwargs to Client.{submit, persist}) overriding layer annotations

Yes, this was also what I was thinking. @ian-r-rose what do you think?

Collaborator:

> I think this may be a case of the original annotations (i.e. supplied as kwargs to Client.{submit, persist}) overriding layer annotations.

Sorry for the slow reply here. Yes, that was my understanding as well. However, I don't have a strong intuition of which should win, and I can think of an argument for both (more specific annotation vs later-applied annotation).
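A standalone toy example of the merge semantics under discussion, in which pre-existing annotations silently win on collision. The keys and values here are made up for illustration; only the merge pattern mirrors the snippet above:

```python
# Toy example of the merge discussed above: layer-level annotations are
# expanded per key with dict.fromkeys, and then
# v.update(annotations.get(k, {})) lets any pre-existing annotation
# (e.g. one supplied via Client kwargs) overwrite the expanded default.
# Keys and values are hypothetical.

annotations = {"priority": {"x-0": 5}}  # pre-existing, key-specific
expanded = {"priority": dict.fromkeys(["x-0", "x-1"], 1)}  # layer-wide default

for k, v in expanded.items():
    v.update(annotations.get(k, {}))  # existing entries overwrite expanded ones
annotations.update(expanded)

print(annotations)  # {'priority': {'x-0': 5, 'x-1': 1}}
```

So "x-0" keeps its original priority of 5, while "x-1" picks up the expanded default of 1, which matches the behavior questioned in this thread.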

@sjperkins (Member) commented Feb 18, 2021

@madsbk Here's my suggestion, I wasn't able to use your fork as a base for a PR:

sjperkins@7fcd644

Summary of changes:

  1. BasicLayer renamed to MaterialisedLayer.
  2. MaterialisedLayer contains code for packing materialized graphs (a.k.a. python dicts) to the scheduler (previously in Layer)
  3. __dask_distributed_{pack, unpack}__ now calls super() to pack/unpack annotations.
  4. __dask_distributed_{pack, unpack}__ now return dicts.
  5. `_dask_distributed_anno{pack,unpack}` removed.
  6. __dask_distributed_{pack, unpack}__ implemented for ParquetSubgraph.

More broadly, MaterialisedLayer is now the single point of responsibility for transmitting materialised graphs. Layer returns to a more abstract form (is_materialised returns False).
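The interface/implementation split proposed here could be sketched as follows. This is a simplified illustration whose names follow this comment rather than the code that was eventually merged:

```python
# Sketch of the proposed split: Layer as an abstract interface that is
# never materialised, MaterialisedLayer as the single concrete class
# responsible for transmitting materialised (plain-dict) graphs.
# Simplified and hypothetical, not the merged Dask code.

from abc import ABC, abstractmethod


class Layer(ABC):
    """Abstract interface; says nothing about how tasks are stored."""

    def is_materialised(self) -> bool:
        return False

    @abstractmethod
    def __dask_distributed_pack__(self):
        """Pack this layer for transmission to the scheduler."""


class MaterialisedLayer(Layer):
    """Single point of responsibility for materialised graphs."""

    def __init__(self, mapping):
        self.mapping = mapping  # the graph as a plain Python dict

    def is_materialised(self) -> bool:
        return True

    def __dask_distributed_pack__(self):
        return {"dsk": self.mapping}


layer = MaterialisedLayer({"x": 1})
assert layer.is_materialised()
assert layer.__dask_distributed_pack__() == {"dsk": {"x": 1}}
```

Keeping the dict-packing behaviour out of the base class means future Layer subclasses are not constrained by materialised-graph assumptions.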

`py.test -s -vvv distributed/protocol/tests/test_highlevelgraph.py` succeeds for me on the distributed branch.

@madsbk (Contributor, Author) commented Feb 18, 2021

I have a hard time seeing the advantages of your approach. Personally, I was hoping that we could avoid introducing a new protocol/requirements for the packed state returned by __dask_distributed_pack__(). Now that we have a class hierarchy, why not handle the pack/unpack through inheritance without having to define a shared state dict?

@sjperkins (Member) commented:

> I have a hard time seeing the advantages of your approach. Personally, I was hoping that we could avoid introducing a new protocol/requirements for the packed state returned by __dask_distributed_pack__(). Now that we have a class hierarchy, why not handle the pack/unpack through inheritance without having to define a shared state dict?

I think I understand: you're asking why the materialised packing behaviour has been moved into MaterialisedLayer? If so, it's an attempt to separate interface from implementation. In that sense, Layer is the interface while the derived classes are implementations of that interface. Implementing materialised behaviour in the interface (base class) could constrain the behaviour of derived classes in the future: better to place the behaviour in a specific implementation class.

I get that it may seem like quibbling, but inheritance hierarchies can get out of hand.

@madsbk (Contributor, Author) commented Feb 18, 2021

> I get that it may seem like quibbling, but inheritance hierarchies can get out of hand.

I agree that inheritance hierarchies can get out of hand, but now that we have a class hierarchy, together with a default implementation that suits any conceivable derived class, I think it makes sense. Particularly if the alternative is introducing a key scheme for the packed state dict.

@sjperkins (Member) commented:

> > I get that it may seem like quibbling, but inheritance hierarchies can get out of hand.
>
> I agree inheritance hierarchies can get out of hand but now that we have a class hierarchy and we have a perfectly suited default implementation that is compatible with any conceivable derived class, I think it makes sense. Particularly, if the alternative is an introduction of a key scheme for the packed state dict.

OK, I see your point. I'm happy with the PR as it stands then.

As suggested by @sjperkins, this makes it easier to extend the API
in the future.
Layer.__reduce__() isn't used by the client -> scheduler communication
any more.
Previously, BasicLayer could be partially materialized; that is no longer possible. Now BasicLayer is always materialized.
@madsbk (Contributor, Author) commented Feb 19, 2021

@sjperkins, thanks for the discussion. I have added your suggestions: __dask_distributed_unpack__ now returns a dict, and BasicLayer has been renamed to MaterializedLayer.

Also @crusaderky, @ian-r-rose, thanks for the reviews.

@jrbourbeau, I think this is ready to be merged after the removal of the CI hack. Could you take a look? If you agree, we should revert continuous_integration/scripts/install.sh and merge this PR together with dask/distributed#4489.

madsbk added a commit to madsbk/distributed that referenced this pull request Feb 19, 2021
@madsbk (Contributor, Author) commented Feb 24, 2021

@jrbourbeau, I think we should merge this soon. Can you revert continuous_integration/scripts/install.sh and merge this PR and dask/distributed#4489 ?

@jrbourbeau (Member) left a comment

Apologies for the delayed review, thank you for the ping @madsbk. Overall this PR and dask/distributed#4489 look good to me. My primary comment is that there are a few places where I think using the full annotations term instead of anno for shorthand will improve code clarity. Otherwise, I think we're good to revert the temporary CI changes here and merge.

@jrbourbeau (Member) commented:

Hmm actually it looks like there's an issue with how we're handling annotations (see CI over in dask/distributed#4489 -- for example this CI build)

@madsbk force-pushed the hlg_pack_move_to_dask branch from 4db5b83 to b6e0d20 (February 25, 2021 11:19)
@jrbourbeau changed the title from "[REVIEW] Moving high level graph pack/unpack Dask" to "Moving high level graph pack/unpack Dask" (Feb 26, 2021)
@jrbourbeau (Member) left a comment

Thanks @madsbk! This, and the companion PR in distributed, look good to me : )

Since we may be releasing tomorrow (xref dask/community#129 (comment)), and the changes here are an internal refactor instead of addressing user-facing issues, I'm inclined to wait a day before merging these PRs (until after the release) to give the changes here a little time to simmer in the main branch before pushing them out to users

@madsbk (Contributor, Author) commented Feb 26, 2021

> Since we may be releasing tomorrow (xref dask/community#129 (comment)), and the changes here are an internal refactor instead of addressing user-facing issues, I'm inclined to wait a day before merging these PRs (until after the release) to give the changes here a little time to simmer in the main branch before pushing them out to users

Sounds good!

@jrbourbeau (Member) commented:

Note I've opened up a follow-up PR at #7279 to resolve a CI issue

@jakirkham (Member) commented:

Thanks James! 😄

6 participants