Skip to content
Snippets Groups Projects
Commit 02969f77 authored by André Anjos's avatar André Anjos :speech_balloon:
Browse files

[backend] Check worker environments

parent 8837f11a
No related branches found
No related tags found
1 merge request!194Scheduler
......@@ -260,6 +260,44 @@ class Worker(models.Model):
return dict(cores=self.cores, memory=self.memory)
def check_environments(self, environments):
'''Checks that this worker has access to all environments it needs
This method will check if the found set of environments (in the
dictionary ``environments``) contains, at least, one environment for
each environment object this worker is supposed to be able to execute
user algorithms for.
Parameters:
environments (dict): A dictionary of environments found by using
:py:func:`utils.find_environments` in which, keys represent the
natural keys of Django database environments.
Returns:
list: A list of missing environments this worker can be assigned to
work with, but where not found
list: A list of unused environments this worker cannot be assigned to
work with, but where nevertheless found
'''
slots = Slot.objects.filter(worker=self)
queues = Queue.objects.filter(slots__in=slots)
wishlist = Environment.objects.filter(queues__in=queues)
wishlist = wishlist.order_by('id').distinct()
required = [k.natural_key() for k in wishlist]
missing = [k for k in required if k not in environments]
unused = [k for k in environments if k not in required]
return missing, unused
def update_state(self):
'''Updates state on the database based on current machine readings'''
......
......@@ -66,10 +66,26 @@ class Work:
def __setup__(self):
Work.cpulimit = resolve_cpulimit_path(None)
logger.debug("(path) cpulimit: `%s'", Work.cpulimit)
Work.process = utils.resolve_process_path()
logger.debug("(path) process: `%s'", Work.process)
Work.environments = utils.find_environments(None)
logger.debug("Environments: %s", ", ".join(Work.environments))
# load worker, check environments, activate it
w = Worker.objects.get(name=socket.gethostname()) \
if Worker.objects.count() != 1 else Worker.objects.get()
missing, unused = w.check_environments(Work.environments)
if unused:
logger.info("The following environments where found on your " \
"setup, but will not be used with the current queue " \
"configuration: %s" % ", ".join(unused))
if missing:
raise RuntimeError("The following environments are currently " \
"missing from your setup: %s" % ", ".join(missing))
else:
logger.info("All required software environments were found")
w.activate()
w.save()
Work.worker = w
......
......@@ -117,15 +117,32 @@ def main(user_input=None):
from beat.core.async import resolve_cpulimit_path
cpulimit = resolve_cpulimit_path(arguments['--cpulimit'])
logger.debug("(path) cpulimit: `%s'", cpulimit)
process = utils.resolve_process_path()
logger.debug("(path) process: `%s'", process)
environments = utils.find_environments(arguments['--environments'])
logger.debug("Environments: %s", ", ".join(environments))
worker = Worker.objects.get(name=arguments['--name'])
# check environments
missing, unused = worker.check_environments(environments)
if unused:
logger.info("The following environments where found on your " \
"setup, but will not be used with the current queue " \
"configuration: %s" % ", ".join(unused))
if missing:
raise RuntimeError("The following environments are currently " \
"missing from your setup: %s" % ", ".join(missing))
else:
logger.info("All required software environments were found")
timing = int(arguments['--period']) \
if arguments['--period'] else settings.WORKER_INTERVAL
logger.info("Working at `%s' every %d seconds", arguments['--name'], timing)
global stop
with Worker.objects.get(name=arguments['--name']) as worker:
with worker:
while not stop:
......
......@@ -59,3 +59,9 @@ Experiments
* Make sure to remove any spurious logs from the beat.scheduler before
introducing stdout/stderr components to the experiment view
Backend
-------
* Add proper e-mail queueing support
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment