#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.web module of the BEAT platform.              #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


"""\
Starts the scheduling process.

Usage:
33
  %(prog)s [-v ... | --verbose ...] [--settings=<file>] [--interval=<seconds>]
34
           [--address=<address>] [--port=<port>]
35 36 37 38 39
  %(prog)s (-h | --help)
  %(prog)s (-V | --version)


Options:
40 41 42 43 44 45 46 47 48 49 50 51 52 53
  -h, --help                         Show this help message
  -V, --version                      Show program's version number
  -v, --verbose                      Increases the output verbosity level
  -S <file>, --settings=<file>       The module name of the Django settings
                                     file [default: beat.web.settings.settings]
  -i <seconds>, --interval=<seconds> The time, in seconds, in which this
                                     scheduler will try to allocate job splits
                                     to existing workers. If not set, use the
                                     value available on the Django settings
                                     file, at the variable `SCHEDULING_INTERVAL`.
  -a <address>, --address=<address>  The address to which the processing nodes
                                     must establish a connection to
  -p <port>, --port=<port>           The port to which the processing nodes
                                     must establish a connection to
54 55 56 57 58 59 60 61 62 63 64 65 66


Examples:

  To start the scheduling process do the following:

    $ %(prog)s

  You can pass the ``-v`` flag to start the scheduler with the logging level
  set to ``INFO`` or ``-vv`` to set it to ``DEBUG``. By default, the logging
  level is set to ``WARNING`` if no ``-v`` flag is passed.

"""
import logging
import os
import signal
import sys

import docopt
import simplejson

from beat.core.worker import WorkerController

from ..version import __version__

# Module-level logger used by the worker callbacks below. It stays None until
# main() configures logging and rebinds it to logging.getLogger(__name__).
logger = None


# ----------------------------------------------------------


def onWorkerReady(name):
    """Flag the worker called ``name`` as connected in the database.

    Passed to the ``WorkerController`` as its ``onWorkerReady`` callback.
    Failures are logged instead of raised, so a database problem never
    propagates to the caller of the callback.

    Parameters:

      name: Name of the worker, used to look up the matching ``Worker`` row
        (presumably a string -- confirm against ``WorkerController``).
    """
    # beat.web modules must be imported after django.setup(), hence the
    # function-level import
    from ..backend.models import Worker

    logger.info("Worker '%s' is ready", name)

    try:
        worker = Worker.objects.get(name=name)
        worker.active = True
        worker.info = "Connected to the scheduler"
        worker.save()
    except Worker.DoesNotExist:
        logger.error("No worker named '%s' found in the database", name)
    except Exception:
        # Any other failure (e.g. a database error) is not a "missing worker":
        # report it as such, with the traceback sent to the log instead of
        # being printed on stdout.
        logger.exception("Could not mark worker '%s' as connected", name)


# ----------------------------------------------------------


def onWorkerGone(name):
    """Flag the worker called ``name`` as disconnected in the database.

    Passed to the ``WorkerController`` as its ``onWorkerGone`` callback.
    Failures are logged instead of raised, so a database problem never
    propagates to the caller of the callback.

    Parameters:

      name: Name of the worker, used to look up the matching ``Worker`` row
        (presumably a string -- confirm against ``WorkerController``).
    """
    # beat.web modules must be imported after django.setup(), hence the
    # function-level import
    from ..backend.models import Worker

    logger.info("Worker '%s' is gone", name)

    try:
        worker = Worker.objects.get(name=name)
        worker.active = False
        worker.info = "Disconnected from the scheduler"
        worker.save()
    except Worker.DoesNotExist:
        logger.error("No worker named '%s' found in the database", name)
    except Exception:
        # Any other failure (e.g. a database error) is not a "missing worker":
        # report it as such, including the traceback.
        logger.exception("Could not mark worker '%s' as disconnected", name)


# ----------------------------------------------------------


