Commit 37e667d7 authored by Philip ABBET's avatar Philip ABBET

Remove the dependency on gevent

parent 51e1ee41
......@@ -31,15 +31,11 @@
import os
import requests
import simplejson
import zmq.green as zmq
import zmq
import logging
logger = logging.getLogger(__name__)
from gevent import monkey
monkey.patch_socket(dns=False)
monkey.patch_ssl()
from .. import stats
from .. import message_handler
from .. import utils
......@@ -417,6 +413,7 @@ class DockerExecutor(BaseExecutor):
finally:
self.message_handler.stop.set()
self.message_handler.join()
# Collects final information and returns to caller
......
......@@ -34,7 +34,7 @@ import glob
import errno
import tempfile
import subprocess
import zmq.green as zmq
import zmq
import time
import logging
......
......@@ -29,12 +29,7 @@
import logging
logger = logging.getLogger(__name__)
import gevent
import zmq.green as zmq
from gevent import monkey
monkey.patch_socket(dns=False)
monkey.patch_ssl()
import zmq
from beat.backend.python.message_handler import MessageHandler
......
......@@ -47,20 +47,14 @@ Options:
"""
import logging
import gevent
import zmq.green as zmq
from gevent import monkey
monkey.patch_socket(dns=False)
monkey.patch_ssl()
import os
import sys
import logging
import zmq
import signal
import simplejson
import threading
import multiprocessing
import tempfile
from docopt import docopt
from socket import gethostname
......@@ -74,22 +68,58 @@ from ..worker import WorkerController
stop = False
#----------------------------------------------------------
class ExecutionProcess(multiprocessing.Process):
class ExecutorTask(threading.Thread): #gevent.Greenlet):
def __init__(self, queue, prefix, data, cache, docker, images_cache=None):
super(ExecutionProcess, self).__init__()
self.queue = queue
self.prefix = prefix
self.data = data
self.cache = cache
self.docker = docker
self.images_cache = images_cache
def __init__(self, executor):
self.executor = executor
super(ExecutorTask, self).__init__()
# def _run(self):
def run(self):
with self.executor:
result = self.executor.process()
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
self.queue.put('STARTED')
# Create the executor
try:
if self.docker:
host = Host(images_cache=self.images_cache, raise_on_errors=False)
executor = DockerExecutor(host, self.prefix, self.data, cache=self.cache)
else:
executor = LocalExecutor(self.prefix, self.data, cache=self.cache)
if not executor.valid:
self.queue.put(dict(
error = "Failed to load the execution information",
details = executor.errors
))
return
# Execute the algorithm
with executor:
result = executor.process()
self.result = result
self.queue.put(dict(
result = result
))
except:
import traceback
self.queue.put(dict(
system_error = traceback.format_exc()
))
# return result
#----------------------------------------------------------
def main(user_input=None):
......@@ -141,7 +171,7 @@ def main(user_input=None):
# Install a signal handler
def handler(signum, frame):
#ignore further signals
# Ignore further signals
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
......@@ -154,9 +184,10 @@ def main(user_input=None):
# (If necessary) Docker-related initialisations
host = None
docker_images_cache = None
if args['--docker']:
host = Host(raise_on_errors=False)
docker_images_cache = os.path.join(tempfile.gettempdir(), 'beat-docker-images.json')
host = Host(images_cache=docker_images_cache, raise_on_errors=False)
# Starts our 0MQ server
......@@ -176,7 +207,9 @@ def main(user_input=None):
# Send READY messages until the scheduler acknowlege us
while True:
global stop
while not stop:
socket.send('rdy')
socks = dict(poller.poll(100))
......@@ -184,6 +217,7 @@ def main(user_input=None):
continue
response = socket.recv()
if response != WorkerController.ACK:
logger.error("Can't talk with the scheduler at '%s', expected '%s', got '%s'",
address, WorkerController.ACK, response)
......@@ -193,35 +227,60 @@ def main(user_input=None):
# Process the requests
global stop
task = None
execution_process = None
current_job_id = None
while not stop:
# Send the result of the processing (if any)
if (task is not None) and not task.is_alive(): #task.ready():
message = simplejson.dumps(task.result)
# message = simplejson.dumps(task.value)
logger.debug('send: """%s"""' % message.rstrip())
if (execution_process is not None) and not execution_process.is_alive():
if execution_process.exitcode == 0:
result = execution_process.queue.get()
else:
result = dict(system_error='Execution error in the subprocess')
socket.send_multipart([
WorkerController.DONE,
current_job_id,
message
])
if result.has_key('result'):
content = simplejson.dumps(result['result'])
logger.debug('send: """%s"""' % content.rstrip())
task = None
message = [
WorkerController.DONE,
current_job_id,
content
]
elif result.has_key('error'):
logger.error(result['error'])
if task is None:
message = [
WorkerController.JOB_ERROR,
current_job_id,
]
message += result['details']
else:
logger.error(result['system_error'])
message = [
WorkerController.ERROR,
result['system_error']
]
socket.send_multipart(message)
execution_process = None
if execution_process is None:
timeout = 1000 # ms
else:
timeout = 100
socks = dict(poller.poll(timeout))
if not (socket in socks) or (socks[socket] != zmq.POLLIN):
continue
# Read the next command
parts = socket.recv_multipart()
......@@ -229,13 +288,14 @@ def main(user_input=None):
logger.debug("recv: %s", command)
# Command: execute <job-id> <json-command>
if command == WorkerController.EXECUTE:
job_id = parts[1]
data = simplejson.loads(parts[2])
# Check that the worker isn't busy
if task is not None:
if execution_process is not None:
socket.send_multipart([
WorkerController.JOB_ERROR,
job_id,
......@@ -243,35 +303,21 @@ def main(user_input=None):
])
continue
# Create the executor
if args['--docker']:
executor = DockerExecutor(host, prefix, data, cache=cache)
else:
executor = LocalExecutor(prefix, data, cache=cache)
# Start the execution
logger.info("Running '%s'", data['algorithm'])
current_job_id = job_id
if not executor.valid:
logger.error("Failed to load the execution information:")
execution_process = ExecutionProcess(multiprocessing.Queue(), prefix, data, cache,
docker=args['--docker'], images_cache=docker_images_cache)
execution_process.start()
message = [
WorkerController.JOB_ERROR,
job_id,
]
execution_process.queue.get()
message += executor.errors
socket.send_multipart(message)
else:
logger.info("Running '%s'", executor.algorithm.name)
current_job_id = job_id
task = ExecutorTask(executor)
task.start()
# Command: cancel
elif command == WorkerController.CANCEL:
# Check that the worker is busy
if task is None:
if execution_process is None:
socket.send_multipart([
WorkerController.ERROR,
"Worker isn't busy"
......@@ -279,8 +325,9 @@ def main(user_input=None):
continue
# Kill the processing thread
# task.kill()
task = None
execution_process.terminate()
execution_process.join()
execution_process = None
socket.send_multipart([
WorkerController.CANCELLED,
......@@ -292,8 +339,15 @@ def main(user_input=None):
# Cleanup
if execution_process is not None:
execution_process.terminate()
execution_process.join()
socket.setsockopt(zmq.LINGER, 0)
socket.close()
context.destroy()
if (docker_images_cache is not None) and os.path.exists(docker_images_cache):
os.remove(docker_images_cache)
return 0
......@@ -42,7 +42,7 @@ if False:
logger.addHandler(ch)
import unittest
import zmq.green as zmq
import zmq
from ..dbexecution import DBExecutor
from ..inputs import RemoteInput
......
......@@ -42,7 +42,7 @@ if False:
logger.addHandler(ch)
import unittest
import zmq.green as zmq
import zmq
import nose.tools
from ..agent import MessageHandler
......@@ -123,6 +123,12 @@ class TestMessageHandler(unittest.TestCase):
self.message_handler.start()
def tearDown(self):
self.message_handler.kill()
self.message_handler.join()
self.message_handler = None
def test_input_has_more_data(self):
assert self.remote_input_a.hasMoreData()
......@@ -211,6 +217,12 @@ class TestMessageHandlerErrorHandling(unittest.TestCase):
self.message_handler.start()
def tearDown(self):
self.message_handler.kill()
self.message_handler.join()
self.message_handler = None
def test_input_has_more_data(self):
self.assertRaises(RemoteException, self.remote_input.hasMoreData)
......
......@@ -29,17 +29,13 @@
# Tests for experiment execution
import os
import logging
logger = logging.getLogger(__name__)
import gevent
from gevent import monkey
monkey.patch_socket(dns=False)
monkey.patch_ssl()
import unittest
import simplejson
import multiprocessing
from time import time
from ..scripts import worker
......@@ -49,10 +45,16 @@ from ..dock import Host
from . import prefix, tmp_prefix
#----------------------------------------------------------
WORKER1 = 'worker1'
WORKER2 = 'worker2'
#----------------------------------------------------------
CONFIGURATION1 = {
'queue': 'queue',
'inputs': {
......@@ -86,6 +88,9 @@ CONFIGURATION1 = {
}
#----------------------------------------------------------
CONFIGURATION2 = {
'queue': 'queue',
'inputs': {
......@@ -119,47 +124,65 @@ CONFIGURATION2 = {
}
#----------------------------------------------------------
class WorkerThread(gevent.Greenlet):
class WorkerProcess(multiprocessing.Process):
def __init__(self, arguments):
super(WorkerThread, self).__init__()
def __init__(self, queue, arguments):
super(WorkerProcess, self).__init__()
self.queue = queue
self.arguments = arguments
def _run(self):
def run(self):
self.queue.put('STARTED')
worker.main(self.arguments)
#----------------------------------------------------------
class TestOneWorker(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(TestOneWorker, self).__init__(methodName)
self.controller = None
self.worker_thread = None
self.worker_process = None
self.docker = False
def setUp(self):
self.tearDown() # In case another test failed badly during its setUp()
self.controller = WorkerController('127.0.0.1', port=None)
connected_workers = []
def onWorkerReady(name):
connected_workers.append(name)
self.controller = WorkerController(
'127.0.0.1',
port=None,
callbacks=dict(
onWorkerReady = onWorkerReady,
)
)
args = [
'--prefix=%s' % prefix,
'--cache=%s' % tmp_prefix,
'--name=%s' % WORKER1,
self.controller.address,
'--prefix=%s' % prefix,
'--cache=%s' % tmp_prefix,
'--name=%s' % WORKER1,
self.controller.address,
]
if self.docker:
args.insert(3, '--docker')
self.worker_thread = WorkerThread(args)
self.worker_process = WorkerProcess(multiprocessing.Queue(), args)
self.worker_process.start()
self.worker_thread.start()
self.worker_process.queue.get()
start = time()
while len(self.controller.workers) == 0:
......@@ -169,11 +192,12 @@ class TestOneWorker(unittest.TestCase):
self.assertEqual(len(self.controller.workers), 1)
self.assertTrue(WORKER1 in self.controller.workers)
self.assertEqual(len(connected_workers), 1)
self.assertTrue(WORKER1 in connected_workers)
def tearDown(self):
if self.worker_thread is not None:
self.worker_thread.kill()
self.worker_thread = None
self.stop_worker()
if self.controller is not None:
self.controller.destroy()
......@@ -191,6 +215,13 @@ class TestOneWorker(unittest.TestCase):
return message
def stop_worker(self):
if self.worker_process is not None:
self.worker_process.terminate()
self.worker_process.join()
self.worker_process = None
def _check_done(self, message, expected_worker, expected_job_id):
self.assertTrue(message is not None)
......@@ -260,12 +291,18 @@ class TestOneWorker(unittest.TestCase):
def test_worker_shutdown(self):
worker.stop = True
did_shutdown = True
def onWorkerGone(name):
did_shutdown = (name == WORKER1)
self.controller.callbacks.onWorkerGone = onWorkerGone
self.stop_worker()
self.assertTrue(self.controller.process(2000) is None)
self.assertEqual(len(self.controller.workers), 0)
worker.stop = False
self.assertTrue(did_shutdown)
def test_error_busy(self):
......@@ -323,6 +360,8 @@ class TestOneWorker(unittest.TestCase):
self.assertEqual(data[0], "Worker isn't busy")
#----------------------------------------------------------
class TestOneWorkerDocker(TestOneWorker):
......@@ -336,22 +375,35 @@ class TestOneWorkerDocker(TestOneWorker):
cls.host = Host(raise_on_errors=False)
#----------------------------------------------------------
class TestTwoWorkers(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(TestTwoWorkers, self).__init__(methodName)
self.controller = None
self.worker_threads = None
self.worker_processes = None
self.docker = False
def setUp(self):
self.tearDown() # In case another test failed badly during its setUp()
self.controller = WorkerController('127.0.0.1', port=None)
connected_workers = []
def onWorkerReady(name):
connected_workers.append(name)
self.worker_threads = []
self.controller = WorkerController(
'127.0.0.1',
port=None,
callbacks=dict(
onWorkerReady = onWorkerReady,
)
)
self.worker_processes = []
for name in [ WORKER1, WORKER2 ]:
args = [
......@@ -364,11 +416,12 @@ class TestTwoWorkers(unittest.TestCase):
if self.docker:
args.insert(3, '--docker')
worker_thread = WorkerThread(args)
worker_process = WorkerProcess(multiprocessing.Queue(), args)
worker_process.start()
worker_thread.start()
worker_process.queue.get()
self.worker_threads.append(worker_thread)
self.worker_processes.append(worker_process)
start = time()
while len(self.controller.workers) < 2:
......@@ -378,12 +431,17 @@ class TestTwoWorkers(unittest.TestCase):
self.assertTrue(WORKER1 in self.controller.workers)
self.assertTrue(WORKER2 in self.controller.workers)
self.assertEqual(len(connected_workers), 2)
self.assertTrue(WORKER1 in connected_workers)
self.assertTrue(WORKER2 in connected_workers)
def tearDown(self):
if self.worker_threads is not None:
for worker_thread in self.worker_threads:
worker_thread.kill()
self.worker_threads = None
if self.worker_processes is not None:
for worker_process in self.worker_processes:
worker_process.terminate()
worker_process.join()
self.worker_processes = None
if self.controller is not None:
self.controller.destroy()
......@@ -450,6 +508,8 @@ class TestTwoWorkers(unittest.TestCase):
self.assertNotEqual(worker1, worker2)
#----------------------------------------------------------
class TestTwoWorkersDocker(TestTwoWorkers):
......
......@@ -25,13 +25,7 @@
# #
###############################################################################
import gevent
import zmq.green as zmq
from gevent import monkey
monkey.patch_socket(dns=False)
monkey.patch_ssl()
import zmq
import simplejson
......@@ -51,7 +45,14 @@ class WorkerController(object):
ACK = 'ack'
def __init__(self, address, port):
class Callbacks(object):
def __init__(self):
self.onWorkerReady = None
self.onWorkerGone = None
def __init__(self, address, port, callbacks=None):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.ROUTER)
......@@ -69,6 +70,13 @@ class WorkerController(object):
self.workers = []
if callbacks is None:
callbacks = {}
self.callbacks = WorkerController.Callbacks()
for k, v in callbacks.items():
setattr(self.callbacks, k, v)
def destroy(self):
self.socket.setsockopt(zmq.LINGER, 0)
......@@ -112,10 +120,18 @@ class WorkerController(object):
if address not in self.workers:
self.workers.append(address)
self.ack(address)
if self.callbacks.onWorkerReady is not None:
self.callbacks.onWorkerReady(address)
timeout = 0
elif status == WorkerController.EXIT:
self.workers.remove(address)
if self.callbacks.onWorkerGone is not None:
self.callbacks.onWorkerGone(address)
timeout = 0
elif status in [ WorkerController.DONE, WorkerController.JOB_ERROR,
......
......@@ -42,7 +42,6 @@ requires = [
"sphinxcontrib-mscgen",
"sphinx-rtd-theme",
"matplotlib>=1.4",
"gevent",
"pyzmq",
"docker-py",
]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment