Skip to content
Snippets Groups Projects
Commit 369facb0 authored by André Anjos's avatar André Anjos :speech_balloon:
Browse files

[backend] More tests

parent d5d550d2
No related branches found
No related tags found
1 merge request!194Scheduler
Pipeline #
......@@ -746,22 +746,21 @@ class Scheduling(BaseBackendTestCase):
# schedules the experiment and check it
xp.schedule()
self.check_single(xp)
assigned_splits = schedule()
# schedules the first runnable block
assert xp.blocks.first().job.runnable_date is not None
assert xp.blocks.last().job.runnable_date is None
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
worker = Worker.objects.get()
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# simulate job start on worker
split.start()
......@@ -772,10 +771,8 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal
split.end(Result(status=0))
......@@ -799,16 +796,15 @@ class Scheduling(BaseBackendTestCase):
# schedules the last block of the experiment
assert xp.blocks.last().job.runnable_date is not None
schedule()
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'analysis')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# simulate job start on worker
split.start()
......@@ -819,10 +815,8 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal
split.end(Result(status=0))
......@@ -859,17 +853,13 @@ class Scheduling(BaseBackendTestCase):
self.check_single(xp)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# re-activate the worker, show it now schedules fine
Worker.objects.update(active=True)
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 1)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 1)
# the rest would continue like with test_success
......@@ -891,18 +881,17 @@ class Scheduling(BaseBackendTestCase):
# schedules the first runnable block
assert xp.blocks.first().job.runnable_date is not None
assert xp.blocks.last().job.runnable_date is None
schedule()
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
worker = Worker.objects.get()
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# simulate job start on worker
split.start()
......@@ -913,10 +902,8 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal, faiulre
split.end(Result(status=1))
......@@ -951,18 +938,17 @@ class Scheduling(BaseBackendTestCase):
# schedules the first runnable block
assert xp.blocks.first().job.runnable_date is not None
assert xp.blocks.last().job.runnable_date is None
schedule()
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
worker = Worker.objects.get()
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# simulate job start on worker
split.start()
......@@ -973,10 +959,8 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal
split.end(Result(status=0))
......@@ -1000,16 +984,15 @@ class Scheduling(BaseBackendTestCase):
# schedules the last block of the experiment
assert xp.blocks.last().job.runnable_date is not None
schedule()
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'analysis')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# simulate job start on worker
split.start()
......@@ -1020,10 +1003,8 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal
split.end(Result(status=1))
......@@ -1093,18 +1074,17 @@ class Scheduling(BaseBackendTestCase):
# schedules the first runnable block
assert xp.blocks.first().job.runnable_date is not None
assert xp.blocks.last().job.runnable_date is None
schedule()
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
worker = Worker.objects.get()
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# simulate job start on worker
split.start()
......@@ -1115,10 +1095,8 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal
split.end(Result(status=0))
......@@ -1175,18 +1153,17 @@ class Scheduling(BaseBackendTestCase):
# schedules the first runnable block
assert xp.blocks.first().job.runnable_date is not None
assert xp.blocks.last().job.runnable_date is None
schedule()
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
worker = Worker.objects.get()
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# simulate job start on worker
split.start()
......@@ -1197,10 +1174,8 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
xp.cancel()
......@@ -1236,18 +1211,17 @@ class Scheduling(BaseBackendTestCase):
# schedules the first runnable block
assert xp.blocks.first().job.runnable_date is not None
assert xp.blocks.last().job.runnable_date is None
schedule()
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
worker = Worker.objects.get()
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# simulate job start on worker
split.start()
......@@ -1258,10 +1232,8 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal, faiulre
split.end(Result(status=1))
......@@ -1306,18 +1278,17 @@ class Scheduling(BaseBackendTestCase):
assert xpc.blocks.first().job.runnable_date is None
assert xpc.blocks.last().job.runnable_date is None
schedule()
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
worker = Worker.objects.get()
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# checks the jobs are connected one to the other across experiments
self.assertEqual(xp.blocks.first().job.child.block.experiment, xpc)
......@@ -1328,26 +1299,25 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(split.job.status, Job.PROCESSING)
self.assertEqual(split.job.block.status, Block.PROCESSING)
self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
self.assertEqual(split.job.child.status, Job.PROCESSING)
self.assertEqual(split.job.child.block.status, Block.PROCESSING)
self.assertEqual(split.job.child.block.experiment.status, Experiment.RUNNING)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# check child
self.assertEqual(xpc.blocks.first().job.status, Job.PROCESSING)
self.assertEqual(xpc.blocks.first().status, Block.PROCESSING)
'''
self.assertEqual(xpc.blocks.first().experiment.status, Experiment.RUNNING)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal
split.end(Result(status=0))
self.assertEqual(split.job.status, Job.COMPLETED)
self.assertEqual(split.job.block.status, Block.CACHED)
self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
self.assertEqual(split.job.child.status, Job.COMPLETED)
self.assertEqual(split.job.child.block.status, Block.CACHED)
self.assertEqual(split.job.child.block.experiment.status,
Experiment.RUNNING)
# checks the number of statistics objects has increased by 1
self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1)
......@@ -1356,7 +1326,10 @@ class Scheduling(BaseBackendTestCase):
# assert we have no database traces after the block is done
self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0)
self.assertEqual(JobSplit.objects.filter(job=split.job).count(), 0)
self.assertEqual(
Job.objects.filter(block=split.job.child.block).count(), 0)
self.assertEqual(JobSplit.objects.filter(job=split.job.child).count(),
0)
self.assertEqual(Result.objects.filter(job__isnull=True).count(), 0)
self.assertEqual(worker.available_cores(), qsetup.CORES)
......@@ -1365,30 +1338,31 @@ class Scheduling(BaseBackendTestCase):
# schedules the last block of the experiment
assert xp.blocks.last().job.runnable_date is not None
schedule()
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
self.assertEqual(assigned_splits.count(), 1)
split = assigned_splits.first()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'analysis')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# simulate job start on worker
split.start()
self.assertEqual(split.job.status, Job.PROCESSING)
self.assertEqual(split.job.block.status, Block.PROCESSING)
self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
self.assertEqual(split.job.child.status, Job.PROCESSING)
self.assertEqual(split.job.child.block.status, Block.PROCESSING)
self.assertEqual(split.job.child.block.experiment.status,
Experiment.RUNNING)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False,
status=Job.QUEUED)
self.assertEqual(assigned_splits.count(), 0)
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal
split.end(Result(status=0))
......@@ -1399,6 +1373,10 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(split.job.status, Job.COMPLETED)
self.assertEqual(split.job.block.status, Block.CACHED)
self.assertEqual(split.job.block.experiment.status, Experiment.DONE)
self.assertEqual(split.job.child.status, Job.COMPLETED)
self.assertEqual(split.job.child.block.status, Block.CACHED)
self.assertEqual(split.job.child.block.experiment.status,
Experiment.DONE)
self.check_stats_success(split)
......@@ -1408,4 +1386,279 @@ class Scheduling(BaseBackendTestCase):
self.assertEqual(Result.objects.count(), 0)
self.assertEqual(worker.available_cores(), qsetup.CORES)
'''
def test_blocking_failure(self):
# tests two experiments that are similar can be scheduled at the same
# time and we'll optimise correctly and only run one of them. If the
# blocking experiment fails, so does the blocked one too.
current_stats = HourlyStatistics.objects.count()
fullname = 'user/user/single/1/single'
xp = Experiment.objects.get(name=fullname.split(os.sep)[-1])
xpc = xp.fork(name='single_copy')
# schedules the experiment and check it
xp.schedule()
xpc.schedule()
# schedules the first runnable block
assert xp.blocks.first().job.runnable_date is not None
assert xp.blocks.last().job.runnable_date is None
assert xpc.blocks.first().job.runnable_date is None
assert xpc.blocks.last().job.runnable_date is None
assigned_splits = schedule()
worker = Worker.objects.get()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# checks the jobs are connected one to the other across experiments
self.assertEqual(xp.blocks.first().job.child.block.experiment, xpc)
self.assertEqual(xp.blocks.last().job.child.block.experiment, xpc)
# simulate job start on worker
split.start()
self.assertEqual(split.job.status, Job.PROCESSING)
self.assertEqual(split.job.block.status, Block.PROCESSING)
self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
self.assertEqual(split.job.child.status, Job.PROCESSING)
self.assertEqual(split.job.child.block.status, Block.PROCESSING)
self.assertEqual(split.job.child.block.experiment.status, Experiment.RUNNING)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal
split.end(Result(status=1))
self.assertEqual(split.job.status, Job.FAILED)
self.assertEqual(split.job.block.status, Block.FAILED)
self.assertEqual(split.job.block.experiment.status, Experiment.FAILED)
self.assertEqual(split.job.child.status, Job.FAILED)
self.assertEqual(split.job.child.block.status, Block.FAILED)
self.assertEqual(split.job.child.block.experiment.status,
Experiment.FAILED)
# checks the number of statistics objects has increased by 1
self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1)
# assert we have no database traces after the last block is done
self.assertEqual(Job.objects.count(), 0)
self.assertEqual(JobSplit.objects.count(), 0)
self.assertEqual(Result.objects.count(), 0)
self.assertEqual(worker.available_cores(), qsetup.CORES)
def test_blocking_cancel_after_success(self):
# tests two experiments that are similar can be scheduled at the same
# time and we'll optimise correctly and only run one of them. If the
# first experiment is cancelled, then the second one proceeds normally.
current_stats = HourlyStatistics.objects.count()
fullname = 'user/user/single/1/single'
xp = Experiment.objects.get(name=fullname.split(os.sep)[-1])
xpc = xp.fork(name='single_copy')
# schedules the experiment and check it
xp.schedule()
xpc.schedule()
# schedules the first runnable block
assert xp.blocks.first().job.runnable_date is not None
assert xp.blocks.last().job.runnable_date is None
assert xpc.blocks.first().job.runnable_date is None
assert xpc.blocks.last().job.runnable_date is None
assigned_splits = schedule()
worker = Worker.objects.get()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# checks the jobs are connected one to the other across experiments
self.assertEqual(xp.blocks.first().job.child.block.experiment, xpc)
self.assertEqual(xp.blocks.last().job.child.block.experiment, xpc)
# simulate job start on worker
split.start()
self.assertEqual(split.job.status, Job.PROCESSING)
self.assertEqual(split.job.block.status, Block.PROCESSING)
self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
self.assertEqual(split.job.child.status, Job.PROCESSING)
self.assertEqual(split.job.child.block.status, Block.PROCESSING)
self.assertEqual(split.job.child.block.experiment.status, Experiment.RUNNING)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# simulate end job signal
split.end(Result(status=0))
self.assertEqual(split.job.status, Job.COMPLETED)
self.assertEqual(split.job.block.status, Block.CACHED)
self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
self.assertEqual(split.job.child.status, Job.COMPLETED)
self.assertEqual(split.job.child.block.status, Block.CACHED)
self.assertEqual(split.job.child.block.experiment.status,
Experiment.RUNNING)
# checks the number of statistics objects has increased by 1
self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1)
self.check_stats_success(split)
# assert we have no database traces after the block is done
self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0)
self.assertEqual(
Job.objects.filter(block=split.job.child.block).count(), 0)
self.assertEqual(JobSplit.objects.filter(job=split.job.child).count(),
0)
self.assertEqual(Result.objects.filter(job__isnull=True).count(), 0)
self.assertEqual(worker.available_cores(), qsetup.CORES)
# cancels the blocking experiment - the blocked one must continue
xp.cancel()
self.assertEqual(
[str(k) for k in xp.blocks.order_by('id').values_list('status', flat=True)],
[Block.CACHED, Block.CANCELLED]
)
self.assertEqual(xp.status, Experiment.FAILED)
# assert we have no database traces after the last block is done
self.assertEqual(Job.objects.filter(block__in=xp.blocks.all()).count(), 0)
self.assertEqual(JobSplit.objects.filter(job__block__in=xp.blocks.all()).count(), 0)
self.assertEqual(Result.objects.count(), 0)
self.assertEqual(worker.available_cores(), qsetup.CORES)
# since the first job was successful, the second block of the
# previously blocked experiment must be ready to run
# schedules the last block of the experiment
assert xpc.blocks.last().job.runnable_date is not None
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xpc)
self.assertEqual(split.job.block.name, 'analysis')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# the rest would continue normally
def test_blocking_cancel_while_running(self):
# tests two experiments that are similar can be scheduled at the same
# time and we'll optimise correctly and only run one of them. If the
# first experiment is cancelled while one of the blocks is running,
# then the second one proceeds normally.
current_stats = HourlyStatistics.objects.count()
fullname = 'user/user/single/1/single'
xp = Experiment.objects.get(name=fullname.split(os.sep)[-1])
xpc = xp.fork(name='single_copy')
# schedules the experiment and check it
xp.schedule()
xpc.schedule()
# schedules the first runnable block
assert xp.blocks.first().job.runnable_date is not None
assert xp.blocks.last().job.runnable_date is None
assert xpc.blocks.first().job.runnable_date is None
assert xpc.blocks.last().job.runnable_date is None
assigned_splits = schedule()
worker = Worker.objects.get()
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits[0]
self.assertEqual(split.job.block.experiment, xp)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# checks the jobs are connected one to the other across experiments
self.assertEqual(xp.blocks.first().job.child.block.experiment, xpc)
self.assertEqual(xp.blocks.last().job.child.block.experiment, xpc)
# simulate job start on worker
split.start()
self.assertEqual(split.job.status, Job.PROCESSING)
self.assertEqual(split.job.block.status, Block.PROCESSING)
self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
self.assertEqual(split.job.child.status, Job.PROCESSING)
self.assertEqual(split.job.child.block.status, Block.PROCESSING)
self.assertEqual(split.job.child.block.experiment.status, Experiment.RUNNING)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# no job can be run right now
assigned_splits = schedule()
self.assertEqual(len(assigned_splits), 0)
# cancels the blocking experiment - the blocked one must continue
xp.cancel()
self.assertEqual(
[str(k) for k in xp.blocks.order_by('id').values_list('status', flat=True)],
[Block.CANCELLED, Block.CANCELLED]
)
self.assertEqual(xp.status, Experiment.FAILED)
# assert we have no database traces after the last block is cancel
self.assertEqual(Job.objects.filter(block__in=xp.blocks.all()).count(), 0)
self.assertEqual(JobSplit.objects.filter(job__block__in=xp.blocks.all()).count(), 0)
self.assertEqual(Result.objects.count(), 0)
self.assertEqual(worker.available_cores(), qsetup.CORES)
# since the first job was successful, the second block of the
# previously blocked experiment must be ready to run
# schedules the last block of the experiment
assert xpc.blocks.first().job.runnable_date is not None
assigned_splits = schedule()
assigned_splits = JobSplit.objects.filter(worker__isnull=False)
self.assertEqual(len(assigned_splits), 1)
split = assigned_splits.first()
self.assertEqual(split.job.block.experiment, xpc)
self.assertEqual(split.job.block.name, 'echo')
self.assertEqual(split.worker, worker)
self.assertEqual(worker.name, qsetup.HOSTNAME)
self.assertEqual(worker.available_cores(), qsetup.CORES-1)
# the rest would continue normally
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment