Commit 823da9ab authored by Philip ABBET's avatar Philip ABBET
Browse files

[backend] Refactoring of the local scheduler mechanism

parent 48d6b63b
......@@ -213,9 +213,6 @@ class Job(admin.ModelAdmin):
def splits(self, obj):
return obj.splits.count()
def has_delete_permission(self, request, obj=None):
return False
def has_add_permission(self, request):
return False
......
......@@ -28,9 +28,19 @@
from rest_framework.decorators import api_view, permission_classes
from rest_framework.response import Response
from rest_framework import permissions
from rest_framework import status
from django.conf import settings
from .models import Environment
from .models import Worker
from .models import LocalSchedulerProcesses
from ..code.models import Code
from . import local_scheduler
#----------------------------------------------------------
@api_view(['GET'])
......@@ -73,3 +83,47 @@ def accessible_environments_list(request):
})
return Response(result)
#----------------------------------------------------------
@api_view(['POST'])
@permission_classes([permissions.IsAdminUser])
def start_local_scheduler(request):
"""Starts the local scheduler"""
if not getattr(settings, 'SCHEDULING_PANEL', False):
return Response(status=status.HTTP_403_FORBIDDEN)
# Clean start-up
LocalSchedulerProcesses.objects.all().delete()
for worker in Worker.objects.all():
(pid, _) = local_scheduler.start_worker(worker.name, settings.PREFIX,
settings.CACHE_ROOT,
'tcp://127.0.0.1:50000')
LocalSchedulerProcesses(name=worker.name, pid=pid).save()
(pid, _) = local_scheduler.start_scheduler(address='127.0.0.1', port=50000)
LocalSchedulerProcesses(name='Scheduler', pid=pid).save()
return Response(status=status.HTTP_204_NO_CONTENT)
#----------------------------------------------------------
@api_view(['POST'])
@permission_classes([permissions.IsAdminUser])
def stop_local_scheduler(request):
"""Starts the local scheduler"""
if not getattr(settings, 'SCHEDULING_PANEL', False):
return Response(status=status.HTTP_403_FORBIDDEN)
for process in LocalSchedulerProcesses.objects.all():
local_scheduler.stop_process(process.pid)
process.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
......@@ -34,6 +34,18 @@ urlpatterns = [
r'^environments/$',
api.accessible_environments_list,
name='backend-api-environments',
),
),
url(
r'^local_scheduler/start/$',
api.start_local_scheduler,
name='local_scheduler-start',
),
url(
r'^local_scheduler/stop/$',
api.stop_local_scheduler,
name='local_scheduler-stop',
),
]
......@@ -3,7 +3,7 @@
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
......@@ -40,6 +40,7 @@ from datetime import datetime
from ..experiments.models import Experiment
from ..experiments.models import Block
from ..experiments.models import CachedFile
from ..experiments.models import Result as CacheResult
from .models import Queue
from .models import Job
from .models import JobSplit
......@@ -47,6 +48,10 @@ from .models import Worker
from .models import Result
import beat.core.hash
import beat.core.data
import beat.core.hash
import beat.core.algorithm
from beat.core.utils import NumpyJSONEncoder
def schedule_experiment(experiment):
......@@ -102,8 +107,13 @@ def schedule_experiment(experiment):
block.end_date = block.creation_date
block.save()
if block.analyzer:
for cached_file in block.outputs.all():
load_results_from_cache(block, cached_file)
else:
Job.objects.create_job(block)
block.status = Block.PENDING
block.creation_date = datetime.now()
block.save()
already_done = False
......@@ -728,7 +738,6 @@ def update_job(job):
# Delete the job
job.delete()
# Did the job succeed?
elif job.splits.exclude(status=JobSplit.COMPLETED).count() == 0:
......@@ -739,6 +748,9 @@ def update_job(job):
for cached_file in job.block.outputs.all():
cached_file.update(Block.DONE)
if job.block.analyzer:
load_results_from_cache(job.block, cached_file)
# Update the block
job.block.status = Block.DONE
job.block.end_date = job.end_date
......@@ -908,3 +920,40 @@ def update_experiment(experiment):
experiment.status = Experiment.PENDING
experiment.end_date = experiment.blocks.order_by('-end_date')[0].end_date
experiment.save()
#----------------------------------------------------------
def load_results_from_cache(block, cached_file):
if not block.analyzer:
return
if cached_file.results.count() > 0:
return
data_source = beat.core.data.CachedDataSource()
data_source.setup(os.path.join(settings.CACHE_ROOT,
beat.core.hash.toPath(cached_file.hash)),
settings.PREFIX)
output_data = data_source.next()[0]
if output_data is not None:
algorithm = beat.core.algorithm.Algorithm(
settings.PREFIX, block.algorithm.fullname())
for field, value in output_data.as_dict().items():
res, _ = CacheResult.objects.get_or_create(
name=field, cache=cached_file)
res.primary = algorithm.results[field]['display']
res.type = algorithm.results[field]["type"]
if res.type in ['int32', 'float32', 'bool', 'string']:
res.data_value = str(value)
else:
res.data_value = simplejson.dumps(
value, indent=4, cls=NumpyJSONEncoder)
res.save()
data_source.close()
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import multiprocessing
import psutil
import signal
from ..scripts import scheduler
from beat.core.scripts import worker
#----------------------------------------------------------
class SchedulerProcess(multiprocessing.Process):
def __init__(self, arguments, queue=None):
super(SchedulerProcess, self).__init__()
if queue is None:
self.queue = multiprocessing.Queue()
else:
self.queue = queue
self.arguments = arguments
def run(self):
self.queue.put('STARTED')
scheduler.main(self.arguments)
#----------------------------------------------------------
class WorkerProcess(multiprocessing.Process):
def __init__(self, arguments, queue=None):
super(WorkerProcess, self).__init__()
if queue is None:
self.queue = multiprocessing.Queue()
else:
self.queue = queue
self.arguments = arguments
def run(self):
self.queue.put('STARTED')
worker.main(self.arguments)
#----------------------------------------------------------
def start_scheduler(settings='beat.web.settings.settings', interval=5,
address='127.0.0.1', port=50000):
args = [
'--settings=%s' % str(settings),
'--interval=%d' % int(interval),
'--address=%s' % str(address),
'--port=%d' % int(port),
'--verbose',
]
process = SchedulerProcess(args)
process.start()
process.queue.get()
return (process.pid, process)
#----------------------------------------------------------
def start_worker(name, prefix, cache, scheduler_address):
args = [
'--prefix=%s' % str(prefix),
'--cache=%s' % str(cache),
'--name=%s' % str(name),
'--verbose',
str(scheduler_address)
]
process = WorkerProcess(args)
process.start()
process.queue.get()
return (process.pid, process)
#----------------------------------------------------------
def stop_process(pid):
if not psutil.pid_exists(pid):
return
process = psutil.Process(pid)
process.send_signal(signal.SIGTERM)
gone, alive = psutil.wait_procs([process])
# -*- coding: utf-8 -*-
# Generated by Django 1.9.13 on 2017-10-12 15:12
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('backend', '0005_job_modifications'),
]
operations = [
migrations.CreateModel(
name='LocalSchedulerProcesses',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.TextField()),
('pid', models.IntegerField()),
],
),
]
......@@ -31,6 +31,7 @@ from .environment import Environment
from .environment import EnvironmentLanguage
from .job import Job
from .job import JobSplit
from .local_scheduler import LocalSchedulerProcesses
from .queue import QueueManager
from .queue import Queue
from .result import Result
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import models
class LocalSchedulerProcesses(models.Model):
'''Information about the processes launched by the local scheduler'''
name = models.TextField()
pid = models.IntegerField()
def __str__(self):
return '%s (pid = %d)' % (self.name, self.pid)
......@@ -29,7 +29,8 @@
{% block stylesheets %}
{{ block.super }}
<script src="{% fingerprint "Chart.js/Chart.min.js" %} type="text/javascript" charset="utf-8"></script>
{% csrf_token %}
<script src="{% fingerprint "Chart.js/Chart.min.js" %}" type="text/javascript" charset="utf-8"></script>
{% endblock %}
......@@ -55,21 +56,13 @@
<h3 class="panel-title">Helper Panel</h3>
</div>
<div class="panel-body">
<p class="help">Use this panel to <strong>locally</strong> launch scheduling activity. This functionality is intended as a <em>test</em> scheduler and worker replacement that can be used to run local experiments or debug. <strong>Don't use this in a production system.</strong> Every time you launch an activity, the page will reload to trigger this action. Scheduling happens in the context of the Django server running on the background. Worker processes are managed using subprocesses and don't block the web server.</p>
<p class="help">Use this panel to <strong>locally</strong> launch scheduling activity. This functionality is intended as a <em>test</em> scheduler and worker replacement that can be used to run local experiments or debug. <strong>Don't use this in a production system.</strong> The Scheduler and Worker processes are managed using subprocesses and don't block the web server.</p>
<div class="form-inline">
<div id="activity-group" class="form-group">
<label class="sr-only" for="activity">Activity</label>
<select id="activity" class="form-control">
<option value="both">Schedule &amp; Work</option>
<option value="schedule">Schedule</option>
<option value="work">Work</option>
</select>
</div>
<div id="periodically-group" class="form-group">
<div class="checkbox">
<label>
<input id="periodically" type="checkbox" checked="checked"> periodically
<input id="periodically" type="checkbox" checked="checked"> Refresh the page periodically
</label>
</div>
</div>
......@@ -77,7 +70,7 @@
<label class="sr-only" for="period">Period</label>
<div class="input-group">
<div class="input-group-addon">every</div>
<input type="text" class="form-control" id="period" value="{{ scheduling_period }}">
<input type="text" class="form-control" id="period" value="{{ refreshing_period }}">
<div class="input-group-addon">s</div>
</div>
</div>
......@@ -142,14 +135,14 @@
<thead>
<tr>
<th>Name</th>
<th>Blocks/Jobs</th>
<th>Blocks</th>
<th>Job Splits</th>
<th>Assigned</th>
<th>Running</th>
<th>Completed</th>
<th>Failed</th>
<th>Cancelled</th>
<th>Skipped</th>
<th>Cancelling</th>
</tr>
</thead>
<tbody>
......@@ -163,7 +156,7 @@
<td>{{ obj|count_job_splits:"C" }}</td>
<td>{{ obj|count_job_splits:"F" }}</td>
<td>{{ obj|count_job_splits:"L" }}</td>
<td>{{ obj|count_job_splits:"S" }}</td>
<td>{{ obj|count_job_splits:"K" }}</td>
</tr>
{% for split in obj.job_splits %}
<tr class="job-split">
......@@ -263,11 +256,17 @@
</div>{# col #}
</div>{# row #}
{% csrf_token %}
<script type="text/javascript">
$(document).ready(function() {
$.ajaxSetup({
beforeSend: function(xhr, settings) {
var csrftoken = $.cookie('csrftoken');
xhr.setRequestHeader('X-CSRFToken', csrftoken);
}
});
/**
* This bit of code here is to manage the normal parts of the scheduler
* page, like the tags and the cache chart which is displayed.
......@@ -301,34 +300,6 @@ $(document).ready(function() {
* shows up.
*/
function display_periodic(period) {
$("input#periodically").prop("checked", true);
$("#period-group").show();
$("button#start > span#start").text("Start");
$("button#stop > span#stop").text("Stop");
$("button#stop").show();
$("input#period").val(period);
$("button#stop").disable();
}
function display_single_shot() {
$("input#periodically").prop("checked", false);
$("#period-group").hide();
$("button#start > span#start").text("Go");
$("button#stop > span#stop").text("Reset");
$("button#start").enable();
$("button#stop").enable();
}
/* controls button display */
$("input#periodically").click(function() {
if($(this).is(":checked")) {
display_periodic({{ scheduling_period }});
} else {
display_single_shot();
}
});
/* get url parameters */
function get_parameters() {
var vars = {};
......@@ -341,47 +312,48 @@ $(document).ready(function() {
/* controls the initial appearance */
var params = get_parameters();
var refresh_timeout = null;
if (location.search !== "") {
if ("period" in params) {
var period = parseInt(params.period, 10);
display_periodic(period);
$("button#start").disable();
$("button#stop").enable();
$("select#activity").disable();
$("input#period").disable();
$("input#periodically").disable();
setTimeout("location.reload(true);", period * 1000);
}
else {
display_single_shot();
}
if ("activity" in params) {
$("select#activity").val(params.activity);
}
var period = {{ refreshing_period }};
if ("period" in params)
period = parseInt(params.period, 10);
$("button#start").disable();
$("button#stop").enable();
$("input#period").disable();
$("input#periodically").disable();
refresh_timeout = setTimeout("location.reload(true);", period * 1000);
} else {
display_periodic({{ scheduling_period }});
$("button#start").enable();
$("button#stop").disable();
$("select#activity").enable();
$("input#period").enable();
$("input#periodically").enable();
}
/* controls form submission buttons */
$("button#start").click(function() {
var params = '?activity=' + $("select#activity").val();
if ($("input#periodically").is(":checked")) {
params += '&period=' + $("input#period").val();
}
if (location.search === params) {
location.reload(true);
}
else {
location.search = params;
}
var d = $.post("{% url 'api_backend:local_scheduler-start' %}");
d.done(function(data) {
if ($("input#periodically").is(":checked")) {
var params = '?period=' + $("input#period").val();
if (location.search === params) {
location.reload(true);
}
else {
location.search = params;
}
}
});
});
$("button#stop").click(function() {
if (refresh_timeout !== null)
clearTimeout(refresh_timeout);
$.post("{% url 'api_backend:local_scheduler-stop' %}");
location.search = "";
});
......
......@@ -89,5 +89,5 @@ def visible_queues(context, object):
def count_job_splits(xp, status=None):
"""Returns job splits for an experiment in a certain state"""
if status == 'A':
return xp.job_splits(stJobSplit.QUEUEDUEUED).filter(worker__isnull=False).count()
return xp.job_splits(status=JobSplit.QUEUED).filter(worker__isnull=False).count()
return xp.job_splits(status=status).count()