Source code for lwr.managers.base.base_drmaa

from .external import ExternalBaseManager
from ..util.drmaa import DrmaaSessionFactory
from lwr.managers import status

try:
    from drmaa import JobState
except ImportError:
    JobState = None

import logging
log = logging.getLogger(__name__)


[docs]class BaseDrmaaManager(ExternalBaseManager): def __init__(self, name, app, **kwds): super(BaseDrmaaManager, self).__init__(name, app, **kwds) self.native_specification = kwds.get('native_specification', None) drmaa_session_factory_class = kwds.get('drmaa_session_factory_class', DrmaaSessionFactory) drmaa_session_factory = drmaa_session_factory_class() self.drmaa_session = drmaa_session_factory.get()
[docs] def shutdown(self): try: super(BaseDrmaaManager, self).shutdown() except: pass self.drmaa_session.close()
def _get_status_external(self, external_id): drmaa_state = self.drmaa_session.job_status(external_id) return { JobState.UNDETERMINED: status.COMPLETE, JobState.QUEUED_ACTIVE: status.QUEUED, JobState.SYSTEM_ON_HOLD: status.QUEUED, JobState.USER_ON_HOLD: status.QUEUED, JobState.USER_SYSTEM_ON_HOLD: status.QUEUED, JobState.RUNNING: status.RUNNING, JobState.SYSTEM_SUSPENDED: status.QUEUED, JobState.USER_SUSPENDED: status.QUEUED, JobState.DONE: status.COMPLETE, JobState.FAILED: status.COMPLETE, # Should be a FAILED state here as well }[drmaa_state] def _build_template_attributes(self, job_id, command_line, dependencies_description=None, env=[], submit_params={}): stdout_path = self._stdout_path(job_id) stderr_path = self._stderr_path(job_id) attributes = { "remoteCommand": self._setup_job_file(job_id, command_line, dependencies_description=dependencies_description, env=env), "jobName": self._job_name(job_id), "outputPath": ":%s" % stdout_path, "errorPath": ":%s" % stderr_path, } if self.native_specification: attributes["nativeSpecification"] = self.native_specification elif submit_params.get("native_specification", None): attributes["nativeSpecification"] = submit_params["native_specification"] return attributes