From 369facb0676b56fdcb6e5cdbd25d2dd4476e6153 Mon Sep 17 00:00:00 2001 From: Andre Anjos <andre.dos.anjos@gmail.com> Date: Tue, 26 Apr 2016 11:14:12 +0200 Subject: [PATCH] [backend] More tests --- beat/web/backend/tests.py | 465 +++++++++++++++++++++++++++++--------- 1 file changed, 359 insertions(+), 106 deletions(-) diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py index 5e1eb8d55..d54007e91 100644 --- a/beat/web/backend/tests.py +++ b/beat/web/backend/tests.py @@ -746,22 +746,21 @@ class Scheduling(BaseBackendTestCase): # schedules the experiment and check it xp.schedule() self.check_single(xp) + assigned_splits = schedule() # schedules the first runnable block assert xp.blocks.first().job.runnable_date is not None assert xp.blocks.last().job.runnable_date is None - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False) worker = Worker.objects.get() - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'echo') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) # simulate job start on worker split.start() @@ -772,10 +771,8 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # simulate end job signal split.end(Result(status=0)) @@ -799,16 +796,15 @@ class Scheduling(BaseBackendTestCase): # schedules the last block of the experiment assert xp.blocks.last().job.runnable_date is not None - 
schedule() + assigned_splits = schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False) - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'analysis') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) # simulate job start on worker split.start() @@ -819,10 +815,8 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # simulate end job signal split.end(Result(status=0)) @@ -859,17 +853,13 @@ class Scheduling(BaseBackendTestCase): self.check_single(xp) # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # re-activate the worker, show it now schedules fine Worker.objects.update(active=True) - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 1) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 1) # the rest would continue like with test_success @@ -891,18 +881,17 @@ class Scheduling(BaseBackendTestCase): # schedules the first runnable block assert xp.blocks.first().job.runnable_date is not None assert xp.blocks.last().job.runnable_date is None - schedule() + assigned_splits = schedule() - assigned_splits = 
JobSplit.objects.filter(worker__isnull=False) worker = Worker.objects.get() - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'echo') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) # simulate job start on worker split.start() @@ -913,10 +902,8 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # simulate end job signal, faiulre split.end(Result(status=1)) @@ -951,18 +938,17 @@ class Scheduling(BaseBackendTestCase): # schedules the first runnable block assert xp.blocks.first().job.runnable_date is not None assert xp.blocks.last().job.runnable_date is None - schedule() + assigned_splits = schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False) worker = Worker.objects.get() - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'echo') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) # simulate job start on worker split.start() @@ -973,10 +959,8 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # no job can be run right 
now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # simulate end job signal split.end(Result(status=0)) @@ -1000,16 +984,15 @@ class Scheduling(BaseBackendTestCase): # schedules the last block of the experiment assert xp.blocks.last().job.runnable_date is not None - schedule() + assigned_splits = schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False) - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'analysis') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) # simulate job start on worker split.start() @@ -1020,10 +1003,8 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # simulate end job signal split.end(Result(status=1)) @@ -1093,18 +1074,17 @@ class Scheduling(BaseBackendTestCase): # schedules the first runnable block assert xp.blocks.first().job.runnable_date is not None assert xp.blocks.last().job.runnable_date is None - schedule() + assigned_splits = schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False) worker = Worker.objects.get() - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] 
self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'echo') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) # simulate job start on worker split.start() @@ -1115,10 +1095,8 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # simulate end job signal split.end(Result(status=0)) @@ -1175,18 +1153,17 @@ class Scheduling(BaseBackendTestCase): # schedules the first runnable block assert xp.blocks.first().job.runnable_date is not None assert xp.blocks.last().job.runnable_date is None - schedule() + assigned_splits = schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False) worker = Worker.objects.get() - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'echo') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) # simulate job start on worker split.start() @@ -1197,10 +1174,8 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) 
xp.cancel() @@ -1236,18 +1211,17 @@ class Scheduling(BaseBackendTestCase): # schedules the first runnable block assert xp.blocks.first().job.runnable_date is not None assert xp.blocks.last().job.runnable_date is None - schedule() + assigned_splits = schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False) worker = Worker.objects.get() - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'echo') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) # simulate job start on worker split.start() @@ -1258,10 +1232,8 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # simulate end job signal, faiulre split.end(Result(status=1)) @@ -1306,18 +1278,17 @@ class Scheduling(BaseBackendTestCase): assert xpc.blocks.first().job.runnable_date is None assert xpc.blocks.last().job.runnable_date is None - schedule() + assigned_splits = schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False) worker = Worker.objects.get() - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'echo') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + 
self.assertEqual(worker.available_cores(), qsetup.CORES-1) # checks the jobs are connected one to the other across experiments self.assertEqual(xp.blocks.first().job.child.block.experiment, xpc) @@ -1328,26 +1299,25 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(split.job.status, Job.PROCESSING) self.assertEqual(split.job.block.status, Block.PROCESSING) self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + self.assertEqual(split.job.child.status, Job.PROCESSING) + self.assertEqual(split.job.child.block.status, Block.PROCESSING) + self.assertEqual(split.job.child.block.experiment.status, Experiment.RUNNING) self.assertEqual(worker.available_cores(), qsetup.CORES-1) - # check child - self.assertEqual(xpc.blocks.first().job.status, Job.PROCESSING) - self.assertEqual(xpc.blocks.first().status, Block.PROCESSING) - ''' - self.assertEqual(xpc.blocks.first().experiment.status, Experiment.RUNNING) - # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # simulate end job signal split.end(Result(status=0)) self.assertEqual(split.job.status, Job.COMPLETED) self.assertEqual(split.job.block.status, Block.CACHED) self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + self.assertEqual(split.job.child.status, Job.COMPLETED) + self.assertEqual(split.job.child.block.status, Block.CACHED) + self.assertEqual(split.job.child.block.experiment.status, + Experiment.RUNNING) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) @@ -1356,7 +1326,10 @@ class Scheduling(BaseBackendTestCase): # assert we have no database traces after the block is done self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) - 
self.assertEqual(JobSplit.objects.filter(job=split.job).count(), 0) + self.assertEqual( + Job.objects.filter(block=split.job.child.block).count(), 0) + self.assertEqual(JobSplit.objects.filter(job=split.job.child).count(), + 0) self.assertEqual(Result.objects.filter(job__isnull=True).count(), 0) self.assertEqual(worker.available_cores(), qsetup.CORES) @@ -1365,30 +1338,31 @@ class Scheduling(BaseBackendTestCase): # schedules the last block of the experiment assert xp.blocks.last().job.runnable_date is not None - schedule() + assigned_splits = schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False) - self.assertEqual(assigned_splits.count(), 1) - split = assigned_splits.first() + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] self.assertEqual(split.job.block.experiment, xp) self.assertEqual(split.job.block.name, 'analysis') self.assertEqual(split.worker, worker) self.assertEqual(worker.name, qsetup.HOSTNAME) - self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) # simulate job start on worker split.start() self.assertEqual(split.job.status, Job.PROCESSING) self.assertEqual(split.job.block.status, Block.PROCESSING) self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + self.assertEqual(split.job.child.status, Job.PROCESSING) + self.assertEqual(split.job.child.block.status, Block.PROCESSING) + self.assertEqual(split.job.child.block.experiment.status, + Experiment.RUNNING) self.assertEqual(worker.available_cores(), qsetup.CORES-1) # no job can be run right now - schedule() - assigned_splits = JobSplit.objects.filter(worker__isnull=False, - status=Job.QUEUED) - self.assertEqual(assigned_splits.count(), 0) + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) # simulate end job signal split.end(Result(status=0)) @@ -1399,6 +1373,10 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(split.job.status, Job.COMPLETED) 
self.assertEqual(split.job.block.status, Block.CACHED) self.assertEqual(split.job.block.experiment.status, Experiment.DONE) + self.assertEqual(split.job.child.status, Job.COMPLETED) + self.assertEqual(split.job.child.block.status, Block.CACHED) + self.assertEqual(split.job.child.block.experiment.status, + Experiment.DONE) self.check_stats_success(split) @@ -1408,4 +1386,279 @@ class Scheduling(BaseBackendTestCase): self.assertEqual(Result.objects.count(), 0) self.assertEqual(worker.available_cores(), qsetup.CORES) - ''' + + + def test_blocking_failure(self): + + # tests two experiments that are similar can be scheduled at the same + # time and we'll optimise correctly and only run one of them. If the + # blocking experiment fails, so does the blocked one too. + + current_stats = HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + xpc = xp.fork(name='single_copy') + + # schedules the experiment and check it + xp.schedule() + xpc.schedule() + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + assert xpc.blocks.first().job.runnable_date is None + assert xpc.blocks.last().job.runnable_date is None + + assigned_splits = schedule() + + worker = Worker.objects.get() + + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # checks the jobs are connected one to the other across experiments + self.assertEqual(xp.blocks.first().job.child.block.experiment, xpc) + self.assertEqual(xp.blocks.last().job.child.block.experiment, xpc) + + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, 
Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + self.assertEqual(split.job.child.status, Job.PROCESSING) + self.assertEqual(split.job.child.block.status, Block.PROCESSING) + self.assertEqual(split.job.child.block.experiment.status, Experiment.RUNNING) + + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # no job can be run right now + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) + + # simulate end job signal + split.end(Result(status=1)) + self.assertEqual(split.job.status, Job.FAILED) + self.assertEqual(split.job.block.status, Block.FAILED) + self.assertEqual(split.job.block.experiment.status, Experiment.FAILED) + self.assertEqual(split.job.child.status, Job.FAILED) + self.assertEqual(split.job.child.block.status, Block.FAILED) + self.assertEqual(split.job.child.block.experiment.status, + Experiment.FAILED) + + # checks the number of statistics objects has increased by 1 + self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.count(), 0) + self.assertEqual(JobSplit.objects.count(), 0) + self.assertEqual(Result.objects.count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + + def test_blocking_cancel_after_success(self): + + # tests two experiments that are similar can be scheduled at the same + # time and we'll optimise correctly and only run one of them. If the + # first experiment is cancelled, then the second one proceeds normally. 
+ + current_stats = HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + xpc = xp.fork(name='single_copy') + + # schedules the experiment and check it + xp.schedule() + xpc.schedule() + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + assert xpc.blocks.first().job.runnable_date is None + assert xpc.blocks.last().job.runnable_date is None + + assigned_splits = schedule() + + worker = Worker.objects.get() + + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # checks the jobs are connected one to the other across experiments + self.assertEqual(xp.blocks.first().job.child.block.experiment, xpc) + self.assertEqual(xp.blocks.last().job.child.block.experiment, xpc) + + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + self.assertEqual(split.job.child.status, Job.PROCESSING) + self.assertEqual(split.job.child.block.status, Block.PROCESSING) + self.assertEqual(split.job.child.block.experiment.status, Experiment.RUNNING) + + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # no job can be run right now + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) + + # simulate end job signal + split.end(Result(status=0)) + self.assertEqual(split.job.status, Job.COMPLETED) + self.assertEqual(split.job.block.status, Block.CACHED) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + 
self.assertEqual(split.job.child.status, Job.COMPLETED) + self.assertEqual(split.job.child.block.status, Block.CACHED) + self.assertEqual(split.job.child.block.experiment.status, + Experiment.RUNNING) + + # checks the number of statistics objects has increased by 1 + self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) + + self.check_stats_success(split) + + # assert we have no database traces after the block is done + self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) + self.assertEqual( + Job.objects.filter(block=split.job.child.block).count(), 0) + self.assertEqual(JobSplit.objects.filter(job=split.job.child).count(), + 0) + self.assertEqual(Result.objects.filter(job__isnull=True).count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # cancels the blocking experiment - the blocked one must continue + xp.cancel() + self.assertEqual( + [str(k) for k in xp.blocks.order_by('id').values_list('status', flat=True)], + [Block.CACHED, Block.CANCELLED] + ) + self.assertEqual(xp.status, Experiment.FAILED) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.filter(block__in=xp.blocks.all()).count(), 0) + self.assertEqual(JobSplit.objects.filter(job__block__in=xp.blocks.all()).count(), 0) + self.assertEqual(Result.objects.count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # since the first job was successful, the second block of the + # previously blocked experiment must be ready to run + + # schedules the last block of the experiment + assert xpc.blocks.last().job.runnable_date is not None + assigned_splits = schedule() + + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] + self.assertEqual(split.job.block.experiment, xpc) + self.assertEqual(split.job.block.name, 'analysis') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), 
qsetup.CORES-1) + + # the rest would continue normally + + + def test_blocking_cancel_while_running(self): + + # tests two experiments that are similar can be scheduled at the same + # time and we'll optimise correctly and only run one of them. If the + # first experiment is cancelled while one of the blocks is running, + # then the second one proceeds normally. + + current_stats = HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + xpc = xp.fork(name='single_copy') + + # schedules the experiment and check it + xp.schedule() + xpc.schedule() + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + assert xpc.blocks.first().job.runnable_date is None + assert xpc.blocks.last().job.runnable_date is None + + assigned_splits = schedule() + + worker = Worker.objects.get() + + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # checks the jobs are connected one to the other across experiments + self.assertEqual(xp.blocks.first().job.child.block.experiment, xpc) + self.assertEqual(xp.blocks.last().job.child.block.experiment, xpc) + + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + self.assertEqual(split.job.child.status, Job.PROCESSING) + self.assertEqual(split.job.child.block.status, Block.PROCESSING) + self.assertEqual(split.job.child.block.experiment.status, Experiment.RUNNING) + + self.assertEqual(worker.available_cores(), 
qsetup.CORES-1) + + # no job can be run right now + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) + + # cancels the blocking experiment - the blocked one must continue + xp.cancel() + self.assertEqual( + [str(k) for k in xp.blocks.order_by('id').values_list('status', flat=True)], + [Block.CANCELLED, Block.CANCELLED] + ) + self.assertEqual(xp.status, Experiment.FAILED) + + # assert we have no database traces after the cancellation + self.assertEqual(Job.objects.filter(block__in=xp.blocks.all()).count(), 0) + self.assertEqual(JobSplit.objects.filter(job__block__in=xp.blocks.all()).count(), 0) + self.assertEqual(Result.objects.count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # since the blocking experiment was cancelled while running, the first + # block of the previously blocked experiment must be ready to run + + # schedules the first block of the previously blocked experiment + assert xpc.blocks.first().job.runnable_date is not None + assigned_splits = schedule() + + assigned_splits = JobSplit.objects.filter(worker__isnull=False) + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits.first() + self.assertEqual(split.job.block.experiment, xpc) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # the rest would continue normally -- GitLab