import datetime
import os
import time
import threading
from lwr.managers import ManagerProxy
from lwr.managers import status
from .staging import preprocess
from .staging import postprocess
import logging
log = logging.getLogger(__name__)
DEFAULT_DO_MONITOR = False

# Log messages used when the file-based active-job bookkeeping fails.
# NOTE: the DECACTIVATE_* constant name is misspelled but kept as-is for
# backward compatibility with existing references.
DECACTIVATE_FAILED_MESSAGE = "Failed to deactivate job with job id %s. May be problems when starting LWR next."
ACTIVATE_FAILED_MESSAGE = "Failed to activate job with job id %s. This job may not recover properly upon LWR restart."

# Metadata keys persisted in each job directory to track staging state.
JOB_FILE_FINAL_STATUS = "final_status"
JOB_FILE_POSTPROCESSED = "postprocessed"
JOB_FILE_PREPROCESSED = "preprocessed"
JOB_METADATA_RUNNING = "running"

# Minimum number of seconds between polling iterations of the job monitor.
DEFAULT_MIN_POLLING_INTERVAL = 0.5
class StatefulManagerProxy(ManagerProxy):
    """Wrap a proxied manager and layer job state tracking on top of it.

    Responsibilities added over the plain proxied manager:

    - run staging ``preprocess`` before launch and ``postprocess`` after
      completion, each on its own thread;
    - persist state markers (preprocessed, running, final status) as
      metadata in the job directory;
    - record active jobs (via :class:`ActiveJobs`) so they can be
      recovered when the LWR restarts;
    - invoke a registered state-change callback on RUNNING/COMPLETE/FAILED
      transitions.
    """

    def __init__(self, manager, **manager_options):
        super(StatefulManagerProxy, self).__init__(manager)
        min_polling_interval = manager_options.get("min_polling_interval", DEFAULT_MIN_POLLING_INTERVAL)
        self.min_polling_interval = datetime.timedelta(0, min_polling_interval)
        self.active_jobs = ActiveJobs(manager)
        # No-op placeholder until set_state_change_callback() is invoked.
        self.__state_change_callback = lambda status, job_id: None
        self.__recover_active_jobs()
        self.__monitor = None

    def set_state_change_callback(self, state_change_callback):
        """Register callback for state transitions and start the monitor.

        The monitor polls get_status() for all active jobs, which drives
        the state transitions that fire this callback.
        """
        self.__state_change_callback = state_change_callback
        self.__monitor = ManagerMonitor(self)

    @property
    def name(self):
        # Delegate naming to the proxied manager.
        return self._proxied_manager.name

    def setup_job(self, *args, **kwargs):
        """Delegate job setup to the proxied manager, returning its job id."""
        job_id = self._proxied_manager.setup_job(*args, **kwargs)
        return job_id

    def handle_remote_staging(self, job_id, staging_config):
        """Persist the client-supplied staging configuration for later use."""
        job_directory = self._proxied_manager.job_directory(job_id)
        job_directory.store_metadata("staging_config", staging_config)

    def launch(self, job_id, *args, **kwargs):
        """Preprocess staging inputs and launch the job on a new thread.

        On any failure during preprocessing or launch the job is reported
        as FAILED through the state-change callback.
        """
        job_directory = self._proxied_manager.job_directory(job_id)

        def do_preprocess():
            try:
                staging_config = job_directory.load_metadata("staging_config", {})
                preprocess(job_directory, staging_config.get("setup", []))
                self._proxied_manager.launch(job_id, *args, **kwargs)
                # Status lock keeps the marker write consistent with
                # concurrent get_status() state computation.
                with job_directory.lock("status"):
                    job_directory.store_metadata(JOB_FILE_PREPROCESSED, True)
                self.active_jobs.activate_job(job_id)
            except Exception:
                log.exception("Failed job preprocess for %s:", job_id)
                self.__state_change_callback(status.FAILED, job_id)

        new_thread_for_manager(self, "preprocess", do_preprocess, daemon=False)

    def get_status(self, job_id):
        """ Compute status used proxied manager and handle state transitions
        and track additional state information needed.
        """
        job_directory = self._proxied_manager.job_directory(job_id)
        with job_directory.lock("status"):
            proxy_status, state_change = self.__proxy_status(job_directory, job_id)
        if state_change == "to_complete":
            self.__deactivate(job_id, proxy_status)
        elif state_change == "to_running":
            self.__state_change_callback(status.RUNNING, job_id)
        return self.__status(job_directory, proxy_status)

    def __proxy_status(self, job_directory, job_id):
        """ Determine state with proxied job manager and if this job needs
        to be marked as deactivated (this occurs when job first returns a
        complete status from proxy.
        """
        state_change = None
        if not job_directory.has_metadata(JOB_FILE_PREPROCESSED):
            proxy_status = status.PREPROCESSING
        elif job_directory.has_metadata(JOB_FILE_FINAL_STATUS):
            # Proxied manager was already polled to completion; use the
            # recorded final status rather than asking again.
            proxy_status = job_directory.load_metadata(JOB_FILE_FINAL_STATUS)
        else:
            proxy_status = self._proxied_manager.get_status(job_id)
            if proxy_status == status.RUNNING:
                if not job_directory.has_metadata(JOB_METADATA_RUNNING):
                    # First observation of RUNNING - record it so the
                    # transition only fires once.
                    job_directory.store_metadata(JOB_METADATA_RUNNING, True)
                    state_change = "to_running"
            elif proxy_status in [status.COMPLETE, status.CANCELLED]:
                job_directory.store_metadata(JOB_FILE_FINAL_STATUS, proxy_status)
                state_change = "to_complete"
        return proxy_status, state_change

    def __status(self, job_directory, proxy_status):
        """ Use proxied manager's status to compute the real
        (stateful) status of job.
        """
        if proxy_status == status.COMPLETE:
            # Proxied manager is done but results may still be staging out.
            if not job_directory.has_metadata(JOB_FILE_POSTPROCESSED):
                job_status = status.POSTPROCESSING
            else:
                job_status = status.COMPLETE
        else:
            job_status = proxy_status
        return job_status

    def __deactivate(self, job_id, proxy_status):
        """Drop job from active tracking and kick off postprocessing."""
        self.active_jobs.deactivate_job(job_id)
        deactivate_method = getattr(self._proxied_manager, "_deactivate_job", None)
        if deactivate_method:
            try:
                deactivate_method(job_id)
            except Exception:
                log.exception("Failed to deactivate via proxied manager job %s", job_id)
        # Only COMPLETE jobs (not CANCELLED) get output postprocessing.
        if proxy_status == status.COMPLETE:
            self.__handle_postprocessing(job_id)

    def __handle_postprocessing(self, job_id):
        """Stage out results on a new thread and report the final status."""
        def do_postprocess():
            postprocess_success = False
            try:
                postprocess_success = postprocess(self._proxied_manager.job_directory(job_id))
            except Exception:
                log.exception("Failed to postprocess results for job id %s", job_id)
            final_status = status.COMPLETE if postprocess_success else status.FAILED
            self.__state_change_callback(final_status, job_id)
        new_thread_for_manager(self, "postprocess", do_postprocess, daemon=False)

    def shutdown(self):
        """Stop the monitor (if started) and shut down the proxied manager."""
        if self.__monitor:
            try:
                self.__monitor.shutdown()
            except Exception:
                log.exception("Failed to shutdown job monitor for manager %s", self.name)
        super(StatefulManagerProxy, self).shutdown()

    def __recover_active_jobs(self):
        """Replay recovery hook on the proxied manager for persisted jobs.

        Called at construction so jobs active before an LWR restart are
        re-attached; best-effort per job.
        """
        recover_method = getattr(self._proxied_manager, "_recover_active_job", None)
        if recover_method is None:
            return
        for job_id in self.active_jobs.active_job_ids():
            try:
                recover_method(job_id)
            except Exception:
                log.exception("Failed to recover active job %s", job_id)
