Skip to content
Snippets Groups Projects
Commit f7a9aad0 authored by Samuel GAIST's avatar Samuel GAIST Committed by Flavio TARSETTI
Browse files

[experiments][models] Pre-commit cleanup

parent b7aa84a6
No related branches found
No related tags found
2 merge requests!353Cleanup experiments,!342Django 3 migration
from .block import Block
from .cached_file import CachedFile
from .block_input import BlockInput
from .result import Result
from .experiment import validate_experiment
from .cached_file import CachedFile
from .experiment import DeclarationStorage
from .experiment import Experiment
from .experiment import validate_experiment
from .result import Result
# Public names re-exported at the `experiments.models` package level.
__all__ = [
    "Block",
    "BlockInput",
    "CachedFile",
    "DeclarationStorage",
    "Experiment",
    "validate_experiment",
    "Result",
]
......@@ -25,35 +25,28 @@
# #
###############################################################################
from django.db import models
from django.conf import settings
import beat.core.hash
import beat.core.data
import beat.core.algorithm
from datetime import datetime
from beat.core.utils import NumpyJSONEncoder
from django.db import models
from ...algorithms.models import Algorithm
from ...backend.models import Queue
from ...backend.models import Environment
from ...backend.models import Job
from .result import Result
import os
import simplejson
from datetime import datetime
from ...backend.models import Queue
# ----------------------------------------------------------
class BlockManager(models.Manager):
def get_by_natural_key(self, name, experiment_author,
toolchain_author, toolchain_name,
toolchain_version, experiment_name):
def get_by_natural_key(
self,
name,
experiment_author,
toolchain_author,
toolchain_name,
toolchain_version,
experiment_name,
):
return self.get(
name=name,
experiment__author__username=experiment_author,
......@@ -64,52 +57,57 @@ class BlockManager(models.Manager):
)
# ----------------------------------------------------------
class Block(models.Model):
# NOTE(review): this span contained both the pre- and post-reformat copies of
# every statement (diff residue); only the final, black-formatted form is kept.

# Single-character codes persisted in the `status` column.
PENDING = "N"
PROCESSING = "P"
DONE = "C"
FAILED = "F"
CANCELLED = "L"

# Django `choices` pairs: (stored code, human-readable label).
STATUS = (
    (PENDING, "Pending"),
    (PROCESSING, "Processing"),
    (DONE, "Done"),
    (FAILED, "Failed"),
    (CANCELLED, "Cancelled"),
)

# Owning experiment; deleting the experiment cascades to its blocks.
experiment = models.ForeignKey(
    "Experiment", related_name="blocks", on_delete=models.CASCADE
)
name = models.CharField(max_length=200)
command = models.TextField(null=True, blank=True)
status = models.CharField(max_length=1, choices=STATUS, default=PENDING)
analyzer = models.BooleanField(default=False)
algorithm = models.ForeignKey(
    Algorithm, related_name="blocks", on_delete=models.CASCADE
)
creation_date = models.DateTimeField(null=True, blank=True, auto_now_add=True)
start_date = models.DateTimeField(null=True, blank=True)
end_date = models.DateTimeField(null=True, blank=True)

# Backend objects are nullable (SET_NULL) so a block record survives
# removal of the environment/queue it ran on.
environment = models.ForeignKey(
    Environment, related_name="blocks", null=True, on_delete=models.SET_NULL
)
queue = models.ForeignKey(
    Queue, related_name="blocks", null=True, on_delete=models.SET_NULL
)

required_slots = models.PositiveIntegerField(default=1)
channel = models.CharField(
    max_length=200,
    default="",
    blank=True,
    help_text="Synchronization channel within the toolchain",
)

# relationship to blocks to which this block depends on
dependencies = models.ManyToManyField(
    "self", related_name="dependents", blank=True, symmetrical=False,
)
# order of this block within the experiment - useful for the `backup'
# command, so we can dump the blocks in the right dependence order
......@@ -117,17 +115,19 @@ class Block(models.Model):
objects = BlockManager()
class Meta:
unique_together = ('experiment', 'name')
unique_together = ("experiment", "name")
# setup ordering so that the dump order respects self dependencies
ordering = ['experiment_id', 'execution_order']
ordering = ["experiment_id", "execution_order"]
def __str__(self):
    """Human-readable form: '<experiment fullname>, <block name> (<status label>)'."""
    # The duplicated pre-reformat return statement (diff residue) was removed.
    return (
        self.experiment.fullname()
        + ", "
        + self.name
        + " (%s)" % self.get_status_display()
    )
