Commit a27907e9 authored by Philip ABBET's avatar Philip ABBET

[scripts] worker.py: Can reconnect to the scheduler when it shuts down

parent abb731b7
......@@ -55,6 +55,7 @@ import signal
import simplejson
import multiprocessing
import tempfile
import Queue
from docopt import docopt
from socket import gethostname
......@@ -66,6 +67,7 @@ from ..worker import WorkerController
stop = False
logger = None
#----------------------------------------------------------
......@@ -89,6 +91,7 @@ class ExecutionProcess(multiprocessing.Process):
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGINT, signal.SIG_DFL)
logger.debug("Process (pid=%d) started for job #%s", self.pid, self.job_id)
self.queue.put('STARTED')
# Create the executor
......@@ -119,6 +122,62 @@ class ExecutionProcess(multiprocessing.Process):
system_error = traceback.format_exc()
))
self.queue.close()
logger.debug("Process (pid=%d) done", self.pid)
return 0
#----------------------------------------------------------
def connect_to_scheduler(address, name):
    """Establish a 0MQ DEALER connection with the scheduler.

    Sends a ready message ('rdy') and polls (in 100ms slices, so the
    global ``stop`` flag can interrupt the wait) until the scheduler
    acknowledges us.

    Parameters:
      address (str): Address of the scheduler; 'tcp://' is prepended
        when no transport is specified
      name (str): Identity of this worker (0MQ socket identity)

    Returns:
      (context, socket, poller) on success, or (None, None, None) when
      the global ``stop`` flag was raised or the scheduler answered
      with anything other than ``WorkerController.ACK``.
    """

    def _teardown():
        # Shared cleanup for every failure path: close without
        # lingering so an unreachable scheduler can't block us
        socket.setsockopt(zmq.LINGER, 0)
        socket.close()
        context.destroy()
        return (None, None, None)

    # Starts our 0MQ client
    context = zmq.Context()
    socket = context.socket(zmq.DEALER)
    socket.setsockopt(zmq.IDENTITY, name)

    if address.find('://') < 0:
        address = 'tcp://' + address

    socket.connect(address)
    logger.info("Connected to '%s'", address)

    poller = zmq.Poller()
    poller.register(socket, zmq.POLLIN)

    # Tell the scheduler we are ready
    socket.send('rdy')

    # Wait for a response from the scheduler
    logger.info("Waiting for the scheduler...")

    while not stop:
        socks = dict(poller.poll(100))
        if (socket not in socks) or (socks[socket] != zmq.POLLIN):
            continue

        response = socket.recv()

        if response != WorkerController.ACK:
            logger.error("Can't talk with the scheduler at '%s', expected '%s', got '%s'",
                         address, WorkerController.ACK, response)
            return _teardown()

        # Acknowledged: stop waiting
        break

    if stop:
        return _teardown()

    logger.info("The scheduler answered")

    return (context, socket, poller)
