Commit 4eb0c5a6 authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[execution][messagehandlers] Add handling of remote write sync

parent f8eabdc4
...@@ -47,7 +47,6 @@ import logging ...@@ -47,7 +47,6 @@ import logging
import zmq import zmq
import simplejson import simplejson
import requests
import threading import threading
from ..dataformat import DataFormat from ..dataformat import DataFormat
...@@ -162,7 +161,7 @@ class MessageHandler(threading.Thread): ...@@ -162,7 +161,7 @@ class MessageHandler(threading.Thread):
self.kill_callback() self.kill_callback()
self.stop.set() self.stop.set()
break break
except RuntimeError as e: except RuntimeError:
import traceback import traceback
message = traceback.format_exc() message = traceback.format_exc()
...@@ -173,10 +172,12 @@ class MessageHandler(threading.Thread): ...@@ -173,10 +172,12 @@ class MessageHandler(threading.Thread):
self.kill_callback() self.kill_callback()
self.stop.set() self.stop.set()
break break
except: except Exception:
import traceback import traceback
parser = lambda s: s if len(s) < 20 else s[:20] + "..." def parser(s):
return s if len(s) < 20 else s[:20] + "..."
parsed_parts = " ".join([parser(k) for k in parts]) parsed_parts = " ".join([parser(k) for k in parts])
message = ( message = (
"A problem occurred while performing command `%s' " "A problem occurred while performing command `%s' "
...@@ -249,7 +250,7 @@ class MessageHandler(threading.Thread): ...@@ -249,7 +250,7 @@ class MessageHandler(threading.Thread):
try: try:
data_source = self.data_sources[name] data_source = self.data_sources[name]
except: except Exception:
raise RemoteException("sys", "Unknown input: %s" % name) raise RemoteException("sys", "Unknown input: %s" % name)
logger.debug("send: %d infos", len(data_source)) logger.debug("send: %d infos", len(data_source))
...@@ -275,12 +276,12 @@ class MessageHandler(threading.Thread): ...@@ -275,12 +276,12 @@ class MessageHandler(threading.Thread):
try: try:
data_source = self.data_sources[name] data_source = self.data_sources[name]
except: except Exception:
raise RemoteException("sys", "Unknown input: %s" % name) raise RemoteException("sys", "Unknown input: %s" % name)
try: try:
index = int(index) index = int(index)
except: except Exception:
raise RemoteException("sys", "Invalid index: %s" % index) raise RemoteException("sys", "Invalid index: %s" % index)
(data, start_index, end_index) = data_source[index] (data, start_index, end_index) = data_source[index]
...@@ -351,6 +352,7 @@ class LoopMessageHandler(MessageHandler): ...@@ -351,6 +352,7 @@ class LoopMessageHandler(MessageHandler):
) )
self.callbacks.update({"val": self.validate}) self.callbacks.update({"val": self.validate})
self.callbacks.update({"wrt": self.write})
self.executor = None self.executor = None
def setup(self, algorithm, prefix): def setup(self, algorithm, prefix):
...@@ -401,3 +403,10 @@ class LoopMessageHandler(MessageHandler): ...@@ -401,3 +403,10 @@ class LoopMessageHandler(MessageHandler):
self.socket.send_string("True" if is_valid else "False", zmq.SNDMORE) self.socket.send_string("True" if is_valid else "False", zmq.SNDMORE)
self.socket.send(data.pack()) self.socket.send(data.pack())
def write(self):
""" Trigger a write on the output"""
logger.debug("recv: wrt")
self.executor.write()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment