From 3628ff377dbfb1d3e5efc5780d89f1bd80be9b3e Mon Sep 17 00:00:00 2001 From: Andre Anjos <andre.dos.anjos@gmail.com> Date: Thu, 21 Apr 2016 17:46:55 +0200 Subject: [PATCH] [backend] Implement dump option for current backend setup --- .../web/backend/management/commands/qsetup.py | 180 +++--------------- beat/web/backend/models.py | 30 +++ beat/web/backend/utils.py | 12 +- 3 files changed, 69 insertions(+), 153 deletions(-) diff --git a/beat/web/backend/management/commands/qsetup.py b/beat/web/backend/management/commands/qsetup.py index fe2200994..fa543da94 100644 --- a/beat/web/backend/management/commands/qsetup.py +++ b/beat/web/backend/management/commands/qsetup.py @@ -25,31 +25,27 @@ # # ############################################################################### - +import sys import logging logger = logging.getLogger(__name__) -import multiprocessing - import simplejson -from django.conf import settings from django.core.management.base import BaseCommand, CommandError -from django.contrib.auth.models import User, Group -from rest_framework.authtoken.models import Token -from guardian.shortcuts import assign_perm -from ....backend.models import Environment, Queue, Worker, Slot -from ....backend.models import Shareable +from ...models import Environment, Queue, Worker, Slot +from ...utils import setup_backend, dump_backend # Default configuration to start the state with import psutil import socket + CORES = psutil.cpu_count() RAM = psutil.virtual_memory().total/(1024*1024) ENVIRONMENT = {'name': 'environment', 'version': '1'} -ENVKEY = '{p.name} ({p.version})'.format(p=ENVIRONMENT) +ENVKEY = '%(name)s (%(version)s)' % ENVIRONMENT HOSTNAME = socket.gethostname() + DEFAULT_CONFIGURATION = { "queues": { "queue": { @@ -57,10 +53,7 @@ DEFAULT_CONFIGURATION = { "time-limit": 1440, #1 day "cores-per-slot": 1, "max-slots-per-user": CORES, - ], - "environments": [ - ENVKEY, - ], + "environments": [ENVKEY], "slots": { HOSTNAME: { "quantity": CORES, @@ -86,107 +79,11 @@ DEFAULT_CONFIGURATION = { } } - -def setup_environment(queue_config_filename, logger=logger): - - if queue_config_filename is None: - qconfig = DEFAULT_CONFIGURATION - else: - with open(queue_config_filename, 'rt') as f: - qconfig = simplejson.loads(f.read()) - - default_group = Group.objects.get(name='Default') - - for worker_name in qconfig['workers']: - worker = Worker.objects.filter(name=worker_name) - - config = qconfig['workers'][worker_name] - - if not worker: # create it - worker = Worker( - name=worker_name, - cores=config['cores'], - memory=config['memory'], - ) - worker.save() - logger.info("Created worker `%s'", worker) - else: - worker = worker[0] # there must be only one anyways - worker.cores = config['cores'] - worker.memory = config['memory-in-megabytes'] - worker.save() - logger.info("Updated worker `%s'", worker) - - for queue_name, queue_attrs in qconfig['queues'].items(): - - queue = Queue.objects.filter(name=queue_name) - - if not queue: # create it - queue = Queue( - name=queue_name, - memory_limit=queue_attrs['memory-in-megabytes'], - time_limit=queue_attrs['time-limit-in-minutes'], - cores_per_slot=queue_attrs['nb-cores-per-slot'], - max_slots_per_user=queue_attrs['max-slots-per-user'], - ) - queue.save() - logger.info("Created queue `%s'", queue) - - else: - assert queue.count() == 1 - queue = queue[0] - logger.info("Re-using existing queue `%s'", queue) - - # Sets up the permissions on the queue - assign_perm('can_access', default_group, queue) - - for env_attrs in queue_attrs['environments']: - - env = Environment.objects.filter(name=env_attrs['name'], - version=env_attrs['version']) - - if not env: # create it - env = Environment( - name=env_attrs['name'], - version=env_attrs['version'], - short_description=env_attrs.get('short_description', ''), - description=env_attrs.get('description', ''), - sharing=Shareable.PUBLIC, - ) - env.save() - logger.info("Created environment `%s'", env) - else: - assert env.count() == 1 - env = env[0] # there must be only one anyways - env.active = True # activate it - env.save() - logger.info("Re-using existing environment `%s'", env) - - queue.environments.add(env) - queue.save() - - for worker_name, slot_config in queue_attrs['slots'].items(): - - worker = Worker.objects.get(name=worker_name) - - slot = Slot.objects.filter( - queue=queue, - worker=worker, - ) - - if not slot: # create it - slot = Slot( - queue=queue, - worker=worker, - quantity=slot_config['quantity'], - priority=slot_config.get('priority', 0), - ) - slot.save() - logger.info("Created slot configuration `%s'", slot) - - - # returns one queue and one environment to use as default - return Queue.objects.all()[0], Environment.objects.all()[0] +RESET_CONFIGURATION = { + "queues": {}, + "environments": {}, + "workers": {} + } class Command(BaseCommand): @@ -196,6 +93,10 @@ class Command(BaseCommand): def add_arguments(self, parser): + parser.add_argument('--dump', action='store_true', dest='dump', + default=False, help='Dump all environment/worker/queues ' \ + 'configuration and exits') + parser.add_argument('--reset', action='store_true', dest='reset', default=False, help='Delete all environment/worker/queues ' \ 'before setting the given configuration') @@ -203,7 +104,7 @@ class Command(BaseCommand): parser.add_argument('qconfig', type=str, nargs='?', help='Optional custom queue configuration to use. If not ' \ 'passed, uses an internal default with a single ' \ - 'queue/worker/%d slots' % multiprocessing.cpu_count()) + 'queue/worker/%d slots' % CORES) def handle(self, *ignored, **arguments): @@ -215,42 +116,17 @@ class Command(BaseCommand): if arguments['verbosity'] == 1: logger.setLevel(logging.INFO) elif arguments['verbosity'] >= 2: logger.setLevel(logging.DEBUG) - if arguments['reset']: - - # make all old environments inactive - q = Environment.objects.all() - updates = q.update(active=False) - for obj in q: obj.save() - logger.info("De-activated %d environments", updates) - - # cleans-up queues and workers before installing new config - q = Queue.objects.all() - q.delete() - logger.info("Erased %d queues", q.count()) + if arguments['dump']: + d=dump_backend() + print(simplejson.dumps(d, indent=2)) + sys.exit(0) - q = Worker.objects.all() - q.delete() - logger.info("Erased %d workers", q.count()) - - assert Slot.objects.count() == 0, \ - "There is still %d queue-worker relations undeleted" % \ - Slot.objects.count() + if arguments['reset']: + setup_backend(RESET_CONFIGURATION) - # sets the tokens for the scheduler user to '3' (default localhost) - scheduler = User.objects.filter(username=settings.SCHEDULER_ACCOUNT) - usetoken = '3' - if scheduler: - scheduler = scheduler[0] - try: - scheduler.auth_token.delete() - except Exception: - pass - token = Token(user=scheduler) - token.key = usetoken - token.save() - logger.info("Reset `%s' token to `%s'", scheduler.username, - usetoken) - else: - raise CommandError("Could not find an account named `%s' on this database" % settings.SCHEDULER_ACCOUNT) + config = None + if arguments['qconfig']: + with open(arguments['qconfig'], 'rb') as f: + config = simplejson.load(f) - setup_environment(arguments['qconfig']) + setup_environment(config or DEFAULT_CONFIGURATION) diff --git a/beat/web/backend/models.py b/beat/web/backend/models.py index 73cefa8c6..8113411f2 100644 --- a/beat/web/backend/models.py +++ b/beat/web/backend/models.py @@ -137,6 +137,17 @@ class Environment(Shareable): return [q for q in self.queues.all() if user.has_perm('backend.can_access', q)] + def as_dict(self): + '''Returns a representation as a dictionary''' + + return dict( + name=self.name, + version=self.version, + short_description=self.short_description, + description=self.description, + ) + + #---------------------------------------------------------- @@ -207,6 +218,12 @@ class Worker(models.Model): self.active = True + def as_dict(self): + '''Returns a dictionary-like representation''' + + return dict(cores=self.cores, memory=self.memory) + + #---------------------------------------------------------- @@ -305,6 +322,19 @@ class Queue(models.Model): return [w[0] for w in workers] + def as_dict(self): + '''Returns a representation as a dictionary''' + + return { + 'memory-limit': self.memory_limit, + 'time-limit': self.time_limit, + 'cores-per-slot': self.cores_per_slot, + 'max-slots-per-user': self.max_slots_per_user, + 'environments': [k.natural_key() for k in self.environments.all()], + 'slots': dict([(s.worker.name, dict(quantity=s.quantity, + priority=s.priority)) for s in self.slots.all()]), + } + class SlotManager(models.Manager): diff --git a/beat/web/backend/utils.py b/beat/web/backend/utils.py index 246f6d207..92eda7f5a 100644 --- a/beat/web/backend/utils.py +++ b/beat/web/backend/utils.py @@ -135,7 +135,7 @@ def cleanup_cache(path, age_in_minutes=0, delete=False): @transaction.atomic -def setup(d): +def setup_backend(d): '''Configures or re-configures the internal queue setup This method is called to re-configure the current backend architecture. It @@ -333,3 +333,13 @@ def setup(d): for w in wobjects_to_be_deleted: logger.info("Deleting `%s'...", w) w.delete() + + +def dump_backend(): + '''Returns a dictionary that represents the current backend configuration''' + + return dict( + queues=dict([(k.name, k.as_dict()) for k in Queue.objects.all()]), + environments=dict([(str(k), k.as_dict()) for k in Environment.objects.all()]), + workers=dict([(k.name, k.as_dict()) for k in Worker.objects.all()]), + ) -- GitLab