Source code for lwr.managers.queued_condor

from os.path import exists
from os import stat

from .util.condor import build_submit_description
from .util.condor import condor_submit, condor_stop, summarize_condor_log, submission_params
from .base.external import ExternalBaseManager
from ..managers import status

from logging import getLogger
log = getLogger(__name__)


# TODO:
#  - user_log_sizes and state_cache never expire
#    elements never expire. This is a small memory
#    whole that should be fixed.
[docs]class CondorQueueManager(ExternalBaseManager): """ Job manager backend that plugs into Condor. """ manager_type = "queued_condor" def __init__(self, name, app, **kwds): super(CondorQueueManager, self).__init__(name, app, **kwds) self.submission_params = submission_params(**kwds) self.user_log_sizes = {} self.state_cache = {}
[docs] def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): self._check_execution_with_tool_file(job_id, command_line) job_file_path = self._setup_job_file(job_id, command_line, dependencies_description=dependencies_description, env=env) log_path = self.__condor_user_log(job_id) open(log_path, 'w') # Touch log file build_submit_params = dict( executable=job_file_path, output=self._stdout_path(job_id), error=self._stderr_path(job_id), user_log=log_path, query_params=self.submission_params, ) submit_file_contents = build_submit_description(**build_submit_params) submit_file = self._write_job_file(job_id, "job.condor.submit", submit_file_contents) external_id, message = condor_submit(submit_file) if not external_id: raise Exception(message) self._register_external_id(job_id, external_id)
def __condor_user_log(self, job_id): return self._job_file(job_id, 'job_condor.log') def _kill_external(self, external_id): failure_message = condor_stop(external_id) if failure_message: log.warn("Failed to stop condor job with id %s - %s" % (external_id, failure_message))
[docs] def get_status(self, job_id): external_id = self._external_id(job_id) if not external_id: raise Exception("Failed to obtain external_id for job_id %s, cannot determine status." % job_id) log_path = self.__condor_user_log(job_id) if not exists(log_path): return status.COMPLETE if external_id not in self.user_log_sizes: self.user_log_sizes[external_id] = -1 self.state_cache[external_id] = status.QUEUED log_size = stat(log_path).st_size if log_size == self.user_log_sizes[external_id]: return self.state_cache[external_id] return self.__get_state_from_log(external_id, log_path)
def __get_state_from_log(self, external_id, log_file): s1, s4, s7, s5, s9, log_size = summarize_condor_log(log_file, external_id) if s5 or s9: state = status.COMPLETE elif s1 or s4 or s7: state = status.RUNNING else: state = status.QUEUED self.user_log_sizes[external_id] = log_size self.state_cache[external_id] = state return state