Source code for lwr.web.routes

import os
from webob import exc
from json import loads

from galaxy.util import (
    copy_to_path,
    copy_to_temp,
)
from lwr.lwr_client.job_directory import verify_is_in_directory
from lwr.web.framework import Controller
from lwr.manager_factory import DEFAULT_MANAGER_NAME
from lwr.manager_endpoint_util import (
    submit_job,
    setup_job,
    full_status,
)

import logging
log = logging.getLogger(__name__)


[docs]class LwrController(Controller): def __init__(self, **kwargs): super(LwrController, self).__init__(**kwargs) def _check_access(self, req, environ, start_response): if req.app.private_key: sent_private_key = req.GET.get("private_key", None) if not (req.app.private_key == sent_private_key): return exc.HTTPUnauthorized()(environ, start_response) def _app_args(self, args, req): app = req.app managers = app.managers manager_name = args.get('manager_name', DEFAULT_MANAGER_NAME) app_args = {} app_args['manager'] = managers[manager_name] app_args['file_cache'] = getattr(app, 'file_cache', None) app_args['object_store'] = getattr(app, 'object_store', None) return app_args
@LwrController(response_type='json') def setup(manager, job_id, tool_id=None, tool_version=None): return __setup(manager, job_id, tool_id=tool_id, tool_version=tool_version) def __setup(manager, job_id, tool_id, tool_version): response = setup_job(manager, job_id, tool_id, tool_version) log.debug("Setup job with configuration: %s" % response) return response @LwrController() def clean(manager, job_id): manager.clean(job_id) @LwrController() def launch(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='[]', env='[]'): submit_params = loads(params) setup_params = loads(setup_params) dependencies_description = loads(dependencies_description) env = loads(env) remote_staging = loads(remote_staging) submit_config = dict( job_id=job_id, command_line=command_line, setup_params=setup_params, submit_params=submit_params, dependencies_description=dependencies_description, env=env, remote_staging=remote_staging ) submit_job(manager, submit_config) @LwrController(response_type='json') def check_complete(manager, job_id): status = manager.get_status(job_id) return full_status(manager, status, job_id) @LwrController() def kill(manager, job_id): manager.kill(job_id) # Following routes allow older clients to talk to new LWR, should be considered # deprecated in favor of generic upload_file route. @LwrController(response_type='json') def upload_tool_file(manager, file_cache, job_id, name, body, cache_token=None): path = manager.job_directory(job_id).calculate_path(name, 'tool') return _handle_upload( file_cache, path, body, cache_token=cache_token ) @LwrController(response_type='json') def upload_input(manager, file_cache, job_id, name, body, cache_token=None): path = manager.job_directory(job_id).calculate_path(name, 'input') return _handle_upload( file_cache, path, body, cache_token=cache_token ) @LwrController(response_type='json') def upload_extra_input(manager, file_cache, job_id, name, body, cache_token=None): path = manager.job_directory(job_id).calculate_path(name, 'input') return _handle_upload( file_cache, path, body, cache_token=cache_token ) @LwrController(response_type='json') def upload_config_file(manager, file_cache, job_id, name, body, cache_token=None): path = manager.job_directory(job_id).calculate_path(name, 'config') return _handle_upload( file_cache, path, body, cache_token=cache_token, ) @LwrController(response_type='json') def upload_working_directory_file(manager, file_cache, job_id, name, body, cache_token=None): path = manager.job_directory(job_id).calculate_path(name, 'workdir') return _handle_upload( file_cache, path, body, cache_token=cache_token, ) @LwrController(response_type='json') def upload_unstructured_file(manager, file_cache, job_id, name, body, cache_token=None): path = manager.job_directory(job_id).calculate_path(name, 'unstructured') return _handle_upload( file_cache, path, body, cache_token=cache_token, ) @LwrController(response_type='json') def upload_file(manager, input_type, file_cache, job_id, name, body, cache_token=None): # Input type should be one of input, config, workdir, tool, or unstructured. path = manager.job_directory(job_id).calculate_path(name, input_type) return _handle_upload(file_cache, path, body, cache_token=cache_token) @LwrController(response_type='json') def input_path(manager, input_type, job_id, name): path = manager.job_directory(job_id).calculate_path(name, input_type) return {'path': path} @LwrController(response_type='file') def download_output(manager, job_id, name, output_type="direct"): return _output_path(manager, job_id, name, output_type) @LwrController(response_type='json') def output_path(manager, job_id, name, output_type="direct"): # output_type should be one of... # work_dir, direct # Added for non-transfer downloading. return {"path": _output_path(manager, job_id, name, output_type)} def _output_path(manager, job_id, name, output_type): """ """ directory = manager.job_directory(job_id).outputs_directory() if output_type == "task" or output_type == "work_dir": directory = manager.job_directory(job_id).working_directory() path = os.path.join(directory, name) verify_is_in_directory(path, directory) return path @LwrController(response_type='json') def file_available(file_cache, ip, path): """ Returns {token: <token>, ready: <bool>} """ return file_cache.file_available(ip, path) @LwrController(response_type='json') def cache_required(file_cache, ip, path): """ Returns bool indicating whether this client should execute cache_insert. Either way client should be follow up with file_available. """ return file_cache.cache_required(ip, path) @LwrController(response_type='json') def cache_insert(file_cache, ip, path, body): temp_path = copy_to_temp(body) file_cache.cache_file(temp_path, ip, path) # TODO: coerce booleans and None values into correct types - simplejson may # do this already but need to check. @LwrController(response_type='json') def object_store_exists(object_store, object_id, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): obj = LwrDataset(object_id) return object_store.exists(obj, base_dir=base_dir, dir_only=dir_only, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name) @LwrController(response_type='json') def object_store_file_ready(object_store, object_id, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): obj = LwrDataset(object_id) return object_store.file_ready(obj, base_dir=base_dir, dir_only=dir_only, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name) @LwrController(response_type='json') def object_store_create(object_store, object_id, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): obj = LwrDataset(object_id) return object_store.create(obj, base_dir=base_dir, dir_only=dir_only, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name) @LwrController(response_type='json') def object_store_empty(object_store, object_id, base_dir=None, extra_dir=None, extra_dir_at_root=False, alt_name=None): obj = LwrDataset(object_id) return object_store.empty(obj, base_dir=base_dir, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name) @LwrController(response_type='json') def object_store_size(object_store, object_id, extra_dir=None, extra_dir_at_root=False, alt_name=None): obj = LwrDataset(object_id) return object_store.size(obj, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name) @LwrController(response_type='json') def object_store_delete(object_store, object_id, entire_dir=False, base_dir=None, extra_dir=None, extra_dir_at_root=False, alt_name=None): obj = LwrDataset(object_id) return object_store.delete(obj, entire_dir=False, base_dir=None, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name) @LwrController(response_type='json') def object_store_get_data(object_store, object_id, start=0, count=-1, base_dir=None, extra_dir=None, extra_dir_at_root=False, alt_name=None): obj = LwrDataset(object_id) return object_store.get_data(obj, start=int(start), count=int(count), entire_dir=False, base_dir=None, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name) @LwrController(response_type='json') def object_store_get_filename(object_store, object_id, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None): obj = LwrDataset(object_id) return object_store.get_filename(obj, base_dir=base_dir, dir_only=dir_only, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name) @LwrController(response_type='json') def object_store_update_from_file(object_store, object_id, base_dir=None, extra_dir=None, extra_dir_at_root=False, alt_name=None, file_name=None, create=False): obj = LwrDataset(object_id) return object_store.update_from_file(obj, base_dir=base_dir, extra_dir=extra_dir, extra_dir_at_root=extra_dir_at_root, alt_name=alt_name, file_name=file_name, create=create) @LwrController(response_type='json') def object_store_get_store_usage_percent(object_store): return object_store.get_store_usage_percent()
[docs]class LwrDataset(object): """Intermediary between lwr and objectstore.""" def __init__(self, id): self.id = id self.object_store_id = None
def _handle_upload(file_cache, path, body, cache_token=None): source = body if cache_token: cached_file = file_cache.destination(cache_token) source = open(cached_file, 'rb') log.info("Copying cached file %s to %s" % (cached_file, path)) copy_to_path(source, path) return {"path": path}