diff --git a/beat/web/experiments/migrations/0002_auto_20160330_0953.py b/beat/web/experiments/migrations/0002_auto_20160330_0953.py
deleted file mode 100644
index b3675057971af363dad1f30b4bd2545a488c6546..0000000000000000000000000000000000000000
--- a/beat/web/experiments/migrations/0002_auto_20160330_0953.py
+++ /dev/null
@@ -1,289 +0,0 @@
-# -*- coding: utf-8 -*-
-from __future__ import unicode_literals
-
-from django.db import migrations, models, utils
-from django.conf import settings
-
-import simplejson
-import beat.core.experiment
-from ...common import storage
-
-
-def move_result_to_cache(apps, schema_editor):
-    '''Moves the result association from the block to the related cache file'''
-
-    Result = apps.get_model("experiments", "Result")
-
-    total = Result.objects.count()
-    if total: print('')
-    for i, r in enumerate(Result.objects.all()):
-        print("Resetting result %d/%d..." % (i+1, total))
-        r.cache = r.block.hashes.first()
-        r.save()
-
-
-def reset_blocks(apps, schema_editor):
-    '''Resets block dependencies and queue relationship'''
-
-    Experiment = apps.get_model("experiments", "Experiment")
-    Block = apps.get_model("experiments", "Block")
-    BlockInput = apps.get_model("experiments", "BlockInput")
-    CachedFile = apps.get_model("experiments", "CachedFile")
-    Queue = apps.get_model("backend", "Queue")
-    Environment = apps.get_model("backend", "Environment")
-    Algorithm = apps.get_model("algorithms", "Algorithm")
-    DatabaseSetOutput = apps.get_model("databases", "DatabaseSetOutput")
-
-    total = Experiment.objects.count()
-    for i, e in enumerate(Experiment.objects.order_by('id')):
-
-        fullname = '%s/%s/%s/%d/%s' % (
-            e.author.username,
-            e.toolchain.author.username,
-            e.toolchain.name,
-            e.toolchain.version,
-            e.name,
-        )
-
-        print("Updating blocks for experiment %d/%d (%s, id=%d)..." % \
-            (i+1, total, fullname, e.id))
-
-        xp_decl = simplejson.loads(storage.get_file_content(e,
-            'declaration_file'))
-        tc_decl = simplejson.loads(storage.get_file_content(e.toolchain,
-            'declaration_file'))
-
-        xp = beat.core.experiment.Experiment(settings.PREFIX, (xp_decl,
-            tc_decl))
-
-        if xp.errors:
-            message = "The experiment `%s' isn't valid (skipping " \
-                "block update), due to the following errors:\n * %s"
-            print message % (fullname, '\n * '.join(xp.errors))
-            continue
-
-        # Loads the experiment execution description, creating the Block's,
-        # BlockInput's and BlockOutput's as required.
-        for block_name, description in xp.setup().items():
-
-            # Checks that the Queue/Environment exists
-            job_description = description['configuration']
-
-            env = Environment.objects.filter(
-                name=job_description['environment']['name'],
-                version=job_description['environment']['version'],
-            )
-
-            if not env:
-                print("Cannot find environment `%s (%s)' - not setting" % \
-                    (job_description['environment']['name'],
-                    job_description['environment']['version']))
-                env = None
-            else:
-                env = env[0]
-
-            # Search for queue that contains a specific environment
-            # notice we don't require environment to exist in relation to
-            # the queue as it may have been removed already.
-            queue = Queue.objects.filter(name=job_description['queue'])
-            if not queue:
-                print("Cannot find queue `%s'" % job_description['queue'])
-                queue = None
-            else:
-                queue = queue[0]
-
-            parts = job_description['algorithm'].split('/')
-            algorithm = Algorithm.objects.get(
-                author__username=parts[0],
-                name=parts[1],
-                version=parts[2],
-            )
-
-            # Ties the block in
-            slots = job_description.get('nb_slots')
-
-            try:
-                b, _ = Block.objects.get_or_create(experiment=e,
-                    name=block_name, algorithm=algorithm)
-            except utils.IntegrityError as exc:
-                print("Block `%s' for experiment `%s' already exists - " \
-                    "modifying entry for migration purposes. This " \
-                    "issue is due a misconnection on the toolchain level " \
-                    "(known case: tpereira/full_isv/2)" % \
-                    (block_name, fullname))
-                b = Block.objects.get(experiment=e, name=block_name)
-
-            b.command=simplejson.dumps(job_description, indent=4)
-            b.status='N' if (e.status == 'P') else b.status
-            b.environment=env
-            b.queue=queue
-            b.algorithm = algorithm
-            b.analyzer = (algorithm.result_dataformat is not None)
-            b.required_slots=job_description['nb_slots']
-            b.channel=job_description['channel']
-            b.save()
-
-            # from this point: requires block to have an assigned id
-            b.dependencies.add(*[e.blocks.get(name=k) \
-                for k in description['dependencies']])
-
-            # reset inputs and outputs - creates if necessary only
-            for v in job_description['inputs'].values():
-                if 'database' in v: #database input
-                    db = DatabaseSetOutput.objects.get(hash=v['hash'])
-                    BlockInput.objects.get_or_create(block=b,
-                        channel=v['channel'], database=db)
-                else:
-                    cache = CachedFile.objects.get(hash=v['hash'])
-                    BlockInput.objects.get_or_create(block=b,
-                        channel=v['channel'], cache=cache)
-
-            current = list(b.outputs.all())
-            b.outputs.clear() #dissociates all current outputs
-            outputs = job_description.get('outputs',
-                {'': job_description.get('result')})
-            for v in outputs.values():
-                cache, cr = CachedFile.objects.get_or_create(hash=v['hash'])
-                if cr:
-                    if len(current) == len(outputs): #copy
-                        cache.linear_exedution_time = \
-                            current[0].linear_execution_time
-                        cache.speed_up_real = current[0].speed_up_real
-                        cache.speed_up_maximal = current[0].speed_up_maximal
-                        cache.queuing_time = current[0].queuing_time
-                        cache.stdout = current[0].stdout
-                        cache.stderr = current[0].stderr
-                        cache.error_report = current[0].error_report
-                        cache.cpu_time = current[0].cpu_time
-                        cache.max_memory = current[0].max_memory
-                        cache.data_read_size = current[0].data_read_size
-                        cache.data_read_nb_blocks = \
-                            current[0].data_read_nb_blocks
-                        cache.data_read_time = current[0].data_read_time
-                        cache.data_written_size = current[0].data_written_size
-                        cache.data_written_nb_blocks = \
-                            current[0].data_written_nb_blocks
-                        cache.data_written_time = current[0].data_written_time
-                        print("CachedFile data `%s' COPIED from `%s'" % \
-                            (cache.hash, current[0].hash))
-                    else:
-                        print("CachedFile (hash=%s) CREATED for block `%s' " \
-                            "of experiment `%s' which is in state `%s'" % \
-                            (cache.hash, block_name, fullname,
-                            b.get_status_display()))
-                cache.blocks.add(b)
-
-        #asserts all blocks (except analysis blocks have dependents)
-        for b in e.blocks.all():
-            assert (b.analyzer and b.dependents.count() == 0) or \
-                b.dependents.count() > 0
-
-        #asserts all analysis blocks have only one output
-        for b in e.blocks.filter(analyzer=True):
-            assert b.outputs.count() == 1
-
-
-class Migration(migrations.Migration):
-
-    dependencies = [
-        ('backend', '0002_auto_20160330_0005'),
-        ('databases', '0002_auto_20160329_1733'),
-        ('experiments', '0001_initial'),
-    ]
-
-    operations = [
-        migrations.AddField(
-            model_name='result',
-            name='cache',
-            field=models.ForeignKey(related_name='results',
-                to='experiments.CachedFile', null=True),
-        ),
-        migrations.RunPython(move_result_to_cache),
-        migrations.RemoveField(
-            model_name='result',
-            name='block',
-        ),
-        migrations.CreateModel(
-            name='BlockInput',
-            fields=[
-                ('id', models.AutoField(verbose_name='ID', serialize=False,
-                    auto_created=True, primary_key=True)),
-                ('channel', models.CharField(default=b'',
-                    help_text=b'Synchronization channel within the toolchain',
-                    max_length=200, blank=True)),
-                ('block', models.ForeignKey(related_name='inputs',
-                    to='experiments.Block', null=True)),
-                ('cache', models.ForeignKey(related_name='inputs',
-                    to='experiments.CachedFile', null=True)),
-                ('database', models.ForeignKey(related_name='blocks',
-                    to='databases.DatabaseSetOutput', null=True)),
-            ],
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='channel',
-            field=models.CharField(default=b'',
-                help_text=b'Synchronization channel within the toolchain',
-                max_length=200, blank=True),
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='command',
-            field=models.TextField(null=True, blank=True),
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='dependencies',
-            field=models.ManyToManyField(related_name='dependents',
-                to='experiments.Block', blank=True),
-        ),
-        migrations.AlterField(
-            model_name='block',
-            name='environment',
-            field=models.ForeignKey(related_name='blocks',
-                on_delete=models.deletion.SET_NULL, to='backend.Environment',
-                null=True),
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='queue',
-            field=models.ForeignKey(related_name='blocks',
-                on_delete=models.deletion.SET_NULL, to='backend.Queue',
-                null=True),
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='required_slots',
-            field=models.PositiveIntegerField(default=1),
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='runnable_date',
-            field=models.DateTimeField(null=True, blank=True),
-        ),
-        migrations.AlterField(
-            model_name='block',
-            name='status',
-            field=models.CharField(default=b'N', max_length=1,
-                choices=[
-                    (b'N', b'Not cached'),
-                    (b'P', b'Processing'),
-                    (b'C', b'Cached'),
-                    (b'F', b'Failed'),
-                    (b'S', b'Skipped'),
-                    (b'L', b'Cancelled'),
-                ]
-            ),
-        ),
-        migrations.AlterUniqueTogether(
-            name='block',
-            unique_together=set([('experiment', 'name')]),
-        ),
-        migrations.AlterField(
-            model_name='cachedfile',
-            name='blocks',
-            field=models.ManyToManyField(related_name='outputs',
-                to='experiments.Block', blank=True),
-        ),
-        migrations.RunPython(reset_blocks),
-    ]
diff --git a/beat/web/experiments/models.py b/beat/web/experiments/models.py
index 0ba958389acd5d69d3499f2845700270ae92e090..2a80a2665d33d6ab50a164d7c17390454da6910d 100644
--- a/beat/web/experiments/models.py
+++ b/beat/web/experiments/models.py
@@ -568,14 +568,11 @@ class Experiment(Shareable):
                 BlockInput.objects.get_or_create(block=b,
                     channel=v['channel'], cache=cache)
 
-            current = list(b.outputs.all())
             b.outputs.clear()
             outputs = job_description.get('outputs',
                 {'': job_description.get('result')})
             for v in outputs.values():
                 cache, cr = CachedFile.objects.get_or_create(hash=v['hash'])
-                if cr:
-                    if len(current) == len(outputs): cache.copy(current[0])
                 cache.blocks.add(b)
 
@@ -806,12 +803,10 @@ class Block(models.Model):
 
     # Accessors for statistics
 
-    def __return_first__(self, field):
-        if not self.outputs.count(): return ''
-        return getattr(self.outputs.first(), field)
+    def __return_first__(self, field, default=None):
+        return getattr(self.outputs.first(), field, default)
 
     def first_cache(self):
-        if not self.outputs.count(): return None
         return self.outputs.first()
 
     def error_report(self):
@@ -824,40 +819,40 @@ class Block(models.Model):
         return self.__return_first__('stderr')
 
     def speed_up_real(self):
-        return self.__return_first__('speed_up_real') or 0.
+        return self.__return_first__('speed_up_real')
 
     def speed_up_maximal(self):
-        return self.__return_first__('speed_up_maximal') or 0.
+        return self.__return_first__('speed_up_maximal')
 
     def linear_execution_time(self):
-        return self.__return_first__('linear_execution_time') or 0.
+        return self.__return_first__('linear_execution_time')
 
     def queuing_time(self):
-        return self.__return_first__('queuing_time') or 0.
+        return self.__return_first__('queuing_time')
 
     def cpu_time(self):
-        return self.__return_first__('cpu_time') or 0.
+        return self.__return_first__('cpu_time')
 
     def max_memory(self):
-        return self.__return_first__('max_memory') or 0
+        return self.__return_first__('max_memory')
 
     def data_read_size(self):
-        return self.__return_first__('data_read_size') or 0
+        return self.__return_first__('data_read_size')
 
     def data_read_nb_blocks(self):
-        return self.__return_first__('data_read_nb_blocks') or 0
+        return self.__return_first__('data_read_nb_blocks')
 
     def data_read_time(self):
-        return self.__return_first__('data_read_time') or 0.
+        return self.__return_first__('data_read_time')
 
     def data_written_size(self):
-        return self.__return_first__('data_written_size') or 0
+        return self.__return_first__('data_written_size')
 
     def data_written_nb_blocks(self):
-        return self.__return_first__('data_written_nb_blocks') or 0
+        return self.__return_first__('data_written_nb_blocks')
 
     def data_written_time(self):
-        return self.__return_first__('data_written_time') or 0.
+        return self.__return_first__('data_written_time')
 
     # Accessor for results
     results = property(lambda self: self.__return_first__('results'))
@@ -917,24 +912,6 @@ class CachedFile(models.Model):
 
         return self.hash
 
-    def copy_data(self, other):
-        self.linear_execution_time = other.linear_execution_time
-        self.speed_up_real = other.speed_up_real
-        self.speed_up_maximal = other.speed_up_maximal
-        self.queuing_time = other.queuing_time
-        self.stdout = other.stdout
-        self.stderr = other.stderr
-        self.error_report = other.error_report
-        self.cpu_time = other.cpu_time
-        self.max_memory = other.max_memory
-        self.data_read_size = other.data_read_size
-        self.data_read_nb_blocks = other.data_read_nb_blocks
-        self.data_read_time = other.data_read_time
-        self.data_written_size = other.data_written_size
-        self.data_written_nb_blocks = other.data_written_nb_blocks
-        self.data_written_time = other.data_written_time
-
-
     def path(self):
         '''Returns the full path prefix to the cached file on disk'''
 
@@ -1014,8 +991,16 @@ class Result(models.Model):
 
     objects = ResultManager()
 
+
+    #_____ Meta parameters __________
+
+    class Meta:
+        unique_together = ('cache', 'name')
+
+
     def __str__(self):
-        return '%s - %s' % (self.block, self.name)
+        return '%s - %s' % (self.cache, self.name)
+
 
     def natural_key(self):
         return (
@@ -1023,6 +1008,7 @@
             self.cache.hash,
         )
 
+
     def value(self):
         if self.data_value in ['+inf', '-inf', 'NaN']:
             return self.data_value
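
# ----------------------------------------------------------------------
# A minimal, standalone sketch (not part of the diff) of the accessor
# pattern this change adopts in Block.__return_first__().  `FakeOutput`
# and `return_first` are hypothetical stand-ins for a block's first
# CachedFile output and the model method; they are not the real Django
# code.

class FakeOutput(object):
    cpu_time = 4.2
    max_memory = 0  # a legitimate, falsy stored value

def return_first(first_output, field, default=None):
    # getattr() with a default absorbs the "no cached output yet" case:
    # when first_output is None the attribute lookup fails and the
    # default is returned instead of an AttributeError being raised.
    return getattr(first_output, field, default)

assert return_first(FakeOutput(), 'cpu_time') == 4.2
assert return_first(None, 'cpu_time') is None  # block not yet run
# Unlike the removed `... or 0.` fallbacks, falsy stored values are
# returned as-is rather than being coerced to zero:
assert return_first(FakeOutput(), 'max_memory') == 0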