def natural_key(self):
return (
......@@ -138,8 +138,8 @@ class Block(models.Model):
self.experiment.toolchain.version,
self.experiment.name,
)
natural_key.dependencies = ['experiments.experiment']
natural_key.dependencies = ["experiments.experiment"]
def save(self, *args, **kwargs):
# Ensure that the state of the block is consistent, just in case, but
......@@ -147,7 +147,7 @@ class Block(models.Model):
if self.status == Block.PENDING:
try:
self.results.all().delete()
except:
except Exception: # nosec
pass
self.start_date = None
......@@ -165,90 +165,69 @@ class Block(models.Model):
super(Block, self).save(*args, **kwargs)
# Accessors for statistics
def __return_first__(self, field, default=None):
    """Return attribute *field* of the first output CachedFile, or *default*.

    `self.outputs.first()` may be None (no outputs yet); `getattr` with a
    default keeps that case safe.
    """
    return getattr(self.outputs.first(), field, default)
def first_cache(self):
    """Return the first CachedFile produced by this block (None if no outputs)."""
    return self.outputs.first()
# Each statistic lives on the first output CachedFile; these thin accessors
# delegate to __return_first__. The duplicated pre-reformat copy of each
# one-liner (diff residue) was removed.

def error_report(self):
    return self.__return_first__("error_report")

def stdout(self):
    return self.__return_first__("stdout")

def stderr(self):
    return self.__return_first__("stderr")

def speed_up_real(self):
    return self.__return_first__("speed_up_real")

def speed_up_maximal(self):
    return self.__return_first__("speed_up_maximal")

def linear_execution_time(self):
    return self.__return_first__("linear_execution_time")

def queuing_time(self):
    return self.__return_first__("queuing_time")

def cpu_time(self):
    return self.__return_first__("cpu_time")

def max_memory(self):
    return self.__return_first__("max_memory")

def data_read_size(self):
    return self.__return_first__("data_read_size")

def data_read_nb_blocks(self):
    return self.__return_first__("data_read_nb_blocks")

def data_read_time(self):
    return self.__return_first__("data_read_time")

def data_written_size(self):
    return self.__return_first__("data_written_size")

def data_written_nb_blocks(self):
    return self.__return_first__("data_written_nb_blocks")

def data_written_time(self):
    return self.__return_first__("data_written_time")

# Accessor for results
results = property(lambda self: self.__return_first__("results"))
def done(self):
    """Says whether the block has finished or not (any non-running status)."""
    # Duplicated pre-reformat docstring/return (diff residue) removed.
    return self.status not in (Block.PENDING, Block.PROCESSING)
def is_runnable(self):
    """Checks if a block is runnable presently (all dependencies are DONE)."""
    # Generator expression instead of a materialized list: all() can
    # short-circuit on the first unfinished dependency.
    return all(k.status == Block.DONE for k in self.dependencies.all())
def set_canceled(self, end_date=None):
"""Update the block state to canceled
......@@ -270,7 +249,6 @@ class Block(models.Model):
self.save()
def set_failed(self, end_date):
"""Update the block state to failed
......
......@@ -31,14 +31,11 @@ from beat.web.databases.models import DatabaseSetOutput
from .block import Block
# ----------------------------------------------------------
class BlockInputManager(models.Manager):
def get_by_natural_key(self, block_natural_key,
cache_hash, database_natural_key):
def get_by_natural_key(self, block_natural_key, cache_hash, database_natural_key):
if block_natural_key:
block = Block.objects.get_by_natural_key(*block_natural_key)
else:
......@@ -48,28 +45,36 @@ class BlockInputManager(models.Manager):
return self.get(cache__hash=cache_hash, block=block)
else:
database = DatabaseSetOutput.objects.get_by_natural_key(
*database_natural_key)
*database_natural_key
)
return self.get(database=database, block=block)
# ----------------------------------------------------------
class BlockInput(models.Model):
# NOTE(review): this span contained both the pre- and post-reformat copies of
# every field (diff residue); only the final, black-formatted form is kept.

block = models.ForeignKey(
    Block, related_name="inputs", null=True, on_delete=models.CASCADE
)

# if the input comes from another block, then this one is set
cache = models.ForeignKey(
    "CachedFile", related_name="inputs", null=True, on_delete=models.CASCADE
)

# if the input comes from a dataset, then this one is set
database = models.ForeignKey(
    DatabaseSetOutput, related_name="blocks", null=True, on_delete=models.CASCADE
)

channel = models.CharField(
    max_length=200,
    default="",
    blank=True,
    help_text="Synchronization channel within the toolchain",
)

