Commit bab77a41 authored by Philip ABBET's avatar Philip ABBET

Remove the 'process' notion from 'MessageHandler'

A 'kill callback' can now be provided to the constructor to perform whatever action
you want when the message handler decides that it's time to stop everything
parent 1066ed81
......@@ -49,7 +49,7 @@ class MessageHandler(gevent.Greenlet):
This one only support input-related messages.
'''
def __init__(self, input_list, zmq_context, zmq_socket):
def __init__(self, input_list, zmq_context, zmq_socket, kill_callback=None):
super(MessageHandler, self).__init__()
......@@ -71,8 +71,9 @@ class MessageHandler(gevent.Greenlet):
self.system_error = ''
self.user_error = ''
self.last_statistics = {}
self.process = None
self.statistics = {}
self.kill_callback = kill_callback
# implementations
self.callbacks = dict(
......@@ -84,11 +85,6 @@ class MessageHandler(gevent.Greenlet):
)
def set_process(self, process):
self.process = process
self.process.statistics() # initialize internal statistics
def _run(self):
logger.debug("0MQ server thread started")
......@@ -96,8 +92,8 @@ class MessageHandler(gevent.Greenlet):
while not self.stop.is_set(): #keep on
if self.must_kill.is_set():
if self.process is not None:
self.process.kill()
if self.kill_callback is not None:
self.kill_callback()
self.must_kill.clear()
timeout = 1000 #ms
......@@ -125,8 +121,8 @@ class MessageHandler(gevent.Greenlet):
else:
self.send_error(e.user_error, kind='usr')
self.user_error = e.user_error
if self.process is not None:
self.process.kill()
if self.kill_callback is not None:
self.kill_callback()
self.stop.set()
break
except RuntimeError as e:
......@@ -135,8 +131,8 @@ class MessageHandler(gevent.Greenlet):
logger.error(message, exc_info=True)
self.send_error(message, kind='usr')
self.user_error = message
if self.process is not None:
self.process.kill()
if self.kill_callback is not None:
self.kill_callback()
self.stop.set()
break
except:
......@@ -149,8 +145,8 @@ class MessageHandler(gevent.Greenlet):
logger.error(message, exc_info=True)
self.send_error(message)
self.system_error = message
if self.process is not None:
self.process.kill()
if self.kill_callback is not None:
self.kill_callback()
self.stop.set()
break
......@@ -160,8 +156,8 @@ class MessageHandler(gevent.Greenlet):
logger.error(message)
self.send_error(message)
self.system_error = message
if self.process is not None:
self.process.kill()
if self.kill_callback is not None:
self.kill_callback()
self.stop.set()
break
......@@ -268,13 +264,6 @@ class MessageHandler(gevent.Greenlet):
self.socket.send(what)
def _collect_statistics(self):
logger.debug('collecting user process statistics...')
if self.process is not None:
self.last_statistics = self.process.statistics()
def _acknowledge(self):
logger.debug('send: ack')
......@@ -289,10 +278,7 @@ class MessageHandler(gevent.Greenlet):
logger.debug('recv: don %s', statistics)
if statistics is not None:
self._collect_statistics()
# collect I/O stats from client
self.last_statistics['data'] = simplejson.loads(statistics)
self.statistics = simplejson.loads(statistics)
self._acknowledge()
......@@ -301,11 +287,13 @@ class MessageHandler(gevent.Greenlet):
"""Syntax: err type message"""
logger.debug('recv: err %s <msg> (size=%d)', t, len(msg))
if t == 'usr': self.user_error = msg
else: self.system_error = msg
self._collect_statistics()
self.last_statistics['data'] = dict(network=dict(wait_time=0.))
if t == 'usr':
self.user_error = msg
else:
self.system_error = msg
self.statistics = dict(network=dict(wait_time=0.))
self._acknowledge()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment