diff --git a/beat/core/execution/docker.py b/beat/core/execution/docker.py index 4278cc513a537fe839185e98ad0e3f96dec8a6e2..b9e66ccb422297ccb4d67f42101f6ec9112eb3a4 100755 --- a/beat/core/execution/docker.py +++ b/beat/core/execution/docker.py @@ -31,15 +31,11 @@ import os import requests import simplejson -import zmq.green as zmq +import zmq import logging logger = logging.getLogger(__name__) -from gevent import monkey -monkey.patch_socket(dns=False) -monkey.patch_ssl() - from .. import stats from .. import message_handler from .. import utils @@ -417,6 +413,7 @@ class DockerExecutor(BaseExecutor): finally: self.message_handler.stop.set() + self.message_handler.join() # Collects final information and returns to caller diff --git a/beat/core/execution/local.py b/beat/core/execution/local.py index 7c1680d2158f7551c683e18eb50e0de6ce7f626d..578eb834ed129e7b9f94636b4ed424c4a23b6569 100755 --- a/beat/core/execution/local.py +++ b/beat/core/execution/local.py @@ -34,7 +34,7 @@ import glob import errno import tempfile import subprocess -import zmq.green as zmq +import zmq import time import logging diff --git a/beat/core/message_handler.py b/beat/core/message_handler.py index 8d782122cd2746985d3cabc9357a2018fa897bd2..7b31f76813f79889ac746318f9b796023646c80a 100755 --- a/beat/core/message_handler.py +++ b/beat/core/message_handler.py @@ -29,12 +29,7 @@ import logging logger = logging.getLogger(__name__) -import gevent -import zmq.green as zmq - -from gevent import monkey -monkey.patch_socket(dns=False) -monkey.patch_ssl() +import zmq from beat.backend.python.message_handler import MessageHandler diff --git a/beat/core/scripts/worker.py b/beat/core/scripts/worker.py index 115f2d4ca84ee714b263f0a00952e2f0c3fba2be..5321dcdea1edc1e7128fab2bb63145c4d2e3307d 100755 --- a/beat/core/scripts/worker.py +++ b/beat/core/scripts/worker.py @@ -47,20 +47,14 @@ Options: """ -import logging - -import gevent -import zmq.green as zmq - -from gevent import monkey -monkey.patch_socket(dns=False) -monkey.patch_ssl() - import os import sys +import logging +import zmq import signal import simplejson -import threading +import multiprocessing +import tempfile from docopt import docopt from socket import gethostname @@ -74,22 +68,58 @@ from ..worker import WorkerController stop = False +#---------------------------------------------------------- + + +class ExecutionProcess(multiprocessing.Process): -class ExecutorTask(threading.Thread): #gevent.Greenlet): + def __init__(self, queue, prefix, data, cache, docker, images_cache=None): + super(ExecutionProcess, self).__init__() + + self.queue = queue + self.prefix = prefix + self.data = data + self.cache = cache + self.docker = docker + self.images_cache = images_cache - def __init__(self, executor): - self.executor = executor - super(ExecutorTask, self).__init__() - # def _run(self): def run(self): - with self.executor: - result = self.executor.process() + signal.signal(signal.SIGTERM, signal.SIG_DFL) + signal.signal(signal.SIGINT, signal.SIG_DFL) + + self.queue.put('STARTED') + + # Create the executor + try: + if self.docker: + host = Host(images_cache=self.images_cache, raise_on_errors=False) + executor = DockerExecutor(host, self.prefix, self.data, cache=self.cache) + else: + executor = LocalExecutor(self.prefix, self.data, cache=self.cache) + + if not executor.valid: + self.queue.put(dict( + error = "Failed to load the execution information", + details = executor.errors + )) + return + + # Execute the algorithm + with executor: + result = executor.process() - self.result = result + self.queue.put(dict( + result = result + )) + except: + import traceback + self.queue.put(dict( + system_error = traceback.format_exc() + )) - # return result +#---------------------------------------------------------- def main(user_input=None): @@ -141,7 +171,7 @@ def main(user_input=None): # Install a signal handler def handler(signum, frame): - #ignore further signals + # Ignore further signals signal.signal(signal.SIGTERM, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -154,9 +184,10 @@ def main(user_input=None): # (If necessary) Docker-related initialisations - host = None + docker_images_cache = None if args['--docker']: - host = Host(raise_on_errors=False) + docker_images_cache = os.path.join(tempfile.gettempdir(), 'beat-docker-images.json') + host = Host(images_cache=docker_images_cache, raise_on_errors=False) # Starts our 0MQ server @@ -176,7 +207,9 @@ def main(user_input=None): # Send READY messages until the scheduler acknowlege us - while True: + global stop + + while not stop: socket.send('rdy') socks = dict(poller.poll(100)) @@ -184,6 +217,7 @@ def main(user_input=None): continue response = socket.recv() + if response != WorkerController.ACK: logger.error("Can't talk with the scheduler at '%s', expected '%s', got '%s'", address, WorkerController.ACK, response) @@ -193,35 +227,60 @@ def main(user_input=None): # Process the requests - global stop - - task = None + execution_process = None current_job_id = None while not stop: # Send the result of the processing (if any) - if (task is not None) and not task.is_alive(): #task.ready(): - message = simplejson.dumps(task.result) - # message = simplejson.dumps(task.value) - logger.debug('send: """%s"""' % message.rstrip()) + if (execution_process is not None) and not execution_process.is_alive(): + if execution_process.exitcode == 0: + result = execution_process.queue.get() + else: + result = dict(system_error='Execution error in the subprocess') - socket.send_multipart([ - WorkerController.DONE, - current_job_id, - message - ]) + if result.has_key('result'): + content = simplejson.dumps(result['result']) + logger.debug('send: """%s"""' % content.rstrip()) - task = None + message = [ + WorkerController.DONE, + current_job_id, + content + ] + elif result.has_key('error'): + logger.error(result['error']) - if task is None: + message = [ + WorkerController.JOB_ERROR, + current_job_id, + ] + + message += result['details'] + + else: + logger.error(result['system_error']) + + message = [ + WorkerController.ERROR, + result['system_error'] + ] + + socket.send_multipart(message) + + execution_process = None + + + if execution_process is None: timeout = 1000 # ms else: timeout = 100 + socks = dict(poller.poll(timeout)) if not (socket in socks) or (socks[socket] != zmq.POLLIN): continue + # Read the next command parts = socket.recv_multipart() @@ -229,13 +288,14 @@ def main(user_input=None): logger.debug("recv: %s", command) + # Command: execute <job-id> <json-command> if command == WorkerController.EXECUTE: job_id = parts[1] data = simplejson.loads(parts[2]) # Check that the worker isn't busy - if task is not None: + if execution_process is not None: socket.send_multipart([ WorkerController.JOB_ERROR, job_id, @@ -243,35 +303,21 @@ def main(user_input=None): ]) continue - # Create the executor - if args['--docker']: - executor = DockerExecutor(host, prefix, data, cache=cache) - else: - executor = LocalExecutor(prefix, data, cache=cache) + # Start the execution + logger.info("Running '%s'", data['algorithm']) + current_job_id = job_id - if not executor.valid: - logger.error("Failed to load the execution information:") + execution_process = ExecutionProcess(multiprocessing.Queue(), prefix, data, cache, + docker=args['--docker'], images_cache=docker_images_cache) + execution_process.start() - message = [ - WorkerController.JOB_ERROR, - job_id, - ] + execution_process.queue.get() - message += executor.errors - - socket.send_multipart(message) - - else: - logger.info("Running '%s'", executor.algorithm.name) - current_job_id = job_id - - task = ExecutorTask(executor) - task.start() # Command: cancel elif command == WorkerController.CANCEL: # Check that the worker is busy - if task is None: + if execution_process is None: socket.send_multipart([ WorkerController.ERROR, "Worker isn't busy" @@ -279,8 +325,9 @@ def main(user_input=None): continue # Kill the processing thread - # task.kill() - task = None + execution_process.terminate() + execution_process.join() + execution_process = None socket.send_multipart([ WorkerController.CANCELLED, @@ -292,8 +339,15 @@ def main(user_input=None): # Cleanup + if execution_process is not None: + execution_process.terminate() + execution_process.join() + socket.setsockopt(zmq.LINGER, 0) socket.close() context.destroy() + if (docker_images_cache is not None) and os.path.exists(docker_images_cache): + os.remove(docker_images_cache) + return 0 diff --git a/beat/core/test/test_dbexecution.py b/beat/core/test/test_dbexecution.py index 4803d7376b7bfba07b4130e476821b862d09d205..930cfcea9a8b1e2bf30fff21ab0863203c067a50 100644 --- a/beat/core/test/test_dbexecution.py +++ b/beat/core/test/test_dbexecution.py @@ -42,7 +42,7 @@ if False: logger.addHandler(ch) import unittest -import zmq.green as zmq +import zmq from ..dbexecution import DBExecutor from ..inputs import RemoteInput diff --git a/beat/core/test/test_message_handler.py b/beat/core/test/test_message_handler.py index e5092fae9edde476053f0a41f32a160feb78fbae..2744ea23f5a6d9dd2a5d1d8dc2e3a972a176c7a3 100644 --- a/beat/core/test/test_message_handler.py +++ b/beat/core/test/test_message_handler.py @@ -42,7 +42,7 @@ if False: logger.addHandler(ch) import unittest -import zmq.green as zmq +import zmq import nose.tools from ..agent import MessageHandler @@ -123,6 +123,12 @@ class TestMessageHandler(unittest.TestCase): self.message_handler.start() + def tearDown(self): + self.message_handler.kill() + self.message_handler.join() + self.message_handler = None + + def test_input_has_more_data(self): assert self.remote_input_a.hasMoreData() @@ -211,6 +217,12 @@ class TestMessageHandlerErrorHandling(unittest.TestCase): self.message_handler.start() + def tearDown(self): + self.message_handler.kill() + self.message_handler.join() + self.message_handler = None + + def test_input_has_more_data(self): self.assertRaises(RemoteException, self.remote_input.hasMoreData) diff --git a/beat/core/test/test_worker.py b/beat/core/test/test_worker.py index 0ed4d7bbca22dce374182b865cb24aa6f9abef39..f12e87ff3492f2812d1c596da49bb2bfea7c532e 100644 --- a/beat/core/test/test_worker.py +++ b/beat/core/test/test_worker.py @@ -29,17 +29,13 @@ # Tests for experiment execution import os + import logging logger = logging.getLogger(__name__) -import gevent - -from gevent import monkey -monkey.patch_socket(dns=False) -monkey.patch_ssl() - import unittest import simplejson +import multiprocessing from time import time from ..scripts import worker @@ -49,10 +45,16 @@ from ..dock import Host from . import prefix, tmp_prefix +#---------------------------------------------------------- + + WORKER1 = 'worker1' WORKER2 = 'worker2' +#---------------------------------------------------------- + + CONFIGURATION1 = { 'queue': 'queue', 'inputs': { @@ -86,6 +88,9 @@ CONFIGURATION1 = { } +#---------------------------------------------------------- + + CONFIGURATION2 = { 'queue': 'queue', 'inputs': { @@ -119,47 +124,65 @@ CONFIGURATION2 = { } +#---------------------------------------------------------- + -class WorkerThread(gevent.Greenlet): +class WorkerProcess(multiprocessing.Process): - def __init__(self, arguments): - super(WorkerThread, self).__init__() + def __init__(self, queue, arguments): + super(WorkerProcess, self).__init__() + self.queue = queue self.arguments = arguments - def _run(self): + def run(self): + self.queue.put('STARTED') worker.main(self.arguments) +#---------------------------------------------------------- + class TestOneWorker(unittest.TestCase): def __init__(self, methodName='runTest'): super(TestOneWorker, self).__init__(methodName) self.controller = None - self.worker_thread = None + self.worker_process = None self.docker = False def setUp(self): self.tearDown() # In case another test failed badly during its setUp() - self.controller = WorkerController('127.0.0.1', port=None) + connected_workers = [] + + def onWorkerReady(name): + connected_workers.append(name) + + self.controller = WorkerController( + '127.0.0.1', + port=None, + callbacks=dict( + onWorkerReady = onWorkerReady, + ) + ) args = [ - '--prefix=%s' % prefix, - '--cache=%s' % tmp_prefix, - '--name=%s' % WORKER1, - self.controller.address, + '--prefix=%s' % prefix, + '--cache=%s' % tmp_prefix, + '--name=%s' % WORKER1, + self.controller.address, ] if self.docker: args.insert(3, '--docker') - self.worker_thread = WorkerThread(args) + self.worker_process = WorkerProcess(multiprocessing.Queue(), args) + self.worker_process.start() - self.worker_thread.start() + self.worker_process.queue.get() start = time() while len(self.controller.workers) == 0: @@ -169,11 +192,12 @@ class TestOneWorker(unittest.TestCase): self.assertEqual(len(self.controller.workers), 1) self.assertTrue(WORKER1 in self.controller.workers) + self.assertEqual(len(connected_workers), 1) + self.assertTrue(WORKER1 in connected_workers) + def tearDown(self): - if self.worker_thread is not None: - self.worker_thread.kill() - self.worker_thread = None + self.stop_worker() if self.controller is not None: self.controller.destroy() @@ -191,6 +215,13 @@ class TestOneWorker(unittest.TestCase): return message + def stop_worker(self): + if self.worker_process is not None: + self.worker_process.terminate() + self.worker_process.join() + self.worker_process = None + + def _check_done(self, message, expected_worker, expected_job_id): self.assertTrue(message is not None) @@ -260,12 +291,18 @@ class TestOneWorker(unittest.TestCase): def test_worker_shutdown(self): - worker.stop = True + did_shutdown = True + + def onWorkerGone(name): + did_shutdown = (name == WORKER1) + + self.controller.callbacks.onWorkerGone = onWorkerGone + + self.stop_worker() self.assertTrue(self.controller.process(2000) is None) self.assertEqual(len(self.controller.workers), 0) - - worker.stop = False + self.assertTrue(did_shutdown) def test_error_busy(self): @@ -323,6 +360,8 @@ class TestOneWorker(unittest.TestCase): self.assertEqual(data[0], "Worker isn't busy") +#---------------------------------------------------------- + class TestOneWorkerDocker(TestOneWorker): @@ -336,22 +375,35 @@ class TestOneWorkerDocker(TestOneWorker): cls.host = Host(raise_on_errors=False) +#---------------------------------------------------------- + class TestTwoWorkers(unittest.TestCase): def __init__(self, methodName='runTest'): super(TestTwoWorkers, self).__init__(methodName) self.controller = None - self.worker_threads = None + self.worker_processes = None self.docker = False def setUp(self): self.tearDown() # In case another test failed badly during its setUp() - self.controller = WorkerController('127.0.0.1', port=None) + connected_workers = [] + + def onWorkerReady(name): + connected_workers.append(name) - self.worker_threads = [] + self.controller = WorkerController( + '127.0.0.1', + port=None, + callbacks=dict( + onWorkerReady = onWorkerReady, + ) + ) + + self.worker_processes = [] for name in [ WORKER1, WORKER2 ]: args = [ @@ -364,11 +416,12 @@ class TestTwoWorkers(unittest.TestCase): if self.docker: args.insert(3, '--docker') - worker_thread = WorkerThread(args) + worker_process = WorkerProcess(multiprocessing.Queue(), args) + worker_process.start() - worker_thread.start() + worker_process.queue.get() - self.worker_threads.append(worker_thread) + self.worker_processes.append(worker_process) start = time() while len(self.controller.workers) < 2: @@ -378,12 +431,17 @@ class TestTwoWorkers(unittest.TestCase): self.assertTrue(WORKER1 in self.controller.workers) self.assertTrue(WORKER2 in self.controller.workers) + self.assertEqual(len(connected_workers), 2) + self.assertTrue(WORKER1 in connected_workers) + self.assertTrue(WORKER2 in connected_workers) + def tearDown(self): - if self.worker_threads is not None: - for worker_thread in self.worker_threads: - worker_thread.kill() - self.worker_threads = None + if self.worker_processes is not None: + for worker_process in self.worker_processes: + worker_process.terminate() + worker_process.join() + self.worker_processes = None if self.controller is not None: self.controller.destroy() @@ -450,6 +508,8 @@ class TestTwoWorkers(unittest.TestCase): self.assertNotEqual(worker1, worker2) +#---------------------------------------------------------- + class TestTwoWorkersDocker(TestTwoWorkers): diff --git a/beat/core/worker.py b/beat/core/worker.py index 48710e2e046feceedd1bd722d0b58249d1d6e5d3..33da4f86dbb735838039901570c6ef2a23d7ab95 100755 --- a/beat/core/worker.py +++ b/beat/core/worker.py @@ -25,13 +25,7 @@ # # ############################################################################### -import gevent -import zmq.green as zmq - -from gevent import monkey -monkey.patch_socket(dns=False) -monkey.patch_ssl() - +import zmq import simplejson @@ -51,7 +45,14 @@ class WorkerController(object): ACK = 'ack' - def __init__(self, address, port): + class Callbacks(object): + + def __init__(self): + self.onWorkerReady = None + self.onWorkerGone = None + + + def __init__(self, address, port, callbacks=None): self.context = zmq.Context() self.socket = self.context.socket(zmq.ROUTER) @@ -69,6 +70,13 @@ class WorkerController(object): self.workers = [] + if callbacks is None: + callbacks = {} + + self.callbacks = WorkerController.Callbacks() + for k, v in callbacks.items(): + setattr(self.callbacks, k, v) + def destroy(self): self.socket.setsockopt(zmq.LINGER, 0) @@ -112,10 +120,18 @@ class WorkerController(object): if address not in self.workers: self.workers.append(address) self.ack(address) + + if self.callbacks.onWorkerReady is not None: + self.callbacks.onWorkerReady(address) + timeout = 0 elif status == WorkerController.EXIT: self.workers.remove(address) + + if self.callbacks.onWorkerGone is not None: + self.callbacks.onWorkerGone(address) + timeout = 0 elif status in [ WorkerController.DONE, WorkerController.JOB_ERROR, diff --git a/setup.py b/setup.py index adb5617e1af61c776b308d3a7c8f15ca3db1cda1..402965fb475d8778010552a8f2eaab80519b6efc 100755 --- a/setup.py +++ b/setup.py @@ -42,7 +42,6 @@ requires = [ "sphinxcontrib-mscgen", "sphinx-rtd-theme", "matplotlib>=1.4", - "gevent", "pyzmq", "docker-py", ]