Commit c6efc35e authored by André Anjos

[experiments] Break migrations even further to avoid Postgres transaction errors
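Background for this change: on PostgreSQL, Django runs each migration inside a single transaction, so one failing statement aborts the whole transaction and every later statement in it fails with "current transaction is aborted". Moving a long data migration into its own file keeps it out of the schema-change transaction; on Django 1.10+ a migration can also opt out of the wrapping transaction entirely. A minimal sketch of that pattern (the `forwards` function body and the `atomic` flag are illustrative, not part of this commit; the dependency name is taken from the file below):

    from __future__ import unicode_literals

    from django.db import migrations


    def forwards(apps, schema_editor):
        # always use the historical model state inside a data migration
        Experiment = apps.get_model("experiments", "Experiment")
        for e in Experiment.objects.all():
            pass  # per-row data fix goes here


    class Migration(migrations.Migration):

        # Django 1.10+: run without a wrapping transaction so one failed
        # row cannot poison the rest of the migration on PostgreSQL
        atomic = False

        dependencies = [
            ('experiments', '0004_scheduler_addons_3'),
        ]

        operations = [
            migrations.RunPython(forwards, migrations.RunPython.noop),
        ]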

parent 0e73a899
Merge request !194 (Scheduler)
@@ -28,184 +28,7 @@
 from __future__ import unicode_literals
-from django.db import migrations, models, utils
+from django.db import migrations, models
-from django.conf import settings
-import simplejson
-import beat.core.experiment
-from ...common import storage
-
-def reset_blocks(apps, schema_editor):
-    '''Resets block dependencies and queue relationship'''
-    # ... (body deleted here; the function is reproduced verbatim in the
-    # new migration file added below)
 class Migration(migrations.Migration):
@@ -301,5 +124,4 @@ class Migration(migrations.Migration):
         field=models.ManyToManyField(related_name='outputs',
                                      to='experiments.Block', blank=True),
     ),
-    migrations.RunPython(reset_blocks),
 ]
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from __future__ import unicode_literals

from django.db import migrations, utils
from django.conf import settings

import simplejson

import beat.core.experiment

from ...common import storage

def reset_blocks(apps, schema_editor):
    '''Resets block dependencies and queue relationship'''

    Experiment = apps.get_model("experiments", "Experiment")
    Block = apps.get_model("experiments", "Block")
    BlockInput = apps.get_model("experiments", "BlockInput")
    CachedFile = apps.get_model("experiments", "CachedFile")
    Queue = apps.get_model("backend", "Queue")
    Environment = apps.get_model("backend", "Environment")
    Algorithm = apps.get_model("algorithms", "Algorithm")
    DatabaseSetOutput = apps.get_model("databases", "DatabaseSetOutput")
    Result = apps.get_model("experiments", "Result")
    total = Experiment.objects.count()

    for i, e in enumerate(Experiment.objects.order_by('id')):

        fullname = '%s/%s/%s/%d/%s' % (
            e.author.username,
            e.toolchain.author.username,
            e.toolchain.name,
            e.toolchain.version,
            e.name,
        )

        print("Updating blocks for experiment %d/%d (%s, id=%d)..." %
              (i+1, total, fullname, e.id))

        xp_decl = simplejson.loads(storage.get_file_content(e,
            'declaration_file'))
        tc_decl = simplejson.loads(storage.get_file_content(e.toolchain,
            'declaration_file'))

        xp = beat.core.experiment.Experiment(settings.PREFIX, (xp_decl,
            tc_decl))

        if xp.errors:
            # skips experiments whose declarations no longer validate
            message = "The experiment `%s' isn't valid (skipping " \
                "block update), due to the following errors:\n  * %s"
            print(message % (fullname, '\n  * '.join(xp.errors)))
            continue
        # Loads the experiment execution description, creating the Block's,
        # BlockInput's and BlockOutput's as required.
        for block_name, description in xp.setup().items():

            # Checks that the Queue/Environment exists
            job_description = description['configuration']

            env = Environment.objects.filter(
                name=job_description['environment']['name'],
                version=job_description['environment']['version'],
            )

            if not env:
                print("Cannot find environment `%s (%s)' - not setting" %
                      (job_description['environment']['name'],
                       job_description['environment']['version']))
                env = None
            else:
                env = env[0]

            # Searches for the queue by name; note we do not require the
            # environment to still exist in relation to the queue, as it
            # may have been removed already.
            queue = Queue.objects.filter(name=job_description['queue'])

            if not queue:
                print("Cannot find queue `%s'" % job_description['queue'])
                queue = None
            else:
                queue = queue[0]

            parts = job_description['algorithm'].split('/')
            algorithm = Algorithm.objects.get(
                author__username=parts[0],
                name=parts[1],
                version=parts[2],
            )
            # Ties the block in
            try:
                b, _ = Block.objects.get_or_create(experiment=e,
                    name=block_name, algorithm=algorithm)
            except utils.IntegrityError:
                print("Block `%s' for experiment `%s' already exists - "
                      "modifying entry for migration purposes. This "
                      "issue is due to a misconnection on the toolchain "
                      "level (known case: tpereira/full_isv/2)" %
                      (block_name, fullname))
                b = Block.objects.get(experiment=e, name=block_name)

            b.command = simplejson.dumps(job_description, indent=4)
            b.status = 'N' if (e.status == 'P') else b.status
            b.environment = env
            b.queue = queue
            b.algorithm = algorithm
            b.analyzer = (algorithm.result_dataformat is not None)
            b.required_slots = job_description['nb_slots']
            b.channel = job_description['channel']
            b.save()
            # from this point on, the block must have an assigned id
            b.dependencies.add(*[e.blocks.get(name=k)
                                 for k in description['dependencies']])

            # resets inputs and outputs - creates them only if necessary
            for v in job_description['inputs'].values():
                if 'database' in v:  # database input
                    db = DatabaseSetOutput.objects.get(hash=v['hash'])
                    BlockInput.objects.get_or_create(block=b,
                        channel=v['channel'], database=db)
                else:
                    cache = CachedFile.objects.get(hash=v['hash'])
                    BlockInput.objects.get_or_create(block=b,
                        channel=v['channel'], cache=cache)

            current = list(b.outputs.all())
            b.outputs.clear()  # dissociates all current outputs
            outputs = job_description.get('outputs',
                {'': job_description.get('result')})
            for v in outputs.values():
                cache, cr = CachedFile.objects.get_or_create(hash=v['hash'])
                if cr:
                    if len(current) == len(outputs):
                        # copies statistics over from the dissociated output
                        cache.linear_execution_time = \
                            current[0].linear_execution_time
                        cache.speed_up_real = current[0].speed_up_real
                        cache.speed_up_maximal = current[0].speed_up_maximal
                        cache.queuing_time = current[0].queuing_time
                        cache.stdout = current[0].stdout
                        cache.stderr = current[0].stderr
                        cache.error_report = current[0].error_report
                        cache.cpu_time = current[0].cpu_time
                        cache.max_memory = current[0].max_memory
                        cache.data_read_size = current[0].data_read_size
                        cache.data_read_nb_blocks = \
                            current[0].data_read_nb_blocks
                        cache.data_read_time = current[0].data_read_time
                        cache.data_written_size = current[0].data_written_size
                        cache.data_written_nb_blocks = \
                            current[0].data_written_nb_blocks
                        cache.data_written_time = current[0].data_written_time
                        if current[0].results.count():
                            for r in current[0].results.all():
                                r.cache = cache
                                r.save()
                        cache.save()  # persists the copied statistics
                        print("CachedFile data `%s' MOVED from `%s'" %
                              (cache.hash, current[0].hash))
                    else:
                        print("CachedFile (hash=%s) CREATED for block `%s' "
                              "of experiment `%s' which is in state `%s'" %
                              (cache.hash, block_name, fullname,
                               b.get_status_display()))
                cache.blocks.add(b)
        # asserts that all blocks (except analysis blocks) have dependents
        for b in e.blocks.all():
            assert (b.analyzer and b.dependents.count() == 0) or \
                b.dependents.count() > 0

        # asserts that all analysis blocks have exactly one output
        for b in e.blocks.filter(analyzer=True):
            assert b.outputs.count() == 1

    # removes results that have no associated cache
    for r in Result.objects.filter(cache=None):
        print("Removing result %d (no associated cache)" % r.id)
        r.delete()
class Migration(migrations.Migration):

    dependencies = [
        ('experiments', '0004_scheduler_addons_3'),
    ]

    operations = [
        migrations.RunPython(reset_blocks),
    ]
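For reference, a data migration like this one can be replayed from a Django shell or a test via the management API. A minimal sketch, assuming a configured `DJANGO_SETTINGS_MODULE`; the new file's migration name is not shown on this page, so the command below simply applies all pending migrations for the app:

    import django
    django.setup()

    from django.core.management import call_command

    # applies any pending migrations for the experiments app only
    call_command('migrate', 'experiments')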