Skip to content
Snippets Groups Projects
Commit e0be6886 authored by André Anjos's avatar André Anjos :speech_balloon:
Browse files

[backend,experiments] Test fixes; More tests

parent 190ea5e3
No related branches found
No related tags found
1 merge request: !194 (Scheduler)
Pipeline #
......@@ -647,7 +647,7 @@ class Job(models.Model):
for b in self.block.dependents.all():
if any([k.status in (Block.FAILED, Block.CANCELLED) \
for k in b.dependencies.all()]):
for split in b.job.splits: split.end(None, Job.CANCELLED)
b.job._cancel()
if all([k.status in (Block.CACHED, Block.SKIPPED) \
for k in b.dependencies.all()]):
b.job.make_runnable()
......@@ -708,7 +708,7 @@ class Job(models.Model):
# if required, erase dangling files, update own results
timings = None
if self.done():
if self.done() and self.status != Job.CANCELLED:
# compute final timings and update parent block
if self.status != Job.SKIPPED:
diff_timings = self._merge_results()
......@@ -740,7 +740,7 @@ class Job(models.Model):
failed = self.status in (Job.FAILED, Job.CANCELLED)
if failed:
for o in self.block.outputs: l += o.files()
for o in self.block.outputs.all(): l += o.files()
for f in l:
logger.info("Erasing output file `%s' because Job `%s' failed", f,
......@@ -764,8 +764,13 @@ class Job(models.Model):
logger.info("Marking job `%s' as 'cancelled'", self)
self.runnable_date = None
self.start_date = None
for s in self.splits: s._cancel()
self.save()
if self.splits.count():
for s in self.splits.all(): s._cancel()
else:
self.status = Job.CANCELLED
self.save()
self.block._update_state()
self._cascade_updates()
def _merge_results(self):
......@@ -960,7 +965,7 @@ class JobSplit(models.Model):
if result.id is None: result.save()
self.result = result
else:
self.status = state
self.status = status
self.save()
......
......@@ -736,6 +736,8 @@ class Scheduling(BaseBackendTestCase):
def test_success(self):
# tests a simple successful experiment scheduling and execution
current_stats = HourlyStatistics.objects.count()
fullname = 'user/user/single/1/single'
......@@ -834,10 +836,457 @@ class Scheduling(BaseBackendTestCase):
self.check_stats_success(split)
# assert we have no database traces after the block is done
# assert we have no database traces after the last block is done
self.assertEqual(Job.objects.count(), 0)
self.assertEqual(JobSplit.objects.count(), 0)
self.assertEqual(Result.objects.count(), 0)
self.assertEqual(worker.available_cores(), qsetup.CORES)
def test_worker_activation(self):
    # Tests that scheduling depends on worker activation: while all workers
    # are inactive no queued split gets a worker; once they are re-activated
    # exactly one queued split is assigned.

    # Experiment names are always '/'-delimited, so split on the literal
    # separator (os.sep would be wrong on platforms where it is not '/').
    fullname = 'user/user/single/1/single'
    xp = Experiment.objects.get(name=fullname.split('/')[-1])

    # de-activates worker
    Worker.objects.update(active=False)

    # schedules the experiment and checks it
    xp.schedule()
    self.check_single(xp)

    # no job can be run right now
    schedule()
    assigned_splits = JobSplit.objects.filter(
        worker__isnull=False,
        status=Job.QUEUED,
    )
    self.assertEqual(assigned_splits.count(), 0)

    # re-activates the worker, shows it now schedules fine
    Worker.objects.update(active=True)
    schedule()
    assigned_splits = JobSplit.objects.filter(
        worker__isnull=False,
        status=Job.QUEUED,
    )
    self.assertEqual(assigned_splits.count(), 1)

    # the rest would continue like with test_success
def test_fails_on_first_block(self):
    # Tests that, if we fail on the first block, the experiment fails and
    # everything stops as foreseen.

    current_stats = HourlyStatistics.objects.count()

    # Experiment names are always '/'-delimited, so split on the literal
    # separator (os.sep would be wrong on platforms where it is not '/').
    fullname = 'user/user/single/1/single'
    xp = Experiment.objects.get(name=fullname.split('/')[-1])

    # schedules the experiment and checks it
    xp.schedule()
    self.check_single(xp)

    # schedules the first runnable block; unittest assertions are used
    # instead of bare ``assert`` so the checks survive ``python -O``
    self.assertIsNotNone(xp.blocks.first().job.runnable_date)
    self.assertIsNone(xp.blocks.last().job.runnable_date)
    schedule()

    assigned_splits = JobSplit.objects.filter(worker__isnull=False)
    worker = Worker.objects.get()

    self.assertEqual(assigned_splits.count(), 1)
    split = assigned_splits.first()
    self.assertEqual(split.job.block.experiment, xp)
    self.assertEqual(split.job.block.name, 'echo')
    self.assertEqual(split.worker, worker)
    self.assertEqual(worker.name, qsetup.HOSTNAME)
    self.assertEqual(worker.available_cores(), qsetup.CORES)

    # simulate job start on worker
    split.start()
    self.assertEqual(split.job.status, Job.PROCESSING)
    self.assertEqual(split.job.block.status, Block.PROCESSING)
    self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
    self.assertEqual(worker.available_cores(), qsetup.CORES-1)

    # no job can be run right now
    schedule()
    assigned_splits = JobSplit.objects.filter(
        worker__isnull=False,
        status=Job.QUEUED,
    )
    self.assertEqual(assigned_splits.count(), 0)

    # simulate end job signal, failure
    split.end(Result(status=1))
    self.assertEqual(split.job.status, Job.FAILED)
    self.assertEqual(split.job.block.status, Block.FAILED)
    self.assertEqual(split.job.block.experiment.status, Experiment.FAILED)

    # checks the number of statistics objects has increased by 1
    self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1)

    # assert we have no database traces after the last block is done
    self.assertEqual(Job.objects.count(), 0)
    self.assertEqual(JobSplit.objects.count(), 0)
    self.assertEqual(Result.objects.count(), 0)

    self.assertEqual(worker.available_cores(), qsetup.CORES)
