diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py
index 16616bc8e4290fb945f14a43283dd01c9bd2e333..ccd7cdd6b2f60390df623d4336327832e3d568a6 100644
--- a/beat/web/backend/models.py
+++ b/beat/web/backend/models.py
@@ -206,7 +206,7 @@ class Worker(models.Model):
     def load(self):
         '''Calculates the number of cores being used currently'''
 
-        return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()])
+        return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=Job.PROCESSING)])
 
 
     def available_cores(self):
@@ -436,7 +436,10 @@ class Result(models.Model):
 
     def _get_stats(self):
 
-        return beat.core.stats.Statistics(simplejson.loads(self._stats))
+        if self._stats is not None:
+            return beat.core.stats.Statistics(simplejson.loads(self._stats))
+        else:
+            return beat.core.stats.Statistics()
 
 
     def _set_stats(self, v):
@@ -481,11 +484,22 @@ class Job(models.Model):
 
     end_date = models.DateTimeField(null=True, blank=True)
 
-    parent = models.ForeignKey('self', related_name='child', null=True)
+    parent = models.OneToOneField('self', related_name='child', null=True,
+        on_delete=models.SET_NULL)
 
     split_errors = models.PositiveIntegerField(default=0)
 
+    def _get_child(self):
+        return self.child if hasattr(self, 'child') else None
+
+    def _set_child(self, val):
+        val.parent = self
+        val.save()
+
+    child_ = property(_get_child, _set_child)
+
+
     def __str__(self):
 
         return "Job(%s, %s, splits=%d, status=%s, cores=%d)" % \
@@ -520,6 +534,7 @@ class Job(models.Model):
                 syserr=other.result.syserr,
                 _stats=other.result._stats,
                 )
+            r.save()
             self.result = r
 
         if self.done():
@@ -560,7 +575,9 @@ class Job(models.Model):
         '''Runs the index splitting machinery once for all associated splits'''
 
         # no index spliting is required
-        if self.block.required_slots == 1: return
+        if self.block.required_slots == 1:
+            self.save()
+            return
 
         indices = []
 
@@ -625,15 +642,15 @@ class Job(models.Model):
 
         from ..experiments.models import Block
 
-        for b in self.block.dependents:
-            if any([k.state in (Block.FAILED, Block.CANCELLED) \
+        for b in self.block.dependents.all():
+            if any([k.status in (Block.FAILED, Block.CANCELLED) \
                     for k in b.dependencies.all()]):
                 for split in b.job.splits: split.end(None, Job.CANCELLED)
-            if all([k.state in (Block.CACHED, Block.SKIPPED) \
-                    for k in b.dependencies]):
+            if all([k.status in (Block.CACHED, Block.SKIPPED) \
+                    for k in b.dependencies.all()]):
                 b.job.make_runnable()
 
-        if self.child:
+        if hasattr(self, 'child'):
             if self.status == Job.CANCELLED:
                 if self.parent: #I have a parent, so must give to child
                     parent = self.parent
@@ -645,7 +662,7 @@ class Job(models.Model):
                 self.child.copy(job)
                 self.child._cascade_updates()
 
-        if self.parent and self.state == Job.CANCELLED:
+        if self.parent and self.status == Job.CANCELLED:
             self.parent = None
 
         # update state of parent block/experiment
@@ -668,21 +685,25 @@ class Job(models.Model):
 
         split_statuses = self.splits.values_list('status', flat=True)
 
+        if self.start_date is None:
+            self.start_date = \
+                self.splits.order_by('start_date').first().start_date
+
         # Process main status and status from job results
-        if Job.FAILED in job_statuses:
+        if Job.FAILED in split_statuses:
             self.status = Job.FAILED
 
-        elif Job.CANCELLED in job_statuses:
+        elif Job.CANCELLED in split_statuses:
             self.status = Job.CANCELLED
 
-        elif (Job.PROCESSING in job_statuses) or \
-            (Job.QUEUED in job_statuses and Job.COMPLETED in job_statuses):
+        elif (Job.PROCESSING in split_statuses) or \
+            (Job.QUEUED in split_statuses and Job.COMPLETED in split_statuses):
             self.status = Job.PROCESSING
 
-        elif all([s == Job.SKIPPED for s in job_statuses]):
+        elif all([s == Job.SKIPPED for s in split_statuses]):
             self.status = Job.SKIPPED
 
-        elif Job.QUEUED not in job_statuses:
+        elif Job.QUEUED not in split_statuses:
             self.status = Job.COMPLETED
 
         else:
@@ -695,13 +716,13 @@ class Job(models.Model):
             diff_timings = self._merge_results()
             self.end_date = self.splits.order_by('-end_date').\
                 first().end_date
-            Result.objects.filter(splits__in=self.splits).delete()
+            Result.objects.filter(splits__in=self.splits.all()).delete()
             seqtime = sum(diff_timings)
             timings = dict(
-                queuing = (self.runnable_date - self.start_date).seconds,
+                queuing = (self.start_date - self.runnable_date).total_seconds(),
                 linear_execution = seqtime,
                 speed_up_real = float(seqtime) / \
-                    (self.end_date - self.start_date).seconds,
+                    (self.end_date - self.start_date).total_seconds(),
                 speed_up_maximal = float(seqtime) / max(diff_timings),
                 )
             self.runnable_date = None
@@ -710,11 +731,9 @@ class Job(models.Model):
 
         # compute final timings and update parent block
         self.save()
-        self.block._update_state(timings)
-        # updates the dependents and child state
-        self._cascade_updates()
         self.save()
+        self._cascade_updates()
 
 
     def erase_dangling_files(self):
@@ -756,10 +775,9 @@ class Job(models.Model):
        '''Merge results from jobs, if any exist'''
 
        # update results
-        job_results = [k for k in self.splits.values_list('result', flat=True) \
-            if k is not None]
+        job_results = Result.objects.filter(pk__in=self.splits.filter(result__isnull=False).values_list('result', flat=True))
 
-        diff_timings = [(k[0]-k[1]).seconds for k in \
+        diff_timings = [(k[0]-k[1]).total_seconds() for k in \
            self.splits.values_list('end_date', 'start_date')]
 
        status = sum([k.status for k in job_results])
@@ -772,16 +790,16 @@ class Job(models.Model):
        if job_results:
            stats = job_results[0].stats
            for k in job_results[1:]: stats += k.stats
-            stats = stats.as_dict()
+            stats = stats
        else:
-            stats = None
+            stats = beat.core.stats.Statistics()
 
        cancelled = any([k.cancelled for k in job_results])
        timed_out = any([k.timed_out for k in job_results])
 
        r = Result(status=status, stdout=stdout, stderr=stderr, usrerr=usrerr,
-            syserr=syserr, _status=stats, timed_out=timed_out,
-            cancelled=cancelled)
+            syserr=syserr, timed_out=timed_out, cancelled=cancelled)
+        r.stats = stats
        r.save()
 
        self.result = r
@@ -826,14 +844,14 @@ class JobSplit(models.Model):
    def __str__(self):
 
        return "JobSplit(%s, index=%d, state=%s)%s" % \
-            (self.job, self.split_index, self.state,
+            (self.job, self.split_index, self.status,
            ('@%s' % self.worker) if self.worker else '')
 
 
    def done(self):
        '''Says whether the job has finished or not'''
 
-        return self.state in (
+        return self.status in (
            Job.COMPLETED,
            Job.SKIPPED,
            Job.FAILED,
@@ -883,10 +901,7 @@ class JobSplit(models.Model):
 
        self.start_date = datetime.datetime.now()
 
-        if self.block.start_date is None:
-            self.block.start_date = self.start_date
-
-        self.state = Job.PROCESSING
+        self.status = Job.PROCESSING
        self.save()
 
        self.job._update_state()
@@ -900,8 +915,8 @@ class JobSplit(models.Model):
            JobSplit.objects.select_for_update().filter(pk=self.pk)
 
            # If this split is running, then wait
-            if self.state == Job.PROCESSING and (self.process_id is not None):
-                self.state = Job.CANCELLED
+            if self.status == Job.PROCESSING and (self.process_id is not None):
+                self.status = Job.CANCELLED
                self.save()
                logger.info("Job split `%s' is currently processing. Waiting " \
                    "for worker to cancel split remotely.", self)
@@ -944,10 +959,11 @@ class JobSplit(models.Model):
 
            self.worker = None #frees worker slot
 
            if result:
-                self.state = Job.COMPLETED if result.status == 0 else Job.FAILED
+                self.status = Job.COMPLETED if result.status == 0 else Job.FAILED
+                if result.id is None: result.save()
                self.result = result
            else:
-                self.state = state
+                self.status = state
 
            self.save()