From cf5e21ba626cc96cae3a3485f8570c20f22ce4d9 Mon Sep 17 00:00:00 2001
From: Andre Anjos <andre.dos.anjos@gmail.com>
Date: Fri, 6 May 2016 19:41:56 +0200
Subject: [PATCH] [backend] Code re-organization into the object model

This commit brings the following changes:

* Rename Job.KILL to Job.CANCEL, as we normally send SIGTERM instead of
  SIGKILL
* Move most of the code related to JobSplit/Worker back into the object
  model, from the schedule module
* Implement signal handling on scripts that will run under supervisord
* Overall clean-up of the backend standalone scripts, moving code back into
  the object model
* Rename the standalone scripts (remove the '.py' extension)
* Reflect these changes in other code
---
 .../migrations/0002_scheduler_addons.py |   4 +-
 beat/web/backend/models.py              | 280 ++++++++++++++-
 beat/web/backend/schedule.py            | 340 +-----------------
 beat/web/backend/tests.py               |  68 ++--
 beat/web/backend/utils.py               |  78 +++-
 beat/web/backend/views.py               |  35 +-
 beat/web/scripts/process.py             |  60 +++-
 beat/web/scripts/scheduler.py           | 118 ++++++
 beat/web/scripts/worker.py              |  95 ++---
 setup.py                                |   6 +-
 10 files changed, 601 insertions(+), 483 deletions(-)
 create mode 100644 beat/web/scripts/scheduler.py

diff --git a/beat/web/backend/migrations/0002_scheduler_addons.py b/beat/web/backend/migrations/0002_scheduler_addons.py
index 8bda50ee8..76176108f 100644
--- a/beat/web/backend/migrations/0002_scheduler_addons.py
+++ b/beat/web/backend/migrations/0002_scheduler_addons.py
@@ -156,7 +156,7 @@ class Migration(migrations.Migration):
             name='Job',
             fields=[
                 ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
-                ('status', models.CharField(default=b'N', max_length=1, choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'S', b'Skipped'), (b'L', b'Cancelled'), (b'K', b'Kill')])),
+                ('status', models.CharField(default=b'N', max_length=1, choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'S', b'Skipped'), (b'L', b'Cancelled'), (b'K', b'Cancel')])),
                 ('runnable_date', models.DateTimeField(null=True, blank=True)),
                 ('start_date', models.DateTimeField(null=True, blank=True)),
                 ('end_date', models.DateTimeField(null=True, blank=True)),
@@ -174,7 +174,7 @@ class Migration(migrations.Migration):
                 ('start_index', models.PositiveIntegerField(null=True)),
                 ('end_index', models.PositiveIntegerField(null=True)),
                 ('cache_errors', models.PositiveIntegerField(default=0)),
-                ('status', models.CharField(default=b'N', max_length=1, choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'S', b'Skipped'), (b'L', b'Cancelled'), (b'K', b'Kill')])),
+                ('status', models.CharField(default=b'N', max_length=1, choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'S', b'Skipped'), (b'L', b'Cancelled'), (b'K', b'Cancel')])),
                 ('start_date', models.DateTimeField(null=True)),
                 ('end_date', models.DateTimeField(null=True)),
                 ('process_id', models.PositiveIntegerField(null=True)),
diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py
index 5c828de61..64a175bb7 100644
--- a/beat/web/backend/models.py
+++ b/beat/web/backend/models.py
@@ -27,8 +27,11 @@
 
 import os
 import time
+import signal
 import datetime
 import operator
+import traceback
+import subprocess
 
 import logging
 logger = logging.getLogger(__name__)
@@ -48,6 +51,7 @@ from guardian.shortcuts import get_perms
 
 import beat.core.stats
 import beat.core.data
+import beat.core.execution
from ..common.models import Shareable, ShareableManager from ..common.texts import Messages @@ -277,14 +281,128 @@ class Worker(models.Model): ram) self.info += 'only %d Mb of RAM;' % ram - # update process and memory usage - self.used_cores = int(psutil.cpu_percent()) - self.used_memory = int(psutil.virtual_memory().percent) + with transaction.atomic(): + self_ = Worker.objects.select_for_update().get(pk=self.pk) #lock + + # update process and memory usage + self.used_cores = int(psutil.cpu_percent()) + self.used_memory = int(psutil.virtual_memory().percent) + + # save current self state + self.active = True + self.update = False + self.save() + + + def shutdown(self): + '''Cleanly shuts down a particular worker at the database + + Stop all running/assigned splits and then mark the worker as + inactive. + + ''' + + message = 'Cancelled on forced worker shutdown (maintenance)' \ + ' - you may retry submitting your experiment shortly' + + self.refresh_from_db() + + # cancel job splits which are running + for j in JobSplit.objects.filter(worker=self, + status__in=(Job.CANCEL, Job.PROCESSING), end_date__isnull=True, + process_id__isnull=False): + if psutil.pid_exists(j.process_id): + os.kill(j.process_id, signal.SIGTERM) + + # cancel job splits which were not yet started + for j in JobSplit.objects.filter(worker=self, status=Job.QUEUED, + start_date__isnull=True, process_id__isnull=True): + j.end(Result(status=1, usrerr=message)) + + with transaction.atomic(): + self_ = Worker.objects.select_for_update().get(pk=self.pk) + self.active = False + self.used_cores = 0 + self.used_memory = 0 + self.info = 'Worker shutdown by system administrator' + self.save() - # save current self state - self.active = True - self.update = False - self.save() + + def work(self, environments, cpulimit, process): + '''Launches user code on isolated processes + + This function is supposed to be called asynchronously, by a + scheduled agent, every few seconds. It examines job splits assigned + to the current host and launches an individual process to handle + these splits. The process is started locally and the process ID + stored with the split. + + Job split cancelling is executed by setting the split state as + ``CANCEL`` and waiting for this function to handle it. + + + Parameters: + + environments (dict): A dictionary containing installed + environments, their description and execute-file paths. + + cpulimit (str): The path to the ``cpulimit`` program to use for + limiting the user code in CPU usage. If set to ``None``, then + don't use it, even if the select user queue has limits. + + process (str): The path to the ``process.py`` program to use for + running the user code on isolated processes. 
+
+        '''
+        from .utils import pick_execute
+
+        # refresh state from database and update state if required
+        self.refresh_from_db()
+        if self.update: self.update_state()
+
+        # cancel job splits by killing associated processes
+        for j in JobSplit.objects.filter(worker=self, status=Job.CANCEL,
+                end_date__isnull=True, process_id__isnull=False):
+            if psutil.pid_exists(j.process_id):
+                os.kill(j.process_id, signal.SIGTERM)
+
+        # cmdline base argument
+        cmdline = [process]
+        if cpulimit is not None: cmdline += ['--cpulimit=%s' % cpulimit]
+        if settings.DEBUG: cmdline += ['-vv']
+
+        # start newly assigned job splits
+        for j in JobSplit.objects.filter(worker=self, status=Job.QUEUED,
+                start_date__isnull=True, process_id__isnull=True):
+            execute = pick_execute(j, environments)
+            if execute is None:
+                message = "Environment `%s' is not available for split " \
+                    "%d/%d running at worker `%s', for block `%s' of " \
+                    "experiment `%s': %s" % \
+                    (j.job.block.environment,
+                     j.split_index+1,
+                     j.job.block.required_slots,
+                     self,
+                     j.job.block.name,
+                     j.job.block.experiment.fullname(),
+                     "Available environments are `%s'" % \
+                         '|'.join(environments.keys()),
+                    )
+                j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR,
+                    syserr=message))
+                continue
+
+            # if we get to this point, then we launch the user process
+            subprocess.Popen(cmdline + [execute, str(j.pk)])
+
+
+    def __enter__(self):
+        self.update_state()
+        return self
+
+
+    def __exit__(self, *exc):
+        self.shutdown()
+        return False #propagate exceptions
 
 
 #----------------------------------------------------------
 
 
@@ -513,7 +631,7 @@ class Job(models.Model):
     FAILED    = 'F' #Block.FAILED
     SKIPPED   = 'S' #Block.SKIPPED
     CANCELLED = 'L' #Block.CANCELLED
-    KILL      = 'K' #Job was asked to be killed
+    CANCEL    = 'K' #Job was asked to be cancelled
 
     STATUS = (
         (QUEUED, 'Queued'),
@@ -522,7 +640,7 @@
         (FAILED, 'Failed'),
         (SKIPPED, 'Skipped'),
         (CANCELLED, 'Cancelled'),
-        (KILL, 'Kill'),
+        (CANCEL, 'Cancel'),
     )
 
     block = models.OneToOneField('experiments.Block', null=True,
@@ -675,11 +793,11 @@
         self.save()
 
         if self.split_errors > settings.MAXIMUM_SPLIT_ERRORS: #stop
-            from traceback import format_exc
             message = "Index splitting for block `%s' of experiment " \
                 "`%s' could not be completed due to an index split " \
                 "error: %s" % (self.block.name,
-                    self.block.experiment.fullname(), format_exc())
+                    self.block.experiment.fullname(),
+                    traceback.format_exc())
             logger.warn(message)
             self._cancel(usrerr=settings.DEFAULT_USER_ERROR,
                 syserr=message)
@@ -743,7 +861,7 @@
         elif (Job.PROCESSING in split_statuses) or \
             (Job.QUEUED in split_statuses and \
             Job.COMPLETED in split_statuses) or \
-            (Job.KILL in split_statuses):
+            (Job.CANCEL in split_statuses):
             self.status = Job.PROCESSING
 
         elif all([s == Job.SKIPPED for s in split_statuses]):
@@ -1042,7 +1160,7 @@
             # lock self - avoids concurrent update from scheduler/worker
             # subsystem
             self_ = JobSplit.objects.select_for_update().get(pk=self.pk)
-            self.status = Job.KILL
+            self.status = Job.CANCEL
             self.save()
 
             logger.info("Job split `%s' is currently processing. Waiting " \
@@ -1123,3 +1241,139 @@
                     settings.MAXIMUM_SPLIT_SAVE_RETRIES)
                 # wait a second and retry
                 time.sleep(1)
+
+
+    def try_end(self, result):
+        '''Tries to end the split - ignores if the split was deleted'''
+
+        try:
+            self.refresh_from_db()
+        except JobSplit.DoesNotExist:
+            logger.warn("Job split(pk=%d) does not exist. Likely cancelled, " \
+                "so ignoring result `%s'", self.pk, result)
+            return
+        self.end(result)
+
+
+    def process(self, execute, cpulimit=None, cache=settings.CACHE_ROOT):
+        '''Process assigned job splits using beat.core
+
+        This task executes the user algorithm on a subprocess. It also serves
+        the data to the user process so it works like an I/O daemon.
+
+        If ``required_slots == 1``, then this job takes care of the whole data
+        set. Otherwise, it takes care of a subset of the input data that is
+        synchronised with this block, determined by ``split_index``.
+
+        Two processes are spawned from the current work process:
+
+        * The process for executing the user code
+        * A process to limit the CPU usage (with ``cpulimit``), if these
+          conditions are respected:
+
+          1. The program ``cpulimit`` is available on the current machine
+          2. The configuration requests a CPU usage greater than 0
+             (``nb_cores > 0``). (N.B.: a value of zero means not to limit on
+             CPU).
+
+
+        Parameters:
+
+          execute (str): The path to the ``execute`` program to use for
+            running the user code associated with this job split.
+
+          cpulimit (str, Optional): The path to the ``cpulimit`` program to
+            use for limiting the user code in CPU usage. If not set, then
+            don't use it, even if the selected user queue has limits.
+
+          cache (str, Optional): The path leading to the root of the cache to
+            use for this run. If not set, use the global default at
+            ``settings.CACHE_ROOT``.
+
+        '''
+
+        config = simplejson.loads(self.job.block.command)
+
+        # setup range if necessary
+        if self.job.block.required_slots > 1:
+
+            if (self.start_index is None) or (self.end_index is None):
+                message = "The split %d/%d (pid=%d) running on worker `%s' " \
+                    "for block `%s' of experiment `%s' could not " \
+                    "be completed: indexes are missing!" % \
+                    (self.split_index+1, self.job.block.required_slots,
+                     os.getpid(), self.worker, self.job.block.name,
+                     self.job.block.experiment.fullname())
+                logger.warn(message)
+                self.try_end(Result(status=1, syserr=message,
+                    usrerr=settings.DEFAULT_USER_ERROR))
+                return
+
+            config['range'] = [self.start_index, self.end_index]
+
+        # For reference, this bit of code should match (or be very similar)
+        # to the one at beat.cmdline.experiments:run_experiment()
+
+        try:
+
+            executor = beat.core.execution.Executor(settings.PREFIX, config,
+                cache)
+
+            if not executor.valid:
+                err = ''
+                for e in executor.errors: err += '  * %s\n' % e
+                message = "Failed to load execution information for split " \
+                    "%d/%d running at worker `%s', for block `%s' of " \
+                    "experiment `%s': %s" % (self.split_index+1,
+                        self.job.block.required_slots,
+                        self.worker, self.job.block.name,
+                        self.job.block.experiment.fullname(), err)
+                raise RuntimeError(message)
+
+            queue = self.job.block.queue
+            nb_cores = queue.cores_per_slot
+            if (nb_cores > 0) and (cpulimit is None):
+                logger.warn("Job requires limiting CPU usage to %g (cores), " \
+                    "but you have not set the path to the program " \
+                    "`cpulimit'. Continuing without CPU limiting...", nb_cores)
+                nb_cores = 0
+
+            logger.info("Running `%s' on worker request",
+                executor.algorithm.name)
+
+            # n.b.: with executor may crash on the database view setup
+            with executor:
+                self.start()
+                result = executor.process(
+                    execute_path=execute,
+                    virtual_memory_in_megabytes=queue.memory_limit,
+                    max_cpu_percent=int(100*float(nb_cores)), #allows for 150%
+                    cpulimit_path=cpulimit,
+                    timeout_in_minutes=queue.time_limit,
+                    daemon=0,
+                )
+
+            self.try_end(Result(
+                status=result['status'],
+                stdout=result['stdout'],
+                stderr=result['stderr'],
+                usrerr=result['user_error'],
+                syserr=result['system_error'],
+                _stats=simplejson.dumps(result['statistics'], indent=2),
+            ))
+            logger.info("Split `%s' (pid=%d) ended gracefully", self,
+                os.getpid())
+
+        except IOError:
+            logger.warn("Split `%s' (pid=%d) execution raised an IOError: %s",
+                self, os.getpid(), traceback.format_exc())
+            self.signal_io_error()
+            if self.cache_errors > settings.MAXIMUM_IO_ERRORS:
+                self.try_end(Result(status=1,
+                    usrerr=settings.DEFAULT_USER_ERROR,
+                    syserr=traceback.format_exc(),))
+            else:
+                logger.info("Split `%s' will be retried (%d/%d)",
+                    self, self.cache_errors, settings.MAXIMUM_IO_ERRORS)
+
+        except Exception:
+            logger.warn("Split `%s' (pid=%d) ended with an error: %s",
+                self, os.getpid(), traceback.format_exc())
+            self.try_end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR,
+                syserr=traceback.format_exc(),))
diff --git a/beat/web/backend/schedule.py b/beat/web/backend/schedule.py
index aa18347bb..8b5c306b0 100644
--- a/beat/web/backend/schedule.py
+++ b/beat/web/backend/schedule.py
@@ -27,22 +27,12 @@
 '''Scheduling functions and utilities'''
 
-import os
-import sys
-import time
-import socket
-import signal
-import subprocess
 
 import logging
 logger = logging.getLogger(__name__)
 
-import psutil
-import simplejson
-
-from django.conf import settings
 from django.db import transaction
 
-from .models import Job, JobSplit, Queue, Worker, Result
+from .models import Job, JobSplit, Queue, Worker
 
 
 def _select_splits_for_queue(queue):
@@ -255,331 +245,3 @@ def schedule():
             del whitelist[c0]
 
     return assigned_splits
-
-
-def _split_end(split, result):
-    '''Tries to end the split - ignores if the split was deleted'''
-
-    try:
-        split.refresh_from_db()
-        split.end(result)
-    except JobSplit.DoesNotExist:
-        logger.warn("Job split(pk=%d) does not exist. Likely cancelled, " \
-            "so ignoring result `%s'", split.pk, result)
-
-
-def process(split_pk, execute, cpulimit=None, cache=settings.CACHE_ROOT):
-    '''Process assigned job splits using beat.core
-
-    This task executes the user algorithm on a subprocess. It also serves the
-    data to the user process so it works like an I/O daemon.
-
-    If ``required_slots == 1``, then this job takes care of the whole data set.
-    Otherwise, it takes care of a subset of the input data that is synchronised
-    with this block, determined by ``split_index``.
-
-    Two processes are spawned from the current work process:
-
-    * The process for executing the user code
-    * A process to limit the CPU usage (with ``cpulimit``), if these conditions
-      are respected:
-
-      1. The program ``cpulimit`` is available on the current machine
-      2. The configuration requests a CPU usage greater than 0 (``nb_cores >
-         0``). (N.B.: a value of zero means not to limit on CPU).
-
-
-    Parameters:
-
-      split_pk (int): The primary-key of the JobSplit to process
-
-      execute (str): The path to the ``execute`` program to use for running the
-        user code associated with this job split.
- - cpulimit (str, Optional): The path to the ``cpulimit`` program to use for - limiting the user code in CPU usage. If not set, then don't use it, - even if the select user queue has limits. - - cache (str, Optional): The path leading to the root of the cache to use - for this run. If not set, use the global default at - ``settings.CACHE_ROOT``. - - ''' - - try: - split = JobSplit.objects.get(pk=split_pk) - except JobSplit.DoesNotExist: - logger.info("Job split(pk=%d) does not exist. Likely cancelled, " \ - "so, ignoring.", split_pk) - return - - config = simplejson.loads(split.job.block.command) - - # setup range if necessary - if split.job.block.required_slots > 1: - - if (split.start_index) is None or (split.end_index is None): - message = "The split %d/%d (pid=%d) running on worker `%s' for " \ - "block `%s' of experiment `%s' could not " \ - "be completed: indexes are missing!" % \ - (split.split_index+1, split.job.block.required_slots, - os.getpid(), split.worker, split.job.block.name, - split.job.block.experiment.fullname()) - logger.warn(message) - _split_end(split, Result(status=1, syserr=message, - usrerr=settings.DEFAULT_USER_ERROR)) - - config['range'] = [split.start_index, split.end_index] - - # For reference, this bit of code should match (or be very similar) to the - # one at beat.cmdline.experiments:run_experiment() - from beat.core.execution import Executor - - try: - - executor = Executor(settings.PREFIX, config, cache) - - if not executor.valid: - err = '' - for e in executor.errors: err += ' * %s\n' % e - message = "Failed to load execution information for split %d/%d " \ - "running at worker `%s', for block `%s' of experiment " \ - "`%s': %s" % (split.split_index+1, - split.job.block.required_slots, - split.worker, - split.job.block.name, - split.job.block.experiment.fullname(), - err, - ) - raise RuntimeError(message) - - queue = split.job.block.queue - nb_cores = queue.cores_per_slot - if (nb_cores > 0) and (cpulimit is None): - logger.warn("Job requires limiting CPU usage to %g (cores), but " \ - "you have not set the path to the program `cpulimit'. 
" \ - "Continuing without CPU limiting...", nb_cores) - nb_cores = 0 - - logger.info("Running `%s' on worker request", executor.algorithm.name) - - # n.b.: with executor may crash on the database view setup - with executor: - split.start() - result = executor.process( - execute_path=execute, - virtual_memory_in_megabytes=queue.memory_limit, - max_cpu_percent=int(100*float(nb_cores)), #allows for 150% - cpulimit_path=cpulimit, - timeout_in_minutes=queue.time_limit, - daemon=0, - ) - - _split_end(split, Result( - status=result['status'], - stdout=result['stdout'], - stderr=result['stderr'], - usrerr=result['user_error'], - syserr=result['system_error'], - _stats=simplejson.dumps(result['statistics'], indent=2), - )) - logger.info("Split `%s' (pid=%d) ended gracefully", split, os.getpid()) - - except IOError: - from traceback import format_exc - logger.warn("Split `%s' (pid=%d) execution raised an IOError: %s", - split, os.getpid(), format_exc()) - split.signal_io_error() - if split.cache_errors > settings.MAXIMUM_IO_ERRORS: - _split_end(split, Result(status=1, - usrerr=settings.DEFAULT_USER_ERROR, syserr=format_exc(),)) - else: - logger.info("Split `%s' will be retried (%d/%d)", - split, split.cache_errors, settings.MAXIMUM_IO_ERRORS) - - except Exception: - from traceback import format_exc - logger.warn("Split `%s' (pid=%d) ended with an error: %s", - split, os.getpid(), format_exc()) - _split_end(split, Result(status=1, - usrerr=settings.DEFAULT_USER_ERROR, syserr=format_exc(),)) - - -@transaction.atomic -def worker_update(): - '''Updates the worker state in the database from local readings''' - - # myself, raises if I cannot find me - worker = Worker.objects.select_for_update().get(name=socket.gethostname()) - - worker.update_state() - - -def work(environments, cpulimit, process, django_settings): - '''Launches user code on isolated processes - - This function is supposed to be called asynchronously, by a scheduling - agent, every few seconds. It examines job splits assigned to the current - host and launches an individual process to handle these splits. The process - is started locally and the process ID stored with the split. - - Job split cancelling is executed by setting the split state as - ``KILL`` and waiting for this function to handle it. - - - Parameters: - - environments (dict): A dictionary containing installed environments, - their description and execute-file paths. - - cpulimit (str): The path to the ``cpulimit`` program to use for - limiting the user code in CPU usage. If set to ``None``, then don't use - it, even if the select user queue has limits. - - process (str): The path to the ``process.py`` program to use for running - the user code on isolated processes. 
- - django_settings (str): The name of the module containing the Django - settings for use with the process program - - ''' - - # myself, raises if I cannot find me - worker = Worker.objects.get(name=socket.gethostname()) - - # cancel job splits by killing associated processes - for j in JobSplit.objects.filter(worker=worker, status=Job.KILL, - end_date__isnull=True, process_id__isnull=False): - os.kill(j.process_id, signal.SIGKILL) - j.end(None, Job.CANCELLED) - - # cmdline base argument - cmdline = [process, '--settings=%s' % django_settings] - if cpulimit is not None: cmdline += ['--cpulimit=%s' % cpulimit] - if settings.DEBUG: cmdline += ['-vv'] - - # start newly assigned job splits - for j in JobSplit.objects.filter(worker=worker, status=Job.QUEUED, - start_date__isnull=True, process_id__isnull=True): - execute = pick_execute(j, environments) - if execute is None: - message = "Environment `%s' is not available for split %d/%d " \ - "running at worker `%s', for block `%s' of experiment " \ - "`%s': %s" % (split.job.block.environment, - split.split_index+1, - split.job.block.required_slots, - split.worker, - split.job.block.name, - split.job.block.experiment.fullname(), - "Available environments are `%s'" % \ - '|'.join(environments.keys()), - ) - j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR, - syserr=message)) - - # if we get to this point, then we launch the user process - subprocess.Popen(cmdline + [execute, str(j.pk)]) - - -def cleanup_zombies(): - '''Cleans-up eventual zombie subprocesses launched by the call above''' - - for child in psutil.Process().children(recursive=True): - if child.status == psutil.STATUS_ZOMBIE: - child.wait() - - -def resolve_process_path(): - '''Returns the path to cpulimit''' - - basedir = os.path.dirname(os.path.realpath(sys.argv[0])) - r = os.path.join(basedir, 'process.py') - - if not os.path.exists(r): - raise RuntimeError("Cannot find `process.py' at `%s' - please check " \ - "your installation" % basedir) - - return r - - -def find_environments(paths=None): - '''Finds list of known environments - - Parameters: - - paths (list, Optional): A list of paths where to search for environments. - If not set, then load default environments if possible. - - - Returns: - - dict: A dictionary containing each environment available using as key the - natural key for environments (i.e., ``name (version)``) and as values - another dictionary with these keys: - - * name: The environment name (str) - * version: The environment version (str) - * os: The output of ``uname -a`` (list): - 1. Operating system (str) - 2. Hostname (str) - 3. Kernel version (str) - 4. Kernel compilation details (str) - 5. Platform (``x86_64`` for 64-bits or ``i386`` for 32-bits) (str) - * execute: The path to the ``execute`` script to be used for running - user jobs (str) - * directory: The path leading to the root of this environment (str) - - ''' - - from beat.core.execution import discover_environments - - if paths is not None: - logger.debug("Search for environments at `%s'", os.pathsep.join(paths)) - return discover_environments(paths) - - else: - import pkg_resources - path = pkg_resources.resource_filename(__name__, 'environments') - logger.debug("Search for environments at `%s'", path) - return discover_environments([path]) - - -def worker_shutdown(): - """Standard worker shutdown procedure - - Stop all running/assigned splits and then mark the worker as inactive. 
- """ - - # myself, raises if I cannot find me - worker = Worker.objects.get(name=socket.gethostname()) - - message = 'Cancelled on forced worker shutdown (maintenance)' \ - ' - you may retry submitting your experiment shortly' - - # cancel job splits which are running - for j in JobSplit.objects.filter(worker=worker, - status=(Job.KILL, Job.PROCESSING), end_date__isnull=True, - process_id__isnull=False): - signal.signal(signal.SIGKILL, j.process_id) - j.end(Result(status=1, usrerr=message)) - - # cancel job splits which were not yet started - for j in JobSplit.objects.filter(worker=worker, - status=Job.QUEUED, start_date__isnull=True, process_id__isnull=True): - j.end(Result(status=1, usrerr=message)) - - with transaction.atomic(): - worker = Worker.objects.select_for_update().get(pk=worker.pk) - worker.active = False - worker.used_cores = 0 - worker.used_memory = 0 - worker.info = 'Worker shutdown by system administrator' - worker.save() - - -def pick_execute(split, environments): - """Resolves the path to the ``execute`` program to use for the split""" - - # Check we have a compatible environment to execute the user algorithm - envinfo = environments.get(split.job.block.environment.natural_key()) - return envinfo['execute'] if envinfo else None diff --git a/beat/web/backend/tests.py b/beat/web/backend/tests.py index 74372dca6..be2cc170e 100644 --- a/beat/web/backend/tests.py +++ b/beat/web/backend/tests.py @@ -49,27 +49,7 @@ from ..statistics.models import HourlyStatistics from .models import Queue, Worker, Slot, Environment, Job, JobSplit, Result from .utils import cleanup_cache, dump_backend, setup_backend from .management.commands import qsetup -from .schedule import schedule, process, work - - -EXPERIMENT_HASHES = { - "user/single/1/single": \ - 'c5ae0db08c0b9f033461c4cf4eb7ca5b4ae4b61e108c1001f8eb0cc703887910', - "user/single/1/single_error": \ - '8b39ee8384f611c3c9ebb12570c201e88b67e7f991669f53172f9e315ba0fcd6', - "user/single/1/single_error2": \ - '8b39ee8384f611c3c9ebb12570c201e88b67e7f991669f53172f9e315ba0fcd6', - "user/single/1/single_add": \ - 'e5c9d741587a849d80ecebb78b2d97932e8f6e64a7d82e57954934a2d79751cb', - "user/single/1/single_add2": \ - 'b0bb7120f771ccaae0b22a0c2d6f11e80ca6067ce5c774294858ad3019ebfe7d', - "user/double/1/double": \ - '3e549ee87c98113207796f289603746ba5c2dd26cf8d5710fde9ed45697716d0', - "user/triangle/1/triangle": \ - '43769717b9a53bf51c4a4d0354ecb6d733b70e00fdb493ab77099c0edfceb1cb', - "user/double_triangle/1/double_triangle": \ - 'fbde7da1f51ce62e4446b9d26748865d7debe3778a66aa36244f717a830a13b1', - } +from .schedule import schedule # Example configuration with 3 queues with an increasing amount of resources @@ -1202,7 +1182,7 @@ class Scheduling(BaseBackendTestCase): # simulate worker cancelling split.refresh_from_db() - self.assertEqual(split.status, Job.KILL) + self.assertEqual(split.status, Job.CANCEL) split.end(None, Job.CANCELLED) self.assertEqual( @@ -1662,7 +1642,7 @@ class Scheduling(BaseBackendTestCase): # simulate worker cancelling split.refresh_from_db() - self.assertEqual(split.status, Job.KILL) + self.assertEqual(split.status, Job.CANCEL) split.end(None, Job.CANCELLED) self.assertEqual( @@ -2050,13 +2030,11 @@ class Working(BaseBackendTestCase): from beat.core.async import resolve_cpulimit_path self.cpulimit = resolve_cpulimit_path(None) - from .schedule import find_environments, resolve_process_path - self.process = resolve_process_path() - self.environments = find_environments(None) + from . 
import utils + self.process = utils.resolve_process_path() + self.environments = utils.find_environments(None) self.env1_execute = self.environments['environment (1)']['execute'] - self.settings = 'beat.web.settings.test' - if not os.path.exists(settings.CACHE_ROOT): os.makedirs(settings.CACHE_ROOT) @@ -2106,7 +2084,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split.pk, self.env1_execute, self.cpulimit) + split.process(self.env1_execute, self.cpulimit) # at this point, job should have been successful xp.refresh_from_db() @@ -2144,7 +2122,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split.pk, self.env1_execute, self.cpulimit) + split.process(self.env1_execute, self.cpulimit) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) @@ -2196,7 +2174,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split.pk, self.env1_execute, self.cpulimit) + split.process(self.env1_execute, self.cpulimit) # at this point, job should have failed xp.refresh_from_db() @@ -2256,7 +2234,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split.pk, self.env1_execute, self.cpulimit) + split.process(self.env1_execute, self.cpulimit) # at this point, job should have been successful xp.refresh_from_db() @@ -2294,7 +2272,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split.pk, self.env1_execute, self.cpulimit) + split.process(self.env1_execute, self.cpulimit) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) @@ -2355,7 +2333,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split.pk, self.env1_execute, self.cpulimit) + split.process(self.env1_execute, self.cpulimit) # at this point, job should have been successful xp.refresh_from_db() @@ -2404,7 +2382,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split.pk, self.env1_execute, self.cpulimit) + split.process(self.env1_execute, self.cpulimit) # checks the number of statistics objects has increased by 1 self.assertEqual(HourlyStatistics.objects.count(), current_stats + 1) @@ -2466,7 +2444,7 @@ class Working(BaseBackendTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (blocking) - process(split.pk, self.env1_execute, self.cpulimit) + split.process(self.env1_execute, self.cpulimit) # at this point, job should have been successful xp.refresh_from_db() @@ -2507,11 +2485,9 @@ class WorkingExternally(TransactionTestCase): from beat.core.async import resolve_cpulimit_path self.cpulimit = resolve_cpulimit_path(None) - from .schedule import find_environments, resolve_process_path - self.process = resolve_process_path() - self.environments = find_environments(None) - - self.settings = 'beat.web.settings.test' + from . 
import utils + self.process = utils.resolve_process_path() + self.environments = utils.find_environments(None) if not os.path.exists(settings.CACHE_ROOT): os.makedirs(settings.CACHE_ROOT) @@ -2579,7 +2555,7 @@ class WorkingExternally(TransactionTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (non-blocking) - work(self.environments, self.cpulimit, self.process, self.settings) + worker.work(self.environments, self.cpulimit, self.process) time.sleep(5) #wait job completion # at this point, split should have been successful which shall @@ -2618,7 +2594,7 @@ class WorkingExternally(TransactionTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (non-blocking) - work(self.environments, self.cpulimit, self.process, self.settings) + worker.work(self.environments, self.cpulimit, self.process) time.sleep(5) #wait job completion # checks the number of statistics objects has increased by 1 @@ -2673,16 +2649,16 @@ class WorkingExternally(TransactionTestCase): self.assertEqual(worker.available_cores(), qsetup.CORES-1) # actually runs the job (non-blocking) - work(self.environments, self.cpulimit, self.process, self.settings) + worker.work(self.environments, self.cpulimit, self.process) time.sleep(5) #wait till done (sqlite doesn't impl. select_for_update()) # cancels the experiment xp.cancel() split.refresh_from_db() - self.assertEqual(split.status, Job.KILL) + self.assertEqual(split.status, Job.CANCEL) # launch another working cycle to kill the process - work(self.environments, self.cpulimit, self.process, self.settings) + worker.work(self.environments, self.cpulimit, self.process) time.sleep(5) #wait till done (sqlite doesn't impl. select_for_update()) xp.refresh_from_db() diff --git a/beat/web/backend/utils.py b/beat/web/backend/utils.py index 02848fc57..1ec4deb74 100644 --- a/beat/web/backend/utils.py +++ b/beat/web/backend/utils.py @@ -28,6 +28,7 @@ '''Utilities for backend management''' import os +import sys import fnmatch import glob import time @@ -35,13 +36,15 @@ import time import logging logger = logging.getLogger(__name__) +import psutil + from django.db import transaction from django.contrib.auth.models import Group from guardian.shortcuts import assign_perm -from ..experiments.models import CachedFile, Block, Experiment from ..common.models import Shareable -from .models import Queue, Worker, Slot, Job, Environment +from ..experiments.models import CachedFile, Block, Experiment +from .models import Queue, Worker, Job, Environment, Slot def cleanup_cache(path, age_in_minutes=0, delete=False): @@ -339,3 +342,74 @@ def dump_backend(): environments=dict([(str(k), k.as_dict()) for k in Environment.objects.all()]), workers=dict([(k.name, k.as_dict()) for k in Worker.objects.all()]), ) + + +def cleanup_zombies(): + '''Cleans-up eventual zombie subprocesses launched by the call above''' + + for child in psutil.Process().children(recursive=True): + if child.status == psutil.STATUS_ZOMBIE: + child.wait() + + +def resolve_process_path(): + '''Returns the path to cpulimit''' + + basedir = os.path.dirname(os.path.realpath(sys.argv[0])) + r = os.path.join(basedir, 'process') + + if not os.path.exists(r): + raise RuntimeError("Cannot find `process.py' at `%s' - please check " \ + "your installation" % basedir) + + return r + + +def find_environments(paths=None): + '''Finds list of known environments + + Parameters: + + paths (list, Optional): A list of paths where to search for environments. 
+ If not set, then load default environments if possible. + + + Returns: + + dict: A dictionary containing each environment available using as key the + natural key for environments (i.e., ``name (version)``) and as values + another dictionary with these keys: + + * name: The environment name (str) + * version: The environment version (str) + * os: The output of ``uname -a`` (list): + 1. Operating system (str) + 2. Hostname (str) + 3. Kernel version (str) + 4. Kernel compilation details (str) + 5. Platform (``x86_64`` for 64-bits or ``i386`` for 32-bits) (str) + * execute: The path to the ``execute`` script to be used for running + user jobs (str) + * directory: The path leading to the root of this environment (str) + + ''' + + from beat.core.execution import discover_environments + + if paths is not None: + logger.debug("Search for environments at `%s'", os.pathsep.join(paths)) + return discover_environments(paths) + + else: + import pkg_resources + path = pkg_resources.resource_filename(__name__, 'environments') + logger.debug("Search for environments at `%s'", path) + return discover_environments([path]) + + +def pick_execute(split, environments): + """Resolves the path to the ``execute`` program to use for the split""" + + # Check we have a compatible environment to execute the user algorithm + envinfo = environments.get(split.job.block.environment.natural_key()) + return envinfo['execute'] if envinfo else None diff --git a/beat/web/backend/views.py b/beat/web/backend/views.py index 491921621..00f958987 100644 --- a/beat/web/backend/views.py +++ b/beat/web/backend/views.py @@ -26,6 +26,7 @@ ############################################################################### import os +import socket import logging logger = logging.getLogger(__name__) @@ -43,11 +44,12 @@ from django.contrib import messages from beat.core.async import resolve_cpulimit_path -from .models import Environment, Worker, Queue from ..experiments.models import Experiment + +from .models import Environment, Worker, Queue from . import state -from .schedule import schedule, work, worker_update, cleanup_zombies -from .schedule import find_environments, resolve_process_path +from . import utils +from . 
import schedule #------------------------------------------------ @@ -57,21 +59,19 @@ class Work: '''Helper to do the required worker job for local scheduling''' cpulimit = resolve_cpulimit_path(None) - process = resolve_process_path() - environments = find_environments(None) - django_settings = os.environ.get('DJANGO_SETTINGS_MODULE') - + process = utils.resolve_process_path() + environments = utils.find_environments(None) + worker = None def __call__(self): - # update workers that require updates - worker_update() - cleanup_zombies() - work( - Work.environments, - Work.cpulimit, - Work.process, - Work.django_settings, - ) + + if Work.worker is None: # set it once + # if only one worker declared, use that one + Work.worker = Worker.objects.get(name=socket.gethostname()) \ + if Worker.objects.count() != 1 else Worker.objects.get() + + utils.cleanup_zombies() + Work.worker.work(Work.environments, Work.cpulimit, Work.process) #------------------------------------------------ @@ -106,7 +106,8 @@ def scheduler(request): activity = request.GET['activity'] if activity in ('both', 'schedule'): - splits = schedule() + schedule.send_experiment_emails() + splits = schedule.schedule() if splits: logger.info("Scheduler assigned %d splits", len(splits)) diff --git a/beat/web/scripts/process.py b/beat/web/scripts/process.py index 06b72bcf3..1a13a9480 100644 --- a/beat/web/scripts/process.py +++ b/beat/web/scripts/process.py @@ -67,13 +67,11 @@ Examples: """ -import logging -logger = logging.getLogger('beat') - - import os import sys +import signal import docopt +import logging def main(user_input=None): @@ -84,21 +82,53 @@ def main(user_input=None): ), ) - - # Sets-up logging - if arguments['--verbose'] == 1: logger.setLevel(logging.INFO) - elif arguments['--verbose'] >= 2: logger.setLevel(logging.DEBUG) - # Initializes the Django framework os.environ.setdefault('DJANGO_SETTINGS_MODULE', arguments['--settings']) from django.conf import settings from django import setup setup() - from beat.web.backend.schedule import process + logger = logging.getLogger('beat.web') + if arguments['--verbose'] == 1: logger.setLevel(logging.INFO) + elif arguments['--verbose'] >= 2: logger.setLevel(logging.DEBUG) - process( - split_pk=int(arguments['<split>']), - execute=arguments['<execute>'], - cpulimit=arguments['--cpulimit'], - ) + from beat.web.backend.models import JobSplit, Result + + try: + split = JobSplit.objects.get(pk=int(arguments['<split>'])) + except JobSplit.DoesNotExist: + logger.info("Job split(pk=%d) does not exist. 
Likely cancelled, " \
+            "so, ignoring.", int(arguments['<split>']))
+        sys.exit(0)
+
+    def stop():
+        import psutil
+        for child in psutil.Process().children(recursive=True):
+            child.kill()
+            logger.info("Killing user process %d...", child.pid)
+
+        message = "Force-stopped user processes (pid=%s) for split `%s' " \
+            "of block `%s' of experiment `%s'" % \
+            (split.process_id, split, split.job.block.name,
+             split.job.block.experiment.fullname())
+        logger.info(message)
+        split.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR,
+            syserr=message))
+
+    # installs SIGTERM/SIGINT handler
+    def handler(signum, frame):
+        logger.info("Signal %d caught, terminating...", signum)
+        stop()
+    signal.signal(signal.SIGTERM, handler)
+    signal.signal(signal.SIGINT, handler)
+
+    try:
+
+        split.process(
+            execute=arguments['<execute>'],
+            cpulimit=arguments['--cpulimit'],
+        )
+
+    except KeyboardInterrupt:
+
+        logger.info("CTRL-c caught, terminating...")
+        stop()
diff --git a/beat/web/scripts/scheduler.py b/beat/web/scripts/scheduler.py
new file mode 100644
index 000000000..3db13dd2a
--- /dev/null
+++ b/beat/web/scripts/scheduler.py
@@ -0,0 +1,118 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+
+###############################################################################
+#                                                                             #
+# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/          #
+# Contact: beat.support@idiap.ch                                             #
+#                                                                             #
+# This file is part of the beat.web module of the BEAT platform.             #
+#                                                                             #
+# Commercial License Usage                                                   #
+# Licensees holding valid commercial BEAT licenses may use this file in      #
+# accordance with the terms contained in a written agreement between you     #
+# and Idiap. For further information contact tto@idiap.ch                    #
+#                                                                             #
+# Alternatively, this file may be used under the terms of the GNU Affero     #
+# Public License version 3 as published by the Free Software and appearing   #
+# in the file LICENSE.AGPL included in the packaging of this file.           #
+# The BEAT platform is distributed in the hope that it will be useful, but   #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
+# or FITNESS FOR A PARTICULAR PURPOSE.                                       #
+#                                                                             #
+# You should have received a copy of the GNU Affero Public License along     #
+# with the BEAT platform. If not, see http://www.gnu.org/licenses/.          #
+#                                                                             #
+###############################################################################
+
+
+"""\
+Starts the scheduling process.
+
+Usage:
+  %(prog)s [-v ... | --verbose ...] [--settings=<file>] [--period=<seconds>]
+  %(prog)s (-h | --help)
+  %(prog)s (-V | --version)
+
+
+Options:
+  -h, --help                       Show this help message
+  -V, --version                    Show program's version number
+  -v, --verbose                    Increases the output verbosity level
+  -S <file>, --settings=<file>     The module name to the Django settings
+                                   file [default: beat.web.settings.settings]
+  -p <seconds>, --period=<seconds> The time, in seconds, in which this
+                                   scheduler will try to allocate job splits
+                                   to existing workers. If not set, use the
+                                   value available on the Django settings
+                                   file, at the variable
+                                   `SCHEDULING_INTERVAL`.
+
+
+Examples:
+
+  To start the scheduling process do the following:
+
+    $ %(prog)s
+
+  You can pass the ``-v`` flag to start the scheduler with the logging level
+  set to ``INFO`` or ``-vv`` to set it to ``DEBUG``. By default, the logging
+  level is set to ``WARNING`` if no ``-v`` flag is passed.
+
+"""
+
+import os
+import sys
+import time
+import signal
+import docopt
+import logging
+
+stop = False
+
+def main(user_input=None):
+
+    arguments = docopt.docopt(
+        __doc__ % dict(
+            prog=os.path.basename(sys.argv[0]),
+        ),
+    )
+
+    os.environ.setdefault('DJANGO_SETTINGS_MODULE', arguments['--settings'])
+    from django.conf import settings
+    from django import setup
+    setup()
+
+    logger = logging.getLogger('beat.web')
+    if arguments['--verbose'] == 1: logger.setLevel(logging.INFO)
+    elif arguments['--verbose'] >= 2: logger.setLevel(logging.DEBUG)
+
+    # installs SIGTERM/SIGINT handler
+    def handler(signum, frame):
+        logger.info("Signal %d caught, terminating...", signum)
+        global stop
+        stop = True
+    signal.signal(signal.SIGTERM, handler)
+    signal.signal(signal.SIGINT, handler)
+
+    from beat.web.backend import schedule
+
+    timing = int(arguments['--period']) \
+        if arguments['--period'] else settings.SCHEDULING_INTERVAL
+    logger.info("Scheduling every %d seconds", timing)
+
+    global stop
+    while not stop:
+
+        try:
+            start = time.time()
+            logger.debug("Starting scheduling cycle...")
+            schedule.send_experiment_emails()
+            schedule.schedule()
+            duration = time.time() - start
+            if duration < timing:
+                time.sleep(timing - duration)
+
+        except KeyboardInterrupt:
+            logger.info("CTRL-c caught, terminating...")
+            stop = True
diff --git a/beat/web/scripts/worker.py b/beat/web/scripts/worker.py
index d23d98b06..39c2c209f 100644
--- a/beat/web/scripts/worker.py
+++ b/beat/web/scripts/worker.py
@@ -30,8 +30,8 @@ Starts the worker process.
 
 Usage:
-  %(prog)s [-v ... | --verbose ...] [--settings=<file>]
-  %(prog)s [--cpulimit=<file>] [--environments=<path>]
+  %(prog)s [-v ... | --verbose ...] [--settings=<file>] [--period=<seconds>]
+  %(prog)s [--cpulimit=<file>] [--environments=<path>] [--name=<name>]
   %(prog)s (-h | --help)
   %(prog)s (-V | --version)
@@ -48,6 +48,15 @@ Options:
                                    not enforced.
   -e <path>, --environments=<path> The path to the installation root of
                                    available environments.
+  -n <name>, --name=<name>         The unique name of this worker on the
+                                   database. This is typically the assigned
+                                   hostname of the node, but not necessarily
+                                   [default: %(hostname)s]
+  -p <seconds>, --period=<seconds> The time, in seconds, in which this worker
+                                   will probe the database for jobs to run or
+                                   cancel. If not set, use the value available
+                                   on the Django settings file, at the
+                                   variable `WORKER_INTERVAL`.
Examples: @@ -62,75 +71,69 @@ Examples: """ -import logging -logger = logging.getLogger('beat') - - import os import sys import time +import socket import signal import docopt +import logging +stop = False def main(user_input=None): - stop = False - - # installs SIGTERM handler - def handler(signum, frame): - stop = True - signal.signal(signal.SIGTERM, handler) - arguments = docopt.docopt( __doc__ % dict( prog=os.path.basename(sys.argv[0]), + hostname=socket.gethostname(), ), ) + os.environ.setdefault('DJANGO_SETTINGS_MODULE', arguments['--settings']) + from django.conf import settings + from django import setup + setup() - # Sets-up logging + logger = logging.getLogger('beat.web') if arguments['--verbose'] == 1: logger.setLevel(logging.INFO) elif arguments['--verbose'] >= 2: logger.setLevel(logging.DEBUG) + # installs SIGTERM handler + def handler(signum, frame): + logger.info("Signal %d caught, terminating...", signum) + global stop + stop = True + signal.signal(signal.SIGTERM, handler) + signal.signal(signal.SIGINT, handler) - try: - - from beat.core.async import resolve_cpulimit_path - cpulimit = resolve_cpulimit_path(arguments['--cpulimit']) - - from beat.web.backend.schedule import find_environments, work - from beat.web.backend.schedule import resolve_process_path - from beat.web.backend.schedule import worker_shutdown, worker_update - - process = resolve_process_path() - environments = find_environments(arguments['--environments'] or []) - - os.environ.setdefault('DJANGO_SETTINGS_MODULE', arguments['--settings']) - from django.conf import settings + from ..backend import utils + from ..backend.models import Worker - from django import setup - setup() + from beat.core.async import resolve_cpulimit_path + cpulimit = resolve_cpulimit_path(arguments['--cpulimit']) + process = utils.resolve_process_path() + environments = utils.find_environments(arguments['--environments'] or []) - worker_update() + timing = int(arguments['--period']) \ + if arguments['--period'] else settings.WORKER_INTERVAL + logger.info("Working every %d seconds", timing) - timing = settings.WORKER_INTERVAL + global stop + with Worker.objects.get(name=arguments['--name']) as worker: while not stop: - start = time.time() - work(environments, cpulimit, process, arguments['--settings']) - currtime = time.time() - duration = currtime - start - - if duration < timing: - time.sleep(timing - duration) - - except Exception: - from traceback import format_exc - logger.error(format_exc()) - + try: - finally: + start = time.time() + logger.debug("Starting work cycle...") + utils.cleanup_zombies() + worker.work(environments, cpulimit, process) + duration = time.time() - start + if duration < timing: + time.sleep(timing - duration) - worker_shutdown() + except KeyboardInterrupt: + logger.info("CTRL-c caught, terminating...") + stop = True diff --git a/setup.py b/setup.py index b4a6f66cd..344e7921a 100644 --- a/setup.py +++ b/setup.py @@ -86,9 +86,9 @@ setup( entry_points={ 'console_scripts': [ - 'localhost.py = beat.web.scripts.localhost:main', - 'process.py = beat.web.scripts.process:main', - 'worker.py = beat.web.scripts.worker:main', + 'process = beat.web.scripts.process:main', + 'worker = beat.web.scripts.worker:main', + 'scheduler = beat.web.scripts.scheduler:main', ], }, -- GitLab
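
For reference, the renamed entry points are meant to be driven roughly as in
the minimal sketch below, which mirrors what `worker' now does internally. It
assumes the default settings module and a Worker row named after the local
host (both are the documented defaults, but still assumptions about your
deployment, not part of this patch):

    import os
    import socket

    os.environ.setdefault('DJANGO_SETTINGS_MODULE',
                          'beat.web.settings.settings')
    from django import setup
    setup()

    from beat.core.async import resolve_cpulimit_path
    from beat.web.backend import utils
    from beat.web.backend.models import Worker

    cpulimit = resolve_cpulimit_path(None)  # path to `cpulimit', or None
    process = utils.resolve_process_path()  # the renamed `process' script
    environments = utils.find_environments(None)

    # Worker is now a context manager: __enter__() calls update_state() and
    # __exit__() calls shutdown(), cancelling splits still assigned here.
    with Worker.objects.get(name=socket.gethostname()) as worker:
        utils.cleanup_zombies()  # reap zombie children from earlier cycles
        worker.work(environments, cpulimit, process)  # one work cycle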