Commit 3d1857d9 authored by André Anjos's avatar André Anjos 💬

Merge branch '105_refactor_database_sharing' into 'master'

Refactor database sharing

Closes #105

See merge request !132
parents 6294cc57 3820b580
Pipeline #45643 passed with stages
in 18 minutes and 38 seconds
......@@ -247,3 +247,12 @@ class Database(BackendDatabase):
if self.schema_version != 1:
for view in protocol["views"].keys():
self._validate_view(view)
@property
def is_database_rawdata_access_enabled(self):
    """Tell whether direct access to the raw data is enabled.

    The flag is read from the ``direct_rawdata_access`` entry of
    ``self.data`` and falls back to ``False`` when that entry is absent.

    This property is only useful for the Docker executor.
    """
    declaration = self.data
    return declaration.get("direct_rawdata_access", False)
......@@ -384,14 +384,15 @@ class DockerExecutor(RemoteExecutor):
file_path = result["path"]
__add_writable_volume(file_path)
def __share_databases(self, algorithm_container, db_infos):
"""Add volumes to the algorithm container for the datasets"""
def __setup_databases_raw_access(self, algorithm_container):
"""Add volumes to the algorithm container if the database allows that"""
for database_name, database in self.databases.items():
db_data = database.data
algorithm_container.add_volume(
db_data["root_folder"], os.path.join("/databases", database_name)
)
if db_data.get("direct_rawdata_access", False):
algorithm_container.add_volume(
db_data["root_folder"], os.path.join("/databases", database_name)
)
def process(
self, virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0
......@@ -481,8 +482,6 @@ class DockerExecutor(RemoteExecutor):
network_name = self.data.pop("network_name", "bridge")
databases_infos = {}
share_databases = self.data.pop("share_databases", False)
if len(self.databases) > 0:
databases_infos["db"] = self.__create_db_container(
datasets_uid, network_name
......@@ -537,8 +536,7 @@ class DockerExecutor(RemoteExecutor):
loop_algorithm_container, volume_cache_mount_point, self.data["loop"]
)
if share_databases:
self.__share_databases(loop_algorithm_container, databases_infos)
self.__setup_databases_raw_access(loop_algorithm_container)
# Start the container
self.host.start(
......@@ -584,8 +582,7 @@ class DockerExecutor(RemoteExecutor):
algorithm_container, volume_cache_mount_point, self.data
)
if share_databases:
self.__share_databases(algorithm_container, databases_infos)
self.__setup_databases_raw_access(algorithm_container)
# Start the container
self.host.start(
......
......@@ -19,23 +19,13 @@
"items": { "$ref": "#/definitions/protocol" }
},
"environment": {
"type": "object",
"properties": {
"name": { "type": "string" },
"version": { "type": "string" }
},
"required": [
"name",
"version"
],
"additionalProperties": false
},
"environment": {"$ref": "common.json#/definitions/environment"},
"description": { "$ref": "../common/1.json#/definitions/description" },
"schema_version": { "$ref": "../common/1.json#/definitions/version" }
"schema_version": { "$ref": "../common/1.json#/definitions/version" },
"direct_rawdata_access": {"$ref": "common.json#/definitions/direct_rawdata_access"}
},
"required": [
......
......@@ -21,20 +21,11 @@
"description": { "$ref": "../common/1.json#/definitions/description" },
"environment": {
"type": "object",
"properties": {
"name": { "type": "string" },
"version": { "type": "string" }
},
"required": [
"name",
"version"
],
"additionalProperties": false
},
"environment": {"$ref": "common.json#/definitions/environment"},
"schema_version": { "const": 2 },
"schema_version": { "const": 2 }
"direct_rawdata_access": {"$ref": "common.json#/definitions/direct_rawdata_access"}
},
......
{
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "Database common components descriptor",
"description": "This schema defines the components used in one or more versions of the Database",
"definitions": {
"direct_rawdata_access": {
"type": "boolean",
"default": false
},
"environment": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"version": {
"type": "string"
}
},
"required": [
"name",
"version"
],
"additionalProperties": false
}
}
}
......@@ -109,6 +109,12 @@ def initialize_db_root_folder(database_root_folder, databases_path):
json.dump(declaration, db_file, indent=4)
def setup_root_db_folder():
    """Initialize the database root folder used by the tests.

    Calls ``initialize_db_root_folder`` with the ``beat_core_test``
    directory located under ``prefix_folder`` as the root folder and the
    ``databases`` directory located under ``prefix`` as the databases path.
    """
    root_folder = os.path.join(prefix_folder, "beat_core_test")
    databases_path = os.path.join(prefix, "databases")
    initialize_db_root_folder(root_folder, databases_path)
def setup_package():
sync_prefixes(
[
......@@ -118,9 +124,7 @@ def setup_package():
prefix_folder,
)
initialize_db_root_folder(
os.path.join(prefix_folder, "beat_core_test"), os.path.join(prefix, "databases")
)
setup_root_db_folder()
if DOCKER_NETWORK_TEST_ENABLED:
import docker
......
{
"schema_version": 2,
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "autonomous",
"type": "sequential",
"splittable": false,
"parameters": {
"sync": {
"default": "in1",
"type": "string"
}
},
"groups": [
{
"name": "main",
"inputs": {
"in1": {
"type": "user/single_integer/1"
},
"in2": {
"in_data": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out": {
"out_data": {
"type": "user/single_integer/1"
}
}
......
......@@ -31,33 +31,20 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import os
import numpy
class Algorithm:
def __init__(self):
self.offset = 1
def setup(self, parameters):
self.sync = parameters["sync"]
return True
def process(self, data_loaders, outputs):
data_loader = data_loaders.loaderOf("in1")
print(os.listdir("/databases/integers_db/1"))
with open("/databases/integers_db/1/datafile.txt", "rt") as shared_data:
def process(self, inputs, data_loaders, outputs):
with open(
"/databases/simple_rawdata_access/1/datafile.txt", "rt"
) as shared_data:
value = shared_data.read()
shared_offset = int(value)
for i in range(data_loader.count(self.sync)):
view = data_loader.view(self.sync, i)
(data, start, end) = view[view.count() - 1]
value = numpy.int32(data["in1"].value + data["in2"].value + shared_offset)
outputs["out"].write({"value": value}, end)
out_data = {"value": numpy.int32(inputs["in_data"].data.value + shared_offset)}
outputs["out_data"].write(out_data)
return True
{
"root_folder": "/tmp/beat_core_test",
"environment": {
"name": "Example databases",
"version": "1.4.0"
},
"direct_rawdata_access": true,
"protocols": [
{
"name": "protocol",
"template": "test_integers",
"sets": [
{
"name": "set",
"template": "set",
"view": "View",
"outputs": {
"out": "user/single_integer/1"
}
},
{
"name": "set2",
"template": "set",
"view": "View2",
"outputs": {
"out": "user/single_integer/1"
}
}
]
},
{
"name": "protocol2",
"template": "test_integers",
"sets": [
{
"name": "set",
"template": "set",
"view": "LargeView",
"outputs": {
"out": "user/single_integer/1"
}
},
{
"name": "set2",
"template": "set",
"view": "View2",
"outputs": {
"out": "user/single_integer/1"
}
}
]
}
]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
from collections import namedtuple
import numpy
from beat.backend.python.database import View as BaseView
class View(BaseView):
    """Database view exposing a single entry whose ``out`` value is 42."""

    def setup(
        self, root_folder, outputs, parameters, start_index=None, end_index=None,
    ):
        """Forward the initialization to the base view and report success."""
        super().setup(root_folder, outputs, parameters, start_index, end_index)
        return True

    def index(self, root_folder, parameters):
        """Return the entries of this view: one ``Entry`` with ``out == 42``."""
        entry_type = namedtuple("Entry", ["out"])
        return [entry_type(out=42)]

    def get(self, output, index):
        """Return the data of ``output`` for the entry at position ``index``.

        Only the ``out`` output is handled; any other name yields ``None``.
        """
        # The entry is looked up first so an invalid index fails for any output.
        entry = self.objs[index]
        if output != "out":
            return None
        return {"value": numpy.int32(entry.out)}
# ----------------------------------------------------------
class View2(BaseView):
    """Database view exposing a single entry whose ``out`` value is 53."""

    def index(self, root_folder, parameters):
        """Return the entries of this view: one ``Entry`` with ``out == 53``."""
        entry_type = namedtuple("Entry", ["out"])
        return [entry_type(out=53)]

    def get(self, output, index):
        """Return the data of ``output`` for the entry at position ``index``.

        Only the ``out`` output is handled; any other name yields ``None``.
        """
        # The entry is looked up first so an invalid index fails for any output.
        entry = self.objs[index]
        if output != "out":
            return None
        return {"value": numpy.int32(entry.out)}
# ----------------------------------------------------------
class LargeView(BaseView):
    """Database view exposing five entries whose ``out`` values are 0 to 4."""

    def index(self, root_folder, parameters):
        """Return the entries of this view: ``Entry(0)`` through ``Entry(4)``."""
        entry_type = namedtuple("Entry", ["out"])
        return [entry_type(out=number) for number in range(5)]

    def get(self, output, index):
        """Return the data of ``output`` for the entry at position ``index``.

        Only the ``out`` output is handled; any other name yields ``None``.
        """
        # The entry is looked up first so an invalid index fails for any output.
        entry = self.objs[index]
        if output != "out":
            return None
        return {"value": numpy.int32(entry.out)}
.. Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ ..
.. Contact: beat.support@idiap.ch ..
.. ..
.. This file is part of the beat.backend.python module of the BEAT platform. ..
.. ..
.. Redistribution and use in source and binary forms, with or without
.. modification, are permitted provided that the following conditions are met:
.. 1. Redistributions of source code must retain the above copyright notice, this
.. list of conditions and the following disclaimer.
.. 2. Redistributions in binary form must reproduce the above copyright notice,
.. this list of conditions and the following disclaimer in the documentation
.. and/or other materials provided with the distribution.
.. 3. Neither the name of the copyright holder nor the names of its contributors
.. may be used to endorse or promote products derived from this software without
.. specific prior written permission.
.. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
.. ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
.. WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
.. DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
.. FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
.. DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
.. SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
.. CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
.. OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
.. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
The Simple Digit Database
-------------------------
This database emits two integers (one from each of its sets). The first integer
(from ``set``) has a value of 42, while the second (from ``set2``) has a value
of 53.
{
"root_folder": "/tmp/beat_core_test",
"environment": {
"name": "Example databases",
"version": "1.4.0"
},
"direct_rawdata_access": true,
"protocols": [
{
"name": "protocol",
"template": "protocol/1",
"views": {
"set": {
"view": "View"
},
"set2": {
"view": "View2"
}
}
},
{
"name": "protocol2",
"template": "protocol2/1",
"views": {
"set": {
"view": "LargeView"
},
"set2": {
"view": "View2"
}
}
}
],
"schema_version": 2
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
from collections import namedtuple
import numpy
from beat.backend.python.database import View as BaseView
class View(BaseView):
    """Database view exposing a single entry whose ``out`` value is 42."""

    def setup(
        self, root_folder, outputs, parameters, start_index=None, end_index=None,
    ):
        """Forward the initialization to the base view and report success."""
        super().setup(root_folder, outputs, parameters, start_index, end_index)
        return True

    def index(self, root_folder, parameters):
        """Return the entries of this view: one ``Entry`` with ``out == 42``."""
        entry_type = namedtuple("Entry", ["out"])
        return [entry_type(out=42)]

    def get(self, output, index):
        """Return the data of ``output`` for the entry at position ``index``.

        Only the ``out`` output is handled; any other name yields ``None``.
        """
        # The entry is looked up first so an invalid index fails for any output.
        entry = self.objs[index]
        if output != "out":
            return None
        return {"value": numpy.int32(entry.out)}
# ----------------------------------------------------------
class View2(BaseView):
    """Database view exposing a single entry whose ``out`` value is 53."""

    def index(self, root_folder, parameters):
        """Return the entries of this view: one ``Entry`` with ``out == 53``."""
        entry_type = namedtuple("Entry", ["out"])
        return [entry_type(out=53)]

    def get(self, output, index):
        """Return the data of ``output`` for the entry at position ``index``.

        Only the ``out`` output is handled; any other name yields ``None``.
        """
        # The entry is looked up first so an invalid index fails for any output.
        entry = self.objs[index]
        if output != "out":
            return None
        return {"value": numpy.int32(entry.out)}
# ----------------------------------------------------------
class LargeView(BaseView):
    """Database view exposing five entries whose ``out`` values are 0 to 4."""

    def index(self, root_folder, parameters):
        """Return the entries of this view: ``Entry(0)`` through ``Entry(4)``."""
        entry_type = namedtuple("Entry", ["out"])
        return [entry_type(out=number) for number in range(5)]

    def get(self, output, index):
        """Return the data of ``output`` for the entry at position ``index``.

        Only the ``out`` output is handled; any other name yields ``None``.
        """
        # The entry is looked up first so an invalid index fails for any output.
        entry = self.objs[index]
        if output != "out":
            return None
        return {"value": numpy.int32(entry.out)}
{
"analyzers": {
"analysis": {
"algorithm": "v1/integers_echo_analyzer/1",
"inputs": {
"in_data": "in"
}
}
},
"blocks": {
"echo": {
"algorithm": "user/integers_rawdata_access/1",
"inputs": {
"in_data": "in"
},
"outputs": {
"out_data": "out"
}
}
},
"datasets": {
"set": {
"database": "simple/1",
"protocol": "protocol",
"set": "set"
}
},
"globals": {
"queue": "queue",
"environment": {
"name": "Python for tests",
"version": "1.3.0"
}
}
}
Test experiment for validating the rawdata access feature. This ensures that if the
database does not have the field, then nothing is mounted.
{
"blocks": {
"addition": {
"algorithm": "user/shared_datasets/1",
"inputs": {
"in1": "a",
"in2": "b"
},
"outputs": {
"out": "sum"
}
}
},
"datasets": {
"integers": {
"database": "integers_db/1",
"protocol": "double",
"set": "double"
}
},
"analyzers": {
"analysis": {
"algorithm": "v1/integers_analysis/1",
"inputs": {
"input": "input"
}
}
},
"globals": {
"environment": {
"name": "Python for tests",
"version": "1.3.0"
},
"queue": "queue",
"user/shared_datasets/1": {
"sync": "in1"
}
}
}
This experiment tests whether the dataset sharing
functionality works as expected.
{
"analyzers": {
"analysis": {
"algorithm": "v1/integers_echo_analyzer/1",
"inputs": {
"in_data": "in"
}
}
},
"blocks": {
"echo": {
"algorithm": "user/integers_rawdata_access/1",
"inputs": {
"in_data": "in"
},
"outputs": {
"out_data": "out"
}
}
},
"datasets": {
"set": {
"database": "simple_rawdata_access/1",
"protocol": "protocol",
"set": "set"
}
},
"globals": {
"queue": "queue",
"environment": {
"name": "Python for tests",
"version": "1.3.0"