
Drop distributed pack #9988

Merged

fjetter merged 6 commits into dask:main from fjetter:drop_distributed_pack
Mar 27, 2023

Conversation

@fjetter
Member

@fjetter fjetter commented Feb 21, 2023

Counterpart to dask/distributed#7564

Comment on lines +391 to +396
annotations = annotations or {}

if "priority" not in annotations:
    _splits = self.get_split_keys()

    def _set_prio(key):
        if key in _splits:
            return 1
        return 0

    annotations["priority"] = _set_prio
super().__init__(annotations=annotations)
Member Author


I find the annotations API a bit unintuitive. I would've expected a mapping like

{"key": {"priority": 42}}

but instead it is

{"priority": {"key": 42}}

More or less it's the same thing, but I find myself more naturally iterating over keys than over annotations.
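To make the two layouts concrete, here is a minimal sketch in plain Python (the keys and values are hypothetical, not taken from the PR) showing that the key-major form can be mechanically inverted into the annotation-major form dask uses:

```python
# Hypothetical example: the same priority information in both layouts.
by_key = {("read-parquet", 0): {"priority": 42}}          # key -> {annotation: value}
by_annotation = {"priority": {("read-parquet", 0): 42}}   # annotation -> {key: value}

def invert_to_annotation_major(mapping):
    """Invert key -> {annotation: value} into annotation -> {key: value}."""
    out = {}
    for key, annots in mapping.items():
        for name, value in annots.items():
            out.setdefault(name, {})[key] = value
    return out

assert invert_to_annotation_major(by_key) == by_annotation
```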

Comment on lines +393 to +396
if "priority" not in annotations:
    _splits = self.get_split_keys()

    def _set_prio(key):
        if key in _splits:
            return 1
        return 0

    annotations["priority"] = _set_prio
super().__init__(annotations=annotations)
Member Author


This change is needed because the distributed scheduler does not handle the expanded annotations.

@fjetter fjetter mentioned this pull request Feb 23, 2023
@fjetter
Member Author

fjetter commented Feb 23, 2023

See #9994 for an intermediate step with the functional changes w/out dropping the actual code

@fjetter fjetter force-pushed the drop_distributed_pack branch from 17ee0e3 to 385e2cf on March 10, 2023
Comment on lines -471 to -473
def __reduce__(self):
    """Default serialization implementation, which materializes the Layer"""
    return (MaterializedLayer, (dict(self),))
Member Author


There is actually no need to have any custom reducers. The old behavior was to materialize all layers by default when pickled. That's clearly not what we're after.
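The effect being removed can be illustrated with a toy example (these classes are hypothetical stand-ins, not dask's actual Layer types): a custom `__reduce__` that substitutes a materialized dict changes what comes out of a pickle round-trip, whereas the default pickle protocol preserves the lazy object:

```python
import pickle

class LazyLayer:
    """Toy stand-in for a graph layer that can build its tasks on demand."""
    def __init__(self, n):
        self.n = n  # enough state to rebuild tasks; no materialized dict held

    def materialize(self):
        return {("x", i): i for i in range(self.n)}

class EagerlyPickledLayer(LazyLayer):
    # Old-style behavior: pickling replaces the layer with its materialized dict.
    def __reduce__(self):
        return (dict, (self.materialize(),))

lazy_roundtrip = pickle.loads(pickle.dumps(LazyLayer(3)))
eager_roundtrip = pickle.loads(pickle.dumps(EagerlyPickledLayer(3)))
assert isinstance(lazy_roundtrip, LazyLayer)  # stays lazy with default pickling
assert isinstance(eager_roundtrip, dict)      # custom __reduce__ forced materialization
```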

@fjetter fjetter force-pushed the drop_distributed_pack branch from 0cf0aac to a1543ae on March 21, 2023
dask/layers.py Outdated
Comment on lines +391 to +402
    annotations = annotations or {}
    self._split_keys = None
    if "priority" not in annotations:
        annotations["priority"] = self._key_priority

    self._split_keys_set = set(self.get_split_keys())
    super().__init__(annotations=annotations)

def _key_priority(self, key):
    if key in self._split_keys_set:
        return 1
    return 0
Member Author


It's worth pointing out that an earlier version of this caused a regression; see #9994, and #10041 for the fix and the reason why this wasn't merged yet. It accidentally triggered materialization, which is no longer an issue with this PR.

Member


Thanks for the explanation. I'm just learning now that annotations can be callable - cool :)
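A minimal sketch of what a callable annotation looks like in practice (the expansion loop and key names here are illustrative, a guess at the mechanism rather than dask's internal code): a callable value is evaluated once per key, while a plain value applies to all keys.

```python
# Hypothetical split keys, mimicking the shuffle layer above.
split_keys = {("split", 0, 1), ("split", 1, 0)}

def priority(key):
    # Boost "split" tasks, as the layer's _key_priority does.
    return 1 if key in split_keys else 0

annotations = {"priority": priority}
layer_keys = [("split", 0, 1), ("combine", 0)]

# Illustrative expansion: callables are applied per key, constants broadcast.
expanded = {
    name: {key: (value(key) if callable(value) else value) for key in layer_keys}
    for name, value in annotations.items()
}
assert expanded == {"priority": {("split", 0, 1): 1, ("combine", 0): 0}}
```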

@jrbourbeau jrbourbeau mentioned this pull request Mar 21, 2023
Member

@rjzamora rjzamora left a comment


Things are looking good to me @fjetter - I appreciate your work on this!

My only real concern is about backwards compatibility: I have heard of at least one Dask/RAPIDS user deploying a scheduler process on GPU-free hardware. I'm worried that users like this will start to see errors when pickle.loads starts to require device-memory allocations on the scheduler.

I think it is fine for distributed to tighten the official environment and hardware requirements. However, I think it is important that we acknowledge that this decision is being made, and that it will likely break some real-world code. Note that I was originally hoping that we could provide a temporary escape hatch that would materialize the graph and use legacy communication. However, it is not clear to me that the pre-HLG graph-communication logic still exists?

@mrocklin
Member

My sense is that we're choosing to be more picky about serialization in order to reduce maintenance burden. We acknowledge that there are a few cases where this will inconvenience users but we think that those cases are few enough that the benefits outweigh the costs. I think that this decision was made months ago and now we're done with execution. It's time to pull the trigger I think.

If you'd like to support that user then maybe there's something that can be done within the pickle protocol outside of Dask. I'd very much like for us to get out of this game. It's been way too expensive for the project in the past to justify continuing to pay that cost.

@rjzamora
Member

rjzamora commented Mar 22, 2023

I think that this decision was made months ago and now we're done with execution. It's time to pull the trigger I think.

I certainly agree with this - Sorry if my comment made it seem like I'm trying to block anything. Is there a place in the documentation (probably distributed/deployment) where it can be made clear that the environment should be consistent everywhere in the cluster? Or does it already say this somewhere? It probably makes sense for the RAPIDS documentation to cover the GPU-on-the-scheduler detail.

If you'd like to support that user...

I'm only interested in supporting a workaround if it was a few lines long. Otherwise, I just want to make sure we are documenting the new requirements (so there is a clear reference when the issues come in).

@fjetter
Member Author

fjetter commented Mar 22, 2023

Is there a place in the documentation (probably distributed/deployment) where it can be made clear that the environment should be consistent everywhere in the cluster? Or does it already say this somewhere?

There is a section in the deploy documentation stating

For Dask to function properly, the same set of Python packages, at the same versions, need to be installed on the scheduler and workers as on the client

https://docs.dask.org/en/stable/deployment-considerations.html

This section does not mention anything about hardware requirements. I guess this specifically affects GPUs. Is there anything else affected? I guess it would make sense to have a GPU section somewhere in https://docs.dask.org/en/stable/deploying.html or to add something to the GPU-specific section in https://docs.dask.org/en/stable/gpu.html

@fjetter
Member Author

fjetter commented Mar 22, 2023

If the "GPU on scheduler" issue truly is big and considered a show stopper for some users, there is still an escape hatch...
After my refactoring, deserialization and materialization are static. For the most part this is also true for the annotations. There is a very clean separation between deserialization, graph materialization, and consolidation with the scheduler state.
All of this could be "offloaded" to a worker that has the proper environment. Of course, this would require the worker to send back the fully materialized graph and more, which is not optimal but possible without the complexity of the pack/unpack protocol.

I would strongly prefer not doing this, both to keep complexity low and to maintain flexibility, but it is possible.

@rjzamora
Member

I appreciate the clarification @fjetter - My sense is that the relevant documentation is already sufficient on the Dask side and that the escape hatch you are describing is probably not worth the effort.

@jacobtomlinson
Member

jacobtomlinson commented Mar 22, 2023

I'm +1 on this change with the goal of reduced maintenance burden. However, it will be breaking for a subset of users, and I would like us to make sure that the failure modes are as pleasant as possible for those users.

Common things I've seen in the wild:

  • Users creating a different minimal environment for the scheduler
  • Users not including a GPU on scheduler nodes (but using the same software environment)

My worry is that users in these groups will run into unpleasant and hard-to-debug tracebacks (this is usually the user experience we get when we change serialisation things).

It would be nice if we can publish a documentation page or blog post with info on this change and how users can update their deployments to comply with the new harder restrictions. Then either catch exceptions related to this and raise a new error with a link to the docs, or include example tracebacks in the docs page so that folks will find it via googling the error.

Comment on lines -720 to -732

# Parse transition log for processing tasks
log = [
    eval(l[0])[0]
    for l in s.transition_log
    if l[1] == "processing" and "simple-shuffle-" in l[0]
]

# Make sure most "split" tasks are processing before
# any "combine" tasks begin
late_split = np.quantile(
    [i for i, st in enumerate(log) if st.startswith("split")], 0.75
)
early_combine = np.quantile(
    [i for i, st in enumerate(log) if st.startswith("simple")], 0.25
)
assert late_split < early_combine
Member Author


This test was not only flaky but also useless. The changes to the shuffle split priorities that were introduced in #7846 are actually quite important, but this test is not testing them. The time when a task transitions to processing on the scheduler side is quite irrelevant since we're scheduling greedily. split tasks are always transitioned immediately after the grouper ended up in memory. The only case where this might not be true is if a combine task would "unlock" at the same time such that there is a race in the transition. Actual execution order is, however, ensured by the worker.

This test is a bit of a chicken-and-egg situation. The test tried to capture this behavior without actually asserting that priorities are set. It still used a rather low-level API. I kept the intention of not asserting on priorities but had to fiddle with the worker internals a bit. If this turns out to be unstable/hard to maintain, we can reconsider. The important thing is: this test is now sensitive to the priorities set in the shuffle layer 🎉

Member


Nice! I never liked this test very much :)

dask/layers.py Outdated
  # Return SimpleShuffleLayer "split" keys
  return [
-     stringify((self.split_name, part_out, part_in))
+     (self.split_name, part_out, part_in)
Member Author


💢 😡

Callables are evaluated on the actual, non-stringified keys. Whether or not this is sane behavior is something I cannot truly judge, but this is how it's been before. This is changing now because the __expanded_annotations__ sentinel no longer exists.

Member


Evaluation on non-stringified keys makes sense to me. For example we might want to prioritize based on the index number ("read-parquet", 15) -> 15
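A minimal sketch of that idea (plain Python; the key shapes are hypothetical): with raw tuple keys, a priority callable can read the partition index directly, which the stringified form "('read-parquet', 15)" would not allow without parsing.

```python
def priority(key):
    # Hypothetical policy: use a tuple key's partition index as its priority.
    if isinstance(key, tuple) and len(key) == 2 and isinstance(key[1], int):
        return key[1]
    return 0  # non-indexed keys get a neutral priority

assert priority(("read-parquet", 15)) == 15
assert priority("some-scalar-key") == 0
```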

Member Author


Evaluation on non-stringified keys makes sense to me. For example we might want to prioritize based on the index number ("read-parquet", 15) -> 15

I think from a user perspective, this definitely makes sense. It's all just a bit confusing when dealing with internals.

@mrocklin
Member

It would be nice if we can publish a documentation page or blog post with info on this change and how users can update their deployments to comply with the new harder restrictions. Then either catch exceptions related to this and raise a new error with a link to the docs, or include example tracebacks in the docs page so that folks will find it via googling the error.

No objections. Is that something that the RAPIDS team can help with?

@jacobtomlinson
Member

Is that something that the RAPIDS team can help with?

Sure, happy to help. We'll definitely need input from @fjetter though.

@fjetter fjetter force-pushed the drop_distributed_pack branch from 6682361 to 12f1db5 on March 24, 2023
@fjetter
Member Author

fjetter commented Mar 24, 2023

From what I can tell, I found and fixed the last regression. Currently running another set of benchmarks to confirm. If they don't flag anything suspicious, I will move forward with merging this Monday morning unless there are any objections until then.

@rjzamora
Member

I will move forward with merging this Monday morning unless there are any objections until then.

I think we are comfortable with this on the RAPIDS side as long as today's Dask release goes smoothly. We will want to pin the RAPIDS-23.04 release to dask-2023.3.2 to avoid a possible scramble if the pickle move causes any problems.

@fjetter
Member Author

fjetter commented Mar 27, 2023

Benchmarks came back and look good. For a couple of workflows we even get a speedup due to reduced overhead.

Ok, benchmark results are finally in as well. This time without any regressions.

Wall time (right side is the null hypothesis, i.e. main vs. main to measure noise; left side is this PR):

What we can see is that there is not much to see. This change is not intended to change scheduling behavior or speed anything up. These benchmarks mostly confirm that we're dispatching the proper computations.

There is one sizable performance improvement in the test case test_trivial_workload_should_not_cause_work_stealing which is indeed connected to the refactoring. This test case generates a couple of thousand delayed objects and computes them embarrassingly parallel. This refactoring is actually shaving off a couple of seconds in serialization time, which is, relatively speaking, a big change for this workflow (from 12.5s down to 8s, i.e. ~36%). This also translates to other almost embarrassingly parallel graphs, e.g. test_set_index[1-p2p-False] is about ~10-15s faster. Nice, but relatively speaking not as exciting.


Memory comparisons do not show any differences beyond noise.

Benchmark results are available at dask/distributed#7564 (comment)

@fjetter
Member Author

fjetter commented Mar 27, 2023

Alright, all tests passed and I reverted the environment files. I think we're good to go.

@fjetter fjetter merged commit ec3ffed into dask:main Mar 27, 2023
@fjetter fjetter deleted the drop_distributed_pack branch March 27, 2023 15:50
@mrocklin
Member

mrocklin commented Mar 27, 2023 via email

@rjzamora
Member

Thanks for tackling this @fjetter!

@fjetter fjetter mentioned this pull request Mar 31, 2023