From 4eb0c5a656ee7356606dcd1968543b7fc602fcc9 Mon Sep 17 00:00:00 2001 From: Samuel Gaist <samuel.gaist@idiap.ch> Date: Thu, 11 Jul 2019 09:43:27 +0200 Subject: [PATCH] [execution][messagehandlers] Add handling of remote write sync --- .../python/execution/messagehandlers.py | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/beat/backend/python/execution/messagehandlers.py b/beat/backend/python/execution/messagehandlers.py index b2f0c26..5e5e10e 100644 --- a/beat/backend/python/execution/messagehandlers.py +++ b/beat/backend/python/execution/messagehandlers.py @@ -47,7 +47,6 @@ import logging import zmq import simplejson -import requests import threading from ..dataformat import DataFormat @@ -162,7 +161,7 @@ class MessageHandler(threading.Thread): self.kill_callback() self.stop.set() break - except RuntimeError as e: + except RuntimeError: import traceback message = traceback.format_exc() @@ -173,10 +172,12 @@ class MessageHandler(threading.Thread): self.kill_callback() self.stop.set() break - except: + except Exception: import traceback - parser = lambda s: s if len(s) < 20 else s[:20] + "..." + def parser(s): + return s if len(s) < 20 else s[:20] + "..." + parsed_parts = " ".join([parser(k) for k in parts]) message = ( "A problem occurred while performing command `%s' " @@ -249,7 +250,7 @@ class MessageHandler(threading.Thread): try: data_source = self.data_sources[name] - except: + except Exception: raise RemoteException("sys", "Unknown input: %s" % name) logger.debug("send: %d infos", len(data_source)) @@ -275,12 +276,12 @@ class MessageHandler(threading.Thread): try: data_source = self.data_sources[name] - except: + except Exception: raise RemoteException("sys", "Unknown input: %s" % name) try: index = int(index) - except: + except Exception: raise RemoteException("sys", "Invalid index: %s" % index) (data, start_index, end_index) = data_source[index] @@ -351,6 +352,7 @@ class LoopMessageHandler(MessageHandler): ) self.callbacks.update({"val": self.validate}) + self.callbacks.update({"wrt": self.write}) self.executor = None def setup(self, algorithm, prefix): @@ -401,3 +403,10 @@ class LoopMessageHandler(MessageHandler): self.socket.send_string("True" if is_valid else "False", zmq.SNDMORE) self.socket.send(data.pack()) + + def write(self): + """ Trigger a write on the output""" + + logger.debug("recv: wrt") + + self.executor.write() -- GitLab