# Source code for lwr.mesos.framework

import collections
import os

from lwr.mesos import (
    Scheduler,
    MesosSchedulerDriver,
    mesos_pb2,
)
from lwr.lwr_client.util import to_base64_json

import logging
log = logging.getLogger(__name__)

from lwr.daemon import (
    LWR_ROOT_DIR,
)
from lwr.messaging import bind_amqp


# Identity fields that run() copies onto the mesos_pb2.FrameworkInfo it builds.
DEFAULT_FRAMEWORK_USER = ""  # Let Mesos auto-fill this.
DEFAULT_FRAMEWORK_NAME = "LWR Framework"
DEFAULT_FRAMEWORK_PRINCIPAL = "LWR"

# Identity fields that run() copies onto the mesos_pb2.ExecutorInfo it builds.
DEFAULT_EXECUTOR_ID = "default"
DEFAULT_EXECUTOR_NAME = "LWR Executor"
DEFAULT_EXECUTOR_SOURCE = "LWR"


class LwrScheduler(Scheduler):
    """Mesos scheduler that launches one queued LWR job per resource offer.

    Jobs arrive through ``handle_setup_message`` and are buffered in an
    in-memory FIFO until ``resourceOffers`` is called with an offer that can
    be used to start them.
    """

    def __init__(self, executor, manager_options, mesos_url):
        """Initialise scheduler state.

        :param executor: ``ExecutorInfo`` message merged into every task
            this scheduler launches.
        :param manager_options: Manager configuration serialised into each
            task's data next to the job description.
        :param mesos_url: Mesos master URL, exposed to jobs through the
            ``MESOS_URL`` environment variable.
        """
        self.executor = executor
        self.manager_options = manager_options
        self.mesos_url = mesos_url
        # Maps task id (str) -> (slave_id, executor_id) of launched tasks.
        self.taskData = {}
        self.tasksLaunched = 0
        self.tasksFinished = 0
        self.messagesSent = 0
        self.messagesReceived = 0
        # HACK: Storing these messages in a non-persistent queue is a bad idea,
        # obviously. Need something persistent - or possibly better - just not
        # removing them from message queue.
        self.in_memory_queue = collections.deque()

    def registered(self, driver, frameworkId, masterInfo):
        """Log the framework ID assigned on registration."""
        log.info("Registered with LWR mesos framework ID %s", frameworkId.value)

    def resourceOffers(self, driver, offers):
        """Launch a queued job on each offer, declining offers left unused.

        Re-raises (after logging) any exception raised by
        ``driver.launchTasks``.
        """
        log.info("Got %d resource offers", len(offers))
        for offer in offers:
            tasks = self._tasks_for_offer(offer)
            if not tasks:
                # Hand unused resources back so they are not left tied up in
                # an outstanding offer and can be offered again later.
                driver.declineOffer(offer.id)
                continue
            log.info("Launching tasks %s", tasks)
            try:
                driver.launchTasks(offer.id, tasks)
            except Exception:
                log.exception("Failed to launch tasks")
                raise

    def _tasks_for_offer(self, offer):
        """Build the list of tasks to launch on ``offer``.

        Returns an empty list when no job is queued; otherwise a list holding
        a single ``TaskInfo`` for the oldest queued job.
        """
        tasks = []
        log.info("Got resource offer %s", offer.id.value)
        try:
            # Messages are appended on the left, so popping from the right
            # yields the oldest job first (FIFO).
            next_job = self.in_memory_queue.pop()
        except IndexError:
            log.info("No jobs, skipping iteration")
            return tasks

        # TODO: This is also stupid... if we have any resource offer
        # we are assigning it one job. Should be some attempt here
        # to size jobs and match them to resources.
        tid = self.tasksLaunched
        self.tasksLaunched += 1

        log.info("Accepting offer on %s to start task %d", offer.hostname, tid)

        task = mesos_pb2.TaskInfo()
        task.task_id.value = str(tid)
        task.slave_id.value = offer.slave_id.value
        task.name = "task %d" % tid
        task.executor.MergeFrom(self.executor)
        self._populate_task_data_for_job(task, next_job)

        self._add_scalar_resource(task, "cpus", 1)
        self._add_scalar_resource(task, "mem", 32)

        tasks.append(task)
        # Remembered so statusUpdate can address the task's executor.
        self.taskData[task.task_id.value] = (
            offer.slave_id, task.executor.executor_id
        )
        return tasks

    @staticmethod
    def _add_scalar_resource(task, name, value):
        """Append a scalar resource requirement called ``name`` to ``task``."""
        resource = task.resources.add()
        resource.name = name
        resource.type = mesos_pb2.Value.SCALAR
        resource.scalar.value = value

    def statusUpdate(self, driver, update):
        """Record finished tasks and notify the task's executor.

        Updates for task ids this scheduler did not launch are logged and
        ignored rather than raising ``KeyError`` inside the driver callback.
        """
        log.info("%s", update.SerializeToString())
        # NOTE(review): assumes the framework message is only sent for
        # finished tasks (as in the Mesos example framework) - confirm.
        if update.state != mesos_pb2.TASK_FINISHED:
            return
        self.tasksFinished += 1

        task_id = update.task_id.value
        task_entry = self.taskData.get(task_id)
        if task_entry is None:
            log.warning("Status update for unknown task %s, ignoring", task_id)
            return
        slave_id, executor_id = task_entry

        self.messagesSent += 1
        driver.sendFrameworkMessage(
            executor_id,
            slave_id,
            'Update'
        )

    def frameworkMessage(self, driver, executorId, slaveId, message):
        """Count and log a message received from an executor."""
        self.messagesReceived += 1
        log.info("Received message: %s", repr(str(message)))

    def handle_setup_message(self, body, message):
        """Queue a job setup message and acknowledge it.

        The message is acknowledged even when queueing raises.
        """
        try:
            self.__queue_setup_message(body)
        finally:
            message.ack()

    def _populate_task_data_for_job(self, task, job):
        """Serialise ``job`` and the manager options into ``task.data``.

        ``job`` is modified in place: the Mesos environment variables are
        appended to its ``env`` list, which is created when missing.
        """
        # In case job itself wants to utilize Mesos
        # populate environment variables.
        job.setdefault("env", []).extend(self._mesos_env_vars())
        task_data = dict(
            job=job,
            manager=self.manager_options
        )
        task.data = to_base64_json(
            task_data
        )

    def __queue_setup_message(self, body):
        """Add ``body`` to the left end of the in-memory job queue."""
        self.in_memory_queue.appendleft(body)

    def _mesos_env_vars(self):
        """Return the environment variable entries added to every job."""
        return [
            dict(name="MESOS_URL", value=self.mesos_url),
        ]
