Skip to content
Snippets Groups Projects
Commit 63e32573 authored by Samuel GAIST's avatar Samuel GAIST Committed by André Anjos
Browse files

[utils][management] Implement processing management commands

This implements:
- scheduler
- broker
- worker

scheduler and broker are meant to be run on the scheduler node.
worker is here for development and test purposes.
parent 7165564b
No related branches found
No related tags found
1 merge request: !275 "ZMQ refactoring"
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# encoding: utf-8
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import logging
from django.core.management.base import BaseCommand
from django.db import transaction
from beat.web.backend.models import Worker
from beat.core.bcpapi.broker import BeatComputationBroker
from beat.core.utils import setup_logging
logger = logging.getLogger(__name__)
# ----------------------------------------------------------
def onWorkerReady(name):
    """Broker callback: flag the named worker as active in the database.

    Looks up the Worker row by ``name``; if absent, only an error is
    logged (the broker keeps running).
    """

    logger.info("Worker '%s' is ready", name)

    try:
        worker = Worker.objects.get(name=name)
    except Worker.DoesNotExist:
        logger.error("No worker named '%s' found in the database", name)
        return

    with transaction.atomic():
        worker.active = True
        worker.info = "Connected to the scheduler"
        worker.save()
# ----------------------------------------------------------
def onWorkerGone(name):
    """Broker callback: flag the named worker as inactive in the database.

    Looks up the Worker row by ``name``; if absent, only an error is
    logged (the broker keeps running).
    """

    logger.info("Worker '%s' is gone", name)

    try:
        worker = Worker.objects.get(name=name)
    except Worker.DoesNotExist:
        logger.error("No worker named '%s' found in the database", name)
        return

    with transaction.atomic():
        worker.active = False
        worker.info = "Disconnected from the scheduler"
        worker.save()
class Command(BaseCommand):
    """Management command running the BEAT computation ZMQ broker.

    Binds a ``BeatComputationBroker`` on the given port and mediates
    messages between the scheduler and the workers, updating each
    worker's database status through the ready/gone callbacks.
    """

    help = "Start zmq broker"

    def add_arguments(self, parser):
        parser.add_argument(
            "--port",
            "-p",
            type=int,
            dest="port",
            default=5555,
            help="Port of the broker",
        )

    def handle(self, *args, **options):
        verbosity = int(options["verbosity"])

        logger = setup_logging(verbosity, "Broker", __name__)

        # Map Django verbosity to logging levels: 1 -> INFO, >= 2 -> DEBUG.
        # (The original wrapped this in a redundant `if verbosity >= 1`.)
        if verbosity == 1:
            logger.setLevel(logging.INFO)
        elif verbosity >= 2:
            logger.setLevel(logging.DEBUG)

        logger.info("starting broker")

        address = "tcp://*:{}".format(options["port"])
        broker = BeatComputationBroker(verbosity >= 2)
        broker.set_worker_callbacks(onWorkerReady, onWorkerGone)
        broker.bind(address)

        # Blocks until the broker is shut down, then drops stale workers
        broker.mediate()
        broker.purge_workers()

        logger.info("broker stopped")
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# encoding: utf-8
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import logging
import signal
import json
from django.core.management.base import BaseCommand
from django.conf import settings
from beat.core.bcpapi import BCP
from beat.core.bcpapi.client import BeatComputationClient
from ....backend.models import JobSplit
from ....backend.helpers import split_new_jobs
from ....backend.helpers import process_newly_cancelled_experiments
from ....backend.helpers import assign_splits_to_workers
from ....backend.helpers import get_configuration_for_split
from ....backend.helpers import on_split_started
from ....backend.helpers import on_split_done
from ....backend.helpers import on_split_fail
from ....backend.helpers import on_split_cancelled
logger = logging.getLogger(__name__)
def get_split(split_id, status):
    """Return the JobSplit with ``split_id``, or None if it is unknown.

    ``status`` is the BCP message status, used only for the warning log.

    Bug fix: the original passed an already-formatted string *and* two
    extra positional arguments to ``logger.warning``, which makes the
    logging module raise "not all arguments converted" at emit time.
    Lazy %-style arguments are used instead.
    """

    try:
        return JobSplit.objects.get(id=split_id)
    except JobSplit.DoesNotExist:
        logger.warning(
            "Received message '%s' for unknown job split #%s", status, split_id
        )
        return None
