Commit 7adf01b4 authored by Philip ABBET's avatar Philip ABBET
Browse files

[experiments] Refactoring: Add a status field to the 'CachedFile' model

parent 99dd97f6
......@@ -808,7 +808,7 @@ class Job(models.Model):
# checks for the presence of output caches - if they exist and
# their checksums match, skip and update related blocks
if all([k.exists() for k in self.block.outputs.all()]):
if all([k.index_checksums() for k in self.block.outputs.all()]):
if all([k.check_checksums() for k in self.block.outputs.all()]):
self.status = Job.SKIPPED
self.split_errors = 0
self.end_date = datetime.datetime.now()
......
......@@ -2158,7 +2158,7 @@ class Working(BaseBackendTestCase):
self.assertEqual(xp.status, Experiment.RUNNING)
# all caches must have been generated
assert all([k.index_checksums() for k in block.outputs.all()])
assert all([k.check_checksums() for k in block.outputs.all()])
# checks the number of statistics objects has increased by 1
self.assertTrue(HourlyStatistics.objects.count() > current_stats)
......@@ -2198,7 +2198,7 @@ class Working(BaseBackendTestCase):
self.assertEqual(xp.status, Experiment.DONE)
# all caches must have been generated
assert all([k.index_checksums() for k in block.outputs.all()])
assert all([k.check_checksums() for k in block.outputs.all()])
self.check_stats_success(block)
......@@ -2307,7 +2307,7 @@ class Working(BaseBackendTestCase):
self.assertEqual(block.experiment.status, Experiment.RUNNING)
# all caches must have been generated
assert all([k.index_checksums() for k in block.outputs.all()])
assert all([k.check_checksums() for k in block.outputs.all()])
# checks the number of statistics objects has increased by 1
self.assertTrue(HourlyStatistics.objects.count() > current_stats)
......@@ -2347,7 +2347,7 @@ class Working(BaseBackendTestCase):
self.assertEqual(block.experiment.status, Experiment.DONE)
# all caches must have been generated
assert all([k.index_checksums() for k in block.outputs.all()])
assert all([k.check_checksums() for k in block.outputs.all()])
self.check_stats_success(block)
......@@ -2406,7 +2406,7 @@ class Working(BaseBackendTestCase):
self.assertEqual(block.experiment.status, Experiment.RUNNING)
# all caches must have been generated
assert all([k.index_checksums() for k in block.outputs.all()])
assert all([k.check_checksums() for k in block.outputs.all()])
# checks the number of statistics objects has increased by 1
self.assertTrue(HourlyStatistics.objects.count() > current_stats)
......@@ -2457,7 +2457,7 @@ class Working(BaseBackendTestCase):
self.assertEqual(block.experiment.status, Experiment.DONE)
# all caches must have been generated
assert all([k.index_checksums() for k in block.outputs.all()])
assert all([k.check_checksums() for k in block.outputs.all()])
self.check_stats_success(block)
......@@ -2517,7 +2517,7 @@ class Working(BaseBackendTestCase):
self.assertEqual(block.experiment.status, Experiment.RUNNING)
# all caches must have been generated
assert all([k.index_checksums() for k in block.outputs.all()])
assert all([k.check_checksums() for k in block.outputs.all()])
# checks the number of statistics objects has increased by 1
self.assertTrue(HourlyStatistics.objects.count() > current_stats)
......@@ -2634,7 +2634,7 @@ class WorkingExternally(TransactionTestCase):
self.assertEqual(xp.status, Experiment.RUNNING)
# all caches must have been generated
assert all([k.index_checksums() for k in split.job.block.outputs.all()])
assert all([k.check_checksums() for k in split.job.block.outputs.all()])
# checks the number of statistics objects has increased by 1
self.assertTrue(HourlyStatistics.objects.count() > current_stats)
......@@ -2681,7 +2681,7 @@ class WorkingExternally(TransactionTestCase):
self.assertEqual(xp.status, Experiment.DONE)
# all caches must have been generated
assert all([k.index_checksums() for k in split.job.block.outputs.all()])
assert all([k.check_checksums() for k in split.job.block.outputs.all()])
# assert we have no database traces after the last block is done
self.assertEqual(Job.objects.count(), 0)
......
......@@ -379,17 +379,34 @@ class BlockModelForm(forms.ModelForm):
class Block(admin.ModelAdmin):
list_display = ('id', 'author', 'toolchain', 'xp', 'execution_order', 'name', 'algorithm', 'analyzer', 'status', 'ins', 'outs', 'environment', 'q')
search_fields = ['name',
'experiment__author__username',
'experiment__toolchain__author__username',
'experiment__toolchain__name',
'experiment__name',
'algorithm__author__username',
'algorithm__name',
'environment__name',
'environment__version',
]
list_display = (
'id',
'author',
'toolchain',
'xp',
'execution_order',
'name',
'algorithm',
'analyzer',
'status',
'ins',
'outs',
'environment',
'q'
)
search_fields = [
'name',
'experiment__author__username',
'experiment__toolchain__author__username',
'experiment__toolchain__name',
'experiment__name',
'algorithm__author__username',
'algorithm__name',
'environment__name',
'environment__version',
]
list_display_links = ('id', 'name')
inlines = [
......@@ -478,12 +495,12 @@ admin.site.register(BlockModel, Block)
class Result(admin.ModelAdmin):
list_display = ('id', 'cache', 'name', 'type', 'primary', 'data_value')
list_display = ('id', 'cache', 'name', 'type', 'primary', 'data_value')
search_fields = [
'name',
'cache__hash',
]
search_fields = [
'name',
'cache__hash',
]
list_display_links = ('id', 'name')
......@@ -505,22 +522,24 @@ admin.site.register(ResultModel, Result)
class CachedFile(admin.ModelAdmin):
search_fields = [
'hash',
'blocks__name',
'blocks__experiment__name',
]
search_fields = [
'hash',
'blocks__name',
'blocks__experiment__name',
]
list_display = (
list_display = (
'id',
'hash',
'status',
'date',
'blocks_url',
'blocks_status',
'experiments_url',
)
)
list_display_links = ('id', 'hash')
list_filter = ('status', )
# to avoid very slow loading of cached files
raw_id_fields = ('blocks',)
......@@ -528,40 +547,31 @@ class CachedFile(admin.ModelAdmin):
qs = super(CachedFile, self).get_queryset(request)
return qs.annotate(date=Max('blocks__start_date'))
def date(self, obj): return obj.date
date.admin_order_field = '-date'
def date(self, obj):
return obj.date
def blocks_status(self, obj):
retval = [k.get_status_display() for k in obj.blocks.all()]
return ', '.join(retval)
blocks_status.short_description = "Block Status"
date.admin_order_field = '-date'
def blocks_url(self, obj):
retval = []
retval = '<ul>'
for block in obj.blocks.all():
retval.append(format_html("<a href='{url}'>{name}</a>",
url=reverse('admin:experiments_block_change', args=(block.id,)),
name=block.name,
))
return ', '.join(retval)
retval += format_html("<li><a href='{block_url}'>{block_name}</a> @ <a href='{experiment_url}'>{experiment_name}</a> ({block_status})</li>",
experiment_url=reverse('admin:experiments_experiment_change', args=(block.experiment.id,)),
experiment_name=block.experiment.fullname(),
block_url=reverse('admin:experiments_block_change', args=(block.id,)),
block_name=block.name,
block_status=block.get_status_display(),
)
return retval + '</ul>'
blocks_url.short_description = "Blocks"
blocks_url.allow_tags = True
def experiments_url(self, obj):
retval = []
for block in obj.blocks.all():
retval.append(format_html("<a href='{url}'>{name}</a>",
url=reverse('admin:experiments_experiment_change', args=(block.experiment.id,)),
name=block.experiment.fullname(),
))
return ', '.join(retval)
experiments_url.short_description = "Experiments"
experiments_url.allow_tags = True
fieldsets = (
(None,
dict(
fields=('hash', 'blocks',)
fields=('hash', 'status', 'blocks',)
),
),
('Logging',
......
File mode changed from 100644 to 100755
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from __future__ import unicode_literals
from django.db import migrations, models
from django.conf import settings
import os
import glob
import beat.core.hash
def set_status(apps, schema_editor):
    """Data migration: flag as CACHED every CachedFile whose on-disk files
    all verify against their recorded checksum files.

    Files with no data on disk, or with a mismatched number of checksum
    companions, keep the model's default status.
    """

    CachedFile = apps.get_model("experiments", "CachedFile")

    total = CachedFile.objects.count()

    for counter, cached_file in enumerate(CachedFile.objects.order_by('id'),
                                          start=1):
        print("Updating cached file %d/%d (%s, id=%d)..." %
              (counter, total, cached_file.hash, cached_file.id))

        abs_path = os.path.join(settings.CACHE_ROOT,
                                beat.core.hash.toPath(cached_file.hash,
                                                      suffix=''))

        # index files first, then data files, each paired with its
        # corresponding ``.checksum`` companion below
        data_files = sorted(glob.glob(abs_path + '*.index')) + \
            sorted(glob.glob(abs_path + '*.data'))
        checksum_files = sorted(glob.glob(abs_path + '*.index.checksum')) + \
            sorted(glob.glob(abs_path + '*.data.checksum'))

        # nothing on disk, or an inconsistent checksum count: leave the
        # default status untouched
        if not data_files or len(data_files) != len(checksum_files):
            continue

        # only mark as cached when every single file verifies
        all_match = True
        for data_file, checksum_file in zip(data_files, checksum_files):
            with open(checksum_file, 'rt') as f:
                recorded = f.read().strip()
            if beat.core.hash.hashFileContents(data_file) != recorded:
                all_match = False
                break

        if all_match:
            cached_file.status = CachedFile.CACHED
            cached_file.save()
class Migration(migrations.Migration):
    # Adds the new 'status' column to the CachedFile model and back-fills
    # it by scanning the cache tree on disk (see set_status above).

    dependencies = [
        ('experiments', '0006_block_order'),
    ]

    operations = [
        # schema change: single-character status flag, defaulting to
        # 'N' ("Not cached") for every pre-existing row
        migrations.AddField(
            model_name='cachedfile',
            name='status',
            field=models.CharField(choices=[(b'N', b'Not cached'), (b'P', b'Processing'), (b'C', b'Cached')], default=b'N', max_length=1),
        ),
        # data migration: promote rows whose files checksum correctly
        migrations.RunPython(set_status),
    ]
......@@ -385,6 +385,10 @@ class Block(models.Model):
self.job.delete()
if r: r.delete()
# Update the associated cached files
for cached_file in self.outputs.all():
cached_file.update(self.status)
# Loads Results from cache
if self.job.result and self.analyzer and self.status == Block.CACHED:
cache = self.first_cache()
......
......@@ -51,8 +51,20 @@ class CachedFileManager(models.Manager):
class CachedFile(models.Model):
NOT_CACHED = 'N'
PROCESSING = 'P'
CACHED = 'C'
STATUS = (
(NOT_CACHED, 'Not cached'),
(PROCESSING, 'Processing'),
(CACHED, 'Cached'),
)
blocks = models.ManyToManyField('Block', related_name='outputs', blank=True)
hash = models.CharField(max_length=64, unique=True)
status = models.CharField(max_length=1, choices=STATUS, default=NOT_CACHED)
# the total amount of time this block took to run considering the
# wall-clock time.
......@@ -87,7 +99,9 @@ class CachedFile(models.Model):
def __str__(self):
return 'CachedFile(%s, %d blocks)' % (self.hash, self.blocks.count())
return 'CachedFile(%s, %s, %d blocks)' % (
self.hash, self.get_status_display(), self.blocks.count()
)
def natural_key(self):
......@@ -112,32 +126,54 @@ class CachedFile(models.Model):
return glob.glob(self.absolute_path(cache) + '*')
def update(self, block_status):
    """Synchronises this cached file's status with one of its producing
    blocks, hitting the database only on an actual transition."""

    from . import Block

    # decide which status this file should now carry
    if block_status == Block.CACHED:
        target = CachedFile.CACHED
    elif block_status == Block.PROCESSING:
        target = CachedFile.PROCESSING
    elif self.status == CachedFile.PROCESSING:
        # the producing block stopped without caching while we were
        # marked as in-progress: fall back to not-cached
        target = CachedFile.NOT_CACHED
    else:
        target = self.status  # no transition applies

    if target != self.status:
        self.status = target
        self.save()
def exists(self, cache=settings.CACHE_ROOT):
    '''Tells whether at least one file of this cache is present on disk'''
    return len(self.files(cache)) != 0
def check_checksums(self, cache=settings.CACHE_ROOT):
    '''Checks that every index and data file of this cache verifies
    against its recorded checksum file.

    Parameters:
      cache (str): root of the cache tree to look into (defaults to
        ``settings.CACHE_ROOT``)

    Returns:
      bool: ``True`` when each ``.index``/``.data`` file has a matching
      ``.checksum`` companion whose recorded hash equals the file's
      actual contents hash, ``False`` otherwise.
    '''

    abs_path = self.absolute_path(cache)

    # index files first, then data files, each paired positionally with
    # its ``.checksum`` companion below
    data_files = sorted(glob.glob(abs_path + '*.index')) + \
        sorted(glob.glob(abs_path + '*.data'))
    checksum_files = sorted(glob.glob(abs_path + '*.index.checksum')) + \
        sorted(glob.glob(abs_path + '*.data.checksum'))

    if len(data_files) != len(checksum_files):
        # a missing (or spurious) checksum file invalidates the cache
        # note: logger.warn is a deprecated alias - use logger.warning
        logger.warning("Number of files (%d) is different from checksums "
                       "(%d) for cache `%s'", len(data_files),
                       len(checksum_files), abs_path)
        return False

    for data_file, checksum_file in zip(data_files, checksum_files):
        with open(checksum_file, 'rt') as f:
            recorded = f.read().strip()
        actual = beat.core.hash.hashFileContents(data_file)
        if actual != recorded:
            logger.warning("Checksum for file `%s' does not match (%s != %s)",
                           data_file, actual, recorded)
            return False

    return True
......@@ -977,6 +977,9 @@ class ExperimentStartingAPI(ExperimentTestBase):
hashes = block.outputs.all()
self.assertEqual(1, hashes.count())
cached_file = hashes[0]
self.assertEqual(CachedFile.NOT_CACHED, cached_file.status)
block = experiment.blocks.get(name='addition2')
self.assertEqual(Block.NOT_CACHED, block.status)
self.assertFalse(block.analyzer)
......@@ -985,6 +988,9 @@ class ExperimentStartingAPI(ExperimentTestBase):
hashes = block.outputs.all()
self.assertEqual(1, hashes.count())
cached_file = hashes[0]
self.assertEqual(CachedFile.NOT_CACHED, cached_file.status)
block = experiment.blocks.get(name='analysis')
self.assertEqual(Block.NOT_CACHED, block.status)
self.assertTrue(block.analyzer)
......@@ -993,6 +999,9 @@ class ExperimentStartingAPI(ExperimentTestBase):
hashes = block.outputs.all()
self.assertEqual(1, hashes.count())
cached_file = hashes[0]
self.assertEqual(CachedFile.NOT_CACHED, cached_file.status)
def test_start_team_shared_experiment(self):
johndoe = User.objects.get(username='johndoe')
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment