Commit 97355ff9 authored by André Anjos

[backend] Add utilities for cache cleaning and queue setup from the old scheduler code base

parent 32580388
Merge request: !194 Scheduler
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/          #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.web module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in      #
# accordance with the terms contained in a written agreement between you     #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero     #
# Public License version 3 as published by the Free Software and appearing   #
# in the file LICENSE.AGPL included in the packaging of this file.           #
# The BEAT platform is distributed in the hope that it will be useful, but   #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along     #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.          #
#                                                                             #
###############################################################################
'''Utilities for backend management'''

import os
import fnmatch
import glob
import time

import logging
logger = logging.getLogger(__name__)

from django.db import transaction

from ..experiments.models import CachedFile, Block, Experiment
from .models import Queue, Worker, Slot, Job, Environment

def cleanup_cache(path, age_in_minutes=0, delete=False):
    """Removes files which are older than a certain threshold from a directory

    This function can be used for cache maintenance. It allows the user to
    remove all files under a given directory which are older than a number of
    minutes. It also saves some files from being erased (those belonging to
    experiments which are running or scheduled).


    Parameters:

      path (str): The path to the cache root (typically,
        ``settings.CACHE_ROOT``)

      age_in_minutes (int, Optional): The minimum age, in minutes since last
        access, for a file to be erased from the disk. All files older than
        this cut-off value will be erased. All others will be kept.

      delete (bool, Optional): If set (defaults to False), then we really
        delete the marked files. Otherwise, we just return the list of files
        which we would delete.


    Returns:

      list: A list of filenames that will be/were removed

    """
    cutoff_access_time = time.time() - (60 * age_in_minutes)

    logger.info("Running `%s' clean-up: set file-access cutoff time to `%s'",
                path, time.ctime(cutoff_access_time))

    # Gets a list of cache-file glob patterns for active experiments: those
    # files must be kept, whatever their age
    blocks = Block.objects.filter(experiment__in=Experiment.objects.filter(
        status__in=(Experiment.SCHEDULED, Experiment.RUNNING)))
    save_list = [k.path() + '*' for k in
                 CachedFile.objects.filter(blocks__in=blocks)]

    # Also saves the files with an associated '.lock' file
    for root, dirnames, filenames in os.walk(path):
        for filename in fnmatch.filter(filenames, '*.lock'):
            save_list += glob.glob(
                os.path.splitext(os.path.join(root, filename))[0] + '*')

    removed_files = []

    def _remove_dirs(p):
        '''Removes the directory containing ``p``, if it became empty'''
        d = os.path.dirname(p)
        if not os.listdir(d):
            logger.info("[rmdir] `%s'", d)
            os.removedirs(d)

    for p, dirs, files in os.walk(path, topdown=True):

        files = [f for f in files if not f.startswith('.')]
        dirs[:] = [d for d in dirs if not d.startswith('.')]  # note: in-place

        for f in files:

            fullpath = os.path.join(p, f)

            # entries in the save list are glob-style patterns
            save_it = any([fnmatch.fnmatch(fullpath, k) for k in save_list])
            if save_it:
                logger.debug("[skip] `%s' (save list)", fullpath)
                continue

            # if you get to this point and the file ends in '.part', it is a
            # dangling partial file - erase it regardless of its age
            if os.path.splitext(fullpath)[1] == '.part':
                if delete:
                    logger.info("[rm] `%s' (dangling)", fullpath)
                    os.remove(fullpath)
                    _remove_dirs(fullpath)
                removed_files.append(fullpath)
                continue

            # if you get to this point, check the file access time against
            # the cut-off set by the user
            if os.path.getatime(fullpath) < cutoff_access_time:
                if delete:
                    logger.info("[rm] `%s'", fullpath)
                    os.remove(fullpath)
                    _remove_dirs(fullpath)
                removed_files.append(fullpath)
            else:
                logger.debug("[skip] `%s' (%f >= %f)", fullpath,
                             os.path.getatime(fullpath), cutoff_access_time)

        for d in dirs:  # also remove empty directories
            fullpath = os.path.join(p, d)
            if not os.listdir(fullpath) and delete:
                os.removedirs(fullpath)

    return removed_files
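

# A hedged usage sketch for ``cleanup_cache`` (not part of the original
# module), assuming a configured Django project where ``settings.CACHE_ROOT``
# points at the cache directory:
#
#   from django.conf import settings
#
#   # dry-run: list cache files unused for more than a day, touching nothing
#   candidates = cleanup_cache(settings.CACHE_ROOT, age_in_minutes=24 * 60)
#
#   # same call, but actually removing the files
#   removed = cleanup_cache(settings.CACHE_ROOT, age_in_minutes=24 * 60,
#                           delete=True)
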
@transaction.atomic
def setup(d):
    '''Configures or re-configures the internal queue setup

    This method is called to re-configure the current backend architecture.
    It is guaranteed to be called only if no experiments are currently
    running on resources that will disappear (this is checked). This
    restriction stems from the difficulty of handling "disappearing" queues
    and/or environments.


    Parameters:

      d (dict): A dictionary, decoded from a JSON configuration, that defines
        the environments, workers, queues and slots available at the backend
        farm.


    Raises:

      RuntimeError: If an error is detected and the re-configuration cannot
        take place. In this case, it is safe to assume nothing has changed.

    '''
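
    # A hedged sketch (not part of the original code) of the expected shape
    # of ``d``, inferred from the key lookups below; all names and numbers
    # are illustrative, and the environment key format is assumed to match
    # the string representation of ``Environment`` objects:
    #
    #   {
    #     "environments": {
    #       "Python 2.7 (1.1.0)": {
    #         "name": "Python 2.7",
    #         "version": "1.1.0",
    #         "short_description": "...",  # optional
    #         "description": "...",  # optional
    #       },
    #     },
    #     "workers": {
    #       "node01": {"cores": 8, "memory": 16384},
    #     },
    #     "queues": {
    #       "default": {
    #         "memory-limit": 4096,
    #         "time-limit": 180,
    #         "cores-per-slot": 1,
    #         "max-slots-per-user": 4,
    #         "environments": {"Python 2.7 (1.1.0)": {}},  # only keys used
    #         "slots": {
    #           "node01": {"quantity": 8, "priority": 0},
    #         },
    #       },
    #     },
    #   }
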
    # 1. We get a list of all current queue/environment combinations
    q_envs = set([(q.name, str(e)) for q in Queue.objects.all()
                  for e in q.environments.all()])

    # 2. We get a list of the new queue/environment combinations
    config_q_envs = set([(qname, envkey)
                         for qname, qpar in d['queues'].items()
                         for envkey in qpar['environments'].keys()])

    # 3. We figure out which combinations of queue/environment need to be
    #    deleted
    delete_q_envs = q_envs.difference(config_q_envs)

    # 4. We figure out which combinations of queue/environment are currently
    #    used by running or queued jobs
    used_q_envs = set([(job.block.queue.name, str(job.block.environment))
                       for job in Job.objects.filter(
                           status__in=(Job.PROCESSING, Job.QUEUED))])

    # 5. We check that no jobs are either executing or scheduled for
    #    execution on queue/environment combinations that are about to be
    #    deleted
    used_to_be_deleted = used_q_envs.intersection(delete_q_envs)
    if len(used_to_be_deleted) != 0:
        qenv_names = ['/'.join(k) for k in used_to_be_deleted]
        reason = 'There are jobs currently running or scheduled to run on ' \
                 'the following queue/environment combinations which are ' \
                 'supposed to be deleted: %s. Aborting reconfiguration.'
        raise RuntimeError(reason % ', '.join(qenv_names))

    # 6. We check that no worker that is currently in use will disappear
    existing_workers = set(Worker.objects.values_list('name', flat=True))
    workers_to_be_deleted = existing_workers - set(d['workers'])
    if workers_to_be_deleted:
        wobjects_to_be_deleted = \
            Worker.objects.filter(name__in=workers_to_be_deleted)
    else:
        wobjects_to_be_deleted = []

    for w in wobjects_to_be_deleted:
        if w.load() != 0:
            reason = 'There are jobs currently running or scheduled to run ' \
                     'on some of the workers that would disappear: %s. ' \
                     'Aborting reconfiguration.'
            raise RuntimeError(reason % ', '.join(workers_to_be_deleted))
    # 7. Create new environments
    config_envs = set(d['environments'].keys())
    current_envs = set([str(k) for k in Environment.objects.all()])
    new_envs = config_envs.difference(current_envs)

    for envkey in new_envs:
        attrs = d['environments'][envkey]
        env = Environment(
            name=attrs['name'],
            version=attrs['version'],
            short_description=attrs.get('short_description'),
            description=attrs.get('description'),
        )
        logger.info("Creating `%s'...", env)
        env.save()

    # 8.1 Create new workers
    config_workers = set(d['workers'].keys())
    current_workers = set(Worker.objects.values_list('name', flat=True))
    new_workers = config_workers - current_workers

    for name in new_workers:
        attrs = d['workers'][name]
        worker = Worker(
            name=name,
            active=False,
            cores=attrs['cores'],
            memory=attrs['memory'],
        )
        logger.info("Creating `%s'...", worker)
        worker.save()

    # 8.2 Update existing workers
    update_workers = current_workers.intersection(config_workers)

    for name in update_workers:
        attrs = d['workers'][name]
        worker = Worker.objects.select_for_update().get(name=name)
        worker.cores = attrs['cores']
        worker.memory = attrs['memory']
        logger.info("Updating `%s'...", worker)
        worker.save()
    # 9. Create new queues
    config_qnames = set(d['queues'].keys())
    current_qnames = set(Queue.objects.values_list('name', flat=True))
    new_qnames = config_qnames.difference(current_qnames)

    for name in new_qnames:
        attrs = d['queues'][name]
        queue = Queue(
            name=name,
            memory_limit=attrs['memory-limit'],
            time_limit=attrs['time-limit'],
            cores_per_slot=attrs['cores-per-slot'],
            max_slots_per_user=attrs['max-slots-per-user'],
        )
        logger.info("Creating `%s'...", queue)
        queue.save()

        for hostname, par in attrs['slots'].items():
            worker = Worker.objects.get(name=hostname)
            priority = par.get('priority', 0)
            slot = Slot(
                queue=queue,
                worker=worker,
                priority=priority,
                quantity=par['quantity'],
            )
            logger.info("Creating `%s'...", slot)
            slot.save()

        # Associates environments with queues
        for envkey in attrs['environments']:
            env = Environment.objects.get_by_natural_key(envkey)
            logger.info("Appending `%s' to `%s'...", env, queue)
            queue.environments.add(env)
    # 10. Update existing queues
    update_qnames = current_qnames.intersection(config_qnames)

    for name in update_qnames:
        queue = Queue.objects.select_for_update().get(name=name)
        attrs = d['queues'][name]

        # 10.1 Update the queue parameterization: running jobs will be
        #      unaffected, whereas queued jobs will be subject to the new
        #      settings
        queue.memory_limit = attrs['memory-limit']
        queue.time_limit = attrs['time-limit']
        queue.cores_per_slot = attrs['cores-per-slot']
        queue.max_slots_per_user = attrs['max-slots-per-user']
        logger.info("Updating `%s'...", queue)
        queue.save()

        # 10.2 Update the queue-slot allocation
        queue.slots.all().delete()
        for hostname, par in attrs['slots'].items():
            worker = Worker.objects.get(name=hostname)
            priority = par.get('priority', 0)
            slot = Slot(
                worker=worker,
                queue=queue,
                priority=priority,
                quantity=par['quantity'],
            )
            logger.info("Creating `%s'...", slot)
            slot.save()

        # 10.3 Associate and dissociate environments
        queue.environments.clear()
        for e in attrs['environments']:
            env = Environment.objects.get_by_natural_key(e)
            logger.info("Appending `%s' to `%s'...", env, queue)
            queue.environments.add(env)
    # 11. Delete queues not mentioned on the new configuration
    delete_qnames = current_qnames.difference(config_qnames)
    for name in delete_qnames:
        q = Queue.objects.select_for_update().get(name=name)
        logger.info("Deleting `%s'...", q)
        q.delete()  # slots are deleted on cascade

    # 12. Delete environments without associated queues
    qless_envs = Environment.objects.select_for_update().filter(queues=None)
    for env in qless_envs:
        logger.info("Deleting `%s'...", env)
        env.delete()

    # 13. Delete workers not mentioned on the new configuration
    for w in wobjects_to_be_deleted:
        logger.info("Deleting `%s'...", w)
        w.delete()
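

# A hedged usage sketch for ``setup`` (not part of the original module): the
# configuration is a JSON-decoded dictionary, so it would typically be loaded
# from a file whose path is deployment-specific (the one below is
# hypothetical):
#
#   import json
#
#   with open('/etc/beat/backend.json', 'rt') as f:
#       setup(json.load(f))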