Skip to content

Commit 49c5c1e

Browse files
committed
Initial implementation of cwl_run command.
Usage: ``` planemo cwl_run [--conformance-test] tool.cwl job.json ``` This will load a Galaxy instance configured to run CWL tools with the specified tool and then submit the job described by job.json. Add cwl-runner implementation wrapper. To mimic the interface established by the reference implementation cwltool. See https://github.com/common-workflow-language/cwltool/tree/master/cwl-runner for more information. Known issues - The test case doesn't work even though the command works fine from the command line. - --conformance-test prints extra junk to the screen it shouldn't. - Files aren't staged back out of Galaxy as would be expected by reference implementation.
1 parent be54e4e commit 49c5c1e

File tree

14 files changed

+337
-37
lines changed

14 files changed

+337
-37
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ develop-eggs
1919
lib
2020
lib64
2121

22+
cwl-runner/dist
23+
cwl-runner/build
24+
2225
# Installer logs
2326
pip-log.txt
2427

cwl-runner/README

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
This an optional companion package to planemo which provides provides
2+
an additional entry point under the alias "cwl-runner", which is the
3+
implementation-agnostic name for the default CWL interpreter installed
4+
on a host.
5+
6+
This package is based on the CWL reference implementation package of
7+
the same name, see
8+
https://github.com/common-workflow-language/cwltool/tree/master/cwl-runner
9+
for more information.

cwl-runner/cwl-runner

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/sh
2+
3+
planemo cwl_run "$@"

cwl-runner/setup.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env python
2+
3+
import os
4+
import sys
5+
import setuptools.command.egg_info as egg_info_cmd
6+
import shutil
7+
8+
from setuptools import setup, find_packages
9+
10+
SETUP_DIR = os.path.dirname(__file__)
11+
README = os.path.join(SETUP_DIR, 'README')
12+
13+
scripts = ["cwl-runner"]
14+
15+
setup(name='cwl_runner',
16+
version='1.0',
17+
description='Common workflow language interpreter implementation (for Galaxy + Planemo)',
18+
long_description=open(README).read(),
19+
author='John Chilton',
20+
author_email='jmchilton@gmail.com',
21+
url="https://github.com/galaxyproject/planemo",
22+
download_url="https://github.com/galaxyproject/planemo",
23+
license="AFL",
24+
install_requires=[
25+
'planemo'
26+
],
27+
scripts=scripts,
28+
zip_safe=True
29+
)

planemo/commands/cmd_cwl_run.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import click
2+
from planemo.cli import pass_context
3+
from planemo.io import conditionally_captured_io
4+
from planemo import options
5+
from planemo import galaxy_serve
6+
from planemo import cwl
7+
from planemo import io
8+
9+
10+
@click.command('cwl_run')
11+
@options.required_tool_arg()
12+
@options.required_job_arg()
13+
@options.galaxy_serve_options()
14+
@options.galaxy_cwl_root_option()
15+
@options.cwl_conformance_test()
16+
@pass_context
17+
def cli(ctx, path, job_path, **kwds):
18+
"""Planemo command for running CWL tools and jobs.
19+
20+
::
21+
22+
% planemo cwl_run cat1-tool.cwl cat-job.json
23+
"""
24+
# TODO: serve options aren't exactly right - don't care about
25+
# port for instance.
26+
kwds["cwl"] = True
27+
conformance_test = kwds.get("conformance_test", False)
28+
with conditionally_captured_io(conformance_test):
29+
with galaxy_serve.serve_daemon(ctx, [path], **kwds) as config:
30+
try:
31+
cwl_run = cwl.run_cwl_tool(path, job_path, config, **kwds)
32+
except Exception:
33+
io.warn("Problem running cwl tool...")
34+
print(config.log_contents)
35+
raise
36+
37+
print(cwl_run.cwl_command_state)
38+
return 0

planemo/cwl/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from .client import run_cwl_tool
2+
3+
__all__ = [
4+
'run_cwl_tool',
5+
]

