Conversation

fjetter left a comment

Currently, there are still failing tests:

- `test_prefer_short_dependents`: requires me to adjust the "finish now" logic to account for the artificial root task
- `test_order_with_equal_dependents`: TODO
- `test_run_smaller_sections`: see comment
> Prefer b1 over c1 because it won't stick around waiting for d1 to complete
I believe this explanation is actually wrong. I think c3 should be preferred to run before b2 because c3->a1 is the longest critical path.
I'm not even entirely convinced that d1 should run before b2. I believe the memory pressure is identical and b2 has a longer way to walk. Anyway, this is how it currently works and I think that's fine
> Prefer to run acb first because then we can get that out of the way
This is currently favoring finishing aa before anything else. I believe this is a matter of finding the appropriate target functions.
Considering that it was xfailed I'm not terribly concerned but I will have a look again.
This example is a bit ambiguous. There is a "best ordering" for single threaded execution and one for multi threaded execution.
Single threaded
This PR
| Step | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
|---|---|---|---|---|---|---|---|---|---|
| T1 | e | c | d | cc | bb | aa | dd | a | b |
| Pressure | 1 | 2 | 2 | 3 | 4 | 3 | 3 | 4 | 2 |
What the docs say
| Step | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 |
|---|---|---|---|---|---|---|---|---|---|
| T1 | c | a | b | e | d | cc | dd | bb | aa |
| Pressure | 1 | 2 | 2 | 3 | 2 | 3 | 3 | 3 | 2 |
Multi threaded
This PR
| Step | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| T1 | e | d | bb | aa | b |
| T2 | c | cc | dd | a | x |
| Pressure | 2 | 3 | 3 | 2 | 2 |
What the docs say
| Step | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| T1 | a | b | d | bb | aa |
| T2 | c | e | cc | dd | x |
| Pressure | 2 | 2 | 3 | 4 | 2 |
Typically dask.order acts as if we were executing sequentially so what the docs say is not wrong. However, in practice, most will be running in a multi threaded environment. This behavior may, of course, look again very different for more threads...
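The "Pressure" rows in the tables above can be reproduced mechanically. Below is a minimal sketch (not dask's actual code; `simulate_pressure` is an invented helper) that replays a priority assignment single-threaded and records how many task results are held in memory at each step, releasing a result once all of its dependents have run:

```python
def simulate_pressure(priorities, dependencies):
    """Replay ``priorities`` (key -> execution order) sequentially and
    return the number of in-memory results at each step.  A result is
    released once every task depending on it has run."""
    # Count how many dependents still need each key's result.
    waiting = {key: 0 for key in priorities}
    for key, deps in dependencies.items():
        for dep in deps:
            waiting[dep] += 1

    in_memory, pressure = set(), []
    for key in sorted(priorities, key=priorities.get):
        in_memory.add(key)
        pressure.append(len(in_memory))  # peak during this step
        for dep in dependencies.get(key, ()):
            waiting[dep] -= 1
            if waiting[dep] == 0:
                in_memory.discard(dep)  # no remaining dependents
    return pressure
```

For a linear chain `a -> b -> c` this yields `[1, 2, 2]`: while `b` runs, both `a` and `b` are resident, and `a` is released only after `b` finishes.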
At least on smaller graphs, this fixes the root task problem mentioned in #9995 by ensuring that a computation branch is finished before a new one is opened. The xarray-style reduction reported in #10384 now looks like this:

I will move to real-world testing/benchmarking tomorrow. If that looks good, I will have to do a little refactoring since there is now plenty of dead code that I'm effectively skipping due to the root task addition. 🤞
```python
if len(root_nodes) > 1:
    root = "root-node"
    dsk[root] = (object(), *root_nodes)
    dependencies[root] = root_nodes
    o = order(dsk, dependencies)
    del o[root]
    return o
```
This is by far the most interesting change in this PR since it is effectively disabling a lot of logic. For one, this disables the entire skip_root_node logic and also the specialized finish_now handling.
What's even more critical is that this effectively disables many root-node dependent graph metrics (e.g. max_dependencies, min_dependencies) since all nodes now share the same long-term goal.
There is an interesting effect, though. The partition_keys function depends on min_dependencies, or rather on the difference between min_dependencies and total_dependencies. If min_dependencies were removed entirely, this function would become negative, reverting every decision.
For the first variation in dsk2, an additional node is appended at the leafs to generate some asymmetry. This PR differs from main in which branch of the asymmetric end is walked first: this branch favors the longer branch while main favors the shorter one. The fourth variation removes an additional leaf and only keeps a single node after the reduction. It is asserted that those two nodes run exactly after each other (asserting that the two branches run consecutively would've been the better and correct assertion for the above variations as well, and would've maintained the best ordering for #5859 (comment)).
Ok, I got through the unit tests and think everything is in order. I got sidetracked a little when looking at the cost functions because the graph_metrics confused me. Turns out (obvious in hindsight) they are only accurate for trees and are double counting quite a bit. Oh well, now there are a couple of tests. I also changed the definition slightly in a way that makes more sense to me. I may break this out into a dedicated PR; it is quite unrelated to the important stuff in here.
Cool. How excited should I be about this? "Solves all Xarray pain?" "Solves a couple of problems we've seen recently, but still lots more to do?"

Let's wait for the first round of benchmarks before I answer :)
OK, so benchmark results will be a little delayed; there is some kind of issue with our CI. From manual testing I can say that the xarray example reported in pangeo-data/distributed-array-examples#2 now runs smoothly with constant memory. (Legend below: scaling the time dimension / number of tasks by that factor; the green line is only constant because we're spilling.) The upside is that on main, the 1.0 measure was already saturating memory fully... Looks like I have some more work to do.
Well, at the very least simulated memory pressure, i.e. the number of tasks a single-threaded scheduler would have to hold in memory, is behaving reasonably well. Not constant, but no longer linear, and even better than the DataFrame version before. (This goes up to 100_000, i.e. 20x more than the reported example.)
I'm pretty sure that what I'm seeing right now is not an ordering problem anymore. The behavior appears to be quite flaky. I assume there is some kind of distributed scheduler state that is impacting the worker placement decision. For instance, I now managed to get another round of runs through where the small dataset actually misbehaved, but I didn't have an issue scaling up to 5 times the data (i.e. total input data is 10x more than the cluster memory).
OK, turns out it is an ordering problem after all. Order appears to not be stable. I patched the scheduler to compute a hash of the order result, compute the simulated pressure, and log both, just running the same thing in a for loop a couple of times. The hash is always different. The pressure is sometimes bad, and when it is, the run is horrible. At least I know what to look for now... kind of.
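A hypothetical version of that debugging patch: hash the key-to-priority mapping so two runs on the same graph can be compared at a glance (`order_fingerprint` is an invented name, not a dask API):

```python
import hashlib


def order_fingerprint(priorities):
    """Hash a key -> priority mapping in a way that is insensitive to
    dict insertion order, so two scheduler runs can be compared cheaply."""
    payload = repr(sorted((str(key), prio) for key, prio in priorities.items()))
    return hashlib.sha256(payload.encode()).hexdigest()
```

If the fingerprint differs between runs on the same graph, the ordering is not deterministic.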
Well... good news and bad news. Order seems to be fine (ish?). If I disable optimization, everything is very, very stable. However, once optimization is on, this becomes flaky!
My best guess is that this is connected to keys somehow. The issue is only flaky if I regenerate the graph, i.e. regenerate the keys. We probably no longer have sufficient structural information to break ties without string comparison after the fusion.
TLDR: I shouldn't have touched the graph_metrics. However, everything is sooo fragile that there is more work to do still.

Who was I to think touching the graph_metrics definition was a good idea? Stupid me! The two diverging orderings were indeed triggered by a str comparison, but the fundamental bug was introduced by me changing the graph_metrics. I don't know if this will ever be relevant, but for my own sanity I want to document what happened. Interested readers may skip this safely.

I was looking at why these two graphs diverged. The good ordering properly targeted the first reducer and (not shown here) would clean up after itself. That's perfect. The bad ordering regressed into a breadth-first search that was releasing roots relatively quickly but wouldn't go deep, such that we cannot truly release anything until rather late.

The divergence happens already at the very first decision point. The initial root node is random (str-compared) because the graph is pretty much symmetrical. Order would then decide which one of the dependents to pick by using Lines 593 to 599 in 2b45b21, which leads us to these if/elif/else clauses, Lines 640 to 652 in 2b45b21. The thing is, these dependents are not identical since one of them can already be run while the other has a dependency to load first. Depending on which one of the two is picked, Line 645 in 2b45b21 causes order to work on this "single" before loading any more dependents. Effectively these singles are a deviation from the depth-first search (which is sometimes necessary to free memory). The good path actually picks the `key == key2 and 5 * partition_keys[item] > 22 * key` path (where do these magical numbers come from?) https://github.com/fjetter/dask/blob/e71033821ecedab254750bd6f158714ba589acf8/dask/order.py#L650 which does almost the same thing but doesn't put a "single" relationship on the other dep.

Alas, reverting the graph_metrics change ensures that both graphs now produce identical orderings.

The chosen path is actually the bad one... but once we move to larger graphs it appears to work again. This just shows that there is still some work to do...
🎉

> On Thu, Sep 28, 2023, 10:10 AM Florian Jetter wrote:
>
> alright, the above example now works nicely with constant memory. The below shows runs with increasingly more data/tasks. Runtime is increasing linearly and memory is constant. Just the way it's supposed to.
>
> <https://user-images.githubusercontent.com/8629629/271330591-dcb358b2-b5d2-435f-a117-bf0b0915ae7a.png>

❤️
```python
def test_favor_longest_critical_path(abcde):
    r"""
          a
          |
      d b e
       \|/
        c
    """
```
I believe the argument of the earlier version is wrong. Writing out again what the pressure is over time yields:
Single threaded
This PR
| Step | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| T1 | c | b | a | e | d |
| Pressure | 1 | 2 | 2 | 3 | 3 |
Main
| Step | 0 | 1 | 2 | 3 | 4 |
|---|---|---|---|---|---|
| T1 | c | d | e | b | a |
| Pressure | 1 | 2 | 3 | 3 | 3 |
Multi threaded
This PR
| Step | 0 | 1 | 2 |
|---|---|---|---|
| T1 | c | b | a |
| T2 | x | d | e |
| Pressure | 1 | 3 | 3 |
Main
| Step | 0 | 1 | 2 | 3 |
|---|---|---|---|---|
| T1 | c | d | b | a |
| T2 | x | e | x | x |
| Pressure | 1 | 3 | 3 | 3 |
Of course, this assumes that all tasks weigh the same. If we assumed that source tasks weigh more, this would be different, but that is not universally true (often, but not always).
Favoring the longest critical path, however, benefits execution times in multi-threaded environments (fewer steps, again assuming equal weight / runtime).
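For illustration, the "longest critical path" tie-break can be sketched as preferring the dependent with the deepest chain of dependents below it. This is a simplified stand-in for what dask.order actually computes, not its real code:

```python
def dependent_depths(dependents):
    """Length of the longest chain of dependents reachable from each key
    (a simple proxy for critical-path length in a DAG)."""
    depth = {}

    def visit(key):
        if key not in depth:
            depth[key] = 1 + max(
                (visit(dep) for dep in dependents.get(key, ())), default=0
            )
        return depth[key]

    for key in dependents:
        visit(key)
    return depth
```

For the test graph above (`c` fans out to `d`, `b`, `e`, and `a` sits on top of `b`), `b` gets depth 2 while `d` and `e` get depth 1, so the `c -> b -> a` chain is the longest critical path and is preferred first.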
fjetter left a comment

I left inline comments to explain all the changes that happened here. This can either guide a reviewer or help a forensic analysis in case something blows up after all.
```python
if len(root_nodes) > 1:
    # This is also nice because it makes us robust to difference when
    # computing vs persisting collections
    root = object()

    def _f(*args, **kwargs):
        pass

    dsk[root] = (_f, *root_nodes)
    dependencies[root] = root_nodes
    o = order(dsk, dependencies)
    del o[root]
    return o
```
This is an important but still poorly tested change. It ensures that all graphs that are ordered will be reduced to a single task, i.e. even embarrassingly parallel graphs will virtually reduce into a single sink node.
Initially, this was motivated because I noticed different behavior between an array and a dataframe workload that I could track back to exactly this change in the topology.
However, zooming out, this is quite an important thing in general considering that distributed is frequently appending such a sink task whenever a compute is called on a collection. I believe a compute (vs persist) should not alter the way graphs are treated. Therefore, normalizing every graph like this seems to be the right approach (it could also be normalized in the other direction, of course)
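A standalone sketch of that normalization idea (simplified; `add_synthetic_sink` and the `"synthetic-sink"` key are made up for illustration, not dask internals): connect every sink node under one artificial terminal so even disconnected or embarrassingly parallel graphs reduce to a single node, then drop that key from the ordering result afterwards.

```python
def add_synthetic_sink(dependencies):
    """Given a key -> set-of-dependencies mapping, connect all sink nodes
    (keys nothing depends on) under one artificial terminal key.
    Returns the new key, or None if the graph already has a single sink."""
    has_dependents = set()
    for deps in dependencies.values():
        has_dependents.update(deps)
    sinks = [key for key in dependencies if key not in has_dependents]
    if len(sinks) > 1:
        dependencies["synthetic-sink"] = set(sinks)
        return "synthetic-sink"
    return None
```

With this in place, two unrelated chains share the same "long-term goal", which is exactly why the root-node dependent metrics discussed below collapse to the same value for every task.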
The "single sink node" normalization above also causes all min/max_dependencies to be equal for all tasks since they all reduce to the same node. From what I can tell, this is the strongest driver for the difference we've seen between the array vs dataframe reduction.
A follow-up PR can change the graph_metrics to no longer compute this. I'll leave this in for now.
```diff
-def finish_now_key(x):
-    """Determine the order of dependents that are ready to run and be released"""
-    return (-len(dependencies[x]), StrComparable(x))
-
+root_total_dependencies = total_dependencies[list(root_nodes)[0]]
 # Computing this for all keys can sometimes be relatively expensive :(
 partition_keys = {
     key: (
-        (min_dependencies - total_dependencies[key] + 1)
+        (root_total_dependencies - total_dependencies[key] + 1)
         * (total_dependents - min_heights)
     )
```
This partition_keys cost function is, from what I can tell, a little fragile, but it is extremely important. Without min_dependencies here, this needed a different baseline, and taking the largest value of total_dependencies ensures that the first scalar of this product is strictly positive. If this is not guaranteed, this cost function will no longer be stable, particularly not if one of the two scalars is allowed to become zero.
This cost function makes me nervous because I don't fully understand the reasoning behind it. It's doing a decent job, so I didn't modify it further.
Should we add an in-line comment for the requirement to be strictly positive for the cost function to be stable?
I don't understand this cost function sufficiently well to say whether this must be strictly positive. I merely discovered cases where zero caused very bad behavior. I think this PR is a better documentation than whatever I can write in the code.
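To see the positivity argument with concrete numbers, here is a toy evaluation of the product. All values are made up; the variable names merely mirror those in order.py, and per-key dicts stand in for the unpacked metric tuples:

```python
# Hypothetical per-key metrics; the root has the largest total_dependencies.
root_total_dependencies = 10
total_dependencies = {"leaf": 1, "mid": 4, "root": 10}
total_dependents = {"leaf": 6, "mid": 3, "root": 1}
min_heights = {"leaf": 2, "mid": 1, "root": 0}

partition_keys = {
    key: (root_total_dependencies - total_dependencies[key] + 1)
    * (total_dependents[key] - min_heights[key])
    for key in total_dependencies
}
# Baselining on the root's total_dependencies keeps the first factor >= 1
# for every key.  With a smaller baseline, the factor could hit zero or
# flip sign, scrambling the relative order of the keys.
print(partition_keys)  # {'leaf': 40, 'mid': 14, 'root': 1}
```

Smaller partition keys are processed first, so the root ends up with the smallest key and leaves with the largest, which only holds while both factors stay positive.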
```python
# These dicts use `partition_keys` as keys. We process them by placing the values
# in `outer_stack` so that the smallest keys will be processed first.
next_nodes = defaultdict(list)
later_nodes = defaultdict(list)
```
I couldn't find an actual use for later_nodes and couldn't find a workload or test that would be sensitive to this. Both next and later nodes are ordered by partition keys, and if we trust partition_keys, I don't see a reason to further divide into next/later. In the interest of removing complexity, I deleted this code path.
```python
if skip_root_node:
    seen = set(root_nodes)
else:
    seen = set()  # seen in an inner_stack (and has dependencies)
```
Due to the topology normalization we're now always in the skip_root_node branch, which is a little confusing given the normalization approach.
This toggle ensures that when we're walking the graph, we're actually ignoring this root node, i.e. during metrics calculation we act as if it were there, but when walking we ignore it.
I haven't found a decent way to remove this and decided to keep it. This logic keeps subgraphs that are only connected through the artificial root from mixing, which is overall a nice property to have.
Mildly inconsistent but effective.
```python
# NOTE: If this was too slow, LIFO would be a decent
# approximation
for single in sorted(singles_keys, key=lambda x: partition_keys[x]):
```
I didn't perform any measurements, but empirically I found that many singles are already part of result unless we're working on very dense all-to-all graphs.
In practice, this means that singles should be small here and sorting is feasible. In earlier versions I was simply iterating in reverse, which also provided really nice results, but sorting is much better.
This is mostly replacing the finish_now logic that I removed above. Cherry on top is that this may now even finish dangling linear branches instead of just single tasks. The downside is that it is no longer guaranteed that these run immediately.
```diff
 if (
     add_to_inner_stack
-    and len(set_difference(dependents[parent], result)) > 1
+    and len(
+        set_difference(
+            set_difference(dependents[parent], result),
+            seen,
+        )
+    )
+    > 1
 ):
```
This is what makes the singles processing actually a little more eager than before. Apart from the topological normalization, probably the most impactful change in here.
We typically only allow singles to be processed if they are guaranteed to release their parent. This condition now also allows a single to run if the parent would be freed after the next tactical goal is reached (i.e. the inner_stack is emptied).
This is very close to the idea of later_singles that is already implemented here, but there are multiple code paths that create a new inner_stack without triggering a singles compute. I found this logic to be more reliable.
```python
if add_to_inner_stack and not inner_stack:
    inner_stack = [dep]
    inner_stack_pop = inner_stack.pop
    seen_add(dep)
    continue
key = partition_keys[dep]
```
No functional change, should just be a reduction of code duplication
```python
psingles = possible_singles[key]
for s in psingles:
    singles[s] = item
vals -= psingles
next_nodes[key].append(vals)
```
This is another big-ish change. Previously, singles would only be considered for nodes with two dependents. This now allows us to detect dependents of nodes that split into many tasks to be treated as singles as well.
This has been fun to read every morning (certainly more interesting and promising than the daily news). Thanks! It's a shame we no longer write papers. This would be an interesting topic.
It seems like the relative nature of the A/B tests may be misleading sometimes. Might be worth rethinking at some point.
I intend to follow up here with a smaller non-functional PR that cleans this up a little further... but I think all of this is better left for a follow-up PR.
I have loved reading this, and it's very exciting!
I intend to merge in about 24hrs unless there is a valid reason not to. If a review comes in after this, I will address any raised issues in a follow-up PR. Merging early allows this to sit on main (and be tested by dask engineers and our benchmarking suite) for a couple of days before the next release.
+1
hendrikmakait left a comment

Thanks for adding the comments. They make it much more feasible to understand what's going on. Generally, the commentary and testing seem reasonable to me. I haven't done an in-depth check of the functional changes, but at least there is nothing obvious going awry. This is good enough for me to merge 👍
```python
return task


def sanitize_dsk(dsk):
```
nit: Adding some type hints to new functions (and order) would be a nice addition. The code is already difficult enough to understand even if you know what data types to expect.
I intend to follow up with this (also some variable renaming) but this is definitely out of scope for this PR
In case anybody is still stumbling over this PR: we recently merged a follow-up, #10660, that rewrites this entire code section. If you are encountering issues with this, please open a new ticket.












Related #10505
Closes #10384
Closes #9995
Edit: The above description is a little outdated. See comment farthest below and inline comments for an accurate description