def remove_split_id_from(list, split_id):
    """Remove the first occurrence of ``split_id`` from ``list``, in place.

    Nothing happens (and nothing is raised) when ``split_id`` is not present.

    Parameters:

      list: The mutable list of split ids to update. The name shadows the
        builtin; it is kept so that keyword callers keep working.

      split_id: The id to remove.
    """
    try:
        # list.remove() takes the *value* to drop, not its position
        list.remove(split_id)
    except ValueError:
        # Not tracked (any more): nothing to do
        pass


# ----------------------------------------------------------


# Termination flag: set to True by the signal handler installed in main(),
# which makes the scheduling loop exit.
stop = False


def _split_log_args(split):
    """Return the values identifying ``split`` in the scheduler log messages.

    The tuple matches the ``#%d (%s %d/%d @ %s) on '%s'`` part of the messages:
    split id, block name, split index, number of splits of the job, full name
    of the experiment and name of the worker the split is assigned to.
    """
    return (
        split.id,
        split.job.block.name,
        split.split_index,
        split.job.splits.count(),
        split.job.block.experiment.fullname(),
        split.worker.name,
    )


def main(user_input=None):
    """Run the scheduler until a SIGTERM or SIGINT is received.

    The function parses the command line, initialises Django and the logging
    system, marks every active worker as inactive, then loops: it processes the
    messages sent by the workers, cancels the job splits that must be
    cancelled, and assigns new job splits to the workers.

    Parameters:

      user_input: Optional list of command-line arguments to parse instead of
        ``sys.argv[1:]``.

    Note: ``docopt`` terminates the program itself on ``--help``,
    ``--version`` or invalid arguments (see the docopt documentation).
    """
    global logger, stop

    # Parse the command-line arguments
    argv = sys.argv[1:] if user_input is None else user_input
    arguments = docopt.docopt(
        __doc__ % dict(prog=os.path.basename(sys.argv[0])),
        argv=argv,
        version="v%s" % __version__,
    )

    # Initialisation of the application
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", arguments["--settings"])
    from django import setup
    from django.conf import settings

    setup()

    # Importations of beat.web modules must be done after the call to django.setup()
    from ..backend.helpers import assign_splits_to_workers
    from ..backend.helpers import get_configuration_for_split
    from ..backend.helpers import on_split_cancelled
    from ..backend.helpers import on_split_done
    from ..backend.helpers import on_split_fail
    from ..backend.helpers import on_split_started
    from ..backend.helpers import process_newly_cancelled_experiments
    from ..backend.helpers import split_new_jobs
    from ..backend.models import JobSplit
    from ..backend.models import Worker

    # Setup the logging
    formatter = logging.Formatter(
        fmt="[%(asctime)s - Scheduler - %(name)s] %(levelname)s: %(message)s",
        datefmt="%d/%b/%Y %H:%M:%S",
    )

    log_handler = logging.StreamHandler()
    log_handler.setFormatter(formatter)

    # All "beat.web.*" loggers end up in this single stream handler
    root_logger = logging.getLogger("beat.web")
    root_logger.handlers = []
    root_logger.addHandler(log_handler)

    verbosity = arguments["--verbose"]
    if verbosity == 1:
        root_logger.setLevel(logging.INFO)
    elif verbosity >= 2:
        root_logger.setLevel(logging.DEBUG)
    else:
        root_logger.setLevel(logging.WARNING)

    logger = logging.getLogger(__name__)
    logger.handlers = []

    # Installs SIGTERM/SIGINT handler
    def on_termination_signal(signum, frame):
        # Ignore further signals
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        signal.signal(signal.SIGINT, signal.SIG_IGN)

        logger.info("Signal %d caught, terminating...", signum)
        global stop
        stop = True

    signal.signal(signal.SIGTERM, on_termination_signal)
    signal.signal(signal.SIGINT, on_termination_signal)

    # Reset the status of all the workers in the database
    for worker in Worker.objects.filter(active=True):
        worker.active = False
        worker.info = "Did not connect to the scheduler yet"
        worker.save()

    # Initialisation of the worker controller
    # TODO: Default values
    # NOTE(review): the usage text declares no default for --address/--port,
    # so int() receives None (TypeError) when --port is omitted -- confirm the
    # intended defaults.
    worker_controller = WorkerController(
        arguments["--address"],
        int(arguments["--port"]),
        callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone),
    )

    # From here on the controller must be destroyed whatever happens
    try:
        # Processing loop
        interval = (
            int(arguments["--interval"])
            if arguments["--interval"]
            else settings.SCHEDULING_INTERVAL
        )
        logger.info("Scheduling every %d seconds", interval)

        running_job_splits = []
        cancelling_jobs = []

        while not stop:
            logger.debug("Starting scheduler cycle...")

            # Process all the incoming messages
            splits_to_cancel = []

            while True:
                # Wait for a message (the interval is given in seconds; the
                # timeout is presumably expected in milliseconds -- confirm
                # against WorkerController.process)
                message = worker_controller.process(interval * 1000)
                if message is None:
                    break

                (address, status, split_id, data) = message

                # Was there an error not related to a particular job split?
                if status == WorkerController.ERROR and split_id is None:
                    if data != "Worker isn't busy":
                        logger.error("Worker '%s' sent: %s", address, data)
                    continue

                split_id = int(split_id)

                # Retrieve the job split
                try:
                    split = JobSplit.objects.get(id=split_id)
                except JobSplit.DoesNotExist:
                    logger.error(
                        "Received message '%s' for unknown job split #%d",
                        status,
                        split_id,
                    )
                    continue

                # Is the job done?
                if status == WorkerController.DONE:
                    logger.info(
                        "Job split #%d (%s %d/%d @ %s) on '%s' is DONE",
                        *_split_log_args(split)
                    )

                    on_split_done(split, simplejson.loads(data[0]))
                    remove_split_id_from(running_job_splits, split_id)

                # Has the job failed?
                elif status == WorkerController.JOB_ERROR:
                    logger.info(
                        "Job split #%d (%s %d/%d @ %s) on '%s' returned an error",
                        *_split_log_args(split)
                    )

                    # The error is JSON when possible, raw data otherwise
                    try:
                        error = simplejson.loads(data[0])
                    except Exception:
                        error = data[0]

                    splits_to_cancel.extend(on_split_fail(split, error))
                    remove_split_id_from(running_job_splits, split_id)

                # Was the job cancelled?
                elif status == WorkerController.CANCELLED:
                    logger.info(
                        "Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED",
                        *_split_log_args(split)
                    )

                    on_split_cancelled(split)
                    remove_split_id_from(cancelling_jobs, split_id)

                # Was there an error?
                elif status == WorkerController.ERROR:
                    # Only splits still tracked as running are failed here
                    if split_id in running_job_splits:
                        logger.info(
                            "Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s",
                            *(_split_log_args(split) + (data[0],))
                        )

                        splits_to_cancel.extend(on_split_fail(split, data[0]))
                        remove_split_id_from(running_job_splits, split_id)

            # Effectively cancel newly-cancelled experiments
            splits_to_cancel.extend(process_newly_cancelled_experiments())

            # Cancel the necessary jobs (if any)
            for split_to_cancel in splits_to_cancel:
                if split_to_cancel.id in running_job_splits:
                    logger.info(
                        "Cancelling job split #%d (%s %d/%d @ %s) on '%s'",
                        *_split_log_args(split_to_cancel)
                    )

                    worker_controller.cancel(
                        split_to_cancel.worker.name, split_to_cancel.id
                    )
                    remove_split_id_from(running_job_splits, split_to_cancel.id)
                    cancelling_jobs.append(split_to_cancel.id)

            # If we must stop, don't start new jobs
            if stop:
                break

            # Start new jobs
            split_new_jobs()
            assigned_splits = assign_splits_to_workers()

            for split in assigned_splits:
                running_job_splits.append(split.id)

                configuration = get_configuration_for_split(split)

                logger.info(
                    "Starting job split #%d (%s %d/%d @ %s) on '%s'",
                    *_split_log_args(split)
                )

                worker_controller.execute(split.worker.name, split.id, configuration)
                on_split_started(split)

        logger.info("Gracefully exiting the scheduler")
    finally:
        # Cleanup, also performed when the loop is left because of an error
        worker_controller.destroy()