diff --git a/beat/web/backend/models/__init__.py b/beat/web/backend/models/__init__.py index 6b2e0d1af4ea836c49177047c56e4ce142d8f35e..50a277e0a6f3f4a81bf9d5b6444865d296a01583 100755 --- a/beat/web/backend/models/__init__.py +++ b/beat/web/backend/models/__init__.py @@ -26,16 +26,32 @@ ############################################################################### -from .environment import EnvironmentManager from .environment import Environment from .environment import EnvironmentLanguage +from .environment import EnvironmentManager from .job import Job from .job import JobSplit from .local_scheduler import LocalSchedulerProcesses -from .queue import QueueManager from .queue import Queue +from .queue import QueueManager from .result import Result -from .slot import SlotManager from .slot import Slot -from .worker import WorkerManager +from .slot import SlotManager from .worker import Worker +from .worker import WorkerManager + +__all__ = [ + "Environment", + "EnvironmentLanguage", + "EnvironmentManager", + "Job", + "JobSplit", + "LocalSchedulerProcesses", + "Queue", + "QueueManager", + "Result", + "Slot", + "SlotManager", + "Worker", + "WorkerManager", +] diff --git a/beat/web/backend/models/environment.py b/beat/web/backend/models/environment.py index 346a15a8e8295d3242bddd815a35c52a2cd511a7..7dfe83195775c3b7561b2c18891e0fbc6ac59245 100755 --- a/beat/web/backend/models/environment.py +++ b/beat/web/backend/models/environment.py @@ -26,15 +26,15 @@ ############################################################################### from django.db import models +from django.db.models import Count +from django.db.models import Q from django.urls import reverse -from django.db.models import Count, Q from ...code.models import Code from ...common.models import Shareable from ...common.models import ShareableManager from ...common.texts import Messages - # ---------------------------------------------------------- @@ -58,8 +58,8 @@ class EnvironmentManager(ShareableManager): used for blocks that are done. """ - from ...experiments.models import Experiment from ...experiments.models import Block + from ...experiments.models import Experiment # Tries to figure through a maximum if blocks in the list have been # successfully used inside an environment. @@ -165,7 +165,9 @@ class Environment(Shareable): class EnvironmentLanguage(models.Model): - environment = models.ForeignKey(Environment, related_name="languages", on_delete=models.CASCADE) + environment = models.ForeignKey( + Environment, related_name="languages", on_delete=models.CASCADE + ) language = models.CharField( max_length=1, choices=Code.CODE_LANGUAGE, default=Code.PYTHON diff --git a/beat/web/backend/models/job.py b/beat/web/backend/models/job.py index 2ddd21cf477f322417bd4200beb3b5ca5ba1901d..4efe1c616c9748d760e14a358ece07bc15480a0d 100755 --- a/beat/web/backend/models/job.py +++ b/beat/web/backend/models/job.py @@ -26,32 +26,28 @@ ############################################################################### import datetime - import logging -logger = logging.getLogger(__name__) import simplejson - -from django.db import utils -from django.db import models from django.conf import settings +from django.db import models import beat.core.data import beat.core.hash from .result import Result +logger = logging.getLogger(__name__) -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobManager(models.Manager): - def create_job(self, block): # Compute the key of the job - hashes = [ x.hash for x in block.outputs.order_by('hash') ] - key = beat.core.hash.hash(''.join(hashes)) + hashes = [x.hash for x in block.outputs.order_by("hash")] + key = beat.core.hash.hash("".join(hashes)) # Determine if the job can be run or is dependent on others runnable_date = None @@ -59,28 +55,26 @@ class JobManager(models.Manager): runnable_date = datetime.datetime.now() # Create the job - job = self.model( - block=block, - key=key, - runnable_date=runnable_date - ) + job = self.model(block=block, key=key, runnable_date=runnable_date) job.save() return job -#---------------------------------------------------------- +# ---------------------------------------------------------- class Job(models.Model): - '''Class describing the execution of a Job on the backend''' + """Class describing the execution of a Job on the backend""" - block = models.OneToOneField('experiments.Block', null=True, - on_delete=models.CASCADE, related_name='job') + block = models.OneToOneField( + "experiments.Block", null=True, on_delete=models.CASCADE, related_name="job" + ) - result = models.OneToOneField(Result, null=True, on_delete=models.CASCADE, - related_name='job') + result = models.OneToOneField( + Result, null=True, on_delete=models.CASCADE, related_name="job" + ) runnable_date = models.DateTimeField(null=True, blank=True) @@ -92,23 +86,23 @@ class Job(models.Model): mirror = models.BooleanField(default=False) - objects = JobManager() - def __str__(self): - return "Job(%s, %s, key=%s, mirror=%s, splits=%d, cores=%d)" % \ - (self.block.name, self.block.experiment.name, - self.key, str(self.mirror), - self.block.required_slots, - self.block.queue.cores_per_slot) + return "Job(%s, %s, key=%s, mirror=%s, splits=%d, cores=%d)" % ( + self.block.name, + self.block.experiment.name, + self.key, + str(self.mirror), + self.block.required_slots, + self.block.queue.cores_per_slot, + ) -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobSplitManager(models.Manager): - def create_splits(self, job): # If no splitting is required, only create one split @@ -117,21 +111,29 @@ class JobSplitManager(models.Manager): split.save() return [split] - # Retrieve the list of synchronized inputs configuration = simplejson.loads(job.block.command) - inputs = [ entry for name, entry in configuration['inputs'].items() - if entry['channel'] == configuration['channel'] ] - + inputs = [ + entry + for name, entry in configuration["inputs"].items() + if entry["channel"] == configuration["channel"] + ] # Load the list of indices for each inputs indices = [] for input_cfg in inputs: - if 'database' in input_cfg: - indices.extend(beat.core.data.load_data_index_db(settings.CACHE_ROOT, input_cfg['path'])) + if "database" in input_cfg: + indices.extend( + beat.core.data.load_data_index_db( + settings.CACHE_ROOT, input_cfg["path"] + ) + ) else: - indices.append(beat.core.data.load_data_index(settings.CACHE_ROOT, input_cfg['path'])) - + indices.append( + beat.core.data.load_data_index( + settings.CACHE_ROOT, input_cfg["path"] + ) + ) # Attempt to split the indices nb_splits = job.block.required_slots @@ -146,21 +148,23 @@ class JobSplitManager(models.Manager): nb_splits -= 1 if nb_splits != job.block.required_slots: - message = "The processing of the block `%s' of experiment `%s' " \ - "was splitted in %d instead of the requested %d" % \ - (job.block.name, job.block.experiment.fullname(), - nb_splits, job.block.required_slots) - logger.warning(message) - + message = ( + "The processing of the block `%s' of experiment `%s' " + "was splitted in %d instead of the requested %d" + % ( + job.block.name, + job.block.experiment.fullname(), + nb_splits, + job.block.required_slots, + ) + ) + logger.warning(message) # Create the necessary splits and assign the ranges splits = [] for i, indices in enumerate(split_indices): split = JobSplit( - job=job, - split_index=i, - start_index=indices[0], - end_index=indices[1], + job=job, split_index=i, start_index=indices[0], end_index=indices[1], ) split.save() @@ -169,34 +173,35 @@ class JobSplitManager(models.Manager): return splits -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobSplit(models.Model): - '''Class describing a part of job of an experiment''' + """Class describing a part of job of an experiment""" - QUEUED = 'N' - PROCESSING = 'P' - COMPLETED = 'C' - FAILED = 'F' - CANCELLED = 'L' - CANCELLING = 'K' + QUEUED = "N" + PROCESSING = "P" + COMPLETED = "C" + FAILED = "F" + CANCELLED = "L" + CANCELLING = "K" STATUS = ( - (QUEUED, 'Queued'), - (PROCESSING, 'Processing'), - (COMPLETED, 'Completed'), - (FAILED, 'Failed'), - (CANCELLED, 'Cancelled'), - (CANCELLING, 'Cancelling'), + (QUEUED, "Queued"), + (PROCESSING, "Processing"), + (COMPLETED, "Completed"), + (FAILED, "Failed"), + (CANCELLED, "Cancelled"), + (CANCELLING, "Cancelling"), ) + worker = models.ForeignKey( + "Worker", null=True, on_delete=models.SET_NULL, related_name="splits" + ) - worker = models.ForeignKey('Worker', null=True, on_delete=models.SET_NULL, - related_name='splits') - - job = models.ForeignKey(Job, null=True, on_delete=models.CASCADE, - related_name='splits') + job = models.ForeignKey( + Job, null=True, on_delete=models.CASCADE, related_name="splits" + ) split_index = models.PositiveIntegerField() @@ -206,32 +211,28 @@ class JobSplit(models.Model): status = models.CharField(max_length=1, choices=STATUS, default=QUEUED) - result = models.OneToOneField(Result, null=True, on_delete=models.CASCADE, - related_name='split') + result = models.OneToOneField( + Result, null=True, on_delete=models.CASCADE, related_name="split" + ) start_date = models.DateTimeField(null=True) end_date = models.DateTimeField(null=True) - objects = JobSplitManager() - class Meta: - unique_together = ('job', 'split_index') - + unique_together = ("job", "split_index") def __str__(self): - return "JobSplit(%s, index=%d, state=%s)%s" % \ - (self.job, self.split_index, self.status, - ('@%s' % self.worker) if self.worker else '') - + return "JobSplit(%s, index=%d, state=%s)%s" % ( + self.job, + self.split_index, + self.status, + ("@%s" % self.worker) if self.worker else "", + ) def done(self): - '''Says whether the job has finished or not''' + """Says whether the job has finished or not""" - return self.status in ( - JobSplit.COMPLETED, - JobSplit.FAILED, - JobSplit.CANCELLED, - ) + return self.status in (JobSplit.COMPLETED, JobSplit.FAILED, JobSplit.CANCELLED,) diff --git a/beat/web/backend/models/local_scheduler.py b/beat/web/backend/models/local_scheduler.py index 8dad9a2c9fe8db282935c6f7d37f078244aba81d..f8e842cd27ab3762144c80f0fd891f29f50d2328 100755 --- a/beat/web/backend/models/local_scheduler.py +++ b/beat/web/backend/models/local_scheduler.py @@ -30,11 +30,10 @@ from django.db import models class LocalSchedulerProcesses(models.Model): - '''Information about the processes launched by the local scheduler''' + """Information about the processes launched by the local scheduler""" name = models.TextField() pid = models.IntegerField() - def __str__(self): - return '%s (pid = %d)' % (self.name, self.pid) + return "%s (pid = %d)" % (self.name, self.pid) diff --git a/beat/web/backend/models/queue.py b/beat/web/backend/models/queue.py index 75b94d93b6fe5dc5f76939948054714e09b78bb0..4d349b243bdb9f4bee8605bb68c5852a20a5ad03 100755 --- a/beat/web/backend/models/queue.py +++ b/beat/web/backend/models/queue.py @@ -28,104 +28,93 @@ import operator +from django.contrib.auth.models import Group from django.db import models from django.urls import reverse -from django.contrib.auth.models import Group from django.utils.translation import ugettext_lazy as _ - from guardian.shortcuts import get_perms from ...common.texts import Messages - -#---------------------------------------------------------- +# ---------------------------------------------------------- class QueueManager(models.Manager): - def get_by_natural_key(self, name): return self.get(name=name) -#---------------------------------------------------------- +# ---------------------------------------------------------- class Queue(models.Model): - name = models.CharField(max_length=100, help_text=Messages['name'], - unique=True) + name = models.CharField(max_length=100, help_text=Messages["name"], unique=True) - memory_limit = models.PositiveIntegerField(help_text='In megabytes') + memory_limit = models.PositiveIntegerField(help_text="In megabytes") - time_limit = models.PositiveIntegerField(help_text='In minutes') + time_limit = models.PositiveIntegerField(help_text="In minutes") cores_per_slot = models.PositiveIntegerField() max_slots_per_user = models.PositiveIntegerField() - environments = models.ManyToManyField('Environment', related_name='queues') - + environments = models.ManyToManyField("Environment", related_name="queues") objects = QueueManager() - - #_____ Meta parameters __________ + # _____ Meta parameters __________ class Meta: - permissions = [ - ['can_access', _('Can access queue')] - ] - + permissions = [["can_access", _("Can access queue")]] - #_____ Overrides __________ + # _____ Overrides __________ def __str__(self): - return '%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)' % ( - self.name, - self.time_limit, - self.memory_limit, - self.cores_per_slot, - self.max_slots_per_user + return ( + "%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)" + % ( + self.name, + self.time_limit, + self.memory_limit, + self.cores_per_slot, + self.max_slots_per_user, + ) ) - def natural_key(self): return (self.name,) - def get_admin_change_url(self): - return reverse('admin:backend_queue_change', args=(self.id,)) - + return reverse("admin:backend_queue_change", args=(self.id,)) - #_____ Utilities __________ + # _____ Utilities __________ def number_of_slots(self): - '''Total number of slots considering all assigned worker/slots''' + """Total number of slots considering all assigned worker/slots""" r = self.slots.filter(worker__active=True) - return r.aggregate(nslots=models.Sum('quantity'))['nslots'] or 0 - + return r.aggregate(nslots=models.Sum("quantity"))["nslots"] or 0 def availability(self): - '''Returns the availability for this queue in terms of number of slots + """Returns the availability for this queue in terms of number of slots This method does not take into consideration the occupation of this queue slots caused by jobs on other queues. It only looks to its inner occupancy and reports on that. Returns an integer between 0 and :py:meth:`Queue.slots`. - ''' + """ from ..models import JobSplit - from ..models import Job - running = JobSplit.objects.filter(job__block__in=self.blocks.all(), - status=JobSplit.PROCESSING).count() + running = JobSplit.objects.filter( + job__block__in=self.blocks.all(), status=JobSplit.PROCESSING + ).count() return max(self.number_of_slots() - running, 0) - def worker_availability(self): - '''Returns an ordered dictionary indicating the availability of workers + """Returns an ordered dictionary indicating the availability of workers according to their queue priority. The dictionary contains, as value, the number of slots available per @@ -137,34 +126,42 @@ class Queue(models.Model): * load (the lower, the better) * name (alphabetically) - ''' + """ - workers = [(k.worker, -k.priority, k.worker.load(), k.worker.name) \ - for k in self.slots.filter(worker__active=True)] + workers = [ + (k.worker, -k.priority, k.worker.load(), k.worker.name) + for k in self.slots.filter(worker__active=True) + ] - workers = sorted(workers, key=operator.itemgetter(1,2,3)) + workers = sorted(workers, key=operator.itemgetter(1, 2, 3)) return [w[0] for w in workers] - def splits(self): - '''Lists all job splits currently associated to this queue''' + """Lists all job splits currently associated to this queue""" + from ..models import JobSplit - from ..models import Job return JobSplit.objects.filter(job__block__queue=self) - def as_dict(self): - '''Returns a representation as a dictionary''' + """Returns a representation as a dictionary""" return { - 'memory-limit': self.memory_limit, - 'time-limit': self.time_limit, - 'cores-per-slot': self.cores_per_slot, - 'max-slots-per-user': self.max_slots_per_user, - 'environments': [k.fullname() for k in self.environments.all()], - 'slots': dict([(s.worker.name, dict(quantity=s.quantity, priority=s.priority)) - for s in self.slots.all()]), - 'groups': [k.name for k in Group.objects.all() if 'can_access' in get_perms(k, self)] + "memory-limit": self.memory_limit, + "time-limit": self.time_limit, + "cores-per-slot": self.cores_per_slot, + "max-slots-per-user": self.max_slots_per_user, + "environments": [k.fullname() for k in self.environments.all()], + "slots": dict( + [ + (s.worker.name, dict(quantity=s.quantity, priority=s.priority)) + for s in self.slots.all() + ] + ), + "groups": [ + k.name + for k in Group.objects.all() + if "can_access" in get_perms(k, self) + ], } diff --git a/beat/web/backend/models/result.py b/beat/web/backend/models/result.py index c58603e98e077b609e2a1332820369c9c70add6c..f000a2860698ef366d8d6b13738db7c0a2d1d2f9 100755 --- a/beat/web/backend/models/result.py +++ b/beat/web/backend/models/result.py @@ -26,17 +26,15 @@ ############################################################################### -from django.db import models - import simplejson +from django.db import models import beat.core.stats - class Result(models.Model): - '''Logging and status information concerning block or job execution. - ''' + """Logging and status information concerning block or job execution. + """ # exit status code status = models.IntegerField() @@ -47,33 +45,29 @@ class Result(models.Model): timed_out = models.BooleanField(default=False) cancelled = models.BooleanField(default=False) - def __str__(self): - status = 'success' if self.status == 0 else 'failed' - retval = 'Result(%s' % status + status = "success" if self.status == 0 else "failed" + retval = "Result(%s" % status if self.stdout: - retval += ', stdout=' + self.stdout + retval += ", stdout=" + self.stdout if self.stderr: - retval += ', stderr=' + self.stderr + retval += ", stderr=" + self.stderr if self.usrerr: - retval += ', usrerr=' + self.usrerr + retval += ", usrerr=" + self.usrerr - retval += ')' + retval += ")" return retval - def _get_stats(self): if self._stats is not None: return beat.core.stats.Statistics(simplejson.loads(self._stats)) else: return beat.core.stats.Statistics() - def _set_stats(self, v): self._stats = simplejson.dumps(v.as_dict()) - stats = property(_get_stats, _set_stats) diff --git a/beat/web/backend/models/slot.py b/beat/web/backend/models/slot.py index 17b10a7d5b6bd24802250bc6d7063677b8bc58b0..6e2eea908805ecf476acbc01547b51365446be9a 100755 --- a/beat/web/backend/models/slot.py +++ b/beat/web/backend/models/slot.py @@ -28,51 +28,48 @@ from django.db import models - -#---------------------------------------------------------- +# ---------------------------------------------------------- class SlotManager(models.Manager): - def get_by_natural_key(self, queue_name, worker_name): return self.get(queue__name=queue_name, worker__name=worker_name) -#---------------------------------------------------------- +# ---------------------------------------------------------- class Slot(models.Model): - queue = models.ForeignKey('Queue', related_name='slots', on_delete=models.CASCADE) + queue = models.ForeignKey("Queue", related_name="slots", on_delete=models.CASCADE) - worker = models.ForeignKey('Worker', related_name='slots', on_delete=models.CASCADE) + worker = models.ForeignKey("Worker", related_name="slots", on_delete=models.CASCADE) quantity = models.PositiveIntegerField( - 'Number of slots', - help_text='Number of processing slots to dedicate in this worker for a given queue' + "Number of slots", + help_text="Number of processing slots to dedicate in this worker for a given queue", ) priority = models.PositiveIntegerField( - default=0, - help_text='Priority of these slots on the defined queue' + default=0, help_text="Priority of these slots on the defined queue" ) - objects = SlotManager() - - #_____ Meta parameters __________ + # _____ Meta parameters __________ class Meta: - unique_together = ('queue', 'worker') - + unique_together = ("queue", "worker") - #_____ Overrides __________ + # _____ Overrides __________ def __str__(self): - return '%s - %s (slots: %d, priority: %d)' % (self.queue, self.worker, - self.quantity, self.priority) - + return "%s - %s (slots: %d, priority: %d)" % ( + self.queue, + self.worker, + self.quantity, + self.priority, + ) def natural_key(self): return (self.queue.name, self.worker.name) diff --git a/beat/web/backend/models/worker.py b/beat/web/backend/models/worker.py index 1897a249bc84af97b8c1c09ada2d12e8cd0dce20..cc70226fff3e973608a5977a1cd466e5d0fe4dcf 100755 --- a/beat/web/backend/models/worker.py +++ b/beat/web/backend/models/worker.py @@ -25,27 +25,22 @@ # # ############################################################################### -import os -import signal -import subprocess -import psutil - import logging -logger = logging.getLogger(__name__) +import psutil from django.db import models -from django.db import transaction -from django.conf import settings from django.urls import reverse from ...common.texts import Messages +from .job import JobSplit +logger = logging.getLogger(__name__) -#---------------------------------------------------------- +# ---------------------------------------------------------- def _cleanup_zombies(): - '''Cleans-up eventual zombie subprocesses launched by the worker''' + """Cleans-up eventual zombie subprocesses launched by the worker""" for child in psutil.Process().children(recursive=True): try: @@ -56,33 +51,27 @@ def _cleanup_zombies(): pass -#---------------------------------------------------------- +# ---------------------------------------------------------- class WorkerManager(models.Manager): - def get_by_natural_key(self, name): return self.get(name=name) -#---------------------------------------------------------- +# ---------------------------------------------------------- class Worker(models.Model): - name = models.CharField( - max_length=100, - help_text=Messages['name'], - unique=True, - ) + name = models.CharField(max_length=100, help_text=Messages["name"], unique=True,) active = models.BooleanField( - help_text=u'If this worker is usable presently', - default=False, + help_text=u"If this worker is usable presently", default=False, ) update = models.BooleanField( - help_text=u'If this worker state must be updated at the next cycle', + help_text=u"If this worker state must be updated at the next cycle", default=False, ) @@ -90,52 +79,49 @@ class Worker(models.Model): cores = models.PositiveIntegerField() - memory = models.PositiveIntegerField(default=0, help_text='In megabytes') - - used_cores = models.PositiveIntegerField(default=0, help_text='In %') + memory = models.PositiveIntegerField(default=0, help_text="In megabytes") - used_memory = models.PositiveIntegerField(default=0, help_text='In %') + used_cores = models.PositiveIntegerField(default=0, help_text="In %") - info = models.TextField(null=True, blank=True, - help_text='Informative message from the worker') + used_memory = models.PositiveIntegerField(default=0, help_text="In %") + info = models.TextField( + null=True, blank=True, help_text="Informative message from the worker" + ) objects = WorkerManager() - - #_____ Overrides __________ - + # _____ Overrides __________ def __str__(self): - retval = '%s (%d cores, %d Mb)' % (self.name, self.cores, self.memory) + retval = "%s (%d cores, %d Mb)" % (self.name, self.cores, self.memory) if not self.active: - retval += ' [INACTIVE]' + retval += " [INACTIVE]" return retval - def natural_key(self): return (self.name,) - def get_admin_change_url(self): - return reverse('admin:backend_worker_change', args=(self.id,)) - + return reverse("admin:backend_worker_change", args=(self.id,)) def load(self): - '''Calculates the number of cores in use or to be used in the future''' + """Calculates the number of cores in use or to be used in the future""" return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()]) - def current_load(self): - '''Calculates the number of cores being used currently''' - return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=JobSplit.PROCESSING)]) - + """Calculates the number of cores being used currently""" + return sum( + [ + j.job.block.queue.cores_per_slot + for j in self.splits.filter(status=JobSplit.PROCESSING) + ] + ) def available_cores(self): - '''Calculates the number of available cores considering current load''' + """Calculates the number of available cores considering current load""" return max(self.cores - self.load(), 0) - def as_dict(self): - '''Returns a dictionary-like representation''' + """Returns a dictionary-like representation""" return dict(cores=self.cores, memory=self.memory)