objects = BlockInputManager()
......@@ -81,6 +86,9 @@ class BlockInput(models.Model):
cache_hash = self.cache and self.cache.hash
database_natural_key = self.database and self.database.natural_key()
return (block_natural_key, cache_hash, database_natural_key)
# Serialization ordering hint: these models must be dumped before BlockInput.
# The duplicated pre-reformat assignment (diff residue) was removed.
natural_key.dependencies = [
    "experiments.block",
    "experiments.cachedfile",
    "databases.databasesetoutput",
]
......@@ -25,106 +25,100 @@
# #
###############################################################################
from django.db import models
import glob
import logging
import os
from django.conf import settings
from django.db import models
import beat.core.hash
import os
import glob
import logging
logger = logging.getLogger(__name__)
# ----------------------------------------------------------
class CachedFileManager(models.Manager):
    """Model manager adding natural-key lookup for CachedFile."""

    def get_by_natural_key(self, hash):
        """Return the unique CachedFile whose ``hash`` field equals *hash*."""
        return self.get(hash=hash)
# ----------------------------------------------------------
class CachedFile(models.Model):
# NOTE(review): this span contained both the pre- and post-reformat copies of
# every statement (diff residue); only the final, black-formatted form is kept.

# Single-character codes persisted in the `status` column.
NOT_CACHED = "N"
PROCESSING = "P"
CACHED = "C"

# Django `choices` pairs: (stored code, human-readable label).
STATUS = (
    (NOT_CACHED, "Not cached"),
    (PROCESSING, "Processing"),
    (CACHED, "Cached"),
)

# Several blocks may share the same cached output (identical hash).
blocks = models.ManyToManyField("Block", related_name="outputs", blank=True)
hash = models.CharField(max_length=64, unique=True)
status = models.CharField(max_length=1, choices=STATUS, default=NOT_CACHED)

# the total amount of time this block took to run considering the
# wall-clock time.
linear_execution_time = models.FloatField(default=0.0)

# the real speed-up obtained by running this block using X slots
speed_up_real = models.FloatField(default=0.0)

# the maximum obtainable speed-up that could be achieved if all slots
# were running in parallel. Essentially linear_execution_time /
# maximum_slot_time
speed_up_maximal = models.FloatField(default=0.0)

# the time this block waited to be executed
queuing_time = models.FloatField(default=0.0)

stdout = models.TextField(null=True, blank=True)
stderr = models.TextField(null=True, blank=True)
error_report = models.TextField(null=True, blank=True)

# other statistics of interest
cpu_time = models.FloatField(default=0.0)
max_memory = models.BigIntegerField(default=0)
data_read_size = models.BigIntegerField(default=0)
data_read_nb_blocks = models.IntegerField(default=0)
data_read_time = models.FloatField(default=0.0)
data_written_size = models.BigIntegerField(default=0)
data_written_nb_blocks = models.IntegerField(default=0)
data_written_time = models.FloatField(default=0.0)

objects = CachedFileManager()
def __str__(self):
    """Debug representation: hash, status label and number of linked blocks."""
    # Duplicated pre-reformat return lines (diff residue) removed.
    return "CachedFile(%s, %s, %d blocks)" % (
        self.hash,
        self.get_status_display(),
        self.blocks.count(),
    )
def natural_key(self):
    """Natural key used by Django serialization: the unique content hash."""
    return (self.hash,)
def path(self):
    """Returns the (relative) path prefix to the cached file on disk."""
    # Duplicated pre-reformat docstring/return (diff residue) removed.
    return beat.core.hash.toPath(self.hash, suffix="")
def absolute_path(self, cache=settings.CACHE_ROOT):
    """Returns the full path prefix to the cached file on disk.

    NOTE(review): the default is bound to settings.CACHE_ROOT at import
    time — later changes to the setting are not reflected; kept for
    interface compatibility.
    """
    return os.path.join(cache, self.path())
def files(self, cache=settings.CACHE_ROOT):
    """Returns the list of files on disk belonging to this cache.

    The previous docstring ("Checks if any file ... exist") described
    exists(), not this method, which returns the glob match list.
    """
    return glob.glob(self.absolute_path(cache) + "*")
def update(self, block_status):
from . import Block
......@@ -133,7 +127,9 @@ class CachedFile(models.Model):
self.status = CachedFile.CACHED
self.save()
elif (block_status == Block.PROCESSING) and (self.status != CachedFile.PROCESSING):
elif (block_status == Block.PROCESSING) and (
self.status != CachedFile.PROCESSING
):
self.status = CachedFile.PROCESSING
self.save()
......@@ -141,48 +137,55 @@ class CachedFile(models.Model):
self.status = CachedFile.NOT_CACHED
self.save()
def exists(self, cache=settings.CACHE_ROOT):
    """Checks if any file belonging to this cache exists on disk."""
    # Duplicated pre-reformat docstring (diff residue) removed.
    return bool(self.files(cache))
