From 77794200e868eda11f5ebb29774a577f826c892b Mon Sep 17 00:00:00 2001
From: Andre Anjos <andre.anjos@idiap.ch>
Date: Tue, 26 Apr 2016 17:20:07 +0200
Subject: [PATCH] [experiments] Fix state update in case of cached blocks

---
 beat/web/experiments/models.py | 70 +++++++++++++++++++++++++++-------
 1 file changed, 56 insertions(+), 14 deletions(-)

diff --git a/beat/web/experiments/models.py b/beat/web/experiments/models.py
index feee5bc02..63cf7294c 100644
--- a/beat/web/experiments/models.py
+++ b/beat/web/experiments/models.py
@@ -732,11 +732,22 @@ class Experiment(Shareable):
         return storage.get_file_content(self, 'declaration_file')
 
 
+    @transaction.atomic
     def update_state(self):
         '''Update self state based on associated block states'''
 
+        Experiment.objects.select_for_update().filter(pk=self.pk)
+
         if self.is_done(): return
 
+        if self.start_date is None:
+            d = self.blocks.order_by('start_date').first().start_date
+            if d is not None: self.start_date = d
+
+        if self.end_date is None:
+            d = self.blocks.order_by('-end_date').first().end_date
+            if d is not None: self.end_date = d
+
         block_statuses = self.blocks.values_list('status', flat=True)
 
         # Process main state and state from job results
@@ -780,8 +791,11 @@ class Experiment(Shareable):
 
         for b in self.blocks.all(): b._schedule()
 
-        self.status = Experiment.SCHEDULED
-        self.save()
+        # notice that the previous call may decide all is done already
+        # so, we must respect that before setting the SCHEDULED status
+        if not self.is_done():
+            self.status = Experiment.SCHEDULED
+            self.save()
 
 
     @transaction.atomic
@@ -1015,7 +1029,7 @@ class Block(models.Model):
 
         return all([k.status in (Block.CACHED, Block.SKIPPED) \
                 for k in self.dependencies.all()]) and \
-                (self.job.parent is None)
+                (hasattr(self, 'job') and self.job.parent is None)
 
 
     def _cascade_updates(self):
@@ -1080,7 +1094,10 @@ class Block(models.Model):
 
             self.outputs.update(**info)
 
-        self.status = self.job.status
+        if self.job.status == Block.SKIPPED:
+            self.status = Block.CACHED
+        else:
+            self.status = self.job.status
 
         if self.job.done():
             self.end_date = self.job.end_date
@@ -1100,14 +1117,14 @@ class Block(models.Model):
                         self.algorithm.fullname())
                 for field, value in output_data.as_dict().items():
                     res, _ = Result.objects.get_or_create(name=field,
-                        cache=cached)
+                        cache=cache)
                     res.primary = algorithm.results[field]['display']
                     res.type = algorithm.results[field]["type"]
 
                     if res.type in ['int32', 'float32', 'bool', 'string']:
                         res.data_value = str(value)
                     else:
-                        res.data_value = json.dumps(value, indent=4,
+                        res.data_value = simplejson.dumps(value, indent=4,
                             cls=NumpyJSONEncoder)
 
                     res.save()
@@ -1179,22 +1196,47 @@ class CachedFile(models.Model):
         return beat.core.hash.toPath(self.hash, suffix='')
 
 
-    def absolute_path(self):
+    def absolute_path(self, cache=settings.CACHE_ROOT):
         '''Returns the full path prefix to the cached file on disk'''
 
-        return os.path.join(settings.CACHE_ROOT, self.path())
+        return os.path.join(cache, self.path())
+
+
+    def files(self, cache=settings.CACHE_ROOT):
+        '''Checks if any file belonging to this cache exist on disk'''
+
+        return glob.glob(self.absolute_path(cache) + '*')
 
 
-    def files(self):
-        '''Returns a list of files matching this cache prefix path'''
+    def exists(self, cache=settings.CACHE_ROOT):
+        '''Checks if any file belonging to this cache exist on disk'''
 
-        return glob.glob(self.absolute_path() + '*')
+        return bool(self.files(cache))
 
 
-    def exists(self):
-        '''Check if this cached file really exists on the cache directory'''
+    def index_checksums(self, cache=settings.CACHE_ROOT):
+        '''Checks if this cached file indexes checksum properly'''
+
+        abs_path = self.absolute_path(cache)
+        index = sorted(glob.glob(abs_path + '*.index'))
+        chksum = sorted(glob.glob(abs_path + '*.index.checksum'))
+
+        if len(index) != len(chksum):
+            logger.warn("Number of index files (%d) is different from " \
+                "checksums (%d) for cache `%s'", len(index), len(chksum),
+                abs_path)
+            return False
+
+        for i, c in zip(index, chksum):
+            with open(c, 'rt') as f: recorded = f.read().strip()
+            actual = beat.core.hash.hashFileContents(i)
+            if actual != recorded:
+                logger.warn("Checksum for index of cache `%s' does not " \
+                    "match for file `%s' (%s != %s)", abs_path, i,
+                    actual, recorded)
+                return False
 
-        return bool(self.files())
+        return True
 
 
 #----------------------------------------------------------
-- 
GitLab