diff --git a/beat/web/experiments/migrations/0002_auto_20160330_0953.py b/beat/web/experiments/migrations/0002_auto_20160330_0953.py
index bdc49d4636e9e5cacfd01a21534a2a78c0678f1d..b3675057971af363dad1f30b4bd2545a488c6546 100644
--- a/beat/web/experiments/migrations/0002_auto_20160330_0953.py
+++ b/beat/web/experiments/migrations/0002_auto_20160330_0953.py
@@ -45,7 +45,8 @@ def reset_blocks(apps, schema_editor):
             e.name,
             )
 
-        print("Updating blocks for experiment %d/%d (%s, id=%d)..." % (i+1, total, fullname, e.id))
+        print("Updating blocks for experiment %d/%d (%s, id=%d)..." % \
+            (i+1, total, fullname, e.id))
 
         xp_decl = simplejson.loads(storage.get_file_content(e, 'declaration_file'))
 
@@ -137,20 +138,50 @@ def reset_blocks(apps, schema_editor):
                 BlockInput.objects.get_or_create(block=b,
                     channel=v['channel'], cache=cache)
 
+        current = list(b.outputs.all())
+        b.outputs.clear() #dissociates all current outputs
        outputs = job_description.get('outputs',
            {'': job_description.get('result')})
        for v in outputs.values():
            cache, cr = CachedFile.objects.get_or_create(hash=v['hash'])
            if cr:
-                print("CachedFile (hash=%s) created for block `%s' of " \
-                    "experiment `%s' which is in state `%s'" % \
-                    (cache.hash, block_name, fullname,
-                    b.get_status_display()))
+                if len(current) == len(outputs): #copy
+                    cache.linear_execution_time = \
+                        current[0].linear_execution_time
+                    cache.speed_up_real = current[0].speed_up_real
+                    cache.speed_up_maximal = current[0].speed_up_maximal
+                    cache.queuing_time = current[0].queuing_time
+                    cache.stdout = current[0].stdout
+                    cache.stderr = current[0].stderr
+                    cache.error_report = current[0].error_report
+                    cache.cpu_time = current[0].cpu_time
+                    cache.max_memory = current[0].max_memory
+                    cache.data_read_size = current[0].data_read_size
+                    cache.data_read_nb_blocks = \
+                        current[0].data_read_nb_blocks
+                    cache.data_read_time = current[0].data_read_time
+                    cache.data_written_size = current[0].data_written_size
+                    cache.data_written_nb_blocks = \
+                        current[0].data_written_nb_blocks
+                    cache.data_written_time = current[0].data_written_time
+                    cache.save() #persists the copied statistics
+                    print("CachedFile data `%s' COPIED from `%s'" % \
+                        (cache.hash, current[0].hash))
+                else:
+                    print("CachedFile (hash=%s) CREATED for block `%s' " \
+                        "of experiment `%s' which is in state `%s'" % \
+                        (cache.hash, block_name, fullname,
+                        b.get_status_display()))
            cache.blocks.add(b)
 
        #asserts all blocks (except analysis blocks have dependents)
        for b in e.blocks.all():
-            assert (b.analyzer and b.dependents.count() == 0) or b.dependents.count() > 0
+            assert (b.analyzer and b.dependents.count() == 0) or \
+                b.dependents.count() > 0
+
+        #asserts all analysis blocks have only one output
+        for b in e.blocks.filter(analyzer=True):
+            assert b.outputs.count() == 1
 
 
 class Migration(migrations.Migration):
@@ -165,7 +195,8 @@ class Migration(migrations.Migration):
         migrations.AddField(
             model_name='result',
             name='cache',
-            field=models.ForeignKey(related_name='results', to='experiments.CachedFile', null=True),
+            field=models.ForeignKey(related_name='results',
+                to='experiments.CachedFile', null=True),
         ),
         migrations.RunPython(move_result_to_cache),
         migrations.RemoveField(
@@ -175,17 +206,25 @@ class Migration(migrations.Migration):
         migrations.CreateModel(
             name='BlockInput',
             fields=[
-                ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
-                ('channel', models.CharField(default=b'', help_text=b'Synchronization channel within the toolchain', max_length=200, blank=True)),
-                ('block', models.ForeignKey(related_name='inputs', to='experiments.Block', null=True)),
-                ('cache', models.ForeignKey(related_name='inputs', to='experiments.CachedFile', null=True)),
-                ('database', models.ForeignKey(related_name='blocks', to='databases.DatabaseSetOutput', null=True)),
+                ('id', models.AutoField(verbose_name='ID', serialize=False,
+                    auto_created=True, primary_key=True)),
+                ('channel', models.CharField(default=b'',
+                    help_text=b'Synchronization channel within the toolchain',
+                    max_length=200, blank=True)),
+                ('block', models.ForeignKey(related_name='inputs',
+                    to='experiments.Block', null=True)),
+                ('cache', models.ForeignKey(related_name='inputs',
+                    to='experiments.CachedFile', null=True)),
+                ('database', models.ForeignKey(related_name='blocks',
+                    to='databases.DatabaseSetOutput', null=True)),
             ],
         ),
         migrations.AddField(
             model_name='block',
             name='channel',
-            field=models.CharField(default=b'', help_text=b'Synchronization channel within the toolchain', max_length=200, blank=True),
+            field=models.CharField(default=b'',
+                help_text=b'Synchronization channel within the toolchain',
+                max_length=200, blank=True),
         ),
         migrations.AddField(
             model_name='block',
@@ -195,17 +234,22 @@ class Migration(migrations.Migration):
         migrations.AddField(
             model_name='block',
             name='dependencies',
-            field=models.ManyToManyField(related_name='dependents', to='experiments.Block', blank=True),
+            field=models.ManyToManyField(related_name='dependents',
+                to='experiments.Block', blank=True),
         ),
         migrations.AlterField(
             model_name='block',
             name='environment',
-            field=models.ForeignKey(related_name='blocks', on_delete=models.deletion.SET_NULL, to='backend.Environment', null=True),
+            field=models.ForeignKey(related_name='blocks',
+                on_delete=models.deletion.SET_NULL, to='backend.Environment',
+                null=True),
         ),
         migrations.AddField(
             model_name='block',
             name='queue',
-            field=models.ForeignKey(related_name='blocks', on_delete=models.deletion.SET_NULL, to='backend.Queue', null=True),
+            field=models.ForeignKey(related_name='blocks',
+                on_delete=models.deletion.SET_NULL, to='backend.Queue',
+                null=True),
         ),
         migrations.AddField(
             model_name='block',
@@ -220,7 +264,16 @@ class Migration(migrations.Migration):
         migrations.AlterField(
             model_name='block',
             name='status',
-            field=models.CharField(default=b'N', max_length=1, choices=[(b'N', b'Not cached'), (b'P', b'Processing'), (b'C', b'Cached'), (b'F', b'Failed'), (b'S', b'Skipped'), (b'L', b'Cancelled')]),
+            field=models.CharField(default=b'N', max_length=1,
+                choices=[
+                    (b'N', b'Not cached'),
+                    (b'P', b'Processing'),
+                    (b'C', b'Cached'),
+                    (b'F', b'Failed'),
+                    (b'S', b'Skipped'),
+                    (b'L', b'Cancelled'),
+                ]
+            ),
         ),
         migrations.AlterUniqueTogether(
             name='block',
@@ -229,7 +282,8 @@ class Migration(migrations.Migration):
         migrations.AlterField(
             model_name='cachedfile',
             name='blocks',
-            field=models.ManyToManyField(related_name='outputs', to='experiments.Block', blank=True),
+            field=models.ManyToManyField(related_name='outputs',
+                to='experiments.Block', blank=True),
         ),
         migrations.RunPython(reset_blocks),
     ]
diff --git a/beat/web/experiments/models.py b/beat/web/experiments/models.py
index 53e06c7ce1ae568645fa829f79601ae851fe7cb6..0ba958389acd5d69d3499f2845700270ae92e090 100644
--- a/beat/web/experiments/models.py
+++ b/beat/web/experiments/models.py
@@ -61,6 +61,7 @@ from ..import __version__
 from datetime import datetime
 
 import os
+import glob
 import simplejson
 import logging
@@ -551,10 +552,12 @@ class Experiment(Shareable):
             b.save()
             # from this point: requires block to have an assigned id
+            b.dependencies.clear()
             b.dependencies.add(*[self.blocks.get(name=k) \
                 for k in description['dependencies']])
 
             # reset inputs and outputs - creates if necessary only
+            b.inputs.clear()
             for v in job_description['inputs'].values():
                 if 'database' in v: #database input
                     db = DatabaseSetOutput.objects.get(hash=v['hash'])
@@ -565,10 +568,15 @@ class Experiment(Shareable):
                     BlockInput.objects.get_or_create(block=b,
                         channel=v['channel'], cache=cache)
 
+            current = list(b.outputs.all())
+            b.outputs.clear()
             outputs = job_description.get('outputs',
                 {'': job_description.get('result')})
             for v in outputs.values():
-                cache, _ = CachedFile.objects.get_or_create(hash=v['hash'])
+                cache, cr = CachedFile.objects.get_or_create(hash=v['hash'])
+                if cr and len(current) == len(outputs):
+                    cache.copy_data(current[0]) #reuses previous statistics
+                    cache.save()
 
                 cache.blocks.add(b)
 
@@ -872,43 +879,83 @@ class CachedFile(models.Model):
 
     # the total amount of time this block took to run considering the
     # wall-clock time.
-    linear_execution_time = models.FloatField(default=0.)
+    linear_execution_time  = models.FloatField(default=0.)
 
     # the real speed-up obtained by running this block using X slots
-    speed_up_real = models.FloatField(default=0.)
+    speed_up_real          = models.FloatField(default=0.)
 
     # the maximum obtainable speed-up that could be achieved if all slots
     # were running in parallel. Essentially linear_execution_time /
     # maximum_slot_time
-    speed_up_maximal = models.FloatField(default=0.)
+    speed_up_maximal       = models.FloatField(default=0.)
 
     # the time this block waited to be executed
-    queuing_time = models.FloatField(default=0.)
+    queuing_time           = models.FloatField(default=0.)
 
-    stdout = models.TextField(null=True, blank=True)
-    stderr = models.TextField(null=True, blank=True)
-    error_report = models.TextField(null=True, blank=True)
+    stdout                 = models.TextField(null=True, blank=True)
+    stderr                 = models.TextField(null=True, blank=True)
+    error_report           = models.TextField(null=True, blank=True)
 
     # other statistics of interest
-    cpu_time = models.FloatField(default=0.)
-    max_memory = models.BigIntegerField(default=0)
-    data_read_size = models.BigIntegerField(default=0)
-    data_read_nb_blocks = models.IntegerField(default=0)
-    data_read_time = models.FloatField(default=0.)
-    data_written_size = models.BigIntegerField(default=0)
-    data_written_nb_blocks = models.IntegerField(default=0)
-    data_written_time = models.FloatField(default=0.)
+    cpu_time               = models.FloatField(default=0.)
+    max_memory             = models.BigIntegerField(default=0)
+    data_read_size         = models.BigIntegerField(default=0)
+    data_read_nb_blocks    = models.IntegerField(default=0)
+    data_read_time         = models.FloatField(default=0.)
+    data_written_size      = models.BigIntegerField(default=0)
+    data_written_nb_blocks = models.IntegerField(default=0)
+    data_written_time      = models.FloatField(default=0.)
 
     objects = CachedFileManager()
 
     def __str__(self):
-        return self.hash
+        return 'CachedFile(%s, %d blocks)' % (self.hash, self.blocks.count())
+
 
     def natural_key(self):
         return self.hash
 
+    def copy_data(self, other):
+        '''Copies the run statistics from another CachedFile instance'''
+
+        self.linear_execution_time = other.linear_execution_time
+        self.speed_up_real = other.speed_up_real
+        self.speed_up_maximal = other.speed_up_maximal
+        self.queuing_time = other.queuing_time
+        self.stdout = other.stdout
+        self.stderr = other.stderr
+        self.error_report = other.error_report
+        self.cpu_time = other.cpu_time
+        self.max_memory = other.max_memory
+        self.data_read_size = other.data_read_size
+        self.data_read_nb_blocks = other.data_read_nb_blocks
+        self.data_read_time = other.data_read_time
+        self.data_written_size = other.data_written_size
+        self.data_written_nb_blocks = other.data_written_nb_blocks
+        self.data_written_time = other.data_written_time
+
+
+    def path(self):
+        '''Returns the full path prefix to the cached file on disk'''
+
+        return os.path.join(settings.CACHE_ROOT,
+            beat.core.hash.toPath(self.hash, suffix=''))
+
+
+    def files(self):
+        '''Returns a list of files matching this cache prefix path'''
+
+        return glob.glob(self.path() + '*')
+
+
+    def exists(self):
+        '''Checks if this cached file really exists in the cache directory'''
+
+        return bool(self.files())
+
 
 #----------------------------------------------------------
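
Review note: below is a minimal sketch of how the new `CachedFile` helpers introduced by this patch fit together, e.g. from a Django shell (`python manage.py shell`). The two hash values are illustrative placeholders, not part of this changeset; everything else (`CachedFile.objects`, `copy_data()`, `path()`, `files()`, `exists()`, `settings.CACHE_ROOT`) is defined or referenced by the diff above.

```python
# Hypothetical Django-shell session exercising the new CachedFile API;
# both hash values below are illustrative placeholders.
from beat.web.experiments.models import CachedFile

cache = CachedFile.objects.get(hash='0123abcd...')  # placeholder hash

print(cache)           # __str__ now reports the hash and the block count
print(cache.path())    # full path prefix under settings.CACHE_ROOT
print(cache.files())   # every on-disk file sharing that prefix
print(cache.exists())  # True only if at least one such file exists

# copy_data() only copies the run statistics in memory; it does not save.
# The caller persists the change, as the block-reset code above does:
other = CachedFile.objects.get(hash='4567ef89...')  # placeholder hash
cache.copy_data(other)
cache.save()
```

Factoring the field-by-field copy into `copy_data()` keeps the model logic in one place. The migration cannot reuse it, though: `apps.get_model()` returns historical models that carry no custom methods, which is why `reset_blocks()` copies the same fields inline.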