def test_fails_on_last_block(self):
    # Tests that, if the first block succeeds but the last block fails,
    # the experiment as a whole is marked as failed.

    current_stats = HourlyStatistics.objects.count()

    # Experiment names are always '/'-delimited, so split on the literal
    # separator (os.sep would be wrong on platforms where it is not '/').
    fullname = 'user/user/single/1/single'
    xp = Experiment.objects.get(name=fullname.split('/')[-1])

    # schedules the experiment and checks it
    xp.schedule()
    self.check_single(xp)

    # schedules the first runnable block; unittest assertions are used
    # instead of bare ``assert`` so the checks survive ``python -O``
    self.assertIsNotNone(xp.blocks.first().job.runnable_date)
    self.assertIsNone(xp.blocks.last().job.runnable_date)
    schedule()

    assigned_splits = JobSplit.objects.filter(worker__isnull=False)
    worker = Worker.objects.get()

    self.assertEqual(assigned_splits.count(), 1)
    split = assigned_splits.first()
    self.assertEqual(split.job.block.experiment, xp)
    self.assertEqual(split.job.block.name, 'echo')
    self.assertEqual(split.worker, worker)
    self.assertEqual(worker.name, qsetup.HOSTNAME)
    self.assertEqual(worker.available_cores(), qsetup.CORES)

    # simulate job start on worker
    split.start()
    self.assertEqual(split.job.status, Job.PROCESSING)
    self.assertEqual(split.job.block.status, Block.PROCESSING)
    self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
    self.assertEqual(worker.available_cores(), qsetup.CORES-1)

    # no job can be run right now
    schedule()
    assigned_splits = JobSplit.objects.filter(
        worker__isnull=False,
        status=Job.QUEUED,
    )
    self.assertEqual(assigned_splits.count(), 0)

    # simulate end job signal, success
    split.end(Result(status=0))
    self.assertEqual(split.job.status, Job.COMPLETED)
    self.assertEqual(split.job.block.status, Block.CACHED)
    self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)

    # checks the number of statistics objects has increased by 1
    self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1)

    self.check_stats_success(split)

    # assert we have no database traces after the first block is done
    self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0)
    self.assertEqual(JobSplit.objects.filter(job=split.job).count(), 0)
    self.assertEqual(Result.objects.filter(job__isnull=True).count(), 0)
    self.assertEqual(Result.objects.filter(split__isnull=True).count(), 0)

    self.assertEqual(worker.available_cores(), qsetup.CORES)

    # since this job was successful, the next one should be ready to run

    # schedules the last block of the experiment
    self.assertIsNotNone(xp.blocks.last().job.runnable_date)
    schedule()

    assigned_splits = JobSplit.objects.filter(worker__isnull=False)
    self.assertEqual(assigned_splits.count(), 1)
    split = assigned_splits.first()
    self.assertEqual(split.job.block.experiment, xp)
    self.assertEqual(split.job.block.name, 'analysis')
    self.assertEqual(split.worker, worker)
    self.assertEqual(worker.name, qsetup.HOSTNAME)
    self.assertEqual(worker.available_cores(), qsetup.CORES)

    # simulate job start on worker
    split.start()
    self.assertEqual(split.job.status, Job.PROCESSING)
    self.assertEqual(split.job.block.status, Block.PROCESSING)
    self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
    self.assertEqual(worker.available_cores(), qsetup.CORES-1)

    # no job can be run right now
    schedule()
    assigned_splits = JobSplit.objects.filter(
        worker__isnull=False,
        status=Job.QUEUED,
    )
    self.assertEqual(assigned_splits.count(), 0)

    # simulate end job signal, failure
    split.end(Result(status=1))

    # the number of statistics objects is still one more than at the start
    self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1)

    self.assertEqual(split.job.status, Job.FAILED)
    self.assertEqual(split.job.block.status, Block.FAILED)
    self.assertEqual(split.job.block.experiment.status, Experiment.FAILED)

    # assert we have no database traces after the last block is done
    self.assertEqual(Job.objects.count(), 0)
    self.assertEqual(JobSplit.objects.count(), 0)
    self.assertEqual(Result.objects.count(), 0)

    self.assertEqual(worker.available_cores(), qsetup.CORES)
def test_cancel_before_starting(self):
    # Tests experiment cancellation before any of its blocks is started.

    # Experiment names are always '/'-delimited, so split on the literal
    # separator (os.sep would be wrong on platforms where it is not '/').
    fullname = 'user/user/single/1/single'
    xp = Experiment.objects.get(name=fullname.split('/')[-1])

    # schedules the experiment and checks it
    xp.schedule()
    self.check_single(xp)

    # only the first block is runnable at this point; unittest assertions
    # are used instead of bare ``assert`` so the checks survive ``python -O``
    self.assertIsNotNone(xp.blocks.first().job.runnable_date)
    self.assertIsNone(xp.blocks.last().job.runnable_date)

    xp.cancel()

    # explicit ordering, as done by the other cancellation tests
    self.assertEqual(
        [str(k) for k in xp.blocks.order_by('id').values_list('status', flat=True)],
        [Block.CANCELLED, Block.CANCELLED]
    )
    self.assertEqual(xp.status, Experiment.FAILED)

    # assert we have no database traces after the experiment is cancelled
    self.assertEqual(Job.objects.count(), 0)
    self.assertEqual(JobSplit.objects.count(), 0)
    self.assertEqual(Result.objects.count(), 0)

    worker = Worker.objects.get()
    self.assertEqual(worker.available_cores(), qsetup.CORES)
