Commit 0a1c91f5 authored by Samuel GAIST's avatar Samuel GAIST

[execution][messagehandlers] Do not systematically decode incoming zmq parts

Under some circumstances the decoding may fail as the binary
data may contain values outside the utf-8 "visible" range.

Fixes #26
parent e824c998
Pipeline #33583 passed with stage
in 8 minutes and 26 seconds
...@@ -143,9 +143,9 @@ class MessageHandler(threading.Thread): ...@@ -143,9 +143,9 @@ class MessageHandler(threading.Thread):
more = True more = True
parts = [] parts = []
while more: while more:
parts.append(self.socket.recv().decode("utf-8")) parts.append(self.socket.recv())
more = self.socket.getsockopt(zmq.RCVMORE) more = self.socket.getsockopt(zmq.RCVMORE)
command = parts[0] command = parts[0].decode("utf-8")
if command in self.callbacks: if command in self.callbacks:
try: # to handle command try: # to handle command
...@@ -176,7 +176,8 @@ class MessageHandler(threading.Thread): ...@@ -176,7 +176,8 @@ class MessageHandler(threading.Thread):
import traceback import traceback
def parser(s): def parser(s):
return s if len(s) < 20 else s[:20] + "..." parsed = s if len(s) < 20 else s[:20] + b"..."
return parsed.decode("utf-8")
parsed_parts = " ".join([parser(k) for k in parts]) parsed_parts = " ".join([parser(k) for k in parts])
message = ( message = (
...@@ -229,6 +230,9 @@ class MessageHandler(threading.Thread): ...@@ -229,6 +230,9 @@ class MessageHandler(threading.Thread):
def error(self, t, msg): def error(self, t, msg):
"""Syntax: err type message""" """Syntax: err type message"""
t = t.decode("utf-8")
msg = msg.decode("utf-8")
logger.debug("recv: err %s <msg> (size=%d)", t, len(msg)) logger.debug("recv: err %s <msg> (size=%d)", t, len(msg))
if t == "usr": if t == "usr":
...@@ -242,6 +246,7 @@ class MessageHandler(threading.Thread): ...@@ -242,6 +246,7 @@ class MessageHandler(threading.Thread):
def infos(self, name): def infos(self, name):
"""Syntax: ifo name""" """Syntax: ifo name"""
name = name.decode("utf-8")
logger.debug("recv: ifo %s", name) logger.debug("recv: ifo %s", name)
if self.data_sources is None: if self.data_sources is None:
...@@ -268,6 +273,9 @@ class MessageHandler(threading.Thread): ...@@ -268,6 +273,9 @@ class MessageHandler(threading.Thread):
def get_data(self, name, index): def get_data(self, name, index):
"""Syntax: get name index""" """Syntax: get name index"""
name = name.decode("utf-8")
index = index.decode("utf-8")
logger.debug("recv: get %s %s", name, index) logger.debug("recv: get %s %s", name, index)
if self.data_sources is None: if self.data_sources is None:
...@@ -393,12 +401,11 @@ class LoopMessageHandler(MessageHandler): ...@@ -393,12 +401,11 @@ class LoopMessageHandler(MessageHandler):
Result to be validated. Result to be validated.
""" """
result = result.encode("utf-8")
logger.debug("recv: val %s", result)
data = self.request_data_format.type() data = self.request_data_format.type()
data.unpack(result) data.unpack(result)
logger.debug("recv: val %s", data)
is_valid, answer = self.executor.validate(data) is_valid, answer = self.executor.validate(data)
data = make_data_format(answer, self.answer_data_format) data = make_data_format(answer, self.answer_data_format)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment