Commit a749c91c authored by André Anjos's avatar André Anjos 💬
Browse files

Merge branch 'loop_read_sequentially_input' into 'master'

Implement loop reading input sequentially

See merge request !57
parents 89f83797 425f794e
Pipeline #32167 passed with stages
in 7 minutes and 47 seconds
......@@ -307,7 +307,7 @@ class Runner(object):
exc = self.exc or RuntimeError
if self.algorithm.type != Algorithm.LOOP:
if not self.algorithm.is_loop:
raise exc("Wrong algorithm type: %s" % self.algorithm.type)
# setup() must have run
......@@ -327,9 +327,10 @@ class Runner(object):
def write(self, outputs, end_data_index):
"""Write to the outputs"""
exc = self.exc or RuntimeError
if self.algorithm.type != Algorithm.LOOP:
if not self.algorithm.is_loop:
raise exc("Wrong algorithm type: %s" % self.algorithm.type)
# setup() must have run
......@@ -339,8 +340,31 @@ class Runner(object):
# prepare() must have run
if not self.prepared:
raise exc("Algorithm '%s' is not yet prepared" % self.name)
return loader.run(self.obj, "write", self.exc, outputs, end_data_index)
def read(self, inputs):
"""Triggers a read of the inputs
This is used by the loop when used in conjunction with a sequential
loop user.
"""
exc = self.exc or RuntimeError
if not self.algorithm.is_loop:
raise exc("Wrong algorithm type: %s" % self.algorithm.type)
# setup() must have run
if not self.ready:
raise exc("Algorithm '%s' is not yet setup" % self.name)
# prepare() must have run
if not self.prepared:
raise exc("Algorithm '%s' is not yet prepared" % self.name)
return loader.run(self.obj, "read", self.exc, inputs)
def __getattr__(self, key):
"""Returns an attribute of the algorithm - only called at last resort
"""
......@@ -427,7 +451,8 @@ class Algorithm(object):
LEGACY = "legacy"
SEQUENTIAL = "sequential"
AUTONOMOUS = "autonomous"
LOOP = "loop"
SEQUENTIAL_LOOP = "sequential_loop"
AUTONOMOUS_LOOP = "autonomous_loop"
SEQUENTIAL_LOOP_USER = "sequential_loop_user"
AUTONOMOUS_LOOP_USER = "autonomous_loop_user"
......@@ -673,14 +698,22 @@ class Algorithm(object):
""" Returns whether the algorithm is in the autonomous category"""
return self.type in [
Algorithm.AUTONOMOUS,
Algorithm.AUTONOMOUS_LOOP,
Algorithm.AUTONOMOUS_LOOP_USER,
Algorithm.LOOP,
]
@property
def is_sequential(self):
""" Returns whether the algorithm is in the sequential category"""
return self.type in [Algorithm.SEQUENTIAL, Algorithm.SEQUENTIAL_LOOP_USER]
return self.type in [
Algorithm.SEQUENTIAL,
Algorithm.SEQUENTIAL_LOOP,
Algorithm.SEQUENTIAL_LOOP_USER,
]
@property
def is_loop(self):
return self.type in [Algorithm.SEQUENTIAL_LOOP, Algorithm.AUTONOMOUS_LOOP]
@language.setter
def language(self, value):
......
......@@ -238,6 +238,10 @@ class AlgorithmExecutor(object):
main_group.next()
main_group.restricted_access = True
if self.loop_socket:
self.loop_socket.send_string("rdi")
self.loop_socket.recv()
if self.algorithm.type == Algorithm.LEGACY:
if self.analysis:
result = self.runner.process(
......
......@@ -266,6 +266,15 @@ class LoopExecutor(object):
logger.debug("User loop wrote output: {}".format(retval))
return retval
def read(self):
"""Move input to next element and make it read"""
main_group = self.input_list.main_group
main_group.restricted_access = False
main_group.next()
main_group.restricted_access = True
return self.runner.read(self.input_list)
@property
def address(self):
""" Address of the message handler"""
......
......@@ -353,6 +353,8 @@ class LoopMessageHandler(MessageHandler):
self.callbacks.update({"val": self.validate})
self.callbacks.update({"wrt": self.write})
self.callbacks.update({"rdi": self.read})
self.executor = None
def setup(self, algorithm, prefix):
......@@ -420,3 +422,13 @@ class LoopMessageHandler(MessageHandler):
raise
finally:
self.socket.send_string("ack")
def read(self):
"""Read next data"""
try:
self.executor.read()
except Exception:
raise
finally:
self.socket.send_string("ack")
......@@ -2,7 +2,7 @@
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "loop",
"type": "autonomous_loop",
"groups": [
{
"inputs": {
......
{
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "sequential_loop",
"groups": [
{
"inputs": {
"in_loop": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out_loop": {
"type": "user/single_integer/1"
}
},
"loop": {
"request": {
"type": "user/1d_array_of_integers/1"
},
"answer": {
"type": "user/single_integer64/1"
}
}
}
],
"parameters": {
"threshold": {
"default": 9,
"type": "int8",
"description": "Value that will change loop result"
}
}
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import numpy as np
class Algorithm:
def __init__(self):
self.threshold = None
self.output = None
self.current_input = None
def setup(self, parameters):
self.threshold = parameters["threshold"]
return True
def validate(self, result):
value = result.value[0]
result = value < self.threshold
if not result:
self.output = value
return (result, {"value": np.int64(value)})
def write(self, outputs, end_data_index):
outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index)
return True
def read(self, inputs):
self.current_input = inputs["in_loop"].data.value
return True
......@@ -234,6 +234,24 @@ class TestSequentialAPI_Loading(unittest.TestCase):
self.assertFalse(runnable.ready)
self.assertFalse(runnable.prepared)
def test_load_valid_loop(self):
algorithm = Algorithm(prefix, "sequential/loop/1")
self.assertEqual(algorithm.name, "sequential/loop/1")
self.assertTrue(algorithm.valid)
self.assertFalse(algorithm.errors)
self.assertFalse(algorithm.results) # it is not an analyzer
self.assertTrue(algorithm.parameters) # does not contain parameters
self.assertTrue(algorithm.input_map)
self.assertTrue(algorithm.output_map)
self.assertEqual(algorithm.schema_version, 3)
self.assertEqual(algorithm.api_version, 2)
self.assertEqual(algorithm.type, Algorithm.SEQUENTIAL_LOOP)
runnable = algorithm.runner()
self.assertFalse(runnable.ready)
self.assertTrue(runnable.prepared)
def test_load_valid_loop_user(self):
algorithm = Algorithm(prefix, "sequential/loop_user/1")
self.assertEqual(algorithm.name, "sequential/loop_user/1")
......@@ -303,30 +321,7 @@ class TestAutonomousAPI_Loading(unittest.TestCase):
self.assertFalse(runnable.ready)
self.assertFalse(runnable.prepared)
def test_load_valid_loop_user(self):
algorithm = Algorithm(prefix, "autonomous/loop_user/1")
self.assertEqual(algorithm.name, "autonomous/loop_user/1")
self.assertTrue(algorithm.valid)
self.assertFalse(algorithm.errors)
self.assertFalse(algorithm.results) # it is not an analyzer
self.assertFalse(algorithm.parameters) # does not contain parameters
self.assertTrue(algorithm.input_map)
self.assertTrue(algorithm.output_map)
self.assertEqual(algorithm.schema_version, 3)
self.assertEqual(algorithm.api_version, 2)
self.assertEqual(algorithm.type, Algorithm.AUTONOMOUS_LOOP_USER)
runnable = algorithm.runner()
self.assertTrue(runnable.ready)
self.assertTrue(runnable.prepared)
# ----------------------------------------------------------
class TestLoopAPI_Loading(unittest.TestCase):
def test_load_valid_algorithm(self):
def test_load_valid_loop(self):
algorithm = Algorithm(prefix, "autonomous/loop/1")
self.assertEqual(algorithm.name, "autonomous/loop/1")
self.assertTrue(algorithm.valid)
......@@ -338,7 +333,7 @@ class TestLoopAPI_Loading(unittest.TestCase):
self.assertTrue(algorithm.output_map)
self.assertEqual(algorithm.schema_version, 3)
self.assertEqual(algorithm.api_version, 2)
self.assertEqual(algorithm.type, Algorithm.LOOP)
self.assertEqual(algorithm.type, Algorithm.AUTONOMOUS_LOOP)
runnable = algorithm.runner()
self.assertFalse(runnable.ready)
......@@ -346,19 +341,14 @@ class TestLoopAPI_Loading(unittest.TestCase):
runnable.prepared
) # loop/1 has no prepare method so is prepared
# ----------------------------------------------------------
class TestLoopUserAPI_Loading(unittest.TestCase):
def test_load_valid_algorithm(self):
def test_load_valid_loop_user(self):
algorithm = Algorithm(prefix, "autonomous/loop_user/1")
self.assertEqual(algorithm.name, "autonomous/loop_user/1")
self.assertTrue(algorithm.valid)
self.assertFalse(algorithm.errors)
self.assertFalse(algorithm.results) # it is not an analyzer
self.assertFalse(algorithm.parameters) # loop_user/1 has no parameter
self.assertFalse(algorithm.parameters) # does not contain parameters
self.assertTrue(algorithm.input_map)
self.assertTrue(algorithm.output_map)
self.assertEqual(algorithm.schema_version, 3)
......@@ -366,10 +356,8 @@ class TestLoopUserAPI_Loading(unittest.TestCase):
self.assertEqual(algorithm.type, Algorithm.AUTONOMOUS_LOOP_USER)
runnable = algorithm.runner()
self.assertTrue(runnable.ready) # loop_user/1 has no setup method so is ready
self.assertTrue(
runnable.prepared
) # loop/1 has no prepare method so is prepared
self.assertTrue(runnable.ready)
self.assertTrue(runnable.prepared)
# ----------------------------------------------------------
......
......@@ -104,6 +104,7 @@ class TestExecution(unittest.TestCase):
shutil.rmtree(self.cache_root)
shutil.rmtree(self.working_dir)
self.loop_socket.send_string("don")
if self.loop_executor:
self.loop_socket.send_string("don")
self.loop_executor.wait()
......@@ -209,7 +210,6 @@ class TestExecution(unittest.TestCase):
self.assertTrue(executor.setup())
self.assertTrue(executor.prepare())
self.assertTrue(executor.process())
self.loop_executor.close()
......@@ -251,7 +251,7 @@ class TestExecution(unittest.TestCase):
self.process("autonomous/loop_user/1", "autonomous/loop/1")
def test_sequential_loop_user(self):
self.process("sequential/loop_user/1", "autonomous/loop/1")
self.process("sequential/loop_user/1", "sequential/loop/1")
def test_autonomous_loop_invalid_validate_output(self):
with self.assertRaises(RemoteException):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment