From 97355ff973a73d1d846911cc66025af3c82ac937 Mon Sep 17 00:00:00 2001 From: Andre Anjos <andre.dos.anjos@gmail.com> Date: Thu, 21 Apr 2016 17:15:30 +0200 Subject: [PATCH] [backend] Add utilities for cache cleaning and queue setup from the old scheduler code base --- beat/web/backend/utils.py | 335 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 335 insertions(+) create mode 100644 beat/web/backend/utils.py diff --git a/beat/web/backend/utils.py b/beat/web/backend/utils.py new file mode 100644 index 000000000..246f6d207 --- /dev/null +++ b/beat/web/backend/utils.py @@ -0,0 +1,335 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
#                                                                             #
###############################################################################

'''Utilities for backend management'''

import os
import fnmatch
import glob
import time

import logging
logger = logging.getLogger(__name__)

from django.db import transaction

from ..experiments.models import CachedFile, Block, Experiment
from .models import Queue, Worker, Slot, Job, Environment


def cleanup_cache(path, age_in_minutes=0, delete=False):
    """Removes files which are older than a certain threshold from a directory

    This function can be used for cache maintenance. It allows the user to
    remove all files under a certain directory which are older than a number
    of minutes. It also saves some files from being erased (if they belong to
    experiments which are running or scheduled).


    Parameters:

      path (str): The path to the cache root (typically,
        ``settings.CACHE_ROOT``)

      age_in_minutes (int, Optional): The minimum access time of files (in
        minutes) that will be erased from the disk. All files older than this
        cut-off value will be erased. All others will be kept.

      delete (bool, Optional): If set (defaults to False), then we really
        delete the marked files. Otherwise, we just return the list of files
        which we would delete.


    Returns:

      list: A list of filenames that will be/were removed

    """

    cutoff_access_time = time.time() - (60 * age_in_minutes)

    logger.info("Running `%s' clean-up: set file-access cutoff time to `%s'",
                path, time.ctime(cutoff_access_time))

    # Cache files belonging to scheduled/running experiments must never be
    # removed; collect their paths (the trailing '*' also saves siblings
    # sharing the same prefix, e.g. data/checksum companions).
    blocks = Block.objects.filter(
        experiment__in=Experiment.objects.filter(
            status__in=(Experiment.SCHEDULED, Experiment.RUNNING)))
    save_list = [k.path() + '*'
                 for k in CachedFile.objects.filter(blocks__in=blocks)]

    # Files with an associated '.lock' file are also preserved
    for root, dirnames, filenames in os.walk(path):
        for filename in fnmatch.filter(filenames, '*.lock'):
            save_list += glob.glob(
                os.path.splitext(os.path.join(root, filename))[0] + '*')

    removed_files = []

    def _remove_dirs(p):
        # Prunes the directory chain containing ``p`` once it becomes empty
        # (``os.removedirs`` also removes empty parents, stopping at the
        # first non-empty one).
        d = os.path.dirname(p)
        if not os.listdir(d):
            logger.info("[rmdir] `%s'", d)
            os.removedirs(d)

    for p, dirs, files in os.walk(path, topdown=True):

        # skip hidden files and prune hidden directories (in-place edit of
        # ``dirs`` is required for os.walk to honor the pruning)
        files = [f for f in files if not f.startswith('.')]
        dirs[:] = [d for d in dirs if not d.startswith('.')]

        for f in files:
            fullpath = os.path.join(p, f)

            # ``any`` short-circuits on the first match (the original
            # summed every comparison before testing)
            if any(fullpath.endswith(k) for k in save_list):
                logger.debug("[skip] `%s' (user list)", fullpath)
                continue

            # dangling '.part' files are always removal candidates,
            # regardless of their access time
            # (note: os.path.splitext always returns a 2-tuple, so the
            # original ``len(ext) > 1`` guard was redundant)
            if os.path.splitext(fullpath)[1] == '.part':
                if delete:
                    logger.info("[rm] `%s' (dangling)", fullpath)
                    os.remove(fullpath)
                    _remove_dirs(fullpath)
                # record the file even on dry-runs so the return value
                # matches the documented contract ("will be/were removed")
                removed_files.append(fullpath)
                continue

            # otherwise, decide based on the file access time (hoisted so
            # we only stat once per file)
            atime = os.path.getatime(fullpath)
            if atime < cutoff_access_time:
                if delete:
                    logger.info("[rm] `%s'", fullpath)
                    os.remove(fullpath)
                    _remove_dirs(fullpath)
                removed_files.append(fullpath)
            else:
                logger.debug("[skip] `%s' (%f >= %f)", fullpath, atime,
                             cutoff_access_time)

        for d in dirs:  # also remove empty directories
            fullpath = os.path.join(p, d)
            if not os.listdir(fullpath) and delete:
                os.removedirs(fullpath)

    return removed_files


