From 77caca35d8d22612b64ba80d082154b7e8a2e6ac Mon Sep 17 00:00:00 2001
From: Andre Anjos <andre.dos.anjos@gmail.com>
Date: Tue, 26 Apr 2016 11:13:33 +0200
Subject: [PATCH] [experiments,backend] Correct info cascading

---
 beat/web/backend/models.py     | 50 +++++++++++++---------------------
 beat/web/experiments/models.py | 23 ++++++++++++++--
 2 files changed, 40 insertions(+), 33 deletions(-)

diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py
index fa82ea352..0a48f07f8 100644
--- a/beat/web/backend/models.py
+++ b/beat/web/backend/models.py
@@ -206,6 +206,11 @@ class Worker(models.Model):
 
 
     def load(self):
+        '''Calculates the number of cores in use or to be used in the future'''
+        return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()])
+
+
+    def current_load(self):
         '''Calculates the number of cores being used currently'''
         return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=Job.PROCESSING)])
 
@@ -518,32 +523,22 @@ class Job(models.Model):
 
 
 
-    def copy(self, other):
+    @transaction.atomic
+    def _copy(self, other):
         '''Copy state from another block'''
 
+        Job.objects.select_for_update().filter(pk=self.pk)
+
         if self.done(): return
 
         self.start_date = other.start_date
         self.end_date = other.end_date
         self.status = other.status
 
-        if other.result is not None: #copies result
-            r = Result(
-                status=other.result.status,
-                stdout=other.result.stdout,
-                stderr=other.result.stderr,
-                usrerr=other.result.usrerr,
-                syserr=other.result.syserr,
-                _stats=other.result._stats,
-                )
-            r.save()
-            self.result = r
-
-        if self.done():
-            Result.objects.filter(splits__in=self.splits.all()).delete()
-
         # update status of parent jobs
-        self._update_state()
+        self.save()
+        self._cascade_updates()
+        self.block._update_state(None)
 
 
     @transaction.atomic
@@ -638,20 +633,9 @@ class Job(models.Model):
 
 
     def _cascade_updates(self):
-        '''Checks if the execution of this job unblocks other jobs in the same
-           experiment. Update their state or cancel them immediately.
+        '''Cascade updates to children before I'm deleted.
         '''
 
-        from ..experiments.models import Block
-
-        for b in self.block.dependents.all():
-            if any([k.status in (Block.FAILED, Block.CANCELLED) \
-                for k in b.dependencies.all()]):
-                b.job._cancel()
-            if all([k.status in (Block.CACHED, Block.SKIPPED) \
-                for k in b.dependencies.all()]):
-                b.job.make_runnable()
-
         if hasattr(self, 'child'):
             if self.status == Job.CANCELLED:
                 if self.parent: #I have a parent, so must give to child
@@ -659,9 +643,13 @@ class Job(models.Model):
                     self.parent = None
                     self.child.parent = parent
                 else: #child is the new parent
+                    child = self.child
                     self.child.parent = None
+                    # does this unblock the child to run?
+                    if child.block.is_runnable(): child.make_runnable()
 
-            self.child.copy(self)
+            else:
+                self.child._copy(self)
 
         if self.parent and self.status == Job.CANCELLED:
             self.parent = None
@@ -733,8 +721,8 @@ class Job(models.Model):
 
         # updates the dependents and child state
         self.save()
-        self.block._update_state(timings)
         self._cascade_updates()
+        self.block._update_state(timings)
 
 
     def erase_dangling_files(self):
diff --git a/beat/web/experiments/models.py b/beat/web/experiments/models.py
index dc02acc84..feee5bc02 100644
--- a/beat/web/experiments/models.py
+++ b/beat/web/experiments/models.py
@@ -981,8 +981,7 @@ class Block(models.Model):
 
         # checks if the job is immediately runnable - if so, tries to
         # make it runnable (check caches and other)
-        if (self.job.parent is None) and (not self.dependencies.count()):
-            self.job.make_runnable()
+        if self.is_runnable(): self.job.make_runnable()
 
 
     def done(self):
@@ -1011,6 +1010,25 @@ class Block(models.Model):
             self.experiment.update_state()
 
 
+    def is_runnable(self):
+        '''Checks if a block is runnable presently'''
+
+        return all([k.status in (Block.CACHED, Block.SKIPPED) \
+                for k in self.dependencies.all()]) and \
+                (self.job.parent is None)
+
+
+    def _cascade_updates(self):
+        '''Cascade updates to blocks once I'm done.
+        '''
+
+        for b in self.dependents.all():
+            if any([k.status in (Block.FAILED, Block.CANCELLED) \
+                for k in b.dependencies.all()]):
+                b.job._cancel()
+            if b.is_runnable(): b.job.make_runnable()
+
+
     @transaction.atomic
     def _update_state(self, timings=None):
         '''Updates self state as a result of backend running
@@ -1097,6 +1115,7 @@ class Block(models.Model):
             data_source.close()
 
         self.save()
+        self._cascade_updates()
         self.experiment.update_state()
 
 
-- 
GitLab