Source code for lwr.managers.base.directory

import os
import stat

JOB_FILE_RETURN_CODE = "return_code"
JOB_FILE_STANDARD_OUTPUT = "stdout"
JOB_FILE_STANDARD_ERROR = "stderr"
JOB_FILE_TOOL_ID = "tool_id"
JOB_FILE_TOOL_VERSION = "tool_version"

from lwr.managers.base import BaseManager
from lwr.managers import LWR_UNKNOWN_RETURN_CODE
from ..util.job_script import job_script
from ..util.env import env_to_statement


[docs]class DirectoryBaseManager(BaseManager): def _job_file(self, job_id, name): return self._job_directory(job_id)._job_file(name)
[docs] def return_code(self, job_id): return_code_str = self._read_job_file(job_id, JOB_FILE_RETURN_CODE, default=LWR_UNKNOWN_RETURN_CODE) return int(return_code_str) if return_code_str and return_code_str != LWR_UNKNOWN_RETURN_CODE else return_code_str
[docs] def stdout_contents(self, job_id): return self._read_job_file(job_id, JOB_FILE_STANDARD_OUTPUT, default="")
[docs] def stderr_contents(self, job_id): return self._read_job_file(job_id, JOB_FILE_STANDARD_ERROR, default="")
def _stdout_path(self, job_id): return self._job_file(job_id, JOB_FILE_STANDARD_OUTPUT) def _stderr_path(self, job_id): return self._job_file(job_id, JOB_FILE_STANDARD_ERROR) def _return_code_path(self, job_id): return self._job_file(job_id, JOB_FILE_RETURN_CODE) def _setup_job_for_job_id(self, job_id, tool_id, tool_version): self._setup_job_directory(job_id) tool_id = str(tool_id) if tool_id else "" tool_version = str(tool_version) if tool_version else "" authorization = self._get_authorization(job_id, tool_id) authorization.authorize_setup() self._write_tool_info(job_id, tool_id, tool_version) return job_id def _read_job_file(self, job_id, name, **kwds): return self._job_directory(job_id).read_file(name, **kwds) def _write_job_file(self, job_id, name, contents): return self._job_directory(job_id).write_file(name, contents) def _write_return_code(self, job_id, return_code): self._write_job_file(job_id, JOB_FILE_RETURN_CODE, str(return_code)) def _write_tool_info(self, job_id, tool_id, tool_version): job_directory = self._job_directory(job_id) job_directory.store_metadata(JOB_FILE_TOOL_ID, tool_id) job_directory.store_metadata(JOB_FILE_TOOL_VERSION, tool_version) def _open_standard_output(self, job_id): return self._job_directory(job_id).open_file(JOB_FILE_STANDARD_OUTPUT, 'w') def _open_standard_error(self, job_id): return self._job_directory(job_id).open_file(JOB_FILE_STANDARD_ERROR, 'w') def _check_execution_with_tool_file(self, job_id, command_line): tool_id = self._tool_id(job_id) self._check_execution(job_id, tool_id, command_line) def _tool_id(self, job_id): tool_id = None job_directory = self._job_directory(job_id) if job_directory.has_metadata(JOB_FILE_TOOL_ID): tool_id = job_directory.load_metadata(JOB_FILE_TOOL_ID) return tool_id # Helpers methods related to setting up job script files. def _setup_job_file(self, job_id, command_line, dependencies_description=None, env=[]): command_line = self._expand_command_line(command_line, dependencies_description) script_env = self._job_template_env(job_id, command_line=command_line, env=env) script = job_script(**script_env) return self._write_job_script(job_id, script) def _job_template_env(self, job_id, command_line=None, env=[]): return_code_path = self._return_code_path(job_id) # TODO: Add option to ignore remote env. env = env + self.env_vars env_setup_commands = map(env_to_statement, env) job_template_env = { 'job_instrumenter': self.job_metrics.default_job_instrumenter, 'galaxy_lib': self._galaxy_lib(), 'env_setup_commands': env_setup_commands, 'exit_code_path': return_code_path, 'working_directory': self.job_directory(job_id).working_directory(), 'job_id': job_id, } if command_line: job_template_env['command'] = command_line return job_template_env def _write_job_script(self, job_id, contents): self._write_job_file(job_id, "command.sh", contents) script_path = self._job_file(job_id, "command.sh") os.chmod(script_path, stat.S_IEXEC | stat.S_IWRITE | stat.S_IREAD) return script_path