Skip to content
Snippets Groups Projects
Commit ea81206d authored by Flavio TARSETTI's avatar Flavio TARSETTI
Browse files

Merge branch 'remove_unused_scripts' into 'django3_migration'

Remove the scheduler script

See merge request !371
parents f1fa3f1f af8183b7
No related branches found
No related tags found
2 merge requests!371Remove the scheduler script,!342Django 3 migration
Pipeline #42712 passed
......@@ -25,18 +25,13 @@
# #
###############################################################################
from django.conf import settings
from rest_framework import permissions
from rest_framework import status
from rest_framework.decorators import api_view
from rest_framework.decorators import permission_classes
from rest_framework.response import Response
from ..code.models import Code
from . import local_scheduler
from .models import Environment
from .models import LocalSchedulerProcesses
from .models import Worker
# ----------------------------------------------------------
......@@ -85,58 +80,3 @@ def accessible_environments_list(request):
)
return Response(result)
# ----------------------------------------------------------
@api_view(["POST"])
@permission_classes([permissions.IsAdminUser])
def start_local_scheduler(request):
"""Starts the local scheduler"""
if not getattr(settings, "SCHEDULING_PANEL", False):
return Response(status=status.HTTP_403_FORBIDDEN)
# Clean start-up
LocalSchedulerProcesses.objects.all().delete()
address = getattr(settings, "LOCAL_SCHEDULER_ADDRESS", "127.0.0.1")
port = getattr(settings, "LOCAL_SCHEDULER_PORT", 50000)
use_docker = getattr(settings, "LOCAL_SCHEDULER_USE_DOCKER", False)
full_address = "tcp://%s:%d" % (address, port)
for worker in Worker.objects.all():
(pid, _) = local_scheduler.start_worker(
worker.name,
settings.PREFIX,
settings.CACHE_ROOT,
full_address,
use_docker=use_docker,
)
LocalSchedulerProcesses(name=worker.name, pid=pid).save()
(pid, _) = local_scheduler.start_scheduler(address=address, port=port)
LocalSchedulerProcesses(name="Scheduler", pid=pid).save()
return Response(status=status.HTTP_204_NO_CONTENT)
# ----------------------------------------------------------
@api_view(["POST"])
@permission_classes([permissions.IsAdminUser])
def stop_local_scheduler(request):
"""Starts the local scheduler"""
if not getattr(settings, "SCHEDULING_PANEL", False):
return Response(status=status.HTTP_403_FORBIDDEN)
for process in LocalSchedulerProcesses.objects.all():
local_scheduler.stop_process(process.pid)
process.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
......@@ -37,14 +37,4 @@ urlpatterns = [
api.accessible_environments_list,
name="backend-api-environments",
),
url(
r"^local_scheduler/start/$",
api.start_local_scheduler,
name="local_scheduler-start",
),
url(
r"^local_scheduler/stop/$",
api.stop_local_scheduler,
name="local_scheduler-stop",
),
]
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import multiprocessing
import signal
import psutil
from django import db
from django.conf import settings
from beat.core.scripts import worker
from ..scripts import scheduler
# ----------------------------------------------------------
class SchedulerProcess(multiprocessing.Process):
def __init__(self, arguments, queue=None):
super(SchedulerProcess, self).__init__()
if queue is None:
self.queue = multiprocessing.Queue()
else:
self.queue = queue
self.arguments = arguments
def run(self):
self.queue.put("STARTED")
scheduler.main(self.arguments)
# ----------------------------------------------------------
class WorkerProcess(multiprocessing.Process):
def __init__(self, arguments, queue=None):
super(WorkerProcess, self).__init__()
if queue is None:
self.queue = multiprocessing.Queue()
else:
self.queue = queue
self.arguments = arguments
def run(self):
self.queue.put("STARTED")
worker.main(self.arguments)
# ----------------------------------------------------------
def start_scheduler(
settings_module="beat.web.settings.settings",
interval=5,
address="127.0.0.1",
port=50000,
):
args = [
"--settings=%s" % str(settings_module),
"--interval=%d" % int(interval),
"--address=%s" % str(address),
"--port=%d" % int(port),
]
if getattr(settings, "LOCAL_SCHEDULER_VERBOSITY", None) is not None:
args.append(settings.LOCAL_SCHEDULER_VERBOSITY)
db.connections.close_all()
process = SchedulerProcess(args)
process.start()
process.queue.get()
return (process.pid, process)
# ----------------------------------------------------------
def start_worker(name, prefix, cache, scheduler_address, use_docker=False):
args = [
"--prefix=%s" % str(prefix),
"--cache=%s" % str(cache),
"--name=%s" % str(name),
str(scheduler_address),
]
if use_docker:
args.insert(3, "--docker")
if getattr(settings, "LOCAL_SCHEDULER_VERBOSITY", None) is not None:
args.insert(3, settings.LOCAL_SCHEDULER_VERBOSITY)
process = WorkerProcess(args)
process.start()
process.queue.get()
return (process.pid, process)
# ----------------------------------------------------------
def stop_process(pid):
if not psutil.pid_exists(pid):
return
process = psutil.Process(pid)
process.send_signal(signal.SIGTERM)
gone, alive = psutil.wait_procs([process])
# Generated by Django 3.1.1 on 2020-09-11 17:28
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("backend", "0006_localschedulerprocesses"),
]
operations = [migrations.DeleteModel(name="LocalSchedulerProcesses",)]
......@@ -31,7 +31,6 @@ from .environment import EnvironmentLanguage
from .environment import EnvironmentManager
from .job import Job
from .job import JobSplit
from .local_scheduler import LocalSchedulerProcesses
from .queue import Queue
from .queue import QueueManager
from .result import Result
......@@ -46,7 +45,6 @@ __all__ = [
"EnvironmentManager",
"Job",
"JobSplit",
"LocalSchedulerProcesses",
"Queue",
"QueueManager",
"Result",
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from django.db import models
class LocalSchedulerProcesses(models.Model):
"""Information about the processes launched by the local scheduler"""
name = models.TextField()
pid = models.IntegerField()
def __str__(self):
return "%s (pid = %d)" % (self.name, self.pid)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import os
from time import sleep
from time import time
from unittest import skipIf
from django.conf import settings
from django.test import TransactionTestCase
from ...common.testutils import tearDownModule # noqa test runner will call it
from ...experiments.models import Block
from ...experiments.models import Experiment
from ..helpers import cancel_experiment
from ..helpers import schedule_experiment
from ..local_scheduler import start_scheduler
from ..local_scheduler import start_worker
from ..models import Worker
from ..utils import setup_backend
from .common import ONE_QUEUE_TWO_WORKERS
from .common import BackendUtilitiesMixin
# ----------------------------------------------------------
class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
def __init__(self, methodName="runTest"):
super(TestSchedulerBase, self).__init__(methodName)
self.scheduler_thread = None
self.worker_threads = {}
self.timeout = int(os.environ.get("DB_REFRESH_TIMEOUT", 10))
@classmethod
def setUpTestData(cls):
cls.setup_test_data()
def setUp(self):
self.shutdown_everything()
self.clean_cache()
TestSchedulerBase.setup_test_data()
setup_backend(ONE_QUEUE_TWO_WORKERS)
Worker.objects.update(active=False)
def tearDown(self):
self.shutdown_everything()
self.clean_cache()
def shutdown_everything(self):
for name in list(self.worker_threads.keys()):
self.stop_worker(name)
self.worker_threads = {}
self.stop_scheduler()
def start_scheduler(self):
(pid, self.scheduler_thread) = start_scheduler(
settings_module="beat.web.settings.test",
interval=1,
address="127.0.0.1",
port=50800,
)
def stop_scheduler(self):
if self.scheduler_thread is not None:
self.scheduler_thread.terminate()
self.scheduler_thread.join()
self.scheduler_thread = None
def start_worker(self, name):
(pid, worker_thread) = start_worker(
name, settings.PREFIX, settings.CACHE_ROOT, "tcp://127.0.0.1:50800"
)
self.worker_threads[name] = worker_thread
def stop_worker(self, name):
if name in self.worker_threads:
self.worker_threads[name].terminate()
self.worker_threads[name].join()
del self.worker_threads[name]
def check_worker_status(self, name, active):
start = time()
while Worker.objects.filter(name=name, active=active).count() == 0:
if time() - start > 10: # Fail after 10 seconds
self.shutdown_everything()
self.assertTrue(False)
# ----------------------------------------------------------
@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
class TestConnection(TestSchedulerBase):
def test_worker_connection(self):
self.start_scheduler()
self.assertEqual(Worker.objects.filter(active=True).count(), 0)
self.start_worker("node1")
self.check_worker_status("node1", True)
self.assertEqual(Worker.objects.filter(active=True).count(), 1)
def test_worker_disconnection(self):
self.start_scheduler()
self.start_worker("node1")
self.check_worker_status("node1", True)
self.worker_threads["node1"].terminate()
self.worker_threads["node1"].join()
del self.worker_threads["node1"]
self.check_worker_status("node1", False)
def test_two_workers_connection(self):
self.start_scheduler()
self.assertEqual(Worker.objects.filter(active=True).count(), 0)
self.start_worker("node1")
self.start_worker("node2")
self.check_worker_status("node1", True)
self.check_worker_status("node2", True)
self.assertEqual(Worker.objects.filter(active=True).count(), 2)
def test_scheduler_last(self):
self.start_worker("node1")
sleep(1)
self.start_scheduler()
self.check_worker_status("node1", True)
def test_scheduler_shutdown(self):
self.start_scheduler()
self.assertEqual(Worker.objects.filter(active=True).count(), 0)
self.start_worker("node1")
self.check_worker_status("node1", True)
self.assertEqual(Worker.objects.filter(active=True).count(), 1)
self.stop_scheduler()
Worker.objects.update(active=False)
sleep(1)
self.start_scheduler()
self.check_worker_status("node1", True)
self.assertEqual(Worker.objects.filter(active=True).count(), 1)
def test_worker_disconnection_during_scheduler_shutdown(self):
self.start_scheduler()
self.assertEqual(Worker.objects.filter(active=True).count(), 0)
self.start_worker("node1")
self.check_worker_status("node1", True)
self.assertEqual(Worker.objects.filter(active=True).count(), 1)
self.stop_scheduler()
Worker.objects.update(active=False)
sleep(1)
self.worker_threads["node1"].terminate()
self.worker_threads["node1"].join()
del self.worker_threads["node1"]
sleep(1)
self.start_scheduler()
sleep(5)
self.check_worker_status("node1", False)
self.assertEqual(Worker.objects.filter(active=True).count(), 0)
# ----------------------------------------------------------
@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
class TestExecution(TestSchedulerBase):
def setUp(self):
super(TestExecution, self).setUp()
self.start_scheduler()
self.start_worker("node1")
self.start_worker("node2")
self.check_worker_status("node1", True)
self.check_worker_status("node2", True)
def test_successful_experiment(self):
fullname = "user/user/double/1/double"
xp = Experiment.objects.get(name=fullname.split("/")[-1])
self.prepare_databases(xp.declaration)
schedule_experiment(xp)
xp.refresh_from_db()
start = time()
while xp.status != Experiment.DONE:
self.assertTrue(time() - start < self.timeout)
xp.refresh_from_db()
b1 = xp.blocks.get(name="echo1")
b2 = xp.blocks.get(name="echo2")
b3 = xp.blocks.get(name="analysis")
self.assertEqual(b1.status, Block.DONE)
self.assertEqual(b2.status, Block.DONE)
self.assertEqual(b3.status, Block.DONE)
def test_successful_splitted_experiment(self):
fullname = "user/user/double/1/double_split_2"
xp = Experiment.objects.get(name=fullname.split("/")[-1])
self.prepare_databases(xp.declaration)
schedule_experiment(xp)
xp.refresh_from_db()
start = time()
while xp.status != Experiment.DONE:
self.assertTrue(time() - start < self.timeout)
xp.refresh_from_db()
b1 = xp.blocks.get(name="echo1")
b2 = xp.blocks.get(name="echo2")
b3 = xp.blocks.get(name="analysis")
self.assertEqual(b1.status, Block.DONE)
self.assertEqual(b2.status, Block.DONE)
self.assertEqual(b3.status, Block.DONE)
def test_two_similar_experiments(self):
fullname1 = "user/user/double/1/double"
fullname2 = "user/user/double/1/double_split_2"
xp1 = Experiment.objects.get(name=fullname1.split("/")[-1])
xp2 = Experiment.objects.get(name=fullname2.split("/")[-1])
self.prepare_databases(xp1.declaration)
self.prepare_databases(xp2.declaration)
schedule_experiment(xp1)
schedule_experiment(xp2)
xp1.refresh_from_db()
xp2.refresh_from_db()
start = time()
while (xp1.status != Experiment.DONE) or (xp2.status != Experiment.DONE):
self.assertTrue(time() - start < self.timeout)
xp1.refresh_from_db()
xp2.refresh_from_db()
self.assertEqual(xp1.start_date, xp2.start_date)
self.assertEqual(xp1.end_date, xp2.end_date)
b1 = xp1.blocks.get(name="echo1")
b2 = xp2.blocks.get(name="echo1")
self.assertEqual(b1.status, Block.DONE)
self.assertEqual(b2.status, Block.DONE)
self.assertEqual(b1.start_date, b2.start_date)
self.assertEqual(b1.end_date, b2.end_date)
b1 = xp1.blocks.get(name="echo2")
b2 = xp2.blocks.get(name="echo2")
self.assertEqual(b1.status, Block.DONE)
self.assertEqual(b2.status, Block.DONE)
self.assertEqual(b1.start_date, b2.start_date)
self.assertEqual(b1.end_date, b2.end_date)
b1 = xp1.blocks.get(name="analysis")
b2 = xp2.blocks.get(name="analysis")
self.assertEqual(b1.status, Block.DONE)
self.assertEqual(b2.status, Block.DONE)
self.assertEqual(b1.start_date, b2.start_date)
self.assertEqual(b1.end_date, b2.end_date)
def test_two_different_experiments(self):
fullname1 = "user/user/single/1/single"
fullname2 = "user/user/single/1/single_add"
xp1 = Experiment.objects.get(name=fullname1.split("/")[-1])
xp2 = Experiment.objects.get(name=fullname2.split("/")[-1])
self.prepare_databases(xp1.declaration)
self.prepare_databases(xp2.declaration)
schedule_experiment(xp1)
schedule_experiment(xp2)
xp1.refresh_from_db()
xp2.refresh_from_db()
start = time()
while (xp1.status != Experiment.DONE) or (xp2.status != Experiment.DONE):
self.assertTrue(time() - start < self.timeout)
xp1.refresh_from_db()
xp2.refresh_from_db()
self.assertNotEqual(xp1.start_date, xp2.start_date)
self.assertNotEqual(xp1.end_date, xp2.end_date)
def test_two_delayed_similar_experiments(self):
fullname1 = "user/user/double/1/double"
fullname2 = "user/user/double/1/double_split_2"
xp1 = Experiment.objects.get(name=fullname1.split("/")[-1])
xp2 = Experiment.objects.get(name=fullname2.split("/")[-1])
self.prepare_databases(xp1.declaration)
self.prepare_databases(xp2.declaration)
schedule_experiment(xp1)
xp1.refresh_from_db()
start = time()
while xp1.status != Experiment.RUNNING:
self.assertTrue(time() - start < self.timeout)
xp1.refresh_from_db()
schedule_experiment(xp2)
xp2.refresh_from_db()
start = time()
while (xp1.status != Experiment.DONE) or (xp2.status != Experiment.DONE):
self.assertTrue(time() - start < self.timeout)
xp1.refresh_from_db()
xp2.refresh_from_db()
def test_failed_experiment(self):
fullname = "user/user/single/1/single_error"
xp = Experiment.objects.get(name=fullname.split("/")[-1])
self.prepare_databases(xp.declaration)
schedule_experiment(xp)
xp.refresh_from_db()
start = time()
while xp.status != Experiment.FAILED:
self.assertTrue(time() - start < self.timeout)
xp.refresh_from_db()
b1 = xp.blocks.get(name="echo")
b2 = xp.blocks.get(name="analysis")
self.assertEqual(b1.status, Block.FAILED)
self.assertEqual(b2.status, Block.CANCELLED)
def test_failed_splitted_experiment(self):
fullname = "user/user/double/1/double_error_split_2"
xp = Experiment.objects.get(name=fullname.split("/")[-1])
self.prepare_databases(xp.declaration)
schedule_experiment(xp)
xp.refresh_from_db()
start = time()
while xp.status != Experiment.FAILED:
self.assertTrue(time() - start < self.timeout)
xp.refresh_from_db()
b1 = xp.blocks.get(name="echo1")
b2 = xp.blocks.get(name="echo2")
b3 = xp.blocks.get(name="analysis")
self.assertEqual(b1.status, Block.DONE)
self.assertEqual(b2.status, Block.FAILED)
self.assertEqual(b3.status, Block.CANCELLED)
def test_failed_mirror_experiments(self):
fullname1 = "user/user/double/1/double_error"
fullname2 = "user/user/double/1/double_error_split_2"
xp1 = Experiment.objects.get(name=fullname1.split("/")[-1])
xp2 = Experiment.objects.get(name=fullname2.split("/")[-1])
self.prepare_databases(xp1.declaration)
self.prepare_databases(xp2.declaration)
schedule_experiment(xp1)
schedule_experiment(xp2)
xp1.refresh_from_db()
xp2.refresh_from_db()
start = time()
while (xp1.status != Experiment.FAILED) or (xp2.status != Experiment.FAILED):
self.assertTrue(time() - start < self.timeout)
xp1.refresh_from_db()
xp2.refresh_from_db()
b1 = xp1.blocks.get(name="echo1")
b2 = xp1.blocks.get(name="echo2")
b3 = xp1.blocks.get(name="analysis")
self.assertEqual(b1.status, Block.DONE)
self.assertEqual(b2.status, Block.FAILED)
self.assertEqual(b3.status, Block.CANCELLED)
b1 = xp2.blocks.get(name="echo1")
b2 = xp2.blocks.get(name="echo2")
b3 = xp2.blocks.get(name="analysis")
self.assertEqual(b1.status, Block.DONE)
self.assertEqual(b2.status, Block.FAILED)
self.assertEqual(b3.status, Block.CANCELLED)
# ----------------------------------------------------------
@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
class TestCancellation(TestSchedulerBase):
def setUp(self):
super(TestCancellation, self).setUp()
self.start_scheduler()
self.start_worker("node1")
self.start_worker("node2")
self.check_worker_status("node1", True)
self.check_worker_status("node2", True)
def process(self, experiment_name, block_name=None):
xp = Experiment.objects.get(name=experiment_name.split("/")[-1])
self.prepare_databases(xp.declaration)
schedule_experiment(xp)
xp.refresh_from_db()
start = time()
while xp.status != Experiment.RUNNING:
self.assertTrue(time() - start < self.timeout)
xp.refresh_from_db()
if block_name is not None:
block = xp.blocks.get(name=block_name)
start = time()
while block.status != Block.PROCESSING:
self.assertTrue(time() - start < self.timeout)
block.refresh_from_db()
cancel_experiment(xp)
xp.refresh_from_db()
start = time()
while xp.status != Experiment.PENDING:
self.assertTrue(time() - start < self.timeout)
xp.refresh_from_db()
self.assertTrue(xp.blocks.filter(status=Block.CANCELLED).count() > 0)
self.assertEqual(xp.blocks.filter(status=Block.PENDING).count(), 0)
self.assertEqual(xp.blocks.filter(status=Block.PROCESSING).count(), 0)
def process2(self, experiment_name1, experiment_name2, cancel_index=0):
xp1 = Experiment.objects.get(name=experiment_name1.split("/")[-1])
xp2 = Experiment.objects.get(name=experiment_name2.split("/")[-1])
self.prepare_databases(xp1.declaration)
self.prepare_databases(xp2.declaration)
schedule_experiment(xp1)
schedule_experiment(xp2)
xp1.refresh_from_db()
xp2.refresh_from_db()
start = time()
while (xp1.status != Experiment.RUNNING) or (xp2.status != Experiment.RUNNING):
self.assertTrue(time() - start < self.timeout)
xp1.refresh_from_db()
xp2.refresh_from_db()
if cancel_index == 0:
xp_to_cancel = xp1
xp_to_finish = xp2
else:
xp_to_cancel = xp2
xp_to_finish = xp1
cancel_experiment(xp_to_cancel)
xp_to_cancel.refresh_from_db()
xp_to_finish.refresh_from_db()
start = time()
while xp_to_cancel.status != Experiment.PENDING:
self.assertTrue(time() - start < self.timeout)
xp_to_cancel.refresh_from_db()
xp_to_finish.refresh_from_db()
start = time()
while xp_to_finish.status != Experiment.DONE:
self.assertTrue(time() - start < self.timeout)
xp_to_finish.refresh_from_db()
self.assertTrue(xp_to_cancel.blocks.filter(status=Block.CANCELLED).count() > 0)
self.assertEqual(xp_to_cancel.blocks.filter(status=Block.PENDING).count(), 0)
self.assertEqual(xp_to_cancel.blocks.filter(status=Block.PROCESSING).count(), 0)
self.assertEqual(xp_to_finish.blocks.filter(status=Block.CANCELLED).count(), 0)
self.assertEqual(xp_to_finish.blocks.filter(status=Block.PENDING).count(), 0)
self.assertEqual(xp_to_finish.blocks.filter(status=Block.PROCESSING).count(), 0)
def test_one_split_running(self):
self.process("user/user/single/1/single_sleep_4")
def test_one_split_of_two_in_a_block_running(self):
self.stop_worker("node2")
self.process("user/user/double/1/double_sleep_split_2", block_name="echo2")
def test_two_splits_of_same_block_running(self):
self.process("user/user/double/1/double_sleep_split_2", block_name="echo2")
def test_two_splits_of_different_blocks_running(self):
self.process("user/user/triangle/1/triangle_sleep_4")
def test_mirror_block(self):
self.stop_worker("node2")
self.process2(
"user/user/single/1/single_sleep_4",
"user/user/single/1/single_sleep_5",
cancel_index=1,
)
def test_mirrored_block(self):
self.stop_worker("node2")
self.process2(
"user/user/single/1/single_sleep_4",
"user/user/single/1/single_sleep_5",
cancel_index=0,
)
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""\
Starts the scheduling process.
Usage:
%(prog)s [-v ... | --verbose ...] [--settings=<file>] [--interval=<seconds>]
[--address=<address>] [--port=<port>]
%(prog)s (-h | --help)
%(prog)s (-V | --version)
Options:
-h, --help Show this help message
-V, --version Show program's version number
-v, --verbose Increases the output verbosity level
-S <file>, --settings=<file> The module name of the Django settings
file [default: beat.web.settings.settings]
-i <seconds>, --interval=<seconds> The time, in seconds, in which this
scheduler will try to allocate job splits
to existing workers. If not set, use the
value available on the Django settings
file, at the variable `SCHEDULING_INTERVAL`.
-a <address>, --address=<address> The address to which the processing nodes
must establish a connection to
-p <port>, --port=<port> The port to which the processing nodes
must establish a connection to
Examples:
To start the scheduling process do the following:
$ %(prog)s
You can pass the ``-v`` flag to start the scheduler with the logging level
set to ``INFO`` or ``-vv`` to set it to ``DEBUG``. By default, the logging
level is set to ``WARNING`` if no ``-v`` flag is passed.
"""
import logging
import os
import signal
import sys
import docopt
import simplejson
from beat.core.worker import WorkerController
from ..version import __version__
logger = None
# ----------------------------------------------------------
def onWorkerReady(name):
from ..backend.models import Worker
logger.info("Worker '%s' is ready", name)
try:
worker = Worker.objects.get(name=name)
worker.active = True
worker.info = "Connected to the scheduler"
worker.save()
except Exception:
import traceback
print(traceback.format_exc())
logger.error("No worker named '%s' found in the database", name)
# ----------------------------------------------------------
def onWorkerGone(name):
from ..backend.models import Worker
logger.info("Worker '%s' is gone", name)
try:
worker = Worker.objects.get(name=name)
worker.active = False
worker.info = "Disconnected from the scheduler"
worker.save()
except Exception:
logger.error("No worker named '%s' found in the database", name)
# ----------------------------------------------------------
def remove_split_id_from(list, split_id):
try:
list.remove(list.index(split_id))
except ValueError:
pass
# ----------------------------------------------------------
stop = False
def main(user_input=None):
# Parse the command-line arguments
if user_input is not None:
arguments = user_input
else:
arguments = sys.argv[1:]
arguments = docopt.docopt(
__doc__ % dict(prog=os.path.basename(sys.argv[0]),),
argv=arguments,
version="v%s" % __version__,
)
# Initialisation of the application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", arguments["--settings"])
from django import setup
from django.conf import settings
setup()
# Importations of beat.web modules must be done after the call to django.setup()
from ..backend.helpers import assign_splits_to_workers
from ..backend.helpers import get_configuration_for_split
from ..backend.helpers import on_split_cancelled
from ..backend.helpers import on_split_done
from ..backend.helpers import on_split_fail
from ..backend.helpers import on_split_started
from ..backend.helpers import process_newly_cancelled_experiments
from ..backend.helpers import split_new_jobs
from ..backend.models import JobSplit
from ..backend.models import Worker
# Setup the logging
formatter = logging.Formatter(
fmt="[%(asctime)s - Scheduler - " + "%(name)s] %(levelname)s: %(message)s",
datefmt="%d/%b/%Y %H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger = logging.getLogger("beat.web")
root_logger.handlers = []
root_logger.addHandler(handler)
if arguments["--verbose"] == 1:
root_logger.setLevel(logging.INFO)
elif arguments["--verbose"] >= 2:
root_logger.setLevel(logging.DEBUG)
else:
root_logger.setLevel(logging.WARNING)
global logger
logger = logging.getLogger(__name__)
logger.handlers = []
# Installs SIGTERM handler
def handler(signum, frame):
# Ignore further signals
signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGINT, signal.SIG_IGN)
logger.info("Signal %d caught, terminating...", signum)
global stop
stop = True
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
# Reset the status of all the workers in the database
for worker in Worker.objects.filter(active=True):
worker.active = False
worker.info = "Did not connect to the scheduler yet"
worker.save()
# Initialisation of the worker controller
# TODO: Default values
worker_controller = WorkerController(
arguments["--address"],
int(arguments["--port"]),
callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone,),
)
# Processing loop
interval = (
int(arguments["--interval"])
if arguments["--interval"]
else settings.SCHEDULING_INTERVAL
)
logger.info("Scheduling every %d seconds", interval)
running_job_splits = []
cancelling_jobs = []
global stop
while not stop:
logger.debug("Starting scheduler cycle...")
# Process all the incoming messages
splits_to_cancel = []
while True:
# Wait for a message
message = worker_controller.process(interval * 1000)
if message is None:
break
(address, status, split_id, data) = message
# Was there an error?
if status == WorkerController.ERROR:
if split_id is None:
if data != "Worker isn't busy":
logger.error("Worker '%s' sent: %s", address, data)
continue
split_id = int(split_id)
# Retrieve the job split
try:
split = JobSplit.objects.get(id=split_id)
except JobSplit.DoesNotExist:
logger.error(
"Received message '%s' for unknown job split #%d", status, split_id
)
continue
# Is the job done?
if status == WorkerController.DONE:
logger.info(
"Job split #%d (%s %d/%d @ %s) on '%s' is DONE",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
)
on_split_done(split, simplejson.loads(data[0]))
remove_split_id_from(running_job_splits, split_id)
# Has the job failed?
elif status == WorkerController.JOB_ERROR:
logger.info(
"Job split #%d (%s %d/%d @ %s) on '%s' returned an error",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
)
try:
error = simplejson.loads(data[0])
except Exception:
error = data[0]
splits_to_cancel.extend(on_split_fail(split, error))
remove_split_id_from(running_job_splits, split_id)
# Was the job cancelled?
elif status == WorkerController.CANCELLED:
logger.info(
"Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
)
on_split_cancelled(split)
remove_split_id_from(cancelling_jobs, split_id)
# Was there an error?
elif status == WorkerController.ERROR:
if split_id in running_job_splits:
logger.info(
"Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
data[0],
)
splits_to_cancel.extend(on_split_fail(split, data[0]))
remove_split_id_from(running_job_splits, split_id)
# Effectively cancel newly-cancelled experiments
splits_to_cancel.extend(process_newly_cancelled_experiments())
# Cancel the necessary jobs (if any)
for split_to_cancel in splits_to_cancel:
if split_to_cancel.id in running_job_splits:
logger.info(
"Cancelling job split #%d (%s %d/%d @ %s) on '%s'",
split_to_cancel.id,
split_to_cancel.job.block.name,
split_to_cancel.split_index,
split_to_cancel.job.splits.count(),
split_to_cancel.job.block.experiment.fullname(),
split_to_cancel.worker.name,
)
worker_controller.cancel(
split_to_cancel.worker.name, split_to_cancel.id
)
remove_split_id_from(running_job_splits, split_to_cancel.id)
cancelling_jobs.append(split_to_cancel.id)
# If we must stop, don't start new jobs
if stop:
break
# Start new jobs
split_new_jobs()
assigned_splits = assign_splits_to_workers()
for split in assigned_splits:
running_job_splits.append(split.id)
configuration = get_configuration_for_split(split)
logger.info(
"Starting job split #%d (%s %d/%d @ %s) on '%s'",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
)
worker_controller.execute(split.worker.name, split.id, configuration)
on_split_started(split)
# Cleanup
logger.info("Gracefully exiting the scheduler")
worker_controller.destroy()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment