diff --git a/beat/web/backend/api.py b/beat/web/backend/api.py
index fb14a01138fe8ebfc2f4aeeaa1603456aa0891c7..54803dda8d85295843d8ed7d376d9d391d25245c 100755
--- a/beat/web/backend/api.py
+++ b/beat/web/backend/api.py
@@ -99,13 +99,21 @@ def start_local_scheduler(request):
     # Clean start-up
     LocalSchedulerProcesses.objects.all().delete()
 
+    address = getattr(settings, 'LOCAL_SCHEDULER_ADDRESS', '127.0.0.1')
+    port = getattr(settings, 'LOCAL_SCHEDULER_PORT', 50000)
+    use_docker = getattr(settings, 'LOCAL_SCHEDULER_USE_DOCKER', False)
+
+    full_address = 'tcp://%s:%d' % (address, port)
+
     for worker in Worker.objects.all():
         (pid, _) = local_scheduler.start_worker(worker.name, settings.PREFIX,
                                                 settings.CACHE_ROOT,
-                                                'tcp://127.0.0.1:50000')
+                                                full_address,
+                                                use_docker=use_docker)
+
         LocalSchedulerProcesses(name=worker.name, pid=pid).save()
 
-    (pid, _) = local_scheduler.start_scheduler(address='127.0.0.1', port=50000)
+    (pid, _) = local_scheduler.start_scheduler(address=address, port=port)
     LocalSchedulerProcesses(name='Scheduler', pid=pid).save()
 
     return Response(status=status.HTTP_204_NO_CONTENT)
diff --git a/beat/web/backend/local_scheduler.py b/beat/web/backend/local_scheduler.py
index ba688dba4422f94564f023c3a5d6948f2c5604e2..bc270cb0bea67dcd9d331aad71acd35df10636e9 100755
--- a/beat/web/backend/local_scheduler.py
+++ b/beat/web/backend/local_scheduler.py
@@ -26,6 +26,8 @@
 ###############################################################################
 
 
+from django.conf import settings
+
 import multiprocessing
 import psutil
 import signal
@@ -80,16 +82,18 @@ class WorkerProcess(multiprocessing.Process):
 #----------------------------------------------------------
 
 
-def start_scheduler(settings='beat.web.settings.settings', interval=5,
+def start_scheduler(settings_module='beat.web.settings.settings', interval=5,
                     address='127.0.0.1', port=50000):
     args = [
-        '--settings=%s' % str(settings),
+        '--settings=%s' % str(settings_module),
         '--interval=%d' % int(interval),
         '--address=%s' % str(address),
         '--port=%d' % int(port),
-        '--verbose',
     ]
-
+
+    if getattr(settings, 'LOCAL_SCHEDULER_VERBOSITY', None) is not None:
+        args.append(settings.LOCAL_SCHEDULER_VERBOSITY)
+
     process = SchedulerProcess(args)
     process.start()
     process.queue.get()
@@ -100,15 +104,20 @@
 
 
-def start_worker(name, prefix, cache, scheduler_address):
+def start_worker(name, prefix, cache, scheduler_address, use_docker=False):
     args = [
         '--prefix=%s' % str(prefix),
         '--cache=%s' % str(cache),
         '--name=%s' % str(name),
-        '--verbose',
         str(scheduler_address)
     ]
 
+    if use_docker:
+        args.insert(3, '--docker')
+
+    if getattr(settings, 'LOCAL_SCHEDULER_VERBOSITY', None) is not None:
+        args.insert(3, settings.LOCAL_SCHEDULER_VERBOSITY)
+
     process = WorkerProcess(args)
     process.start()
     process.queue.get()
 
diff --git a/beat/web/backend/models/worker.py b/beat/web/backend/models/worker.py
index 405b40b23c5721bfdb25849f9646a3553528af5c..5d9a915bd01d9159a0a1d7390d59e363a0a6ac59 100755
--- a/beat/web/backend/models/worker.py
+++ b/beat/web/backend/models/worker.py
@@ -133,244 +133,9 @@
     def available_cores(self):
         '''Calculates the number of available cores considering current load'''
-
         return max(self.cores - self.load(), 0)
 
-    # def deactivate(self, reason):
-    #     '''Deactivates the current worker for a reason, that is registered'''
-    #
-    #     self.info = reason
-    #     self.active = False
-    #
-    #
-    # def activate(self, reason=None):
-    #     '''Reactivates the worker, deletes any associated information'''
-    #
-    #     self.info = reason
-    #     self.active = True
-
-
     def as_dict(self):
         '''Returns a dictionary-like representation'''
-
         return dict(cores=self.cores, memory=self.memory)
-
-
-    # def check_environments(self, environments):
-    #     '''Checks that this worker has access to all environments it needs
-    #
-    #     This method will check if the found set of environments (in the
-    #     dictionary ``environments``) contains, at least, one environment for
-    #     each environment object this worker is supposed to be able to execute
-    #     user algorithms for.
-    #
-    #
-    #     Parameters:
-    #
-    #       environments (dict): A dictionary of environments found by using
-    #         :py:func:`utils.find_environments` in which, keys represent the
-    #         natural keys of Django database environments.
-    #
-    #
-    #     Returns:
-    #
-    #       list: A list of missing environments this worker can be assigned to
-    #         work with, but where not found
-    #
-    #       list: A list of unused environments this worker cannot be assigned to
-    #         work with, but where nevertheless found
-    #
-    #     '''
-    #
-    #     slots = Slot.objects.filter(worker=self)
-    #     queues = Queue.objects.filter(slots__in=slots)
-    #     wishlist = Environment.objects.filter(queues__in=queues, active=True)
-    #     wishlist = wishlist.order_by('id').distinct()
-    #
-    #     required = [k.fullname() for k in wishlist]
-    #     missing = [k for k in required if k not in environments]
-    #     unused = [k for k in environments if k not in required]
-    #
-    #     return missing, unused
-    #
-    #
-    # def update_state(self):
-    #     '''Updates state on the database based on current machine readings'''
-    #
-    #     # check I have at least all cores and memory I'm supposed to have
-    #     cores = psutil.cpu_count()
-    #     ram = psutil.virtual_memory().total / (1024 * 1024)
-    #     self.info = ''
-    #
-    #     if cores < self.cores:
-    #         logger.warn("Worker `%s' only has %d cores which is less then " \
-    #             "the value declared on the database - it's not a problem, " \
-    #             "but note this self may get overloaded", self, cores)
-    #         self.info += 'only %d cores;' % cores
-    #
-    #     if ram < self.memory:
-    #         logger.warn("Worker `%s' only has %d Mb of RAM which is less " \
-    #             "then the value declared on the database - it's not a " \
-    #             "problem, but note this self may get overloaded", self,
-    #             ram)
-    #         self.info += 'only %d Mb of RAM;' % ram
-    #
-    #     with transaction.atomic():
-    #         self_ = Worker.objects.select_for_update().get(pk=self.pk) #lock
-    #
-    #         # update process and memory usage
-    #         self.used_cores = int(psutil.cpu_percent())
-    #         self.used_memory = int(psutil.virtual_memory().percent)
-    #
-    #         # save current self state
-    #         self.active = True
-    #         self.update = False
-    #         self.save()
-    #
-    #
-    # def terminate(self):
-    #     '''Cleanly terminates a particular worker at the database
-    #
-    #     .. note::
-    #
-    #        This method does not destroy running or assigned processes that may
-    #        be running or assigned to this worker. This is implemented in this
-    #        way to allow for a clean replacement of the worker program w/o an
-    #        interruption of the backend service.
-    #
-    #     '''
-    #
-    #     from ..models import JobSplit
-    #     from ..models import Job
-    #
-    #     # disables worker, so no more splits can be assigned to it
-    #     with transaction.atomic():
-    #         self_ = Worker.objects.select_for_update().get(pk=self.pk)
-    #         self_.active = False
-    #         self_.used_cores = 0
-    #         self_.used_memory = 0
-    #         self_.info = 'Worker deactivated by system administrator'
-    #         self_.save()
-    #
-    #     # cancel job splits which should be cancelled anyways
-    #     for j in JobSplit.objects.filter(worker=self, status=JobSplit.CANCELLING,
-    #             end_date__isnull=True, process_id__isnull=False):
-    #         if psutil.pid_exists(j.process_id):
-    #             os.kill(j.process_id, signal.SIGTERM)
-    #
-    #     # cleans-up zombie processes that may linger
-    #     _cleanup_zombies()
-    #
-    #
-    # def shutdown(self):
-    #     '''Removes all running/assigned jobs from the queue, shuts down
-    #
-    #     This method should be used with care as it may potentially cancel all
-    #     assigned splits for the current worker.
-    #
-    #     '''
-    #
-    #     from ..models import JobSplit
-    #     from ..models import Job
-    #
-    #     self.terminate()
-    #
-    #     message = 'Cancelled on forced worker shutdown (maintenance)' \
-    #             ' - you may retry submitting your experiment shortly'
-    #
-    #     # cancel job splits which were not yet started
-    #     for j in JobSplit.objects.filter(worker=self, status=JobSplit.QUEUED,
-    #             start_date__isnull=True, process_id__isnull=True):
-    #         j.end(Result(status=1, usrerr=message))
-    #
-    #     # cancel job splits which are running
-    #     for j in JobSplit.objects.filter(worker=self, status=JobSplit.PROCESSING,
-    #             end_date__isnull=True, process_id__isnull=False):
-    #         j._cancel()
-    #
-    #
-    #
-    # def work(self, environments, process):
-    #     '''Launches user code on isolated processes
-    #
-    #     This function is supposed to be called asynchronously, by a
-    #     scheduled agent, every few seconds. It examines job splits assigned
-    #     to the current host and launches an individual process to handle
-    #     these splits. The process is started locally and the process ID
-    #     stored with the split.
-    #
-    #     Job split cancelling is executed by setting the split state as
-    #     ``CANCEL`` and waiting for this function to handle it.
-    #
-    #
-    #     Parameters:
-    #
-    #       environments (dict): A dictionary containing installed
-    #         environments, their description and execute-file paths.
-    #
-    #       process (str): The path to the ``process.py`` program to use for
-    #         running the user code on isolated processes.
-    #
-    #     '''
-    #
-    #     from ..models import JobSplit
-    #     from ..models import Job
-    #
-    #     # refresh state from database and update state if required
-    #     self.refresh_from_db()
-    #     if self.update: self.update_state()
-    #
-    #     # cancel job splits by killing associated processes
-    #     for j in JobSplit.objects.filter(worker=self, status=JobSplit.CANCELLING,
-    #             end_date__isnull=True):
-    #         if j.process_id is not None and psutil.pid_exists(j.process_id):
-    #             os.kill(j.process_id, signal.SIGTERM)
-    #         else: # process went away without any apparent reason
-    #             with transaction.atomic():
-    #                 message = "Split %d/%d running at worker `%s' for " \
-    #                         "block `%s' of experiment `%s' finished before " \
-    #                         "even starting. Force-cancelling job split at " \
-    #                         "database..." % (j.split_index+1,
-    #                                 j.job.block.required_slots,
-    #                                 self,
-    #                                 j.job.block.name,
-    #                                 j.job.block.experiment.fullname(),
-    #                                 )
-    #                 logger.error(message)
-    #                 j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR))
-    #
-    #     # cmdline base argument
-    #     cmdline = [process]
-    #     if settings.DEBUG:
-    #         cmdline += ['-vv']
-    #     else:
-    #         cmdline += ['-v']
-    #
-    #     # start newly assigned job splits
-    #     with transaction.atomic():
-    #         splits = JobSplit.objects.select_for_update().filter(worker=self,
-    #                 status=JobSplit.QUEUED, start_date__isnull=True,
-    #                 process_id__isnull=True)
-    #         for split in splits:
-    #             # if we get to this point, then we launch the user process
-    #             # -> see settings.WORKER_DETACH_CHILDREN for more info
-    #             kwargs = dict()
-    #             if settings.WORKER_DETACH_CHILDREN:
-    #                 kwargs['preexec_fn'] = os.setpgrp
-    #             subprocess.Popen(cmdline + [str(split.pk)], **kwargs)
-    #             split.status = JobSplit.PROCESSING #avoids re-running
-    #             split.save()
-    #
-    #     # cleans-up zombie processes that may linger
-    #     _cleanup_zombies()
-    #
-    #
-    # def __enter__(self):
-    #     self.update_state()
-    #     return self
-    #
-    #
-    # def __exit__(self, *exc):
-    #     self.terminate()
-    #     return False #propagate exceptions
diff --git a/beat/web/backend/tests/test_scheduler.py b/beat/web/backend/tests/test_scheduler.py
index 0280d5ca30107dd69ba3594301508f755741896b..59448d83a2847cd46de7bcc2a0f9c562bc3c23a7 100755
--- a/beat/web/backend/tests/test_scheduler.py
+++ b/beat/web/backend/tests/test_scheduler.py
@@ -41,6 +41,8 @@ from ..models import Worker
 from ..utils import setup_backend
 from ..helpers import schedule_experiment
 from ..helpers import cancel_experiment
+from ..local_scheduler import start_scheduler
+from ..local_scheduler import start_worker
 
 from ...scripts import scheduler
 from ...experiments.models import Experiment
@@ -52,40 +54,6 @@ from beat.core.scripts import worker
 #----------------------------------------------------------
 
 
-class SchedulerThread(multiprocessing.Process):
-
-    def __init__(self, queue, arguments):
-        super(SchedulerThread, self).__init__()
-
-        self.queue = queue
-        self.arguments = arguments
-
-
-    def run(self):
-        self.queue.put('STARTED')
-        scheduler.main(self.arguments)
-
-
-#----------------------------------------------------------
-
-
-class WorkerThread(multiprocessing.Process):
-
-    def __init__(self, queue, arguments):
-        super(WorkerThread, self).__init__()
-
-        self.queue = queue
-        self.arguments = arguments
-
-
-    def run(self):
-        self.queue.put('STARTED')
-        worker.main(self.arguments)
-
-
-#----------------------------------------------------------
-
-
 class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
 
     def __init__(self, methodName='runTest'):
@@ -112,16 +80,9 @@
 
 
     def start_scheduler(self):
-        args = [
-            '--settings=beat.web.settings.test',
-            '--interval=1',
-            '--address=127.0.0.1',
-            '--port=50000',
-        ]
-
-        self.scheduler_thread = SchedulerThread(multiprocessing.Queue(), args)
-        self.scheduler_thread.start()
-        self.scheduler_thread.queue.get()
+        (pid, self.scheduler_thread) = start_scheduler(settings_module='beat.web.settings.test',
+                                                       interval=1, address='127.0.0.1',
+                                                       port=52000)
 
 
     def stop_scheduler(self):
@@ -132,16 +93,8 @@
 
 
     def start_worker(self, name):
-        args = [
-            '--prefix=%s' % settings.PREFIX,
-            '--cache=%s' % settings.CACHE_ROOT,
-            '--name=%s' % name,
-            'tcp://127.0.0.1:50000',
-        ]
-
-        worker_thread = WorkerThread(multiprocessing.Queue(), args)
-        worker_thread.start()
-        worker_thread.queue.get()
+        (pid, worker_thread) = start_worker(name, settings.PREFIX, settings.CACHE_ROOT,
+                                            'tcp://127.0.0.1:52000')
 
         self.worker_threads[name] = worker_thread
diff --git a/beat/web/settings/settings.py b/beat/web/settings/settings.py
index 81c3f9a6b76671bfd1af12b1694b4170e0a4994c..23311a217d89299a8f7d20dae46b84e5f195aacc 100755
--- a/beat/web/settings/settings.py
+++ b/beat/web/settings/settings.py
@@ -213,10 +213,8 @@ CACHE_ROOT = os.path.join(PREFIX, 'cache')
 #
 ##############################################################################
 
-DATASETS_UID = 1000
-# DATASETS_UID = None
-DATASETS_ROOT_PATH = '/Users/pabbet/Projects/databases/'
-# DATASETS_ROOT_PATH = None
+DATASETS_UID = None
+DATASETS_ROOT_PATH = None
 
 
 ##############################################################################
@@ -267,58 +265,22 @@ EMAIL_USE_TLS = False
 ##############################################################################
 
 # The scheduling interval controls the number of seconds between
-# scheduling attempts (calls to :py:func:`beat.web.backend.schedule.schedule`)
+# scheduling attempts
 SCHEDULING_INTERVAL = 5 #seconds
 
-# The worker interval controls the number of seconds between checks
-# a particular worker will run to verify jobs are not scheduled to itself
-WORKER_INTERVAL = 5 #seconds
-
 # If set, a testing panel that can accomplish scheduling activities will appear
 # at the scheduler page allowing administrators to launch scheduling activities
 # manually.
 SCHEDULING_PANEL = True
 
-# The maximum index split errors control the maximum number of times we can
-# incur in an index split error condition without cancelling the block
-# execution altogether. This number, multiplied by the scheduling interval,
-# must be larger than 60 seconds, as this is the default NFS caching interval.
-# If you run on a reliable networked filesystem (i.e., not NFS) or on the local
-# node, you may set this value to 0, which will cause the scheduling activity
-# to consider even a single splitting error as enough reason to cancel the
-# block execution (and, by consequence), the experiment.
-MAXIMUM_SPLIT_ERRORS = 0 #attempts to split without errors
-
-# The maximum number of IOErrors (due to cache loading) which are acceptable
-# for a particular split. If the number of cache errors is bigger than the
-# value set, then the split is considered as failed. This variable serves the
-# same purpose as ``MAXIMUM_SPLIT_ERRORS`` above and fills-in where NFS caching
-# does not. If you're running on a reliable filesystem, you can't leave it to
-# zero.
-MAXIMUM_IO_ERRORS = 0 #attempts to load cache files without errors
-
-# The maximum number of retries for saving (mostly at start() and end()), job
-# splits from remote processes. If you're using a SQLite database backend, this
-# number should be higher than 1 (recommended value is 3 to 5). In case you're
-# using another database, then this can value can be ignored. If set to a value
-# of one or more and there are "database is locked" errors, then job split
-# saving at ``start()`` or ``end()`` will be retried with a 1-second interval.
-MAXIMUM_SPLIT_SAVE_RETRIES = 5
-
-# The default user error is a message string that is set upon the failure of
-# execution of a particular block if the root cause of the problem is NOT a
-# user error, but the systems'. In this case, the system administrators receive
-# an e-mail indicating the exception caught and the user block for a given
-# experiment is marked with this informative message.
-DEFAULT_USER_ERROR = "A system error occurred and we could not run your " \
-    "algorithm.\nThe administrators have been informed.\nYou may try to run " \
-    "the experiment again at another moment."
-
-# If set, then detach children processes (I/O daemons) from the worker process
-# group. In this case, signals sent to the worker process will not be forwarded
-# to the processing children. This is desirable in a production environment, to
-# avoid user processes to be terminated in case the worker is updated.
-WORKER_DETACH_CHILDREN = False
+# The verbosity level used by the local scheduler when starting the 'scheduler.py'
+# and 'worker.py' scripts
+LOCAL_SCHEDULER_VERBOSITY = '--verbose'
+
+LOCAL_SCHEDULER_ADDRESS = '127.0.0.1'
+LOCAL_SCHEDULER_PORT = 50000
+
+LOCAL_SCHEDULER_USE_DOCKER = True
 
 
 ##############################################################################
diff --git a/beat/web/settings/test.py b/beat/web/settings/test.py
index 4b72e0697ebe775b16dd0efde780953aa9da6a08..8d3219cb8fbdd534e83be8d1d6b3e366e6a8e105 100755
--- a/beat/web/settings/test.py
+++ b/beat/web/settings/test.py
@@ -62,6 +62,9 @@ TOOLCHAINS_ROOT = os.path.join(PREFIX, 'toolchains')
 EXPERIMENTS_ROOT = os.path.join(PREFIX, 'experiments')
 CACHE_ROOT = os.path.join(PREFIX, 'cache')
 
+LOCAL_SCHEDULER_VERBOSITY = None
+LOCAL_SCHEDULER_USE_DOCKER = False
+
 # To speed-up tests, don't put this in production
 PASSWORD_HASHERS = [
     'django.contrib.auth.hashers.MD5PasswordHasher',
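Note on configuration: every LOCAL_SCHEDULER_* value introduced by this patch is read through getattr() with a default, so deployments that never define them keep the previous behaviour (scheduler on tcp://127.0.0.1:50000, no --docker flag, '--verbose' only where the setting says so). A minimal sketch of a per-deployment override, assuming a hypothetical local_settings.py module (the module name and values below are illustrative, not part of this patch):

    # local_settings.py -- hypothetical deployment override, not part of this patch
    from beat.web.settings.settings import *

    LOCAL_SCHEDULER_ADDRESS = '127.0.0.1'  # passed to scheduler.py as --address
    LOCAL_SCHEDULER_PORT = 51000           # workers connect to tcp://ADDRESS:PORT
    LOCAL_SCHEDULER_USE_DOCKER = False     # when True, worker.py is launched with --docker
    LOCAL_SCHEDULER_VERBOSITY = None       # None omits the '--verbose' argument entirely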