diff --git a/beat/web/backend/__init__.py b/beat/web/backend/__init__.py index b84678dd5db8ee46bea528d98d03f1c1df143cde..db19c730f327a3a2e53f09d8da45c64241dee632 100644 --- a/beat/web/backend/__init__.py +++ b/beat/web/backend/__init__.py @@ -25,4 +25,4 @@ # # ############################################################################### -default_app_config = 'beat.web.backend.apps.BackendConfig' +default_app_config = "beat.web.backend.apps.BackendConfig" diff --git a/beat/web/backend/admin.py b/beat/web/backend/admin.py index 31704238648f90193463e2e0833a20a685326ad7..fcfdcf9ecbcec123033643257b6fde19c591a0b8 100755 --- a/beat/web/backend/admin.py +++ b/beat/web/backend/admin.py @@ -25,43 +25,37 @@ # # ############################################################################### -from django.contrib import admin from django import forms +from django.contrib import admin +from ..common.admin import Django18ProofGuardedModelAdmin +from ..common.admin import notify_by_email +from ..common.texts import Messages +from ..ui.forms import CodeMirrorRSTCharField from .models import Environment as EnvironmentModel from .models import EnvironmentLanguage as EnvironmentLanguageModel -from .models import Worker as WorkerModel -from .models import Queue as QueueModel -from .models import Slot as SlotModel from .models import Job as JobModel from .models import JobSplit as JobSplitModel +from .models import Queue as QueueModel +from .models import Slot as SlotModel +from .models import Worker as WorkerModel -from ..ui.forms import CodeMirrorRSTCharField -from ..common.texts import Messages -from ..common.admin import Django18ProofGuardedModelAdmin, notify_by_email +# ---------------------------------------------------------- -#---------------------------------------------------------- class EnvironmentModelForm(forms.ModelForm): description = CodeMirrorRSTCharField( - required=False, - help_text=Messages['description'], + required=False, help_text=Messages["description"], ) class Meta: model = EnvironmentModel exclude = [] widgets = { - 'name': forms.TextInput( - attrs=dict(size=100), - ), - 'version': forms.TextInput( - attrs=dict(size=40), - ), - 'short_description': forms.TextInput( - attrs=dict(size=100), - ), + "name": forms.TextInput(attrs=dict(size=100),), + "version": forms.TextInput(attrs=dict(size=40),), + "short_description": forms.TextInput(attrs=dict(size=100),), } @@ -71,66 +65,57 @@ class EnvironmentLanguageInline(admin.TabularInline): class Environment(admin.ModelAdmin): - list_display = ( - 'id', - 'name', - 'version', - 'sharing', - 'active', - 'short_description', + list_display = ( + "id", + "name", + "version", + "sharing", + "active", + "short_description", ) - search_fields = [ - 'name', - 'version', - 'short_description', - 'description', + search_fields = [ + "name", + "version", + "short_description", + "description", ] - list_display_links = ( - 'id', - 'name', + list_display_links = ( + "id", + "name", ) - inlines = [ - EnvironmentLanguageInline - ] + inlines = [EnvironmentLanguageInline] form = EnvironmentModelForm - filter_horizontal = [ - 'shared_with', - 'shared_with_team' - ] + filter_horizontal = ["shared_with", "shared_with_team"] actions = [ - notify_by_email('environment_notifications_enabled'), + notify_by_email("environment_notifications_enabled"), ] fieldsets = ( - (None, - dict( - fields=('name', 'version', 'previous_version', 'active'), - ), - ), - ('Documentation', - dict( - classes=('collapse',), - fields=('short_description', 'description',), - ), - ), - ('Sharing', - dict( - classes=('collapse',), - fields=('sharing', 'shared_with', 'shared_with_team'), - ), - ), + (None, dict(fields=("name", "version", "previous_version", "active"),),), + ( + "Documentation", + dict(classes=("collapse",), fields=("short_description", "description",),), + ), + ( + "Sharing", + dict( + classes=("collapse",), + fields=("sharing", "shared_with", "shared_with_team"), + ), + ), ) + admin.site.register(EnvironmentModel, Environment) -#---------------------------------------------------------- +# ---------------------------------------------------------- def activate_workers(modeladmin, request, queryset): @@ -139,7 +124,9 @@ def activate_workers(modeladmin, request, queryset): for q in queryset: q.active = True q.save() -activate_workers.short_description = 'Activate selected workers' + + +activate_workers.short_description = "Activate selected workers" def deactivate_workers(modeladmin, request, queryset): @@ -148,44 +135,55 @@ def deactivate_workers(modeladmin, request, queryset): for q in queryset: q.active = False q.save() -deactivate_workers.short_description = 'Deactivate selected workers' + + +deactivate_workers.short_description = "Deactivate selected workers" class Worker(admin.ModelAdmin): - list_display = ('id', 'name', 'cores', 'memory', 'active') - search_fields = ['name'] - list_display_links = ('id', 'name') + list_display = ("id", "name", "cores", "memory", "active") + search_fields = ["name"] + list_display_links = ("id", "name") actions = [ activate_workers, deactivate_workers, ] + admin.site.register(WorkerModel, Worker) -#---------------------------------------------------------- +# ---------------------------------------------------------- class SlotInline(admin.TabularInline): model = SlotModel -#---------------------------------------------------------- +# ---------------------------------------------------------- class Queue(Django18ProofGuardedModelAdmin): - list_display = ('id', 'name', 'memory_limit', 'time_limit', 'cores_per_slot', 'max_slots_per_user') - search_fields = ['name'] - list_display_links = ('id', 'name') - inlines = [SlotInline] + list_display = ( + "id", + "name", + "memory_limit", + "time_limit", + "cores_per_slot", + "max_slots_per_user", + ) + search_fields = ["name"] + list_display_links = ("id", "name") + inlines = [SlotInline] + admin.site.register(QueueModel, Queue) -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobSplitInline(admin.TabularInline): @@ -195,25 +193,25 @@ class JobSplitInline(admin.TabularInline): return False def has_add_permission(self, request): - return False - + return False class Job(admin.ModelAdmin): - list_display = ('id', 'key', 'runnable_date', 'start_date', 'block', 'splits') - search_fields = ['block__name', 'block__experiment__name'] - list_display_links = ('id', 'block', 'key') - ordering = ('runnable_date', 'start_date', 'id') - inlines = [JobSplitInline] + list_display = ("id", "key", "runnable_date", "start_date", "block", "splits") + search_fields = ["block__name", "block__experiment__name"] + list_display_links = ("id", "block", "key") + ordering = ("runnable_date", "start_date", "id") + inlines = [JobSplitInline] # to avoid very slow loading of cached files - raw_id_fields = ('block',) + raw_id_fields = ("block",) def splits(self, obj): return obj.splits.count() def has_add_permission(self, request): - return False + return False + admin.site.register(JobModel, Job) diff --git a/beat/web/backend/api.py b/beat/web/backend/api.py index 65c34e880dec5b611a24b2776bb47812781a6fd5..a1b644a1858b3ae1481751f64d0ce7615cff0e2f 100755 --- a/beat/web/backend/api.py +++ b/beat/web/backend/api.py @@ -25,109 +25,114 @@ # # ############################################################################### -from rest_framework.decorators import api_view, permission_classes -from rest_framework.response import Response +from django.conf import settings from rest_framework import permissions from rest_framework import status - -from django.conf import settings - -from .models import Environment -from .models import Worker -from .models import LocalSchedulerProcesses +from rest_framework.decorators import api_view +from rest_framework.decorators import permission_classes +from rest_framework.response import Response from ..code.models import Code from . import local_scheduler +from .models import Environment +from .models import LocalSchedulerProcesses +from .models import Worker - -#---------------------------------------------------------- +# ---------------------------------------------------------- -@api_view(['GET']) +@api_view(["GET"]) @permission_classes([permissions.AllowAny]) def accessible_environments_list(request): """Returns all accessible environments for a given user""" # Retrieve the list of environments - environments = Environment.objects.filter(active=True).order_by('name', - 'version') + environments = Environment.objects.filter(active=True).order_by("name", "version") result = [] for environment in environments.iterator(): # Check that the user has access to the environment (has_access, accessibility) = environment.accessibility_for(request.user) - if not(has_access): + if not (has_access): continue # Retrieve the details about the queues queues = {} for queue in environment.queues.iterator(): - if request.user.has_perm('can_access', queue): + if request.user.has_perm("can_access", queue): queues[queue.name] = { - 'nb_slots': queue.number_of_slots(), - 'memory_limit': queue.memory_limit, - 'time_limit': queue.time_limit, - 'nb_cores_per_slot': queue.cores_per_slot, - 'max_slots_per_user':queue.max_slots_per_user, + "nb_slots": queue.number_of_slots(), + "memory_limit": queue.memory_limit, + "time_limit": queue.time_limit, + "nb_cores_per_slot": queue.cores_per_slot, + "max_slots_per_user": queue.max_slots_per_user, } # Only returns environments which effectively have queues associated if environment.queues.count(): - result.append({ - 'name': environment.name, - 'version': environment.version, - 'short_description': environment.short_description, - 'queues': queues, - 'accessibility': accessibility, - 'languages': [ Code.language_identifier(x.language) for x in environment.languages.iterator() ], - }) + result.append( + { + "name": environment.name, + "version": environment.version, + "short_description": environment.short_description, + "queues": queues, + "accessibility": accessibility, + "languages": [ + Code.language_identifier(x.language) + for x in environment.languages.iterator() + ], + } + ) return Response(result) -#---------------------------------------------------------- +# ---------------------------------------------------------- -@api_view(['POST']) +@api_view(["POST"]) @permission_classes([permissions.IsAdminUser]) def start_local_scheduler(request): """Starts the local scheduler""" - if not getattr(settings, 'SCHEDULING_PANEL', False): + if not getattr(settings, "SCHEDULING_PANEL", False): return Response(status=status.HTTP_403_FORBIDDEN) # Clean start-up LocalSchedulerProcesses.objects.all().delete() - address = getattr(settings, 'LOCAL_SCHEDULER_ADDRESS', '127.0.0.1') - port = getattr(settings, 'LOCAL_SCHEDULER_PORT', 50000) - use_docker = getattr(settings, 'LOCAL_SCHEDULER_USE_DOCKER', False) + address = getattr(settings, "LOCAL_SCHEDULER_ADDRESS", "127.0.0.1") + port = getattr(settings, "LOCAL_SCHEDULER_PORT", 50000) + use_docker = getattr(settings, "LOCAL_SCHEDULER_USE_DOCKER", False) - full_address = 'tcp://%s:%d' % (address, port) + full_address = "tcp://%s:%d" % (address, port) for worker in Worker.objects.all(): - (pid, _) = local_scheduler.start_worker(worker.name, settings.PREFIX, - settings.CACHE_ROOT, - full_address, - use_docker=use_docker) + (pid, _) = local_scheduler.start_worker( + worker.name, + settings.PREFIX, + settings.CACHE_ROOT, + full_address, + use_docker=use_docker, + ) LocalSchedulerProcesses(name=worker.name, pid=pid).save() (pid, _) = local_scheduler.start_scheduler(address=address, port=port) - LocalSchedulerProcesses(name='Scheduler', pid=pid).save() + LocalSchedulerProcesses(name="Scheduler", pid=pid).save() return Response(status=status.HTTP_204_NO_CONTENT) -#---------------------------------------------------------- +# ---------------------------------------------------------- -@api_view(['POST']) +@api_view(["POST"]) @permission_classes([permissions.IsAdminUser]) def stop_local_scheduler(request): """Starts the local scheduler""" - if not getattr(settings, 'SCHEDULING_PANEL', False): + if not getattr(settings, "SCHEDULING_PANEL", False): return Response(status=status.HTTP_403_FORBIDDEN) for process in LocalSchedulerProcesses.objects.all(): diff --git a/beat/web/backend/apps.py b/beat/web/backend/apps.py index 07e67f57e25f3dfa84d570f8df537c3989a107ef..208ac73911196d66c29b6d3e33b4ecf98d482025 100644 --- a/beat/web/backend/apps.py +++ b/beat/web/backend/apps.py @@ -28,7 +28,8 @@ from django.apps import AppConfig from django.utils.translation import ugettext_lazy as _ + class BackendConfig(AppConfig): - name = 'beat.web.backend' - verbose_name = _('Backend') + name = "beat.web.backend" + verbose_name = _("Backend") diff --git a/beat/web/backend/helpers.py b/beat/web/backend/helpers.py index f9c7541d54b2c6d5d99dc959143fe1cd786e9404..bbe68694beaacc49c81bb8fbddedcc033c86bbb9 100755 --- a/beat/web/backend/helpers.py +++ b/beat/web/backend/helpers.py @@ -25,46 +25,45 @@ # # ############################################################################### +import glob +import logging +import os +from datetime import datetime + +import simplejson from django.conf import settings from django.db import transaction from django.db.models import Count from django.db.models import Q -import logging -logger = logging.getLogger(__name__) - -import os -import glob -import simplejson -from datetime import datetime +import beat.core.algorithm +import beat.core.data +import beat.core.hash +from beat.core.utils import NumpyJSONEncoder -from ..experiments.models import Experiment from ..experiments.models import Block from ..experiments.models import CachedFile +from ..experiments.models import Experiment from ..experiments.models import Result as CacheResult -from .models import Queue from .models import Job from .models import JobSplit -from .models import Worker +from .models import Queue from .models import Result +from .models import Worker -import beat.core.hash -import beat.core.data -import beat.core.hash -import beat.core.algorithm -from beat.core.utils import NumpyJSONEncoder +logger = logging.getLogger(__name__) @transaction.atomic def schedule_experiment(experiment): - '''Schedules the experiment for execution at the backend + """Schedules the experiment for execution at the backend Scheduling an experiment only means creating one :py:class:`.models.Job` instance for each block of the experiment. This function is expected to be called on the web server. The Scheduler is tasked to notice the newly-scheduled experiment and execute it. - ''' + """ # Lock the experiment, so nobody else can modify it experiment = Experiment.objects.select_for_update().get(pk=experiment.pk) @@ -73,23 +72,25 @@ def schedule_experiment(experiment): if experiment.status != Experiment.PENDING: return - # Check that the queues and environments of all the blocks are still valid for block in experiment.blocks.all(): if block.queue is None: - raise RuntimeError("Block `%s' does not have a queue assigned " \ - "- this normally indicates the originally selected " \ - "queue was deleted since the experiment was first " \ - "configured. Re-configure this experiment and select a new " \ - "default or block-specific queue" % block.name) + raise RuntimeError( + "Block `%s' does not have a queue assigned " + "- this normally indicates the originally selected " + "queue was deleted since the experiment was first " + "configured. Re-configure this experiment and select a new " + "default or block-specific queue" % block.name + ) if block.environment is None: - raise RuntimeError("Block `%s' does not have an environment " \ - "assigned - this normally indicates the originally selected " \ - "environment was deleted since the experiment was first " \ - "configured. Re-configure this experiment and select a new " \ - "default or block-specific environment" % block.name) - + raise RuntimeError( + "Block `%s' does not have an environment " + "assigned - this normally indicates the originally selected " + "environment was deleted since the experiment was first " + "configured. Re-configure this experiment and select a new " + "default or block-specific environment" % block.name + ) # Process all the blocks of the experiment already_done = True @@ -99,8 +100,12 @@ def schedule_experiment(experiment): block = Block.objects.select_for_update().get(pk=block.pk) # Check if the block outputs aren't already in the cache - must_skip = all([cached_file.status == CachedFile.CACHED - for cached_file in block.outputs.all()]) + must_skip = all( + [ + cached_file.status == CachedFile.CACHED + for cached_file in block.outputs.all() + ] + ) if must_skip: block.status = Block.DONE @@ -120,7 +125,6 @@ def schedule_experiment(experiment): block.save() already_done = False - # Mark the experiment as scheduled (or done) if already_done: experiment.start_date = datetime.now() @@ -132,18 +136,18 @@ def schedule_experiment(experiment): experiment.save() -#---------------------------------------------------------- +# ---------------------------------------------------------- @transaction.atomic def cancel_experiment(experiment): - '''Cancel the execution of the experiment on the backend + """Cancel the execution of the experiment on the backend Cancelling an experiment only means marking the experiment as 'cancelling'. This function is expected to be called on the web server. The Scheduler is tasked to notice the newly-cancelled experiment and does what it takes. - ''' + """ # Lock the experiment, so nobody else can modify it experiment = Experiment.objects.select_for_update().get(pk=experiment.pk) @@ -157,11 +161,11 @@ def cancel_experiment(experiment): experiment.save() -#---------------------------------------------------------- +# ---------------------------------------------------------- def split_new_jobs(): - '''Retrieve all the jobs not already splitted, and create the appropriate splits''' + """Retrieve all the jobs not already splitted, and create the appropriate splits""" def _process(candidate_jobs): additional_jobs = [] @@ -170,8 +174,12 @@ def split_new_jobs(): for job in candidate_jobs: # Check that the files weren't generated since the scheduling of the job - must_skip = all([cached_file.status == CachedFile.CACHED - for cached_file in job.block.outputs.all()]) + must_skip = all( + [ + cached_file.status == CachedFile.CACHED + for cached_file in job.block.outputs.all() + ] + ) if must_skip: job.block.status = Block.DONE @@ -188,8 +196,10 @@ def split_new_jobs(): # Check that the job isn't a mirror of an currently running one nb_existing_splits = JobSplit.objects.filter( - ~Q(status=JobSplit.QUEUED) | Q(worker__isnull=False), job__key=job.key, - job__runnable_date__isnull=False, job__mirror=False, + ~Q(status=JobSplit.QUEUED) | Q(worker__isnull=False), + job__key=job.key, + job__runnable_date__isnull=False, + job__mirror=False, ).count() if nb_existing_splits > 0: @@ -202,22 +212,22 @@ def split_new_jobs(): return additional_jobs - # First retrieve all the candidate jobs from the database, process them and # if the processing mark any other job as a candidate one, process it too, # recursively, until no candidate job is left - candidate_jobs = Job.objects.annotate(nb_splits=Count('splits')).filter( - runnable_date__isnull=False, mirror=False, nb_splits=0) + candidate_jobs = Job.objects.annotate(nb_splits=Count("splits")).filter( + runnable_date__isnull=False, mirror=False, nb_splits=0 + ) while len(candidate_jobs) > 0: candidate_jobs = _process(candidate_jobs) -#---------------------------------------------------------- +# ---------------------------------------------------------- def process_newly_cancelled_experiments(): - '''Retrieve all the experiments that must be cancelled, and do it''' + """Retrieve all the experiments that must be cancelled, and do it""" # Retrieve all the experiments marked as cancelling cancelling_experiments = Experiment.objects.filter(status=Experiment.CANCELLING) @@ -226,7 +236,12 @@ def process_newly_cancelled_experiments(): for experiment in cancelling_experiments: # Only process those which have blocks that aren't already cancelling - if experiment.blocks.filter(Q(status=Block.PENDING) | Q(status=Block.PROCESSING)).count() == 0: + if ( + experiment.blocks.filter( + Q(status=Block.PENDING) | Q(status=Block.PROCESSING) + ).count() + == 0 + ): continue new_splits_to_cancel = cancel_all_blocks(experiment) @@ -239,11 +254,11 @@ def process_newly_cancelled_experiments(): return splits_to_cancel -#---------------------------------------------------------- +# ---------------------------------------------------------- def is_cache_complete(path, nb_expected_blocks, cache=settings.CACHE_ROOT): - '''Check that an entry of the cache is complete + """Check that an entry of the cache is complete Due to the distributed nature of the platform, with volumes shared by several different machines, a (hopefully) small delay might occur between @@ -251,37 +266,35 @@ def is_cache_complete(path, nb_expected_blocks, cache=settings.CACHE_ROOT): on the current machine. This function checks that all the necessary files are there, and complete. - ''' + """ def _extract_indices_from_filename(filename): - parts = filename.split('.') + parts = filename.split(".") return (int(parts[-3]), int(parts[-2]), filename) - def _verify_checksum(filename): - checksum_file = filename + '.checksum' + checksum_file = filename + ".checksum" try: - with open(checksum_file, 'rt') as f: + with open(checksum_file, "rt") as f: recorded = f.read().strip() actual = beat.core.hash.hashFileContents(filename) - except: + except Exception: return False - return (actual == recorded) - + return actual == recorded # Retrieve all the index files abs_path = os.path.join(cache, path) - index_files = glob.glob(abs_path + '*.index') - index_files = sorted([ _extract_indices_from_filename(x) for x in index_files ]) + index_files = glob.glob(abs_path + "*.index") + index_files = sorted([_extract_indices_from_filename(x) for x in index_files]) # Check that there is no missing index if len(index_files) > 1: for i in range(1, len(index_files)): - if index_files[i][0] != index_files[i-1][1] + 1: + if index_files[i][0] != index_files[i - 1][1] + 1: return False # Sum the number of blocks represented by each index file @@ -293,42 +306,47 @@ def is_cache_complete(path, nb_expected_blocks, cache=settings.CACHE_ROOT): return False # Check that the data file is complete - data_file = index_file.replace('.index', '.data') + data_file = index_file.replace(".index", ".data") if not _verify_checksum(data_file): return False # Retrieve the number of blocks from the file - with open(index_file, 'rt') as f: + with open(index_file, "rt") as f: lines = f.readlines() nb_blocks += len(lines) - return (nb_blocks == nb_expected_blocks) + return nb_blocks == nb_expected_blocks -#---------------------------------------------------------- +# ---------------------------------------------------------- def assign_splits_to_workers(): - '''Assign existing job splits to available workers from the appropriate queues''' + """Assign existing job splits to available workers from the appropriate queues""" # Retrieve the queues in a good order - queues = Queue.objects.order_by('-cores_per_slot', 'max_slots_per_user') + queues = Queue.objects.order_by("-cores_per_slot", "max_slots_per_user") # Retrieve the candidate jobs on each queue - candidate_splits_per_queue = [ (q, retrieve_candidate_splits_for_queue(q)) for q in queues ] - candidate_splits_per_queue = [ x for x in candidate_splits_per_queue if x[1] ] + candidate_splits_per_queue = [ + (q, retrieve_candidate_splits_for_queue(q)) for q in queues + ] + candidate_splits_per_queue = [x for x in candidate_splits_per_queue if x[1]] if not candidate_splits_per_queue: return [] - logger.debug('Considering splits: %s', candidate_splits_per_queue) + logger.debug("Considering splits: %s", candidate_splits_per_queue) # Build a "white list" of available workers - whitelist = dict([ (worker, worker.available_cores()) - for worker in Worker.objects.filter(active=True) ]) - - logger.debug('Worker availability: %s', whitelist) + whitelist = dict( + [ + (worker, worker.available_cores()) + for worker in Worker.objects.filter(active=True) + ] + ) + logger.debug("Worker availability: %s", whitelist) # Process the candidates of each queue assigned_splits = [] @@ -354,8 +372,9 @@ def assign_splits_to_workers(): if available_cores < required_cores: continue - logger.debug("Assigning `%s' to worker `%s'", - candidate_split, candidate_worker) + logger.debug( + "Assigning `%s' to worker `%s'", candidate_split, candidate_worker + ) assign_split_to_worker(candidate_split, candidate_worker) assigned_splits.append(candidate_split) @@ -363,42 +382,46 @@ def assign_splits_to_workers(): mark_similar_jobs_as_mirror(candidate_split.job) whitelist[candidate_worker] -= required_cores - logger.debug("`%s' cores available: %d", candidate_worker, whitelist[candidate_worker]) + logger.debug( + "`%s' cores available: %d", + candidate_worker, + whitelist[candidate_worker], + ) break - return JobSplit.objects.filter(id__in=[ x.id for x in assigned_splits ]) + return JobSplit.objects.filter(id__in=[x.id for x in assigned_splits]) -#---------------------------------------------------------- +# ---------------------------------------------------------- def get_configuration_for_split(split): - '''Retrieve the configuration to be used to execute the provided job split + """Retrieve the configuration to be used to execute the provided job split on a worker node - ''' + """ # Retrieve the block configuration configuration = simplejson.loads(str(split.job.block.command)) # (If necessary) Add the infos needed to access the database files if settings.DATASETS_UID is not None: - configuration['datasets_uid'] = settings.DATASETS_UID + configuration["datasets_uid"] = settings.DATASETS_UID if settings.DATASETS_ROOT_PATH is not None: - configuration['datasets_root_path'] = settings.DATASETS_ROOT_PATH + configuration["datasets_root_path"] = settings.DATASETS_ROOT_PATH # (If necessary) Add the range of indices to process if (split.start_index is not None) and (split.end_index is not None): - configuration['range'] = [split.start_index, split.end_index] + configuration["range"] = [split.start_index, split.end_index] return configuration -#---------------------------------------------------------- +# ---------------------------------------------------------- def on_split_started(split): - '''Must be called each time a split job is started''' + """Must be called each time a split job is started""" now = datetime.now() @@ -424,7 +447,6 @@ def on_split_started(split): split.job.block.experiment.start_date = now split.job.block.experiment.save() - # Mark the mirror jobs and their blocks as running mirror_jobs = Job.objects.filter(key=split.job.key, mirror=True) for mirror_job in mirror_jobs: @@ -442,18 +464,18 @@ def on_split_started(split): mirror_job.block.experiment.save() -#---------------------------------------------------------- +# ---------------------------------------------------------- def on_split_done(split, result): - '''Must be called each time a split job is successfully completed''' + """Must be called each time a split job is successfully completed""" result = Result( - status = result['status'], - stdout = result['stdout'], - stderr = result['stderr'], - usrerr = result['user_error'], - _stats = simplejson.dumps(result['statistics'], indent=2), + status=result["status"], + stdout=result["stdout"], + stderr=result["stderr"], + usrerr=result["user_error"], + _stats=simplejson.dumps(result["statistics"], indent=2), ) result.save() @@ -466,27 +488,22 @@ def on_split_done(split, result): update_job(split.job) -#---------------------------------------------------------- +# ---------------------------------------------------------- def on_split_fail(split, result): - '''Must be called each time a split job is successfully completed''' + """Must be called each time a split job is successfully completed""" if isinstance(result, dict): result = Result( - status = result['status'], - stdout = result['stdout'], - stderr = result['stderr'], - usrerr = result['user_error'], - _stats = simplejson.dumps(result['statistics'], indent=2), + status=result["status"], + stdout=result["stdout"], + stderr=result["stderr"], + usrerr=result["user_error"], + _stats=simplejson.dumps(result["statistics"], indent=2), ) else: - result = Result( - status = 1, - stdout = '', - stderr = result, - usrerr = '', - ) + result = Result(status=1, stdout="", stderr=result, usrerr="",) result.save() @@ -499,11 +516,11 @@ def on_split_fail(split, result): return update_job(split.job) -#---------------------------------------------------------- +# ---------------------------------------------------------- def on_split_cancelled(split): - '''Must be called each time a split job is successfully cancelled''' + """Must be called each time a split job is successfully cancelled""" split.status = JobSplit.CANCELLED split.end_date = datetime.now() @@ -513,36 +530,36 @@ def on_split_cancelled(split): return update_job(split.job) -#---------------------------------------------------------- +# ---------------------------------------------------------- def retrieve_candidate_splits_for_queue(queue): - '''Retrieve the splits assigned to the given queue that could be considered + """Retrieve the splits assigned to the given queue that could be considered for execution - ''' + """ # Retrieve the pending jobs assigned to the queue, from oldest to newest - splits = JobSplit.objects.filter(job__block__queue=queue, status=JobSplit.QUEUED, - worker__isnull=True - ).order_by('job__runnable_date') - + splits = JobSplit.objects.filter( + job__block__queue=queue, status=JobSplit.QUEUED, worker__isnull=True + ).order_by("job__runnable_date") # Retrieve the list of the users that submitted those jobs - users = set(splits.values_list('job__block__experiment__author', flat=True)) - + users = set(splits.values_list("job__block__experiment__author", flat=True)) # Determine how much slots each user is already using on the queue - user_current_slots = [ JobSplit.objects.filter(job__block__experiment__author=k, - job__block__queue=queue, - status=JobSplit.PROCESSING).count() - for k in users ] - + user_current_slots = [ + JobSplit.objects.filter( + job__block__experiment__author=k, + job__block__queue=queue, + status=JobSplit.PROCESSING, + ).count() + for k in users + ] # Determine how much slots each user is still afforded on the queue - allowance = [ queue.max_slots_per_user - k for k in user_current_slots ] + allowance = [queue.max_slots_per_user - k for k in user_current_slots] allowance = dict(zip(users, allowance)) - # Limit runnable splits so we reach a maximum of allowed user slots candidates = [] for split in splits: @@ -551,17 +568,16 @@ def retrieve_candidate_splits_for_queue(queue): candidates.append(split) allowance[author] -= 1 - # Return the list of candidates splits return candidates -#---------------------------------------------------------- +# ---------------------------------------------------------- @transaction.atomic def assign_split_to_worker(split, worker): - '''Schedules the split to be executed on a given worker''' + """Schedules the split to be executed on a given worker""" split = JobSplit.objects.select_for_update().get(pk=split.pk) worker = Worker.objects.select_for_update().get(pk=worker.pk) @@ -569,18 +585,24 @@ def assign_split_to_worker(split, worker): split.worker = worker split.save() - logger.info("Job split %s scheduled at `%s' was assigned to `%s'", - split, split.job.block.queue, worker) + logger.info( + "Job split %s scheduled at `%s' was assigned to `%s'", + split, + split.job.block.queue, + worker, + ) -#---------------------------------------------------------- +# ---------------------------------------------------------- @transaction.atomic def mark_similar_jobs_as_mirror(job): - '''Mark all similar jobs as mirror, and delete their job splits''' + """Mark all similar jobs as mirror, and delete their job splits""" - similar_jobs = Job.objects.select_for_update().filter(key=job.key).exclude(pk=job.pk) + similar_jobs = ( + Job.objects.select_for_update().filter(key=job.key).exclude(pk=job.pk) + ) for similar_job in similar_jobs: similar_job.mirror = True @@ -592,28 +614,27 @@ def mark_similar_jobs_as_mirror(job): logger.info("Job `%s' is now marked as a mirror of `%s'", similar_job, job) -#---------------------------------------------------------- +# ---------------------------------------------------------- def update_job(job): - def _collect_results(splits): cached_files_infos = dict( - cpu_time = 0.0, - max_memory = 0, - stdout = '', - stderr = '', - error_report = '', - data_read_size = 0, - data_written_size = 0, - data_read_nb_blocks = 0, - data_written_nb_blocks = 0, - data_read_time = 0.0, - data_written_time = 0.0, - queuing_time = 0.0, - linear_execution_time = 0.0, - speed_up_real = 1.0, - speed_up_maximal = 1.0, + cpu_time=0.0, + max_memory=0, + stdout="", + stderr="", + error_report="", + data_read_size=0, + data_written_size=0, + data_read_nb_blocks=0, + data_written_nb_blocks=0, + data_read_time=0.0, + data_written_time=0.0, + queuing_time=0.0, + linear_execution_time=0.0, + speed_up_real=1.0, + speed_up_maximal=1.0, ) split_durations = [] @@ -623,56 +644,78 @@ def update_job(job): statistics = split.result.stats - cached_files_infos['cpu_time'] += statistics.cpu['user'] + statistics.cpu['system'] - cached_files_infos['max_memory'] += statistics.memory['rss'] + cached_files_infos["cpu_time"] += ( + statistics.cpu["user"] + statistics.cpu["system"] + ) + cached_files_infos["max_memory"] += statistics.memory["rss"] - header = '' + header = "" if split.start_index is not None: - header = 'Split #%d (from indices %d to %d):' % ( - split.split_index, split.start_index, split.end_index) - header += '\n' + ('=' * len(header)) + '\n' - - stdout = split.result.stdout if split.result.stdout != '\n' else '' - stderr = split.result.stderr if split.result.stderr != '\n' else '' - - if stdout != '': - cached_files_infos['stdout'] += header + stdout + '\n' - - if stderr != '': - cached_files_infos['stderr'] += header + stderr + '\n' - - if split.result.usrerr != '': - cached_files_infos['error_report'] += header + split.result.usrerr + '\n' - - if 'volume' in statistics.data: - cached_files_infos['data_read_size'] += statistics.data['volume'].get('read', 0) - cached_files_infos['data_written_size'] += statistics.data['volume'].get('write', 0) - - if 'blocks' in statistics.data: - cached_files_infos['data_read_nb_blocks'] += statistics.data['blocks'].get('read', 0) - cached_files_infos['data_written_nb_blocks'] += statistics.data['blocks'].get('write', 0) - - if 'time' in statistics.data: - cached_files_infos['data_read_time'] += statistics.data['time'].get('read', 0) - cached_files_infos['data_written_time'] += statistics.data['time'].get('write', 0) + header = "Split #%d (from indices %d to %d):" % ( + split.split_index, + split.start_index, + split.end_index, + ) + header += "\n" + ("=" * len(header)) + "\n" + + stdout = split.result.stdout if split.result.stdout != "\n" else "" + stderr = split.result.stderr if split.result.stderr != "\n" else "" + + if stdout != "": + cached_files_infos["stdout"] += header + stdout + "\n" + + if stderr != "": + cached_files_infos["stderr"] += header + stderr + "\n" + + if split.result.usrerr != "": + cached_files_infos["error_report"] += ( + header + split.result.usrerr + "\n" + ) + + if "volume" in statistics.data: + cached_files_infos["data_read_size"] += statistics.data["volume"].get( + "read", 0 + ) + cached_files_infos["data_written_size"] += statistics.data[ + "volume" + ].get("write", 0) + + if "blocks" in statistics.data: + cached_files_infos["data_read_nb_blocks"] += statistics.data[ + "blocks" + ].get("read", 0) + cached_files_infos["data_written_nb_blocks"] += statistics.data[ + "blocks" + ].get("write", 0) + + if "time" in statistics.data: + cached_files_infos["data_read_time"] += statistics.data["time"].get( + "read", 0 + ) + cached_files_infos["data_written_time"] += statistics.data["time"].get( + "write", 0 + ) job = splits[0].job - cached_files_infos['queuing_time'] = (job.start_date - job.runnable_date).total_seconds() - cached_files_infos['linear_execution_time'] = sum(split_durations) + cached_files_infos["queuing_time"] = ( + job.start_date - job.runnable_date + ).total_seconds() + cached_files_infos["linear_execution_time"] = sum(split_durations) if job.block.required_slots > 1: - cached_files_infos['speed_up_real'] = float(cached_files_infos['linear_execution_time']) / \ - (job.end_date - job.start_date).total_seconds() - cached_files_infos['speed_up_maximal'] = float(cached_files_infos['linear_execution_time']) / \ - max(split_durations) + cached_files_infos["speed_up_real"] = ( + float(cached_files_infos["linear_execution_time"]) + / (job.end_date - job.start_date).total_seconds() + ) + cached_files_infos["speed_up_maximal"] = float( + cached_files_infos["linear_execution_time"] + ) / max(split_durations) return cached_files_infos - splits_to_cancel = [] - # If the job is failed if job.splits.filter(status=JobSplit.FAILED).count() > 0: @@ -689,22 +732,28 @@ def update_job(job): split.status = JobSplit.CANCELLING split.save() - # Check that all the splits are done - if job.splits.filter(Q(status=JobSplit.QUEUED) | Q(status=JobSplit.PROCESSING) | Q(status=JobSplit.CANCELLING)).count() > 0: + if ( + job.splits.filter( + Q(status=JobSplit.QUEUED) + | Q(status=JobSplit.PROCESSING) + | Q(status=JobSplit.CANCELLING) + ).count() + > 0 + ): return splits_to_cancel - # Save the end date - job.end_date = job.splits.order_by('-end_date')[0].end_date + job.end_date = job.splits.order_by("-end_date")[0].end_date job.save() - # Did the job fail? if job.splits.filter(status=JobSplit.FAILED).count() > 0: # (If necessary) Update the cached files - splits = job.splits.filter(Q(status=JobSplit.FAILED) | Q(status=JobSplit.COMPLETED)) + splits = job.splits.filter( + Q(status=JobSplit.FAILED) | Q(status=JobSplit.COMPLETED) + ) cached_files_infos = _collect_results(splits) job.block.outputs.update(**cached_files_infos) @@ -771,7 +820,7 @@ def update_job(job): mirror_job.block.end_date = job.end_date mirror_job.block.save() - # Update the dependent jobs + # Update the dependent jobs additional_jobs = update_dependent_jobs(mirror_job) # (If necessary) Update the experiment @@ -783,7 +832,6 @@ def update_job(job): # Delete the job job.delete() - # Was the job cancelled? elif job.splits.filter(status=JobSplit.CANCELLED).count() > 0: @@ -799,18 +847,17 @@ def update_job(job): # Delete the job job.delete() - return splits_to_cancel -#---------------------------------------------------------- +# ---------------------------------------------------------- def update_dependent_jobs(job): - '''Mark the dependent jobs of the provided one as runnable + """Mark the dependent jobs of the provided one as runnable Intended to be called after a job is done - ''' + """ updated_jobs = [] @@ -823,21 +870,21 @@ def update_dependent_jobs(job): return updated_jobs -#---------------------------------------------------------- +# ---------------------------------------------------------- def cancel_all_blocks(experiment): - '''Mark the all the blocks of the provided experiment as cancelled + """Mark the all the blocks of the provided experiment as cancelled Intended to be called after a job has failed - ''' + """ splits_to_cancel = [] - # Retrieve all the blocks to cancel - blocks_to_cancel = experiment.blocks.filter(Q(status=Block.PROCESSING) | Q(status=Block.PENDING)) \ - .exclude(job__mirror=True) + blocks_to_cancel = experiment.blocks.filter( + Q(status=Block.PROCESSING) | Q(status=Block.PENDING) + ).exclude(job__mirror=True) for block in blocks_to_cancel: @@ -871,10 +918,10 @@ def cancel_all_blocks(experiment): block.set_canceled() block.job.delete() - # Retrieve all the mirror blocks - mirror_blocks_to_cancel = experiment.blocks.filter(Q(status=Block.PROCESSING) | Q(status=Block.PENDING)) \ - .filter(job__mirror=True) + mirror_blocks_to_cancel = experiment.blocks.filter( + Q(status=Block.PROCESSING) | Q(status=Block.PENDING) + ).filter(job__mirror=True) for block in mirror_blocks_to_cancel: block.set_canceled() @@ -883,7 +930,7 @@ def cancel_all_blocks(experiment): return splits_to_cancel -#---------------------------------------------------------- +# ---------------------------------------------------------- @transaction.atomic @@ -893,25 +940,25 @@ def update_experiment(experiment): # Experiment done? if experiment.blocks.exclude(status=Block.DONE).count() == 0: experiment.status = Experiment.DONE - experiment.end_date = experiment.blocks.order_by('-end_date')[0].end_date + experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date experiment.save() # Experiment failed? elif experiment.blocks.filter(status=Block.FAILED).count() > 0: if experiment.blocks.filter(status=Block.PROCESSING).count() == 0: experiment.status = Experiment.FAILED - experiment.end_date = experiment.blocks.order_by('-end_date')[0].end_date + experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date experiment.save() # Experiment cancelled? elif experiment.blocks.filter(status=Block.CANCELLED).count() > 0: if experiment.blocks.filter(status=Block.PROCESSING).count() == 0: experiment.status = Experiment.PENDING - experiment.end_date = experiment.blocks.order_by('-end_date')[0].end_date + experiment.end_date = experiment.blocks.order_by("-end_date")[0].end_date experiment.save() -#---------------------------------------------------------- +# ---------------------------------------------------------- def load_results_from_cache(block, cached_file): @@ -922,26 +969,26 @@ def load_results_from_cache(block, cached_file): return data_source = beat.core.data.CachedDataSource() - data_source.setup(os.path.join(settings.CACHE_ROOT, - beat.core.hash.toPath(cached_file.hash)), - settings.PREFIX) + data_source.setup( + os.path.join(settings.CACHE_ROOT, beat.core.hash.toPath(cached_file.hash)), + settings.PREFIX, + ) (output_data, start_index, end_index) = data_source[0] if output_data is not None: algorithm = beat.core.algorithm.Algorithm( - settings.PREFIX, block.algorithm.fullname()) + settings.PREFIX, block.algorithm.fullname() + ) for field, value in output_data.as_dict().items(): - res, _ = CacheResult.objects.get_or_create( - name=field, cache=cached_file) - res.primary = algorithm.results[field]['display'] + res, _ = CacheResult.objects.get_or_create(name=field, cache=cached_file) + res.primary = algorithm.results[field]["display"] res.type = algorithm.results[field]["type"] - if res.type in ['int32', 'float32', 'bool', 'string']: + if res.type in ["int32", "float32", "bool", "string"]: res.data_value = str(value) else: - res.data_value = simplejson.dumps( - value, indent=4, cls=NumpyJSONEncoder) + res.data_value = simplejson.dumps(value, indent=4, cls=NumpyJSONEncoder) res.save() diff --git a/beat/web/backend/local_scheduler.py b/beat/web/backend/local_scheduler.py index fc799842f1db43c94cc65b1a2a5b5004954c748f..ee00a6ed5d7d6a257e54b4c72eadfa0bc8ddd5af 100755 --- a/beat/web/backend/local_scheduler.py +++ b/beat/web/backend/local_scheduler.py @@ -26,73 +26,72 @@ ############################################################################### -from django.conf import settings -from django import db - import multiprocessing -import psutil import signal -from ..scripts import scheduler +import psutil +from django import db +from django.conf import settings from beat.core.scripts import worker +from ..scripts import scheduler -#---------------------------------------------------------- +# ---------------------------------------------------------- class SchedulerProcess(multiprocessing.Process): + def __init__(self, arguments, queue=None): + super(SchedulerProcess, self).__init__() - def __init__(self, arguments, queue=None): - super(SchedulerProcess, self).__init__() - - if queue is None: - self.queue = multiprocessing.Queue() - else: - self.queue = queue + if queue is None: + self.queue = multiprocessing.Queue() + else: + self.queue = queue - self.arguments = arguments + self.arguments = arguments + def run(self): + self.queue.put("STARTED") + scheduler.main(self.arguments) - def run(self): - self.queue.put('STARTED') - scheduler.main(self.arguments) - -#---------------------------------------------------------- +# ---------------------------------------------------------- class WorkerProcess(multiprocessing.Process): + def __init__(self, arguments, queue=None): + super(WorkerProcess, self).__init__() - def __init__(self, arguments, queue=None): - super(WorkerProcess, self).__init__() - - if queue is None: - self.queue = multiprocessing.Queue() - else: - self.queue = queue - - self.arguments = arguments + if queue is None: + self.queue = multiprocessing.Queue() + else: + self.queue = queue + self.arguments = arguments - def run(self): - self.queue.put('STARTED') - worker.main(self.arguments) + def run(self): + self.queue.put("STARTED") + worker.main(self.arguments) -#---------------------------------------------------------- +# ---------------------------------------------------------- -def start_scheduler(settings_module='beat.web.settings.settings', interval=5, - address='127.0.0.1', port=50000): +def start_scheduler( + settings_module="beat.web.settings.settings", + interval=5, + address="127.0.0.1", + port=50000, +): args = [ - '--settings=%s' % str(settings_module), - '--interval=%d' % int(interval), - '--address=%s' % str(address), - '--port=%d' % int(port), + "--settings=%s" % str(settings_module), + "--interval=%d" % int(interval), + "--address=%s" % str(address), + "--port=%d" % int(port), ] - if getattr(settings, 'LOCAL_SCHEDULER_VERBOSITY', None) is not None: + if getattr(settings, "LOCAL_SCHEDULER_VERBOSITY", None) is not None: args.append(settings.LOCAL_SCHEDULER_VERBOSITY) db.connections.close_all() @@ -104,21 +103,21 @@ def start_scheduler(settings_module='beat.web.settings.settings', interval=5, return (process.pid, process) -#---------------------------------------------------------- +# ---------------------------------------------------------- def start_worker(name, prefix, cache, scheduler_address, use_docker=False): args = [ - '--prefix=%s' % str(prefix), - '--cache=%s' % str(cache), - '--name=%s' % str(name), - str(scheduler_address) + "--prefix=%s" % str(prefix), + "--cache=%s" % str(cache), + "--name=%s" % str(name), + str(scheduler_address), ] if use_docker: - args.insert(3, '--docker') + args.insert(3, "--docker") - if getattr(settings, 'LOCAL_SCHEDULER_VERBOSITY', None) is not None: + if getattr(settings, "LOCAL_SCHEDULER_VERBOSITY", None) is not None: args.insert(3, settings.LOCAL_SCHEDULER_VERBOSITY) process = WorkerProcess(args) @@ -128,7 +127,7 @@ def start_worker(name, prefix, cache, scheduler_address, use_docker=False): return (process.pid, process) -#---------------------------------------------------------- +# ---------------------------------------------------------- def stop_process(pid): diff --git a/beat/web/backend/management/commands/cleanup_cache.py b/beat/web/backend/management/commands/cleanup_cache.py index 60217e7905d679694ff8cb259bb544cff249affc..19298a1b5e6eb180b1d0168c918a205b45237831 100644 --- a/beat/web/backend/management/commands/cleanup_cache.py +++ b/beat/web/backend/management/commands/cleanup_cache.py @@ -25,54 +25,68 @@ # # ############################################################################### -import os import logging -logger = logging.getLogger('beat.web') +import os -from django.core.management.base import BaseCommand from django.conf import settings +from django.core.management.base import BaseCommand from ...utils import cleanup_cache -from ....import __version__ +logger = logging.getLogger("beat.web") -class Command(BaseCommand): - help = 'Cleans-up the cache, removing old files' +class Command(BaseCommand): + help = "Cleans-up the cache, removing old files" def add_arguments(self, parser): - parser.add_argument('--olderthan', type=int, metavar='MINUTES', - default=0, help='All files which are older than this value ' \ - 'in *minutes* and are not locked or being used by active ' \ - 'experiments (running or scheduled) will be deleted ' \ - '[default: %(default)s]') - - parser.add_argument('--delete', action='store_true', default=False, - help='By default we only list cache files that will ' \ - 'be erased. If you pass this flag, then we really erase them') - - parser.add_argument('--path', default=settings.CACHE_ROOT, - help='By default, we erase the CACHE path on your settings. Set ' \ - 'this flag if you want to operate on a different path ' \ - '[default: %(default)s]') - + parser.add_argument( + "--olderthan", + type=int, + metavar="MINUTES", + default=0, + help="All files which are older than this value " + "in *minutes* and are not locked or being used by active " + "experiments (running or scheduled) will be deleted " + "[default: %(default)s]", + ) + + parser.add_argument( + "--delete", + action="store_true", + default=False, + help="By default we only list cache files that will " + "be erased. If you pass this flag, then we really erase them", + ) + + parser.add_argument( + "--path", + default=settings.CACHE_ROOT, + help="By default, we erase the CACHE path on your settings. Set " + "this flag if you want to operate on a different path " + "[default: %(default)s]", + ) def handle(self, *ignored, **arguments): # Setup this command's logging level global logger - arguments['verbosity'] = int(arguments['verbosity']) - if arguments['verbosity'] >= 1: - if arguments['verbosity'] == 1: logger.setLevel(logging.INFO) - elif arguments['verbosity'] >= 2: logger.setLevel(logging.DEBUG) - - deleted = cleanup_cache(arguments['path'], - age_in_minutes=arguments['olderthan'], - delete=arguments['delete']) - - if not arguments['delete']: + arguments["verbosity"] = int(arguments["verbosity"]) + if arguments["verbosity"] >= 1: + if arguments["verbosity"] == 1: + logger.setLevel(logging.INFO) + elif arguments["verbosity"] >= 2: + logger.setLevel(logging.DEBUG) + + deleted = cleanup_cache( + arguments["path"], + age_in_minutes=arguments["olderthan"], + delete=arguments["delete"], + ) + + if not arguments["delete"]: print("%d cache files can be deleted" % len(deleted)) for k in deleted: - print(os.path.join(arguments['path'], k)) + print(os.path.join(arguments["path"], k)) diff --git a/beat/web/backend/management/commands/qsetup.py b/beat/web/backend/management/commands/qsetup.py index d6ae088df68b47c4fef5988e570d421047f29861..fd83f27d8ce9039b76c952ba5c085162d9cf5cb0 100755 --- a/beat/web/backend/management/commands/qsetup.py +++ b/beat/web/backend/management/commands/qsetup.py @@ -25,17 +25,17 @@ # # ############################################################################### -import sys import logging -import simplejson +import socket +import sys # Default configuration to start the state with import psutil -import socket - +import simplejson from django.core.management.base import BaseCommand -from ...utils import setup_backend, dump_backend +from ...utils import dump_backend +from ...utils import setup_backend logger = logging.getLogger("beat.web") diff --git a/beat/web/backend/migrations/0001_initial.py b/beat/web/backend/migrations/0001_initial.py index bf12834df99bb8fcad3f59fbec1a1c4af3e65051..f6f0d6e011fbf3d5c44529f16944a96ac247cb51 100644 --- a/beat/web/backend/migrations/0001_initial.py +++ b/beat/web/backend/migrations/0001_initial.py @@ -27,77 +27,216 @@ from __future__ import unicode_literals -from django.db import migrations, models from django.conf import settings +from django.db import migrations +from django.db import models class Migration(migrations.Migration): dependencies = [ migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ('team', '0001_initial'), + ("team", "0001_initial"), ] operations = [ migrations.CreateModel( - name='Environment', + name="Environment", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('sharing', models.CharField(default=b'P', max_length=1, choices=[(b'P', b'Private'), (b'S', b'Shared'), (b'A', b'Public'), (b'U', b'Usable')])), - ('name', models.CharField(help_text=b'The name for this object (space-like characters will be automatically replaced by dashes)', max_length=200)), - ('version', models.CharField(help_text=b'Free-style version for this environment (normally read from the Worker/Scheduler available environments)', max_length=20)), - ('short_description', models.CharField(default=b'', help_text=b'Describe the object succinctly (try to keep it under 80 characters)', max_length=100, blank=True)), - ('description', models.TextField(default=b'', help_text=b'Describe the object thoroughly using <a href="http://docutils.sourceforge.net/rst.html">reStructuredText mark-up</a><br/><i class="fa fa-thumbs-up"></i> The ruler at 80 columns indicate suggested <a href="https://en.wikipedia.org/wiki/POSIX">POSIX line breaks</a> (for readability).<br/><i class="fa fa-thumbs-up"></i> The editor will automatically enlarge to accomodate the entirety of your input<br/><i class="fa fa-thumbs-up"></i> Use <a href="http://codemirror.net/doc/manual.html#commands">keyboard shortcuts</a> for search/replace and faster editing. For example, use Ctrl-F (PC) or Cmd-F (Mac) to search through this box', blank=True)), - ('creation_date', models.DateTimeField(auto_now_add=True, verbose_name=b'Creation date')), - ('active', models.BooleanField(default=True, help_text=b'If this environment can be used in experiments')), - ('previous_version', models.ForeignKey(related_name='next_versions', blank=True, to='backend.Environment', null=True, on_delete=models.SET_NULL)), - ('shared_with', models.ManyToManyField(related_name='shared_environments', to=settings.AUTH_USER_MODEL, blank=True)), - ('shared_with_team', models.ManyToManyField(related_name='shared_environments', to='team.Team', blank=True)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ( + "sharing", + models.CharField( + default=b"P", + max_length=1, + choices=[ + (b"P", b"Private"), + (b"S", b"Shared"), + (b"A", b"Public"), + (b"U", b"Usable"), + ], + ), + ), + ( + "name", + models.CharField( + help_text=b"The name for this object (space-like characters will be automatically replaced by dashes)", + max_length=200, + ), + ), + ( + "version", + models.CharField( + help_text=b"Free-style version for this environment (normally read from the Worker/Scheduler available environments)", + max_length=20, + ), + ), + ( + "short_description", + models.CharField( + default=b"", + help_text=b"Describe the object succinctly (try to keep it under 80 characters)", + max_length=100, + blank=True, + ), + ), + ( + "description", + models.TextField( + default=b"", + help_text=b'Describe the object thoroughly using <a href="http://docutils.sourceforge.net/rst.html">reStructuredText mark-up</a><br/><i class="fa fa-thumbs-up"></i> The ruler at 80 columns indicate suggested <a href="https://en.wikipedia.org/wiki/POSIX">POSIX line breaks</a> (for readability).<br/><i class="fa fa-thumbs-up"></i> The editor will automatically enlarge to accomodate the entirety of your input<br/><i class="fa fa-thumbs-up"></i> Use <a href="http://codemirror.net/doc/manual.html#commands">keyboard shortcuts</a> for search/replace and faster editing. For example, use Ctrl-F (PC) or Cmd-F (Mac) to search through this box', + blank=True, + ), + ), + ( + "creation_date", + models.DateTimeField( + auto_now_add=True, verbose_name=b"Creation date" + ), + ), + ( + "active", + models.BooleanField( + default=True, + help_text=b"If this environment can be used in experiments", + ), + ), + ( + "previous_version", + models.ForeignKey( + related_name="next_versions", + blank=True, + to="backend.Environment", + null=True, + on_delete=models.SET_NULL, + ), + ), + ( + "shared_with", + models.ManyToManyField( + related_name="shared_environments", + to=settings.AUTH_USER_MODEL, + blank=True, + ), + ), + ( + "shared_with_team", + models.ManyToManyField( + related_name="shared_environments", to="team.Team", blank=True + ), + ), ], ), migrations.CreateModel( - name='Queue', + name="Queue", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('name', models.CharField(help_text=b'The name for this object (space-like characters will be automatically replaced by dashes)', max_length=100)), - ('memory_limit', models.IntegerField(help_text=b'In megabytes')), - ('time_limit', models.IntegerField(help_text=b'In minutes')), - ('nb_cores_per_slot', models.IntegerField()), - ('max_slots_per_user', models.IntegerField()), - ('environments', models.ManyToManyField(related_name='queues', to='backend.Environment')), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ( + "name", + models.CharField( + help_text=b"The name for this object (space-like characters will be automatically replaced by dashes)", + max_length=100, + ), + ), + ("memory_limit", models.IntegerField(help_text=b"In megabytes")), + ("time_limit", models.IntegerField(help_text=b"In minutes")), + ("nb_cores_per_slot", models.IntegerField()), + ("max_slots_per_user", models.IntegerField()), + ( + "environments", + models.ManyToManyField( + related_name="queues", to="backend.Environment" + ), + ), ], - options={ - 'permissions': [['can_access', 'Can access queue']], - }, + options={"permissions": [["can_access", "Can access queue"]]}, ), migrations.CreateModel( - name='QueueWorkerSlot', + name="QueueWorkerSlot", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('nb_slots', models.IntegerField(help_text=b'Number of processing slots to dedicate in this worker for a given queue', verbose_name=b'Number of slots')), - ('queue', models.ForeignKey(related_name='slots', to='backend.Queue', on_delete=models.CASCADE)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ( + "nb_slots", + models.IntegerField( + help_text=b"Number of processing slots to dedicate in this worker for a given queue", + verbose_name=b"Number of slots", + ), + ), + ( + "queue", + models.ForeignKey( + related_name="slots", + to="backend.Queue", + on_delete=models.CASCADE, + ), + ), ], ), migrations.CreateModel( - name='Worker', + name="Worker", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('name', models.CharField(help_text=b'The name for this object (space-like characters will be automatically replaced by dashes)', unique=True, max_length=100)), - ('active', models.BooleanField(default=True, help_text='If this worker is usable presently')), - ('nb_cores', models.IntegerField()), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ( + "name", + models.CharField( + help_text=b"The name for this object (space-like characters will be automatically replaced by dashes)", + unique=True, + max_length=100, + ), + ), + ( + "active", + models.BooleanField( + default=True, help_text="If this worker is usable presently" + ), + ), + ("nb_cores", models.IntegerField()), ], ), migrations.AddField( - model_name='queueworkerslot', - name='worker', - field=models.ForeignKey(related_name='slots', to='backend.Worker', on_delete=models.CASCADE), + model_name="queueworkerslot", + name="worker", + field=models.ForeignKey( + related_name="slots", to="backend.Worker", on_delete=models.CASCADE + ), ), migrations.AlterUniqueTogether( - name='queueworkerslot', - unique_together=set([('queue', 'worker', 'nb_slots')]), + name="queueworkerslot", + unique_together=set([("queue", "worker", "nb_slots")]), ), migrations.AlterUniqueTogether( - name='environment', - unique_together=set([('name', 'version')]), + name="environment", unique_together=set([("name", "version")]), ), ] diff --git a/beat/web/backend/migrations/0002_scheduler_addons.py b/beat/web/backend/migrations/0002_scheduler_addons.py index 8c3f5351fc4127c1cd55f54028f25ee90f523464..c85e16fdc3364da6db63383eb2e266891ef871ba 100644 --- a/beat/web/backend/migrations/0002_scheduler_addons.py +++ b/beat/web/backend/migrations/0002_scheduler_addons.py @@ -28,163 +28,278 @@ from __future__ import unicode_literals -from django.db import migrations, models +from django.db import migrations +from django.db import models class Migration(migrations.Migration): dependencies = [ - ('backend', '0001_initial'), + ("backend", "0001_initial"), ] operations = [ - migrations.RenameModel('QueueWorkerSlot', 'Slot'), + migrations.RenameModel("QueueWorkerSlot", "Slot"), migrations.RenameField( - model_name='slot', - old_name='nb_slots', - new_name='quantity', + model_name="slot", old_name="nb_slots", new_name="quantity", ), migrations.AddField( - model_name='slot', - name='priority', - field=models.PositiveIntegerField(default=0, help_text=b'Priority of these slots on the defined queue'), + model_name="slot", + name="priority", + field=models.PositiveIntegerField( + default=0, help_text=b"Priority of these slots on the defined queue" + ), ), migrations.AlterUniqueTogether( - name='slot', - unique_together=set([('queue', 'worker')]), + name="slot", unique_together=set([("queue", "worker")]), ), migrations.RenameField( - model_name='queue', - old_name='nb_cores_per_slot', - new_name='cores_per_slot', + model_name="queue", old_name="nb_cores_per_slot", new_name="cores_per_slot", ), migrations.AlterField( - model_name='worker', - name='active', - field=models.BooleanField(default=False, help_text='If this worker is usable presently'), + model_name="worker", + name="active", + field=models.BooleanField( + default=False, help_text="If this worker is usable presently" + ), ), migrations.RenameField( - model_name='worker', - old_name='nb_cores', - new_name='cores', + model_name="worker", old_name="nb_cores", new_name="cores", ), migrations.AddField( - model_name='worker', - name='info', - field=models.TextField(help_text=b'Informative message from the worker', null=True, blank=True), + model_name="worker", + name="info", + field=models.TextField( + help_text=b"Informative message from the worker", null=True, blank=True + ), ), migrations.AddField( - model_name='worker', - name='memory', - field=models.PositiveIntegerField(default=0, help_text=b'In megabytes'), + model_name="worker", + name="memory", + field=models.PositiveIntegerField(default=0, help_text=b"In megabytes"), ), migrations.AddField( - model_name='worker', - name='used_cores', - field=models.PositiveIntegerField(default=0, help_text=b'In %'), + model_name="worker", + name="used_cores", + field=models.PositiveIntegerField(default=0, help_text=b"In %"), ), migrations.AddField( - model_name='worker', - name='used_memory', - field=models.PositiveIntegerField(default=0, help_text=b'In %'), + model_name="worker", + name="used_memory", + field=models.PositiveIntegerField(default=0, help_text=b"In %"), ), migrations.AddField( - model_name='worker', - name='update', - field=models.BooleanField(default=False, help_text='If this worker state must be updated at the next cycle'), + model_name="worker", + name="update", + field=models.BooleanField( + default=False, + help_text="If this worker state must be updated at the next cycle", + ), ), migrations.AddField( - model_name='worker', - name='updated', + model_name="worker", + name="updated", field=models.DateTimeField(auto_now=True, null=True), ), migrations.AlterField( - model_name='queue', - name='name', - field=models.CharField(help_text=b'The name for this object (space-like characters will be automatically replaced by dashes)', unique=True, max_length=100), + model_name="queue", + name="name", + field=models.CharField( + help_text=b"The name for this object (space-like characters will be automatically replaced by dashes)", + unique=True, + max_length=100, + ), ), migrations.AlterField( - model_name='queue', - name='cores_per_slot', + model_name="queue", + name="cores_per_slot", field=models.PositiveIntegerField(), ), migrations.AlterField( - model_name='queue', - name='max_slots_per_user', + model_name="queue", + name="max_slots_per_user", field=models.PositiveIntegerField(), ), migrations.AlterField( - model_name='queue', - name='memory_limit', - field=models.PositiveIntegerField(help_text=b'In megabytes'), + model_name="queue", + name="memory_limit", + field=models.PositiveIntegerField(help_text=b"In megabytes"), ), migrations.AlterField( - model_name='queue', - name='time_limit', - field=models.PositiveIntegerField(help_text=b'In minutes'), + model_name="queue", + name="time_limit", + field=models.PositiveIntegerField(help_text=b"In minutes"), ), migrations.AlterField( - model_name='slot', - name='quantity', - field=models.PositiveIntegerField(help_text=b'Number of processing slots to dedicate in this worker for a given queue', verbose_name=b'Number of slots'), + model_name="slot", + name="quantity", + field=models.PositiveIntegerField( + help_text=b"Number of processing slots to dedicate in this worker for a given queue", + verbose_name=b"Number of slots", + ), ), migrations.AlterField( - model_name='worker', - name='cores', - field=models.PositiveIntegerField(), + model_name="worker", name="cores", field=models.PositiveIntegerField(), ), migrations.AlterField( - model_name='environment', - name='previous_version', - field=models.ForeignKey(related_name='next_versions', on_delete=models.deletion.SET_NULL, blank=True, to='backend.Environment', null=True), + model_name="environment", + name="previous_version", + field=models.ForeignKey( + related_name="next_versions", + on_delete=models.deletion.SET_NULL, + blank=True, + to="backend.Environment", + null=True, + ), ), migrations.CreateModel( - name='Result', + name="Result", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('status', models.IntegerField()), - ('stdout', models.TextField(null=True, blank=True)), - ('stderr', models.TextField(null=True, blank=True)), - ('usrerr', models.TextField(null=True, blank=True)), - ('syserr', models.TextField(null=True, blank=True)), - ('_stats', models.TextField(null=True, blank=True)), - ('timed_out', models.BooleanField(default=False)), - ('cancelled', models.BooleanField(default=False)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ("status", models.IntegerField()), + ("stdout", models.TextField(null=True, blank=True)), + ("stderr", models.TextField(null=True, blank=True)), + ("usrerr", models.TextField(null=True, blank=True)), + ("syserr", models.TextField(null=True, blank=True)), + ("_stats", models.TextField(null=True, blank=True)), + ("timed_out", models.BooleanField(default=False)), + ("cancelled", models.BooleanField(default=False)), ], ), migrations.CreateModel( - name='Job', + name="Job", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('status', models.CharField(default=b'N', max_length=1, choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'S', b'Skipped'), (b'L', b'Cancelled'), (b'K', b'Cancel')])), - ('runnable_date', models.DateTimeField(null=True, blank=True)), - ('start_date', models.DateTimeField(null=True, blank=True)), - ('end_date', models.DateTimeField(null=True, blank=True)), - ('split_errors', models.PositiveIntegerField(default=0)), - ('block', models.OneToOneField(related_name='job', on_delete=models.deletion.CASCADE, to='experiments.Block', null=True)), - ('parent', models.OneToOneField(related_name='child', to='backend.Job', null=True, on_delete=models.deletion.SET_NULL)), - ('result', models.OneToOneField(to='backend.Result', null=True, on_delete=models.deletion.CASCADE, related_name='job')), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ( + "status", + models.CharField( + default=b"N", + max_length=1, + choices=[ + (b"N", b"Queued"), + (b"P", b"Processing"), + (b"C", b"Completed"), + (b"F", b"Failed"), + (b"S", b"Skipped"), + (b"L", b"Cancelled"), + (b"K", b"Cancel"), + ], + ), + ), + ("runnable_date", models.DateTimeField(null=True, blank=True)), + ("start_date", models.DateTimeField(null=True, blank=True)), + ("end_date", models.DateTimeField(null=True, blank=True)), + ("split_errors", models.PositiveIntegerField(default=0)), + ( + "block", + models.OneToOneField( + related_name="job", + on_delete=models.deletion.CASCADE, + to="experiments.Block", + null=True, + ), + ), + ( + "parent", + models.OneToOneField( + related_name="child", + to="backend.Job", + null=True, + on_delete=models.deletion.SET_NULL, + ), + ), + ( + "result", + models.OneToOneField( + to="backend.Result", + null=True, + on_delete=models.deletion.CASCADE, + related_name="job", + ), + ), ], ), migrations.CreateModel( - name='JobSplit', + name="JobSplit", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('split_index', models.PositiveIntegerField()), - ('start_index', models.PositiveIntegerField(null=True)), - ('end_index', models.PositiveIntegerField(null=True)), - ('cache_errors', models.PositiveIntegerField(default=0)), - ('status', models.CharField(default=b'N', max_length=1, choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'S', b'Skipped'), (b'L', b'Cancelled'), (b'K', b'Cancel')])), - ('start_date', models.DateTimeField(null=True)), - ('end_date', models.DateTimeField(null=True)), - ('process_id', models.PositiveIntegerField(null=True)), - ('job', models.ForeignKey(related_name='splits', to='backend.Job', null=True, on_delete=models.SET_NULL)), - ('worker', models.ForeignKey(related_name='splits', on_delete=models.deletion.SET_NULL, to='backend.Worker', null=True)), - ('result', models.OneToOneField(to='backend.Result', related_name='split', null=True, on_delete=models.deletion.CASCADE)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ("split_index", models.PositiveIntegerField()), + ("start_index", models.PositiveIntegerField(null=True)), + ("end_index", models.PositiveIntegerField(null=True)), + ("cache_errors", models.PositiveIntegerField(default=0)), + ( + "status", + models.CharField( + default=b"N", + max_length=1, + choices=[ + (b"N", b"Queued"), + (b"P", b"Processing"), + (b"C", b"Completed"), + (b"F", b"Failed"), + (b"S", b"Skipped"), + (b"L", b"Cancelled"), + (b"K", b"Cancel"), + ], + ), + ), + ("start_date", models.DateTimeField(null=True)), + ("end_date", models.DateTimeField(null=True)), + ("process_id", models.PositiveIntegerField(null=True)), + ( + "job", + models.ForeignKey( + related_name="splits", + to="backend.Job", + null=True, + on_delete=models.SET_NULL, + ), + ), + ( + "worker", + models.ForeignKey( + related_name="splits", + on_delete=models.deletion.SET_NULL, + to="backend.Worker", + null=True, + ), + ), + ( + "result", + models.OneToOneField( + to="backend.Result", + related_name="split", + null=True, + on_delete=models.deletion.CASCADE, + ), + ), ], ), migrations.AlterUniqueTogether( - name='jobsplit', - unique_together=set([('job', 'split_index')]), + name="jobsplit", unique_together=set([("job", "split_index")]), ), ] diff --git a/beat/web/backend/migrations/0003_remove_result_syserr.py b/beat/web/backend/migrations/0003_remove_result_syserr.py index 4d779b4fd0f4b614123f66c48eb4b2f9d15f9ad4..91efa5895a900387c83c12f3da49b3ae1642f57a 100644 --- a/beat/web/backend/migrations/0003_remove_result_syserr.py +++ b/beat/web/backend/migrations/0003_remove_result_syserr.py @@ -8,12 +8,9 @@ from django.db import migrations class Migration(migrations.Migration): dependencies = [ - ('backend', '0002_scheduler_addons'), + ("backend", "0002_scheduler_addons"), ] operations = [ - migrations.RemoveField( - model_name='result', - name='syserr', - ), + migrations.RemoveField(model_name="result", name="syserr",), ] diff --git a/beat/web/backend/migrations/0004_environmentlanguage.py b/beat/web/backend/migrations/0004_environmentlanguage.py index 53a3594d3c4ab75314e61f0e0e21aaeecfcb9838..eef6c322a5bd53d148f436b54c9d8ec67b160c15 100644 --- a/beat/web/backend/migrations/0004_environmentlanguage.py +++ b/beat/web/backend/migrations/0004_environmentlanguage.py @@ -2,8 +2,9 @@ # Generated by Django 1.9.11 on 2016-11-23 12:18 from __future__ import unicode_literals -from django.db import migrations, models import django.db.models.deletion +from django.db import migrations +from django.db import models def add_default_language(apps, schema_editor): @@ -11,23 +12,51 @@ def add_default_language(apps, schema_editor): EnvironmentLanguage = apps.get_model("backend", "EnvironmentLanguage") for env in Environment.objects.all(): - lang = EnvironmentLanguage(language='P', environment=env) + lang = EnvironmentLanguage(language="P", environment=env) lang.save() class Migration(migrations.Migration): dependencies = [ - ('backend', '0003_remove_result_syserr'), + ("backend", "0003_remove_result_syserr"), ] operations = [ migrations.CreateModel( - name='EnvironmentLanguage', + name="EnvironmentLanguage", fields=[ - ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('language', models.CharField(choices=[(b'U', b'Unknown'), (b'C', b'Cxx'), (b'M', b'Matlab'), (b'P', b'Python'), (b'R', b'R')], default=b'P', max_length=1)), - ('environment', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='languages', to='backend.Environment')), + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "language", + models.CharField( + choices=[ + (b"U", b"Unknown"), + (b"C", b"Cxx"), + (b"M", b"Matlab"), + (b"P", b"Python"), + (b"R", b"R"), + ], + default=b"P", + max_length=1, + ), + ), + ( + "environment", + models.ForeignKey( + on_delete=django.db.models.deletion.CASCADE, + related_name="languages", + to="backend.Environment", + ), + ), ], ), migrations.RunPython(add_default_language, migrations.RunPython.noop), diff --git a/beat/web/backend/migrations/0005_job_modifications.py b/beat/web/backend/migrations/0005_job_modifications.py index f9cfb4f38c62d5544118258a66846905dd182fd5..f023e9524e782fed53ccbe29f4c6702a118c0f48 100644 --- a/beat/web/backend/migrations/0005_job_modifications.py +++ b/beat/web/backend/migrations/0005_job_modifications.py @@ -2,50 +2,45 @@ # Generated by Django 1.9.13 on 2017-09-29 16:54 from __future__ import unicode_literals -from django.db import migrations, models +from django.db import migrations +from django.db import models class Migration(migrations.Migration): dependencies = [ - ('backend', '0004_environmentlanguage'), + ("backend", "0004_environmentlanguage"), ] operations = [ - migrations.RemoveField( - model_name='job', - name='parent', - ), - migrations.RemoveField( - model_name='job', - name='split_errors', - ), + migrations.RemoveField(model_name="job", name="parent",), + migrations.RemoveField(model_name="job", name="split_errors",), migrations.AddField( - model_name='job', - name='key', - field=models.CharField(default='', max_length=64), + model_name="job", + name="key", + field=models.CharField(default="", max_length=64), preserve_default=False, ), migrations.AddField( - model_name='job', - name='mirror', - field=models.BooleanField(default=False), - ), - migrations.RemoveField( - model_name='job', - name='status', - ), - migrations.RemoveField( - model_name='jobsplit', - name='cache_errors', - ), - migrations.RemoveField( - model_name='jobsplit', - name='process_id', + model_name="job", name="mirror", field=models.BooleanField(default=False), ), + migrations.RemoveField(model_name="job", name="status",), + migrations.RemoveField(model_name="jobsplit", name="cache_errors",), + migrations.RemoveField(model_name="jobsplit", name="process_id",), migrations.AlterField( - model_name='jobsplit', - name='status', - field=models.CharField(choices=[(b'N', b'Queued'), (b'P', b'Processing'), (b'C', b'Completed'), (b'F', b'Failed'), (b'L', b'Cancelled'), (b'K', b'Cancelling')], default=b'N', max_length=1), + model_name="jobsplit", + name="status", + field=models.CharField( + choices=[ + (b"N", b"Queued"), + (b"P", b"Processing"), + (b"C", b"Completed"), + (b"F", b"Failed"), + (b"L", b"Cancelled"), + (b"K", b"Cancelling"), + ], + default=b"N", + max_length=1, + ), ), ] diff --git a/beat/web/backend/migrations/0006_localschedulerprocesses.py b/beat/web/backend/migrations/0006_localschedulerprocesses.py index 5b0c888891c365486068c4fdb73fac0a433bdc3f..52a15dbcc874f55f1e53dd5aa43ec30c82b9329f 100644 --- a/beat/web/backend/migrations/0006_localschedulerprocesses.py +++ b/beat/web/backend/migrations/0006_localschedulerprocesses.py @@ -2,22 +2,31 @@ # Generated by Django 1.9.13 on 2017-10-12 15:12 from __future__ import unicode_literals -from django.db import migrations, models +from django.db import migrations +from django.db import models class Migration(migrations.Migration): dependencies = [ - ('backend', '0005_job_modifications'), + ("backend", "0005_job_modifications"), ] operations = [ migrations.CreateModel( - name='LocalSchedulerProcesses', + name="LocalSchedulerProcesses", fields=[ - ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('name', models.TextField()), - ('pid', models.IntegerField()), + ( + "id", + models.AutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ("name", models.TextField()), + ("pid", models.IntegerField()), ], ), ] diff --git a/beat/web/backend/models/__init__.py b/beat/web/backend/models/__init__.py index 6b2e0d1af4ea836c49177047c56e4ce142d8f35e..50a277e0a6f3f4a81bf9d5b6444865d296a01583 100755 --- a/beat/web/backend/models/__init__.py +++ b/beat/web/backend/models/__init__.py @@ -26,16 +26,32 @@ ############################################################################### -from .environment import EnvironmentManager from .environment import Environment from .environment import EnvironmentLanguage +from .environment import EnvironmentManager from .job import Job from .job import JobSplit from .local_scheduler import LocalSchedulerProcesses -from .queue import QueueManager from .queue import Queue +from .queue import QueueManager from .result import Result -from .slot import SlotManager from .slot import Slot -from .worker import WorkerManager +from .slot import SlotManager from .worker import Worker +from .worker import WorkerManager + +__all__ = [ + "Environment", + "EnvironmentLanguage", + "EnvironmentManager", + "Job", + "JobSplit", + "LocalSchedulerProcesses", + "Queue", + "QueueManager", + "Result", + "Slot", + "SlotManager", + "Worker", + "WorkerManager", +] diff --git a/beat/web/backend/models/environment.py b/beat/web/backend/models/environment.py index 346a15a8e8295d3242bddd815a35c52a2cd511a7..7dfe83195775c3b7561b2c18891e0fbc6ac59245 100755 --- a/beat/web/backend/models/environment.py +++ b/beat/web/backend/models/environment.py @@ -26,15 +26,15 @@ ############################################################################### from django.db import models +from django.db.models import Count +from django.db.models import Q from django.urls import reverse -from django.db.models import Count, Q from ...code.models import Code from ...common.models import Shareable from ...common.models import ShareableManager from ...common.texts import Messages - # ---------------------------------------------------------- @@ -58,8 +58,8 @@ class EnvironmentManager(ShareableManager): used for blocks that are done. """ - from ...experiments.models import Experiment from ...experiments.models import Block + from ...experiments.models import Experiment # Tries to figure through a maximum if blocks in the list have been # successfully used inside an environment. @@ -165,7 +165,9 @@ class Environment(Shareable): class EnvironmentLanguage(models.Model): - environment = models.ForeignKey(Environment, related_name="languages", on_delete=models.CASCADE) + environment = models.ForeignKey( + Environment, related_name="languages", on_delete=models.CASCADE + ) language = models.CharField( max_length=1, choices=Code.CODE_LANGUAGE, default=Code.PYTHON diff --git a/beat/web/backend/models/job.py b/beat/web/backend/models/job.py index 2ddd21cf477f322417bd4200beb3b5ca5ba1901d..4efe1c616c9748d760e14a358ece07bc15480a0d 100755 --- a/beat/web/backend/models/job.py +++ b/beat/web/backend/models/job.py @@ -26,32 +26,28 @@ ############################################################################### import datetime - import logging -logger = logging.getLogger(__name__) import simplejson - -from django.db import utils -from django.db import models from django.conf import settings +from django.db import models import beat.core.data import beat.core.hash from .result import Result +logger = logging.getLogger(__name__) -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobManager(models.Manager): - def create_job(self, block): # Compute the key of the job - hashes = [ x.hash for x in block.outputs.order_by('hash') ] - key = beat.core.hash.hash(''.join(hashes)) + hashes = [x.hash for x in block.outputs.order_by("hash")] + key = beat.core.hash.hash("".join(hashes)) # Determine if the job can be run or is dependent on others runnable_date = None @@ -59,28 +55,26 @@ class JobManager(models.Manager): runnable_date = datetime.datetime.now() # Create the job - job = self.model( - block=block, - key=key, - runnable_date=runnable_date - ) + job = self.model(block=block, key=key, runnable_date=runnable_date) job.save() return job -#---------------------------------------------------------- +# ---------------------------------------------------------- class Job(models.Model): - '''Class describing the execution of a Job on the backend''' + """Class describing the execution of a Job on the backend""" - block = models.OneToOneField('experiments.Block', null=True, - on_delete=models.CASCADE, related_name='job') + block = models.OneToOneField( + "experiments.Block", null=True, on_delete=models.CASCADE, related_name="job" + ) - result = models.OneToOneField(Result, null=True, on_delete=models.CASCADE, - related_name='job') + result = models.OneToOneField( + Result, null=True, on_delete=models.CASCADE, related_name="job" + ) runnable_date = models.DateTimeField(null=True, blank=True) @@ -92,23 +86,23 @@ class Job(models.Model): mirror = models.BooleanField(default=False) - objects = JobManager() - def __str__(self): - return "Job(%s, %s, key=%s, mirror=%s, splits=%d, cores=%d)" % \ - (self.block.name, self.block.experiment.name, - self.key, str(self.mirror), - self.block.required_slots, - self.block.queue.cores_per_slot) + return "Job(%s, %s, key=%s, mirror=%s, splits=%d, cores=%d)" % ( + self.block.name, + self.block.experiment.name, + self.key, + str(self.mirror), + self.block.required_slots, + self.block.queue.cores_per_slot, + ) -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobSplitManager(models.Manager): - def create_splits(self, job): # If no splitting is required, only create one split @@ -117,21 +111,29 @@ class JobSplitManager(models.Manager): split.save() return [split] - # Retrieve the list of synchronized inputs configuration = simplejson.loads(job.block.command) - inputs = [ entry for name, entry in configuration['inputs'].items() - if entry['channel'] == configuration['channel'] ] - + inputs = [ + entry + for name, entry in configuration["inputs"].items() + if entry["channel"] == configuration["channel"] + ] # Load the list of indices for each inputs indices = [] for input_cfg in inputs: - if 'database' in input_cfg: - indices.extend(beat.core.data.load_data_index_db(settings.CACHE_ROOT, input_cfg['path'])) + if "database" in input_cfg: + indices.extend( + beat.core.data.load_data_index_db( + settings.CACHE_ROOT, input_cfg["path"] + ) + ) else: - indices.append(beat.core.data.load_data_index(settings.CACHE_ROOT, input_cfg['path'])) - + indices.append( + beat.core.data.load_data_index( + settings.CACHE_ROOT, input_cfg["path"] + ) + ) # Attempt to split the indices nb_splits = job.block.required_slots @@ -146,21 +148,23 @@ class JobSplitManager(models.Manager): nb_splits -= 1 if nb_splits != job.block.required_slots: - message = "The processing of the block `%s' of experiment `%s' " \ - "was splitted in %d instead of the requested %d" % \ - (job.block.name, job.block.experiment.fullname(), - nb_splits, job.block.required_slots) - logger.warning(message) - + message = ( + "The processing of the block `%s' of experiment `%s' " + "was splitted in %d instead of the requested %d" + % ( + job.block.name, + job.block.experiment.fullname(), + nb_splits, + job.block.required_slots, + ) + ) + logger.warning(message) # Create the necessary splits and assign the ranges splits = [] for i, indices in enumerate(split_indices): split = JobSplit( - job=job, - split_index=i, - start_index=indices[0], - end_index=indices[1], + job=job, split_index=i, start_index=indices[0], end_index=indices[1], ) split.save() @@ -169,34 +173,35 @@ class JobSplitManager(models.Manager): return splits -#---------------------------------------------------------- +# ---------------------------------------------------------- class JobSplit(models.Model): - '''Class describing a part of job of an experiment''' + """Class describing a part of job of an experiment""" - QUEUED = 'N' - PROCESSING = 'P' - COMPLETED = 'C' - FAILED = 'F' - CANCELLED = 'L' - CANCELLING = 'K' + QUEUED = "N" + PROCESSING = "P" + COMPLETED = "C" + FAILED = "F" + CANCELLED = "L" + CANCELLING = "K" STATUS = ( - (QUEUED, 'Queued'), - (PROCESSING, 'Processing'), - (COMPLETED, 'Completed'), - (FAILED, 'Failed'), - (CANCELLED, 'Cancelled'), - (CANCELLING, 'Cancelling'), + (QUEUED, "Queued"), + (PROCESSING, "Processing"), + (COMPLETED, "Completed"), + (FAILED, "Failed"), + (CANCELLED, "Cancelled"), + (CANCELLING, "Cancelling"), ) + worker = models.ForeignKey( + "Worker", null=True, on_delete=models.SET_NULL, related_name="splits" + ) - worker = models.ForeignKey('Worker', null=True, on_delete=models.SET_NULL, - related_name='splits') - - job = models.ForeignKey(Job, null=True, on_delete=models.CASCADE, - related_name='splits') + job = models.ForeignKey( + Job, null=True, on_delete=models.CASCADE, related_name="splits" + ) split_index = models.PositiveIntegerField() @@ -206,32 +211,28 @@ class JobSplit(models.Model): status = models.CharField(max_length=1, choices=STATUS, default=QUEUED) - result = models.OneToOneField(Result, null=True, on_delete=models.CASCADE, - related_name='split') + result = models.OneToOneField( + Result, null=True, on_delete=models.CASCADE, related_name="split" + ) start_date = models.DateTimeField(null=True) end_date = models.DateTimeField(null=True) - objects = JobSplitManager() - class Meta: - unique_together = ('job', 'split_index') - + unique_together = ("job", "split_index") def __str__(self): - return "JobSplit(%s, index=%d, state=%s)%s" % \ - (self.job, self.split_index, self.status, - ('@%s' % self.worker) if self.worker else '') - + return "JobSplit(%s, index=%d, state=%s)%s" % ( + self.job, + self.split_index, + self.status, + ("@%s" % self.worker) if self.worker else "", + ) def done(self): - '''Says whether the job has finished or not''' + """Says whether the job has finished or not""" - return self.status in ( - JobSplit.COMPLETED, - JobSplit.FAILED, - JobSplit.CANCELLED, - ) + return self.status in (JobSplit.COMPLETED, JobSplit.FAILED, JobSplit.CANCELLED,) diff --git a/beat/web/backend/models/local_scheduler.py b/beat/web/backend/models/local_scheduler.py index 8dad9a2c9fe8db282935c6f7d37f078244aba81d..f8e842cd27ab3762144c80f0fd891f29f50d2328 100755 --- a/beat/web/backend/models/local_scheduler.py +++ b/beat/web/backend/models/local_scheduler.py @@ -30,11 +30,10 @@ from django.db import models class LocalSchedulerProcesses(models.Model): - '''Information about the processes launched by the local scheduler''' + """Information about the processes launched by the local scheduler""" name = models.TextField() pid = models.IntegerField() - def __str__(self): - return '%s (pid = %d)' % (self.name, self.pid) + return "%s (pid = %d)" % (self.name, self.pid) diff --git a/beat/web/backend/models/queue.py b/beat/web/backend/models/queue.py index 75b94d93b6fe5dc5f76939948054714e09b78bb0..4d349b243bdb9f4bee8605bb68c5852a20a5ad03 100755 --- a/beat/web/backend/models/queue.py +++ b/beat/web/backend/models/queue.py @@ -28,104 +28,93 @@ import operator +from django.contrib.auth.models import Group from django.db import models from django.urls import reverse -from django.contrib.auth.models import Group from django.utils.translation import ugettext_lazy as _ - from guardian.shortcuts import get_perms from ...common.texts import Messages - -#---------------------------------------------------------- +# ---------------------------------------------------------- class QueueManager(models.Manager): - def get_by_natural_key(self, name): return self.get(name=name) -#---------------------------------------------------------- +# ---------------------------------------------------------- class Queue(models.Model): - name = models.CharField(max_length=100, help_text=Messages['name'], - unique=True) + name = models.CharField(max_length=100, help_text=Messages["name"], unique=True) - memory_limit = models.PositiveIntegerField(help_text='In megabytes') + memory_limit = models.PositiveIntegerField(help_text="In megabytes") - time_limit = models.PositiveIntegerField(help_text='In minutes') + time_limit = models.PositiveIntegerField(help_text="In minutes") cores_per_slot = models.PositiveIntegerField() max_slots_per_user = models.PositiveIntegerField() - environments = models.ManyToManyField('Environment', related_name='queues') - + environments = models.ManyToManyField("Environment", related_name="queues") objects = QueueManager() - - #_____ Meta parameters __________ + # _____ Meta parameters __________ class Meta: - permissions = [ - ['can_access', _('Can access queue')] - ] - + permissions = [["can_access", _("Can access queue")]] - #_____ Overrides __________ + # _____ Overrides __________ def __str__(self): - return '%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)' % ( - self.name, - self.time_limit, - self.memory_limit, - self.cores_per_slot, - self.max_slots_per_user + return ( + "%s (%d minutes, %d megabytes, %d cores per slot, %d slots max per user)" + % ( + self.name, + self.time_limit, + self.memory_limit, + self.cores_per_slot, + self.max_slots_per_user, + ) ) - def natural_key(self): return (self.name,) - def get_admin_change_url(self): - return reverse('admin:backend_queue_change', args=(self.id,)) - + return reverse("admin:backend_queue_change", args=(self.id,)) - #_____ Utilities __________ + # _____ Utilities __________ def number_of_slots(self): - '''Total number of slots considering all assigned worker/slots''' + """Total number of slots considering all assigned worker/slots""" r = self.slots.filter(worker__active=True) - return r.aggregate(nslots=models.Sum('quantity'))['nslots'] or 0 - + return r.aggregate(nslots=models.Sum("quantity"))["nslots"] or 0 def availability(self): - '''Returns the availability for this queue in terms of number of slots + """Returns the availability for this queue in terms of number of slots This method does not take into consideration the occupation of this queue slots caused by jobs on other queues. It only looks to its inner occupancy and reports on that. Returns an integer between 0 and :py:meth:`Queue.slots`. - ''' + """ from ..models import JobSplit - from ..models import Job - running = JobSplit.objects.filter(job__block__in=self.blocks.all(), - status=JobSplit.PROCESSING).count() + running = JobSplit.objects.filter( + job__block__in=self.blocks.all(), status=JobSplit.PROCESSING + ).count() return max(self.number_of_slots() - running, 0) - def worker_availability(self): - '''Returns an ordered dictionary indicating the availability of workers + """Returns an ordered dictionary indicating the availability of workers according to their queue priority. The dictionary contains, as value, the number of slots available per @@ -137,34 +126,42 @@ class Queue(models.Model): * load (the lower, the better) * name (alphabetically) - ''' + """ - workers = [(k.worker, -k.priority, k.worker.load(), k.worker.name) \ - for k in self.slots.filter(worker__active=True)] + workers = [ + (k.worker, -k.priority, k.worker.load(), k.worker.name) + for k in self.slots.filter(worker__active=True) + ] - workers = sorted(workers, key=operator.itemgetter(1,2,3)) + workers = sorted(workers, key=operator.itemgetter(1, 2, 3)) return [w[0] for w in workers] - def splits(self): - '''Lists all job splits currently associated to this queue''' + """Lists all job splits currently associated to this queue""" + from ..models import JobSplit - from ..models import Job return JobSplit.objects.filter(job__block__queue=self) - def as_dict(self): - '''Returns a representation as a dictionary''' + """Returns a representation as a dictionary""" return { - 'memory-limit': self.memory_limit, - 'time-limit': self.time_limit, - 'cores-per-slot': self.cores_per_slot, - 'max-slots-per-user': self.max_slots_per_user, - 'environments': [k.fullname() for k in self.environments.all()], - 'slots': dict([(s.worker.name, dict(quantity=s.quantity, priority=s.priority)) - for s in self.slots.all()]), - 'groups': [k.name for k in Group.objects.all() if 'can_access' in get_perms(k, self)] + "memory-limit": self.memory_limit, + "time-limit": self.time_limit, + "cores-per-slot": self.cores_per_slot, + "max-slots-per-user": self.max_slots_per_user, + "environments": [k.fullname() for k in self.environments.all()], + "slots": dict( + [ + (s.worker.name, dict(quantity=s.quantity, priority=s.priority)) + for s in self.slots.all() + ] + ), + "groups": [ + k.name + for k in Group.objects.all() + if "can_access" in get_perms(k, self) + ], } diff --git a/beat/web/backend/models/result.py b/beat/web/backend/models/result.py index c58603e98e077b609e2a1332820369c9c70add6c..f000a2860698ef366d8d6b13738db7c0a2d1d2f9 100755 --- a/beat/web/backend/models/result.py +++ b/beat/web/backend/models/result.py @@ -26,17 +26,15 @@ ############################################################################### -from django.db import models - import simplejson +from django.db import models import beat.core.stats - class Result(models.Model): - '''Logging and status information concerning block or job execution. - ''' + """Logging and status information concerning block or job execution. + """ # exit status code status = models.IntegerField() @@ -47,33 +45,29 @@ class Result(models.Model): timed_out = models.BooleanField(default=False) cancelled = models.BooleanField(default=False) - def __str__(self): - status = 'success' if self.status == 0 else 'failed' - retval = 'Result(%s' % status + status = "success" if self.status == 0 else "failed" + retval = "Result(%s" % status if self.stdout: - retval += ', stdout=' + self.stdout + retval += ", stdout=" + self.stdout if self.stderr: - retval += ', stderr=' + self.stderr + retval += ", stderr=" + self.stderr if self.usrerr: - retval += ', usrerr=' + self.usrerr + retval += ", usrerr=" + self.usrerr - retval += ')' + retval += ")" return retval - def _get_stats(self): if self._stats is not None: return beat.core.stats.Statistics(simplejson.loads(self._stats)) else: return beat.core.stats.Statistics() - def _set_stats(self, v): self._stats = simplejson.dumps(v.as_dict()) - stats = property(_get_stats, _set_stats) diff --git a/beat/web/backend/models/slot.py b/beat/web/backend/models/slot.py index 17b10a7d5b6bd24802250bc6d7063677b8bc58b0..6e2eea908805ecf476acbc01547b51365446be9a 100755 --- a/beat/web/backend/models/slot.py +++ b/beat/web/backend/models/slot.py @@ -28,51 +28,48 @@ from django.db import models - -#---------------------------------------------------------- +# ---------------------------------------------------------- class SlotManager(models.Manager): - def get_by_natural_key(self, queue_name, worker_name): return self.get(queue__name=queue_name, worker__name=worker_name) -#---------------------------------------------------------- +# ---------------------------------------------------------- class Slot(models.Model): - queue = models.ForeignKey('Queue', related_name='slots', on_delete=models.CASCADE) + queue = models.ForeignKey("Queue", related_name="slots", on_delete=models.CASCADE) - worker = models.ForeignKey('Worker', related_name='slots', on_delete=models.CASCADE) + worker = models.ForeignKey("Worker", related_name="slots", on_delete=models.CASCADE) quantity = models.PositiveIntegerField( - 'Number of slots', - help_text='Number of processing slots to dedicate in this worker for a given queue' + "Number of slots", + help_text="Number of processing slots to dedicate in this worker for a given queue", ) priority = models.PositiveIntegerField( - default=0, - help_text='Priority of these slots on the defined queue' + default=0, help_text="Priority of these slots on the defined queue" ) - objects = SlotManager() - - #_____ Meta parameters __________ + # _____ Meta parameters __________ class Meta: - unique_together = ('queue', 'worker') - + unique_together = ("queue", "worker") - #_____ Overrides __________ + # _____ Overrides __________ def __str__(self): - return '%s - %s (slots: %d, priority: %d)' % (self.queue, self.worker, - self.quantity, self.priority) - + return "%s - %s (slots: %d, priority: %d)" % ( + self.queue, + self.worker, + self.quantity, + self.priority, + ) def natural_key(self): return (self.queue.name, self.worker.name) diff --git a/beat/web/backend/models/worker.py b/beat/web/backend/models/worker.py index 1897a249bc84af97b8c1c09ada2d12e8cd0dce20..cc70226fff3e973608a5977a1cd466e5d0fe4dcf 100755 --- a/beat/web/backend/models/worker.py +++ b/beat/web/backend/models/worker.py @@ -25,27 +25,22 @@ # # ############################################################################### -import os -import signal -import subprocess -import psutil - import logging -logger = logging.getLogger(__name__) +import psutil from django.db import models -from django.db import transaction -from django.conf import settings from django.urls import reverse from ...common.texts import Messages +from .job import JobSplit +logger = logging.getLogger(__name__) -#---------------------------------------------------------- +# ---------------------------------------------------------- def _cleanup_zombies(): - '''Cleans-up eventual zombie subprocesses launched by the worker''' + """Cleans-up eventual zombie subprocesses launched by the worker""" for child in psutil.Process().children(recursive=True): try: @@ -56,33 +51,27 @@ def _cleanup_zombies(): pass -#---------------------------------------------------------- +# ---------------------------------------------------------- class WorkerManager(models.Manager): - def get_by_natural_key(self, name): return self.get(name=name) -#---------------------------------------------------------- +# ---------------------------------------------------------- class Worker(models.Model): - name = models.CharField( - max_length=100, - help_text=Messages['name'], - unique=True, - ) + name = models.CharField(max_length=100, help_text=Messages["name"], unique=True,) active = models.BooleanField( - help_text=u'If this worker is usable presently', - default=False, + help_text=u"If this worker is usable presently", default=False, ) update = models.BooleanField( - help_text=u'If this worker state must be updated at the next cycle', + help_text=u"If this worker state must be updated at the next cycle", default=False, ) @@ -90,52 +79,49 @@ class Worker(models.Model): cores = models.PositiveIntegerField() - memory = models.PositiveIntegerField(default=0, help_text='In megabytes') - - used_cores = models.PositiveIntegerField(default=0, help_text='In %') + memory = models.PositiveIntegerField(default=0, help_text="In megabytes") - used_memory = models.PositiveIntegerField(default=0, help_text='In %') + used_cores = models.PositiveIntegerField(default=0, help_text="In %") - info = models.TextField(null=True, blank=True, - help_text='Informative message from the worker') + used_memory = models.PositiveIntegerField(default=0, help_text="In %") + info = models.TextField( + null=True, blank=True, help_text="Informative message from the worker" + ) objects = WorkerManager() - - #_____ Overrides __________ - + # _____ Overrides __________ def __str__(self): - retval = '%s (%d cores, %d Mb)' % (self.name, self.cores, self.memory) + retval = "%s (%d cores, %d Mb)" % (self.name, self.cores, self.memory) if not self.active: - retval += ' [INACTIVE]' + retval += " [INACTIVE]" return retval - def natural_key(self): return (self.name,) - def get_admin_change_url(self): - return reverse('admin:backend_worker_change', args=(self.id,)) - + return reverse("admin:backend_worker_change", args=(self.id,)) def load(self): - '''Calculates the number of cores in use or to be used in the future''' + """Calculates the number of cores in use or to be used in the future""" return sum([j.job.block.queue.cores_per_slot for j in self.splits.all()]) - def current_load(self): - '''Calculates the number of cores being used currently''' - return sum([j.job.block.queue.cores_per_slot for j in self.splits.filter(status=JobSplit.PROCESSING)]) - + """Calculates the number of cores being used currently""" + return sum( + [ + j.job.block.queue.cores_per_slot + for j in self.splits.filter(status=JobSplit.PROCESSING) + ] + ) def available_cores(self): - '''Calculates the number of available cores considering current load''' + """Calculates the number of available cores considering current load""" return max(self.cores - self.load(), 0) - def as_dict(self): - '''Returns a dictionary-like representation''' + """Returns a dictionary-like representation""" return dict(cores=self.cores, memory=self.memory) diff --git a/beat/web/backend/state.py b/beat/web/backend/state.py index 1086f1119a95357d5f7e9f199d23262751555816..be88b1aba09cab5096d9896ab313339f03dd2c27 100755 --- a/beat/web/backend/state.py +++ b/beat/web/backend/state.py @@ -25,24 +25,21 @@ # # ############################################################################### -'''Utilities for summarizing scheduler state''' - - -import os +"""Utilities for summarizing scheduler state""" import logging -logger = logging.getLogger(__name__) +import os import psutil - from django.conf import settings -from .models import Job, JobSplit - from ..experiments.models import Experiment +from .models import JobSplit + +logger = logging.getLogger(__name__) def cache(): - '''Returns a current cache state''' + """Returns a current cache state""" if not os.path.exists(settings.CACHE_ROOT): os.makedirs(settings.CACHE_ROOT) @@ -53,10 +50,10 @@ def cache(): MB = 1024 * 1024 return { - 'size-in-megabytes': df.used / MB, - 'capacity-in-megabytes': df.total / MB, - 'free': df.free / MB, - 'percent-used': df.percent, + "size-in-megabytes": df.used / MB, + "capacity-in-megabytes": df.total / MB, + "free": df.free / MB, + "percent-used": df.percent, } @@ -77,8 +74,13 @@ def experiments(): return dict( running=Experiment.objects.filter(status=Experiment.RUNNING).count(), scheduled=Experiment.objects.filter(status=Experiment.SCHEDULED).count(), - ls=Experiment.objects.filter(status__in=(Experiment.RUNNING, - Experiment.SCHEDULED)).order_by('author__username', - 'toolchain__author__username', 'toolchain__name', - 'toolchain__version', 'name'), + ls=Experiment.objects.filter( + status__in=(Experiment.RUNNING, Experiment.SCHEDULED) + ).order_by( + "author__username", + "toolchain__author__username", + "toolchain__name", + "toolchain__version", + "name", + ), ) diff --git a/beat/web/backend/templates/backend/environment_list.html b/beat/web/backend/templates/backend/environment_list.html index 7b9b27d4bd2c1db6720c55a7082310ee25f3fd3d..76708f0b0b60b67bd561b7d6a364ecc53eb334f7 100644 --- a/beat/web/backend/templates/backend/environment_list.html +++ b/beat/web/backend/templates/backend/environment_list.html @@ -2,21 +2,21 @@ {% comment %} * Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ * Contact: beat.support@idiap.ch - * + * * This file is part of the beat.web module of the BEAT platform. - * + * * Commercial License Usage * Licensees holding valid commercial BEAT licenses may use this file in * accordance with the terms contained in a written agreement between you * and Idiap. For further information contact tto@idiap.ch - * + * * Alternatively, this file may be used under the terms of the GNU Affero * Public License version 3 as published by the Free Software and appearing * in the file LICENSE.AGPL included in the packaging of this file. * The BEAT platform is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. - * + * * You should have received a copy of the GNU Affero Public License along * with the BEAT platform. If not, see http://www.gnu.org/licenses/. {% endcomment %} diff --git a/beat/web/backend/templates/backend/panels/environment_actions.html b/beat/web/backend/templates/backend/panels/environment_actions.html index 1950395aab46bfa207094e6bb1be785e1e3a52af..08467aba4219d022c505186a9f7be769fb5b07b5 100644 --- a/beat/web/backend/templates/backend/panels/environment_actions.html +++ b/beat/web/backend/templates/backend/panels/environment_actions.html @@ -1,21 +1,21 @@ {% comment %} * Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ * Contact: beat.support@idiap.ch - * + * * This file is part of the beat.web module of the BEAT platform. - * + * * Commercial License Usage * Licensees holding valid commercial BEAT licenses may use this file in * accordance with the terms contained in a written agreement between you * and Idiap. For further information contact tto@idiap.ch - * + * * Alternatively, this file may be used under the terms of the GNU Affero * Public License version 3 as published by the Free Software and appearing * in the file LICENSE.AGPL included in the packaging of this file. * The BEAT platform is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. - * + * * You should have received a copy of the GNU Affero Public License along * with the BEAT platform. If not, see http://www.gnu.org/licenses/. {% endcomment %} diff --git a/beat/web/backend/templates/backend/panels/environment_table.html b/beat/web/backend/templates/backend/panels/environment_table.html index d041542460193f44f4b9e7bde41fd4a8bed56d1d..6a3d41536cbc24dac0fd94aafc8c8d546a60b92a 100644 --- a/beat/web/backend/templates/backend/panels/environment_table.html +++ b/beat/web/backend/templates/backend/panels/environment_table.html @@ -1,21 +1,21 @@ {% comment %} * Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ * Contact: beat.support@idiap.ch - * + * * This file is part of the beat.web module of the BEAT platform. - * + * * Commercial License Usage * Licensees holding valid commercial BEAT licenses may use this file in * accordance with the terms contained in a written agreement between you * and Idiap. For further information contact tto@idiap.ch - * + * * Alternatively, this file may be used under the terms of the GNU Affero * Public License version 3 as published by the Free Software and appearing * in the file LICENSE.AGPL included in the packaging of this file. * The BEAT platform is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY * or FITNESS FOR A PARTICULAR PURPOSE. - * + * * You should have received a copy of the GNU Affero Public License along * with the BEAT platform. If not, see http://www.gnu.org/licenses/. {% endcomment %} diff --git a/beat/web/backend/templatetags/backend_tags.py b/beat/web/backend/templatetags/backend_tags.py index 0e31aa0fef32ef7c4eab28f6c6797ee7dfa7023b..f180a1fb613ee5aa884eb1d1dd4a83423f28f608 100755 --- a/beat/web/backend/templatetags/backend_tags.py +++ b/beat/web/backend/templatetags/backend_tags.py @@ -27,17 +27,15 @@ from django import template -from django.contrib.auth.models import User from ..models import JobSplit - register = template.Library() -@register.inclusion_tag('backend/panels/environment_table.html', takes_context=True) +@register.inclusion_tag("backend/panels/environment_table.html", takes_context=True) def environment_table(context, objects, id): - '''Composes a environment list table + """Composes a environment list table This panel primarily exists for user's environment list page. @@ -47,18 +45,14 @@ def environment_table(context, objects, id): id: The HTML id to set on the generated table. This is handy for the filter functionality normally available on list pages. - ''' + """ - return dict( - request=context['request'], - objects=objects, - panel_id=id, - ) + return dict(request=context["request"], objects=objects, panel_id=id,) -@register.inclusion_tag('backend/panels/environment_actions.html', takes_context=True) +@register.inclusion_tag("backend/panels/environment_actions.html", takes_context=True) def environment_actions(context, object, display_count): - '''Composes the action buttons for a particular environment + """Composes the action buttons for a particular environment This panel primarily exists for showing action buttons for a given environment taking into consideration it is being displayed for a given user. @@ -71,23 +65,21 @@ def environment_actions(context, object, display_count): display the number of queues associated to this environment as a button (useful for the list view) - ''' - return dict( - request=context['request'], - object=object, - display_count=display_count, - ) + """ + return dict(request=context["request"], object=object, display_count=display_count,) @register.simple_tag(takes_context=True) def visible_queues(context, object): - '''Calculates the visible queues for an environment and requestor''' - return object.queues_for(context['request'].user) + """Calculates the visible queues for an environment and requestor""" + return object.queues_for(context["request"].user) @register.filter def count_job_splits(xp, status=None): """Returns job splits for an experiment in a certain state""" - if status == 'A': - return xp.job_splits(status=JobSplit.QUEUED).filter(worker__isnull=False).count() + if status == "A": + return ( + xp.job_splits(status=JobSplit.QUEUED).filter(worker__isnull=False).count() + ) return xp.job_splits(status=status).count() diff --git a/beat/web/backend/tests/common.py b/beat/web/backend/tests/common.py index eb9a69aa55c1e0d10c10ca0d95c37d49f24ce011..6c0cf7f4ef786d8de9e8dd4f616f14ce3f860bca 100755 --- a/beat/web/backend/tests/common.py +++ b/beat/web/backend/tests/common.py @@ -27,27 +27,23 @@ import os -import beat.core.hash - -from django.test import TestCase from django.conf import settings +from django.test import TestCase + +import beat.core.hash +from beat.core.data import CachedDataSink +from beat.core.database import Database +from beat.core.dataformat import DataFormat -from ...utils.management.commands import install -from ...experiments.models import Block from ...algorithms.models import Algorithm from ...common.testutils import tearDownModule # noqa test runner will call it - +from ...experiments.models import Block +from ...utils.management.commands import install +from ..management.commands import qsetup +from ..models import Environment from ..models import Queue from ..models import Worker -from ..models import Environment - from ..utils import setup_backend -from ..management.commands import qsetup - -from beat.core.dataformat import DataFormat -from beat.core.data import CachedDataSink -from beat.core.database import Database - # ---------------------------------------------------------- diff --git a/beat/web/backend/tests/test_cache.py b/beat/web/backend/tests/test_cache.py index a83e179bbaee0e0aa938ea1d2ebd6e7c2ea19d52..95744c9ea3be98188812c4bdd8473620869a7b41 100755 --- a/beat/web/backend/tests/test_cache.py +++ b/beat/web/backend/tests/test_cache.py @@ -26,9 +26,9 @@ ############################################################################### import os -import time import shutil import tempfile +import time from django.core import management from django.test import TestCase diff --git a/beat/web/backend/tests/test_helpers.py b/beat/web/backend/tests/test_helpers.py index 572deb6aaec87b80b4a6a620e8825bcabafe6245..c65c3ed8816cf285d30d34a63fe103a305a710d0 100755 --- a/beat/web/backend/tests/test_helpers.py +++ b/beat/web/backend/tests/test_helpers.py @@ -25,39 +25,34 @@ # # ############################################################################### -import beat.core.hash - import os from django.conf import settings -from ...experiments.models import Experiment +import beat.core.hash + +from ...common.testutils import tearDownModule # noqa test runner will call it from ...experiments.models import Block from ...experiments.models import CachedFile -from ...common.testutils import tearDownModule # noqa test runner will call it - -from ..models import Job -from ..models import JobSplit -from ..models import Worker -from ..models import Queue - -from ..helpers import schedule_experiment -from ..helpers import cancel_experiment -from ..helpers import split_new_jobs -from ..helpers import process_newly_cancelled_experiments -from ..helpers import is_cache_complete +from ...experiments.models import Experiment from ..helpers import assign_splits_to_workers +from ..helpers import cancel_experiment from ..helpers import get_configuration_for_split -from ..helpers import on_split_started +from ..helpers import is_cache_complete +from ..helpers import on_split_cancelled from ..helpers import on_split_done from ..helpers import on_split_fail -from ..helpers import on_split_cancelled - +from ..helpers import on_split_started +from ..helpers import process_newly_cancelled_experiments +from ..helpers import schedule_experiment +from ..helpers import split_new_jobs +from ..models import Job +from ..models import JobSplit +from ..models import Queue +from ..models import Worker from ..utils import setup_backend - -from .common import BaseBackendTestCase from .common import ONE_QUEUE_TWO_WORKERS - +from .common import BaseBackendTestCase # ---------------------------------------------------------- diff --git a/beat/web/backend/tests/test_scheduler.py b/beat/web/backend/tests/test_scheduler.py index 194929e405d640bdfea065b2a26600f54b343099..860d120e8b4aa9ef00675f465f9b478e09ea3f7e 100755 --- a/beat/web/backend/tests/test_scheduler.py +++ b/beat/web/backend/tests/test_scheduler.py @@ -26,29 +26,24 @@ ############################################################################### import os - -from time import time from time import sleep +from time import time from unittest import skipIf from django.conf import settings from django.test import TransactionTestCase -from ...experiments.models import Experiment -from ...experiments.models import Block from ...common.testutils import tearDownModule # noqa test runner will call it - -from ..models import Worker - -from ..utils import setup_backend -from ..helpers import schedule_experiment +from ...experiments.models import Block +from ...experiments.models import Experiment from ..helpers import cancel_experiment +from ..helpers import schedule_experiment from ..local_scheduler import start_scheduler from ..local_scheduler import start_worker - -from .common import BackendUtilitiesMixin +from ..models import Worker +from ..utils import setup_backend from .common import ONE_QUEUE_TWO_WORKERS - +from .common import BackendUtilitiesMixin # ---------------------------------------------------------- diff --git a/beat/web/backend/tests/test_setup.py b/beat/web/backend/tests/test_setup.py index 7399aee34933021901dc61a1dc0be47cf7fe7e4d..029d2c4076e244adf8fb0637fba83d420dab59dd 100755 --- a/beat/web/backend/tests/test_setup.py +++ b/beat/web/backend/tests/test_setup.py @@ -29,18 +29,15 @@ import collections from django.core import management -from .common import BaseBackendTestCase - +from ...common.testutils import tearDownModule # noqa test runner will call it +from ...experiments.models import Experiment +from ..management.commands import qsetup from ..models import Queue -from ..models import Worker from ..models import Slot +from ..models import Worker from ..utils import dump_backend from ..utils import setup_backend -from ..management.commands import qsetup - -from ...experiments.models import Experiment -from ...common.testutils import tearDownModule # noqa test runner will call it - +from .common import BaseBackendTestCase # Example configuration with 3 queues with an increasing amount of resources # running on the same host diff --git a/beat/web/backend/utils.py b/beat/web/backend/utils.py index 5cbfe82d27b50df7758959b206a7a4b969ae8450..802ff3b32a7d3516632a20af50343061ee4df730 100755 --- a/beat/web/backend/utils.py +++ b/beat/web/backend/utils.py @@ -25,37 +25,30 @@ # # ############################################################################### -'''Utilities for backend management''' - -import os -import sys +"""Utilities for backend management""" import fnmatch import glob -import time -import distutils.spawn - import logging -logger = logging.getLogger(__name__) - -import psutil +import os +import time -from django.conf import settings -from django.db import transaction from django.contrib.auth.models import Group +from django.db import transaction from guardian.shortcuts import assign_perm from ..code.models import Code from ..common.models import Shareable -from ..experiments.models import CachedFile from ..experiments.models import Block +from ..experiments.models import CachedFile from ..experiments.models import Experiment -from .models import Queue -from .models import Worker -from .models import Job -from .models import JobSplit from .models import Environment from .models import EnvironmentLanguage +from .models import Job +from .models import Queue from .models import Slot +from .models import Worker + +logger = logging.getLogger(__name__) def cleanup_cache(path, age_in_minutes=0, delete=False): @@ -87,26 +80,35 @@ def cleanup_cache(path, age_in_minutes=0, delete=False): """ - cutoff_access_time = time.time() - (60*age_in_minutes) + cutoff_access_time = time.time() - (60 * age_in_minutes) - logger.info("Running `%s' clean-up: set file-access cutoff time to `%s'", - path, time.ctime(cutoff_access_time)) + logger.info( + "Running `%s' clean-up: set file-access cutoff time to `%s'", + path, + time.ctime(cutoff_access_time), + ) # Gets a list of cache files for active experiments: - blocks = Block.objects.filter(experiment__in=Experiment.objects.filter(status__in=(Experiment.SCHEDULED, Experiment.RUNNING))) - save_list = [k.path() + '*' for k in CachedFile.objects.filter(blocks__in=blocks)] + blocks = Block.objects.filter( + experiment__in=Experiment.objects.filter( + status__in=(Experiment.SCHEDULED, Experiment.RUNNING) + ) + ) + save_list = [k.path() + "*" for k in CachedFile.objects.filter(blocks__in=blocks)] # Finds the files with an associated '.lock' file for root, dirnames, filenames in os.walk(path): - for filename in fnmatch.filter(filenames, '*.lock'): - save_list += glob.glob(os.path.splitext(os.path.join(root, filename))[0] + '*') + for filename in fnmatch.filter(filenames, "*.lock"): + save_list += glob.glob( + os.path.splitext(os.path.join(root, filename))[0] + "*" + ) removed_files = [] for p, dirs, files in os.walk(path, topdown=False): - files = [f for f in files if not f.startswith('.')] - dirs[:] = [d for d in dirs if not d.startswith('.')] #note: in-place + files = [f for f in files if not f.startswith(".")] + dirs[:] = [d for d in dirs if not d.startswith(".")] # note: in-place for f in files: fullpath = os.path.join(p, f) @@ -118,7 +120,7 @@ def cleanup_cache(path, age_in_minutes=0, delete=False): # if you get to this point and the file ends in '.part', erase it ext = os.path.splitext(fullpath) - if len(ext) > 1 and ext[1] == '.part': + if len(ext) > 1 and ext[1] == ".part": if delete: logger.info("[rm] `%s' (dangling)", fullpath) os.remove(fullpath) @@ -133,10 +135,14 @@ def cleanup_cache(path, age_in_minutes=0, delete=False): os.remove(fullpath) removed_files.append(fullpath) else: - logger.debug("[skip] `%s' (%f >= %f)", fullpath, - os.path.getatime(fullpath), cutoff_access_time) - - for d in dirs: #also remove empty directories + logger.debug( + "[skip] `%s' (%f >= %f)", + fullpath, + os.path.getatime(fullpath), + cutoff_access_time, + ) + + for d in dirs: # also remove empty directories fullpath = os.path.join(p, d) if not os.listdir(fullpath) and delete: os.rmdir(fullpath) @@ -146,7 +152,7 @@ def cleanup_cache(path, age_in_minutes=0, delete=False): @transaction.atomic def setup_backend(d): - '''Configures or re-configures the internal queue setup + """Configures or re-configures the internal queue setup This method is called to re-configure the current backend architecture. It is guaranteed to be called only if no experiments are currently running on @@ -166,16 +172,21 @@ def setup_backend(d): RuntimeError: If an error is detected and the re-configuration cannot take place. In this case, it is safe to assume nothing has changed. - ''' + """ # 1. We get a list of all current queue/environment combinations - q_envs = set([(q.name, str(e)) \ - for q in Queue.objects.all() for e in q.environments.all()]) + q_envs = set( + [(q.name, str(e)) for q in Queue.objects.all() for e in q.environments.all()] + ) # 2. We get a list of new queue/environment combinations - config_q_envs = set([(qname, envkey) \ - for qname, qpar in d['queues'].items() \ - for envkey in qpar['environments']]) + config_q_envs = set( + [ + (qname, envkey) + for qname, qpar in d["queues"].items() + for envkey in qpar["environments"] + ] + ) # 3. We figure out which combinations of queue/environment's need to be # deleted. @@ -183,71 +194,74 @@ def setup_backend(d): # 4. We figure out which combinations of queue/environment's are currently # used by queued jobs. - used_q_envs = set([(job.block.queue.name, str(job.block.environment)) \ - for job in Job.objects.filter(start_date__isnull=True)]) + used_q_envs = set( + [ + (job.block.queue.name, str(job.block.environment)) + for job in Job.objects.filter(start_date__isnull=True) + ] + ) # 5. We request that no jobs should be either executing or scheduled for # execution on queue/environment combinations that need to be deleted. used_to_be_deleted = used_q_envs.intersection(delete_q_envs) if len(used_to_be_deleted) != 0: - qenv_names = ['/'.join(k) for k in used_to_be_deleted] - reason = 'There are jobs currently running or scheduled to run on ' \ - 'the following queue/environment combinations which are ' \ - 'supposed to be deleted: %s. Aborting reconfiguration.' - raise RuntimeError(reason % ', '.join(qenv_names)) + qenv_names = ["/".join(k) for k in used_to_be_deleted] + reason = ( + "There are jobs currently running or scheduled to run on " + "the following queue/environment combinations which are " + "supposed to be deleted: %s. Aborting reconfiguration." + ) + raise RuntimeError(reason % ", ".join(qenv_names)) # 6. Request that no worker that is being used will disappear - existing_workers = set(Worker.objects.values_list('name', flat=True)) - workers_to_be_deleted = existing_workers - set(d['workers']) + existing_workers = set(Worker.objects.values_list("name", flat=True)) + workers_to_be_deleted = existing_workers - set(d["workers"]) if workers_to_be_deleted: - wobjects_to_be_deleted = \ - Worker.objects.filter(name__in=workers_to_be_deleted) + wobjects_to_be_deleted = Worker.objects.filter(name__in=workers_to_be_deleted) else: - wobjects_to_be_deleted = [] + wobjects_to_be_deleted = [] for w in wobjects_to_be_deleted: if w.load() != 0: - reason = 'There are jobs currently running or scheduled to run ' \ - 'on some of the workers that would disappear: %s. ' \ - 'Aborting reconfiguration.' - raise RuntimeError(reason % ', '.join(workers_to_be_deleted)) + reason = ( + "There are jobs currently running or scheduled to run " + "on some of the workers that would disappear: %s. " + "Aborting reconfiguration." + ) + raise RuntimeError(reason % ", ".join(workers_to_be_deleted)) # 7. Create new environments - config_envs = set(d['environments'].keys()) + config_envs = set(d["environments"].keys()) current_envs = set([str(k) for k in Environment.objects.all()]) new_envs = config_envs.difference(current_envs) for envkey in new_envs: - attrs = d['environments'][envkey] + attrs = d["environments"][envkey] env = Environment( - name=attrs['name'], - version=attrs['version'], - short_description=attrs.get('short_description'), - description=attrs.get('description'), + name=attrs["name"], + version=attrs["version"], + short_description=attrs.get("short_description"), + description=attrs.get("description"), sharing=Shareable.PUBLIC, ) logger.info("Creating `%s'...", env) env.save() - for language in attrs['languages']: + for language in attrs["languages"]: lang = EnvironmentLanguage( - language=Code.language_db(language), - environment=env + language=Code.language_db(language), environment=env ) lang.save() # 8.1 Create new workers - config_workers = set(d['workers'].keys()) - current_workers = set(Worker.objects.values_list('name', flat=True)) + config_workers = set(d["workers"].keys()) + current_workers = set(Worker.objects.values_list("name", flat=True)) new_workers = config_workers - current_workers for name in new_workers: - attrs = d['workers'][name] + attrs = d["workers"][name] worker = Worker( - name=name, - active=False, - cores=attrs['cores'], - memory=attrs['memory'], + name=name, active=False, cores=attrs["cores"], memory=attrs["memory"], ) logger.info("Creating `%s'...", worker) worker.save() @@ -255,49 +269,46 @@ def setup_backend(d): # 8.2 Update existing workers update_workers = current_workers.intersection(config_workers) for name in update_workers: - attrs = d['workers'][name] + attrs = d["workers"][name] worker = Worker.objects.select_for_update().get(name=name) - worker.cores = attrs['cores'] - worker.ram = attrs['memory'] + worker.cores = attrs["cores"] + worker.ram = attrs["memory"] logger.info("Updating `%s'...", worker) worker.save() # 9. Create new queues - config_qnames = set(d['queues'].keys()) - current_qnames = set(Queue.objects.values_list('name', flat=True)) + config_qnames = set(d["queues"].keys()) + current_qnames = set(Queue.objects.values_list("name", flat=True)) new_qnames = config_qnames.difference(current_qnames) for name in new_qnames: - attrs = d['queues'][name] + attrs = d["queues"][name] queue = Queue( name=name, - memory_limit=attrs['memory-limit'], - time_limit=attrs['time-limit'], - cores_per_slot=attrs['cores-per-slot'], - max_slots_per_user=attrs['max-slots-per-user'], + memory_limit=attrs["memory-limit"], + time_limit=attrs["time-limit"], + cores_per_slot=attrs["cores-per-slot"], + max_slots_per_user=attrs["max-slots-per-user"], ) logger.info("Creating `%s'...", queue) queue.save() - for gname in attrs['groups']: + for gname in attrs["groups"]: group = Group.objects.get(name=gname) logger.info("Allowing group `%s' access to `%s'...", group, queue) - assign_perm('can_access', group, queue) + assign_perm("can_access", group, queue) - for hostname, par in attrs['slots'].items(): + for hostname, par in attrs["slots"].items(): worker = Worker.objects.get(name=hostname) - priority = par.get('priority', 0) + priority = par.get("priority", 0) slot = Slot( - queue=queue, - worker=worker, - priority=priority, - quantity=par['quantity'], + queue=queue, worker=worker, priority=priority, quantity=par["quantity"], ) logger.info("Creating `%s'...", slot) slot.save() # Associates environments with queues - for envkey in attrs['environments']: + for envkey in attrs["environments"]: env = Environment.objects.get_by_fullname(envkey) logger.info("Appending `%s' to `%s'...", env, queue) queue.environments.add(env) @@ -307,34 +318,31 @@ def setup_backend(d): for name in update_qnames: queue = Queue.objects.select_for_update().get(name=name) - attrs = d['queues'][name] + attrs = d["queues"][name] # 10.1 Update queue parameterization: running jobs will be unaffected # whereas queued jobs will be subject to the new settings. - queue.ram = attrs['memory-limit'] - queue.max_time = attrs['time-limit'] - queue.cores_per_slot = attrs['cores-per-slot'] - queue.max_slots_per_user = attrs['max-slots-per-user'] + queue.ram = attrs["memory-limit"] + queue.max_time = attrs["time-limit"] + queue.cores_per_slot = attrs["cores-per-slot"] + queue.max_slots_per_user = attrs["max-slots-per-user"] logger.info("Updating `%s'...", queue) queue.save() # 10.2 Update the queue-slot allocation queue.slots.all().delete() - for hostname, par in attrs['slots'].items(): + for hostname, par in attrs["slots"].items(): worker = Worker.objects.get(name=hostname) - priority = par.get('priority', 0) + priority = par.get("priority", 0) slot = Slot( - worker=worker, - queue=queue, - priority=priority, - quantity=par['quantity'], + worker=worker, queue=queue, priority=priority, quantity=par["quantity"], ) logger.info("Creating `%s'...", slot) slot.save() # 10.3 Associate and dissociate environments queue.environments.clear() - for e in attrs['environments']: + for e in attrs["environments"]: env = Environment.objects.get_by_fullname(e) logger.info("Appending `%s' to `%s'...", env, queue) queue.environments.add(env) @@ -344,21 +352,23 @@ def setup_backend(d): for name in delete_qnames: q = Queue.objects.select_for_update().get(name=name) logger.info("Deleting `%s'...", q) - q.delete() # slots are deleted on cascade + q.delete() # slots are deleted on cascade # 12. Delete workers not mentioned on the new configuration for w in wobjects_to_be_deleted: - logger.info("Deleting `%s'...", w) - w.delete() + logger.info("Deleting `%s'...", w) + w.delete() def dump_backend(): - '''Returns a dictionary that represents the current backend configuration''' + """Returns a dictionary that represents the current backend configuration""" environments = {} for env in Environment.objects.all(): environments[str(env)] = env.as_dict() - environments[str(env)]['languages'] = [ Code.language_identifier(x.language) for x in env.languages.iterator() ] + environments[str(env)]["languages"] = [ + Code.language_identifier(x.language) for x in env.languages.iterator() + ] return dict( queues=dict([(k.name, k.as_dict()) for k in Queue.objects.all()]), diff --git a/beat/web/backend/views.py b/beat/web/backend/views.py index 9bfb0765c9c30200214eff64bb43808a4f1e44e2..bf415717cd0615024e0a65d9f7c5f5cdff386987 100755 --- a/beat/web/backend/views.py +++ b/beat/web/backend/views.py @@ -25,138 +25,151 @@ # # ############################################################################### -import os -import socket import logging -logger = logging.getLogger(__name__) import simplejson - -from django.http import Http404, HttpResponseRedirect from django.conf import settings -from django.urls import reverse -from django.shortcuts import get_object_or_404 -from django.shortcuts import render +from django.contrib import messages from django.contrib.auth.decorators import login_required +from django.http import Http404 from django.http import HttpResponseForbidden -from django.contrib import messages +from django.http import HttpResponseRedirect +from django.shortcuts import get_object_or_404 +from django.shortcuts import render +from django.urls import reverse from ..experiments.models import Experiment - -from .models import Environment, Worker, Queue from . import state -from . import utils +from .models import Environment +from .models import Queue +from .models import Worker +logger = logging.getLogger(__name__) -#------------------------------------------------ +# ------------------------------------------------ @login_required def scheduler(request): - if not(request.user.is_superuser): + if not (request.user.is_superuser): return HttpResponseForbidden() # data for the cache plot cache = state.cache() cache_chart_data = [ dict( - label= 'Used (%d%%)' % round(100*float(cache['size-in-megabytes'])/cache['capacity-in-megabytes']), - value= cache['size-in-megabytes'], - color= '#F7464A', - highlight= '#FF5A5E', + label="Used (%d%%)" + % round( + 100 * float(cache["size-in-megabytes"]) / cache["capacity-in-megabytes"] + ), + value=cache["size-in-megabytes"], + color="#F7464A", + highlight="#FF5A5E", ), dict( - label= 'Free (%d%%)' % round(100*float(cache['capacity-in-megabytes'] - cache['size-in-megabytes'])/cache['capacity-in-megabytes']), - value= cache['capacity-in-megabytes'] - cache['size-in-megabytes'], - color= '#46BFBD', - highlight= '#5AD3D1', + label="Free (%d%%)" + % round( + 100 + * float(cache["capacity-in-megabytes"] - cache["size-in-megabytes"]) + / cache["capacity-in-megabytes"] + ), + value=cache["capacity-in-megabytes"] - cache["size-in-megabytes"], + color="#46BFBD", + highlight="#5AD3D1", ), ] - cache_gb = int(cache['capacity-in-megabytes'] / 1024.0) + cache_gb = int(cache["capacity-in-megabytes"] / 1024.0) - refreshing_period = getattr(settings, 'SCHEDULING_INTERVAL', 5) - if 'period' in request.GET: - refreshing_period = int(request.GET['period']) + refreshing_period = getattr(settings, "SCHEDULING_INTERVAL", 5) + if "period" in request.GET: + refreshing_period = int(request.GET["period"]) - return render(request, - 'backend/scheduler.html', - dict( - jobs=state.jobs(), - experiments=state.experiments(), - workers=Worker.objects.order_by('-active', 'name'), - queues=Queue.objects.order_by('memory_limit', 'max_slots_per_user'), - cache_chart_data=simplejson.dumps(cache_chart_data), - cache_gb=cache_gb, - helper_panel=getattr(settings, 'SCHEDULING_PANEL', False), - refreshing_period=refreshing_period - )) + return render( + request, + "backend/scheduler.html", + dict( + jobs=state.jobs(), + experiments=state.experiments(), + workers=Worker.objects.order_by("-active", "name"), + queues=Queue.objects.order_by("memory_limit", "max_slots_per_user"), + cache_chart_data=simplejson.dumps(cache_chart_data), + cache_gb=cache_gb, + helper_panel=getattr(settings, "SCHEDULING_PANEL", False), + refreshing_period=refreshing_period, + ), + ) -#------------------------------------------------ +# ------------------------------------------------ def environment(request, name, version): """Displays a single database""" # Retrieve the data base - environment = get_object_or_404(Environment, - name__iexact=name, version__iexact=version) + environment = get_object_or_404( + Environment, name__iexact=name, version__iexact=version + ) if not environment.accessibility_for(request.user)[0]: raise Http404() # Render the page - return render(request, - 'backend/environment.html', - {'environment': environment}) + return render(request, "backend/environment.html", {"environment": environment}) -#---------------------------------------------------------- +# ---------------------------------------------------------- def list_environments(request): """Displays all accessible (active and inactive) environments""" - objects = Environment.objects.for_user(request.user, True).filter(active=True).order_by('-creation_date') + objects = ( + Environment.objects.for_user(request.user, True) + .filter(active=True) + .order_by("-creation_date") + ) # Render the page - return render(request, - 'backend/environment_list.html', - dict( - objects=objects, - author=request.user, - owner=True - )) - + return render( + request, + "backend/environment_list.html", + dict(objects=objects, author=request.user, owner=True), + ) -#---------------------------------------------------------- +# ---------------------------------------------------------- @login_required def cancel_all_experiments(request): - if not(request.user.is_superuser): return HttpResponseForbidden() + if not (request.user.is_superuser): + return HttpResponseForbidden() - qs = Experiment.objects.filter(status__in=(Experiment.RUNNING, Experiment.SCHEDULED)) + qs = Experiment.objects.filter( + status__in=(Experiment.RUNNING, Experiment.SCHEDULED) + ) counter = qs.count() - for xp in qs: xp.cancel() + for xp in qs: + xp.cancel() messages.success(request, "Successfuly cancelled %d experiments" % counter) - return HttpResponseRedirect(reverse('backend:scheduler')) + return HttpResponseRedirect(reverse("backend:scheduler")) -#---------------------------------------------------------- +# ---------------------------------------------------------- @login_required def update_workers(request): - if not(request.user.is_superuser): return HttpResponseForbidden() + if not (request.user.is_superuser): + return HttpResponseForbidden() qs = Worker.objects.all() counter = qs.count() @@ -164,4 +177,4 @@ def update_workers(request): messages.success(request, "Requested %d workers for updates" % counter) - return HttpResponseRedirect(reverse('backend:scheduler') + '#workers') + return HttpResponseRedirect(reverse("backend:scheduler") + "#workers")