From 36a80345c3857d3f6b8ee28244924af3da72268f Mon Sep 17 00:00:00 2001
From: Andre Anjos <andre.anjos@idiap.ch>
Date: Wed, 18 May 2016 10:12:40 +0200
Subject: [PATCH] [experiments] Split migrations to avoid Postgres transaction
 errors

---
 .../migrations/0002_scheduler_addons.py       | 281 +---------------
 .../migrations/0003_scheduler_addons_2.py     |  60 ++++
 .../migrations/0004_scheduler_addons_3.py     | 301 ++++++++++++++++++
 3 files changed, 362 insertions(+), 280 deletions(-)
 create mode 100644 beat/web/experiments/migrations/0003_scheduler_addons_2.py
 create mode 100644 beat/web/experiments/migrations/0004_scheduler_addons_3.py

diff --git a/beat/web/experiments/migrations/0002_scheduler_addons.py b/beat/web/experiments/migrations/0002_scheduler_addons.py
index 5e3cd961a..9c7507834 100644
--- a/beat/web/experiments/migrations/0002_scheduler_addons.py
+++ b/beat/web/experiments/migrations/0002_scheduler_addons.py
@@ -28,208 +28,12 @@ from __future__ import unicode_literals
 
-from django.db import migrations, models, utils
-from django.conf import settings
-
-import simplejson
-import beat.core.experiment
-from ...common import storage
-
-
-def move_result_to_cache(apps, schema_editor):
-    '''Moves the result association from the block to the related cache file'''
-
-    Result = apps.get_model("experiments", "Result")
-
-    total = Result.objects.count()
-    if total: print('')
-    for i, r in enumerate(Result.objects.order_by('id')):
-        print("Resetting result (%d) %d/%d..." % (r.id, i+1, total))
-        older = Result.objects.filter(name=r.name, cache=r.block.hashes.first()).exclude(id=r.id).first()
-        if older:
-            print("Cache %s already contains Result `%s' - keeping " \
-                "newest..." % (older.cache.hash, older.name))
-            older.delete()
-        r.cache = r.block.hashes.first()
-        r.save()
-
-
-def reset_blocks(apps, schema_editor):
-    '''Resets block dependencies and queue relationship'''
-
-    Experiment = apps.get_model("experiments", "Experiment")
-    Block = apps.get_model("experiments", "Block")
-    BlockInput = apps.get_model("experiments", "BlockInput")
-    CachedFile = apps.get_model("experiments", "CachedFile")
-    Queue = apps.get_model("backend", "Queue")
-    Environment = apps.get_model("backend", "Environment")
-    Algorithm = apps.get_model("algorithms", "Algorithm")
-    DatabaseSetOutput = apps.get_model("databases", "DatabaseSetOutput")
-    Result = apps.get_model("experiments", "Result")
-
-    total = Experiment.objects.count()
-    for i, e in enumerate(Experiment.objects.order_by('id')):
-
-        fullname = '%s/%s/%s/%d/%s' % (
-            e.author.username,
-            e.toolchain.author.username,
-            e.toolchain.name,
-            e.toolchain.version,
-            e.name,
-            )
-
-        print("Updating blocks for experiment %d/%d (%s, id=%d)..." % \
-            (i+1, total, fullname, e.id))
-
-        xp_decl = simplejson.loads(storage.get_file_content(e,
-            'declaration_file'))
-        tc_decl = simplejson.loads(storage.get_file_content(e.toolchain,
-            'declaration_file'))
-
-        xp = beat.core.experiment.Experiment(settings.PREFIX, (xp_decl,
-            tc_decl))
-
-        if xp.errors:
-            message = "The experiment `%s' isn't valid (skipping " \
-                "block update), due to the following errors:\n * %s"
-            print message % (fullname, '\n * '.join(xp.errors))
-            continue
-
-        # Loads the experiment execution description, creating the Block's,
-        # BlockInput's and BlockOutput's as required.
-        for block_name, description in xp.setup().items():
-
-            # Checks that the Queue/Environment exists
-            job_description = description['configuration']
-
-            env = Environment.objects.filter(
-                name=job_description['environment']['name'],
-                version=job_description['environment']['version'],
-                )
-
-            if not env:
-                print("Cannot find environment `%s (%s)' - not setting" % \
-                    (job_description['environment']['name'],
-                    job_description['environment']['version']))
-                env = None
-            else:
-                env = env[0]
-
-            # Search for queue that contains a specific environment
-            # notice we don't require environment to exist in relation to
-            # the queue as it may have been removed already.
-            queue = Queue.objects.filter(name=job_description['queue'])
-            if not queue:
-                print("Cannot find queue `%s'" % job_description['queue'])
-                queue = None
-            else:
-                queue = queue[0]
-
-            parts = job_description['algorithm'].split('/')
-            algorithm = Algorithm.objects.get(
-                author__username=parts[0],
-                name=parts[1],
-                version=parts[2],
-                )
-
-            # Ties the block in
-            slots = job_description.get('nb_slots')
-
-            try:
-                b, _ = Block.objects.get_or_create(experiment=e,
-                    name=block_name, algorithm=algorithm)
-            except utils.IntegrityError as exc:
-                print("Block `%s' for experiment `%s' already exists - " \
-                    "modifying entry for migration purposes. This " \
-                    "issue is due a misconnection on the toolchain level " \
-                    "(known case: tpereira/full_isv/2)" % \
-                    (block_name, fullname))
-                b = Block.objects.get(experiment=e, name=block_name)
-
-            b.command=simplejson.dumps(job_description, indent=4)
-            b.status='N' if (e.status == 'P') else b.status
-            b.environment=env
-            b.queue=queue
-            b.algorithm = algorithm
-            b.analyzer = (algorithm.result_dataformat is not None)
-            b.required_slots=job_description['nb_slots']
-            b.channel=job_description['channel']
-            b.save()
-
-            # from this point: requires block to have an assigned id
-            b.dependencies.add(*[e.blocks.get(name=k) \
-                for k in description['dependencies']])
-
-            # reset inputs and outputs - creates if necessary only
-            for v in job_description['inputs'].values():
-                if 'database' in v: #database input
-                    db = DatabaseSetOutput.objects.get(hash=v['hash'])
-                    BlockInput.objects.get_or_create(block=b,
-                        channel=v['channel'], database=db)
-                else:
-                    cache = CachedFile.objects.get(hash=v['hash'])
-                    BlockInput.objects.get_or_create(block=b,
-                        channel=v['channel'], cache=cache)
-
-            current = list(b.outputs.all())
-            b.outputs.clear() #dissociates all current outputs
-            outputs = job_description.get('outputs',
-                {'': job_description.get('result')})
-            for v in outputs.values():
-                cache, cr = CachedFile.objects.get_or_create(hash=v['hash'])
-                if cr:
-                    if len(current) == len(outputs): #copy
-                        cache.linear_exedution_time = \
-                            current[0].linear_execution_time
-                        cache.speed_up_real = current[0].speed_up_real
-                        cache.speed_up_maximal = current[0].speed_up_maximal
-                        cache.queuing_time = current[0].queuing_time
-                        cache.stdout = current[0].stdout
-                        cache.stderr = current[0].stderr
-                        cache.error_report = current[0].error_report
-                        cache.cpu_time = current[0].cpu_time
-                        cache.max_memory = current[0].max_memory
-                        cache.data_read_size = current[0].data_read_size
-                        cache.data_read_nb_blocks = \
-                            current[0].data_read_nb_blocks
-                        cache.data_read_time = current[0].data_read_time
-                        cache.data_written_size = current[0].data_written_size
-                        cache.data_written_nb_blocks = \
-                            current[0].data_written_nb_blocks
-                        cache.data_written_time = current[0].data_written_time
-                        if current[0].results.count():
-                            for r in current[0].results.all():
-                                r.cache = cache
-                                r.save()
-                        print("CachedFile data `%s' MOVED from `%s'" % \
-                            (cache.hash, current[0].hash))
-                    else:
-                        print("CachedFile (hash=%s) CREATED for block `%s' " \
-                            "of experiment `%s' which is in state `%s'" % \
-                            (cache.hash, block_name, fullname,
-                            b.get_status_display()))
-                cache.blocks.add(b)
-
-        #asserts all blocks (except analysis blocks have dependents)
-        for b in e.blocks.all():
-            assert (b.analyzer and b.dependents.count() == 0) or \
-                b.dependents.count() > 0
-
-        #asserts all analysis blocks have only one output
-        for b in e.blocks.filter(analyzer=True):
-            assert b.outputs.count() == 1
-
-    #removes results without caches
-    for r in Result.objects.filter(cache=None):
-        print("Removing result %d (no associated cache)" % r.id)
-        r.delete()
+from django.db import migrations, models
 
 
 class Migration(migrations.Migration):
 
     dependencies = [
-        ('backend', '0002_scheduler_addons'),
-        ('databases', '0002_scheduler_addons'),
         ('experiments', '0001_initial'),
     ]
 
