Commit f241a438 authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[test][loop_executor] Refactor test to ensure proper run

Currently input and output cache files were "shared"
because of how the configuration was made.

Also, the input for the loop algorithm, although the same
as the loop_user, was not properly generated as a separate
input.
parent 44c49345
...@@ -7,12 +7,12 @@ ...@@ -7,12 +7,12 @@
"groups": [ "groups": [
{ {
"inputs": { "inputs": {
"in": { "in_loop": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
"outputs": { "outputs": {
"out": { "out_loop": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
......
...@@ -6,12 +6,12 @@ ...@@ -6,12 +6,12 @@
"groups": [ "groups": [
{ {
"inputs": { "inputs": {
"in": { "in_loop": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
"outputs": { "outputs": {
"out": { "out_loop": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
......
...@@ -53,7 +53,6 @@ class Algorithm: ...@@ -53,7 +53,6 @@ class Algorithm:
return (result, {"value": np.int64(value)}) return (result, {"value": np.int64(value)})
def write(self, outputs): def write(self, outputs, end_data_index):
outputs["out"].write({"value": np.int32(self.output)}) outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index)
return True return True
...@@ -7,12 +7,12 @@ ...@@ -7,12 +7,12 @@
"groups": [ "groups": [
{ {
"inputs": { "inputs": {
"in": { "in_main": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
"outputs": { "outputs": {
"out": { "out_main": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
......
...@@ -45,13 +45,13 @@ class Algorithm: ...@@ -45,13 +45,13 @@ class Algorithm:
cnt = cnt - 1 cnt = cnt - 1
is_valid, _ = loop_channel.validate({"value": np.full(10, cnt)}) is_valid, _ = loop_channel.validate({"value": np.full(10, cnt)})
data_loader = data_loaders.loaderOf("in") data_loader = data_loaders.loaderOf("in_main")
for i in range(data_loader.count()): for i in range(data_loader.count()):
view = data_loader.view("in", i) view = data_loader.view("in_main", i)
(data, start, end) = view[view.count() - 1] (data, start, end) = view[view.count() - 1]
value = data["in"].value value = data["in_main"].value
new_value = value + cnt new_value = value + cnt
outputs["out"].write({"value": np.int32(new_value)}, end) outputs["out_main"].write({"value": np.int32(new_value)}, end)
return True return True
...@@ -7,12 +7,12 @@ ...@@ -7,12 +7,12 @@
"groups": [ "groups": [
{ {
"inputs": { "inputs": {
"in": { "in_main": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
"outputs": { "outputs": {
"out": { "out_main": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
......
...@@ -45,8 +45,8 @@ class Algorithm: ...@@ -45,8 +45,8 @@ class Algorithm:
cnt = cnt - 1 cnt = cnt - 1
is_valid, _ = loop_channel.validate({"value": np.full(10, cnt)}) is_valid, _ = loop_channel.validate({"value": np.full(10, cnt)})
value = inputs["in"].data.value value = inputs["in_main"].data.value
new_value = value + cnt new_value = value + cnt
outputs["out"].write({"value": np.int32(new_value)}) outputs["out_main"].write({"value": np.int32(new_value)})
return True return True
...@@ -73,14 +73,14 @@ CONFIGURATION = { ...@@ -73,14 +73,14 @@ CONFIGURATION = {
"algorithm": "", "algorithm": "",
"channel": "main", "channel": "main",
"parameters": {}, "parameters": {},
"inputs": {"in": {"path": "INPUT", "channel": "main"}}, "inputs": {"in_main": {"path": "INPUT", "channel": "main"}},
"outputs": {"out": {"path": "OUTPUT", "channel": "main"}}, "outputs": {"out_main": {"path": "OUTPUT_MAIN", "channel": "main"}},
"loop": { "loop": {
"algorithm": "", "algorithm": "",
"channel": "main", "channel": "main",
"parameters": {"threshold": 1}, "parameters": {"threshold": 1},
"inputs": {"in": {"path": "INPUT", "channel": "main"}}, "inputs": {"in_loop": {"path": "INPUT_LOOP", "channel": "main"}},
"outputs": {"out": {"path": "LOOP_OUTPUT", "channel": "main"}}, "outputs": {"out_loop": {"path": "LOOP_OUTPUT", "channel": "main"}},
}, },
} }
...@@ -105,6 +105,7 @@ class TestExecution(unittest.TestCase): ...@@ -105,6 +105,7 @@ class TestExecution(unittest.TestCase):
shutil.rmtree(self.working_dir) shutil.rmtree(self.working_dir)
if self.loop_executor: if self.loop_executor:
self.loop_socket.send_string("don")
self.loop_executor.wait() self.loop_executor.wait()
for handler in [self.message_handler, self.loop_message_handler]: for handler in [self.message_handler, self.loop_message_handler]:
...@@ -123,11 +124,10 @@ class TestExecution(unittest.TestCase): ...@@ -123,11 +124,10 @@ class TestExecution(unittest.TestCase):
self.zmq_context.destroy() self.zmq_context.destroy()
self.zmq_context = None self.zmq_context = None
def writeData(self, input_name, indices, start_value): def writeData(self, config, input_name, indices, start_value):
filename = os.path.join( filename = os.path.join(
self.cache_root, CONFIGURATION["inputs"][input_name]["path"] + ".data" self.cache_root, config["inputs"][input_name]["path"] + ".data"
) )
dataformat = DataFormat(prefix, "user/single_integer/1") dataformat = DataFormat(prefix, "user/single_integer/1")
self.assertTrue(dataformat.valid) self.assertTrue(dataformat.valid)
...@@ -149,7 +149,11 @@ class TestExecution(unittest.TestCase): ...@@ -149,7 +149,11 @@ class TestExecution(unittest.TestCase):
del data_sink del data_sink
def process(self, algorithm_name, loop_algorithm_name): def process(self, algorithm_name, loop_algorithm_name):
self.writeData("in", [(0, 0), (1, 1), (2, 2), (3, 3)], 1000)
self.writeData(CONFIGURATION, "in_main", [(0, 0), (1, 1), (2, 2), (3, 3)], 1000)
self.writeData(
CONFIGURATION["loop"], "in_loop", [(0, 0), (1, 1), (2, 2), (3, 3)], 1000
)
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
...@@ -214,7 +218,8 @@ class TestExecution(unittest.TestCase): ...@@ -214,7 +218,8 @@ class TestExecution(unittest.TestCase):
self.assertTrue( self.assertTrue(
cached_file.setup( cached_file.setup(
os.path.join( os.path.join(
self.cache_root, CONFIGURATION["outputs"]["out"]["path"] + ".data" self.cache_root,
CONFIGURATION["outputs"]["out_main"]["path"] + ".data",
), ),
prefix, prefix,
) )
...@@ -230,7 +235,7 @@ class TestExecution(unittest.TestCase): ...@@ -230,7 +235,7 @@ class TestExecution(unittest.TestCase):
success = cached_file.setup( success = cached_file.setup(
os.path.join( os.path.join(
self.cache_root, self.cache_root,
CONFIGURATION["loop"]["outputs"]["out"]["path"] + ".data", CONFIGURATION["loop"]["outputs"]["out_loop"]["path"] + ".data",
), ),
prefix, prefix,
) )
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment