Add draft of PBSCluster #56
Conversation
This provides a Pythonic way to launch Dask clusters on a PBS cluster from Python.

Examples
--------

```python
>>> from pangeo import PBSCluster
>>> cluster = PBSCluster(project='...')
>>> cluster.start_workers(10)  # this may take a few seconds to launch

>>> from dask.distributed import Client
>>> client = Client(cluster)
```

This also works with adaptive clusters, which automatically launch and kill workers based on load.

```python
>>> from distributed.deploy import Adaptive
>>> adapt = Adaptive(cluster.cluster.scheduler, cluster)
```
```python
def __init__(self,
             name='dask',
             q='regular',
             project='UCLB0022',  # is there an environment variable I can use?
```
Is there a way to obtain a default project for a particular user? @davidedelvento
PBS does not have a concept of a default project; however, NCAR added one for users who wish to set it up for their own reasons, via the $PBS_ACCOUNT environment variable.
Some users set it, and it'd be great if you'd use it for them, but many users do not set it, so you need to cover the case where it's missing. In that case, you can pick the first project they have, or one at random. See how we (the group I work in, aka CSG) implemented this logic in /glade/u/apps/ch/opt/usr/bin/qcmd and /glade/u/apps/ch/opt/usr/bin/qinteractive (they are bash scripts, IIRC). Both of them (IIRC) rely on /ncar/opt/sam/sam_validate.py, which is out of the control of CSG (as you can see, it's owned by root, which we don't have). Also, that script is not truly made for importing, but I could see if I can find who maintains it and put you in touch with them if you prefer to use a stable API rather than parsing text as bash does.
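A minimal sketch of that fallback logic, assuming only the $PBS_ACCOUNT variable described above; the function name is illustrative, and the site-specific project lookup (via sam_validate.py) is stubbed out:

```python
import os

def default_project():
    # Prefer the user's $PBS_ACCOUNT if they have set it.
    project = os.environ.get('PBS_ACCOUNT')
    if project:
        return project
    # A site-specific fallback (e.g. picking the first project the user
    # belongs to, as qcmd/qinteractive do) would go here; absent that,
    # fail loudly rather than guess.
    raise ValueError("No $PBS_ACCOUNT set; please pass project= explicitly")
```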
@jhamman this may interest you. If you get a chance to test-drive this, I'd appreciate it. Please feel free to push to this branch or make comments as you like.

Thanks @mrocklin - giving it a go now.

@mrocklin - I made a few edits. I was able to get the PBSCluster to work but was not successful in getting the adaptive cluster to scale up/down. I'll give it another try next week.

Yeah, I think that clean scale-down depends on dask/distributed#1659.
```python
    `#PBS -A` option.
resource_spec : str
    Request resources and specify job placement. Passed to `#PBS -l`
    option.
```
How should we reconcile this argument with the memory and threads_per_worker arguments?

Also, I'm curious, what was the reason for this change? I would not be surprised to learn that the previous solution was restrictive in some way.

Also, why the multiple-processes-per-job change? Is this nicer than having many small jobs? Would array jobs be more appropriate? From a cleanup perspective it's nice for the PBSCluster to have a handle on each individual worker separately.
Short term, I've reverted this and gone back to only supporting --nthreads. It is difficult to use --nprocs, especially if we want to do adaptive deployments.
Yes, the motivation for this change was:

1. The resource specification should be flexible enough to move between different machines that use PBS. While `#PBS -l select=1:ncpus=$ncpus:mem=$mem` works on Cheyenne, it is not going to work everywhere. I figured it is easy enough to allow users to specify this line themselves.

2. From the [Cheyenne docs](https://www2.cisl.ucar.edu/resources/computational-systems/cheyenne/quick-start-cheyenne):

   > Most of the Cheyenne batch queues are for exclusive use, and jobs are charged for all 36 cores on each node that is used. Jobs in the shared use "share" queue are charged only for the cores used.

   This is why I had set up the PBSCluster to use multiple workers per job. Alternatively, we could a) try using the shared queue, b) accept paying for cores we're not using, or c) use 36 threads per worker (good for CPU-bound computations but not for IO-bound tasks).
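For illustration, that flexibility amounts to letting the user hand the `#PBS -l` line to the cluster directly; a sketch using the `resource_spec` keyword from the docstring above (the values shown are examples only, not defaults):

```python
from pangeo import PBSCluster

# Example only: request one full 36-core Cheyenne node per worker job.
cluster = PBSCluster(project='UCLB0022',
                     resource_spec='select=1:ncpus=36:mem=100GB')
```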
I suggest avoiding option a), at least by default, since sharing a node with others can cause a lot of trouble (at least at the moment). Mostly, but not only, because users have workarounds that allow them to use more memory, CPU, and network than their share, overloading the node (we plan to deploy a more aggressive watchdog, angrily biting whoever spills over their share, but it's not in place and won't be for a while). Option c) seems best. If you are IO bound, oh well. How many IO-bound jobs is a user going to run? For how long, compared to their own total allocation? I suspect for most folks it'd be an irrelevant cost. And the few people for whom it isn't could use the non-default a) option and deal with the issues.
I also like option (c) from a Dask perspective; however, I'm also sensitive to IO-bound concerns. I suspect that our computations are more I/O-bound than most that are run on Cheyenne.

@jhamman are you mainly talking about disk I/O or inter-worker network communication? If disk I/O, do we know what the limiting factor is? Is it compression libraries that don't release the GIL?
@mrocklin - yes, disk I/O. I suspect it will be difficult to clearly define the limiting factor on disk I/O. It will greatly depend on both how the data is stored and the chunk configuration. One thing we know for sure is that more processes certainly helps.
This also reverts the previous --nprocs and resource_spec changes.
Ok, I'll take another crack at this.
One thing to do here eventually is to load some data with a many-threaded worker and then check the distributed profiler. That will probably give us a sense of where the hotspots are.
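For example, distributed can report aggregated profiling information from the workers; a sketch (the exact API surface may differ across versions):

```python
from dask.distributed import Client

client = Client(cluster)
# ... run a representative I/O-heavy computation here ...
profile = client.profile()  # aggregate thread profile across the workers
```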
@jhamman I believe that I've handled the multi-processing issues. This depends on a development branch in dask/distributed#1632.

```python
from pangeo import PBSCluster
cluster = PBSCluster(project='...')
cluster.adapt()

from dask.distributed import Client
client = Client(cluster)
```

Then you'll get nine processes with four threads each by default. The cluster will scale up and down by full nodes. We group worker processes together by node before we determine which to clean up.
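To illustrate the by-node grouping mentioned above, a small sketch (not the PR's actual implementation; worker addresses are assumed to look like 'tcp://host:port'):

```python
from collections import defaultdict

def workers_by_host(worker_addresses):
    # Group worker addresses by host so that scale-down can retire
    # whole nodes rather than individual processes.
    groups = defaultdict(list)
    for addr in worker_addresses:
        host = addr.split('://')[-1].rsplit(':', 1)[0]
        groups[host].append(addr)
    return dict(groups)
```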
Thanks @mrocklin - I'll give it a try and report back.
```
#PBS -j oe
#PBS -m abe

%(base_path)s/dask-worker %(scheduler)s \
```
Where is the associated dask-worker script?

This is installed when you install dask.distributed.
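A quick way to confirm that the entry point is available in the active environment (a sketch using only the standard library):

```python
import shutil

# dask-worker is installed as a console script alongside distributed
print(shutil.which('dask-worker') or 'dask-worker not found on PATH')
```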
Ah - how do I correctly install that development branch? I did this: […] and ended up with […]
Something like the following would probably work: […] But as always, this depends on how you've set up your environment.
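The actual snippet wasn't preserved in this thread; for reference, a common pattern for installing a development branch looks like the following, with the fork and branch name left as placeholders:

```python
# Typically done from the shell rather than Python, e.g.:
#   pip install git+https://github.com/dask/distributed.git@<branch-name>
# After installing, verify which copy is actually importable:
import distributed
print(distributed.__version__, distributed.__file__)
```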
Thanks, I manually linked it and that fixed the problem; tests are working now.
Centralizing conversation from mrocklin#1 to here. @jedwards4b asks: […]

Yes, that is the intended user experience behind this PR. The top comment of this PR provides a minimal example.
Your introductory comment was […]. I think I'm beginning to understand, thanks.
I apologize if my communication has frustrated you. As stated in #50 (comment), this might not be the right path for you. I don't know, though, because I'm not sure what it is that you're trying to accomplish.
jhamman left a comment:

I don't see anything here that should keep us from merging and making some incremental improvements as needed. Any objections to merging?
```python
--memory-limit %(memory)s \
--name %(name)s-%(n)d \
%(extra)s
""".lstrip()
```
In the future, we may want to make this template an attribute on the PBSCluster object. For use on Cheyenne this will certainly work, so I'm not suggesting changing it until we want to be entirely machine-agnostic.
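A sketch of what that might look like, assuming the %(...)s placeholders visible in the diff hunks above; the attribute name and the exact template body are illustrative, not the PR's code:

```python
class PBSCluster:
    # Subclasses (or instances) could override this attribute to adapt
    # the job script to PBS machines other than Cheyenne.
    job_script_template = """#!/usr/bin/env bash
#PBS -N %(name)s
#PBS -q %(queue)s
#PBS -A %(project)s
#PBS -l %(resource_spec)s
#PBS -j oe
#PBS -m abe

%(base_path)s/dask-worker %(scheduler)s \\
    --memory-limit %(memory)s \\
    --name %(name)s-%(n)d \\
    %(extra)s
""".lstrip()
```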
Sure. Merging. Was just waiting on feedback.