@@ -244,87 +48,4 @@ class Migration(migrations.Migration):
             name='result',
             unique_together=set([('cache', 'name')]),
         ),
-        migrations.RunPython(move_result_to_cache),
-        migrations.RemoveField(
-            model_name='result',
-            name='block',
-        ),
-        migrations.CreateModel(
-            name='BlockInput',
-            fields=[
-                ('id', models.AutoField(verbose_name='ID', serialize=False,
-                    auto_created=True, primary_key=True)),
-                ('channel', models.CharField(default=b'',
-                    help_text=b'Synchronization channel within the toolchain',
-                    max_length=200, blank=True)),
-                ('block', models.ForeignKey(related_name='inputs',
-                    to='experiments.Block', null=True)),
-                ('cache', models.ForeignKey(related_name='inputs',
-                    to='experiments.CachedFile', null=True)),
-                ('database', models.ForeignKey(related_name='blocks',
-                    to='databases.DatabaseSetOutput', null=True)),
-            ],
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='channel',
-            field=models.CharField(default=b'',
-                help_text=b'Synchronization channel within the toolchain',
-                max_length=200, blank=True),
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='command',
-            field=models.TextField(null=True, blank=True),
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='dependencies',
-            field=models.ManyToManyField(related_name='dependents',
-                to='experiments.Block', blank=True),
-        ),
-        migrations.AlterField(
-            model_name='block',
-            name='environment',
-            field=models.ForeignKey(related_name='blocks',
-                on_delete=models.deletion.SET_NULL, to='backend.Environment',
-                null=True),
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='queue',
-            field=models.ForeignKey(related_name='blocks',
-                on_delete=models.deletion.SET_NULL, to='backend.Queue',
-                null=True),
-        ),
-        migrations.AddField(
-            model_name='block',
-            name='required_slots',
-            field=models.PositiveIntegerField(default=1),
-        ),
-        migrations.AlterField(
-            model_name='block',
-            name='status',
-            field=models.CharField(default=b'N', max_length=1,
-                choices=[
-                    (b'N', b'Not cached'),
-                    (b'P', b'Processing'),
-                    (b'C', b'Cached'),
-                    (b'F', b'Failed'),
-                    (b'S', b'Skipped'),
-                    (b'L', b'Cancelled'),
-                ]
-            ),
-        ),
-        migrations.AlterUniqueTogether(
-            name='block',
-            unique_together=set([('experiment', 'name')]),
-        ),
-        migrations.AlterField(
-            model_name='cachedfile',
-            name='blocks',
-            field=models.ManyToManyField(related_name='outputs',
-                to='experiments.Block', blank=True),
-        ),
-        migrations.RunPython(reset_blocks),
     ]

