diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index 4face51cc30dc5c26a72f25188a278d43007abb2..9a4d702ecc0472cbac66967ac1aab5773ef8ab2f 100755 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -412,7 +412,7 @@ class Worker(models.Model): - def work(self, environments, cpulimit, process): + def work(self, environments, process): '''Launches user code on isolated processes This function is supposed to be called asynchronously, by a @@ -430,15 +430,10 @@ class Worker(models.Model): environments (dict): A dictionary containing installed environments, their description and execute-file paths. - cpulimit (str): The path to the ``cpulimit`` program to use for - limiting the user code in CPU usage. If set to ``None``, then - don't use it, even if the select user queue has limits. - process (str): The path to the ``process.py`` program to use for running the user code on isolated processes. ''' - from .utils import pick_execute # refresh state from database and update state if required self.refresh_from_db() @@ -465,9 +460,10 @@ class Worker(models.Model): # cmdline base argument cmdline = [process] - if cpulimit is not None: cmdline += ['--cpulimit=%s' % cpulimit] - if settings.DEBUG: cmdline += ['-vv'] - else: cmdline += ['-v'] + if settings.DEBUG: + cmdline += ['-vv'] + else: + cmdline += ['-v'] # start newly assigned job splits with transaction.atomic(): @@ -480,7 +476,7 @@ class Worker(models.Model): kwargs = dict() if settings.WORKER_DETACH_CHILDREN: kwargs['preexec_fn'] = os.setpgrp - subprocess.Popen(cmdline + ['execute', str(split.pk)], **kwargs) + subprocess.Popen(cmdline + [str(split.pk)], **kwargs) split.status = Job.PROCESSING #avoids re-running split.save() @@ -1369,7 +1365,7 @@ class JobSplit(models.Model): self.end(result) - def process(self, execute, cpulimit=None, cache=settings.CACHE_ROOT): + def process(self, cache=settings.CACHE_ROOT): '''Process assigned job splits using beat.core This task executes the user algorithm on a subprocess. It also serves @@ -1379,26 +1375,9 @@ class JobSplit(models.Model): set. Otherwise, it takes care of a subset of the input data that is synchronised with this block, determined by ``split_index``. - Two processes are spawned from the current work process: - - * The process for executing the user code - * A process to limit the CPU usage (with ``cpulimit``), if these - conditions are respected: - - 1. The program ``cpulimit`` is available on the current machine - 2. The configuration requests a CPU usage greater than 0 (``nb_cores - > 0``). (N.B.: a value of zero means not to limit on CPU). - Parameters: - execute (str): The path to the ``execute`` program to use for running - the user code associated with this job split. - - cpulimit (str, Optional): The path to the ``cpulimit`` program to use - for limiting the user code in CPU usage. If not set, then don't use - it, even if the select user queue has limits. - cache (str, Optional): The path leading to the root of the cache to use for this run. If not set, use the global default at ``settings.CACHE_ROOT``. @@ -1408,6 +1387,8 @@ class JobSplit(models.Model): logger.info("Starting to process split `%s' (pid=%d)...", self, os.getpid()) + self.executor = None + config = simplejson.loads(self.job.block.command) # setup range if necessary @@ -1433,14 +1414,14 @@ class JobSplit(models.Model): if JobSplit.host is None: JobSplit.host = Host() - JobSplit.host.setup() + JobSplit.host.setup(raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False))) - executor = beat.core.execution.Executor(settings.PREFIX, config, + self.executor = beat.core.execution.Executor(settings.PREFIX, config, cache) - if not executor.valid: + if not self.executor.valid: err = '' - for e in executor.errors: err += ' * %s\n' % e + for e in self.executor.errors: err += ' * %s\n' % e message = "Failed to load execution information for split " \ "%d/%d running at worker `%s', for block `%s' of " \ "experiment `%s': %s" % (self.split_index+1, @@ -1450,23 +1431,17 @@ class JobSplit(models.Model): raise RuntimeError(message) queue = self.job.block.queue - nb_cores = queue.cores_per_slot - if (nb_cores > 0) and (cpulimit is None): - logger.warn("Job requires limiting CPU usage to %g (cores), " \ - "but you have not set the path to the program " \ - "`cpulimit'. Continuing without CPU limiting...", nb_cores) - nb_cores = 0 logger.info("Running `%s' on worker request", - executor.algorithm.name) + self.executor.algorithm.name) # n.b.: with executor may crash on the database view setup - with executor: + with self.executor: self.start() - result = executor.process( + result = self.executor.process( JobSplit.host, virtual_memory_in_megabytes=queue.memory_limit, - max_cpu_percent=int(100*float(nb_cores)), #allows for 150% + max_cpu_percent=int(100*float(queue.cores_per_slot)), #allows for 150% timeout_in_minutes=queue.time_limit, daemon=0, ) @@ -1500,3 +1475,5 @@ class JobSplit(models.Model): logger.error("Split `%s' (pid=%d) ended with an error: %s", self, os.getpid(), traceback.format_exc()) self.try_end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR)) + + self.executor = None diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py index c8b2c802eafdfdbe586d52f896b4f84bd6ea36d0..e4cfea7fdb4665084f15531839fe800532202510 100755 --- a/beat/web/backend/tests.py +++ b/beat/web/backend/tests.py @@ -31,6 +31,7 @@ import time import shutil import tempfile import collections +import time from django.conf import settings from django.core.urlresolvers import reverse @@ -2096,10 +2097,8 @@ class Working(BaseBackendTestCase): def setUp(self): from . import utils - self.cpulimit = utils.resolve_cpulimit_path(None) self.process = utils.resolve_process_path() self.environments = utils.find_environments(None) - self.env1_execute = 'execute' if not os.path.exists(settings.CACHE_ROOT): os.makedirs(settings.CACHE_ROOT) @@ -2150,7 +2149,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - split.process(self.env1_execute, self.cpulimit) + split.process() # at this point, job should have been successful xp.refresh_from_db() @@ -2188,7 +2187,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - split.process(self.env1_execute, self.cpulimit) + split.process() # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) @@ -2240,7 +2239,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - split.process(self.env1_execute, self.cpulimit) + split.process() # at this point, job should have failed xp.refresh_from_db() @@ -2299,7 +2298,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - split.process(self.env1_execute, self.cpulimit) + split.process() # at this point, job should have been successful xp.refresh_from_db() @@ -2337,7 +2336,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - split.process(self.env1_execute, self.cpulimit) + split.process() # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) @@ -2398,7 +2397,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - split.process(self.env1_execute, self.cpulimit) + split.process() # at this point, job should have been successful xp.refresh_from_db() @@ -2447,7 +2446,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - split.process(self.env1_execute, self.cpulimit) + split.process() # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) @@ -2509,7 +2508,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - split.process(self.env1_execute, self.cpulimit) + split.process() # at this point, job should have been successful xp.refresh_from_db() @@ -2548,7 +2547,6 @@ class WorkingExternally(TransactionTestCase): def setUp(self): from . import utils - self.cpulimit = utils.resolve_cpulimit_path(None) self.process = utils.resolve_process_path() self.environments = utils.find_environments(None) @@ -2562,7 +2560,7 @@ class WorkingExternally(TransactionTestCase): setup_backend(qsetup.DEFAULT_CONFIGURATION) Worker.objects.update(active=True) - env = Environment.objects.first() + env = Environment.objects.get(name='environment') queue = Queue.objects.first() template_data = dict( @@ -2618,7 +2616,7 @@ class WorkingExternally(TransactionTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (non-blocking) - worker.work(self.environments, self.cpulimit, self.process) + worker.work(self.environments, self.process) def condition(): xp.refresh_from_db() @@ -2662,7 +2660,7 @@ class WorkingExternally(TransactionTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (non-blocking) - worker.work(self.environments, self.cpulimit, self.process) + worker.work(self.environments, self.process) def condition(): xp.refresh_from_db() @@ -2720,20 +2718,24 @@ class WorkingExternally(TransactionTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (non-blocking) - worker.work(self.environments, self.cpulimit, self.process) + worker.work(self.environments, self.process) def condition(): xp.refresh_from_db() return xp.status == Experiment.RUNNING + _sleep(20, condition) + # Just to be sure that the docker container really started + time.sleep(3) + # cancels the experiment xp.cancel() split.refresh_from_db() self.assertEqual(split.status, Job.CANCEL) # launch another working cycle to kill the process - worker.work(self.environments, self.cpulimit, self.process) + worker.work(self.environments, self.process) def condition(): xp.refresh_from_db() diff --git a/beat/web/backend/utils.py b/beat/web/backend/utils.py index 8d0c0fa095bf2af58425bb3f3b61c5b6e9b45cfd..ff220f74dd037318d6450edfbc0d5864f220c97c 100755 --- a/beat/web/backend/utils.py +++ b/beat/web/backend/utils.py @@ -39,6 +39,7 @@ logger = logging.getLogger(__name__) import psutil +from django.conf import settings from django.db import transaction from django.contrib.auth.models import Group from guardian.shortcuts import assign_perm @@ -359,7 +360,7 @@ def dump_backend(): def resolve_process_path(): - '''Returns the path to cpulimit''' + '''Returns the path to process.py''' basedir = os.path.dirname(os.path.realpath(sys.argv[0])) r = os.path.join(basedir, 'process') @@ -403,47 +404,5 @@ def find_environments(paths=None): from beat.core.dock import Host host = Host() - host.setup() + host.setup(raise_on_errors=not(getattr(settings, 'TEST_CONFIGURATION', False))) return host.environments - - -def pick_execute(split, environments): - """Resolves the path to the ``execute`` program to use for the split""" - - # Check we have a compatible environment to execute the user algorithm - envinfo = environments.get(split.job.block.environment.fullname()) - return envinfo['execute'] if envinfo else None - - -def resolve_cpulimit_path(exe): - '''Returns the path to cpulimit''' - FIXED_LOCATIONS = [ - '/usr/local/bin/cpulimit', - '/opt/local/bin/cpulimit', - '/usr/bin/cpulimit', - ] - default = os.path.join( - os.path.dirname(os.path.realpath(sys.argv[0])), - 'cpulimit', - ) - retval = exe or default - # See if we find it in parallel, installed with our interpreter - if not os.path.exists(retval): - cand = os.path.join(os.path.dirname(sys.executable), 'cpulimit') - if os.path.exists(cand): retval = cand - # Try to see if the PATH variable is set - if not os.path.exists(retval): - try: - retval = distutils.spawn.find_executable('cpulimit') - except KeyError: #missing PATH variable - retval = None - # Try fixed locations - if not retval: - for k in FIXED_LOCATIONS: - if os.path.exists(k): - retval = k - if not retval: - raise IOError("I cannot the find a `cpulimit' binary on your system or " \ - "the value you provided is not valid (%s) or the symbolic link " \ - "(%s) is not properly set" % (exe, default)) - return retval diff --git a/beat/web/backend/views.py b/beat/web/backend/views.py index c86e05641484d9ef175794e53d41461ea57c817a..b3c953c6c3f26dda4274f36f1c80e30f38b2b5d1 100755 --- a/beat/web/backend/views.py +++ b/beat/web/backend/views.py @@ -42,8 +42,6 @@ from django.contrib.auth.decorators import login_required from django.http import HttpResponseForbidden from django.contrib import messages -from .utils import resolve_cpulimit_path - from ..experiments.models import Experiment from .models import Environment, Worker, Queue @@ -65,8 +63,6 @@ class Work: def __setup__(self): - Work.cpulimit = resolve_cpulimit_path(None) - logger.debug("(path) cpulimit: `%s'", Work.cpulimit) Work.process = utils.resolve_process_path() logger.debug("(path) process: `%s'", Work.process) Work.environments = utils.find_environments(None) diff --git a/beat/web/scripts/process.py b/beat/web/scripts/process.py index 3c009f9522716502bdf57e36209816a745547eec..8c44eba51343210169f9120c8d67b49a25f1fdf9 100644 --- a/beat/web/scripts/process.py +++ b/beat/web/scripts/process.py @@ -30,14 +30,13 @@ Processes one split. Usage: - %(prog)s [--settings=<file>] [--cpulimit=<file>] [-v ...] <execute> <split> + %(prog)s [--settings=<file>] [-v ...] <split> %(prog)s (-h | --help) %(prog)s (-V | --version) -Arguments: +Arguments: - <execute> The path to the base execution program for running the user code <split> The primary-key of the split to treat by this subprocess @@ -47,23 +46,18 @@ Options: -v, --verbose Increases the output verbosity level -S <file>, --settings=<file> The module name to the Django settings file [default: beat.web.settings.settings] - -C <file>, --cpulimit=<file> The path to the cpulimit program to use. If - not set, CPU limiting is not enforced. Examples: To start the job split processing do the following: - $ %(prog)s <path-to-execute> <split-id> + $ %(prog)s <split-id> You can optionally pass the ``-v`` flag to start the worker with the logging level set to ``INFO`` or ``-vv`` to set it to ``DEBUG``. By default, the logging level is set to ``WARNING`` if no ``-v`` flag is passed. - You can optionally also set the path to the ``cpulimit`` program to use. If - it is not set, then CPU limiting will not be enforced. - """ @@ -102,12 +96,7 @@ def main(user_input=None): sys.exit(0) def stop(): - import psutil - for child in psutil.Process().children(recursive=True): - if 'cpulimit' in child.name(): continue #only user processes - child.kill() - logger.info("Killing user process %d...", child.pid) - + split.executor.kill() message = "Force-stopped user processes for split `%s' for block " \ "`%s' of experiment `%s'" % \ (split, split.job.block.name, @@ -126,7 +115,4 @@ def main(user_input=None): signal.signal(signal.SIGTERM, handler) signal.signal(signal.SIGINT, handler) - split.process( - execute=arguments['<execute>'], - cpulimit=arguments['--cpulimit'], - ) + split.process() diff --git a/beat/web/scripts/worker.py b/beat/web/scripts/worker.py index a78c66c4fdce872a8accf6de2d88003cba19cbfc..8b888b0f1fe9cf50b80d949955073065e20abb03 100755 --- a/beat/web/scripts/worker.py +++ b/beat/web/scripts/worker.py @@ -31,7 +31,7 @@ Starts the worker process. Usage: %(prog)s [-v ... | --verbose ...] [--settings=<file>] [--period=<seconds>] - [--cpulimit=<file>] [--environments=<path>] [--name=<name>] + [--environments=<path>] [--name=<name>] %(prog)s (-h | --help) %(prog)s (-V | --version) @@ -42,10 +42,6 @@ Options: -v, --verbose Increases the output verbosity level -S <file>, --settings=<file> The module name to the Django settings file [default: beat.web.settings.settings] - -c <file>, --cpulimit=<file> The path to the cpulimit program to use. If - not set, try to search in standard - locations. If not found, CPU limiting is - not enforced. -e <path>, --environments=<path> The path to the installation root of available environments. -n <name>, --name=<name> The unique name of this worker on the @@ -126,9 +122,6 @@ def main(user_input=None): arguments['--name']) # figure out paths to programs I need to use - from ..backend.utils import resolve_cpulimit_path - cpulimit = resolve_cpulimit_path(arguments['--cpulimit']) - logger.debug("(path) cpulimit: `%s'", cpulimit) process = utils.resolve_process_path() logger.debug("(path) process: `%s'", process) @@ -162,7 +155,7 @@ def main(user_input=None): start = time.time() logger.debug("Starting work cycle...") - worker.work(environments, cpulimit, process) + worker.work(environments, process) duration = time.time() - start if duration < timing: time.sleep(timing - duration) diff --git a/beat/web/settings/test.py b/beat/web/settings/test.py index 178cbb721703b8a8274d92021e4d60fea0a385ed..4d45a26d9a48141bf8e2b1b6ae0d3f95ce8662a3 100644 --- a/beat/web/settings/test.py +++ b/beat/web/settings/test.py @@ -29,6 +29,8 @@ from .settings import * +TEST_CONFIGURATION = True + DEBUG = False TEMPLATES[0]['OPTIONS']['debug'] = DEBUG