Commit 29694586 authored by André Anjos

[experiments] Add constraint to Result objects; Simplify some code

parent b1777848
1 merge request: !194 Scheduler
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models, utils
from django.conf import settings
import simplejson
import beat.core.experiment
from ...common import storage
def move_result_to_cache(apps, schema_editor):
'''Moves the result association from the block to the related cache file'''
Result = apps.get_model("experiments", "Result")
total = Result.objects.count()
if total: print('')
for i, r in enumerate(Result.objects.all()):
print("Resetting result %d/%d..." % (i+1, total))
r.cache = r.block.hashes.first()
r.save()
def reset_blocks(apps, schema_editor):
'''Resets block dependencies and queue relationship'''
Experiment = apps.get_model("experiments", "Experiment")
Block = apps.get_model("experiments", "Block")
BlockInput = apps.get_model("experiments", "BlockInput")
CachedFile = apps.get_model("experiments", "CachedFile")
Queue = apps.get_model("backend", "Queue")
Environment = apps.get_model("backend", "Environment")
Algorithm = apps.get_model("algorithms", "Algorithm")
DatabaseSetOutput = apps.get_model("databases", "DatabaseSetOutput")
total = Experiment.objects.count()
for i, e in enumerate(Experiment.objects.order_by('id')):
fullname = '%s/%s/%s/%d/%s' % (
e.author.username,
e.toolchain.author.username,
e.toolchain.name,
e.toolchain.version,
e.name,
)
print("Updating blocks for experiment %d/%d (%s, id=%d)..." % \
(i+1, total, fullname, e.id))
xp_decl = simplejson.loads(storage.get_file_content(e,
'declaration_file'))
tc_decl = simplejson.loads(storage.get_file_content(e.toolchain,
'declaration_file'))
xp = beat.core.experiment.Experiment(settings.PREFIX, (xp_decl,
tc_decl))
if xp.errors:
message = "The experiment `%s' isn't valid (skipping " \
"block update), due to the following errors:\n * %s"
print(message % (fullname, '\n * '.join(xp.errors)))
continue
# Loads the experiment execution description, creating the Block's,
# BlockInput's and BlockOutput's as required.
for block_name, description in xp.setup().items():
# Checks that the Queue/Environment exists
job_description = description['configuration']
env = Environment.objects.filter(
name=job_description['environment']['name'],
version=job_description['environment']['version'],
)
if not env:
print("Cannot find environment `%s (%s)' - not setting" % \
(job_description['environment']['name'],
job_description['environment']['version']))
env = None
else:
env = env[0]
# Search for queue that contains a specific environment
# notice we don't require environment to exist in relation to
# the queue as it may have been removed already.
queue = Queue.objects.filter(name=job_description['queue'])
if not queue:
print("Cannot find queue `%s'" % job_description['queue'])
queue = None
else:
queue = queue[0]
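# algorithm names are canonical: <author>/<name>/<version>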
parts = job_description['algorithm'].split('/')
algorithm = Algorithm.objects.get(
author__username=parts[0],
name=parts[1],
version=parts[2],
)
# Ties the block in
try:
b, _ = Block.objects.get_or_create(experiment=e,
name=block_name, algorithm=algorithm)
except utils.IntegrityError as exc:
print("Block `%s' for experiment `%s' already exists - " \
"modifying entry for migration purposes. This " \
"issue is due a misconnection on the toolchain level " \
"(known case: tpereira/full_isv/2)" % \
(block_name, fullname))
b = Block.objects.get(experiment=e, name=block_name)
b.command = simplejson.dumps(job_description, indent=4)
b.status = 'N' if (e.status == 'P') else b.status
b.environment = env
b.queue = queue
b.algorithm = algorithm
b.analyzer = (algorithm.result_dataformat is not None)
b.required_slots = job_description['nb_slots']
b.channel = job_description['channel']
b.save()
# from this point: requires block to have an assigned id
b.dependencies.add(*[e.blocks.get(name=k) \
for k in description['dependencies']])
# reset inputs and outputs - creates if necessary only
for v in job_description['inputs'].values():
if 'database' in v: #database input
db = DatabaseSetOutput.objects.get(hash=v['hash'])
BlockInput.objects.get_or_create(block=b,
channel=v['channel'], database=db)
else:
cache = CachedFile.objects.get(hash=v['hash'])
BlockInput.objects.get_or_create(block=b,
channel=v['channel'], cache=cache)
current = list(b.outputs.all())
b.outputs.clear() #dissociates all current outputs
outputs = job_description.get('outputs',
{'': job_description.get('result')})
for v in outputs.values():
cache, cr = CachedFile.objects.get_or_create(hash=v['hash'])
if cr:
if len(current) == len(outputs): #copy
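# note: apps.get_model() returns historical models stripped of any
# custom methods, hence the field-by-field copy below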
cache.linear_execution_time = \
current[0].linear_execution_time
cache.speed_up_real = current[0].speed_up_real
cache.speed_up_maximal = current[0].speed_up_maximal
cache.queuing_time = current[0].queuing_time
cache.stdout = current[0].stdout
cache.stderr = current[0].stderr
cache.error_report = current[0].error_report
cache.cpu_time = current[0].cpu_time
cache.max_memory = current[0].max_memory
cache.data_read_size = current[0].data_read_size
cache.data_read_nb_blocks = \
current[0].data_read_nb_blocks
cache.data_read_time = current[0].data_read_time
cache.data_written_size = current[0].data_written_size
cache.data_written_nb_blocks = \
current[0].data_written_nb_blocks
cache.data_written_time = current[0].data_written_time
print("CachedFile data `%s' COPIED from `%s'" % \
(cache.hash, current[0].hash))
else:
print("CachedFile (hash=%s) CREATED for block `%s' " \
"of experiment `%s' which is in state `%s'" % \
(cache.hash, block_name, fullname,
b.get_status_display()))
cache.blocks.add(b)
#asserts all blocks (except analysis blocks) have dependents
for b in e.blocks.all():
assert (b.analyzer and b.dependents.count() == 0) or \
b.dependents.count() > 0
#asserts all analysis blocks have only one output
for b in e.blocks.filter(analyzer=True):
assert b.outputs.count() == 1
class Migration(migrations.Migration):
dependencies = [
('backend', '0002_auto_20160330_0005'),
('databases', '0002_auto_20160329_1733'),
('experiments', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='result',
name='cache',
field=models.ForeignKey(related_name='results',
to='experiments.CachedFile', null=True),
),
migrations.RunPython(move_result_to_cache),
migrations.RemoveField(
model_name='result',
name='block',
),
migrations.CreateModel(
name='BlockInput',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False,
auto_created=True, primary_key=True)),
('channel', models.CharField(default=b'',
help_text=b'Synchronization channel within the toolchain',
max_length=200, blank=True)),
('block', models.ForeignKey(related_name='inputs',
to='experiments.Block', null=True)),
('cache', models.ForeignKey(related_name='inputs',
to='experiments.CachedFile', null=True)),
('database', models.ForeignKey(related_name='blocks',
to='databases.DatabaseSetOutput', null=True)),
],
),
migrations.AddField(
model_name='block',
name='channel',
field=models.CharField(default=b'',
help_text=b'Synchronization channel within the toolchain',
max_length=200, blank=True),
),
migrations.AddField(
model_name='block',
name='command',
field=models.TextField(null=True, blank=True),
),
migrations.AddField(
model_name='block',
name='dependencies',
field=models.ManyToManyField(related_name='dependents',
to='experiments.Block', blank=True),
),
migrations.AlterField(
model_name='block',
name='environment',
field=models.ForeignKey(related_name='blocks',
on_delete=models.deletion.SET_NULL, to='backend.Environment',
null=True),
),
migrations.AddField(
model_name='block',
name='queue',
field=models.ForeignKey(related_name='blocks',
on_delete=models.deletion.SET_NULL, to='backend.Queue',
null=True),
),
migrations.AddField(
model_name='block',
name='required_slots',
field=models.PositiveIntegerField(default=1),
),
migrations.AddField(
model_name='block',
name='runnable_date',
field=models.DateTimeField(null=True, blank=True),
),
migrations.AlterField(
model_name='block',
name='status',
field=models.CharField(default=b'N', max_length=1,
choices=[
(b'N', b'Not cached'),
(b'P', b'Processing'),
(b'C', b'Cached'),
(b'F', b'Failed'),
(b'S', b'Skipped'),
(b'L', b'Cancelled'),
]
),
),
migrations.AlterUniqueTogether(
name='block',
unique_together=set([('experiment', 'name')]),
),
migrations.AlterField(
model_name='cachedfile',
name='blocks',
field=models.ManyToManyField(related_name='outputs',
to='experiments.Block', blank=True),
),
migrations.RunPython(reset_blocks),
]
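Note that both RunPython operations above are forward-only: no reverse_code is supplied, so this migration cannot be unapplied cleanly. If a backwards no-op were acceptable (an assumption, not something this changeset states), the usual Django idiom would be:

    migrations.RunPython(move_result_to_cache,
        reverse_code=migrations.RunPython.noop),

The same would apply to the reset_blocks step.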
@@ -568,14 +568,11 @@ class Experiment(Shareable):
BlockInput.objects.get_or_create(block=b,
channel=v['channel'], cache=cache)
current = list(b.outputs.all())
b.outputs.clear()
outputs = job_description.get('outputs',
{'': job_description.get('result')})
for v in outputs.values():
cache, cr = CachedFile.objects.get_or_create(hash=v['hash'])
if cr:
if len(current) == len(outputs): cache.copy(current[0])
cache.blocks.add(b)
@@ -806,12 +803,10 @@ class Block(models.Model):

     # Accessors for statistics

-    def __return_first__(self, field):
-        if not self.outputs.count(): return ''
-        return getattr(self.outputs.first(), field)
+    def __return_first__(self, field, default=None):
+        return getattr(self.outputs.first(), field, default)

     def first_cache(self):
-        if not self.outputs.count(): return None
         return self.outputs.first()

     def error_report(self):
@@ -824,40 +819,40 @@ class Block(models.Model):

         return self.__return_first__('stderr')

     def speed_up_real(self):
-        return self.__return_first__('speed_up_real') or 0.
+        return self.__return_first__('speed_up_real')

     def speed_up_maximal(self):
-        return self.__return_first__('speed_up_maximal') or 0.
+        return self.__return_first__('speed_up_maximal')

     def linear_execution_time(self):
-        return self.__return_first__('linear_execution_time') or 0.
+        return self.__return_first__('linear_execution_time')

     def queuing_time(self):
-        return self.__return_first__('queuing_time') or 0.
+        return self.__return_first__('queuing_time')

     def cpu_time(self):
-        return self.__return_first__('cpu_time') or 0.
+        return self.__return_first__('cpu_time')

     def max_memory(self):
-        return self.__return_first__('max_memory') or 0
+        return self.__return_first__('max_memory')

     def data_read_size(self):
-        return self.__return_first__('data_read_size') or 0
+        return self.__return_first__('data_read_size')

     def data_read_nb_blocks(self):
-        return self.__return_first__('data_read_nb_blocks') or 0
+        return self.__return_first__('data_read_nb_blocks')

     def data_read_time(self):
-        return self.__return_first__('data_read_time') or 0.
+        return self.__return_first__('data_read_time')

     def data_written_size(self):
-        return self.__return_first__('data_written_size') or 0
+        return self.__return_first__('data_written_size')

     def data_written_nb_blocks(self):
-        return self.__return_first__('data_written_nb_blocks') or 0
+        return self.__return_first__('data_written_nb_blocks')

     def data_written_time(self):
-        return self.__return_first__('data_written_time') or 0.
+        return self.__return_first__('data_written_time')

     # Accessor for results

     results = property(lambda self: self.__return_first__('results'))
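The accessor rewrite above changes behaviour, not just length: outputs.first() returns None on an empty queryset, and getattr() then falls back to the default, so callers now see None instead of a fabricated 0, 0. or ''. A minimal sketch of the difference (the filter is illustrative only):

    block = Block.objects.filter(outputs__isnull=True).first()
    if block is not None:
        assert block.cpu_time() is None   # returned 0. before this change
        assert block.stderr() is None     # returned '' before this change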
@@ -917,24 +912,6 @@ class CachedFile(models.Model):

         return self.hash

-    def copy_data(self, other):
-        self.linear_execution_time = other.linear_execution_time
-        self.speed_up_real = other.speed_up_real
-        self.speed_up_maximal = other.speed_up_maximal
-        self.queuing_time = other.queuing_time
-        self.stdout = other.stdout
-        self.stderr = other.stderr
-        self.error_report = other.error_report
-        self.cpu_time = other.cpu_time
-        self.max_memory = other.max_memory
-        self.data_read_size = other.data_read_size
-        self.data_read_nb_blocks = other.data_read_nb_blocks
-        self.data_read_time = other.data_read_time
-        self.data_written_size = other.data_written_size
-        self.data_written_nb_blocks = other.data_written_nb_blocks
-        self.data_written_time = other.data_written_time
-
     def path(self):
         '''Returns the full path prefix to the cached file on disk'''
@@ -1014,8 +991,16 @@ class Result(models.Model):

     objects = ResultManager()

     #_____ Meta parameters __________

+    class Meta:
+        unique_together = ('cache', 'name')
+
     def __str__(self):
-        return '%s - %s' % (self.block, self.name)
+        return '%s - %s' % (self.cache, self.name)

     def natural_key(self):
         return (
@@ -1023,6 +1008,7 @@

             self.cache.hash,
         )

     def value(self):
         if self.data_value in ['+inf', '-inf', 'NaN']:
             return self.data_value
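The new Meta.unique_together is the constraint announced in the commit title: at most one Result per (cache, name) pair. A hedged illustration of the effect (some_cache stands in for an existing CachedFile; real Result rows may need more fields than shown):

    from django.db import IntegrityError, transaction

    try:
        with transaction.atomic():
            Result.objects.create(cache=some_cache, name='accuracy')
            Result.objects.create(cache=some_cache, name='accuracy')
    except IntegrityError:
        pass  # the duplicate (cache, name) pair is rejected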