def check_checksums(self, cache=settings.CACHE_ROOT):
    """Checks if the cached files checksum properly.

    Returns False when the number of data/index files differs from the
    number of checksum files, or when any recorded checksum does not
    match the file's actual content hash; True otherwise.
    """
    abs_path = self.absolute_path(cache)

    # Sorted so data files and their checksum files pair up positionally
    # in the zip() below.
    data_files = sorted(glob.glob(abs_path + "*.index")) + sorted(
        glob.glob(abs_path + "*.data")
    )
    checksum_files = sorted(glob.glob(abs_path + "*.index.checksum")) + sorted(
        glob.glob(abs_path + "*.data.checksum")
    )

    if len(data_files) != len(checksum_files):
        # logger.warn() is deprecated since Python 3.3; use warning().
        logger.warning(
            "Number of files (%d) is different from checksums (%d) "
            "for cache `%s'",
            len(data_files),
            len(checksum_files),
            abs_path,
        )
        return False

    for data_file, checksum_file in zip(data_files, checksum_files):
        with open(checksum_file, "rt") as f:
            recorded = f.read().strip()

        actual = beat.core.hash.hashFileContents(data_file)
        if actual != recorded:
            logger.warning(
                "Checksum for file `%s' does not match (%s != %s)",
                data_file,
                actual,
                recorded,
            )
            return False

    return True
def delete_files(self, cache=settings.CACHE_ROOT):
    """Delete the files contained in this cache.

    Bug fix: the *cache* argument was previously ignored — self.files()
    was called without it, so deletion always scanned the default
    settings.CACHE_ROOT regardless of the caller's argument.
    """
    for cached_file in self.files(cache):
        os.remove(cached_file)
......
......@@ -25,73 +25,68 @@
# #
###############################################################################
from django.db import models
from django.conf import settings
import simplejson
from django.conf import settings
from django.db import models
# ----------------------------------------------------------
class ResultManager(models.Manager):
    """Model manager resolving Result rows by their (name, cache hash) key."""

    def get_by_natural_key(self, name, hash):
        """Return the Result named *name* attached to the CachedFile with *hash*."""
        return self.get(name=name, cache__hash=hash)
# ----------------------------------------------------------
class Result(models.Model):
    """A single named output value produced by an analyzer, attached to a cache.

    NOTE(review): this class contained both the pre- and post-reformat
    copies of many statements (diff residue); only the final form is kept.
    """

    # Scalar types whose textual data_value can be converted directly.
    SIMPLE_TYPE_NAMES = ("int32", "float32", "bool", "string")

    cache = models.ForeignKey(
        "CachedFile", related_name="results", null=True, on_delete=models.CASCADE
    )
    name = models.CharField(max_length=200)
    type = models.CharField(max_length=200)
    primary = models.BooleanField(default=False)
    # Raw textual payload; value() converts it according to `type`.
    data_value = models.TextField(null=True, blank=True)

    objects = ResultManager()

    # _____ Meta parameters __________

    class Meta:
        unique_together = ("cache", "name")

    def __str__(self):
        return "%s - %s" % (self.cache, self.name)

    def natural_key(self):
        return (
            self.name,
            self.cache.hash,
        )

    natural_key.dependencies = ["experiments.cachedfile"]

    def value(self):
        """Deserialize data_value according to `type` (None if unknown type)."""
        # Special float sentinels are returned as-is (not JSON-parseable).
        if self.data_value in ["+inf", "-inf", "NaN"]:
            return self.data_value
        elif self.type == "int32":
            return int(self.data_value)
        elif self.type == "float32":
            return float(self.data_value)
        elif self.type == "bool":
            # NOTE(review): bool() of any non-empty string is True (e.g.
            # bool("False") is True) — confirm data_value is never a
            # textual "False"/"0" here; behavior kept unchanged.
            return bool(self.data_value)
        elif self.type == "string":
            return str(self.data_value)
        elif self.type.startswith("%s/" % settings.PLOT_ACCOUNT) or (
            self.type in Result.SIMPLE_TYPE_NAMES
        ):
            return simplejson.loads(self.data_value)
        return None

    def is_chart(self):
        """A result is a chart unless its type is one of the simple scalars."""
        return self.type not in Result.SIMPLE_TYPE_NAMES
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment