Commit 8c2840f6 authored by Samuel GAIST

[backend] Remove old scheduler related code

The scheduler can now be started from the
command line using the corresponding management
commands, which allows running a scheduler that
works the same way as in production and renders
the local scheduler useless.
parent f1fa3f1f
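For reference, replacing the removed REST endpoints amounts to launching the scheduler and worker management commands instead; a minimal sketch, where the command names are assumptions and not taken from this commit:

# Hypothetical invocation; "scheduler" and "worker" are assumed command names,
# substitute the actual management commands shipped with beat.web.
from django.core.management import call_command

call_command("scheduler")  # assumed name; runs in the foreground until interrupted
call_command("worker")     # assumed name; start one per node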
@@ -25,18 +25,13 @@
# #
###############################################################################
from django.conf import settings
from rest_framework import permissions
from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.decorators import permission_classes
from rest_framework.response import Response
from ..code.models import Code
from . import local_scheduler
from .models import Environment
from .models import LocalSchedulerProcesses
from .models import Worker
# ----------------------------------------------------------
@@ -85,58 +80,3 @@ def accessible_environments_list(request):
    )

    return Response(result)
# ----------------------------------------------------------
@api_view(["POST"])
@permission_classes([permissions.IsAdminUser])
def start_local_scheduler(request):
    """Starts the local scheduler"""

    if not getattr(settings, "SCHEDULING_PANEL", False):
        return Response(status=status.HTTP_403_FORBIDDEN)

    # Clean start-up
    LocalSchedulerProcesses.objects.all().delete()

    address = getattr(settings, "LOCAL_SCHEDULER_ADDRESS", "127.0.0.1")
    port = getattr(settings, "LOCAL_SCHEDULER_PORT", 50000)
    use_docker = getattr(settings, "LOCAL_SCHEDULER_USE_DOCKER", False)

    full_address = "tcp://%s:%d" % (address, port)

    for worker in Worker.objects.all():
        (pid, _) = local_scheduler.start_worker(
            worker.name,
            settings.PREFIX,
            settings.CACHE_ROOT,
            full_address,
            use_docker=use_docker,
        )
        LocalSchedulerProcesses(name=worker.name, pid=pid).save()

    (pid, _) = local_scheduler.start_scheduler(address=address, port=port)
    LocalSchedulerProcesses(name="Scheduler", pid=pid).save()

    return Response(status=status.HTTP_204_NO_CONTENT)
# ----------------------------------------------------------
@api_view(["POST"])
@permission_classes([permissions.IsAdminUser])
def stop_local_scheduler(request):
    """Stops the local scheduler"""

    if not getattr(settings, "SCHEDULING_PANEL", False):
        return Response(status=status.HTTP_403_FORBIDDEN)

    for process in LocalSchedulerProcesses.objects.all():
        local_scheduler.stop_process(process.pid)
        process.delete()

    return Response(status=status.HTTP_204_NO_CONTENT)
@@ -37,14 +37,4 @@ urlpatterns = [
        api.accessible_environments_list,
        name="backend-api-environments",
    ),
    url(
        r"^local_scheduler/start/$",
        api.start_local_scheduler,
        name="local_scheduler-start",
    ),
    url(
        r"^local_scheduler/stop/$",
        api.stop_local_scheduler,
        name="local_scheduler-stop",
    ),
]
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import multiprocessing
import signal
import psutil
from django import db
from django.conf import settings
from beat.core.scripts import worker
from ..scripts import scheduler
# ----------------------------------------------------------
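# Wraps the in-tree scheduler script in a child process; the queue is only
# used to signal the parent that the child has started.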
class SchedulerProcess(multiprocessing.Process):
    def __init__(self, arguments, queue=None):
        super(SchedulerProcess, self).__init__()

        if queue is None:
            self.queue = multiprocessing.Queue()
        else:
            self.queue = queue

        self.arguments = arguments

    def run(self):
        self.queue.put("STARTED")
        scheduler.main(self.arguments)
# ----------------------------------------------------------
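# Same wrapper for beat.core's worker script, launched as a child process.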
class WorkerProcess(multiprocessing.Process):
    def __init__(self, arguments, queue=None):
        super(WorkerProcess, self).__init__()

        if queue is None:
            self.queue = multiprocessing.Queue()
        else:
            self.queue = queue

        self.arguments = arguments

    def run(self):
        self.queue.put("STARTED")
        worker.main(self.arguments)
# ----------------------------------------------------------
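# Builds the scheduler command line, closes all Django database connections
# before forking, then starts the child and blocks until it reports "STARTED".
# Returns a (pid, process) tuple.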
def start_scheduler(
    settings_module="beat.web.settings.settings",
    interval=5,
    address="127.0.0.1",
    port=50000,
):
    args = [
        "--settings=%s" % str(settings_module),
        "--interval=%d" % int(interval),
        "--address=%s" % str(address),
        "--port=%d" % int(port),
    ]

    if getattr(settings, "LOCAL_SCHEDULER_VERBOSITY", None) is not None:
        args.append(settings.LOCAL_SCHEDULER_VERBOSITY)

    db.connections.close_all()

    process = SchedulerProcess(args)
    process.start()
    process.queue.get()

    return (process.pid, process)
# ----------------------------------------------------------
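# Builds the worker command line (optionally with --docker), starts the child
# and blocks until it reports "STARTED". Returns a (pid, process) tuple.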
def start_worker(name, prefix, cache, scheduler_address, use_docker=False):
    args = [
        "--prefix=%s" % str(prefix),
        "--cache=%s" % str(cache),
        "--name=%s" % str(name),
        str(scheduler_address),
    ]

    if use_docker:
        args.insert(3, "--docker")

    if getattr(settings, "LOCAL_SCHEDULER_VERBOSITY", None) is not None:
        args.insert(3, settings.LOCAL_SCHEDULER_VERBOSITY)

    process = WorkerProcess(args)
    process.start()
    process.queue.get()

    return (process.pid, process)
# ----------------------------------------------------------
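# Sends SIGTERM to the given pid (if it still exists) and waits for the
# process to terminate.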
def stop_process(pid):
    if not psutil.pid_exists(pid):
        return

    process = psutil.Process(pid)
    process.send_signal(signal.SIGTERM)

    gone, alive = psutil.wait_procs([process])
# Generated by Django 3.1.1 on 2020-09-11 17:28
from django.db import migrations
class Migration(migrations.Migration):

    dependencies = [
        ("backend", "0006_localschedulerprocesses"),
    ]

    operations = [migrations.DeleteModel(name="LocalSchedulerProcesses",)]
@@ -31,7 +31,6 @@ from .environment import EnvironmentLanguage
from .environment import EnvironmentManager
from .job import Job
from .job import JobSplit
from .local_scheduler import LocalSchedulerProcesses
from .queue import Queue
from .queue import QueueManager
from .result import Result
@@ -46,7 +45,6 @@ __all__ = [
"EnvironmentManager",
"Job",
"JobSplit",
"LocalSchedulerProcesses",
"Queue",
"QueueManager",
"Result",
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import models
class LocalSchedulerProcesses(models.Model):
    """Information about the processes launched by the local scheduler"""

    name = models.TextField()
    pid = models.IntegerField()

    def __str__(self):
        return "%s (pid = %d)" % (self.name, self.pid)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import os
from time import sleep
from time import time
from unittest import skipIf
from django.conf import settings
from django.test import TransactionTestCase
from ...common.testutils import tearDownModule # noqa test runner will call it
from ...experiments.models import Block
from ...experiments.models import Experiment
from ..helpers import cancel_experiment
from ..helpers import schedule_experiment
from ..local_scheduler import start_scheduler
from ..local_scheduler import start_worker
from ..models import Worker
from ..utils import setup_backend
from .common import ONE_QUEUE_TWO_WORKERS
from .common import BackendUtilitiesMixin
# ----------------------------------------------------------
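# Base test class that spawns a real scheduler and workers as child processes
# and polls the database to check the state they report.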
class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
    def __init__(self, methodName="runTest"):
        super(TestSchedulerBase, self).__init__(methodName)
        self.scheduler_thread = None
        self.worker_threads = {}
        self.timeout = int(os.environ.get("DB_REFRESH_TIMEOUT", 10))

    @classmethod
    def setUpTestData(cls):
        cls.setup_test_data()

    def setUp(self):
        self.shutdown_everything()
        self.clean_cache()
        TestSchedulerBase.setup_test_data()
        setup_backend(ONE_QUEUE_TWO_WORKERS)
        Worker.objects.update(active=False)

    def tearDown(self):
        self.shutdown_everything()
        self.clean_cache()

    def shutdown_everything(self):
        for name in list(self.worker_threads.keys()):
            self.stop_worker(name)

        self.worker_threads = {}

        self.stop_scheduler()

    def start_scheduler(self):
        (pid, self.scheduler_thread) = start_scheduler(
            settings_module="beat.web.settings.test",
            interval=1,
            address="127.0.0.1",
            port=50800,
        )

    def stop_scheduler(self):
        if self.scheduler_thread is not None:
            self.scheduler_thread.terminate()
            self.scheduler_thread.join()
            self.scheduler_thread = None

    def start_worker(self, name):
        (pid, worker_thread) = start_worker(
            name, settings.PREFIX, settings.CACHE_ROOT, "tcp://127.0.0.1:50800"
        )

        self.worker_threads[name] = worker_thread

    def stop_worker(self, name):
        if name in self.worker_threads:
            self.worker_threads[name].terminate()
            self.worker_threads[name].join()
            del self.worker_threads[name]

    def check_worker_status(self, name, active):
        start = time()

        while Worker.objects.filter(name=name, active=active).count() == 0:
            if time() - start > 10:  # Fail after 10 seconds
                self.shutdown_everything()
                self.assertTrue(False)
# ----------------------------------------------------------
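# Worker/scheduler connection and disconnection scenarios.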
@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
class TestConnection(TestSchedulerBase):
    def test_worker_connection(self):
        self.start_scheduler()
        self.assertEqual(Worker.objects.filter(active=True).count(), 0)

        self.start_worker("node1")
        self.check_worker_status("node1", True)

        self.assertEqual(Worker.objects.filter(active=True).count(), 1)

    def test_worker_disconnection(self):
        self.start_scheduler()
        self.start_worker("node1")

        self.check_worker_status("node1", True)

        self.worker_threads["node1"].terminate()
        self.worker_threads["node1"].join()
        del self.worker_threads["node1"]

        self.check_worker_status("node1", False)

    def test_two_workers_connection(self):
        self.start_scheduler()
        self.assertEqual(Worker.objects.filter(active=True).count(), 0)

        self.start_worker("node1")
        self.start_worker("node2")

        self.check_worker_status("node1", True)
        self.check_worker_status("node2", True)

        self.assertEqual(Worker.objects.filter(active=True).count(), 2)

    def test_scheduler_last(self):
        self.start_worker("node1")
        sleep(1)

        self.start_scheduler()
        self.check_worker_status("node1", True)

    def test_scheduler_shutdown(self):
        self.start_scheduler()
        self.assertEqual(Worker.objects.filter(active=True).count(), 0)

        self.start_worker("node1")
        self.check_worker_status("node1", True)
        self.assertEqual(Worker.objects.filter(active=True).count(), 1)

        self.stop_scheduler()
        Worker.objects.update(active=False)
        sleep(1)

        self.start_scheduler()
        self.check_worker_status("node1", True)
        self.assertEqual(Worker.objects.filter(active=True).count(), 1)

    def test_worker_disconnection_during_scheduler_shutdown(self):
        self.start_scheduler()
        self.assertEqual(Worker.objects.filter(active=True).count(), 0)

        self.start_worker("node1")
        self.check_worker_status("node1", True)
        self.assertEqual(Worker.objects.filter(active=True).count(), 1)

        self.stop_scheduler()
        Worker.objects.update(active=False)
        sleep(1)

        self.worker_threads["node1"].terminate()
        self.worker_threads["node1"].join()
        del self.worker_threads["node1"]
        sleep(1)

        self.start_scheduler()
        sleep(5)

        self.check_worker_status("node1", False)
        self.assertEqual(Worker.objects.filter(active=True).count(), 0)
# ----------------------------------------------------------
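# End-to-end execution of experiments through the locally spawned scheduler.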
@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
class TestExecution(TestSchedulerBase):
    def setUp(self):
        super(TestExecution, self).setUp()

        self.start_scheduler()
        self.start_worker("node1")
        self.start_worker("node2")

        self.check_worker_status("node1", True)
        self.check_worker_status("node2", True)

    def test_successful_experiment(self):
        fullname = "user/user/double/1/double"
        xp = Experiment.objects.get(name=fullname.split("/")[-1])

        self.prepare_databases(xp.declaration)

        schedule_experiment(xp)
        xp.refresh_from_db()

        start = time()
        while xp.status != Experiment.DONE:
            self.assertTrue(time() - start < self.timeout)
            xp.refresh_from_db()

        b1 = xp.blocks.get(name="echo1")
        b2 = xp.blocks.get(name="echo2")
        b3 = xp.blocks.get(name="analysis")

        self.assertEqual(b1.status, Block.DONE)
        self.assertEqual(b2.status, Block.DONE)
        self.assertEqual(b3.status, Block.DONE)

    def test_successful_splitted_experiment(self):
        fullname = "user/user/double/1/double_split_2"
        xp = Experiment.objects.get(name=fullname.split("/")[-1])

        self.prepare_databases(xp.declaration)

        schedule_experiment(xp)
        xp.refresh_from_db()

        start = time()
        while xp.status != Experiment.DONE:
            self.assertTrue(time() - start < self.timeout)
            xp.refresh_from_db()

        b1 = xp.blocks.get(name="echo1")
        b2 = xp.blocks.get(name="echo2")
        b3 = xp.blocks.get(name="analysis")

        self.assertEqual(b1.status, Block.DONE)
        self.assertEqual(b2.status, Block.DONE)
        self.assertEqual(b3.status, Block.DONE)

    def test_two_similar_experiments(self):
        fullname1 = "user/user/double/1/double"
        fullname2 = "user/user/double/1/double_split_2"

        xp1 = Experiment.objects.get(name=fullname1.split("/")[-1])
        xp2 = Experiment.objects.get(name=fullname2.split("/")[-1])

        self.prepare_databases(xp1.declaration)
        self.prepare_databases(xp2.declaration)

        schedule_experiment(xp1)
        schedule_experiment(xp2)

        xp1.refresh_from_db()
        xp2.refresh_from_db()

        start = time()
        while (xp1.status != Experiment.DONE) or (xp2.status != Experiment.DONE):
            self.assertTrue(time() - start < self.timeout)

            xp1.refresh_from_db()
            xp2.refresh_from_db()