diff --git a/beat/core/scripts/worker.py b/beat/core/scripts/worker.py index 115df896c5912e0bce0c76168fc3460a05d055c1..eb86d4de1ab02dca1e71c21c58662ef6ae6dcf05 100755 --- a/beat/core/scripts/worker.py +++ b/beat/core/scripts/worker.py @@ -55,6 +55,7 @@ import signal import simplejson import multiprocessing import tempfile +import Queue from docopt import docopt from socket import gethostname @@ -66,6 +67,7 @@ from ..worker import WorkerController stop = False +logger = None #---------------------------------------------------------- @@ -89,6 +91,7 @@ class ExecutionProcess(multiprocessing.Process): signal.signal(signal.SIGTERM, signal.SIG_DFL) signal.signal(signal.SIGINT, signal.SIG_DFL) + logger.debug("Process (pid=%d) started for job #%s", self.pid, self.job_id) self.queue.put('STARTED') # Create the executor @@ -119,6 +122,62 @@ class ExecutionProcess(multiprocessing.Process): system_error = traceback.format_exc() )) + self.queue.close() + + logger.debug("Process (pid=%d) done", self.pid) + return 0 + + +#---------------------------------------------------------- + + +def connect_to_scheduler(address, name): + # Starts our 0MQ server + context = zmq.Context() + socket = context.socket(zmq.DEALER) + socket.setsockopt(zmq.IDENTITY, name) + + if address.find('://') < 0: + address = 'tcp://' + address + + socket.connect(address) + logger.info("Connected to '%s'", address) + + poller = zmq.Poller() + poller.register(socket, zmq.POLLIN) + + # Tell the scheduler we are ready + socket.send('rdy') + + # Wait for a response from the scheduler + logger.info("Waiting for the scheduler...") + + while not stop: + socks = dict(poller.poll(100)) + if not (socket in socks) or (socks[socket] != zmq.POLLIN): + continue + + response = socket.recv() + + if response != WorkerController.ACK: + logger.error("Can't talk with the scheduler at '%s', expected '%s', got '%s'", + address, WorkerController.ACK, response) + socket.setsockopt(zmq.LINGER, 0) + socket.close() + context.destroy() + return (None, None, None) + + break + + if stop: + socket.setsockopt(zmq.LINGER, 0) + socket.close() + context.destroy() + return (None, None, None) + + logger.info("The scheduler answered") + return (context, socket, poller) + #---------------------------------------------------------- @@ -153,16 +212,26 @@ def main(user_input=None): handler = logging.StreamHandler() handler.setFormatter(formatter) - logger = logging.getLogger('beat.core') - logger.addHandler(handler) + beat_core_logger = logging.getLogger('beat.core') + beat_core_logger.addHandler(handler) + + beat_backend_logger = logging.getLogger('beat.backend.python') + beat_backend_logger.addHandler(handler) if args['--verbose'] == 1: - logger.setLevel(logging.INFO) - elif args['--verbose'] >= 2: - logger.setLevel(logging.DEBUG) + beat_core_logger.setLevel(logging.INFO) + beat_backend_logger.setLevel(logging.INFO) + elif args['--verbose'] == 2: + beat_core_logger.setLevel(logging.DEBUG) + beat_backend_logger.setLevel(logging.INFO) + elif args['--verbose'] >= 3: + beat_core_logger.setLevel(logging.DEBUG) + beat_backend_logger.setLevel(logging.DEBUG) else: - logger.setLevel(logging.WARNING) + beat_core_logger.setLevel(logging.WARNING) + beat_backend_logger.setLevel(logging.WARNING) + global logger logger = logging.getLogger(__name__) @@ -202,96 +271,71 @@ def main(user_input=None): host = Host(images_cache=docker_images_cache, raise_on_errors=False) - # Starts our 0MQ server - context = zmq.Context() - socket = context.socket(zmq.DEALER) - socket.setsockopt(zmq.IDENTITY, args['--name']) - - address = args['<address>'] - if address.find('://') < 0: - address = 'tcp://' + address - - socket.connect(address) - logger.info("Connected to '%s'", address) - - poller = zmq.Poller() - poller.register(socket, zmq.POLLIN) - - - # Send READY messages until the scheduler acknowlege us - global stop - - socket.send('rdy') - logger.info("Waiting for the scheduler...") - - while not stop: - - socks = dict(poller.poll(100)) - if not (socket in socks) or (socks[socket] != zmq.POLLIN): - continue - - response = socket.recv() - - if response != WorkerController.ACK: - logger.error("Can't talk with the scheduler at '%s', expected '%s', got '%s'", - address, WorkerController.ACK, response) - return 1 - - break - - if not stop: - logger.info("The scheduler answered") + # Establish a connection with the scheduler + (context, socket, poller) = connect_to_scheduler(args['<address>'], args['--name']) + if context is None: + return 1 # Process the requests execution_processes = [] + scheduler_available = True + global stop while not stop: + # If necessary, wait for the comeback of the scheduler + if not scheduler_available: + (context, socket, poller) = connect_to_scheduler(args['<address>'], args['--name']) + if context is None: + break + scheduler_available = True + # Send the result of the processing (if any) for execution_process in execution_processes: - if not execution_process.is_alive(): - if execution_process.exitcode == 0: - result = execution_process.queue.get() - else: - result = dict(system_error='Execution error in the subprocess') + try: + result = execution_process.queue.get_nowait() + except Queue.Empty: + continue - if result.has_key('result'): - content = simplejson.dumps(result['result']) + execution_process.join() - status = WorkerController.DONE - if result['result']['status'] != 0: - status = WorkerController.JOB_ERROR + if result.has_key('result'): + content = simplejson.dumps(result['result']) - logger.info("Job #%s completed", execution_process.job_id) - logger.debug('send: """%s"""' % content.rstrip()) + status = WorkerController.DONE + if result['result']['status'] != 0: + status = WorkerController.JOB_ERROR - message = [ - status, - execution_process.job_id, - content - ] - elif result.has_key('error'): - logger.error(result['error']) + logger.info("Job #%s completed", execution_process.job_id) + logger.debug('send: """%s"""' % content.rstrip()) - message = [ - WorkerController.JOB_ERROR, - execution_process.job_id, - ] + message = [ + status, + execution_process.job_id, + content + ] + elif result.has_key('error'): + logger.error(result['error']) - message += result['details'] + message = [ + WorkerController.JOB_ERROR, + execution_process.job_id, + ] - else: - logger.error(result['system_error']) + message += result['details'] - message = [ - WorkerController.ERROR, - execution_process.job_id, - result['system_error'] - ] + else: + logger.error(result['system_error']) + + message = [ + WorkerController.ERROR, + execution_process.job_id, + result['system_error'] + ] - socket.send_multipart(message) + socket.send_multipart(message) - execution_processes.remove(execution_process) + execution_processes.remove(execution_process) if len(execution_processes) == 0: @@ -357,7 +401,22 @@ def main(user_input=None): ]) - socket.send(WorkerController.EXIT) + # Command: scheduler shutdown + elif command == WorkerController.SCHEDULER_SHUTDOWN: + logger.info("The scheduler shut down, we will wait for it") + scheduler_available = False + + socket.setsockopt(zmq.LINGER, 0) + socket.close() + context.destroy() + + poller = None + socket = None + context = None + + + if socket: + socket.send(WorkerController.EXIT) # Cleanup @@ -365,9 +424,10 @@ def main(user_input=None): execution_process.terminate() execution_process.join() - socket.setsockopt(zmq.LINGER, 0) - socket.close() - context.destroy() + if context: + socket.setsockopt(zmq.LINGER, 0) + socket.close() + context.destroy() if (docker_images_cache is not None) and os.path.exists(docker_images_cache): os.remove(docker_images_cache) diff --git a/beat/core/test/test_worker.py b/beat/core/test/test_worker.py old mode 100644 new mode 100755 index 1e5779c640d4290cf17c16ba8f112d8b95db7ca6..b88b637d02bbd21b290566f937b0c61e00f06895 --- a/beat/core/test/test_worker.py +++ b/beat/core/test/test_worker.py @@ -36,6 +36,7 @@ logger = logging.getLogger(__name__) import unittest import simplejson import multiprocessing +import Queue from time import time from time import sleep @@ -128,6 +129,48 @@ CONFIGURATION2 = { #---------------------------------------------------------- +class ControllerProcess(multiprocessing.Process): + + def __init__(self, queue): + super(ControllerProcess, self).__init__() + + self.queue = queue + + + def run(self): + self.queue.put('STARTED') + + def onWorkerReady(name): + self.queue.put('READY ' + name) + + def onWorkerGone(name): + self.queue.put('GONE ' + name) + + self.controller = WorkerController( + '127.0.0.1', + port=51000, + callbacks=dict( + onWorkerReady = onWorkerReady, + onWorkerGone = onWorkerGone, + ) + ) + + while True: + self.controller.process(100) + + try: + command = self.queue.get_nowait() + if command == 'STOP': + break + except Queue.Empty: + pass + + self.controller.destroy() + + +#---------------------------------------------------------- + + class WorkerProcess(multiprocessing.Process): def __init__(self, queue, arguments): @@ -205,7 +248,7 @@ class TestWorkerBase(unittest.TestCase): '--prefix=%s' % prefix, '--cache=%s' % tmp_prefix, '--name=%s' % name, - # '-vvv', + # '-vv', self.controller.address if address is None else address, ] @@ -300,6 +343,34 @@ class TestConnection(TestWorkerBase): self.wait_for_worker_connection(WORKER1) + def test_scheduler_shutdown(self): + controller = ControllerProcess(multiprocessing.Queue()) + controller.start() + + message = controller.queue.get() + self.assertEqual(message, 'STARTED') + + self.start_worker(WORKER1, 'tcp://127.0.0.1:51000') + + message = controller.queue.get() + self.assertEqual(message, 'READY ' + WORKER1) + + controller.queue.put('STOP') + + sleep(1) + + controller = ControllerProcess(multiprocessing.Queue()) + controller.start() + + message = controller.queue.get() + self.assertEqual(message, 'STARTED') + + message = controller.queue.get() + self.assertEqual(message, 'READY ' + WORKER1) + + controller.queue.put('STOP') + + #---------------------------------------------------------- diff --git a/beat/core/worker.py b/beat/core/worker.py index 3b44ab448bb37db847403ccba79e687ec895e76f..2484c7cab4a535f81d01e4269aa993a6909c800d 100755 --- a/beat/core/worker.py +++ b/beat/core/worker.py @@ -26,6 +26,7 @@ ############################################################################### import zmq +import socket import simplejson @@ -43,6 +44,7 @@ class WorkerController(object): EXECUTE = 'exe' CANCEL = 'cnl' ACK = 'ack' + SCHEDULER_SHUTDOWN = 'shd' class Callbacks(object): @@ -54,6 +56,7 @@ class WorkerController(object): def __init__(self, address, port, callbacks=None): self.context = zmq.Context() + self.context.setsockopt(socket.SO_REUSEADDR, 1) self.socket = self.context.socket(zmq.ROUTER) @@ -79,9 +82,23 @@ class WorkerController(object): def destroy(self): + for worker in self.workers: + self.socket.send_multipart([ + str(worker), + WorkerController.SCHEDULER_SHUTDOWN, + ]) + + self.workers = [] + + self.poller.unregister(self.socket) + self.poller = None + self.socket.setsockopt(zmq.LINGER, 0) self.socket.close() + self.socket = None + self.context.destroy() + self.context = None def execute(self, worker, job_id, configuration):