diff --git a/beat/web/backend/admin.py b/beat/web/backend/admin.py index 87d912ed05e2fdb116e42d40c7dbfe75c0552698..ea6b8001b0e5cbadb983d71e99063039bb07a883 100755 --- a/beat/web/backend/admin.py +++ b/beat/web/backend/admin.py @@ -213,9 +213,6 @@ class Job(admin.ModelAdmin): def splits(self, obj): return obj.splits.count() - def has_delete_permission(self, request, obj=None): - return False - def has_add_permission(self, request): return False diff --git a/beat/web/backend/api.py b/beat/web/backend/api.py index f0757c3b6973d8c3c4e22e8e1d50845e98f377bc..fb14a01138fe8ebfc2f4aeeaa1603456aa0891c7 100755 --- a/beat/web/backend/api.py +++ b/beat/web/backend/api.py @@ -28,9 +28,19 @@ from rest_framework.decorators import api_view, permission_classes from rest_framework.response import Response from rest_framework import permissions +from rest_framework import status + +from django.conf import settings from .models import Environment +from .models import Worker +from .models import LocalSchedulerProcesses + from ..code.models import Code +from . 
import local_scheduler + + +#---------------------------------------------------------- @api_view(['GET']) @@ -73,3 +83,47 @@ def accessible_environments_list(request): }) return Response(result) + + +#---------------------------------------------------------- + + +@api_view(['POST']) +@permission_classes([permissions.IsAdminUser]) +def start_local_scheduler(request): + """Starts the local scheduler""" + + if not getattr(settings, 'SCHEDULING_PANEL', False): + return Response(status=status.HTTP_403_FORBIDDEN) + + # Clean start-up + LocalSchedulerProcesses.objects.all().delete() + + for worker in Worker.objects.all(): + (pid, _) = local_scheduler.start_worker(worker.name, settings.PREFIX, + settings.CACHE_ROOT, + 'tcp://127.0.0.1:50000') + LocalSchedulerProcesses(name=worker.name, pid=pid).save() + + (pid, _) = local_scheduler.start_scheduler(address='127.0.0.1', port=50000) + LocalSchedulerProcesses(name='Scheduler', pid=pid).save() + + return Response(status=status.HTTP_204_NO_CONTENT) + + +#---------------------------------------------------------- + + +@api_view(['POST']) +@permission_classes([permissions.IsAdminUser]) +def stop_local_scheduler(request): + """Stops the local scheduler""" + + if not getattr(settings, 'SCHEDULING_PANEL', False): + return Response(status=status.HTTP_403_FORBIDDEN) + + for process in LocalSchedulerProcesses.objects.all(): + local_scheduler.stop_process(process.pid) + process.delete() + + return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/beat/web/backend/api_urls.py b/beat/web/backend/api_urls.py old mode 100644 new mode 100755 index e5faf3d8cf06d313978ed12103d6347416d398a5..cbf2f3724b7003200960eb8343534e3af9eb0f14 --- a/beat/web/backend/api_urls.py +++ b/beat/web/backend/api_urls.py @@ -34,6 +34,18 @@ urlpatterns = [ r'^environments/$', api.accessible_environments_list, name='backend-api-environments', - ), + ), + + url( + r'^local_scheduler/start/$', + api.start_local_scheduler, + name='local_scheduler-start', + ), +
+ url( + r'^local_scheduler/stop/$', + api.stop_local_scheduler, + name='local_scheduler-stop', + ), ] diff --git a/beat/web/backend/helpers.py b/beat/web/backend/helpers.py index 0e6ecdb2d50bc21c85a178b422819585268eb9b4..573c2192c398501564b29fb8deadb1e6ab610023 100755 --- a/beat/web/backend/helpers.py +++ b/beat/web/backend/helpers.py @@ -3,7 +3,7 @@ ############################################################################### # # -# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # # Contact: beat.support@idiap.ch # # # # This file is part of the beat.web module of the BEAT platform. # @@ -40,6 +40,7 @@ from datetime import datetime from ..experiments.models import Experiment from ..experiments.models import Block from ..experiments.models import CachedFile +from ..experiments.models import Result as CacheResult from .models import Queue from .models import Job from .models import JobSplit @@ -47,6 +48,10 @@ from .models import Worker from .models import Result import beat.core.hash +import beat.core.data +import beat.core.hash +import beat.core.algorithm +from beat.core.utils import NumpyJSONEncoder def schedule_experiment(experiment): @@ -102,8 +107,13 @@ def schedule_experiment(experiment): block.end_date = block.creation_date block.save() + if block.analyzer: + for cached_file in block.outputs.all(): + load_results_from_cache(block, cached_file) + else: Job.objects.create_job(block) + block.status = Block.PENDING block.creation_date = datetime.now() block.save() already_done = False @@ -728,7 +738,6 @@ def update_job(job): # Delete the job job.delete() - # Did the job succeed? 
elif job.splits.exclude(status=JobSplit.COMPLETED).count() == 0: @@ -739,6 +748,9 @@ def update_job(job): for cached_file in job.block.outputs.all(): cached_file.update(Block.DONE) + if job.block.analyzer: + load_results_from_cache(job.block, cached_file) + # Update the block job.block.status = Block.DONE job.block.end_date = job.end_date @@ -908,3 +920,40 @@ def update_experiment(experiment): experiment.status = Experiment.PENDING experiment.end_date = experiment.blocks.order_by('-end_date')[0].end_date experiment.save() + + +#---------------------------------------------------------- + + +def load_results_from_cache(block, cached_file): + if not block.analyzer: + return + + if cached_file.results.count() > 0: + return + + data_source = beat.core.data.CachedDataSource() + data_source.setup(os.path.join(settings.CACHE_ROOT, + beat.core.hash.toPath(cached_file.hash)), + settings.PREFIX) + + output_data = data_source.next()[0] + if output_data is not None: + algorithm = beat.core.algorithm.Algorithm( + settings.PREFIX, block.algorithm.fullname()) + + for field, value in output_data.as_dict().items(): + res, _ = CacheResult.objects.get_or_create( + name=field, cache=cached_file) + res.primary = algorithm.results[field]['display'] + res.type = algorithm.results[field]["type"] + + if res.type in ['int32', 'float32', 'bool', 'string']: + res.data_value = str(value) + else: + res.data_value = simplejson.dumps( + value, indent=4, cls=NumpyJSONEncoder) + + res.save() + + data_source.close() diff --git a/beat/web/backend/local_scheduler.py b/beat/web/backend/local_scheduler.py new file mode 100755 index 0000000000000000000000000000000000000000..ba688dba4422f94564f023c3a5d6948f2c5604e2 --- /dev/null +++ b/beat/web/backend/local_scheduler.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# 
Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +import multiprocessing +import psutil +import signal + +from ..scripts import scheduler + +from beat.core.scripts import worker + + +#---------------------------------------------------------- + + +class SchedulerProcess(multiprocessing.Process): + + def __init__(self, arguments, queue=None): + super(SchedulerProcess, self).__init__() + + if queue is None: + self.queue = multiprocessing.Queue() + else: + self.queue = queue + + self.arguments = arguments + + + def run(self): + self.queue.put('STARTED') + scheduler.main(self.arguments) + + +#---------------------------------------------------------- + + +class WorkerProcess(multiprocessing.Process): + + def __init__(self, arguments, queue=None): + super(WorkerProcess, self).__init__() + + if queue is None: + self.queue = multiprocessing.Queue() + else: + self.queue = queue + + self.arguments = arguments + + + def run(self): + self.queue.put('STARTED') + worker.main(self.arguments) + + 
+#---------------------------------------------------------- + + +def start_scheduler(settings='beat.web.settings.settings', interval=5, + address='127.0.0.1', port=50000): + args = [ + '--settings=%s' % str(settings), + '--interval=%d' % int(interval), + '--address=%s' % str(address), + '--port=%d' % int(port), + '--verbose', + ] + + process = SchedulerProcess(args) + process.start() + process.queue.get() + + return (process.pid, process) + + +#---------------------------------------------------------- + + +def start_worker(name, prefix, cache, scheduler_address): + args = [ + '--prefix=%s' % str(prefix), + '--cache=%s' % str(cache), + '--name=%s' % str(name), + '--verbose', + str(scheduler_address) + ] + + process = WorkerProcess(args) + process.start() + process.queue.get() + + return (process.pid, process) + + +#---------------------------------------------------------- + + +def stop_process(pid): + if not psutil.pid_exists(pid): + return + + process = psutil.Process(pid) + + process.send_signal(signal.SIGTERM) + + gone, alive = psutil.wait_procs([process]) diff --git a/beat/web/backend/migrations/0006_localschedulerprocesses.py b/beat/web/backend/migrations/0006_localschedulerprocesses.py new file mode 100644 index 0000000000000000000000000000000000000000..5b0c888891c365486068c4fdb73fac0a433bdc3f --- /dev/null +++ b/beat/web/backend/migrations/0006_localschedulerprocesses.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.9.13 on 2017-10-12 15:12 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('backend', '0005_job_modifications'), + ] + + operations = [ + migrations.CreateModel( + name='LocalSchedulerProcesses', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('name', models.TextField()), + ('pid', models.IntegerField()), + ], + ), + ] diff --git 
a/beat/web/backend/models/__init__.py b/beat/web/backend/models/__init__.py index 9672ab91a845cb24d8a8b0bfd4c580ca47d4a574..6b2e0d1af4ea836c49177047c56e4ce142d8f35e 100755 --- a/beat/web/backend/models/__init__.py +++ b/beat/web/backend/models/__init__.py @@ -31,6 +31,7 @@ from .environment import Environment from .environment import EnvironmentLanguage from .job import Job from .job import JobSplit +from .local_scheduler import LocalSchedulerProcesses from .queue import QueueManager from .queue import Queue from .result import Result diff --git a/beat/web/backend/models/local_scheduler.py b/beat/web/backend/models/local_scheduler.py new file mode 100755 index 0000000000000000000000000000000000000000..8dad9a2c9fe8db282935c6f7d37f078244aba81d --- /dev/null +++ b/beat/web/backend/models/local_scheduler.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.web module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
# +# # +############################################################################### + + +from django.db import models + + +class LocalSchedulerProcesses(models.Model): + '''Information about the processes launched by the local scheduler''' + + name = models.TextField() + pid = models.IntegerField() + + + def __str__(self): + return '%s (pid = %d)' % (self.name, self.pid) diff --git a/beat/web/backend/templates/backend/scheduler.html b/beat/web/backend/templates/backend/scheduler.html index 1dbbb34ff02b38d170771efefa5e3cf84e52a940..6f604b8abd4c2fffddc6e33bd9c2eb657bff7b09 100644 --- a/beat/web/backend/templates/backend/scheduler.html +++ b/beat/web/backend/templates/backend/scheduler.html @@ -29,7 +29,8 @@ {% block stylesheets %} {{ block.super }} -<script src="{% fingerprint "Chart.js/Chart.min.js" %} type="text/javascript" charset="utf-8"></script> +{% csrf_token %} +<script src="{% fingerprint "Chart.js/Chart.min.js" %}" type="text/javascript" charset="utf-8"></script> {% endblock %} @@ -55,21 +56,13 @@ <h3 class="panel-title">Helper Panel</h3> </div> <div class="panel-body"> - <p class="help">Use this panel to <strong>locally</strong> launch scheduling activity. This functionality is intended as a <em>test</em> scheduler and worker replacement that can be used to run local experiments or debug. <strong>Don't use this in a production system.</strong> Every time you launch an activity, the page will reload to trigger this action. Scheduling happens in the context of the Django server running on the background. Worker processes are managed using subprocesses and don't block the web server.</p> + <p class="help">Use this panel to <strong>locally</strong> launch scheduling activity. This functionality is intended as a <em>test</em> scheduler and worker replacement that can be used to run local experiments or debug. 
<strong>Don't use this in a production system.</strong> The Scheduler and Worker processes are managed using subprocesses and don't block the web server.</p> <div class="form-inline"> - <div id="activity-group" class="form-group"> - <label class="sr-only" for="activity">Activity</label> - <select id="activity" class="form-control"> - <option value="both">Schedule & Work</option> - <option value="schedule">Schedule</option> - <option value="work">Work</option> - </select> - </div> <div id="periodically-group" class="form-group"> <div class="checkbox"> <label> - <input id="periodically" type="checkbox" checked="checked"> periodically + <input id="periodically" type="checkbox" checked="checked"> Refresh the page periodically </label> </div> </div> @@ -77,7 +70,7 @@ <label class="sr-only" for="period">Period</label> <div class="input-group"> <div class="input-group-addon">every</div> - <input type="text" class="form-control" id="period" value="{{ scheduling_period }}"> + <input type="text" class="form-control" id="period" value="{{ refreshing_period }}"> <div class="input-group-addon">s</div> </div> </div> @@ -142,14 +135,14 @@ <thead> <tr> <th>Name</th> - <th>Blocks/Jobs</th> + <th>Blocks</th> <th>Job Splits</th> <th>Assigned</th> <th>Running</th> <th>Completed</th> <th>Failed</th> <th>Cancelled</th> - <th>Skipped</th> + <th>Cancelling</th> </tr> </thead> <tbody> @@ -163,7 +156,7 @@ <td>{{ obj|count_job_splits:"C" }}</td> <td>{{ obj|count_job_splits:"F" }}</td> <td>{{ obj|count_job_splits:"L" }}</td> - <td>{{ obj|count_job_splits:"S" }}</td> + <td>{{ obj|count_job_splits:"K" }}</td> </tr> {% for split in obj.job_splits %} <tr class="job-split"> @@ -263,11 +256,17 @@ </div>{# col #} </div>{# row #} -{% csrf_token %} <script type="text/javascript"> $(document).ready(function() { + $.ajaxSetup({ + beforeSend: function(xhr, settings) { + var csrftoken = $.cookie('csrftoken'); + xhr.setRequestHeader('X-CSRFToken', csrftoken); + } + }); + /** * This bit of code here is to 
manage the normal parts of the scheduler * page, like the tags and the cache chart which is displayed. @@ -301,34 +300,6 @@ $(document).ready(function() { * shows up. */ - function display_periodic(period) { - $("input#periodically").prop("checked", true); - $("#period-group").show(); - $("button#start > span#start").text("Start"); - $("button#stop > span#stop").text("Stop"); - $("button#stop").show(); - $("input#period").val(period); - $("button#stop").disable(); - } - - function display_single_shot() { - $("input#periodically").prop("checked", false); - $("#period-group").hide(); - $("button#start > span#start").text("Go"); - $("button#stop > span#stop").text("Reset"); - $("button#start").enable(); - $("button#stop").enable(); - } - - /* controls button display */ - $("input#periodically").click(function() { - if($(this).is(":checked")) { - display_periodic({{ scheduling_period }}); - } else { - display_single_shot(); - } - }); - /* get url parameters */ function get_parameters() { var vars = {}; @@ -341,47 +312,48 @@ $(document).ready(function() { /* controls the initial appearance */ var params = get_parameters(); + var refresh_timeout = null; + if (location.search !== "") { - if ("period" in params) { - var period = parseInt(params.period, 10); - display_periodic(period); - $("button#start").disable(); - $("button#stop").enable(); - $("select#activity").disable(); - $("input#period").disable(); - $("input#periodically").disable(); - setTimeout("location.reload(true);", period * 1000); - } - else { - display_single_shot(); - } - if ("activity" in params) { - $("select#activity").val(params.activity); - } + var period = {{ refreshing_period }}; + if ("period" in params) + period = parseInt(params.period, 10); + $("button#start").disable(); + $("button#stop").enable(); + $("input#period").disable(); + $("input#periodically").disable(); + refresh_timeout = setTimeout("location.reload(true);", period * 1000); } else { - display_periodic({{ scheduling_period }}); 
$("button#start").enable(); $("button#stop").disable(); - $("select#activity").enable(); $("input#period").enable(); $("input#periodically").enable(); } /* controls form submission buttons */ $("button#start").click(function() { - var params = '?activity=' + $("select#activity").val(); - if ($("input#periodically").is(":checked")) { - params += '&period=' + $("input#period").val(); - } - if (location.search === params) { - location.reload(true); - } - else { - location.search = params; - } + var d = $.post("{% url 'api_backend:local_scheduler-start' %}"); + + d.done(function(data) { + if ($("input#periodically").is(":checked")) { + var params = '?period=' + $("input#period").val(); + + if (location.search === params) { + location.reload(true); + } + else { + location.search = params; + } + } + }); }); $("button#stop").click(function() { + if (refresh_timeout !== null) + clearTimeout(refresh_timeout); + + $.post("{% url 'api_backend:local_scheduler-stop' %}"); + location.search = ""; }); diff --git a/beat/web/backend/templatetags/backend_tags.py b/beat/web/backend/templatetags/backend_tags.py index 8d05e57fb538394c8728860caf05ad3c0e62fd36..37b725d8d7b91f6e665e48436ae61df1784b20f7 100755 --- a/beat/web/backend/templatetags/backend_tags.py +++ b/beat/web/backend/templatetags/backend_tags.py @@ -89,5 +89,5 @@ def visible_queues(context, object): def count_job_splits(xp, status=None): """Returns job splits for an experiment in a certain state""" if status == 'A': - return xp.job_splits(stJobSplit.QUEUEDUEUED).filter(worker__isnull=False).count() + return xp.job_splits(status=JobSplit.QUEUED).filter(worker__isnull=False).count() return xp.job_splits(status=status).count() diff --git a/beat/web/backend/tests/test_helpers.py b/beat/web/backend/tests/test_helpers.py index e1fb8985bc992bccdf7d4fd4c93f312e1415b17c..a872ccb92c463d9392900cbc502f0582c24457dc 100755 --- a/beat/web/backend/tests/test_helpers.py +++ b/beat/web/backend/tests/test_helpers.py @@ -343,6 +343,34 @@ 
class ScheduleExperimentTest(BaseBackendTestCase): self.assertEqual(JobSplit.objects.count(), 0) + def test_cancelled_experiment(self): + fullname = 'user/user/single/1/single' + + xp = Experiment.objects.get(name=fullname.split('/')[-1]) + + self.set_experiment_state( + xp, + block_status={ + 'echo': Block.CANCELLED, + 'analysis': Block.CANCELLED, + }, + ) + + schedule_experiment(xp) + xp.refresh_from_db() + + b0 = xp.blocks.all()[0] + b1 = xp.blocks.all()[1] + + self.assertEqual(b0.status, Block.PENDING) + self.assertEqual(b1.status, Block.PENDING) + self.assertEqual(xp.status, Experiment.SCHEDULED) + + # schedule_experiment() created 2 jobs, but no job split yet + self.assertEqual(Job.objects.count(), 2) + self.assertEqual(JobSplit.objects.count(), 0) + + def test_two_different_experiments(self): fullname1 = 'user/user/single/1/single' fullname2 = 'user/user/single/1/single_add' diff --git a/beat/web/backend/tests/test_scheduler.py b/beat/web/backend/tests/test_scheduler.py index 9652820be855d0397bd3bb73785670a962484aa5..0280d5ca30107dd69ba3594301508f755741896b 100755 --- a/beat/web/backend/tests/test_scheduler.py +++ b/beat/web/backend/tests/test_scheduler.py @@ -52,7 +52,6 @@ from beat.core.scripts import worker #---------------------------------------------------------- -# class SchedulerThread(threading.Thread): class SchedulerThread(multiprocessing.Process): def __init__(self, queue, arguments): @@ -70,7 +69,6 @@ class SchedulerThread(multiprocessing.Process): #---------------------------------------------------------- -# class WorkerThread(threading.Thread): class WorkerThread(multiprocessing.Process): def __init__(self, queue, arguments): diff --git a/beat/web/backend/views.py b/beat/web/backend/views.py index d0cdc50a34a087baa4a675ae57d32ddb4da7136a..f4a3a90846b6526dfb9534b264e86d82544085e0 100755 --- a/beat/web/backend/views.py +++ b/beat/web/backend/views.py @@ -53,84 +53,34 @@ from .
import schedule #------------------------------------------------ -class Work: - '''Helper to do the required worker job for local scheduling''' - - process = None - environments = None - worker = None - - def __setup__(self): - - Work.process = utils.resolve_process_path() - logger.debug("(path) process: `%s'", Work.process) - Work.environments = utils.find_environments(None) - logger.debug("Environments: %s", ", ".join(Work.environments)) - - # load worker, check environments, activate it - w = Worker.objects.get(name=socket.gethostname()) \ - if Worker.objects.count() != 1 else Worker.objects.get() - missing, unused = w.check_environments(Work.environments) - if unused: - logger.info("The following environments where found on your " \ - "setup, but will not be used with the current queue " \ - "configuration: %s" % ", ".join(unused)) - if missing: - raise RuntimeError("The following environments are currently " \ - "missing from your setup: %s" % ", ".join(missing)) - else: - logger.info("All required software environments were found") - - w.activate() - w.save() - Work.worker = w - - def __call__(self): - - if Work.worker is None: self.__setup__() - - # Regular work - Work.worker.work(Work.environments, Work.process) - - -#------------------------------------------------ - - @login_required def scheduler(request): - if not(request.user.is_superuser): return HttpResponseForbidden() + if not(request.user.is_superuser): + return HttpResponseForbidden() # data for the cache plot cache = state.cache() cache_chart_data = [ - dict( - label= 'Used (%d%%)' % round(100*float(cache['size-in-megabytes'])/cache['capacity-in-megabytes']), - value= cache['size-in-megabytes'], - color= '#F7464A', - highlight= '#FF5A5E', - ), - dict( - label= 'Free (%d%%)' % round(100*float(cache['capacity-in-megabytes'] - cache['size-in-megabytes'])/cache['capacity-in-megabytes']), - value= cache['capacity-in-megabytes'] - cache['size-in-megabytes'], - color= '#46BFBD', - highlight= '#5AD3D1', 
- ), - ] + dict( + label= 'Used (%d%%)' % round(100*float(cache['size-in-megabytes'])/cache['capacity-in-megabytes']), + value= cache['size-in-megabytes'], + color= '#F7464A', + highlight= '#FF5A5E', + ), + dict( + label= 'Free (%d%%)' % round(100*float(cache['capacity-in-megabytes'] - cache['size-in-megabytes'])/cache['capacity-in-megabytes']), + value= cache['capacity-in-megabytes'] - cache['size-in-megabytes'], + color= '#46BFBD', + highlight= '#5AD3D1', + ), + ] cache_gb = int(cache['capacity-in-megabytes'] / 1024.0) - # do scheduling and/or worker activity if required - if settings.SCHEDULING_PANEL and request.GET.has_key('activity'): - activity = request.GET['activity'] - - if activity in ('both', 'schedule'): - splits = schedule.schedule() - if splits: - logger.info("Scheduler assigned %d splits", len(splits)) - - if activity in ('both', 'work'): - Work()() + refreshing_period = getattr(settings, 'SCHEDULING_INTERVAL', 5) + if 'period' in request.GET: + refreshing_period = int(request.GET['period']) return render_to_response('backend/scheduler.html', dict( @@ -141,11 +91,14 @@ def scheduler(request): cache_chart_data=simplejson.dumps(cache_chart_data), cache_gb=cache_gb, helper_panel=getattr(settings, 'SCHEDULING_PANEL', False), - scheduling_period=getattr(settings, 'SCHEDULING_INTERVAL', 5), - ), + refreshing_period=refreshing_period, + ), context_instance=RequestContext(request)) +#------------------------------------------------ + + def environment(request, name, version): """Displays a single database""" diff --git a/beat/web/experiments/models/experiment.py b/beat/web/experiments/models/experiment.py index b22bb5ba624d702f24c6cdf75af467eb04e7ef59..029e3fbaa57015956fb897fcb0cb3c944e2a7107 100755 --- a/beat/web/experiments/models/experiment.py +++ b/beat/web/experiments/models/experiment.py @@ -631,7 +631,7 @@ class Experiment(Shareable): def job_splits(self, status=None): - from ..backend.models import JobSplit + from ...backend.models import
JobSplit retval = JobSplit.objects.filter(job__block__in=self.blocks.all()) if status is not None: retval = retval.filter(status=status) diff --git a/beat/web/experiments/models/result.py b/beat/web/experiments/models/result.py index f7b21474323cf8baec5e223e411ce0506e717018..f274ee50cfb32eedb9be56b5875b2525c2e19e79 100755 --- a/beat/web/experiments/models/result.py +++ b/beat/web/experiments/models/result.py @@ -28,6 +28,8 @@ from django.db import models from django.conf import settings +import simplejson + #---------------------------------------------------------- diff --git a/beat/web/experiments/serializers.py b/beat/web/experiments/serializers.py index 2709c2142bab8fa68a6f45286c314912c3f6fe07..8f2b3c95021e2937fa29e956e9d2edfc3664ec92 100755 --- a/beat/web/experiments/serializers.py +++ b/beat/web/experiments/serializers.py @@ -236,6 +236,8 @@ class ExperimentResultsSerializer(ShareableSerializer): results[block.name] = 'failed' elif block.status == Block.CANCELLED: results[block.name] = 'cancelled' + elif obj.status == Experiment.CANCELLING: + results[block.name] = 'cancelling' else: results[block.name] = 'processing' return results diff --git a/beat/web/experiments/static/experiments/js/panels.js b/beat/web/experiments/static/experiments/js/panels.js index 6d94dd6665c43feacc906cdb171d46509b55106c..fa226f91abd20535337258ed2ff29413a145436b 100644 --- a/beat/web/experiments/static/experiments/js/panels.js +++ b/beat/web/experiments/static/experiments/js/panels.js @@ -3675,6 +3675,8 @@ beat.experiments.panels.ExecutionDetails.prototype._getIcon = function(status) { return "icon-failed fa fa-bug"; else if (status == 'cancelled') return "icon-failed fa fa-power-off"; + else if (status == 'cancelling') + return "icon-scheduled fa fa-power-off"; else return "icon-pending fa fa-asterisk"; } diff --git a/beat/web/scripts/scheduler.py b/beat/web/scripts/scheduler.py index b518685dcd066a65ae78792bdcddef0f2901456d..99d3ef7303d5a3e5b73b726f71c1b0855d8f3899 100755 --- 
a/beat/web/scripts/scheduler.py +++ b/beat/web/scripts/scheduler.py @@ -161,12 +161,25 @@ def main(user_input=None): from django import setup setup() + + # Setup the logging + formatter = logging.Formatter(fmt="[%(asctime)s - Scheduler - " + \ + "%(name)s] %(levelname)s: %(message)s", + datefmt="%d/%b/%Y %H:%M:%S") + + handler = logging.StreamHandler() + handler.setFormatter(formatter) + global logger - logger = logging.getLogger('beat.scheduler') + logger = logging.getLogger(__name__) + logger.addHandler(handler) + if arguments['--verbose'] == 1: logger.setLevel(logging.INFO) elif arguments['--verbose'] >= 2: logger.setLevel(logging.DEBUG) + else: + logger.setLevel(logging.WARNING) # Installs SIGTERM handler @@ -242,10 +255,11 @@ def main(user_input=None): # Is the job done? if status == WorkerController.DONE: logger.info("Job split #%d (%s %d/%d @ %s) on '%s' is DONE", - split.job.block.name, split.split_index, + split.id, + split.job.block.name, + split.split_index, split.job.splits.count(), split.job.block.experiment.fullname(), - split.id, split.worker.name) on_split_done(split, simplejson.loads(data[0])) @@ -254,22 +268,29 @@ def main(user_input=None): # Has the job failed? elif status == WorkerController.JOB_ERROR: logger.info("Job split #%d (%s %d/%d @ %s) on '%s' returned an error", - split.job.block.name, split.split_index, + split.id, + split.job.block.name, + split.split_index, split.job.splits.count(), split.job.block.experiment.fullname(), - split.id, split.worker.name) - splits_to_cancel.extend(on_split_fail(split, simplejson.loads(data[0]))) + try: + error = simplejson.loads(data[0]) + except (TypeError, ValueError): + error = data[0] + + splits_to_cancel.extend(on_split_fail(split, error)) remove_split_id_from(running_job_splits, split_id) # Was the job cancelled?
elif status == WorkerController.CANCELLED: logger.info("Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED", - split.job.block.name, split.split_index, + split.id, + split.job.block.name, + split.split_index, split.job.splits.count(), split.job.block.experiment.fullname(), - split.id, split.worker.name) on_split_cancelled(split) @@ -279,10 +300,11 @@ def main(user_input=None): elif status == WorkerController.ERROR: if split_id in running_job_splits: logger.info("Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s", - split.job.block.name, split.split_index, + split.id, + split.job.block.name, + split.split_index, split.job.splits.count(), split.job.block.experiment.fullname(), - split.id, split.worker.name, data[0]) splits_to_cancel.extend(on_split_fail(split, data[0])) @@ -295,10 +317,11 @@ def main(user_input=None): for split_to_cancel in splits_to_cancel: if split_to_cancel.id in running_job_splits: logger.info("Cancelling job split #%d (%s %d/%d @ %s) on '%s'", - split_to_cancel.job.block.name, split_to_cancel.split_index, + split_to_cancel.id, + split_to_cancel.job.block.name, + split_to_cancel.split_index, split_to_cancel.job.splits.count(), split_to_cancel.job.block.experiment.fullname(), - split_to_cancel.id, split_to_cancel.worker.name) worker_controller.cancel(split_to_cancel.worker.name) @@ -319,8 +342,11 @@ def main(user_input=None): configuration = get_configuration_for_split(split) logger.info("Starting job split #%d (%s %d/%d @ %s) on '%s'", - split.job.block.name, split.split_index, split.job.splits.count(), - split.job.block.experiment.fullname(), split.id, + split.id, + split.job.block.name, + split.split_index, + split.job.splits.count(), + split.job.block.experiment.fullname(), split.worker.name) worker_controller.execute(split.worker.name, split.id, configuration) diff --git a/beat/web/settings/settings.py b/beat/web/settings/settings.py index 0bdfdf7d0f5606f1c471245b7ae23f70192b00df..81c3f9a6b76671bfd1af12b1694b4170e0a4994c 
100755 --- a/beat/web/settings/settings.py +++ b/beat/web/settings/settings.py @@ -73,7 +73,7 @@ LOGGING = { 'disable_existing_loggers': False, 'formatters': { 'simple': { - 'format': '[%(asctime)s] %(levelname)s: %(message)s', + 'format': '[%(asctime)s - %(name)s] %(levelname)s: %(message)s', 'datefmt': "%d/%b/%Y %H:%M:%S", }, }, @@ -100,9 +100,17 @@ LOGGING = { 'beat.core': { 'handlers': ['console'], }, + 'beat.core.scripts.worker': { + 'handlers': ['discard'], + 'propagate': False, + }, 'beat.web': { 'handlers': ['console', 'mail_admins'], }, + 'beat.web.scripts.scheduler': { + 'handlers': ['discard'], + 'propagate': False, + }, 'beat.web.attestations.management.commands': { 'handlers': ['console'], 'propagate': False, #don't e-mail those!