Allow worker to refuse data requests with busy signal #2092
mrocklin merged 5 commits into dask:master
Conversation
distributed/worker.py
Outdated
```diff
         raise

-    def transition_dep_flight_waiting(self, dep, worker=None):
+    def transition_dep_flight_waiting(self, dep, worker=None, busy=False):
```
This keyword should be inverted and changed to something like `remove`, which probably makes more sense locally.

you mean workers on the same host?
distributed/tests/test_worker.py
Outdated
```python
yield wait(futures)

assert len(workers[0].outgoing_transfer_log) < 18
assert sum(not not w.outgoing_transfer_log for w in workers) >= 3
```
`not not` is weird. Wouldn't `bool(w.outgoing_transfer_log)` work?
Or `sum(1 for w in workers if len(w.outgoing_transfer_log) > 0)`; that might be even more explicit.
Fixed to be more explicit. I'm using `len(... for ... if ...)`.
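For illustration, the three counting idioms from this thread are equivalent; here is a minimal sketch using a stand-in `logs` list (hypothetical data, not the real worker state):

```python
# Stand-in for w.outgoing_transfer_log on each worker: some empty, some not.
logs = [[], ["transfer"], ["transfer", "transfer"], []]

# All three idioms count the workers with a non-empty transfer log.
a = sum(not not log for log in logs)        # double negation coerces to bool
b = sum(bool(log) for log in logs)          # explicit bool()
c = sum(1 for log in logs if len(log) > 0)  # conditional generator

print(a, b, c)  # all three give 2
```

The generator-with-condition form spells out the intent ("count workers that transferred something") at the cost of a few more characters.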
distributed/tests/test_worker.py
Outdated
```python
yield wait(futures)

assert len(workers[0].outgoing_transfer_log) < 18
```
Where do the 18 and the 3 come from? Were they measured from empirical runs?
Would it be possible to increase the number of secondary worker-to-worker transfers by increasing the size of x while making x cheaper to allocate initially? For instance:

```python
x = c.submit(bytes, int(1e8), workers=[workers[0].address])
```
Two issues:

- We may start to run out of RAM on Travis with 1e8 * 20 bytes
- Compression will make the transfers too fast

I'm not very concerned about the cost of creating the random array the first time. I don't think that this will affect the number of secondary worker-to-worker transfers. However, I may not fully understand your meaning.
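As a back-of-the-envelope check on the RAM concern (my own arithmetic, assuming each of the 20 workers eventually holds one copy of x):

```python
# Rough memory estimate for the suggested test change: each of the
# 20 workers would eventually hold its own copy of the 1e8-byte x.
bytes_per_copy = int(1e8)
n_workers = 20
total_gb = bytes_per_copy * n_workers / 1e9
print(total_gb)  # 2.0 GB across the cluster
```

Two gigabytes is within reach of a CI machine but tight once scheduler, worker, and transfer buffers are added on top.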
ogrisel left a comment:
LGTM, but I am really not familiar with the code so I trust you and the existing test suite. Did you run benchmarks to ensure that this does not cause any significant performance regression?
Feel free to upgrade the joblib connector as part of this PR to remove the explicit broadcasting in the auto-scatter thingy.
distributed/distributed.yaml
Outdated
```yaml
worker:
  multiprocessing-method: forkserver
  use-file-locking: True
  max-connections: 10    # maximum simultaneous outgoing connections
```
Did you have the opportunity to run some IO-intensive benchmark/stress test on a "real" cluster (e.g. on GCP) to measure the impact of that setting on the overall completion time of a data-bottlenecked set of tasks?
If you do, I would be curious to see the empirical arity of the tree structure of the resulting broadcasting for different values of max-connections.
No, I haven't yet done any benchmarking. I'll play a bit on my local machine. I may get to trying this out on a larger cluster, but that's uncertain. Instead, I suspect that we will end up changing the default value over time.
Force-pushed 93f6b80 to 592df43
This allows workers to say "I'm too busy right now" when presented with
a request for data from another worker. That worker then waits a bit,
queries the scheduler to see if anyone else has that data, and then
tries again. The wait time is an exponential backoff.
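The retry behavior described above can be sketched roughly as follows. This is my own illustrative helper, not the actual `distributed.worker` code; the function name and constants are assumptions:

```python
import random

def backoff_delay(attempt, base=0.05, cap=2.0):
    # Exponential backoff: double the wait on each failed attempt,
    # capped at `cap` seconds, with jitter so that many workers
    # retrying the same busy peer don't stampede at the same instant.
    delay = min(cap, base * (2 ** attempt))
    return delay * (0.5 + random.random() / 2)
```

A worker that receives the busy signal would sleep for `backoff_delay(attempt)`, re-query the scheduler for other holders of the key, and try again.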
Pragmatically this means that when single pieces of data are in high
demand, the cluster will informally do a tree scattering. Some workers
will get the data directly while others wait on the busy signal; then other
workers will get it from them, and so on. We used to ask users to do this explicitly
with the following:

```python
client.replicate(future)
# or
client.scatter(data, broadcast=True)
```

Now the replicate/broadcast step is no longer strictly necessary (though
some scattering of local data still is).
Machines on the same host are given some preference, and so should be able to
sneak in more easily.
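To see why this behaves like a tree scatter, consider a small model (my own sketch; real timing, backoff, and the same-host bonus are messier): in each round, every worker that already holds the data can serve up to max-connections peers, so the number of holders multiplies.

```python
def rounds_to_replicate(n_workers, max_connections):
    # Each "round", every current holder serves up to `max_connections`
    # new workers, so holders grow by a factor of (1 + max_connections).
    holders, rounds = 1, 0
    while holders < n_workers:
        holders *= (1 + max_connections)
        rounds += 1
    return rounds

print(rounds_to_replicate(360, 10))  # 3 rounds: 1 -> 11 -> 121 -> 1331
```

With the 18 * 20 = 360 workers from the benchmark below and max-connections of 10, full replication takes about three rounds rather than 360 sequential transfers from one worker.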
Currently this has two issues:

1. We need to unify the configuration with the total_connections parameter (which does the same thing, but in the opposite direction)
2. We don't test the same-host behavior (this is hard because we're currently getting host information from the socket)
Force-pushed 592df43 to c018823
```python
from dask_jobqueue import PBSCluster
cluster = PBSCluster(processes=18)
cluster.scale(20)  # results in 18 * 20 processes on 20 physical machines

from dask.distributed import Client
client = Client(cluster)
client

import numpy as np
x = client.submit(np.random.random, 100000000, pure=False)
workers = list(client.scheduler_info()['workers'])
futures = [client.submit(len, x, pure=False, workers=[w])
           for w in workers]
```

Results:

- One max connection (double for same node): around 25s of communication time (note that the communication starts after zero)
- Ten max connections (double for same node): around 18s
- 100: around 35s (note that the dashboard starts before zero for some reason)
- 1000: 50-60s
The behavior here is as I would expect. I'm comfortable with this from a performance perspective, though I also think that we'll end up wanting to tune this default in the future by a factor of 2-3. There is still some administrative cleanup to do, I think.

Thanks for the benchmarks, that seems to work fine :)
Force-pushed 6eb9a96 to 40409d1



