Source code for lwr.managers.queued

import os
import multiprocessing
from six.moves import queue
import threading
import traceback

from lwr.managers.unqueued import Manager

from logging import getLogger
log = getLogger(__name__)

STOP_SIGNAL = object()
RUN = object()
# Number of concurrent jobs used by default for
# QueueManager.
DEFAULT_NUM_CONCURRENT_JOBS = 1

JOB_FILE_COMMAND_LINE = "command_line"


[docs]class QueueManager(Manager): """ A job manager that queues up jobs directly (i.e. does not use an external queuing software such PBS, SGE, etc...). """ manager_type = "queued_python" def __init__(self, name, app, **kwds): super(QueueManager, self).__init__(name, app, **kwds) num_concurrent_jobs = kwds.get('num_concurrent_jobs', DEFAULT_NUM_CONCURRENT_JOBS) if num_concurrent_jobs == '*': num_concurrent_jobs = multiprocessing.cpu_count() else: num_concurrent_jobs = int(num_concurrent_jobs) self._init_worker_threads(num_concurrent_jobs) def _init_worker_threads(self, num_concurrent_jobs): self.work_queue = queue.Queue() self.work_threads = [] for i in range(num_concurrent_jobs): worker = threading.Thread(target=self.run_next) worker.start() self.work_threads.append(worker)
[docs] def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): command_line = self._prepare_run(job_id, command_line, dependencies_description=dependencies_description, env=env) try: self._job_directory(job_id).store_metadata(JOB_FILE_COMMAND_LINE, command_line) except Exception: log.info("Failed to persist command line for job %s, will not be able to recover." % job_id) self.work_queue.put((RUN, (job_id, command_line)))
def _recover_active_job(self, job_id): command_line = self._job_directory(job_id).load_metadata(JOB_FILE_COMMAND_LINE, None) if command_line: self.work_queue.put((RUN, (job_id, command_line)))
[docs] def shutdown(self): for i in range(len(self.work_threads)): self.work_queue.put((STOP_SIGNAL, None)) for worker in self.work_threads: worker.join()
[docs] def run_next(self): """ Run the next item in the queue (a job waiting to run). """ while 1: (op, obj) = self.work_queue.get() if op is STOP_SIGNAL: return try: (job_id, command_line) = obj try: os.remove(self._job_file(job_id, JOB_FILE_COMMAND_LINE)) except Exception: log.exception("Running command but failed to delete - command may rerun on LWR boot.") self._run(job_id, command_line, async=False) except: log.warn("Uncaught exception running job with job_id %s" % job_id) traceback.print_exc()