Conversation
This is intended to be a base for LocalCluster (and others) that want to specify more heterogeneous information about workers.
I'm going to try this out with some heterogeneous GPU machines. This feels like a nice base on which to rewrite and clean up LocalCluster though, a prospect that I'm excited about :)
Is the spec intended to be per-worker? E.g.:

spec = {
    'worker1': {"cls": Worker, "options": {"ncores": 1}},
    'nanny1': {"cls": Nanny, "options": {"ncores": 2}},
    'worker2': {"cls": Worker, "options": {"ncores": 1}},
    'nanny2': {"cls": Nanny, "options": {"ncores": 2}},
    'worker3': {"cls": Worker, "options": {"ncores": 1}},
    'nanny3': {"cls": Nanny, "options": {"ncores": 2}},
    ...
}
I'm just wondering if now is a good time to introduce the concept of "worker pools": e.g. you would pass

>>> pool_specs = {
...     'default': {
...         'worker': {"cls": Worker, "options": {"ncores": 1}},
...         'nanny': {"cls": Nanny, "options": {"ncores": 2}},
...     },
...     'no-nanny': {
...         'worker': {"cls": Worker, "options": {"ncores": 1}},
...     },
... }
>>> worker_specs = {'worker1': 'default', 'worker2': 'no-nanny'}
>>> cluster = SpecCluster(workers=worker_specs, pools=pool_specs)
Previously nannies could leak out in various ways
This is related to that issue, but is lower level. I think that it would enable other people to add things like pools more easily. If this is something that you'd like to explore, I encourage you to do so now. I agree that now would be a good time to explore this to help guide design.
distributed/deploy/spec.py
    # If people call this frequently, we only want to run it once
    return self._correct_state_waiting
else:
    task = asyncio.Task(self._correct_state_internal())
You shouldn't create Tasks manually, but instead use asyncio.ensure_future.
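For illustration, a minimal standalone version of that change (names mirror the diff but are stripped of the class context; this is a sketch, not the PR's code):

import asyncio

async def _correct_state_internal():
    pass  # placeholder body

async def main():
    # ensure_future schedules the coroutine on the running loop and returns a
    # Task, without constructing asyncio.Task directly.
    task = asyncio.ensure_future(_correct_state_internal())
    await task

asyncio.run(main())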
distributed/deploy/spec.py
d = self.worker_spec[name]
cls, opts = d["cls"], d.get("options", {})
if "name" not in opts:
    opts = toolz.merge({"name": name}, opts, {"loop": self.loop})
Did you mean to include the loop in here?
Yes, ideally we want the worker to use the IOLoop used by the cluster object.
I mean that loop is only added if name is not in opts. Wouldn't you always want to pass it?
Ah, indeed. Looking at this again, it looks like we do this in an async def function anyway, so IOLoop.current() should be valid regardless. I'll remove the reference to loop entirely, which should also help reduce the contract.
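A rough sketch of what that simplification might look like (a hypothetical helper for illustration; only toolz.merge and the spec format come from the diff above):

import toolz

def build_worker_options(name, spec):
    # Always inject the worker name; no explicit loop is passed because the
    # calling coroutine already runs on the cluster's IOLoop, which
    # IOLoop.current() will pick up.
    cls, opts = spec["cls"], spec.get("options", {})
    opts = toolz.merge({"name": name}, opts)
    return cls, opts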
if workers:
    await asyncio.wait(workers)
    for w in workers:
        w._cluster = weakref.ref(self)
What is the cluster weakref for?
There are a lot of weakrefs around now. They're useful when tracking down leaking references to things.
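For reference, a weak reference points at the cluster without keeping it alive, which is what makes it handy for spotting leaks (a minimal standalone illustration, not the PR's code):

import weakref

class Cluster:
    pass

cluster = Cluster()
ref = weakref.ref(cluster)   # does not increase the cluster's refcount
assert ref() is cluster      # dereference while the cluster is alive
del cluster                  # in CPython the object is collected immediately here
assert ref() is None         # a dead weakref dereferences to None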
distributed/deploy/spec.py
for w in workers:
    w._cluster = weakref.ref(self)
    if self.status == "running":
        await w
The non-running workers are never awaited; what happens to them? They're still added to the workers dict below.
This is again a tornado/asyncio difference. I've removed the running check and made things optimal, I think for both async def and gen.coroutine style functions.
async def _close(self):
    while self.status == "closing":
        await asyncio.sleep(0.1)
Instead of polling, could we have a future for the closing operation (created by the first call to _close), and just wait on that?
Good thought. I'm inclined to wait on this for now though if that's ok.
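For the record, the suggested pattern could look roughly like this (class and attribute names are assumptions for illustration, not the PR's code):

import asyncio

class Cluster:
    def __init__(self):
        self.status = "running"
        self._close_task = None

    async def _close_internal(self):
        ...  # actual teardown logic would go here
        self.status = "closed"

    async def _close(self):
        # The first caller creates the close task; later callers await the
        # same future instead of polling self.status in a sleep loop.
        if self._close_task is None:
            self._close_task = asyncio.ensure_future(self._close_internal())
        await self._close_task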
def _correct_state(self):
    if self._correct_state_waiting:
        # If people call this frequently, we only want to run it once
I think this drops scale requests while a current scale request is processing:
- Call scale
- Spec updated
- Correct-state task starts; task is stored as _correct_state_waiting
- scale returns
- Call scale again
- Spec updated
- Since the previous call is still in progress, state is not corrected and no new workers are started/stopped. Spec and tasks are now out of sync. Also, since there are multiple await calls in _correct_state_internal, the worker_spec can be different at different points in that function, leading to potential bugs.
One naive solution would be to have a background task that loops forever, waiting on an event:

    while self.running:
        await self._spec_updated.wait()
        # update workers to match spec
        # After updating, only clear the event if things are up to date
        # If things aren't up to date, then we loop again
        if self.spec_matches_current_state():
            self._spec_updated.clear()

Then _correct_state would look like:

    def _correct_state(self):
        # set the event, it's only ever cleared in the loop
        # We force synchronization here to prevent scheduling tons
        # of tasks all setting the event, this blocks until it's set.
        return self.sync(self._mark_state_updated)

    async def _mark_state_updated(self):
        self._spec_updated.set()

There are likely other ways to handle this. In dask-gateway I have a task per worker/scheduler. As the spec updates, unfinished tasks are cancelled or new ones are fired. If a previous scale call is still in progress for a cluster, scale will block until that call has finished. Note that this only blocks while we update our internal task state (cancelling/firing new tasks), not until those tasks have completed.
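A rough sketch of that task-per-worker bookkeeping (method and attribute names here are assumptions for illustration, not dask-gateway's actual API):

import asyncio

class Cluster:
    def __init__(self):
        self.worker_spec = {}  # name -> spec dictionary
        self._tasks = {}       # name -> asyncio.Task launching that worker

    async def _launch_worker(self, name, spec):
        ...  # ask the resource manager for this worker

    def _sync_tasks_to_spec(self):
        # Cancel tasks for workers that are no longer in the spec.
        for name in list(self._tasks):
            if name not in self.worker_spec:
                self._tasks.pop(name).cancel()
        # Fire new tasks for workers that were just added to the spec.
        for name, spec in self.worker_spec.items():
            if name not in self._tasks:
                self._tasks[name] = asyncio.ensure_future(
                    self._launch_worker(name, spec)
                )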
since previous call is still in progress, state is not corrected, no new workers are started/stopped. Spec and tasks are now out of sync. Also, since there are multiple await calls in _correct_state_internal, the worker_spec can be different at different points in that function, leading to potential bugs.
So, the _correct_state_waiting attribute isn't the currently running task, it's the currently enqueued one. Once _correct_state starts running it immediately clears this attribute. After someone calls scale there is a clean, not-yet-run _correct_state_waiting future that will run soon.
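In other words, the enqueue-once behaviour described above looks roughly like this (a sketch reconstructed from the description, not the exact diff):

import asyncio

class Cluster:
    def __init__(self):
        self._correct_state_waiting = None

    def _correct_state(self):
        if self._correct_state_waiting:
            # A correction is already enqueued but has not started; reuse it.
            return self._correct_state_waiting
        task = asyncio.ensure_future(self._correct_state_internal())
        self._correct_state_waiting = task
        return task

    async def _correct_state_internal(self):
        # Clear immediately so that scale calls arriving while we run will
        # enqueue a fresh correction that sees the newest worker_spec.
        self._correct_state_waiting = None
        ...  # start/stop workers to match self.worker_spec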
Since _correct_state_internal waits on the created workers, this does mean that there's no way to cancel pending workers. This is fine for LocalCluster, but would be problematic if used as a base class for other cluster managers. The following would request and start 100 workers before scaling back down afaict:
cluster.scale(100)
cluster.scale(2)
I think that this depends on what you mean by "waits on".
One approach is that for a cluster manager to reach a correct state it only has to successfully submit a request to the resource manager and receive an acknowledgement that the resource manager is handling it. We're not guaranteeing full deployment, merely that we've done our part of the job. I would expect this to almost always be fairly fast.
Separately, there is now a Client.wait_for_workers(n=10) method that might be used for full client <-> scheduler checks.
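For example, a usage sketch (the worker count is arbitrary, and cluster is assumed to be a cluster object like the ones above):

from dask.distributed import Client

client = Client(cluster)       # connect a client to the cluster's scheduler
cluster.scale(10)              # ask the cluster manager for 10 workers
client.wait_for_workers(10)    # block until the scheduler actually sees them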
async def _start(self):
    while self.status == "starting":
        await asyncio.sleep(0.01)
Same here as with closing: we could wait on the start task instead of polling.
def __enter__(self):
    self.sync(self._correct_state)
    self.sync(self._wait_for_workers)
Does this mean that __enter__ will only complete once the initial n workers have started? What happens if we request 2 workers, 1 starts, and 1 fails?
Yes, this might hang. I'm not sure we ever had a test in our test suite with this case. I'll add something.
Thanks for the review @jcrist! If you have a chance to pass through things tomorrow I would appreciate it.
I plan to merge this later today if there are no further comments. Tests here are pretty decent, although I'll need to overhaul adaptive. I'd like to do this in a separate PR though.
OK. Merging this in. I intend to be active in this area for a while, so if there are still issues please feel free to raise them. I plan to do the following:
LocalCluster.__repr__ was removed in dask#2675.
This is intended to be a base for LocalCluster (and others) that want to specify more heterogeneous information about workers. This forces the use of Python 3 and introduces more asyncio and async def handling. This cleans up a number of intermittent testing failures and improves our testing harness hygiene.
Additionally, this PR does the following:
Docstring
Cluster that requires a full specification of workers
This attempts to handle much of the logistics of cleanly setting up and
tearing down a scheduler and workers, without handling any of the logic
around user inputs. It should form the base of other cluster creation
functions.
Examples
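A usage sketch filled in from the spec format discussed above (worker names and options are illustrative):

>>> from dask.distributed import Worker, Nanny
>>> spec = {
...     'my-worker': {"cls": Worker, "options": {"ncores": 1}},
...     'my-nanny': {"cls": Nanny, "options": {"ncores": 2}},
... }
>>> cluster = SpecCluster(workers=spec)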