From 7adf01b4cd53ca369c8f0b2a8d18924e9ee3119c Mon Sep 17 00:00:00 2001 From: Philip ABBET <philip.abbet@idiap.ch> Date: Wed, 27 Sep 2017 16:12:43 +0200 Subject: [PATCH] [experiments] Refactoring: Add a status field to the 'CachedFile' model --- beat/web/backend/models.py | 2 +- beat/web/backend/tests.py | 18 +-- beat/web/experiments/admin.py | 106 ++++++++++-------- .../migrations/0006_block_order.py | 0 .../migrations/0007_cachedfile_status.py | 92 +++++++++++++++ beat/web/experiments/models/block.py | 4 + beat/web/experiments/models/cached_file.py | 66 ++++++++--- beat/web/experiments/tests.py | 9 ++ 8 files changed, 224 insertions(+), 73 deletions(-) mode change 100644 => 100755 beat/web/experiments/admin.py mode change 100644 => 100755 beat/web/experiments/migrations/0006_block_order.py create mode 100755 beat/web/experiments/migrations/0007_cachedfile_status.py diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index 72db89456..9ea3d5a4d 100755 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -808,7 +808,7 @@ class Job(models.Model): # checks for the presence of output caches - if they exist and # checksum, skip and update related blocks if all([k.exists() for k in self.block.outputs.all()]): - if all([k.index_checksums() for k in self.block.outputs.all()]): + if all([k.check_checksums() for k in self.block.outputs.all()]): self.status = Job.SKIPPED self.split_errors = 0 self.end_date = datetime.datetime.now() diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py index aa4e82f74..2087c3a65 100755 --- a/beat/web/backend/tests.py +++ b/beat/web/backend/tests.py @@ -2158,7 +2158,7 @@ class Working(BaseBackendTestCase): self.assertEqual(xp.status, Experiment.RUNNING) # all caches must be have been generated - assert all([k.index_checksums() for k in block.outputs.all()]) + assert all([k.check_checksums() for k in block.outputs.all()]) # checks the number of statistics objects has increased by 1 
self.assertTrue(HourlyStatistics.objects.count() > current_stats) @@ -2198,7 +2198,7 @@ class Working(BaseBackendTestCase): self.assertEqual(xp.status, Experiment.DONE) # all caches must be have been generated - assert all([k.index_checksums() for k in block.outputs.all()]) + assert all([k.check_checksums() for k in block.outputs.all()]) self.check_stats_success(block) @@ -2307,7 +2307,7 @@ class Working(BaseBackendTestCase): self.assertEqual(block.experiment.status, Experiment.RUNNING) # all caches must be have been generated - assert all([k.index_checksums() for k in block.outputs.all()]) + assert all([k.check_checksums() for k in block.outputs.all()]) # checks the number of statistics objects has increased by 1 self.assertTrue(HourlyStatistics.objects.count() > current_stats) @@ -2347,7 +2347,7 @@ class Working(BaseBackendTestCase): self.assertEqual(block.experiment.status, Experiment.DONE) # all caches must be have been generated - assert all([k.index_checksums() for k in block.outputs.all()]) + assert all([k.check_checksums() for k in block.outputs.all()]) self.check_stats_success(block) @@ -2406,7 +2406,7 @@ class Working(BaseBackendTestCase): self.assertEqual(block.experiment.status, Experiment.RUNNING) # all caches must be have been generated - assert all([k.index_checksums() for k in block.outputs.all()]) + assert all([k.check_checksums() for k in block.outputs.all()]) # checks the number of statistics objects has increased by 1 self.assertTrue(HourlyStatistics.objects.count() > current_stats) @@ -2457,7 +2457,7 @@ class Working(BaseBackendTestCase): self.assertEqual(block.experiment.status, Experiment.DONE) # all caches must be have been generated - assert all([k.index_checksums() for k in block.outputs.all()]) + assert all([k.check_checksums() for k in block.outputs.all()]) self.check_stats_success(block) @@ -2517,7 +2517,7 @@ class Working(BaseBackendTestCase): self.assertEqual(block.experiment.status, Experiment.RUNNING) # all caches must be have been 
generated - assert all([k.index_checksums() for k in block.outputs.all()]) + assert all([k.check_checksums() for k in block.outputs.all()]) # checks the number of statistics objects has increased by 1 self.assertTrue(HourlyStatistics.objects.count() > current_stats) @@ -2634,7 +2634,7 @@ class WorkingExternally(TransactionTestCase): self.assertEqual(xp.status, Experiment.RUNNING) # all caches must be have been generated - assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + assert all([k.check_checksums() for k in split.job.block.outputs.all()]) # checks the number of statistics objects has increased by 1 self.assertTrue(HourlyStatistics.objects.count() > current_stats) @@ -2681,7 +2681,7 @@ class WorkingExternally(TransactionTestCase): self.assertEqual(xp.status, Experiment.DONE) # all caches must be have been generated - assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + assert all([k.check_checksums() for k in split.job.block.outputs.all()]) # assert we have no database traces after the last block is done self.assertEqual(Job.objects.count(), 0) diff --git a/beat/web/experiments/admin.py b/beat/web/experiments/admin.py old mode 100644 new mode 100755 index bcd5f9ef5..2e0748df5 --- a/beat/web/experiments/admin.py +++ b/beat/web/experiments/admin.py @@ -379,17 +379,34 @@ class BlockModelForm(forms.ModelForm): class Block(admin.ModelAdmin): - list_display = ('id', 'author', 'toolchain', 'xp', 'execution_order', 'name', 'algorithm', 'analyzer', 'status', 'ins', 'outs', 'environment', 'q') - search_fields = ['name', - 'experiment__author__username', - 'experiment__toolchain__author__username', - 'experiment__toolchain__name', - 'experiment__name', - 'algorithm__author__username', - 'algorithm__name', - 'environment__name', - 'environment__version', - ] + list_display = ( + 'id', + 'author', + 'toolchain', + 'xp', + 'execution_order', + 'name', + 'algorithm', + 'analyzer', + 'status', + 'ins', + 'outs', + 'environment', + 
'q' + ) + + search_fields = [ + 'name', + 'experiment__author__username', + 'experiment__toolchain__author__username', + 'experiment__toolchain__name', + 'experiment__name', + 'algorithm__author__username', + 'algorithm__name', + 'environment__name', + 'environment__version', + ] + list_display_links = ('id', 'name') inlines = [ @@ -478,12 +495,12 @@ admin.site.register(BlockModel, Block) class Result(admin.ModelAdmin): - list_display = ('id', 'cache', 'name', 'type', 'primary', 'data_value') + list_display = ('id', 'cache', 'name', 'type', 'primary', 'data_value') - search_fields = [ - 'name', - 'cache__hash', - ] + search_fields = [ + 'name', + 'cache__hash', + ] list_display_links = ('id', 'name') @@ -505,22 +522,24 @@ admin.site.register(ResultModel, Result) class CachedFile(admin.ModelAdmin): - search_fields = [ - 'hash', - 'blocks__name', - 'blocks__experiment__name', - ] + search_fields = [ + 'hash', + 'blocks__name', + 'blocks__experiment__name', + ] - list_display = ( + list_display = ( 'id', 'hash', + 'status', 'date', 'blocks_url', - 'blocks_status', - 'experiments_url', - ) + ) + list_display_links = ('id', 'hash') + list_filter = ('status', ) + # to avoid very slow loading of cached files raw_id_fields = ('blocks',) @@ -528,40 +547,31 @@ class CachedFile(admin.ModelAdmin): qs = super(CachedFile, self).get_queryset(request) return qs.annotate(date=Max('blocks__start_date')) - def date(self, obj): return obj.date - date.admin_order_field = '-date' + def date(self, obj): + return obj.date - def blocks_status(self, obj): - retval = [k.get_status_display() for k in obj.blocks.all()] - return ', '.join(retval) - blocks_status.short_description = "Block Status" + date.admin_order_field = '-date' def blocks_url(self, obj): - retval = [] + retval = '<ul>' for block in obj.blocks.all(): - retval.append(format_html("<a href='{url}'>{name}</a>", - url=reverse('admin:experiments_block_change', args=(block.id,)), - name=block.name, - )) - return ', '.join(retval) + 
retval += format_html("<li><a href='{block_url}'>{block_name}</a> @ <a href='{experiment_url}'>{experiment_name}</a> ({block_status})</li>", + experiment_url=reverse('admin:experiments_experiment_change', args=(block.experiment.id,)), + experiment_name=block.experiment.fullname(), + block_url=reverse('admin:experiments_block_change', args=(block.id,)), + block_name=block.name, + block_status=block.get_status_display(), + ) + return retval + '</ul>' + blocks_url.short_description = "Blocks" blocks_url.allow_tags = True - def experiments_url(self, obj): - retval = [] - for block in obj.blocks.all(): - retval.append(format_html("<a href='{url}'>{name}</a>", - url=reverse('admin:experiments_experiment_change', args=(block.experiment.id,)), - name=block.experiment.fullname(), - )) - return ', '.join(retval) - experiments_url.short_description = "Experiments" - experiments_url.allow_tags = True fieldsets = ( (None, dict( - fields=('hash', 'blocks',) + fields=('hash', 'status', 'blocks',) ), ), ('Logging', diff --git a/beat/web/experiments/migrations/0006_block_order.py b/beat/web/experiments/migrations/0006_block_order.py old mode 100644 new mode 100755 diff --git a/beat/web/experiments/migrations/0007_cachedfile_status.py b/beat/web/experiments/migrations/0007_cachedfile_status.py new file mode 100755 index 000000000..00522a805 --- /dev/null +++ b/beat/web/experiments/migrations/0007_cachedfile_status.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. 
For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + +from __future__ import unicode_literals + +from django.db import migrations, models +from django.conf import settings + +import os +import glob + +import beat.core.hash + + +def set_status(apps, schema_editor): + CachedFile = apps.get_model("experiments", "CachedFile") + + total = CachedFile.objects.count() + for i, c in enumerate(CachedFile.objects.order_by('id')): + + print("Updating cached file %d/%d (%s, id=%d)..." 
% \ + (i+1, total, c.hash, c.id)) + + abs_path = os.path.join(settings.CACHE_ROOT, beat.core.hash.toPath(c.hash, suffix='')) + + data_files = sorted(glob.glob(abs_path + '*.index')) + \ + sorted(glob.glob(abs_path + '*.data')) + + checksum_files = sorted(glob.glob(abs_path + '*.index.checksum')) + \ + sorted(glob.glob(abs_path + '*.data.checksum')) + + if len(data_files) == 0: + continue + + if len(data_files) != len(checksum_files): + continue + + cached = True + for data_file, checksum_file in zip(data_files, checksum_files): + with open(checksum_file, 'rt') as f: + recorded = f.read().strip() + + actual = beat.core.hash.hashFileContents(data_file) + + if actual != recorded: + cached = False + break + + if cached: + c.status = 'C' # literal value of CachedFile.CACHED: historical models from apps.get_model() carry no class constants + c.save() + + + +class Migration(migrations.Migration): + + dependencies = [ + ('experiments', '0006_block_order'), + ] + + operations = [ + migrations.AddField( + model_name='cachedfile', + name='status', + field=models.CharField(choices=[(b'N', b'Not cached'), (b'P', b'Processing'), (b'C', b'Cached')], default=b'N', max_length=1), + ), + migrations.RunPython(set_status, migrations.RunPython.noop), + ] diff --git a/beat/web/experiments/models/block.py b/beat/web/experiments/models/block.py index aedc1026e..c70c983c3 100755 --- a/beat/web/experiments/models/block.py +++ b/beat/web/experiments/models/block.py @@ -385,6 +385,10 @@ class Block(models.Model): self.job.delete() if r: r.delete() + # Update the associated cached files + for cached_file in self.outputs.all(): + cached_file.update(self.status) + # Loads Results from cache if self.job.result and self.analyzer and self.status == Block.CACHED: cache = self.first_cache() diff --git a/beat/web/experiments/models/cached_file.py b/beat/web/experiments/models/cached_file.py index 7ea30546c..bb79e07c5 100755 --- a/beat/web/experiments/models/cached_file.py +++ b/beat/web/experiments/models/cached_file.py @@ -51,8 +51,20 @@ class CachedFileManager(models.Manager): class CachedFile(models.Model): + 
NOT_CACHED = 'N' + PROCESSING = 'P' + CACHED = 'C' + + STATUS = ( + (NOT_CACHED, 'Not cached'), + (PROCESSING, 'Processing'), + (CACHED, 'Cached'), + ) + + blocks = models.ManyToManyField('Block', related_name='outputs', blank=True) hash = models.CharField(max_length=64, unique=True) + status = models.CharField(max_length=1, choices=STATUS, default=NOT_CACHED) # the total amount of time this block took to run considering the # wall-clock time. @@ -87,7 +99,9 @@ class CachedFile(models.Model): def __str__(self): - return 'CachedFile(%s, %d blocks)' % (self.hash, self.blocks.count()) + return 'CachedFile(%s, %s, %d blocks)' % ( + self.hash, self.get_status_display(), self.blocks.count() + ) def natural_key(self): @@ -112,32 +126,54 @@ class CachedFile(models.Model): return glob.glob(self.absolute_path(cache) + '*') + def update(self, block_status): + from . import Block + + if (block_status == Block.CACHED) and (self.status != CachedFile.CACHED): + self.status = CachedFile.CACHED + self.save() + + elif (block_status == Block.PROCESSING) and (self.status != CachedFile.PROCESSING): + self.status = CachedFile.PROCESSING + self.save() + + elif (block_status != Block.CACHED) and (self.status == CachedFile.PROCESSING): + self.status = CachedFile.NOT_CACHED + self.save() + + def exists(self, cache=settings.CACHE_ROOT): '''Checks if any file belonging to this cache exist on disk''' return bool(self.files(cache)) - def index_checksums(self, cache=settings.CACHE_ROOT): - '''Checks if this cached file indexes checksum properly''' + def check_checksums(self, cache=settings.CACHE_ROOT): + '''Checks if the cached files checksum properly''' abs_path = self.absolute_path(cache) - index = sorted(glob.glob(abs_path + '*.index')) - chksum = sorted(glob.glob(abs_path + '*.index.checksum')) - if len(index) != len(chksum): - logger.warn("Number of index files (%d) is different from " \ - "checksums (%d) for cache `%s'", len(index), len(chksum), - abs_path) + data_files = 
sorted(glob.glob(abs_path + '*.index')) + \ + sorted(glob.glob(abs_path + '*.data')) + + checksum_files = sorted(glob.glob(abs_path + '*.index.checksum')) + \ + sorted(glob.glob(abs_path + '*.data.checksum')) + + if len(data_files) != len(checksum_files): + logger.warning("Number of files (%d) is different from checksums (%d) " \ + "for cache `%s'", len(data_files), len(checksum_files), + abs_path) return False - for i, c in zip(index, chksum): - with open(c, 'rt') as f: recorded = f.read().strip() - actual = beat.core.hash.hashFileContents(i) + for data_file, checksum_file in zip(data_files, checksum_files): + with open(checksum_file, 'rt') as f: + recorded = f.read().strip() + + actual = beat.core.hash.hashFileContents(data_file) + if actual != recorded: - logger.warn("Checksum for index of cache `%s' does not " \ - "match for file `%s' (%s != %s)", abs_path, i, - actual, recorded) + logger.warning("Checksum for file `%s' does not match (%s != %s)", + data_file, actual, recorded) return False return True diff --git a/beat/web/experiments/tests.py b/beat/web/experiments/tests.py index 10408a6ab..f82abacef 100755 --- a/beat/web/experiments/tests.py +++ b/beat/web/experiments/tests.py @@ -977,6 +977,9 @@ class ExperimentStartingAPI(ExperimentTestBase): hashes = block.outputs.all() self.assertEqual(1, hashes.count()) + cached_file = hashes[0] + self.assertEqual(CachedFile.NOT_CACHED, cached_file.status) + block = experiment.blocks.get(name='addition2') self.assertEqual(Block.NOT_CACHED, block.status) self.assertFalse(block.analyzer) @@ -985,6 +988,9 @@ class ExperimentStartingAPI(ExperimentTestBase): hashes = block.outputs.all() self.assertEqual(1, hashes.count()) + cached_file = hashes[0] + self.assertEqual(CachedFile.NOT_CACHED, cached_file.status) + block = experiment.blocks.get(name='analysis') self.assertEqual(Block.NOT_CACHED, block.status) self.assertTrue(block.analyzer) @@ -993,6 +999,9 @@ class ExperimentStartingAPI(ExperimentTestBase): hashes = 
block.outputs.all() self.assertEqual(1, hashes.count()) + cached_file = hashes[0] + self.assertEqual(CachedFile.NOT_CACHED, cached_file.status) + def test_start_team_shared_experiment(self): johndoe = User.objects.get(username='johndoe') -- GitLab