Commit 99dd97f6 authored by Philip ABBET

[experiments] Refactoring: Split the models into separate, more manageable files

parent ceaf3112
@@ -1419,8 +1419,8 @@ class JobSplit(models.Model):
         try:
             if JobSplit.host is None:
-                JobSplit.host = Host(images_cache=docker_images_cache)
-                JobSplit.host.setup(raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False)))
+                JobSplit.host = Host(images_cache=docker_images_cache,
+                                     raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False)))
             self.executor = beat.core.execution.DockerExecutor(
                 JobSplit.host, settings.PREFIX, config, cache
@@ -1448,9 +1448,8 @@ class JobSplit(models.Model):
             result = self.executor.process(
                 virtual_memory_in_megabytes=queue.memory_limit,
                 max_cpu_percent=int(100*float(queue.cores_per_slot)), # allows for 150%
-                timeout_in_minutes=queue.time_limit,
-                daemon=0,
-            )
+                timeout_in_minutes=queue.time_limit
+            )
             self.try_end(Result(
                 status=result['status'],
@@ -403,6 +403,5 @@ def find_environments(paths=None):
     from beat.core.dock import Host
-    host = Host()
-    host.setup(raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False)))
-    return host.environments
+    host = Host(raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False)))
+    return host.processing_environments
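Both hunks apply the same API change: beat.core.dock.Host no longer needs a separate setup() call, the raise_on_errors flag moves into the constructor, and processing_environments replaces environments. A minimal sketch of the new usage, assuming a configured Django settings module and only what the diff itself shows:

from django.conf import settings
from beat.core.dock import Host

# one-step construction; errors raise unless running under the test configuration
host = Host(raise_on_errors=not getattr(settings, 'TEST_CONFIGURATION', False))
environments = host.processing_environments  # formerly `host.environments'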
from .block import Block
from .cached_file import CachedFile
from .block_input import BlockInput
from .result import Result
from .experiment import validate_experiment
from .experiment import DeclarationStorage
from .experiment import Experiment
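These re-exports keep the package's public names stable after the split, so existing callers need not know the new file layout. A hypothetical example, assuming the package path beat.web.experiments.models:

from beat.web.experiments.models import Block          # via the re-export above
from beat.web.experiments.models.block import Block    # directly from the new file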
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import models
from django.conf import settings
import beat.core.hash
import beat.core.data
import beat.core.algorithm
from beat.core.utils import NumpyJSONEncoder
from ...algorithms.models import Algorithm
from ...backend.models import Queue
from ...backend.models import Environment
from ...backend.models import Job
from .result import Result
import os
import simplejson
#----------------------------------------------------------
class BlockManager(models.Manager):
def get_by_natural_key(self, name, experiment_author,
toolchain_author, toolchain_name,
toolchain_version, experiment_name):
return self.get(
name=name,
experiment__author__username=experiment_author,
experiment__toolchain__author__username=toolchain_author,
experiment__toolchain__name=toolchain_name,
experiment__toolchain__version=toolchain_version,
experiment__name=experiment_name,
)
#----------------------------------------------------------
class Block(models.Model):
NOT_CACHED = 'N'
PROCESSING = 'P'
CACHED = 'C'
FAILED = 'F'
SKIPPED = 'S'
CANCELLED = 'L'
STATUS = (
(NOT_CACHED, 'Not cached'),
(PROCESSING, 'Processing'),
(CACHED, 'Cached'),
(FAILED, 'Failed'),
(SKIPPED, 'Skipped'),
(CANCELLED, 'Cancelled'),
)
experiment = models.ForeignKey('Experiment', related_name='blocks',
on_delete=models.CASCADE)
name = models.CharField(max_length=200)
command = models.TextField(null=True, blank=True)
status = models.CharField(max_length=1, choices=STATUS, default=NOT_CACHED)
analyzer = models.BooleanField(default=False)
algorithm = models.ForeignKey(Algorithm, related_name='blocks',
on_delete=models.CASCADE)
creation_date = models.DateTimeField(null=True, blank=True,
auto_now_add=True)
start_date = models.DateTimeField(null=True, blank=True)
end_date = models.DateTimeField(null=True, blank=True)
environment = models.ForeignKey(Environment, related_name='blocks',
null=True, on_delete=models.SET_NULL)
queue = models.ForeignKey(Queue, related_name='blocks', null=True,
on_delete=models.SET_NULL)
required_slots = models.PositiveIntegerField(default=1)
channel = models.CharField(max_length=200, default='', blank=True,
help_text="Synchronization channel within the toolchain")
# relationship to the blocks on which this block depends
dependencies = models.ManyToManyField('self',
related_name='dependents',
blank=True,
symmetrical=False,
)
# order of this block within the experiment - useful for the `backup'
# command, so we can dump the blocks in the right dependency order
execution_order = models.PositiveIntegerField(null=True, blank=True)
objects = BlockManager()
class Meta:
unique_together = ('experiment', 'name')
# set up ordering so that the dump order respects inter-block dependencies
ordering = ['experiment_id', 'execution_order']
def __str__(self):
return self.experiment.fullname() + ', ' + self.name + ' (%s)' % self.get_status_display()
def natural_key(self):
return (
self.name,
self.experiment.author.username,
self.experiment.toolchain.author.username,
self.experiment.toolchain.name,
self.experiment.toolchain.version,
self.experiment.name,
)
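# The tuple returned by natural_key() mirrors the argument list of
# BlockManager.get_by_natural_key() above, so serialized blocks round-trip
# without primary keys. A hypothetical example (all values made up):
#
#   key = ('analysis', 'alice', 'bob', 'my_toolchain', 1, 'my_experiment')
#   block = Block.objects.get_by_natural_key(*key)
#   assert block.natural_key() == key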
# Accessors for statistics
def __return_first__(self, field, default=None):
return getattr(self.outputs.first(), field, default)
def first_cache(self):
return self.outputs.first()
def error_report(self):
return self.__return_first__('error_report')
def stdout(self):
return self.__return_first__('stdout')
def stderr(self):
return self.__return_first__('stderr')
def speed_up_real(self):
return self.__return_first__('speed_up_real')
def speed_up_maximal(self):
return self.__return_first__('speed_up_maximal')
def linear_execution_time(self):
return self.__return_first__('linear_execution_time')
def queuing_time(self):
return self.__return_first__('queuing_time')
def cpu_time(self):
return self.__return_first__('cpu_time')
def max_memory(self):
return self.__return_first__('max_memory')
def data_read_size(self):
return self.__return_first__('data_read_size')
def data_read_nb_blocks(self):
return self.__return_first__('data_read_nb_blocks')
def data_read_time(self):
return self.__return_first__('data_read_time')
def data_written_size(self):
return self.__return_first__('data_written_size')
def data_written_nb_blocks(self):
return self.__return_first__('data_written_nb_blocks')
def data_written_time(self):
return self.__return_first__('data_written_time')
# Accessor for results
results = property(lambda self: self.__return_first__('results'))
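# All the statistics accessors above funnel through __return_first__: they
# read the corresponding field from the first CachedFile in `outputs' and
# degrade gracefully when no output exists yet. Illustrative (hypothetical)
# use:
#
#   block.cpu_time()   # None until the first cached output is written
#   block.results      # likewise delegates to the first output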
def _schedule(self):
'''Schedules this block for execution at the backend
To "schedule" means solely creating a :py:class:`..backend.models.Job`
pointing to this object. This method **should only be called by the
owning experiment**. It is not part of the Block's public API.
'''
# lock self - avoids concurrent update from scheduler/worker subsystem
self_ = Block.objects.select_for_update().get(pk=self.pk)
# checks we have not, meanwhile, been cancelled
if self_.done():
return
# checks queue and environment
if self.queue is None:
raise RuntimeError("Block `%s' does not have a queue assigned " \
"- this normally indicates the originally selected " \
"queue was deleted since the experiment was first " \
"configured. Re-configure this experiment and select a new " \
"default or block-specific queue" % self.name)
if self.environment is None:
raise RuntimeError("Block `%s' does not have an environment " \
"assigned - this normally indicates the originally selected " \
"environment was deleted since the experiment was first " \
"configured. Re-configure this experiment and select a new " \
"default or block-specific environment" % self.name)
# search for other jobs with similar outputs that have no children yet
# do this carefully, as other experiments may be scheduled at the same
# time, invalidating our "parent" choice
parent = Job.objects.filter(block__outputs__in=self.outputs.all(),
child=None).first()
if parent is not None: #(candidate only) try to lock it
while True:
parent = Job.objects.select_for_update().get(pk=parent.pk)
if parent.child_ is not None: #was taken meanwhile, retry
parent = parent.child
continue
job = Job(block=self, parent=parent)
break
else:
job = Job(block=self)
job.save()
# checks if the job is immediately runnable - if so, tries to
# make it runnable (checking caches and other preconditions)
if self.is_runnable():
self.job._make_runnable()
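# In effect, _schedule() deduplicates work across experiments: if another
# experiment already has a Job producing the same CachedFile outputs (same
# hashes) and that Job has no child yet, the new Job is chained to it as a
# child and reuses its results instead of recomputing them.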
def done(self):
'''Says whether the block has finished or not'''
return self.status not in (Block.NOT_CACHED, Block.PROCESSING)
def _cancel(self):
'''Cancels the execution of this block on the backend.
This method should only be called from the experiment's equivalent method.
It is not part of the Block's public API.
'''
# lock self - avoids concurrent update from scheduler/worker subsystem
self_ = Block.objects.select_for_update().get(pk=self.pk)
if self_.done(): return
if hasattr(self, 'job'):
self.job._cancel()
else:
self.status = Block.CANCELLED
self.save()
self.experiment._update_state()
def is_runnable(self):
'''Checks if a block is runnable presently'''
return all([k.status in (Block.CACHED, Block.SKIPPED) \
for k in self.dependencies.all()]) and \
(hasattr(self, 'job') and self.job.parent is None)
def _cascade_updates(self):
'''Cascade updates to blocks once I'm done.
'''
for b in self.dependents.all():
if any([k.status in (Block.FAILED, Block.CANCELLED) \
for k in b.dependencies.all()]):
b._cancel()
if b.is_runnable(): b.job._make_runnable()
# In case of a failure, cancel any siblings that are still running
if self.status == Block.FAILED:
for b in Block.objects.filter(experiment=self.experiment,
status=Block.PROCESSING):
b._cancel()
def _update_state(self, timings=None):
'''Updates this block's state as a result of the backend run
Parameters:
timings (dict, Optional): A dictionary containing key-value pairs
corresponding to:
* queuing time (in seconds)
* sequential execution time (in seconds)
* real speed-up obtained
* maximum speed-up obtainable
This method is supposed to be called only by the underlying job
instance. It is not part of the Block's public API.
'''
# lock self - avoids concurrent update from scheduler/worker subsystem
self_ = Block.objects.select_for_update().get(pk=self.pk)
if self_.done(): return
if self.start_date is None:
self.start_date = self.job.start_date
if self.job.result:
statistics = self.job.result.stats
result_stdout = self.job.result.stdout
result_stderr = self.job.result.stderr
if result_stdout == '\n':
result_stdout = ''
if result_stderr == '\n':
result_stderr = ''
info = dict(
cpu_time = statistics.cpu['user'] + statistics.cpu['system'],
max_memory = statistics.memory['rss'],
stdout = result_stdout,
stderr = result_stderr,
error_report = self.job.result.usrerr,
)
if 'volume' in statistics.data:
info['data_read_size'] = statistics.data['volume'].get('read', 0)
info['data_written_size'] = statistics.data['volume'].get('write', 0)
if 'blocks' in statistics.data:
info['data_read_nb_blocks'] = statistics.data['blocks'].get('read', 0)
info['data_written_nb_blocks'] = statistics.data['blocks'].get('write', 0)
if 'time' in statistics.data:
info['data_read_time'] = statistics.data['time'].get('read', 0)
info['data_written_time'] = statistics.data['time'].get('write', 0)
if timings:
info.update(dict(
queuing_time = timings['queuing'],
linear_execution_time = timings['linear_execution'],
speed_up_real = timings['speed_up_real'],
speed_up_maximal = timings['speed_up_maximal'],
))
self.outputs.update(**info)
if self.job.status == Block.SKIPPED:
self.status = Block.CACHED
else:
self.status = self.job.status
r = self.job.result  # keep a reference: the Job row may be deleted below
if self.job.done():
self.end_date = self.job.end_date
self.job.delete()
if r: r.delete()
# Loads Results from cache (via the saved reference rather than self.job,
# whose row may have just been deleted)
if r and self.analyzer and self.status == Block.CACHED:
cache = self.first_cache()
data_source = beat.core.data.CachedDataSource()
data_source.setup(os.path.join(settings.CACHE_ROOT,
beat.core.hash.toPath(cache.hash)), settings.PREFIX)
output_data = data_source.next()[0]
if output_data is not None:
algorithm = beat.core.algorithm.Algorithm(settings.PREFIX,
self.algorithm.fullname())
for field, value in output_data.as_dict().items():
res, _ = Result.objects.get_or_create(name=field,
cache=cache)
res.primary = algorithm.results[field]['display']
res.type = algorithm.results[field]["type"]
if res.type in ['int32', 'float32', 'bool', 'string']:
res.data_value = str(value)
else:
res.data_value = simplejson.dumps(value, indent=4,
cls=NumpyJSONEncoder)
res.save()
data_source.close()
self.save()
self._cascade_updates()
self.experiment._update_state()
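For reference, the timings dictionary consumed by _update_state() maps directly onto the four keys read near the end of the method. A plausible instance, with illustrative values only:

timings = {
    'queuing': 2.5,            # seconds spent waiting in the queue
    'linear_execution': 90.0,  # sequential wall-clock time, in seconds
    'speed_up_real': 1.8,      # speed-up actually observed
    'speed_up_maximal': 2.25,  # best speed-up the slot split allows
}
block._update_state(timings=timings)  # normally invoked by the owning Job, not directly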
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import models
from django.conf import settings
from ...databases.models import DatabaseSetOutput
from .block import Block
#----------------------------------------------------------
class BlockInputManager(models.Manager):
def get_by_natural_key(self, name, experiment_author, toolchain_author,
toolchain_name, toolchain_version, experiment_name,
cache_hash, database_hash):
block = Block.objects.get_by_natural_key(name, experiment_author,
toolchain_author, toolchain_name, toolchain_version,
experiment_name)
if cache_hash:
return self.get(cache__hash=cache_hash, block=block)
else:
return self.get(database__hash=database_hash, block=block)
#----------------------------------------------------------
class BlockInput(models.Model):
block = models.ForeignKey(Block, related_name='inputs', null=True,
on_delete=models.CASCADE)
# if the input comes from another block, then this one is set
cache = models.ForeignKey('CachedFile', related_name='inputs', null=True,
on_delete=models.CASCADE)
# if the input comes from a dataset, then this one is set
database = models.ForeignKey(DatabaseSetOutput, related_name='blocks',
null=True, on_delete=models.CASCADE)
channel = models.CharField(max_length=200, default='', blank=True,
help_text="Synchronization channel within the toolchain")
objects = BlockInputManager()
def natural_key(self):
cache_hash = self.cache and self.cache.hash
database_hash = self.database and self.database.hash
return self.block.natural_key() + (cache_hash, database_hash)
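A BlockInput natural key is therefore the owning block's six-element key plus the two hashes, exactly one of which is set depending on whether the input comes from a cache or from a dataset. A hypothetical cache-backed example (the hash is made up):

key = block.natural_key() + ('0123abcd', None)
inp = BlockInput.objects.get_by_natural_key(*key)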
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import models
from django.conf import settings
import beat.core.hash
import os
import glob
import logging
logger = logging.getLogger(__name__)
#----------------------------------------------------------
class CachedFileManager(models.Manager):
def get_by_natural_key(self, hash):
return self.get(hash=hash)
#----------------------------------------------------------
class CachedFile(models.Model):
blocks = models.ManyToManyField('Block', related_name='outputs', blank=True)
hash = models.CharField(max_length=64, unique=True)
# the total wall-clock time this block took to run
linear_execution_time = models.FloatField(default=0.)
# the real speed-up obtained by running this block using X slots
speed_up_real = models.FloatField(default=0.)
# the maximum obtainable speed-up that could be achieved if all slots
# were running in parallel. Essentially linear_execution_time /
# maximum_slot_time
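A small worked example of that formula, with made-up numbers:

linear_execution_time = 90.0  # seconds when all stages run back-to-back
maximum_slot_time = 40.0      # seconds taken by the slowest slot
speed_up_maximal = linear_execution_time / maximum_slot_time  # == 2.25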