@transaction.atomic
def setup(d):
    '''Configures or re-configures the internal queue setup

    This method is called to re-configure the current backend architecture.
    It is guaranteed to be called only if no experiments are currently
    running on resources that will disappear (this is checked). The reason
    for this present restriction goes through difficulties in handling
    "disappearing" queues and/or environments.


    Parameters:

      d (dict): A JSON-generated dictionary that defines the queues, slots,
        workers and environments available at the backend farm.


    Raises:

      RuntimeError: If an error is detected and the re-configuration cannot
        take place. In this case, it is safe to assume nothing has changed
        (the whole function runs inside a single transaction).

    '''

    # 1. We get a list of all current queue/environment combinations
    #    (fix: ``environments`` is a related manager — it must be iterated
    #    through ``.all()``)
    q_envs = set([(q.name, str(e))
                  for q in Queue.objects.all()
                  for e in q.environments.all()])

    # 2. We get a list of new queue/environment combinations
    config_q_envs = set([(qname, envkey)
                         for qname, qpar in d['queues'].items()
                         for envkey in qpar['environments'].keys()])

    # 3. We figure out which combinations of queue/environment's need to be
    #    deleted.
    delete_q_envs = q_envs.difference(config_q_envs)

    # 4. We figure out which combinations of queue/environment's are
    #    currently used by running or queued jobs.
    used_q_envs = set([(job.block.queue.name, str(job.block.environment))
                       for job in Job.objects.filter(
                           status__in=(Job.PROCESSING, Job.QUEUED))])

    # 5. We request that no jobs should be either executing or scheduled for
    #    execution on queue/environment combinations that need to be deleted.
    used_to_be_deleted = used_q_envs.intersection(delete_q_envs)
    if used_to_be_deleted:
        qenv_names = ['/'.join(k) for k in used_to_be_deleted]
        reason = 'There are jobs currently running or scheduled to run on ' \
            'the following queue/environment combinations which are ' \
            'supposed to be deleted: %s. Aborting reconfiguration.'
        raise RuntimeError(reason % ', '.join(qenv_names))

    # 6. Request that no worker that is being used will disappear
    existing_workers = set(Worker.objects.values_list('name', flat=True))
    workers_to_be_deleted = existing_workers - set(d['workers'])
    if workers_to_be_deleted:
        wobjects_to_be_deleted = \
            Worker.objects.filter(name__in=workers_to_be_deleted)
    else:
        wobjects_to_be_deleted = []

    for w in wobjects_to_be_deleted:
        if w.load() != 0:
            reason = 'There are jobs currently running or scheduled to run ' \
                'on some of the workers that would disappear: %s. ' \
                'Aborting reconfiguration.'
            raise RuntimeError(reason % ', '.join(workers_to_be_deleted))

    # 7. Create new environments
    config_envs = set(d['environments'].keys())
    current_envs = set([str(k) for k in Environment.objects.all()])
    new_envs = config_envs.difference(current_envs)

    for envkey in new_envs:
        attrs = d['environments'][envkey]
        env = Environment(
            name=attrs['name'],
            version=attrs['version'],
            short_description=attrs.get('short_description'),
            description=attrs.get('description'),
        )
        logger.info("Creating `%s'...", env)
        env.save()

    # 8.1 Create new workers
    config_workers = set(d['workers'].keys())
    current_workers = set(Worker.objects.values_list('name', flat=True))
    new_workers = config_workers - current_workers

    for name in new_workers:
        attrs = d['workers'][name]
        worker = Worker(
            name=name,
            active=False,
            cores=attrs['cores'],
            memory=attrs['memory'],
        )
        logger.info("Creating `%s'...", worker)
        worker.save()

    # 8.2 Update existing workers
    update_workers = current_workers.intersection(config_workers)
    for name in update_workers:
        attrs = d['workers'][name]
        worker = Worker.objects.select_for_update().get(name=name)
        worker.cores = attrs['cores']
        # fix: the creation path above uses the ``memory`` field; assigning
        # to a non-existent ``ram`` attribute would silently persist nothing
        worker.memory = attrs['memory']
        logger.info("Updating `%s'...", worker)
        worker.save()

    # 9. Create new queues
    config_qnames = set(d['queues'].keys())
    current_qnames = set(Queue.objects.values_list('name', flat=True))
    new_qnames = config_qnames.difference(current_qnames)

    for name in new_qnames:
        attrs = d['queues'][name]
        queue = Queue(
            name=name,
            memory_limit=attrs['memory-limit'],
            time_limit=attrs['time-limit'],
            cores_per_slot=attrs['cores-per-slot'],
            max_slots_per_user=attrs['max-slots-per-user'],
        )
        logger.info("Creating `%s'...", queue)
        queue.save()

        for hostname, par in attrs['slots'].items():
            worker = Worker.objects.get(name=hostname)
            priority = par.get('priority', 0)
            slot = Slot(
                queue=queue,
                worker=worker,
                priority=priority,
                quantity=par['quantity'],
            )
            logger.info("Creating `%s'...", slot)
            slot.save()

        # Associates environments with queues
        for envkey in attrs['environments']:
            env = Environment.objects.get_by_natural_key(envkey)
            logger.info("Appending `%s' to `%s'...", env, queue)
            queue.environments.add(env)

    # 10. Update existing queues
    update_qnames = current_qnames.intersection(config_qnames)
    for name in update_qnames:

        # fix: the model is named ``Queue`` — ``Queues`` was a NameError
        queue = Queue.objects.select_for_update().get(name=name)
        attrs = d['queues'][name]

        # 10.1 Update queue parameterization: running jobs will be
        # unaffected whereas queued jobs will be subject to the new settings.
        # fix: use the real field names (``memory_limit``/``time_limit``, as
        # in the creation path at step 9) instead of ``ram``/``max_time``,
        # which would be silently dropped on save
        queue.memory_limit = attrs['memory-limit']
        queue.time_limit = attrs['time-limit']
        queue.cores_per_slot = attrs['cores-per-slot']
        queue.max_slots_per_user = attrs['max-slots-per-user']
        logger.info("Updating `%s'...", queue)
        queue.save()

        # 10.2 Update the queue-slot allocation
        # fix: related managers expose no ``delete()``; a queryset does
        queue.slots.all().delete()
        for hostname, par in attrs['slots'].items():
            worker = Worker.objects.get(name=hostname)
            priority = par.get('priority', 0)
            slot = Slot(
                worker=worker,
                queue=queue,
                priority=priority,
                quantity=par['quantity'],
            )
            logger.info("Creating `%s'...", slot)
            slot.save()

        # 10.3 Associate and dissociate environments
        queue.environments.clear()
        for e in attrs['environments']:
            env = Environment.objects.get_by_natural_key(e)
            logger.info("Appending `%s' to `%s'...", env, queue)
            queue.environments.add(env)

    # 11. Delete queues not mentioned on the new configuration
    delete_qnames = current_qnames.difference(config_qnames)
    for name in delete_qnames:
        q = Queue.objects.select_for_update().get(name=name)
        # fix: log the queue being deleted, not the stale ``queue`` variable
        # left over from the previous loop
        logger.info("Deleting `%s'...", q)
        q.delete()  # slots are deleted on cascade

    # 12. Delete environments w/o associated queues
    qless_envs = Environment.objects.select_for_update().filter(queues=None)
    for env in qless_envs:
        logger.info("Deleting `%s'...", env)
        env.delete()

    # 13. Delete workers not mentioned on the new configuration
    for w in wobjects_to_be_deleted:
        logger.info("Deleting `%s'...", w)
        w.delete()