diff --git a/beat/web/backend/schedule.py b/beat/web/backend/schedule.py
index ca533315e2396783f6b1effd5ec30adf97c93cdf..8234642d9cf93776e45e6563c014f6b52dc05540 100644
--- a/beat/web/backend/schedule.py
+++ b/beat/web/backend/schedule.py
@@ -27,12 +27,16 @@
 
 '''Scheduling functions and utilities'''
 
+import os
+import sys
+import time
 import socket
+import signal
+import subprocess
 
 import logging
 logger = logging.getLogger(__name__)
 
-import multiprocessing
-
+import psutil
 import simplejson
 
 from django.conf import settings
@@ -242,7 +246,8 @@ def schedule():
     return assigned_splits
 
 
-def process(split, environments, cpulimit=None, cache=settings.CACHE_ROOT):
+@transaction.atomic
+def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
     '''Process assigned job splits using beat.core
 
     This task executes the user algorithm on a subprocess. It also serves the
@@ -265,10 +270,10 @@ def process(split, environments, cpulimit=None, cache=settings.CACHE_ROOT):
 
     Parameters:
 
-      split (:py:class:JobSplit): The JobSplit to process
+      split_pk (int): The primary key of the JobSplit to process
 
-      environments (dict): A dictionary mapping environment objects from the
-        Django database to their actual location on the file system
+      execute (str): The path to the ``execute`` program to use for running
+        the user code associated with this job split.
 
       cpulimit (str, Optional): The path to the ``cpulimit`` program to use for
         limiting the user code in CPU usage. If not set, then don't use it,
@@ -280,7 +285,8 @@
 
     '''
 
-    split.start() #officially starts the split
+    # lock split
+    split = JobSplit.objects.select_for_update().get(pk=split_pk)
 
     config = simplejson.loads(split.job.block.command)
 
@@ -288,11 +294,11 @@
     if split.job.block.required_slots > 1:
 
         if (split.start_index) is None or (split.end_index is None):
-            message = "The split %d/%d running on worker `%s' for " \
+            message = "The split %d/%d (pid=%d) running on worker `%s' for " \
                 "block `%s' of experiment `%s' could not " \
                 "be completed: indexes are missing!" % \
                 (split.split_index+1, split.job.block.required_slots,
-                split.worker, split.job.block.name,
+                split.process_id, split.worker, split.job.block.name,
                 split.job.block.experiment.fullname())
             logger.error(message)
             split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR,
@@ -323,25 +329,6 @@
             )
         raise RuntimeError(message)
 
-    # Check we have a compatible environment to execute the user algorithm
-    envkey = config['environment']
-    envinfo = environments.get('%(name)s (%(version)s)' % envkey)
-    execute_path = envinfo['execute'] if envinfo else None
-
-    if execute_path is None:
-        message = "Environment `%s' is not available for split %d/%d " \
-            "running at worker `%s', for block `%s' of experiment " \
-            "`%s': %s" % (split.job.block.environment,
-                split.split_index+1,
-                split.job.block.required_slots,
-                split.worker,
-                split.job.block.name,
-                split.job.block.experiment.fullname(),
-                "Available environments are `%s'" % \
-                    '|'.join(environments.keys()),
-                )
-        raise RuntimeError(message)
-
     queue = split.job.block.queue
     nb_cores = queue.cores_per_slot
     if (nb_cores > 0) and (cpulimit is None):
@@ -355,7 +342,7 @@
     # n.b.: with executor may crash on the database view setup
     with executor:
         result = executor.process(
-            execute_path=execute_path,
+            execute_path=execute,
             virtual_memory_in_megabytes=queue.memory_limit,
             max_cpu_percent=int(100*float(nb_cores)), #allows for 150%
             cpulimit_path=cpulimit,
@@ -371,35 +358,54 @@
             syserr=result['system_error'],
             _stats=simplejson.dumps(result['statistics'], indent=2),
             ))
+        logger.info("Process `%s' for split `%s' ended gracefully",
+            split.process_id, split)
 
     except:
 
         from traceback import format_exc
-        from beat.core.stats import Statistics
-        logger.error(format_exc())
+        logger.warn("Process `%s' for split `%s' ended with an error",
+            split.process_id, split)
         split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR,
             syserr=format_exc(),))
 
 
-def multiprocess(*args, **kwargs):
-    '''Runs :py:func:`process` through a forked subprocess
+@transaction.atomic
+def worker_update():
+    '''Updates the worker state in the database from local readings'''
 
-    Input arguments are the same as for :py:func:`process`.
+    # myself, raises if I cannot find me
+    worker = Worker.objects.select_for_update().get(name=socket.gethostname())
 
-    Returns:
+    # check I have at least all cores and memory I'm supposed to have
+    cores = psutil.cpu_count()
+    ram = psutil.virtual_memory().total/(1024*1024)
+    worker.info = ''
 
-        multiprocessing.Process: an instance of a Process you can call
-        ``join()`` at.
+    if cores < worker.cores:
+        logger.warn("Worker `%s' only has %d cores which is less than the " \
+            "value declared on the database - it's not a problem, but note " \
+            "this worker may get overloaded", worker, cores)
+        worker.info += 'only %d cores;' % cores
 
-    '''
+    if ram < worker.memory:
+        logger.warn("Worker `%s' only has %d MB of RAM which is less than " \
+            "the value declared on the database - it's not a problem, but " \
+            "note this worker may get overloaded", worker, ram)
+        worker.info += 'only %d MB of RAM;' % ram
 
