diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py
index 386e79e3a93243c0d7668ac3bfcc1fcc66e0603a..dbf6d185a18798a6f7c779cf5cea61fb605267e3 100644
--- a/beat/web/backend/models.py
+++ b/beat/web/backend/models.py
@@ -573,9 +573,9 @@ class Job(models.Model):
     def _copy(self, other):
         '''Copy state from another block'''

-        Job.objects.select_for_update().filter(pk=self.pk)
+        self_ = Job.objects.select_for_update().get(pk=self.pk)

-        if self.done(): return
+        if self_.done(): return

         self.start_date = other.start_date
         self.end_date = other.end_date
@@ -592,7 +592,7 @@ class Job(models.Model):
         '''Tries to make this job runnable - if it is cached, we skip it'''

         # lock self - avoids concurrent update from scheduler/worker subsystem
-        Job.objects.select_for_update().filter(pk=self.pk)
+        self_ = Job.objects.select_for_update().get(pk=self.pk)

         # checks for the presence of output caches - if they exist and
         # checksum, skip and update related blocks
@@ -615,11 +615,6 @@ class Job(models.Model):
         # else, flag it as runnable
         self.runnable_date = datetime.datetime.now()

-        # creates the N job splits to that will be executed on the backend farm
-        for i in range(self.block.required_slots):
-            s = JobSplit(job=self, split_index=i)
-            s.save()
-
         # runs index splitting once, for all created splits
         self._split_indices()

@@ -630,6 +625,8 @@ class Job(models.Model):
         # no index spliting is required
         if self.block.required_slots == 1:
             self.save()
+            s = JobSplit(job=self, split_index=0)
+            s.save()
             return

         indices = []
@@ -653,6 +650,25 @@ class Job(models.Model):
             self.split_errors = 0
             self.save()

+            if len(indices) == 0:
+                message = "Index splitting for block `%s' of experiment " \
+                    "`%s' could not be completed: not splittable!" % \
+                    (self.block.name, self.block.experiment.fullname())
+                logger.warn(message)
+                self._cancel(usrerr=settings.DEFAULT_USER_ERROR, syserr=message)
+
+            # if you get to this point, the splitting has succeeded,
+            # create the necessary splits and assign the ranges
+            for i, s in enumerate(indices):
+                split_indices = indices[i]
+                s = JobSplit(
+                    job=self,
+                    split_index=i,
+                    start_index=split_indices[0],
+                    end_index=split_indices[0],
+                )
+                s.save()
+
         except Exception as e:

             self.split_errors += 1
@@ -664,30 +680,8 @@ class Job(models.Model):
                     "`%s' could not be completed due to an index split " \
                     "error: %s" % (self.block.name,
                         self.block.experiment.fullname(), format_exc())
-                logger.error(message)
-                for split in self.splits.all():
-                    split.end(Result(status=1,
-                        usrerr=settings.DEFAULT_USER_ERROR, syserr=message))
-
-        if len(indices) == 0:
-            message = "Index splitting for block `%s' of experiment " \
-                "`%s' could not be completed: not splittable!" % \
-                (self.block.name, self.block.experiment.fullname())
-            logger.error(message)
-            for split in self.splits.all():
-                split.end(Result(status=1,
-                    usrerr=settings.DEFAULT_USER_ERROR, syserr=message))
-
-        # sets ranges for all splits that are possible, immediately stops
-        # all other splits
-        for i, s in enumerate(self.splits.select_for_update().all()):
-            if i < len(indices):
-                split_indices = indices[i]
-                s.start_index = split_indices[0]
-                s.end_index = split_indices[0]
-                s.save()
-            else:
-                s.end(None, Job.SKIPPED) #this split is not necessary
+                logger.warn(message)
+                self._cancel(usrerr=settings.DEFAULT_USER_ERROR, syserr=message)


     def _cascade_updates(self):
@@ -713,7 +707,6 @@ class Job(models.Model):
             self.parent = None


-    @transaction.atomic
     def _update_state(self):
         '''Update self state based on associated job states

@@ -721,81 +714,86 @@ class Job(models.Model):
         called by children splits or itself.
         '''

-        # lock
-        Job.objects.select_for_update().filter(pk=self.pk)
+        with transaction.atomic():

-        if self.done(): return
+            # lock
+            self_ = Job.objects.select_for_update().get(pk=self.pk)

-        # If this process has a parent, then don't try to get split statuses
-        if not self.parent:
+            if self_.done(): return

-            split_statuses = self.splits.values_list('status', flat=True)
+            # If this process has a parent, then don't try to get split
+            # statuses
+            if not self.parent:

-            if self.start_date is None:
-                qs = self.splits.filter(start_date__isnull=False).order_by('start_date')
-                if qs:
-                    self.start_date = qs.first().start_date
-                else:
-                    self.start_date = datetime.datetime.now()
+                split_statuses = self.splits.values_list('status', flat=True)

-            # Process main status and status from job results
-            if Job.FAILED in split_statuses:
-                self.status = Job.FAILED
+                if self.start_date is None:
+                    qs = self.splits.filter(start_date__isnull=False).\
+                        order_by('start_date')
+                    if qs:
+                        self.start_date = qs.first().start_date
+                    else:
+                        self.start_date = datetime.datetime.now()

-            elif Job.CANCELLED in split_statuses:
-                self.status = Job.CANCELLED
+                # Process main status and status from job results
+                if Job.FAILED in split_statuses:
+                    self.status = Job.FAILED

-            elif (Job.PROCESSING in split_statuses) or \
-                (Job.QUEUED in split_statuses and \
-                Job.COMPLETED in split_statuses) or \
-                (Job.KILL in split_statuses):
-                self.status = Job.PROCESSING
+                elif Job.CANCELLED in split_statuses:
+                    self.status = Job.CANCELLED

-            elif all([s == Job.SKIPPED for s in split_statuses]):
-                self.status = Job.SKIPPED
+                elif (Job.PROCESSING in split_statuses) or \
+                    (Job.QUEUED in split_statuses and \
+                    Job.COMPLETED in split_statuses) or \
+                    (Job.KILL in split_statuses):
+                    self.status = Job.PROCESSING

-            elif Job.QUEUED not in split_statuses:
-                self.status = Job.COMPLETED
+                elif all([s == Job.SKIPPED for s in split_statuses]):
+                    self.status = Job.SKIPPED
+
+                elif Job.QUEUED not in split_statuses:
+                    self.status = Job.COMPLETED

-            else:
-                self.status = Job.QUEUED
-
-        # if required, erase dangling files, update own results
-        timings = None
-        if self.done() and self.status != Job.CANCELLED:
-            # compute final timings and update parent block
-            if self.status != Job.SKIPPED:
-                diff_timings = self._merge_results()
-                # delete all splits w/o results (still queued)
-                self.splits.filter(result__isnull=True).delete()
-                self.end_date = self.splits.order_by('-end_date').\
-                    first().end_date
-                updateStatistics(self.result.stats, self.end_date)
-                Result.objects.filter(split__in=self.splits.all()).delete()
-                seqtime = sum(diff_timings)
-                if self.start_date is None:
-                    queuing = 0
-                else:
-                    queuing = (self.start_date - \
-                        self.runnable_date).total_seconds()
-                if not seqtime:
-                    speed_up_real = 1.0
-                    speed_up_maximal = 1.0
-                else:
-                    speed_up_real = float(seqtime) / \
-                        (self.end_date - self.start_date).total_seconds()
-                    speed_up_maximal = float(seqtime) / max(diff_timings)
-                timings = dict(
-                    queuing = queuing,
-                    linear_execution = seqtime,
-                    speed_up_real = speed_up_real,
-                    speed_up_maximal = speed_up_maximal,
-                )
-            self.runnable_date = None
-            self.erase_dangling_files()
+                else:
+                    self.status = Job.QUEUED
+
+            # if required, erase dangling files, update own results
+            timings = None
+            if self.done() and self.status != Job.CANCELLED:
+                # compute final timings and update parent block
+                if self.status != Job.SKIPPED:
+                    diff_timings = self._merge_results()
+                    # delete all splits w/o results (still queued)
+                    self.splits.filter(result__isnull=True).delete()
+                    self.end_date = self.splits.order_by('-end_date').\
+                        first().end_date
+                    updateStatistics(self.result.stats, self.end_date)
+                    Result.objects.filter(split__in=self.splits.all()).delete()
+                    seqtime = sum(diff_timings)
+                    if self.start_date is None:
+                        queuing = 0
+                    else:
+                        queuing = (self.start_date - \
+                            self.runnable_date).total_seconds()
+                    if not seqtime:
+                        speed_up_real = 1.0
+                        speed_up_maximal = 1.0
+                    else:
+                        speed_up_real = float(seqtime) / \
+                            (self.end_date - self.start_date).total_seconds()
+                        speed_up_maximal = float(seqtime) / max(diff_timings)
+                    timings = dict(
+                        queuing = queuing,
+                        linear_execution = seqtime,
+                        speed_up_real = speed_up_real,
+                        speed_up_maximal = speed_up_maximal,
+                    )
+                self.runnable_date = None
+                self.erase_dangling_files()
+
+            # updates the dependents and child state
+            self.save()

-        # updates the dependents and child state
-        self.save()
         self._cascade_updates()
         self.block._update_state(timings)

@@ -816,7 +814,7 @@ class Job(models.Model):


     @transaction.atomic
-    def _cancel(self):
+    def _cancel(self, usrerr=None, syserr=None):
         '''Cancel the execution of this job

         As a consequence: delete all associated jobs, mark end_date and set
@@ -824,9 +822,9 @@ class Job(models.Model):
         '''

         # lock
-        Job.objects.select_for_update().filter(pk=self.pk)
+        self_ = Job.objects.select_for_update().get(pk=self.pk)

-        if self.done(): return
+        if self_.done(): return

         logger.info("Marking job `%s' as 'cancelled'", self)
         self.runnable_date = None
@@ -835,6 +833,10 @@ class Job(models.Model):
             for s in self.splits.all(): s._cancel()
         else:
             self.status = Job.CANCELLED
+            if usrerr or syserr:
+                r = Result(status=1, usrerr=usrerr, syserr=syserr)
+                r.save()
+                self.result = r
         self.save()
         self.block._update_state()
         self._cascade_updates()
@@ -929,7 +931,6 @@ class JobSplit(models.Model):
         )


-    @transaction.atomic
     def schedule(self, worker):
         '''Schedules this split to be executed on a given worker

@@ -940,17 +941,18 @@ class JobSplit(models.Model):

         '''

-        # lock self - avoids concurrent update from scheduler/worker subsystem
-        JobSplit.objects.select_for_update().filter(pk=self.pk)
+        with transaction.atomic():
+            # lock self - avoids concurrent update from scheduler/worker
+            # subsystem
+            self_ = JobSplit.objects.select_for_update().get(pk=self.pk)

-        self.worker = worker
-        self.save()
+            self.worker = worker
+            self.save()

         logger.info("Job split %s scheduled at `%s' was assigned to `%s'",
                 self, self.job.block.queue, self.worker)


-    @transaction.atomic
     def start(self):
         '''Marks the job as started, acknowledging scheduler assignment

@@ -964,31 +966,34 @@ class JobSplit(models.Model):

         '''

-        # lock self - avoids concurrent update from scheduler/worker subsystem
-        JobSplit.objects.select_for_update().filter(pk=self.pk)
-
-        if self.start_date is not None: return
-
-        self.start_date = datetime.datetime.now()
-        self.process_id = os.getpid()
+        with transaction.atomic():
+
+            # lock self - avoids concurrent update from scheduler/worker
+            # subsystem
+            self_ = JobSplit.objects.select_for_update().get(pk=self.pk)

-        self.status = Job.PROCESSING
-        self.save()
+            self.start_date = datetime.datetime.now()
+            self.process_id = os.getpid()
+
+            self.status = Job.PROCESSING
+            self.save()

         self.job._update_state()


-    @transaction.atomic
     def _cancel(self):
         '''Marks this job as cancelled.'''

-        # lock self - avoids concurrent update from scheduler/worker subsystem
-        JobSplit.objects.select_for_update().filter(pk=self.pk)
-
         # If this split is running, then wait
         if self.status == Job.PROCESSING:
-            self.status = Job.KILL
-            self.save()
+            with transaction.atomic():
+
+                # lock self - avoids concurrent update from scheduler/worker
+                # subsystem
+                self_ = JobSplit.objects.select_for_update().get(pk=self.pk)
+                self.status = Job.KILL
+                self.save()
+
             logger.info("Job split `%s' is currently processing. Waiting " \
                 "for worker to cancel split remotely.", self)

@@ -996,7 +1001,6 @@ class JobSplit(models.Model):
             self.end(None, Job.CANCELLED)


-    @transaction.atomic
     def end(self, result, status=None):
         '''Marks the job as finished on the state database

@@ -1018,27 +1022,30 @@ class JobSplit(models.Model):

         '''

-        # lock self - avoids concurrent update from scheduler/worker subsystem
-        JobSplit.objects.select_for_update().filter(pk=self.pk)
+        with transaction.atomic():
+
+            # lock self - avoids concurrent update from scheduler/worker
+            # subsystem
+            self_ = JobSplit.objects.select_for_update().get(pk=self.pk)

-        if self.done(): return
+            if self_.done(): return

-        if status:
-            logger.info("Marking job split `%s' as '%s'", self, status)
+            if status:
+                logger.info("Marking job split `%s' as '%s'", self, status)

-        if self.start_date is None:
-            self.start_date = datetime.datetime.now()
-        self.end_date = datetime.datetime.now()
-        self.worker = None #frees worker slot
+            if self.start_date is None:
+                self.start_date = datetime.datetime.now()
+            self.end_date = datetime.datetime.now()
+            self.worker = None #frees worker slot

-        if result:
-            self.status = Job.COMPLETED if result.status == 0 else Job.FAILED
-            if result.id is None: result.save()
-            self.result = result
-        else:
-            self.status = status
+            if result:
+                self.status = Job.COMPLETED if result.status == 0 else Job.FAILED
+                if result.id is None: result.save()
+                self.result = result
+            else:
+                self.status = status

-        self.save()
+            self.save()

         logger.info("Job split `%s' finished executing", self)
diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py
index 9379638ef022b0dbf9f987f982df3fb355c1a83c..9d2ac5450ef2e36096b4385c8eff51d82583ac6d 100644
--- a/beat/web/backend/tests.py
+++ b/beat/web/backend/tests.py
@@ -930,6 +930,7 @@ class Scheduling(BaseBackendTestCase):
         split.end(Result(status=1))
         self.assertEqual(split.job.status, Job.FAILED)
         self.assertEqual(split.job.block.status, Block.FAILED)
+        split.job.block.experiment.refresh_from_db()
         self.assertEqual(split.job.block.experiment.status, Experiment.FAILED)

         # checks the number of statistics objects has increased by 1
@@ -1068,6 +1069,7 @@ class Scheduling(BaseBackendTestCase):
             [str(k) for k in xp.blocks.values_list('status', flat=True)],
             [Block.CANCELLED, Block.CANCELLED]
             )
+        xp.refresh_from_db()
         self.assertEqual(xp.status, Experiment.FAILED)

         # assert we have no database traces after the last block is done
@@ -1263,6 +1265,7 @@ class Scheduling(BaseBackendTestCase):
         split.end(Result(status=1))
         self.assertEqual(split.job.status, Job.FAILED)
         self.assertEqual(split.job.block.status, Block.FAILED)
+        split.job.block.experiment.refresh_from_db()
         self.assertEqual(split.job.block.experiment.status, Experiment.FAILED)

         # checks the number of statistics objects has increased by 1
@@ -1470,9 +1473,11 @@ class Scheduling(BaseBackendTestCase):
         split.end(Result(status=1))
         self.assertEqual(split.job.status, Job.FAILED)
         self.assertEqual(split.job.block.status, Block.FAILED)
+        split.job.block.experiment.refresh_from_db()
         self.assertEqual(split.job.block.experiment.status, Experiment.FAILED)
         self.assertEqual(split.job.child.status, Job.FAILED)
         self.assertEqual(split.job.child.block.status, Block.FAILED)
+        split.job.child.block.experiment.refresh_from_db()
         self.assertEqual(split.job.child.block.experiment.status,
             Experiment.FAILED)

@@ -1809,6 +1814,36 @@ class Scheduling(BaseBackendTestCase):
         self.assertRaises(RuntimeError, xp.schedule)


+    def test_split_no_index(self):
+
+        # tests a simple experiment with splitting and show it can fail
+        # gracefully
+
+        current_stats = HourlyStatistics.objects.count()
+
+        fullname = 'user/user/single/1/single_large'
+        xp = Experiment.objects.get(name=fullname.split(os.sep)[-1])
+
+        worker = Worker.objects.get()
+
+        # schedules the experiment and check it
+        xp.schedule()
+        xp.refresh_from_db()
+        self.assertEqual(xp.status, Experiment.FAILED)
+
+        self.assertEqual(xp.blocks.first().status, Block.CANCELLED)
+        assert xp.blocks.first().error_report().find(settings.DEFAULT_USER_ERROR) == 0
+        self.assertEqual(xp.blocks.last().status, Block.CANCELLED)
+
+        # assert we have no database traces after the last block is done
+        self.assertEqual(Job.objects.filter(block__in=xp.blocks.all()).count(), 0)
+        self.assertEqual(JobSplit.objects.filter(job__block__in=xp.blocks.all()).count(), 0)
+        self.assertEqual(Result.objects.count(), 0)
+
+        self.assertEqual(worker.available_cores(), qsetup.CORES)
+
+
+


 class SchedulingPriority(BaseBackendTestCase):
diff --git a/beat/web/experiments/models.py b/beat/web/experiments/models.py
index 6ba42de388faaec3fdc6695dd420f23312814551..b0313276e6def9f01c6943f2dea86d525297d5f2 100644
--- a/beat/web/experiments/models.py
+++ b/beat/web/experiments/models.py
@@ -44,6 +44,7 @@ from beat.core.utils import NumpyJSONEncoder

 from ..algorithms.models import Algorithm
 from ..toolchains.models import Toolchain
+from ..utils.api import send_email_to_administrators
 from ..common.models import Shareable
 from ..common.models import ContributionManager

@@ -742,17 +743,25 @@ class Experiment(Shareable):
     def update_state(self):
         '''Update self state based on associated block states'''

-        Experiment.objects.select_for_update().filter(pk=self.pk)
+        self_ = Experiment.objects.select_for_update().get(pk=self.pk)

-        if self.is_done(): return
+        if self_.is_done(): return

         if self.start_date is None:
-            d = self.blocks.order_by('start_date').first().start_date
-            if d is not None: self.start_date = d
+            d = self.blocks.filter(start_date__isnull=False).\
+                order_by('start_date')
+            if d:
+                self.start_date = d.first().start_date
+            else:
+                self.start_date = datetime.now()

         if self.end_date is None:
-            d = self.blocks.order_by('-end_date').first().end_date
-            if d is not None: self.end_date = d
+            d = self.blocks.filter(end_date__isnull=False).\
+                order_by('-end_date')
+            if d:
+                self.end_date = d.first().end_date
+            else:
+                self.end_date = datetime.now()

         block_statuses = self.blocks.values_list('status', flat=True)

@@ -775,7 +784,6 @@ class Experiment(Shareable):
         self.save()


-    @transaction.atomic
     def schedule(self):
         '''Schedules this experiment for execution at the backend

@@ -790,21 +798,21 @@ class Experiment(Shareable):
         creating one :py:class:`..backend.models.JobSplit` per split.
         '''

-        # lock self - avoids concurrent update from scheduler/worker subsystem
-        Experiment.objects.select_for_update().filter(pk=self.pk)
+        self_ = Experiment.objects.get(pk=self.pk)

-        if self.status != Experiment.PENDING: return
+        if self_.status != Experiment.PENDING: return

         for b in self.blocks.all(): b._schedule()

         # notice that the previous call may decide all is done already
         # so, we must respect that before setting the SCHEDULED status
-        if not self.is_done():
-            self.status = Experiment.SCHEDULED
-            self.save()
+        with transaction.atomic():
+            self_ = Experiment.objects.select_for_update().get(pk=self.pk)
+            if not self_.is_done():
+                self.status = Experiment.SCHEDULED
+                self.save()


-    @transaction.atomic
     def cancel(self):
         '''Cancels the execution of this experiment on the backend.

@@ -817,10 +825,10 @@ class Experiment(Shareable):

         '''

-        # lock self - avoids concurrent update from scheduler/worker subsystem
-        Experiment.objects.select_for_update().filter(pk=self.pk)
+        self_ = Experiment.objects.get(pk=self.pk)

-        if self.status not in (Experiment.SCHEDULED, Experiment.RUNNING): return
+        if self_.status not in (Experiment.SCHEDULED, Experiment.RUNNING):
+            return

         for b in self.blocks.all(): b._cancel()

@@ -987,9 +995,10 @@ class Block(models.Model):
         '''

         # lock self - avoids concurrent update from scheduler/worker subsystem
-        Block.objects.select_for_update().filter(pk=self.pk)
+        self_ = Block.objects.select_for_update().get(pk=self.pk)

-        if self.status != Block.NOT_CACHED: return
+        # checks we have not, meanwhile, been cancelled
+        if self_.done(): return

         # checks queue and environment
         if self.queue is None:
@@ -1033,9 +1042,9 @@ class Block(models.Model):
         '''

         # lock self - avoids concurrent update from scheduler/worker subsystem
-        Block.objects.select_for_update().filter(pk=self.pk)
+        self_ = Block.objects.select_for_update().get(pk=self.pk)

-        if self.done(): return
+        if self_.done(): return

         if hasattr(self, 'job'):
             self.job._cancel()
@@ -1058,10 +1067,9 @@ class Block(models.Model):
         '''

         for b in self.dependents.all():
-            if not hasattr(b, 'job'): continue
             if any([k.status in (Block.FAILED, Block.CANCELLED) \
                 for k in b.dependencies.all()]):
-                b.job._cancel()
+                b._cancel()
             if b.is_runnable(): b.job.make_runnable()


@@ -1087,12 +1095,14 @@ class Block(models.Model):
         '''

         # lock self - avoids concurrent update from scheduler/worker subsystem
-        Block.objects.select_for_update().filter(pk=self.pk)
+        self_ = Block.objects.select_for_update().get(pk=self.pk)
+
+        if self_.done(): return

         if self.start_date is None:
             self.start_date = self.job.start_date

-        if self.job.result and timings:
+        if self.job.result:

             statistics = self.job.result.stats

@@ -1108,14 +1118,22 @@ class Block(models.Model):
                 stdout = self.job.result.stdout,
                 stderr = self.job.result.stderr,
                 error_report = self.job.result.usrerr,
-                queuing_time = timings['queuing'],
-                linear_execution_time = timings['linear_execution'],
-                speed_up_real = timings['speed_up_real'],
-                speed_up_maximal = timings['speed_up_maximal'],
                 )

+            if timings:
+                info.update(dict(
+                    queuing_time = timings['queuing'],
+                    linear_execution_time = timings['linear_execution'],
+                    speed_up_real = timings['speed_up_real'],
+                    speed_up_maximal = timings['speed_up_maximal'],
+                    ))
+
             self.outputs.update(**info)

+            if self.job.result.syserr: #mail admins
+                send_email_to_administrators('System error captured',
+                    self.job.result.syserr)
+
         if self.job.status == Block.SKIPPED:
             self.status = Block.CACHED
         else:
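
The recurring change in this patch replaces the pattern `Model.objects.select_for_update().filter(pk=self.pk)` -- a lazy queryset that is never evaluated, so no row lock is ever taken -- with `select_for_update().get(pk=self.pk)` executed inside `transaction.atomic()`, which actually issues the locking query and returns a fresh row to check for terminal states before mutating. The following is a minimal, illustrative sketch of that locking pattern using a hypothetical `Task` model; it is not part of beat.web and the names are invented for illustration only:

    from django.db import models, transaction

    class Task(models.Model):
        PENDING = 'P'
        DONE = 'D'

        status = models.CharField(max_length=1, default=PENDING)

        def done(self):
            return self.status == self.DONE

        def finish(self):
            with transaction.atomic():
                # select_for_update().get() issues "SELECT ... FOR UPDATE" and
                # blocks concurrent writers until this transaction commits; a
                # bare .filter(pk=...) that is never evaluated takes no lock.
                locked = Task.objects.select_for_update().get(pk=self.pk)
                if locked.done():
                    return  # another worker already finished this row
                self.status = self.DONE
                self.save()

The decisive detail is that the terminal-state check is made on the freshly locked copy (`locked`, or `self_` in the patch) rather than on the possibly stale in-memory instance, which is what closes the race between scheduler and worker updates.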