Skip to content
Snippets Groups Projects
Commit e7812630 authored by Philip ABBET's avatar Philip ABBET
Browse files

[backend] More refactoring

parent 823da9ab
No related branches found
No related tags found
No related merge requests found
...@@ -99,13 +99,21 @@ def start_local_scheduler(request): ...@@ -99,13 +99,21 @@ def start_local_scheduler(request):
# Clean start-up # Clean start-up
LocalSchedulerProcesses.objects.all().delete() LocalSchedulerProcesses.objects.all().delete()
address = getattr(settings, 'LOCAL_SCHEDULER_ADDRESS', '127.0.0.1')
port = getattr(settings, 'LOCAL_SCHEDULER_PORT', 50000)
use_docker = getattr(settings, 'LOCAL_SCHEDULER_USE_DOCKER', False)
full_address = 'tcp://%s:%d' % (address, port)
for worker in Worker.objects.all(): for worker in Worker.objects.all():
(pid, _) = local_scheduler.start_worker(worker.name, settings.PREFIX, (pid, _) = local_scheduler.start_worker(worker.name, settings.PREFIX,
settings.CACHE_ROOT, settings.CACHE_ROOT,
'tcp://127.0.0.1:50000') full_address,
use_docker=use_docker)
LocalSchedulerProcesses(name=worker.name, pid=pid).save() LocalSchedulerProcesses(name=worker.name, pid=pid).save()
(pid, _) = local_scheduler.start_scheduler(address='127.0.0.1', port=50000) (pid, _) = local_scheduler.start_scheduler(address=address, port=port)
LocalSchedulerProcesses(name='Scheduler', pid=pid).save() LocalSchedulerProcesses(name='Scheduler', pid=pid).save()
return Response(status=status.HTTP_204_NO_CONTENT) return Response(status=status.HTTP_204_NO_CONTENT)
......
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
############################################################################### ###############################################################################
from django.conf import settings
import multiprocessing import multiprocessing
import psutil import psutil
import signal import signal
...@@ -80,16 +82,18 @@ class WorkerProcess(multiprocessing.Process): ...@@ -80,16 +82,18 @@ class WorkerProcess(multiprocessing.Process):
#---------------------------------------------------------- #----------------------------------------------------------
def start_scheduler(settings='beat.web.settings.settings', interval=5, def start_scheduler(settings_module='beat.web.settings.settings', interval=5,
address='127.0.0.1', port=50000): address='127.0.0.1', port=50000):
args = [ args = [
'--settings=%s' % str(settings), '--settings=%s' % str(settings_module),
'--interval=%d' % int(interval), '--interval=%d' % int(interval),
'--address=%s' % str(address), '--address=%s' % str(address),
'--port=%d' % int(port), '--port=%d' % int(port),
'--verbose',
] ]
if getattr(settings, 'LOCAL_SCHEDULER_VERBOSITY', None) is not None:
args.append(settings.LOCAL_SCHEDULER_VERBOSITY)
process = SchedulerProcess(args) process = SchedulerProcess(args)
process.start() process.start()
process.queue.get() process.queue.get()
...@@ -100,15 +104,20 @@ def start_scheduler(settings='beat.web.settings.settings', interval=5, ...@@ -100,15 +104,20 @@ def start_scheduler(settings='beat.web.settings.settings', interval=5,
#---------------------------------------------------------- #----------------------------------------------------------
def start_worker(name, prefix, cache, scheduler_address): def start_worker(name, prefix, cache, scheduler_address, use_docker=False):
args = [ args = [
'--prefix=%s' % str(prefix), '--prefix=%s' % str(prefix),
'--cache=%s' % str(cache), '--cache=%s' % str(cache),
'--name=%s' % str(name), '--name=%s' % str(name),
'--verbose',
str(scheduler_address) str(scheduler_address)
] ]
if use_docker:
args.insert(3, '--docker')
if getattr(settings, 'LOCAL_SCHEDULER_VERBOSITY', None) is not None:
args.insert(3, settings.LOCAL_SCHEDULER_VERBOSITY)
process = WorkerProcess(args) process = WorkerProcess(args)
process.start() process.start()
process.queue.get() process.queue.get()
......
...@@ -133,244 +133,9 @@ class Worker(models.Model): ...@@ -133,244 +133,9 @@ class Worker(models.Model):
def available_cores(self): def available_cores(self):
'''Calculates the number of available cores considering current load''' '''Calculates the number of available cores considering current load'''
return max(self.cores - self.load(), 0) return max(self.cores - self.load(), 0)
# def deactivate(self, reason):
# '''Deactivates the current worker for a reason, that is registered'''
#
# self.info = reason
# self.active = False
#
#
# def activate(self, reason=None):
# '''Reactivates the worker, deletes any associated information'''
#
# self.info = reason
# self.active = True
def as_dict(self): def as_dict(self):
'''Returns a dictionary-like representation''' '''Returns a dictionary-like representation'''
return dict(cores=self.cores, memory=self.memory) return dict(cores=self.cores, memory=self.memory)
# def check_environments(self, environments):
# '''Checks that this worker has access to all environments it needs
#
# This method will check if the found set of environments (in the
# dictionary ``environments``) contains, at least, one environment for
# each environment object this worker is supposed to be able to execute
# user algorithms for.
#
#
# Parameters:
#
# environments (dict): A dictionary of environments found by using
# :py:func:`utils.find_environments` in which, keys represent the
# natural keys of Django database environments.
#
#
# Returns:
#
# list: A list of missing environments this worker can be assigned to
# work with, but were not found
#
# list: A list of unused environments this worker cannot be assigned to
# work with, but were nevertheless found
#
# '''
#
# slots = Slot.objects.filter(worker=self)
# queues = Queue.objects.filter(slots__in=slots)
# wishlist = Environment.objects.filter(queues__in=queues, active=True)
# wishlist = wishlist.order_by('id').distinct()
#
# required = [k.fullname() for k in wishlist]
# missing = [k for k in required if k not in environments]
# unused = [k for k in environments if k not in required]
#
# return missing, unused
#
#
# def update_state(self):
# '''Updates state on the database based on current machine readings'''
#
# # check I have at least all cores and memory I'm supposed to have
# cores = psutil.cpu_count()
# ram = psutil.virtual_memory().total / (1024 * 1024)
# self.info = ''
#
# if cores < self.cores:
# logger.warn("Worker `%s' only has %d cores which is less then " \
# "the value declared on the database - it's not a problem, " \
# "but note this self may get overloaded", self, cores)
# self.info += 'only %d cores;' % cores
#
# if ram < self.memory:
# logger.warn("Worker `%s' only has %d Mb of RAM which is less " \
# "then the value declared on the database - it's not a " \
# "problem, but note this self may get overloaded", self,
# ram)
# self.info += 'only %d Mb of RAM;' % ram
#
# with transaction.atomic():
# self_ = Worker.objects.select_for_update().get(pk=self.pk) #lock
#
# # update process and memory usage
# self.used_cores = int(psutil.cpu_percent())
# self.used_memory = int(psutil.virtual_memory().percent)
#
# # save current self state
# self.active = True
# self.update = False
# self.save()
#
#
# def terminate(self):
# '''Cleanly terminates a particular worker at the database
#
# .. note::
#
# This method does not destroy running or assigned processes that may
# be running or assigned to this worker. This is implemented in this
# way to allow for a clean replacement of the worker program w/o an
# interruption of the backend service.
#
# '''
#
# from ..models import JobSplit
# from ..models import Job
#
# # disables worker, so no more splits can be assigned to it
# with transaction.atomic():
# self_ = Worker.objects.select_for_update().get(pk=self.pk)
# self_.active = False
# self_.used_cores = 0
# self_.used_memory = 0
# self_.info = 'Worker deactivated by system administrator'
# self_.save()
#
# # cancel job splits which should be cancelled anyways
# for j in JobSplit.objects.filter(worker=self, status=JobSplit.CANCELLING,
# end_date__isnull=True, process_id__isnull=False):
# if psutil.pid_exists(j.process_id):
# os.kill(j.process_id, signal.SIGTERM)
#
# # cleans-up zombie processes that may linger
# _cleanup_zombies()
#
#
# def shutdown(self):
# '''Removes all running/assigned jobs from the queue, shuts down
#
# This method should be used with care as it may potentially cancel all
# assigned splits for the current worker.
#
# '''
#
# from ..models import JobSplit
# from ..models import Job
#
# self.terminate()
#
# message = 'Cancelled on forced worker shutdown (maintenance)' \
# ' - you may retry submitting your experiment shortly'
#
# # cancel job splits which were not yet started
# for j in JobSplit.objects.filter(worker=self, status=JobSplit.QUEUED,
# start_date__isnull=True, process_id__isnull=True):
# j.end(Result(status=1, usrerr=message))
#
# # cancel job splits which are running
# for j in JobSplit.objects.filter(worker=self, status=JobSplit.PROCESSING,
# end_date__isnull=True, process_id__isnull=False):
# j._cancel()
#
#
#
# def work(self, environments, process):
# '''Launches user code on isolated processes
#
# This function is supposed to be called asynchronously, by a
# scheduled agent, every few seconds. It examines job splits assigned
# to the current host and launches an individual process to handle
# these splits. The process is started locally and the process ID
# stored with the split.
#
# Job split cancelling is executed by setting the split state as
# ``CANCEL`` and waiting for this function to handle it.
#
#
# Parameters:
#
# environments (dict): A dictionary containing installed
# environments, their description and execute-file paths.
#
# process (str): The path to the ``process.py`` program to use for
# running the user code on isolated processes.
#
# '''
#
# from ..models import JobSplit
# from ..models import Job
#
# # refresh state from database and update state if required
# self.refresh_from_db()
# if self.update: self.update_state()
#
# # cancel job splits by killing associated processes
# for j in JobSplit.objects.filter(worker=self, status=JobSplit.CANCELLING,
# end_date__isnull=True):
# if j.process_id is not None and psutil.pid_exists(j.process_id):
# os.kill(j.process_id, signal.SIGTERM)
# else: # process went away without any apparent reason
# with transaction.atomic():
# message = "Split %d/%d running at worker `%s' for " \
# "block `%s' of experiment `%s' finished before " \
# "even starting. Force-cancelling job split at " \
# "database..." % (j.split_index+1,
# j.job.block.required_slots,
# self,
# j.job.block.name,
# j.job.block.experiment.fullname(),
# )
# logger.error(message)
# j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR))
#
# # cmdline base argument
# cmdline = [process]
# if settings.DEBUG:
# cmdline += ['-vv']
# else:
# cmdline += ['-v']
#
# # start newly assigned job splits
# with transaction.atomic():
# splits = JobSplit.objects.select_for_update().filter(worker=self,
# status=JobSplit.QUEUED, start_date__isnull=True,
# process_id__isnull=True)
# for split in splits:
# # if we get to this point, then we launch the user process
# # -> see settings.WORKER_DETACH_CHILDREN for more info
# kwargs = dict()
# if settings.WORKER_DETACH_CHILDREN:
# kwargs['preexec_fn'] = os.setpgrp
# subprocess.Popen(cmdline + [str(split.pk)], **kwargs)
# split.status = JobSplit.PROCESSING #avoids re-running
# split.save()
#
# # cleans-up zombie processes that may linger
# _cleanup_zombies()
#
#
# def __enter__(self):
# self.update_state()
# return self
#
#
# def __exit__(self, *exc):
# self.terminate()
# return False #propagate exceptions
...@@ -41,6 +41,8 @@ from ..models import Worker ...@@ -41,6 +41,8 @@ from ..models import Worker
from ..utils import setup_backend from ..utils import setup_backend
from ..helpers import schedule_experiment from ..helpers import schedule_experiment
from ..helpers import cancel_experiment from ..helpers import cancel_experiment
from ..local_scheduler import start_scheduler
from ..local_scheduler import start_worker
from ...scripts import scheduler from ...scripts import scheduler
from ...experiments.models import Experiment from ...experiments.models import Experiment
...@@ -52,40 +54,6 @@ from beat.core.scripts import worker ...@@ -52,40 +54,6 @@ from beat.core.scripts import worker
#---------------------------------------------------------- #----------------------------------------------------------
class SchedulerThread(multiprocessing.Process):
def __init__(self, queue, arguments):
super(SchedulerThread, self).__init__()
self.queue = queue
self.arguments = arguments
def run(self):
self.queue.put('STARTED')
scheduler.main(self.arguments)
#----------------------------------------------------------
class WorkerThread(multiprocessing.Process):
def __init__(self, queue, arguments):
super(WorkerThread, self).__init__()
self.queue = queue
self.arguments = arguments
def run(self):
self.queue.put('STARTED')
worker.main(self.arguments)
#----------------------------------------------------------
class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin): class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
def __init__(self, methodName='runTest'): def __init__(self, methodName='runTest'):
...@@ -112,16 +80,9 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin): ...@@ -112,16 +80,9 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
def start_scheduler(self): def start_scheduler(self):
args = [ (pid, self.scheduler_thread) = start_scheduler(settings_module='beat.web.settings.test',
'--settings=beat.web.settings.test', interval=1, address='127.0.0.1',
'--interval=1', port=52000)
'--address=127.0.0.1',
'--port=50000',
]
self.scheduler_thread = SchedulerThread(multiprocessing.Queue(), args)
self.scheduler_thread.start()
self.scheduler_thread.queue.get()
def stop_scheduler(self): def stop_scheduler(self):
...@@ -132,16 +93,8 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin): ...@@ -132,16 +93,8 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
def start_worker(self, name): def start_worker(self, name):
args = [ (pid, worker_thread) = start_worker(name, settings.PREFIX, settings.CACHE_ROOT,
'--prefix=%s' % settings.PREFIX, 'tcp://127.0.0.1:52000')
'--cache=%s' % settings.CACHE_ROOT,
'--name=%s' % name,
'tcp://127.0.0.1:50000',
]
worker_thread = WorkerThread(multiprocessing.Queue(), args)
worker_thread.start()
worker_thread.queue.get()
self.worker_threads[name] = worker_thread self.worker_threads[name] = worker_thread
......
...@@ -213,10 +213,8 @@ CACHE_ROOT = os.path.join(PREFIX, 'cache') ...@@ -213,10 +213,8 @@ CACHE_ROOT = os.path.join(PREFIX, 'cache')
# #
############################################################################## ##############################################################################
DATASETS_UID = 1000 DATASETS_UID = None
# DATASETS_UID = None DATASETS_ROOT_PATH = None
DATASETS_ROOT_PATH = '/Users/pabbet/Projects/databases/'
# DATASETS_ROOT_PATH = None
############################################################################## ##############################################################################
...@@ -267,58 +265,22 @@ EMAIL_USE_TLS = False ...@@ -267,58 +265,22 @@ EMAIL_USE_TLS = False
############################################################################## ##############################################################################
# The scheduling interval controls the number of seconds between # The scheduling interval controls the number of seconds between
# scheduling attempts (calls to :py:func:`beat.web.backend.schedule.schedule`) # scheduling attempts
SCHEDULING_INTERVAL = 5 #seconds SCHEDULING_INTERVAL = 5 #seconds
# The worker interval controls the number of seconds between checks
# a particular worker will run to verify jobs are not scheduled to itself
WORKER_INTERVAL = 5 #seconds
# If set, a testing panel that can accomplish scheduling activities will appear # If set, a testing panel that can accomplish scheduling activities will appear
# at the scheduler page allowing administrators to launch scheduling activities # at the scheduler page allowing administrators to launch scheduling activities
# manually. # manually.
SCHEDULING_PANEL = True SCHEDULING_PANEL = True
# The maximum index split errors control the maximum number of times we can # The verbosity level used by the local scheduler when starting the 'scheduler.py'
# incur in an index split error condition without cancelling the block # and 'worker.py' scripts
# execution altogether. This number, multiplied by the scheduling interval, LOCAL_SCHEDULER_VERBOSITY = '--verbose'
# must be larger than 60 seconds, as this is the default NFS caching interval.
# If you run on a reliable networked filesystem (i.e., not NFS) or on the local LOCAL_SCHEDULER_ADDRESS = '127.0.0.1'
# node, you may set this value to 0, which will cause the scheduling activity LOCAL_SCHEDULER_PORT = 50000
# to consider even a single splitting error as enough reason to cancel the
# block execution (and, by consequence), the experiment. LOCAL_SCHEDULER_USE_DOCKER = True
MAXIMUM_SPLIT_ERRORS = 0 #attempts to split without errors
# The maximum number of IOErrors (due to cache loading) which are acceptable
# for a particular split. If the number of cache errors is bigger than the
# value set, then the split is considered as failed. This variable serves the
# same purpose as ``MAXIMUM_SPLIT_ERRORS`` above and fills-in where NFS caching
# does not. If you're running on a reliable filesystem, you can leave it at
# zero.
MAXIMUM_IO_ERRORS = 0 #attempts to load cache files without errors
# The maximum number of retries for saving (mostly at start() and end()), job
# splits from remote processes. If you're using a SQLite database backend, this
# number should be higher than 1 (recommended value is 3 to 5). In case you're
# using another database, then this value can be ignored. If set to a value
# of one or more and there are "database is locked" errors, then job split
# saving at ``start()`` or ``end()`` will be retried with a 1-second interval.
MAXIMUM_SPLIT_SAVE_RETRIES = 5
# The default user error is a message string that is set upon the failure of
# execution of a particular block if the root cause of the problem is NOT a
# user error, but the system's. In this case, the system administrators receive
# an e-mail indicating the exception caught and the user block for a given
# experiment is marked with this informative message.
DEFAULT_USER_ERROR = "A system error occurred and we could not run your " \
"algorithm.\nThe administrators have been informed.\nYou may try to run " \
"the experiment again at another moment."
# If set, then detach children processes (I/O daemons) from the worker process
# group. In this case, signals sent to the worker process will not be forwarded
# to the processing children. This is desirable in a production environment, to
# avoid user processes being terminated in case the worker is updated.
WORKER_DETACH_CHILDREN = False
############################################################################## ##############################################################################
......
...@@ -62,6 +62,9 @@ TOOLCHAINS_ROOT = os.path.join(PREFIX, 'toolchains') ...@@ -62,6 +62,9 @@ TOOLCHAINS_ROOT = os.path.join(PREFIX, 'toolchains')
EXPERIMENTS_ROOT = os.path.join(PREFIX, 'experiments') EXPERIMENTS_ROOT = os.path.join(PREFIX, 'experiments')
CACHE_ROOT = os.path.join(PREFIX, 'cache') CACHE_ROOT = os.path.join(PREFIX, 'cache')
LOCAL_SCHEDULER_VERBOSITY = None
LOCAL_SCHEDULER_USE_DOCKER = False
# To speed-up tests, don't put this in production # To speed-up tests, don't put this in production
PASSWORD_HASHERS = [ PASSWORD_HASHERS = [
'django.contrib.auth.hashers.MD5PasswordHasher', 'django.contrib.auth.hashers.MD5PasswordHasher',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment