Commit e271f0d3 authored by Samuel GAIST

[scripts] Remove the scheduler script

It's not used anymore.

Part of #567
parent 880d942b
Pipeline #42708 failed with stage
in 2 minutes and 27 seconds
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""\
Starts the scheduling process.
Usage:
%(prog)s [-v ... | --verbose ...] [--settings=<file>] [--interval=<seconds>]
[--address=<address>] [--port=<port>]
%(prog)s (-h | --help)
%(prog)s (-V | --version)
Options:
-h, --help Show this help message
-V, --version Show program's version number
-v, --verbose Increases the output verbosity level
-S <file>, --settings=<file> The module name of the Django settings
file [default: beat.web.settings.settings]
-i <seconds>, --interval=<seconds> The time, in seconds, in which this
scheduler will try to allocate job splits
to existing workers. If not set, use the
value available on the Django settings
file, at the variable `SCHEDULING_INTERVAL`.
-a <address>, --address=<address> The address to which the processing nodes
must establish a connection to
-p <port>, --port=<port> The port to which the processing nodes
must establish a connection to
Examples:
To start the scheduling process do the following:
$ %(prog)s
You can pass the ``-v`` flag to start the scheduler with the logging level
set to ``INFO`` or ``-vv`` to set it to ``DEBUG``. By default, the logging
level is set to ``WARNING`` if no ``-v`` flag is passed.
"""
import logging
import os
import signal
import sys
import docopt
import simplejson
from beat.core.worker import WorkerController
from ..version import __version__
logger = None
# ----------------------------------------------------------
def onWorkerReady(name):
    """Flag the worker called ``name`` as active in the database.

    Passed to the ``WorkerController`` as its ``onWorkerReady`` callback.

    The lookup and update are best-effort: any exception is caught and
    logged (with its traceback) instead of being propagated to the caller.

    Parameters:
        name: Name of the worker, used to look up the ``Worker`` row.
    """
    # Imported here rather than at module level: beat.web modules must only
    # be imported once django.setup() has been called.
    from ..backend.models import Worker

    logger.info("Worker '%s' is ready", name)

    try:
        worker = Worker.objects.get(name=name)
        worker.active = True
        worker.info = "Connected to the scheduler"
        worker.save()
    except Exception:
        # Route the traceback through the logger (instead of printing it to
        # stdout) so it follows the configured handlers and formatting.
        logger.error(
            "No worker named '%s' found in the database", name, exc_info=True
        )
# ----------------------------------------------------------
def onWorkerGone(name):
    """Flag the worker called ``name`` as inactive in the database.

    Passed to the ``WorkerController`` as its ``onWorkerGone`` callback.

    The lookup and update are best-effort: any exception is caught and
    logged (with its traceback) instead of being propagated to the caller.

    Parameters:
        name: Name of the worker, used to look up the ``Worker`` row.
    """
    # Imported here rather than at module level: beat.web modules must only
    # be imported once django.setup() has been called.
    from ..backend.models import Worker

    logger.info("Worker '%s' is gone", name)

    try:
        worker = Worker.objects.get(name=name)
        worker.active = False
        worker.info = "Disconnected from the scheduler"
        worker.save()
    except Exception:
        # The except clause is broad, so the failure is not necessarily a
        # missing row; keep the traceback so the real cause is not lost.
        logger.error(
            "No worker named '%s' found in the database", name, exc_info=True
        )
# ----------------------------------------------------------
def remove_split_id_from(list, split_id):
    """Remove the first occurrence of ``split_id`` from ``list``, in place.

    Does nothing when ``split_id`` is not present.

    Parameters:
        list: The list of split ids to modify (name kept for backward
            compatibility even though it shadows the builtin).
        split_id: The split id to remove.
    """
    try:
        # list.remove() takes the *value* to remove; passing the element's
        # index (as the previous code did) removed the wrong entry or none.
        list.remove(split_id)
    except ValueError:
        pass
# ----------------------------------------------------------
# Module-level shutdown flag: set to True by the signal handler installed in
# main(), and polled by main()'s scheduling loop.
stop = False


def main(user_input=None):
    """Run the scheduler until a SIGTERM or SIGINT is received.

    Steps, in order: parse the command line with docopt, initialise Django,
    configure logging, install the signal handlers, mark every active worker
    as inactive, create the ``WorkerController`` and then loop. Each cycle
    processes worker messages, cancels the splits that must be cancelled and
    starts newly assigned splits.

    Parameters:
        user_input: Optional list of command-line arguments; when ``None``,
            ``sys.argv[1:]`` is used instead.
    """
    # Parse the command-line arguments
    if user_input is not None:
        arguments = user_input
    else:
        arguments = sys.argv[1:]

    # `__doc__` here is the module docstring (the usage text), not the
    # docstring of this function.
    arguments = docopt.docopt(
        __doc__ % dict(prog=os.path.basename(sys.argv[0]),),
        argv=arguments,
        version="v%s" % __version__,
    )

    # Initialisation of the application
    # setdefault(): an already exported DJANGO_SETTINGS_MODULE takes
    # precedence over the --settings option.
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", arguments["--settings"])

    from django import setup
    from django.conf import settings

    setup()

    # Importations of beat.web modules must be done after the call to django.setup()
    from ..backend.helpers import assign_splits_to_workers
    from ..backend.helpers import get_configuration_for_split
    from ..backend.helpers import on_split_cancelled
    from ..backend.helpers import on_split_done
    from ..backend.helpers import on_split_fail
    from ..backend.helpers import on_split_started
    from ..backend.helpers import process_newly_cancelled_experiments
    from ..backend.helpers import split_new_jobs
    from ..backend.models import JobSplit
    from ..backend.models import Worker

    # Setup the logging
    formatter = logging.Formatter(
        fmt="[%(asctime)s - Scheduler - " + "%(name)s] %(levelname)s: %(message)s",
        datefmt="%d/%b/%Y %H:%M:%S",
    )

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)

    # Replace any handler already attached to the "beat.web" logger with the
    # stream handler created above.
    root_logger = logging.getLogger("beat.web")
    root_logger.handlers = []
    root_logger.addHandler(handler)

    # Verbosity: no -v -> WARNING, -v -> INFO, -vv (or more) -> DEBUG
    if arguments["--verbose"] == 1:
        root_logger.setLevel(logging.INFO)
    elif arguments["--verbose"] >= 2:
        root_logger.setLevel(logging.DEBUG)
    else:
        root_logger.setLevel(logging.WARNING)

    # Bind the module-level `logger`, which the worker callbacks also use
    global logger
    logger = logging.getLogger(__name__)
    logger.handlers = []

    # Installs the SIGTERM and SIGINT handlers
    # NOTE: this definition rebinds the local name `handler`, which until now
    # referred to the logging stream handler (already attached above).
    def handler(signum, frame):
        # Ignore further signals
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        logger.info("Signal %d caught, terminating...", signum)

        # Only raise the flag; the scheduling loop checks it and exits
        global stop
        stop = True

    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)

    # Reset the status of all the workers in the database
    for worker in Worker.objects.filter(active=True):
        worker.active = False
        worker.info = "Did not connect to the scheduler yet"
        worker.save()

    # Initialisation of the worker controller
    # TODO: Default values
    # NOTE(review): the usage text declares no default for --address/--port,
    # so int(arguments["--port"]) presumably fails when --port is omitted —
    # confirm and address together with the TODO above.
    worker_controller = WorkerController(
        arguments["--address"],
        int(arguments["--port"]),
        callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone,),
    )

    # Processing loop
    # The --interval option wins over settings.SCHEDULING_INTERVAL when given
    interval = (
        int(arguments["--interval"])
        if arguments["--interval"]
        else settings.SCHEDULING_INTERVAL
    )

    logger.info("Scheduling every %d seconds", interval)

    # Ids of the splits sent to a worker and not reported finished yet
    running_job_splits = []
    # Ids of the splits for which a cancellation request was sent
    cancelling_jobs = []

    global stop
    while not stop:
        logger.debug("Starting scheduler cycle...")

        # Process all the incoming messages
        splits_to_cancel = []

        while True:
            # Wait for a message
            # NOTE(review): the argument is presumably a timeout in
            # milliseconds, with None returned on expiry — confirm against
            # WorkerController.process().
            message = worker_controller.process(interval * 1000)
            if message is None:
                break

            (address, status, split_id, data) = message

            # Was there an error?
            if status == WorkerController.ERROR:
                # An error without a split id cannot be tied to a job split:
                # log it (unless it is the "Worker isn't busy" reply) and
                # move on to the next message.
                if split_id is None:
                    if data != "Worker isn't busy":
                        logger.error("Worker '%s' sent: %s", address, data)
                    continue

            split_id = int(split_id)

            # Retrieve the job split
            try:
                split = JobSplit.objects.get(id=split_id)
            except JobSplit.DoesNotExist:
                logger.error(
                    "Received message '%s' for unknown job split #%d", status, split_id
                )
                continue

            # Is the job done?
            if status == WorkerController.DONE:
                logger.info(
                    "Job split #%d (%s %d/%d @ %s) on '%s' is DONE",
                    split.id,
                    split.job.block.name,
                    split.split_index,
                    split.job.splits.count(),
                    split.job.block.experiment.fullname(),
                    split.worker.name,
                )

                on_split_done(split, simplejson.loads(data[0]))
                remove_split_id_from(running_job_splits, split_id)

            # Has the job failed?
            elif status == WorkerController.JOB_ERROR:
                logger.info(
                    "Job split #%d (%s %d/%d @ %s) on '%s' returned an error",
                    split.id,
                    split.job.block.name,
                    split.split_index,
                    split.job.splits.count(),
                    split.job.block.experiment.fullname(),
                    split.worker.name,
                )

                # Decode the payload as JSON when possible, otherwise use it
                # unchanged
                try:
                    error = simplejson.loads(data[0])
                except Exception:
                    error = data[0]

                splits_to_cancel.extend(on_split_fail(split, error))
                remove_split_id_from(running_job_splits, split_id)

            # Was the job cancelled?
            elif status == WorkerController.CANCELLED:
                logger.info(
                    "Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED",
                    split.id,
                    split.job.block.name,
                    split.split_index,
                    split.job.splits.count(),
                    split.job.block.experiment.fullname(),
                    split.worker.name,
                )

                on_split_cancelled(split)
                remove_split_id_from(cancelling_jobs, split_id)

            # Was there an error?
            # (Only reached with a split id: id-less errors were handled at
            # the top of the loop.)
            elif status == WorkerController.ERROR:
                if split_id in running_job_splits:
                    logger.info(
                        "Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s",
                        split.id,
                        split.job.block.name,
                        split.split_index,
                        split.job.splits.count(),
                        split.job.block.experiment.fullname(),
                        split.worker.name,
                        data[0],
                    )

                    splits_to_cancel.extend(on_split_fail(split, data[0]))
                    remove_split_id_from(running_job_splits, split_id)

        # Effectively cancel newly-cancelled experiments
        splits_to_cancel.extend(process_newly_cancelled_experiments())

        # Cancel the necessary jobs (if any)
        for split_to_cancel in splits_to_cancel:
            # Only splits still tracked as running are sent a cancel request
            if split_to_cancel.id in running_job_splits:
                logger.info(
                    "Cancelling job split #%d (%s %d/%d @ %s) on '%s'",
                    split_to_cancel.id,
                    split_to_cancel.job.block.name,
                    split_to_cancel.split_index,
                    split_to_cancel.job.splits.count(),
                    split_to_cancel.job.block.experiment.fullname(),
                    split_to_cancel.worker.name,
                )

                worker_controller.cancel(
                    split_to_cancel.worker.name, split_to_cancel.id
                )
                remove_split_id_from(running_job_splits, split_to_cancel.id)
                cancelling_jobs.append(split_to_cancel.id)

        # If we must stop, don't start new jobs
        if stop:
            break

        # Start new jobs
        split_new_jobs()
        assigned_splits = assign_splits_to_workers()

        for split in assigned_splits:
            running_job_splits.append(split.id)

            configuration = get_configuration_for_split(split)

            logger.info(
                "Starting job split #%d (%s %d/%d @ %s) on '%s'",
                split.id,
                split.job.block.name,
                split.split_index,
                split.job.splits.count(),
                split.job.block.experiment.fullname(),
                split.worker.name,
            )

            worker_controller.execute(split.worker.name, split.id, configuration)
            on_split_started(split)

    # Cleanup
    logger.info("Gracefully exiting the scheduler")
    worker_controller.destroy()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment