Commit 9f92f26e authored by André Anjos's avatar André Anjos 💬
Browse files

Merge branch 'zmq_refactoring' into 'master'

ZMQ refactoring

Closes #518 and #517

See merge request !275
parents 7165564b 18cca41c
Pipeline #27969 passed with stages
in 16 minutes and 26 seconds
[flake8]
max-line-length = 80
select = B,C,E,F,W,T4,B9,B950
ignore = E501, W503
......@@ -15,6 +15,7 @@ sphinx/
.mr.developer.cfg
.coverage
*.sql3
*.sqlite3
.DS_Store
beat/web/settings/settings.py
src/
......
......@@ -27,7 +27,7 @@ build_linux_36:
- buildout
- python -c "from beat.core.test.utils import pull_docker_test_images as f; f()"
- export COVERAGE_FILE=.coverage.django
- ./bin/coverage run --source=${CI_PROJECT_NAME} ./bin/django test --settings=beat.web.settings.ci -v 2
- ./bin/coverage run --source=${CI_PROJECT_NAME} ./bin/django test --settings=beat.web.settings.ci -v 2 --noinput
- export BEAT_CMDLINE_TEST_PLATFORM="django://beat.web.settings.ci"
- export COVERAGE_FILE=.coverage.cmdline
- export NOSE_WITH_COVERAGE=1
......
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/ambv/black
rev: stable
hooks:
- id: black
language_version: python3.6
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: debug-statements
- id: check-added-large-files
- id: check-docstring-first
- id: flake8
- id: check-yaml
exclude: conda/meta.yaml
- repo: https://github.com/PyCQA/bandit
rev: 'master' # Update me!
hooks:
- id: bandit
exclude: beat/editor/test
......@@ -27,6 +27,7 @@
# Django settings for tests on the CI server
from .test import *
from .test import * # noqa
RUNNING_ON_CI = True
DATABASES["default"]["OPTIONS"]["timeout"] = 60 # noqa
......@@ -26,48 +26,64 @@
###############################################################################
# Django settings for tests
import os
import platform
import sys
from .settings import *
from .settings import * # noqa
TEST_CONFIGURATION = True
RUNNING_ON_CI = False
DEBUG = False
TEMPLATES[0]['OPTIONS']['debug'] = DEBUG
TEMPLATES[0]["OPTIONS"]["debug"] = DEBUG # noqa
ALLOWED_HOSTS = [
'testserver',
]
ALLOWED_HOSTS = ["testserver"]
DATABASES['default']['NAME'] = 'test.sql3'
DATABASES['default']['TEST'] = {'NAME': DATABASES['default']['NAME']}
DATABASES['default']['OPTIONS']['timeout'] = 30
if platform.system() == "Linux":
shm_path = "/dev/shm/beatweb" # nosec
if not os.path.exists(shm_path):
os.makedirs(shm_path)
import sys
if 'beat.cmdline' in sys.argv:
database_name = os.path.join(shm_path, "test.sqlite3")
else:
database_name = "test.sqlite3"
DATABASES["default"]["NAME"] = database_name # noqa
DATABASES["default"]["TEST"] = {"NAME": DATABASES["default"]["NAME"]} # noqa
DATABASES["default"]["OPTIONS"]["timeout"] = 30 # noqa
DATABASES["default"]["ATOMIC_REQUESTS"] = True # noqa
# Timeout used in test when waiting for an update
DB_REFRESH_TIMEOUT = int(os.environ.get("DB_REFRESH_TIMEOUT", 10))
if "beat.cmdline" in sys.argv:
# make it in-memory for cmdline app tests
DATABASES['default']['NAME'] = ':memory:'
DATABASES["default"]["NAME"] = ":memory:" # noqa
LOGGING['handlers']['console']['level'] = 'DEBUG'
LOGGING['loggers']['beat.core']['handlers'] = ['discard']
LOGGING['loggers']['beat.web']['handlers'] = ['discard']
LOGGING['loggers']['beat.web.utils.management.commands']['handlers'] = ['discard']
LOGGING["handlers"]["console"]["level"] = "DEBUG" # noqa
LOGGING["loggers"]["beat.core"]["handlers"] = ["discard"] # noqa
LOGGING["loggers"]["beat.web"]["handlers"] = ["discard"] # noqa
LOGGING["loggers"]["beat.web.utils.management.commands"]["handlers"] = [ # noqa
"discard"
]
BASE_DIR = os.path.dirname(os.path.abspath(__name__))
PREFIX = os.environ.get('BEAT_TEST_PREFIX', os.path.realpath('./test_prefix'))
ALGORITHMS_ROOT = os.path.join(PREFIX, 'algorithms')
PLOTTERS_ROOT = os.path.join(PREFIX, 'plotters')
LIBRARIES_ROOT = os.path.join(PREFIX, 'libraries')
DATABASES_ROOT = os.path.join(PREFIX, 'databases')
DATAFORMATS_ROOT = os.path.join(PREFIX, 'dataformats')
TOOLCHAINS_ROOT = os.path.join(PREFIX, 'toolchains')
EXPERIMENTS_ROOT = os.path.join(PREFIX, 'experiments')
CACHE_ROOT = os.path.join(PREFIX, 'cache')
if platform.system() == "Linux":
default_prefix = os.path.join(shm_path, "test_prefix") # nosec
else:
default_prefix = os.path.realpath("./test_prefix")
PREFIX = os.environ.get("BEAT_TEST_PREFIX", default_prefix)
ALGORITHMS_ROOT = os.path.join(PREFIX, "algorithms")
PLOTTERS_ROOT = os.path.join(PREFIX, "plotters")
LIBRARIES_ROOT = os.path.join(PREFIX, "libraries")
DATABASES_ROOT = os.path.join(PREFIX, "databases")
DATAFORMATS_ROOT = os.path.join(PREFIX, "dataformats")
TOOLCHAINS_ROOT = os.path.join(PREFIX, "toolchains")
EXPERIMENTS_ROOT = os.path.join(PREFIX, "experiments")
CACHE_ROOT = os.path.join(PREFIX, "cache")
LOCAL_SCHEDULER_VERBOSITY = None
LOCAL_SCHEDULER_USE_DOCKER = False
# To speed-up tests, don't put this in production
PASSWORD_HASHERS = [
'django.contrib.auth.hashers.MD5PasswordHasher',
]
PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"]
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# encoding: utf-8
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import logging
from django.core.management.base import BaseCommand
from django.db import transaction
from beat.web.backend.models import Worker
from beat.core.bcpapi.broker import BeatComputationBroker
from beat.core.utils import setup_logging
logger = logging.getLogger(__name__)
# ----------------------------------------------------------
def onWorkerReady(name):
    """Broker callback: flag the named worker as connected and active.

    Logs an error (and changes nothing) if no worker with that name is
    registered in the database.
    """
    logger.info("Worker '%s' is ready", name)

    worker = None
    try:
        worker = Worker.objects.get(name=name)
    except Worker.DoesNotExist:
        logger.error("No worker named '%s' found in the database", name)

    if worker is not None:
        with transaction.atomic():
            worker.active = True
            worker.info = "Connected to the scheduler"
            worker.save()
# ----------------------------------------------------------
def onWorkerGone(name):
    """Broker callback: flag the named worker as disconnected/inactive.

    Logs an error (and changes nothing) if no worker with that name is
    registered in the database.
    """
    logger.info("Worker '%s' is gone", name)

    try:
        worker = Worker.objects.get(name=name)
    except Worker.DoesNotExist:
        logger.error("No worker named '%s' found in the database", name)
        return

    with transaction.atomic():
        worker.active = False
        worker.info = "Disconnected from the scheduler"
        worker.save()
class Command(BaseCommand):
    """Management command that runs the ZMQ computation broker."""

    help = "Start zmq broker"

    def add_arguments(self, parser):
        """Register the command line options."""
        parser.add_argument(
            "--port",
            "-p",
            dest="port",
            type=int,
            default=5555,
            help="Port of the broker",
        )

    def handle(self, *args, **options):
        """Bind the broker and mediate messages until interrupted."""
        verbosity = int(options["verbosity"])
        logger = setup_logging(verbosity, "Broker", __name__)

        # Map Django's verbosity levels onto logging levels (0 keeps the
        # level configured by setup_logging).
        if verbosity == 1:
            logger.setLevel(logging.INFO)
        elif verbosity >= 2:
            logger.setLevel(logging.DEBUG)

        logger.info("starting broker")

        broker = BeatComputationBroker(verbosity >= 2)
        broker.set_worker_callbacks(onWorkerReady, onWorkerGone)
        broker.bind("tcp://*:{}".format(options["port"]))
        broker.mediate()
        broker.purge_workers()

        logger.info("broker stopped")
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# encoding: utf-8
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import logging
import multiprocessing
import signal
from django.core.management.base import BaseCommand
from django.core.management import call_command
from django.conf import settings
from django import db
from beat.core.utils import find_free_port
logger = logging.getLogger(__name__)
def start_broker(port, verbosity=0):
    """Run the 'broker' management command (child-process entry point)."""
    # A forked child must not reuse the parent's database connections.
    db.connections.close_all()
    call_command("broker", verbosity=verbosity, port=port)
def start_scheduler(broker_address, verbosity=0):
    """Run the 'scheduler' management command (child-process entry point)."""
    # A forked child must not reuse the parent's database connections.
    db.connections.close_all()
    call_command("scheduler", verbosity=verbosity, broker_address=broker_address)
def start_worker(broker_address, prefix, cache, use_docker, verbosity=0):
    """Run the 'worker' management command (child-process entry point)."""
    # NOTE(review): unlike start_broker/start_scheduler, this does not call
    # db.connections.close_all() — presumably the worker never touches the
    # Django ORM; confirm before relying on it.
    call_command(
        "worker",
        verbosity=verbosity,
        broker_address=broker_address,
        prefix=prefix,
        cache=cache,
        use_docker=use_docker,
    )
class Command(BaseCommand):
    """Spawn a broker, one worker and a scheduler as child processes."""

    help = "Run a complete local scheduler/broker/worker setup"

    def __init__(self):
        super(Command, self).__init__()
        self.broker = None
        self.scheduler = None
        self.worker = None

    def __signal_handler(self, signum, frame):
        # Forward the shutdown request to each child process.
        for child in (self.scheduler, self.worker, self.broker):
            child.terminate()

    def add_arguments(self, parser):
        """Register the command line options."""
        parser.add_argument(
            "--docker",
            "-d",
            action="store_true",
            dest="use_docker",
            default=False,
            help="Use docker",
        )

    def handle(self, *args, **options):
        """Start all three processes and wait for them to finish."""
        signal.signal(signal.SIGTERM, self.__signal_handler)
        signal.signal(signal.SIGINT, self.__signal_handler)

        verbosity = options["verbosity"]
        port = find_free_port()
        broker_address = "tcp://localhost:{}".format(port)

        self.broker = multiprocessing.Process(
            target=start_broker, args=(port, verbosity)
        )
        self.worker = multiprocessing.Process(
            target=start_worker,
            args=(
                broker_address,
                settings.PREFIX,
                settings.CACHE_ROOT,
                options["use_docker"],
                verbosity,
            ),
        )
        self.scheduler = multiprocessing.Process(
            target=start_scheduler, args=(broker_address, verbosity)
        )

        # The broker must come up before the worker and the scheduler
        # attempt to connect to it.
        for child in (self.broker, self.worker, self.scheduler):
            child.start()

        self.broker.join()
        self.scheduler.join()
        self.worker.join()
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# encoding: utf-8
###############################################################################
# #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.web module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
import logging
import signal
import json
import sys
from django.core.management.base import BaseCommand
from django.conf import settings
from beat.core.bcpapi import BCP
from beat.core.bcpapi.client import BeatComputationClient
from ....backend.models import JobSplit
from ....backend.helpers import split_new_jobs
from ....backend.helpers import process_newly_cancelled_experiments
from ....backend.helpers import assign_splits_to_workers
from ....backend.helpers import get_configuration_for_split
from ....backend.helpers import on_split_started
from ....backend.helpers import on_split_done
from ....backend.helpers import on_split_fail
from ....backend.helpers import on_split_cancelled
logger = logging.getLogger(__name__)
def get_split(split_id, status):
    """Return the JobSplit with the given id, or None if it does not exist.

    Parameters:
        split_id (int): primary key of the job split to fetch
        status: the BCP status of the message that referenced the split,
            used only to give context in the warning log

    Returns:
        JobSplit or None: None when no split with that id exists
    """
    try:
        return JobSplit.objects.get(id=split_id)
    except JobSplit.DoesNotExist:
        # Fix: the original passed a pre-formatted string AND extra args to
        # logger.warning(); logging then tries ``msg % args`` and raises
        # "not all arguments converted", so the warning was never emitted.
        # Use lazy %-style arguments as done elsewhere in this module.
        logger.warning(
            "Received message '%s' for unknown job split #%s", status, split_id
        )
        return None
