Commit 3da7337b authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[backend] Add ExperimentModel

This class modelises an experiment and allows
to check whether block connections are valid.
parent c11e99e6
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2020 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.editor module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import typing
from beat.core.database import Database
from .asset import Asset
from .asset import AssetType
class Connection:
"""Class representing the connection between the output of a source block
and the input of the corresponding sink.
The information comes from the toolchain.
"""
def __init__(
self, source: str, output_name: str, sink: str, input_name: str
) -> None:
self.source = source
self.output_name = output_name
self.sink = sink
self.input_name = input_name
def is_used_by_block(self, block_name: str):
"""Returns whether the given block is concerned by this connection"""
return self.source == block_name or self.sink == block_name
def __repr__(self) -> str:
text = [
f"{self.__class__.__name__}(",
f"from: {self.source}.{self.output_name}",
f"to: {self.sink}.{self.input_name}",
")",
]
return "\n".join(text)
@property
def from_output(self) -> str:
return f"{self.source}.{self.output_name}"
@property
def to_input(self) -> str:
return f"{self.sink}.{self.input_name}"
class ExperimentBlock:
"""Base class experiment blocks representation"""
def __init__(self, name: str, config: dict) -> None:
self.name: str = name
self.parse(config)
def parse(self, config: dict) -> None:
raise NotImplementedError
def dataformat_for_endpoint(self, endpoint: str) -> str:
raise NotImplementedError
def __repr__(self) -> str:
return f"{self.__class__.__name__}({self.name})"
class AlgorithmData:
"""Class containing the information related to the endpoints of an algorithm
"""
def __init__(self) -> None:
self.input_type_map: typing.Mapping[str, str] = {}
self.output_type_map: typing.Mapping[str, str] = {}
# left: alg io right: block io
self.input_mapping: typing.Mapping[str, str] = {}
self.output_mapping: typing.Mapping[str, str] = {}
def parse(self, config: dict) -> None:
prefix = config.pop("prefix")
algorithm_name = config.get("algorithm")
if algorithm_name is not None:
algorithm = Asset(prefix, AssetType.ALGORITHM, algorithm_name)
for group in algorithm.declaration["groups"]:
for io_type, type_map in [
("inputs", self.input_type_map),
("outputs", self.output_type_map),
]:
for input_name, input_data in group.get(io_type, {}).items():
type_map[input_name] = input_data["type"]
self.input_mapping = config.get("inputs", {})
self.output_mapping = config.get("outputs", {})
def dataformat_for_endpoint(self, endpoint: str) -> str:
for alg_in, block_in in self.input_mapping.items():
if block_in == endpoint:
return self.input_type_map[alg_in]
for alg_out, block_out in self.output_mapping.items():
if block_out == endpoint:
return self.output_type_map[alg_out]
return ""
def __repr__(self) -> str:
representation = [f"{self.__class__.__name__}("]
if self.input_type_map:
representation += [
"inputs:" f" {self.input_mapping}" f" {self.input_type_map}"
]
if self.output_type_map:
representation += [
"outputs:" f" {self.output_mapping}" f" {self.output_type_map}"
]
representation.append(")")
return "\n".join(representation)
class AlgorithmBlock(ExperimentBlock):
"""Class containing the endpoints of an algorithm block"""
def __init__(self, name: str, config: dict) -> None:
self.algorithm_data: AlgorithmData = AlgorithmData()
super().__init__(name, config)
def parse(self, config: dict) -> None:
self.algorithm_data.parse(config)
def dataformat_for_endpoint(self, endpoint: str) -> str:
return self.algorithm_data.dataformat_for_endpoint(endpoint)
def __repr__(self) -> str:
text = [
f"{self.__class__.__name__}(",
f"name: {self.name}",
f" {self.algorithm_data}",
")",
]
return "\n".join(text)
class LoopBlock(ExperimentBlock):
"""Class containing the endpoints of a loop block"""
def __init__(self, name: str, config: dict) -> None:
self.processor_data: AlgorithmData = AlgorithmData()
self.evaluator_data: AlgorithmData = AlgorithmData()
super().__init__(name, config)
def parse(self, config: dict) -> None:
for algorithm_type, algorithm_data in (
("processor_", self.processor_data),
("evaluator_", self.evaluator_data),
):
keys = [key for key in config.keys() if key.startswith(algorithm_type)]
algorithm_config = {"prefix": config["prefix"]}
for key in keys:
algorithm_config[key[len(algorithm_type) :]] = config[key]
algorithm_data.parse(algorithm_config)
def dataformat_for_endpoint(self, endpoint: str) -> str:
data_format = self.processor_data.dataformat_for_endpoint(endpoint)
if not data_format:
data_format = self.evaluator_data.dataformat_for_endpoint(endpoint)
return data_format
def __repr__(self) -> str:
text = [
f"{self.__class__.__name__}(",
f"name: {self.name}",
f"processor: {self.processor_data}",
f"evaluator: {self.evaluator_data}",
")",
]
return "\n".join(text)
class AnalyzerBlock(AlgorithmBlock):
"""Class containing the endpoints of an analyzer block"""
class DatasetBlock(ExperimentBlock):
"""Class containing the endpoints of a dataset block"""
def __init__(self, name: str, config: dict) -> None:
self.output_type_map: typing.Mapping[str, str] = {}
super().__init__(name, config)
def parse(self, config: dict) -> None:
prefix = config.pop("prefix")
database_name = config.get("database")
if database_name:
database = Database(prefix, database_name)
if database.valid:
set_data = database.set(config["protocol"], config["set"])
self.output_type_map = set_data.get("outputs", {})
def dataformat_for_endpoint(self, endpoint: str) -> str:
return self.output_type_map[endpoint]
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}(\n"
f"name: {self.name}\n"
f"outputs: {self.output_type_map}\n"
")\n"
)
ErrorMap = typing.Mapping[str, typing.List[str]]
class ExperimentModel:
"""This class contains the connection related information for an experiment.
It's goal is to allow the verification of the compatibility between two
blocks connected endpoints.
"""
def __init__(self) -> None:
self.connections: typing.List[Connection] = []
self.blocks: typing.Mapping[str, ExperimentBlock] = {}
self.experiment: Asset = None
def _load_toolchain_info(self) -> None:
"""Load the needed information from the toolchain
Currently only the connections are of interest.
"""
toolchain = Asset(
self.experiment.prefix,
AssetType.TOOLCHAIN,
"/".join(self.experiment.name.split("/")[1:4]),
)
declaration = toolchain.declaration
# Load connections
connections = declaration["connections"]
for connection in connections:
source, output = connection["from"].split(".")
sink, input_ = connection["to"].split(".")
self.connections.append(Connection(source, output, sink, input_))
def _load_experiment_info(self) -> None:
"""Load all the endpoints related information."""
declaration = self.experiment.declaration
for block_type, klass in [
("blocks", AlgorithmBlock),
("loops", LoopBlock),
("analyzers", AnalyzerBlock),
("datasets", DatasetBlock),
]:
for block_name, config in declaration.get(block_type, {}).items():
config["prefix"] = self.experiment.prefix
self.blocks[block_name] = klass(block_name, config)
def load_experiment(self, experiment: Asset) -> None:
"""Load the required experiment data"""
self.clear()
self.experiment = experiment
self._load_toolchain_info()
self._load_experiment_info()
def clear(self) -> None:
"""Clear the model content"""
self.connections = []
self.blocks = {}
def update_block(self, block_name: str, config: dict) -> None:
"""Update one block content"""
config["prefix"] = self.experiment.prefix
self.blocks[block_name].parse(config)
def check_all_blocks(self) -> ErrorMap:
"""Check that all blocks connections are compatible"""
declaration = self.experiment.declaration
error_map: ErrorMap = {}
for block_type in ["blocks", "loops", "analyzers", "datasets"]:
for block_name in declaration.get(block_type, {}).keys():
error_list = self.check_block(block_name)
if error_list:
error_map[block_name] = error_list
return error_map
def check_block(self, block_name: str) -> typing.List[str]:
"""Check that one block connections are compatible"""
connections = [
connection
for connection in self.connections
if connection.is_used_by_block(block_name)
]
connection_error_list = []
for connection in connections:
source_block = self.blocks[connection.source]
from_df = source_block.dataformat_for_endpoint(connection.output_name)
sink_block = self.blocks[connection.sink]
to_df = sink_block.dataformat_for_endpoint(connection.input_name)
if from_df and to_df:
if from_df != to_df:
connection_error_list.append(
f"{connection.from_output} {from_df} is not compatible with {connection.to_input} {to_df}"
)
# else:
# Either side of the connection being unassigned means that the
#  connection is "correct".
return connection_error_list
......@@ -56,6 +56,7 @@ def sync_prefix():
prefixes = [
pkg_resources.resource_filename("beat.backend.python.test", "prefix"),
pkg_resources.resource_filename("beat.core.test", "prefix"),
pkg_resources.resource_filename("beat.editor.test", "prefix"),
]
for path in prefixes:
......
{
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "sequential",
"splittable": false,
"groups": [
{
"name": "main",
"inputs": {
"in_data": {
"type": "user/single_string/1"
}
},
"outputs": {
"out_data": {
"type": "user/single_string/1"
}
}
}
],
"parameters": {
"offset": {
"default": 0,
"type": "int8",
"description": "Offset to apply"
}
}
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
class Algorithm:
def setup(self, parameters):
self.offset = parameters["offset"]
return True
def process(self, inputs, data_loaders, outputs):
outputs["out_data"].write({"value": inputs["in_data"].data.value + self.offset})
return True
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.editor module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from ..backend.asset import Asset
from ..backend.asset import AssetType
from ..backend.experimentmodel import ExperimentModel
class TestExperimentModeling:
"""Test that the experiment modeling works correctly"""
def test_model_load(self, test_prefix):
experiment = Asset(
test_prefix, AssetType.EXPERIMENT, "user/user/two_loops/1/two_loops"
)
experiment_model = ExperimentModel()
experiment_model.load_experiment(experiment)
error_list = experiment_model.check_block("offsetter_for_loop_evaluator")
assert len(error_list) == 0
error_map = experiment_model.check_all_blocks()
assert len(error_map) == 0
def test_error(self, test_prefix):
experiment = Asset(
test_prefix, AssetType.EXPERIMENT, "user/user/two_loops/1/two_loops"
)
experiment_model = ExperimentModel()
experiment_model.load_experiment(experiment)
configuration = {
"algorithm": "user/string_offsetter/1",
"inputs": {"in_data": "in"},
"outputs": {"out_data": "out"},
}
BLOCK_TO_CHANGE = "offsetter_for_loop_evaluator"
experiment_model.update_block(BLOCK_TO_CHANGE, configuration)
error_list = experiment_model.check_block(BLOCK_TO_CHANGE)
assert len(error_list) > 0
error_map = experiment_model.check_all_blocks()
assert len(error_map) > 0
assert BLOCK_TO_CHANGE in error_map
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment