diff --git a/beat/web/backend/schedule.py b/beat/web/backend/schedule.py index cea079a9750396051b20955226758fbaab20c8db..e727ff325b0d2011aa2705f290555a39a705a955 100644 --- a/beat/web/backend/schedule.py +++ b/beat/web/backend/schedule.py @@ -99,6 +99,17 @@ def _select_splits_for_queue(queue): return splits_to_consider[:queue.availability()] +@transaction.atomic +def send_experiment_emails(): + '''Sends experiment e-mails for experiments that just finished''' + + from ..experiments.models import Experiment + + qs = Experiment.objects.select_for_update().filter(email_dispatch=True) + for k in qs: k.email() + qs.update(email_dispatch=False) + + @transaction.atomic def schedule(): '''Schedules job splits that can run now, respecting user/queue usage @@ -286,7 +297,16 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT): ''' # lock split - split = JobSplit.objects.select_for_update().get(pk=split_pk) + try: + split = JobSplit.objects.select_for_update().get(pk=split_pk) + except JobSplit.DoesNotExist: + from traceback import format_exc + logger.critical("Job split(pk=%d) does not exist. %s", + split_pk, format_exc()) + return + + # mark split start + split.start() config = simplejson.loads(split.job.block.command) @@ -300,7 +320,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT): (split.split_index+1, split.job.block.required_slots, split.process_id, split.worker, split.job.block.name, split.job.block.experiment.fullname()) - logger.error(message) + logger.warn(message) split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR, syserr=message)) return @@ -363,7 +383,7 @@ def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT): except: from traceback import format_exc - logger.error(format_exc()) + logger.warn(format_exc()) logger.warn("Process `%s' for split `%s' ended with an error", split.process_id, split) split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR, @@ -405,7 +425,7 @@ def worker_update(): @transaction.atomic -def work(environments, cpulimit, process, settings): +def work(environments, cpulimit, process, django_settings): '''Launches user code on isolated processes This function is supposed to be called asynchronously, by a scheduling @@ -429,8 +449,8 @@ def work(environments, cpulimit, process, settings): process (str): The path to the ``process.py`` program to use for running the user code on isolated processes. - settings (str): The name of the module containing the Django settings for - use with the process program + django_settings (str): The name of the module containing the Django + settings for use with the process program ''' @@ -444,7 +464,7 @@ def work(environments, cpulimit, process, settings): j.end(None, Job.CANCELLED) # cmdline base argument - cmdline = [process, '--settings=%s' % settings] + cmdline = [process, '--settings=%s' % django_settings] if cpulimit is not None: cmdline += ['--cpulimit=%s' % cpulimit] if settings.DEBUG: cmdline += ['-vv'] diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py index 9e3beccd3a9a4bc6a1cff25fb4b209f910b7c18e..014cbff0e260ce73f26114270f2fafb3b770229e 100644 --- a/beat/web/backend/tests.py +++ b/beat/web/backend/tests.py @@ -36,7 +36,7 @@ from django.conf import settings from django.core.urlresolvers import reverse from django.core import management from django.contrib.auth.models import User, Group -from django.test import TestCase +from django.test import TestCase, TransactionTestCase from guardian.shortcuts import get_perms @@ -49,7 +49,7 @@ from ..statistics.models import HourlyStatistics from .models import Queue, Worker, Slot, Environment, Job, JobSplit, Result from .utils import cleanup_cache, dump_backend, setup_backend from .management.commands import qsetup -from .schedule import schedule, process, refresh_environments +from .schedule import schedule, process, work EXPERIMENT_HASHES = { @@ -1958,30 +1958,34 @@ class Working(BaseBackendTestCase): def setUp(self): - # load default environments - self.environments = refresh_environments() + from beat.core.async import resolve_cpulimit_path + self.cpulimit = resolve_cpulimit_path(None) + + from .schedule import find_environments, resolve_process_path + self.process = resolve_process_path() + self.environments = find_environments(None) + self.env1_execute = self.environments['environment (1)']['execute'] + + self.settings = 'beat.web.settings.test' + if not os.path.exists(settings.CACHE_ROOT): os.makedirs(settings.CACHE_ROOT) - self.cpulimit = None - candidate = os.path.join(os.path.dirname(sys.argv[0]), 'cpulimit') - if os.path.exists(candidate): self.cpulimit = candidate - def tearDown(self): if os.path.exists(settings.CACHE_ROOT): shutil.rmtree(settings.CACHE_ROOT) - def check_stats_success(self, split): + def check_stats_success(self, block): - assert abs(split.job.block.speed_up_real() - 1.0) < 0.1 - assert abs(split.job.block.speed_up_maximal() - 1.0) < 0.1 - assert split.job.block.linear_execution_time() > 0.0 - assert split.job.block.queuing_time() > 0.0 - assert split.job.block.stdout() == '' - assert split.job.block.stderr() == '' - assert split.job.block.error_report() == '' + assert abs(block.speed_up_real() - 1.0) < 0.1 + assert abs(block.speed_up_maximal() - 1.0) < 0.1 + assert block.linear_execution_time() > 0.0 + assert block.queuing_time() > 0.0 + assert block.stdout() == '' + assert block.stderr() == '' + assert block.error_report() == '' def test_success(self): @@ -2013,20 +2017,21 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split, self.environments, self.cpulimit) + process(split.pk, self.env1_execute, self.cpulimit) # at this point, job should have been successful - self.assertEqual(split.job.status, Job.COMPLETED) - self.assertEqual(split.job.block.status, Block.CACHED) - self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + xp.refresh_from_db() + block = xp.blocks.first() + self.assertEqual(block.status, Block.CACHED) + self.assertEqual(xp.status, Experiment.RUNNING) # all caches must be have been generated - assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + assert all([k.index_checksums() for k in block.outputs.all()]) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) - self.check_stats_success(split) + self.check_stats_success(block) # assert we have no database traces after the block is done self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) @@ -2050,19 +2055,20 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split, self.environments, self.cpulimit) + process(split.pk, self.env1_execute, self.cpulimit) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) - self.assertEqual(split.job.status, Job.COMPLETED) - self.assertEqual(split.job.block.status, Block.CACHED) - self.assertEqual(split.job.block.experiment.status, Experiment.DONE) + xp.refresh_from_db() + block = xp.blocks.last() + self.assertEqual(block.status, Block.CACHED) + self.assertEqual(xp.status, Experiment.DONE) # all caches must be have been generated - assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + assert all([k.index_checksums() for k in block.outputs.all()]) - self.check_stats_success(split) + self.check_stats_success(block) # assert we have no database traces after the last block is done self.assertEqual(Job.objects.count(), 0) @@ -2101,26 +2107,27 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split, self.environments, self.cpulimit) + process(split.pk, self.env1_execute, self.cpulimit) # at this point, job should have failed - self.assertEqual(split.job.status, Job.FAILED) - self.assertEqual(split.job.block.status, Block.FAILED) - self.assertEqual(split.job.block.experiment.status, Experiment.FAILED) + xp.refresh_from_db() + block = xp.blocks.first() + self.assertEqual(block.status, Block.FAILED) + self.assertEqual(block.experiment.status, Experiment.FAILED) # all caches have not been generated - assert all([not k.exists() for k in split.job.block.outputs.all()]) + assert all([not k.exists() for k in block.outputs.all()]) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) - assert abs(split.job.block.speed_up_real() - 1.0) < 0.1 - assert abs(split.job.block.speed_up_maximal() - 1.0) < 0.1 - assert split.job.block.linear_execution_time() > 0.0 - assert split.job.block.queuing_time() > 0.0 - assert split.job.block.stdout() == '' - assert split.job.block.stderr() == '' - assert split.job.block.error_report().find('Error') != -1 + assert abs(block.speed_up_real() - 1.0) < 0.1 + assert abs(block.speed_up_maximal() - 1.0) < 0.1 + assert block.linear_execution_time() > 0.0 + assert block.queuing_time() > 0.0 + assert block.stdout() == '' + assert block.stderr() == '' + assert block.error_report().find('Error') != -1 # assert we have no database traces after the block is done self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) @@ -2160,20 +2167,21 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split, self.environments, self.cpulimit) + process(split.pk, self.env1_execute, self.cpulimit) # at this point, job should have been successful - self.assertEqual(split.job.status, Job.COMPLETED) - self.assertEqual(split.job.block.status, Block.CACHED) - self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + xp.refresh_from_db() + block = xp.blocks.first() + self.assertEqual(block.status, Block.CACHED) + self.assertEqual(block.experiment.status, Experiment.RUNNING) # all caches must be have been generated - assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + assert all([k.index_checksums() for k in block.outputs.all()]) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) - self.check_stats_success(split) + self.check_stats_success(block) # assert we have no database traces after the block is done self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) @@ -2197,19 +2205,20 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split, self.environments, self.cpulimit) + process(split.pk, self.env1_execute, self.cpulimit) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) - self.assertEqual(split.job.status, Job.COMPLETED) - self.assertEqual(split.job.block.status, Block.CACHED) - self.assertEqual(split.job.block.experiment.status, Experiment.DONE) + xp.refresh_from_db() + block = xp.blocks.last() + self.assertEqual(block.status, Block.CACHED) + self.assertEqual(block.experiment.status, Experiment.DONE) # all caches must be have been generated - assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + assert all([k.index_checksums() for k in block.outputs.all()]) - self.check_stats_success(split) + self.check_stats_success(block) # assert we have no database traces after the last block is done self.assertEqual(Job.objects.count(), 0) @@ -2257,20 +2266,21 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split, self.environments, self.cpulimit) + process(split.pk, self.env1_execute, self.cpulimit) # at this point, job should have been successful - self.assertEqual(split.job.status, Job.COMPLETED) - self.assertEqual(split.job.block.status, Block.CACHED) - self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + xp.refresh_from_db() + block = xp.blocks.first() + self.assertEqual(block.status, Block.CACHED) + self.assertEqual(block.experiment.status, Experiment.RUNNING) # all caches must be have been generated - assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + assert all([k.index_checksums() for k in block.outputs.all()]) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) - self.check_stats_success(split) + self.check_stats_success(block) # assert we have no database traces after the block is done self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) @@ -2305,19 +2315,20 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split, self.environments, self.cpulimit) + process(split.pk, self.env1_execute, self.cpulimit) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) - self.assertEqual(split.job.status, Job.COMPLETED) - self.assertEqual(split.job.block.status, Block.CACHED) - self.assertEqual(split.job.block.experiment.status, Experiment.DONE) + xpc.refresh_from_db() + block = xpc.blocks.last() + self.assertEqual(block.status, Block.CACHED) + self.assertEqual(block.experiment.status, Experiment.DONE) # all caches must be have been generated - assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + assert all([k.index_checksums() for k in block.outputs.all()]) - self.check_stats_success(split) + self.check_stats_success(block) # assert we have no database traces after the last block is done self.assertEqual(Job.objects.count(), 0) @@ -2366,20 +2377,21 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split, self.environments, self.cpulimit) + process(split.pk, self.env1_execute, self.cpulimit) # at this point, job should have been successful - self.assertEqual(split.job.status, Job.COMPLETED) - self.assertEqual(split.job.block.status, Block.CACHED) - self.assertEqual(split.job.block.experiment.status, Experiment.RUNNING) + xp.refresh_from_db() + block = xp.blocks.first() + self.assertEqual(block.status, Block.CACHED) + self.assertEqual(block.experiment.status, Experiment.RUNNING) # all caches must be have been generated - assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + assert all([k.index_checksums() for k in block.outputs.all()]) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) - self.check_stats_success(split) + self.check_stats_success(block) # assert we have no database traces after the block is done self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) @@ -2396,3 +2408,148 @@ class Working(BaseBackendTestCase): self.assertEqual([k.status for k in xpc.blocks.all()], [Block.CACHED, Block.NOT_CACHED]) assert xpc.blocks.last().job.parent == xp.blocks.last().job + + +class WorkingExternally(TransactionTestCase): + + + def setUp(self): + + from beat.core.async import resolve_cpulimit_path + self.cpulimit = resolve_cpulimit_path(None) + + from .schedule import find_environments, resolve_process_path + self.process = resolve_process_path() + self.environments = find_environments(None) + + self.settings = 'beat.web.settings.test' + + if not os.path.exists(settings.CACHE_ROOT): + os.makedirs(settings.CACHE_ROOT) + + install.create_sites() + system_user, plot_user, user = install.create_users('user', 'user') + install.add_group('Default') + + setup_backend(qsetup.DEFAULT_CONFIGURATION) + + Worker.objects.update(active=True) + env = Environment.objects.first() + queue = Queue.objects.first() + + template_data = dict( + system_user = system_user, + plot_user = plot_user, + user = user, + private = False, + queue = queue.name, + environment = dict(name=env.name, version=env.version), + ) + prefix = os.path.join( + os.path.dirname(os.path.dirname(os.path.realpath(sys.argv[0]))), + 'src', + 'beat.examples', + ) + install.install_contributions(prefix, 'system', template_data) + install.install_contributions(prefix, 'test', template_data) + + + def tearDown(self): + if os.path.exists(settings.CACHE_ROOT): + shutil.rmtree(settings.CACHE_ROOT) + if os.path.exists(settings.PREFIX): + shutil.rmtree(settings.PREFIX) + + + def test_success_subprocess(self): + + # tests an experiment can actually be run + + current_stats = HourlyStatistics.objects.count() + + fullname = 'user/user/single/1/single' + xp = Experiment.objects.get(name=fullname.split(os.sep)[-1]) + + # schedules the experiment and check it + xp.schedule() + + # schedules the first runnable block + assert xp.blocks.first().job.runnable_date is not None + assert xp.blocks.last().job.runnable_date is None + + assigned_splits = schedule() + + worker = Worker.objects.get() + + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'echo') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # actually runs the job (blocking) + work(self.environments, self.cpulimit, self.process, self.settings) + time.sleep(5) #wait job completion + + # at this point, split should have been successful which shall + # trigger job deletion and block update + xp.refresh_from_db() + block = xp.blocks.first() + + self.assertEqual(block.status, Block.CACHED) + self.assertEqual(xp.status, Experiment.RUNNING) + + # all caches must be have been generated + assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + + # checks the number of statistics objects has increased by 1 + self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) + + # assert we have no database traces after the block is done + self.assertEqual(Job.objects.filter(block=split.job.block).count(), 0) + self.assertEqual(JobSplit.objects.filter(job=split.job).count(), 0) + self.assertEqual(Result.objects.filter(job__isnull=True).count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES) + + # since this job was successful, the next one should be ready to run + + # schedules the last block of the experiment + assert xp.blocks.last().job.runnable_date is not None + assigned_splits = schedule() + + self.assertEqual(len(assigned_splits), 1) + split = assigned_splits[0] + self.assertEqual(split.job.block.experiment, xp) + self.assertEqual(split.job.block.name, 'analysis') + self.assertEqual(split.worker, worker) + self.assertEqual(worker.name, qsetup.HOSTNAME) + self.assertEqual(worker.available_cores(), qsetup.CORES-1) + + # actually runs the job (blocking) + work(self.environments, self.cpulimit, self.process, self.settings) + time.sleep(5) #wait job completion + + # checks the number of statistics objects has increased by 1 + self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) + + # at this point, split should have been successful which shall + # trigger job deletion and block update + xp.refresh_from_db() + block = xp.blocks.first() + + self.assertEqual(block.status, Block.CACHED) + self.assertEqual(xp.status, Experiment.DONE) + assert xp.email_dispatch + + # all caches must be have been generated + assert all([k.index_checksums() for k in split.job.block.outputs.all()]) + + # assert we have no database traces after the last block is done + self.assertEqual(Job.objects.count(), 0) + self.assertEqual(JobSplit.objects.count(), 0) + self.assertEqual(Result.objects.count(), 0) + + self.assertEqual(worker.available_cores(), qsetup.CORES)