Adjust transfer costs in worker_objective #5326
Conversation
I'd like to incorporate measured latency somehow too instead of a magic 10ms, but it's a start.
As discussed in dask#5325. The idea is that if a key we need has many dependents, we should amortize the cost of transferring it to a new worker, since those other dependents could then run on the new worker more cheaply. "We'll probably have to move this at some point anyway, might as well do it now." This isn't actually intended to encourage transfers, though; it's meant to discourage transferring keys that could have just stayed in one place. The goal is that if A and B are on different workers, and we're the only task that will ever need A, but plenty of other tasks will need B, we should schedule alongside A even if B is a bit larger to move. But this is all a theory and needs some tests.
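The amortization idea could be sketched roughly like this. This is a simplified stand-in for the real objective, not scheduler code: `estimated_comm_cost`, `dep_sizes`, `dep_waiters`, and the 100 MB/s bandwidth are all hypothetical names and values.

```python
# Hypothetical sketch (not the actual dask code): estimate the comm cost
# of running a task on a worker that is missing some dependencies,
# amortizing each dependency's transfer cost over its number of waiters.

def estimated_comm_cost(dep_sizes, dep_waiters, bandwidth=100e6):
    """dep_sizes: bytes per missing dep; dep_waiters: waiter counts."""
    comm_bytes = 0.0
    for nbytes, nwaiters in zip(dep_sizes, dep_waiters):
        # A dependency with many waiters will likely be transferred
        # somewhere anyway, so charge this task only its share.
        comm_bytes += nbytes / nwaiters
    return comm_bytes / bandwidth

# A 100 MB key needed by 10 tasks weighs the same as a 10 MB key needed
# only by this task, so the objective leans toward staying next to the
# narrowly-needed data:
assert estimated_comm_cost([100e6], [10]) == estimated_comm_cost([10e6], [1])
```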
distributed/scheduler.py (outdated)

```python
# amortize transfer cost over all waiters
comm_bytes += nbytes / len(dts._waiters)
```
Can you add an in-code comment explaining how this division amortizes cost? I assume this is again a "local topology" argument related to the fan-out tasks (#5325 (comment)) where we try to "ignore" tasks which will likely end up everywhere anyhow?
Will do. It's related to that, but actually a simpler idea. Basically, if we transfer to this worker now, that opens up the potential for N other tasks to run on this worker without transferring the data. So you could look at it as: rather than this task paying the whole cost up front and others getting the benefit for free, all the sibling tasks split the cost of the transfer evenly between them. (That's an analogy, of course: once transferred, the other tasks don't actually pay anything!)
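That "siblings split the cost" analogy can be made concrete with a toy sketch. All names here are hypothetical; the real scheduler tracks worker contents via `WorkerState`/`TaskState`, not a plain set.

```python
# Toy illustration of the cost-splitting analogy: the first task to pull
# the key pays only its amortized share, and once the key is local,
# later sibling tasks pay nothing at all.

def comm_cost_for_task(worker_has, dep, nbytes, nwaiters, bandwidth=100e6):
    if dep in worker_has:
        return 0.0  # key already local: siblings ride for free
    # first task to trigger the transfer pays only its amortized share
    return nbytes / nwaiters / bandwidth

worker_has = set()
first = comm_cost_for_task(worker_has, "b", 100e6, 10)  # 1/10 of a 1 s move
worker_has.add("b")  # the transfer has happened
later = comm_cost_for_task(worker_has, "b", 100e6, 10)  # nothing to pay

assert abs(first - 0.1) < 1e-12 and later == 0.0
```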
Unit Test Results

16 files ±0  16 suites ±0  7h 38m 11s ⏱️ +22m 54s

For more details on these failures and errors, see this check. Results for commit e3d62f6. ± Comparison against base commit baf05c0.

♻️ This comment has been updated with latest results.
```python
# amortize transfer cost over all waiters
comm_bytes += nbytes / len(dts.waiters)
xfers += 1
```
Suggested change:

```diff
- # amortize transfer cost over all waiters
- comm_bytes += nbytes / len(dts.waiters)
- xfers += 1
+ nwaiters = len(dts.waiters)
+ # amortize transfer cost over all waiters
+ comm_bytes += nbytes / nwaiters
+ xfers += 1 / nwaiters
```
@gjoseph92 do you agree?
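For illustration, here is a toy comparison of the two variants. The 10 ms penalty and 100 MB/s bandwidth are placeholder numbers, and `cost_original`/`cost_suggested` are hypothetical names, not scheduler code.

```python
PENALTY = 0.01     # fixed per-transfer penalty, seconds (the magic 10 ms)
BANDWIDTH = 100e6  # bytes per second, assumed

def cost_original(nbytes, nwaiters):
    # amortize only the byte cost; the flat penalty is paid in full
    return nbytes / nwaiters / BANDWIDTH + PENALTY

def cost_suggested(nbytes, nwaiters):
    # amortize both the byte cost and the per-transfer penalty
    return (nbytes / BANDWIDTH + PENALTY) / nwaiters

# For a 1 MB key with 100 waiters, the suggested form also spreads the
# fixed penalty, shrinking the estimate from roughly 10.1 ms to 0.2 ms.
```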
However, this would not be replicable in get_comm_cost above.
This should maybe be two PRs, since there are two different things happening:

1. Add a fixed (currently 10ms) penalty per transfer, as discussed in "Scheduler underestimates data transfer cost for small transfers" #5324 (comment). This should help discourage small transfers. I'd prefer if this cost weren't just a magic 0.01 number though.
2. Amortize the transfer cost by the number of waiters. This is related to "Ignore widely-shared dependencies in decide_worker" #5325. See the commit message b4ebbee for more description.

I haven't tested this at all yet; it's just a theory right now. Just looking for thoughts.
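The fixed-penalty half of the change can be illustrated on its own. The 10 ms value is the placeholder from this PR, the 100 MB/s bandwidth is an assumed figure, and the function names are hypothetical.

```python
# Illustrative only: how a flat per-transfer penalty changes the cost
# estimate for small transfers.

BANDWIDTH = 100e6  # bytes per second (assumed)
PENALTY = 0.01     # seconds per transfer (the magic 10 ms from this PR)

def transfer_time(nbytes):
    # naive estimate: bytes / bandwidth only
    return nbytes / BANDWIDTH

def transfer_time_with_penalty(nbytes):
    # add a flat per-transfer penalty so tiny transfers aren't "free"
    return nbytes / BANDWIDTH + PENALTY

# A 1 kB transfer looks nearly free without the penalty (~10 microseconds),
# but costs ~10 ms with it, which discourages scheduling tiny transfers.
```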
cc @fjetter @crusaderky @mrocklin