Skip to content
Snippets Groups Projects
Commit 77caca35 authored by André Anjos's avatar André Anjos :speech_balloon:
Browse files

[experiments,backend] Correct info cascading

parent ddd3450d
No related branches found
No related tags found
1 merge request!194Scheduler
......@@ -206,6 +206,11 @@ class Worker(models.Model):
def load(self):
'''Calculates the number of cores in use or to be used in the future'''
return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()])
def current_load(self):
'''Calculates the number of cores being used currently'''
return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=Job.PROCESSING)])
......@@ -518,32 +523,22 @@ class Job(models.Model):
def copy(self, other):
@transaction.atomic
def _copy(self, other):
'''Copy state from another block'''
Job.objects.select_for_update().filter(pk=self.pk)
if self.done(): return
self.start_date = other.start_date
self.end_date = other.end_date
self.status = other.status
if other.result is not None: #copies result
r = Result(
status=other.result.status,
stdout=other.result.stdout,
stderr=other.result.stderr,
usrerr=other.result.usrerr,
syserr=other.result.syserr,
_stats=other.result._stats,
)
r.save()
self.result = r
if self.done():
Result.objects.filter(splits__in=self.splits.all()).delete()
# update status of parent jobs
self._update_state()
self.save()
self._cascade_updates()
self.block._update_state(None)
@transaction.atomic
......@@ -638,20 +633,9 @@ class Job(models.Model):
def _cascade_updates(self):
'''Checks if the execution of this job unblocks other jobs in the same
experiment. Update their state or cancel them immediately.
'''Cascade updates to children before I'm deleted.
'''
from ..experiments.models import Block
for b in self.block.dependents.all():
if any([k.status in (Block.FAILED, Block.CANCELLED) \
for k in b.dependencies.all()]):
b.job._cancel()
if all([k.status in (Block.CACHED, Block.SKIPPED) \
for k in b.dependencies.all()]):
b.job.make_runnable()
if hasattr(self, 'child'):
if self.status == Job.CANCELLED:
if self.parent: #I have a parent, so must give to child
......@@ -659,9 +643,13 @@ class Job(models.Model):
self.parent = None
self.child.parent = parent
else: #child is the new parent
child = self.child
self.child.parent = None
# does this unblock the child to run?
if child.block.is_runnable(): child.make_runnable()
self.child.copy(self)
else:
self.child._copy(self)
if self.parent and self.status == Job.CANCELLED:
self.parent = None
......@@ -733,8 +721,8 @@ class Job(models.Model):
# updates the dependents and child state
self.save()
self.block._update_state(timings)
self._cascade_updates()
self.block._update_state(timings)
def erase_dangling_files(self):
......
......@@ -981,8 +981,7 @@ class Block(models.Model):
# checks if the job is immediately runnable - if so, tries to
# make it runnable (check caches and other)
if (self.job.parent is None) and (not self.dependencies.count()):
self.job.make_runnable()
if self.is_runnable(): self.job.make_runnable()
def done(self):
......@@ -1011,6 +1010,25 @@ class Block(models.Model):
self.experiment.update_state()
def is_runnable(self):
'''Checks if a block is runnable presently'''
return all([k.status in (Block.CACHED, Block.SKIPPED) \
for k in self.dependencies.all()]) and \
(self.job.parent is None)
def _cascade_updates(self):
'''Cascade updates to blocks once I'm done.
'''
for b in self.dependents.all():
if any([k.status in (Block.FAILED, Block.CANCELLED) \
for k in b.dependencies.all()]):
b.job._cancel()
if b.is_runnable(): b.job.make_runnable()
@transaction.atomic
def _update_state(self, timings=None):
'''Updates self state as a result of backend running
......@@ -1097,6 +1115,7 @@ class Block(models.Model):
data_source.close()
self.save()
self._cascade_updates()
self.experiment.update_state()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment