From b57f37d63d2f4d965f191c2157cabc1f9a3ef400 Mon Sep 17 00:00:00 2001 From: Andre Anjos <andre.dos.anjos@gmail.com> Date: Tue, 26 Apr 2016 23:29:57 +0200 Subject: [PATCH] [backend] Finish porting all scheduler tests --- beat/web/backend/tests.py | 187 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 187 insertions(+) diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py index e0658a905..9e3beccd3 100644 --- a/beat/web/backend/tests.py +++ b/beat/web/backend/tests.py @@ -720,6 +720,8 @@ class BackendSetup(BaseBackendTestCase): self.check_prior_config() + + class Scheduling(BaseBackendTestCase): @@ -1765,6 +1767,191 @@ class Scheduling(BaseBackendTestCase): # the rest would continue normally + def test_schedule_without_queue(self): + + # tests that an experiment with a queue that disappeared is correctly + # aborted + setup_backend(QUEUES_WITHOUT_PRIORITY) + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + self.assertRaises(RuntimeError, xp.schedule) + + + +class SchedulingPriority(BaseBackendTestCase): + + + def set_globals(self, xp, queue, environment): + '''Sets the global queue of the experiment''' + + decl = xp.declaration + decl['globals']['queue'] = queue.name + decl['globals']['environment']['name'] = environment.name + decl['globals']['environment']['version'] = environment.version + xp.declaration = decl + xp.save() #reloads all blocks + + + def reset_slots(self, xp): + '''Only use one slot in all blocks''' + + decl = xp.declaration + for b in decl['blocks']: + if 'nb_slots' in decl['blocks'][b]: + del decl['blocks'][b]['nb_slots'] + xp.declaration = decl + xp.save() #reloads all blocks + + + def test_priority_multicore(self): + + # tests that in an heterogeneous backend setup, priority is given to + # jobs that require more cores correctly. + + setup_backend(QUEUES_WITHOUT_PRIORITY) + Worker.objects.update(active=True) + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + fullname = 'user/user/single/1/single_add' + xp_add = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + fullname = 'user/user/single/1/single_add2' + xp_add2 = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + fullname = 'user/user/single/1/single_large' + xp_large = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + q1 = Queue.objects.get(name='q1') + q2 = Queue.objects.get(name='q2') + env = Environment.objects.get() + + # reset queue and environment to new backend configuration + self.set_globals(xp, q1, env) + self.set_globals(xp_add, q1, env) + self.set_globals(xp_add2, q1, env) + self.set_globals(xp_large, q2, env) #notice different queue + self.reset_slots(xp_large) #one slot per block only + + xp.schedule() + xp_add.schedule() + xp_add2.schedule() + xp_large.schedule() + + assigned_splits = schedule() + + self.assertEqual(len(assigned_splits), 3) + + self.assertEqual(assigned_splits[0].job.block.experiment, xp_large) + # then, the scheduling order is respected + self.assertEqual(assigned_splits[1].job.block.experiment, xp) + self.assertEqual(assigned_splits[2].job.block.experiment, xp_add) + # notice that the last experiment is not assigned + + + def test_priority_multicore_delayed(self): + + # tests that in an heterogeneous backend setup, priority is given to + # jobs that require more cores correctly. In this test, specifically, + # we verify that, if the farm is taken, new jobs that require more + # resources will block other possible jobs to run if they cannot run, + # even if free cores are available. + + setup_backend(QUEUES_WITHOUT_PRIORITY) + Worker.objects.update(active=True) + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + fullname = 'user/user/single/1/single_add' + xp_add = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + fullname = 'user/user/single/1/single_add2' + xp_add2 = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + fullname = 'user/user/single/1/single_large' + xp_large = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + q1 = Queue.objects.get(name='q1') + q4 = Queue.objects.get(name='q4') + env = Environment.objects.get() + + # reset queue and environment to new backend configuration + self.set_globals(xp, q1, env) + self.set_globals(xp_add, q1, env) + self.set_globals(xp_large, q4, env) #notice different queue + self.reset_slots(xp_large) #one slot per block only + + xp.schedule() + assigned_splits = schedule() + + self.assertEqual(len(assigned_splits), 1) + self.assertEqual(assigned_splits[0].job.block.experiment, xp) + split = assigned_splits[0] + + xp_large.schedule() #will now block anything else from running + xp_add.schedule() + + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 0) + + # start/end the xp block and schedule again + split.start() + split.end(Result(0)) + + # now, the job with more cores should be scheduled first + assigned_splits = schedule() + self.assertEqual(len(assigned_splits), 1) + self.assertEqual(assigned_splits[0].job.block.experiment, xp_large) + + + def test_priorities(self): + + # tests that in an heterogeneous backend setup, priority is given to + # different computers based on their priority settings + + setup_backend(PRIORITY_QUEUES) + Worker.objects.update(active=True) + + q1 = Queue.objects.get(name='q1') + env = Environment.objects.get() + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + self.set_globals(xp, q1, env) + + q1_special = Queue.objects.get(name='q1_special') + fullname = 'user/user/single/1/single_add' + xp_add = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + self.set_globals(xp_add, q1_special, env) + + q2 = Queue.objects.get(name='q2') + fullname = 'user/user/single/1/single_large' + xp_large = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + self.set_globals(xp_large, q2, env) #notice different queue + self.reset_slots(xp_large) #one slot per block only + + node1 = Worker.objects.get(name='node1') + node2 = Worker.objects.get(name='node2') + + # verify that xp_large has priority, other jobs corresponding to + # q1/_special + xp.schedule() + xp_add.schedule() + xp_large.schedule() + + assigned_splits = schedule() + self.assertTrue(len(assigned_splits), 3) + + self.assertEqual(assigned_splits[0].job.block.experiment, xp_large) + self.assertEqual(assigned_splits[0].job.block.name, 'echo') + self.assertEqual(assigned_splits[0].job.splits.first().worker, node2) + + self.assertEqual(assigned_splits[1].job.block.experiment, xp) + self.assertEqual(assigned_splits[1].job.block.name, 'echo') + self.assertEqual(assigned_splits[1].job.splits.first().worker, node1) + + self.assertEqual(assigned_splits[2].job.block.experiment, xp_add) + self.assertEqual(assigned_splits[2].job.block.name, 'echo') + self.assertEqual(assigned_splits[2].job.splits.first().worker, node2) + class Working(BaseBackendTestCase): -- GitLab