planemo/cwl/client.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
""" High-level client sitting on top of bioblend for running CWL
2+
stuff in Galaxy.
3+
"""
4+
from __future__ import print_function
5+
6+
import json
7+
import os
8+
9+
from planemo.io import wait_on
10+
11+
12+
DEFAULT_HISTORY_NAME = "CWL Target History"
13+
14+
15+
def run_cwl_tool(tool_path, job_path, config, **kwds):
16+
user_gi = config.user_gi
17+
admin_gi = config.gi
18+
19+
tool_id = _tool_id(tool_path)
20+
history_id = _history_id(user_gi, **kwds)
21+
job_dict = _galactic_job_json(job_path, user_gi, history_id)
22+
final_state = _wait_for_history(user_gi, history_id)
23+
if final_state != "ok":
24+
msg = "Failed to run CWL job final job state is [%s]." % final_state
25+
with open("errored_galaxy.log", "w") as f:
26+
f.write(config.log_contents)
27+
raise Exception(msg)
28+
run_tool_payload = dict(
29+
history_id=history_id,
30+
tool_id=tool_id,
31+
tool_inputs=job_dict,
32+
inputs_representation="cwl",
33+
)
34+
run_response = user_gi.tools._tool_post(run_tool_payload)
35+
job = run_response["jobs"][0]
36+
job_id = job["id"]
37+
final_state = _wait_for_job(user_gi, job_id)
38+
if final_state != "ok":
39+
msg = "Failed to run CWL job final job state is [%s]." % final_state
40+
with open("errored_galaxy.log", "w") as f:
41+
f.write(config.log_contents)
42+
raise Exception(msg)
43+
44+
job_info = admin_gi.jobs.show_job(job_id)
45+
cwl_command_state = job_info["cwl_command_state"]
46+
47+
return CwlRunResponse(cwl_command_state, run_response)
48+
49+
50+
class CwlRunResponse(object):
51+
52+
def __init__(self, cwl_command_state, api_run_response):
53+
self.cwl_command_state = cwl_command_state
54+
self.api_run_response = api_run_response
55+
56+
57+
def _history_id(gi, **kwds):
58+
history_id = kwds.get("history_id", None)
59+
if history_id is None:
60+
history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME)
61+
history_id = gi.histories.create_history(history_name)["id"]
62+
return history_id
63+
64+
65+
def _tool_id(tool_path):
66+
tool_id, _ = os.path.splitext(os.path.basename(tool_path))
67+
return tool_id
68+
69+
70+
def _galactic_job_json(job_path, gi, history_id):
71+
with open(job_path, "r") as f:
72+
job_as_dict = json.load( f )
73+
74+
replace_keys = {}
75+
for key, value in job_as_dict.iteritems():
76+
if isinstance( value, dict ):
77+
type_class = value.get("class", None)
78+
if type_class != "File":
79+
continue
80+
81+
file_path = value.get("path", None)
82+
if file_path is None:
83+
continue
84+
85+
if not os.path.isabs(file_path):
86+
directory = os.path.dirname(job_path)
87+
file_path = os.path.join(directory, file_path)
88+
89+
upload_response = gi.tools.upload_file(file_path, history_id)
90+
dataset_id = upload_response["outputs"][0]["id"]
91+
92+
replace_keys[key] = {"src": "hda", "id": dataset_id}
93+
94+
job_as_dict.update(replace_keys)
95+
return job_as_dict
96+
97+
98+
def _wait_for_history(gi, history_id):
99+
state_func = lambda: gi.histories.show_history(history_id)
100+
return _wait_on_state(state_func)
101+
102+
103+
def _wait_for_job(gi, job_id):
104+
state_func = lambda: gi.jobs.show_job(job_id)
105+
return _wait_on_state(state_func)
106+
107+
108+
def _wait_on_state(state_func):
109+
110+
def get_state():
111+
response = state_func()
112+
state = response[ "state" ]
113+
if str(state) not in [ "running", "queued", "new", "ready" ]:
114+
return state
115+
else:
116+
return None
117+
118+
final_state = wait_on(get_state, "state", timeout=100)
119+
return final_state

planemo/galaxy_config.py

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import os
66
import random
77
import shutil
8-
import time
98
from six.moves.urllib.request import urlopen
109
from six import iteritems
1110
from string import Template
@@ -19,6 +18,7 @@
1918
from planemo.io import shell
2019
from planemo.io import write_file
2120
from planemo.io import kill_pid_file
21+
from planemo.io import wait_on
2222
from planemo import git
2323
from planemo.shed import tool_shed_url
2424
from planemo.bioblend import (
@@ -154,7 +154,7 @@ def config_join(*args):
154154

155155
os.makedirs(shed_tools_path)
156156
server_name = "planemo%d" % random.randint(0, 100000)
157-
port = kwds.get("port", 9090)
157+
port = int(kwds.get("port", 9090))
158158
template_args = dict(
159159
port=port,
160160
host=kwds.get("host", "127.0.0.1"),
@@ -276,6 +276,7 @@ def __init__(
276276
self.port = port
277277
self.server_name = server_name
278278
self.master_api_key = master_api_key
279+
self._user_api_key = None
279280

280281
def kill(self):
281282
kill_pid_file(self.pid_file)
@@ -288,10 +289,33 @@ def pid_file(self):
288289
def gi(self):
289290
ensure_module(galaxy)
290291
return galaxy.GalaxyInstance(
291-
url="http://localhost:%d" % self.port,
292+
url="http://localhost:%d" % int(self.port),
292293
key=self.master_api_key
293294
)
294295

296+
@property
297+
def user_gi(self):
298+
# TODO: thread-safe
299+
if self._user_api_key is None:
300+
users = self.gi.users
301+
# Allow override with --user_api_key.
302+
user_response = users.create_local_user(
303+
"planemo",
304+
"planemo@galaxyproject.org",
305+
"planemo",
306+
)
307+
user_id = user_response["id"]
308+
309+
self._user_api_key = users.create_user_apikey(user_id)
310+
return self._gi_for_key(self._user_api_key)
311+
312+
def _gi_for_key(self, key):
313+
ensure_module(galaxy)
314+
return galaxy.GalaxyInstance(
315+
url="http://localhost:%d" % self.port,
316+
key=key
317+
)
318+
295319
def install_repo(self, *args, **kwds):
296320
self.tool_shed_client.install_repository_revision(
297321
*args, **kwds
@@ -305,33 +329,30 @@ def wait_for_all_installed(self):
305329
def status_ready(repo):
306330
status = repo["status"]
307331
if status in ["Installing", "New"]:
308-
return False
332+
return None
309333
if status == "Installed":
310334
return True
311335
raise Exception("Error installing repo status is %s" % status)
312336

313-
def not_ready():
337+
def ready():
314338
repos = self.tool_shed_client.get_repositories()
315-
return not all(map(status_ready, repos))
316-
317-
self._wait_for(not_ready)
318-
319-
# Taken from Galaxy's twilltestcase.
320-
def _wait_for(self, func, **kwd):
321-
sleep_amount = 0.2
322-
slept = 0
323-
walltime_exceeded = 1086400
324-
while slept <= walltime_exceeded:
325-
result = func()
326-
if result:
327-
time.sleep(sleep_amount)
328-
slept += sleep_amount
329-
sleep_amount *= 1.25
330-
if slept + sleep_amount > walltime_exceeded:
331-
sleep_amount = walltime_exceeded - slept
332-
else:
333-
break
334-
assert slept < walltime_exceeded, "Action taking too long."
339+
ready = all(map(status_ready, repos))
340+
return ready or None
341+
342+
wait_on(ready)
343+
344+
@property
345+
def log_file(self):
346+
""" Not actually used by this module, but galaxy_serve will
347+
respect it.
348+
"""
349+
file_name = "%s.log" % self.server_name
350+
return os.path.join(self.galaxy_root, file_name)
351+
352+
@property
353+
def log_contents(self):
354+
with open(self.log_file, "r") as f:
355+
return f.read()
335356

336357
def cleanup(self):
337358
shutil.rmtree(self.config_directory)

planemo/galaxy_serve.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,8 @@ def serve(ctx, paths, **kwds):
4141

4242
@contextlib.contextmanager
4343
def shed_serve(ctx, install_args_list, **kwds):
44-
config = serve(ctx, [], daemon=True, **kwds)
45-
install_deps = not kwds.get("skip_dependencies", False)
46-
try:
44+
with serve_daemon(ctx, **kwds) as config:
45+
install_deps = not kwds.get("skip_dependencies", False)
4746
io.info("Installing repositories - this may take some time...")
4847
for install_args in install_args_list:
4948
install_args["install_tool_dependencies"] = install_deps
@@ -54,6 +53,13 @@ def shed_serve(ctx, install_args_list, **kwds):
5453
)
5554
config.wait_for_all_installed()
5655
yield config
56+
57+
58+
@contextlib.contextmanager
59+
def serve_daemon(ctx, paths=[], **kwds):
60+
try:
61+
config = serve(ctx, paths, daemon=True, **kwds)
62+
yield config
5763
finally:
5864
config.kill()
5965
if not kwds.get("no_cleanup", False):

0 commit comments

Comments
 (0)