Commit bbac1d98 authored by André Anjos

Merge branch '579_implement_database_sharing_on_execution' into 'master'

Ensure database raw data access also works in the beatweb context

See merge request !410
parents 120b2a42 82d15246
Pipeline #45694 passed with stages
in 19 minutes and 13 seconds
......@@ -403,6 +403,9 @@ def get_configuration_for_split(split):
# Retrieve the block configuration
configuration = simplejson.loads(str(split.job.block.command))
if split.job.block.experiment.author.has_perm("can_share_databases"):
configuration["share_databases"] = True
# (If necessary) Add the information needed to access the database files
if settings.DATASETS_UID is not None:
configuration["datasets_uid"] = settings.DATASETS_UID
......
......@@ -25,6 +25,7 @@
# #
###############################################################################
import json
import os
from django.conf import settings
......@@ -104,14 +105,32 @@ class BackendUtilitiesMixin(object):
environment=dict(name=env.name, version=env.version),
)
raw_access_db_name = "simple_rawdata_access/1"
source_prefix = os.path.join(settings.BASE_DIR, "src", "beat.examples")
db_root_file_path = os.path.join(settings.PREFIX, "db_root.json")
db_path = os.path.join(
settings.PREFIX, "data", raw_access_db_name.replace("/", "_")
)
db_root_data = {raw_access_db_name: db_path}
os.makedirs(db_path, exist_ok=True)
with open(os.path.join(db_path, "datafile.txt"), "wt") as datafile:
datafile.write("1")
install.install_contributions(source_prefix, "system", template_data)
install.install_contributions(source_prefix, "test", template_data)
with open(db_root_file_path, "wt") as db_root_file:
db_root_file.write(json.dumps(db_root_data))
for contribution in ["system", "test"]:
install.install_contributions(
source_prefix, contribution, template_data, db_root_file_path
)
if not os.path.exists(settings.CACHE_ROOT):
os.mkdir(settings.CACHE_ROOT)
os.remove(db_root_file_path)
def clean_cache(self):
for p, dirs, files in os.walk(settings.CACHE_ROOT, topdown=False):
......
......@@ -634,53 +634,53 @@ class Experiment(Shareable):
)
# Ties the block in
b = Block.objects.filter(experiment=self, name=block_name).first()
if b is None:
b = Block(experiment=self, name=block_name, algorithm=algorithm)
block = Block.objects.filter(experiment=self, name=block_name).first()
if block is None:
block = Block(experiment=self, name=block_name, algorithm=algorithm)
else:
b.algorithm = algorithm
b.execution_order = order_0 + 1
b.command = simplejson.dumps(job_description, indent=4)
b.status = Block.PENDING
b.analyzer = algorithm.analysis()
b.environment = env
b.queue = queue
b.required_slots = job_description["nb_slots"]
b.channel = job_description["channel"]
b.save()
block.algorithm = algorithm
block.execution_order = order_0 + 1
block.command = simplejson.dumps(job_description, indent=4)
block.status = Block.PENDING
block.analyzer = algorithm.analysis()
block.environment = env
block.queue = queue
block.required_slots = job_description["nb_slots"]
block.channel = job_description["channel"]
block.save()
# from this point: requires block to have an assigned id
b.dependencies.clear()
b.dependencies.add(
block.dependencies.clear()
block.dependencies.add(
*[self.blocks.get(name=k) for k in description["dependencies"]]
)
# reset inputs - creates if necessary only
b.inputs.clear()
for v in job_description["inputs"].values():
if "database" in v: # database input
block.inputs.clear()
for input_ in job_description["inputs"].values():
if "database" in input_: # database input
db = DatabaseSetOutput.objects.get(
set__hash=v["hash"], template__name=v["output"]
set__hash=input_["hash"], template__name=input_["output"]
)
BlockInput.objects.get_or_create(
block=b, channel=v["channel"], database=db
block=block, channel=input_["channel"], database=db
)
else:
cache, _ = CachedFile.objects.get_or_create(hash=v["hash"])
cache, _ = CachedFile.objects.get_or_create(hash=input_["hash"])
BlockInput.objects.get_or_create(
block=b, channel=v["channel"], cache=cache
block=block, channel=input_["channel"], cache=cache
)
# reset outputs - creates if necessary only
b.outputs.clear()
block.outputs.clear()
outputs = job_description.get(
"outputs", {"": job_description.get("result")}
)
for v in outputs.values():
cache, cr = CachedFile.objects.get_or_create(hash=v["hash"])
cache.blocks.add(b)
for output in outputs.values():
cache, cr = CachedFile.objects.get_or_create(hash=output["hash"])
cache.blocks.add(block)
# _____ Methods __________
......
......@@ -52,7 +52,7 @@ def start_broker(port, verbosity=0):
call_command("broker", port=port, verbosity=verbosity)
def start_worker(worker_name, broker_address, prefix, cache, verbosity=0):
def start_worker(worker_name, broker_address, prefix, cache, verbosity=0, docker=False):
call_command(
"worker",
name=worker_name,
......@@ -60,6 +60,7 @@ def start_worker(worker_name, broker_address, prefix, cache, verbosity=0):
prefix=prefix,
cache=cache,
verbosity=verbosity,
docker=docker,
)
......@@ -210,7 +211,7 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
self.broker.join()
self.broker = None
def start_worker(self, name, verbosity=None):
def start_worker(self, name, verbosity=None, docker=False):
if verbosity is None:
verbosity = self.command_verbosity
......@@ -222,6 +223,7 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
settings.PREFIX,
settings.CACHE_ROOT,
verbosity,
docker,
),
)
......@@ -327,6 +329,49 @@ class TestConnection(TestSchedulerBase):
# ----------------------------------------------------------
@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
class TestDockerExecution(TestSchedulerBase):
    """Run an experiment through a worker started with ``docker=True``.

    The fixture first indexes the ``simple_rawdata_access/1`` database with
    the ``beat`` command-line tool (``--docker``), then starts the scheduler
    and a single worker named ``node1``.
    """

    # Upper bound (seconds) on how long the test waits for the experiment to
    # reach ``Experiment.DONE`` before failing instead of hanging forever.
    experiment_timeout = 600

    # Pause (seconds) between two status polls, so the wait loop does not
    # hammer the database with back-to-back refresh queries.
    poll_interval = 0.5

    def setUp(self):
        """Index the raw-data-access database, then start scheduler and worker.

        Raises:
            subprocess.CalledProcessError: if the ``beat databases index``
                command exits with a non-zero status (``check=True``).
        """
        super().setUp()

        import subprocess

        # NOTE(review): "./bin/beat" is relative, so this presumably expects
        # the tests to be launched from the buildout root -- confirm.
        subprocess.run(
            [
                "./bin/beat",
                "--cache",
                settings.CACHE_ROOT,
                "--prefix",
                settings.PREFIX,
                "databases",
                "index",
                "simple_rawdata_access/1",
                "--docker",
            ],
            check=True,
        )

        self.start_scheduling()
        self.start_worker("node1", docker=True)
        self.check_worker_status("node1", True)

    def test_database_rawdata_access_experiment(self):
        """Schedule ``single_rawdata_access`` and check both blocks finish."""
        import time

        fullname = "user/user/single/1/single_rawdata_access"

        # Only the last path component is the experiment's ``name`` field.
        xp = Experiment.objects.get(name=fullname.split("/")[-1])

        schedule_experiment(xp)

        # Poll until the experiment is done, but give up after
        # ``experiment_timeout`` seconds: an experiment that never reaches
        # DONE would otherwise block the test run indefinitely.
        deadline = time.monotonic() + self.experiment_timeout
        xp.refresh_from_db()
        while xp.status != Experiment.DONE:
            if time.monotonic() >= deadline:
                self.fail(
                    "Experiment %r did not reach DONE within %s seconds "
                    "(last status: %r)"
                    % (fullname, self.experiment_timeout, xp.status)
                )
            time.sleep(self.poll_interval)
            xp.refresh_from_db()

        echo = xp.blocks.get(name="echo")
        analysis = xp.blocks.get(name="analysis")

        self.assertEqual(echo.status, Block.DONE)
        self.assertEqual(analysis.status, Block.DONE)