def remove_split_id_from(split_list, split_id):
    """Remove ``split_id`` from ``split_list``; do nothing if absent.

    Bug fix: the original called
    ``split_list.remove(split_list.index(split_id))``, which removes the
    element *equal to the index* of ``split_id`` rather than
    ``split_id`` itself (and raised ValueError from ``index`` when the
    id was missing, which at least was caught).
    """

    try:
        split_list.remove(split_id)
    except ValueError:
        pass
class Command(BaseCommand):
    """Management command running the BEAT scheduling loop.

    Connects to the ZMQ broker as a BCP client, then loops until
    SIGTERM/SIGINT: it processes worker status messages (started, done,
    cancelled, errors), cancels splits belonging to newly-cancelled
    experiments, and dispatches newly-assigned job splits to workers.

    Fixes over the original: duplicated "was was" wording in two log
    messages, and removal of a commented-out dead line.
    """

    help = "Start scheduler"

    def __init__(self):
        super().__init__()
        # Cleared by the signal handler to terminate the main loop
        self.continue_ = True

    def __signal_handler(self, signum, frame):
        self.continue_ = False

    def add_arguments(self, parser):
        parser.add_argument(
            "--port",
            "-p",
            type=int,
            dest="port",
            default=5555,
            help="Port of the broker",
        )
        parser.add_argument(
            "--interval",
            "-i",
            type=int,
            dest="interval",
            default=settings.SCHEDULING_INTERVAL,
            help="Polling interval",
        )

    def handle(self, *args, **options):
        # Terminate the loop cleanly on SIGTERM/SIGINT
        signal.signal(signal.SIGTERM, self.__signal_handler)
        signal.signal(signal.SIGINT, self.__signal_handler)

        broker_address = "tcp://localhost:{}".format(options["port"])
        client = BeatComputationClient(broker_address, options["verbosity"] >= 2)

        # NOTE(review): the --interval option is parsed but never used in
        # this loop; confirm whether client.recv() is expected to block
        # with its own timeout.
        running_job_splits = []  # ids of splits currently on workers
        cancelling_jobs = []  # ids of splits whose cancellation was requested

        logger.info("starting scheduler")

        while self.continue_:
            splits_to_cancel = []

            # Process all the incoming messages
            reply = client.recv()

            if reply is not None:
                logger.info("Received: {}".format(reply))

                worker_id, status = reply[:2]
                split_id = int(reply[2])

                if status == BCP.BCPP_JOB_RECEIVED:
                    logger.info(
                        "Job split {} was received by worker {}".format(
                            split_id, worker_id
                        )
                    )
                elif status == BCP.BCPP_JOB_STARTED:
                    logger.info(
                        "Job split {} was started by worker {}".format(
                            split_id, worker_id
                        )
                    )
                    split = get_split(split_id, status)
                    if split is not None:
                        on_split_started(split)
                elif status == BCP.BCPP_JOB_DONE:
                    output = reply[3]
                    logger.info(
                        "Job split {} was done by worker {}".format(
                            split_id, worker_id
                        )
                    )
                    split = get_split(split_id, status)
                    if split is not None:
                        on_split_done(split, json.loads(output))
                        remove_split_id_from(running_job_splits, split_id)
                elif status == BCP.BCPP_JOB_CANCELLED:
                    split = get_split(split_id, status)
                    if split is not None:
                        logger.info(
                            "Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED",
                            split.id,
                            split.job.block.name,
                            split.split_index,
                            split.job.splits.count(),
                            split.job.block.experiment.fullname(),
                            split.worker.name,
                        )
                        on_split_cancelled(split)
                        remove_split_id_from(running_job_splits, split_id)
                elif status == BCP.BCPP_JOB_ERROR:
                    message = reply[3]
                    logger.info(
                        "Job split {} processed by worker {} failed:\n{}".format(
                            split_id, worker_id, message
                        )
                    )
                    split = get_split(split_id, status)
                    if split is not None:
                        # Workers may report either a JSON-encoded error
                        # object or a plain text message
                        try:
                            error = json.loads(message)
                        except json.decoder.JSONDecodeError:
                            error = message
                        splits_to_cancel.extend(on_split_fail(split, error))
                        remove_split_id_from(running_job_splits, split_id)
                elif status == BCP.BCPP_ERROR:
                    message = reply[3]
                    logger.info(
                        "Worker {} error for job split {}:\n{}".format(
                            worker_id, split_id, message
                        )
                    )
                    if split_id in running_job_splits:
                        split = get_split(split_id, status)
                        if split is not None:
                            splits_to_cancel.extend(on_split_fail(split, message))
                        remove_split_id_from(running_job_splits, split_id)

            # Effectively cancel newly-cancelled experiments
            splits_to_cancel.extend(process_newly_cancelled_experiments())

            # Cancel the necessary jobs (if any)
            for split_to_cancel in splits_to_cancel:
                if split_to_cancel.id in running_job_splits:
                    logger.info(
                        "Cancelling job split #%d (%s %d/%d @ %s) on '%s'",
                        split_to_cancel.id,
                        split_to_cancel.job.block.name,
                        split_to_cancel.split_index,
                        split_to_cancel.job.splits.count(),
                        split_to_cancel.job.block.experiment.fullname(),
                        split_to_cancel.worker.name,
                    )
                    request = [BCP.BCPE_CANCEL, str(split_to_cancel.id).encode("utf-8")]
                    client.send(split_to_cancel.worker.name.encode("utf-8"), request)
                    remove_split_id_from(running_job_splits, split_to_cancel.id)
                    cancelling_jobs.append(split_to_cancel.id)

            if not self.continue_:
                break

            # Start new jobs
            split_new_jobs()
            assigned_splits = assign_splits_to_workers()
            for split in assigned_splits:
                running_job_splits.append(split.id)
                configuration = get_configuration_for_split(split)
                logger.info(
                    "Sending job split #%d (%s %d/%d @ %s) on '%s'",
                    split.id,
                    split.job.block.name,
                    split.split_index,
                    split.job.splits.count(),
                    split.job.block.experiment.fullname(),
                    split.worker.name,
                )
                request = [
                    BCP.BCPE_EXECUTE,
                    str(split.id).encode("utf-8"),
                    json.dumps(configuration).encode("utf-8"),
                ]
                client.send(split.worker.name.encode("utf-8"), request)

        logger.info("scheduler stopped")
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# encoding: utf-8
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import socket
from django.core.management.base import BaseCommand
from django.conf import settings
from beat.core.bcp import worker
from beat.core.utils import setup_logging
class Command(BaseCommand):
    """Management command running a local BEAT computation worker.

    Translates the Django command options into the argv expected by
    ``beat.core.bcp.worker.main`` and delegates to it; intended for
    development and testing rather than production deployments.
    """

    help = "Start zmq worker"

    def add_arguments(self, parser):
        parser.add_argument(
            "--name",
            "-n",
            type=str,
            dest="service_name",
            default=socket.gethostname(),
            help="Service name",
        )
        parser.add_argument(
            "--broker",
            "-b",
            type=str,
            dest="broker_address",
            default="tcp://localhost:5555",
            help="Broker address",
        )
        parser.add_argument(
            "--docker",
            "-d",
            action="store_true",
            dest="use_docker",
            default=False,
            help="Use docker",
        )
        parser.add_argument(
            "--prefix", "-p", dest="prefix", default=settings.PREFIX, help="Prefix path"
        )
        parser.add_argument(
            "--cache",
            "-c",
            type=str,
            dest="cache",
            default=settings.CACHE_ROOT,
            help="Cache path",
        )
        parser.add_argument(
            "--network-name",
            dest="network_name",
            type=str,
            default=None,
            help="Docker network name",
        )
        parser.add_argument(
            "--port-range",
            dest="port_range",
            type=str,
            default=None,
            help="Docker port range",
        )
        parser.add_argument(
            "--docker-images-cache",
            dest="docker_images_cache",
            type=str,
            default=None,
            help="Docker image cache",
        )

    def handle(self, *args, **options):
        verbosity = options["verbosity"]
        logger = setup_logging(verbosity, "Worker", __name__)

        logger.info("starting worker")

        # Mandatory arguments for the underlying worker entry point
        argv = [
            "--name=" + options["service_name"],
            "--prefix=" + options["prefix"],
            "--cache=" + options["cache"],
        ]

        # One -v per verbosity level
        argv += ["-v"] * verbosity

        if options["use_docker"]:
            argv.append("--docker")

        # Optional docker-related settings are forwarded only when set
        for option_name, flag in (
            ("network_name", "--docker-network="),
            ("port_range", "--port-range="),
            ("docker_images_cache", "--docker-images-cache="),
        ):
            value = options.get(option_name)
            if value:
                argv.append(flag + value)

        # Broker address is the single positional argument
        argv.append(options["broker_address"])

        status = worker.main(argv)

        logger.info("worker stopped")

        return status
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment