diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index fa82ea352b830399d5c7722d9abe6fe095a74c2d..0a48f07f8d14a7bfed4feef1544706449bab2d23 100644 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -206,6 +206,11 @@ class Worker(models.Model): def load(self): + '''Calculates the number of cores in use or to be used in the future''' + return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()]) + + + def current_load(self): '''Calculates the number of cores being used currently''' return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=Job.PROCESSING)]) @@ -518,32 +523,22 @@ class Job(models.Model): - def copy(self, other): + @transaction.atomic + def _copy(self, other): '''Copy state from another block''' + Job.objects.select_for_update().filter(pk=self.pk) + if self.done(): return self.start_date = other.start_date self.end_date = other.end_date self.status = other.status - if other.result is not None: #copies result - r = Result( - status=other.result.status, - stdout=other.result.stdout, - stderr=other.result.stderr, - usrerr=other.result.usrerr, - syserr=other.result.syserr, - _stats=other.result._stats, - ) - r.save() - self.result = r - - if self.done(): - Result.objects.filter(splits__in=self.splits.all()).delete() - # update status of parent jobs - self._update_state() + self.save() + self._cascade_updates() + self.block._update_state(None) @transaction.atomic @@ -638,20 +633,9 @@ class Job(models.Model): def _cascade_updates(self): - '''Checks if the execution of this job unblocks other jobs in the same - experiment. Update their state or cancel them immediately. + '''Cascade updates to children before I'm deleted. ''' - from ..experiments.models import Block - - for b in self.block.dependents.all(): - if any([k.status in (Block.FAILED, Block.CANCELLED) \ - for k in b.dependencies.all()]): - b.job._cancel() - if all([k.status in (Block.CACHED, Block.SKIPPED) \ - for k in b.dependencies.all()]): - b.job.make_runnable() - if hasattr(self, 'child'): if self.status == Job.CANCELLED: if self.parent: #I have a parent, so must give to child @@ -659,9 +643,13 @@ class Job(models.Model): self.parent = None self.child.parent = parent else: #child is the new parent + child = self.child self.child.parent = None + # does this unblock the child to run? + if child.block.is_runnable(): child.make_runnable() - self.child.copy(self) + else: + self.child._copy(self) if self.parent and self.status == Job.CANCELLED: self.parent = None @@ -733,8 +721,8 @@ class Job(models.Model): # updates the dependents and child state self.save() - self.block._update_state(timings) self._cascade_updates() + self.block._update_state(timings) def erase_dangling_files(self): diff --git a/beat/web/experiments/models.py b/beat/web/experiments/models.py index dc02acc841dbef7cf10d22d179f80432ebff7e84..feee5bc02ad9b4e89aa3ce270cd4b1f490d14a9a 100644 --- a/beat/web/experiments/models.py +++ b/beat/web/experiments/models.py @@ -981,8 +981,7 @@ class Block(models.Model): # checks if the job is immediately runnable - if so, tries to # make it runnable (check caches and other) - if (self.job.parent is None) and (not self.dependencies.count()): - self.job.make_runnable() + if self.is_runnable(): self.job.make_runnable() def done(self): @@ -1011,6 +1010,25 @@ class Block(models.Model): self.experiment.update_state() + def is_runnable(self): + '''Checks if a block is runnable presently''' + + return all([k.status in (Block.CACHED, Block.SKIPPED) \ + for k in self.dependencies.all()]) and \ + (self.job.parent is None) + + + def _cascade_updates(self): + '''Cascade updates to blocks once I'm done. + ''' + + for b in self.dependents.all(): + if any([k.status in (Block.FAILED, Block.CANCELLED) \ + for k in b.dependencies.all()]): + b.job._cancel() + if b.is_runnable(): b.job.make_runnable() + + @transaction.atomic def _update_state(self, timings=None): '''Updates self state as a result of backend running @@ -1097,6 +1115,7 @@ class Block(models.Model): data_source.close() self.save() + self._cascade_updates() self.experiment.update_state()