Conversation
```python
annotations = annotations or {}

if "priority" not in annotations:
    _splits = self.get_split_keys()

    def _set_prio(key):
        if key in _splits:
            return 1
        return 0

    annotations["priority"] = _set_prio
super().__init__(annotations=annotations)
```
I find the annotations API a bit unintuitive. I would have expected a mapping like `{"key": {"priority": 42}}`, but instead it is `{"priority": {"key": 42}}`. More or less it's the same thing, but I find myself more naturally iterating over keys than over annotations.
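For illustration, the two shapes contrasted above are simple pivots of one another; the `pivot` helper below is hypothetical (not part of dask) and just swaps the two dictionary levels:

```python
# Dask-style shape: annotation name first, then task key.
by_annotation = {"priority": {"('x', 0)": 42, "('x', 1)": 0}}

# The shape the comment expected: task key first, then annotation name.
by_key = {"('x', 0)": {"priority": 42}, "('x', 1)": {"priority": 0}}

def pivot(mapping):
    # Swap the outer and inner dictionary levels.
    out = {}
    for outer, inner in mapping.items():
        for key, value in inner.items():
            out.setdefault(key, {})[outer] = value
    return out

assert pivot(by_annotation) == by_key
assert pivot(pivot(by_annotation)) == by_annotation
```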
```python
if "priority" not in annotations:
    _splits = self.get_split_keys()

    def _set_prio(key):
        if key in _splits:
            return 1
        return 0

    annotations["priority"] = _set_prio
super().__init__(annotations=annotations)
```
This change is needed because the distributed scheduler does not handle the expanded annotations.

See #9994 for an intermediate step with the functional changes without dropping the actual code.
```python
def __reduce__(self):
    """Default serialization implementation, which materializes the Layer"""
    return (MaterializedLayer, (dict(self),))
```
There is actually no need to have any custom reducers. The old behavior was to materialize all layers by default when pickled; that's clearly not what we're after.
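For context on what is being removed: `__reduce__` tells pickle how to rebuild an object, and the old reducer rebuilt every layer as a plain materialized task dict. A minimal sketch of that old behavior, using a stand-in `Layer` class rather than the real dask one:

```python
import pickle

class Layer(dict):
    # Stand-in for the removed default: pickle rebuilds the layer
    # from its fully materialized task dict.
    def __reduce__(self):
        return (Layer, (dict(self),))

layer = Layer({("x", 0): 1, ("x", 1): 2})

# __reduce__ hands pickle a callable plus its arguments:
cls, args = layer.__reduce__()
assert cls(*args) == layer

# Round-tripping therefore always materializes the full mapping.
restored = pickle.loads(pickle.dumps(layer))
assert restored == {("x", 0): 1, ("x", 1): 2}
```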
dask/layers.py (outdated)
```python
annotations = annotations or {}
self._split_keys = None
if "priority" not in annotations:
    annotations["priority"] = self._key_priority

self._split_keys_set = set(self.get_split_keys())
super().__init__(annotations=annotations)

def _key_priority(self, key):
    if key in self._split_keys_set:
        return 1
    return 0
```
Thanks for the explanation. I'm just learning now that annotations can be callable - cool :)
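As a sketch of what "annotations can be callable" means here: a callable value is evaluated once per task key, while a plain value applies to every key uniformly. The `expand` helper below is hypothetical, not the actual dask internal:

```python
def expand(annotation, keys):
    # A callable annotation is evaluated per key; a plain value
    # applies uniformly to every key.
    if callable(annotation):
        return {key: annotation(key) for key in keys}
    return {key: annotation for key in keys}

keys = [("split", 0), ("split", 1), ("combine", 0)]
prio = lambda key: 1 if key[0] == "split" else 0

assert expand(prio, keys) == {("split", 0): 1, ("split", 1): 1, ("combine", 0): 0}
assert expand(5, keys) == {("split", 0): 5, ("split", 1): 5, ("combine", 0): 5}
```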
rjzamora left a comment:
Things are looking good to me @fjetter - I appreciate your work on this!
My only real concern is about backwards compatibility: I have heard of at least one Dask/RAPIDS user deploying a scheduler process on GPU-free hardware. I'm worried that users like this will start to see errors when `pickle.loads` starts to require device-memory allocations on the scheduler.

I think it is fine for distributed to tighten the official environment and hardware requirements. However, I think it is important that we acknowledge that this decision is being made, and that it will likely break some real-world code. Note that I was originally hoping that we could provide a temporary escape hatch that would materialize the graph and use legacy communication. However, it is not clear to me whether the pre-HLG graph-communication logic still exists.
My sense is that we're choosing to be more picky about serialization in order to reduce maintenance burden. We acknowledge that there are a few cases where this will inconvenience users, but we think that those cases are few enough that the benefits outweigh the costs. I think that this decision was made months ago and now we're done with execution; it's time to pull the trigger. If you'd like to support that user, then maybe there's something that can be done within the pickle protocol outside of Dask. I'd very much like for us to get out of this game. It's been way too expensive for the project in the past to justify continuing to pay that cost.
I certainly agree with this - sorry if my comment made it seem like I'm trying to block anything. Is there a place in the documentation (probably distributed/deployment) where it can be made clear that the environment should be consistent everywhere in the cluster? Or does it already say this somewhere? It probably makes sense for the RAPIDS documentation to cover the GPU-on-the-scheduler detail.

I'm only interested in supporting a workaround if it is a few lines long. Otherwise, I just want to make sure we are documenting the new requirements (so there is a clear reference when the issues come in).
There is a section in the deploy documentation: https://docs.dask.org/en/stable/deployment-considerations.html. This section does not mention anything about hardware requirements, though. I guess this specifically affects GPUs; is there anything else affected? It would make sense to have a GPU section somewhere in https://docs.dask.org/en/stable/deploying.html or to add something to the GPU-specific section in https://docs.dask.org/en/stable/gpu.html
If the "GPU on scheduler" issue truly is a show stopper for some users, there is still an escape hatch. I would strongly prefer not doing this, both to keep complexity low and to maintain flexibility, but it is possible.
I appreciate the clarification @fjetter - My sense is that the relevant documentation is already sufficient on the Dask side and that the escape hatch you are describing is probably not worth the effort.
I'm +1 on this change with the goal of reduced maintenance burden. However, it will be breaking for a subset of users, and I would like us to make sure that failure modes are as pleasant as possible for those users. Common things I've seen in the wild:

My worry is that users in these groups will run into unpleasant and hard-to-debug tracebacks (this is usually the user experience we get when we change serialisation things). It would be nice if we could publish a documentation page or blog post with info on this change and how users can update their deployments to comply with the new, harder restrictions. Then either catch exceptions related to this and raise a new error with a link to the docs, or include example tracebacks in the docs page so that folks will find it via googling the error.
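One way to implement the "catch and re-raise with a docs link" suggestion above; this is a sketch, not what distributed actually does, and the docs URL shown is an assumption:

```python
import pickle

DOCS_URL = "https://docs.dask.org/en/stable/deployment-considerations.html"

def loads_with_hint(frame):
    # Wrap deserialization so that environment-mismatch failures
    # (missing modules or attributes on the receiving side) point
    # the user at the deployment documentation.
    try:
        return pickle.loads(frame)
    except (ModuleNotFoundError, AttributeError) as err:
        raise RuntimeError(
            "Deserialization failed; the scheduler and workers may have "
            f"mismatched software environments. See {DOCS_URL}"
        ) from err

assert loads_with_hint(pickle.dumps([1, 2, 3])) == [1, 2, 3]
```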
```python
# Parse transition log for processing tasks
log = [
    eval(l[0])[0]
    for l in s.transition_log
    if l[1] == "processing" and "simple-shuffle-" in l[0]
]

# Make sure most "split" tasks are processing before
# any "combine" tasks begin
late_split = np.quantile(
    [i for i, st in enumerate(log) if st.startswith("split")], 0.75
)
early_combine = np.quantile(
    [i for i, st in enumerate(log) if st.startswith("simple")], 0.25
)
assert late_split < early_combine
```
This test was not only flaky but also useless. The changes to the shuffle split priorities introduced in #7846 are actually quite important, but this test is not testing them. The time when a task transitions to processing on the scheduler side is quite irrelevant since we're scheduling greedily: split tasks are always transitioned immediately after the grouper ends up in memory. The only case where this might not be true is if a combine task would "unlock" at the same time, such that there is a race in the transition. Actual execution order is, however, ensured by the worker.

This test is a bit of a chicken-and-egg situation: it tried to capture this behavior without actually asserting that priorities are set, while still using a rather low-level API. I kept the intention of not asserting on priorities but had to fiddle with the worker internals a bit. If this turns out to be unstable or hard to maintain, we can reconsider. The important thing is: this test is now sensitive to the priorities set in the shuffle layer 🎉
Nice! I never liked this test very much :)
dask/layers.py (outdated)
```diff
 # Return SimpleShuffleLayer "split" keys
 return [
-    stringify((self.split_name, part_out, part_in))
+    (self.split_name, part_out, part_in)
```
💢 😡
Callables are evaluated on the actual, non-stringified keys. Whether this is sane behavior is not something I can truly judge, but this is how it's been before. This is changing now because the `__expanded_annotations__` sentinel no longer exists.
Evaluation on non-stringified keys makes sense to me. For example, we might want to prioritize based on the index number: `("read-parquet", 15)` -> `15`.
> Evaluation on non-stringified keys makes sense to me. For example, we might want to prioritize based on the index number: `("read-parquet", 15)` -> `15`.
I think from a user perspective, this definitely makes sense. It's all just a bit confusing when dealing with internals.
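To illustrate why the raw key matters, here is a hypothetical priority callable (not dask code): with the tuple key the partition index is directly accessible, whereas the stringified form would require parsing it back out of a string.

```python
def priority_from_index(key):
    # With a raw key like ("read-parquet", 15) the partition index
    # is right there; the stringified "('read-parquet', 15)" hides it.
    if isinstance(key, tuple) and len(key) == 2:
        return key[1]
    return 0

assert priority_from_index(("read-parquet", 15)) == 15
assert priority_from_index("('read-parquet', 15)") == 0
```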
No objections. Is that something that the RAPIDS team can help with?

Sure, happy to help. We'll definitely need input from @fjetter though.
From what I can tell, I found and fixed the last regression. Currently running another set of benchmarks to confirm. If they don't flag anything suspicious, I will move forward with merging this Monday morning unless there are any objections before then.
I think we are comfortable with this on the RAPIDS side as long as today's Dask release goes smoothly. We will want to pin the RAPIDS-23.04 release to dask-2023.3.2 to avoid a possible scramble if the pickle move causes any problems.
Benchmarks came back and look good. For a couple of workflows we even get a speedup due to reduced overhead.
Alright, all tests passed and I reverted the environment files. I think we're good to go.
🎉
Merged #9988 into main.
Thanks for tackling this @fjetter!
Original PR #9988 by fjetter Original: dask/dask#9988
Merged from original PR #9988 Original: dask/dask#9988

Counterpart to dask/distributed#7564