Commit 33889a5a authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[output] Add end data index parameter and make it a synchronous operation

Synchronous operation means that the following loop user
write will happen after the loop write method was called.

The end data index parameter allows to keep the output of
both block in sync.
parent 69ee30e2
...@@ -325,9 +325,8 @@ class Runner(object): ...@@ -325,9 +325,8 @@ class Runner(object):
return answer return answer
def write(self, outputs): def write(self, outputs, end_data_index):
"""Write to the outputs""" """Write to the outputs"""
exc = self.exc or RuntimeError exc = self.exc or RuntimeError
if self.algorithm.type != Algorithm.LOOP: if self.algorithm.type != Algorithm.LOOP:
...@@ -340,8 +339,7 @@ class Runner(object): ...@@ -340,8 +339,7 @@ class Runner(object):
# prepare() must have run # prepare() must have run
if not self.prepared: if not self.prepared:
raise exc("Algorithm '%s' is not yet prepared" % self.name) raise exc("Algorithm '%s' is not yet prepared" % self.name)
return loader.run(self.obj, "write", self.exc, outputs, end_data_index)
return loader.run(self.obj, "write", self.exc, outputs)
def __getattr__(self, key): def __getattr__(self, key):
"""Returns an attribute of the algorithm - only called at last resort """Returns an attribute of the algorithm - only called at last resort
......
...@@ -258,11 +258,10 @@ class LoopExecutor(object): ...@@ -258,11 +258,10 @@ class LoopExecutor(object):
logger.debug("User loop has validated: {}\n{}".format(is_valid, answer)) logger.debug("User loop has validated: {}\n{}".format(is_valid, answer))
return is_valid, answer return is_valid, answer
def write(self): def write(self, end_data_index=None):
"""Write the loop output""" """Write the loop output"""
retval = self.runner.write(self.output_list) retval = self.runner.write(self.output_list, end_data_index)
logger.debug("User loop wrote output: {}".format(retval)) logger.debug("User loop wrote output: {}".format(retval))
return retval return retval
......
...@@ -404,9 +404,19 @@ class LoopMessageHandler(MessageHandler): ...@@ -404,9 +404,19 @@ class LoopMessageHandler(MessageHandler):
self.socket.send_string("True" if is_valid else "False", zmq.SNDMORE) self.socket.send_string("True" if is_valid else "False", zmq.SNDMORE)
self.socket.send(data.pack()) self.socket.send(data.pack())
def write(self): def write(self, end_data_index):
""" Trigger a write on the output""" """ Trigger a write on the output"""
logger.debug("recv: wrt") try:
end_data_index = int(end_data_index)
except ValueError:
logger.warning("recv: wrt invalid value {}".format(end_data_index))
end_data_index = None
logger.debug("recv: wrt {}".format(end_data_index))
self.executor.write() try:
self.executor.write(end_data_index)
except Exception:
raise
finally:
self.socket.send_string("ack")
...@@ -45,6 +45,7 @@ This module implements output related classes ...@@ -45,6 +45,7 @@ This module implements output related classes
import six import six
import logging import logging
import zmq
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -212,7 +213,9 @@ class RemotelySyncedOutput(Output): ...@@ -212,7 +213,9 @@ class RemotelySyncedOutput(Output):
def write(self, data, end_data_index=None): def write(self, data, end_data_index=None):
super(RemotelySyncedOutput, self).write(data, end_data_index) super(RemotelySyncedOutput, self).write(data, end_data_index)
self.socket.send_string("wrt") self.socket.send_string("wrt", zmq.SNDMORE)
self.socket.send_string("{}".format(end_data_index))
self.socket.recv()
# ---------------------------------------------------------- # ----------------------------------------------------------
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment