Skip to content
Snippets Groups Projects
Commit 4e0b9def authored by André Anjos's avatar André Anjos :speech_balloon:
Browse files

[backend] Protect time diff calculations better

parent b62a7d21
No related branches found
No related tags found
1 merge request!194Scheduler
...@@ -32,6 +32,7 @@ import operator ...@@ -32,6 +32,7 @@ import operator
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
import psutil
import simplejson import simplejson
from django.db import models from django.db import models
...@@ -259,30 +260,29 @@ class Worker(models.Model): ...@@ -259,30 +260,29 @@ class Worker(models.Model):
# check I have at least all cores and memory I'm supposed to have # check I have at least all cores and memory I'm supposed to have
cores = psutil.cpu_count() cores = psutil.cpu_count()
ram = psutil.virtual_memory().total/(1024*1024) ram = psutil.virtual_memory().total/(1024*1024)
worker.info = '' self.info = ''
if cores < worker.cores: if cores < self.cores:
logger.warn("Worker `%s' only has %d cores which is less then " \ logger.warn("Worker `%s' only has %d cores which is less then " \
"the value declared on the database - it's not a problem, " \ "the value declared on the database - it's not a problem, " \
"but note this worker may get overloaded", worker, cores) "but note this self may get overloaded", self, cores)
worker.info += 'only %d cores;' % cores self.info += 'only %d cores;' % cores
if ram < worker.memory: if ram < self.memory:
logger.warn("Worker `%s' only has %d Mb of RAM which is less " \ logger.warn("Worker `%s' only has %d Mb of RAM which is less " \
"then the value declared on the database - it's not a " \ "then the value declared on the database - it's not a " \
"problem, but note this worker may get overloaded", worker, "problem, but note this self may get overloaded", self,
ram) ram)
worker.info += 'only %d Mb of RAM;' % ram self.info += 'only %d Mb of RAM;' % ram
# update process and memory usage # update process and memory usage
worker.used_cores = int(psutil.cpu_percent()) self.used_cores = int(psutil.cpu_percent())
worker.used_memory = int(psutil.virtual_memory().percent) self.used_memory = int(psutil.virtual_memory().percent)
# save current worker state # save current self state
worker.info += 'updated: ' + time.asctime() self.active = True
worker.active = True self.update = False
worker.update = False self.save()
worker.save()
#---------------------------------------------------------- #----------------------------------------------------------
...@@ -641,10 +641,10 @@ class Job(models.Model): ...@@ -641,10 +641,10 @@ class Job(models.Model):
# For all synchronized inputs with the current block, append the # For all synchronized inputs with the current block, append the
# list of generated object indices. This is necessary for an # list of generated object indices. This is necessary for an
# informed decision on where to split the processing # informed decision on where to split the processing
sync = [i for i in conf['inputs'] if i['channel']==conf['channel']] sync = [conf['inputs'][i] for i in conf['inputs'] if conf['inputs'][i]['channel']==conf['channel']]
for i in sync: for i in sync:
indices.append(beat.core.data.load_data_index( indices.append(beat.core.data.load_data_index(
settings.CACHE_ROOT, i.path)) settings.CACHE_ROOT, str(i['path'])))
# Determine N splits using the possible indices for split: # Determine N splits using the possible indices for split:
indices = beat.core.data.foundSplitRanges(indices, indices = beat.core.data.foundSplitRanges(indices,
...@@ -665,16 +665,18 @@ class Job(models.Model): ...@@ -665,16 +665,18 @@ class Job(models.Model):
"error: %s" % (self.block.name, "error: %s" % (self.block.name,
self.block.experiment.fullname(), format_exc()) self.block.experiment.fullname(), format_exc())
logger.error(message) logger.error(message)
split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR, for split in self.splits.all():
syserr=message)) split.end(Result(status=1,
usrerr=settings.DEFAULT_USER_ERROR, syserr=message))
if len(indices) == 0: if len(indices) == 0:
message = "Index splitting for block `%s' of experiment " \ message = "Index splitting for block `%s' of experiment " \
"`%s' could not be completed: not splittable!" % \ "`%s' could not be completed: not splittable!" % \
(self.block.name, self.block.experiment.fullname()) (self.block.name, self.block.experiment.fullname())
logger.error(message) logger.error(message)
split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR, for split in self.splits.all():
syserr=message)) split.end(Result(status=1,
usrerr=settings.DEFAULT_USER_ERROR, syserr=message))
# sets ranges for all splits that are possible, immediately stops # sets ranges for all splits that are possible, immediately stops
# all other splits # all other splits
...@@ -730,8 +732,11 @@ class Job(models.Model): ...@@ -730,8 +732,11 @@ class Job(models.Model):
split_statuses = self.splits.values_list('status', flat=True) split_statuses = self.splits.values_list('status', flat=True)
if self.start_date is None: if self.start_date is None:
self.start_date = \ qs = self.splits.filter(start_date__isnull=False).order_by('start_date')
self.splits.order_by('start_date').first().start_date if qs:
self.start_date = qs.first().start_date
else:
self.start_date = datetime.datetime.now()
# Process main status and status from job results # Process main status and status from job results
if Job.FAILED in split_statuses: if Job.FAILED in split_statuses:
...@@ -761,17 +766,30 @@ class Job(models.Model): ...@@ -761,17 +766,30 @@ class Job(models.Model):
# compute final timings and update parent block # compute final timings and update parent block
if self.status != Job.SKIPPED: if self.status != Job.SKIPPED:
diff_timings = self._merge_results() diff_timings = self._merge_results()
# delete all splits w/o results (still queued)
self.splits.filter(result__isnull=True).delete()
self.end_date = self.splits.order_by('-end_date').\ self.end_date = self.splits.order_by('-end_date').\
first().end_date first().end_date
updateStatistics(self.result.stats, self.end_date) updateStatistics(self.result.stats, self.end_date)
Result.objects.filter(split__in=self.splits.all()).delete() Result.objects.filter(split__in=self.splits.all()).delete()
seqtime = sum(diff_timings) seqtime = sum(diff_timings)
if self.start_date is None:
queuing = 0
else:
queuing = (self.start_date - \
self.runnable_date).total_seconds()
if not seqtime:
speed_up_real = 1.0
speed_up_maximal = 1.0
else:
speed_up_real = float(seqtime) / \
(self.end_date - self.start_date).total_seconds()
speed_up_maximal = float(seqtime) / max(diff_timings)
timings = dict( timings = dict(
queuing = (self.start_date - self.runnable_date).total_seconds(), queuing = queuing,
linear_execution = seqtime, linear_execution = seqtime,
speed_up_real = float(seqtime) / \ speed_up_real = speed_up_real,
(self.end_date - self.start_date).total_seconds(), speed_up_maximal = speed_up_maximal,
speed_up_maximal = float(seqtime) / max(diff_timings),
) )
self.runnable_date = None self.runnable_date = None
self.erase_dangling_files() self.erase_dangling_files()
...@@ -829,7 +847,8 @@ class Job(models.Model): ...@@ -829,7 +847,8 @@ class Job(models.Model):
job_results = Result.objects.filter(pk__in=self.splits.filter(result__isnull=False).values_list('result', flat=True)) job_results = Result.objects.filter(pk__in=self.splits.filter(result__isnull=False).values_list('result', flat=True))
diff_timings = [(k[0]-k[1]).total_seconds() for k in \ diff_timings = [(k[0]-k[1]).total_seconds() for k in \
self.splits.values_list('end_date', 'start_date')] self.splits.filter(end_date__isnull=False,
start_date__isnull=False).values_list('end_date', 'start_date')]
status = sum([k.status for k in job_results]) status = sum([k.status for k in job_results])
stdout = _merge_strings([k.stdout for k in job_results]) stdout = _merge_strings([k.stdout for k in job_results])
...@@ -1007,6 +1026,8 @@ class JobSplit(models.Model): ...@@ -1007,6 +1026,8 @@ class JobSplit(models.Model):
if status: if status:
logger.info("Marking job split `%s' as '%s'", self, status) logger.info("Marking job split `%s' as '%s'", self, status)
if self.start_date is None:
self.start_date = datetime.datetime.now()
self.end_date = datetime.datetime.now() self.end_date = datetime.datetime.now()
self.worker = None #frees worker slot self.worker = None #frees worker slot
......
...@@ -36,7 +36,6 @@ import subprocess ...@@ -36,7 +36,6 @@ import subprocess
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
import psutil
import simplejson import simplejson
from django.conf import settings from django.conf import settings
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment