Skip to content
Snippets Groups Projects
Commit 277755de authored by André Anjos's avatar André Anjos :speech_balloon:
Browse files

[backend] Improve models and Job.parent

parent 11763f3a
No related branches found
No related tags found
1 merge request!194Scheduler
...@@ -206,7 +206,7 @@ class Worker(models.Model): ...@@ -206,7 +206,7 @@ class Worker(models.Model):
def load(self): def load(self):
'''Calculates the number of cores being used currently''' '''Calculates the number of cores being used currently'''
return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()]) return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=Job.PROCESSING)])
def available_cores(self): def available_cores(self):
...@@ -436,7 +436,10 @@ class Result(models.Model): ...@@ -436,7 +436,10 @@ class Result(models.Model):
def _get_stats(self): def _get_stats(self):
return beat.core.stats.Statistics(simplejson.loads(self._stats)) if self._stats is not None:
return beat.core.stats.Statistics(simplejson.loads(self._stats))
else:
return beat.core.stats.Statistics()
def _set_stats(self, v): def _set_stats(self, v):
...@@ -481,11 +484,22 @@ class Job(models.Model): ...@@ -481,11 +484,22 @@ class Job(models.Model):
end_date = models.DateTimeField(null=True, blank=True) end_date = models.DateTimeField(null=True, blank=True)
parent = models.ForeignKey('self', related_name='child', null=True) parent = models.OneToOneField('self', related_name='child', null=True,
on_delete=models.SET_NULL)
split_errors = models.PositiveIntegerField(default=0) split_errors = models.PositiveIntegerField(default=0)
def _get_child(self):
return self.child if hasattr(self, 'child') else None
def _set_child(self, val):
val.parent = self
val.save()
child_ = property(_get_child, _set_child)
def __str__(self): def __str__(self):
return "Job(%s, %s, splits=%d, status=%s, cores=%d)" % \ return "Job(%s, %s, splits=%d, status=%s, cores=%d)" % \
...@@ -520,6 +534,7 @@ class Job(models.Model): ...@@ -520,6 +534,7 @@ class Job(models.Model):
syserr=other.result.syserr, syserr=other.result.syserr,
_stats=other.result._stats, _stats=other.result._stats,
) )
r.save()
self.result = r self.result = r
if self.done(): if self.done():
...@@ -560,7 +575,9 @@ class Job(models.Model): ...@@ -560,7 +575,9 @@ class Job(models.Model):
'''Runs the index splitting machinery once for all associated splits''' '''Runs the index splitting machinery once for all associated splits'''
# no index spliting is required # no index spliting is required
if self.block.required_slots == 1: return if self.block.required_slots == 1:
self.save()
return
indices = [] indices = []
...@@ -625,15 +642,15 @@ class Job(models.Model): ...@@ -625,15 +642,15 @@ class Job(models.Model):
from ..experiments.models import Block from ..experiments.models import Block
for b in self.block.dependents: for b in self.block.dependents.all():
if any([k.state in (Block.FAILED, Block.CANCELLED) \ if any([k.status in (Block.FAILED, Block.CANCELLED) \
for k in b.dependencies.all()]): for k in b.dependencies.all()]):
for split in b.job.splits: split.end(None, Job.CANCELLED) for split in b.job.splits: split.end(None, Job.CANCELLED)
if all([k.state in (Block.CACHED, Block.SKIPPED) \ if all([k.status in (Block.CACHED, Block.SKIPPED) \
for k in b.dependencies]): for k in b.dependencies.all()]):
b.job.make_runnable() b.job.make_runnable()
if self.child: if hasattr(self, 'child'):
if self.status == Job.CANCELLED: if self.status == Job.CANCELLED:
if self.parent: #I have a parent, so must give to child if self.parent: #I have a parent, so must give to child
parent = self.parent parent = self.parent
...@@ -645,7 +662,7 @@ class Job(models.Model): ...@@ -645,7 +662,7 @@ class Job(models.Model):
self.child.copy(job) self.child.copy(job)
self.child._cascade_updates() self.child._cascade_updates()
if self.parent and self.state == Job.CANCELLED: if self.parent and self.status == Job.CANCELLED:
self.parent = None self.parent = None
# update state of parent block/experiment # update state of parent block/experiment
...@@ -668,21 +685,25 @@ class Job(models.Model): ...@@ -668,21 +685,25 @@ class Job(models.Model):
split_statuses = self.splits.values_list('status', flat=True) split_statuses = self.splits.values_list('status', flat=True)
if self.start_date is None:
self.start_date = \
self.splits.order_by('start_date').first().start_date
# Process main status and status from job results # Process main status and status from job results
if Job.FAILED in job_statuses: if Job.FAILED in split_statuses:
self.status = Job.FAILED self.status = Job.FAILED
elif Job.CANCELLED in job_statuses: elif Job.CANCELLED in split_statuses:
self.status = Job.CANCELLED self.status = Job.CANCELLED
elif (Job.PROCESSING in job_statuses) or \ elif (Job.PROCESSING in split_statuses) or \
(Job.QUEUED in job_statuses and Job.COMPLETED in job_statuses): (Job.QUEUED in split_statuses and Job.COMPLETED in split_statuses):
self.status = Job.PROCESSING self.status = Job.PROCESSING
elif all([s == Job.SKIPPED for s in job_statuses]): elif all([s == Job.SKIPPED for s in split_statuses]):
self.status = Job.SKIPPED self.status = Job.SKIPPED
elif Job.QUEUED not in job_statuses: elif Job.QUEUED not in split_statuses:
self.status = Job.COMPLETED self.status = Job.COMPLETED
else: else:
...@@ -695,13 +716,13 @@ class Job(models.Model): ...@@ -695,13 +716,13 @@ class Job(models.Model):
diff_timings = self._merge_results() diff_timings = self._merge_results()
self.end_date = self.splits.order_by('-end_date').\ self.end_date = self.splits.order_by('-end_date').\
first().end_date first().end_date
Result.objects.filter(splits__in=self.splits).delete() Result.objects.filter(splits__in=self.splits.all()).delete()
seqtime = sum(diff_timings) seqtime = sum(diff_timings)
timings = dict( timings = dict(
queuing = (self.runnable_date - self.start_date).seconds, queuing = (self.start_date - self.runnable_date).total_seconds(),
linear_execution = seqtime, linear_execution = seqtime,
speed_up_real = float(seqtime) / \ speed_up_real = float(seqtime) / \
(self.end_date - self.start_date).seconds, (self.end_date - self.start_date).total_seconds(),
speed_up_maximal = float(seqtime) / max(diff_timings), speed_up_maximal = float(seqtime) / max(diff_timings),
) )
self.runnable_date = None self.runnable_date = None
...@@ -710,11 +731,9 @@ class Job(models.Model): ...@@ -710,11 +731,9 @@ class Job(models.Model):
# compute final timings and update parent block # compute final timings and update parent block
self.save() self.save()
self.block._update_state(timings)
# updates the dependents and child state # updates the dependents and child state
self._cascade_updates()
self.save() self.save()
self._cascade_updates()
def erase_dangling_files(self): def erase_dangling_files(self):
...@@ -756,10 +775,9 @@ class Job(models.Model): ...@@ -756,10 +775,9 @@ class Job(models.Model):
'''Merge results from jobs, if any exist''' '''Merge results from jobs, if any exist'''
# update results # update results
job_results = [k for k in self.splits.values_list('result', flat=True) \ job_results = Result.objects.filter(pk__in=self.splits.filter(result__isnull=False).values_list('result', flat=True))
if k is not None]
diff_timings = [(k[0]-k[1]).seconds for k in \ diff_timings = [(k[0]-k[1]).total_seconds() for k in \
self.splits.values_list('end_date', 'start_date')] self.splits.values_list('end_date', 'start_date')]
status = sum([k.status for k in job_results]) status = sum([k.status for k in job_results])
...@@ -772,16 +790,16 @@ class Job(models.Model): ...@@ -772,16 +790,16 @@ class Job(models.Model):
if job_results: if job_results:
stats = job_results[0].stats stats = job_results[0].stats
for k in job_results[1:]: stats += k.stats for k in job_results[1:]: stats += k.stats
stats = stats.as_dict() stats = stats
else: else:
stats = None stats = beat.core.stats.Statistics()
cancelled = any([k.cancelled for k in job_results]) cancelled = any([k.cancelled for k in job_results])
timed_out = any([k.timed_out for k in job_results]) timed_out = any([k.timed_out for k in job_results])
r = Result(status=status, stdout=stdout, stderr=stderr, usrerr=usrerr, r = Result(status=status, stdout=stdout, stderr=stderr, usrerr=usrerr,
syserr=syserr, _status=stats, timed_out=timed_out, syserr=syserr, timed_out=timed_out, cancelled=cancelled)
cancelled=cancelled) r.stats = stats
r.save() r.save()
self.result = r self.result = r
...@@ -826,14 +844,14 @@ class JobSplit(models.Model): ...@@ -826,14 +844,14 @@ class JobSplit(models.Model):
def __str__(self): def __str__(self):
return "JobSplit(%s, index=%d, state=%s)%s" % \ return "JobSplit(%s, index=%d, state=%s)%s" % \
(self.job, self.split_index, self.state, (self.job, self.split_index, self.status,
('@%s' % self.worker) if self.worker else '') ('@%s' % self.worker) if self.worker else '')
def done(self): def done(self):
'''Says whether the job has finished or not''' '''Says whether the job has finished or not'''
return self.state in ( return self.status in (
Job.COMPLETED, Job.COMPLETED,
Job.SKIPPED, Job.SKIPPED,
Job.FAILED, Job.FAILED,
...@@ -883,10 +901,7 @@ class JobSplit(models.Model): ...@@ -883,10 +901,7 @@ class JobSplit(models.Model):
self.start_date = datetime.datetime.now() self.start_date = datetime.datetime.now()
if self.block.start_date is None: self.status = Job.PROCESSING
self.block.start_date = self.start_date
self.state = Job.PROCESSING
self.save() self.save()
self.job._update_state() self.job._update_state()
...@@ -900,8 +915,8 @@ class JobSplit(models.Model): ...@@ -900,8 +915,8 @@ class JobSplit(models.Model):
JobSplit.objects.select_for_update().filter(pk=self.pk) JobSplit.objects.select_for_update().filter(pk=self.pk)
# If this split is running, then wait # If this split is running, then wait
if self.state == Job.PROCESSING and (self.process_id is not None): if self.status == Job.PROCESSING and (self.process_id is not None):
self.state = Job.CANCELLED self.status = Job.CANCELLED
self.save() self.save()
logger.info("Job split `%s' is currently processing. Waiting " \ logger.info("Job split `%s' is currently processing. Waiting " \
"for worker to cancel split remotely.", self) "for worker to cancel split remotely.", self)
...@@ -944,10 +959,11 @@ class JobSplit(models.Model): ...@@ -944,10 +959,11 @@ class JobSplit(models.Model):
self.worker = None #frees worker slot self.worker = None #frees worker slot
if result: if result:
self.state = Job.COMPLETED if result.status == 0 else Job.FAILED self.status = Job.COMPLETED if result.status == 0 else Job.FAILED
if result.id is None: result.save()
self.result = result self.result = result
else: else:
self.state = state self.status = state
self.save() self.save()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment