Commit b3c6d35b authored by Philip ABBET's avatar Philip ABBET
Browse files

[backend] Refactoring: Split the models into separate, more manageable files

parent 706bdf51
......@@ -27,8 +27,11 @@
from django.db import transaction
from datetime import datetime
from ..experiments.models import Experiment
from ..experiments.models import Block
from ..experiments.models import CachedFile
from .models import Job
......@@ -69,32 +72,53 @@ def schedule_experiment(experiment):
# Process all the blocks of the experiment
already_done = True
for block in experiment.blocks.all():
# Lock the block, so nobody else can modify it
block = Block.objects.select_for_update().get(pk=block.pk)
# search for other jobs with similar outputs that have no children yet
# do this carefully, as other experiments may be scheduled at the same
# time, invalidating our "parent" choice
parent = Job.objects.filter(block__outputs__in=block.outputs.all(),
child=None).first()
if parent is not None: #(candidate only) try to lock it
while True:
parent = Job.objects.select_for_update().get(pk=parent.pk)
if parent.child_ is not None: #was taken meanwhile, retry
parent = parent.child
continue
job = Job(block=block, parent=parent)
break
else:
job = Job(block=block)
# Check if the block outputs aren't already in the cache
must_skip = all([cached_file.status == CachedFile.CACHED
for cached_file in block.outputs.all()])
job.save()
if must_skip:
block.status = Block.DONE
block.start_date = datetime.now()
block.end_date = block.start_date
block.save()
else:
# search for other jobs with similar outputs that have no children yet
# do this carefully, as other experiments may be scheduled at the same
# time, invalidating our "parent" choice
parent = Job.objects.filter(block__outputs__in=block.outputs.all(),
child=None).first()
if parent is not None: #(candidate only) try to lock it
while True:
parent = Job.objects.select_for_update().get(pk=parent.pk)
if parent.child_ is not None: #was taken meanwhile, retry
parent = parent.child
continue
job = Job(block=block, parent=parent)
break
else:
job = Job(block=block)
job.save()
already_done = False
# Mark the experiment as scheduled (or done)
if already_done:
experiment.start_date = datetime.now()
experiment.end_date = experiment.start_date
experiment.status = Experiment.DONE
else:
experiment.status = Experiment.SCHEDULED
# Mark the experiment as scheduled
experiment.status = Experiment.SCHEDULED
experiment.save()
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from .environment import EnvironmentManager
from .environment import Environment
from .environment import EnvironmentLanguage
from .job import Job
from .job import JobSplit
from .queue import QueueManager
from .queue import Queue
from .result import Result
from .slot import SlotManager
from .slot import Slot
from .worker import WorkerManager
from .worker import Worker
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import models
from django.core.urlresolvers import reverse
from ...code.models import Code
from ...common.models import Shareable
from ...common.models import ShareableManager
from ...common.texts import Messages
#----------------------------------------------------------
class EnvironmentManager(ShareableManager):
def get_by_natural_key(self, name, version):
return self.get(name=name, version=version)
def get_by_fullname(self, fullname):
name, version = fullname.rsplit(' ', 1)
return self.get_by_natural_key(name, version[1:-1])
#----------------------------------------------------------
class Environment(Shareable):
"""Defines a software environment to run algorithms"""
name = models.CharField(
max_length=200,
help_text=Messages['name'],
)
version = models.CharField(
max_length=20,
help_text='Free-style version for this environment (normally read from the Worker/Scheduler available environments)',
)
short_description = models.CharField(
max_length=100,
default='',
blank=True,
help_text=Messages['short_description'],
)
description = models.TextField(
default='',
blank=True,
help_text=Messages['description'],
)
creation_date = models.DateTimeField(
'Creation date',
auto_now_add = True,
)
active = models.BooleanField(
default=True,
help_text='If this environment can be used in experiments',
)
previous_version = models.ForeignKey(
'self',
related_name='next_versions',
null=True,
blank=True,
on_delete=models.SET_NULL,
)
objects = EnvironmentManager()
#_____ Meta parameters __________
class Meta:
unique_together = ('name', 'version')
#_____ Overrides __________
def __str__(self):
return self.fullname()
def natural_key(self):
return (self.name, self.version)
#_____ Utilities __________
def fullname(self):
return '%s (%s)' % (self.name, self.version)
def get_absolute_url(self):
return reverse(
'backend:view-environment',
args=(self.name, self.version,),
)
def get_admin_change_url(self):
return reverse('admin:backend_environment_change', args=(self.id,))
def queues_for(self, user):
"""Returns all queues associated to this environment for which the user
has the 'can_access' permission"""
return [q for q in self.queues.all() if user.has_perm('backend.can_access', q)]
def as_dict(self):
'''Returns a representation as a dictionary'''
return dict(
name=self.name,
version=self.version,
short_description=self.short_description,
description=self.description,
)
#----------------------------------------------------------
class EnvironmentLanguage(models.Model):
environment = models.ForeignKey(Environment, related_name='languages')
language = models.CharField(max_length=1, choices=Code.CODE_LANGUAGE,
default=Code.PYTHON)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import operator
from django.db import models
from django.core.urlresolvers import reverse
from django.contrib.auth.models import Group
from django.utils.translation import ugettext_lazy as _
from guardian.shortcuts import get_perms
from ...common.texts import Messages
#----------------------------------------------------------
class QueueManager(models.Manager):
def get_by_natural_key(self, name):
return self.get(name=name)
#----------------------------------------------------------
class Queue(models.Model):
name = models.CharField(max_length=100, help_text=Messages['name'],
unique=True)
memory_limit = models.PositiveIntegerField(help_text='In megabytes')
time_limit = models.PositiveIntegerField(help_text='In minutes')
cores_per_slot = models.PositiveIntegerField()
max_slots_per_user = models.PositiveIntegerField()
environments = models.ManyToManyField('Environment', related_name='queues')
objects = QueueManager()
#_____ Meta parameters __________
class Meta:
permissions = [
['can_access', _('Can access queue')]
]
#_____ Overrides __________
def __str__(self):
return '%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)' % (
self.name,
self.time_limit,
self.memory_limit,
self.cores_per_slot,
self.max_slots_per_user
)
def natural_key(self):
return (self.name,)
def get_admin_change_url(self):
return reverse('admin:backend_queue_change', args=(self.id,))
#_____ Utilities __________
def number_of_slots(self):
'''Total number of slots considering all assigned worker/slots'''
r = self.slots.filter(worker__active=True)
return r.aggregate(nslots=models.Sum('quantity'))['nslots'] or 0
def availability(self):
'''Returns the availability for this queue in terms of number of slots
This method does not take into consideration the occupation of this
queue slots caused by jobs on other queues. It only looks to its inner
occupancy and reports on that.
Returns an integer between 0 and :py:meth:`Queue.slots`.
'''
from ..models import JobSplit
from ..models import Job
running = JobSplit.objects.filter(job__block__in=self.blocks.all(),
status=Job.PROCESSING).count()
return max(self.number_of_slots() - running, 0)
def worker_availability(self):
'''Returns an ordered dictionary indicating the availability of workers
according to their queue priority.
The dictionary contains, as value, the number of slots available per
worker
The order of workers is sorted by:
* slot priority (the higher, the better)
* load (the lower, the better)
* name (alphabetically)
'''
workers = [(k.worker, -k.priority, k.worker.load(), k.worker.name) \
for k in self.slots.filter(worker__active=True)]
workers = sorted(workers, key=operator.itemgetter(1,2,3))
return [w[0] for w in workers]
def splits(self):
'''Lists all job splits currently associated to this queue'''
from ..models import JobSplit
from ..models import Job
return JobSplit.objects.filter(job__block__queue=self)
def as_dict(self):
'''Returns a representation as a dictionary'''
return {
'memory-limit': self.memory_limit,
'time-limit': self.time_limit,
'cores-per-slot': self.cores_per_slot,
'max-slots-per-user': self.max_slots_per_user,
'environments': [k.fullname() for k in self.environments.all()],
'slots': dict([(s.worker.name, dict(quantity=s.quantity, priority=s.priority))
for s in self.slots.all()]),
'groups': [k.name for k in Group.objects.all() if 'can_access' in get_perms(k, self)]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import models
import simplejson
import beat.core.stats
class Result(models.Model):
'''Logging and status information concerning block or job execution.
'''
# exit status code
status = models.IntegerField()
stdout = models.TextField(null=True, blank=True)
stderr = models.TextField(null=True, blank=True)
usrerr = models.TextField(null=True, blank=True)
_stats = models.TextField(null=True, blank=True)
timed_out = models.BooleanField(default=False)
cancelled = models.BooleanField(default=False)
def __str__(self):
status = 'success' if self.status == 0 else 'failed'
retval = 'Result(%s' % status
if self.stdout:
retval += ', stdout=' + self.stdout
if self.stderr:
retval += ', stderr=' + self.stderr
if self.usrerr:
retval += ', usrerr=' + self.usrerr
retval += ')'
return retval
def _get_stats(self):
if self._stats is not None:
return beat.core.stats.Statistics(simplejson.loads(self._stats))
else:
return beat.core.stats.Statistics()
def _set_stats(self, v):
self._stats = simplejson.dumps(v.as_dict())
stats = property(_get_stats, _set_stats)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import models
#----------------------------------------------------------
class SlotManager(models.Manager):
def get_by_natural_key(self, queue_name, worker_name):
return self.get(queue__name=queue_name, worker__name=worker_name)
#----------------------------------------------------------
class Slot(models.Model):
queue = models.ForeignKey('Queue', related_name='slots', on_delete=models.CASCADE)
worker = models.ForeignKey('Worker', related_name='slots', on_delete=models.CASCADE)
quantity = models.PositiveIntegerField(
'Number of slots',
help_text='Number of processing slots to dedicate in this worker for a given queue'
)
priority = models.PositiveIntegerField(
default=0,
help_text='Priority of these slots on the defined queue'
)
objects = SlotManager()
#_____ Meta parameters __________
class Meta:
unique_together = ('queue', 'worker')
#_____ Overrides __________
def __str__(self):
return '%s - %s (slots: %d, priority: %d)' % (self.queue, self.worker,
self.quantity, self.priority)