Commit d829ac7c authored by Samuel GAIST's avatar Samuel GAIST

[execution][docker] Complete refactor of database raw data sharing

parent 1a66c641
Pipeline #45622 failed with stage
in 38 minutes and 22 seconds
......@@ -384,14 +384,15 @@ class DockerExecutor(RemoteExecutor):
file_path = result["path"]
__add_writable_volume(file_path)
def __share_databases(self, algorithm_container, db_infos):
"""Add volumes to the algorithm container for the datasets"""
def __setup_databases_raw_access(self, algorithm_container):
"""Add volumes to the algorithm container if the database allows that"""
for database_name, database in self.databases.items():
db_data = database.data
algorithm_container.add_volume(
db_data["root_folder"], os.path.join("/databases", database_name)
)
if db_data.get("direct_rawdata_access", False):
algorithm_container.add_volume(
db_data["root_folder"], os.path.join("/databases", database_name)
)
def process(
self, virtual_memory_in_megabytes=0, max_cpu_percent=0, timeout_in_minutes=0
......@@ -481,8 +482,6 @@ class DockerExecutor(RemoteExecutor):
network_name = self.data.pop("network_name", "bridge")
databases_infos = {}
share_databases = self.data.pop("share_databases", False)
if len(self.databases) > 0:
databases_infos["db"] = self.__create_db_container(
datasets_uid, network_name
......@@ -537,8 +536,7 @@ class DockerExecutor(RemoteExecutor):
loop_algorithm_container, volume_cache_mount_point, self.data["loop"]
)
if share_databases:
self.__share_databases(loop_algorithm_container, databases_infos)
self.__setup_databases_raw_access(loop_algorithm_container)
# Start the container
self.host.start(
......@@ -584,8 +582,7 @@ class DockerExecutor(RemoteExecutor):
algorithm_container, volume_cache_mount_point, self.data
)
if share_databases:
self.__share_databases(algorithm_container, databases_infos)
self.__setup_databases_raw_access(algorithm_container)
# Start the container
self.host.start(
......
{
"schema_version": 2,
"schema_version": 3,
"language": "python",
"api_version": 2,
"type": "autonomous",
"type": "sequential",
"splittable": false,
"parameters": {
"sync": {
"default": "in1",
"type": "string"
}
},
"groups": [
{
"name": "main",
"inputs": {
"in1": {
"type": "user/single_integer/1"
},
"in2": {
"in_data": {
"type": "user/single_integer/1"
}
},
"outputs": {
"out": {
"out_data": {
"type": "user/single_integer/1"
}
}
......
......@@ -31,33 +31,20 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #
# #
###################################################################################
import os
import numpy
class Algorithm:
def __init__(self):
self.offset = 1
def setup(self, parameters):
self.sync = parameters["sync"]
return True
def process(self, data_loaders, outputs):
data_loader = data_loaders.loaderOf("in1")
print(os.listdir("/databases/integers_db/1"))
with open("/databases/integers_db/1/datafile.txt", "rt") as shared_data:
def process(self, inputs, data_loaders, outputs):
with open(
"/databases/simple_rawdata_access/1/datafile.txt", "rt"
) as shared_data:
value = shared_data.read()
shared_offset = int(value)
for i in range(data_loader.count(self.sync)):
view = data_loader.view(self.sync, i)
(data, start, end) = view[view.count() - 1]
value = numpy.int32(data["in1"].value + data["in2"].value + shared_offset)
outputs["out"].write({"value": value}, end)
out_data = {"value": numpy.int32(inputs["in_data"].data.value + shared_offset)}
outputs["out_data"].write(out_data)
return True
{
"root_folder": "/tmp/beat_core_test",
"environment": {
"name": "Example databases",
"version": "1.4.0"
},
"direct_rawdata_access": true,
"protocols": [
{
......
{
"root_folder": "/tmp/beat_core_test",
"environment": {
"name": "Example databases",
"version": "1.4.0"
},
"direct_rawdata_access": true,
"protocols": [
{
......
{
"analyzers": {
"analysis": {
"algorithm": "v1/integers_echo_analyzer/1",
"inputs": {
"in_data": "in"
}
}
},
"blocks": {
"echo": {
"algorithm": "user/integers_rawdata_access/1",
"inputs": {
"in_data": "in"
},
"outputs": {
"out_data": "out"
}
}
},
"datasets": {
"set": {
"database": "simple/1",
"protocol": "protocol",
"set": "set"
}
},
"globals": {
"queue": "queue",
"environment": {
"name": "Python for tests",
"version": "1.3.0"
}
}
}
Test experiment for validating the rawdata access feature. This ensures that if the
database does not have the field, then nothing is mounted.
{
"blocks": {
"addition": {
"algorithm": "user/shared_datasets/1",
"inputs": {
"in1": "a",
"in2": "b"
},
"outputs": {
"out": "sum"
}
}
},
"datasets": {
"integers": {
"database": "integers_db/1",
"protocol": "double",
"set": "double"
}
},
"analyzers": {
"analysis": {
"algorithm": "v1/integers_analysis/1",
"inputs": {
"input": "input"
}
}
},
"globals": {
"environment": {
"name": "Python for tests",
"version": "1.3.0"
},
"queue": "queue",
"user/shared_datasets/1": {
"sync": "in1"
}
}
}
This experiment tests whether the dataset sharing
functionality works as expected.
{
"analyzers": {
"analysis": {
"algorithm": "v1/integers_echo_analyzer/1",
"inputs": {
"in_data": "in"
}
}
},
"blocks": {
"echo": {
"algorithm": "user/integers_rawdata_access/1",
"inputs": {
"in_data": "in"
},
"outputs": {
"out_data": "out"
}
}
},
"datasets": {
"set": {
"database": "simple_rawdata_access/1",
"protocol": "protocol",
"set": "set"
}
},
"globals": {
"queue": "queue",
"environment": {
"name": "Python for tests",
"version": "1.3.0"
}
}
}
Test experiment for validating the rawdata access feature
......@@ -63,6 +63,21 @@ BUILDER_IMAGE = (
# ----------------------------------------------------------
def write_rawdata_for_database(database_name, raw_data):
    """Write a raw data file into the given database's root folder.

    Loads the database definition *database_name* (e.g. ``"simple/1"``) from
    ``test_prefix``, asserts the definition is valid, then writes ``raw_data``
    (converted to text via ``str.format``) to ``datafile.txt`` directly under
    the database's configured ``root_folder``.

    Parameters:
        database_name: name/version identifier of the database to load.
        raw_data: value to store; anything convertible to text.

    Raises:
        AssertionError: if the database definition fails validation.
    """
    db = Database(test_prefix, database_name)
    # Fail fast with the validation errors if the database definition is broken.
    nose.tools.assert_true(db.valid, db.errors)
    data_sharing_path = db.data["root_folder"]
    # "wt" truncates any previous content, so each test starts from known data.
    with open(os.path.join(data_sharing_path, "datafile.txt"), "wt") as data_file:
        data_file.write("{}".format(raw_data))
# ----------------------------------------------------------
class TestDockerExecution(BaseExecutionMixIn):
@classmethod
def setup_class(cls):
......@@ -152,24 +167,26 @@ class TestDockerExecution(BaseExecutionMixIn):
nose.tools.assert_is_none(result)
@slow
def test_databases_sharing(self):
db = Database(test_prefix, "integers_db/1")
nose.tools.assert_true(db.valid, db.errors)
data_sharing_path = db.data["root_folder"]
def test_database_rawdata_access(self):
offset = 12
with open(os.path.join(data_sharing_path, "datafile.txt"), "wt") as data_file:
data_file.write("{}".format(offset))
write_rawdata_for_database("simple_rawdata_access/1", offset)
result = self.execute(
"user/user/integers_addition/1/shared_datasets",
[{"sum": 495 + 9 * offset, "nb": 9}],
share_databases=True,
"user/user/single/1/single_rawdata_access", [{"out_data": 42 + offset}]
)
nose.tools.assert_is_none(result)
@slow
def test_database_no_rawdata_access(self):
write_rawdata_for_database("simple/1", "should not be loaded")
result = self.execute("errors/user/single/1/single_no_rawdata_access", [None])
nose.tools.eq_(result["status"], 1)
nose.tools.assert_true("FileNotFoundError" in result["user_error"])
@slow
def test_single_1_prepare_error(self):
result = self.execute("errors/user/single/1/prepare_error", [None])
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment