
Add draft of PBSCluster #56

Merged
mrocklin merged 10 commits into pangeo-data:master from mrocklin:pbs-cluster on Jan 2, 2018

Conversation

@mrocklin (Member) commented Dec 22, 2017

This provides a Pythonic way to launch Dask clusters on a PBS cluster
from Python.

This also works with adaptive clusters

Examples

>>> from pangeo import PBSCluster
>>> cluster = PBSCluster(project='...')
>>> cluster.start_workers(10)  # this may take a few seconds to launch

>>> from dask.distributed import Client
>>> client = Client(cluster)

This also works with adaptive clusters, which automatically launch and kill workers based on load.

>>> cluster.adapt()

Comment thread on pangeo/pbs.py (outdated)
def __init__(self,
             name='dask',
             q='regular',
             project='UCLB0022',  # is there an environment variable I can use?
@mrocklin (Member, Author)
Is there a way to obtain a default project for a particular user? @davidedelvento

@davidedelvento

PBS does not have a concept of a default project; however, NCAR added one for users who wish to set it up for their own reasons, via the $PBS_ACCOUNT environment variable.

Some users set it, and it'd be great if you'd use it for them, but many users do not set it, so you need to cover the case where it's missing. In that case, you can pick the first project they have, or one at random. See how we (the group I work in, aka CSG) implemented this logic in /glade/u/apps/ch/opt/usr/bin/qcmd and /glade/u/apps/ch/opt/usr/bin/qinteractive (they are bash scripts, IIRC). Both of them (IIRC) rely on /ncar/opt/sam/sam_validate.py, which is out of the control of CSG (as you can see, it's owned by root, which we don't have). Also, that script is not truly made for importing, but I could see if I can find who maintains it and put you in touch with them if you prefer to use a stable API rather than parsing text as bash does.
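A minimal sketch of that fallback, assuming only the $PBS_ACCOUNT variable described above; the `user_projects` argument (however a site would enumerate a user's projects, e.g. via sam_validate.py) is a hypothetical stand-in:

```python
import os

def pick_default_project(user_projects):
    """Choose a PBS project/account string.

    Prefer $PBS_ACCOUNT when the user has set it; otherwise fall back to
    the first entry of `user_projects`, a hypothetical list of the user's
    projects (however the site enumerates them).
    """
    project = os.environ.get('PBS_ACCOUNT')
    if project:
        return project
    if user_projects:
        return user_projects[0]
    raise ValueError("No project given and $PBS_ACCOUNT is not set")
```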

@mrocklin (Member, Author)

@jhamman this may interest you. If you get a chance to test drive this I'd appreciate it. Please feel free to push to this branch or make comments as you like.

@jhamman (Member) commented Dec 22, 2017

Thanks @mrocklin - giving it a go now.

@jhamman (Member) commented Dec 23, 2017

@mrocklin - I made a few edits. I was able to get the PBSCluster to work but was not successful in getting the adaptive cluster to scale up/down. I'll give it another try next week.

@mrocklin (Member, Author)

Yeah, I think that clean scale down depends on dask/distributed#1659

Comment thread on pangeo/pbs.py
        `#PBS -A` option.
    resource_spec : str
        Request resources and specify job placement. Passed to `#PBS -l`
        option.
@mrocklin (Member, Author)

How should we reconcile this argument with the memory and threads_per_worker arguments?

Also, I'm curious: what was the reason for this change? I would not be surprised to learn that the previous solution was restrictive in some way.

Also, why the change to multiple processes per job? Is this nicer than having many small jobs? Would array jobs be more appropriate? From a cleanup perspective it's nice for the PBSCluster to have a handle on each individual worker separately.

@mrocklin (Member, Author)

Short term I've reverted this and gone back to only supporting --nthreads. It is difficult to use --nprocs, especially if we want to do adaptive deployments.

@jhamman (Member)

Yes, the motivation for this change was:

  1. The resource specification should be flexible enough to move between different machines that use PBS. While `#PBS -l select=1:ncpus=$ncpus:mem=$mem` works on Cheyenne, this is not going to work everywhere. I figured it is easy enough to allow users to specify this line (see the sketch after this list).

  2. From the Cheyenne docs:

    Most of the Cheyenne batch queues are for exclusive use, and jobs are charged for all 36 cores on each node that is used. Jobs in the shared use "share" queue are charged only for the cores used.

    This is why I had set up the PBSCluster to use multiple workers per job. Alternatively, we could a) try using the shared queue, b) accept paying for cores we're not using, or c) use 36 threads per worker (good for CPU-bound computations but not I/O-bound tasks).
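As a rough sketch of point 1 from the user's side (the `resource_spec` keyword is the argument under discussion; the select/ncpus/mem string is a Cheyenne-style assumption and would differ on other PBS sites):

```python
from pangeo import PBSCluster

# Cheyenne-style request: one node, all 36 cores, plus a memory amount.
# Other PBS installations would substitute whatever `#PBS -l` syntax their
# scheduler expects; the string is passed through to the job script.
cluster = PBSCluster(project='...',
                     resource_spec='select=1:ncpus=36:mem=...')
cluster.start_workers(1)
```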

@davidedelvento

I suggest avoiding option a), at least by default, since sharing a node with others can cause a lot of trouble (at least at the moment). Mostly, but not only, because users have workarounds that allow them to use more memory, CPU, and network than their share, overloading the node (we plan to deploy a more aggressive watchdog, angrily biting whoever spills over their share, but it's not in place and won't be for a while). Option c) seems best. If you are I/O bound, oh well. How many I/O-bound jobs is a user going to run, and for how long compared to their total allocation? I suspect for most folks it'd be an irrelevant cost, and the few people for whom it isn't could use the non-default option a) and deal with the issues.
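For concreteness, option c) on the user side might look roughly like the following; `threads_per_worker` is the keyword already mentioned in this thread, and treating 36 as a sensible value for it is an assumption about the eventual API:

```python
from pangeo import PBSCluster

# Option c): one worker per exclusive 36-core Cheyenne node, with every
# core given to that worker as a thread. Good for CPU-bound, GIL-releasing
# work; GIL-holding I/O (e.g. some compression libraries) would serialize.
cluster = PBSCluster(project='...', threads_per_worker=36)
```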

@mrocklin (Member, Author)

I also like option (c) from a Dask perspective, however I'm also sensitive to IO bound concerns. I suspect that our computations are more I/O bound than most that are run on Cheyenne.

@jhamman are you mainly talking about disk I/O or inter-worker network communication? If disk I/O then do we know what the limiting factor is here? Is it compression libraries that don't release the GIL?

@jhamman (Member)

@mrocklin - yes, disk I/O. I suspect it will be difficult to clearly define the limiting factor on disk I/O. It will greatly depend on both how the data is stored and the chunk configuration. One thing we know for sure is that more processes certainly helps.

This also reverts the previous --nprocs and resource_spec changes
@mrocklin (Member, Author) commented Dec 23, 2017 via email

@mrocklin (Member, Author) commented Dec 24, 2017 via email

@mrocklin (Member, Author)

@jhamman I believe that I've handled the multi-processing issues. This depends on a development branch in dask/distributed#1632

from pangeo import PBSCluster
cluster = PBSCluster(project='...')
cluster.adapt()

from dask.distributed import Client
client = Client(cluster)

Then you'll get nine processes with four threads each by default. The cluster will scale up and down by full nodes. We group worker processes together by node before we determine which to clean up.
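A rough illustration of that grouping idea, not the code in this PR: worker processes are bucketed by the host they run on so that scale-down retires whole nodes rather than individual processes (the branch under discussion does this by keying the Adaptive policy on each worker's host). The `workers` mapping below is a hypothetical `{worker_name: host}` dict:

```python
from collections import defaultdict

def group_workers_by_host(workers):
    """Group worker names by host so a whole node's workers retire together.

    `workers` is a hypothetical {worker_name: host} mapping used only for
    illustration.
    """
    by_host = defaultdict(list)
    for name, host in workers.items():
        by_host[host].append(name)
    return dict(by_host)

# >>> group_workers_by_host({'dask-0': 'n1', 'dask-1': 'n1', 'dask-2': 'n2'})
# {'n1': ['dask-0', 'dask-1'], 'n2': ['dask-2']}
```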

@jhamman (Member) commented Dec 28, 2017

Thanks @mrocklin - I'll give it a try and report back.

@jedwards4b mentioned this pull request on Dec 28, 2017
Comment thread on pangeo/pbs.py
#PBS -j oe
#PBS -m abe

%(base_path)s/dask-worker %(scheduler)s \
@jedwards4b (Contributor)

Where is the associated dask-worker script?

@mrocklin (Member, Author)

This is installed when you install dask.distributed

@jedwards4b (Contributor)


    def adapt(self):
        """ Start up an Adaptive deployment if not already started
    
            This makes the cluster request resources in accordance to current
            demand on the scheduler """
        from distributed.deploy import Adaptive
        if self._adaptive:
            return
        else:
            self._adaptive = Adaptive(self.scheduler, self, startup_cost=5,
>                                     key=lambda ws: ws.host)
E           TypeError: __init__() got an unexpected keyword argument 'key'

/glade/u/home/jedwards/sandboxes/pangeo/pangeo/pbs.py:236

@mrocklin (Member, Author)

This depends on a development branch in dask/distributed#1632

@jedwards4b (Contributor) commented Dec 28, 2017

Ah - how do I correctly install that development branch? I did this:

python setup.py install --prefix ~/miniconda3/envs/pangeo/

and ended up with miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed-1.20.2+28.g4479889-py3.6.egg/distributed/deploy/adaptive.py so it's still using
miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/deploy/adaptive.py

@mrocklin (Member, Author)

Something like the following would probably work:

source activate pangeo  # use the pangeo environment
pip install git+https://github.com/asford/distributed.git@grouped_adaptive

But as always this depends on how you've set up your environment.

@jedwards4b (Contributor)

Thanks, I manually linked it and that fixed the problem; tests are working now.

@mrocklin (Member, Author)

Centralizing conversation from mrocklin#1 to here.

@jedwards4b asks

Are you saying that PBSCluster would allow you to start dask scheduler and workers from the notebook session directly?

Yes, that is the intended user experience behind this PR.

Maybe it would help if you provided an example notebook?

The top comment of this PR provides a minimal example.

@jedwards4b (Contributor)

Your introductory comment was "This provides a Pythonic way to launch Dask clusters on a PBS cluster from Python." Now you are saying that this is a way to launch a Dask cluster on a PBS system through a notebook interface, and is not an appropriate method outside a notebook.

I think I'm beginning to understand, thanks.

@mrocklin (Member, Author)

I apologize if my communication has frustrated you. As stated in #50 (comment) this might not be the right path for you. I don't know though because I'm not sure what it is that you're trying to accomplish.

@jhamman (Member) left a comment

I don't see anything here that should keep us from merging and making some incremental improvements as needed. Any objections to merging?

Comment thread on pangeo/pbs.py
--memory-limit %(memory)s \
--name %(name)s-%(n)d \
%(extra)s
""".lstrip()
@jhamman (Member)

In the future, we may want to make this template an attribute on the PBSCluster object. For use on Cheyenne, this will certainly work, so I'm not suggesting changing it until we want to be entirely machine-agnostic.
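If we did go that route later, overriding might look roughly like this; the attribute name `template` and several of the field names are assumptions rather than part of this PR, though %(name)s, %(base_path)s, %(scheduler)s, and %(extra)s appear in the existing script:

```python
from pangeo import PBSCluster

# Hypothetical: swap in a site-specific job script by overriding a class
# (or instance) attribute. The attribute name and several field names here
# are assumptions, not part of this PR.
PBSCluster.template = """
#PBS -N %(name)s
#PBS -q %(queue)s
#PBS -A %(project)s
#PBS -l %(resource_spec)s
#PBS -j oe

%(base_path)s/dask-worker %(scheduler)s --nthreads %(threads)s %(extra)s
""".lstrip()
```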

@mrocklin (Member, Author)

Agreed on both counts.

@mrocklin (Member, Author) commented Jan 2, 2018

Sure. Merging. Was just waiting on feedback.

@mrocklin merged commit 992fc6d into pangeo-data:master on Jan 2, 2018
@mrocklin deleted the pbs-cluster branch on January 2, 2018 at 16:32