def run(master, manager_options, config):
    """Build the LWR executor, framework and scheduler, then run the driver.

    :param master: Mesos master address passed to the driver and to the
        scheduler as its ``mesos_url``.
    :param manager_options: Manager options handed to the scheduler; its
        ``"manager"`` entry names the manager for the AMQP exchange.
    :param config: Configuration object; ``message_queue_url`` is read from
        it and the whole object is passed on to ``bind_amqp.get_exchange``.
    :raises Exception: when the driver finishes with a status other than
        ``DRIVER_STOPPED``; any error from the driver is logged and
        re-raised. ``driver.stop()`` is called in every case.
    """
    executor_info = mesos_pb2.ExecutorInfo()
    executor_info.executor_id.value = DEFAULT_EXECUTOR_ID
    executor_script = os.path.join(LWR_ROOT_DIR, "scripts", "mesos_executor")
    executor_info.command.value = executor_script
    executor_info.name = DEFAULT_EXECUTOR_NAME
    executor_info.source = DEFAULT_EXECUTOR_SOURCE

    framework_info = mesos_pb2.FrameworkInfo()
    framework_info.user = DEFAULT_FRAMEWORK_USER
    framework_info.name = DEFAULT_FRAMEWORK_NAME
    # TODO: Handle authenticate...
    framework_info.principal = DEFAULT_FRAMEWORK_PRINCIPAL

    lwr_scheduler = LwrScheduler(
        executor_info,
        manager_options=manager_options,
        mesos_url=master,
    )
    mesos_driver = MesosSchedulerDriver(lwr_scheduler, framework_info, master)

    amqp_url = config.get("message_queue_url", None)
    exchange = bind_amqp.get_exchange(
        amqp_url,
        manager_name=manager_options["manager"],
        conf=config,
    )

    def drain():
        # Feed each incoming "setup" message to the scheduler's job queue.
        exchange.consume(
            "setup",
            callback=lwr_scheduler.handle_setup_message,
            check=True,
        )

    log.info("Binding to LWR framework to queue.")
    bind_amqp.start_setup_consumer(exchange, drain)

    try:
        log.info("Starting Mesos driver")
        exit_status = mesos_driver.run()
        if exit_status != mesos_pb2.DRIVER_STOPPED:
            raise Exception("Driver did not run properly")
    except Exception:
        log.exception("Problem running mesos scheduler")
        raise
    finally:
        mesos_driver.stop()