From 823da9ab4e6622808f58b724fd54fda191497716 Mon Sep 17 00:00:00 2001 From: Philip ABBET <philip.abbet@idiap.ch> Date: Fri, 13 Oct 2017 11:34:38 +0200 Subject: [PATCH] [backend] Refactoring of the local scheduler mechanism --- beat/web/backend/admin.py | 3 - beat/web/backend/api.py | 54 ++++++++ beat/web/backend/api_urls.py | 14 +- beat/web/backend/helpers.py | 53 ++++++- beat/web/backend/local_scheduler.py | 130 ++++++++++++++++++ .../0006_localschedulerprocesses.py | 23 ++++ beat/web/backend/models/__init__.py | 1 + beat/web/backend/models/local_scheduler.py | 40 ++++++ .../backend/templates/backend/scheduler.html | 116 ++++++---------- beat/web/backend/templatetags/backend_tags.py | 2 +- beat/web/backend/tests/test_helpers.py | 28 ++++ beat/web/backend/tests/test_scheduler.py | 2 - beat/web/backend/views.py | 93 ++++--------- beat/web/experiments/models/experiment.py | 2 +- beat/web/experiments/models/result.py | 2 + beat/web/experiments/serializers.py | 2 + .../static/experiments/js/panels.js | 2 + beat/web/scripts/scheduler.py | 54 ++++++-- beat/web/settings/settings.py | 10 +- 19 files changed, 464 insertions(+), 167 deletions(-) mode change 100644 => 100755 beat/web/backend/api_urls.py create mode 100755 beat/web/backend/local_scheduler.py create mode 100644 beat/web/backend/migrations/0006_localschedulerprocesses.py create mode 100755 beat/web/backend/models/local_scheduler.py diff --git a/beat/web/backend/admin.py b/beat/web/backend/admin.py index 87d912ed0..ea6b8001b 100755 --- a/beat/web/backend/admin.py +++ b/beat/web/backend/admin.py @@ -213,9 +213,6 @@ class Job(admin.ModelAdmin): def splits(self, obj): return obj.splits.count() - def has_delete_permission(self, request, obj=None): - return False - def has_add_permission(self, request): return False diff --git a/beat/web/backend/api.py b/beat/web/backend/api.py index f0757c3b6..fb14a0113 100755 --- a/beat/web/backend/api.py +++ b/beat/web/backend/api.py @@ -28,9 +28,19 @@ from 
rest_framework.decorators import api_view, permission_classes from rest_framework.response import Response from rest_framework import permissions +from rest_framework import status + +from django.conf import settings from .models import Environment +from .models import Worker +from .models import LocalSchedulerProcesses + from ..code.models import Code +from . import local_scheduler + + +#---------------------------------------------------------- @api_view(['GET']) @@ -73,3 +83,47 @@ def accessible_environments_list(request): }) return Response(result) + + +#---------------------------------------------------------- + + +@api_view(['POST']) +@permission_classes([permissions.IsAdminUser]) +def start_local_scheduler(request): + """Starts the local scheduler""" + + if not getattr(settings, 'SCHEDULING_PANEL', False): + return Response(status=status.HTTP_403_FORBIDDEN) + + # Clean start-up + LocalSchedulerProcesses.objects.all().delete() + + for worker in Worker.objects.all(): + (pid, _) = local_scheduler.start_worker(worker.name, settings.PREFIX, + settings.CACHE_ROOT, + 'tcp://127.0.0.1:50000') + LocalSchedulerProcesses(name=worker.name, pid=pid).save() + + (pid, _) = local_scheduler.start_scheduler(address='127.0.0.1', port=50000) + LocalSchedulerProcesses(name='Scheduler', pid=pid).save() + + return Response(status=status.HTTP_204_NO_CONTENT) + + +#---------------------------------------------------------- + + +@api_view(['POST']) +@permission_classes([permissions.IsAdminUser]) +def stop_local_scheduler(request): + """Stops the local scheduler""" + + if not getattr(settings, 'SCHEDULING_PANEL', False): + return Response(status=status.HTTP_403_FORBIDDEN) + + for process in LocalSchedulerProcesses.objects.all(): + local_scheduler.stop_process(process.pid) + process.delete() + + return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/beat/web/backend/api_urls.py b/beat/web/backend/api_urls.py old mode 100644 new mode 100755 index e5faf3d8c..cbf2f3724 ---
a/beat/web/backend/api_urls.py +++ b/beat/web/backend/api_urls.py @@ -34,6 +34,18 @@ urlpatterns = [ r'^environments/$', api.accessible_environments_list, name='backend-api-environments', - ), + ), + + url( + r'^local_scheduler/start/$', + api.start_local_scheduler, + name='local_scheduler-start', + ), + + url( + r'^local_scheduler/stop/$', + api.stop_local_scheduler, + name='local_scheduler-stop', + ), ] diff --git a/beat/web/backend/helpers.py b/beat/web/backend/helpers.py index 0e6ecdb2d..573c2192c 100755 --- a/beat/web/backend/helpers.py +++ b/beat/web/backend/helpers.py @@ -3,7 +3,7 @@ ############################################################################### # # -# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # # Contact: beat.support@idiap.ch # # # # This file is part of the beat.web module of the BEAT platform. # @@ -40,6 +40,7 @@ from datetime import datetime from ..experiments.models import Experiment from ..experiments.models import Block from ..experiments.models import CachedFile +from ..experiments.models import Result as CacheResult from .models import Queue from .models import Job from .models import JobSplit @@ -47,6 +48,10 @@ from .models import Worker from .models import Result import beat.core.hash +import beat.core.data +import beat.core.hash +import beat.core.algorithm +from beat.core.utils import NumpyJSONEncoder def schedule_experiment(experiment): @@ -102,8 +107,13 @@ def schedule_experiment(experiment): block.end_date = block.creation_date block.save() + if block.analyzer: + for cached_file in block.outputs.all(): + load_results_from_cache(block, cached_file) + else: Job.objects.create_job(block) + block.status = Block.PENDING block.creation_date = datetime.now() block.save() already_done = False @@ -728,7 +738,6 @@ def update_job(job): # Delete the job job.delete() - # Did the job succeed? 
elif job.splits.exclude(status=JobSplit.COMPLETED).count() == 0: @@ -739,6 +748,9 @@ def update_job(job): for cached_file in job.block.outputs.all(): cached_file.update(Block.DONE) + if job.block.analyzer: + load_results_from_cache(job.block, cached_file) + # Update the block job.block.status = Block.DONE job.block.end_date = job.end_date @@ -908,3 +920,40 @@ def update_experiment(experiment): experiment.status = Experiment.PENDING experiment.end_date = experiment.blocks.order_by('-end_date')[0].end_date experiment.save() + + +#---------------------------------------------------------- + + +def load_results_from_cache(block, cached_file): + if not block.analyzer: + return + + if cached_file.results.count() > 0: + return + + data_source = beat.core.data.CachedDataSource() + data_source.setup(os.path.join(settings.CACHE_ROOT, + beat.core.hash.toPath(cached_file.hash)), + settings.PREFIX) + + output_data = data_source.next()[0] + if output_data is not None: + algorithm = beat.core.algorithm.Algorithm( + settings.PREFIX, block.algorithm.fullname()) + + for field, value in output_data.as_dict().items(): + res, _ = CacheResult.objects.get_or_create( + name=field, cache=cached_file) + res.primary = algorithm.results[field]['display'] + res.type = algorithm.results[field]["type"] + + if res.type in ['int32', 'float32', 'bool', 'string']: + res.data_value = str(value) + else: + res.data_value = simplejson.dumps( + value, indent=4, cls=NumpyJSONEncoder) + + res.save() + + data_source.close() diff --git a/beat/web/backend/local_scheduler.py b/beat/web/backend/local_scheduler.py new file mode 100755 index 000000000..ba688dba4 --- /dev/null +++ b/beat/web/backend/local_scheduler.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the 
beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +import multiprocessing +import psutil +import signal + +from ..scripts import scheduler + +from beat.core.scripts import worker + + +#---------------------------------------------------------- + + +class SchedulerProcess(multiprocessing.Process): + + def __init__(self, arguments, queue=None): + super(SchedulerProcess, self).__init__() + + if queue is None: + self.queue = multiprocessing.Queue() + else: + self.queue = queue + + self.arguments = arguments + + + def run(self): + self.queue.put('STARTED') + scheduler.main(self.arguments) + + +#---------------------------------------------------------- + + +class WorkerProcess(multiprocessing.Process): + + def __init__(self, arguments, queue=None): + super(WorkerProcess, self).__init__() + + if queue is None: + self.queue = multiprocessing.Queue() + else: + self.queue = queue + + self.arguments = arguments + + + def run(self): + self.queue.put('STARTED') + worker.main(self.arguments) + + +#---------------------------------------------------------- + + +def 
start_scheduler(settings='beat.web.settings.settings', interval=5, + address='127.0.0.1', port=50000): + args = [ + '--settings=%s' % str(settings), + '--interval=%d' % int(interval), + '--address=%s' % str(address), + '--port=%d' % int(port), + '--verbose', + ] + + process = SchedulerProcess(args) + process.start() + process.queue.get() + + return (process.pid, process) + + +#---------------------------------------------------------- + + +def start_worker(name, prefix, cache, scheduler_address): + args = [ + '--prefix=%s' % str(prefix), + '--cache=%s' % str(cache), + '--name=%s' % str(name), + '--verbose', + str(scheduler_address) + ] + + process = WorkerProcess(args) + process.start() + process.queue.get() + + return (process.pid, process) + + +#---------------------------------------------------------- + + +def stop_process(pid): + if not psutil.pid_exists(pid): + return + + process = psutil.Process(pid) + + process.send_signal(signal.SIGTERM) + + gone, alive = psutil.wait_procs([process]) diff --git a/beat/web/backend/migrations/0006_localschedulerprocesses.py b/beat/web/backend/migrations/0006_localschedulerprocesses.py new file mode 100644 index 000000000..5b0c88889 --- /dev/null +++ b/beat/web/backend/migrations/0006_localschedulerprocesses.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.9.13 on 2017-10-12 15:12 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('backend', '0005_job_modifications'), + ] + + operations = [ + migrations.CreateModel( + name='LocalSchedulerProcesses', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.TextField()), + ('pid', models.IntegerField()), + ], + ), + ] diff --git a/beat/web/backend/models/__init__.py b/beat/web/backend/models/__init__.py index 9672ab91a..6b2e0d1af 100755 --- a/beat/web/backend/models/__init__.py +++ 
b/beat/web/backend/models/__init__.py @@ -31,6 +31,7 @@ from .environment import Environment from .environment import EnvironmentLanguage from .job import Job from .job import JobSplit +from .local_scheduler import LocalSchedulerProcesses from .queue import QueueManager from .queue import Queue from .result import Result diff --git a/beat/web/backend/models/local_scheduler.py b/beat/web/backend/models/local_scheduler.py new file mode 100755 index 000000000..8dad9a2c9 --- /dev/null +++ b/beat/web/backend/models/local_scheduler.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
# +# # +############################################################################### + + +from django.db import models + + +class LocalSchedulerProcesses(models.Model): + '''Information about the processes launched by the local scheduler''' + + name = models.TextField() + pid = models.IntegerField() + + + def __str__(self): + return '%s (pid = %d)' % (self.name, self.pid) diff --git a/beat/web/backend/templates/backend/scheduler.html b/beat/web/backend/templates/backend/scheduler.html index 1dbbb34ff..6f604b8ab 100644 --- a/beat/web/backend/templates/backend/scheduler.html +++ b/beat/web/backend/templates/backend/scheduler.html @@ -29,7 +29,8 @@ {% block stylesheets %} {{ block.super }} -<script src="{% fingerprint "Chart.js/Chart.min.js" %} type="text/javascript" charset="utf-8"></script> +{% csrf_token %} +<script src="{% fingerprint "Chart.js/Chart.min.js" %}" type="text/javascript" charset="utf-8"></script> {% endblock %} @@ -55,21 +56,13 @@ <h3 class="panel-title">Helper Panel</h3> </div> <div class="panel-body"> - <p class="help">Use this panel to <strong>locally</strong> launch scheduling activity. This functionality is intended as a <em>test</em> scheduler and worker replacement that can be used to run local experiments or debug. <strong>Don't use this in a production system.</strong> Every time you launch an activity, the page will reload to trigger this action. Scheduling happens in the context of the Django server running on the background. Worker processes are managed using subprocesses and don't block the web server.</p> + <p class="help">Use this panel to <strong>locally</strong> launch scheduling activity. This functionality is intended as a <em>test</em> scheduler and worker replacement that can be used to run local experiments or debug. 
<strong>Don't use this in a production system.</strong> The Scheduler and Worker processes are managed using subprocesses and don't block the web server.</p> <div class="form-inline"> - <div id="activity-group" class="form-group"> - <label class="sr-only" for="activity">Activity</label> - <select id="activity" class="form-control"> - <option value="both">Schedule & Work</option> - <option value="schedule">Schedule</option> - <option value="work">Work</option> - </select> - </div> <div id="periodically-group" class="form-group"> <div class="checkbox"> <label> - <input id="periodically" type="checkbox" checked="checked"> periodically + <input id="periodically" type="checkbox" checked="checked"> Refresh the page periodically </label> </div> </div> @@ -77,7 +70,7 @@ <label class="sr-only" for="period">Period</label> <div class="input-group"> <div class="input-group-addon">every</div> - <input type="text" class="form-control" id="period" value="{{ scheduling_period }}"> + <input type="text" class="form-control" id="period" value="{{ refreshing_period }}"> <div class="input-group-addon">s</div> </div> </div> @@ -142,14 +135,14 @@ <thead> <tr> <th>Name</th> - <th>Blocks/Jobs</th> + <th>Blocks</th> <th>Job Splits</th> <th>Assigned</th> <th>Running</th> <th>Completed</th> <th>Failed</th> <th>Cancelled</th> - <th>Skipped</th> + <th>Cancelling</th> </tr> </thead> <tbody> @@ -163,7 +156,7 @@ <td>{{ obj|count_job_splits:"C" }}</td> <td>{{ obj|count_job_splits:"F" }}</td> <td>{{ obj|count_job_splits:"L" }}</td> - <td>{{ obj|count_job_splits:"S" }}</td> + <td>{{ obj|count_job_splits:"K" }}</td> </tr> {% for split in obj.job_splits %} <tr class="job-split"> @@ -263,11 +256,17 @@ </div>{# col #} </div>{# row #} -{% csrf_token %} <script type="text/javascript"> $(document).ready(function() { + $.ajaxSetup({ + beforeSend: function(xhr, settings) { + var csrftoken = $.cookie('csrftoken'); + xhr.setRequestHeader('X-CSRFToken', csrftoken); + } + }); + /** * This bit of code here is to 
manage the normal parts of the scheduler * page, like the tags and the cache chart which is displayed. @@ -301,34 +300,6 @@ $(document).ready(function() { * shows up. */ - function display_periodic(period) { - $("input#periodically").prop("checked", true); - $("#period-group").show(); - $("button#start > span#start").text("Start"); - $("button#stop > span#stop").text("Stop"); - $("button#stop").show(); - $("input#period").val(period); - $("button#stop").disable(); - } - - function display_single_shot() { - $("input#periodically").prop("checked", false); - $("#period-group").hide(); - $("button#start > span#start").text("Go"); - $("button#stop > span#stop").text("Reset"); - $("button#start").enable(); - $("button#stop").enable(); - } - - /* controls button display */ - $("input#periodically").click(function() { - if($(this).is(":checked")) { - display_periodic({{ scheduling_period }}); - } else { - display_single_shot(); - } - }); - /* get url parameters */ function get_parameters() { var vars = {}; @@ -341,47 +312,48 @@ $(document).ready(function() { /* controls the initial appearance */ var params = get_parameters(); + var refresh_timeout = null; + if (location.search !== "") { - if ("period" in params) { - var period = parseInt(params.period, 10); - display_periodic(period); - $("button#start").disable(); - $("button#stop").enable(); - $("select#activity").disable(); - $("input#period").disable(); - $("input#periodically").disable(); - setTimeout("location.reload(true);", period * 1000); - } - else { - display_single_shot(); - } - if ("activity" in params) { - $("select#activity").val(params.activity); - } + var period = {{ refreshing_period }}; + if ("period" in params) + period = parseInt(params.period, 10); + $("button#start").disable(); + $("button#stop").enable(); + $("input#period").disable(); + $("input#periodically").disable(); + refresh_timeout = setTimeout("location.reload(true);", period * 1000); } else { - display_periodic({{ scheduling_period }}); 
$("button#start").enable(); $("button#stop").disable(); - $("select#activity").enable(); $("input#period").enable(); $("input#periodically").enable(); } /* controls form submission buttons */ $("button#start").click(function() { - var params = '?activity=' + $("select#activity").val(); - if ($("input#periodically").is(":checked")) { - params += '&period=' + $("input#period").val(); - } - if (location.search === params) { - location.reload(true); - } - else { - location.search = params; - } + var d = $.post("{% url 'api_backend:local_scheduler-start' %}"); + + d.done(function(data) { + if ($("input#periodically").is(":checked")) { + var params = '?period=' + $("input#period").val(); + + if (location.search === params) { + location.reload(true); + } + else { + location.search = params; + } + } + }); }); $("button#stop").click(function() { + if (refresh_timeout !== null) + clearTimeout(refresh_timeout); + + $.post("{% url 'api_backend:local_scheduler-stop' %}"); + location.search = ""; }); diff --git a/beat/web/backend/templatetags/backend_tags.py b/beat/web/backend/templatetags/backend_tags.py index 8d05e57fb..37b725d8d 100755 --- a/beat/web/backend/templatetags/backend_tags.py +++ b/beat/web/backend/templatetags/backend_tags.py @@ -89,5 +89,5 @@ def visible_queues(context, object): def count_job_splits(xp, status=None): """Returns job splits for an experiment in a certain state""" if status == 'A': - return xp.job_splits(stJobSplit.QUEUEDUEUED).filter(worker__isnull=False).count() + return xp.job_splits(status=JobSplit.QUEUED).filter(worker__isnull=False).count() return xp.job_splits(status=status).count() diff --git a/beat/web/backend/tests/test_helpers.py b/beat/web/backend/tests/test_helpers.py index e1fb8985b..a872ccb92 100755 --- a/beat/web/backend/tests/test_helpers.py +++ b/beat/web/backend/tests/test_helpers.py @@ -343,6 +343,34 @@ class ScheduleExperimentTest(BaseBackendTestCase): self.assertEqual(JobSplit.objects.count(), 0) + def 
test_cancelled_experiment(self): + fullname = 'user/user/single/1/single' + + xp = Experiment.objects.get(name=fullname.split('/')[-1]) + + self.set_experiment_state( + xp, + block_status={ + 'echo': Block.CANCELLED, + 'analysis': Block.CANCELLED, + }, + ) + + schedule_experiment(xp) + xp.refresh_from_db() + + b0 = xp.blocks.all()[0] + b1 = xp.blocks.all()[1] + + self.assertEqual(b0.status, Block.PENDING) + self.assertEqual(b1.status, Block.PENDING) + self.assertEqual(xp.status, Experiment.SCHEDULED) + + # schedule_experiment() didn't do anything + self.assertEqual(Job.objects.count(), 2) + self.assertEqual(JobSplit.objects.count(), 0) + + def test_two_different_experiments(self): fullname1 = 'user/user/single/1/single' fullname2 = 'user/user/single/1/single_add' diff --git a/beat/web/backend/tests/test_scheduler.py b/beat/web/backend/tests/test_scheduler.py index 9652820be..0280d5ca3 100755 --- a/beat/web/backend/tests/test_scheduler.py +++ b/beat/web/backend/tests/test_scheduler.py @@ -52,7 +52,6 @@ from beat.core.scripts import worker #---------------------------------------------------------- -# class SchedulerThread(threading.Thread): class SchedulerThread(multiprocessing.Process): def __init__(self, queue, arguments): @@ -70,7 +69,6 @@ class SchedulerThread(multiprocessing.Process): #---------------------------------------------------------- -# class WorkerThread(threading.Thread): class WorkerThread(multiprocessing.Process): def __init__(self, queue, arguments): diff --git a/beat/web/backend/views.py b/beat/web/backend/views.py index d0cdc50a3..f4a3a9084 100755 --- a/beat/web/backend/views.py +++ b/beat/web/backend/views.py @@ -53,84 +53,34 @@ from . 
import schedule #------------------------------------------------ -class Work: - '''Helper to do the required worker job for local scheduling''' - - process = None - environments = None - worker = None - - def __setup__(self): - - Work.process = utils.resolve_process_path() - logger.debug("(path) process: `%s'", Work.process) - Work.environments = utils.find_environments(None) - logger.debug("Environments: %s", ", ".join(Work.environments)) - - # load worker, check environments, activate it - w = Worker.objects.get(name=socket.gethostname()) \ - if Worker.objects.count() != 1 else Worker.objects.get() - missing, unused = w.check_environments(Work.environments) - if unused: - logger.info("The following environments where found on your " \ - "setup, but will not be used with the current queue " \ - "configuration: %s" % ", ".join(unused)) - if missing: - raise RuntimeError("The following environments are currently " \ - "missing from your setup: %s" % ", ".join(missing)) - else: - logger.info("All required software environments were found") - - w.activate() - w.save() - Work.worker = w - - def __call__(self): - - if Work.worker is None: self.__setup__() - - # Regular work - Work.worker.work(Work.environments, Work.process) - - -#------------------------------------------------ - - @login_required def scheduler(request): - if not(request.user.is_superuser): return HttpResponseForbidden() + if not(request.user.is_superuser): + return HttpResponseForbidden() # data for the cache plot cache = state.cache() cache_chart_data = [ - dict( - label= 'Used (%d%%)' % round(100*float(cache['size-in-megabytes'])/cache['capacity-in-megabytes']), - value= cache['size-in-megabytes'], - color= '#F7464A', - highlight= '#FF5A5E', - ), - dict( - label= 'Free (%d%%)' % round(100*float(cache['capacity-in-megabytes'] - cache['size-in-megabytes'])/cache['capacity-in-megabytes']), - value= cache['capacity-in-megabytes'] - cache['size-in-megabytes'], - color= '#46BFBD', - highlight= '#5AD3D1', 
- ), - ] + dict( + label= 'Used (%d%%)' % round(100*float(cache['size-in-megabytes'])/cache['capacity-in-megabytes']), + value= cache['size-in-megabytes'], + color= '#F7464A', + highlight= '#FF5A5E', + ), + dict( + label= 'Free (%d%%)' % round(100*float(cache['capacity-in-megabytes'] - cache['size-in-megabytes'])/cache['capacity-in-megabytes']), + value= cache['capacity-in-megabytes'] - cache['size-in-megabytes'], + color= '#46BFBD', + highlight= '#5AD3D1', + ), + ] cache_gb = int(cache['capacity-in-megabytes'] / 1024.0) - # do scheduling and/or worker activity if required - if settings.SCHEDULING_PANEL and request.GET.has_key('activity'): - activity = request.GET['activity'] - - if activity in ('both', 'schedule'): - splits = schedule.schedule() - if splits: - logger.info("Scheduler assigned %d splits", len(splits)) - - if activity in ('both', 'work'): - Work()() + refreshing_period = getattr(settings, 'SCHEDULING_INTERVAL', 5) + if request.GET.has_key('period'): + refreshing_period = int(request.GET['period']) return render_to_response('backend/scheduler.html', dict( @@ -141,11 +91,14 @@ def scheduler(request): cache_chart_data=simplejson.dumps(cache_chart_data), cache_gb=cache_gb, helper_panel=getattr(settings, 'SCHEDULING_PANEL', False), - scheduling_period=getattr(settings, 'SCHEDULING_INTERVAL', 5), - ), + refreshing_period=refreshing_period, + ), context_instance=RequestContext(request)) +#------------------------------------------------ + + def environment(request, name, version): """Displays a single database""" diff --git a/beat/web/experiments/models/experiment.py b/beat/web/experiments/models/experiment.py index b22bb5ba6..029e3fbaa 100755 --- a/beat/web/experiments/models/experiment.py +++ b/beat/web/experiments/models/experiment.py @@ -631,7 +631,7 @@ class Experiment(Shareable): def job_splits(self, status=None): - from ..backend.models import JobSplit + from ...backend.models import JobSplit retval = 
JobSplit.objects.filter(job__block__in=self.blocks.all()) if status is not None: retval = retval.filter(status=status) diff --git a/beat/web/experiments/models/result.py b/beat/web/experiments/models/result.py index f7b214743..f274ee50c 100755 --- a/beat/web/experiments/models/result.py +++ b/beat/web/experiments/models/result.py @@ -28,6 +28,8 @@ from django.db import models from django.conf import settings +import simplejson + #---------------------------------------------------------- diff --git a/beat/web/experiments/serializers.py b/beat/web/experiments/serializers.py index 2709c2142..8f2b3c950 100755 --- a/beat/web/experiments/serializers.py +++ b/beat/web/experiments/serializers.py @@ -236,6 +236,8 @@ class ExperimentResultsSerializer(ShareableSerializer): results[block.name] = 'failed' elif block.status == Block.CANCELLED: results[block.name] = 'cancelled' + elif obj.status == Experiment.CANCELLING: + results[block.name] = 'cancelling' else: results[block.name] = 'processing' return results diff --git a/beat/web/experiments/static/experiments/js/panels.js b/beat/web/experiments/static/experiments/js/panels.js index 6d94dd666..fa226f91a 100644 --- a/beat/web/experiments/static/experiments/js/panels.js +++ b/beat/web/experiments/static/experiments/js/panels.js @@ -3675,6 +3675,8 @@ beat.experiments.panels.ExecutionDetails.prototype._getIcon = function(status) { return "icon-failed fa fa-bug"; else if (status == 'cancelled') return "icon-failed fa fa-power-off"; + else if (status == 'cancelling') + return "icon-scheduled fa fa-power-off"; else return "icon-pending fa fa-asterisk"; } diff --git a/beat/web/scripts/scheduler.py b/beat/web/scripts/scheduler.py index b518685dc..99d3ef730 100755 --- a/beat/web/scripts/scheduler.py +++ b/beat/web/scripts/scheduler.py @@ -161,12 +161,25 @@ def main(user_input=None): from django import setup setup() + + # Setup the logging + formatter = logging.Formatter(fmt="[%(asctime)s - Scheduler - " + \ + "%(name)s] %(levelname)s: 
%(message)s", + datefmt="%d/%b/%Y %H:%M:%S") + + handler = logging.StreamHandler() + handler.setFormatter(formatter) + global logger - logger = logging.getLogger('beat.scheduler') + logger = logging.getLogger(__name__) + logger.addHandler(handler) + if arguments['--verbose'] == 1: logger.setLevel(logging.INFO) elif arguments['--verbose'] >= 2: logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.WARNING) # Installs SIGTERM handler @@ -242,10 +255,11 @@ def main(user_input=None): # Is the job done? if status == WorkerController.DONE: logger.info("Job split #%d (%s %d/%d @ %s) on '%s' is DONE", - split.job.block.name, split.split_index, + split.id, + split.job.block.name, + split.split_index, split.job.splits.count(), split.job.block.experiment.fullname(), - split.id, split.worker.name) on_split_done(split, simplejson.loads(data[0])) @@ -254,22 +268,29 @@ def main(user_input=None): # Has the job failed? elif status == WorkerController.JOB_ERROR: logger.info("Job split #%d (%s %d/%d @ %s) on '%s' returned an error", - split.job.block.name, split.split_index, + split.id, + split.job.block.name, + split.split_index, split.job.splits.count(), split.job.block.experiment.fullname(), - split.id, split.worker.name) - splits_to_cancel.extend(on_split_fail(split, simplejson.loads(data[0]))) + try: + error = simplejson.loads(data[0]) + except: + error = data[0] + + splits_to_cancel.extend(on_split_fail(split, error)) remove_split_id_from(running_job_splits, split_id) # Was the job cancelled? 
elif status == WorkerController.CANCELLED: logger.info("Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED", - split.job.block.name, split.split_index, + split.id, + split.job.block.name, + split.split_index, split.job.splits.count(), split.job.block.experiment.fullname(), - split.id, split.worker.name) on_split_cancelled(split) @@ -279,10 +300,11 @@ def main(user_input=None): elif status == WorkerController.ERROR: if split_id in running_job_splits: logger.info("Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s", - split.job.block.name, split.split_index, + split.id, + split.job.block.name, + split.split_index, split.job.splits.count(), split.job.block.experiment.fullname(), - split.id, split.worker.name, data[0]) splits_to_cancel.extend(on_split_fail(split, data[0])) @@ -295,10 +317,11 @@ def main(user_input=None): for split_to_cancel in splits_to_cancel: if split_to_cancel.id in running_job_splits: logger.info("Cancelling job split #%d (%s %d/%d @ %s) on '%s'", - split_to_cancel.job.block.name, split_to_cancel.split_index, + split_to_cancel.id, + split_to_cancel.job.block.name, + split_to_cancel.split_index, split_to_cancel.job.splits.count(), split_to_cancel.job.block.experiment.fullname(), - split_to_cancel.id, split_to_cancel.worker.name) worker_controller.cancel(split_to_cancel.worker.name) @@ -319,8 +342,11 @@ def main(user_input=None): configuration = get_configuration_for_split(split) logger.info("Starting job split #%d (%s %d/%d @ %s) on '%s'", - split.job.block.name, split.split_index, split.job.splits.count(), - split.job.block.experiment.fullname(), split.id, + split.id, + split.job.block.name, + split.split_index, + split.job.splits.count(), + split.job.block.experiment.fullname(), split.worker.name) worker_controller.execute(split.worker.name, split.id, configuration) diff --git a/beat/web/settings/settings.py b/beat/web/settings/settings.py index 0bdfdf7d0..81c3f9a6b 100755 --- a/beat/web/settings/settings.py +++ 
b/beat/web/settings/settings.py @@ -73,7 +73,7 @@ LOGGING = { 'disable_existing_loggers': False, 'formatters': { 'simple': { - 'format': '[%(asctime)s] %(levelname)s: %(message)s', + 'format': '[%(asctime)s - %(name)s] %(levelname)s: %(message)s', 'datefmt': "%d/%b/%Y %H:%M:%S", }, }, @@ -100,9 +100,17 @@ LOGGING = { 'beat.core': { 'handlers': ['console'], }, + 'beat.core.scripts.worker': { + 'handlers': ['discard'], + 'propagate': False, + }, 'beat.web': { 'handlers': ['console', 'mail_admins'], }, + 'beat.web.scripts.scheduler': { + 'handlers': ['discard'], + 'propagate': False, + }, 'beat.web.attestations.management.commands': { 'handlers': ['console'], 'propagate': False, #don't e-mail those! -- GitLab