Commit 648b32d5 authored by Samuel GAIST, committed by Flavio TARSETTI

[scripts][scheduler] Pre-commit cleanup

parent 8d888733
Pipeline #42672 passed in 15 minutes and 12 seconds
@@ -64,22 +64,22 @@ Examples:
level is set to ``WARNING`` if no ``-v`` flag is passed.
"""
import logging
import os
import sys
import signal
import sys
import docopt
import logging
import simplejson
from ..version import __version__
from beat.core.worker import WorkerController
from ..version import __version__
logger = None
#----------------------------------------------------------
# ----------------------------------------------------------
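The import reshuffling above is the isort layout that pre-commit typically enforces: standard-library modules first, third-party packages next, package-local imports last, each group sorted alphabetically. A minimal sketch of the resulting grouping for this script (an editor's illustration assuming the default isort profile, not an excerpt of the commit):

# Standard library
import logging
import os
import signal
import sys

# Third-party packages
import docopt
import simplejson

# beat framework and local package imports
from beat.core.worker import WorkerController

from ..version import __version__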
def onWorkerReady(name):
@@ -90,14 +90,16 @@ def onWorkerReady(name):
try:
worker = Worker.objects.get(name=name)
worker.active = True
worker.info = 'Connected to the scheduler'
worker.info = "Connected to the scheduler"
worker.save()
except:
import traceback; print(traceback.format_exc())
except Exception:
import traceback
print(traceback.format_exc())
logger.error("No worker named '%s' found in the database", name)
#----------------------------------------------------------
# ----------------------------------------------------------
def onWorkerGone(name):
@@ -108,27 +110,28 @@ def onWorkerGone(name):
try:
worker = Worker.objects.get(name=name)
worker.active = False
worker.info = 'Disconnected from the scheduler'
worker.info = "Disconnected from the scheduler"
worker.save()
except:
except Exception:
logger.error("No worker named '%s' found in the database", name)
#----------------------------------------------------------
# ----------------------------------------------------------
def remove_split_id_from(list, split_id):
try:
list.remove(list.index(split_id))
except:
except ValueError:
pass
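As committed, remove_split_id_from passes list.index(split_id) to list.remove(), so it removes the first element whose value equals that index rather than split_id itself, and the parameter also shadows the built-in name list. Since list.remove() already raises ValueError when the value is absent, an equivalent simpler form (an editor's sketch, not part of this commit) would be:

def remove_split_id_from(split_id_list, split_id):
    # Remove split_id if present; silently ignore it if it is not in the list.
    try:
        split_id_list.remove(split_id)
    except ValueError:
        pass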
#----------------------------------------------------------
# ----------------------------------------------------------
stop = False
def main(user_input=None):
# Parse the command-line arguments
@@ -138,50 +141,46 @@ def main(user_input=None):
arguments = sys.argv[1:]
arguments = docopt.docopt(
__doc__ % dict(
prog=os.path.basename(sys.argv[0]),
),
__doc__ % dict(prog=os.path.basename(sys.argv[0]),),
argv=arguments,
version='v%s' % __version__,
version="v%s" % __version__,
)
# Initialisation of the application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', arguments['--settings'])
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", arguments["--settings"])
from django import setup
setup()
from django.conf import settings
setup()
# Importations of beat.web modules must be done after the call to django.setup()
from ..backend.models import JobSplit
from ..backend.models import Worker
from ..backend.helpers import split_new_jobs
from ..backend.helpers import process_newly_cancelled_experiments
from ..backend.helpers import assign_splits_to_workers
from ..backend.helpers import get_configuration_for_split
from ..backend.helpers import on_split_started
from ..backend.helpers import on_split_cancelled
from ..backend.helpers import on_split_done
from ..backend.helpers import on_split_fail
from ..backend.helpers import on_split_cancelled
from ..backend.helpers import on_split_started
from ..backend.helpers import process_newly_cancelled_experiments
from ..backend.helpers import split_new_jobs
from ..backend.models import JobSplit
from ..backend.models import Worker
# Setup the logging
formatter = logging.Formatter(fmt="[%(asctime)s - Scheduler - " + \
"%(name)s] %(levelname)s: %(message)s",
datefmt="%d/%b/%Y %H:%M:%S")
formatter = logging.Formatter(
fmt="[%(asctime)s - Scheduler - " + "%(name)s] %(levelname)s: %(message)s",
datefmt="%d/%b/%Y %H:%M:%S",
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
root_logger = logging.getLogger('beat.web')
root_logger = logging.getLogger("beat.web")
root_logger.handlers = []
root_logger.addHandler(handler)
if arguments['--verbose'] == 1:
if arguments["--verbose"] == 1:
root_logger.setLevel(logging.INFO)
elif arguments['--verbose'] >= 2:
elif arguments["--verbose"] >= 2:
root_logger.setLevel(logging.DEBUG)
else:
root_logger.setLevel(logging.WARNING)
@@ -190,7 +189,6 @@ def main(user_input=None):
logger = logging.getLogger(__name__)
logger.handlers = []
# Installs SIGTERM handler
def handler(signum, frame):
# Ignore further signals
@@ -204,32 +202,26 @@ def main(user_input=None):
signal.signal(signal.SIGTERM, handler)
signal.signal(signal.SIGINT, handler)
# Reset the status of all the workers in the database
for worker in Worker.objects.filter(active=True):
worker.active = False
worker.info = 'Did not connect to the scheduler yet'
worker.info = "Did not connect to the scheduler yet"
worker.save()
# Initialisation of the worker controller
# TODO: Default values
worker_controller = WorkerController(
arguments['--address'],
int(arguments['--port']),
callbacks=dict(
onWorkerReady = onWorkerReady,
onWorkerGone = onWorkerGone,
)
arguments["--address"],
int(arguments["--port"]),
callbacks=dict(onWorkerReady=onWorkerReady, onWorkerGone=onWorkerGone,),
)
# Processing loop
from ..backend.helpers import split_new_jobs
from ..backend.helpers import assign_splits_to_workers
interval = int(arguments['--interval']) \
if arguments['--interval'] else settings.SCHEDULING_INTERVAL
interval = (
int(arguments["--interval"])
if arguments["--interval"]
else settings.SCHEDULING_INTERVAL
)
logger.info("Scheduling every %d seconds", interval)
running_job_splits = []
@@ -257,43 +249,47 @@ def main(user_input=None):
logger.error("Worker '%s' sent: %s", address, data)
continue
split_id = int(split_id)
# Retrieve the job split
try:
split = JobSplit.objects.get(id=split_id)
except:
logger.error("Received message '%s' for unknown job split #%d",
status, split_id)
except JobSplit.DoesNotExist:
logger.error(
"Received message '%s' for unknown job split #%d", status, split_id
)
continue
# Is the job done?
if status == WorkerController.DONE:
logger.info("Job split #%d (%s %d/%d @ %s) on '%s' is DONE",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name)
logger.info(
"Job split #%d (%s %d/%d @ %s) on '%s' is DONE",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
)
on_split_done(split, simplejson.loads(data[0]))
remove_split_id_from(running_job_splits, split_id)
# Has the job failed?
elif status == WorkerController.JOB_ERROR:
logger.info("Job split #%d (%s %d/%d @ %s) on '%s' returned an error",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name)
logger.info(
"Job split #%d (%s %d/%d @ %s) on '%s' returned an error",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
)
try:
error = simplejson.loads(data[0])
except:
except Exception:
error = data[0]
splits_to_cancel.extend(on_split_fail(split, error))
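The JOB_ERROR branch wraps the JSON decode in a try/except, while the DONE branch above feeds data[0] to simplejson.loads() unguarded. A small hypothetical helper (not in the commit) shows how the same fallback could be applied in both places:

def safe_json_loads(payload):
    # Return the decoded JSON document, or the raw payload if it is not valid JSON.
    # simplejson's JSONDecodeError is a subclass of ValueError.
    try:
        return simplejson.loads(payload)
    except ValueError:
        return payload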
@@ -301,13 +297,15 @@ def main(user_input=None):
# Was the job cancelled?
elif status == WorkerController.CANCELLED:
logger.info("Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name)
logger.info(
"Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
)
on_split_cancelled(split)
remove_split_id_from(cancelling_jobs, split_id)
@@ -315,13 +313,16 @@ def main(user_input=None):
# Was there an error?
elif status == WorkerController.ERROR:
if split_id in running_job_splits:
logger.info("Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name, data[0])
logger.info(
"Job split #%d (%s %d/%d @ %s) on '%s' returned a system error: %s",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
data[0],
)
splits_to_cancel.extend(on_split_fail(split, data[0]))
remove_split_id_from(running_job_splits, split_id)
@@ -332,15 +333,19 @@ def main(user_input=None):
# Cancel the necessary jobs (if any)
for split_to_cancel in splits_to_cancel:
if split_to_cancel.id in running_job_splits:
logger.info("Cancelling job split #%d (%s %d/%d @ %s) on '%s'",
split_to_cancel.id,
split_to_cancel.job.block.name,
split_to_cancel.split_index,
split_to_cancel.job.splits.count(),
split_to_cancel.job.block.experiment.fullname(),
split_to_cancel.worker.name)
worker_controller.cancel(split_to_cancel.worker.name, split_to_cancel.id)
logger.info(
"Cancelling job split #%d (%s %d/%d @ %s) on '%s'",
split_to_cancel.id,
split_to_cancel.job.block.name,
split_to_cancel.split_index,
split_to_cancel.job.splits.count(),
split_to_cancel.job.block.experiment.fullname(),
split_to_cancel.worker.name,
)
worker_controller.cancel(
split_to_cancel.worker.name, split_to_cancel.id
)
remove_split_id_from(running_job_splits, split_to_cancel.id)
cancelling_jobs.append(split_to_cancel.id)
@@ -357,18 +362,19 @@ def main(user_input=None):
configuration = get_configuration_for_split(split)
logger.info("Starting job split #%d (%s %d/%d @ %s) on '%s'",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name)
logger.info(
"Starting job split #%d (%s %d/%d @ %s) on '%s'",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
)
worker_controller.execute(split.worker.name, split.id, configuration)
on_split_started(split)
# Cleanup
logger.info("Gracefully exiting the scheduler")
worker_controller.destroy()