def test_cancel_after_success(self):
    # Tests experiment cancellation while the experiment is running: the
    # first block has already completed successfully and the last block is
    # runnable but not yet started.

    current_stats = HourlyStatistics.objects.count()

    # Experiment names are always '/'-delimited, so split on the literal
    # separator (os.sep would be wrong on platforms where it is not '/').
    fullname = 'user/user/single/1/single'
    xp = Experiment.objects.get(name=fullname.split('/')[-1])

    # schedules the experiment and checks it
    xp.schedule()
    self.check_single(xp)

    # schedules the first runnable block; unittest assertions are used
    # instead of bare ``assert`` so the checks survive ``python -O``
    self.assertIsNotNone(xp.blocks.first().job.runnable_date)
    self.assertIsNone(xp.blocks.last().job.runnable_date)
    schedule()

    assigned_splits = JobSplit.objects.filter(worker__isnull=False)
    worker = Worker.objects.get()

    self.assertEqual(assigned_splits.count(), 1)
    split = assigned_splits.first()
    self.assertEqual(split.job.block.experiment, xp)
    self.assertEqual(split.job.block.name, 'echo')
    self.assertEqual(split.worker, worker)
    self.assertEqual(worker.name, qsetup.HOSTNAME)
    self.assertEqual(worker.available_cores(), qsetup.CORES)

    # simulate job start on worker
    split.start()
    self.assertEqual(split.job.status, Job.PROCESSING)
    self.assertEqual(split.job.block.status, Block.PROCESSING)
    self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
    self.assertEqual(worker.available_cores(), qsetup.CORES-1)

    # no job can be run right now
    schedule()
    assigned_splits = JobSplit.objects.filter(
        worker__isnull=False,
        status=Job.QUEUED,
    )
    self.assertEqual(assigned_splits.count(), 0)

    # simulate end job signal, success
    split.end(Result(status=0))
    self.assertEqual(split.job.status, Job.COMPLETED)
    self.assertEqual(split.job.block.status, Block.CACHED)
    self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)

    # checks the number of statistics objects has increased by 1
    self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1)

    self.check_stats_success(split)

    # assert we have no database traces after the first block is done
    self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0)
    self.assertEqual(JobSplit.objects.filter(job=split.job).count(), 0)
    self.assertEqual(Result.objects.filter(job__isnull=True).count(), 0)

    self.assertEqual(worker.available_cores(), qsetup.CORES)

    # since this job was successful, the next one should be ready to run
    self.assertIsNotNone(xp.blocks.last().job.runnable_date)

    # cancels the experiment before the last block gets scheduled
    xp.cancel()

    self.assertEqual(
        [str(k) for k in xp.blocks.order_by('id').values_list('status', flat=True)],
        [Block.CACHED, Block.CANCELLED]
    )
    self.assertEqual(xp.status, Experiment.FAILED)

    # assert we have no database traces after the experiment is cancelled
    self.assertEqual(Job.objects.count(), 0)
    self.assertEqual(JobSplit.objects.count(), 0)
    self.assertEqual(Result.objects.count(), 0)

    # re-fetches the worker before checking its available cores
    worker = Worker.objects.get()
    self.assertEqual(worker.available_cores(), qsetup.CORES)
def test_cancel_while_running(self):
    # Tests experiment cancellation while a block is running.

    # Experiment names are always '/'-delimited, so split on the literal
    # separator (os.sep would be wrong on platforms where it is not '/').
    fullname = 'user/user/single/1/single'
    xp = Experiment.objects.get(name=fullname.split('/')[-1])

    # schedules the experiment and checks it
    xp.schedule()
    self.check_single(xp)

    # schedules the first runnable block; unittest assertions are used
    # instead of bare ``assert`` so the checks survive ``python -O``
    self.assertIsNotNone(xp.blocks.first().job.runnable_date)
    self.assertIsNone(xp.blocks.last().job.runnable_date)
    schedule()

    assigned_splits = JobSplit.objects.filter(worker__isnull=False)
    worker = Worker.objects.get()

    self.assertEqual(assigned_splits.count(), 1)
    split = assigned_splits.first()
    self.assertEqual(split.job.block.experiment, xp)
    self.assertEqual(split.job.block.name, 'echo')
    self.assertEqual(split.worker, worker)
    self.assertEqual(worker.name, qsetup.HOSTNAME)
    self.assertEqual(worker.available_cores(), qsetup.CORES)

    # simulate job start on worker
    split.start()
    self.assertEqual(split.job.status, Job.PROCESSING)
    self.assertEqual(split.job.block.status, Block.PROCESSING)
    self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
    self.assertEqual(worker.available_cores(), qsetup.CORES-1)

    # no job can be run right now
    schedule()
    assigned_splits = JobSplit.objects.filter(
        worker__isnull=False,
        status=Job.QUEUED,
    )
    self.assertEqual(assigned_splits.count(), 0)

    # cancels the experiment while the first block is still processing
    xp.cancel()

    self.assertEqual(
        [str(k) for k in xp.blocks.order_by('id').values_list('status', flat=True)],
        [Block.CANCELLED, Block.CANCELLED]
    )
    self.assertEqual(xp.status, Experiment.FAILED)

    # assert we have no database traces after the experiment is cancelled
    self.assertEqual(Job.objects.count(), 0)
    self.assertEqual(JobSplit.objects.count(), 0)
    self.assertEqual(Result.objects.count(), 0)

    # re-fetches the worker before checking its available cores
    worker = Worker.objects.get()
    self.assertEqual(worker.available_cores(), qsetup.CORES)
def test_cancel_after_failure(self):
    # Tests that, if we fail on the first block, the experiment fails and a
    # cancellation that comes after that is a NOOP.

    current_stats = HourlyStatistics.objects.count()

    # Experiment names are always '/'-delimited, so split on the literal
    # separator (os.sep would be wrong on platforms where it is not '/').
    fullname = 'user/user/single/1/single'
    xp = Experiment.objects.get(name=fullname.split('/')[-1])

    # schedules the experiment and checks it
    xp.schedule()
    self.check_single(xp)

    # schedules the first runnable block; unittest assertions are used
    # instead of bare ``assert`` so the checks survive ``python -O``
    self.assertIsNotNone(xp.blocks.first().job.runnable_date)
    self.assertIsNone(xp.blocks.last().job.runnable_date)
    schedule()

    assigned_splits = JobSplit.objects.filter(worker__isnull=False)
    worker = Worker.objects.get()

    self.assertEqual(assigned_splits.count(), 1)
    split = assigned_splits.first()
    self.assertEqual(split.job.block.experiment, xp)
    self.assertEqual(split.job.block.name, 'echo')
    self.assertEqual(split.worker, worker)
    self.assertEqual(worker.name, qsetup.HOSTNAME)
    self.assertEqual(worker.available_cores(), qsetup.CORES)

    # simulate job start on worker
    split.start()
    self.assertEqual(split.job.status, Job.PROCESSING)
    self.assertEqual(split.job.block.status, Block.PROCESSING)
    self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING)
    self.assertEqual(worker.available_cores(), qsetup.CORES-1)

    # no job can be run right now
    schedule()
    assigned_splits = JobSplit.objects.filter(
        worker__isnull=False,
        status=Job.QUEUED,
    )
    self.assertEqual(assigned_splits.count(), 0)

    # simulate end job signal, failure
    split.end(Result(status=1))
    self.assertEqual(split.job.status, Job.FAILED)
    self.assertEqual(split.job.block.status, Block.FAILED)
    self.assertEqual(split.job.block.experiment.status, Experiment.FAILED)

    # checks the number of statistics objects has increased by 1
    self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1)

    # assert we have no database traces after the last block is done
    self.assertEqual(Job.objects.count(), 0)
    self.assertEqual(JobSplit.objects.count(), 0)
    self.assertEqual(Result.objects.count(), 0)

    self.assertEqual(worker.available_cores(), qsetup.CORES)

    # cancelling an already failed experiment must not change its status
    xp.cancel()
    # NOTE(review): this reads the experiment through the `split` object
    # fetched earlier, which may be a cached instance -- confirm whether a
    # fresh database read would be a stronger check here.
    self.assertEqual(split.job.block.experiment.status, Experiment.FAILED)
def test_blocking_success(self):
    # Tests that two similar experiments can be scheduled at the same time
    # and that we optimise correctly, running only one of them. The other
    # one is updated as the blocking experiment is executed.
    #
    # TODO: not implemented yet -- this placeholder currently passes
    # without exercising anything.
    pass
......@@ -998,7 +998,8 @@ class Block(models.Model):
if self.done(): return
if hasattr(self, 'job'): self.job._cancel()
if hasattr(self, 'job'):
self.job._cancel()
else:
self.status = Block.CANCELLED
self.save()
......@@ -1060,7 +1061,9 @@ class Block(models.Model):
if self.job.done():
self.end_date = self.job.end_date
self.job.result.delete() #cascades to job/job split deletion
r = self.job.result
self.job.delete()
if r: r.delete()
# Loads Results from cache
if self.job.result and self.analyzer and self.status == Block.CACHED:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment