Skip to content
Snippets Groups Projects
Commit 2dfffdd8 authored by André Anjos's avatar André Anjos :speech_balloon:
Browse files

Merge branch '27_improve_synchronized_output_handling' into 'master'

Improve synchronized output handling

Closes #27

See merge request !61
parents 18f947a2 22821581
No related branches found
No related tags found
1 merge request!61Improve synchronized output handling
Pipeline #33858 passed
......@@ -325,7 +325,7 @@ class Runner(object):
return answer
def write(self, outputs, end_data_index):
def write(self, outputs, processor_output_name, end_data_index):
"""Write to the outputs"""
exc = self.exc or RuntimeError
......@@ -341,7 +341,9 @@ class Runner(object):
if not self.prepared:
raise exc("Algorithm '%s' is not yet prepared" % self.name)
return loader.run(self.obj, "write", self.exc, outputs, end_data_index)
return loader.run(
self.obj, "write", self.exc, outputs, processor_output_name, end_data_index
)
def read(self, inputs):
"""Triggers a read of the inputs
......
......@@ -957,6 +957,15 @@ class CachedDataSink(DataSink):
if (self.last_written_data_index is None) or (
self.last_written_data_index < self.end_index
):
if self.last_written_data_index is None:
message = "No data written"
else:
message = "No enough data written: last written {} vs end {}".format(
self.last_data_index, self.end_index
)
logger.warning("Removing cache files: {}".format(message))
try:
os.remove(self.filename)
os.remove(self.filename.replace(".data", ".index"))
......
......@@ -259,10 +259,12 @@ class LoopExecutor(object):
logger.debug("User loop has validated: {}\n{}".format(is_valid, answer))
return is_valid, answer
def write(self, end_data_index=None):
def write(self, processor_output_name, end_data_index=None):
"""Write the loop output"""
retval = self.runner.write(self.output_list, end_data_index)
retval = self.runner.write(
self.output_list, processor_output_name, end_data_index
)
logger.debug("User loop wrote output: {}".format(retval))
return retval
......
......@@ -413,9 +413,10 @@ class LoopMessageHandler(MessageHandler):
self.socket.send_string("True" if is_valid else "False", zmq.SNDMORE)
self.socket.send(data.pack())
def write(self, end_data_index):
def write(self, processor_output_name, end_data_index):
""" Trigger a write on the output"""
processor_output_name = processor_output_name.decode("utf-8")
end_data_index = end_data_index.decode("utf-8")
if end_data_index != "None":
......@@ -427,11 +428,12 @@ class LoopMessageHandler(MessageHandler):
else:
end_data_index = None
logger.debug("recv: wrt {}".format(end_data_index))
logger.debug("recv: wrt {} {}".format(processor_output_name, end_data_index))
try:
self.executor.write(end_data_index)
except Exception:
self.executor.write(processor_output_name, end_data_index)
except Exception as e:
logger.warning("recv: wrt write failed: {}".format(e))
raise
finally:
self.socket.send_string("ack")
......
......@@ -213,7 +213,9 @@ class RemotelySyncedOutput(Output):
def write(self, data, end_data_index=None):
super(RemotelySyncedOutput, self).write(data, end_data_index)
self.socket.send_string("wrt", zmq.SNDMORE)
self.socket.send_string(self.name, zmq.SNDMORE)
self.socket.send_string("{}".format(end_data_index))
self.socket.recv()
......
......@@ -53,6 +53,6 @@ class Algorithm:
return (result, {"value": np.int64(value)})
def write(self, outputs, end_data_index):
def write(self, outputs, processor_output_name, end_data_index):
outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index)
return True
......@@ -54,7 +54,7 @@ class Algorithm:
return (result, {"value": np.int64(value)})
def write(self, outputs, end_data_index):
def write(self, outputs, processor_output_name, end_data_index):
outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index)
return True
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment