Skip to content
Snippets Groups Projects
Commit c614bac1 authored by Samuel GAIST's avatar Samuel GAIST Committed by Flavio TARSETTI
Browse files

[backend][models] Pre-commit cleanup

parent 398f0689
No related branches found
No related tags found
2 merge requests!347Cleanup backend,!342Django 3 migration
......@@ -26,16 +26,32 @@
###############################################################################
from .environment import EnvironmentManager
from .environment import Environment
from .environment import EnvironmentLanguage
from .environment import EnvironmentManager
from .job import Job
from .job import JobSplit
from .local_scheduler import LocalSchedulerProcesses
from .queue import QueueManager
from .queue import Queue
from .queue import QueueManager
from .result import Result
from .slot import SlotManager
from .slot import Slot
from .worker import WorkerManager
from .slot import SlotManager
from .worker import Worker
from .worker import WorkerManager
__all__ = [
"Environment",
"EnvironmentLanguage",
"EnvironmentManager",
"Job",
"JobSplit",
"LocalSchedulerProcesses",
"Queue",
"QueueManager",
"Result",
"Slot",
"SlotManager",
"Worker",
"WorkerManager",
]
......@@ -26,15 +26,15 @@
###############################################################################
from django.db import models
from django.db.models import Count
from django.db.models import Q
from django.urls import reverse
from django.db.models import Count, Q
from ...code.models import Code
from ...common.models import Shareable
from ...common.models import ShareableManager
from ...common.texts import Messages
# ----------------------------------------------------------
......@@ -58,8 +58,8 @@ class EnvironmentManager(ShareableManager):
used for blocks that are done.
"""
from ...experiments.models import Experiment
from ...experiments.models import Block
from ...experiments.models import Experiment
# Tries to figure through a maximum if blocks in the list have been
# successfully used inside an environment.
......@@ -165,7 +165,9 @@ class Environment(Shareable):
class EnvironmentLanguage(models.Model):
environment = models.ForeignKey(Environment, related_name="languages", on_delete=models.CASCADE)
environment = models.ForeignKey(
Environment, related_name="languages", on_delete=models.CASCADE
)
language = models.CharField(
max_length=1, choices=Code.CODE_LANGUAGE, default=Code.PYTHON
......
......@@ -26,32 +26,28 @@
###############################################################################
import datetime
import logging
logger = logging.getLogger(__name__)
import simplejson
from django.db import utils
from django.db import models
from django.conf import settings
from django.db import models
import beat.core.data
import beat.core.hash
from .result import Result
logger = logging.getLogger(__name__)
#----------------------------------------------------------
# ----------------------------------------------------------
class JobManager(models.Manager):
def create_job(self, block):
# Compute the key of the job
hashes = [ x.hash for x in block.outputs.order_by('hash') ]
key = beat.core.hash.hash(''.join(hashes))
hashes = [x.hash for x in block.outputs.order_by("hash")]
key = beat.core.hash.hash("".join(hashes))
# Determine if the job can be run or is dependent on others
runnable_date = None
......@@ -59,28 +55,26 @@ class JobManager(models.Manager):
runnable_date = datetime.datetime.now()
# Create the job
job = self.model(
block=block,
key=key,
runnable_date=runnable_date
)
job = self.model(block=block, key=key, runnable_date=runnable_date)
job.save()
return job
#----------------------------------------------------------
# ----------------------------------------------------------
class Job(models.Model):
'''Class describing the execution of a Job on the backend'''
"""Class describing the execution of a Job on the backend"""
block = models.OneToOneField('experiments.Block', null=True,
on_delete=models.CASCADE, related_name='job')
block = models.OneToOneField(
"experiments.Block", null=True, on_delete=models.CASCADE, related_name="job"
)
result = models.OneToOneField(Result, null=True, on_delete=models.CASCADE,
related_name='job')
result = models.OneToOneField(
Result, null=True, on_delete=models.CASCADE, related_name="job"
)
runnable_date = models.DateTimeField(null=True, blank=True)
......@@ -92,23 +86,23 @@ class Job(models.Model):
mirror = models.BooleanField(default=False)
objects = JobManager()
def __str__(self):
return "Job(%s, %s, key=%s, mirror=%s, splits=%d, cores=%d)" % \
(self.block.name, self.block.experiment.name,
self.key, str(self.mirror),
self.block.required_slots,
self.block.queue.cores_per_slot)
return "Job(%s, %s, key=%s, mirror=%s, splits=%d, cores=%d)" % (
self.block.name,
self.block.experiment.name,
self.key,
str(self.mirror),
self.block.required_slots,
self.block.queue.cores_per_slot,
)
#----------------------------------------------------------
# ----------------------------------------------------------
class JobSplitManager(models.Manager):
def create_splits(self, job):
# If no splitting is required, only create one split
......@@ -117,21 +111,29 @@ class JobSplitManager(models.Manager):
split.save()
return [split]
# Retrieve the list of synchronized inputs
configuration = simplejson.loads(job.block.command)
inputs = [ entry for name, entry in configuration['inputs'].items()
if entry['channel'] == configuration['channel'] ]
inputs = [
entry
for name, entry in configuration["inputs"].items()
if entry["channel"] == configuration["channel"]
]
# Load the list of indices for each inputs
indices = []
for input_cfg in inputs:
if 'database' in input_cfg:
indices.extend(beat.core.data.load_data_index_db(settings.CACHE_ROOT, input_cfg['path']))
if "database" in input_cfg:
indices.extend(
beat.core.data.load_data_index_db(
settings.CACHE_ROOT, input_cfg["path"]
)
)
else:
indices.append(beat.core.data.load_data_index(settings.CACHE_ROOT, input_cfg['path']))
indices.append(
beat.core.data.load_data_index(
settings.CACHE_ROOT, input_cfg["path"]
)
)
# Attempt to split the indices
nb_splits = job.block.required_slots
......@@ -146,21 +148,23 @@ class JobSplitManager(models.Manager):
nb_splits -= 1
if nb_splits != job.block.required_slots:
message = "The processing of the block `%s' of experiment `%s' " \
"was splitted in %d instead of the requested %d" % \
(job.block.name, job.block.experiment.fullname(),
nb_splits, job.block.required_slots)
logger.warning(message)
message = (
"The processing of the block `%s' of experiment `%s' "
"was splitted in %d instead of the requested %d"
% (
job.block.name,
job.block.experiment.fullname(),
nb_splits,
job.block.required_slots,
)
)
logger.warning(message)
# Create the necessary splits and assign the ranges
splits = []
for i, indices in enumerate(split_indices):
split = JobSplit(
job=job,
split_index=i,
start_index=indices[0],
end_index=indices[1],
job=job, split_index=i, start_index=indices[0], end_index=indices[1],
)
split.save()
......@@ -169,34 +173,35 @@ class JobSplitManager(models.Manager):
return splits
#----------------------------------------------------------
# ----------------------------------------------------------
class JobSplit(models.Model):
'''Class describing a part of job of an experiment'''
"""Class describing a part of job of an experiment"""
QUEUED = 'N'
PROCESSING = 'P'
COMPLETED = 'C'
FAILED = 'F'
CANCELLED = 'L'
CANCELLING = 'K'
QUEUED = "N"
PROCESSING = "P"
COMPLETED = "C"
FAILED = "F"
CANCELLED = "L"
CANCELLING = "K"
STATUS = (
(QUEUED, 'Queued'),
(PROCESSING, 'Processing'),
(COMPLETED, 'Completed'),
(FAILED, 'Failed'),
(CANCELLED, 'Cancelled'),
(CANCELLING, 'Cancelling'),
(QUEUED, "Queued"),
(PROCESSING, "Processing"),
(COMPLETED, "Completed"),
(FAILED, "Failed"),
(CANCELLED, "Cancelled"),
(CANCELLING, "Cancelling"),
)
worker = models.ForeignKey(
"Worker", null=True, on_delete=models.SET_NULL, related_name="splits"
)
worker = models.ForeignKey('Worker', null=True, on_delete=models.SET_NULL,
related_name='splits')
job = models.ForeignKey(Job, null=True, on_delete=models.CASCADE,
related_name='splits')
job = models.ForeignKey(
Job, null=True, on_delete=models.CASCADE, related_name="splits"
)
split_index = models.PositiveIntegerField()
......@@ -206,32 +211,28 @@ class JobSplit(models.Model):
status = models.CharField(max_length=1, choices=STATUS, default=QUEUED)
result = models.OneToOneField(Result, null=True, on_delete=models.CASCADE,
related_name='split')
result = models.OneToOneField(
Result, null=True, on_delete=models.CASCADE, related_name="split"
)
start_date = models.DateTimeField(null=True)
end_date = models.DateTimeField(null=True)
objects = JobSplitManager()
class Meta:
unique_together = ('job', 'split_index')
unique_together = ("job", "split_index")
def __str__(self):
return "JobSplit(%s, index=%d, state=%s)%s" % \
(self.job, self.split_index, self.status,
('@%s' % self.worker) if self.worker else '')
return "JobSplit(%s, index=%d, state=%s)%s" % (
self.job,
self.split_index,
self.status,
("@%s" % self.worker) if self.worker else "",
)
def done(self):
'''Says whether the job has finished or not'''
"""Says whether the job has finished or not"""
return self.status in (
JobSplit.COMPLETED,
JobSplit.FAILED,
JobSplit.CANCELLED,
)
return self.status in (JobSplit.COMPLETED, JobSplit.FAILED, JobSplit.CANCELLED,)
......@@ -30,11 +30,10 @@ from django.db import models
class LocalSchedulerProcesses(models.Model):
'''Information about the processes launched by the local scheduler'''
"""Information about the processes launched by the local scheduler"""
name = models.TextField()
pid = models.IntegerField()
def __str__(self):
return '%s (pid = %d)' % (self.name, self.pid)
return "%s (pid = %d)" % (self.name, self.pid)
......@@ -28,104 +28,93 @@
import operator
from django.contrib.auth.models import Group
from django.db import models
from django.urls import reverse
from django.contrib.auth.models import Group
from django.utils.translation import ugettext_lazy as _
from guardian.shortcuts import get_perms
from ...common.texts import Messages
#----------------------------------------------------------
# ----------------------------------------------------------
class QueueManager(models.Manager):
def get_by_natural_key(self, name):
return self.get(name=name)
#----------------------------------------------------------
# ----------------------------------------------------------
class Queue(models.Model):
name = models.CharField(max_length=100, help_text=Messages['name'],
unique=True)
name = models.CharField(max_length=100, help_text=Messages["name"], unique=True)
memory_limit = models.PositiveIntegerField(help_text='In megabytes')
memory_limit = models.PositiveIntegerField(help_text="In megabytes")
time_limit = models.PositiveIntegerField(help_text='In minutes')
time_limit = models.PositiveIntegerField(help_text="In minutes")
cores_per_slot = models.PositiveIntegerField()
max_slots_per_user = models.PositiveIntegerField()
environments = models.ManyToManyField('Environment', related_name='queues')
environments = models.ManyToManyField("Environment", related_name="queues")
objects = QueueManager()
#_____ Meta parameters __________
# _____ Meta parameters __________
class Meta:
permissions = [
['can_access', _('Can access queue')]
]
permissions = [["can_access", _("Can access queue")]]
#_____ Overrides __________
# _____ Overrides __________
def __str__(self):
return '%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)' % (
self.name,
self.time_limit,
self.memory_limit,
self.cores_per_slot,
self.max_slots_per_user
return (
"%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)"
% (
self.name,
self.time_limit,
self.memory_limit,
self.cores_per_slot,
self.max_slots_per_user,
)
)
def natural_key(self):
return (self.name,)
def get_admin_change_url(self):
return reverse('admin:backend_queue_change', args=(self.id,))
return reverse("admin:backend_queue_change", args=(self.id,))
#_____ Utilities __________
# _____ Utilities __________
def number_of_slots(self):
'''Total number of slots considering all assigned worker/slots'''
"""Total number of slots considering all assigned worker/slots"""
r = self.slots.filter(worker__active=True)
return r.aggregate(nslots=models.Sum('quantity'))['nslots'] or 0
return r.aggregate(nslots=models.Sum("quantity"))["nslots"] or 0
def availability(self):
'''Returns the availability for this queue in terms of number of slots
"""Returns the availability for this queue in terms of number of slots
This method does not take into consideration the occupation of this
queue slots caused by jobs on other queues. It only looks to its inner
occupancy and reports on that.
Returns an integer between 0 and :py:meth:`Queue.slots`.
'''
"""
from ..models import JobSplit
from ..models import Job
running = JobSplit.objects.filter(job__block__in=self.blocks.all(),
status=JobSplit.PROCESSING).count()
running = JobSplit.objects.filter(
job__block__in=self.blocks.all(), status=JobSplit.PROCESSING
).count()
return max(self.number_of_slots() - running, 0)
def worker_availability(self):
'''Returns an ordered dictionary indicating the availability of workers
"""Returns an ordered dictionary indicating the availability of workers
according to their queue priority.
The dictionary contains, as value, the number of slots available per
......@@ -137,34 +126,42 @@ class Queue(models.Model):
* load (the lower, the better)
* name (alphabetically)
'''
"""
workers = [(k.worker, -k.priority, k.worker.load(), k.worker.name) \
for k in self.slots.filter(worker__active=True)]
workers = [
(k.worker, -k.priority, k.worker.load(), k.worker.name)
for k in self.slots.filter(worker__active=True)
]
workers = sorted(workers, key=operator.itemgetter(1,2,3))
workers = sorted(workers, key=operator.itemgetter(1, 2, 3))
return [w[0] for w in workers]
def splits(self):
'''Lists all job splits currently associated to this queue'''
"""Lists all job splits currently associated to this queue"""
from ..models import JobSplit
from ..models import Job
return JobSplit.objects.filter(job__block__queue=self)
def as_dict(self):
'''Returns a representation as a dictionary'''
"""Returns a representation as a dictionary"""
return {
'memory-limit': self.memory_limit,
'time-limit': self.time_limit,
'cores-per-slot': self.cores_per_slot,
'max-slots-per-user': self.max_slots_per_user,
'environments': [k.fullname() for k in self.environments.all()],
'slots': dict([(s.worker.name, dict(quantity=s.quantity, priority=s.priority))
for s in self.slots.all()]),
'groups': [k.name for k in Group.objects.all() if 'can_access' in get_perms(k, self)]
"memory-limit": self.memory_limit,
"time-limit": self.time_limit,
"cores-per-slot": self.cores_per_slot,
"max-slots-per-user": self.max_slots_per_user,
"environments": [k.fullname() for k in self.environments.all()],
"slots": dict(
[
(s.worker.name, dict(quantity=s.quantity, priority=s.priority))
for s in self.slots.all()
]
),
"groups": [
k.name
for k in Group.objects.all()
if "can_access" in get_perms(k, self)
],
}
......@@ -26,17 +26,15 @@
###############################################################################
from django.db import models
import simplejson
from django.db import models
import beat.core.stats
class Result(models.Model):
'''Logging and status information concerning block or job execution.
'''
"""Logging and status information concerning block or job execution.
"""
# exit status code
status = models.IntegerField()
......@@ -47,33 +45,29 @@ class Result(models.Model):
timed_out = models.BooleanField(default=False)
cancelled = models.BooleanField(default=False)
def __str__(self):
status = 'success' if self.status == 0 else 'failed'
retval = 'Result(%s' % status
status = "success" if self.status == 0 else "failed"
retval = "Result(%s" % status
if self.stdout:
retval += ', stdout=' + self.stdout
retval += ", stdout=" + self.stdout
if self.stderr:
retval += ', stderr=' + self.stderr
retval += ", stderr=" + self.stderr
if self.usrerr:
retval += ', usrerr=' + self.usrerr
retval += ", usrerr=" + self.usrerr
retval += ')'
retval += ")"
return retval
def _get_stats(self):
if self._stats is not None:
return beat.core.stats.Statistics(simplejson.loads(self._stats))
else:
return beat.core.stats.Statistics()
def _set_stats(self, v):
self._stats = simplejson.dumps(v.as_dict())
stats = property(_get_stats, _set_stats)
......@@ -28,51 +28,48 @@
from django.db import models
#----------------------------------------------------------
# ----------------------------------------------------------
class SlotManager(models.Manager):
def get_by_natural_key(self, queue_name, worker_name):
return self.get(queue__name=queue_name, worker__name=worker_name)
#----------------------------------------------------------
# ----------------------------------------------------------
class Slot(models.Model):
queue = models.ForeignKey('Queue', related_name='slots', on_delete=models.CASCADE)
queue = models.ForeignKey("Queue", related_name="slots", on_delete=models.CASCADE)
worker = models.ForeignKey('Worker', related_name='slots', on_delete=models.CASCADE)
worker = models.ForeignKey("Worker", related_name="slots", on_delete=models.CASCADE)
quantity = models.PositiveIntegerField(
'Number of slots',
help_text='Number of processing slots to dedicate in this worker for a given queue'
"Number of slots",
help_text="Number of processing slots to dedicate in this worker for a given queue",
)
priority = models.PositiveIntegerField(
default=0,
help_text='Priority of these slots on the defined queue'
default=0, help_text="Priority of these slots on the defined queue"
)
objects = SlotManager()
#_____ Meta parameters __________
# _____ Meta parameters __________
class Meta:
unique_together = ('queue', 'worker')
unique_together = ("queue", "worker")
#_____ Overrides __________
# _____ Overrides __________
def __str__(self):
return '%s - %s (slots: %d, priority: %d)' % (self.queue, self.worker,
self.quantity, self.priority)
return "%s - %s (slots: %d, priority: %d)" % (
self.queue,
self.worker,
self.quantity,
self.priority,
)
def natural_key(self):
return (self.queue.name, self.worker.name)
......@@ -25,27 +25,22 @@
# #
###############################################################################
import os
import signal
import subprocess
import psutil
import logging
logger = logging.getLogger(__name__)
import psutil
from django.db import models
from django.db import transaction
from django.conf import settings
from django.urls import reverse
from ...common.texts import Messages
from .job import JobSplit
logger = logging.getLogger(__name__)
#----------------------------------------------------------
# ----------------------------------------------------------
def _cleanup_zombies():
'''Cleans-up eventual zombie subprocesses launched by the worker'''
"""Cleans-up eventual zombie subprocesses launched by the worker"""
for child in psutil.Process().children(recursive=True):
try:
......@@ -56,33 +51,27 @@ def _cleanup_zombies():
pass
#----------------------------------------------------------
# ----------------------------------------------------------
class WorkerManager(models.Manager):
def get_by_natural_key(self, name):
return self.get(name=name)
#----------------------------------------------------------
# ----------------------------------------------------------
class Worker(models.Model):
name = models.CharField(
max_length=100,
help_text=Messages['name'],
unique=True,
)
name = models.CharField(max_length=100, help_text=Messages["name"], unique=True,)
active = models.BooleanField(
help_text=u'If this worker is usable presently',
default=False,
help_text=u"If this worker is usable presently", default=False,
)
update = models.BooleanField(
help_text=u'If this worker state must be updated at the next cycle',
help_text=u"If this worker state must be updated at the next cycle",
default=False,
)
......@@ -90,52 +79,49 @@ class Worker(models.Model):
cores = models.PositiveIntegerField()
memory = models.PositiveIntegerField(default=0, help_text='In megabytes')
used_cores = models.PositiveIntegerField(default=0, help_text='In %')
memory = models.PositiveIntegerField(default=0, help_text="In megabytes")
used_memory = models.PositiveIntegerField(default=0, help_text='In %')
used_cores = models.PositiveIntegerField(default=0, help_text="In %")
info = models.TextField(null=True, blank=True,
help_text='Informative message from the worker')
used_memory = models.PositiveIntegerField(default=0, help_text="In %")
info = models.TextField(
null=True, blank=True, help_text="Informative message from the worker"
)
objects = WorkerManager()
#_____ Overrides __________
# _____ Overrides __________
def __str__(self):
retval = '%s (%d cores, %d Mb)' % (self.name, self.cores, self.memory)
retval = "%s (%d cores, %d Mb)" % (self.name, self.cores, self.memory)
if not self.active:
retval += ' [INACTIVE]'
retval += " [INACTIVE]"
return retval
def natural_key(self):
return (self.name,)
def get_admin_change_url(self):
return reverse('admin:backend_worker_change', args=(self.id,))
return reverse("admin:backend_worker_change", args=(self.id,))
def load(self):
'''Calculates the number of cores in use or to be used in the future'''
"""Calculates the number of cores in use or to be used in the future"""
return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()])
def current_load(self):
'''Calculates the number of cores being used currently'''
return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=JobSplit.PROCESSING)])
"""Calculates the number of cores being used currently"""
return sum(
[
j.job.block.queue.cores_per_slot
for j in self.splits.filter(status=JobSplit.PROCESSING)
]
)
def available_cores(self):
'''Calculates the number of available cores considering current load'''
"""Calculates the number of available cores considering current load"""
return max(self.cores - self.load(), 0)
def as_dict(self):
'''Returns a dictionary-like representation'''
"""Returns a dictionary-like representation"""
return dict(cores=self.cores, memory=self.memory)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment