from json import dumps
from getpass import getuser
from .base.base_drmaa import BaseDrmaaManager
from .util.sudo import sudo_popen
from ..managers import status
from logging import getLogger
log = getLogger(__name__)
DEFAULT_CHOWN_WORKING_DIRECTORY_SCRIPT = "scripts/chown_working_directory.bash"
DEFAULT_DRMAA_KILL_SCRIPT = "scripts/drmaa_kill.bash"
DEFAULT_DRMAA_LAUNCH_SCRIPT = "scripts/drmaa_launch.bash"
[docs]class ExternalDrmaaQueueManager(BaseDrmaaManager):
"""
DRMAA backed queue manager.
"""
manager_type = "queued_external_drmaa"
def __init__(self, name, app, **kwds):
super(ExternalDrmaaQueueManager, self).__init__(name, app, **kwds)
self.chown_working_directory_script = kwds.get('chown_working_directory_script', DEFAULT_CHOWN_WORKING_DIRECTORY_SCRIPT)
self.drmaa_kill_script = kwds.get('drmaa_kill_script', DEFAULT_DRMAA_KILL_SCRIPT)
self.drmaa_launch_script = kwds.get('drmaa_launch_script', DEFAULT_DRMAA_LAUNCH_SCRIPT)
self.production = kwds.get('production', "true").lower() != "false"
self.reclaimed = {}
self.user_map = {}
[docs] def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]):
self._check_execution_with_tool_file(job_id, command_line)
attributes = self._build_template_attributes(
job_id,
command_line,
dependencies_description=dependencies_description,
env=env,
submit_params=submit_params,
)
print open(attributes['remoteCommand'], 'r').read()
job_attributes_file = self._write_job_file(job_id, 'jt.json', dumps(attributes))
user = submit_params.get('user', None)
log.info("Submit as user %s" % user)
if not user:
raise Exception("Must specify user submit parameter with this manager.")
self.__change_ownership(job_id, user)
external_id = self.__launch(job_attributes_file, user).strip()
self.user_map[external_id] = user
self._register_external_id(job_id, external_id)
def _kill_external(self, external_id):
user = self.user_map[external_id]
self.__sudo(self.drmaa_kill_script, "--external_id", external_id, user=user)
[docs] def get_status(self, job_id):
external_id = self._external_id(job_id)
if not external_id:
raise KeyError("Failed to find external id for job_id %s" % job_id)
external_status = super(ExternalDrmaaQueueManager, self)._get_status_external(external_id)
if external_status == status.COMPLETE and job_id not in self.reclaimed:
self.reclaimed[job_id] = True
self.__change_ownership(job_id, getuser())
return external_status
def __launch(self, job_attributes, user):
return self.__sudo(self.drmaa_launch_script, "--job_attributes", str(job_attributes), user=user)
def __change_ownership(self, job_id, username):
cmds = [self.chown_working_directory_script, "--user", str(username)]
if self.production:
cmds.extend(["--job_id", job_id])
else:
# In testing, the loading working directory from server.ini doesn't
# work. Need to reimagine how to securely map job_id to working
# direcotry between test cases and production.
cmds.extend(["--job_directory", str(self._job_directory(job_id).path)])
# TODO: Verify ownership change.
self.__sudo(*cmds)
def __sudo(self, *cmds, **kwargs):
p = sudo_popen(*cmds, **kwargs)
stdout, stderr = p.communicate()
assert p.returncode == 0, "%s, %s" % (stdout, stderr)
return stdout