diff --git a/beat/web/utils/management/commands/install.py b/beat/web/utils/management/commands/install.py index 3043d287dbc8b42478c36239edba6a6b11afd88c..4a6148f7123581df18460e9ec1625b5641142992 100644 --- a/beat/web/utils/management/commands/install.py +++ b/beat/web/utils/management/commands/install.py @@ -134,7 +134,7 @@ def setup_environment(queue_config_filename): from .qsetup import setup_environment as _method - return _method(queue_config_filename) + return _method(queue_config_filename, logger) def create_sites(): diff --git a/beat/web/utils/management/commands/qsetup.py b/beat/web/utils/management/commands/qsetup.py index 911485cf5670c331f37148451c93b6c0c462842d..3824c3671f046cc6e546f6a00d058267ab7e0882 100644 --- a/beat/web/utils/management/commands/qsetup.py +++ b/beat/web/utils/management/commands/qsetup.py @@ -39,42 +39,80 @@ from django.contrib.auth.models import User, Group from rest_framework.authtoken.models import Token from guardian.shortcuts import assign_perm -from ....backend.models import Environment, Queue, Worker, QueueWorkerSlot +from ....backend.models import Environment, Queue, Worker, Slot from ....backend.models import Shareable - -DEFAULT_QUEUE_CONFIGURATION = { - "default": { - "memory-in-megabytes": 4096, - "time-limit-in-minutes": 180, #3 hours - "nb-cores-per-slot": 1, - "max-slots-per-user": multiprocessing.cpu_count(), - "environments": [ - { - "name": "environment", - "version": "1", - } - ], - "slots": { - "node1": multiprocessing.cpu_count(), - } - } +# Default configuration to start the state with +import psutil +import socket +CORES = psutil.cpu_count() +RAM = psutil.virtual_memory().total/(1024*1024) +ENVIRONMENT = {'name': 'environment', 'version': '1'} +HOSTNAME = socket.gethostname() +DEFAULT_CONFIGURATION = { + "queues": { + "queue": { + "memory-in-megabytes": RAM/CORES, + "time-limit-in-minutes": 1440, #1 day + "nb-cores-per-slot": 1, + "max-slots-per-user": CORES, + "environments": [ + { + "name": ENVIRONMENT['name'], + "version": ENVIRONMENT['version'], + "short_description": "Local python interpreter", + "description": "Automatically generated local python " \ + "interpreter environment", + } + ], + "slots": { + HOSTNAME: { + "quantity": CORES, + "priority": 0 + } } + } + }, + "workers": { + HOSTNAME: { + "cores": CORES, + "memory-in-megabytes": RAM + } + } + } -def setup_environment(queue_config_filename): - - global logger +def setup_environment(queue_config_filename, logger=logger): if queue_config_filename is None: - qconfig = DEFAULT_QUEUE_CONFIGURATION + qconfig = DEFAULT_CONFIGURATION else: with open(queue_config_filename, 'rt') as f: qconfig = simplejson.loads(f.read()) default_group = Group.objects.get(name='Default') - for queue_name, queue_attrs in qconfig.items(): + for worker_name in qconfig['workers']: + worker = Worker.objects.filter(name=worker_name) + + config = qconfig['workers'][worker_name] + + if not worker: # create it + worker = Worker( + name=worker_name, + cores=config['cores'], + memory=config['memory-in-megabytes'], + ) + worker.save() + logger.info("Created worker `%s'", worker) + else: + worker = worker[0] # there must be only one anyways + worker.cores = config['cores'] + worker.memory = config['memory-in-megabytes'] + worker.save() + logger.info("Updated worker `%s'", worker) + + for queue_name, queue_attrs in qconfig['queues'].items(): queue = Queue.objects.filter(name=queue_name) @@ -83,7 +121,7 @@ def setup_environment(queue_config_filename): name=queue_name, memory_limit=queue_attrs['memory-in-megabytes'], time_limit=queue_attrs['time-limit-in-minutes'], - nb_cores_per_slot=queue_attrs['nb-cores-per-slot'], + cores_per_slot=queue_attrs['nb-cores-per-slot'], max_slots_per_user=queue_attrs['max-slots-per-user'], ) queue.save() @@ -122,36 +160,25 @@ def setup_environment(queue_config_filename): queue.environments.add(env) queue.save() - for worker_name, slots in queue_attrs['slots'].items(): - - worker = Worker.objects.filter(name=worker_name) + for worker_name, slot_config in queue_attrs['slots'].items(): - if not worker: # create it - worker = Worker(name=worker_name, nb_cores=slots) - worker.save() - logger.info("Created worker `%s'", worker) - else: - assert worker.count() == 1 - worker = worker[0] # there must be only one anyways - logger.info("Re-using existing worker `%s'", worker) + worker = Worker.objects.get(name=worker_name) - # associate worker - queue - number of slots - worker_link = QueueWorkerSlot.objects.filter( + slot = Slot.objects.filter( queue=queue, worker=worker, - nb_slots=slots, ) - if not worker_link: # create it - worker_link = QueueWorkerSlot( + if not slot: # create it + slot = Slot( queue=queue, worker=worker, - nb_slots=slots, + quantity=slot_config['quantity'], + priority=slot_config.get('priority', 0), ) - worker_link.save() - logger.info("Created queue-worker slot `%s'", worker_link) - else: - assert worker_link.count() == 1 + slot.save() + logger.info("Created slot configuration `%s'", slot) + # returns one queue and one environment to use as default return Queue.objects.all()[0], Environment.objects.all()[0] @@ -200,9 +227,9 @@ class Command(BaseCommand): q.delete() logger.info("Erased %d workers", q.count()) - assert QueueWorkerSlot.objects.count() == 0, \ + assert Slot.objects.count() == 0, \ "There is still %d queue-worker relations undeleted" % \ - QueueWorkerSlot.objects.count() + Slot.objects.count() # sets the tokens for the scheduler user to '3' (default localhost) scheduler = User.objects.filter(username=settings.SCHEDULER_ACCOUNT)