class ActiveJobs(object):
    """ Keeps track of active jobs (those that are not yet "complete").
    Current implementation is file based, but could easily be made
    database-based instead.

    TODO: Keep active jobs in memory after initial load so don't need to
    repeatedly hit disk to recover this information.
    """

    def __init__(self, manager):
        # When the manager has no persistence directory, tracking is
        # silently disabled (all methods become no-ops).
        persistence_directory = manager.persistence_directory
        if persistence_directory:
            active_job_directory = os.path.join(persistence_directory, "%s-active-jobs" % manager.name)
            if not os.path.exists(active_job_directory):
                os.makedirs(active_job_directory)
        else:
            active_job_directory = None
        self.active_job_directory = active_job_directory

    def active_job_ids(self):
        """Return ids of all currently active jobs (empty if untracked)."""
        job_ids = []
        if self.active_job_directory:
            job_ids = os.listdir(self.active_job_directory)
        return job_ids

    def activate_job(self, job_id):
        """Mark job active by touching a per-job marker file (best effort)."""
        if self.active_job_directory:
            path = self._active_job_file(job_id)
            try:
                with open(path, "w"):
                    pass
            except Exception:
                log.warning(ACTIVATE_FAILED_MESSAGE % job_id)

    def deactivate_job(self, job_id):
        """Remove job's marker file, if present (best effort)."""
        if self.active_job_directory:
            path = self._active_job_file(job_id)
            if os.path.exists(path):
                try:
                    os.remove(path)
                except Exception:
                    log.warning(DECACTIVATE_FAILED_MESSAGE % job_id)

    def _active_job_file(self, job_id):
        # One empty marker file per active job, named by job id.
        return os.path.join(self.active_job_directory, job_id)
