From e0be6886e9927f308af6ad9d74a9008a22bbbae6 Mon Sep 17 00:00:00 2001 From: Andre Anjos <andre.dos.anjos@gmail.com> Date: Mon, 25 Apr 2016 17:19:34 +0200 Subject: [PATCH] [backend,experiments] Test fixes; More tests --- beat/web/backend/models.py | 17 +- beat/web/backend/tests.py | 453 ++++++++++++++++++++++++++++++++- beat/web/experiments/models.py | 7 +- 3 files changed, 467 insertions(+), 10 deletions(-) diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index cdb751afb..89785db43 100644 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -647,7 +647,7 @@ class Job(models.Model): for b in self.block.dependents.all(): if any([k.status in (Block.FAILED, Block.CANCELLED) \ for k in b.dependencies.all()]): - for split in b.job.splits: split.end(None, Job.CANCELLED) + b.job._cancel() if all([k.status in (Block.CACHED, Block.SKIPPED) \ for k in b.dependencies.all()]): b.job.make_runnable() @@ -708,7 +708,7 @@ class Job(models.Model): # if required, erase dangling files, update own results timings = None - if self.done(): + if self.done() and self.status != Job.CANCELLED: # compute final timings and update parent block if self.status != Job.SKIPPED: diff_timings = self._merge_results() @@ -740,7 +740,7 @@ class Job(models.Model): failed = self.status in (Job.FAILED, Job.CANCELLED) if failed: - for o in self.block.outputs: l += o.files() + for o in self.block.outputs.all(): l += o.files() for f in l: logger.info("Erasing output file `%s' because Job `%s' failed", f, @@ -764,8 +764,13 @@ class Job(models.Model): logger.info("Marking job `%s' as 'cancelled'", self) self.runnable_date = None self.start_date = None - for s in self.splits: s._cancel() - self.save() + if self.splits.count(): + for s in self.splits.all(): s._cancel() + else: + self.status = Job.CANCELLED + self.save() + self.block._update_state() + self._cascade_updates() def _merge_results(self): @@ -960,7 +965,7 @@ class JobSplit(models.Model): if result.id is None: 
result.save() self.result = result else: - self.status = state + self.status = status self.save() diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py index e2d712afc..66b8677a0 100644 --- a/beat/web/backend/tests.py +++ b/beat/web/backend/tests.py @@ -736,6 +736,8 @@ class Scheduling(BaseBackendTestCase): def test_success(self): + # tests a simple successful experiment scheduling and execution + current_stats = HourlyStatistics.objects.count() fullname = 'user/user/single/1/single' @@ -834,10 +836,457 @@ class Scheduling(BaseBackendTestCase): self.check_stats_success(split) - # assert we have no database traces after the block is done + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.count(), 0) + self.assertEqual(JobSplit.objects.count(), 0) + self.assertEqual(Result.objects.count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + + def test_worker_activation(self): + + # tests that scheduling depends on worker activation + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + # de-activates worker + Worker.objects.update(active=False) + + # schedules the experiment and check it + xp.schedule() + self.check_single(xp) + + # no job can be run right now + schedule() + assigned_splits = JobSplit.objects.filter(worker__isnull=False, + status=Job.QUEUED) + self.assertEqual(assigned_splits.count(), 0) + + # re-activate the worker, show it now schedules fine + Worker.objects.update(active=True) + schedule() + assigned_splits = JobSplit.objects.filter(worker__isnull=False, + status=Job.QUEUED) + self.assertEqual(assigned_splits.count(), 1) + + # the rest would continue like with test_success + + + def test_fails_on_first_block(self): + + # tests that, if we fail on the first block, experiment fails, all + # stops as foreseen + + current_stats = HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = 
Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + # schedules the experiment and check it + xp.schedule() + self.check_single(xp) + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + schedule() + + assigned_splits = JobSplit.objects.filter(worker__isnull=False) + worker = Worker.objects.get() + + self.assertEqual(assigned_splits.count(), 1) + split = assigned_splits.first() + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # no job can be run right now + schedule() + assigned_splits = JobSplit.objects.filter(worker__isnull=False, + status=Job.QUEUED) + self.assertEqual(assigned_splits.count(), 0) + + # simulate end job signal, failure + split.end(Result(status=1)) + self.assertEqual(split.job.status, Job.FAILED) + self.assertEqual(split.job.block.status, Block.FAILED) + self.assertEqual(split.job.block.experiment.status, Experiment.FAILED) + + # checks the number of statistics objects has increased by 1 + self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.count(), 0) + self.assertEqual(JobSplit.objects.count(), 0) + self.assertEqual(Result.objects.count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + + def test_fails_on_last_block(self): + + # tests that, if we fail on the last block, the experiment fails + + current_stats = 
HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + # schedules the experiment and check it + xp.schedule() + self.check_single(xp) + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + schedule() + + assigned_splits = JobSplit.objects.filter(worker__isnull=False) + worker = Worker.objects.get() + + self.assertEqual(assigned_splits.count(), 1) + split = assigned_splits.first() + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # no job can be run right now + schedule() + assigned_splits = JobSplit.objects.filter(worker__isnull=False, + status=Job.QUEUED) + self.assertEqual(assigned_splits.count(), 0) + + # simulate end job signal + split.end(Result(status=0)) + self.assertEqual(split.job.status, Job.COMPLETED) + self.assertEqual(split.job.block.status, Block.CACHED) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + + # checks the number of statistics objects has increased by 1 + self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) + + self.check_stats_success(split) + + # assert we have no database traces after the last block is done self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) self.assertEqual(JobSplit.objects.filter(job=split.job).count(), 0) self.assertEqual(Result.objects.filter(job__isnull=True).count(), 0) - 
self.assertEqual(Result.objects.filter(split__isnull=True).count(), 0) self.assertEqual(worker.available_cores(), qsetup.CORES) + + # since this job was successful, the next one should be ready to run + + # schedules the last block of the experiment + assert xp.blocks.last().job.runnable_date is not None + schedule() + + assigned_splits = JobSplit.objects.filter(worker__isnull=False) + self.assertEqual(assigned_splits.count(), 1) + split = assigned_splits.first() + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'analysis') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # no job can be run right now + schedule() + assigned_splits = JobSplit.objects.filter(worker__isnull=False, + status=Job.QUEUED) + self.assertEqual(assigned_splits.count(), 0) + + # simulate end job signal + split.end(Result(status=1)) + + # checks the number of statistics objects has increased by 1 + self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) + + self.assertEqual(split.job.status, Job.FAILED) + self.assertEqual(split.job.block.status, Block.FAILED) + self.assertEqual(split.job.block.experiment.status, Experiment.FAILED) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.count(), 0) + self.assertEqual(JobSplit.objects.count(), 0) + self.assertEqual(Result.objects.count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + + def test_cancel_before_starting(self): + + # tests experiment cancellation before the experiment is started + + current_stats = 
HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + # schedules the experiment and check it + xp.schedule() + self.check_single(xp) + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + + xp.cancel() + + self.assertEqual( + [str(k) for k in xp.blocks.values_list('status', flat=True)], + [Block.CANCELLED, Block.CANCELLED] + ) + self.assertEqual(xp.status, Experiment.FAILED) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.count(), 0) + self.assertEqual(JobSplit.objects.count(), 0) + self.assertEqual(Result.objects.count(), 0) + + worker = Worker.objects.get() + self.assertEqual(worker.available_cores(), qsetup.CORES) + + + def test_cancel_after_success(self): + + # tests experiment cancellation while the experiment is running + + current_stats = HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + # schedules the experiment and check it + xp.schedule() + self.check_single(xp) + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + schedule() + + assigned_splits = JobSplit.objects.filter(worker__isnull=False) + worker = Worker.objects.get() + + self.assertEqual(assigned_splits.count(), 1) + split = assigned_splits.first() + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + 
self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # no job can be run right now + schedule() + assigned_splits = JobSplit.objects.filter(worker__isnull=False, + status=Job.QUEUED) + self.assertEqual(assigned_splits.count(), 0) + + # simulate end job signal + split.end(Result(status=0)) + self.assertEqual(split.job.status, Job.COMPLETED) + self.assertEqual(split.job.block.status, Block.CACHED) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + + # checks the number of statistics objects has increased by 1 + self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) + + self.check_stats_success(split) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) + self.assertEqual(JobSplit.objects.filter(job=split.job).count(), 0) + self.assertEqual(Result.objects.filter(job__isnull=True).count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # since this job was successful, the next one should be ready to run + + # schedules the last block of the experiment + assert xp.blocks.last().job.runnable_date is not None + xp.cancel() + + self.assertEqual( + [str(k) for k in xp.blocks.order_by('id').values_list('status', flat=True)], + [Block.CACHED, Block.CANCELLED] + ) + self.assertEqual(xp.status, Experiment.FAILED) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.count(), 0) + self.assertEqual(JobSplit.objects.count(), 0) + self.assertEqual(Result.objects.count(), 0) + + worker = Worker.objects.get() + self.assertEqual(worker.available_cores(), qsetup.CORES) + + + def test_cancel_while_running(self): + + # tests experiment cancellation while a block is running + + current_stats = HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = 
Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + # schedules the experiment and check it + xp.schedule() + self.check_single(xp) + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + schedule() + + assigned_splits = JobSplit.objects.filter(worker__isnull=False) + worker = Worker.objects.get() + + self.assertEqual(assigned_splits.count(), 1) + split = assigned_splits.first() + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # no job can be run right now + schedule() + assigned_splits = JobSplit.objects.filter(worker__isnull=False, + status=Job.QUEUED) + self.assertEqual(assigned_splits.count(), 0) + + xp.cancel() + + self.assertEqual( + [str(k) for k in xp.blocks.order_by('id').values_list('status', flat=True)], + [Block.CANCELLED, Block.CANCELLED] + ) + self.assertEqual(xp.status, Experiment.FAILED) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.count(), 0) + self.assertEqual(JobSplit.objects.count(), 0) + self.assertEqual(Result.objects.count(), 0) + + worker = Worker.objects.get() + self.assertEqual(worker.available_cores(), qsetup.CORES) + + + def test_cancel_after_failure(self): + + # tests that, if we fail on the first block, experiment fails and a + # cancellation that comes after that is a NOOP + + current_stats = HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = 
Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + # schedules the experiment and check it + xp.schedule() + self.check_single(xp) + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + schedule() + + assigned_splits = JobSplit.objects.filter(worker__isnull=False) + worker = Worker.objects.get() + + self.assertEqual(assigned_splits.count(), 1) + split = assigned_splits.first() + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # no job can be run right now + schedule() + assigned_splits = JobSplit.objects.filter(worker__isnull=False, + status=Job.QUEUED) + self.assertEqual(assigned_splits.count(), 0) + + # simulate end job signal, failure + split.end(Result(status=1)) + self.assertEqual(split.job.status, Job.FAILED) + self.assertEqual(split.job.block.status, Block.FAILED) + self.assertEqual(split.job.block.experiment.status, Experiment.FAILED) + + # checks the number of statistics objects has increased by 1 + self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.count(), 0) + self.assertEqual(JobSplit.objects.count(), 0) + self.assertEqual(Result.objects.count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + xp.cancel() + self.assertEqual(split.job.block.experiment.status, Experiment.FAILED) + + + def test_blocking_success(self): + 
+ # tests two experiments that are similar can be scheduled at the same + # time and we'll optimise correctly and only run one of them. The other + # is updated as the blocking experiment is executed. + + pass diff --git a/beat/web/experiments/models.py b/beat/web/experiments/models.py index 6d3be8cd0..fe0f82252 100644 --- a/beat/web/experiments/models.py +++ b/beat/web/experiments/models.py @@ -998,7 +998,8 @@ class Block(models.Model): if self.done(): return - if hasattr(self, 'job'): self.job._cancel() + if hasattr(self, 'job'): + self.job._cancel() else: self.status = Block.CANCELLED self.save() @@ -1060,7 +1061,9 @@ class Block(models.Model): if self.job.done(): self.end_date = self.job.end_date - self.job.result.delete() #cascades to job/job split deletion + r = self.job.result + self.job.delete() + if r: r.delete() # Loads Results from cache if self.job.result and self.analyzer and self.status == Block.CACHED: -- GitLab