-    retval = multiprocessing.Process(target=process, args=args, kwargs=kwargs)
-    retval.start()
-    return retval
+    # update process and memory usage
+    worker.used_cores = int(psutil.cpu_percent())
+    worker.used_memory = int(psutil.virtual_memory().percent)
+
+    # save current worker state
+    worker.info += 'updated: ' + time.asctime()
+    worker.active = True
+    worker.save()
 
 
 @transaction.atomic
-def work(environments, cpulimit):
+def work(environments, cpulimit, process):
     '''Launches user code on isolated processes
 
     This function is supposed to be called asynchronously, by a scheduling
@@ -409,26 +415,75 @@
 
     Job split cancelling is executed by setting the split state as
     ``CANCELLED`` and waiting for this function to handle it.
+
+
+    Parameters:
+
+      environments (dict): A dictionary containing installed environments,
+        their description and execute-file paths.
+
+      cpulimit (str): The path to the ``cpulimit`` program to use for
+        limiting the user code in CPU usage. If set to ``None``, then don't
+        use it, even if the selected user queue has limits.
+
+      process (str): The path to the ``process.py`` program to use for running
+        the user code on isolated processes.
+
     '''
 
     # myself, raises if I cannot find me
-    worker = Worker.objects.select_for_update().get(name=socket.gethostname())
+    worker = Worker.objects.get(name=socket.gethostname())
 
     # cancel job splits
     for j in JobSplit.objects.select_for_update().filter(worker=worker,
         status=Job.CANCELLED, end_date__isnull=True,
         process_id__isnull=False):
-        import signal
-        signal.signal(signal.SIGKILL, j.process_id)
+        os.kill(j.process_id, signal.SIGKILL)
         j.end(None, Job.CANCELLED)
 
+    # cmdline base argument
+    cmdline = [process]
+    if cpulimit is not None: cmdline += ['--cpulimit=%s' % cpulimit]
+    if settings.DEBUG: cmdline += ['-vv']
+
     # start newly assigned job splits
     for j in JobSplit.objects.select_for_update().filter(worker=worker,
-        status=Job.SCHEDULED, start_date__isnull=True, process_id__isnull=True):
-        multiprocess(j, environments, cpulimit)
+        status=Job.QUEUED, start_date__isnull=True, process_id__isnull=True):
+        execute = pick_execute(j, environments)
+        if execute is None:
+            message = "Environment `%s' is not available for split %d/%d " \
+                "running at worker `%s', for block `%s' of experiment " \
+                "`%s': %s" % (j.job.block.environment,
+                    j.split_index+1,
+                    j.job.block.required_slots,
+                    j.worker,
+                    j.job.block.name,
+                    j.job.block.experiment.fullname(),
+                    "Available environments are `%s'" % \
+                        '|'.join(environments.keys()),
+                    )
+            j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR,
+                syserr=message))
+            continue
 
+        # if we get to this point, then we launch the user process
+        subprocess.Popen(cmdline + [execute, str(j.pk)])
 
-def refresh_environments(paths=None):
-    '''Refresh current list of known environments
+
+def resolve_process_path():
+    '''Returns the path to the ``process.py`` program'''
+
+    basedir = os.path.dirname(os.path.realpath(sys.argv[0]))
+    r = os.path.join(basedir, 'process.py')
+
+    if not os.path.exists(r):
+        raise RuntimeError("Cannot find `process.py' at `%s' - please check " \
+            "your installation" % basedir)
+
+    return r
+
+
+def find_environments(paths=None):
+    '''Finds the list of known environments
 
     Parameters:
 
@@ -467,3 +521,45 @@
     path = pkg_resources.resource_filename(__name__, 'environments')
     logger.debug("Search for environments at `%s'", path)
     return discover_environments([path])
+
+
+@transaction.atomic
+def worker_shutdown():
+    """Standard worker shutdown procedure
+
+    Stop all running/assigned splits and then mark the worker as inactive.
+    """
+
+    # myself, raises if I cannot find me
+    worker = Worker.objects.select_for_update().get(name=socket.gethostname())
+
+    message = 'Cancelled on forced worker shutdown (maintenance)' \
+        ' - you may retry submitting your experiment shortly'
+
+    # cancel job splits which are running
+    for j in JobSplit.objects.select_for_update().filter(worker=worker,
+        status__in=(Job.CANCELLED, Job.PROCESSING), end_date__isnull=True,
+        process_id__isnull=False):
+        os.kill(j.process_id, signal.SIGKILL)
+        j.end(Result(status=1, usrerr=message))
+
+    # cancel job splits which were not yet started
+    for j in JobSplit.objects.select_for_update().filter(worker=worker,
+        status=Job.QUEUED, start_date__isnull=True, process_id__isnull=True):
+        j.end(Result(status=1, usrerr=message))
+
+    # lock worker and modify it
+    worker.active = False
+    worker.used_cores = 0
+    worker.used_memory = 0
+    worker.info = 'Worker shutdown by system administrator'
+    worker.save()
+
+
+
+def pick_execute(split, environments):
+    """Resolves the path to the ``execute`` program to use for the split"""
+
+    # Check we have a compatible environment to execute the user algorithm
+    envinfo = environments.get(split.job.block.environment.natural_key())
+    return envinfo['execute'] if envinfo else None