class ManagerMonitor(object):
    """ Monitors active jobs of a StatefulManagerProxy.

    Spawns a daemon thread at construction that repeatedly polls status
    of every active job, throttled to the manager's min_polling_interval.
    """

    def __init__(self, stateful_manager):
        self.stateful_manager = stateful_manager
        self.active = True
        thread = new_thread_for_manager(self, "monitor", self._run, True)
        self.thread = thread

    def shutdown(self):
        """Signal the loop to stop and wait for the monitor thread."""
        self.active = False
        self.thread.join()

    def _run(self):
        """ Main loop, repeatedly checking active jobs of stateful manager.
        """
        while self.active:
            try:
                self._monitor_active_jobs()
            except Exception:
                log.exception("Failure in stateful manager monitor step.")

    def _monitor_active_jobs(self):
        """Poll each active job once; sleep out the rest of the interval."""
        active_job_ids = self.stateful_manager.active_jobs.active_job_ids()
        iteration_start = datetime.datetime.now()
        for active_job_id in active_job_ids:
            try:
                self._check_active_job_status(active_job_id)
            except Exception:
                log.exception("Failed checking active job status for job_id %s", active_job_id)
        iteration_end = datetime.datetime.now()
        iteration_length = iteration_end - iteration_start
        if iteration_length < self.stateful_manager.min_polling_interval:
            # Don't busy-spin when there is little or no work per iteration.
            to_sleep = (self.stateful_manager.min_polling_interval - iteration_length)
            time.sleep(to_sleep.total_seconds())

    def _check_active_job_status(self, active_job_id):
        # Manager itself will handle state transitions when status changes,
        # just need to poll get_status.
        self.stateful_manager.get_status(active_job_id)
def new_thread_for_manager(manager, name, target, daemon):
    """Start ``target`` on a new thread named after the manager and task.

    Returns the started :class:`threading.Thread`; ``daemon`` controls
    whether the interpreter may exit while the thread is running.
    """
    worker = threading.Thread(
        name="%s-%s" % (manager, name),
        target=target,
    )
    worker.daemon = daemon
    worker.start()
    return worker
__all__ = [StatefulManagerProxy]