diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index f8b4282f2e98beea9255130b6222edd72f970ecf..72db894561c1ac5607cffdfb14bf93f3eab2e721 100755 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -1419,8 +1419,8 @@ class JobSplit(models.Model): try: if JobSplit.host is None: - JobSplit.host = Host(images_cache=docker_images_cache) - JobSplit.host.setup(raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False))) + JobSplit.host = Host(images_cache=docker_images_cache, + raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False))) self.executor = beat.core.execution.DockerExecutor( JobSplit.host, settings.PREFIX, config, cache @@ -1448,9 +1448,8 @@ class JobSplit(models.Model): result = self.executor.process( virtual_memory_in_megabytes=queue.memory_limit, max_cpu_percent=int(100*float(queue.cores_per_slot)), #allows for 150% - timeout_in_minutes=queue.time_limit, - daemon=0, - ) + timeout_in_minutes=queue.time_limit + ) self.try_end(Result( status=result['status'], diff --git a/beat/web/backend/utils.py b/beat/web/backend/utils.py index ff220f74dd037318d6450edfbc0d5864f220c97c..89049f18e691cee04071e7ae94df8fac47712d14 100755 --- a/beat/web/backend/utils.py +++ b/beat/web/backend/utils.py @@ -403,6 +403,5 @@ def find_environments(paths=None): from beat.core.dock import Host - host = Host() - host.setup(raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False))) - return host.environments + host = Host(raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False))) + return host.processing_environments diff --git a/beat/web/experiments/models/__init__.py b/beat/web/experiments/models/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..be1bb8322812df530c67dbf6e4164aee9ac44e40 --- /dev/null +++ b/beat/web/experiments/models/__init__.py @@ -0,0 +1,7 @@ +from .block import Block +from .cached_file import CachedFile +from .block_input import BlockInput +from .result import Result +from .experiment import validate_experiment +from .experiment import DeclarationStorage +from .experiment import Experiment diff --git a/beat/web/experiments/models/block.py b/beat/web/experiments/models/block.py new file mode 100755 index 0000000000000000000000000000000000000000..aedc1026ede5811d00684974c2e80f60124d3720 --- /dev/null +++ b/beat/web/experiments/models/block.py @@ -0,0 +1,416 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
#
+#                                                                             #
+###############################################################################
+
+from django.db import models
+from django.conf import settings
+
+import beat.core.hash
+import beat.core.data
+import beat.core.algorithm
+
+from beat.core.utils import NumpyJSONEncoder
+
+from ...algorithms.models import Algorithm
+from ...backend.models import Queue
+from ...backend.models import Environment
+from ...backend.models import Job
+
+from .result import Result
+
+import os
+import simplejson
+
+
+#----------------------------------------------------------
+
+
+class BlockManager(models.Manager):
+
+    def get_by_natural_key(self, name, experiment_author,
+                           toolchain_author, toolchain_name,
+                           toolchain_version, experiment_name):
+        return self.get(
+            name=name,
+            experiment__author__username=experiment_author,
+            experiment__toolchain__author__username=toolchain_author,
+            experiment__toolchain__name=toolchain_name,
+            experiment__toolchain__version=toolchain_version,
+            experiment__name=experiment_name,
+        )
+
+
+#----------------------------------------------------------
+
+
+class Block(models.Model):
+
+    NOT_CACHED = 'N'
+    PROCESSING = 'P'
+    CACHED = 'C'
+    FAILED = 'F'
+    SKIPPED = 'S'
+    CANCELLED = 'L'
+
+    STATUS = (
+        (NOT_CACHED, 'Not cached'),
+        (PROCESSING, 'Processing'),
+        (CACHED, 'Cached'),
+        (FAILED, 'Failed'),
+        (SKIPPED, 'Skipped'),
+        (CANCELLED, 'Cancelled'),
+    )
+
+    experiment = models.ForeignKey('Experiment', related_name='blocks',
+                                   on_delete=models.CASCADE)
+    name = models.CharField(max_length=200)
+    command = models.TextField(null=True, blank=True)
+    status = models.CharField(max_length=1, choices=STATUS, default=NOT_CACHED)
+    analyzer = models.BooleanField(default=False)
+    algorithm = models.ForeignKey(Algorithm, related_name='blocks',
+                                  on_delete=models.CASCADE)
+    creation_date = models.DateTimeField(null=True, blank=True,
+                                         auto_now_add=True)
+    start_date = models.DateTimeField(null=True, blank=True)
+    end_date = models.DateTimeField(null=True, blank=True)
+    environment = models.ForeignKey(Environment, related_name='blocks',
+                                    null=True, on_delete=models.SET_NULL)
+    queue = models.ForeignKey(Queue, related_name='blocks', null=True,
+                              on_delete=models.SET_NULL)
+
+    required_slots = models.PositiveIntegerField(default=1)
+    channel = models.CharField(max_length=200, default='', blank=True,
+        help_text="Synchronization channel within the toolchain")
+
+    # relationship to the blocks on which this block depends
+    dependencies = models.ManyToManyField('self',
+                                          related_name='dependents',
+                                          blank=True,
+                                          symmetrical=False,
+                                         )
+
+    # order of this block within the experiment - useful for the `backup'
+    # command, so we can dump the blocks in the right dependency order
+    execution_order = models.PositiveIntegerField(null=True, blank=True)
+
+    objects = BlockManager()
+
+
+    class Meta:
+        unique_together = ('experiment', 'name')
+
+        # setup ordering so that the dump order respects self dependencies
+        ordering = ['experiment_id', 'execution_order']
+
+
+    def __str__(self):
+        return self.experiment.fullname() + ', ' + self.name + ' (%s)' % self.get_status_display()
+
+    def natural_key(self):
+        return (
+            self.name,
+            self.experiment.author.username,
+            self.experiment.toolchain.author.username,
+            self.experiment.toolchain.name,
+            self.experiment.toolchain.version,
+            self.experiment.name,
+        )
+
+    # Accessors for statistics
+
+    def __return_first__(self, field, default=None):
+        return getattr(self.outputs.first(), field, default)
+
+    def first_cache(self):
+        return self.outputs.first()
+
+
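+    # each accessor below proxies a statistics field stored on this block's
+    # first output CachedFile (via __return_first__) and returns None while
+    # the block has not produced any output yet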
def error_report(self): + return self.__return_first__('error_report') + + def stdout(self): + return self.__return_first__('stdout') + + def stderr(self): + return self.__return_first__('stderr') + + def speed_up_real(self): + return self.__return_first__('speed_up_real') + + def speed_up_maximal(self): + return self.__return_first__('speed_up_maximal') + + def linear_execution_time(self): + return self.__return_first__('linear_execution_time') + + def queuing_time(self): + return self.__return_first__('queuing_time') + + def cpu_time(self): + return self.__return_first__('cpu_time') + + def max_memory(self): + return self.__return_first__('max_memory') + + def data_read_size(self): + return self.__return_first__('data_read_size') + + def data_read_nb_blocks(self): + return self.__return_first__('data_read_nb_blocks') + + def data_read_time(self): + return self.__return_first__('data_read_time') + + def data_written_size(self): + return self.__return_first__('data_written_size') + + def data_written_nb_blocks(self): + return self.__return_first__('data_written_nb_blocks') + + def data_written_time(self): + return self.__return_first__('data_written_time') + + # Accessor for results + results = property(lambda self: self.__return_first__('results')) + + + def _schedule(self): + '''Schedules this block for execution at the backend + + To "schedule" means solely creating a :py:class:`..backend.models.Job` + pointing to this object. This method **should only be called by the + owning experiment**. It is not part of the Block's public API. + ''' + + # lock self - avoids concurrent update from scheduler/worker subsystem + self_ = Block.objects.select_for_update().get(pk=self.pk) + + # checks we have not, meanwhile, been cancelled + if self_.done(): + return + + # checks queue and environment + if self.queue is None: + raise RuntimeError("Block `%s' does not have a queue assigned " \ + "- this normally indicates the originally selected " \ + "queue was deleted since the experiment was first " \ + "configured. Re-configure this experiment and select a new " \ + "default or block-specific queue" % self.name) + + if self.environment is None: + raise RuntimeError("Block `%s' does not have an environment " \ + "assigned - this normally indicates the originally selected " \ + "environment was deleted since the experiment was first " \ + "configured. Re-configure this experiment and select a new " \ + "default or block-specific environment" % self.name) + + # search for other jobs with similar outputs that have no children yet + # do this carefully, as other experiments may be scheduled at the same + # time, invalidating our "parent" choice + parent = Job.objects.filter(block__outputs__in=self.outputs.all(), + child=None).first() + + if parent is not None: #(candidate only) try to lock it + while True: + parent = Job.objects.select_for_update().get(pk=parent.pk) + if parent.child_ is not None: #was taken meanwhile, retry + parent = parent.child + continue + job = Job(block=self, parent=parent) + break + else: + job = Job(block=self) + + job.save() + + # checks if the job is immediately runnable - if so, tries to + # make it runnable (check caches and other) + if self.is_runnable(): + self.job._make_runnable() + + + def done(self): + '''Says whether the block has finished or not''' + + return self.status not in (Block.NOT_CACHED, Block.PROCESSING) + + + def _cancel(self): + '''Cancels the execution of this block on the backend. + + This method should only be called from the experiment equivalent. 
It is + not part of the Block's public API. + ''' + + # lock self - avoids concurrent update from scheduler/worker subsystem + self_ = Block.objects.select_for_update().get(pk=self.pk) + + if self_.done(): return + + if hasattr(self, 'job'): + self.job._cancel() + else: + self.status = Block.CANCELLED + self.save() + self.experiment._update_state() + + + def is_runnable(self): + '''Checks if a block is runnable presently''' + + return all([k.status in (Block.CACHED, Block.SKIPPED) \ + for k in self.dependencies.all()]) and \ + (hasattr(self, 'job') and self.job.parent is None) + + + def _cascade_updates(self): + '''Cascade updates to blocks once I'm done. + ''' + + for b in self.dependents.all(): + if any([k.status in (Block.FAILED, Block.CANCELLED) \ + for k in b.dependencies.all()]): + b._cancel() + if b.is_runnable(): b.job._make_runnable() + + # Update eventual running siblings in case of a failure + if self.status == Block.FAILED: + for b in Block.objects.filter(experiment=self.experiment, + status=Block.PROCESSING): + b._cancel() + + + def _update_state(self, timings=None): + '''Updates self state as a result of backend running + + + Parameters: + + timings (dict, Optional): A dictionary containing key-value pairs + corresponding to: + + * queuing time (in seconds) + * sequential execution time (in seconds) + * real speed-up obtained + * maximum speed-up obtainable + + + This method is supposed to be called only by the underlying job + instance. It is not part of the Block's public API. + + ''' + + # lock self - avoids concurrent update from scheduler/worker subsystem + self_ = Block.objects.select_for_update().get(pk=self.pk) + + if self_.done(): return + + if self.start_date is None: + self.start_date = self.job.start_date + + if self.job.result: + + statistics = self.job.result.stats + + result_stdout = self.job.result.stdout + result_stderr = self.job.result.stderr + + if result_stdout == '\n': + result_stdout = '' + + if result_stderr == '\n': + result_stderr = '' + + info = dict( + cpu_time = statistics.cpu['user'] + statistics.cpu['system'], + max_memory = statistics.memory['rss'], + stdout = result_stdout, + stderr = result_stderr, + error_report = self.job.result.usrerr, + ) + + if 'volume' in statistics.data: + info['data_read_size'] = statistics.data['volume'].get('read', 0) + info['data_written_size'] = statistics.data['volume'].get('write', 0) + + if 'blocks' in statistics.data: + info['data_read_nb_blocks'] = statistics.data['blocks'].get('read', 0) + info['data_written_nb_blocks'] = statistics.data['blocks'].get('write', 0) + + if 'time' in statistics.data: + info['data_read_time'] = statistics.data['time'].get('read', 0) + info['data_written_time'] = statistics.data['time'].get('write', 0) + + + if timings: + info.update(dict( + queuing_time = timings['queuing'], + linear_execution_time = timings['linear_execution'], + speed_up_real = timings['speed_up_real'], + speed_up_maximal = timings['speed_up_maximal'], + )) + + self.outputs.update(**info) + + if self.job.status == Block.SKIPPED: + self.status = Block.CACHED + else: + self.status = self.job.status + + if self.job.done(): + self.end_date = self.job.end_date + r = self.job.result + self.job.delete() + if r: r.delete() + + # Loads Results from cache + if self.job.result and self.analyzer and self.status == Block.CACHED: + cache = self.first_cache() + data_source = beat.core.data.CachedDataSource() + data_source.setup(os.path.join(settings.CACHE_ROOT, + beat.core.hash.toPath(cache.hash)), settings.PREFIX) + 
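+            # reads the first data unit of the analyzer output back from the
+            # cache and mirrors each declared result field into a Result row
+            # below (simple scalar types stored as strings, charts and other
+            # complex types as JSON)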
output_data = data_source.next()[0] + if output_data is not None: + algorithm = beat.core.algorithm.Algorithm(settings.PREFIX, + self.algorithm.fullname()) + for field, value in output_data.as_dict().items(): + res, _ = Result.objects.get_or_create(name=field, + cache=cache) + res.primary = algorithm.results[field]['display'] + res.type = algorithm.results[field]["type"] + + if res.type in ['int32', 'float32', 'bool', 'string']: + res.data_value = str(value) + else: + res.data_value = simplejson.dumps(value, indent=4, + cls=NumpyJSONEncoder) + + res.save() + + data_source.close() + + self.save() + self._cascade_updates() + self.experiment._update_state() diff --git a/beat/web/experiments/models/block_input.py b/beat/web/experiments/models/block_input.py new file mode 100755 index 0000000000000000000000000000000000000000..958512c15928b6046457fd998a9315ddd36482b4 --- /dev/null +++ b/beat/web/experiments/models/block_input.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
#
+#                                                                             #
+###############################################################################
+
+from django.db import models
+from django.conf import settings
+
+from ...databases.models import DatabaseSetOutput
+
+from .block import Block
+
+
+#----------------------------------------------------------
+
+
+class BlockInputManager(models.Manager):
+
+    def get_by_natural_key(self, name, experiment_author, toolchain_author,
+                           toolchain_name, toolchain_version, experiment_name,
+                           cache_hash, database_hash):
+        block = Block.objects.get_by_natural_key(name, experiment_author,
+            toolchain_author, toolchain_name, toolchain_version,
+            experiment_name)
+
+        if cache_hash:
+            return self.get(cache__hash=cache_hash, block=block)
+        else:
+            return self.get(database__hash=database_hash, block=block)
+
+
+#----------------------------------------------------------
+
+
+class BlockInput(models.Model):
+
+    block = models.ForeignKey(Block, related_name='inputs', null=True,
+                              on_delete=models.CASCADE)
+
+    # if the input comes from another block, then this one is set
+    cache = models.ForeignKey('CachedFile', related_name='inputs', null=True,
+                              on_delete=models.CASCADE)
+
+    # if the input comes from a dataset, then this one is set
+    database = models.ForeignKey(DatabaseSetOutput, related_name='blocks',
+                                 null=True, on_delete=models.CASCADE)
+
+    channel = models.CharField(max_length=200, default='', blank=True,
+        help_text="Synchronization channel within the toolchain")
+
+    objects = BlockInputManager()
+
+
+    def natural_key(self):
+        cache_hash = self.cache and self.cache.hash
+        database_hash = self.database and self.database.hash
+        return self.block.natural_key() + (cache_hash, database_hash)
diff --git a/beat/web/experiments/models/cached_file.py b/beat/web/experiments/models/cached_file.py
new file mode 100755
index 0000000000000000000000000000000000000000..7ea30546c3ed2181947b347b56aade6890d5f729
--- /dev/null
+++ b/beat/web/experiments/models/cached_file.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+
+###############################################################################
+#                                                                             #
+# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/          #
+# Contact: beat.support@idiap.ch                                              #
+#                                                                             #
+# This file is part of the beat.web module of the BEAT platform.             #
+#                                                                             #
+# Commercial License Usage                                                    #
+# Licensees holding valid commercial BEAT licenses may use this file in      #
+# accordance with the terms contained in a written agreement between you     #
+# and Idiap. For further information contact tto@idiap.ch                    #
+#                                                                             #
+# Alternatively, this file may be used under the terms of the GNU Affero     #
+# Public License version 3 as published by the Free Software and appearing   #
+# in the file LICENSE.AGPL included in the packaging of this file.           #
+# The BEAT platform is distributed in the hope that it will be useful, but   #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
+# or FITNESS FOR A PARTICULAR PURPOSE.                                       #
+#                                                                             #
+# You should have received a copy of the GNU Affero Public License along     #
+# with the BEAT platform. If not, see http://www.gnu.org/licenses/.
# +# # +############################################################################### + +from django.db import models +from django.conf import settings + +import beat.core.hash + +import os +import glob + +import logging +logger = logging.getLogger(__name__) + + +#---------------------------------------------------------- + + +class CachedFileManager(models.Manager): + + def get_by_natural_key(self, hash): + return self.get(hash=hash) + + +#---------------------------------------------------------- + + +class CachedFile(models.Model): + + blocks = models.ManyToManyField('Block', related_name='outputs', blank=True) + hash = models.CharField(max_length=64, unique=True) + + # the total amount of time this block took to run considering the + # wall-clock time. + linear_execution_time = models.FloatField(default=0.) + + # the real speed-up obtained by running this block using X slots + speed_up_real = models.FloatField(default=0.) + + # the maximum obtainable speed-up that could be achieved if all slots + # were running in parallel. Essentially linear_execution_time / + # maximum_slot_time + speed_up_maximal = models.FloatField(default=0.) + + # the time this block waited to be executed + queuing_time = models.FloatField(default=0.) + + stdout = models.TextField(null=True, blank=True) + stderr = models.TextField(null=True, blank=True) + error_report = models.TextField(null=True, blank=True) + + # other statistics of interest + cpu_time = models.FloatField(default=0.) + max_memory = models.BigIntegerField(default=0) + data_read_size = models.BigIntegerField(default=0) + data_read_nb_blocks = models.IntegerField(default=0) + data_read_time = models.FloatField(default=0.) + data_written_size = models.BigIntegerField(default=0) + data_written_nb_blocks = models.IntegerField(default=0) + data_written_time = models.FloatField(default=0.) 
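+
+    # NOTE: the statistics above are filled in by Block._update_state() from
+    # the backend job results; they are shared by every block whose outputs
+    # map to this cached file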
+
+    objects = CachedFileManager()
+
+
+    def __str__(self):
+        return 'CachedFile(%s, %d blocks)' % (self.hash, self.blocks.count())
+
+
+    def natural_key(self):
+        return (self.hash,)
+
+
+    def path(self):
+        '''Returns the path prefix to the cached file, relative to the cache root'''
+
+        return beat.core.hash.toPath(self.hash, suffix='')
+
+
+    def absolute_path(self, cache=settings.CACHE_ROOT):
+        '''Returns the absolute path prefix to the cached file on disk'''
+
+        return os.path.join(cache, self.path())
+
+
+    def files(self, cache=settings.CACHE_ROOT):
+        '''Returns the list of files on disk belonging to this cache'''
+
+        return glob.glob(self.absolute_path(cache) + '*')
+
+
+    def exists(self, cache=settings.CACHE_ROOT):
+        '''Checks if any file belonging to this cache exists on disk'''
+
+        return bool(self.files(cache))
+
+
+    def index_checksums(self, cache=settings.CACHE_ROOT):
+        '''Checks that the index checksums of this cached file match on disk'''
+
+        abs_path = self.absolute_path(cache)
+        index = sorted(glob.glob(abs_path + '*.index'))
+        chksum = sorted(glob.glob(abs_path + '*.index.checksum'))
+
+        if len(index) != len(chksum):
+            logger.warning("Number of index files (%d) is different from " \
+                "checksums (%d) for cache `%s'", len(index), len(chksum),
+                abs_path)
+            return False
+
+        for i, c in zip(index, chksum):
+            with open(c, 'rt') as f: recorded = f.read().strip()
+            actual = beat.core.hash.hashFileContents(i)
+            if actual != recorded:
+                logger.warning("Checksum for index of cache `%s' does not " \
+                    "match for file `%s' (%s != %s)", abs_path, i,
+                    actual, recorded)
+                return False
+
+        return True
diff --git a/beat/web/experiments/models.py b/beat/web/experiments/models/experiment.py
similarity index 58%
rename from beat/web/experiments/models.py
rename to beat/web/experiments/models/experiment.py
index fc718d71a24e87340d9971446a1403beaceb4836..7d7d68d76e94f64d1c8cfa6fce0f38be09df57ac 100755
--- a/beat/web/experiments/models.py
+++ b/beat/web/experiments/models/experiment.py
@@ -3,7 +3,7 @@
 ###############################################################################
 #                                                                             #
-# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/          #
+# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/          #
 # Contact: beat.support@idiap.ch                                              #
 #                                                                             #
 # This file is part of the beat.web module of the BEAT platform.
# @@ -36,35 +36,37 @@ from django.template.loader import render_to_string from django.contrib.sites.models import Site import beat.core.hash -import beat.core.data -import beat.core.algorithm import beat.core.experiment from beat.core.utils import NumpyJSONEncoder -from ..algorithms.models import Algorithm -from ..toolchains.models import Toolchain - -from ..common.models import Shareable -from ..common.models import ContributionManager -from ..common.models import get_contribution_declaration_filename -from ..common.models import get_contribution_description_filename -from ..common.models import get_description -from ..common.models import set_description -from ..common import storage - -from ..common.exceptions import ShareError -from ..common.texts import Messages -from ..common.storage import OverwriteStorage -from ..backend.models import Queue, Environment -from ..databases.models import DatabaseSet, DatabaseSetOutput -from ..import __version__ - +from ...algorithms.models import Algorithm +from ...toolchains.models import Toolchain + +from ...common.models import Shareable +from ...common.models import ContributionManager +from ...common.models import get_contribution_declaration_filename +from ...common.models import get_contribution_description_filename +from ...common.models import get_description +from ...common.models import set_description +from ...common import storage + +from ...common.exceptions import ShareError +from ...common.texts import Messages +from ...common.storage import OverwriteStorage +from ...backend.models import Queue +from ...backend.models import Environment +from ...databases.models import DatabaseSet +from ...databases.models import DatabaseSetOutput +from ...import __version__ + +from .block import Block +from .block_input import BlockInput +from .cached_file import CachedFile from datetime import datetime import os -import glob import simplejson import logging @@ -119,6 +121,9 @@ def validate_environments(experiment, user=None): return errors +#---------------------------------------------------------- + + def validate_experiment(experiment_info, toolchain_info, user=None): """Makes sure the experiment can be run""" @@ -139,12 +144,6 @@ class DeclarationStorage(OverwriteStorage): super(DeclarationStorage, self).__init__(*args, location=settings.EXPERIMENTS_ROOT, **kwargs) -# Keep for backward compatibility with initial migration -class ConfigurationStorage(OverwriteStorage): - - def __init__(self, *args, **kwargs): - super(ConfigurationStorage, self).__init__(*args, location=settings.EXPERIMENTS_ROOT, **kwargs) - #---------------------------------------------------------- @@ -208,13 +207,6 @@ class ExperimentManager(ContributionManager): #---------------------------------------------------------- -def get_contribution_configuration_filename(obj, path): - return obj.configuration_filename() - - -#---------------------------------------------------------- - - class Experiment(Shareable): #_____ Constants __________ @@ -371,6 +363,7 @@ class Experiment(Shareable): algorithm.share(public=opensource, users=users, teams=teams) + #_____ Overrides __________ def has_attestation(self): @@ -381,12 +374,14 @@ class Experiment(Shareable): else: return True + @classmethod def from_db(cls, db, field_names, values): instance = super(Experiment, cls).from_db(db, field_names, values) instance._loaded_status = values[field_names.index('status')] return instance + def save(self, *args, **kwargs): new_experiment = (self.id is None) @@ -470,12 +465,13 @@ class 
Experiment(Shareable): try: (username, name, version) = algorithm.split('/') algorithm_db = Algorithm.objects.get( - author__username=username, - name=name, - version=int(version), - ) + author__username=username, + name=name, + version=int(version), + ) except: - if new_experiment: self.delete() + if new_experiment: + self.delete() raise SyntaxError("The algorithm '%s' can't be found" % \ algorithm.fullname()) @@ -523,8 +519,7 @@ class Experiment(Shareable): # Loads the experiment execution description, creating the Block's, # BlockInput's and BlockOutput's as required. - for order_0, (block_name, description) in \ - enumerate(corexp.setup().items()): + for order_0, (block_name, description) in enumerate(corexp.setup().items()): # Checks that the Queue/Environment exists job_description = description['configuration'] @@ -565,20 +560,20 @@ class Experiment(Shareable): ) # Ties the block in - slots = job_description.get('nb_slots') b = Block.objects.filter(experiment=self, name=block_name).first() if b is None: b = Block(experiment=self, name=block_name, algorithm=algorithm) else: b.algorithm = algorithm + b.execution_order = order_0 + 1 - b.command=simplejson.dumps(job_description, indent=4) - b.status=Block.NOT_CACHED - b.analyzer=algorithm.analysis() - b.environment=env - b.queue=queue - # TEMPORARILY DISABLED: b.required_slots=job_description['nb_slots'] - b.channel=job_description['channel'] + b.command = simplejson.dumps(job_description, indent=4) + b.status = Block.NOT_CACHED + b.analyzer = algorithm.analysis() + b.environment = env + b.queue = queue + b.required_slots = job_description['nb_slots'] + b.channel = job_description['channel'] b.save() # from this point: requires block to have an assigned id @@ -586,7 +581,7 @@ class Experiment(Shareable): b.dependencies.add(*[self.blocks.get(name=k) \ for k in description['dependencies']]) - # reset inputs and outputs - creates if necessary only + # reset inputs - creates if necessary only b.inputs.clear() for v in job_description['inputs'].values(): if 'database' in v: #database input @@ -598,9 +593,10 @@ class Experiment(Shareable): BlockInput.objects.get_or_create(block=b, channel=v['channel'], cache=cache) + # reset outputs - creates if necessary only b.outputs.clear() - outputs = job_description.get('outputs', - {'': job_description.get('result')}) + outputs = job_description.get('outputs', {'': job_description.get('result')}) + for v in outputs.values(): cache, cr = CachedFile.objects.get_or_create(hash=v['hash']) cache.blocks.add(b) @@ -611,21 +607,27 @@ class Experiment(Shareable): def is_busy(self): return self.status in [Experiment.PENDING, Experiment.SCHEDULED, Experiment.CANCELING] + def is_done(self): return self.status in [Experiment.DONE, Experiment.FAILED] + def is_running(self): return self.status == Experiment.RUNNING + def modifiable(self): return (self.reports.count() == 0) and not self.has_attestation() and super(Experiment, self).modifiable() + def deletable(self): return (self.reports.count() == 0) and not self.has_attestation() and super(Experiment, self).deletable() + def core(self): return validate_experiment(self.declaration, self.toolchain.declaration, self.author)[0] + def job_splits(self, status=None): from ..backend.models import JobSplit retval = JobSplit.objects.filter(job__block__in=self.blocks.all()) @@ -633,6 +635,7 @@ class Experiment(Shareable): retval = retval.filter(status=status) return retval + def get_absolute_url(self): return reverse( 'experiments:view', @@ -645,6 +648,7 @@ class 
Experiment(Shareable): ), ) + def get_api_share_url(self): return reverse( 'api_experiments:share', @@ -657,6 +661,7 @@ class Experiment(Shareable): ), ) + def get_api_update_url(self): return reverse( 'api_experiments:object', @@ -669,9 +674,11 @@ class Experiment(Shareable): ), ) + def get_admin_change_url(self): return reverse('admin:experiments_experiment_change', args=(self.id,)) + def completion(self): if self.start_date is None: return 0 @@ -686,6 +693,7 @@ class Experiment(Shareable): return int(100 * float(len(filter(lambda x: x.status == Block.CACHED, blocks))) / len(blocks)) + def all_needed_dataformats(self): result = [] @@ -740,9 +748,7 @@ class Experiment(Shareable): def set_declaration(self, value): if isinstance(value, dict): - value = simplejson.dumps(value, - indent=4, - cls=NumpyJSONEncoder) + value = simplejson.dumps(value, indent=4, cls=NumpyJSONEncoder) storage.set_file_content(self, 'declaration_file', self.declaration_filename(), value) @@ -829,9 +835,11 @@ class Experiment(Shareable): self_ = Experiment.objects.select_for_update().get(pk=self.pk) - if self_.status != Experiment.PENDING: return + if self_.status != Experiment.PENDING: + return - for b in self.blocks.all(): b._schedule() + for block in self.blocks.all(): + block._schedule() # notice that the previous call may decide all is done already # so, we must respect that before setting the SCHEDULED status @@ -872,581 +880,3 @@ class Experiment(Shareable): self.toolchain, name, self.get_declaration(), self.short_description, self.description) return xp - - -#---------------------------------------------------------- - - -class BlockManager(models.Manager): - - def get_by_natural_key(self, name, experiment_author, - toolchain_author, toolchain_name, - toolchain_version, experiment_name): - return self.get( - name=name, - experiment__author__username=experiment_author, - experiment__toolchain__author__username=toolchain_author, - experiment__toolchain__name=toolchain_name, - experiment__toolchain__version=toolchain_version, - experiment__name=experiment_name, - ) - - -class Block(models.Model): - - NOT_CACHED = 'N' - PROCESSING = 'P' - CACHED = 'C' - FAILED = 'F' - SKIPPED = 'S' - CANCELLED = 'L' - - STATUS = ( - (NOT_CACHED, 'Not cached'), - (PROCESSING, 'Processing'), - (CACHED, 'Cached'), - (FAILED, 'Failed'), - (SKIPPED, 'Skipped'), - (CANCELLED, 'Cancelled'), - ) - - experiment = models.ForeignKey(Experiment, related_name='blocks', - on_delete=models.CASCADE) - name = models.CharField(max_length=200) - command = models.TextField(null=True, blank=True) - status = models.CharField(max_length=1, choices=STATUS, default=NOT_CACHED) - analyzer = models.BooleanField(default=False) - algorithm = models.ForeignKey(Algorithm, related_name='blocks', - on_delete=models.CASCADE) - creation_date = models.DateTimeField(null=True, blank=True, - auto_now_add=True) - start_date = models.DateTimeField(null=True, blank=True) - end_date = models.DateTimeField(null=True, blank=True) - environment = models.ForeignKey(Environment, related_name='blocks', - null=True, on_delete=models.SET_NULL) - queue = models.ForeignKey(Queue, related_name='blocks', null=True, - on_delete=models.SET_NULL) - - required_slots = models.PositiveIntegerField(default=1) - channel = models.CharField(max_length=200, default='', blank=True, - help_text="Synchronization channel within the toolchain") - - # relationship to blocks to which this block depends on - dependencies = models.ManyToManyField('self', - related_name='dependents', - blank=True, - 
symmetrical=False, - ) - - # order of this block within the experiment - useful for the `backup' - # command, so we can dump the blocks in the right dependence order - execution_order = models.PositiveIntegerField(null=True, blank=True) - - objects = BlockManager() - - - class Meta: - unique_together = ('experiment', 'name') - - # setup ordering so that the dump order respects self dependencies - ordering = ['experiment_id', 'execution_order'] - - - def __str__(self): - return self.experiment.fullname() + ', ' + self.name + ' (%s)' % self.get_status_display() - - def natural_key(self): - return ( - self.name, - self.experiment.author.username, - self.experiment.toolchain.author.username, - self.experiment.toolchain.name, - self.experiment.toolchain.version, - self.experiment.name, - ) - - # Accessors for statistics - - def __return_first__(self, field, default=None): - return getattr(self.outputs.first(), field, default) - - def first_cache(self): - return self.outputs.first() - - def error_report(self): - return self.__return_first__('error_report') - - def stdout(self): - return self.__return_first__('stdout') - - def stderr(self): - return self.__return_first__('stderr') - - def speed_up_real(self): - return self.__return_first__('speed_up_real') - - def speed_up_maximal(self): - return self.__return_first__('speed_up_maximal') - - def linear_execution_time(self): - return self.__return_first__('linear_execution_time') - - def queuing_time(self): - return self.__return_first__('queuing_time') - - def cpu_time(self): - return self.__return_first__('cpu_time') - - def max_memory(self): - return self.__return_first__('max_memory') - - def data_read_size(self): - return self.__return_first__('data_read_size') - - def data_read_nb_blocks(self): - return self.__return_first__('data_read_nb_blocks') - - def data_read_time(self): - return self.__return_first__('data_read_time') - - def data_written_size(self): - return self.__return_first__('data_written_size') - - def data_written_nb_blocks(self): - return self.__return_first__('data_written_nb_blocks') - - def data_written_time(self): - return self.__return_first__('data_written_time') - - # Accessor for results - results = property(lambda self: self.__return_first__('results')) - - - def _schedule(self): - '''Schedules this block for execution at the backend - - To "schedule" means solely creating a :py:class:`..backend.models.Job` - pointing to this object. This method **should only be called by the - owning experiment**. It is not part of the Block's public API. - ''' - - # lock self - avoids concurrent update from scheduler/worker subsystem - self_ = Block.objects.select_for_update().get(pk=self.pk) - - # checks we have not, meanwhile, been cancelled - if self_.done(): return - - # checks queue and environment - if self.queue is None: - raise RuntimeError("Block `%s' does not have a queue assigned " \ - "- this normally indicates the originally selected " \ - "queue was deleted since the experiment was first " \ - "configured. Re-configure this experiment and select a new " \ - "default or block-specific queue" % self.name) - - if self.environment is None: - raise RuntimeError("Block `%s' does not have an environment " \ - "assigned - this normally indicates the originally selected " \ - "environment was deleted since the experiment was first " \ - "configured. 
Re-configure this experiment and select a new " \ - "default or block-specific environment" % self.name) - - from ..backend.models import Job - - # search for other jobs with similar outputs that have no children yet - # do this carefully, as other experiments may be scheduled at the same - # time, invalidating our "parent" choice - parent = Job.objects.filter(block__outputs__in=self.outputs.all(), - child=None).first() - if parent is not None: #(candidate only) try to lock it - while True: - parent = Job.objects.select_for_update().get(pk=parent.pk) - if parent.child_ is not None: #was taken meanwhile, retry - parent = parent.child - continue - Job(block=self, parent=parent).save() - break - else: - Job(block=self).save() - - # checks if the job is immediately runnable - if so, tries to - # make it runnable (check caches and other) - if self.is_runnable(): self.job._make_runnable() - - - def done(self): - '''Says whether the block has finished or not''' - - return self.status not in (Block.NOT_CACHED, Block.PROCESSING) - - - def _cancel(self): - '''Cancels the execution of this block on the backend. - - This method should only be called from the experiment equivalent. It is - not part of the Block's public API. - ''' - - # lock self - avoids concurrent update from scheduler/worker subsystem - self_ = Block.objects.select_for_update().get(pk=self.pk) - - if self_.done(): return - - if hasattr(self, 'job'): - self.job._cancel() - else: - self.status = Block.CANCELLED - self.save() - self.experiment._update_state() - - - def is_runnable(self): - '''Checks if a block is runnable presently''' - - return all([k.status in (Block.CACHED, Block.SKIPPED) \ - for k in self.dependencies.all()]) and \ - (hasattr(self, 'job') and self.job.parent is None) - - - def _cascade_updates(self): - '''Cascade updates to blocks once I'm done. - ''' - - for b in self.dependents.all(): - if any([k.status in (Block.FAILED, Block.CANCELLED) \ - for k in b.dependencies.all()]): - b._cancel() - if b.is_runnable(): b.job._make_runnable() - - # Update eventual running siblings in case of a failure - if self.status == Block.FAILED: - for b in Block.objects.filter(experiment=self.experiment, - status=Block.PROCESSING): - b._cancel() - - - def _update_state(self, timings=None): - '''Updates self state as a result of backend running - - - Parameters: - - timings (dict, Optional): A dictionary containing key-value pairs - corresponding to: - - * queuing time (in seconds) - * sequential execution time (in seconds) - * real speed-up obtained - * maximum speed-up obtainable - - - This method is supposed to be called only by the underlying job - instance. It is not part of the Block's public API. 
- - ''' - - # lock self - avoids concurrent update from scheduler/worker subsystem - self_ = Block.objects.select_for_update().get(pk=self.pk) - - if self_.done(): return - - if self.start_date is None: - self.start_date = self.job.start_date - - if self.job.result: - - statistics = self.job.result.stats - - result_stdout = self.job.result.stdout - result_stderr = self.job.result.stderr - - if result_stdout == '\n': - result_stdout = '' - - if result_stderr == '\n': - result_stderr = '' - - info = dict( - cpu_time = statistics.cpu['user'] + statistics.cpu['system'], - max_memory = statistics.memory['rss'], - stdout = result_stdout, - stderr = result_stderr, - error_report = self.job.result.usrerr, - ) - - if 'volume' in statistics.data: - info['data_read_size'] = statistics.data['volume'].get('read', 0) - info['data_written_size'] = statistics.data['volume'].get('write', 0) - - if 'blocks' in statistics.data: - info['data_read_nb_blocks'] = statistics.data['blocks'].get('read', 0) - info['data_written_nb_blocks'] = statistics.data['blocks'].get('write', 0) - - if 'time' in statistics.data: - info['data_read_time'] = statistics.data['time'].get('read', 0) - info['data_written_time'] = statistics.data['time'].get('write', 0) - - - if timings: - info.update(dict( - queuing_time = timings['queuing'], - linear_execution_time = timings['linear_execution'], - speed_up_real = timings['speed_up_real'], - speed_up_maximal = timings['speed_up_maximal'], - )) - - self.outputs.update(**info) - - if self.job.status == Block.SKIPPED: - self.status = Block.CACHED - else: - self.status = self.job.status - - if self.job.done(): - self.end_date = self.job.end_date - r = self.job.result - self.job.delete() - if r: r.delete() - - # Loads Results from cache - if self.job.result and self.analyzer and self.status == Block.CACHED: - cache = self.first_cache() - data_source = beat.core.data.CachedDataSource() - data_source.setup(os.path.join(settings.CACHE_ROOT, - beat.core.hash.toPath(cache.hash)), settings.PREFIX) - output_data = data_source.next()[0] - if output_data is not None: - algorithm = beat.core.algorithm.Algorithm(settings.PREFIX, - self.algorithm.fullname()) - for field, value in output_data.as_dict().items(): - res, _ = Result.objects.get_or_create(name=field, - cache=cache) - res.primary = algorithm.results[field]['display'] - res.type = algorithm.results[field]["type"] - - if res.type in ['int32', 'float32', 'bool', 'string']: - res.data_value = str(value) - else: - res.data_value = simplejson.dumps(value, indent=4, - cls=NumpyJSONEncoder) - - res.save() - - data_source.close() - - self.save() - self._cascade_updates() - self.experiment._update_state() - - -#---------------------------------------------------------- - - -class CachedFileManager(models.Manager): - - def get_by_natural_key(self, hash): - return self.get(hash=hash) - - -class CachedFile(models.Model): - - blocks = models.ManyToManyField(Block, related_name='outputs', blank=True) - hash = models.CharField(max_length=64, unique=True) - - # the total amount of time this block took to run considering the - # wall-clock time. - linear_execution_time = models.FloatField(default=0.) - - # the real speed-up obtained by running this block using X slots - speed_up_real = models.FloatField(default=0.) - - # the maximum obtainable speed-up that could be achieved if all slots - # were running in parallel. Essentially linear_execution_time / - # maximum_slot_time - speed_up_maximal = models.FloatField(default=0.) 
- - # the time this block waited to be executed - queuing_time = models.FloatField(default=0.) - - stdout = models.TextField(null=True, blank=True) - stderr = models.TextField(null=True, blank=True) - error_report = models.TextField(null=True, blank=True) - - # other statistics of interest - cpu_time = models.FloatField(default=0.) - max_memory = models.BigIntegerField(default=0) - data_read_size = models.BigIntegerField(default=0) - data_read_nb_blocks = models.IntegerField(default=0) - data_read_time = models.FloatField(default=0.) - data_written_size = models.BigIntegerField(default=0) - data_written_nb_blocks = models.IntegerField(default=0) - data_written_time = models.FloatField(default=0.) - - objects = CachedFileManager() - - - def __str__(self): - return 'CachedFile(%s, %d blocks)' % (self.hash, self.blocks.count()) - - - def natural_key(self): - return (self.hash,) - - - def path(self): - '''Returns the full path prefix to the cached file on disk''' - - return beat.core.hash.toPath(self.hash, suffix='') - - - def absolute_path(self, cache=settings.CACHE_ROOT): - '''Returns the full path prefix to the cached file on disk''' - - return os.path.join(cache, self.path()) - - - def files(self, cache=settings.CACHE_ROOT): - '''Checks if any file belonging to this cache exist on disk''' - - return glob.glob(self.absolute_path(cache) + '*') - - - def exists(self, cache=settings.CACHE_ROOT): - '''Checks if any file belonging to this cache exist on disk''' - - return bool(self.files(cache)) - - - def index_checksums(self, cache=settings.CACHE_ROOT): - '''Checks if this cached file indexes checksum properly''' - - abs_path = self.absolute_path(cache) - index = sorted(glob.glob(abs_path + '*.index')) - chksum = sorted(glob.glob(abs_path + '*.index.checksum')) - - if len(index) != len(chksum): - logger.warn("Number of index files (%d) is different from " \ - "checksums (%d) for cache `%s'", len(index), len(chksum), - abs_path) - return False - - for i, c in zip(index, chksum): - with open(c, 'rt') as f: recorded = f.read().strip() - actual = beat.core.hash.hashFileContents(i) - if actual != recorded: - logger.warn("Checksum for index of cache `%s' does not " \ - "match for file `%s' (%s != %s)", abs_path, i, - actual, recorded) - return False - - return True - - -#---------------------------------------------------------- - - -class BlockInputManager(models.Manager): - - def get_by_natural_key(self, name, experiment_author, - toolchain_author, toolchain_name, - toolchain_version, experiment_name, cache_hash, database_hash): - block = Block.objects.get_by_natural_key(name, experiment_author, - toolchain_author, toolchain_name, toolchain_version, - experiment_name) - if cache_hash: - return self.get(cache__hash=cache_hash, block=block) - else: - return self.get(database__hash=database_hash, block=block) - - -class BlockInput(models.Model): - - block = models.ForeignKey(Block, related_name='inputs', null=True, - on_delete=models.CASCADE) - - # if the input cames from another block, then this one is set - cache = models.ForeignKey(CachedFile, related_name='inputs', null=True, - on_delete=models.CASCADE) - - # if the input cames from a dataset, then this one is set - database = models.ForeignKey(DatabaseSetOutput, related_name='blocks', - null=True, on_delete=models.CASCADE) - - channel = models.CharField(max_length=200, default='', blank=True, - help_text="Synchronization channel within the toolchain") - - objects = BlockInputManager() - - def natural_key(self): - cache_hash = self.cache and 
self.cache.hash - database_hash = self.database and self.database.hash - return self.block.natural_key() + (cache_hash, database_hash) - - -#---------------------------------------------------------- - - -class ResultManager(models.Manager): - - def get_by_natural_key(self, name, hash): - return self.get( - name=name, - cache__hash=hash, - ) - - -class Result(models.Model): - - SIMPLE_TYPE_NAMES = ('int32', 'float32', 'bool', 'string') - - cache = models.ForeignKey(CachedFile, related_name='results', null=True, - on_delete=models.CASCADE) - name = models.CharField(max_length=200) - type = models.CharField(max_length=200) - primary = models.BooleanField(default=False) - data_value = models.TextField(null=True, blank=True) - - objects = ResultManager() - - - #_____ Meta parameters __________ - - class Meta: - unique_together = ('cache', 'name') - - - def __str__(self): - return '%s - %s' % (self.cache, self.name) - - - def natural_key(self): - return ( - self.name, - self.cache.hash, - ) - - - def value(self): - if self.data_value in ['+inf', '-inf', 'NaN']: - return self.data_value - elif self.type == 'int32': - return int(self.data_value) - elif self.type == 'float32': - return float(self.data_value) - elif self.type == 'bool': - return bool(self.data_value) - elif self.type == 'string': - return str(self.data_value) - elif valid_result_type(self.type): - return simplejson.loads(self.data_value) - - return None - - - def is_chart(self): - - return self.type not in Result.SIMPLE_TYPE_NAMES - - -def valid_result_type(value_type): - return value_type.startswith('%s/' % settings.PLOT_ACCOUNT) or (value_type in Result.SIMPLE_TYPE_NAMES) diff --git a/beat/web/experiments/models/result.py b/beat/web/experiments/models/result.py new file mode 100755 index 0000000000000000000000000000000000000000..f7b21474323cf8baec5e223e411ce0506e717018 --- /dev/null +++ b/beat/web/experiments/models/result.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
#
+#                                                                             #
+###############################################################################
+
+from django.db import models
+from django.conf import settings
+
+import simplejson
+
+
+#----------------------------------------------------------
+
+
+class ResultManager(models.Manager):
+
+    def get_by_natural_key(self, name, hash):
+        return self.get(name=name, cache__hash=hash)
+
+
+#----------------------------------------------------------
+
+
+class Result(models.Model):
+
+    SIMPLE_TYPE_NAMES = ('int32', 'float32', 'bool', 'string')
+
+    cache = models.ForeignKey('CachedFile', related_name='results', null=True,
+                              on_delete=models.CASCADE)
+    name = models.CharField(max_length=200)
+    type = models.CharField(max_length=200)
+    primary = models.BooleanField(default=False)
+    data_value = models.TextField(null=True, blank=True)
+
+    objects = ResultManager()
+
+
+    #_____ Meta parameters __________
+
+    class Meta:
+        unique_together = ('cache', 'name')
+
+
+    def __str__(self):
+        return '%s - %s' % (self.cache, self.name)
+
+
+    def natural_key(self):
+        return (
+            self.name,
+            self.cache.hash,
+        )
+
+
+    def value(self):
+        # special IEEE-754 values are kept as their textual representation
+        if self.data_value in ['+inf', '-inf', 'NaN']:
+            return self.data_value
+        elif self.type == 'int32':
+            return int(self.data_value)
+        elif self.type == 'float32':
+            return float(self.data_value)
+        elif self.type == 'bool':
+            return bool(self.data_value)
+        elif self.type == 'string':
+            return str(self.data_value)
+        elif self.type.startswith('%s/' % settings.PLOT_ACCOUNT) or \
+                (self.type in Result.SIMPLE_TYPE_NAMES):
+            return simplejson.loads(self.data_value)
+
+        return None
+
+
+    def is_chart(self):
+        return self.type not in Result.SIMPLE_TYPE_NAMES
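
Since this change converts beat/web/experiments/models.py into a package, the new models/__init__.py re-exports every public name, so existing import sites keep resolving unchanged. A minimal compatibility sketch (hypothetical, not part of the patch; assumes DJANGO_SETTINGS_MODULE points at a configured beat.web settings module):

    # Hypothetical smoke test for the models.py -> models/ package split:
    # the old flat-module import paths must resolve to the same classes that
    # models/__init__.py now re-exports.
    import django
    django.setup()  # requires DJANGO_SETTINGS_MODULE to be set beforehand

    from beat.web.experiments import models
    from beat.web.experiments.models import (Block, BlockInput, CachedFile,
                                             Result, Experiment)

    for cls in (Block, BlockInput, CachedFile, Result, Experiment):
        assert getattr(models, cls.__name__) is cls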