diff --git a/beat/backend/python/test/prefix/algorithms/autonomous/invalid_loop_validate_output/1.json b/beat/backend/python/test/prefix/algorithms/autonomous/invalid_loop_validate_output/1.json index 97e8978fbe06dab70bdd328a25c728741ea0cce1..fdcd100af9534b19559a7a9d7689456aa6fb5896 100644 --- a/beat/backend/python/test/prefix/algorithms/autonomous/invalid_loop_validate_output/1.json +++ b/beat/backend/python/test/prefix/algorithms/autonomous/invalid_loop_validate_output/1.json @@ -7,12 +7,12 @@ "groups": [ { "inputs": { - "in": { + "in_loop": { "type": "user/single_integer/1" } }, "outputs": { - "out": { + "out_loop": { "type": "user/single_integer/1" } }, diff --git a/beat/backend/python/test/prefix/algorithms/autonomous/loop/1.json b/beat/backend/python/test/prefix/algorithms/autonomous/loop/1.json index 7dd7d7c01970caef1c70909d268725f39e450977..464768069ca9cf393014fd7df9f831334eb9707c 100644 --- a/beat/backend/python/test/prefix/algorithms/autonomous/loop/1.json +++ b/beat/backend/python/test/prefix/algorithms/autonomous/loop/1.json @@ -6,12 +6,12 @@ "groups": [ { "inputs": { - "in": { + "in_loop": { "type": "user/single_integer/1" } }, "outputs": { - "out": { + "out_loop": { "type": "user/single_integer/1" } }, diff --git a/beat/backend/python/test/prefix/algorithms/autonomous/loop/1.py b/beat/backend/python/test/prefix/algorithms/autonomous/loop/1.py index 19043c4320da55a5435b855a9e51641d5d13b150..7f61262248e1b17f3706344a8dbfda2bf26a6615 100644 --- a/beat/backend/python/test/prefix/algorithms/autonomous/loop/1.py +++ b/beat/backend/python/test/prefix/algorithms/autonomous/loop/1.py @@ -53,7 +53,6 @@ class Algorithm: return (result, {"value": np.int64(value)}) - def write(self, outputs): - outputs["out"].write({"value": np.int32(self.output)}) + def write(self, outputs, end_data_index): + outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index) return True - diff --git a/beat/backend/python/test/prefix/algorithms/autonomous/loop_user/1.json b/beat/backend/python/test/prefix/algorithms/autonomous/loop_user/1.json index aed1f93478fbcf4fa8fa3fad2d149f9f73083eb2..28e49e741e82ee1eaa7b223cdbd6d7e336d6541d 100644 --- a/beat/backend/python/test/prefix/algorithms/autonomous/loop_user/1.json +++ b/beat/backend/python/test/prefix/algorithms/autonomous/loop_user/1.json @@ -7,12 +7,12 @@ "groups": [ { "inputs": { - "in": { + "in_main": { "type": "user/single_integer/1" } }, "outputs": { - "out": { + "out_main": { "type": "user/single_integer/1" } }, diff --git a/beat/backend/python/test/prefix/algorithms/autonomous/loop_user/1.py b/beat/backend/python/test/prefix/algorithms/autonomous/loop_user/1.py index 4507fcf3b35626c484f626a6e9ad56b33b347fa6..7bf894d4dc00ee3f31f16f85076270a7935a8122 100644 --- a/beat/backend/python/test/prefix/algorithms/autonomous/loop_user/1.py +++ b/beat/backend/python/test/prefix/algorithms/autonomous/loop_user/1.py @@ -45,13 +45,13 @@ class Algorithm: cnt = cnt - 1 is_valid, _ = loop_channel.validate({"value": np.full(10, cnt)}) - data_loader = data_loaders.loaderOf("in") + data_loader = data_loaders.loaderOf("in_main") for i in range(data_loader.count()): - view = data_loader.view("in", i) + view = data_loader.view("in_main", i) (data, start, end) = view[view.count() - 1] - value = data["in"].value + value = data["in_main"].value new_value = value + cnt - outputs["out"].write({"value": np.int32(new_value)}, end) + outputs["out_main"].write({"value": np.int32(new_value)}, end) return True diff --git a/beat/backend/python/test/prefix/algorithms/sequential/loop_user/1.json b/beat/backend/python/test/prefix/algorithms/sequential/loop_user/1.json index cc01fada360d51356724882b1ce23b5fe02b3ac6..d1ada21c78650d79b7cfc43d0261a5ea5535dac8 100644 --- a/beat/backend/python/test/prefix/algorithms/sequential/loop_user/1.json +++ b/beat/backend/python/test/prefix/algorithms/sequential/loop_user/1.json @@ -7,12 +7,12 @@ "groups": [ { "inputs": { - "in": { + "in_main": { "type": "user/single_integer/1" } }, "outputs": { - "out": { + "out_main": { "type": "user/single_integer/1" } }, diff --git a/beat/backend/python/test/prefix/algorithms/sequential/loop_user/1.py b/beat/backend/python/test/prefix/algorithms/sequential/loop_user/1.py index 5fd58414a31a98fbd174c91df2890bf4975621c4..5767abd8c88a6155bec4a8c48b5a8b716a25ea4a 100644 --- a/beat/backend/python/test/prefix/algorithms/sequential/loop_user/1.py +++ b/beat/backend/python/test/prefix/algorithms/sequential/loop_user/1.py @@ -45,8 +45,8 @@ class Algorithm: cnt = cnt - 1 is_valid, _ = loop_channel.validate({"value": np.full(10, cnt)}) - value = inputs["in"].data.value + value = inputs["in_main"].data.value new_value = value + cnt - outputs["out"].write({"value": np.int32(new_value)}) + outputs["out_main"].write({"value": np.int32(new_value)}) return True diff --git a/beat/backend/python/test/test_loop_executor.py b/beat/backend/python/test/test_loop_executor.py index 441f9aab312d9c419fe8975be155b38f43c947fb..12dd447b18bd03935b84bf1b6c23880978bac388 100644 --- a/beat/backend/python/test/test_loop_executor.py +++ b/beat/backend/python/test/test_loop_executor.py @@ -73,14 +73,14 @@ CONFIGURATION = { "algorithm": "", "channel": "main", "parameters": {}, - "inputs": {"in": {"path": "INPUT", "channel": "main"}}, - "outputs": {"out": {"path": "OUTPUT", "channel": "main"}}, + "inputs": {"in_main": {"path": "INPUT", "channel": "main"}}, + "outputs": {"out_main": {"path": "OUTPUT_MAIN", "channel": "main"}}, "loop": { "algorithm": "", "channel": "main", "parameters": {"threshold": 1}, - "inputs": {"in": {"path": "INPUT", "channel": "main"}}, - "outputs": {"out": {"path": "LOOP_OUTPUT", "channel": "main"}}, + "inputs": {"in_loop": {"path": "INPUT_LOOP", "channel": "main"}}, + "outputs": {"out_loop": {"path": "LOOP_OUTPUT", "channel": "main"}}, }, } @@ -105,6 +105,7 @@ class TestExecution(unittest.TestCase): shutil.rmtree(self.working_dir) if self.loop_executor: + self.loop_socket.send_string("don") self.loop_executor.wait() for handler in [self.message_handler, self.loop_message_handler]: @@ -123,11 +124,10 @@ class TestExecution(unittest.TestCase): self.zmq_context.destroy() self.zmq_context = None - def writeData(self, input_name, indices, start_value): + def writeData(self, config, input_name, indices, start_value): filename = os.path.join( - self.cache_root, CONFIGURATION["inputs"][input_name]["path"] + ".data" + self.cache_root, config["inputs"][input_name]["path"] + ".data" ) - dataformat = DataFormat(prefix, "user/single_integer/1") self.assertTrue(dataformat.valid) @@ -149,7 +149,11 @@ class TestExecution(unittest.TestCase): del data_sink def process(self, algorithm_name, loop_algorithm_name): - self.writeData("in", [(0, 0), (1, 1), (2, 2), (3, 3)], 1000) + + self.writeData(CONFIGURATION, "in_main", [(0, 0), (1, 1), (2, 2), (3, 3)], 1000) + self.writeData( + CONFIGURATION["loop"], "in_loop", [(0, 0), (1, 1), (2, 2), (3, 3)], 1000 + ) # ------------------------------------------------------------------------- @@ -214,7 +218,8 @@ class TestExecution(unittest.TestCase): self.assertTrue( cached_file.setup( os.path.join( - self.cache_root, CONFIGURATION["outputs"]["out"]["path"] + ".data" + self.cache_root, + CONFIGURATION["outputs"]["out_main"]["path"] + ".data", ), prefix, ) @@ -230,7 +235,7 @@ class TestExecution(unittest.TestCase): success = cached_file.setup( os.path.join( self.cache_root, - CONFIGURATION["loop"]["outputs"]["out"]["path"] + ".data", + CONFIGURATION["loop"]["outputs"]["out_loop"]["path"] + ".data", ), prefix, )