diff --git a/beat/web/scripts/scheduler.py b/beat/web/scripts/scheduler.py index 4f787f53eec730a5ed809de335fdae2c3b3a7b28..6401a6069f45a0d736dd914ef27f21858fe0464f 100755 --- a/beat/web/scripts/scheduler.py +++ b/beat/web/scripts/scheduler.py @@ -64,22 +64,22 @@ Examples: level is set to ``WARNING`` if no ``-v`` flag is passed. """ - +import logging import os -import sys import signal +import sys + import docopt -import logging import simplejson -from ..version import __version__ - from beat.core.worker import WorkerController +from ..version import __version__ + logger = None -#---------------------------------------------------------- +# ---------------------------------------------------------- def onWorkerReady(name): @@ -90,14 +90,16 @@ def onWorkerReady(name): try: worker = Worker.objects.get(name=name) worker.active = True - worker.info = 'Connected to the scheduler' + worker.info = "Connected to the scheduler" worker.save() - except: - import traceback; print(traceback.format_exc()) + except Exception: + import traceback + + print(traceback.format_exc()) logger.error("No worker named '%s' found in the database", name) -#---------------------------------------------------------- +# ---------------------------------------------------------- def onWorkerGone(name): @@ -108,27 +110,28 @@ def onWorkerGone(name): try: worker = Worker.objects.get(name=name) worker.active = False - worker.info = 'Disconnected from the scheduler' + worker.info = "Disconnected from the scheduler" worker.save() - except: + except Exception: logger.error("No worker named '%s' found in the database", name) -#---------------------------------------------------------- +# ---------------------------------------------------------- def remove_split_id_from(list, split_id): try: list.remove(list.index(split_id)) - except: + except ValueError: pass -#---------------------------------------------------------- +# ---------------------------------------------------------- stop = False + def main(user_input=None): # Parse the command-line arguments @@ -138,50 +141,46 @@ def main(user_input=None): arguments = sys.argv[1:] arguments = docopt.docopt( - __doc__ % dict( - prog=os.path.basename(sys.argv[0]), - ), + __doc__ % dict(prog=os.path.basename(sys.argv[0]),), argv=arguments, - version='v%s' % __version__, + version="v%s" % __version__, ) - # Initialisation of the application - os.environ.setdefault('DJANGO_SETTINGS_MODULE', arguments['--settings']) - from django.conf import settings + os.environ.setdefault("DJANGO_SETTINGS_MODULE", arguments["--settings"]) from django import setup - setup() + from django.conf import settings + setup() # Importations of beat.web modules must be done after the call to django.setup() - from ..backend.models import JobSplit - from ..backend.models import Worker - from ..backend.helpers import split_new_jobs - from ..backend.helpers import process_newly_cancelled_experiments from ..backend.helpers import assign_splits_to_workers from ..backend.helpers import get_configuration_for_split - from ..backend.helpers import on_split_started + from ..backend.helpers import on_split_cancelled from ..backend.helpers import on_split_done from ..backend.helpers import on_split_fail - from ..backend.helpers import on_split_cancelled - - + from ..backend.helpers import on_split_started + from ..backend.helpers import process_newly_cancelled_experiments + from ..backend.helpers import split_new_jobs + from ..backend.models import JobSplit + from ..backend.models import Worker # Setup the logging - formatter = logging.Formatter(fmt="[%(asctime)s - Scheduler - " + \ - "%(name)s] %(levelname)s: %(message)s", - datefmt="%d/%b/%Y %H:%M:%S") + formatter = logging.Formatter( + fmt="[%(asctime)s - Scheduler - " + "%(name)s] %(levelname)s: %(message)s", + datefmt="%d/%b/%Y %H:%M:%S", + ) handler = logging.StreamHandler() handler.setFormatter(formatter) - root_logger = logging.getLogger('beat.web') + root_logger = logging.getLogger("beat.web") root_logger.handlers = [] root_logger.addHandler(handler) - if arguments['--verbose'] == 1: + if arguments["--verbose"] == 1: root_logger.setLevel(logging.INFO) - elif arguments['--verbose'] >= 2: + elif arguments["--verbose"] >= 2: root_logger.setLevel(logging.DEBUG) else: root_logger.setLevel(logging.WARNING) @@ -190,7 +189,6 @@ def main(user_input=None): logger = logging.getLogger(__name__) logger.handlers = [] - # Installs SIGTERM handler def handler(signum, frame): # Ignore further signals @@ -204,32 +202,26 @@ def main(user_input=None): signal.signal(signal.SIGTERM, handler) signal.signal(signal.SIGINT, handler) - # Reset the status of all the workers in the database for worker in Worker.objects.filter(active=True): worker.active = False - worker.info = 'Did not connect to the scheduler yet' + worker.info = "Did not connect to the scheduler yet" worker.save() - # Initialisation of the worker controller # TODO: Default values worker_controller = WorkerController( - arguments['--address'], - int(arguments['--port']), - callbacks=dict( - onWorkerReady = onWorkerReady, - onWorkerGone = onWorkerGone, - ) + arguments["--address"], + int(arguments["--port"]), + callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone,), ) - # Processing loop - from ..backend.helpers import split_new_jobs - from ..backend.helpers import assign_splits_to_workers - - interval = int(arguments['--interval']) \ - if arguments['--interval'] else settings.SCHEDULING_INTERVAL + interval = ( + int(arguments["--interval"]) + if arguments["--interval"] + else settings.SCHEDULING_INTERVAL + ) logger.info("Scheduling every %d seconds", interval) running_job_splits = [] @@ -257,43 +249,47 @@ def main(user_input=None): logger.error("Worker '%s' sent: %s", address, data) continue - split_id = int(split_id) # Retrieve the job split try: split = JobSplit.objects.get(id=split_id) - except: - logger.error("Received message '%s' for unknown job split #%d", - status, split_id) + except JobSplit.DoesNotExist: + logger.error( + "Received message '%s' for unknown job split #%d", status, split_id + ) continue # Is the job done? if status == WorkerController.DONE: - logger.info("Job split #%d (%s %d/%d @ %s) on '%s' is DONE", - split.id, - split.job.block.name, - split.split_index, - split.job.splits.count(), - split.job.block.experiment.fullname(), - split.worker.name) + logger.info( + "Job split #%d (%s %d/%d @ %s) on '%s' is DONE", + split.id, + split.job.block.name, + split.split_index, + split.job.splits.count(), + split.job.block.experiment.fullname(), + split.worker.name, + ) on_split_done(split, simplejson.loads(data[0])) remove_split_id_from(running_job_splits, split_id) # Has the job failed? elif status == WorkerController.JOB_ERROR: - logger.info("Job split #%d (%s %d/%d @ %s) on '%s' returned an error", - split.id, - split.job.block.name, - split.split_index, - split.job.splits.count(), - split.job.block.experiment.fullname(), - split.worker.name) + logger.info( + "Job split #%d (%s %d/%d @ %s) on '%s' returned an error", + split.id, + split.job.block.name, + split.split_index, + split.job.splits.count(), + split.job.block.experiment.fullname(), + split.worker.name, + ) try: error = simplejson.loads(data[0]) - except: + except Exception: error = data[0] splits_to_cancel.extend(on_split_fail(split, error)) @@ -301,13 +297,15 @@ def main(user_input=None): # Was the job cancelled? elif status == WorkerController.CANCELLED: - logger.info("Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED", - split.id, - split.job.block.name, - split.split_index, - split.job.splits.count(), - split.job.block.experiment.fullname(), - split.worker.name) + logger.info( + "Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED", + split.id, + split.job.block.name, + split.split_index, + split.job.splits.count(), + split.job.block.experiment.fullname(), + split.worker.name, + ) on_split_cancelled(split) remove_split_id_from(cancelling_jobs, split_id) @@ -315,13 +313,16 @@ def main(user_input=None): # Was there an error? elif status == WorkerController.ERROR: if split_id in running_job_splits: - logger.info("Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s", - split.id, - split.job.block.name, - split.split_index, - split.job.splits.count(), - split.job.block.experiment.fullname(), - split.worker.name, data[0]) + logger.info( + "Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s", + split.id, + split.job.block.name, + split.split_index, + split.job.splits.count(), + split.job.block.experiment.fullname(), + split.worker.name, + data[0], + ) splits_to_cancel.extend(on_split_fail(split, data[0])) remove_split_id_from(running_job_splits, split_id) @@ -332,15 +333,19 @@ def main(user_input=None): # Cancel the necessary jobs (if any) for split_to_cancel in splits_to_cancel: if split_to_cancel.id in running_job_splits: - logger.info("Cancelling job split #%d (%s %d/%d @ %s) on '%s'", - split_to_cancel.id, - split_to_cancel.job.block.name, - split_to_cancel.split_index, - split_to_cancel.job.splits.count(), - split_to_cancel.job.block.experiment.fullname(), - split_to_cancel.worker.name) - - worker_controller.cancel(split_to_cancel.worker.name, split_to_cancel.id) + logger.info( + "Cancelling job split #%d (%s %d/%d @ %s) on '%s'", + split_to_cancel.id, + split_to_cancel.job.block.name, + split_to_cancel.split_index, + split_to_cancel.job.splits.count(), + split_to_cancel.job.block.experiment.fullname(), + split_to_cancel.worker.name, + ) + + worker_controller.cancel( + split_to_cancel.worker.name, split_to_cancel.id + ) remove_split_id_from(running_job_splits, split_to_cancel.id) cancelling_jobs.append(split_to_cancel.id) @@ -357,18 +362,19 @@ def main(user_input=None): configuration = get_configuration_for_split(split) - logger.info("Starting job split #%d (%s %d/%d @ %s) on '%s'", - split.id, - split.job.block.name, - split.split_index, - split.job.splits.count(), - split.job.block.experiment.fullname(), - split.worker.name) + logger.info( + "Starting job split #%d (%s %d/%d @ %s) on '%s'", + split.id, + split.job.block.name, + split.split_index, + split.job.splits.count(), + split.job.block.experiment.fullname(), + split.worker.name, + ) worker_controller.execute(split.worker.name, split.id, configuration) on_split_started(split) - # Cleanup logger.info("Gracefully exiting the scheduler") worker_controller.destroy()