diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py index 9bef9cf5006761dca9b8e200d32a6d0358cd23ce..3fcf14e0dd7404f3708b49d3fc1c3d5031770f55 100644 --- a/beat/web/backend/tests.py +++ b/beat/web/backend/tests.py @@ -26,1073 +26,787 @@ ############################################################################### import os -import simplejson as json +import sys +import time import shutil -from datetime import datetime +import tempfile +import collections from django.conf import settings -from django.contrib.auth.models import User from django.core.urlresolvers import reverse +from django.core import management +from django.contrib.auth.models import User, Group +from django.test import TestCase -from ..experiments.models import Experiment -from ..experiments.models import Block -from ..dataformats.models import DataFormat -from ..algorithms.models import Algorithm -from ..toolchains.models import Toolchain -from ..databases.models import Database +from guardian.shortcuts import get_perms -from ..backend.models import Environment -from ..backend.models import Queue +from ..common.testutils import BaseTestCase as APITestCase +from ..experiments.models import Experiment, Block +from ..algorithms.models import Algorithm -from ..common.testutils import BaseTestCase +from .models import Queue, Worker, Slot, Environment, Job, JobSplit, Result +from .utils import cleanup_cache, dump_backend, setup_backend +from .management.commands import qsetup +from ..utils.management.commands import install +from .schedule import schedule + + +EXPERIMENT_HASHES = { + "user/single/1/single": \ + 'c5ae0db08c0b9f033461c4cf4eb7ca5b4ae4b61e108c1001f8eb0cc703887910', + "user/single/1/single_error": \ + '8b39ee8384f611c3c9ebb12570c201e88b67e7f991669f53172f9e315ba0fcd6', + "user/single/1/single_error2": \ + '8b39ee8384f611c3c9ebb12570c201e88b67e7f991669f53172f9e315ba0fcd6', + "user/single/1/single_add": \ + 'e5c9d741587a849d80ecebb78b2d97932e8f6e64a7d82e57954934a2d79751cb', + "user/single/1/single_add2": \ + 'b0bb7120f771ccaae0b22a0c2d6f11e80ca6067ce5c774294858ad3019ebfe7d', + "user/double/1/double": \ + '3e549ee87c98113207796f289603746ba5c2dd26cf8d5710fde9ed45697716d0', + "user/triangle/1/triangle": \ + '43769717b9a53bf51c4a4d0354ecb6d733b70e00fdb493ab77099c0edfceb1cb', + "user/double_triangle/1/double_triangle": \ + 'fbde7da1f51ce62e4446b9d26748865d7debe3778a66aa36244f717a830a13b1', + } -class BackendAPIBase(BaseTestCase): +# Example configuration with 3 queues with an increasing amount of resources +# running on the same host +QUEUES_WITHOUT_PRIORITY = { + "queues": collections.OrderedDict([ + ("q1", { + "memory-limit": 4*1024, + "time-limit": 180, #3 hours + "cores-per-slot": 1, + "max-slots-per-user": 4, + "environments": ['environment (1)'], + "groups": [ + "Default", + ], + "slots": { + "node1": { + "quantity": 4, + "priority": 0 + } + } + } + ), + ("q2", { + "memory-limit": 8*1024, + "time-limit": 360, #6 hours + "cores-per-slot": 2, + "max-slots-per-user": 2, + "environments": ['environment (1)'], + "groups": [ + "Default", + ], + "slots": { + "node1": { + "quantity": 2, + "priority": 0 + }, + } + } + ), + ("q4", { + "memory-limit": 16*1024, + "time-limit": 720, #12 hours + "cores-per-slot": 4, + "max-slots-per-user": 1, + "environments": ['environment (1)'], + "groups": [ + "Default", + ], + "slots": { + "node1": { + "quantity": 1, + "priority": 0 + }, + } + } + ) + ]), + "workers": { + "node1": { + "cores": 4, + "memory": 16*1024, + } + }, + "environments": { + 'environment (1)': { + "name": 'environment', + "version": '1', + "short_description": "Test", + "description": "Test environment", + }, + }, + } - DECLARATION1 = { - "globals": { - }, - "blocks": { - "addition1": { - "algorithm": "johndoe/sum/1", - "parameters": { - }, - "inputs": { - "a": "a", - "b": "b" - }, - "outputs": { - "sum": "sum" - } +# Example configuration with 3 queues sharing slots on 2 hosts +PRIORITY_QUEUES = { + "queues": collections.OrderedDict([ + ("q1", { + "memory-limit": 4*1024, + "time-limit": 180, #3 hours + "cores-per-slot": 1, + "max-slots-per-user": 2, + "environments": ['environment (1)'], + "groups": [ + "Default", + ], + "slots": { + "node1": { + "quantity": 4, + "priority": 5 }, - "addition2": { - "algorithm": "johndoe/sum/1", - "parameters": { - }, - "inputs": { - "a": "a", - "b": "b" - }, - "outputs": { - "sum": "sum" - } - } - }, - "datasets": { - "dataset1": { - "database": "integers/1", - "protocol": "triple", - "set": "default", - } + "node2": { + "quantity": 4, + "priority": 0 + }, + } }, - "analyzers": { - "analysis": { - "algorithm": "johndoe/analysis/1", - "parameters": { - }, - "inputs": { - "in": "input" - } + ), + ("q2", { + "memory-limit": 8*1024, + "time-limit": 360, #6 hours + "cores-per-slot": 2, + "max-slots-per-user": 1, + "environments": ['environment (1)'], + "groups": [ + "Default", + ], + "slots": { + "node1": { + "quantity": 2, + "priority": 0 + }, + "node2": { + "quantity": 2, + "priority": 10 } + } }, - "globals": { - "environment": { - "name": "env1", - "version": "1.0" + ), + ("q1_special", { + "memory-limit": 4*1024, + "time-limit": 180, #3 hours + "cores-per-slot": 1, + "max-slots-per-user": 8, + "environments": ['environment (1)'], + "groups": [ + "Default", + ], + "slots": { + "node1": { + "quantity": 4, + "priority": 0 }, - "queue": "queue1" - } - } - - DATABASE = { - "root_folder": "/path/to/root_folder", - "protocols": [ - { - "name": "triple", - "template": "test", - "sets": [ - { - "name": "default", - "template": "set", - "view": "dummy", - "outputs": { - "output1": "johndoe/single_integer/1", - "output2": "johndoe/single_integer/1", - "output3": "johndoe/single_integer/1" - } - } - ] - } - ] + "node2": { + "quantity": 4, + "priority": 5 } - - def setUp(self): - for path in [settings.TOOLCHAINS_ROOT, settings.EXPERIMENTS_ROOT, - settings.DATAFORMATS_ROOT, settings.ALGORITHMS_ROOT, - settings.CACHE_ROOT]: - if os.path.exists(path): - shutil.rmtree(path) - - user = User.objects.create_user('johndoe', 'johndoe@test.org', '1234') - - - # Create an environment and queue - environment = Environment(name='env1', version='1.0') - environment.save() - - queue = Queue(name='queue1', memory_limit=1024, time_limit=60, cores_per_slot=1, max_slots_per_user=10) - queue.save() - - queue.environments.add(environment) - - - DataFormat.objects.create_dataformat( - author=user, - name='single_integer', - short_description='description', - declaration={ - "value": "int32" - }, - ) - - db, errors = Database.objects.create_database('integers', declaration=self.DATABASE) - assert not errors, 'Database errors: %s' % errors - db.sharing = Database.PUBLIC - db.save() - - (self.toolchain1, errors) = Toolchain.objects.create_toolchain(user, - 'toolchain1', 'short description 1', - declaration={ - "blocks": [ { - "name": "addition1", - "inputs": [ - "a", - "b" - ], - "outputs": [ - "sum" - ], - "synchronized_channel": "dataset1" - }, - { - "name": "addition2", - "inputs": [ - "a", - "b" - ], - "outputs": [ - "sum" - ], - "synchronized_channel": "dataset1" - } - ], - "datasets": [ { - "name": "dataset1", - "outputs": [ - "output1", - "output2", - "output3" - ] - } - ], - "connections": [ { - "from": "dataset1.output1", - "to": "addition1.a", - "channel": "dataset1" - }, - { - "from": "dataset1.output2", - "to": "addition1.b", - "channel": "dataset1" - }, - { - "from": "addition1.sum", - "to": "addition2.a", - "channel": "dataset1" - }, - { - "from": "dataset1.output3", - "to": "addition2.b", - "channel": "dataset1" - }, - { - "to": "analysis.input", - "from": "addition2.sum", - "channel": "dataset1" - } - ], - "analyzers": [ - { - "inputs": [ - "input" - ], - "synchronized_channel": "dataset1", - "name": "analysis" - } - ], - "representation": { - "connections": {}, - "blocks": {}, - "channel_colors": {}, - }, - }) - assert not errors, 'Toolchain errors: %s' % errors - - (self.algorithm, errors) = Algorithm.objects.create_algorithm( - author=user, - name='sum', - short_description='description', - declaration="""{ - "language": "python", - "splittable": false, - "groups": [ - { - "inputs": { - "a": { "type": "johndoe/single_integer/1" }, - "b": { "type": "johndoe/single_integer/1" } - }, - "outputs": { - "sum": { "type": "johndoe/single_integer/1" } - } - } - ], - "parameters": { - } -}""", - - code="""class Algorithm: - - def process(self, inputs, outputs): - data = outputs['sum'].createData() - data.value = inputs['a'].data.value + inputs['b'].data.value - outputs['sum'].write(data) - return True -""") - assert not errors, 'Algorithm errors: %s' % errors - - system_user = User.objects.create_user(settings.SYSTEM_ACCOUNT, 'system@test.org', '1234') - - (dataformat, errors) = DataFormat.objects.create_dataformat( - author=system_user, - name='float', - short_description='description', - declaration={ - "value": "float64", - }, - ) - assert not errors, 'Data format errors: %s' % errors - - (dataformat, errors) = DataFormat.objects.create_dataformat( - author=system_user, - name='text', - short_description='description', - declaration={ - "text": "string", - }, - ) - assert not errors, 'Data format errors: %s' % errors - - - (self.algorithm, errors) = Algorithm.objects.create_algorithm( - author=user, - name='analysis', - short_description='description', - declaration="""{ - "language": "python", - "groups": [ - { - "inputs": { - "in": { "type": "johndoe/single_integer/1" } - } - } - ], - "results": { - "out_float": { "type": "float32" }, - "out_text": { "type": "string" } - }, - "parameters": { + } + } + ), + ]), + "workers": collections.OrderedDict([ + ("node1", { + "cores": 4, + "memory": 32*1024, + } + ), + ("node2", { + "cores": 4, + "memory": 16*1024, + } + ) + ]), + "environments": { + 'environment (1)': { + "name": 'environment', + "version": '1', + "short_description": "Test", + "description": "Test environment", + }, + }, } -}""", - - code="""class Algorithm: - def process(self, inputs, output): - # We don't really care - return True -""") - assert not errors, 'Algorithm errors: %s' % errors - super_user = User.objects.create_user('superuser', 'superuser@test.org', '1234') - super_user.is_superuser = True - super_user.save() +class CancelAllExperimentsAPI(APITestCase): - - def tearDown(self): - for path in [settings.TOOLCHAINS_ROOT, settings.EXPERIMENTS_ROOT, - settings.DATAFORMATS_ROOT, settings.ALGORITHMS_ROOT, - settings.CACHE_ROOT]: - if os.path.exists(path): - shutil.rmtree(path) - - -#---------------------------------------------------------- - - -class CancelAllExperimentsAPI(BackendAPIBase): def setUp(self): - super(CancelAllExperimentsAPI, self).setUp() self.url = reverse('backend:cancel-experiments') - def test_no_access_for_anonymous_user(self): - response = self.client.get(self.url) - self.checkResponse(response, 302) #redirects to login page - - - def test_no_access_for_non_superuser(self): - self.client.login(username='johndoe', password='1234') - response = self.client.get(self.url) - self.checkResponse(response, 403) - - -#---------------------------------------------------------- - - -class SchedulerConfigurationAPI(BackendAPIBase): - def setUp(self): - super(SchedulerConfigurationAPI, self).setUp() - self.url = reverse('api_backend:backend-api-scheduler-configuration') - - def test_no_access_for_anonymous_user(self): - response = self.client.get(self.url) - self.checkResponse(response, 403) - - - def test_no_access_for_non_superuser(self): - self.client.login(username='johndoe', password='1234') - response = self.client.get(self.url) - self.checkResponse(response, 403) - - -#---------------------------------------------------------- - - -class CacheCleanupAPI(BackendAPIBase): - def setUp(self): - super(CacheCleanupAPI, self).setUp() - self.url = reverse('api_backend:backend-api-cache-cleanup') def test_no_access_for_anonymous_user(self): response = self.client.get(self.url) - self.checkResponse(response, 403) + self.checkResponse(response, 302) #redirects to login page def test_no_access_for_non_superuser(self): + User.objects.create_user('johndoe', 'johndoe@test.org', '1234') self.client.login(username='johndoe', password='1234') response = self.client.get(self.url) self.checkResponse(response, 403) -#---------------------------------------------------------- +class CacheCleanUp(TestCase): -class BlockStartedAPI(BackendAPIBase): - def setUp(self): - super(BlockStartedAPI, self).setUp() - - User.objects.create_user('scheduler', - 'scheduler@test.org', '1234') - - self.client.login(username='johndoe', password='1234') - - url = reverse('api_experiments:list_create', args=['johndoe']) - response = self.client.post(url, - json.dumps({ - 'toolchain': 'johndoe/toolchain1/1', - 'declaration': BackendAPIBase.DECLARATION1, - 'name': 'experiment1', - }), content_type='application/json') - - self.checkResponse(response, 201, content_type='application/json') - - url = reverse('api_experiments:start', args=['johndoe', 'toolchain1', 1, 'experiment1']) - response = self.client.post(url) - self.checkResponse(response, 200, content_type='application/json') - - self.client.logout() - - self.experiment = Experiment.objects.get(author__username='johndoe', - toolchain__name='toolchain1', - name='experiment1', - ) - - self.url = reverse('api_backend:backend-api-block-started') - - - - def test_no_notification_for_anonymous_user(self): - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - }), content_type='application/json') - - self.checkResponse(response, 403) - - - def test_no_notification_for_non_scheduler_user(self): - self.client.login(username='johndoe', password='1234') - - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - }), content_type='application/json') - - self.checkResponse(response, 403) - - - def test_bad_notification_request_with_unknown_experiment(self): - self.client.login(username='scheduler', password='1234') - - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/unknown', - 'block-name': 'addition1', - }), content_type='application/json') - - self.checkResponse(response, 404) - + self.cache = tempfile.mkdtemp(prefix='beat_') - def test_bad_notification_request_with_unknown_block(self): - self.client.login(username='scheduler', password='1234') - - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'unknown', - }), content_type='application/json') - - self.checkResponse(response, 404) - - - def test_bad_notification_request_without_json_content(self): - self.client.login(username='scheduler', password='1234') - response = self.client.put(self.url) - self.checkResponse(response, 400, content_type='application/json') - - - def test_bad_notification_request_with_invalid_json_content1(self): - self.client.login(username='scheduler', password='1234') - - response = self.client.put(self.url, - json.dumps({ - 'block-name': 'addition1', - }), content_type='application/json') - - self.checkResponse(response, 400, content_type='application/json') - - - def test_bad_notification_request_with_invalid_json_content2(self): - self.client.login(username='scheduler', password='1234') - - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - }), content_type='application/json') - - self.checkResponse(response, 400, content_type='application/json') - - - def test_first_block_of_experiment(self): - self.client.login(username='scheduler', password='1234') - - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - }), content_type='application/json') - - self.checkResponse(response, 204) - - experiment = Experiment.objects.get(id=self.experiment.id) - - self.assertTrue(experiment.start_date is not None) - self.assertTrue(experiment.end_date is None) - self.assertEqual(experiment.status, Experiment.RUNNING) - self.assertEqual(experiment.blocks.get(name='addition1').status, Block.PROCESSING) - self.assertEqual(experiment.blocks.get(name='addition2').status, Block.NOT_CACHED) - self.assertEqual(experiment.blocks.get(name='analysis').status, Block.NOT_CACHED) + def tearDown(self): + shutil.rmtree(self.cache) + if os.path.exists(settings.CACHE_ROOT): + shutil.rmtree(settings.CACHE_ROOT) + if os.path.exists(settings.PREFIX): + shutil.rmtree(settings.PREFIX) - def test_other_block_of_running_experiment(self): - self.client.login(username='scheduler', password='1234') - experiment = Experiment.objects.get(id=self.experiment.id) + def touch(self, f, times=None): + """Replicates the `touch' command-line utility""" + with open(f, 'a'): os.utime(f, times) - experiment.start_date = datetime.now() - experiment.status = Experiment.RUNNING - experiment.save() - block = experiment.blocks.get(name='addition1') - block.status = Block.CACHED - block.save() + def J(self, *args): + return os.path.join(*((self.cache,) + args)) - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition2', - }), content_type='application/json') - self.checkResponse(response, 204) + def prepare_cleanup_full(self): - experiment.refresh_from_db() + # creates a temporary directory structure + os.makedirs(self.J('a', 'b', 'c')) + os.makedirs(self.J('a', 'c', 'd')) + os.makedirs(self.J('a', 'c', 'e')) + self.touch(self.J('a', 'b', 'c', 'd.json')) + self.touch(self.J('a', 'c', 'd', 'e.json')) - self.assertTrue(experiment.start_date is not None) - self.assertTrue(experiment.end_date is None) - self.assertEqual(experiment.status, Experiment.RUNNING) - addition1 = experiment.blocks.get(name='addition1') - self.assertEqual(addition1.status, Block.CACHED) - self.assertTrue(addition1.start_date is None) #no start received - self.assertTrue(addition1.end_date is not None) + def check_cleanup_full(self): - addition2 = experiment.blocks.get(name='addition2') - self.assertEqual(addition2.status, Block.PROCESSING) - self.assertTrue(addition2.start_date is not None) - self.assertTrue(addition2.end_date is None) + assert not os.listdir(self.cache) - analysis = experiment.blocks.get(name='analysis') - self.assertEqual(analysis.status, Block.NOT_CACHED) - self.assertTrue(analysis.start_date is None) - self.assertTrue(analysis.end_date is None) + def test_cache_cleanup_full(self): -#---------------------------------------------------------- + self.prepare_cleanup_full() + cleanup_cache(self.cache, delete=True) + self.check_cleanup_full() -class BlockFinishedAPI(BackendAPIBase): + def test_cmd_cleanup_full(self): - def setUp(self): - super(BlockFinishedAPI, self).setUp() + self.prepare_cleanup_full() + management.call_command('cleanup_cache', path=self.cache, + verbosity=0, delete=True) + self.check_cleanup_full() - User.objects.create_user('scheduler', 'scheduler@test.org', '1234') - self.client.login(username='johndoe', password='1234') + def prepare_cleanup_aged(self): - url = reverse('api_experiments:list_create', args=['johndoe']) - response = self.client.post(url, - json.dumps({ - 'toolchain': 'johndoe/toolchain1/1', - 'declaration': BackendAPIBase.DECLARATION1, - 'name': 'experiment1', - }), content_type='application/json') + two_min_ago = time.time() - 60*2 - self.checkResponse(response, 201, content_type='application/json') + # creates a temporary directory structure + os.makedirs(self.J('a', 'b', 'c')) + os.makedirs(self.J('a', 'c', 'd')) + os.makedirs(self.J('a', 'c', 'e')) + self.touch(self.J('a', 'b', 'c', 'd.json'), (two_min_ago, two_min_ago)) + self.touch(self.J('a', 'c', 'd', 'e.json')) - url = reverse('api_experiments:start', args=['johndoe', 'toolchain1', 1, 'experiment1']) - response = self.client.post(url) - self.checkResponse(response, 200, content_type='application/json') - self.client.logout() + def check_cleanup_aged(self): - self.experiment = Experiment.objects.get(author__username='johndoe', - toolchain__name='toolchain1', - name='experiment1', - ) + assert os.path.exists(self.J('a', 'c', 'd', 'e.json')) + assert not os.path.exists(self.J('a', 'b', 'c')) + assert not os.path.exists(self.J('a', 'b', 'c', 'd.json')) + assert not os.path.exists(self.J('a', 'b', 'e')) - self.url = reverse('api_backend:backend-api-block-finished') + def test_cache_cleanup_aged(self): - def start_block(self, block_name): - response = self.client.put(reverse('api_backend:backend-api-block-started'), - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': block_name, - }), content_type='application/json') + self.prepare_cleanup_aged() + cleanup_cache(self.cache, age_in_minutes=2, delete=True) + self.check_cleanup_aged() - self.checkResponse(response, 204) + def test_cmd_cleanup_aged(self): - def test_no_notification_for_anonymous_user(self): - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'processed', - 'statistics': None, - }), content_type='application/json') + self.prepare_cleanup_aged() + management.call_command('cleanup_cache', path=self.cache, + verbosity=0, olderthan=2, delete=True) + self.check_cleanup_aged() - self.checkResponse(response, 403) + def prepare_cleanup_lock(self): - def test_no_notification_for_non_scheduler_user(self): - self.client.login(username='johndoe', password='1234') + two_min_ago = time.time() - 60*2 + ten_min_ago = time.time() - 60*10 - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'processed', - 'statistics': None, - }), content_type='application/json') + # creates a temporary directory structure + os.makedirs(self.J('a', 'b', 'c')) + os.makedirs(self.J('a', 'c', 'd')) + os.makedirs(self.J('a', 'c', 'e')) + self.touch(self.J('a', 'b', 'c', 'd.json'), (two_min_ago, two_min_ago)) + self.touch(self.J('a', 'c', 'd', 'e.json'), (ten_min_ago, ten_min_ago)) - self.checkResponse(response, 403) + self.touch(self.J('a', 'c', 'd', 'e.lock')) #create a lock - def test_bad_notification_request_with_unknown_experiment(self): - self.client.login(username='scheduler', password='1234') + def check_cleanup_lock(self): - experiment = Experiment.objects.get(id=self.experiment.id) - block = experiment.blocks.get(name='addition1') + assert os.path.exists(self.J('a', 'c', 'd', 'e.json')) + assert not os.path.exists(self.J('a', 'b', 'c')) + assert not os.path.exists(self.J('a', 'b', 'c', 'd.json')) + assert not os.path.exists(self.J('a', 'b', 'e')) - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/unknown', - 'block-name': 'addition1', - 'state': 'processed', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') - self.checkResponse(response, 404) + def test_cache_cleanup_lock(self): + self.prepare_cleanup_lock() + cleanup_cache(self.cache, delete=True) + self.check_cleanup_lock() - def test_bad_notification_request_with_unknown_block(self): - self.client.login(username='scheduler', password='1234') - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'unknown', - 'state': 'processed', - 'outputs': ['deadbeef'], - 'statistics': None, - }), content_type='application/json') + def test_cmd_cleanup_lock(self): - self.checkResponse(response, 404) + self.prepare_cleanup_lock() + management.call_command('cleanup_cache', path=self.cache, + verbosity=0, delete=True) + self.check_cleanup_lock() - def test_bad_notification_request_with_invalid_state(self): - self.client.login(username='scheduler', password='1234') +class BaseBackendTestCase(TestCase): - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'unknown', - 'statistics': None, - }), content_type='application/json') - self.checkResponse(response, 400, content_type='application/json') + @classmethod + def setUpTestData(cls): + install.create_sites() + system_user, plot_user, user = install.create_users('user', 'user') + install.add_group('Default') + setup_backend(qsetup.DEFAULT_CONFIGURATION) - def test_bad_notification_request_without_json_content(self): - self.client.login(username='scheduler', password='1234') - response = self.client.put(self.url) - self.checkResponse(response, 400, content_type='application/json') + Worker.objects.update(active=True) + env = Environment.objects.first() + queue = Queue.objects.first() + template_data = dict( + system_user = system_user, + plot_user = plot_user, + user = user, + private = False, + queue = queue.name, + environment = dict(name=env.name, version=env.version), + ) + prefix = os.path.join( + os.path.dirname(os.path.dirname(os.path.realpath(sys.argv[0]))), + 'src', + 'beat.examples', + ) + install.install_contributions(prefix, 'system', template_data) + install.install_contributions(prefix, 'test', template_data) - def test_bad_notification_request_with_invalid_json_content1(self): - self.client.login(username='scheduler', password='1234') - - response = self.client.put(self.url, - json.dumps({ - 'block-name': 'addition1', - 'state': 'processed', - 'statistics': None, - }), content_type='application/json') - - self.checkResponse(response, 400, content_type='application/json') - - - def test_bad_notification_request_with_invalid_json_content2(self): - self.client.login(username='scheduler', password='1234') - - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'state': 'processed', - 'statistics': None, - }), content_type='application/json') - self.checkResponse(response, 400, content_type='application/json') + def tearDown(self): + if os.path.exists(settings.CACHE_ROOT): + shutil.rmtree(settings.CACHE_ROOT) + if os.path.exists(settings.PREFIX): + shutil.rmtree(settings.PREFIX) - def test_bad_notification_request_with_invalid_json_content3(self): - self.client.login(username='scheduler', password='1234') + def check_single(self, xp): + '''Checks user/user/single/1/single''' - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'statistics': None, - }), content_type='application/json') + self.assertEqual(xp.blocks.count(), 2) - self.checkResponse(response, 400, content_type='application/json') + b0 = xp.blocks.all()[0] + self.assertEqual(b0.name, 'echo') + self.assertEqual(b0.status, Block.NOT_CACHED) + self.assertEqual(b0.algorithm, + Algorithm.objects.get(name='integers_echo')) + self.assertEqual(b0.dependencies.count(), 0) + self.assertEqual(b0.dependents.count(), 1) + self.assertEqual(b0.job.status, Job.QUEUED) + self.assertEqual(b0.job.parent, None) + self.assertEqual(b0.job.child_, None) + self.assertEqual(b0.queue.name, 'queue') + self.assertEqual(b0.environment.name, 'environment') + self.assertEqual(b0.required_slots, 1) + self.assertEqual(b0.inputs.count(), 1) + self.assertEqual(b0.outputs.count(), 1) + self.assertEqual(b0.job.splits.count(), 1) + self.assertEqual(b0.job.splits.get().status, Job.QUEUED) + assert b0.job.splits.get().worker is None + assert not b0.done() - def test_bad_notification_request_with_invalid_json_content4(self): - self.client.login(username='scheduler', password='1234') + b1 = xp.blocks.all()[1] - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'processed', - 'statistics': None, - }), content_type='application/json') + self.assertEqual(b1.name, 'analysis') + self.assertEqual(b1.status, Block.NOT_CACHED) + self.assertEqual(b1.algorithm, + Algorithm.objects.get(name='integers_echo_analyzer')) + self.assertEqual(b1.dependencies.count(), 1) + self.assertEqual(b1.dependents.count(), 0) + self.assertEqual(b1.job.status, Job.QUEUED) + self.assertEqual(b1.job.parent, None) + self.assertEqual(b1.job.child_, None) + self.assertEqual(b1.queue.name, 'queue') + self.assertEqual(b1.environment.name, 'environment') + self.assertEqual(b1.required_slots, 1) + self.assertEqual(b1.inputs.count(), 1) + self.assertEqual(b1.outputs.count(), 1) + self.assertEqual(b1.job.splits.count(), 0) #not scheduled yet - self.checkResponse(response, 400, content_type='application/json') + assert not b1.done() - def test__not_cached_block__processed(self): - self.client.login(username='scheduler', password='1234') - experiment = Experiment.objects.get(id=self.experiment.id) - block = experiment.blocks.get(name='addition1') +class BackendSetup(BaseBackendTestCase): - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'processed', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') - self.checkResponse(response, 204) + def check_default_config(self): - experiment.refresh_from_db() + # checks all is there + self.assertEqual(dump_backend(), qsetup.DEFAULT_CONFIGURATION) - self.assertTrue(experiment.start_date is not None) - self.assertTrue(experiment.end_date is None) - self.assertEqual(experiment.status, Experiment.RUNNING) + worker = Worker.objects.get() + queue = Queue.objects.get() + Worker.objects.update(active=True) - addition1 = experiment.blocks.get(name='addition1') - self.assertEqual(addition1.status, Block.CACHED) - self.assertTrue(addition1.start_date is None) #no start received! - self.assertTrue(addition1.end_date is not None) + self.assertEqual(worker.available_cores(), qsetup.CORES) + self.assertEqual(list(worker.slots.values_list('id', flat=True)), + list(queue.slots.values_list('id', flat=True))) - addition2 = experiment.blocks.get(name='addition2') - self.assertEqual(addition2.status, Block.NOT_CACHED) - self.assertTrue(addition2.start_date is None) - self.assertTrue(addition2.end_date is None) + # worker has no job splits assigned to it + self.assertEqual(worker.splits.count(), 0) - analysis = experiment.blocks.get(name='analysis') - self.assertEqual(analysis.status, Block.NOT_CACHED) - self.assertTrue(analysis.start_date is None) - self.assertTrue(analysis.end_date is None) + self.assertEqual(queue.availability(), qsetup.CORES) + self.assertEqual(queue.number_of_slots(), qsetup.CORES) + self.assertEqual(queue.worker_availability(), [worker]) - def test__not_cached_block__failed(self): - self.client.login(username='scheduler', password='1234') + # checks the single slot and priority + slot = queue.slots.get() + self.assertEqual(slot.quantity, qsetup.CORES) + self.assertEqual(slot.priority, 0) + self.assertEqual(slot.worker, worker) - experiment = Experiment.objects.get(id=self.experiment.id) - block = experiment.blocks.get(name='addition1') + # checks no orphan slots exist + self.assertEqual(Slot.objects.filter(queue=None).count(), 0) + self.assertEqual(Slot.objects.filter(worker=None).count(), 0) - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'failed', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') - self.checkResponse(response, 204) + def test_setup(self): + self.check_default_config() - def test__not_cached_block__cancelled(self): - self.client.login(username='scheduler', password='1234') - experiment = Experiment.objects.get(id=self.experiment.id) - block = experiment.blocks.get(name='addition1') + def test_cmd_reset(self): - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'cancelled', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + # installs the default configuration command + management.call_command('qsetup', verbosity=0, reset=True) + self.check_default_config() - self.checkResponse(response, 204) + def check_noprior_config(self): - def test__cached_block__processed(self): - self.client.login(username='scheduler', password='1234') + qs = Queue.objects.all() - experiment = Experiment.objects.get(id=self.experiment.id) + self.assertEqual(qs.count(), 3) - block = experiment.blocks.get(name='addition1') - block.status = Block.CACHED - block.save() + q1, q2, q3 = qs.order_by('name') - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'processed', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + self.assertEqual(q1.name, 'q1') + self.assertEqual(q2.name, 'q2') + self.assertEqual(q3.name, 'q4') + self.assertEqual(q1.splits().count(), 0) + self.assertEqual(q2.splits().count(), 0) + self.assertEqual(q3.splits().count(), 0) - self.checkResponse(response, 204) + self.assertEqual(q1.number_of_slots(), 4) + self.assertEqual(q2.number_of_slots(), 2) + self.assertEqual(q3.number_of_slots(), 1) + self.assertEqual(q1.availability(), 4) + self.assertEqual(q2.availability(), 2) + self.assertEqual(q3.availability(), 1) + self.assertEqual(q1.environments.count(), 1) + self.assertEqual(q2.environments.count(), 1) + self.assertEqual(q3.environments.count(), 1) + self.assertEqual(q1.environments.first(), q2.environments.first()) + self.assertEqual(q2.environments.first(), q3.environments.first()) - def test__cached_block__failed(self): - self.client.login(username='scheduler', password='1234') + env = q1.environments.first() - experiment = Experiment.objects.get(id=self.experiment.id) + self.assertEqual(env.name, 'environment') + self.assertEqual(env.version, '1') - block = experiment.blocks.get(name='addition1') - block.status = Block.CACHED - block.save() + self.assertEqual(q1.slots.count(), 1) + self.assertEqual(q2.slots.count(), 1) + self.assertEqual(q3.slots.count(), 1) - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'failed', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + slot1 = q1.slots.first() + slot2 = q2.slots.first() + slot3 = q3.slots.first() - self.checkResponse(response, 204) + self.assertEqual(slot1.quantity, 4) + self.assertEqual(slot1.priority, 0) + self.assertEqual(slot1.queue, q1) + self.assertEqual(slot2.quantity, 2) + self.assertEqual(slot2.priority, 0) + self.assertEqual(slot2.queue, q2) + self.assertEqual(slot3.quantity, 1) + self.assertEqual(slot3.priority, 0) + self.assertEqual(slot3.queue, q3) + worker1 = slot1.worker + worker2 = slot2.worker + worker3 = slot3.worker - def test__cached_block__cancelled(self): - self.client.login(username='scheduler', password='1234') + self.assertEqual(worker1, worker2) + self.assertEqual(worker2, worker3) - experiment = Experiment.objects.get(id=self.experiment.id) + self.assertEqual(worker1.name, 'node1') + self.assertEqual(list(worker1.splits.all()), []) + self.assertEqual(worker1.memory, 16*1024) + self.assertEqual(worker1.cores, 4) + self.assertEqual(worker1.available_cores(), 4) - block = experiment.blocks.get(name='addition1') - block.status = Block.CACHED - block.save() + self.assertEqual(worker1.slots.count(), 3) - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'cancelled', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + self.assertEqual(set(worker1.slots.all()), + set(list(q1.slots.all()) + list(q2.slots.all()) + \ + list(q3.slots.all()))) - self.checkResponse(response, 204) + avail1 = q1.worker_availability() + self.assertEqual(avail1, [worker1]) + avail2 = q2.worker_availability() + self.assertEqual(avail2, [worker1]) - def test__failed_block__processed(self): - self.client.login(username='scheduler', password='1234') + avail3 = q3.worker_availability() + self.assertEqual(avail2, [worker1]) - experiment = Experiment.objects.get(id=self.experiment.id) + # checks no orphan slots exist + self.assertEqual(Slot.objects.filter(queue=None).count(), 0) + self.assertEqual(Slot.objects.filter(worker=None).count(), 0) - block = experiment.blocks.get(name='addition1') - block.status = Block.FAILED - block.save() - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'processed', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + def test_reconfigure_noprior(self): - self.checkResponse(response, 400, content_type='application/json') + setup_backend(QUEUES_WITHOUT_PRIORITY) + Worker.objects.update(active=True) + self.check_noprior_config() - def test__failed_block__failed(self): - self.client.login(username='scheduler', password='1234') + def test_reconfigure_fail_qenv_used(self): - experiment = Experiment.objects.get(id=self.experiment.id) + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) - block = experiment.blocks.get(name='addition1') - block.status = Block.FAILED - block.save() + # schedules the experiment and check it + xp.schedule() + self.check_single(xp) - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'failed', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + try: + setup_backend(QUEUES_WITHOUT_PRIORITY) + except RuntimeError as e: + assert str(e).find('on the following queue/environment combinations') != -1 + else: + assert False, 'Queue re-configuration worked with q/env in use' + + + def check_prior_config(self): + + qs = Queue.objects.all() - self.checkResponse(response, 400, content_type='application/json') + self.assertEqual(qs.count(), 3) + q1, q1_special, q2 = qs.order_by('name') - def test__failed_block__cancelled(self): - self.client.login(username='scheduler', password='1234') + self.assertEqual(q1.name, 'q1') + self.assertEqual(q2.name, 'q2') + self.assertEqual(q1_special.name, 'q1_special') + self.assertEqual(q1.splits().count(), 0) + self.assertEqual(q2.splits().count(), 0) + self.assertEqual(q1_special.splits().count(), 0) - experiment = Experiment.objects.get(id=self.experiment.id) + self.assertEqual(q1.number_of_slots(), 8) + self.assertEqual(q2.number_of_slots(), 4) + self.assertEqual(q1_special.number_of_slots(), 8) + self.assertEqual(q1.availability(), 8) + self.assertEqual(q2.availability(), 4) + self.assertEqual(q1_special.availability(), 8) + self.assertEqual(q1.environments.count(), 1) + self.assertEqual(q2.environments.count(), 1) + self.assertEqual(q1_special.environments.count(), 1) - block = experiment.blocks.get(name='addition1') - block.status = Block.FAILED - block.save() + self.assertEqual(q1.environments.first(), q2.environments.first()) + self.assertEqual(q2.environments.first(), + q1_special.environments.first()) - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'cancelled', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + env = q1.environments.first() - self.checkResponse(response, 400, content_type='application/json') + self.assertEqual(env.name, 'environment') + self.assertEqual(env.version, '1') + self.assertEqual(q1.slots.count(), 2) + self.assertEqual(q1_special.slots.count(), 2) + self.assertEqual(q2.slots.count(), 2) - def test__processing_block__processed(self): - self.client.login(username='scheduler', password='1234') + slot11, slot12 = q1.slots.all() + slot1_special1, slot1_special2 = q1_special.slots.all() + slot21, slot22 = q2.slots.all() - self.start_block('addition1') + self.assertEqual(slot11.quantity, 4) + self.assertEqual(slot11.priority, 5) + self.assertEqual(slot12.quantity, 4) + self.assertEqual(slot12.priority, 0) + self.assertEqual(slot11.queue, q1) + self.assertEqual(slot12.queue, q1) - experiment = Experiment.objects.get(id=self.experiment.id) - block = experiment.blocks.get(name='addition1') + self.assertEqual(slot21.quantity, 2) + self.assertEqual(slot21.priority, 0) + self.assertEqual(slot22.quantity, 2) + self.assertEqual(slot22.priority, 10) + self.assertEqual(slot21.queue, q2) + self.assertEqual(slot22.queue, q2) - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'processed', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + self.assertEqual(slot1_special1.quantity, 4) + self.assertEqual(slot1_special1.priority, 0) + self.assertEqual(slot1_special2.quantity, 4) + self.assertEqual(slot1_special2.priority, 5) + self.assertEqual(slot1_special1.queue, q1_special) + self.assertEqual(slot1_special2.queue, q1_special) - self.checkResponse(response, 204) + worker1 = slot11.worker + worker2 = slot12.worker + worker21 = slot21.worker + worker22 = slot22.worker + worker1_special1 = slot1_special1.worker + worker1_special2 = slot1_special2.worker - experiment.refresh_from_db() + self.assertEqual(worker1, worker21) + self.assertEqual(worker1, worker1_special1) + self.assertEqual(worker2, worker22) + self.assertEqual(worker2, worker1_special2) - self.assertTrue(experiment.start_date is not None) - self.assertTrue(experiment.end_date is None) - self.assertEqual(experiment.status, Experiment.RUNNING) + self.assertEqual(worker1.name, 'node1') + self.assertEqual(worker1.splits.count(), 0) + self.assertEqual(worker1.memory, 32*1024) + self.assertEqual(worker1.cores, 4) + self.assertEqual(worker1.available_cores(), 4) - addition1 = experiment.blocks.get(name='addition1') - self.assertEqual(addition1.status, Block.CACHED) - self.assertTrue(addition1.start_date is not None) - self.assertTrue(addition1.end_date is not None) + self.assertEqual(worker2.name, 'node2') + self.assertEqual(worker2.splits.count(), 0) + self.assertEqual(worker2.memory, 16*1024) + self.assertEqual(worker2.cores, 4) + self.assertEqual(worker2.available_cores(), 4) - addition2 = experiment.blocks.get(name='addition2') - self.assertEqual(addition2.status, Block.NOT_CACHED) - self.assertTrue(addition2.start_date is None) - self.assertTrue(addition2.end_date is None) + self.assertEqual(worker1.slots.count(), 3) + self.assertEqual(worker2.slots.count(), 3) - analysis = experiment.blocks.get(name='analysis') - self.assertEqual(analysis.status, Block.NOT_CACHED) - self.assertTrue(analysis.start_date is None) - self.assertTrue(analysis.end_date is None) + avail1 = q1.worker_availability() + self.assertEqual(avail1, [worker1, worker2]) + avail2 = q2.worker_availability() + self.assertEqual(avail2, [worker2, worker1]) - def test__processing_block__failed(self): - self.client.login(username='scheduler', password='1234') + avail1_special = q1_special.worker_availability() + self.assertEqual(avail1_special, [worker2, worker1]) - self.start_block('addition1') + # checks no orphan slots exist + self.assertEqual(Slot.objects.filter(queue=None).count(), 0) + self.assertEqual(Slot.objects.filter(worker=None).count(), 0) - experiment = Experiment.objects.get(id=self.experiment.id) - block = experiment.blocks.get(name='addition1') - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'failed', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + def test_reconfigure_priors(self): - self.checkResponse(response, 204) + setup_backend(PRIORITY_QUEUES) + Worker.objects.update(active=True) + self.check_prior_config() - experiment.refresh_from_db() - self.assertTrue(experiment.start_date is not None) - self.assertTrue(experiment.end_date is not None) - self.assertEqual(experiment.status, Experiment.FAILED) +class Scheduling(BaseBackendTestCase): - addition1 = experiment.blocks.get(name='addition1') - self.assertEqual(addition1.status, Block.FAILED) - self.assertTrue(addition1.start_date is not None) - self.assertTrue(addition1.end_date is not None) - addition2 = experiment.blocks.get(name='addition2') - self.assertEqual(addition2.status, Block.NOT_CACHED) - self.assertTrue(addition2.start_date is None) - self.assertTrue(addition2.end_date is None) + def test_success(self): - analysis = experiment.blocks.get(name='analysis') - self.assertEqual(analysis.status, Block.NOT_CACHED) - self.assertTrue(analysis.start_date is None) - self.assertTrue(analysis.end_date is None) + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) - def test__unique_processing_block__cancelled(self): - self.client.login(username='scheduler', password='1234') + # schedules the experiment and check it + xp.schedule() + self.check_single(xp) - self.start_block('addition1') + # schedules the first runnable block + schedule() - experiment = Experiment.objects.get(id=self.experiment.id) - block = experiment.blocks.get(name='addition1') + assigned_splits = JobSplit.objects.filter(worker__isnull=False) + worker = Worker.objects.get() - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'cancelled', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + self.assertEqual(assigned_splits.count(), 1) + split = assigned_splits.first() + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES) - self.checkResponse(response, 204) + # simulate job start on worker + split.start() + self.assertEqual(split.job.status, Job.PROCESSING) + self.assertEqual(split.job.block.status, Block.PROCESSING) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) - experiment.refresh_from_db() + self.assertEqual(worker.available_cores(), qsetup.CORES-1) - self.assertTrue(experiment.creation_date is not None) - self.assertTrue(experiment.start_date is not None) - self.assertTrue(experiment.end_date is not None) - self.assertEqual(experiment.status, Experiment.FAILED) + # no job can be run right now + schedule() + assigned_splits = JobSplit.objects.filter(worker__isnull=False, + status=Job.QUEUED) + self.assertEqual(assigned_splits.count(), 0) - self.assertEqual(experiment.blocks.count(), 3) + # simulate end job signal + split.end(Result(status=0)) + self.assertEqual(split.job.status, Job.COMPLETED) + self.assertEqual(split.job.block.status, Block.CACHED) + self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + ''' + nose.tools.eq_(block.result.stats.as_dict(), result.stats) - def test__processing_block__cancelled(self): - self.client.login(username='scheduler', password='1234') + self.assertEqual(worker.available_cores(), CORES) - self.start_block('addition1') - self.start_block('addition2') + # since this job was successful, the next one should be ready to run - experiment = Experiment.objects.get(id=self.experiment.id) - block = experiment.blocks.get(name='addition1') + # schedules the last block of the experiment + jobs = s.jobs_to_run() + self.assertEqual(len(jobs), 1) + job, worker = jobs.popitem(last=False) - response = self.client.put(self.url, - json.dumps({ - 'experiment-name': 'johndoe/toolchain1/1/experiment1', - 'block-name': 'addition1', - 'state': 'cancelled', - 'outputs': [o.encode() for o in block.outputs.values_list('hash', flat=True)], - 'statistics': None, - }), content_type='application/json') + check_block_assignment(job, worker, xp, 'analysis', HOSTNAME, CORES) - self.checkResponse(response, 204) + check_start_job(s, job, worker, xp) + self.assertEqual(worker.available_cores(), CORES-1) - experiment.refresh_from_db() + # no job can be run right now + jobs = s.jobs_to_run() + self.assertEqual(len(jobs), 0) - self.assertTrue(experiment.start_date is not None) - self.assertTrue(experiment.end_date is not None) - self.assertEqual(experiment.status, Experiment.CANCELING) + # simulate end job signal + check_end_job_successfuly(s, job) - addition1 = experiment.blocks.get(name='addition1') - self.assertEqual(addition1.status, Block.NOT_CACHED) - self.assertTrue(addition1.start_date is not None) - self.assertTrue(addition1.end_date is None) + # no more jobs scheduled + jobs = s.jobs_to_run() + self.assertEqual(len(jobs), 0) - addition2 = experiment.blocks.get(name='addition2') - self.assertEqual(addition2.status, Block.PROCESSING) - self.assertTrue(addition2.start_date is not None) - self.assertTrue(addition2.end_date is None) + # experiment is now 'completed' + self.assertEqual(xp.state, 'completed') + self.assertEqual(xp.dispatch, 'pending') - analysis = experiment.blocks.get(name='analysis') - self.assertEqual(analysis.status, Block.NOT_CACHED) - self.assertTrue(analysis.start_date is None) - self.assertTrue(analysis.end_date is None) + self.assertEqual(worker.available_cores(), CORES) + '''