@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
class TestExecution(TestSchedulerBase):
def setUp(self):
......
......@@ -11,9 +11,9 @@ dependencies:
- beat-devel=2020.01.09
# beat dependencies matching release.cfg
- beat.core=1.10.7
- beat.backend.python=1.7.11
- beat.cmdline=1.8.2
- beat.core=1.12.0
- beat.backend.python=1.7.12
- beat.cmdline=1.10.0
# requirements.txt, they are indirectly pinned through the above
- docopt
......
......@@ -21,9 +21,9 @@ eggs = ${buildout:eggs}
interpreter = python
[sources]
beat.core = git https://gitlab.idiap.ch/beat/beat.core.git rev=v1.10.7
beat.cmdline = git https://gitlab.idiap.ch/beat/beat.cmdline.git rev=v1.8.2
beat.backend.python = git https://gitlab.idiap.ch/beat/beat.backend.python.git rev=v1.7.11
beat.core = git https://gitlab.idiap.ch/beat/beat.core.git rev=v1.12.0
beat.cmdline = git https://gitlab.idiap.ch/beat/beat.cmdline.git rev=v1.10.0
beat.backend.python = git https://gitlab.idiap.ch/beat/beat.backend.python.git rev=v1.7.12
[uwsgi]
recipe = buildout.recipe.uwsgi
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment