Commit c723f155 authored by André Anjos's avatar André Anjos 💬

Merge branch 'new_loop_user_types' into 'master'

New loop user types

See merge request !55
parents 2a846868 6cbe1625
Pipeline #31780 passed with stages
in 14 minutes and 13 seconds
......@@ -214,7 +214,10 @@ class Runner(object):
# The method is optional
if hasattr(self.obj, "prepare"):
if self.algorithm.type in [Algorithm.AUTONOMOUS, Algorithm.LOOP_USER]:
if self.algorithm.type in [
Algorithm.AUTONOMOUS,
Algorithm.AUTONOMOUS_LOOP_USER,
]:
self.prepared = loader.run(
self.obj, "prepare", self.exc, data_loaders.secondaries()
)
......@@ -264,35 +267,40 @@ class Runner(object):
else:
_check_argument(data_loaders, "data_loaders")
if self.algorithm.type == Algorithm.SEQUENTIAL:
if self.algorithm.is_sequential:
_check_argument(inputs, "inputs")
return loader.run(
self.obj, "process", self.exc, inputs, data_loaders, outputs_to_use
)
run_args = [
self.obj,
"process",
self.exc,
inputs,
data_loaders,
outputs_to_use,
]
elif self.algorithm.is_autonomous:
run_args = [self.obj, "process", self.exc, data_loaders, outputs_to_use]
has_loop_arg = utils.has_argument(self.obj.process, "loop_channel")
if loop_channel is not None:
if has_loop_arg:
run_args.append(loop_channel)
else:
raise exc(
"Algorithm '%s' is not a valid loop enabled algorithm"
% self.name
)
elif has_loop_arg:
else:
raise exc("Unknown algorithm type: %s" % self.algorithm.type)
has_loop_arg = utils.has_argument(self.obj.process, "loop_channel")
if loop_channel is not None:
if has_loop_arg:
run_args.append(loop_channel)
else:
raise exc(
"Algorithm '%s' is a loop enabled algorithm but no loop_channel given"
"Algorithm '%s' is not a valid loop enabled algorithm"
% self.name
)
elif has_loop_arg:
raise exc(
"Algorithm '%s' is a loop enabled algorithm but no loop_channel given"
% self.name
)
return loader.run(*run_args)
else:
raise exc("Unknown algorithm type: %s" % self.algorithm.type)
return loader.run(*run_args)
def validate(self, result):
"""Validates the given results"""
......@@ -404,7 +412,8 @@ class Algorithm(object):
SEQUENTIAL = "sequential"
AUTONOMOUS = "autonomous"
LOOP = "loop"
LOOP_USER = "loop_user"
SEQUENTIAL_LOOP_USER = "sequential_loop_user"
AUTONOMOUS_LOOP_USER = "autonomous_loop_user"
def __init__(self, prefix, name, dataformat_cache=None, library_cache=None):
......@@ -672,7 +681,16 @@ class Algorithm(object):
@property
def is_autonomous(self):
""" Returns whether the algorithm is in the autonomous category"""
return self.type in [Algorithm.AUTONOMOUS, Algorithm.LOOP_USER, Algorithm.LOOP]
return self.type in [
Algorithm.AUTONOMOUS,
Algorithm.AUTONOMOUS_LOOP_USER,
Algorithm.LOOP,
]
@property
def is_sequential(self):
""" Returns whether the algorithm is in the sequential category"""
return self.type in [Algorithm.SEQUENTIAL, Algorithm.SEQUENTIAL_LOOP_USER]
@language.setter
def language(self, value):
......
......@@ -124,8 +124,6 @@ class AlgorithmExecutor(object):
self.prefix, self.data["algorithm"], dataformat_cache, library_cache
)
main_channel = self.data["channel"]
if db_socket:
db_access_mode = AccessMode.REMOTE
databases = None
......@@ -249,7 +247,7 @@ class AlgorithmExecutor(object):
inputs=self.input_list, outputs=self.output_list
)
elif self.algorithm.type == Algorithm.SEQUENTIAL:
elif self.algorithm.is_sequential:
if self.analysis:
result = self.runner.process(
inputs=self.input_list,
......@@ -261,6 +259,7 @@ class AlgorithmExecutor(object):
inputs=self.input_list,
data_loaders=self.data_loaders,
outputs=self.output_list,
loop_channel=self.loop_channel,
)
if not result:
......
This diff is collapsed.
......@@ -2,7 +2,7 @@
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "loop_user",
"type": "autonomous_loop_user",
"splittable": false,
"groups": [
{
......
{
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "sequential_loop_user",
"splittable": false,
"groups": [
{
"inputs": {
"in": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out": {
"type": "user/single_integer/1"
}
},
"loop": {
"request": {
"type": "user/single_integer/1"
},
"answer": {
"type": "user/single_integer/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import numpy as np
class Algorithm:
def process(self, inputs, data_loaders, outputs, loop_channel):
cnt = 10
is_valid, _ = loop_channel.validate({"value": np.int32(cnt)})
while not is_valid:
cnt = cnt - 1
is_valid, _ = loop_channel.validate({"value": np.int32(cnt)})
value = inputs["in"].data.value
new_value = value + cnt
outputs["out"].write({"value": np.int32(new_value)})
return True
......@@ -234,6 +234,24 @@ class TestSequentialAPI_Loading(unittest.TestCase):
self.assertFalse(runnable.ready)
self.assertFalse(runnable.prepared)
def test_load_valid_loop_user(self):
algorithm = Algorithm(prefix, "sequential/loop_user/1")
self.assertEqual(algorithm.name, "sequential/loop_user/1")
self.assertTrue(algorithm.valid)
self.assertFalse(algorithm.errors)
self.assertFalse(algorithm.results) # it is not an analyzer
self.assertFalse(algorithm.parameters) # does not contain parameters
self.assertTrue(algorithm.input_map)
self.assertTrue(algorithm.output_map)
self.assertEqual(algorithm.schema_version, 3)
self.assertEqual(algorithm.api_version, 2)
self.assertEqual(algorithm.type, Algorithm.SEQUENTIAL_LOOP_USER)
runnable = algorithm.runner()
self.assertTrue(runnable.ready)
self.assertTrue(runnable.prepared)
# ----------------------------------------------------------
......@@ -285,6 +303,24 @@ class TestAutonomousAPI_Loading(unittest.TestCase):
self.assertFalse(runnable.ready)
self.assertFalse(runnable.prepared)
def test_load_valid_loop_user(self):
algorithm = Algorithm(prefix, "autonomous/loop_user/1")
self.assertEqual(algorithm.name, "autonomous/loop_user/1")
self.assertTrue(algorithm.valid)
self.assertFalse(algorithm.errors)
self.assertFalse(algorithm.results) # it is not an analyzer
self.assertFalse(algorithm.parameters) # does not contain parameters
self.assertTrue(algorithm.input_map)
self.assertTrue(algorithm.output_map)
self.assertEqual(algorithm.schema_version, 3)
self.assertEqual(algorithm.api_version, 2)
self.assertEqual(algorithm.type, Algorithm.AUTONOMOUS_LOOP_USER)
runnable = algorithm.runner()
self.assertTrue(runnable.ready)
self.assertTrue(runnable.prepared)
# ----------------------------------------------------------
......
......@@ -59,7 +59,6 @@ from ..data import CachedDataSink
from ..data import CachedDataSource
from ..helpers import convert_experiment_configuration_to_container
from ..helpers import AccessMode
from . import prefix
......@@ -67,48 +66,28 @@ from . import prefix
logger = logging.getLogger(__name__)
#----------------------------------------------------------
# ----------------------------------------------------------
CONFIGURATION = {
'algorithm': '',
'channel': 'main',
'parameters': {
"algorithm": "",
"channel": "main",
"parameters": {},
"inputs": {"in": {"path": "INPUT", "channel": "main"}},
"outputs": {"out": {"path": "OUTPUT", "channel": "main"}},
"loop": {
"algorithm": "",
"channel": "main",
"parameters": {"threshold": 1},
"inputs": {"in": {"path": "INPUT", "channel": "main"}},
},
'inputs': {
'in': {
'path': 'INPUT',
'channel': 'main',
}
},
'outputs': {
'out': {
'path': 'OUTPUT',
'channel': 'main'
}
},
'loop': {
'algorithm': '',
'channel': 'main',
'parameters': {
'threshold': 1
},
'inputs': {
'in': {
'path': 'INPUT',
'channel': 'main'
}
}
}
}
#----------------------------------------------------------
# ----------------------------------------------------------
class TestExecution(unittest.TestCase):
def setUp(self):
self.cache_root = tempfile.mkdtemp(prefix=__name__)
self.working_dir = tempfile.mkdtemp(prefix=__name__)
......@@ -120,7 +99,6 @@ class TestExecution(unittest.TestCase):
self.loop_socket = None
self.zmq_context = None
def tearDown(self):
shutil.rmtree(self.cache_root)
shutil.rmtree(self.working_dir)
......@@ -135,7 +113,6 @@ class TestExecution(unittest.TestCase):
handler.destroy()
handler = None
for socket in [self.executor_socket, self.loop_socket]:
if socket is not None:
socket.setsockopt(zmq.LINGER, 0)
......@@ -145,15 +122,18 @@ class TestExecution(unittest.TestCase):
self.zmq_context.destroy()
self.zmq_context = None
def writeData(self, input_name, indices, start_value):
filename = os.path.join(self.cache_root, CONFIGURATION['inputs'][input_name]['path'] + '.data')
filename = os.path.join(
self.cache_root, CONFIGURATION["inputs"][input_name]["path"] + ".data"
)
dataformat = DataFormat(prefix, 'user/single_integer/1')
dataformat = DataFormat(prefix, "user/single_integer/1")
self.assertTrue(dataformat.valid)
data_sink = CachedDataSink()
self.assertTrue(data_sink.setup(filename, dataformat, indices[0][0], indices[-1][1]))
self.assertTrue(
data_sink.setup(filename, dataformat, indices[0][0], indices[-1][1])
)
for i in indices:
data = dataformat.type()
......@@ -167,40 +147,39 @@ class TestExecution(unittest.TestCase):
data_sink.close()
del data_sink
def process(self, algorithm_name, loop_algorithm_name):
self.writeData('in', [(0, 0), (1, 1), (2, 2), (3, 3)], 1000)
self.writeData("in", [(0, 0), (1, 1), (2, 2), (3, 3)], 1000)
# -------------------------------------------------------------------------
config = deepcopy(CONFIGURATION)
config['algorithm'] = algorithm_name
config['loop']['algorithm'] = loop_algorithm_name
config["algorithm"] = algorithm_name
config["loop"]["algorithm"] = loop_algorithm_name
config = convert_experiment_configuration_to_container(config)
with open(os.path.join(self.working_dir, 'configuration.json'), 'wb') as f:
data = simplejson.dumps(config, indent=4).encode('utf-8')
with open(os.path.join(self.working_dir, "configuration.json"), "wb") as f:
data = simplejson.dumps(config, indent=4).encode("utf-8")
f.write(data)
working_prefix = os.path.join(self.working_dir, 'prefix')
working_prefix = os.path.join(self.working_dir, "prefix")
if not os.path.exists(working_prefix):
os.makedirs(working_prefix)
algorithm = Algorithm(prefix, algorithm_name)
assert(algorithm.valid)
self.assertTrue(algorithm.valid, algorithm.errors)
algorithm.export(working_prefix)
# -------------------------------------------------------------------------
loop_algorithm = Algorithm(prefix, loop_algorithm_name)
assert(loop_algorithm.valid)
self.assertTrue(loop_algorithm.valid, loop_algorithm.errors)
loop_algorithm.export(working_prefix)
# -------------------------------------------------------------------------
self.message_handler = MessageHandler('127.0.0.1')
self.message_handler = MessageHandler("127.0.0.1")
self.message_handler.start()
self.loop_message_handler = LoopMessageHandler('127.0.0.1')
self.loop_message_handler = LoopMessageHandler("127.0.0.1")
self.zmq_context = zmq.Context()
self.executor_socket = self.zmq_context.socket(zmq.PAIR)
......@@ -208,12 +187,19 @@ class TestExecution(unittest.TestCase):
self.loop_socket = self.zmq_context.socket(zmq.PAIR)
self.loop_socket.connect(self.loop_message_handler.address)
self.loop_executor = LoopExecutor(self.loop_message_handler, self.working_dir, cache_root=self.cache_root)
self.loop_executor = LoopExecutor(
self.loop_message_handler, self.working_dir, cache_root=self.cache_root
)
self.assertTrue(self.loop_executor.setup())
self.assertTrue(self.loop_executor.prepare())
self.loop_executor.process()
executor = AlgorithmExecutor(self.executor_socket, self.working_dir, cache_root=self.cache_root, loop_socket=self.loop_socket)
executor = AlgorithmExecutor(
self.executor_socket,
self.working_dir,
cache_root=self.cache_root,
loop_socket=self.loop_socket,
)
self.assertTrue(executor.setup())
self.assertTrue(executor.prepare())
......@@ -221,7 +207,14 @@ class TestExecution(unittest.TestCase):
self.assertTrue(executor.process())
cached_file = CachedDataSource()
self.assertTrue(cached_file.setup(os.path.join(self.cache_root, CONFIGURATION['outputs']['out']['path'] + '.data'), prefix))
self.assertTrue(
cached_file.setup(
os.path.join(
self.cache_root, CONFIGURATION["outputs"]["out"]["path"] + ".data"
),
prefix,
)
)
for i in range(len(cached_file)):
data, start, end = cached_file[i]
......@@ -229,13 +222,12 @@ class TestExecution(unittest.TestCase):
self.assertEqual(start, i)
self.assertEqual(end, i)
def test_autonomous_loop_user(self):
self.process("autonomous/loop_user/1", "autonomous/loop/1")
def test_autonomous_loop(self):
self.process('autonomous/loop_user/1',
'autonomous/loop/1')
def test_sequential_loop_user(self):
self.process("sequential/loop_user/1", "autonomous/loop/1")
def test_autonomous_loop_invalid_output(self):
with self.assertRaises(RemoteException):
self.process('autonomous/loop_user/1',
'autonomous/invalid_loop_output/1')
\ No newline at end of file
self.process("autonomous/loop_user/1", "autonomous/invalid_loop_output/1")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment