...
 
Commits (86)
...@@ -31,9 +31,9 @@ ...@@ -31,9 +31,9 @@
.. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. .. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
=========================================================== ==============================
Authors of the Biometrics Evaluation and Testing Platform Authors of the BEAT Platform
=========================================================== ==============================
Andre Anjos <andre.anjos@idiap.ch> Andre Anjos <andre.anjos@idiap.ch>
Flavio Tarsetti <flavio.tarsetti@idiap.ch> Flavio Tarsetti <flavio.tarsetti@idiap.ch>
......
...@@ -182,6 +182,8 @@ class Algorithm(BackendAlgorithm): ...@@ -182,6 +182,8 @@ class Algorithm(BackendAlgorithm):
""" """
dataformat_klass = dataformat.DataFormat
def __init__(self, prefix, data, dataformat_cache=None, library_cache=None): def __init__(self, prefix, data, dataformat_cache=None, library_cache=None):
super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache) super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache)
...@@ -303,89 +305,52 @@ class Algorithm(BackendAlgorithm): ...@@ -303,89 +305,52 @@ class Algorithm(BackendAlgorithm):
"declaration: %s" % (self.name, ", ".join(all_output_names)) "declaration: %s" % (self.name, ", ".join(all_output_names))
) )
def _validate_format(self, type_name, group_name, entry_name, dataformat):
if dataformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for %s `%s' on algorithm `%s': %s"
% (
type_name,
group_name,
entry_name,
self.name,
"\n".join(dataformat.errors),
)
)
def _validate_dataformats(self, group, group_name, dataformat_cache):
for name, entry in group[group_name].items():
type_name = entry["type"]
thisformat = self._update_dataformat_cache(type_name, dataformat_cache)
self._validate_format(type_name, group_name, name, thisformat)
def _validate_required_dataformats(self, dataformat_cache): def _validate_required_dataformats(self, dataformat_cache):
"""Makes sure we can load all requested formats """Makes sure we can load all requested formats
""" """
for group in self.groups: for group in self.groups:
for name, input in group["inputs"].items(): for name, input_ in group["inputs"].items():
if input["type"] in self.dataformats: self._validate_dataformats(group, "inputs", dataformat_cache)
continue
if dataformat_cache and input["type"] in dataformat_cache: # reuse
thisformat = dataformat_cache[input["type"]]
else: # load it
thisformat = dataformat.DataFormat(self.prefix, input["type"])
if dataformat_cache is not None: # update it
dataformat_cache[input["type"]] = thisformat
self.dataformats[input["type"]] = thisformat
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for input `%s' on algorithm `%s': %s"
% (input["type"], name, self.name, "\n".join(thisformat.errors))
)
if "outputs" not in group:
continue
for name, output in group["outputs"].items():
if output["type"] in self.dataformats:
continue
if dataformat_cache and output["type"] in dataformat_cache: # reuse if "outputs" in group:
thisformat = dataformat_cache[output["type"]] self._validate_dataformats(group, "outputs", dataformat_cache)
else: # load it
thisformat = dataformat.DataFormat(self.prefix, output["type"])
if dataformat_cache is not None: # update it
dataformat_cache[output["type"]] = thisformat
self.dataformats[output["type"]] = thisformat if "loop" in group:
self._validate_dataformats(group, "loop", dataformat_cache)
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for output `%s' on algorithm `%s': %s"
% (
output["type"],
name,
self.name,
"\n".join(thisformat.errors),
)
)
if self.results: if self.results:
for name, result in self.results.items(): for name, result in self.results.items():
result_type = result["type"]
if result["type"].find("/") != -1: # results can only contain base types and plots therefore, only
# process plots
if result["type"] in self.dataformats: if result_type.find("/") != -1:
continue thisformat = self._update_dataformat_cache(
result_type, dataformat_cache
if dataformat_cache and result["type"] in dataformat_cache: # reuse )
thisformat = dataformat_cache[result["type"]] self._validate_format(result_type, "result", name, thisformat)
else:
thisformat = dataformat.DataFormat(self.prefix, result["type"])
if dataformat_cache is not None: # update it
dataformat_cache[result["type"]] = thisformat
self.dataformats[result["type"]] = thisformat
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for result `%s' on algorithm `%s': %s"
% (
result["type"],
name,
self.name,
"\n".join(thisformat.errors),
)
)
def _convert_parameter_types(self): def _convert_parameter_types(self):
"""Converts types to numpy equivalents, checks defaults, ranges and """Converts types to numpy equivalents, checks defaults, ranges and
......
...@@ -52,6 +52,8 @@ import six ...@@ -52,6 +52,8 @@ import six
from . import schema from . import schema
from .dataformat import DataFormat from .dataformat import DataFormat
from .protocoltemplate import ProtocolTemplate
from . import prototypes from . import prototypes
from beat.backend.python.database import Storage from beat.backend.python.database import Storage
...@@ -61,17 +63,27 @@ from beat.backend.python.protocoltemplate import Storage as PTStorage ...@@ -61,17 +63,27 @@ from beat.backend.python.protocoltemplate import Storage as PTStorage
def get_first_procotol_template(prefix): def get_first_procotol_template(prefix):
pt_root_folder = os.path.join(prefix, PTStorage.asset_folder) pt_root_folder = os.path.join(prefix, PTStorage.asset_folder)
pts_available = os.listdir(pt_root_folder) pts_available = sorted(os.listdir(pt_root_folder))
if not pts_available: if not pts_available:
raise RuntimeError("Invalid prefix content, no protocol template available") raise RuntimeError("Invalid prefix content, no protocol template available")
procotol_template_folder = pts_available[0] selected_protocol_template = None
protocol_template_versions = sorted( for procotol_template_folder in pts_available:
os.listdir(os.path.join(pt_root_folder, procotol_template_folder)) protocol_template_versions = sorted(
) os.listdir(os.path.join(pt_root_folder, procotol_template_folder))
version = protocol_template_versions[-1].split(".")[0] )
return "{}/{}".format(procotol_template_folder, version) version = protocol_template_versions[-1].split(".")[0]
protocol_template_name = "{}/{}".format(procotol_template_folder, version)
protocol_template = ProtocolTemplate(prefix, protocol_template_name)
if protocol_template.valid:
selected_protocol_template = protocol_template_name
break
if selected_protocol_template is None:
raise RuntimeError("No valid protocol template found")
return selected_protocol_template
class Database(BackendDatabase): class Database(BackendDatabase):
......
...@@ -43,6 +43,7 @@ Docker helper classes ...@@ -43,6 +43,7 @@ Docker helper classes
""" """
import ast
import os import os
import simplejson as json import simplejson as json
import socket import socket
...@@ -52,6 +53,8 @@ import docker ...@@ -52,6 +53,8 @@ import docker
import subprocess as sp # nosec import subprocess as sp # nosec
import logging import logging
from packaging import version
from beat.core import stats from beat.core import stats
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -85,7 +88,12 @@ class Host(object): ...@@ -85,7 +88,12 @@ class Host(object):
( (
self.processing_environments, self.processing_environments,
self.db_environments, self.db_environments,
) = self._discover_environments() ) = self._discover_environments_using_labels()
if not self.db_environments and not self.processing_environments:
self.processing_environments, self.db_environments = (
self._discover_environments_using_describe()
)
# (If necessary) Save the known infos about the images # (If necessary) Save the known infos about the images
if self.images_cache_filename is not None: if self.images_cache_filename is not None:
...@@ -157,7 +165,7 @@ class Host(object): ...@@ -157,7 +165,7 @@ class Host(object):
s.connect(("8.8.8.8", 1)) # connecting to a UDP address doesn't send packets s.connect(("8.8.8.8", 1)) # connecting to a UDP address doesn't send packets
return s.getsockname()[0] return s.getsockname()[0]
def _discover_environments(self): def _discover_environments_using_describe(self):
"""Returns a dictionary containing information about docker environments """Returns a dictionary containing information about docker environments
Raises: Raises:
...@@ -299,7 +307,9 @@ class Host(object): ...@@ -299,7 +307,9 @@ class Host(object):
for image in images: for image in images:
# Call the "describe" application on each existing image # Call the "describe" application on each existing image
description = _describe(image) description = _describe(image)
if not description: if not description:
logger.debug("Description not found for", image)
continue continue
key = description["name"] + " (" + description["version"] + ")" key = description["name"] + " (" + description["version"] + ")"
...@@ -331,6 +341,85 @@ class Host(object): ...@@ -331,6 +341,85 @@ class Host(object):
return (environments, db_environments) return (environments, db_environments)
def _discover_environments_using_labels(self):
"""Search BEAT runtime environments using docker labels"""
def _must_replace(key, image, environments):
environment = environments[key]
if environment["image"] not in image.tags:
logger.warn(
"Different images providing the same environment: {} VS {}".format(
environment["image"], image.tags
)
)
if self.raise_on_errors:
raise RuntimeError(
"Environments at '%s' and '%s' have the "
"same name ('%s'). Distinct environments must be "
"uniquely named. Fix this and re-start."
% (image.tags[0], environments[key]["image"], key)
)
else:
logger.debug("Keeping more recent")
current_version = "{}{}".format(
environment["version"], environment["revision"]
)
new_version = "{}{}".format(
image.labels["beat.env.version"], image.labels["beat.env.revision"]
)
current_version = version.parse(current_version)
new_version = version.parse(new_version)
return new_version > current_version
def _parse_image_info(image):
labels = image.labels
data = {
"image": image.tags[0],
"name": labels["beat.env.name"],
"version": labels["beat.env.version"],
"revision": labels["beat.env.revision"],
}
database_list = labels.get("beat.env.databases")
if database_list:
data["databases"] = ast.literal_eval(database_list)
capabilities = labels.get("beat.env.capabilities")
if capabilities:
data["capabilities"] = ast.literal_eval(capabilities)
return data
def _process_image_list(image_list):
environments = {}
for image in image_list:
if not len(image.tags):
logger.warn("Untagged image, skipping")
continue
image_info = _parse_image_info(image)
key = "{} {}".format(image_info["name"], image_info["version"])
if key in environments:
if _must_replace(key, image, environments):
environments[key] = image_info
else:
environments[key] = image_info
Host.images_cache[image_info["image"]] = environments[key]
return environments
client = docker.from_env()
databases = client.images.list(filters={"label": ["beat.env.type=database"]})
db_environments = _process_image_list(databases)
executors = client.images.list(filters={"label": ["beat.env.type=execution"]})
environments = _process_image_list(executors)
return environments, db_environments
def create_container(self, image, command): def create_container(self, image, command):
if image in self: # Replace by a real image name if image in self: # Replace by a real image name
......
...@@ -230,6 +230,17 @@ class BaseExecutor(object): ...@@ -230,6 +230,17 @@ class BaseExecutor(object):
"The input '%s' doesn't exist in the loop algorithm" % name "The input '%s' doesn't exist in the loop algorithm" % name
) )
if len(loop["outputs"]) != len(self.loop_algorithm.output_map):
self.errors.append(
"The number of outputs of the loop algorithm doesn't correspond"
)
for name in self.data["outputs"].keys():
if name not in self.algorithm.output_map.keys():
self.errors.append(
"The output '%s' doesn't exist in the loop algorithm" % name
)
# Check that the mapping in coherent # Check that the mapping in coherent
if len(self.data["inputs"]) != len(self.algorithm.input_map): if len(self.data["inputs"]) != len(self.algorithm.input_map):
self.errors.append( self.errors.append(
......
...@@ -437,7 +437,7 @@ class DockerExecutor(RemoteExecutor): ...@@ -437,7 +437,7 @@ class DockerExecutor(RemoteExecutor):
if self.loop_algorithm is not None: if self.loop_algorithm is not None:
cmd.append( cmd.append(
"tcp://%s:%d" "--loop=tcp://%s:%d"
% (loop_algorithm_container_ip, loop_algorithm_container_port) % (loop_algorithm_container_ip, loop_algorithm_container_port)
) )
......
...@@ -188,9 +188,14 @@ class LocalExecutor(BaseExecutor): ...@@ -188,9 +188,14 @@ class LocalExecutor(BaseExecutor):
self.zmq_context = None self.zmq_context = None
def __cleanup(self): def __cleanup(self, early=False):
if self.loop_executor: if self.loop_executor:
if early:
self.loop_socket.send_string("don")
self.loop_socket.recv() # ack
self.loop_executor.wait() self.loop_executor.wait()
self.loop_executor.close()
for handler in [self.message_handler, self.loop_message_handler]: for handler in [self.message_handler, self.loop_message_handler]:
if handler: if handler:
...@@ -310,15 +315,35 @@ class LocalExecutor(BaseExecutor): ...@@ -310,15 +315,35 @@ class LocalExecutor(BaseExecutor):
cache_root=self.cache, cache_root=self.cache,
) )
retval = self.loop_executor.setup() try:
retval = self.loop_executor.setup()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
retval = False
else:
message = None
if not retval: if not retval:
self.__cleanup() self.__cleanup()
raise RuntimeError("Loop algorithm setup failed") error = "Loop algorithm {} setup failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
try:
prepared = self.loop_executor.prepare()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
prepared = False
else:
message = None
prepared = self.loop_executor.prepare()
if not prepared: if not prepared:
self.__cleanup() self.__cleanup()
raise RuntimeError("Loop algorithm prepare failed") error = "Loop algorithm {} prepare failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
self.loop_executor.process() self.loop_executor.process()
...@@ -330,28 +355,50 @@ class LocalExecutor(BaseExecutor): ...@@ -330,28 +355,50 @@ class LocalExecutor(BaseExecutor):
loop_socket=self.loop_socket, loop_socket=self.loop_socket,
) )
retval = self.executor.setup() try:
if not retval: status = self.executor.setup()
self.__cleanup() except Exception as e:
raise RuntimeError("Algorithm setup failed") message = _process_exception(e, self.prefix, "algorithms")
status = 0
else:
message = None
if not status:
self.__cleanup(early=True)
error = "Algorithm {} setup failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
try:
prepared = self.executor.prepare()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
prepared = 0
else:
message = None
prepared = self.executor.prepare()
if not prepared: if not prepared:
self.__cleanup() self.__cleanup(early=True)
raise RuntimeError("Algorithm prepare failed") error = "Algorithm {} prepare failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
_start = time.time() _start = time.time()
try: try:
processed = self.executor.process() processed = self.executor.process()
except Exception as e: except Exception as e:
message = _process_exception(e, self.prefix, "databases") message = _process_exception(e, self.prefix, "algorithms")
self.__cleanup() self.__cleanup()
return _create_result(1, message) return _create_result(1, message)
if not processed: if not processed:
self.__cleanup() self.__cleanup()
raise RuntimeError("Algorithm process failed") raise RuntimeError(
"Algorithm {} process failed".format(self.algorithm.name)
)
proc_time = time.time() - _start proc_time = time.time() - _start
......
...@@ -123,6 +123,12 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -123,6 +123,12 @@ class SubprocessExecutor(RemoteExecutor):
guarantee that the cache is refreshed as appropriate in case the guarantee that the cache is refreshed as appropriate in case the
underlying libraries change. underlying libraries change.
custom_root_folders (dict): A dictionary mapping databases name and
their location on disk
ip_address (str): IP address of the machine to connect to for the database
execution and message handlers.
python_path (str): Path to the python executable of the environment to use
for experiment execution.
Attributes: Attributes:
...@@ -172,8 +178,8 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -172,8 +178,8 @@ class SubprocessExecutor(RemoteExecutor):
library_cache=None, library_cache=None,
custom_root_folders=None, custom_root_folders=None,
ip_address="127.0.0.1", ip_address="127.0.0.1",
python_path=None,
): ):
super(SubprocessExecutor, self).__init__( super(SubprocessExecutor, self).__init__(
prefix, prefix,
data, data,
...@@ -186,14 +192,30 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -186,14 +192,30 @@ class SubprocessExecutor(RemoteExecutor):
custom_root_folders=custom_root_folders, custom_root_folders=custom_root_folders,
) )
# We need three apps to run this function: databases_provider and execute if python_path is None:
self.EXECUTE_BIN = _which(os.path.join(os.path.dirname(sys.argv[0]), "execute")) base_path = os.path.dirname(sys.argv[0])
self.LOOP_EXECUTE_BIN = _which(
os.path.join(os.path.dirname(sys.argv[0]), "loop_execute") # We need three apps to run this function: databases_provider and execute
) self.EXECUTE_BIN = _which(os.path.join(base_path, "execute"))
self.DBPROVIDER_BIN = _which( self.LOOP_EXECUTE_BIN = _which(os.path.join(base_path, "loop_execute"))
os.path.join(os.path.dirname(sys.argv[0]), "databases_provider") self.DBPROVIDER_BIN = _which(os.path.join(base_path, "databases_provider"))
) else:
base_path = os.path.dirname(python_path)
self.EXECUTE_BIN = os.path.join(base_path, "execute")
self.LOOP_EXECUTE_BIN = os.path.join(base_path, "loop_execute")
self.DBPROVIDER_BIN = os.path.join(base_path, "databases_provider")
if any(
[
not os.path.exists(executable)
for executable in [
self.EXECUTE_BIN,
self.LOOP_EXECUTE_BIN,
self.DBPROVIDER_BIN,
]
]
):
raise RuntimeError("Invalid environment")
def __create_db_process(self, configuration_name=None): def __create_db_process(self, configuration_name=None):
databases_process = None databases_process = None
...@@ -384,7 +406,9 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -384,7 +406,9 @@ class SubprocessExecutor(RemoteExecutor):
) )
if self.loop_algorithm is not None: if self.loop_algorithm is not None:
cmd.append("tcp://" + self.ip_address + (":%d" % loop_algorithm_port)) cmd.append(
"--loop=tcp://" + self.ip_address + (":%d" % loop_algorithm_port)
)
if logger.getEffectiveLevel() <= logging.DEBUG: if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, "--debug") cmd.insert(1, "--debug")
......
This diff is collapsed.
...@@ -55,7 +55,6 @@ ...@@ -55,7 +55,6 @@
"language": { "$ref": "../common/1.json#/definitions/language" }, "language": { "$ref": "../common/1.json#/definitions/language" },
"description": { "$ref": "../common/1.json#/definitions/description" }, "description": { "$ref": "../common/1.json#/definitions/description" },
"groups": { "$ref": "common.json#/definitions/analyzer_groups" }, "groups": { "$ref": "common.json#/definitions/analyzer_groups" },
"parameters": { "$ref": "common.json#/definitions/parameters" },
"results": { "$ref": "common.json#/definitions/results" }, "results": { "$ref": "common.json#/definitions/results" },
"uses": { "$ref": "../common/1.json#/definitions/uses" }, "uses": { "$ref": "../common/1.json#/definitions/uses" },
"schema_version": { "$ref": "common.json#/definitions/schema_version" }, "schema_version": { "$ref": "common.json#/definitions/schema_version" },
......
...@@ -6,8 +6,8 @@ ...@@ -6,8 +6,8 @@
"oneOf": [ "oneOf": [
{ "$ref": "2.json#/definitions/block" }, { "$ref": "2.json#/definitions/block" },
{ "$ref": "2.json#/definitions/analyzer" }, { "$ref": "2.json#/definitions/analyzer" },
{ "$ref": "#/definitions/loop_user" }, { "$ref": "#/definitions/loop_evaluator" },
{ "$ref": "#/definitions/loop" } { "$ref": "#/definitions/loop_processor" }
], ],
"definitions": { "definitions": {
...@@ -71,11 +71,13 @@ ...@@ -71,11 +71,13 @@
"properties": { "properties": {
"name": { "type": "string" }, "name": { "type": "string" },
"inputs": { "$ref": "common.json#/definitions/endpoints" }, "inputs": { "$ref": "common.json#/definitions/endpoints" },
"outputs": { "$ref": "common.json#/definitions/endpoints" },
"loop": { "$ref": "#/definitions/loop_io_group" } "loop": { "$ref": "#/definitions/loop_io_group" }
}, },
"required": [ "required": [
"inputs", "inputs",
"outputs",
"loop" "loop"
], ],
...@@ -99,7 +101,7 @@ ...@@ -99,7 +101,7 @@
}, },
"loop_user": { "loop_processor": {
"type": "object", "type": "object",
...@@ -113,9 +115,8 @@ ...@@ -113,9 +115,8 @@
"schema_version": { "$ref": "common.json#/definitions/schema_version" }, "schema_version": { "$ref": "common.json#/definitions/schema_version" },
"api_version": { "$ref": "common.json#/definitions/api_version" }, "api_version": { "$ref": "common.json#/definitions/api_version" },
"type": { "type": {
"$comment": "Change enum to const when tools allow v6 json schema",
"type": "string", "type": "string",
"enum": ["loop_user"] "enum": ["sequential_loop_processor", "autonomous_loop_processor"]
} }
}, },
...@@ -129,7 +130,7 @@ ...@@ -129,7 +130,7 @@
}, },
"loop": { "loop_evaluator": {
"type": "object", "type": "object",
...@@ -142,9 +143,8 @@ ...@@ -142,9 +143,8 @@
"schema_version": { "$ref": "common.json#/definitions/schema_version" }, "schema_version": { "$ref": "common.json#/definitions/schema_version" },
"api_version": { "$ref": "common.json#/definitions/api_version" }, "api_version": { "$ref": "common.json#/definitions/api_version" },
"type": { "type": {
"$comment": "Change enum to const when tools allow v6 json schema",
"type": "string", "type": "string",
"enum": ["loop"] "enum": ["sequential_loop_evaluator", "autonomous_loop_evaluator"]
} }
}, },
......
...@@ -52,13 +52,28 @@ ...@@ -52,13 +52,28 @@
"loop": { "loop": {
"type": "object", "type": "object",
"properties": { "properties": {
"algorithm": { "$ref": "../common/1.json#/definitions/reference" }, "processor_algorithm": { "$ref": "../common/1.json#/definitions/reference" },
"parameters": { "$ref": "common.json#/definitions/parameter_set" }, "processor_parameters": { "$ref": "common.json#/definitions/parameter_set" },
"inputs": { "$ref": "common.json#/definitions/connection_map" }, "processor_inputs": { "$ref": "common.json#/definitions/connection_map" },
"processor_outputs": { "$ref": "common.json#/definitions/connection_map" },
"queue": { "$ref": "common.json#/definitions/queue" }, "queue": { "$ref": "common.json#/definitions/queue" },
"environment": { "$ref": "common.json#/definitions/environment" }, "processor_environment": { "$ref": "common.json#/definitions/environment" },
"nb_slots": { "$ref": "common.json#/definitions/slots" } "nb_slots": { "$ref": "common.json#/definitions/slots" },
} "evaluator_algorithm": { "$ref": "../common/1.json#/definitions/reference" },
"evaluator_parameters": { "$ref": "common.json#/definitions/parameter_set" },
"evaluator_inputs": { "$ref": "common.json#/definitions/connection_map" },
"evaluator_outputs": { "$ref": "common.json#/definitions/connection_map" },
"evaluator_environment": { "$ref": "common.json#/definitions/environment" }
},
"required": [
"processor_algorithm",
"processor_inputs",
"processor_outputs",
"evaluator_algorithm",
"evaluator_inputs",
"evaluator_outputs"
],
"additionalProperties": false
} }
} }
......
...@@ -74,7 +74,7 @@ ...@@ -74,7 +74,7 @@
"versioned_database": { "versioned_database": {
"type": "string", "type": "string",
"pattern": "^[a-zA-Z_][a-zA-Z0-9_]*/[0-9]+$" "pattern": "^[a-zA-Z_][a-zA-Z0-9_-]+[a-zA-Z0-9]+/[0-9]+$"
}, },
"dataset": { "dataset": {
......
...@@ -30,12 +30,6 @@ ...@@ -30,12 +30,6 @@
"$ref": "common.json#/definitions/connections" "$ref": "common.json#/definitions/connections"
}, },
"loop_connections": {
"type": "array",
"uniqueItems": true,
"items": { "$ref": "#/definitions/loop_connection" }
},
"representation": { "representation": {
"type": "object", "type": "object",
...@@ -45,9 +39,6 @@ ...@@ -45,9 +39,6 @@
"connections": { "connections": {
"$ref": "common.json#/definitions/representation/connection_list" "$ref": "common.json#/definitions/representation/connection_list"
}, },
"loop_connections": {
"$ref": "common.json#/definitions/representation/connection_list"
},
"blocks": { "blocks": {
"$ref": "common.json#/definitions/representation/block_list" "$ref": "common.json#/definitions/representation/block_list"
}, },
...@@ -74,6 +65,7 @@ ...@@ -74,6 +65,7 @@
"required": [ "required": [
"datasets", "datasets",
"blocks", "blocks",
"loops",
"analyzers", "analyzers",
"connections", "connections",
"representation" "representation"
...@@ -90,27 +82,40 @@ ...@@ -90,27 +82,40 @@
"synchronized_channel": { "synchronized_channel": {
"$ref": "../database/1.json#/definitions/identifier" "$ref": "../database/1.json#/definitions/identifier"
}, },
"inputs": { "processor_inputs": {
"type": "array",
"minItems": 1,
"uniqueItems": true,
"items": { "$ref": "common.json#/definitions/identifier" }
},
"processor_outputs": {
"type": "array",
"minItems": 1,
"uniqueItems": true,
"items": { "$ref": "common.json#/definitions/identifier" }
},
"evaluator_inputs": {
"type": "array",
"minItems": 1,
"uniqueItems": true,
"items": { "$ref": "common.json#/definitions/identifier" }
},
"evaluator_outputs": {
"type": "array", "type": "array",
"minItems": 1, "minItems": 1,
"uniqueItems": true, "uniqueItems": true,
"items": { "$ref": "common.json#/definitions/identifier" } "items": { "$ref": "common.json#/definitions/identifier" }
} }
}, },
"required": ["name", "synchronized_channel", "inputs"], "required": [
"name",
"synchronized_channel",
"processor_inputs",
"processor_outputs",
"evaluator_inputs",
"evaluator_outputs"
],
"additionalProperties": false "additionalProperties": false
},
"loop_connection": {
"type": "object",
"properties": {
"from": { "$ref": "common.json#/definitions/endpoint" },
"to": { "$ref": "common.json#/definitions/endpoint" }
},
"required": ["from", "to"],
"additionalProperties": false,
"definitions": {
}
} }
} }
......
{
"schema_version": 2,
"language": "python",
"api_version": 2,
"type": "autonomous",
"groups": [
{
"name": "main",
"inputs": {
"in_data": {
"type": "user/single_integer/1"
}
}
}
],
"results": {
"out_data": {
"type": "int32",
"display": false
}
},
"parameters": {
"threshold": {
"default": 9,
"type": "int8",
"description": "Should trigger parsing error"
}
}
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
class Algorithm:
    """Trivial pass-through test algorithm.

    Forwards the integer received on the ``in_data`` input, unchanged,
    to the ``out_data`` output field.
    """

    def process(self, inputs, data_loaders, output):
        """Copy the current input value to the output.

        Parameters:
            inputs: input group accessor; ``inputs["in_data"].data.value``
                holds the current value.
            data_loaders: data loader accessor (unused here).
            output: output accessor used to emit the result.

        Returns:
            bool: ``True``, signalling successful processing.
        """
        current_value = inputs["in_data"].data.value
        output.write({"out_data": current_value})
        return True
...@@ -2,8 +2,8 @@ ...@@ -2,8 +2,8 @@
"schema_version": 3, "schema_version": 3,
"language": "python", "language": "python",
"api_version": 2, "api_version": 2,
"type": "loop", "type": "loop_evaluator",
"type": "loop_user", "type": "loop_processor",
"groups": [ "groups": [
{ {
"inputs": { "inputs": {
......
...@@ -11,9 +11,6 @@ ...@@ -11,9 +11,6 @@
} }
}, },
"outputs": { "outputs": {
"out": {
"type": "user/single_integer/1"
}
}, },
"loop": { "loop": {
"request": { "request": {
......
...@@ -2,20 +2,25 @@ ...@@ -2,20 +2,25 @@
"schema_version": 3, "schema_version": 3,
"language": "python", "language": "python",
"api_version": 2, "api_version": 2,
"type": "loop", "type": "autonomous_loop_evaluator",
"groups": [ "groups": [
{ {
"inputs": { "inputs": {
"in": { "in_loop": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out_loop": {
"type": "user/single_integer/1" "type": "user/single_integer/1"
} }
}, },
"loop": { "loop": {
"request": { "request": {
"type": "user/single_integer/1" "type": "user/1d_array_of_integers/1"
}, },
"answer": { "answer": {
"type": "user/single_integer/1" "type": "user/single_float/1"
} }
} }
} }
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import numpy as np
class Algorithm:
    """Loop-evaluator test algorithm.

    Remembers the first element of the last validated result and echoes
    it back on the ``out_loop`` output when asked to write.
    """

    def __init__(self):
        # Last value accepted by validate(); replayed by write().
        self.output = None

    def validate(self, result):
        """Accept the result and keep its first value.

        Parameters:
            result: object whose ``value`` attribute is a sequence; only
                the first element is retained.

        Returns:
            tuple: ``(True, answer)`` where ``answer`` maps ``"value"``
            to the retained value as a ``numpy.float32``.
        """
        first = result.value[0]
        self.output = first
        return (True, {"value": np.float32(first)})

    def write(self, outputs, processor_output_name, end_data_index):
        """Emit the retained value on ``out_loop`` as a ``numpy.int32``.

        Parameters:
            outputs: output group accessor.
            processor_output_name: name of the processor output (unused).
            end_data_index: last data index covered by this write.
        """
        answer = {"value": np.int32(self.output)}
        outputs["out_loop"].write(answer, end_data_index)
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
"schema_version": 3, "schema_version": 3,
"language": "python", "language": "python",
"api_version": 2, "api_version": 2,
"type": "loop_user", "type": "autonomous_loop_processor",
"splittable": false, "splittable": false,
"groups": [ "groups": [
{ {
...@@ -18,10 +18,10 @@ ...@@ -18,10 +18,10 @@
}, },
"loop": { "loop": {
"request": { "request": {
"type": "user/single_integer/1" "type": "user/1d_array_of_integers/1"
}, },
"answer": { "answer": {
"type": "user/single_integer/1" "type": "user/single_float/1"
} }
} }
} }
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import numpy as np
class Algorithm:
def process(self, data_loaders, outputs, loop_channel):