Commit 22821581 authored by Samuel GAIST's avatar Samuel GAIST

[loop] Add written to processor output name to evaluator write parameters

This allows the evaluator to properly write to its own
output as the method is called each time a processor
output is written to.

Fixes #27
parent c32a8bf6
Pipeline #33786 passed with stage
in 5 minutes and 33 seconds
......@@ -325,7 +325,7 @@ class Runner(object):
return answer
def write(self, outputs, end_data_index):
def write(self, outputs, processor_output_name, end_data_index):
"""Write to the outputs"""
exc = self.exc or RuntimeError
......@@ -341,7 +341,9 @@ class Runner(object):
if not self.prepared:
raise exc("Algorithm '%s' is not yet prepared" % self.name)
return loader.run(self.obj, "write", self.exc, outputs, end_data_index)
return loader.run(
self.obj, "write", self.exc, outputs, processor_output_name, end_data_index
)
def read(self, inputs):
"""Triggers a read of the inputs
......
......@@ -259,10 +259,12 @@ class LoopExecutor(object):
logger.debug("User loop has validated: {}\n{}".format(is_valid, answer))
return is_valid, answer
def write(self, end_data_index=None):
def write(self, processor_output_name, end_data_index=None):
"""Write the loop output"""
retval = self.runner.write(self.output_list, end_data_index)
retval = self.runner.write(
self.output_list, processor_output_name, end_data_index
)
logger.debug("User loop wrote output: {}".format(retval))
return retval
......
......@@ -413,9 +413,10 @@ class LoopMessageHandler(MessageHandler):
self.socket.send_string("True" if is_valid else "False", zmq.SNDMORE)
self.socket.send(data.pack())
def write(self, end_data_index):
def write(self, processor_output_name, end_data_index):
""" Trigger a write on the output"""
processor_output_name = processor_output_name.decode("utf-8")
end_data_index = end_data_index.decode("utf-8")
if end_data_index != "None":
......@@ -427,11 +428,12 @@ class LoopMessageHandler(MessageHandler):
else:
end_data_index = None
logger.debug("recv: wrt {}".format(end_data_index))
logger.debug("recv: wrt {} {}".format(processor_output_name, end_data_index))
try:
self.executor.write(end_data_index)
except Exception:
self.executor.write(processor_output_name, end_data_index)
except Exception as e:
logger.warning("recv: wrt write failed: {}".format(e))
raise
finally:
self.socket.send_string("ack")
......
......@@ -213,7 +213,9 @@ class RemotelySyncedOutput(Output):
def write(self, data, end_data_index=None):
super(RemotelySyncedOutput, self).write(data, end_data_index)
self.socket.send_string("wrt", zmq.SNDMORE)
self.socket.send_string(self.name, zmq.SNDMORE)
self.socket.send_string("{}".format(end_data_index))
self.socket.recv()
......
......@@ -53,6 +53,6 @@ class Algorithm:
return (result, {"value": np.int64(value)})
def write(self, outputs, end_data_index):
def write(self, outputs, processor_output_name, end_data_index):
outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index)
return True
......@@ -54,7 +54,7 @@ class Algorithm:
return (result, {"value": np.int64(value)})
def write(self, outputs, end_data_index):
def write(self, outputs, processor_output_name, end_data_index):
outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index)
return True
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment