Commit 3628ff37 authored by André Anjos

[backend] Implement dump option for current backend setup

parent 08fc366b
Merge request: !194 Scheduler
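
For context, a minimal usage sketch of the new dump option, assuming the management command is registered under the name "qsetup" (the command's file name is not visible in this view, so that name is a guess); the --dump flag prints the current queue/worker/environment configuration as indented JSON and exits:

    # Hypothetical usage sketch: the command name "qsetup" is an assumption,
    # since this view does not show the management command's file name.
    from django.core.management import call_command
    try:
        # --dump prints simplejson.dumps(dump_backend(), indent=2) and then exits
        call_command('qsetup', dump=True)
    except SystemExit:
        pass  # the dump branch calls sys.exit(0) after printing
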
@@ -25,31 +25,27 @@
# #
###############################################################################
import sys
import logging
logger = logging.getLogger(__name__)
import multiprocessing
import simplejson
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.contrib.auth.models import User, Group
from rest_framework.authtoken.models import Token
from guardian.shortcuts import assign_perm
from ....backend.models import Environment, Queue, Worker, Slot
from ....backend.models import Shareable
from ...models import Environment, Queue, Worker, Slot
from ...utils import setup_backend, dump_backend
# Default configuration to start the state with
import psutil
import socket
CORES = psutil.cpu_count()
RAM = psutil.virtual_memory().total/(1024*1024)
ENVIRONMENT = {'name': 'environment', 'version': '1'}
ENVKEY = '{p.name} ({p.version})'.format(p=ENVIRONMENT)
ENVKEY = '%(name)s (%(version)s)' % ENVIRONMENT
HOSTNAME = socket.gethostname()
DEFAULT_CONFIGURATION = {
"queues": {
"queue": {
@@ -57,10 +53,7 @@ DEFAULT_CONFIGURATION = {
"time-limit": 1440, #1 day
"cores-per-slot": 1,
"max-slots-per-user": CORES,
],
"environments": [
ENVKEY,
],
"environments": [ENVKEY],
"slots": {
HOSTNAME: {
"quantity": CORES,
@@ -86,107 +79,11 @@ DEFAULT_CONFIGURATION = {
}
}
def setup_environment(queue_config_filename, logger=logger):
if queue_config_filename is None:
qconfig = DEFAULT_CONFIGURATION
else:
with open(queue_config_filename, 'rt') as f:
qconfig = simplejson.loads(f.read())
default_group = Group.objects.get(name='Default')
for worker_name in qconfig['workers']:
worker = Worker.objects.filter(name=worker_name)
config = qconfig['workers'][worker_name]
if not worker: # create it
worker = Worker(
name=worker_name,
cores=config['cores'],
memory=config['memory'],
)
worker.save()
logger.info("Created worker `%s'", worker)
else:
worker = worker[0] # there must be only one anyways
worker.cores = config['cores']
worker.memory = config['memory-in-megabytes']
worker.save()
logger.info("Updated worker `%s'", worker)
for queue_name, queue_attrs in qconfig['queues'].items():
queue = Queue.objects.filter(name=queue_name)
if not queue: # create it
queue = Queue(
name=queue_name,
memory_limit=queue_attrs['memory-in-megabytes'],
time_limit=queue_attrs['time-limit-in-minutes'],
cores_per_slot=queue_attrs['nb-cores-per-slot'],
max_slots_per_user=queue_attrs['max-slots-per-user'],
)
queue.save()
logger.info("Created queue `%s'", queue)
else:
assert queue.count() == 1
queue = queue[0]
logger.info("Re-using existing queue `%s'", queue)
# Sets up the permissions on the queue
assign_perm('can_access', default_group, queue)
for env_attrs in queue_attrs['environments']:
env = Environment.objects.filter(name=env_attrs['name'],
version=env_attrs['version'])
if not env: # create it
env = Environment(
name=env_attrs['name'],
version=env_attrs['version'],
short_description=env_attrs.get('short_description', ''),
description=env_attrs.get('description', ''),
sharing=Shareable.PUBLIC,
)
env.save()
logger.info("Created environment `%s'", env)
else:
assert env.count() == 1
env = env[0] # there must be only one anyways
env.active = True # activate it
env.save()
logger.info("Re-using existing environment `%s'", env)
queue.environments.add(env)
queue.save()
for worker_name, slot_config in queue_attrs['slots'].items():
worker = Worker.objects.get(name=worker_name)
slot = Slot.objects.filter(
queue=queue,
worker=worker,
)
if not slot: # create it
slot = Slot(
queue=queue,
worker=worker,
quantity=slot_config['quantity'],
priority=slot_config.get('priority', 0),
)
slot.save()
logger.info("Created slot configuration `%s'", slot)
# returns one queue and one environment to use as default
return Queue.objects.all()[0], Environment.objects.all()[0]
RESET_CONFIGURATION = {
"queues": {},
"environments": {},
"workers": {}
}
class Command(BaseCommand):
@@ -196,6 +93,10 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--dump', action='store_true', dest='dump',
default=False, help='Dump all environment/worker/queues ' \
'configuration and exits')
parser.add_argument('--reset', action='store_true', dest='reset',
default=False, help='Delete all environment/worker/queues ' \
'before setting the given configuration')
@@ -203,7 +104,7 @@
parser.add_argument('qconfig', type=str, nargs='?',
help='Optional custom queue configuration to use. If not ' \
'passed, uses an internal default with a single ' \
'queue/worker/%d slots' % multiprocessing.cpu_count())
'queue/worker/%d slots' % CORES)
def handle(self, *ignored, **arguments):
@@ -215,42 +116,17 @@
if arguments['verbosity'] == 1: logger.setLevel(logging.INFO)
elif arguments['verbosity'] >= 2: logger.setLevel(logging.DEBUG)
if arguments['reset']:
# make all old environments inactive
q = Environment.objects.all()
updates = q.update(active=False)
for obj in q: obj.save()
logger.info("De-activated %d environments", updates)
# cleans-up queues and workers before installing new config
q = Queue.objects.all()
q.delete()
logger.info("Erased %d queues", q.count())
if arguments['dump']:
d = dump_backend()
print(simplejson.dumps(d, indent=2))
sys.exit(0)
q = Worker.objects.all()
q.delete()
logger.info("Erased %d workers", q.count())
assert Slot.objects.count() == 0, \
"There is still %d queue-worker relations undeleted" % \
Slot.objects.count()
if arguments['reset']:
setup_backend(RESET_CONFIGURATION)
# sets the tokens for the scheduler user to '3' (default localhost)
scheduler = User.objects.filter(username=settings.SCHEDULER_ACCOUNT)
usetoken = '3'
if scheduler:
scheduler = scheduler[0]
try:
scheduler.auth_token.delete()
except Exception:
pass
token = Token(user=scheduler)
token.key = usetoken
token.save()
logger.info("Reset `%s' token to `%s'", scheduler.username,
usetoken)
else:
raise CommandError("Could not find an account named `%s' on this database" % settings.SCHEDULER_ACCOUNT)
config = None
if arguments['qconfig']:
with open(arguments['qconfig'], 'rb') as f:
config = simplejson.load(f)
setup_environment(arguments['qconfig'])
setup_environment(config or DEFAULT_CONFIGURATION)
@@ -137,6 +137,17 @@ class Environment(Shareable):
return [q for q in self.queues.all() if user.has_perm('backend.can_access', q)]
def as_dict(self):
'''Returns a representation as a dictionary'''
return dict(
name=self.name,
version=self.version,
short_description=self.short_description,
description=self.description,
)
#----------------------------------------------------------
@@ -207,6 +218,12 @@ class Worker(models.Model):
self.active = True
def as_dict(self):
'''Returns a dictionary-like representation'''
return dict(cores=self.cores, memory=self.memory)
#----------------------------------------------------------
@@ -305,6 +322,19 @@ class Queue(models.Model):
return [w[0] for w in workers]
def as_dict(self):
'''Returns a representation as a dictionary'''
return {
'memory-limit': self.memory_limit,
'time-limit': self.time_limit,
'cores-per-slot': self.cores_per_slot,
'max-slots-per-user': self.max_slots_per_user,
'environments': [k.natural_key() for k in self.environments.all()],
'slots': dict([(s.worker.name, dict(quantity=s.quantity,
priority=s.priority)) for s in self.slots.all()]),
}
class SlotManager(models.Manager):
@@ -135,7 +135,7 @@ def cleanup_cache(path, age_in_minutes=0, delete=False):
@transaction.atomic
def setup(d):
def setup_backend(d):
'''Configures or re-configures the internal queue setup
This method is called to re-configure the current backend architecture. It
@@ -333,3 +333,13 @@ def setup(d):
for w in wobjects_to_be_deleted:
logger.info("Deleting `%s'...", w)
w.delete()
def dump_backend():
'''Returns a dictionary that represents the current backend configuration'''
return dict(
queues=dict([(k.name, k.as_dict()) for k in Queue.objects.all()]),
environments=dict([(str(k), k.as_dict()) for k in Environment.objects.all()]),
workers=dict([(k.name, k.as_dict()) for k in Worker.objects.all()]),
)
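
For reference, a rough sketch of the dictionary shape dump_backend() returns, pieced together from the as_dict() methods added above and from DEFAULT_CONFIGURATION at the top of this commit. All values are placeholders; the key format of the environments mapping ('environment (1)') and the entries of each queue's environments list depend on Environment.__str__() and Environment.natural_key(), neither of which is shown here, so those are assumptions modelled on ENVKEY:

    # Illustrative Python literal only; placeholder values, not real data.
    {
        "queues": {
            "queue": {
                "memory-limit": ...,            # Queue.memory_limit
                "time-limit": 1440,             # Queue.time_limit, in minutes
                "cores-per-slot": 1,
                "max-slots-per-user": ...,      # CORES on the default single-host setup
                "environments": [...],          # one natural_key() per associated environment
                "slots": {
                    "<worker name>": {"quantity": ..., "priority": ...},
                },
            },
        },
        "environments": {
            "environment (1)": {                # keyed on str(environment); format assumed
                "name": "environment",
                "version": "1",
                "short_description": "",
                "description": "",
            },
        },
        "workers": {
            "<worker name>": {"cores": ..., "memory": ...},   # Worker.as_dict()
        },
    }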