From e78126308540bc4e43e8668294ffb9d0a8869c85 Mon Sep 17 00:00:00 2001
From: Philip ABBET <philip.abbet@idiap.ch>
Date: Fri, 13 Oct 2017 12:29:24 +0200
Subject: [PATCH] [backend] More refactoring

---
 beat/web/backend/api.py                  |  12 +-
 beat/web/backend/local_scheduler.py      |  21 +-
 beat/web/backend/models/worker.py        | 235 -----------------------
 beat/web/backend/tests/test_scheduler.py |  61 +-----
 beat/web/settings/settings.py            |  60 ++----
 beat/web/settings/test.py                |   3 +
 6 files changed, 46 insertions(+), 346 deletions(-)

diff --git a/beat/web/backend/api.py b/beat/web/backend/api.py
index fb14a0113..54803dda8 100755
--- a/beat/web/backend/api.py
+++ b/beat/web/backend/api.py
@@ -99,13 +99,21 @@ def start_local_scheduler(request):
     # Clean start-up
     LocalSchedulerProcesses.objects.all().delete()
 
+    address = getattr(settings, 'LOCAL_SCHEDULER_ADDRESS', '127.0.0.1')
+    port = getattr(settings, 'LOCAL_SCHEDULER_PORT', 50000)
+    use_docker = getattr(settings, 'LOCAL_SCHEDULER_USE_DOCKER', False)
+
+    full_address = 'tcp://%s:%d' % (address, port)
+
     for worker in Worker.objects.all():
         (pid, _) = local_scheduler.start_worker(worker.name, settings.PREFIX,
                                                     settings.CACHE_ROOT,
-                                                    'tcp://127.0.0.1:50000')
+                                                    full_address,
+                                                    use_docker=use_docker)
+
         LocalSchedulerProcesses(name=worker.name, pid=pid).save()
 
-    (pid, _) = local_scheduler.start_scheduler(address='127.0.0.1', port=50000)
+    (pid, _) = local_scheduler.start_scheduler(address=address, port=port)
     LocalSchedulerProcesses(name='Scheduler', pid=pid).save()
 
     return Response(status=status.HTTP_204_NO_CONTENT)
diff --git a/beat/web/backend/local_scheduler.py b/beat/web/backend/local_scheduler.py
index ba688dba4..bc270cb0b 100755
--- a/beat/web/backend/local_scheduler.py
+++ b/beat/web/backend/local_scheduler.py
@@ -26,6 +26,8 @@
 ###############################################################################
 
 
+from django.conf import settings
+
 import multiprocessing
 import psutil
 import signal
@@ -80,16 +82,18 @@ class WorkerProcess(multiprocessing.Process):
 #----------------------------------------------------------
 
 
-def start_scheduler(settings='beat.web.settings.settings', interval=5,
+def start_scheduler(settings_module='beat.web.settings.settings', interval=5,
                     address='127.0.0.1', port=50000):
     args = [
-        '--settings=%s' % str(settings),
+        '--settings=%s' % str(settings_module),
         '--interval=%d' % int(interval),
         '--address=%s' % str(address),
         '--port=%d' % int(port),
-        '--verbose',
     ]
-    
+
+    if getattr(settings, 'LOCAL_SCHEDULER_VERBOSITY', None) is not None:
+        args.append(settings.LOCAL_SCHEDULER_VERBOSITY)
+
     process = SchedulerProcess(args)
     process.start()
     process.queue.get()
@@ -100,15 +104,20 @@ def start_scheduler(settings='beat.web.settings.settings', interval=5,
 #----------------------------------------------------------
 
 
-def start_worker(name, prefix, cache, scheduler_address):
+def start_worker(name, prefix, cache, scheduler_address, use_docker=False):
     args = [
         '--prefix=%s' % str(prefix),
         '--cache=%s' % str(cache),
         '--name=%s' % str(name),
-        '--verbose',
         str(scheduler_address)
     ]
 
+    if use_docker:
+        args.insert(3, '--docker')
+
+    if getattr(settings, 'LOCAL_SCHEDULER_VERBOSITY', None) is not None:
+        args.insert(3, settings.LOCAL_SCHEDULER_VERBOSITY)
+
     process = WorkerProcess(args)
     process.start()
     process.queue.get()
diff --git a/beat/web/backend/models/worker.py b/beat/web/backend/models/worker.py
index 405b40b23..5d9a915bd 100755
--- a/beat/web/backend/models/worker.py
+++ b/beat/web/backend/models/worker.py
@@ -133,244 +133,9 @@ class Worker(models.Model):
 
     def available_cores(self):
         '''Calculates the number of available cores considering current load'''
-
         return max(self.cores - self.load(), 0)
 
 
-    # def deactivate(self, reason):
-    #     '''Deactivates the current worker for a reason, that is registered'''
-    #
-    #     self.info = reason
-    #     self.active = False
-    #
-    #
-    # def activate(self, reason=None):
-    #     '''Reactivates the worker, deletes any associated information'''
-    #
-    #     self.info = reason
-    #     self.active = True
-
-
     def as_dict(self):
         '''Returns a dictionary-like representation'''
-
         return dict(cores=self.cores, memory=self.memory)
-
-
-    # def check_environments(self, environments):
-    #     '''Checks that this worker has access to all environments it needs
-    #
-    #     This method will check if the found set of environments (in the
-    #     dictionary ``environments``) contains, at least, one environment for
-    #     each environment object this worker is supposed to be able to execute
-    #     user algorithms for.
-    #
-    #
-    #     Parameters:
-    #
-    #       environments (dict): A dictionary of environments found by using
-    #       :py:func:`utils.find_environments` in which, keys represent the
-    #       natural keys of Django database environments.
-    #
-    #
-    #     Returns:
-    #
-    #       list: A list of missing environments this worker can be assigned to
-    #           work with, but where not found
-    #
-    #       list: A list of unused environments this worker cannot be assigned to
-    #           work with, but where nevertheless found
-    #
-    #     '''
-    #
-    #     slots = Slot.objects.filter(worker=self)
-    #     queues = Queue.objects.filter(slots__in=slots)
-    #     wishlist = Environment.objects.filter(queues__in=queues, active=True)
-    #     wishlist = wishlist.order_by('id').distinct()
-    #
-    #     required = [k.fullname() for k in wishlist]
-    #     missing = [k for k in required if k not in environments]
-    #     unused = [k for k in environments if k not in required]
-    #
-    #     return missing, unused
-    #
-    #
-    # def update_state(self):
-    #     '''Updates state on the database based on current machine readings'''
-    #
-    #     # check I have at least all cores and memory I'm supposed to have
-    #     cores = psutil.cpu_count()
-    #     ram = psutil.virtual_memory().total / (1024 * 1024)
-    #     self.info = ''
-    #
-    #     if cores < self.cores:
-    #         logger.warn("Worker `%s' only has %d cores which is less then " \
-    #             "the value declared on the database - it's not a problem, " \
-    #             "but note this self may get overloaded", self, cores)
-    #         self.info += 'only %d cores;' % cores
-    #
-    #     if ram < self.memory:
-    #         logger.warn("Worker `%s' only has %d Mb of RAM which is less " \
-    #             "then the value declared on the database - it's not a " \
-    #             "problem, but note this self may get overloaded", self,
-    #             ram)
-    #         self.info += 'only %d Mb of RAM;' % ram
-    #
-    #     with transaction.atomic():
-    #         self_ = Worker.objects.select_for_update().get(pk=self.pk) #lock
-    #
-    #         # update process and memory usage
-    #         self.used_cores = int(psutil.cpu_percent())
-    #         self.used_memory = int(psutil.virtual_memory().percent)
-    #
-    #         # save current self state
-    #         self.active = True
-    #         self.update = False
-    #         self.save()
-    #
-    #
-    # def terminate(self):
-    #     '''Cleanly terminates a particular worker at the database
-    #
-    #     .. note::
-    #
-    #        This method does not destroy running or assigned processes that may
-    #        be running or assigned to this worker. This is implemented in this
-    #        way to allow for a clean replacement of the worker program w/o an
-    #        interruption of the backend service.
-    #
-    #     '''
-    #
-    #     from ..models import JobSplit
-    #     from ..models import Job
-    #
-    #     # disables worker, so no more splits can be assigned to it
-    #     with transaction.atomic():
-    #         self_ = Worker.objects.select_for_update().get(pk=self.pk)
-    #         self_.active = False
-    #         self_.used_cores = 0
-    #         self_.used_memory = 0
-    #         self_.info = 'Worker deactivated by system administrator'
-    #         self_.save()
-    #
-    #     # cancel job splits which should be cancelled anyways
-    #     for j in JobSplit.objects.filter(worker=self, status=JobSplit.CANCELLING,
-    #         end_date__isnull=True, process_id__isnull=False):
-    #         if psutil.pid_exists(j.process_id):
-    #             os.kill(j.process_id, signal.SIGTERM)
-    #
-    #     # cleans-up zombie processes that may linger
-    #     _cleanup_zombies()
-    #
-    #
-    # def shutdown(self):
-    #     '''Removes all running/assigned jobs from the queue, shuts down
-    #
-    #     This method should be used with care as it may potentially cancel all
-    #     assigned splits for the current worker.
-    #
-    #     '''
-    #
-    #     from ..models import JobSplit
-    #     from ..models import Job
-    #
-    #     self.terminate()
-    #
-    #     message = 'Cancelled on forced worker shutdown (maintenance)' \
-    #         ' - you may retry submitting your experiment shortly'
-    #
-    #     # cancel job splits which were not yet started
-    #     for j in JobSplit.objects.filter(worker=self, status=JobSplit.QUEUED,
-    #         start_date__isnull=True, process_id__isnull=True):
-    #         j.end(Result(status=1, usrerr=message))
-    #
-    #     # cancel job splits which are running
-    #     for j in JobSplit.objects.filter(worker=self, status=JobSplit.PROCESSING,
-    #         end_date__isnull=True, process_id__isnull=False):
-    #         j._cancel()
-    #
-    #
-    #
-    # def work(self, environments, process):
-    #     '''Launches user code on isolated processes
-    #
-    #     This function is supposed to be called asynchronously, by a
-    #     scheduled agent, every few seconds. It examines job splits assigned
-    #     to the current host and launches an individual process to handle
-    #     these splits. The process is started locally and the process ID
-    #     stored with the split.
-    #
-    #     Job split cancelling is executed by setting the split state as
-    #     ``CANCEL`` and waiting for this function to handle it.
-    #
-    #
-    #     Parameters:
-    #
-    #       environments (dict): A dictionary containing installed
-    #         environments, their description and execute-file paths.
-    #
-    #       process (str): The path to the ``process.py`` program to use for
-    #         running the user code on isolated processes.
-    #
-    #     '''
-    #
-    #     from ..models import JobSplit
-    #     from ..models import Job
-    #
-    #     # refresh state from database and update state if required
-    #     self.refresh_from_db()
-    #     if self.update: self.update_state()
-    #
-    #     # cancel job splits by killing associated processes
-    #     for j in JobSplit.objects.filter(worker=self, status=JobSplit.CANCELLING,
-    #         end_date__isnull=True):
-    #         if j.process_id is not None and psutil.pid_exists(j.process_id):
-    #             os.kill(j.process_id, signal.SIGTERM)
-    #         else: # process went away without any apparent reason
-    #             with transaction.atomic():
-    #                 message = "Split %d/%d running at worker `%s' for " \
-    #                     "block `%s' of experiment `%s' finished before " \
-    #                     "even starting. Force-cancelling job split at " \
-    #                     "database..." % (j.split_index+1,
-    #                         j.job.block.required_slots,
-    #                         self,
-    #                         j.job.block.name,
-    #                         j.job.block.experiment.fullname(),
-    #                         )
-    #                 logger.error(message)
-    #                 j.end(Result(status=1, usrerr=settings.DEFAULT_USER_ERROR))
-    #
-    #     # cmdline base argument
-    #     cmdline = [process]
-    #     if settings.DEBUG:
-    #         cmdline += ['-vv']
-    #     else:
-    #         cmdline += ['-v']
-    #
-    #     # start newly assigned job splits
-    #     with transaction.atomic():
-    #         splits = JobSplit.objects.select_for_update().filter(worker=self,
-    #             status=JobSplit.QUEUED, start_date__isnull=True,
-    #             process_id__isnull=True)
-    #         for split in splits:
-    #             # if we get to this point, then we launch the user process
-    #             # -> see settings.WORKER_DETACH_CHILDREN for more info
-    #             kwargs = dict()
-    #             if settings.WORKER_DETACH_CHILDREN:
-    #                 kwargs['preexec_fn'] = os.setpgrp
-    #             subprocess.Popen(cmdline + [str(split.pk)], **kwargs)
-    #             split.status = JobSplit.PROCESSING #avoids re-running
-    #             split.save()
-    #
-    #     # cleans-up zombie processes that may linger
-    #     _cleanup_zombies()
-    #
-    #
-    # def __enter__(self):
-    #     self.update_state()
-    #     return self
-    #
-    #
-    # def __exit__(self, *exc):
-    #     self.terminate()
-    #     return False #propagate exceptions
diff --git a/beat/web/backend/tests/test_scheduler.py b/beat/web/backend/tests/test_scheduler.py
index 0280d5ca3..59448d83a 100755
--- a/beat/web/backend/tests/test_scheduler.py
+++ b/beat/web/backend/tests/test_scheduler.py
@@ -41,6 +41,8 @@ from ..models import Worker
 from ..utils import setup_backend
 from ..helpers import schedule_experiment
 from ..helpers import cancel_experiment
+from ..local_scheduler import start_scheduler
+from ..local_scheduler import start_worker
 
 from ...scripts import scheduler
 from ...experiments.models import Experiment
@@ -52,40 +54,6 @@ from beat.core.scripts import worker
 #----------------------------------------------------------
 
 
-class SchedulerThread(multiprocessing.Process):
-
-  def __init__(self, queue, arguments):
-    super(SchedulerThread, self).__init__()
-
-    self.queue = queue
-    self.arguments = arguments
-
-
-  def run(self):
-    self.queue.put('STARTED')
-    scheduler.main(self.arguments)
-
-
-#----------------------------------------------------------
-
-
-class WorkerThread(multiprocessing.Process):
-
-  def __init__(self, queue, arguments):
-    super(WorkerThread, self).__init__()
-
-    self.queue = queue
-    self.arguments = arguments
-
-
-  def run(self):
-    self.queue.put('STARTED')
-    worker.main(self.arguments)
-
-
-#----------------------------------------------------------
-
-
 class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
 
   def __init__(self, methodName='runTest'):
@@ -112,16 +80,9 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
 
 
   def start_scheduler(self):
-    args = [
-        '--settings=beat.web.settings.test',
-        '--interval=1',
-        '--address=127.0.0.1',
-        '--port=50000',
-    ]
-
-    self.scheduler_thread = SchedulerThread(multiprocessing.Queue(), args)
-    self.scheduler_thread.start()
-    self.scheduler_thread.queue.get()
+    (pid, self.scheduler_thread) = start_scheduler(settings_module='beat.web.settings.test',
+                                                   interval=1, address='127.0.0.1',
+                                                   port=52000)
 
 
   def stop_scheduler(self):
@@ -132,16 +93,8 @@ class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
 
 
   def start_worker(self, name):
-    args = [
-        '--prefix=%s' % settings.PREFIX,
-        '--cache=%s' % settings.CACHE_ROOT,
-        '--name=%s' % name,
-        'tcp://127.0.0.1:50000',
-    ]
-
-    worker_thread = WorkerThread(multiprocessing.Queue(), args)
-    worker_thread.start()
-    worker_thread.queue.get()
+    (pid, worker_thread) = start_worker(name, settings.PREFIX, settings.CACHE_ROOT,
+                                        'tcp://127.0.0.1:52000')
 
     self.worker_threads[name] = worker_thread
 
diff --git a/beat/web/settings/settings.py b/beat/web/settings/settings.py
index 81c3f9a6b..23311a217 100755
--- a/beat/web/settings/settings.py
+++ b/beat/web/settings/settings.py
@@ -213,10 +213,8 @@ CACHE_ROOT       = os.path.join(PREFIX, 'cache')
 #
 ##############################################################################
 
-DATASETS_UID = 1000
-# DATASETS_UID = None
-DATASETS_ROOT_PATH = '/Users/pabbet/Projects/databases/'
-# DATASETS_ROOT_PATH = None
+DATASETS_UID = None
+DATASETS_ROOT_PATH = None
 
 
 ##############################################################################
@@ -267,58 +265,22 @@ EMAIL_USE_TLS        = False
 ##############################################################################
 
 # The scheduling interval controls the number of seconds between
-# scheduling attempts (calls to :py:func:`beat.web.backend.schedule.schedule`)
+# scheduling attempts
 SCHEDULING_INTERVAL = 5 #seconds
 
-# The worker interval controls the number of seconds between checks
-# a particular worker will run to verify jobs are not scheduled to itself
-WORKER_INTERVAL = 5 #seconds
-
 # If set, a testing panel that can accomplish scheduling activities will appear
 # at the scheduler page allowing administrators to launch scheduling activities
 # manually.
 SCHEDULING_PANEL = True
 
-# The maximum index split errors control the maximum number of times we can
-# incur in an index split error condition without cancelling the block
-# execution altogether. This number, multiplied by the scheduling interval,
-# must be larger than 60 seconds, as this is the default NFS caching interval.
-# If you run on a reliable networked filesystem (i.e., not NFS) or on the local
-# node, you may set this value to 0, which will cause the scheduling activity
-# to consider even a single splitting error as enough reason to cancel the
-# block execution (and, by consequence), the experiment.
-MAXIMUM_SPLIT_ERRORS = 0 #attempts to split without errors
-
-# The maximum number of IOErrors (due to cache loading) which are acceptable
-# for a particular split. If the number of cache errors is bigger than the
-# value set, then the split is considered as failed. This variable serves the
-# same purpose as ``MAXIMUM_SPLIT_ERRORS`` above and fills-in where NFS caching
-# does not. If you're running on a reliable filesystem, you can't leave it to
-# zero.
-MAXIMUM_IO_ERRORS = 0 #attempts to load cache files without errors
-
-# The maximum number of retries for saving (mostly at start() and end()), job
-# splits from remote processes. If you're using a SQLite database backend, this
-# number should be higher than 1 (recommended value is 3 to 5). In case you're
-# using another database, then this can value can be ignored. If set to a value
-# of one or more and there are "database is locked" errors, then job split
-# saving at ``start()`` or ``end()`` will be retried with a 1-second interval.
-MAXIMUM_SPLIT_SAVE_RETRIES = 5
-
-# The default user error is a message string that is set upon the failure of
-# execution of a particular block if the root cause of the problem is NOT a
-# user error, but the systems'. In this case, the system administrators receive
-# an e-mail indicating the exception caught and the user block for a given
-# experiment is marked with this informative message.
-DEFAULT_USER_ERROR = "A system error occurred and we could not run your " \
-    "algorithm.\nThe administrators have been informed.\nYou may try to run " \
-    "the experiment again at another moment."
-
-# If set, then detach children processes (I/O daemons) from the worker process
-# group. In this case, signals sent to the worker process will not be forwarded
-# to the processing children. This is desirable in a production environment, to
-# avoid user processes to be terminated in case the worker is updated.
-WORKER_DETACH_CHILDREN = False
+# The verbosity level used by the local scheduler when starting the 'scheduler.py'
+# and 'worker.py' scripts
+LOCAL_SCHEDULER_VERBOSITY = '--verbose'
+
+LOCAL_SCHEDULER_ADDRESS = '127.0.0.1'
+LOCAL_SCHEDULER_PORT = 50000
+
+LOCAL_SCHEDULER_USE_DOCKER = True
 
 
 ##############################################################################
diff --git a/beat/web/settings/test.py b/beat/web/settings/test.py
index 4b72e0697..8d3219cb8 100755
--- a/beat/web/settings/test.py
+++ b/beat/web/settings/test.py
@@ -62,6 +62,9 @@ TOOLCHAINS_ROOT  = os.path.join(PREFIX, 'toolchains')
 EXPERIMENTS_ROOT = os.path.join(PREFIX, 'experiments')
 CACHE_ROOT       = os.path.join(PREFIX, 'cache')
 
+LOCAL_SCHEDULER_VERBOSITY = None
+LOCAL_SCHEDULER_USE_DOCKER = False
+
 # To speed-up tests, don't put this in production
 PASSWORD_HASHERS = [
   'django.contrib.auth.hashers.MD5PasswordHasher',
-- 
GitLab