Source code for lwr.messaging.bind_amqp

from galaxy.util import asbool
from lwr.lwr_client import amqp_exchange_factory
from lwr import manager_endpoint_util
import functools
import threading
import logging

log = logging.getLogger(__name__)


def _to_optional_float(val):
    """Return ``None`` when ``val`` prints as "None", else ``float(val)``."""
    if str(val) == "None":
        return None
    return float(val)


# Converter for each configuration key whose value is not meant to stay a
# string.  ``get_exchange`` applies these to the keys present in its ``conf``.
TYPED_PARAMS = {
    # Either a number of seconds or the literal "None".
    "amqp_consumer_timeout": _to_optional_float,
    # Publish-retry settings: one boolean switch followed by integer knobs.
    "amqp_publish_retry": asbool,
    "amqp_publish_retry_max_retries": int,
    "amqp_publish_retry_interval_start": int,
    "amqp_publish_retry_interval_step": int,
    "amqp_publish_retry_interval_max": int,
}


def get_exchange(connection_string, manager_name, conf):
    """Build the exchange for ``manager_name`` from ``conf``.

    Every key of ``conf`` that is listed in ``TYPED_PARAMS`` is first converted
    with the matching callable.  The converted value is written back into
    ``conf``, so the caller's mapping is modified in place.

    :param connection_string: passed unchanged to
        ``amqp_exchange_factory.get_exchange``.
    :param manager_name: passed unchanged to
        ``amqp_exchange_factory.get_exchange``.
    :param conf: mapping of configuration options; mutated in place.
    :returns: the object returned by ``amqp_exchange_factory.get_exchange``.
    :raises: any exception raised by a converter for a value it cannot handle
        propagates to the caller.
    """
    # HACK: Fixup non-string parameters - ultimately this should reuse spec
    # stuff from Galaxy.
    # ``items()`` (not the Python 2-only ``iteritems()``) keeps this loop
    # working on both Python 2 and Python 3.
    for param, to_type in TYPED_PARAMS.items():
        if param in conf:
            conf[param] = to_type(conf[param])
    return amqp_exchange_factory.get_exchange(
        connection_string,
        manager_name,
        conf
    )
def bind_manager_to_queue(manager, queue_state, connection_string, conf):
    """Connect ``manager`` to its message queue exchange.

    Unless ``conf["message_queue_consume"]`` is falsy, one consumer thread is
    started for the "setup" queue and one for the "kill" queue.  Unless
    ``conf["message_queue_publish"]`` is falsy, a callback that publishes a
    "status_update" message is registered on ``manager``.  Both options
    default to ``True`` when absent.

    :param manager: object whose ``name`` selects the exchange and which
        receives the state-change callback.
    :param queue_state: forwarded as ``check`` to the exchange's ``consume``.
    :param connection_string: forwarded to ``get_exchange``.
    :param conf: configuration mapping, forwarded to ``get_exchange``.
    """
    lwr_exchange = get_exchange(connection_string, manager.name, conf)

    def drain(callback, name):
        # Thread target: consume until ``__drain`` returns, then log that
        # this queue is finished.
        __drain(name, queue_state, lwr_exchange, callback)
        log.info("Finished consuming %s queue - no more messages will be processed." % (name))

    # TODO: Think through job recovery, jobs shouldn't complete until after bind
    # has occurred.
    def bind_on_status_change(new_status, job_id):
        try:
            log.debug("Publishing LWR state change with status %s for job_id %s" % (new_status, job_id))
            status_payload = manager_endpoint_util.full_status(manager, new_status, job_id)
            lwr_exchange.publish("status_update", status_payload)
        except:  # noqa: E722 - record the failure, then re-raise it unchanged
            log.exception("Failure to publish LWR state change.")
            raise

    # NOTE(review): both flags below are tested for truthiness as-is and are
    # not listed in TYPED_PARAMS; a string such as "False" would count as
    # enabled - confirm how ``conf`` is populated.
    if conf.get("message_queue_consume", True):
        on_setup = functools.partial(__process_setup_message, manager)
        on_kill = functools.partial(__process_kill_message, manager)
        start_setup_consumer(lwr_exchange, functools.partial(drain, on_setup, "setup"))
        start_kill_consumer(lwr_exchange, functools.partial(drain, on_kill, "kill"))

    if conf.get("message_queue_publish", True):
        manager.set_state_change_callback(bind_on_status_change)
def __start_consumer(name, exchange, target):
    """Run ``target`` on a new thread and return the started thread.

    The thread is named ``consume-<name>-<exchange.url>``.

    :param name: queue name used in the thread name.
    :param exchange: object exposing a ``url`` attribute.
    :param target: zero-argument callable executed by the thread.
    :returns: the started ``threading.Thread``.
    """
    # NOTE(review): ``exchange.url`` ends up in the thread name - confirm it
    # cannot carry credentials that would leak into logs.
    thread_name = "consume-%s-%s" % (name, exchange.url)
    thread = threading.Thread(name=thread_name, target=target)
    # Explicitly non-daemon: the interpreter waits for this thread at exit.
    thread.daemon = False
    thread.start()
    return thread


# Convenience starters with the queue name pre-bound; call as
# ``start_*_consumer(exchange, target)``.
start_setup_consumer = functools.partial(__start_consumer, "setup")
start_kill_consumer = functools.partial(__start_consumer, "kill")


def __drain(name, queue_state, lwr_exchange, callback):
    """Consume queue ``name`` on ``lwr_exchange``, handing messages to ``callback``.

    ``queue_state`` is passed to ``consume`` as its ``check`` argument.
    """
    lwr_exchange.consume(name, callback=callback, check=queue_state)


def __process_kill_message(manager, body, message):
    """Kill the job named in ``body`` (if any) and acknowledge ``message``.

    Failures are logged with their traceback and do not prevent the
    acknowledgement.
    """
    try:
        job_id = __client_job_id_from_body(body)
        if job_id:
            manager.kill(job_id)
    except Exception:
        log.exception("Failed to kill job.")
    message.ack()


def __process_setup_message(manager, body, message):
    """Submit the job described by ``body`` and acknowledge ``message``.

    A failed submission is logged as a warning, including the traceback, and
    the message is still acknowledged.
    """
    try:
        manager_endpoint_util.submit_job(manager, body)
    except Exception:
        # Looking up the job id is only for the log line; it must not be able
        # to raise out of this handler, otherwise the message would never be
        # acknowledged and the error would escape the consumer callback.
        try:
            job_id = __client_job_id_from_body(body)
        except Exception:
            job_id = None
        # ``exc_info=True`` attaches the traceback of the submission failure.
        log.warning(
            "Failed to setup job %s obtained via message queue.",
            job_id,
            exc_info=True,
        )
    message.ack()


def __client_job_id_from_body(body):
    """Return ``body["job_id"]``, or ``None`` when the key is absent."""
    return body.get("job_id")