def remove_split_id_from(split_list, split_id):
    """Remove ``split_id`` from ``split_list`` in place, ignoring absence.

    Fix: the original did ``split_list.remove(split_list.index(split_id))``,
    which removes the element *equal to the index* of ``split_id`` rather
    than ``split_id`` itself — corrupting the tracking list whenever values
    and positions differ.

    Parameters:
        split_list (list): list of tracked split ids (mutated in place)
        split_id: the id to remove
    """
    try:
        split_list.remove(split_id)
    except ValueError:
        # split_id was not tracked in this list: nothing to do
        pass
class Command(BaseCommand):
help = "Start scheduler"
def __init__(self):
    """Initialize the scheduler command with its main-loop control flag."""
    super(Command, self).__init__()
    # Cleared by __signal_handler to request a clean shutdown of handle().
    self.continue_ = True
def __signal_handler(self, signum, frame):
    """SIGTERM/SIGINT handler: ask the main loop in handle() to stop."""
    self.continue_ = False
def add_arguments(self, parser):
    """Register the scheduler's command line options."""
    parser.add_argument(
        "--broker-address",
        "-b",
        dest="broker_address",
        type=str,
        default="tcp://localhost:5555",
        help="Address of the broker",
    )
    parser.add_argument(
        "--interval",
        "-i",
        dest="interval",
        type=int,
        default=settings.SCHEDULING_INTERVAL,
        help="Polling interval",
    )
def handle(self, *args, **options):
signal.signal(signal.SIGTERM, self.__signal_handler)
signal.signal(signal.SIGINT, self.__signal_handler)
client = BeatComputationClient(
options["broker_address"], options["verbosity"] >= 2
)
# client.timeout = 100
running_job_splits = []
cancelling_jobs = []
logger.info("starting scheduler")
while self.continue_:
splits_to_cancel = []
# Process all the incoming messages
reply = client.recv()
if reply is not None:
logger.info("Received: {}".format(reply))
worker_id, status = reply[:2]
split_id = int(reply[2])
if status == BCP.BCPP_JOB_RECEIVED:
logger.info(
"Job split {} was received by worker {}".format(
split_id, worker_id
)
)
elif status == BCP.BCPP_JOB_STARTED:
logger.info(
"Job split {} was was started by worker {}".format(
split_id, worker_id
)
)
split = get_split(split_id, status)
if split is not None:
on_split_started(split)
elif status == BCP.BCPP_JOB_DONE:
output = reply[3]
if sys.version_info < (3, 6):
output = output.decode("utf-8")
logger.info(
"Job split {} was was done by worker {}".format(
split_id, worker_id
)
)
split = get_split(split_id, status)
if split is not None:
on_split_done(split, json.loads(output))
remove_split_id_from(running_job_splits, split_id)
elif status == BCP.BCPP_JOB_CANCELLED:
split = get_split(split_id, status)
if split is not None:
logger.info(
"Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED",
split.id,
split.job.block.name,
split.split_index,
split.job.splits.count(),
split.job.block.experiment.fullname(),
split.worker.name,
)
on_split_cancelled(split)
remove_split_id_from(running_job_splits, split_id)
elif status == BCP.BCPP_JOB_ERROR:
message = reply[3]
if sys.version_info < (3, 6):
message = message.decode("utf-8")
logger.info(
"Job split {} processed by worker {} failed:\n{}".format(
split_id, worker_id, message
)
)
split = get_split(split_id, status)
if split is not None:
try:
error = json.loads(message)
except json.decoder.JSONDecodeError:
error = message
splits_to_cancel.extend(on_split_fail(split, error))
remove_split_id_from(running_job_splits, split_id)
elif status == BCP.BCPP_ERROR:
message = reply[3]
logger.info(
"Worker {} error for job split {}:\n{}".format(
worker_id, split_id, message
)
)
if split_id in running_job_splits:
split = get_split(split_id, status)
if split is not None:
splits_to_cancel.extend(on_split_fail(split, message))
remove_split_id_from(running_job_splits, split_id)
# Effectively cancel newly-cancelled experiments
splits_to_cancel.extend(process_newly_cancelled_experiments())
# Cancel the necessary jobs (if any)
for split_to_cancel in splits_to_cancel:
if split_to_cancel.id in running_job_splits:
logger.info(
"Cancelling job split #%d (%s %d/%d @ %s) on '%s'",
split_to_cancel.id,
split_to_cancel.job.block.name,
split_to_cancel.split_index,
split_to_cancel.job.splits.count(),
split_to_can