diff --git a/beat/web/experiments/migrations/0003_scheduler_addons_2.py b/beat/web/experiments/migrations/0003_scheduler_addons_2.py
new file mode 100644
index 000000000..65e9b37bd
--- /dev/null
+++ b/beat/web/experiments/migrations/0003_scheduler_addons_2.py
@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+
+###############################################################################
+#                                                                             #
+# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
+# Contact: beat.support@idiap.ch                                              #
+#                                                                             #
+# This file is part of the beat.web module of the BEAT platform.              #
+#                                                                             #
+# Commercial License Usage                                                    #
+# Licensees holding valid commercial BEAT licenses may use this file in      #
+# accordance with the terms contained in a written agreement between you     #
+# and Idiap. For further information contact tto@idiap.ch                    #
+#                                                                             #
+# Alternatively, this file may be used under the terms of the GNU Affero     #
+# Public License version 3 as published by the Free Software and appearing   #
+# in the file LICENSE.AGPL included in the packaging of this file.           #
+# The BEAT platform is distributed in the hope that it will be useful, but   #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
+# or FITNESS FOR A PARTICULAR PURPOSE.                                       #
+#                                                                             #
+# You should have received a copy of the GNU Affero Public License along     #
+# with the BEAT platform. If not, see http://www.gnu.org/licenses/.          #
+#                                                                             #
+###############################################################################
+
+
+from __future__ import unicode_literals
+
+from django.db import migrations
+
+
+def move_result_to_cache(apps, schema_editor):
+    '''Moves the result association from the block to the related cache file'''
+
+    Result = apps.get_model("experiments", "Result")
+
+    total = Result.objects.count()
+    if total: print('')
+    for i, r in enumerate(Result.objects.order_by('id')):
+        print("Resetting result (%d) %d/%d..." % (r.id, i+1, total))
+        older = Result.objects.filter(name=r.name, cache=r.block.hashes.first()).exclude(id=r.id).first()
+        if older:
+            print("Cache %s already contains Result `%s' - keeping " \
+                "newest..." % (older.cache.hash, older.name))
+            older.delete()
+        r.cache = r.block.hashes.first()
+        r.save()
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('experiments', '0002_scheduler_addons'),
+    ]
+
+    operations = [
+        migrations.RunPython(move_result_to_cache),
+    ]

diff --git a/beat/web/experiments/migrations/0004_scheduler_addons_3.py b/beat/web/experiments/migrations/0004_scheduler_addons_3.py
new file mode 100644
index 000000000..fcd721050
--- /dev/null
+++ b/beat/web/experiments/migrations/0004_scheduler_addons_3.py
@@ -0,0 +1,301 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+
+###############################################################################
+#                                                                             #
+# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
+# Contact: beat.support@idiap.ch                                              #
+#                                                                             #
+# This file is part of the beat.web module of the BEAT platform.              #
+#                                                                             #
+# Commercial License Usage                                                    #
+# Licensees holding valid commercial BEAT licenses may use this file in      #
+# accordance with the terms contained in a written agreement between you     #
+# and Idiap. For further information contact tto@idiap.ch                    #
+#                                                                             #
+# Alternatively, this file may be used under the terms of the GNU Affero     #
+# Public License version 3 as published by the Free Software and appearing   #
+# in the file LICENSE.AGPL included in the packaging of this file.           #
+# The BEAT platform is distributed in the hope that it will be useful, but   #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
+# or FITNESS FOR A PARTICULAR PURPOSE.                                       #
# +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +from __future__ import unicode_literals + +from django.db import migrations, models, utils +from django.conf import settings + +import simplejson +import beat.core.experiment +from ...common import storage + + +def reset_blocks(apps, schema_editor): + '''Resets block dependencies and queue relationship''' + + Experiment = apps.get_model("experiments", "Experiment") + Block = apps.get_model("experiments", "Block") + BlockInput = apps.get_model("experiments", "BlockInput") + CachedFile = apps.get_model("experiments", "CachedFile") + Queue = apps.get_model("backend", "Queue") + Environment = apps.get_model("backend", "Environment") + Algorithm = apps.get_model("algorithms", "Algorithm") + DatabaseSetOutput = apps.get_model("databases", "DatabaseSetOutput") + Result = apps.get_model("experiments", "Result") + + total = Experiment.objects.count() + for i, e in enumerate(Experiment.objects.order_by('id')): + + fullname = '%s/%s/%s/%d/%s' % ( + e.author.username, + e.toolchain.author.username, + e.toolchain.name, + e.toolchain.version, + e.name, + ) + + print("Updating blocks for experiment %d/%d (%s, id=%d)..." % \ + (i+1, total, fullname, e.id)) + + xp_decl = simplejson.loads(storage.get_file_content(e, + 'declaration_file')) + tc_decl = simplejson.loads(storage.get_file_content(e.toolchain, + 'declaration_file')) + + xp = beat.core.experiment.Experiment(settings.PREFIX, (xp_decl, + tc_decl)) + + if xp.errors: + message = "The experiment `%s' isn't valid (skipping " \ + "block update), due to the following errors:\n * %s" + print message % (fullname, '\n * '.join(xp.errors)) + continue + + # Loads the experiment execution description, creating the Block's, + # BlockInput's and BlockOutput's as required. + for block_name, description in xp.setup().items(): + + # Checks that the Queue/Environment exists + job_description = description['configuration'] + + env = Environment.objects.filter( + name=job_description['environment']['name'], + version=job_description['environment']['version'], + ) + + if not env: + print("Cannot find environment `%s (%s)' - not setting" % \ + (job_description['environment']['name'], + job_description['environment']['version'])) + env = None + else: + env = env[0] + + # Search for queue that contains a specific environment + # notice we don't require environment to exist in relation to + # the queue as it may have been removed already. + queue = Queue.objects.filter(name=job_description['queue']) + if not queue: + print("Cannot find queue `%s'" % job_description['queue']) + queue = None + else: + queue = queue[0] + + parts = job_description['algorithm'].split('/') + algorithm = Algorithm.objects.get( + author__username=parts[0], + name=parts[1], + version=parts[2], + ) + + # Ties the block in + slots = job_description.get('nb_slots') + + try: + b, _ = Block.objects.get_or_create(experiment=e, + name=block_name, algorithm=algorithm) + except utils.IntegrityError as exc: + print("Block `%s' for experiment `%s' already exists - " \ + "modifying entry for migration purposes. 
This " \ + "issue is due a misconnection on the toolchain level " \ + "(known case: tpereira/full_isv/2)" % \ + (block_name, fullname)) + b = Block.objects.get(experiment=e, name=block_name) + + b.command=simplejson.dumps(job_description, indent=4) + b.status='N' if (e.status == 'P') else b.status + b.environment=env + b.queue=queue + b.algorithm = algorithm + b.analyzer = (algorithm.result_dataformat is not None) + b.required_slots=job_description['nb_slots'] + b.channel=job_description['channel'] + b.save() + + # from this point: requires block to have an assigned id + b.dependencies.add(*[e.blocks.get(name=k) \ + for k in description['dependencies']]) + + # reset inputs and outputs - creates if necessary only + for v in job_description['inputs'].values(): + if 'database' in v: #database input + db = DatabaseSetOutput.objects.get(hash=v['hash']) + BlockInput.objects.get_or_create(block=b, + channel=v['channel'], database=db) + else: + cache = CachedFile.objects.get(hash=v['hash']) + BlockInput.objects.get_or_create(block=b, + channel=v['channel'], cache=cache) + + current = list(b.outputs.all()) + b.outputs.clear() #dissociates all current outputs + outputs = job_description.get('outputs', + {'': job_description.get('result')}) + for v in outputs.values(): + cache, cr = CachedFile.objects.get_or_create(hash=v['hash']) + if cr: + if len(current) == len(outputs): #copy + cache.linear_exedution_time = \ + current[0].linear_execution_time + cache.speed_up_real = current[0].speed_up_real + cache.speed_up_maximal = current[0].speed_up_maximal + cache.queuing_time = current[0].queuing_time + cache.stdout = current[0].stdout + cache.stderr = current[0].stderr + cache.error_report = current[0].error_report + cache.cpu_time = current[0].cpu_time + cache.max_memory = current[0].max_memory + cache.data_read_size = current[0].data_read_size + cache.data_read_nb_blocks = \ + current[0].data_read_nb_blocks + cache.data_read_time = current[0].data_read_time + cache.data_written_size = current[0].data_written_size + cache.data_written_nb_blocks = \ + current[0].data_written_nb_blocks + cache.data_written_time = current[0].data_written_time + if current[0].results.count(): + for r in current[0].results.all(): + r.cache = cache + r.save() + print("CachedFile data `%s' MOVED from `%s'" % \ + (cache.hash, current[0].hash)) + else: + print("CachedFile (hash=%s) CREATED for block `%s' " \ + "of experiment `%s' which is in state `%s'" % \ + (cache.hash, block_name, fullname, + b.get_status_display())) + cache.blocks.add(b) + + #asserts all blocks (except analysis blocks have dependents) + for b in e.blocks.all(): + assert (b.analyzer and b.dependents.count() == 0) or \ + b.dependents.count() > 0 + + #asserts all analysis blocks have only one output + for b in e.blocks.filter(analyzer=True): + assert b.outputs.count() == 1 + + #removes results without caches + for r in Result.objects.filter(cache=None): + print("Removing result %d (no associated cache)" % r.id) + r.delete() + + +class Migration(migrations.Migration): + + dependencies = [ + ('backend', '0002_scheduler_addons'), + ('databases', '0002_scheduler_addons'), + ('experiments', '0003_scheduler_addons_2'), + ] + + operations = [ + migrations.RemoveField( + model_name='result', + name='block', + ), + migrations.CreateModel( + name='BlockInput', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, + auto_created=True, primary_key=True)), + ('channel', models.CharField(default=b'', + help_text=b'Synchronization channel within the 
toolchain', + max_length=200, blank=True)), + ('block', models.ForeignKey(related_name='inputs', + to='experiments.Block', null=True)), + ('cache', models.ForeignKey(related_name='inputs', + to='experiments.CachedFile', null=True)), + ('database', models.ForeignKey(related_name='blocks', + to='databases.DatabaseSetOutput', null=True)), + ], + ), + migrations.AddField( + model_name='block', + name='channel', + field=models.CharField(default=b'', + help_text=b'Synchronization channel within the toolchain', + max_length=200, blank=True), + ), + migrations.AddField( + model_name='block', + name='command', + field=models.TextField(null=True, blank=True), + ), + migrations.AddField( + model_name='block', + name='dependencies', + field=models.ManyToManyField(related_name='dependents', + to='experiments.Block', blank=True), + ), + migrations.AlterField( + model_name='block', + name='environment', + field=models.ForeignKey(related_name='blocks', + on_delete=models.deletion.SET_NULL, to='backend.Environment', + null=True), + ), + migrations.AddField( + model_name='block', + name='queue', + field=models.ForeignKey(related_name='blocks', + on_delete=models.deletion.SET_NULL, to='backend.Queue', + null=True), + ), + migrations.AddField( + model_name='block', + name='required_slots', + field=models.PositiveIntegerField(default=1), + ), + migrations.AlterField( + model_name='block', + name='status', + field=models.CharField(default=b'N', max_length=1, + choices=[ + (b'N', b'Not cached'), + (b'P', b'Processing'), + (b'C', b'Cached'), + (b'F', b'Failed'), + (b'S', b'Skipped'), + (b'L', b'Cancelled'), + ] + ), + ), + migrations.AlterUniqueTogether( + name='block', + unique_together=set([('experiment', 'name')]), + ), + migrations.AlterField( + model_name='cachedfile', + name='blocks', + field=models.ManyToManyField(related_name='outputs', + to='experiments.Block', blank=True), + ), + migrations.RunPython(reset_blocks), + ] -- GitLab