Commit 92579c29 authored by André Anjos's avatar André Anjos 💬

Merge branch '84_loop_output' into 'master'

Loop block output

Closes #84

See merge request !83
parents 103ee393 2c015839
Pipeline #32142 passed with stages
in 22 minutes and 8 seconds
...@@ -182,6 +182,8 @@ class Algorithm(BackendAlgorithm): ...@@ -182,6 +182,8 @@ class Algorithm(BackendAlgorithm):
""" """
dataformat_klass = dataformat.DataFormat
def __init__(self, prefix, data, dataformat_cache=None, library_cache=None): def __init__(self, prefix, data, dataformat_cache=None, library_cache=None):
super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache) super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache)
...@@ -303,116 +305,52 @@ class Algorithm(BackendAlgorithm): ...@@ -303,116 +305,52 @@ class Algorithm(BackendAlgorithm):
"declaration: %s" % (self.name, ", ".join(all_output_names)) "declaration: %s" % (self.name, ", ".join(all_output_names))
) )
def _validate_format(self, type_name, group_name, entry_name, dataformat):
if dataformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for %s `%s' on algorithm `%s': %s"
% (
type_name,
group_name,
entry_name,
self.name,
"\n".join(dataformat.errors),
)
)
def _validate_dataformats(self, group, group_name, dataformat_cache):
for name, entry in group[group_name].items():
type_name = entry["type"]
thisformat = self._update_dataformat_cache(type_name, dataformat_cache)
self._validate_format(type_name, group_name, name, thisformat)
def _validate_required_dataformats(self, dataformat_cache): def _validate_required_dataformats(self, dataformat_cache):
"""Makes sure we can load all requested formats """Makes sure we can load all requested formats
""" """
for group in self.groups: for group in self.groups:
for name, input in group["inputs"].items(): for name, input_ in group["inputs"].items():
if input["type"] in self.dataformats: self._validate_dataformats(group, "inputs", dataformat_cache)
continue
if dataformat_cache and input["type"] in dataformat_cache: # reuse
thisformat = dataformat_cache[input["type"]]
else: # load it
thisformat = dataformat.DataFormat(self.prefix, input["type"])
if dataformat_cache is not None: # update it
dataformat_cache[input["type"]] = thisformat
self.dataformats[input["type"]] = thisformat
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for input `%s' on algorithm `%s': %s"
% (input["type"], name, self.name, "\n".join(thisformat.errors))
)
if "outputs" in group: if "outputs" in group:
self._validate_dataformats(group, "outputs", dataformat_cache)
for name, output in group["outputs"].items():
if output["type"] in self.dataformats:
continue
if dataformat_cache and output["type"] in dataformat_cache: # reuse
thisformat = dataformat_cache[output["type"]]
else: # load it
thisformat = dataformat.DataFormat(self.prefix, output["type"])
if dataformat_cache is not None: # update it
dataformat_cache[output["type"]] = thisformat
self.dataformats[output["type"]] = thisformat
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for output `%s' on algorithm `%s': %s"
% (
output["type"],
name,
self.name,
"\n".join(thisformat.errors),
)
)
if "loop" in group: if "loop" in group:
self._validate_dataformats(group, "loop", dataformat_cache)
for name, entry in group["loop"].items():
entry_format = entry["type"]
if entry_format in self.dataformats:
continue
if dataformat_cache and entry_format in dataformat_cache:
thisformat = dataformat_cache[entry_format]
else:
thisformat = dataformat.DataFormat(self.prefix, entry_format)
if dataformat_cache is not None:
dataformat_cache[entry_format] = thisformat
self.dataformats[entry_format] = thisformat
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for loop `%s' on algorithm `%s': %s"
% (
entry_format,
name,
self.name,
"\n".join(thisformat.errors),
)
)
if self.results: if self.results:
for name, result in self.results.items(): for name, result in self.results.items():
result_type = result["type"]
if result["type"].find("/") != -1: # results can only contain base types and plots therefore, only
# process plots
if result["type"] in self.dataformats: if result_type.find("/") != -1:
continue thisformat = self._update_dataformat_cache(
result_type, dataformat_cache
if dataformat_cache and result["type"] in dataformat_cache: # reuse )
thisformat = dataformat_cache[result["type"]] self._validate_format(result_type, "result", name, thisformat)
else:
thisformat = dataformat.DataFormat(self.prefix, result["type"])
if dataformat_cache is not None: # update it
dataformat_cache[result["type"]] = thisformat
self.dataformats[result["type"]] = thisformat
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for result `%s' on algorithm `%s': %s"
% (
result["type"],
name,
self.name,
"\n".join(thisformat.errors),
)
)
def _convert_parameter_types(self): def _convert_parameter_types(self):
"""Converts types to numpy equivalents, checks defaults, ranges and """Converts types to numpy equivalents, checks defaults, ranges and
......
...@@ -230,6 +230,17 @@ class BaseExecutor(object): ...@@ -230,6 +230,17 @@ class BaseExecutor(object):
"The input '%s' doesn't exist in the loop algorithm" % name "The input '%s' doesn't exist in the loop algorithm" % name
) )
if len(loop["outputs"]) != len(self.loop_algorithm.output_map):
self.errors.append(
"The number of outputs of the loop algorithm doesn't correspond"
)
for name in self.data["outputs"].keys():
if name not in self.algorithm.output_map.keys():
self.errors.append(
"The output '%s' doesn't exist in the loop algorithm" % name
)
# Check that the mapping in coherent # Check that the mapping in coherent
if len(self.data["inputs"]) != len(self.algorithm.input_map): if len(self.data["inputs"]) != len(self.algorithm.input_map):
self.errors.append( self.errors.append(
......
...@@ -437,7 +437,7 @@ class DockerExecutor(RemoteExecutor): ...@@ -437,7 +437,7 @@ class DockerExecutor(RemoteExecutor):
if self.loop_algorithm is not None: if self.loop_algorithm is not None:
cmd.append( cmd.append(
"tcp://%s:%d" "--loop=tcp://%s:%d"
% (loop_algorithm_container_ip, loop_algorithm_container_port) % (loop_algorithm_container_ip, loop_algorithm_container_port)
) )
......
...@@ -191,6 +191,7 @@ class LocalExecutor(BaseExecutor): ...@@ -191,6 +191,7 @@ class LocalExecutor(BaseExecutor):
def __cleanup(self): def __cleanup(self):
if self.loop_executor: if self.loop_executor:
self.loop_executor.wait() self.loop_executor.wait()
self.loop_executor.close()
for handler in [self.message_handler, self.loop_message_handler]: for handler in [self.message_handler, self.loop_message_handler]:
if handler: if handler:
......
...@@ -384,7 +384,9 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -384,7 +384,9 @@ class SubprocessExecutor(RemoteExecutor):
) )
if self.loop_algorithm is not None: if self.loop_algorithm is not None:
cmd.append("tcp://" + self.ip_address + (":%d" % loop_algorithm_port)) cmd.append(
"--loop=tcp://" + self.ip_address + (":%d" % loop_algorithm_port)
)
if logger.getEffectiveLevel() <= logging.DEBUG: if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, "--debug") cmd.insert(1, "--debug")
......
This diff is collapsed.
...@@ -71,11 +71,13 @@ ...@@ -71,11 +71,13 @@
"properties": { "properties": {
"name": { "type": "string" }, "name": { "type": "string" },
"inputs": { "$ref": "common.json#/definitions/endpoints" }, "inputs": { "$ref": "common.json#/definitions/endpoints" },
"outputs": { "$ref": "common.json#/definitions/endpoints" },
"loop": { "$ref": "#/definitions/loop_io_group" } "loop": { "$ref": "#/definitions/loop_io_group" }
}, },
"required": [ "required": [
"inputs", "inputs",
"outputs",
"loop" "loop"
], ],
......
...@@ -55,10 +55,25 @@ ...@@ -55,10 +55,25 @@
"algorithm": { "$ref": "../common/1.json#/definitions/reference" }, "algorithm": { "$ref": "../common/1.json#/definitions/reference" },
"parameters": { "$ref": "common.json#/definitions/parameter_set" }, "parameters": { "$ref": "common.json#/definitions/parameter_set" },
"inputs": { "$ref": "common.json#/definitions/connection_map" }, "inputs": { "$ref": "common.json#/definitions/connection_map" },
"outputs": { "$ref": "common.json#/definitions/connection_map" },
"queue": { "$ref": "common.json#/definitions/queue" }, "queue": { "$ref": "common.json#/definitions/queue" },
"environment": { "$ref": "common.json#/definitions/environment" }, "environment": { "$ref": "common.json#/definitions/environment" },
"nb_slots": { "$ref": "common.json#/definitions/slots" } "nb_slots": { "$ref": "common.json#/definitions/slots" },
} "loop_algorithm": { "$ref": "../common/1.json#/definitions/reference" },
"loop_parameters": { "$ref": "common.json#/definitions/parameter_set" },
"loop_inputs": { "$ref": "common.json#/definitions/connection_map" },
"loop_outputs": { "$ref": "common.json#/definitions/connection_map" },
"loop_environment": { "$ref": "common.json#/definitions/environment" }
},
"required": [
"algorithm",
"inputs",
"outputs",
"loop_algorithm",
"loop_inputs",
"loop_outputs"
],
"additionalProperties": false
} }
} }
......
...@@ -74,7 +74,7 @@ ...@@ -74,7 +74,7 @@
"versioned_database": { "versioned_database": {
"type": "string", "type": "string",
"pattern": "^[a-zA-Z_][a-zA-Z0-9_]*/[0-9]+$" "pattern": "^[a-zA-Z_][a-zA-Z0-9_-]+[a-zA-Z0-9]+/[0-9]+$"
}, },
"dataset": { "dataset": {
......
...@@ -30,12 +30,6 @@ ...@@ -30,12 +30,6 @@
"$ref": "common.json#/definitions/connections" "$ref": "common.json#/definitions/connections"
}, },
"loop_connections": {
"type": "array",
"uniqueItems": true,
"items": { "$ref": "#/definitions/loop_connection" }
},
"representation": { "representation": {
"type": "object", "type": "object",
...@@ -45,12 +39,12 @@ ...@@ -45,12 +39,12 @@
"connections": { "connections": {
"$ref": "common.json#/definitions/representation/connection_list" "$ref": "common.json#/definitions/representation/connection_list"
}, },
"loop_connections": {
"$ref": "common.json#/definitions/representation/connection_list"
},
"blocks": { "blocks": {
"$ref": "common.json#/definitions/representation/block_list" "$ref": "common.json#/definitions/representation/block_list"
}, },
"loops": {
"$ref": "common.json#/definitions/representation/block_list"
},
"channel_colors": { "channel_colors": {
"$ref": "common.json#/definitions/representation/channel_colors" "$ref": "common.json#/definitions/representation/channel_colors"
} }
...@@ -60,6 +54,7 @@ ...@@ -60,6 +54,7 @@
"required": [ "required": [
"connections", "connections",
"blocks", "blocks",
"loops",
"channel_colors" "channel_colors"
], ],
...@@ -74,6 +69,7 @@ ...@@ -74,6 +69,7 @@
"required": [ "required": [
"datasets", "datasets",
"blocks", "blocks",
"loops",
"analyzers", "analyzers",
"connections", "connections",
"representation" "representation"
...@@ -95,22 +91,35 @@ ...@@ -95,22 +91,35 @@
"minItems": 1, "minItems": 1,
"uniqueItems": true, "uniqueItems": true,
"items": { "$ref": "common.json#/definitions/identifier" } "items": { "$ref": "common.json#/definitions/identifier" }
},
"outputs": {
"type": "array",
"minItems": 1,
"uniqueItems": true,
"items": { "$ref": "common.json#/definitions/identifier" }
},
"loop_inputs": {
"type": "array",
"minItems": 1,
"uniqueItems": true,
"items": { "$ref": "common.json#/definitions/identifier" }
},
"loop_outputs": {
"type": "array",
"minItems": 1,
"uniqueItems": true,
"items": { "$ref": "common.json#/definitions/identifier" }
} }
}, },
"required": ["name", "synchronized_channel", "inputs"], "required": [
"name",
"synchronized_channel",
"inputs",
"outputs",
"loop_inputs",
"loop_outputs"
],
"additionalProperties": false "additionalProperties": false
},
"loop_connection": {
"type": "object",
"properties": {
"from": { "$ref": "common.json#/definitions/endpoint" },
"to": { "$ref": "common.json#/definitions/endpoint" }
},
"required": ["from", "to"],
"additionalProperties": false,
"definitions": {
}
} }
} }
......
...@@ -11,9 +11,6 @@ ...@@ -11,9 +11,6 @@
} }
}, },
"outputs": { "outputs": {
"out": {
"type": "user/single_integer/1"
}
}, },
"loop": { "loop": {
"request": { "request": {
......
{
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "loop",
"groups": [
{
"inputs": {
"in_loop": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out_loop": {
"type": "user/single_integer/1"
}
},
"loop": {
"request": {
"type": "user/1d_array_of_integers/1"
},
"answer": {
"type": "user/single_float/1"
}
}
}
],
"parameters": {
"threshold": {
"default": 9,
"type": "int8",
"description": "Value that will change loop result"
}
}
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import numpy as np
class Algorithm:
def __init__(self):
self.output = None
def validate(self, result):
print("We are validating")
value = result.value[0]
self.output = value
return (True, {"value": np.float32(value)})
def write(self, outputs, end_data_index):
print("block input loop writing", outputs, end_data_index)
outputs["out_loop"].write({"value": np.int32(self.output)}, end_data_index)
print("block input loop writing done")
{
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "autonomous_loop_user",
"splittable": false,
"groups": [
{
"inputs": {
"in": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out": {
"type": "user/single_integer/1"
}
},
"loop": {
"request": {
"type": "user/1d_array_of_integers/1"
},
"answer": {
"type": "user/single_float/1"
}
}
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import numpy as np
class Algorithm:
def process(self, data_loaders, outputs, loop_channel):
data_loader = data_loaders.loaderOf("in")
for i in range(data_loader.count()):
view = data_loader.view("in", i)
(data, _, end) = view[view.count() - 1]
value = data["in"].value
_, validated = loop_channel.validate({"value": np.full(10, 12)})
new_value = value + validated.value
print("BLOCK INPUT LOOP USER VALUE IS", value)
print("BLOCK INPUT LOOP USER WRITING NEW VALUE", new_value)
outputs["out"].write({"value": np.int32(new_value)}, end)
print("BLOCK INPUT LOOP USER WRITING DONE")
return True
...@@ -6,7 +6,12 @@ ...@@ -6,7 +6,12 @@
"groups": [ "groups": [
{ {
"inputs": { "inputs": {
"in": { "in_loop": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out_loop": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
......
...@@ -41,18 +41,19 @@ class Algorithm: ...@@ -41,18 +41,19 @@ class Algorithm:
def __init__(self): def __init__(self):
self.threshold = None self.threshold = None
self.max = 0 self.max = 0
self.output = None
def setup(self, parameters): def setup(self, parameters):
self.threshold = parameters["threshold"] self.threshold = parameters["threshold"]
return True return True
def prepare(self, data_loaders): def prepare(self, data_loaders):
data_loader = data_loaders.loaderOf("in") data_loader = data_loaders.loaderOf("in_loop")
for i in range(data_loader.count()): for i in range(data_loader.count()):
view = data_loader.view("in", i) view = data_loader.view("in_loop", i)
(data, _, _) = view[view.count() - 1] (data, _, _) = view[view.count() - 1]
value = data["in"].value value = data["in_loop"].value
self.max += value self.max += value
return True return True
...@@ -61,4 +62,8 @@ class Algorithm: ...@@ -61,4 +62,8 @@ class Algorithm:
value = result.value[0] value = result.value[0]
result = value > self.threshold and value < self.max result = value > self.threshold and value < self.max
delta = self.max - value delta = self.max - value
self.output = delta