From b3c6d35b14762c5926b0324eebf8f54b5ab38c82 Mon Sep 17 00:00:00 2001 From: Philip ABBET <philip.abbet@idiap.ch> Date: Thu, 28 Sep 2017 17:26:33 +0200 Subject: [PATCH] [backend] Refactoring: Split the models into separate, more manageable files --- beat/web/backend/helpers.py | 62 +- beat/web/backend/models/__init__.py | 40 ++ beat/web/backend/models/environment.py | 160 +++++ beat/web/backend/{models.py => models/job.py} | 662 +----------------- beat/web/backend/models/queue.py | 170 +++++ beat/web/backend/models/result.py | 79 +++ beat/web/backend/models/slot.py | 78 +++ beat/web/backend/models/worker.py | 376 ++++++++++ beat/web/backend/schedule.py | 5 + beat/web/backend/tests.py | 118 +++- .../migrations/0009_block_status.py | 20 + beat/web/experiments/models/block.py | 6 +- beat/web/experiments/models/experiment.py | 5 +- 13 files changed, 1078 insertions(+), 703 deletions(-) create mode 100755 beat/web/backend/models/__init__.py create mode 100755 beat/web/backend/models/environment.py rename beat/web/backend/{models.py => models/job.py} (59%) create mode 100755 beat/web/backend/models/queue.py create mode 100755 beat/web/backend/models/result.py create mode 100755 beat/web/backend/models/slot.py create mode 100755 beat/web/backend/models/worker.py mode change 100644 => 100755 beat/web/backend/schedule.py create mode 100644 beat/web/experiments/migrations/0009_block_status.py diff --git a/beat/web/backend/helpers.py b/beat/web/backend/helpers.py index d2e01cbba..118c4d3d4 100755 --- a/beat/web/backend/helpers.py +++ b/beat/web/backend/helpers.py @@ -27,8 +27,11 @@ from django.db import transaction +from datetime import datetime + from ..experiments.models import Experiment from ..experiments.models import Block +from ..experiments.models import CachedFile from .models import Job @@ -69,32 +72,53 @@ def schedule_experiment(experiment): # Process all the blocks of the experiment + already_done = True + for block in experiment.blocks.all(): # Lock the block, so 
nobody else can modify it block = Block.objects.select_for_update().get(pk=block.pk) - # search for other jobs with similar outputs that have no children yet - # do this carefully, as other experiments may be scheduled at the same - # time, invalidating our "parent" choice - parent = Job.objects.filter(block__outputs__in=block.outputs.all(), - child=None).first() - - if parent is not None: #(candidate only) try to lock it - while True: - parent = Job.objects.select_for_update().get(pk=parent.pk) - if parent.child_ is not None: #was taken meanwhile, retry - parent = parent.child - continue - job = Job(block=block, parent=parent) - break - else: - job = Job(block=block) + # Check if the block outputs aren't already in the cache + must_skip = all([cached_file.status == CachedFile.CACHED + for cached_file in block.outputs.all()]) - job.save() + if must_skip: + block.status = Block.DONE + block.start_date = datetime.now() + block.end_date = block.start_date + block.save() + else: + # search for other jobs with similar outputs that have no children yet + # do this carefully, as other experiments may be scheduled at the same + # time, invalidating our "parent" choice + parent = Job.objects.filter(block__outputs__in=block.outputs.all(), + child=None).first() + + if parent is not None: #(candidate only) try to lock it + while True: + parent = Job.objects.select_for_update().get(pk=parent.pk) + if parent.child_ is not None: #was taken meanwhile, retry + parent = parent.child + continue + job = Job(block=block, parent=parent) + break + else: + job = Job(block=block) + + job.save() + + already_done = False + + + # Mark the experiment as scheduled (or done) + if already_done: + experiment.start_date = datetime.now() + experiment.end_date = experiment.start_date + experiment.status = Experiment.DONE + else: + experiment.status = Experiment.SCHEDULED - # Mark the experiment as scheduled - experiment.status = Experiment.SCHEDULED experiment.save() diff --git 
a/beat/web/backend/models/__init__.py b/beat/web/backend/models/__init__.py new file mode 100755 index 000000000..9672ab91a --- /dev/null +++ b/beat/web/backend/models/__init__.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
# +# # +############################################################################### + + +from .environment import EnvironmentManager +from .environment import Environment +from .environment import EnvironmentLanguage +from .job import Job +from .job import JobSplit +from .queue import QueueManager +from .queue import Queue +from .result import Result +from .slot import SlotManager +from .slot import Slot +from .worker import WorkerManager +from .worker import Worker diff --git a/beat/web/backend/models/environment.py b/beat/web/backend/models/environment.py new file mode 100755 index 000000000..7233b7920 --- /dev/null +++ b/beat/web/backend/models/environment.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
# +# # +############################################################################### + +from django.db import models +from django.core.urlresolvers import reverse + +from ...code.models import Code +from ...common.models import Shareable +from ...common.models import ShareableManager +from ...common.texts import Messages + + +#---------------------------------------------------------- + + +class EnvironmentManager(ShareableManager): + + def get_by_natural_key(self, name, version): + return self.get(name=name, version=version) + + def get_by_fullname(self, fullname): + name, version = fullname.rsplit(' ', 1) + return self.get_by_natural_key(name, version[1:-1]) + + +#---------------------------------------------------------- + + +class Environment(Shareable): + """Defines a software environment to run algorithms""" + + name = models.CharField( + max_length=200, + help_text=Messages['name'], + ) + + version = models.CharField( + max_length=20, + help_text='Free-style version for this environment (normally read from the Worker/Scheduler available environments)', + ) + + short_description = models.CharField( + max_length=100, + default='', + blank=True, + help_text=Messages['short_description'], + ) + + description = models.TextField( + default='', + blank=True, + help_text=Messages['description'], + ) + + creation_date = models.DateTimeField( + 'Creation date', + auto_now_add = True, + ) + + active = models.BooleanField( + default=True, + help_text='If this environment can be used in experiments', + ) + + previous_version = models.ForeignKey( + 'self', + related_name='next_versions', + null=True, + blank=True, + on_delete=models.SET_NULL, + ) + + + objects = EnvironmentManager() + + + #_____ Meta parameters __________ + + class Meta: + unique_together = ('name', 'version') + + + #_____ Overrides __________ + + def __str__(self): + return self.fullname() + + + def natural_key(self): + return (self.name, self.version) + + + #_____ Utilities __________ + + def 
fullname(self): + return '%s (%s)' % (self.name, self.version) + + + def get_absolute_url(self): + return reverse( + 'backend:view-environment', + args=(self.name, self.version,), + ) + + + def get_admin_change_url(self): + return reverse('admin:backend_environment_change', args=(self.id,)) + + + def queues_for(self, user): + """Returns all queues associated to this environment for which the user + has the 'can_access' permission""" + + return [q for q in self.queues.all() if user.has_perm('backend.can_access', q)] + + + def as_dict(self): + '''Returns a representation as a dictionary''' + + return dict( + name=self.name, + version=self.version, + short_description=self.short_description, + description=self.description, + ) + + +#---------------------------------------------------------- + + +class EnvironmentLanguage(models.Model): + + environment = models.ForeignKey(Environment, related_name='languages') + + language = models.CharField(max_length=1, choices=Code.CODE_LANGUAGE, + default=Code.PYTHON) diff --git a/beat/web/backend/models.py b/beat/web/backend/models/job.py similarity index 59% rename from beat/web/backend/models.py rename to beat/web/backend/models/job.py index 053b42af6..cf699cd69 100755 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models/job.py @@ -3,7 +3,7 @@ ############################################################################### # # -# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # # Contact: beat.support@idiap.ch # # # # This file is part of the beat.web module of the BEAT platform. 
# @@ -27,636 +27,27 @@ import os import time -import signal import datetime -import operator import traceback -import subprocess import logging logger = logging.getLogger(__name__) -import psutil import simplejson from django.db import utils from django.db import models from django.db import transaction from django.conf import settings -from django.core.urlresolvers import reverse -from django.utils.translation import ugettext_lazy as _ -from django.contrib.auth.models import Group - -from guardian.shortcuts import get_perms import beat.core.stats import beat.core.data import beat.core.execution from beat.core.dock import Host -from ..code.models import Code -from ..common.models import Shareable, ShareableManager -from ..common.texts import Messages -from ..statistics.utils import updateStatistics - - -#---------------------------------------------------------- - - -class EnvironmentManager(ShareableManager): - - def get_by_natural_key(self, name, version): - return self.get(name=name, version=version) - - def get_by_fullname(self, fullname): - name, version = fullname.rsplit(' ', 1) - return self.get_by_natural_key(name, version[1:-1]) - - -class Environment(Shareable): - """Defines a software environment to run algorithms""" - - name = models.CharField( - max_length=200, - help_text=Messages['name'], - ) - - version = models.CharField( - max_length=20, - help_text='Free-style version for this environment (normally read from the Worker/Scheduler available environments)', - ) - - short_description = models.CharField( - max_length=100, - default='', - blank=True, - help_text=Messages['short_description'], - ) - - description = models.TextField( - default='', - blank=True, - help_text=Messages['description'], - ) - - creation_date = models.DateTimeField( - 'Creation date', - auto_now_add = True, - ) - - active = models.BooleanField( - default=True, - help_text='If this environment can be used in experiments', - ) - - previous_version = models.ForeignKey('self', - 
related_name='next_versions', - null=True, - blank=True, - on_delete=models.SET_NULL, - ) - - objects = EnvironmentManager() - - #_____ Meta parameters __________ - - class Meta: - unique_together = ('name', 'version') - - #_____ Overrides __________ - - def __str__(self): - return self.fullname() - - - def natural_key(self): - return (self.name, self.version) - - - #_____ Utilities __________ - - def fullname(self): - return '%s (%s)' % (self.name, self.version) - - def get_absolute_url(self): +from ...statistics.utils import updateStatistics - return reverse( - 'backend:view-environment', - args=(self.name, self.version,), - ) - - def get_admin_change_url(self): - return reverse('admin:backend_environment_change', args=(self.id,)) - - def queues_for(self, user): - """Returns all queues associated to this environment for which the user - has the 'can_access' permission""" - - return [q for q in self.queues.all() if user.has_perm('backend.can_access', q)] - - - def as_dict(self): - '''Returns a representation as a dictionary''' - - return dict( - name=self.name, - version=self.version, - short_description=self.short_description, - description=self.description, - ) - - -class EnvironmentLanguage(models.Model): - - environment = models.ForeignKey(Environment, - related_name='languages' - ) - - language = models.CharField(max_length=1, choices=Code.CODE_LANGUAGE, - default=Code.PYTHON) - - -#---------------------------------------------------------- - - -def _cleanup_zombies(): - '''Cleans-up eventual zombie subprocesses launched by the worker''' - - for child in psutil.Process().children(recursive=True): - try: - if child.status() == psutil.STATUS_ZOMBIE: - child.wait() - except psutil.NoSuchProcess: - # process is gone meanwhile, which is ok - pass - - -class WorkerManager(models.Manager): - - def get_by_natural_key(self, name): - return self.get(name=name) - - -class Worker(models.Model): - - name = models.CharField( - max_length=100, - help_text=Messages['name'], 
- unique=True, - ) - - active = models.BooleanField( - help_text=u'If this worker is usable presently', - default=False, - ) - - update = models.BooleanField( - help_text=u'If this worker state must be updated at the next cycle', - default=False, - ) - - updated = models.DateTimeField(null=True, auto_now=True) - - cores = models.PositiveIntegerField() - - memory = models.PositiveIntegerField(default=0, help_text='In megabytes') - - used_cores = models.PositiveIntegerField(default=0, help_text='In %') - - used_memory = models.PositiveIntegerField(default=0, help_text='In %') - - info = models.TextField(null=True, blank=True, - help_text='Informative message from the worker') - - objects = WorkerManager() - - - #_____ Overrides __________ - - - def __str__(self): - retval = '%s (%d cores, %d Mb)' % (self.name, self.cores, self.memory) - if not self.active: retval += ' [INACTIVE]' - return retval - - - def natural_key(self): - return (self.name,) - - - def get_admin_change_url(self): - return reverse('admin:backend_worker_change', args=(self.id,)) - - - def load(self): - '''Calculates the number of cores in use or to be used in the future''' - return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()]) - - - def current_load(self): - '''Calculates the number of cores being used currently''' - return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=Job.PROCESSING)]) - - - def available_cores(self): - '''Calculates the number of available cores considering current load''' - - return max(self.cores - self.load(), 0) - - - def deactivate(self, reason): - '''Deactivates the current worker for a reason, that is registered''' - - self.info = reason - self.active = False - - - def activate(self, reason=None): - '''Reactivates the worker, deletes any associated information''' - - self.info = reason - self.active = True - - - def as_dict(self): - '''Returns a dictionary-like representation''' - - return dict(cores=self.cores, 
memory=self.memory) - - - def check_environments(self, environments): - '''Checks that this worker has access to all environments it needs - - This method will check if the found set of environments (in the - dictionary ``environments``) contains, at least, one environment for - each environment object this worker is supposed to be able to execute - user algorithms for. - - - Parameters: - - environments (dict): A dictionary of environments found by using - :py:func:`utils.find_environments` in which, keys represent the - natural keys of Django database environments. - - - Returns: - - list: A list of missing environments this worker can be assigned to - work with, but where not found - - list: A list of unused environments this worker cannot be assigned to - work with, but where nevertheless found - - ''' - - slots = Slot.objects.filter(worker=self) - queues = Queue.objects.filter(slots__in=slots) - wishlist = Environment.objects.filter(queues__in=queues, active=True) - wishlist = wishlist.order_by('id').distinct() - - required = [k.fullname() for k in wishlist] - missing = [k for k in required if k not in environments] - unused = [k for k in environments if k not in required] - - return missing, unused - - - def update_state(self): - '''Updates state on the database based on current machine readings''' - - # check I have at least all cores and memory I'm supposed to have - cores = psutil.cpu_count() - ram = psutil.virtual_memory().total/(1024*1024) - self.info = '' - - if cores < self.cores: - logger.warn("Worker `%s' only has %d cores which is less then " \ - "the value declared on the database - it's not a problem, " \ - "but note this self may get overloaded", self, cores) - self.info += 'only %d cores;' % cores - - if ram < self.memory: - logger.warn("Worker `%s' only has %d Mb of RAM which is less " \ - "then the value declared on the database - it's not a " \ - "problem, but note this self may get overloaded", self, - ram) - self.info += 'only %d Mb of 
RAM;' % ram - - with transaction.atomic(): - self_ = Worker.objects.select_for_update().get(pk=self.pk) #lock - - # update process and memory usage - self.used_cores = int(psutil.cpu_percent()) - self.used_memory = int(psutil.virtual_memory().percent) - - # save current self state - self.active = True - self.update = False - self.save() - - - def terminate(self): - '''Cleanly terminates a particular worker at the database - - .. note:: - - This method does not destroy running or assigned processes that may - be running or assigned to this worker. This is implemented in this - way to allow for a clean replacement of the worker program w/o an - interruption of the backend service. - - ''' - - # disables worker, so no more splits can be assigned to it - with transaction.atomic(): - self_ = Worker.objects.select_for_update().get(pk=self.pk) - self_.active = False - self_.used_cores = 0 - self_.used_memory = 0 - self_.info = 'Worker deactivated by system administrator' - self_.save() - - # cancel job splits which should be cancelled anyways - for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL, - end_date__isnull=True, process_id__isnull=False): - if psutil.pid_exists(j.process_id): - os.kill(j.process_id, signal.SIGTERM) - - # cleans-up zombie processes that may linger - _cleanup_zombies() - - - def shutdown(self): - '''Removes all running/assigned jobs from the queue, shuts down - - This method should be used with care as it may potentially cancel all - assigned splits for the current worker. 
- - ''' - - self.terminate() - - message = 'Cancelled on forced worker shutdown (maintenance)' \ - ' - you may retry submitting your experiment shortly' - - # cancel job splits which were not yet started - for j in JobSplit.objects.filter(worker=self, status=Job.QUEUED, - start_date__isnull=True, process_id__isnull=True): - j.end(Result(status=1, usrerr=message)) - - # cancel job splits which are running - for j in JobSplit.objects.filter(worker=self, status=Job.PROCESSING, - end_date__isnull=True, process_id__isnull=False): - j._cancel() - - - - def work(self, environments, process): - '''Launches user code on isolated processes - - This function is supposed to be called asynchronously, by a - scheduled agent, every few seconds. It examines job splits assigned - to the current host and launches an individual process to handle - these splits. The process is started locally and the process ID - stored with the split. - - Job split cancelling is executed by setting the split state as - ``CANCEL`` and waiting for this function to handle it. - - - Parameters: - - environments (dict): A dictionary containing installed - environments, their description and execute-file paths. - - process (str): The path to the ``process.py`` program to use for - running the user code on isolated processes. - - ''' - - # refresh state from database and update state if required - self.refresh_from_db() - if self.update: self.update_state() - - # cancel job splits by killing associated processes - for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL, - end_date__isnull=True): - if j.process_id is not None and psutil.pid_exists(j.process_id): - os.kill(j.process_id, signal.SIGTERM) - else: # process went away without any apparent reason - with transaction.atomic(): - message = "Split %d/%d running at worker `%s' for " \ - "block `%s' of experiment `%s' finished before " \ - "even starting. Force-cancelling job split at " \ - "database..." 
% (j.split_index+1, - j.job.block.required_slots, - self, - j.job.block.name, - j.job.block.experiment.fullname(), - ) - logger.error(message) - j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR)) - - # cmdline base argument - cmdline = [process] - if settings.DEBUG: - cmdline += ['-vv'] - else: - cmdline += ['-v'] - - # start newly assigned job splits - with transaction.atomic(): - splits = JobSplit.objects.select_for_update().filter(worker=self, - status=Job.QUEUED, start_date__isnull=True, - process_id__isnull=True) - for split in splits: - # if we get to this point, then we launch the user process - # -> see settings.WORKER_DETACH_CHILDREN for more info - kwargs = dict() - if settings.WORKER_DETACH_CHILDREN: - kwargs['preexec_fn'] = os.setpgrp - subprocess.Popen(cmdline + [str(split.pk)], **kwargs) - split.status = Job.PROCESSING #avoids re-running - split.save() - - # cleans-up zombie processes that may linger - _cleanup_zombies() - - - def __enter__(self): - self.update_state() - return self - - - def __exit__(self, *exc): - self.terminate() - return False #propagate exceptions - - -#---------------------------------------------------------- - - -class QueueManager(models.Manager): - - def get_by_natural_key(self, name): - return self.get(name=name) - - -class Queue(models.Model): - - name = models.CharField(max_length=100, help_text=Messages['name'], - unique=True) - - memory_limit = models.PositiveIntegerField(help_text='In megabytes') - - time_limit = models.PositiveIntegerField(help_text='In minutes') - - cores_per_slot = models.PositiveIntegerField() - - max_slots_per_user = models.PositiveIntegerField() - - environments = models.ManyToManyField( - Environment, - related_name='queues', - ) - - objects = QueueManager() - - #_____ Meta parameters __________ - - class Meta: - permissions = [ - ['can_access', _('Can access queue')] - ] - - - #_____ Overrides __________ - - def __str__(self): - return '%s (%d minutes, %d megabytes, %d cores per slot, 
%d slots max per user)' % ( - self.name, - self.time_limit, - self.memory_limit, - self.cores_per_slot, - self.max_slots_per_user - ) - - - def natural_key(self): - return (self.name,) - - - def get_admin_change_url(self): - return reverse('admin:backend_queue_change', args=(self.id,)) - - #_____ Utilities __________ - - def number_of_slots(self): - '''Total number of slots considering all assigned worker/slots''' - - r = self.slots.filter(worker__active=True) - return r.aggregate(nslots=models.Sum('quantity'))['nslots'] or 0 - - - def availability(self): - '''Returns the availability for this queue in terms of number of slots - - This method does not take into consideration the occupation of this - queue slots caused by jobs on other queues. It only looks to its inner - occupancy and reports on that. - - Returns an integer between 0 and :py:meth:`Queue.slots`. - ''' - - running = JobSplit.objects.filter(job__block__in=self.blocks.all(), - status=Job.PROCESSING).count() - return max(self.number_of_slots() - running, 0) - - - def worker_availability(self): - '''Returns an ordered dictionary indicating the availability of workers - according to their queue priority. 
- - The dictionary contains, as value, the number of slots available per - worker - - The order of workers is sorted by: - - * slot priority (the higher, the better) - * load (the lower, the better) - * name (alphabetically) - - ''' - - workers = [(k.worker, -k.priority, k.worker.load(), k.worker.name) \ - for k in self.slots.filter(worker__active=True)] - - workers = sorted(workers, key=operator.itemgetter(1,2,3)) - - return [w[0] for w in workers] - - - def splits(self): - '''Lists all job splits currently associated to this queue''' - - return JobSplit.objects.filter(job__block__queue=self) - - - def as_dict(self): - '''Returns a representation as a dictionary''' - - return { - 'memory-limit': self.memory_limit, - 'time-limit': self.time_limit, - 'cores-per-slot': self.cores_per_slot, - 'max-slots-per-user': self.max_slots_per_user, - 'environments': [k.fullname() for k in self.environments.all()], - 'slots': dict([(s.worker.name, dict(quantity=s.quantity, - priority=s.priority)) for s in self.slots.all()]), - 'groups': [k.name for k in Group.objects.all() if 'can_access' in get_perms(k, self)] - } - - -class SlotManager(models.Manager): - - def get_by_natural_key(self, queue_name, worker_name): - return self.get(queue__name=queue_name, worker__name=worker_name) - - -class Slot(models.Model): - - queue = models.ForeignKey(Queue, related_name='slots', - on_delete=models.CASCADE) - - worker = models.ForeignKey(Worker, related_name='slots', - on_delete=models.CASCADE) - - quantity = models.PositiveIntegerField( - 'Number of slots', - help_text='Number of processing slots to dedicate in this worker for a given queue' - ) - - priority = models.PositiveIntegerField( - default=0, - help_text='Priority of these slots on the defined queue' - ) - - objects = SlotManager() - - #_____ Meta parameters __________ - - class Meta: - unique_together = ('queue', 'worker') - - #_____ Overrides __________ - - def __str__(self): - return '%s - %s (slots: %d, priority: %d)' % 
(self.queue, self.worker, self.quantity, self.priority) - - - def natural_key(self): - return (self.queue.name, self.worker.name) +from .result import Result #---------------------------------------------------------- @@ -674,47 +65,6 @@ def _merge_strings(s): #---------------------------------------------------------- -class Result(models.Model): - '''Logging and status information concerning block or job execution. - ''' - - # exit status code - status = models.IntegerField() - stdout = models.TextField(null=True, blank=True) - stderr = models.TextField(null=True, blank=True) - usrerr = models.TextField(null=True, blank=True) - _stats = models.TextField(null=True, blank=True) - timed_out = models.BooleanField(default=False) - cancelled = models.BooleanField(default=False) - - - def __str__(self): - status = 'success' if self.status == 0 else 'failed' - retval = 'Result(%s' % status - if self.stdout: retval += ', stdout=' + self.stdout - if self.stderr: retval += ', stderr=' + self.stderr - if self.usrerr: retval += ', usrerr=' + self.usrerr - retval += ')' - return retval - - - def _get_stats(self): - if self._stats is not None: - return beat.core.stats.Statistics(simplejson.loads(self._stats)) - else: - return beat.core.stats.Statistics() - - - def _set_stats(self, v): - self._stats = simplejson.dumps(v.as_dict()) - - - stats = property(_get_stats, _set_stats) - - -#---------------------------------------------------------- - - class Job(models.Model): '''Class describing the execution of a Job on the backend''' @@ -722,7 +72,7 @@ class Job(models.Model): PROCESSING = 'P' #Block.PROCESSING COMPLETED = 'C' #Block.COMPLETED FAILED = 'F' #Block.FAILED - SKIPPED = 'S' #Block.SKIPPED + SKIPPED = 'S' CANCELLED = 'L' #Block.CANCELLED CANCEL = 'K' #Job was asked to be killed @@ -1091,7 +441,7 @@ class Job(models.Model): class JobSplit(models.Model): '''Class describing a part of job of an experiment''' - worker = models.ForeignKey(Worker, null=True, 
on_delete=models.SET_NULL, + worker = models.ForeignKey('Worker', null=True, on_delete=models.SET_NULL, related_name='splits') job = models.ForeignKey(Job, null=True, on_delete=models.CASCADE, @@ -1154,6 +504,8 @@ class JobSplit(models.Model): ''' + from .worker import Worker + # lock self - avoids concurrent update from scheduler/worker # subsystem self_ = JobSplit.objects.select_for_update().get(pk=self.pk) diff --git a/beat/web/backend/models/queue.py b/beat/web/backend/models/queue.py new file mode 100755 index 000000000..ea52698c8 --- /dev/null +++ b/beat/web/backend/models/queue.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
# +# # +############################################################################### + + +import operator + +from django.db import models +from django.core.urlresolvers import reverse +from django.contrib.auth.models import Group +from django.utils.translation import ugettext_lazy as _ + +from guardian.shortcuts import get_perms + +from ...common.texts import Messages + + +#---------------------------------------------------------- + + +class QueueManager(models.Manager): + + def get_by_natural_key(self, name): + return self.get(name=name) + + +#---------------------------------------------------------- + + +class Queue(models.Model): + + name = models.CharField(max_length=100, help_text=Messages['name'], + unique=True) + + memory_limit = models.PositiveIntegerField(help_text='In megabytes') + + time_limit = models.PositiveIntegerField(help_text='In minutes') + + cores_per_slot = models.PositiveIntegerField() + + max_slots_per_user = models.PositiveIntegerField() + + environments = models.ManyToManyField('Environment', related_name='queues') + + + objects = QueueManager() + + + #_____ Meta parameters __________ + + class Meta: + permissions = [ + ['can_access', _('Can access queue')] + ] + + + #_____ Overrides __________ + + def __str__(self): + return '%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)' % ( + self.name, + self.time_limit, + self.memory_limit, + self.cores_per_slot, + self.max_slots_per_user + ) + + + def natural_key(self): + return (self.name,) + + + def get_admin_change_url(self): + return reverse('admin:backend_queue_change', args=(self.id,)) + + + #_____ Utilities __________ + + def number_of_slots(self): + '''Total number of slots considering all assigned worker/slots''' + + r = self.slots.filter(worker__active=True) + return r.aggregate(nslots=models.Sum('quantity'))['nslots'] or 0 + + + def availability(self): + '''Returns the availability for this queue in terms of number of slots + + This method does not take into 
consideration the occupation of this + queue slots caused by jobs on other queues. It only looks to its inner + occupancy and reports on that. + + Returns an integer between 0 and :py:meth:`Queue.number_of_slots`. + ''' + + from ..models import JobSplit + from ..models import Job + + running = JobSplit.objects.filter(job__block__in=self.blocks.all(), + status=Job.PROCESSING).count() + return max(self.number_of_slots() - running, 0) + + + def worker_availability(self): + '''Returns a list of the active workers serving this queue, ordered + according to their queue priority. + + The list contains the worker objects themselves, one entry per slot + of this queue that is attached to an active worker + + The order of workers is sorted by: + + * slot priority (the higher, the better) + * load (the lower, the better) + * name (alphabetically) + + ''' + + workers = [(k.worker, -k.priority, k.worker.load(), k.worker.name) \ + for k in self.slots.filter(worker__active=True)] + + workers = sorted(workers, key=operator.itemgetter(1,2,3)) + + return [w[0] for w in workers] + + + def splits(self): + '''Lists all job splits currently associated to this queue''' + from ..models import JobSplit + from ..models import Job + + return JobSplit.objects.filter(job__block__queue=self) + + + def as_dict(self): + '''Returns a representation as a dictionary''' + + return { + 'memory-limit': self.memory_limit, + 'time-limit': self.time_limit, + 'cores-per-slot': self.cores_per_slot, + 'max-slots-per-user': self.max_slots_per_user, + 'environments': [k.fullname() for k in self.environments.all()], + 'slots': dict([(s.worker.name, dict(quantity=s.quantity, priority=s.priority)) + for s in self.slots.all()]), + 'groups': [k.name for k in Group.objects.all() if 'can_access' in get_perms(k, self)] + } diff --git a/beat/web/backend/models/result.py b/beat/web/backend/models/result.py new file mode 100755 index 000000000..c58603e98 --- /dev/null +++ b/beat/web/backend/models/result.py @@ -0,0 +1,79 @@ +#!/usr/bin/env python +# vim: set
fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +from django.db import models + +import simplejson + +import beat.core.stats + + + +class Result(models.Model): + '''Logging and status information concerning block or job execution. 
+ ''' + + # exit status code + status = models.IntegerField() + stdout = models.TextField(null=True, blank=True) + stderr = models.TextField(null=True, blank=True) + usrerr = models.TextField(null=True, blank=True) + _stats = models.TextField(null=True, blank=True) + timed_out = models.BooleanField(default=False) + cancelled = models.BooleanField(default=False) + + + def __str__(self): + status = 'success' if self.status == 0 else 'failed' + retval = 'Result(%s' % status + + if self.stdout: + retval += ', stdout=' + self.stdout + + if self.stderr: + retval += ', stderr=' + self.stderr + + if self.usrerr: + retval += ', usrerr=' + self.usrerr + + retval += ')' + return retval + + + def _get_stats(self): + if self._stats is not None: + return beat.core.stats.Statistics(simplejson.loads(self._stats)) + else: + return beat.core.stats.Statistics() + + + def _set_stats(self, v): + self._stats = simplejson.dumps(v.as_dict()) + + + stats = property(_get_stats, _set_stats) diff --git a/beat/web/backend/models/slot.py b/beat/web/backend/models/slot.py new file mode 100755 index 000000000..17b10a7d5 --- /dev/null +++ b/beat/web/backend/models/slot.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. 
# +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +from django.db import models + + +#---------------------------------------------------------- + + +class SlotManager(models.Manager): + + def get_by_natural_key(self, queue_name, worker_name): + return self.get(queue__name=queue_name, worker__name=worker_name) + + +#---------------------------------------------------------- + + +class Slot(models.Model): + + queue = models.ForeignKey('Queue', related_name='slots', on_delete=models.CASCADE) + + worker = models.ForeignKey('Worker', related_name='slots', on_delete=models.CASCADE) + + quantity = models.PositiveIntegerField( + 'Number of slots', + help_text='Number of processing slots to dedicate in this worker for a given queue' + ) + + priority = models.PositiveIntegerField( + default=0, + help_text='Priority of these slots on the defined queue' + ) + + + objects = SlotManager() + + + #_____ Meta parameters __________ + + class Meta: + unique_together = ('queue', 'worker') + + + #_____ Overrides __________ + + def __str__(self): + return '%s - %s (slots: %d, priority: %d)' % (self.queue, self.worker, + self.quantity, self.priority) + + + def natural_key(self): + return (self.queue.name, self.worker.name) diff --git a/beat/web/backend/models/worker.py b/beat/web/backend/models/worker.py new file mode 100755 index 000000000..b63241ff2 --- /dev/null +++ b/beat/web/backend/models/worker.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research 
Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + +import os +import signal +import subprocess +import psutil + +import logging +logger = logging.getLogger(__name__) + +from django.db import models +from django.db import transaction +from django.conf import settings +from django.core.urlresolvers import reverse + +from ...common.texts import Messages + + +#---------------------------------------------------------- + + +def _cleanup_zombies(): + '''Cleans-up eventual zombie subprocesses launched by the worker''' + + for child in psutil.Process().children(recursive=True): + try: + if child.status() == psutil.STATUS_ZOMBIE: + child.wait() + except psutil.NoSuchProcess: + # process is gone meanwhile, which is ok + pass + + +#---------------------------------------------------------- + + +class WorkerManager(models.Manager): + + def get_by_natural_key(self, name): + return self.get(name=name) + + +#---------------------------------------------------------- + + +class 
Worker(models.Model): + + name = models.CharField( + max_length=100, + help_text=Messages['name'], + unique=True, + ) + + active = models.BooleanField( + help_text=u'If this worker is usable presently', + default=False, + ) + + update = models.BooleanField( + help_text=u'If this worker state must be updated at the next cycle', + default=False, + ) + + updated = models.DateTimeField(null=True, auto_now=True) + + cores = models.PositiveIntegerField() + + memory = models.PositiveIntegerField(default=0, help_text='In megabytes') + + used_cores = models.PositiveIntegerField(default=0, help_text='In %') + + used_memory = models.PositiveIntegerField(default=0, help_text='In %') + + info = models.TextField(null=True, blank=True, + help_text='Informative message from the worker') + + + objects = WorkerManager() + + + #_____ Overrides __________ + + + def __str__(self): + retval = '%s (%d cores, %d Mb)' % (self.name, self.cores, self.memory) + if not self.active: + retval += ' [INACTIVE]' + return retval + + + def natural_key(self): + return (self.name,) + + + def get_admin_change_url(self): + return reverse('admin:backend_worker_change', args=(self.id,)) + + + def load(self): + '''Calculates the number of cores in use or to be used in the future''' + return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()]) + + + def current_load(self): + '''Calculates the number of cores being used currently''' + return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=Job.PROCESSING)]) + + + def available_cores(self): + '''Calculates the number of available cores considering current load''' + + return max(self.cores - self.load(), 0) + + + def deactivate(self, reason): + '''Deactivates the current worker for a reason, that is registered''' + + self.info = reason + self.active = False + + + def activate(self, reason=None): + '''Reactivates the worker, deletes any associated information''' + + self.info = reason + self.active = True + + + def 
as_dict(self): + '''Returns a dictionary-like representation''' + + return dict(cores=self.cores, memory=self.memory) + + + def check_environments(self, environments): + '''Checks that this worker has access to all environments it needs + + This method will check if the found set of environments (in the + dictionary ``environments``) contains, at least, one environment for + each environment object this worker is supposed to be able to execute + user algorithms for. + + + Parameters: + + environments (dict): A dictionary of environments found by using + :py:func:`utils.find_environments` in which, keys represent the + natural keys of Django database environments. + + + Returns: + + list: A list of missing environments this worker can be assigned to + work with, but were not found + + list: A list of unused environments this worker cannot be assigned to + work with, but were nevertheless found + + ''' + + slots = Slot.objects.filter(worker=self) + queues = Queue.objects.filter(slots__in=slots) + wishlist = Environment.objects.filter(queues__in=queues, active=True) + wishlist = wishlist.order_by('id').distinct() + + required = [k.fullname() for k in wishlist] + missing = [k for k in required if k not in environments] + unused = [k for k in environments if k not in required] + + return missing, unused + + + def update_state(self): + '''Updates state on the database based on current machine readings''' + + # check I have at least all cores and memory I'm supposed to have + cores = psutil.cpu_count() + ram = psutil.virtual_memory().total / (1024 * 1024) + self.info = '' + + if cores < self.cores: + logger.warn("Worker `%s' only has %d cores which is less then " \ + "the value declared on the database - it's not a problem, " \ + "but note this self may get overloaded", self, cores) + self.info += 'only %d cores;' % cores + + if ram < self.memory: + logger.warn("Worker `%s' only has %d Mb of RAM which is less " \ + "then the value declared on the database - it's not a "
\ + "problem, but note this self may get overloaded", self, + ram) + self.info += 'only %d Mb of RAM;' % ram + + with transaction.atomic(): + self_ = Worker.objects.select_for_update().get(pk=self.pk) #lock + + # update process and memory usage + self.used_cores = int(psutil.cpu_percent()) + self.used_memory = int(psutil.virtual_memory().percent) + + # save current self state + self.active = True + self.update = False + self.save() + + + def terminate(self): + '''Cleanly terminates a particular worker at the database + + .. note:: + + This method does not destroy running or assigned processes that may + be running or assigned to this worker. This is implemented in this + way to allow for a clean replacement of the worker program w/o an + interruption of the backend service. + + ''' + + from ..models import JobSplit + from ..models import Job + + # disables worker, so no more splits can be assigned to it + with transaction.atomic(): + self_ = Worker.objects.select_for_update().get(pk=self.pk) + self_.active = False + self_.used_cores = 0 + self_.used_memory = 0 + self_.info = 'Worker deactivated by system administrator' + self_.save() + + # cancel job splits which should be cancelled anyways + for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL, + end_date__isnull=True, process_id__isnull=False): + if psutil.pid_exists(j.process_id): + os.kill(j.process_id, signal.SIGTERM) + + # cleans-up zombie processes that may linger + _cleanup_zombies() + + + def shutdown(self): + '''Removes all running/assigned jobs from the queue, shuts down + + This method should be used with care as it may potentially cancel all + assigned splits for the current worker. 
+ + ''' + + from ..models import JobSplit + from ..models import Job + + self.terminate() + + message = 'Cancelled on forced worker shutdown (maintenance)' \ + ' - you may retry submitting your experiment shortly' + + # cancel job splits which were not yet started + for j in JobSplit.objects.filter(worker=self, status=Job.QUEUED, + start_date__isnull=True, process_id__isnull=True): + j.end(Result(status=1, usrerr=message)) + + # cancel job splits which are running + for j in JobSplit.objects.filter(worker=self, status=Job.PROCESSING, + end_date__isnull=True, process_id__isnull=False): + j._cancel() + + + + def work(self, environments, process): + '''Launches user code on isolated processes + + This function is supposed to be called asynchronously, by a + scheduled agent, every few seconds. It examines job splits assigned + to the current host and launches an individual process to handle + these splits. The process is started locally and the process ID + stored with the split. + + Job split cancelling is executed by setting the split state as + ``CANCEL`` and waiting for this function to handle it. + + + Parameters: + + environments (dict): A dictionary containing installed + environments, their description and execute-file paths. + + process (str): The path to the ``process.py`` program to use for + running the user code on isolated processes. 
+ + ''' + + from ..models import JobSplit + from ..models import Job + + # refresh state from database and update state if required + self.refresh_from_db() + if self.update: self.update_state() + + # cancel job splits by killing associated processes + for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL, + end_date__isnull=True): + if j.process_id is not None and psutil.pid_exists(j.process_id): + os.kill(j.process_id, signal.SIGTERM) + else: # process went away without any apparent reason + with transaction.atomic(): + message = "Split %d/%d running at worker `%s' for " \ + "block `%s' of experiment `%s' finished before " \ + "even starting. Force-cancelling job split at " \ + "database..." % (j.split_index+1, + j.job.block.required_slots, + self, + j.job.block.name, + j.job.block.experiment.fullname(), + ) + logger.error(message) + j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR)) + + # cmdline base argument + cmdline = [process] + if settings.DEBUG: + cmdline += ['-vv'] + else: + cmdline += ['-v'] + + # start newly assigned job splits + with transaction.atomic(): + splits = JobSplit.objects.select_for_update().filter(worker=self, + status=Job.QUEUED, start_date__isnull=True, + process_id__isnull=True) + for split in splits: + # if we get to this point, then we launch the user process + # -> see settings.WORKER_DETACH_CHILDREN for more info + kwargs = dict() + if settings.WORKER_DETACH_CHILDREN: + kwargs['preexec_fn'] = os.setpgrp + subprocess.Popen(cmdline + [str(split.pk)], **kwargs) + split.status = Job.PROCESSING #avoids re-running + split.save() + + # cleans-up zombie processes that may linger + _cleanup_zombies() + + + def __enter__(self): + self.update_state() + return self + + + def __exit__(self, *exc): + self.terminate() + return False #propagate exceptions diff --git a/beat/web/backend/schedule.py b/beat/web/backend/schedule.py old mode 100644 new mode 100755 index e8fc2fa71..06eb7c140 --- a/beat/web/backend/schedule.py +++ 
b/beat/web/backend/schedule.py @@ -177,6 +177,11 @@ def schedule(): ''' + for j in Job.objects.filter(status=Job.QUEUED, runnable_date__isnull=True): + if j.block.is_runnable(): + j._make_runnable() + + # updates jobs with split errors, cancel experiments if problems occur for j in Job.objects.filter(split_errors__gt=0): j._split_indices() diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py index 2f57e7cd4..eb52eebaa 100755 --- a/beat/web/backend/tests.py +++ b/beat/web/backend/tests.py @@ -424,9 +424,8 @@ class BaseBackendTestCase(TestCase): self.assertEqual(b0.required_slots, 1) self.assertEqual(b0.inputs.count(), 1) self.assertEqual(b0.outputs.count(), 1) - self.assertEqual(b0.job.splits.count(), 1) - self.assertEqual(b0.job.splits.get().status, Job.QUEUED) - assert b0.job.splits.get().worker is None + self.assertEqual(b0.job.splits.count(), 0) #not scheduled yet + assert not b0.done() b1 = xp.blocks.all()[1] @@ -896,10 +895,14 @@ class Scheduling(BaseBackendTestCase): self.check_single(xp) # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None + assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -954,10 +957,14 @@ class Scheduling(BaseBackendTestCase): self.check_single(xp) # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None + assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -1056,7 +1063,7 @@ class Scheduling(BaseBackendTestCase): 
self.check_single(xp) # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None xp.cancel() @@ -1091,10 +1098,14 @@ class Scheduling(BaseBackendTestCase): self.check_single(xp) # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None + assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -1168,10 +1179,14 @@ class Scheduling(BaseBackendTestCase): self.check_single(xp) # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None + assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -1232,10 +1247,14 @@ class Scheduling(BaseBackendTestCase): self.check_single(xp) # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None + assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -1297,13 +1316,18 @@ class Scheduling(BaseBackendTestCase): xpc.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None 
assert xpc.blocks.first().job.runnable_date is None assert xpc.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + assert xpc.blocks.first().job.runnable_date is None + assert xpc.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -1430,13 +1454,18 @@ class Scheduling(BaseBackendTestCase): xpc.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assert xpc.blocks.first().job.runnable_date is None assert xpc.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + assert xpc.blocks.first().job.runnable_date is None + assert xpc.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -1507,13 +1536,18 @@ class Scheduling(BaseBackendTestCase): xpc.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assert xpc.blocks.first().job.runnable_date is None assert xpc.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + assert xpc.blocks.first().job.runnable_date is None + assert xpc.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -1618,13 +1652,18 @@ class Scheduling(BaseBackendTestCase): xpc.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert 
xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assert xpc.blocks.first().job.runnable_date is None assert xpc.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + assert xpc.blocks.first().job.runnable_date is None + assert xpc.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -1714,13 +1753,18 @@ class Scheduling(BaseBackendTestCase): xpc.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assert xpc.blocks.first().job.runnable_date is None assert xpc.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + assert xpc.blocks.first().job.runnable_date is None + assert xpc.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -1826,6 +1870,9 @@ class Scheduling(BaseBackendTestCase): # schedules the experiment and check it xp.schedule() + + schedule() + xp.refresh_from_db() self.assertEqual(xp.status, Experiment.FAILED) @@ -2133,11 +2180,14 @@ class Working(BaseBackendTestCase): xp.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -2223,11 +2273,14 @@ class Working(BaseBackendTestCase): xp.schedule() # schedules the first 
runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -2282,11 +2335,14 @@ class Working(BaseBackendTestCase): xp.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -2364,6 +2420,7 @@ class Working(BaseBackendTestCase): # schedules the experiment (it should immediately load from the db) xpc.schedule() + xpc.refresh_from_db() self.assertEqual(xpc.status, Experiment.DONE) @@ -2381,11 +2438,14 @@ class Working(BaseBackendTestCase): xp.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -2434,9 +2494,12 @@ class Working(BaseBackendTestCase): # since this job was successful, the next one should be ready to run # schedules the last block of the experiment - assert xpc.blocks.last().job.runnable_date is not None + assert xpc.blocks.last().job.runnable_date is None + assigned_splits = schedule() + assert xpc.blocks.last().job.runnable_date is not None + self.assertEqual(len(assigned_splits), 1) split = 
assigned_splits[0] self.assertEqual(split.job.block.experiment, xpc) @@ -2492,11 +2555,14 @@ class Working(BaseBackendTestCase): xp.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -2536,6 +2602,8 @@ class Working(BaseBackendTestCase): xpc = xp.fork(name='single_copy') xpc.schedule() + schedule() + self.assertEqual([k.status for k in xpc.blocks.all()], [Block.DONE, Block.PENDING]) assert xpc.blocks.last().job.parent == xp.blocks.last().job @@ -2600,11 +2668,14 @@ class WorkingExternally(TransactionTestCase): xp.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) @@ -2704,11 +2775,14 @@ class WorkingExternally(TransactionTestCase): xp.schedule() # schedules the first runnable block - assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.first().job.runnable_date is None assert xp.blocks.last().job.runnable_date is None assigned_splits = schedule() + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + worker = Worker.objects.get() self.assertEqual(len(assigned_splits), 1) diff --git a/beat/web/experiments/migrations/0009_block_status.py b/beat/web/experiments/migrations/0009_block_status.py new file mode 100644 index 000000000..db3f2c539 --- /dev/null +++ 
b/beat/web/experiments/migrations/0009_block_status.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.9.13 on 2017-09-29 08:42 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('experiments', '0008_block_status'), + ] + + operations = [ + migrations.AlterField( + model_name='block', + name='status', + field=models.CharField(choices=[(b'N', b'Pending'), (b'P', b'Processing'), (b'C', b'Done'), (b'F', b'Failed'), (b'L', b'Cancelled')], default=b'N', max_length=1), + ), + ] diff --git a/beat/web/experiments/models/block.py b/beat/web/experiments/models/block.py index 78bd834e0..548d2ef7c 100755 --- a/beat/web/experiments/models/block.py +++ b/beat/web/experiments/models/block.py @@ -72,7 +72,6 @@ class Block(models.Model): PROCESSING = 'P' DONE = 'C' FAILED = 'F' - SKIPPED = 'S' CANCELLED = 'L' STATUS = ( @@ -80,7 +79,6 @@ class Block(models.Model): (PROCESSING, 'Processing'), (DONE, 'Done'), (FAILED, 'Failed'), - (SKIPPED, 'Skipped'), (CANCELLED, 'Cancelled'), ) @@ -225,7 +223,7 @@ class Block(models.Model): def is_runnable(self): '''Checks if a block is runnable presently''' - return all([k.status in (Block.DONE, Block.SKIPPED) \ + return all([k.status == Block.DONE \ for k in self.dependencies.all()]) and \ (hasattr(self, 'job') and self.job.parent is None) @@ -319,7 +317,7 @@ class Block(models.Model): self.outputs.update(**info) - if self.job.status == Block.SKIPPED: + if self.job.status == Job.SKIPPED: self.status = Block.DONE else: self.status = self.job.status diff --git a/beat/web/experiments/models/experiment.py b/beat/web/experiments/models/experiment.py index 7fe06d1a4..cf441b610 100755 --- a/beat/web/experiments/models/experiment.py +++ b/beat/web/experiments/models/experiment.py @@ -794,9 +794,8 @@ class Experiment(Shareable): self.status = Experiment.FAILED elif (Block.PROCESSING in block_statuses) or \ - ((Block.PENDING in 
block_statuses or \ - Block.SKIPPED in block_statuses) and \ - Block.DONE in block_statuses): + ((Block.PENDING in block_statuses) and \ + (Block.DONE in block_statuses)): self.status = Experiment.RUNNING elif Block.PENDING not in block_statuses: -- GitLab