From c614bac17fb4243b9e03d9a4f232a97ea6b830ce Mon Sep 17 00:00:00 2001 From: Samuel Gaist <samuel.gaist@idiap.ch> Date: Wed, 9 Sep 2020 13:40:40 +0200 Subject: [PATCH] [backend][models] Pre-commit cleanup --- beat/web/backend/models/__init__.py | 24 ++- beat/web/backend/models/environment.py | 10 +- beat/web/backend/models/job.py | 165 +++++++++++---------- beat/web/backend/models/local_scheduler.py | 5 +- beat/web/backend/models/queue.py | 111 +++++++------- beat/web/backend/models/result.py | 24 ++- beat/web/backend/models/slot.py | 35 ++--- beat/web/backend/models/worker.py | 74 ++++----- 8 files changed, 220 insertions(+), 228 deletions(-) diff --git a/beat/web/backend/models/__init__.py b/beat/web/backend/models/__init__.py index 6b2e0d1af..50a277e0a 100755 --- a/beat/web/backend/models/__init__.py +++ b/beat/web/backend/models/__init__.py @@ -26,16 +26,32 @@ ############################################################################### -from .environment import EnvironmentManager from .environment import Environment from .environment import EnvironmentLanguage +from .environment import EnvironmentManager from .job import Job from .job import JobSplit from .local_scheduler import LocalSchedulerProcesses -from .queue import QueueManager from .queue import Queue +from .queue import QueueManager from .result import Result -from .slot import SlotManager from .slot import Slot -from .worker import WorkerManager +from .slot import SlotManager from .worker import Worker +from .worker import WorkerManager + +__all__ = [ + "Environment", + "EnvironmentLanguage", + "EnvironmentManager", + "Job", + "JobSplit", + "LocalSchedulerProcesses", + "Queue", + "QueueManager", + "Result", + "Slot", + "SlotManager", + "Worker", + "WorkerManager", +] diff --git a/beat/web/backend/models/environment.py b/beat/web/backend/models/environment.py index 346a15a8e..7dfe83195 100755 --- a/beat/web/backend/models/environment.py +++ b/beat/web/backend/models/environment.py @@ -26,15 +26,15 @@ ############################################################################### from django.db import models +from django.db.models import Count +from django.db.models import Q from django.urls import reverse -from django.db.models import Count, Q from ...code.models import Code from ...common.models import Shareable from ...common.models import ShareableManager from ...common.texts import Messages - # ---------------------------------------------------------- @@ -58,8 +58,8 @@ class EnvironmentManager(ShareableManager): used for blocks that are done. """ - from ...experiments.models import Experiment from ...experiments.models import Block + from ...experiments.models import Experiment # Tries to figure through a maximum if blocks in the list have been # successfully used inside an environment. @@ -165,7 +165,9 @@ class Environment(Shareable): class EnvironmentLanguage(models.Model): - environment = models.ForeignKey(Environment, related_name="languages", on_delete=models.CASCADE) + environment = models.ForeignKey( + Environment, related_name="languages", on_delete=models.CASCADE + ) language = models.CharField( max_length=1, choices=Code.CODE_LANGUAGE, default=Code.PYTHON diff --git a/beat/web/backend/models/job.py b/beat/web/backend/models/job.py index 2ddd21cf4..4efe1c616 100755 --- a/beat/web/backend/models/job.py +++ b/beat/web/backend/models/job.py @@ -26,32 +26,28 @@ ############################################################################### import datetime - import logging -logger = logging.getLogger(__name__) import simplejson - -from django.db import utils -from django.db import models from django.conf import settings +from django.db import models import beat.core.data import beat.core.hash from .result import Result +logger = logging.getLogger(__name__) -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobManager(models.Manager): - def create_job(self, block): # Compute the key of the job - hashes = [ x.hash for x in block.outputs.order_by('hash') ] - key = beat.core.hash.hash(''.join(hashes)) + hashes = [x.hash for x in block.outputs.order_by("hash")] + key = beat.core.hash.hash("".join(hashes)) # Determine if the job can be run or is dependent on others runnable_date = None @@ -59,28 +55,26 @@ class JobManager(models.Manager): runnable_date = datetime.datetime.now() # Create the job - job = self.model( - block=block, - key=key, - runnable_date=runnable_date - ) + job = self.model(block=block, key=key, runnable_date=runnable_date) job.save() return job -#---------------------------------------------------------- +# ---------------------------------------------------------- class Job(models.Model): - '''Class describing the execution of a Job on the backend''' + """Class describing the execution of a Job on the backend""" - block = models.OneToOneField('experiments.Block', null=True, - on_delete=models.CASCADE, related_name='job') + block = models.OneToOneField( + "experiments.Block", null=True, on_delete=models.CASCADE, related_name="job" + ) - result = models.OneToOneField(Result, null=True, on_delete=models.CASCADE, - related_name='job') + result = models.OneToOneField( + Result, null=True, on_delete=models.CASCADE, related_name="job" + ) runnable_date = models.DateTimeField(null=True, blank=True) @@ -92,23 +86,23 @@ class Job(models.Model): mirror = models.BooleanField(default=False) - objects = JobManager() - def __str__(self): - return "Job(%s, %s, key=%s, mirror=%s, splits=%d, cores=%d)" % \ - (self.block.name, self.block.experiment.name, - self.key, str(self.mirror), - self.block.required_slots, - self.block.queue.cores_per_slot) + return "Job(%s, %s, key=%s, mirror=%s, splits=%d, cores=%d)" % ( + self.block.name, + self.block.experiment.name, + self.key, + str(self.mirror), + self.block.required_slots, + self.block.queue.cores_per_slot, + ) -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobSplitManager(models.Manager): - def create_splits(self, job): # If no splitting is required, only create one split @@ -117,21 +111,29 @@ class JobSplitManager(models.Manager): split.save() return [split] - # Retrieve the list of synchronized inputs configuration = simplejson.loads(job.block.command) - inputs = [ entry for name, entry in configuration['inputs'].items() - if entry['channel'] == configuration['channel'] ] - + inputs = [ + entry + for name, entry in configuration["inputs"].items() + if entry["channel"] == configuration["channel"] + ] # Load the list of indices for each inputs indices = [] for input_cfg in inputs: - if 'database' in input_cfg: - indices.extend(beat.core.data.load_data_index_db(settings.CACHE_ROOT, input_cfg['path'])) + if "database" in input_cfg: + indices.extend( + beat.core.data.load_data_index_db( + settings.CACHE_ROOT, input_cfg["path"] + ) + ) else: - indices.append(beat.core.data.load_data_index(settings.CACHE_ROOT, input_cfg['path'])) - + indices.append( + beat.core.data.load_data_index( + settings.CACHE_ROOT, input_cfg["path"] + ) + ) # Attempt to split the indices nb_splits = job.block.required_slots @@ -146,21 +148,23 @@ class JobSplitManager(models.Manager): nb_splits -= 1 if nb_splits != job.block.required_slots: - message = "The processing of the block `%s' of experiment `%s' " \ - "was splitted in %d instead of the requested %d" % \ - (job.block.name, job.block.experiment.fullname(), - nb_splits, job.block.required_slots) - logger.warning(message) - + message = ( + "The processing of the block `%s' of experiment `%s' " + "was splitted in %d instead of the requested %d" + % ( + job.block.name, + job.block.experiment.fullname(), + nb_splits, + job.block.required_slots, + ) + ) + logger.warning(message) # Create the necessary splits and assign the ranges splits = [] for i, indices in enumerate(split_indices): split = JobSplit( - job=job, - split_index=i, - start_index=indices[0], - end_index=indices[1], + job=job, split_index=i, start_index=indices[0], end_index=indices[1], ) split.save() @@ -169,34 +173,35 @@ class JobSplitManager(models.Manager): return splits -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobSplit(models.Model): - '''Class describing a part of job of an experiment''' + """Class describing a part of job of an experiment""" - QUEUED = 'N' - PROCESSING = 'P' - COMPLETED = 'C' - FAILED = 'F' - CANCELLED = 'L' - CANCELLING = 'K' + QUEUED = "N" + PROCESSING = "P" + COMPLETED = "C" + FAILED = "F" + CANCELLED = "L" + CANCELLING = "K" STATUS = ( - (QUEUED, 'Queued'), - (PROCESSING, 'Processing'), - (COMPLETED, 'Completed'), - (FAILED, 'Failed'), - (CANCELLED, 'Cancelled'), - (CANCELLING, 'Cancelling'), + (QUEUED, "Queued"), + (PROCESSING, "Processing"), + (COMPLETED, "Completed"), + (FAILED, "Failed"), + (CANCELLED, "Cancelled"), + (CANCELLING, "Cancelling"), ) + worker = models.ForeignKey( + "Worker", null=True, on_delete=models.SET_NULL, related_name="splits" + ) - worker = models.ForeignKey('Worker', null=True, on_delete=models.SET_NULL, - related_name='splits') - - job = models.ForeignKey(Job, null=True, on_delete=models.CASCADE, - related_name='splits') + job = models.ForeignKey( + Job, null=True, on_delete=models.CASCADE, related_name="splits" + ) split_index = models.PositiveIntegerField() @@ -206,32 +211,28 @@ class JobSplit(models.Model): status = models.CharField(max_length=1, choices=STATUS, default=QUEUED) - result = models.OneToOneField(Result, null=True, on_delete=models.CASCADE, - related_name='split') + result = models.OneToOneField( + Result, null=True, on_delete=models.CASCADE, related_name="split" + ) start_date = models.DateTimeField(null=True) end_date = models.DateTimeField(null=True) - objects = JobSplitManager() - class Meta: - unique_together = ('job', 'split_index') - + unique_together = ("job", "split_index") def __str__(self): - return "JobSplit(%s, index=%d, state=%s)%s" % \ - (self.job, self.split_index, self.status, - ('@%s' % self.worker) if self.worker else '') - + return "JobSplit(%s, index=%d, state=%s)%s" % ( + self.job, + self.split_index, + self.status, + ("@%s" % self.worker) if self.worker else "", + ) def done(self): - '''Says whether the job has finished or not''' + """Says whether the job has finished or not""" - return self.status in ( - JobSplit.COMPLETED, - JobSplit.FAILED, - JobSplit.CANCELLED, - ) + return self.status in (JobSplit.COMPLETED, JobSplit.FAILED, JobSplit.CANCELLED,) diff --git a/beat/web/backend/models/local_scheduler.py b/beat/web/backend/models/local_scheduler.py index 8dad9a2c9..f8e842cd2 100755 --- a/beat/web/backend/models/local_scheduler.py +++ b/beat/web/backend/models/local_scheduler.py @@ -30,11 +30,10 @@ from django.db import models class LocalSchedulerProcesses(models.Model): - '''Information about the processes launched by the local scheduler''' + """Information about the processes launched by the local scheduler""" name = models.TextField() pid = models.IntegerField() - def __str__(self): - return '%s (pid = %d)' % (self.name, self.pid) + return "%s (pid = %d)" % (self.name, self.pid) diff --git a/beat/web/backend/models/queue.py b/beat/web/backend/models/queue.py index 75b94d93b..4d349b243 100755 --- a/beat/web/backend/models/queue.py +++ b/beat/web/backend/models/queue.py @@ -28,104 +28,93 @@ import operator +from django.contrib.auth.models import Group from django.db import models from django.urls import reverse -from django.contrib.auth.models import Group from django.utils.translation import ugettext_lazy as _ - from guardian.shortcuts import get_perms from ...common.texts import Messages - -#---------------------------------------------------------- +# ---------------------------------------------------------- class QueueManager(models.Manager): - def get_by_natural_key(self, name): return self.get(name=name) -#---------------------------------------------------------- +# ---------------------------------------------------------- class Queue(models.Model): - name = models.CharField(max_length=100, help_text=Messages['name'], - unique=True) + name = models.CharField(max_length=100, help_text=Messages["name"], unique=True) - memory_limit = models.PositiveIntegerField(help_text='In megabytes') + memory_limit = models.PositiveIntegerField(help_text="In megabytes") - time_limit = models.PositiveIntegerField(help_text='In minutes') + time_limit = models.PositiveIntegerField(help_text="In minutes") cores_per_slot = models.PositiveIntegerField() max_slots_per_user = models.PositiveIntegerField() - environments = models.ManyToManyField('Environment', related_name='queues') - + environments = models.ManyToManyField("Environment", related_name="queues") objects = QueueManager() - - #_____ Meta parameters __________ + # _____ Meta parameters __________ class Meta: - permissions = [ - ['can_access', _('Can access queue')] - ] - + permissions = [["can_access", _("Can access queue")]] - #_____ Overrides __________ + # _____ Overrides __________ def __str__(self): - return '%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)' % ( - self.name, - self.time_limit, - self.memory_limit, - self.cores_per_slot, - self.max_slots_per_user + return ( + "%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)" + % ( + self.name, + self.time_limit, + self.memory_limit, + self.cores_per_slot, + self.max_slots_per_user, + ) ) - def natural_key(self): return (self.name,) - def get_admin_change_url(self): - return reverse('admin:backend_queue_change', args=(self.id,)) - + return reverse("admin:backend_queue_change", args=(self.id,)) - #_____ Utilities __________ + # _____ Utilities __________ def number_of_slots(self): - '''Total number of slots considering all assigned worker/slots''' + """Total number of slots considering all assigned worker/slots""" r = self.slots.filter(worker__active=True) - return r.aggregate(nslots=models.Sum('quantity'))['nslots'] or 0 - + return r.aggregate(nslots=models.Sum("quantity"))["nslots"] or 0 def availability(self): - '''Returns the availability for this queue in terms of number of slots + """Returns the availability for this queue in terms of number of slots This method does not take into consideration the occupation of this queue slots caused by jobs on other queues. It only looks to its inner occupancy and reports on that. Returns an integer between 0 and :py:meth:`Queue.slots`. - ''' + """ from ..models import JobSplit - from ..models import Job - running = JobSplit.objects.filter(job__block__in=self.blocks.all(), - status=JobSplit.PROCESSING).count() + running = JobSplit.objects.filter( + job__block__in=self.blocks.all(), status=JobSplit.PROCESSING + ).count() return max(self.number_of_slots() - running, 0) - def worker_availability(self): - '''Returns an ordered dictionary indicating the availability of workers + """Returns an ordered dictionary indicating the availability of workers according to their queue priority. The dictionary contains, as value, the number of slots available per @@ -137,34 +126,42 @@ class Queue(models.Model): * load (the lower, the better) * name (alphabetically) - ''' + """ - workers = [(k.worker, -k.priority, k.worker.load(), k.worker.name) \ - for k in self.slots.filter(worker__active=True)] + workers = [ + (k.worker, -k.priority, k.worker.load(), k.worker.name) + for k in self.slots.filter(worker__active=True) + ] - workers = sorted(workers, key=operator.itemgetter(1,2,3)) + workers = sorted(workers, key=operator.itemgetter(1, 2, 3)) return [w[0] for w in workers] - def splits(self): - '''Lists all job splits currently associated to this queue''' + """Lists all job splits currently associated to this queue""" + from ..models import JobSplit - from ..models import Job return JobSplit.objects.filter(job__block__queue=self) - def as_dict(self): - '''Returns a representation as a dictionary''' + """Returns a representation as a dictionary""" return { - 'memory-limit': self.memory_limit, - 'time-limit': self.time_limit, - 'cores-per-slot': self.cores_per_slot, - 'max-slots-per-user': self.max_slots_per_user, - 'environments': [k.fullname() for k in self.environments.all()], - 'slots': dict([(s.worker.name, dict(quantity=s.quantity, priority=s.priority)) - for s in self.slots.all()]), - 'groups': [k.name for k in Group.objects.all() if 'can_access' in get_perms(k, self)] + "memory-limit": self.memory_limit, + "time-limit": self.time_limit, + "cores-per-slot": self.cores_per_slot, + "max-slots-per-user": self.max_slots_per_user, + "environments": [k.fullname() for k in self.environments.all()], + "slots": dict( + [ + (s.worker.name, dict(quantity=s.quantity, priority=s.priority)) + for s in self.slots.all() + ] + ), + "groups": [ + k.name + for k in Group.objects.all() + if "can_access" in get_perms(k, self) + ], } diff --git a/beat/web/backend/models/result.py b/beat/web/backend/models/result.py index c58603e98..f000a2860 100755 --- a/beat/web/backend/models/result.py +++ b/beat/web/backend/models/result.py @@ -26,17 +26,15 @@ ############################################################################### -from django.db import models - import simplejson +from django.db import models import beat.core.stats - class Result(models.Model): - '''Logging and status information concerning block or job execution. - ''' + """Logging and status information concerning block or job execution. + """ # exit status code status = models.IntegerField() @@ -47,33 +45,29 @@ class Result(models.Model): timed_out = models.BooleanField(default=False) cancelled = models.BooleanField(default=False) - def __str__(self): - status = 'success' if self.status == 0 else 'failed' - retval = 'Result(%s' % status + status = "success" if self.status == 0 else "failed" + retval = "Result(%s" % status if self.stdout: - retval += ', stdout=' + self.stdout + retval += ", stdout=" + self.stdout if self.stderr: - retval += ', stderr=' + self.stderr + retval += ", stderr=" + self.stderr if self.usrerr: - retval += ', usrerr=' + self.usrerr + retval += ", usrerr=" + self.usrerr - retval += ')' + retval += ")" return retval - def _get_stats(self): if self._stats is not None: return beat.core.stats.Statistics(simplejson.loads(self._stats)) else: return beat.core.stats.Statistics() - def _set_stats(self, v): self._stats = simplejson.dumps(v.as_dict()) - stats = property(_get_stats, _set_stats) diff --git a/beat/web/backend/models/slot.py b/beat/web/backend/models/slot.py index 17b10a7d5..6e2eea908 100755 --- a/beat/web/backend/models/slot.py +++ b/beat/web/backend/models/slot.py @@ -28,51 +28,48 @@ from django.db import models - -#---------------------------------------------------------- +# ---------------------------------------------------------- class SlotManager(models.Manager): - def get_by_natural_key(self, queue_name, worker_name): return self.get(queue__name=queue_name, worker__name=worker_name) -#---------------------------------------------------------- +# ---------------------------------------------------------- class Slot(models.Model): - queue = models.ForeignKey('Queue', related_name='slots', on_delete=models.CASCADE) + queue = models.ForeignKey("Queue", related_name="slots", on_delete=models.CASCADE) - worker = models.ForeignKey('Worker', related_name='slots', on_delete=models.CASCADE) + worker = models.ForeignKey("Worker", related_name="slots", on_delete=models.CASCADE) quantity = models.PositiveIntegerField( - 'Number of slots', - help_text='Number of processing slots to dedicate in this worker for a given queue' + "Number of slots", + help_text="Number of processing slots to dedicate in this worker for a given queue", ) priority = models.PositiveIntegerField( - default=0, - help_text='Priority of these slots on the defined queue' + default=0, help_text="Priority of these slots on the defined queue" ) - objects = SlotManager() - - #_____ Meta parameters __________ + # _____ Meta parameters __________ class Meta: - unique_together = ('queue', 'worker') - + unique_together = ("queue", "worker") - #_____ Overrides __________ + # _____ Overrides __________ def __str__(self): - return '%s - %s (slots: %d, priority: %d)' % (self.queue, self.worker, - self.quantity, self.priority) - + return "%s - %s (slots: %d, priority: %d)" % ( + self.queue, + self.worker, + self.quantity, + self.priority, + ) def natural_key(self): return (self.queue.name, self.worker.name) diff --git a/beat/web/backend/models/worker.py b/beat/web/backend/models/worker.py index 1897a249b..cc70226ff 100755 --- a/beat/web/backend/models/worker.py +++ b/beat/web/backend/models/worker.py @@ -25,27 +25,22 @@ # # ############################################################################### -import os -import signal -import subprocess -import psutil - import logging -logger = logging.getLogger(__name__) +import psutil from django.db import models -from django.db import transaction -from django.conf import settings from django.urls import reverse from ...common.texts import Messages +from .job import JobSplit +logger = logging.getLogger(__name__) -#---------------------------------------------------------- +# ---------------------------------------------------------- def _cleanup_zombies(): - '''Cleans-up eventual zombie subprocesses launched by the worker''' + """Cleans-up eventual zombie subprocesses launched by the worker""" for child in psutil.Process().children(recursive=True): try: @@ -56,33 +51,27 @@ def _cleanup_zombies(): pass -#---------------------------------------------------------- +# ---------------------------------------------------------- class WorkerManager(models.Manager): - def get_by_natural_key(self, name): return self.get(name=name) -#---------------------------------------------------------- +# ---------------------------------------------------------- class Worker(models.Model): - name = models.CharField( - max_length=100, - help_text=Messages['name'], - unique=True, - ) + name = models.CharField(max_length=100, help_text=Messages["name"], unique=True,) active = models.BooleanField( - help_text=u'If this worker is usable presently', - default=False, + help_text=u"If this worker is usable presently", default=False, ) update = models.BooleanField( - help_text=u'If this worker state must be updated at the next cycle', + help_text=u"If this worker state must be updated at the next cycle", default=False, ) @@ -90,52 +79,49 @@ class Worker(models.Model): cores = models.PositiveIntegerField() - memory = models.PositiveIntegerField(default=0, help_text='In megabytes') - - used_cores = models.PositiveIntegerField(default=0, help_text='In %') + memory = models.PositiveIntegerField(default=0, help_text="In megabytes") - used_memory = models.PositiveIntegerField(default=0, help_text='In %') + used_cores = models.PositiveIntegerField(default=0, help_text="In %") - info = models.TextField(null=True, blank=True, - help_text='Informative message from the worker') + used_memory = models.PositiveIntegerField(default=0, help_text="In %") + info = models.TextField( + null=True, blank=True, help_text="Informative message from the worker" + ) objects = WorkerManager() - - #_____ Overrides __________ - + # _____ Overrides __________ def __str__(self): - retval = '%s (%d cores, %d Mb)' % (self.name, self.cores, self.memory) + retval = "%s (%d cores, %d Mb)" % (self.name, self.cores, self.memory) if not self.active: - retval += ' [INACTIVE]' + retval += " [INACTIVE]" return retval - def natural_key(self): return (self.name,) - def get_admin_change_url(self): - return reverse('admin:backend_worker_change', args=(self.id,)) - + return reverse("admin:backend_worker_change", args=(self.id,)) def load(self): - '''Calculates the number of cores in use or to be used in the future''' + """Calculates the number of cores in use or to be used in the future""" return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()]) - def current_load(self): - '''Calculates the number of cores being used currently''' - return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=JobSplit.PROCESSING)]) - + """Calculates the number of cores being used currently""" + return sum( + [ + j.job.block.queue.cores_per_slot + for j in self.splits.filter(status=JobSplit.PROCESSING) + ] + ) def available_cores(self): - '''Calculates the number of available cores considering current load''' + """Calculates the number of available cores considering current load""" return max(self.cores - self.load(), 0) - def as_dict(self): - '''Returns a dictionary-like representation''' + """Returns a dictionary-like representation""" return dict(cores=self.cores, memory=self.memory) -- GitLab