Commit df60fe98 authored by Philip ABBET's avatar Philip ABBET
Browse files

Separate the 0MQ Server into a base class usable in a dataset-dedicated...

Separate the 0MQ Server into a base class usable in a dataset-dedicated container, and a derived one with full functionality
parent e9b884e6
......@@ -45,12 +45,16 @@ from . import dock
from . import baseformat
class Server(gevent.Greenlet):
'''A 0MQ server for our communication with the user process'''
class MessageHandler(gevent.Greenlet):
'''A 0MQ message handler for our communication with other processes
Support for more messages can be implemented by subclassing this class.
This one only support input-related messages.
'''
def __init__(self, input_list, output_list, host_address):
def __init__(self, input_list, zmq_context, zmq_socket):
super(Server, self).__init__()
super(MessageHandler, self).__init__()
# An event unblocking a graceful stop
self.stop = gevent.event.Event()
......@@ -60,18 +64,13 @@ class Server(gevent.Greenlet):
self.must_kill.clear()
# Starts our 0MQ server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PAIR)
self.context = zmq_context
self.socket = zmq_socket
self.address = 'tcp://' + host_address
port = self.socket.bind_to_random_port(self.address)
self.address += ':%d' % port
logger.debug("zmq server bound to `%s'", self.address)
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
self.input_list = input_list
self.output_list = output_list
self.system_error = ''
self.user_error = ''
......@@ -79,15 +78,12 @@ class Server(gevent.Greenlet):
# implementations
self.callbacks = dict(
nxt = self.next,
hmd = self.has_more_data,
idd = self.is_dataunit_done,
wrt = self.write,
idm = self.is_data_missing,
oic = self.output_is_connected,
don = self.done,
err = self.error,
)
nxt = self.next,
hmd = self.has_more_data,
idd = self.is_dataunit_done,
don = self.done,
err = self.error,
)
def set_process(self, process):
......@@ -95,10 +91,6 @@ class Server(gevent.Greenlet):
self.process.statistics() # initialize internal statistics
def __str__(self):
return 'Server(%s)' % self.address
def _run(self):
logger.debug("0MQ server thread started")
......@@ -106,7 +98,8 @@ class Server(gevent.Greenlet):
while not self.stop.is_set(): #keep on
if self.must_kill.is_set():
self.process.kill()
if self.process is not None:
self.process.kill()
self.must_kill.clear()
timeout = 1000 #ms
......@@ -136,7 +129,8 @@ class Server(gevent.Greenlet):
(parsed_parts, traceback.format_exc())
logger.error(message, exc_info=True)
self.system_error = message
self.process.kill()
if self.process is not None:
self.process.kill()
self.stop.set()
break
......@@ -145,7 +139,8 @@ class Server(gevent.Greenlet):
% command
logger.error(message)
self.system_error = message
self.process.kill()
if self.process is not None:
self.process.kill()
self.stop.set()
break
......@@ -165,13 +160,6 @@ class Server(gevent.Greenlet):
return retval
def _get_output_candidate(self, name):
retval = self.output_list[name]
if retval is None: raise RuntimeError("Could not find output `%s'" % name)
return retval
def next(self, channel, name=None):
"""Syntax: nxt channel [name] ..."""
......@@ -256,50 +244,11 @@ class Server(gevent.Greenlet):
self.socket.send(what)
def write(self, name, packed):
"""Syntax: wrt output data"""
logger.debug('recv: wrt %s <bin> (size=%d)', name, len(packed))
# Get output object
output_candidate = self._get_output_candidate(name)
if output_candidate is None:
raise RuntimeError("Could not find output `%s' to write to" % name)
data = output_candidate.data_sink.dataformat.type()
data.unpack(packed)
output_candidate.write(data)
logger.debug('send: ack')
self.socket.send('ack')
def is_data_missing(self, name):
"""Syntax: idm output"""
logger.debug('recv: idm %s', name)
output_candidate = self._get_output_candidate(name)
what = 'tru' if output_candidate.isDataMissing() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
def output_is_connected(self, name):
"""Syntax: oic output"""
logger.debug('recv: oic %s', name)
output_candidate = self._get_output_candidate(name)
what = 'tru' if output_candidate.isConnected() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
def _collect_statistics(self):
logger.debug('collecting user process statistics...')
self.last_statistics = self.process.statistics()
if self.process is not None:
self.last_statistics = self.process.statistics()
def _acknowledge(self):
......@@ -341,6 +290,84 @@ class Server(gevent.Greenlet):
class Server(MessageHandler):
'''A 0MQ server for our communication with the user process'''
def __init__(self, input_list, output_list, host_address):
# Starts our 0MQ server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PAIR)
self.address = 'tcp://' + host_address
port = self.socket.bind_to_random_port(self.address)
self.address += ':%d' % port
logger.debug("zmq server bound to `%s'", self.address)
super(Server, self).__init__(input_list, self.context, self.socket)
self.output_list = output_list
# implementations
self.callbacks.update(dict(
wrt = self.write,
idm = self.is_data_missing,
oic = self.output_is_connected,
))
def __str__(self):
return 'Server(%s)' % self.address
def _get_output_candidate(self, name):
retval = self.output_list[name]
if retval is None: raise RuntimeError("Could not find output `%s'" % name)
return retval
def write(self, name, packed):
"""Syntax: wrt output data"""
logger.debug('recv: wrt %s <bin> (size=%d)', name, len(packed))
# Get output object
output_candidate = self._get_output_candidate(name)
if output_candidate is None:
raise RuntimeError("Could not find output `%s' to write to" % name)
data = output_candidate.data_sink.dataformat.type()
data.unpack(packed)
output_candidate.write(data)
logger.debug('send: ack')
self.socket.send('ack')
def is_data_missing(self, name):
"""Syntax: idm output"""
logger.debug('recv: idm %s', name)
output_candidate = self._get_output_candidate(name)
what = 'tru' if output_candidate.isDataMissing() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
def output_is_connected(self, name):
"""Syntax: oic output"""
logger.debug('recv: oic %s', name)
output_candidate = self._get_output_candidate(name)
what = 'tru' if output_candidate.isConnected() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
class Agent(object):
'''Handles synchronous commands.
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment