From 4e0b9def577faf918f82580c60804536d63cdf7c Mon Sep 17 00:00:00 2001 From: Andre Anjos <andre.dos.anjos@gmail.com> Date: Thu, 28 Apr 2016 16:01:55 +0200 Subject: [PATCH] [backend] Protect time diff calculations better --- beat/web/backend/models.py | 75 +++++++++++++++++++++++------------- beat/web/backend/schedule.py | 1 - 2 files changed, 48 insertions(+), 28 deletions(-) diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index 2edfeb82c..386e79e3a 100644 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -32,6 +32,7 @@ import operator import logging logger = logging.getLogger(__name__) +import psutil import simplejson from django.db import models @@ -259,30 +260,29 @@ class Worker(models.Model): # check I have at least all cores and memory I'm supposed to have cores = psutil.cpu_count() ram = psutil.virtual_memory().total/(1024*1024) - worker.info = '' + self.info = '' - if cores < worker.cores: + if cores < self.cores: logger.warn("Worker `%s' only has %d cores which is less then " \ "the value declared on the database - it's not a problem, " \ - "but note this worker may get overloaded", worker, cores) - worker.info += 'only %d cores;' % cores + "but note this self may get overloaded", self, cores) + self.info += 'only %d cores;' % cores - if ram < worker.memory: + if ram < self.memory: logger.warn("Worker `%s' only has %d Mb of RAM which is less " \ "then the value declared on the database - it's not a " \ - "problem, but note this worker may get overloaded", worker, + "problem, but note this self may get overloaded", self, ram) - worker.info += 'only %d Mb of RAM;' % ram + self.info += 'only %d Mb of RAM;' % ram # update process and memory usage - worker.used_cores = int(psutil.cpu_percent()) - worker.used_memory = int(psutil.virtual_memory().percent) + self.used_cores = int(psutil.cpu_percent()) + self.used_memory = int(psutil.virtual_memory().percent) - # save current worker state - worker.info += 'updated: ' + time.asctime() - worker.active = True - worker.update = False - worker.save() + # save current self state + self.active = True + self.update = False + self.save() #---------------------------------------------------------- @@ -641,10 +641,10 @@ class Job(models.Model): # For all synchronized inputs with the current block, append the # list of generated object indices. This is necessary for an # informed decision on where to split the processing - sync = [i for i in conf['inputs'] if i['channel']==conf['channel']] + sync = [conf['inputs'][i] for i in conf['inputs'] if conf['inputs'][i]['channel']==conf['channel']] for i in sync: indices.append(beat.core.data.load_data_index( - settings.CACHE_ROOT, i.path)) + settings.CACHE_ROOT, str(i['path']))) # Determine N splits using the possible indices for split: indices = beat.core.data.foundSplitRanges(indices, @@ -665,16 +665,18 @@ class Job(models.Model): "error: %s" % (self.block.name, self.block.experiment.fullname(), format_exc()) logger.error(message) - split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR, - syserr=message)) + for split in self.splits.all(): + split.end(Result(status=1, + usrerr=settings.DEFAULT_USER_ERROR, syserr=message)) if len(indices) == 0: message = "Index splitting for block `%s' of experiment " \ "`%s' could not be completed: not splittable!" % \ (self.block.name, self.block.experiment.fullname()) logger.error(message) - split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR, - syserr=message)) + for split in self.splits.all(): + split.end(Result(status=1, + usrerr=settings.DEFAULT_USER_ERROR, syserr=message)) # sets ranges for all splits that are possible, immediately stops # all other splits @@ -730,8 +732,11 @@ class Job(models.Model): split_statuses = self.splits.values_list('status', flat=True) if self.start_date is None: - self.start_date = \ - self.splits.order_by('start_date').first().start_date + qs = self.splits.filter(start_date__isnull=False).order_by('start_date') + if qs: + self.start_date = qs.first().start_date + else: + self.start_date = datetime.datetime.now() # Process main status and status from job results if Job.FAILED in split_statuses: @@ -761,17 +766,30 @@ class Job(models.Model): # compute final timings and update parent block if self.status != Job.SKIPPED: diff_timings = self._merge_results() + # delete all splits w/o results (still queued) + self.splits.filter(result__isnull=True).delete() self.end_date = self.splits.order_by('-end_date').\ first().end_date updateStatistics(self.result.stats, self.end_date) Result.objects.filter(split__in=self.splits.all()).delete() seqtime = sum(diff_timings) + if self.start_date is None: + queuing = 0 + else: + queuing = (self.start_date - \ + self.runnable_date).total_seconds() + if not seqtime: + speed_up_real = 1.0 + speed_up_maximal = 1.0 + else: + speed_up_real = float(seqtime) / \ + (self.end_date - self.start_date).total_seconds() + speed_up_maximal = float(seqtime) / max(diff_timings) timings = dict( - queuing = (self.start_date - self.runnable_date).total_seconds(), + queuing = queuing, linear_execution = seqtime, - speed_up_real = float(seqtime) / \ - (self.end_date - self.start_date).total_seconds(), - speed_up_maximal = float(seqtime) / max(diff_timings), + speed_up_real = speed_up_real, + speed_up_maximal = speed_up_maximal, ) self.runnable_date = None self.erase_dangling_files() @@ -829,7 +847,8 @@ class Job(models.Model): job_results = Result.objects.filter(pk__in=self.splits.filter(result__isnull=False).values_list('result', flat=True)) diff_timings = [(k[0]-k[1]).total_seconds() for k in \ - self.splits.values_list('end_date', 'start_date')] + self.splits.filter(end_date__isnull=False, + start_date__isnull=False).values_list('end_date', 'start_date')] status = sum([k.status for k in job_results]) stdout = _merge_strings([k.stdout for k in job_results]) @@ -1007,6 +1026,8 @@ class JobSplit(models.Model): if status: logger.info("Marking job split `%s' as '%s'", self, status) + if self.start_date is None: + self.start_date = datetime.datetime.now() self.end_date = datetime.datetime.now() self.worker = None #frees worker slot diff --git a/beat/web/backend/schedule.py b/beat/web/backend/schedule.py index 71968b48c..10c8fbf9c 100644 --- a/beat/web/backend/schedule.py +++ b/beat/web/backend/schedule.py @@ -36,7 +36,6 @@ import subprocess import logging logger = logging.getLogger(__name__) -import psutil import simplejson from django.conf import settings -- GitLab