
Update dask.order.order to consider "next" nodes using both FIFO and LIFO#5872

Merged
jcrist merged 22 commits into dask:master from eriknw:fix_order_gh5859
May 11, 2020

Conversation

@eriknw
Member

@eriknw eriknw commented Feb 7, 2020

Closes #5859. @JSKenyon

How do we choose the next node in order after a node with no dependents?
The answer: one of the dependents of an already-completed node.
However, this is difficult to do well while also keeping things fast.
Previously, we chose dependents by considering completed nodes in a FIFO manner.
Now, we consider both LIFO and FIFO and choose the best ones.
This seems to do reasonably well and is fast enough.
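A toy sketch of the distinction (purely illustrative; dask's real algorithm is far more involved): under LIFO one chain of work completes before the next starts, while FIFO interleaves independent chains, which can keep more intermediates alive. The `traversal` helper and key names below are made up for illustration.

```python
from collections import deque

# Toy illustration (NOT dask's implementation): when a node finishes, any
# dependent whose dependencies are all complete becomes ready; the policy
# decides which ready node runs next -- LIFO pops the newest entry, FIFO
# the oldest.
def traversal(dependents, num_needed, roots, policy="lifo"):
    num_needed = dict(num_needed)  # remaining-dependency counts (copied)
    ready = deque(roots)
    result = []
    while ready:
        node = ready.pop() if policy == "lifo" else ready.popleft()
        result.append(node)
        for dep in dependents.get(node, ()):
            num_needed[dep] -= 1
            if num_needed[dep] == 0:
                ready.append(dep)
    return result

# Two independent chains: a -> a1 and b -> b1.
dependents = {"a": ["a1"], "b": ["b1"]}
num_needed = {"a1": 1, "b1": 1}
print(traversal(dependents, num_needed, ["a", "b"], "lifo"))  # ['b', 'b1', 'a', 'a1']
print(traversal(dependents, num_needed, ["a", "b"], "fifo"))  # ['a', 'b', 'a1', 'b1']
```

Neither policy wins in general: LIFO finishes one chain before starting the next, while FIFO visits roots in submission order but holds both chains open at once.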

  • Tests added / passed
  • Passes black dask / flake8 dask

…d LIFO.

@eriknw
Member Author

eriknw commented Feb 7, 2020

I don't know what the Travis CI error is about.

@JSKenyon
Contributor

JSKenyon commented Feb 7, 2020

@eriknw I have tried these changes locally and I am still seeing memory growth. Could you possibly try the following example (a slightly simplified version of the example from #5864) and see if it works for you? You can examine htop, but my quick and dirty check is to just run with /usr/bin/time and check maxresident.

import dask.array as da
import numpy as np

n_points = 1250*60*60*4
n_scales = 300

scales = da.arange(n_scales, chunks=(1,))
omega = da.fft.fftfreq(n_points, chunks=(1250*60*5,)) * 2 * np.pi

def mul_2d(arr1, arr2):
    return np.atleast_2d(arr1*arr2)

x = da.blockwise(mul_2d, "ij",
                 scales, "i",
                 omega, "j")

y = da.to_zarr(x, 'testdata', overwrite=True)

da.compute(y, scheduler="single-threaded")

On this branch and 2.10.1 this will use ~8.5GB of RAM. With 2.9.1 it only uses 450MB. Very occasionally it behaves itself with 2.10.1/this branch but if you run it a few times the problem should be apparent.
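As an in-process alternative to checking maxresident with /usr/bin/time, the standard-library resource module can report peak RSS from inside the script itself. This is a sketch (the helper name is made up); note the unit difference between platforms.

```python
import resource
import sys

# Sketch: ru_maxrss is reported in KiB on Linux and in bytes on macOS.
def peak_rss_mib():
    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    if sys.platform == "darwin":
        rss /= 1024  # bytes -> KiB
    return rss / 1024  # KiB -> MiB

print(f"maxresident: {peak_rss_mib():.1f} MiB")
```

Calling this at the end of the example above gives roughly the same number /usr/bin/time would report as maxresident.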

When repopulating `outer_stack`, consider and use all dependents of all completed tasks.
Partition them based on a simple metric, then trust FIFO ordering, so no need to sort.
This remains fast.  Fingers crossed!
@TomAugspurger
Member

> I don't know what the Travis CI error is about.

That can be ignored. Tracking it in #5867

@eriknw
Member Author

eriknw commented Feb 7, 2020

@JSKenyon thanks again! I updated the PR to try a different approach. I think it's generally better than what's on master, and I prefer it over my first attempt in this PR. Can you give this a try on #5859? I don't think it'll be perfect, but I hope and expect it's better.

Regarding your new example, I haven't found any version of order (old, current, new, or variations) that does well. They all use >10GB of memory for me.

@TomAugspurger
Member

@eriknw are you able to write tests for the new behavior?

@TomAugspurger
Member

And the memory usage in #5872 (comment) looks good for me with Dask 2.9.1. Using memory-profiler

[memory-profiler plot: Dask 2.9.1]

@mrocklin
Member

mrocklin commented Feb 7, 2020 via email

@eriknw
Member Author

eriknw commented Feb 7, 2020

Aha! Thanks for checking @TomAugspurger. I need to be more clear about how I tested the above example.

This example runs fine for me in Dask 2.9.1. It also runs fine if I update Dask 2.9.1 to use dask.order.order from this PR. I did not check Dask 2.9.1 with dask.order.order from current master (although I suspect it's fine).

This example does not run fine for me in current master, this PR, or current master using dask.order.order from Dask 2.9.1.

To reiterate: every version of order works fine in Dask 2.9.1, and none works well in current master. Therefore, I conclude this particular regression is not caused by the change to order.

I'll see what I can do about adding tests. This is kinda hard to isolate.

@TomAugspurger
Member

Ah, makes sense.

@mrocklin
Member

mrocklin commented Feb 7, 2020 via email

@JSKenyon
Contributor

JSKenyon commented Feb 8, 2020

I find that a little surprising @eriknw. When I tested for the original issue (here), I observed the change in behaviour only after the ordering changes. I was relatively certain I had isolated it, as I built from source and checked out the relevant commits. That doesn't mean I didn't make a mistake though. :-)

@eriknw
Member Author

eriknw commented Feb 8, 2020

Sorry for the confusion @JSKenyon. The issue in #5859 is indeed from the changes to order. However, the example you posted in #5872 (comment) (adapted from #5864) appears to behave badly due to something other than order.

@JSKenyon
Contributor

@eriknw I understand now. Not entirely sure why everything I interact with seems to have a memory problem.

I have tested out the latest version of order.order on my problem. It still underperforms relative to the old order.order. 2.9.1 has a maximum footprint of around 1.3GB on my test data. This PR is around 5.3GB. Unfortunately that is still unacceptably high for the single threaded case, as this ultimately needs to be run in parallel on ~TB sized data sets.

@JSKenyon
Contributor

The problem with the example I posted here stems from this commit.

@JSKenyon
Contributor

I think it is worthwhile clarifying that there are now two separate problems which have been exposed by this PR and the issues that precede it:

  1. The changes to order.order can negatively affect memory usage, particularly for large, complex graphs. This was raised in Ordering changes negatively affecting memory use #5859. This PR is an attempt to improve/rectify this problem.
  2. Changes to array metadata creation in to_zarr can lead to massively increased memory footprints. The cause seems to be Delay creating metadata in to_zarr #5797. This emerged from my take on Memory leak on local cluster  #5864. Pinging @chrisroat for his input.

@chrisroat
Contributor

Apologies for the memory issue, if it stems from #5797. I take from this discussion that the problem goes away if a backend other than zarr is substituted in the repro code?

I am not fully familiar with the task execution/graph pieces of dask. Essentially what #5797 does is add one create task upstream of the zarr store tasks. Since a single new task is presumably lightweight, my educated guess is that the extra memory must come from the extra graph edges/dependencies from the store tasks back to the new task (in addition to whatever dependencies they already had).

See this comment for a visual representation.
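A hypothetical sketch of the graph shape being described (the key names and helper are made up, not dask's real keys): a single shared create ancestor adds one extra edge per store task.

```python
# Map each store task to its dependencies; illustrative keys only.
def store_graph(n_chunks, delayed_create):
    graph = {("store", i): {("chunk", i)} for i in range(n_chunks)}
    if delayed_create:
        for deps in graph.values():
            deps.add("create")  # one shared ancestor for every store task
    return graph

def n_edges(graph):
    return sum(len(deps) for deps in graph.values())

print(n_edges(store_graph(100, delayed_create=False)))  # 100
print(n_edges(store_graph(100, delayed_create=True)))   # 200
```

The edge count doubles here, but the more important effect is structural: every store task now shares a common ancestor, which constrains what the scheduler can release.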

@eriknw
Member Author

eriknw commented Feb 11, 2020

Thanks again for the engagement @JSKenyon. I've been making progress locally (at least to my eyes), so I should have something new for you to try tomorrow.

@JSKenyon
Contributor

@chrisroat I believe your instinct is likely correct. I think wrapping create with delayed creates a common ancestor for all the writes, which in turn causes the scheduler (possibly via order) to hold everything in memory. A colleague of mine pointed me at the following PR for one of his projects. We believe it may be a similar issue.

@eriknw Thank you for continuing to work on this - I know that it isn't exactly an easy problem so I really appreciate your efforts.

@mrocklin
Member

> I've been making progress locally (at least to my eyes), so I should have something new for you to try tomorrow.

Checking in here. Is there still progress or is this stalled? If it stalls out, do we have a backup plan?

@chrisroat
Contributor

chrisroat commented Feb 16, 2020

A backup plan would be rolling back the delayed create PR #5797 , if that is the problem. While I believe that PR is the correct way forward, it's a minor improvement that can wait until its downsides can be mitigated.

@JSKenyon
Contributor

@chrisroat I think that the delayed creation issue is less pressing than the ordering problems, but @mrocklin likely has a better idea of how many people are affected.

The ordering issue is not resolved as yet. I have been waiting on further progress from @eriknw.

@TomAugspurger
Member

I'll be able to dig into this over the next few days.

@TomAugspurger
Member

@JSKenyon do you have a reproducible example demonstrating the issues you mentioned in #5872 (comment)? It looks to me like the behavior of 2.9.1, master, and this PR are all fairly similar once a27f111 is reverted.

2.9.1 order - a27f111
[memory-profiler plot: revert-order-revert-root]

master order - a27f111
[memory-profiler plot: master-revert-root]

GH-5872 order - a27f111
[memory-profiler plot: gh5859-revert-root]

@eriknw
Member Author

eriknw commented Feb 17, 2020

Hey, sorry for my absence (I choose not to get notifications on the weekend). I've continued to work on this the last week. This is actually a really fun space to think about, and I've explored several variations (too many to enumerate). I'll push a new attempt up today.

To my eyes, this actually looks pretty good (from the example in #5859):
[graph ordering visualization: dsk_new8-min]

…ical goals.

The old old method used to switch tactical goals frequently.  Current master
does not switch tactical goals at all.  This commit compromises and switches
tactical goals when it is obviously better.

Also, instead of trying to do a LIFO or FIFO or LIFO/FIFO policy for choosing
what to compute after a tactical goal is reached, we now have a policy that
allows continuing from any node (not just the last or first).  In order for
this to behave well for a variety of use cases, we needed to introduce the
concept of "epochs" and "eras", which define what can be computed later.

Performance-wise, this remains pretty good.  It is often a little slower, but
not significantly so.  We avoided pathological scaling behaviors.
@eriknw
Member Author

eriknw commented Feb 17, 2020

New attempt is up. I think its behavior is pretty reasonable.

@JSKenyon, I would be much obliged if you could give it a try. CC @TomAugspurger

dask/order.py Outdated
if not inner_stack:
    if len(deps) == 1:
        inner_stack.append(dep)
        # An interesting option is to always set `inner_stack = [dep]` here.
Member

I was also looking into the failing map_overlap test. FWIW, here's what I came up with to get things passing.

diff --git a/dask/order.py b/dask/order.py
index cd5e5799f..19c41f443 100644
--- a/dask/order.py
+++ b/dask/order.py
@@ -297,12 +297,19 @@ def order(dsk, dependencies=None):
                                 i += 1
 
                     if deps:
+                        num_needed_inner = []
                         for dep in deps:
                             num_needed[dep] -= 1
+                            if num_needed[dep] == 1:
+                                num_needed_inner.append(dep)
                         if not inner_stack:
                             if len(deps) == 1:
                                 inner_stack.append(dep)
                                 continue
+                            elif num_needed_inner:
+                                deps2 = sorted(deps, key=lambda x: -num_needed[x])
+                                inner_stack.extend(deps2)
+                                continue
                         else:
                             deps.discard(inner_stack[-1])  # safe to mutate
                             if not deps:
diff --git a/dask/tests/test_order.py b/dask/tests/test_order.py
index 98e450f48..80bac1a3b 100644
--- a/dask/tests/test_order.py
+++ b/dask/tests/test_order.py
@@ -516,7 +516,7 @@ def test_map_overlap(abcde):
        |/  | \ | / | \|
       d1  d2  d3  d4  d5
        |       |      |
-      e1      e2      e5
+      e1      e3      e5
 
     Want to finish b1 before we start on e5
     """

Your way is probably better, but wanted to put this up just in case.

I think the intuition here is "are we really close to being done? If so, let's prioritize these deps". Hopefully we're both achieving that.

Member Author

Neat! I'll take a look tomorrow. It's fun having somebody else poke at this.

@TomAugspurger
Member

Right now I'm planning to merge this after our release, to give this a bit of time on master. dask/community#38

@mrocklin
Member

> @mrocklin there's a discussion on performance at https://github.com/dask/dask/pull/5872/files#r394585154 that may interest you. Our benchmarks pick up what should be a worst case scenario for the new partition_keys concept. Do you have thoughts on the relative value of runtime of order vs. getting a good ordering?

Getting a good ordering is important. That's the difference of things working vs them not working. The comparison of FIFO/LIFO vs current approach misses though that our previous version of this broke ties by the key itself, which in practice was typically nicely ordered (because dask dataframe/array tend to name things in a consistent and well-ordered way). I think that reverting a couple of changes in ordering code is probably the thing to compare this to, rather than master.

Ordering costs come up as non-trivial factors in our overhead. This limits Dask's use on very large graphs, such as those that frequently come up in Pangeo workloads (@rabernat, for example, will probably not be happy to hear that ordering costs may triple). Keeping these costs low is important. It's not as important as making sure that things work, so this should probably go in, but I recommend that we reassess previous changes that moved order away from using key names. I think that @eriknw was around for that change as well, and may be able to provide a good counter-argument.

@eriknw
Member Author

eriknw commented Mar 25, 2020

> The comparison of FIFO/LIFO vs current approach misses though that our previous version of this broke ties by the key itself

I disagree wholeheartedly.

We still break ties based on key name. The structural considerations go deeper than this. I think you are missing the point of the FIFO/LIFO/current comparison. Going back to "use keys more often" only addresses how we order dependencies or dependents, and that only takes us so far. Behaviorally, it is at least as important to determine what to compute next after a root node (one with no dependents). The old method used a LIFO approach, because dependents remained on the stack. Now we can compare this to FIFO and the current approach. How we use keys is largely irrelevant.

Many of these benchmarks are not meant to capture real workloads but, instead, to reveal performance characteristics. I am performance-minded, and I worry little over the benchmark that is 3.5x slower. Looking more closely at this benchmark (which has 10,000 layers of two nodes, where each layer is fully connected to the next layer), the time per edge and time per node are still very fast, so this particular performance characteristic is unlikely to dominate in real workloads, and, if by chance it does, then I suspect the overhead of order would be acceptable, because any operation that iterates over the DAG would likely be comparable.
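For reference, a hypothetical reconstruction of that benchmark's shape as a plain dependency dict (illustrative keys, not the actual benchmark code): n layers of two nodes, each fully connected to the previous layer, so edges grow as width² per layer while nodes grow only as width.

```python
# Build a layered dependency graph: each node in layer i depends on every
# node in layer i - 1. Returns {key: set-of-dependency-keys}.
def layered_graph(n_layers, width=2):
    dsk = {(0, j): set() for j in range(width)}
    for i in range(1, n_layers):
        prev = {(i - 1, j) for j in range(width)}
        for j in range(width):
            dsk[(i, j)] = prev
    return dsk

g = layered_graph(10_000)
print(len(g))                           # 20000 nodes
print(sum(len(d) for d in g.values()))  # 39996 edges
```

With width 2 this is still only ~2 edges per node, which is why per-edge cost, not the layer count itself, is the quantity that matters for scaling.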

Recall that #5646 generally improved performance across the board. Sometimes, gains of 3x-10x were seen! I'm sorry I don't have these numbers available, but I suspect this PR would compare favorably to pre-#5646.

All previous ordering algorithms have serious known shortcomings. I can't recommend anything to revert, but if somebody wants to dig into this, I'll be available for discussion.

I apologize that my counter-argument is largely dismissive of Matt's argument, but I suppose this is natural given that Matt's was largely dismissive of the rationale for the current approach in this PR. I understand LIFO vs FIFO vs other may be a bit abstract, but these are what appear to dominate the behavior of order on graphs of interest. I'm sorry I haven't been able to demonstrate this more clearly, but I think discussing it even in the abstract will be useful for the next person who chooses to dive into this.

@TomAugspurger
Member

It'd be helpful to know of workloads where tie-breaking by key was important. I'll see if I can run the workload from the original issue on the old order implementation with some metrics for when tie-breaking was used.

I'll also run this order against some more realistic workloads from the pangeo examples. I'd expect the 3x slowdown in order time will be near the theoretical upper bound, not something that's likely in real-world use cases.

@mrocklin
Member

mrocklin commented Mar 25, 2020 via email

@eriknw
Member Author

eriknw commented Mar 26, 2020

Let me discuss this a little more to allow for more informed criticisms and concerns (which are welcome!).

Dependents are sorted by dependents_key, and dependencies are sorted by dependencies_key. Breaking ties based on the key in the DAG is done in these two functions. We actually have a lot of freedom in choosing these functions; in other words, ordering tends to perform well for a wide variety of key functions, and we shouldn't be afraid to tinker with these in the future. The choice among the approaches below plays a larger role in determining whether order succeeds or fails.

In terms of the current node...

LIFO:

  • append dependents to stack
  • append non-computed dependencies to stack

FIFO:

  • if stack is empty, append min dependent to stack
  • append non-computed dependencies to stack
  • other dependents are added to stack (one at a time) after root node (with no dependents) is computed
    • currently done as FIFO, but could easily switch to LIFO

This PR:

  • similar to FIFO, we append non-computed dependencies to stack and, if necessary, the min dependent
  • if stack is not empty, sometimes make a new stack from a dependent that is better than current stack (and save previous stack for later)
  • other dependents go into dicts of pools, one of which will be processed once all stacks from above are computed
    • there are currently two dicts; one dict is always given priority over the other dict
    • we can sort the keys of the dict, so we are not limited by FIFO or LIFO
  • there are some nuances not covered here

Some graphs naturally perform well under LIFO while others fail under it; likewise, some do better (or worse) with FIFO. This PR acknowledges that neither LIFO nor FIFO dominates the other, and both have failings. Moreover, I tried methods that switch between LIFO and FIFO, but none of those experiments fared well either. The current approach is more complicated, but it attempts to address known shortcomings. I agree with @TomAugspurger that this should sit in master for a while before release.
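An illustrative sketch of the "dicts of pools" idea described above (not dask's actual code; the function and metric are made up): deferred dependents are bucketed by a cheap metric, and the bucket keys can be sorted before draining, so the policy is no longer bound to pure FIFO or LIFO.

```python
from collections import defaultdict

# Bucket deferred nodes by a cheap metric, then drain buckets in sorted
# key order; within a bucket, trust insertion (FIFO) order -- no full sort.
def drain_pools(deferred, metric):
    pools = defaultdict(list)
    for node in deferred:
        pools[metric(node)].append(node)
    for key in sorted(pools):   # sorting keys frees us from FIFO/LIFO
        yield from pools[key]

out = list(drain_pools(["c", "aa", "b", "ddd"], metric=len))
print(out)  # ['c', 'b', 'aa', 'ddd']
```

Sorting only the (few) pool keys rather than all deferred nodes is what keeps this cheap, matching the "partition them based on a simple metric, then trust FIFO ordering" note in the commit message above.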

@TomAugspurger
Member

I tried both this and master with the old pre-#5646 order out on https://github.com/pangeo-data/pangeo-example-notebooks. No noticeable difference in order time (submitting computations still felt instantaneous) and approximately the same runtimes for the computations.

I'll try to get some statistics on when tie-breaking was important tomorrow.

…safe too.

Previously, `order` only added one dependent to `inner_stack` when `inner_stack` is empty.
Now, it adds all the other dependents with the minimum partition key to `inner_stacks`.
Prior to this commit, the new changes did a lot better with this example

dask#5859 (comment)

but it didn't quite pass the new test.  In general, this means that non-optimal behavior
can happen when `inner_stacks` gets processed while there are better options available
in `next_nodes`.  Typically, this should only result in a modest delay.  We could do
better if we pushed dependents into, say, a sorted heapq, but this would kill performance.
@eriknw
Member Author

eriknw commented Apr 3, 2020

@TomAugspurger any idea what this test failure is? Looks unrelated.

With the latest change, the example from #5859 (comment) looks like this:

[graph ordering visualization: dask_new]

The latest changes may have slightly improved a couple of the benchmarks. Most are unchanged.

@TomAugspurger
Member

That failure can be ignored. We have an open issue about it IIRC.

eriknw and others added 5 commits April 13, 2020 15:43
Adding some dependents some of the time to inner_stacks can be beneficial,
but can be detrimental at other times.  Add a heuristic to try to decide.
Added tests that exemplify this.
@TomAugspurger
Member

Short status update: I haven't been able to find realistic workloads where the slowdown in order is noticeable (tried some pangeo ones, and some large dask.dataframe stuff).

I failed to come up with metrics on when we fall back to name as a tie-breaker like I promised in #5872 (comment).

@jcrist is hopefully going to have time to look through this this week.

Member

@jcrist jcrist left a comment

Thanks @eriknw. Overall this looks good to me. I was unable to find any cases where this resulted in a worse ordering or was noticeably slower with realistic workloads. Great comments in the implementation too.

I found two places where we currently don't have test coverage that I think we should. There were a couple other one-off lines, but they're mostly for switching between sorting and random pop'ing for larger lists (and are visibly correct), so I'm less concerned with hitting those (though full coverage would be nice).

dask/order.py Outdated
break  # all done!

if next_nodes:
    if len(next_nodes) > 150:
Member

This whole branch isn't hit by any of the tests in dask/tests/test_order.py. It would be good to get some coverage here.

@jcrist
Member

jcrist commented May 4, 2020

The error here is unrelated. I think this is good to merge now, thanks @eriknw!

We probably want this to sit in master for a bit before the next release, just in case issues turn up. Since we just released last week, I think now is a fine time. cc-ing @jrbourbeau or @TomAugspurger for a quick 2nd opinion.

@TomAugspurger
Member

The last dask/dask release was April 24, so if we follow the 2 week cadence we'd do a release this Friday. Dunno if we'll follow that this week since we have had some extra distributed releases.

I suspect we'll discuss this on the maintainers call today.

@jcrist
Member

jcrist commented May 5, 2020

Ah, it was just distributed that was released last week. My mistake.

@jcrist
Member

jcrist commented May 11, 2020

Since we've now released, I'm going to merge this. Thanks @eriknw!

@jcrist jcrist merged commit 816cfa7 into dask:master May 11, 2020
@TomAugspurger
Member

Thanks Erik!

Development

Successfully merging this pull request may close these issues.

Ordering changes negatively affecting memory use

6 participants