diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index fce438433bd53dc6d8439f84a059c9fcbba6654a..e955b4c1026b839819eed0573406fd98aac7ff99 100644 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -260,6 +260,44 @@ class Worker(models.Model): return dict(cores=self.cores, memory=self.memory) + def check_environments(self, environments): + '''Checks that this worker has access to all environments it needs + + This method will check if the found set of environments (in the + dictionary ``environments``) contains, at least, one environment for + each environment object this worker is supposed to be able to execute + user algorithms for. + + + Parameters: + + environments (dict): A dictionary of environments found by using + :py:func:`utils.find_environments` in which, keys represent the + natural keys of Django database environments. + + + Returns: + + list: A list of missing environments this worker can be assigned to + work with, but where not found + + list: A list of unused environments this worker cannot be assigned to + work with, but where nevertheless found + + ''' + + slots = Slot.objects.filter(worker=self) + queues = Queue.objects.filter(slots__in=slots) + wishlist = Environment.objects.filter(queues__in=queues) + wishlist = wishlist.order_by('id').distinct() + + required = [k.natural_key() for k in wishlist] + missing = [k for k in required if k not in environments] + unused = [k for k in environments if k not in required] + + return missing, unused + + def update_state(self): '''Updates state on the database based on current machine readings''' diff --git a/beat/web/backend/views.py b/beat/web/backend/views.py index a7850f62c5525dcbbd7e9a1a7f528a1d762539fa..c3bac6c255044f1eb283e18c8714e413dcddb5e9 100644 --- a/beat/web/backend/views.py +++ b/beat/web/backend/views.py @@ -66,10 +66,26 @@ class Work: def __setup__(self): Work.cpulimit = resolve_cpulimit_path(None) + logger.debug("(path) cpulimit: `%s'", Work.cpulimit) Work.process = utils.resolve_process_path() + logger.debug("(path) process: `%s'", Work.process) Work.environments = utils.find_environments(None) + logger.debug("Environments: %s", ", ".join(Work.environments)) + + # load worker, check environments, activate it w = Worker.objects.get(name=socket.gethostname()) \ if Worker.objects.count() != 1 else Worker.objects.get() + missing, unused = w.check_environments(Work.environments) + if unused: + logger.info("The following environments where found on your " \ + "setup, but will not be used with the current queue " \ + "configuration: %s" % ", ".join(unused)) + if missing: + raise RuntimeError("The following environments are currently " \ + "missing from your setup: %s" % ", ".join(missing)) + else: + logger.info("All required software environments were found") + w.activate() w.save() Work.worker = w diff --git a/beat/web/scripts/worker.py b/beat/web/scripts/worker.py index dadc88c2dfe6dd44c9573495c62bf3bc98100ad5..8f9ffac4d4d00329930ba95c5dc33fe924176f43 100644 --- a/beat/web/scripts/worker.py +++ b/beat/web/scripts/worker.py @@ -117,15 +117,32 @@ def main(user_input=None): from beat.core.async import resolve_cpulimit_path cpulimit = resolve_cpulimit_path(arguments['--cpulimit']) + logger.debug("(path) cpulimit: `%s'", cpulimit) process = utils.resolve_process_path() + logger.debug("(path) process: `%s'", process) environments = utils.find_environments(arguments['--environments']) + logger.debug("Environments: %s", ", ".join(environments)) + + worker = Worker.objects.get(name=arguments['--name']) + + # check environments + missing, unused = worker.check_environments(environments) + if unused: + logger.info("The following environments where found on your " \ + "setup, but will not be used with the current queue " \ + "configuration: %s" % ", ".join(unused)) + if missing: + raise RuntimeError("The following environments are currently " \ + "missing from your setup: %s" % ", ".join(missing)) + else: + logger.info("All required software environments were found") timing = int(arguments['--period']) \ if arguments['--period'] else settings.WORKER_INTERVAL logger.info("Working at `%s' every %d seconds", arguments['--name'], timing) global stop - with Worker.objects.get(name=arguments['--name']) as worker: + with worker: while not stop: diff --git a/todo.rst b/todo.rst index 7a774ef1f2e139d6892150c24c0d36e99a0d05f2..f3e5f87be0f70c8e19a67127c86888f3d7ab0829 100644 --- a/todo.rst +++ b/todo.rst @@ -59,3 +59,9 @@ Experiments * Make sure to remove any spurious logs from the beat.scheduler before introducing stdout/stderr components to the experiment view + + +Backend +------- + +* Add proper e-mail queueing support