diff --git a/beat/web/backend/migrations/0002_scheduler_addons.py b/beat/web/backend/migrations/0002_scheduler_addons.py new file mode 100644 index 0000000000000000000000000000000000000000..6b8005e4ead446e018d7c8d9b5451c72f3be4927 --- /dev/null +++ b/beat/web/backend/migrations/0002_scheduler_addons.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('backend', '0001_initial'), + ] + + operations = [ + migrations.RenameModel('QueueWorkerSlot', 'Slot'), + migrations.RenameField( + model_name='slot', + old_name='nb_slots', + new_name='quantity', + ), + migrations.AddField( + model_name='slot', + name='priority', + field=models.PositiveIntegerField(default=0, help_text=b'Priority of these slots on the defined queue'), + ), + migrations.AlterUniqueTogether( + name='slot', + unique_together=set([('queue', 'worker')]), + ), + migrations.RenameField( + model_name='queue', + old_name='nb_cores_per_slot', + new_name='cores_per_slot', + ), + migrations.RenameField( + model_name='worker', + old_name='nb_cores', + new_name='cores', + ), + migrations.AddField( + model_name='worker', + name='info', + field=models.TextField(help_text=b'Informative message from the worker in case it is inactive', null=True, blank=True), + ), + migrations.AddField( + model_name='worker', + name='memory', + field=models.PositiveIntegerField(default=0, help_text=b'In megabytes'), + ), + migrations.AlterField( + model_name='queue', + name='name', + field=models.CharField(help_text=b'The name for this object (space-like characters will be automatically replaced by dashes)', unique=True, max_length=100), + ), + migrations.AlterField( + model_name='queue', + name='cores_per_slot', + field=models.PositiveIntegerField(), + ), + migrations.AlterField( + model_name='queue', + name='max_slots_per_user', + field=models.PositiveIntegerField(), + ), + migrations.AlterField( + model_name='queue', + name='memory_limit', + field=models.PositiveIntegerField(help_text=b'In megabytes'), + ), + migrations.AlterField( + model_name='queue', + name='time_limit', + field=models.PositiveIntegerField(help_text=b'In minutes'), + ), + migrations.AlterField( + model_name='slot', + name='quantity', + field=models.PositiveIntegerField(help_text=b'Number of processing slots to dedicate in this worker for a given queue', verbose_name=b'Number of slots'), + ), + migrations.AlterField( + model_name='worker', + name='cores', + field=models.PositiveIntegerField(), + ), + migrations.AlterField( + model_name='environment', + name='previous_version', + field=models.ForeignKey(related_name='next_versions', on_delete=models.deletion.SET_NULL, blank=True, to='backend.Environment', null=True), + ), + ] diff --git a/beat/web/databases/migrations/0002_scheduler_addons.py b/beat/web/databases/migrations/0002_scheduler_addons.py new file mode 100644 index 0000000000000000000000000000000000000000..9c748aa7240cefa5c46f9e5a91d241d898d05db7 --- /dev/null +++ b/beat/web/databases/migrations/0002_scheduler_addons.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- + +############################################################################### +# # +# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. 
# +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + +from __future__ import unicode_literals + +from django.db import migrations, models + +from ...common.models import get_declaration + +from ..models import validate_database + +import logging +logger = logging.getLogger(__name__) + + +def refresh_databases(apps, schema_editor): + '''Refreshes each database so datasets/outputs are recreated''' + + Database = apps.get_model("databases", "Database") + DatabaseSetOutput = apps.get_model("databases", "DatabaseSetOutput") + + Database.declaration = property(get_declaration) + Database.fullname = lambda self: '%s/%d' % (self.name, self.version) + + if Database.objects.count(): print('') + + for db in Database.objects.order_by('id'): + print("Refreshing protocols for database `%s'..." % db.fullname()) + core = validate_database(db.declaration) + core.name = db.fullname() + for proto in db.protocols.all(): + for set in proto.sets.all(): + for output in set.template.outputs.all(): + try: + DatabaseSetOutput(template=output, set=set, + hash=core.hash_output(proto.name, set.name, + output.name)).save() + except KeyError: + logger.warn('Database output %s/%d.%s.%s.%s does ' \ + 'not exist' % (db.name, db.version, proto.name, + set.name, output.name)) + continue + + +class Migration(migrations.Migration): + + dependencies = [ + ('dataformats', '0001_initial'), + ('databases', '0001_initial'), + ] + + operations = [ + migrations.RenameModel('DatabaseOutput', 'DatabaseSetTemplateOutput'), + migrations.AlterUniqueTogether( + name='databasesettemplateoutput', + unique_together=set([('template', 'name')]), + ), + migrations.CreateModel( + name='DatabaseSetOutput', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('hash', models.CharField(unique=True, max_length=64)), + ('set', models.ForeignKey(related_name='outputs', to='databases.DatabaseSet')), + ('template', models.ForeignKey(related_name='instances', to='databases.DatabaseSetTemplateOutput')), + ], + ), + migrations.RunPython(refresh_databases), + ] diff --git a/beat/web/experiments/migrations/0002_scheduler_addons.py b/beat/web/experiments/migrations/0002_scheduler_addons.py new file mode 100644 index 0000000000000000000000000000000000000000..077c036b34374c1649cf6abe81707e0ab475652b --- /dev/null +++ b/beat/web/experiments/migrations/0002_scheduler_addons.py @@ -0,0 +1,308 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models, utils +from django.conf import settings + +import simplejson +import beat.core.experiment +from ...common import storage + + +def 
move_result_to_cache(apps, schema_editor):
+    '''Moves the result association from the block to the related cache file'''
+
+    Result = apps.get_model("experiments", "Result")
+
+    total = Result.objects.count()
+    if total: print('')
+    for i, r in enumerate(Result.objects.order_by('id')):
+        print("Resetting result (%d) %d/%d..." % (r.id, i+1, total))
+        older = Result.objects.filter(name=r.name,
+            cache=r.block.hashes.first()).exclude(id=r.id).first()
+        if older:
+            print("Cache %s already contains Result `%s' - keeping " \
+                "newest..." % (older.cache.hash, older.name))
+            older.delete()
+        r.cache = r.block.hashes.first()
+        r.save()
+
+
+def reset_blocks(apps, schema_editor):
+    '''Resets block dependencies and queue relationship'''
+
+    Experiment = apps.get_model("experiments", "Experiment")
+    Block = apps.get_model("experiments", "Block")
+    BlockInput = apps.get_model("experiments", "BlockInput")
+    CachedFile = apps.get_model("experiments", "CachedFile")
+    Queue = apps.get_model("backend", "Queue")
+    Environment = apps.get_model("backend", "Environment")
+    Algorithm = apps.get_model("algorithms", "Algorithm")
+    DatabaseSetOutput = apps.get_model("databases", "DatabaseSetOutput")
+    Result = apps.get_model("experiments", "Result")
+
+    total = Experiment.objects.count()
+    for i, e in enumerate(Experiment.objects.order_by('id')):
+
+        fullname = '%s/%s/%s/%d/%s' % (
+            e.author.username,
+            e.toolchain.author.username,
+            e.toolchain.name,
+            e.toolchain.version,
+            e.name,
+            )
+
+        print("Updating blocks for experiment %d/%d (%s, id=%d)..." % \
+            (i+1, total, fullname, e.id))
+
+        xp_decl = simplejson.loads(storage.get_file_content(e,
+            'declaration_file'))
+        tc_decl = simplejson.loads(storage.get_file_content(e.toolchain,
+            'declaration_file'))
+
+        xp = beat.core.experiment.Experiment(settings.PREFIX, (xp_decl,
+            tc_decl))
+
+        if xp.errors:
+            message = "The experiment `%s' isn't valid (skipping " \
+                "block update), due to the following errors:\n * %s"
+            print(message % (fullname, '\n * '.join(xp.errors)))
+            continue
+
+        # Loads the experiment execution description, creating the Block's,
+        # BlockInput's and BlockOutput's as required.
+        for block_name, description in xp.setup().items():
+
+            # Checks that the Queue/Environment exists
+            job_description = description['configuration']
+
+            env = Environment.objects.filter(
+                name=job_description['environment']['name'],
+                version=job_description['environment']['version'],
+                )
+
+            if not env:
+                print("Cannot find environment `%s (%s)' - not setting" % \
+                    (job_description['environment']['name'],
+                    job_description['environment']['version']))
+                env = None
+            else:
+                env = env[0]
+
+            # Searches for the queue; notice we don't require the environment
+            # to still be associated with the queue, as it may have been
+            # removed already.
+            queue = Queue.objects.filter(name=job_description['queue'])
+            if not queue:
+                print("Cannot find queue `%s'" % job_description['queue'])
+                queue = None
+            else:
+                queue = queue[0]
+
+            parts = job_description['algorithm'].split('/')
+            algorithm = Algorithm.objects.get(
+                author__username=parts[0],
+                name=parts[1],
+                version=parts[2],
+                )
+
+            # Ties the block in
+            slots = job_description.get('nb_slots')
+
+            try:
+                b, _ = Block.objects.get_or_create(experiment=e,
+                    name=block_name, algorithm=algorithm)
+            except utils.IntegrityError:
+                print("Block `%s' for experiment `%s' already exists - " \
+                    "modifying entry for migration purposes. This " \
+                    "issue is due to a misconnection on the toolchain level " \
+                    "(known case: tpereira/full_isv/2)" % \
+                    (block_name, fullname))
+                b = Block.objects.get(experiment=e, name=block_name)
+
+            b.command = simplejson.dumps(job_description, indent=4)
+            b.status = 'N' if (e.status == 'P') else b.status
+            b.environment = env
+            b.queue = queue
+            b.algorithm = algorithm
+            b.analyzer = (algorithm.result_dataformat is not None)
+            b.required_slots = job_description['nb_slots']
+            b.channel = job_description['channel']
+            b.save()
+
+            # from this point on, requires the block to have an assigned id
+            b.dependencies.add(*[e.blocks.get(name=k) \
+                for k in description['dependencies']])
+
+            # resets inputs and outputs - creates them only if necessary
+            for v in job_description['inputs'].values():
+                if 'database' in v: #database input
+                    db = DatabaseSetOutput.objects.get(hash=v['hash'])
+                    BlockInput.objects.get_or_create(block=b,
+                        channel=v['channel'], database=db)
+                else:
+                    cache = CachedFile.objects.get(hash=v['hash'])
+                    BlockInput.objects.get_or_create(block=b,
+                        channel=v['channel'], cache=cache)
+
+            current = list(b.outputs.all())
+            b.outputs.clear() #dissociates all current outputs
+            outputs = job_description.get('outputs',
+                {'': job_description.get('result')})
+            for v in outputs.values():
+                cache, cr = CachedFile.objects.get_or_create(hash=v['hash'])
+                if cr:
+                    if len(current) == len(outputs): #copy
+                        cache.linear_execution_time = \
+                            current[0].linear_execution_time
+                        cache.speed_up_real = current[0].speed_up_real
+                        cache.speed_up_maximal = current[0].speed_up_maximal
+                        cache.queuing_time = current[0].queuing_time
+                        cache.stdout = current[0].stdout
+                        cache.stderr = current[0].stderr
+                        cache.error_report = current[0].error_report
+                        cache.cpu_time = current[0].cpu_time
+                        cache.max_memory = current[0].max_memory
+                        cache.data_read_size = current[0].data_read_size
+                        cache.data_read_nb_blocks = \
+                            current[0].data_read_nb_blocks
+                        cache.data_read_time = current[0].data_read_time
+                        cache.data_written_size = current[0].data_written_size
+                        cache.data_written_nb_blocks = \
+                            current[0].data_written_nb_blocks
+                        cache.data_written_time = current[0].data_written_time
+                        cache.save() #persists the copied statistics
+                        if current[0].results.count():
+                            for r in current[0].results.all():
+                                r.cache = cache
+                                r.save()
+                        print("CachedFile data `%s' MOVED from `%s'" % \
+                            (cache.hash, current[0].hash))
+                    else:
+                        print("CachedFile (hash=%s) CREATED for block `%s' " \
+                            "of experiment `%s' which is in state `%s'" % \
+                            (cache.hash, block_name, fullname,
+                            b.get_status_display()))
+                cache.blocks.add(b)
+
+        #asserts all blocks (except analysis blocks) have dependents
+        for b in e.blocks.all():
+            assert (b.analyzer and b.dependents.count() == 0) or \
+                b.dependents.count() > 0
+
+        #asserts all analysis blocks have only one output
+        for b in e.blocks.filter(analyzer=True):
+            assert b.outputs.count() == 1
+
+    #removes results without caches
+    for r in Result.objects.filter(cache=None):
+        print("Removing result %d (no associated cache)" % r.id)
+        r.delete()
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('backend', '0002_scheduler_addons'),
+        ('databases', '0002_scheduler_addons'),
+        ('experiments', '0001_initial'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='result',
+            name='cache',
+            field=models.ForeignKey(related_name='results',
+                to='experiments.CachedFile', null=True),
+        ),
+        migrations.AlterUniqueTogether(
+            name='result',
+            unique_together=set([('cache', 'name')]),
+        ),
+        migrations.RunPython(move_result_to_cache),
+        migrations.RemoveField(
model_name='result', + name='block', + ), + migrations.CreateModel( + name='BlockInput', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, + auto_created=True, primary_key=True)), + ('channel', models.CharField(default=b'', + help_text=b'Synchronization channel within the toolchain', + max_length=200, blank=True)), + ('block', models.ForeignKey(related_name='inputs', + to='experiments.Block', null=True)), + ('cache', models.ForeignKey(related_name='inputs', + to='experiments.CachedFile', null=True)), + ('database', models.ForeignKey(related_name='blocks', + to='databases.DatabaseSetOutput', null=True)), + ], + ), + migrations.AddField( + model_name='block', + name='channel', + field=models.CharField(default=b'', + help_text=b'Synchronization channel within the toolchain', + max_length=200, blank=True), + ), + migrations.AddField( + model_name='block', + name='command', + field=models.TextField(null=True, blank=True), + ), + migrations.AddField( + model_name='block', + name='dependencies', + field=models.ManyToManyField(related_name='dependents', + to='experiments.Block', blank=True), + ), + migrations.AlterField( + model_name='block', + name='environment', + field=models.ForeignKey(related_name='blocks', + on_delete=models.deletion.SET_NULL, to='backend.Environment', + null=True), + ), + migrations.AddField( + model_name='block', + name='queue', + field=models.ForeignKey(related_name='blocks', + on_delete=models.deletion.SET_NULL, to='backend.Queue', + null=True), + ), + migrations.AddField( + model_name='block', + name='required_slots', + field=models.PositiveIntegerField(default=1), + ), + migrations.AddField( + model_name='block', + name='runnable_date', + field=models.DateTimeField(null=True, blank=True), + ), + migrations.AlterField( + model_name='block', + name='status', + field=models.CharField(default=b'N', max_length=1, + choices=[ + (b'N', b'Not cached'), + (b'P', b'Processing'), + (b'C', b'Cached'), + (b'F', b'Failed'), + (b'S', b'Skipped'), + (b'L', b'Cancelled'), + ] + ), + ), + migrations.AlterUniqueTogether( + name='block', + unique_together=set([('experiment', 'name')]), + ), + migrations.AlterField( + model_name='cachedfile', + name='blocks', + field=models.ManyToManyField(related_name='outputs', + to='experiments.Block', blank=True), + ), + migrations.RunPython(reset_blocks), + ]