diff --git a/.flake8 b/.flake8
new file mode 100644
index 0000000000000000000000000000000000000000..5fabfeed91611982e96a84c038dbfe24b0708055
--- /dev/null
+++ b/.flake8
@@ -0,0 +1,4 @@
+[flake8]
+max-line-length = 80
+select = B,C,E,F,W,T4,B9,B950
+ignore = E501, W503
diff --git a/.gitignore b/.gitignore
index 403e78b465845a77091f627aec5a3a0859af122f..5b977e4a7c18e35fc6096d5e39fa956f9439fde3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,7 @@ sphinx/
 .mr.developer.cfg
 .coverage
 *.sql3
+*.sqlite3
 .DS_Store
 beat/web/settings/settings.py
 src/
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index d990672c614808736f280e8b5c5f899507516525..f7e6c3d9aed8c41e2d6cdf20c15fdb3eab3bbcc2 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -27,7 +27,7 @@ build_linux_36:
     - buildout
     - python -c "from beat.core.test.utils import pull_docker_test_images as f; f()"
     - export COVERAGE_FILE=.coverage.django
-    - ./bin/coverage run --source=${CI_PROJECT_NAME} ./bin/django test --settings=beat.web.settings.ci -v 2
+    - ./bin/coverage run --source=${CI_PROJECT_NAME} ./bin/django test --settings=beat.web.settings.ci -v 2 --noinput
     - export BEAT_CMDLINE_TEST_PLATFORM="django://beat.web.settings.ci"
     - export COVERAGE_FILE=.coverage.cmdline
     - export NOSE_WITH_COVERAGE=1
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..bb2dfa1a6897347679b3da2cb1bb15548e155a8b
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,24 @@
+# See https://pre-commit.com for more information
+# See https://pre-commit.com/hooks.html for more hooks
+repos:
+  - repo: https://github.com/ambv/black
+    rev: stable
+    hooks:
+    - id: black
+      language_version: python3.6
+  - repo: https://github.com/pre-commit/pre-commit-hooks
+    rev: v2.0.0
+    hooks:
+      - id: trailing-whitespace
+      - id: end-of-file-fixer
+      - id: debug-statements
+      - id: check-added-large-files
+      - id: check-docstring-first
+      - id: flake8
+      - id: check-yaml
+        exclude: conda/meta.yaml
+  - repo: https://github.com/PyCQA/bandit
+    rev: 'master' # Update me!
+    hooks:
+      - id: bandit
+        exclude: beat/editor/test
diff --git a/beat/web/settings/ci.py b/beat/web/settings/ci.py
index 6bbfaf113d01c992e1fa43dd751d5642527f597a..3ee6eac46cc9db9997716b44d0b5d61d1208d54a 100755
--- a/beat/web/settings/ci.py
+++ b/beat/web/settings/ci.py
@@ -27,6 +27,7 @@
 
 # Django settings for tests on the CI server
 
-from .test import *
+from .test import *  # noqa
 
 RUNNING_ON_CI = True
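+# Give SQLite more time to acquire its database lock on busy CI runners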
+DATABASES["default"]["OPTIONS"]["timeout"] = 60  # noqa
diff --git a/beat/web/settings/test.py b/beat/web/settings/test.py
index 87eb015f4cf4ddbffa1115f64d43af9e19520875..53d5d1d83e5040f212b788b050fad476de1de2b3 100755
--- a/beat/web/settings/test.py
+++ b/beat/web/settings/test.py
@@ -26,48 +26,64 @@
 ###############################################################################
 
 # Django settings for tests
+import os
+import platform
+import sys
 
-from .settings import *
+from .settings import *  # noqa
 
 TEST_CONFIGURATION = True
 RUNNING_ON_CI = False
 
 DEBUG = False
-TEMPLATES[0]['OPTIONS']['debug'] = DEBUG
+TEMPLATES[0]["OPTIONS"]["debug"] = DEBUG  # noqa
 
-ALLOWED_HOSTS = [
-    'testserver',
-]
+ALLOWED_HOSTS = ["testserver"]
 
-DATABASES['default']['NAME'] = 'test.sql3'
-DATABASES['default']['TEST'] = {'NAME': DATABASES['default']['NAME']}
-DATABASES['default']['OPTIONS']['timeout'] = 30
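+# On Linux, keep the test database on tmpfs (/dev/shm) so SQLite I/O stays in RAM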
+if platform.system() == "Linux":
+    shm_path = "/dev/shm/beatweb"  # nosec
+    if not os.path.exists(shm_path):
+        os.makedirs(shm_path)
 
-import sys
-if 'beat.cmdline' in sys.argv:
+    database_name = os.path.join(shm_path, "test.sqlite3")
+else:
+    database_name = "test.sqlite3"
+DATABASES["default"]["NAME"] = database_name  # noqa
+DATABASES["default"]["TEST"] = {"NAME": DATABASES["default"]["NAME"]}  # noqa
+DATABASES["default"]["OPTIONS"]["timeout"] = 30  # noqa
+DATABASES["default"]["ATOMIC_REQUESTS"] = True  # noqa
+
+# Timeout used in tests when waiting for a database update
+DB_REFRESH_TIMEOUT = int(os.environ.get("DB_REFRESH_TIMEOUT", 10))
+
+if "beat.cmdline" in sys.argv:
     # make it in-memory for cmdline app tests
-    DATABASES['default']['NAME'] = ':memory:'
+    DATABASES["default"]["NAME"] = ":memory:"  # noqa
 
-LOGGING['handlers']['console']['level'] = 'DEBUG'
-LOGGING['loggers']['beat.core']['handlers'] = ['discard']
-LOGGING['loggers']['beat.web']['handlers'] = ['discard']
-LOGGING['loggers']['beat.web.utils.management.commands']['handlers'] = ['discard']
+LOGGING["handlers"]["console"]["level"] = "DEBUG"  # noqa
+LOGGING["loggers"]["beat.core"]["handlers"] = ["discard"]  # noqa
+LOGGING["loggers"]["beat.web"]["handlers"] = ["discard"]  # noqa
+LOGGING["loggers"]["beat.web.utils.management.commands"]["handlers"] = [  # noqa
+    "discard"
+]
 
 BASE_DIR = os.path.dirname(os.path.abspath(__name__))
-PREFIX = os.environ.get('BEAT_TEST_PREFIX', os.path.realpath('./test_prefix'))
-ALGORITHMS_ROOT  = os.path.join(PREFIX, 'algorithms')
-PLOTTERS_ROOT    = os.path.join(PREFIX, 'plotters')
-LIBRARIES_ROOT   = os.path.join(PREFIX, 'libraries')
-DATABASES_ROOT   = os.path.join(PREFIX, 'databases')
-DATAFORMATS_ROOT = os.path.join(PREFIX, 'dataformats')
-TOOLCHAINS_ROOT  = os.path.join(PREFIX, 'toolchains')
-EXPERIMENTS_ROOT = os.path.join(PREFIX, 'experiments')
-CACHE_ROOT       = os.path.join(PREFIX, 'cache')
+if platform.system() == "Linux":
+    default_prefix = os.path.join(shm_path, "test_prefix")  # nosec
+else:
+    default_prefix = os.path.realpath("./test_prefix")
+PREFIX = os.environ.get("BEAT_TEST_PREFIX", default_prefix)
+ALGORITHMS_ROOT = os.path.join(PREFIX, "algorithms")
+PLOTTERS_ROOT = os.path.join(PREFIX, "plotters")
+LIBRARIES_ROOT = os.path.join(PREFIX, "libraries")
+DATABASES_ROOT = os.path.join(PREFIX, "databases")
+DATAFORMATS_ROOT = os.path.join(PREFIX, "dataformats")
+TOOLCHAINS_ROOT = os.path.join(PREFIX, "toolchains")
+EXPERIMENTS_ROOT = os.path.join(PREFIX, "experiments")
+CACHE_ROOT = os.path.join(PREFIX, "cache")
 
 LOCAL_SCHEDULER_VERBOSITY = None
 LOCAL_SCHEDULER_USE_DOCKER = False
 
 # To speed-up tests, don't put this in production
-PASSWORD_HASHERS = [
-    'django.contrib.auth.hashers.MD5PasswordHasher',
-]
+PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"]
diff --git a/beat/web/utils/management/commands/broker.py b/beat/web/utils/management/commands/broker.py
new file mode 100644
index 0000000000000000000000000000000000000000..282cc35ddc20686f5d025111e4ac5545527ec9d3
--- /dev/null
+++ b/beat/web/utils/management/commands/broker.py
@@ -0,0 +1,105 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+# encoding: utf-8
+
+###############################################################################
+#                                                                             #
+# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/           #
+# Contact: beat.support@idiap.ch                                              #
+#                                                                             #
+# This file is part of the beat.web module of the BEAT platform.              #
+#                                                                             #
+# Commercial License Usage                                                    #
+# Licensees holding valid commercial BEAT licenses may use this file in       #
+# accordance with the terms contained in a written agreement between you      #
+# and Idiap. For further information contact tto@idiap.ch                     #
+#                                                                             #
+# Alternatively, this file may be used under the terms of the GNU Affero      #
+# Public License version 3 as published by the Free Software and appearing    #
+# in the file LICENSE.AGPL included in the packaging of this file.            #
+# The BEAT platform is distributed in the hope that it will be useful, but    #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
+# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
+#                                                                             #
+# You should have received a copy of the GNU Affero Public License along      #
+# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
+#                                                                             #
+###############################################################################
+
+import logging
+
+from django.core.management.base import BaseCommand
+from django.db import transaction
+
+from beat.web.backend.models import Worker
+from beat.core.bcpapi.broker import BeatComputationBroker
+from beat.core.utils import setup_logging
+
+logger = logging.getLogger(__name__)
+
+# ----------------------------------------------------------
+
+
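+# Callbacks invoked by the broker when a worker connects and when its
+# heartbeat is lost; they keep the Worker table in sync with the pool.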
+def onWorkerReady(name):
+    logger.info("Worker '%s' is ready", name)
+    try:
+        worker = Worker.objects.get(name=name)
+    except Worker.DoesNotExist:
+        logger.error("No worker named '%s' found in the database", name)
+    else:
+        with transaction.atomic():
+            worker.active = True
+            worker.info = "Connected to the scheduler"
+            worker.save()
+
+
+# ----------------------------------------------------------
+
+
+def onWorkerGone(name):
+    logger.info("Worker '%s' is gone", name)
+    try:
+        worker = Worker.objects.get(name=name)
+    except Worker.DoesNotExist:
+        logger.error("No worker named '%s' found in the database", name)
+    else:
+        with transaction.atomic():
+            worker.active = False
+            worker.info = "Disconnected from the scheduler"
+            worker.save()
+
+
+class Command(BaseCommand):
+
+    help = "Start zmq broker"
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "--port",
+            "-p",
+            type=int,
+            dest="port",
+            default=5555,
+            help="Port of the broker",
+        )
+
+    def handle(self, *args, **options):
+        verbosity = int(options["verbosity"])
+        logger = setup_logging(verbosity, "Broker", __name__)
+
+        if verbosity == 1:
+            logger.setLevel(logging.INFO)
+        elif verbosity >= 2:
+            logger.setLevel(logging.DEBUG)
+
+        logger.info("starting broker")
+
+        address = "tcp://*:{}".format(options["port"])
+        broker = BeatComputationBroker(verbosity >= 2)
+        broker.set_worker_callbacks(onWorkerReady, onWorkerGone)
+        broker.bind(address)
+        broker.mediate()
+        broker.purge_workers()
+
+        logger.info("broker stopped")
diff --git a/beat/web/utils/management/commands/full_scheduling.py b/beat/web/utils/management/commands/full_scheduling.py
new file mode 100644
index 0000000000000000000000000000000000000000..96bc69283d210c41eef4c342c256195f5a693646
--- /dev/null
+++ b/beat/web/utils/management/commands/full_scheduling.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+# encoding: utf-8
+
+###############################################################################
+#                                                                             #
+# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/           #
+# Contact: beat.support@idiap.ch                                              #
+#                                                                             #
+# This file is part of the beat.web module of the BEAT platform.              #
+#                                                                             #
+# Commercial License Usage                                                    #
+# Licensees holding valid commercial BEAT licenses may use this file in       #
+# accordance with the terms contained in a written agreement between you      #
+# and Idiap. For further information contact tto@idiap.ch                     #
+#                                                                             #
+# Alternatively, this file may be used under the terms of the GNU Affero      #
+# Public License version 3 as published by the Free Software and appearing    #
+# in the file LICENSE.AGPL included in the packaging of this file.            #
+# The BEAT platform is distributed in the hope that it will be useful, but    #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
+# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
+#                                                                             #
+# You should have received a copy of the GNU Affero Public License along      #
+# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
+#                                                                             #
+###############################################################################
+
+import logging
+import multiprocessing
+import signal
+
+from django.core.management.base import BaseCommand
+from django.core.management import call_command
+from django.conf import settings
+from django import db
+
+from beat.core.utils import find_free_port
+
+logger = logging.getLogger(__name__)
+
+
+def start_broker(port, verbosity=0):
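+    # Close connections inherited from the parent: each forked process must
+    # open its own database connection.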
+    db.connections.close_all()
+    call_command("broker", port=port, verbosity=verbosity)
+
+
+def start_scheduler(broker_address, verbosity=0):
+    db.connections.close_all()
+    call_command("scheduler", broker_address=broker_address, verbosity=verbosity)
+
+
+def start_worker(broker_address, prefix, cache, use_docker, verbosity=0):
+    call_command(
+        "worker",
+        broker_address=broker_address,
+        prefix=prefix,
+        cache=cache,
+        use_docker=use_docker,
+        verbosity=verbosity,
+    )
+
+
+class Command(BaseCommand):
+
+    help = "Run a complete local scheduler/broker/worker setup"
+
+    def __init__(self):
+        super(Command, self).__init__()
+
+        self.broker = None
+        self.worker = None
+        self.scheduler = None
+
+    def __signal_handler(self, signum, frame):
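+        # Forward the signal to the children (if already started); handle()
+        # then unblocks from its join() calls and exits.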
+        for process in (self.scheduler, self.worker, self.broker):
+            if process is not None:
+                process.terminate()
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "--docker",
+            "-d",
+            action="store_true",
+            dest="use_docker",
+            default=False,
+            help="Use docker",
+        )
+
+    def handle(self, *args, **options):
+        signal.signal(signal.SIGTERM, self.__signal_handler)
+        signal.signal(signal.SIGINT, self.__signal_handler)
+
+        verbosity = options["verbosity"]
+
+        port = find_free_port()
+        broker_address = "tcp://localhost:{}".format(port)
+
+        self.broker = multiprocessing.Process(
+            target=start_broker, args=(port, verbosity)
+        )
+        self.worker = multiprocessing.Process(
+            target=start_worker,
+            args=(
+                broker_address,
+                settings.PREFIX,
+                settings.CACHE_ROOT,
+                options["use_docker"],
+                verbosity,
+            ),
+        )
+        self.scheduler = multiprocessing.Process(
+            target=start_scheduler, args=(broker_address, verbosity)
+        )
+
+        self.broker.start()
+        self.worker.start()
+        self.scheduler.start()
+
+        self.broker.join()
+        self.scheduler.join()
+        self.worker.join()
diff --git a/beat/web/utils/management/commands/scheduler.py b/beat/web/utils/management/commands/scheduler.py
new file mode 100644
index 0000000000000000000000000000000000000000..06beb3d1cd7e5073beabf8431fdf62ceffcb8965
--- /dev/null
+++ b/beat/web/utils/management/commands/scheduler.py
@@ -0,0 +1,257 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+# encoding: utf-8
+
+###############################################################################
+#                                                                             #
+# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/           #
+# Contact: beat.support@idiap.ch                                              #
+#                                                                             #
+# This file is part of the beat.web module of the BEAT platform.              #
+#                                                                             #
+# Commercial License Usage                                                    #
+# Licensees holding valid commercial BEAT licenses may use this file in       #
+# accordance with the terms contained in a written agreement between you      #
+# and Idiap. For further information contact tto@idiap.ch                     #
+#                                                                             #
+# Alternatively, this file may be used under the terms of the GNU Affero      #
+# Public License version 3 as published by the Free Software and appearing    #
+# in the file LICENSE.AGPL included in the packaging of this file.            #
+# The BEAT platform is distributed in the hope that it will be useful, but    #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
+# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
+#                                                                             #
+# You should have received a copy of the GNU Affero Public License along      #
+# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
+#                                                                             #
+###############################################################################
+
+import logging
+import signal
+import json
+import sys
+
+from django.core.management.base import BaseCommand
+from django.conf import settings
+
+from beat.core.bcpapi import BCP
+from beat.core.bcpapi.client import BeatComputationClient
+
+from ....backend.models import JobSplit
+from ....backend.helpers import split_new_jobs
+from ....backend.helpers import process_newly_cancelled_experiments
+from ....backend.helpers import assign_splits_to_workers
+from ....backend.helpers import get_configuration_for_split
+from ....backend.helpers import on_split_started
+from ....backend.helpers import on_split_done
+from ....backend.helpers import on_split_fail
+from ....backend.helpers import on_split_cancelled
+
+logger = logging.getLogger(__name__)
+
+
+def get_split(split_id, status):
+    try:
+        return JobSplit.objects.get(id=split_id)
+    except JobSplit.DoesNotExist:
+        logger.warning(
+            "Received message '{}' for unknown job split #{}".format(status, split_id),
+            status,
+            split_id,
+        )
+        return None
+
+
+def remove_split_id_from(split_list, split_id):
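+    """Remove split_id from split_list, ignoring it if absent."""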
+    try:
+        split_list.remove(split_id)
+    except ValueError:
+        pass
+
+
+class Command(BaseCommand):
+
+    help = "Start scheduler"
+
+    def __init__(self):
+        super(Command, self).__init__()
+
+        self.continue_ = True
+
+    def __signal_handler(self, signum, frame):
+        self.continue_ = False
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "--broker-address",
+            "-b",
+            type=str,
+            dest="broker_address",
+            default="tcp://localhost:5555",
+            help="Address of the broker",
+        )
+        parser.add_argument(
+            "--interval",
+            "-i",
+            type=int,
+            dest="interval",
+            default=settings.SCHEDULING_INTERVAL,
+            help="Polling interval",
+        )
+
+    def handle(self, *args, **options):
+        signal.signal(signal.SIGTERM, self.__signal_handler)
+        signal.signal(signal.SIGINT, self.__signal_handler)
+
+        client = BeatComputationClient(
+            options["broker_address"], options["verbosity"] >= 2
+        )
+
+        running_job_splits = []
+        cancelling_jobs = []
+
+        logger.info("starting scheduler")
+        while self.continue_:
+            splits_to_cancel = []
+
+            # Process all the incoming messages
+            reply = client.recv()
+
+            if reply is not None:
+                logger.info("Received: {}".format(reply))
+
+                worker_id, status = reply[:2]
+                split_id = int(reply[2])
+
+                if status == BCP.BCPP_JOB_RECEIVED:
+                    logger.info(
+                        "Job split {} was received by worker {}".format(
+                            split_id, worker_id
+                        )
+                    )
+
+                elif status == BCP.BCPP_JOB_STARTED:
+                    logger.info(
+                        "Job split {} was was started by worker {}".format(
+                            split_id, worker_id
+                        )
+                    )
+                    split = get_split(split_id, status)
+                    if split is not None:
+                        on_split_started(split)
+
+                elif status == BCP.BCPP_JOB_DONE:
+                    output = reply[3]
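+                    # json.loads() accepts bytes only from Python 3.6 onwards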
+                    if sys.version_info < (3, 6):
+                        output = output.decode("utf-8")
+
+                    logger.info(
+                        "Job split {} was was done by worker {}".format(
+                            split_id, worker_id
+                        )
+                    )
+                    split = get_split(split_id, status)
+                    if split is not None:
+                        on_split_done(split, json.loads(output))
+                        remove_split_id_from(running_job_splits, split_id)
+
+                elif status == BCP.BCPP_JOB_CANCELLED:
+                    split = get_split(split_id, status)
+                    if split is not None:
+                        logger.info(
+                            "Job split #%d (%s %d/%d @ %s) on '%s' is CANCELLED",
+                            split.id,
+                            split.job.block.name,
+                            split.split_index,
+                            split.job.splits.count(),
+                            split.job.block.experiment.fullname(),
+                            split.worker.name,
+                        )
+
+                        on_split_cancelled(split)
+                        remove_split_id_from(running_job_splits, split_id)
+
+                elif status == BCP.BCPP_JOB_ERROR:
+                    message = reply[3]
+                    if sys.version_info < (3, 6):
+                        message = message.decode("utf-8")
+                    logger.info(
+                        "Job split {} processed by worker {} failed:\n{}".format(
+                            split_id, worker_id, message
+                        )
+                    )
+                    split = get_split(split_id, status)
+                    if split is not None:
+                        try:
+                            error = json.loads(message)
+                        except json.decoder.JSONDecodeError:
+                            error = message
+
+                        splits_to_cancel.extend(on_split_fail(split, error))
+                        remove_split_id_from(running_job_splits, split_id)
+
+                elif status == BCP.BCPP_ERROR:
+                    message = reply[3]
+                    logger.info(
+                        "Worker {} error for job split {}:\n{}".format(
+                            worker_id, split_id, message
+                        )
+                    )
+                    if split_id in running_job_splits:
+                        split = get_split(split_id, status)
+                        if split is not None:
+                            splits_to_cancel.extend(on_split_fail(split, message))
+                            remove_split_id_from(running_job_splits, split_id)
+
+            # Effectively cancel newly-cancelled experiments
+            splits_to_cancel.extend(process_newly_cancelled_experiments())
+
+            # Cancel the necessary jobs (if any)
+            for split_to_cancel in splits_to_cancel:
+                if split_to_cancel.id in running_job_splits:
+                    logger.info(
+                        "Cancelling job split #%d (%s %d/%d @ %s) on '%s'",
+                        split_to_cancel.id,
+                        split_to_cancel.job.block.name,
+                        split_to_cancel.split_index,
+                        split_to_cancel.job.splits.count(),
+                        split_to_cancel.job.block.experiment.fullname(),
+                        split_to_cancel.worker.name,
+                    )
+
+                    request = [BCP.BCPE_CANCEL, str(split_to_cancel.id).encode("utf-8")]
+                    client.send(split_to_cancel.worker.name.encode("utf-8"), request)
+
+                    remove_split_id_from(running_job_splits, split_to_cancel.id)
+                    cancelling_jobs.append(split_to_cancel.id)
+
+            if not self.continue_:
+                break
+
+            # Start new jobs
+            split_new_jobs()
+            assigned_splits = assign_splits_to_workers()
+
+            for split in assigned_splits:
+                running_job_splits.append(split.id)
+
+                configuration = get_configuration_for_split(split)
+
+                logger.info(
+                    "Sending job split #%d (%s %d/%d @ %s) on '%s'",
+                    split.id,
+                    split.job.block.name,
+                    split.split_index,
+                    split.job.splits.count(),
+                    split.job.block.experiment.fullname(),
+                    split.worker.name,
+                )
+
+                request = [
+                    BCP.BCPE_EXECUTE,
+                    str(split.id).encode("utf-8"),
+                    json.dumps(configuration).encode("utf-8"),
+                ]
+                client.send(split.worker.name.encode("utf-8"), request)
+
+        logger.info("scheduler stopped")
diff --git a/beat/web/utils/management/commands/worker.py b/beat/web/utils/management/commands/worker.py
new file mode 100644
index 0000000000000000000000000000000000000000..4429b282ef4f9092d346bd92dbe0dfa17128932a
--- /dev/null
+++ b/beat/web/utils/management/commands/worker.py
@@ -0,0 +1,134 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+# encoding: utf-8
+
+###############################################################################
+#                                                                             #
+# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/           #
+# Contact: beat.support@idiap.ch                                              #
+#                                                                             #
+# This file is part of the beat.web module of the BEAT platform.              #
+#                                                                             #
+# Commercial License Usage                                                    #
+# Licensees holding valid commercial BEAT licenses may use this file in       #
+# accordance with the terms contained in a written agreement between you      #
+# and Idiap. For further information contact tto@idiap.ch                     #
+#                                                                             #
+# Alternatively, this file may be used under the terms of the GNU Affero      #
+# Public License version 3 as published by the Free Software and appearing    #
+# in the file LICENSE.AGPL included in the packaging of this file.            #
+# The BEAT platform is distributed in the hope that it will be useful, but    #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
+# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
+#                                                                             #
+# You should have received a copy of the GNU Affero Public License along      #
+# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
+#                                                                             #
+###############################################################################
+
+import socket
+
+from django.core.management.base import BaseCommand
+from django.conf import settings
+
+from beat.core.bcp import worker
+from beat.core.utils import setup_logging
+
+
+class Command(BaseCommand):
+
+    help = "Start zmq worker"
+
+    def add_arguments(self, parser):
+        parser.add_argument(
+            "--name",
+            "-n",
+            type=str,
+            dest="service_name",
+            default=socket.gethostname(),
+            help="Service name",
+        )
+        parser.add_argument(
+            "--broker",
+            "-b",
+            type=str,
+            dest="broker_address",
+            default="tcp://localhost:5555",
+            help="Broker address",
+        )
+        parser.add_argument(
+            "--docker",
+            "-d",
+            action="store_true",
+            dest="use_docker",
+            default=False,
+            help="Use docker",
+        )
+        parser.add_argument(
+            "--prefix", "-p", dest="prefix", default=settings.PREFIX, help="Prefix path"
+        )
+        parser.add_argument(
+            "--cache",
+            "-c",
+            type=str,
+            dest="cache",
+            default=settings.CACHE_ROOT,
+            help="Cache path",
+        )
+        parser.add_argument(
+            "--network-name",
+            dest="network_name",
+            type=str,
+            default=None,
+            help="Docker network name",
+        )
+        parser.add_argument(
+            "--port-range",
+            dest="port_range",
+            type=str,
+            default=None,
+            help="Docker port range",
+        )
+        parser.add_argument(
+            "--docker-images-cache",
+            dest="docker_images_cache",
+            type=str,
+            default=None,
+            help="Docker image cache",
+        )
+
+    def handle(self, *args, **options):
+        verbosity = options["verbosity"]
+        logger = setup_logging(verbosity, "Worker", __name__)
+        logger.info("starting worker")
+
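+        # Build the command line for the beat.core worker and run it
+        # in-process via worker.main()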
+        argv = [
+            "--name=" + options["service_name"],
+            "--prefix=" + options["prefix"],
+            "--cache=" + options["cache"],
+        ]
+
+        argv.extend(["-v" for i in range(options["verbosity"])])
+
+        if options["use_docker"]:
+            argv.append("--docker")
+
+            network_name = options.get("network_name")
+            if network_name:
+                argv.append("--docker-network=" + network_name)
+
+            port_range = options.get("port_range")
+            if port_range:
+                argv.append("--port-range=" + port_range)
+
+            docker_images_cache = options.get("docker_images_cache")
+            if docker_images_cache:
+                argv.append("--docker-images-cache=" + docker_images_cache)
+
+        argv.append(options["broker_address"])
+
+        status = worker.main(argv)
+
+        logger.info("worker stopped")
+
+        return status
diff --git a/beat/web/utils/tests/__init__.py b/beat/web/utils/tests/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/beat/web/utils/tests/test_scheduling.py b/beat/web/utils/tests/test_scheduling.py
new file mode 100644
index 0000000000000000000000000000000000000000..9da85506640c292407f299c7c34da5eb6ec7ad3e
--- /dev/null
+++ b/beat/web/utils/tests/test_scheduling.py
@@ -0,0 +1,701 @@
+#!/usr/bin/env python
+# vim: set fileencoding=utf-8 :
+
+###############################################################################
+#                                                                             #
+# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/           #
+# Contact: beat.support@idiap.ch                                              #
+#                                                                             #
+# This file is part of the beat.web module of the BEAT platform.              #
+#                                                                             #
+# Commercial License Usage                                                    #
+# Licensees holding valid commercial BEAT licenses may use this file in       #
+# accordance with the terms contained in a written agreement between you      #
+# and Idiap. For further information contact tto@idiap.ch                     #
+#                                                                             #
+# Alternatively, this file may be used under the terms of the GNU Affero      #
+# Public License version 3 as published by the Free Software and appearing    #
+# in the file LICENSE.AGPL included in the packaging of this file.            #
+# The BEAT platform is distributed in the hope that it will be useful, but    #
+# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
+# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
+#                                                                             #
+# You should have received a copy of the GNU Affero Public License along      #
+# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
+#                                                                             #
+###############################################################################
+
+import os
+import multiprocessing
+import tempfile
+import time
+
+from unittest import skipIf
+
+from django.test import TransactionTestCase
+from django.conf import settings
+from django.core.management import call_command
+from django import db
+
+from beat.core.utils import find_free_port
+
+from beat.web.backend.models.worker import Worker
+from beat.web.backend.tests.common import BackendUtilitiesMixin
+from beat.web.backend.tests.common import ONE_QUEUE_TWO_WORKERS
+from beat.web.backend.utils import setup_backend
+from beat.web.backend.helpers import schedule_experiment
+from beat.web.backend.helpers import cancel_experiment
+from beat.web.experiments.models import Experiment
+from beat.web.experiments.models import Block
+
+
+def start_broker(port, verbosity=0):
+    db.connections.close_all()
+    call_command("broker", port=port, verbosity=verbosity)
+
+
+def start_worker(worker_name, broker_address, prefix, cache, verbosity=0):
+    call_command(
+        "worker",
+        name=worker_name,
+        broker_address=broker_address,
+        prefix=prefix,
+        cache=cache,
+        verbosity=verbosity,
+    )
+
+
+def start_scheduler(broker_address, verbosity=0):
+    db.connections.close_all()
+    call_command("scheduler", broker_address=broker_address, verbosity=verbosity)
+
+
+@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
+class WorkerRegistration(TransactionTestCase):
+    def setUp(self):
+        db_worker = Worker(name="test_worker1", active=False, cores=1, memory=12)
+        db_worker.save()
+        db_worker2 = Worker(name="test_worker2", active=False, cores=1, memory=12)
+        db_worker2.save()
+
+        self.db_workers = [db_worker, db_worker2]
+        self.port = find_free_port()
+        self.broker_address = "tcp://localhost:{}".format(self.port)
+
+    def test_worker_registration(self):
+        prefix = tempfile.TemporaryDirectory(prefix="beat.scheduling")
+        cache = os.path.join(prefix.name, "cache")
+        os.mkdir(cache)
+
+        broker = multiprocessing.Process(target=start_broker, args=(self.port, 0))
+        workers = []
+        for db_worker in self.db_workers:
+            worker = multiprocessing.Process(
+                target=start_worker,
+                args=(db_worker.name, self.broker_address, prefix.name, cache, 0),
+            )
+            workers.append(worker)
+
+        broker.start()
+
+        for worker in workers:
+            worker.start()
+
+        max_rounds = 5
+        for db_worker in self.db_workers:
+            db_worker.refresh_from_db()
+
+        while max_rounds > 0 and not all(
+            [db_worker.active for db_worker in self.db_workers]
+        ):
+            max_rounds -= 1
+            worker.join(5)  # give the workers time to send their first heartbeat
+            for db_worker in self.db_workers:
+                db_worker.refresh_from_db()
+
+        was_active = all([db_worker.active for db_worker in self.db_workers])
+
+        for worker in workers:
+            worker.terminate()
+            worker.join()
+
+        max_rounds = 5
+        while max_rounds > 0 and not all(
+            [not db_worker.active for db_worker in self.db_workers]
+        ):
+            max_rounds -= 1
+            broker.join(5)  # give the broker time to detect that the workers are gone
+            for db_worker in self.db_workers:
+                db_worker.refresh_from_db()
+
+        broker.terminate()
+        broker.join()
+
+        self.assertTrue(was_active)
+        self.assertTrue(all([not db_worker.active for db_worker in self.db_workers]))
+
+
+# ----------------------------------------------------------
+
+
+@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
+class TestSchedulerBase(TransactionTestCase, BackendUtilitiesMixin):
+    def __init__(self, methodName="runTest"):
+        super(TestSchedulerBase, self).__init__(methodName)
+        self.port = find_free_port()
+        self.broker_address = "tcp://localhost:{}".format(self.port)
+
+        self.scheduler = None
+        self.broker = None
+        self.workers = {}
+
+        self.command_verbosity = 3
+        self.timeout = settings.DB_REFRESH_TIMEOUT  # Failure timeout
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.setup_test_data()
+
+    def setUp(self):
+        self.shutdown_everything()
+        self.clean_cache()
+
+        TestSchedulerBase.setup_test_data()
+        setup_backend(ONE_QUEUE_TWO_WORKERS)
+        Worker.objects.update(active=False)
+
+    def tearDown(self):
+        self.shutdown_everything()
+        self.clean_cache()
+
+    def start_scheduling(self, verbosity=None):
+        if verbosity is None:
+            verbosity = self.command_verbosity
+
+        self.start_broker(verbosity)
+        self.start_scheduler(verbosity)
+
+    def shutdown_everything(self):
+        for name in list(self.workers.keys()):
+            self.stop_worker(name)
+
+        self.workers = {}
+
+        self.stop_scheduler()
+        self.stop_broker()
+
+    def start_scheduler(self, verbosity=None):
+        if verbosity is None:
+            verbosity = self.command_verbosity
+
+        self.scheduler = multiprocessing.Process(
+            target=start_scheduler, args=(self.broker_address, verbosity)
+        )
+        self.scheduler.start()
+
+    def stop_scheduler(self):
+        if self.scheduler is not None:
+            self.scheduler.terminate()
+            self.scheduler.join()
+            self.scheduler = None
+
+    def start_broker(self, verbosity=None):
+        if verbosity is None:
+            verbosity = self.command_verbosity
+
+        self.broker = multiprocessing.Process(
+            target=start_broker, args=(self.port, verbosity)
+        )
+        self.broker.start()
+
+    def stop_broker(self):
+        if self.broker is not None:
+            self.broker.terminate()
+            self.broker.join()
+            self.broker = None
+
+    def start_worker(self, name, verbosity=None):
+        if verbosity is None:
+            verbosity = self.command_verbosity
+
+        worker = multiprocessing.Process(
+            target=start_worker,
+            args=(
+                name,
+                self.broker_address,
+                settings.PREFIX,
+                settings.CACHE_ROOT,
+                verbosity,
+            ),
+        )
+
+        worker.start()
+
+        self.workers[name] = worker
+
+    def stop_worker(self, name):
+        worker = self.workers.pop(name, None)
+
+        if worker is not None:
+            worker.terminate()
+            worker.join()
+            del worker
+
+    def check_worker_status(self, name, active, timeout=10):
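+        # Poll the database until the worker reaches the expected active state;
+        # fail the test if that does not happen within `timeout` seconds.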
+        start = time.time()
+        worker = Worker.objects.get(name=name)
+
+        while worker.active != active:
+            if time.time() - start > timeout:
+                self.shutdown_everything()
+                self.fail(
+                    "Worker '{}' did not reach active={} within {}s".format(
+                        name, active, timeout
+                    )
+                )
+            worker.refresh_from_db()
+
+
+# ----------------------------------------------------------
+
+
+@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
+class TestConnection(TestSchedulerBase):
+    def test_worker_connection(self):
+        self.start_scheduling()
+
+        self.assertEqual(Worker.objects.filter(active=True).count(), 0)
+
+        self.start_worker("node1")
+
+        self.check_worker_status("node1", True)
+
+        self.assertEqual(Worker.objects.filter(active=True).count(), 1)
+
+    def test_worker_disconnection(self):
+        self.start_scheduling()
+        self.start_worker("node1")
+
+        self.check_worker_status("node1", True)
+
+        self.workers["node1"].terminate()
+        self.workers["node1"].join()
+        del self.workers["node1"]
+
+        self.check_worker_status("node1", False)
+
+    def test_two_workers_connection(self):
+        self.start_scheduling()
+
+        self.assertEqual(Worker.objects.filter(active=True).count(), 0)
+
+        self.start_worker("node1")
+        self.start_worker("node2")
+
+        self.check_worker_status("node1", True)
+        self.check_worker_status("node2", True)
+
+        self.assertEqual(Worker.objects.filter(active=True).count(), 2)
+
+    def test_scheduler_last(self):
+        self.start_worker("node1")
+
+        time.sleep(1)
+
+        self.start_scheduling()
+
+        self.check_worker_status("node1", True)
+
+    def test_broker_shutdown(self):
+        self.start_broker()
+
+        self.assertEqual(Worker.objects.filter(active=True).count(), 0)
+
+        self.start_worker("node1")
+
+        self.check_worker_status("node1", True)
+
+        self.assertEqual(Worker.objects.filter(active=True).count(), 1)
+
+        self.stop_broker()
+
+        self.assertEqual(Worker.objects.filter(active=False).count(), 1)
+
+        time.sleep(1)
+
+        self.start_broker()
+
+        self.check_worker_status("node1", True)
+
+        self.assertEqual(Worker.objects.filter(active=True).count(), 1)
+
+
+# ----------------------------------------------------------
+
+
+@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
+class TestExecution(TestSchedulerBase):
+    def setUp(self):
+        super(TestExecution, self).setUp()
+
+        self.start_scheduling()
+        self.start_worker("node1")
+        self.start_worker("node2")
+        self.check_worker_status("node1", True)
+        self.check_worker_status("node2", True)
+
+    def test_successful_experiment(self):
+        fullname = "user/user/double/1/double"
+
+        xp = Experiment.objects.get(name=fullname.split("/")[-1])
+
+        self.prepare_databases(xp.declaration)
+
+        schedule_experiment(xp)
+        xp.refresh_from_db()
+
+        start = time.time()
+        while xp.status != Experiment.DONE:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp.refresh_from_db()
+
+        b1 = xp.blocks.get(name="echo1")
+        b2 = xp.blocks.get(name="echo2")
+        b3 = xp.blocks.get(name="analysis")
+
+        self.assertEqual(b1.status, Block.DONE)
+        self.assertEqual(b2.status, Block.DONE)
+        self.assertEqual(b3.status, Block.DONE)
+
+    def test_successful_splitted_experiment(self):
+        fullname = "user/user/double/1/double_split_2"
+
+        xp = Experiment.objects.get(name=fullname.split("/")[-1])
+
+        self.prepare_databases(xp.declaration)
+
+        schedule_experiment(xp)
+        xp.refresh_from_db()
+
+        start = time.time()
+        while xp.status != Experiment.DONE:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp.refresh_from_db()
+
+        b1 = xp.blocks.get(name="echo1")
+        b2 = xp.blocks.get(name="echo2")
+        b3 = xp.blocks.get(name="analysis")
+
+        self.assertEqual(b1.status, Block.DONE)
+        self.assertEqual(b2.status, Block.DONE)
+        self.assertEqual(b3.status, Block.DONE)
+
+    def test_two_similar_experiments(self):
+        fullname1 = "user/user/double/1/double"
+        fullname2 = "user/user/double/1/double_split_2"
+
+        xp1 = Experiment.objects.get(name=fullname1.split("/")[-1])
+        xp2 = Experiment.objects.get(name=fullname2.split("/")[-1])
+
+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
+        schedule_experiment(xp1)
+        schedule_experiment(xp2)
+
+        xp1.refresh_from_db()
+        xp2.refresh_from_db()
+
+        start = time.time()
+        while (xp1.status != Experiment.DONE) or (xp2.status != Experiment.DONE):
+            self.assertTrue(time.time() - start < self.timeout)
+            xp1.refresh_from_db()
+            xp2.refresh_from_db()
+
+        self.assertEqual(xp1.start_date, xp2.start_date)
+        self.assertEqual(xp1.end_date, xp2.end_date)
+
+        b1 = xp1.blocks.get(name="echo1")
+        b2 = xp2.blocks.get(name="echo1")
+
+        self.assertEqual(b1.status, Block.DONE)
+        self.assertEqual(b2.status, Block.DONE)
+        self.assertEqual(b1.start_date, b2.start_date)
+        self.assertEqual(b1.end_date, b2.end_date)
+
+        b1 = xp1.blocks.get(name="echo2")
+        b2 = xp2.blocks.get(name="echo2")
+
+        self.assertEqual(b1.status, Block.DONE)
+        self.assertEqual(b2.status, Block.DONE)
+        self.assertEqual(b1.start_date, b2.start_date)
+        self.assertEqual(b1.end_date, b2.end_date)
+
+        b1 = xp1.blocks.get(name="analysis")
+        b2 = xp2.blocks.get(name="analysis")
+
+        self.assertEqual(b1.status, Block.DONE)
+        self.assertEqual(b2.status, Block.DONE)
+        self.assertEqual(b1.start_date, b2.start_date)
+        self.assertEqual(b1.end_date, b2.end_date)
+
+    def test_two_different_experiments(self):
+        fullname1 = "user/user/single/1/single"
+        fullname2 = "user/user/single/1/single_add"
+
+        xp1 = Experiment.objects.get(name=fullname1.split("/")[-1])
+        xp2 = Experiment.objects.get(name=fullname2.split("/")[-1])
+
+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
+        schedule_experiment(xp1)
+        schedule_experiment(xp2)
+
+        xp1.refresh_from_db()
+        xp2.refresh_from_db()
+
+        start = time.time()
+        while (xp1.status != Experiment.DONE) or (xp2.status != Experiment.DONE):
+            self.assertTrue(time.time() - start < self.timeout)
+            xp1.refresh_from_db()
+            xp2.refresh_from_db()
+
+        self.assertNotEqual(xp1.start_date, xp2.start_date)
+        self.assertNotEqual(xp1.end_date, xp2.end_date)
+
+    def test_two_delayed_similar_experiments(self):
+        fullname1 = "user/user/double/1/double"
+        fullname2 = "user/user/double/1/double_split_2"
+
+        xp1 = Experiment.objects.get(name=fullname1.split("/")[-1])
+        xp2 = Experiment.objects.get(name=fullname2.split("/")[-1])
+
+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
+        schedule_experiment(xp1)
+
+        start = time.time()
+        while xp1.status != Experiment.RUNNING:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp1.refresh_from_db()
+
+        schedule_experiment(xp2)
+
+        start = time.time()
+        while xp1.status != Experiment.DONE or xp2.status != Experiment.DONE:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp1.refresh_from_db()
+            xp2.refresh_from_db()
+
+    def test_failed_experiment(self):
+        fullname = "user/user/single/1/single_error"
+
+        xp = Experiment.objects.get(name=fullname.split("/")[-1])
+
+        self.prepare_databases(xp.declaration)
+
+        schedule_experiment(xp)
+        xp.refresh_from_db()
+
+        start = time.time()
+        while xp.status != Experiment.FAILED:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp.refresh_from_db()
+
+        b1 = xp.blocks.get(name="echo")
+        b2 = xp.blocks.get(name="analysis")
+
+        self.assertEqual(b1.status, Block.FAILED)
+        self.assertEqual(b2.status, Block.CANCELLED)
+
+    def test_failed_splitted_experiment(self):
+        fullname = "user/user/double/1/double_error_split_2"
+
+        xp = Experiment.objects.get(name=fullname.split("/")[-1])
+
+        self.prepare_databases(xp.declaration)
+
+        schedule_experiment(xp)
+        xp.refresh_from_db()
+
+        start = time.time()
+        while xp.status != Experiment.FAILED:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp.refresh_from_db()
+
+        b1 = xp.blocks.get(name="echo1")
+        b2 = xp.blocks.get(name="echo2")
+        b3 = xp.blocks.get(name="analysis")
+
+        self.assertEqual(b1.status, Block.DONE)
+        self.assertEqual(b2.status, Block.FAILED)
+        self.assertEqual(b3.status, Block.CANCELLED)
+
+    def test_failed_mirror_experiments(self):
+        fullname1 = "user/user/double/1/double_error"
+        fullname2 = "user/user/double/1/double_error_split_2"
+
+        xp1 = Experiment.objects.get(name=fullname1.split("/")[-1])
+        xp2 = Experiment.objects.get(name=fullname2.split("/")[-1])
+
+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
+        schedule_experiment(xp1)
+        schedule_experiment(xp2)
+
+        xp1.refresh_from_db()
+        xp2.refresh_from_db()
+
+        start = time.time()
+        while xp1.status != Experiment.FAILED or xp2.status != Experiment.FAILED:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp1.refresh_from_db()
+            xp2.refresh_from_db()
+
+        b1 = xp1.blocks.get(name="echo1")
+        b2 = xp1.blocks.get(name="echo2")
+        b3 = xp1.blocks.get(name="analysis")
+
+        self.assertEqual(b1.status, Block.DONE)
+        self.assertEqual(b2.status, Block.FAILED)
+        self.assertEqual(b3.status, Block.CANCELLED)
+
+        b1 = xp2.blocks.get(name="echo1")
+        b2 = xp2.blocks.get(name="echo2")
+        b3 = xp2.blocks.get(name="analysis")
+
+        self.assertEqual(b1.status, Block.DONE)
+        self.assertEqual(b2.status, Block.FAILED)
+        self.assertEqual(b3.status, Block.CANCELLED)
+
+
+# ----------------------------------------------------------
+
+
+@skipIf(settings.RUNNING_ON_CI, "Not runnable on the CI")
+class TestCancellation(TestSchedulerBase):
+    def setUp(self):
+        super(TestCancellation, self).setUp()
+
+        self.start_scheduling(0)
+        self.start_worker("node1", 0)
+        self.check_worker_status("node1", True)
+        self.start_worker("node2", 0)
+        self.check_worker_status("node2", True)
+
+    def process(self, experiment_name, block_name=None):
+        xp_name = experiment_name.split("/")[-1]
+
+        xp = Experiment.objects.get(name=xp_name)
+
+        self.prepare_databases(xp.declaration)
+
+        schedule_experiment(xp)
+        xp.refresh_from_db()
+
+        start = time.time()
+        while xp.status != Experiment.RUNNING:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp.refresh_from_db()
+
+        if block_name is not None:
+            block = xp.blocks.get(name=block_name)
+
+            start = time.time()
+            while block.status != Block.PROCESSING:
+                self.assertTrue(time.time() - start < self.timeout)
+                block.refresh_from_db()
+
+        cancel_experiment(xp)
+        xp.refresh_from_db()
+
+        start = time.time()
+        while xp.status != Experiment.PENDING:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp.refresh_from_db()
+
+        self.assertTrue(xp.blocks.filter(status=Block.CANCELLED).count() > 0)
+        self.assertEqual(xp.blocks.filter(status=Block.PENDING).count(), 0)
+        self.assertEqual(xp.blocks.filter(status=Block.PROCESSING).count(), 0)
+
+    def process2(self, experiment_name1, experiment_name2, cancel_index=0):
+        xp1 = Experiment.objects.get(name=experiment_name1.split("/")[-1])
+        xp2 = Experiment.objects.get(name=experiment_name2.split("/")[-1])
+
+        self.prepare_databases(xp1.declaration)
+        self.prepare_databases(xp2.declaration)
+
+        schedule_experiment(xp1)
+        schedule_experiment(xp2)
+        xp1.refresh_from_db()
+        xp2.refresh_from_db()
+
+        start = time.time()
+        while xp1.status != Experiment.RUNNING or xp2.status != Experiment.RUNNING:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp1.refresh_from_db()
+            xp2.refresh_from_db()
+
+        if cancel_index == 0:
+            xp_to_cancel = xp1
+            xp_to_finish = xp2
+        else:
+            xp_to_cancel = xp2
+            xp_to_finish = xp1
+
+        cancel_experiment(xp_to_cancel)
+
+        xp_to_cancel.refresh_from_db()
+        xp_to_finish.refresh_from_db()
+
+        start = time.time()
+        while xp_to_cancel.status != Experiment.PENDING:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp_to_cancel.refresh_from_db()
+
+        xp_to_finish.refresh_from_db()
+
+        start = time.time()
+        while xp_to_finish.status != Experiment.DONE:
+            self.assertTrue(time.time() - start < self.timeout)
+            xp_to_finish.refresh_from_db()
+
+        self.assertTrue(xp_to_cancel.blocks.filter(status=Block.CANCELLED).count() > 0)
+        self.assertEqual(xp_to_cancel.blocks.filter(status=Block.PENDING).count(), 0)
+        self.assertEqual(xp_to_cancel.blocks.filter(status=Block.PROCESSING).count(), 0)
+
+        self.assertEqual(xp_to_finish.blocks.filter(status=Block.CANCELLED).count(), 0)
+        self.assertEqual(xp_to_finish.blocks.filter(status=Block.PENDING).count(), 0)
+        self.assertEqual(xp_to_finish.blocks.filter(status=Block.PROCESSING).count(), 0)
+
+    def test_one_split_running(self):
+        self.process("user/user/single/1/single_sleep_4")
+
+    def test_one_split_of_two_in_a_block_running(self):
+        self.stop_worker("node2")
+        self.check_worker_status("node2", False)
+        self.process("user/user/double/1/double_sleep_split_2", block_name="echo2")
+
+    def test_two_splits_of_same_block_running(self):
+        self.process("user/user/double/1/double_sleep_split_2", block_name="echo2")
+
+    def test_two_splits_of_different_blocks_running(self):
+        self.process("user/user/triangle/1/triangle_sleep_4")
+
+    def test_mirror_block(self):
+        self.stop_worker("node2")
+        self.check_worker_status("node2", False)
+        self.process2(
+            "user/user/single/1/single_sleep_4",
+            "user/user/single/1/single_sleep_5",
+            cancel_index=1,
+        )
+
+    def test_mirrored_block(self):
+        self.stop_worker("node2")
+        self.check_worker_status("node2", False)
+        self.process2(
+            "user/user/single/1/single_sleep_4",
+            "user/user/single/1/single_sleep_5",
+            cancel_index=0,
+        )
diff --git a/buildout.cfg b/buildout.cfg
index f05cb1601476fa7445e37ea6e9ec1c0fc42f9ad3..5640bae095d18c3a60f1caf9d6253c3859422a74 100644
--- a/buildout.cfg
+++ b/buildout.cfg
@@ -1,5 +1,6 @@
 [buildout]
 extends = common.cfg
+parts += docker
 develop = .
 
 [sources]
@@ -11,3 +12,9 @@ scripts += protractor webdriver-manager
 
 [bower]
 base-directory = beat/web
+
+[docker]
+recipe = collective.recipe.cmd
+cmds = ./bin/python -c 'from beat.core.test.utils import pull_docker_test_images as f; f()'
+on_install = true
+on_update = true
diff --git a/common.cfg b/common.cfg
index a83ff173b3f89ebd3bbcad24f147aad415b0b8de..1c30c8abd8629212c39d52f018f6ba2e3109f8da 100644
--- a/common.cfg
+++ b/common.cfg
@@ -7,6 +7,15 @@ eggs = beat.web
        beat.cmdline
        beat.core
        beat.backend.python
+versions = versions
+
+[versions]
+django = >=1.11,<2.0
+django-rest-swagger = >2.1
+django-guardian = >=1.3
+djangorestframework = >3.7
+django-activity-stream = >=0.6.5
+django-jsonfield = >=1.0.1
 
 [scripts]
 recipe = bob.buildout:scripts
diff --git a/dev.yml b/dev.yml
index 88eba66875c4a2728d171bf57a5dfdf22fb58d09..e5c291ccf87ebf298c7993f4fe5598ac542d0cef 100644
--- a/dev.yml
+++ b/dev.yml
@@ -10,11 +10,10 @@ dependencies:
   - beat-devel=2019.03.07
 
   # requirements.txt, they are indirectly pinned through the above
-  - beat.core
+  - beat.core=1.8
   - docopt
   - docutils
   - jinja2
-  - graphviz
   - matplotlib
   - psutil
   - psycopg2
diff --git a/develop.cfg b/develop.cfg
new file mode 100644
index 0000000000000000000000000000000000000000..d507f58cf8f9eda75b47ca056d782beb8dfab692
--- /dev/null
+++ b/develop.cfg
@@ -0,0 +1,21 @@
+[buildout]
+extends = common.cfg
+always-checkout = force
+versions = versions
+develop = src/beat.backend.python
+          src/beat.core
+          src/beat.cmdline
+          .
+
+[scripts]
+eggs = ${buildout:eggs}
+interpreter = python
+
+[sources]
+beat.core = git git@gitlab.idiap.ch:beat/beat.core egg=false
+beat.cmdline = git git@gitlab.idiap.ch:beat/beat.cmdline egg=false
+beat.backend.python = git git@gitlab.idiap.ch:beat/beat.backend.python egg=false
+beat.examples = git git@gitlab.idiap.ch:beat/beat.examples egg=false
+
+[bower]
+base-directory = ./beat/web/
diff --git a/doc/admin/installation.rst b/doc/admin/installation.rst
index f42f5234e7d4ccc1a6d00f33f3f7a72d499caa2b..69949a34d086bf7380ac71e46d106cdc395da6b8 100644
--- a/doc/admin/installation.rst
+++ b/doc/admin/installation.rst
@@ -440,13 +440,26 @@ Here is how to do it.
 
         $ ./bin/django runserver
 
+  2. Start the full scheduling setup::
+
+        $ ./bin/django full_scheduling
+
+This will start all the elements of the scheduling/working process. Docker
+can be used for the worker node by passing the ``--docker`` option.
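+
+For example, to run the worker through Docker::
+
+    $ ./bin/django full_scheduling --docker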
+
+Each element of the scheduling setup can also be started separately:
+
+  1. Start the broker node::
+
+        $ ./bin/django broker -v 2
+
   2. Start a single scheduling node::
 
-        $ ./bin/scheduler -vv
+        $ ./bin/django scheduler -v 2
 
   3. Start a worker for your current node::
 
-        $ ./bin/worker -vv
+        $ ./bin/django worker -v 2
 
 By default, the applications are configured to figure out paths and
 configuration options by themselves. You can override some defaults via the
@@ -457,7 +470,7 @@ command line. Just check the output of each of those commands running the
 Mixing and matching
 ===================
 
-You can mix and match any of the above techniques to run a 3-node system
+You can mix and match any of the above techniques to run a 4-node system
 (all-in-one or discrete) to build a test system to suite to your needs. For
 example, it is possible to launch the scheduling activities using the web
 server and the page reload trick while launching the worker process separately
diff --git a/release.cfg b/release.cfg
index 734cbdc4f1de825d75bd64a82a154a44bfb022f9..104c23ab912ebed46a3f6ae25e201ae3908b4f06 100644
--- a/release.cfg
+++ b/release.cfg
@@ -6,9 +6,9 @@ eggs-directory = .buildout/eggs
 download-cache = .buildout/download-cache
 abi-tag-eggs = true
 versions = versions
-develop = beat.backend.python
-          beat.core
-          beat.cmdline
+develop = src/beat.backend.python
+          src/beat.core
+          src/beat.cmdline
           .
 
 [versions]
diff --git a/version.txt b/version.txt
index e18e854e4423c2671ad34d7bb98d06d51ab21952..2c2aebf42fe0c827c055bb505e98709e11c11cc5 100644
--- a/version.txt
+++ b/version.txt
@@ -1 +1 @@
-1.4.2b0
\ No newline at end of file
+1.5.0b0