Commit 7464d206 authored by André Anjos

Merge branch '103_algorithm_database_sharing' into 'master'

Algorithm database sharing

Closes #103

See merge request !128
parents b9960cee 67a4e0d0
Pipeline #45259 passed with stages in 36 minutes and 30 seconds
@@ -384,6 +384,15 @@ class DockerExecutor(RemoteExecutor):
file_path = result["path"]
__add_writable_volume(file_path)
def __share_databases(self, algorithm_container, db_infos):
"""Add volumes to the algorithm container for the datasets"""
for database_name, database in self.databases.items():
db_data = database.data
algorithm_container.add_volume(
db_data["root_folder"], os.path.join("/databases", database_name)
)
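    # For illustration (a sketch, not code from this commit): the loop above
    # yields a host-to-container mapping of the form
    #
    #     {db.data["root_folder"]: os.path.join("/databases", name)
    #      for name, db in self.databases.items()}
    #
    # e.g. {"/tmp/beat_core_test": "/databases/integers_db/1"} for the
    # integers_db/1 database updated later in this commit.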
def process(
self, virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0
):
@@ -472,6 +481,8 @@ class DockerExecutor(RemoteExecutor):
network_name = self.data.pop("network_name", "bridge")
databases_infos = {}
share_databases = self.data.pop("share_databases", False)
if len(self.databases) > 0:
databases_infos["db"] = self.__create_db_container(
datasets_uid, network_name
@@ -526,6 +537,9 @@ class DockerExecutor(RemoteExecutor):
loop_algorithm_container, volume_cache_mount_point, self.data["loop"]
)
if share_databases:
self.__share_databases(loop_algorithm_container, databases_infos)
# Start the container
self.host.start(
loop_algorithm_container,
@@ -570,6 +584,9 @@ class DockerExecutor(RemoteExecutor):
algorithm_container, volume_cache_mount_point, self.data
)
if share_databases:
self.__share_databases(algorithm_container, databases_infos)
# Start the container
self.host.start(
algorithm_container,
......
@@ -35,6 +35,7 @@
# Basic setup for slow tests
import json
import logging
import os
import shutil
@@ -84,14 +85,42 @@ if VERBOSE_TEST_LOGGING:
logger.addHandler(handler)
def sync_prefixes(source_prefixes, target_prefix):
for path in source_prefixes:
sp.check_call(["rsync", "-arz", path, target_prefix])
def initialize_db_root_folder(database_root_folder, databases_path):
os.makedirs(database_root_folder, exist_ok=True)
for root, dirs, files in os.walk(databases_path, topdown=False):
for file_ in files:
if file_.endswith(".json"):
path = os.path.join(root, file_)
try:
with open(path, "rt") as db_file:
declaration = json.load(db_file)
except json.JSONDecodeError:
                    # some test declarations are intentionally invalid JSON; skip them
continue
else:
declaration["root_folder"] = database_root_folder
with open(path, "wt") as db_file:
json.dump(declaration, db_file, indent=4)
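# Example with hypothetical paths: given a declaration file containing
# {"root_folder": "/tmp/path/not/set", ...}, calling
#
#     initialize_db_root_folder("/tmp/beat_core_test", "<prefix>/databases")
#
# rewrites the file in place to read {"root_folder": "/tmp/beat_core_test", ...},
# matching the database declarations updated further down in this commit.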
def setup_package():
-    prefixes = [
-        pkg_resources.resource_filename("beat.backend.python.test", "prefix"),
-        pkg_resources.resource_filename("beat.core.test", "prefix"),
-    ]
-    for path in prefixes:
-        sp.check_call(["rsync", "-arz", path, prefix_folder])
+    sync_prefixes(
+        [
+            pkg_resources.resource_filename("beat.backend.python.test", "prefix"),
+            pkg_resources.resource_filename("beat.core.test", "prefix"),
+        ],
+        prefix_folder,
+    )
+    initialize_db_root_folder(
+        os.path.join(prefix_folder, "beat_core_test"), os.path.join(prefix, "databases")
+    )
if DOCKER_NETWORK_TEST_ENABLED:
import docker
......
{
"schema_version": 2,
"language": "python",
"api_version": 2,
"type": "autonomous",
"splittable": false,
"parameters": {
"sync": {
"default": "in1",
"type": "string"
}
},
"groups": [
{
"inputs": {
"in1": {
"type": "user/single_integer/1"
},
"in2": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out": {
"type": "user/single_integer/1"
}
}
}
]
}
# vim: set fileencoding=utf-8 :
###################################################################################
# #
# Copyright (c) 2020 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met: #
# #
# 1. Redistributions of source code must retain the above copyright notice, this #
# list of conditions and the following disclaimer. #
# #
# 2. Redistributions in binary form must reproduce the above copyright notice, #
# this list of conditions and the following disclaimer in the documentation #
# and/or other materials provided with the distribution. #
# #
# 3. Neither the name of the copyright holder nor the names of its contributors #
# may be used to endorse or promote products derived from this software without #
# specific prior written permission. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import os
import numpy
class Algorithm:
def __init__(self):
self.offset = 1
def setup(self, parameters):
self.sync = parameters["sync"]
return True
def process(self, data_loaders, outputs):
data_loader = data_loaders.loaderOf("in1")
print(os.listdir("/databases/integers_db/1"))
with open("/databases/integers_db/1/datafile.txt", "rt") as shared_data:
value = shared_data.read()
shared_offset = int(value)
for i in range(data_loader.count(self.sync)):
view = data_loader.view(self.sync, i)
(data, start, end) = view[view.count() - 1]
value = numpy.int32(data["in1"].value + data["in2"].value + shared_offset)
outputs["out"].write({"value": value}, end)
return True
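Note: the /databases/integers_db/1 path hard-coded above only exists inside the container because the executor mounts each database's root folder there when share_databases is enabled (see __share_databases in the first hunk of this commit).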
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "double",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
]
}
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "double",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"environment": {
"name": "Does not exist",
"version": "1.4.0"
......
@@ -4,7 +4,7 @@
"name": "Does not exist",
"version": "1.4.0"
},
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "double",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "large",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "large",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"sets": [
......
{
"root_folder": "/tmp/path/not/set"
"root_folder": "/tmp/beat_core_test"
}
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "double",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "double",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "double",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "protocol",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "double",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "double",
......
{
"root_folder": "/tmp/path/not/set",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "double",
......
{
"root_folder": "/tmp/foo/bar",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "protocol",
......
{
"root_folder": "/tmp/foo/bar",
"root_folder": "/tmp/beat_core_test",
"protocols": [
{
"name": "protocol",
......
{
"root_folder": "/tmp/foo/bar",
"root_folder": "/tmp/beat_core_test",
"environment": {
"name": "Example databases",
"version": "1.4.0"
......
{
"root_folder": "/tmp/foo/bar",
"root_folder": "/tmp/beat_core_test",
"environment": {
"name": "Example databases",
"version": "1.4.0"
......
{
"root_folder": "/tmp/foo/bar",
"root_folder": "/tmp/beat_core_test",
"environment": {
"name": "Not existing",
"version": "1.4.0"
......
{
"root_folder": "/tmp/foo/bar",
"root_folder": "/tmp/beat_core_test",
"environment": {
"name": "Not existing",
"version": "1.4.0"
......
{
"blocks": {
"addition": {
"algorithm": "user/shared_datasets/1",
"parameters": {
},
"inputs": {
"in1": "a",
"in2": "b"
},
"outputs": {
"out": "sum"
}
}
},
"datasets": {
"integers": {
"database": "integers_db/1",
"protocol": "double",
"set": "double"
}
},
"analyzers": {
"analysis": {
"algorithm": "v1/integers_analysis/1",
"parameters": {
},
"inputs": {
"input": "input"
}
}
},
"globals": {
"environment": {
"name": "Python for tests",
"version": "1.3.0"
},
"queue": "queue"
}
}
This experiment tests whether the dataset sharing functionality works as expected.
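For reference, the expected analyzer output asserted by the Docker test below can be worked out from this experiment (a sketch; the 495 baseline is the sum of in1 + in2 over the double protocol's nine samples, as encoded in the test's assertion):

    offset = 12     # written by the test to <root_folder>/datafile.txt
    baseline = 495  # sum of in1 + in2 across the protocol's 9 samples
    expected = [{"sum": baseline + 9 * offset, "nb": 9}]  # each sample adds the offset once
    assert expected == [{"sum": 603, "nb": 9}]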
@@ -41,10 +41,13 @@ import subprocess # nosec
import nose.tools
from beat.core.database import Database
from ..dock import Host
from ..execution import DockerExecutor
from . import DOCKER_NETWORK_TEST_ENABLED
from . import network_name
from . import prefix as test_prefix
from . import prefix_folder
from .test_execution import BaseExecutionMixIn
from .utils import DOCKER_TEST_IMAGES
@@ -148,6 +151,25 @@ class TestDockerExecution(BaseExecutionMixIn):
nose.tools.assert_is_none(result)
@slow
def test_databases_sharing(self):
db = Database(test_prefix, "integers_db/1")
nose.tools.assert_true(db.valid, db.errors)
data_sharing_path = db.data["root_folder"]
offset = 12
with open(os.path.join(data_sharing_path, "datafile.txt"), "wt") as data_file:
data_file.write("{}".format(offset))
result = self.execute(
"user/user/integers_addition/1/shared_datasets",
[{"sum": 495 + 9 * offset, "nb": 9}],
share_databases=True,
)
nose.tools.assert_is_none(result)
@slow
def test_single_1_prepare_error(self):
result = self.execute("errors/user/single/1/prepare_error", [None])
......
@@ -37,7 +37,7 @@ requirements:
- pyzmq
- simplejson
- six
-  - beat.backend.python >=1.7.9
+  - beat.backend.python >=1.7.12
test:
requires:
......