Source code for lwr.managers.unqueued

try:
    import thread
except ImportError:
    import _thread as thread  # Py3K changed it.
import platform

from .util import kill_pid
from lwr.managers.base.directory import DirectoryBaseManager
from lwr.managers import status
from galaxy.util import execute

from logging import getLogger
log = getLogger(__name__)

# Names of the per-job metadata entries this manager stores through the job
# directory; Manager._get_status derives the job state from which exist.

# Stored by Manager._record_submission, removed by Manager._finish_execution.
JOB_FILE_SUBMITTED = "submitted"
# Stored by Manager._record_cancel when a kill finds no pid for the job.
JOB_FILE_CANCELLED = "cancelled"
# Holds the launched process's pid; removed by Manager._finish_execution.
JOB_FILE_PID = "pid"


# Job locks (for status updates). The following methods must be called with the job lock held:
#    _finish_execution(self, job_id)
#    _get_status(self, job_id)
#    _is_cancelled(self, job_id)
#    _record_pid(self, job_id, pid)
#    _get_pid_for_killing_or_cancel(self, job_id)
#
class Manager(DirectoryBaseManager):
    """
    A simple job manager that just directly runs jobs as given (no
    queueing). Preserved for compatibility with older versions of LWR
    client code where Galaxy is used to maintain queue (like Galaxy's
    local job runner).
    """
    manager_type = "unqueued"

    def __init__(self, name, app, **kwds):
        super(Manager, self).__init__(name, app, **kwds)

    def _record_cancel(self, job_id):
        """Store the cancelled marker in the job's metadata."""
        self._job_directory(job_id).store_metadata(JOB_FILE_CANCELLED, 'true')

    def _record_submission(self, job_id):
        """Store the submitted marker in the job's metadata."""
        self._job_directory(job_id).store_metadata(JOB_FILE_SUBMITTED, 'true')

    def __get_pid(self, job_id):
        """Return the pid recorded for ``job_id`` as an int, or None.

        This is best effort: if the pid metadata cannot be loaded or is not
        a valid integer, None is returned rather than raising (or leaking
        the unparsed value to the caller).
        """
        try:
            raw_pid = self._job_directory(job_id).load_metadata(JOB_FILE_PID)
            return None if raw_pid is None else int(raw_pid)
        except Exception:
            log.debug("Could not read pid for job %s", job_id, exc_info=True)
            return None

    def setup_job(self, input_job_id, tool_id, tool_version):
        """Map ``input_job_id`` to a local job id and set the job up for it.

        Returns whatever ``_setup_job_for_job_id`` returns.
        """
        job_id = self._get_job_id(input_job_id)
        return self._setup_job_for_job_id(job_id, tool_id, tool_version)

    def _get_job_id(self, galaxy_job_id):
        """Return the id assigned by ``self.id_assigner``, as a string."""
        return str(self.id_assigner(galaxy_job_id))

    def _get_job_lock(self, job_id):
        """Return the lock object provided by the job's directory."""
        return self._job_directory(job_id).lock()

    def get_status(self, job_id):
        """Return the job's status, computed while holding the job lock."""
        with self._get_job_lock(job_id):
            return self._get_status(job_id)

    def kill(self, job_id):
        """Kill the process of a running job, or mark the job cancelled.

        The pid lookup (and the cancel bookkeeping when no pid is recorded)
        happens under the job lock; the kill itself is issued afterwards.
        """
        log.info("Attempting to kill job with job_id %s", job_id)
        job_lock = self._get_job_lock(job_id)
        with job_lock:
            pid = self._get_pid_for_killing_or_cancel(job_id)
        if pid:
            log.info("Attempting to kill pid %s", pid)
            kill_pid(pid)

    def _monitor_execution(self, job_id, proc, stdout, stderr):
        """Wait for ``proc`` to exit and record the outcome.

        The output handles are closed even if waiting fails, and the
        submitted/pid markers are always cleared (under the job lock).
        """
        try:
            try:
                proc.wait()
            finally:
                # Release the handles even when wait() is interrupted.
                stdout.close()
                stderr.close()
            return_code = proc.returncode
            # TODO: This is invalid if we have written a job script.
            self._write_return_code(job_id, str(return_code))
        finally:
            with self._get_job_lock(job_id):
                self._finish_execution(job_id)

    # with job lock
    def _finish_execution(self, job_id):
        """Remove the submitted and pid markers from the job's metadata."""
        self._job_directory(job_id).remove_metadata(JOB_FILE_SUBMITTED)
        self._job_directory(job_id).remove_metadata(JOB_FILE_PID)

    # with job lock
    def _get_status(self, job_id):
        """Derive the job status from the metadata markers present.

        Precedence: cancelled, then running (pid recorded), then queued
        (submitted but no pid yet); otherwise the job is complete.
        """
        job_directory = self._job_directory(job_id)
        if self._is_cancelled(job_id):
            job_status = status.CANCELLED
        elif job_directory.has_metadata(JOB_FILE_PID):
            job_status = status.RUNNING
        elif job_directory.has_metadata(JOB_FILE_SUBMITTED):
            job_status = status.QUEUED
        else:
            job_status = status.COMPLETE
        return job_status

    # with job lock
    def _is_cancelled(self, job_id):
        """Return whether the cancelled marker exists for the job."""
        return self._job_directory(job_id).has_metadata(JOB_FILE_CANCELLED)

    # with job lock
    def _record_pid(self, job_id, pid):
        """Store ``pid`` (as a string) in the job's metadata."""
        self._job_directory(job_id).store_metadata(JOB_FILE_PID, str(pid))

    # with job lock
    def _get_pid_for_killing_or_cancel(self, job_id):
        """Return the pid to kill, or record a cancel if there is none.

        Returns None when the job is neither running nor queued. For an
        active job without a usable pid, the job is marked cancelled and
        its submitted marker is removed, and None is returned.
        """
        job_status = self._get_status(job_id)
        if job_status not in [status.RUNNING, status.QUEUED]:
            return

        pid = self.__get_pid(job_id)
        if pid is None:
            self._record_cancel(job_id)
            self._job_directory(job_id).remove_metadata(JOB_FILE_SUBMITTED)
        return pid

    def _run(self, job_id, command_line, background=True, **legacy_kwds):
        """Execute ``command_line`` for the job unless it was cancelled.

        With ``background`` true (the default) the process is monitored
        from a new thread; otherwise this call blocks until it finishes.
        """
        # ``async`` is a reserved word from Python 3.7 on, so the flag is
        # now called ``background``; the old keyword is still honoured for
        # callers that pass it by name.
        if "async" in legacy_kwds:
            background = legacy_kwds.pop("async")
        if legacy_kwds:
            raise TypeError("_run() got unexpected keyword arguments: %s"
                            % ", ".join(sorted(legacy_kwds)))

        with self._get_job_lock(job_id):
            if self._is_cancelled(job_id):
                return

        job_directory = self.job_directory(job_id)
        working_directory = job_directory.working_directory()
        stdout = self._open_standard_output(job_id)
        stderr = self._open_standard_error(job_id)
        try:
            proc = execute(command_line=command_line,
                           working_directory=working_directory,
                           stdout=stdout,
                           stderr=stderr)
        except Exception:
            # Nothing will monitor (and close) these if the launch failed.
            stdout.close()
            stderr.close()
            raise
        with self._get_job_lock(job_id):
            self._record_pid(job_id, proc.pid)
        if background:
            thread.start_new_thread(self._monitor_execution, (job_id, proc, stdout, stderr))
        else:
            self._monitor_execution(job_id, proc, stdout, stderr)

    def launch(self, job_id, command_line, submit_params=None, dependencies_description=None, env=None):
        """Prepare the command line for the job and start running it.

        ``submit_params`` is accepted but not used by this method. ``env``
        defaults to an empty list.
        """
        # Fresh list per call instead of a shared mutable default.
        env = [] if env is None else env
        command_line = self._prepare_run(job_id, command_line, dependencies_description=dependencies_description, env=env)
        self._run(job_id, command_line)

    def _prepare_run(self, job_id, command_line, dependencies_description, env):
        """Validate and record the submission; return the command to run.

        On Windows the command line is expanded in place; elsewhere a job
        file is set up and the command returned by that step is used.
        """
        self._check_execution_with_tool_file(job_id, command_line)
        self._record_submission(job_id)
        if platform.system().lower() == "windows":
            # TODO: Don't ignore requirements and env without warning. Ideally
            # process them or at least warn about them being ignored.
            command_line = self._expand_command_line(command_line, dependencies_description)
        else:
            command_line = self._setup_job_file(job_id, command_line, dependencies_description=dependencies_description, env=env)
        return command_line

# Entries of __all__ must be name strings, not the objects themselves;
# otherwise ``from lwr.managers.unqueued import *`` raises TypeError.
__all__ = ["Manager"]