#----------------------------------------------------------
......@@ -153,16 +212,26 @@ def main(user_input=None):
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger = logging.getLogger('beat.core')
logger.addHandler(handler)
beat_core_logger = logging.getLogger('beat.core')
beat_core_logger.addHandler(handler)
beat_backend_logger = logging.getLogger('beat.backend.python')
beat_backend_logger.addHandler(handler)
if args['--verbose'] == 1:
logger.setLevel(logging.INFO)
elif args['--verbose'] >= 2:
logger.setLevel(logging.DEBUG)
beat_core_logger.setLevel(logging.INFO)
beat_backend_logger.setLevel(logging.INFO)
elif args['--verbose'] == 2:
beat_core_logger.setLevel(logging.DEBUG)
beat_backend_logger.setLevel(logging.INFO)
elif args['--verbose'] >= 3:
beat_core_logger.setLevel(logging.DEBUG)
beat_backend_logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.WARNING)
beat_core_logger.setLevel(logging.WARNING)
beat_backend_logger.setLevel(logging.WARNING)
global logger
logger = logging.getLogger(__name__)
......@@ -202,96 +271,71 @@ def main(user_input=None):
host = Host(images_cache=docker_images_cache, raise_on_errors=False)
# Starts our 0MQ server
context = zmq.Context()
socket = context.socket(zmq.DEALER)
socket.setsockopt(zmq.IDENTITY, args['--name'])
address = args['<address>']
if address.find('://') < 0:
address = 'tcp://' + address
socket.connect(address)
logger.info("Connected to '%s'", address)
poller = zmq.Poller()
poller.register(socket, zmq.POLLIN)
# Send READY messages until the scheduler acknowlege us
global stop
socket.send('rdy')
logger.info("Waiting for the scheduler...")
while not stop:
socks = dict(poller.poll(100))
if not (socket in socks) or (socks[socket] != zmq.POLLIN):
continue
response = socket.recv()
if response != WorkerController.ACK:
logger.error("Can't talk with the scheduler at '%s', expected '%s', got '%s'",
address, WorkerController.ACK, response)
return 1
break
if not stop:
logger.info("The scheduler answered")
# Establish a connection with the scheduler
(context, socket, poller) = connect_to_scheduler(args['<address>'], args['--name'])
if context is None:
return 1
# Process the requests
execution_processes = []
scheduler_available = True
global stop
while not stop:
# If necessary, wait for the comeback of the scheduler
if not scheduler_available:
(context, socket, poller) = connect_to_scheduler(args['<address>'], args['--name'])
if context is None:
break
scheduler_available = True
# Send the result of the processing (if any)
for execution_process in execution_processes:
if not execution_process.is_alive():
if execution_process.exitcode == 0:
result = execution_process.queue.get()
else:
result = dict(system_error='Execution error in the subprocess')
try:
result = execution_process.queue.get_nowait()
except Queue.Empty:
continue
if result.has_key('result'):
content = simplejson.dumps(result['result'])
execution_process.join()
status = WorkerController.DONE
if result['result']['status'] != 0:
status = WorkerController.JOB_ERROR
if result.has_key('result'):
content = simplejson.dumps(result['result'])
logger.info("Job #%s completed", execution_process.job_id)
logger.debug('send: """%s"""' % content.rstrip())
status = WorkerController.DONE
if result['result']['status'] != 0:
status = WorkerController.JOB_ERROR
message = [
status,
execution_process.job_id,
content
]
elif result.has_key('error'):
logger.error(result['error'])
logger.info("Job #%s completed", execution_process.job_id)
logger.debug('send: """%s"""' % content.rstrip())
message = [
WorkerController.JOB_ERROR,
execution_process.job_id,
]
message = [
status,
execution_process.job_id,
content
]
elif result.has_key('error'):
logger.error(result['error'])
message += result['details']
message = [
WorkerController.JOB_ERROR,
execution_process.job_id,
]
else:
logger.error(result['system_error'])
message += result['details']
message = [
WorkerController.ERROR,
execution_process.job_id,
result['system_error']
]
else:
logger.error(result['system_error'])
message = [
WorkerController.ERROR,
execution_process.job_id,
result['system_error']
]
socket.send_multipart(message)
socket.send_multipart(message)
execution_processes.remove(execution_process)
execution_processes.remove(execution_process)
if len(execution_processes) == 0:
......@@ -357,7 +401,22 @@ def main(user_input=None):
])
socket.send(WorkerController.EXIT)
# Command: scheduler shutdown
elif command == WorkerController.SCHEDULER_SHUTDOWN:
logger.info("The scheduler shut down, we will wait for it")
scheduler_available = False
socket.setsockopt(zmq.LINGER, 0)
socket.close()
context.destroy()
poller = None
socket = None
context = None
if socket:
socket.send(WorkerController.EXIT)
# Cleanup
......@@ -365,9 +424,10 @@ def main(user_input=None):
execution_process.terminate()
execution_process.join()
socket.setsockopt(zmq.LINGER, 0)
socket.close()
context.destroy()
if context:
socket.setsockopt(zmq.LINGER, 0)
socket.close()
context.destroy()
if (docker_images_cache is not None) and os.path.exists(docker_images_cache):
os.remove(docker_images_cache)
......
......@@ -36,6 +36,7 @@ logger = logging.getLogger(__name__)
import unittest
import simplejson
import multiprocessing
import Queue
from time import time
from time import sleep
......@@ -128,6 +129,48 @@ CONFIGURATION2 = {
#----------------------------------------------------------
class ControllerProcess(multiprocessing.Process):
    """Runs a WorkerController inside a dedicated process.

    Worker arrivals and departures are reported on the shared queue as
    'READY <name>' / 'GONE <name>' messages; the parent can post 'STOP'
    on the same queue to shut the controller down.
    """

    def __init__(self, queue):
        super(ControllerProcess, self).__init__()
        self.queue = queue

    def run(self):
        # Signal the parent that the process is up
        self.queue.put('STARTED')

        def onWorkerReady(name):
            self.queue.put('READY ' + name)

        def onWorkerGone(name):
            self.queue.put('GONE ' + name)

        self.controller = WorkerController(
            '127.0.0.1',
            port=51000,
            callbacks=dict(
                onWorkerReady = onWorkerReady,
                onWorkerGone = onWorkerGone,
            )
        )

        running = True
        while running:
            # Pump the controller, then look for a pending command
            self.controller.process(100)

            try:
                if self.queue.get_nowait() == 'STOP':
                    running = False
            except Queue.Empty:
                pass

        self.controller.destroy()
#----------------------------------------------------------
class WorkerProcess(multiprocessing.Process):
def __init__(self, queue, arguments):
......@@ -205,7 +248,7 @@ class TestWorkerBase(unittest.TestCase):
'--prefix=%s' % prefix,
'--cache=%s' % tmp_prefix,
'--name=%s' % name,
# '-vvv',
# '-vv',
self.controller.address if address is None else address,
]
......@@ -300,6 +343,34 @@ class TestConnection(TestWorkerBase):
self.wait_for_worker_connection(WORKER1)
def test_scheduler_shutdown(self):
controller = ControllerProcess(multiprocessing.Queue())
controller.start()
message = controller.queue.get()
self.assertEqual(message, 'STARTED')
self.start_worker(WORKER1, 'tcp://127.0.0.1:51000')
message = controller.queue.get()
self.assertEqual(message, 'READY ' + WORKER1)
controller.queue.put('STOP')
sleep(1)
controller = ControllerProcess(multiprocessing.Queue())
controller.start()
message = controller.queue.get()
self.assertEqual(message, 'STARTED')
message = controller.queue.get()
self.assertEqual(message, 'READY ' + WORKER1)
controller.queue.put('STOP')
#----------------------------------------------------------
......
......@@ -26,6 +26,7 @@
###############################################################################
import zmq
import socket
import simplejson
......@@ -43,6 +44,7 @@ class WorkerController(object):
EXECUTE = 'exe'
CANCEL = 'cnl'
ACK = 'ack'
SCHEDULER_SHUTDOWN = 'shd'
class Callbacks(object):
......@@ -54,6 +56,7 @@ class WorkerController(object):
def __init__(self, address, port, callbacks=None):
self.context = zmq.Context()
self.context.setsockopt(socket.SO_REUSEADDR, 1)
self.socket = self.context.socket(zmq.ROUTER)
......@@ -79,9 +82,23 @@ class WorkerController(object):
def destroy(self):
for worker in self.workers:
self.socket.send_multipart([
str(worker),
WorkerController.SCHEDULER_SHUTDOWN,
])
self.workers = []
self.poller.unregister(self.socket)
self.poller = None
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close()
self.socket = None
self.context.destroy()
self.context = None
def execute(self, worker, job_id, configuration):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment