diff --git a/beat/backend/python/algorithm.py b/beat/backend/python/algorithm.py index f7c645fef3cf8e335fffbc2cd6384c486e15b6f1..4456a3119554f6c52f3877eef55f8603e68cb16c 100644 --- a/beat/backend/python/algorithm.py +++ b/beat/backend/python/algorithm.py @@ -325,7 +325,7 @@ class Runner(object): return answer - def write(self, outputs, end_data_index): + def write(self, outputs, processor_output_name, end_data_index): """Write to the outputs""" exc = self.exc or RuntimeError @@ -341,7 +341,9 @@ class Runner(object): if not self.prepared: raise exc("Algorithm '%s' is not yet prepared" % self.name) - return loader.run(self.obj, "write", self.exc, outputs, end_data_index) + return loader.run( + self.obj, "write", self.exc, outputs, processor_output_name, end_data_index + ) def read(self, inputs): """Triggers a read of the inputs diff --git a/beat/backend/python/data.py b/beat/backend/python/data.py index 2f8bf57d0592db0cf49f438dee07f16745b03052..0af51df6d48d0655cfc885c6441d70a38701bfc2 100644 --- a/beat/backend/python/data.py +++ b/beat/backend/python/data.py @@ -957,6 +957,15 @@ class CachedDataSink(DataSink): if (self.last_written_data_index is None) or ( self.last_written_data_index < self.end_index ): + if self.last_written_data_index is None: + message = "No data written" + else: + message = "No enough data written: last written {} vs end {}".format( + self.last_data_index, self.end_index + ) + + logger.warning("Removing cache files: {}".format(message)) + try: os.remove(self.filename) os.remove(self.filename.replace(".data", ".index")) diff --git a/beat/backend/python/execution/loop.py b/beat/backend/python/execution/loop.py index 5073f5fcb512a0b42580e6075888e146db56de0c..c217bec7bf6bc1018c4655df29f2a5bd9b49a53e 100644 --- a/beat/backend/python/execution/loop.py +++ b/beat/backend/python/execution/loop.py @@ -259,10 +259,12 @@ class LoopExecutor(object): logger.debug("User loop has validated: {}\n{}".format(is_valid, answer)) return is_valid, answer - def write(self, end_data_index=None): + def write(self, processor_output_name, end_data_index=None): """Write the loop output""" - retval = self.runner.write(self.output_list, end_data_index) + retval = self.runner.write( + self.output_list, processor_output_name, end_data_index + ) logger.debug("User loop wrote output: {}".format(retval)) return retval diff --git a/beat/backend/python/execution/messagehandlers.py b/beat/backend/python/execution/messagehandlers.py index 38ce180726dd7bc22deb2cdf1e93f96e360c4b32..b2133f9573415c82181e3cce86ac695f5a995317 100644 --- a/beat/backend/python/execution/messagehandlers.py +++ b/beat/backend/python/execution/messagehandlers.py @@ -413,9 +413,10 @@ class LoopMessageHandler(MessageHandler): self.socket.send_string("True" if is_valid else "False", zmq.SNDMORE) self.socket.send(data.pack()) - def write(self, end_data_index): + def write(self, processor_output_name, end_data_index): """ Trigger a write on the output""" + processor_output_name = processor_output_name.decode("utf-8") end_data_index = end_data_index.decode("utf-8") if end_data_index != "None": @@ -427,11 +428,12 @@ class LoopMessageHandler(MessageHandler): else: end_data_index = None - logger.debug("recv: wrt {}".format(end_data_index)) + logger.debug("recv: wrt {} {}".format(processor_output_name, end_data_index)) try: - self.executor.write(end_data_index) - except Exception: + self.executor.write(processor_output_name, end_data_index) + except Exception as e: + logger.warning("recv: wrt write failed: {}".format(e)) raise finally: self.socket.send_string("ack") diff --git a/beat/backend/python/outputs.py b/beat/backend/python/outputs.py index 75a61319feecb06f95ff8f82123504418a8b50be..17b38675df9a95902f9d3416d700d96039b3e0ee 100644 --- a/beat/backend/python/outputs.py +++ b/beat/backend/python/outputs.py @@ -213,7 +213,9 @@ class RemotelySyncedOutput(Output): def write(self, data, end_data_index=None): super(RemotelySyncedOutput, self).write(data, end_data_index) + self.socket.send_string("wrt", zmq.SNDMORE) + self.socket.send_string(self.name, zmq.SNDMORE) self.socket.send_string("{}".format(end_data_index)) self.socket.recv() diff --git a/beat/backend/python/test/prefix/algorithms/autonomous/loop_evaluator/1.py b/beat/backend/python/test/prefix/algorithms/autonomous/loop_evaluator/1.py index 7f61262248e1b17f3706344a8dbfda2bf26a6615..dd56277b35e4d4342cbd8d11896d41a57f4aa1f0 100644 --- a/beat/backend/python/test/prefix/algorithms/autonomous/loop_evaluator/1.py +++ b/beat/backend/python/test/prefix/algorithms/autonomous/loop_evaluator/1.py @@ -53,6 +53,6 @@ class Algorithm: return (result, {"value": np.int64(value)}) - def write(self, outputs, end_data_index): + def write(self, outputs, processor_output_name, end_data_index): outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index) return True diff --git a/beat/backend/python/test/prefix/algorithms/sequential/loop_evaluator/1.py b/beat/backend/python/test/prefix/algorithms/sequential/loop_evaluator/1.py index 818422fc381ca653f27d537adfce2f633914d69b..5da20b220bdc572b1eeeb2ddd86cf418ec4f5658 100644 --- a/beat/backend/python/test/prefix/algorithms/sequential/loop_evaluator/1.py +++ b/beat/backend/python/test/prefix/algorithms/sequential/loop_evaluator/1.py @@ -54,7 +54,7 @@ class Algorithm: return (result, {"value": np.int64(value)}) - def write(self, outputs, end_data_index): + def write(self, outputs, processor_output_name, end_data_index): outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index) return True