Commits (179)
 [flake8]
-max-line-length = 80
+max-line-length = 88
 select = B,C,E,F,W,T4,B9,B950
-ignore = E501, W503
+ignore = E501, W503, E203
 [settings]
-multi_line_output=3
-include_trailing_comma=true
-force_grid_wrap=0
-use_parentheses=true
-ensure_newline_before_comments=true
 line_length=88
+force_single_line=true
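
The practical effect of `force_single_line=true` is one import per line instead of black-style parenthesized groups; a small illustrative sketch (using a module that appears later in this diff):

    # with the removed settings (black-compatible grouping):
    from beat.backend.python.algorithm import (
        Algorithm,
        Runner,
        Storage,
    )

    # with force_single_line=true, the style enforced below:
    from beat.backend.python.algorithm import Algorithm
    from beat.backend.python.algorithm import Runner
    from beat.backend.python.algorithm import Storage

This is why so many of the Python hunks that follow are pure import reorderings.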
 # See https://pre-commit.com for more information
 # See https://pre-commit.com/hooks.html for more hooks
 repos:
-- repo: https://github.com/ambv/black
-  rev: stable
+- repo: https://github.com/timothycrosley/isort
+  rev: 5.0.4
   hooks:
-  - id: black
-    language_version: python3.6
-    exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
+  - id: isort
+    files: .*.py
+- repo: https://github.com/psf/black
+  rev: 19.10b0
+  hooks:
+  - id: black
+    exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
 - repo: https://github.com/pre-commit/pre-commit-hooks
-  rev: v2.0.0
+  rev: v3.1.0
   hooks:
   - id: check-ast
     exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
   - id: check-case-conflict
   - id: trailing-whitespace
   - id: end-of-file-fixer
   - id: debug-statements
     exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
   - id: check-added-large-files
   - id: check-docstring-first
-  - id: flake8
   - id: check-yaml
     exclude: conda/meta.yaml
+- repo: https://github.com/PyCQA/flake8
+  rev: 3.8.3
+  hooks:
+  - id: flake8
 - repo: https://github.com/PyCQA/bandit
-  rev: 'master' # Update me!
+  rev: 1.6.2
   hooks:
   - id: bandit
     exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
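
As a usage note (standard pre-commit workflow, not part of the diff itself): with this configuration committed, the hooks are installed and exercised with

    pip install pre-commit
    pre-commit install          # registers the git pre-commit hook
    pre-commit run --all-files  # runs black, isort, flake8, bandit, ... once over the whole tree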
@@ -31,9 +31,9 @@
 .. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-===========================================================
- Authors of the Biometrics Evaluation and Testing Platform
-===========================================================
+==============================
+ Authors of the BEAT Platform
+==============================

 Andre Anjos <andre.anjos@idiap.ch>
 Flavio Tarsetti <flavio.tarsetti@idiap.ch>
@@ -31,18 +31,14 @@
 .. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

 .. image:: https://img.shields.io/badge/docs-stable-yellow.svg
    :target: https://www.idiap.ch/software/beat/docs/beat/beat.core/stable/index.html
-.. image:: https://img.shields.io/badge/docs-latest-orange.svg
+.. image:: https://img.shields.io/badge/docs-available-orange.svg
    :target: https://www.idiap.ch/software/beat/docs/beat/beat.core/master/index.html
-.. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/build.svg
+.. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/pipeline.svg
    :target: https://gitlab.idiap.ch/beat/beat.core/commits/master
 .. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/coverage.svg
    :target: https://gitlab.idiap.ch/beat/beat.core/commits/master
 .. image:: https://img.shields.io/badge/gitlab-project-0000c0.svg
    :target: https://gitlab.idiap.ch/beat/beat.core
 .. image:: https://img.shields.io/pypi/v/beat.core.svg
    :target: https://pypi.python.org/pypi/beat.core

 ==========================
@@ -33,8 +33,8 @@
 #                                                                                 #
 ###################################################################################

 # see https://docs.python.org/3/library/pkgutil.html
 from pkgutil import extend_path

 __path__ = extend_path(__path__, __name__)
@@ -45,22 +45,21 @@ Forward importing from :py:mod:`beat.backend.python.algorithm`
 :py:class:`beat.backend.python.algorithm.Storage`
 :py:class:`beat.backend.python.algorithm.Runner`
 """

 import os

-import six
 import numpy
 import pkg_resources
 import simplejson as json
+import six

+from beat.backend.python.algorithm import Algorithm as BackendAlgorithm
+from beat.backend.python.algorithm import Runner  # noqa
+from beat.backend.python.algorithm import Storage

 from . import dataformat
 from . import library
-from . import schema
 from . import prototypes

-from beat.backend.python.algorithm import Storage
-from beat.backend.python.algorithm import Runner  # noqa
-from beat.backend.python.algorithm import Algorithm as BackendAlgorithm
+from . import schema


 def load_algorithm_prototype(prefix):
@@ -182,6 +181,8 @@ class Algorithm(BackendAlgorithm):
     """

+    dataformat_klass = dataformat.DataFormat
+
     def __init__(self, prefix, data, dataformat_cache=None, library_cache=None):
         super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache)
@@ -303,89 +304,52 @@ class Algorithm(BackendAlgorithm):
                 "declaration: %s" % (self.name, ", ".join(all_output_names))
             )

+    def _validate_format(self, type_name, group_name, entry_name, dataformat):
+        if dataformat.errors:
+            self.errors.append(
+                "found error validating data format `%s' "
+                "for %s `%s' on algorithm `%s': %s"
+                % (
+                    type_name,
+                    group_name,
+                    entry_name,
+                    self.name,
+                    "\n".join(dataformat.errors),
+                )
+            )
+
+    def _validate_dataformats(self, group, group_name, dataformat_cache):
+        for name, entry in group[group_name].items():
+            type_name = entry["type"]
+            thisformat = self._update_dataformat_cache(type_name, dataformat_cache)
+            self._validate_format(type_name, group_name, name, thisformat)
+
     def _validate_required_dataformats(self, dataformat_cache):
         """Makes sure we can load all requested formats
         """

         for group in self.groups:

-            for name, input in group["inputs"].items():
-                if input["type"] in self.dataformats:
-                    continue
-
-                if dataformat_cache and input["type"] in dataformat_cache:  # reuse
-                    thisformat = dataformat_cache[input["type"]]
-                else:  # load it
-                    thisformat = dataformat.DataFormat(self.prefix, input["type"])
-                    if dataformat_cache is not None:  # update it
-                        dataformat_cache[input["type"]] = thisformat
-
-                self.dataformats[input["type"]] = thisformat
+            for name, input_ in group["inputs"].items():
+                self._validate_dataformats(group, "inputs", dataformat_cache)

-                if thisformat.errors:
-                    self.errors.append(
-                        "found error validating data format `%s' "
-                        "for input `%s' on algorithm `%s': %s"
-                        % (input["type"], name, self.name, "\n".join(thisformat.errors))
-                    )
-
-            if "outputs" not in group:
-                continue
-
-            for name, output in group["outputs"].items():
-                if output["type"] in self.dataformats:
-                    continue
-
-                if dataformat_cache and output["type"] in dataformat_cache:  # reuse
-                    thisformat = dataformat_cache[output["type"]]
-                else:  # load it
-                    thisformat = dataformat.DataFormat(self.prefix, output["type"])
-                    if dataformat_cache is not None:  # update it
-                        dataformat_cache[output["type"]] = thisformat
-
-                self.dataformats[output["type"]] = thisformat
+            if "outputs" in group:
+                self._validate_dataformats(group, "outputs", dataformat_cache)

-                if thisformat.errors:
-                    self.errors.append(
-                        "found error validating data format `%s' "
-                        "for output `%s' on algorithm `%s': %s"
-                        % (
-                            output["type"],
-                            name,
-                            self.name,
-                            "\n".join(thisformat.errors),
-                        )
-                    )
+            if "loop" in group:
+                self._validate_dataformats(group, "loop", dataformat_cache)

         if self.results:

             for name, result in self.results.items():
-                if result["type"].find("/") != -1:
-                    if result["type"] in self.dataformats:
-                        continue
-
-                    if dataformat_cache and result["type"] in dataformat_cache:  # reuse
-                        thisformat = dataformat_cache[result["type"]]
-                    else:
-                        thisformat = dataformat.DataFormat(self.prefix, result["type"])
-                        if dataformat_cache is not None:  # update it
-                            dataformat_cache[result["type"]] = thisformat
-
-                    self.dataformats[result["type"]] = thisformat
-
-                    if thisformat.errors:
-                        self.errors.append(
-                            "found error validating data format `%s' "
-                            "for result `%s' on algorithm `%s': %s"
-                            % (
-                                result["type"],
-                                name,
-                                self.name,
-                                "\n".join(thisformat.errors),
-                            )
-                        )
+                result_type = result["type"]
+                # results can only contain base types and plots therefore, only
+                # process plots
+                if result_type.find("/") != -1:
+                    thisformat = self._update_dataformat_cache(
+                        result_type, dataformat_cache
+                    )
+                    self._validate_format(result_type, "result", name, thisformat)
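
The refactor above routes every load through `_update_dataformat_cache`, which is not shown in this diff (it is provided by the base class in beat.backend.python). A minimal sketch of what such a helper does, reconstructed from the removed inline logic and therefore only indicative:

    def _update_dataformat_cache(self, type_name, dataformat_cache):
        """Load one data format, reusing/updating the shared cache (sketch)."""
        if dataformat_cache and type_name in dataformat_cache:  # reuse
            thisformat = dataformat_cache[type_name]
        else:  # load it
            thisformat = dataformat.DataFormat(self.prefix, type_name)
            if dataformat_cache is not None:  # update it
                dataformat_cache[type_name] = thisformat
        self.dataformats[type_name] = thisformat
        return thisformat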

     def _convert_parameter_types(self):
         """Converts types to numpy equivalents, checks defaults, ranges and
@@ -49,15 +49,14 @@ Options:
   -V, --version Show version
   -v, --verbose Increases the output verbosity level
 """

 import os
 import sys

 from docopt import docopt

 from ..bcpapi.broker import BeatComputationBroker
-from ..version import __version__
 from ..utils import setup_logging
+from ..version import __version__


 def run(port=5555, verbose=1, callbacks=None):
@@ -41,46 +41,45 @@ Based on the Majordomo Protocol worker example of the ZMQ Guide.
 Usage:
   %(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>]
            [--cache=<path>] [--docker] [--docker-network=<name>]
-           [--port-range=<range>] <broker_address>
+           [--port-range=<range>] [--cache-mount-point=<cache_mount_point>]
+           <broker_address>
   %(prog)s (--help | -h)
   %(prog)s (--version | -V)

 Options:
-  -h, --help  Show this screen
-  -V, --version  Show version
-  -v, --verbose  Increases the output verbosity level
-  -n <name>, --name=<name>  The unique name of this worker on the database.
-                            This is typically the assigned hostname of the node,
-                            but not necessarily [default: %(hostname)s]
-  -p, --prefix=<path>  Comma-separated list of the prefix(es) of your local data [default: .]
-  -c, --cache=<path>  Cache prefix, otherwise defaults to '<prefix>/cache'
-  --docker-network=<name>  Name of the docker network to use
-  --port-range=<range>  Range of port usable for communication with containers
+  -h, --help                                Show this screen
+  -V, --version                             Show version
+  -v, --verbose                             Increases the output verbosity level
+  -n <name>, --name=<name>                  The unique name of this worker on the database.
+                                            This is typically the assigned hostname of the node,
+                                            but not necessarily [default: %(hostname)s]
+  -p, --prefix=<path>                       Comma-separated list of the prefix(es) of your local data [default: .]
+  -c, --cache=<path>                        Cache prefix, otherwise defaults to '<prefix>/cache'
+  --docker-network=<name>                   Name of the docker network to use
+  --port-range=<range>                      Range of port usable for communication with containers
+  --cache-mount-point=<cache_mount_point>   NFS mount point to use for cache setup
 """
 import os
-import sys
 import signal
+import sys
 import tempfile
-import simplejson as json
+from socket import gethostname

+import simplejson as json
 import zmq
-from socket import gethostname
 from docopt import docopt

 from ..bcpapi import BCP
-from ..bcpapi.worker import BeatComputationWorker
-from ..bcpapi.processor import BeatComputationProcessor
 from ..bcpapi.execution import ExecutionProcess
+from ..bcpapi.processor import BeatComputationProcessor
+from ..bcpapi.worker import BeatComputationWorker
 from ..dock import Host
 from ..utils import find_free_port
 from ..utils import setup_logging
 from ..version import __version__

 logger = None
@@ -107,6 +106,7 @@ def run(
     docker_network_name=None,
     docker_port_range=None,
     docker_images_cache=None,
+    docker_cache_mount_point=None,
 ):
     """Start the worker
@@ -202,6 +202,8 @@ def run(
             data["network_name"] = docker_network_name

         if docker_port_range:
            data["port_range"] = docker_port_range

+        if docker_cache_mount_point:
+            data["cache_mount_point"] = docker_cache_mount_point
+
         # Start the execution
         logger.info("Running '%s' with job id #%s", data["algorithm"], job_id)
@@ -248,6 +250,7 @@ def run(
             logger.info("The scheduler shut down, we will wait for it")
-            worker.destroy()
+            worker.send_to_broker(BCP.BCPW_DISCONNECT)
+            worker.destroy()

     # Cleanup
     for execution_process in execution_processes:
@@ -301,6 +304,8 @@ def main(argv=None):
     docker_images_cache = None
     docker_network_name = None
     docker_port_range = None
+    docker_cache_mount_point = None

     if args["--docker"]:
         docker_images_cache = os.path.join(
             tempfile.gettempdir(), "beat-docker-images.json"
@@ -319,6 +324,14 @@ def main(argv=None):
                 return 1

             logger.info("Using port range %s", docker_port_range)

+        docker_cache_mount_point = args.get("--cache-mount-point", None)
+        if docker_cache_mount_point:
+            if not docker_cache_mount_point.startswith("nfs://"):
+                raise RuntimeError(
+                    "Invalid nfs mount point: {}".format(docker_cache_mount_point)
+                )
+
+            logger.info("Using volume cache mount point %s", docker_cache_mount_point)
+
     return run(
         broker_address,
         service_name=args.get("--name"),
@@ -329,6 +342,7 @@ def main(argv=None):
         docker_network_name=docker_network_name,
         docker_port_range=docker_port_range,
         docker_images_cache=docker_images_cache,
+        docker_cache_mount_point=docker_cache_mount_point,
     )
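
For illustration, a worker invocation exercising the new option might look like this (writing %(prog)s for the installed entry point, as the docstring does; host names are made up):

    %(prog)s -v --docker --cache-mount-point=nfs://fileserver/beat-cache tcp://scheduler.example.com:5555

Anything not starting with nfs:// is rejected by the check above before the worker starts.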
@@ -37,20 +37,16 @@
 Inspired by the Majordomo Protocol Broker
 """

 import logging
-import time
 import signal
+import time
 from binascii import hexlify

 import zmq

 # local
 from . import BCP
 from .zhelpers import dump

 logger = logging.getLogger(__name__)
@@ -95,6 +91,7 @@ class BeatComputationBroker(object):
     def __init__(self, verbose=False):
         """Initialize broker state."""

         self.verbose = verbose
+        self.continue_ = True
         self.services = {}
@@ -163,14 +160,17 @@ class BeatComputationBroker(object):
     def process_client(self, sender, msg):
         """Process a request coming from a client."""

         # Service name + body
         assert len(msg) >= 2  # nosec
+
         service = msg.pop(0)
         # Set reply return address to client sender
         msg = [sender, b""] + msg
+
         self.dispatch(self.require_service(service), msg)

     def process_worker(self, sender, msg):
         """Process message sent to us by a worker."""

         # At least, command
         assert len(msg) >= 1  # nosec
@@ -222,12 +222,14 @@ class BeatComputationBroker(object):
     def delete_worker(self, worker, disconnect):
         """Deletes worker from all data structures, and deletes worker."""

         assert worker is not None  # nosec
+
         if disconnect:
             self.send_to_worker(worker, BCP.BCPW_DISCONNECT, None, None)

         if worker.service is not None:
-            worker.service.waiting.remove(worker)
+            if worker in worker.service.waiting:
+                worker.service.waiting.remove(worker)

         on_disconnection = self.callbacks.get("on_disconnection", None)
         if on_disconnection:
@@ -236,8 +238,12 @@ class BeatComputationBroker(object):
         if worker.identity in self.workers:
             self.workers.pop(worker.identity)

+        if worker in self.waiting:
+            self.waiting.pop(self.waiting.index(worker))
+
     def require_worker(self, address):
         """Finds the worker (creates if necessary)."""

         assert address is not None  # nosec
+
         identity = hexlify(address)
         worker = self.workers.get(identity)
@@ -251,6 +257,7 @@ class BeatComputationBroker(object):
     def require_service(self, name):
         """Locates the service (creates if necessary)."""

         assert name is not None  # nosec
+
         service = self.services.get(name)
         if service is None:
@@ -264,11 +271,13 @@ class BeatComputationBroker(object):
         We use a single socket for both clients and workers.
         """

         self.socket.bind(endpoint)
         logger.info("I: BCP broker/0.0.1 is active at %s", endpoint)

     def send_heartbeats(self):
         """Send heartbeats to idle workers if it's time"""

         if time.time() > self.heartbeat_at:
             for worker in self.waiting:
                 self.send_to_worker(worker, BCP.BCPW_HEARTBEAT, None, None)
@@ -276,16 +285,16 @@ class BeatComputationBroker(object):
             self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL

     def purge_workers(self):
-        """Look for & kill expired workers.
-        """
+        """Look for & kill expired workers."""

         for item in self.waiting:
             if item.expiry < time.time():
                 logger.info("I: deleting expired worker: %s", item.identity)
                 self.delete_worker(item, False)
+                self.waiting.pop(self.waiting.index(item))

     def worker_waiting(self, worker):
         """This worker is now waiting for work."""

         # Queue to broker and service waiting lists
         if worker not in self.waiting:
def dispatch(self, service, msg):
"""Dispatch requests to waiting workers as possible"""
assert service is not None # nosec
if msg is not None: # Queue message if any
service.requests.append(msg)
self.purge_workers()
while service.waiting and service.requests:
msg = service.requests.pop(0)
worker = service.waiting.pop(0)
@@ -85,7 +85,7 @@ class BeatComputationClient(object):
         request = [b"", BCP.BCPC_CLIENT, service] + request
         if self.verbose:
-            logger.warn("I: send request to '%s' service: ", service)
+            logger.warning("I: send request to '%s' service: ", service)
             dump(request)
         self.client.send_multipart(request)
@@ -40,17 +40,16 @@ management
 Execution utilities
 """

 import logging
 import multiprocessing
 import signal

-import zmq
 import simplejson as json
+import zmq

 from ..dock import Host
-from ..execution.local import LocalExecutor
 from ..execution.docker import DockerExecutor
+from ..execution.local import LocalExecutor

 from . import BCP
@@ -34,8 +34,8 @@
 """BEAT Computation worker"""

 import logging

 import zmq

 from .zhelpers import dump
@@ -34,14 +34,12 @@
 """BEAT Computation worker"""

 import logging
 import time

 import zmq

 # BEAT Computation protocol constants:
 from . import BCP
 from .zhelpers import dump

 logger = logging.getLogger(__name__)
@@ -58,8 +56,8 @@ class BeatComputationWorker(object):
         service = service.encode("utf-8")
-        self.heartbeat_at = (
-            0
-        )  # When to send HEARTBEAT (relative to time.time(), so in seconds)
+        self.heartbeat_at = (
+            0  # When to send HEARTBEAT (relative to time.time(), so in seconds)
+        )
         self.liveness = 0  # How many attempts left
         self.heartbeat = 2500  # Heartbeat delay, msecs
         self.reconnect = 2500  # Reconnect delay, msecs
@@ -210,7 +208,7 @@ class BeatComputationWorker(object):
                 self.liveness -= 1
                 if self.liveness == 0:
                     if self.verbose:
-                        logger.warn("W: disconnected from broker - retrying…")
+                        logger.warning("W: disconnected from broker - retrying…")
                     try:
                         time.sleep(1e-3 * self.reconnect)
                     except KeyboardInterrupt:
@@ -222,7 +220,7 @@ class BeatComputationWorker(object):
                 self.send_to_broker(BCP.BCPW_HEARTBEAT)
                 self.heartbeat_at = time.time() + 1e-3 * self.heartbeat

-        logger.warn("W: interrupt received, killing worker…")
+        logger.warning("W: interrupt received, killing worker…")
         return None

     def destroy(self):
@@ -38,9 +38,9 @@ Helper module for common zmq task
 Based on Majordomo protocol zhelpers.
 """

 import binascii
 import logging

 import zmq

 logger = logging.getLogger(__name__)
@@ -53,16 +53,15 @@ Forward importing from :py:mod:`beat.backend.python.data`:
 :py:func:`beat.backend.python.data.load_data_index_db`
 :py:func:`beat.backend.python.data.foundSplitRanges`
 """

-from beat.backend.python.data import mixDataIndices  # noqa
-from beat.backend.python.data import getAllFilenames  # noqa
-from beat.backend.python.data import DataSource  # noqa
+from beat.backend.python.data import CachedDataSink  # noqa
 from beat.backend.python.data import CachedDataSource  # noqa
 from beat.backend.python.data import DatabaseOutputDataSource  # noqa
-from beat.backend.python.data import RemoteDataSource  # noqa
 from beat.backend.python.data import DataSink  # noqa
-from beat.backend.python.data import CachedDataSink  # noqa
+from beat.backend.python.data import DataSource  # noqa
+from beat.backend.python.data import RemoteDataSource  # noqa
 from beat.backend.python.data import StdoutDataSink  # noqa
+from beat.backend.python.data import foundSplitRanges  # noqa
+from beat.backend.python.data import getAllFilenames  # noqa
 from beat.backend.python.data import load_data_index  # noqa
 from beat.backend.python.data import load_data_index_db  # noqa
-from beat.backend.python.data import foundSplitRanges  # noqa
+from beat.backend.python.data import mixDataIndices  # noqa
@@ -44,8 +44,6 @@ Forward importing from :py:mod:`beat.backend.python.data_loaders`
 :py:class:`beat.backend.python.data_loaders.DataLoader`
 :py:class:`beat.backend.python.data_loaders.DataView`
 """

-from beat.backend.python.data_loaders import DataLoaderList  # noqa
 from beat.backend.python.data_loaders import DataLoader  # noqa
+from beat.backend.python.data_loaders import DataLoaderList  # noqa
 from beat.backend.python.data_loaders import DataView  # noqa
@@ -44,34 +44,43 @@ Validation of databases
 Forward importing from :py:mod:`beat.backend.python.database`:
 :py:class:`beat.backend.python.database.Storage`
 """

 import os

 import six

-from . import schema
-from .dataformat import DataFormat
-from . import prototypes
-
-from beat.backend.python.database import Storage
 from beat.backend.python.database import Database as BackendDatabase
+from beat.backend.python.database import Storage
 from beat.backend.python.protocoltemplate import Storage as PTStorage

+from . import prototypes
+from . import schema
+from .dataformat import DataFormat
+from .protocoltemplate import ProtocolTemplate
+

 def get_first_procotol_template(prefix):
     pt_root_folder = os.path.join(prefix, PTStorage.asset_folder)
-    pts_available = os.listdir(pt_root_folder)
+    pts_available = sorted(os.listdir(pt_root_folder))

     if not pts_available:
         raise RuntimeError("Invalid prefix content, no protocol template available")

-    procotol_template_folder = pts_available[0]
-
-    protocol_template_versions = sorted(
-        os.listdir(os.path.join(pt_root_folder, procotol_template_folder))
-    )
-    version = protocol_template_versions[-1].split(".")[0]
-
-    return "{}/{}".format(procotol_template_folder, version)
+    selected_protocol_template = None
+    for procotol_template_folder in pts_available:
+        protocol_template_versions = sorted(
+            os.listdir(os.path.join(pt_root_folder, procotol_template_folder))
+        )
+        version = protocol_template_versions[-1].split(".")[0]
+        protocol_template_name = "{}/{}".format(procotol_template_folder, version)
+
+        protocol_template = ProtocolTemplate(prefix, protocol_template_name)
+        if protocol_template.valid:
+            selected_protocol_template = protocol_template_name
+            break
+
+    if selected_protocol_template is None:
+        raise RuntimeError("No valid protocol template found")
+
+    return selected_protocol_template
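
For context, a caller would use this helper roughly as follows (prefix path and template name are illustrative only):

    # assumed prefix layout: <prefix>/protocoltemplates/<name>/<version>.json
    name = get_first_procotol_template("/path/to/prefix")
    # e.g. "simple_protocol/1": the first *valid* template in sorted order,
    # rather than whatever os.listdir() happened to return first

The sorted() call and the validity check are the behavioral changes: the old version blindly returned the first directory entry, valid or not.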


 class Database(BackendDatabase):
@@ -44,17 +44,17 @@ Validation and parsing for dataformats
 Forward importing from :py:mod:`beat.backend.python.dataformat`:
 :py:class:`beat.backend.python.dataformat.Storage`
 """

 import copy

 import six

-from . import schema
+from beat.backend.python.dataformat import DataFormat as BackendDataFormat
+from beat.backend.python.dataformat import Storage  # noqa
+
 from . import prototypes
+from . import schema
 from . import utils
-
-from beat.backend.python.dataformat import Storage  # noqa
-from beat.backend.python.dataformat import DataFormat as BackendDataFormat


 class DataFormat(BackendDataFormat):
     """Data formats define the chunks of data that circulate between blocks.
@@ -41,19 +41,22 @@ dock
 Docker helper classes
 """

 import ast
+import logging
 import os
-import simplejson as json
 import socket
+import subprocess as sp  # nosec
 import tempfile
 import time

 import docker
-import subprocess as sp  # nosec
-import logging
+import simplejson as json
+from packaging import version

 from beat.core import stats

+from .utils import build_env_name
+
 logger = logging.getLogger(__name__)
@@ -85,7 +88,13 @@ class Host(object):
             (
                 self.processing_environments,
                 self.db_environments,
-            ) = self._discover_environments()
+            ) = self._discover_environments_using_labels()
+
+            if not self.db_environments and not self.processing_environments:
+                (
+                    self.processing_environments,
+                    self.db_environments,
+                ) = self._discover_environments_using_describe()

             # (If necessary) Save the known infos about the images
             if self.images_cache_filename is not None:
@@ -123,6 +132,12 @@ class Host(object):
         return attrs["image"]

+    def dbenv2docker(self, key):
+        """Returns a nice docker image name given a BEAT database environment key"""
+
+        attrs = self.db_environments[key]
+        return attrs["image"]
+
     def teardown(self):
         for container in self.containers:
             self.rm(container)
@@ -157,7 +172,7 @@ class Host(object):
         s.connect(("8.8.8.8", 1))  # connecting to a UDP address doesn't send packets
         return s.getsockname()[0]

-    def _discover_environments(self):
+    def _discover_environments_using_describe(self):
         """Returns a dictionary containing information about docker environments

         Raises:
@@ -184,14 +199,14 @@ class Host(object):
                     Host.images_cache[image] = infos
                     return infos
             except Exception as e:
-                logger.warn(
+                logger.warning(
                     "Ignoring potential environment at `%s' since "
                     "`describe' output cannot be parsed: %s",
                     image,
                     str(e),
                 )
         else:
-            logger.warn(
+            logger.warning(
                 "Execution failed with status {}: \n"
                 "stdout: '{}'\n"
                 "stderr: '{}'".format(status, stdout, stderr)
@@ -270,7 +285,7 @@ class Host(object):
                 )
                 return False
             else:
-                logger.warn(
+                logger.warning(
                     "Overriding **existing** environment '%s' image "
                     "with '%s'. To avoid this warning make "
                     "sure your docker images do not contain environments "
@@ -299,10 +314,12 @@ class Host(object):
         for image in images:
             # Call the "describe" application on each existing image
             description = _describe(image)
+
             if not description:
                 logger.debug("Description not found for", image)
                 continue

-            key = description["name"] + " (" + description["version"] + ")"
+            key = build_env_name(description)
+
             if "databases" in description:
                 if (key in db_environments) and not _must_replace(
@@ -331,6 +348,115 @@ class Host(object):
         return (environments, db_environments)

+    def _discover_environments_using_labels(self):
+        """Search BEAT runtime environments using docker labels"""
+
+        def _must_replace(key, image, environments):
+            environment = environments[key]
+            if environment["image"] not in image.tags:
+                logger.warning(
+                    "Different images providing the same environment: {} VS {}".format(
+                        environment["image"], image.tags
+                    )
+                )
+                if self.raise_on_errors:
+                    raise RuntimeError(
+                        "Environments at '%s' and '%s' have the "
+                        "same name ('%s'). Distinct environments must be "
+                        "uniquely named. Fix this and re-start."
+                        % (image.tags[0], environments[key]["image"], key)
+                    )
+                else:
+                    logger.debug("Keeping more recent")
+
+            current_version = "{}{}".format(
+                environment["version"], environment["revision"]
+            )
+            new_version = "{}{}".format(
+                image.labels["beat.env.version"], image.labels["beat.env.revision"]
+            )
+            current_version = version.parse(current_version)
+            new_version = version.parse(new_version)
+            return new_version > current_version
+
+        def _parse_image_info(image):
+            labels = image.labels
+            data = {
+                "image": image.tags[0],
+                "name": labels["beat.env.name"],
+                "version": labels["beat.env.version"],
+                "revision": labels["beat.env.revision"],
+            }
+
+            database_list = labels.get("beat.env.databases")
+            if database_list:
+                data["databases"] = ast.literal_eval(database_list)
+
+            capabilities = labels.get("beat.env.capabilities")
+            if capabilities:
+                data["capabilities"] = ast.literal_eval(capabilities)
+
+            return data
+
+        def _process_image_list(image_list):
+            environments = {}
+
+            for image in image_list:
+                if not len(image.tags):
+                    logger.warning("Untagged image, skipping")
+                    continue
+
+                image_info = _parse_image_info(image)
+                key = build_env_name(image_info)
+                image_name = image_info["image"]
+
+                if key in environments:
+                    if _must_replace(key, image, environments):
+                        environments[key] = image_info
+                        logger.info("Updated '%s' -> '%s'", key, image_name)
+                else:
+                    environments[key] = image_info
+                    Host.images_cache[image_name] = environments[key]
+                    logger.info("Registered '%s' -> '%s'", key, image_name)
+
+            return environments
+
+        client = docker.from_env()
+
+        try:
+            databases = client.images.list(
+                filters={"label": ["beat.env.type=database"]}
+            )
+        except Exception as e:
+            if self.raise_on_errors:
+                raise
+            else:
+                logger.error("Docker error: {}".format(e))
+                return {}, {}
+        else:
+            db_environments = _process_image_list(databases)
+
+        try:
+            executors = client.images.list(
+                filters={"label": ["beat.env.type=execution"]}
+            )
+        except Exception as e:
+            if self.raise_on_errors:
+                raise
+            else:
+                logger.error("Docker error: {}".format(e))
+                return {}, {}
+        else:
+            environments = _process_image_list(executors)
+
+        logger.debug(
+            "Found %d environments and %d database environments",
+            len(environments),
+            len(db_environments),
+        )
+
+        return environments, db_environments
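
To make the label scheme concrete, here is a hedged sketch (label values invented for illustration) of the metadata `_parse_image_info` reads off an image:

    import ast

    # hypothetical labels on an execution image
    labels = {
        "beat.env.type": "execution",        # matched by the images.list() filters above
        "beat.env.name": "Python 3.6",
        "beat.env.version": "1.2.0",
        "beat.env.revision": "2",
        "beat.env.capabilities": "['gpu']",  # stringified list, decoded with ast.literal_eval
    }

    assert ast.literal_eval(labels["beat.env.capabilities"]) == ["gpu"]

Label-based discovery avoids running the `describe` entry point inside every candidate image, which is why it is tried first and the describe-based scan is kept only as a fallback.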

     def create_container(self, image, command):
         if image in self:  # Replace by a real image name
@@ -360,7 +486,7 @@ class Host(object):
         limitation is not put in place.
         """

-        cmd = ["docker", "run", "-tid"]
+        cmd = ["docker", "run", "--tty", "--interactive", "--detach", "--read-only"]

         network = container.network
         if network:
@@ -387,7 +513,8 @@ class Host(object):
             if ("capabilities" in image_infos) and (
                 "gpu" in image_infos["capabilities"]
             ):
-                cmd.append("--runtime=nvidia")
+                if os.path.exists("/proc/driver/nvidia"):
+                    cmd.append("--gpus=all")

         if virtual_memory_in_megabytes:
             # For this to work properly, memory swap limitation has to be
@@ -429,6 +556,9 @@ class Host(object):
         # Mount the volumes
         cmd.extend(container.volumes)

+        # Add tmpfs entries
+        cmd.extend(container.temporary_filesystems)
+
         # Expose the ports
         cmd.extend(container.ports)
@@ -541,7 +671,7 @@ class Host(object):
         status = self.status(container)

         if status not in ("created", "exited"):
-            logger.warn(
+            logger.warning(
                 "Killing container '%s' which is on state '%s'", container.id, status
             )
             self._exec(["docker", "container", "stop", container.id])
@@ -663,6 +793,7 @@ class Container:
         self._name = None
         self._workdir = None
         self._entrypoint = None
+        self._temporary_filesystems = {"/tmp": "500k", "/run": "500k"}  # nosec

     def set_name(self, name):
         """ Set the name to be used by the container in place of the docker
@@ -693,6 +824,16 @@ class Container:
         self._volumes[path] = {"bind": mount_path, "mode": "ro" if read_only else "rw"}

+    def add_tmpfs(self, path, size):
+        """Add a tmpfs to be mounted on the container
+
+        Parameters:
+
+        :param str path: Target path for the tmpfs
+        :param str size: Size of the tmpfs. Unlimited if empty
+        """
+
+        self._temporary_filesystems[path] = size
+
     def add_port(self, container_port, host_port, host_address=None):
         """Add a port binding
@@ -772,6 +913,17 @@ class Container:
             volumes.append("--volume=%s:%s:%s" % (k, v["bind"], v["mode"]))

         return volumes

+    @property
+    def temporary_filesystems(self):
+        tempfs_list = []
+
+        for path, size in self._temporary_filesystems.items():
+            tmpfs_string = "--tmpfs={}:rw,noexec,nosuid".format(path)
+            if size:
+                tmpfs_string += ",size={}".format(size)
+
+            tempfs_list.append(tmpfs_string)
+
+        return tempfs_list
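
A quick usage sketch of the tmpfs plumbing (the Container constructor arguments are elided and hypothetical):

    container = Container(...)  # hypothetical construction
    container.add_tmpfs("/scratch", "64m")
    print(container.temporary_filesystems)
    # ['--tmpfs=/tmp:rw,noexec,nosuid,size=500k',
    #  '--tmpfs=/run:rw,noexec,nosuid,size=500k',
    #  '--tmpfs=/scratch:rw,noexec,nosuid,size=64m']

Combined with the `--read-only` flag added to `docker run` above, the container filesystem stays immutable except for these explicitly declared scratch areas.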

     @property
     def ports(self):
         """Returns the ports of this container in a suitable form to build
@@ -41,10 +41,8 @@ environment
 Helper functions related to environment management
 """

-import re
 import logging
+import re

 logger = logging.getLogger(__name__)
@@ -41,21 +41,19 @@ base
 Execution utilities
 """

-import os
-import glob
 import collections
+import glob
 import logging
-import simplejson as json
+import os

-from .. import schema
-from .. import database
-from .. import algorithm
-from .. import stats
+import simplejson as json

 from beat.backend.python.helpers import convert_experiment_configuration_to_container

+from .. import algorithm
+from .. import database
+from .. import schema
+from .. import stats

 logger = logging.getLogger(__name__)
@@ -230,6 +228,17 @@ class BaseExecutor(object):
                     "The input '%s' doesn't exist in the loop algorithm" % name
                 )

+        if len(loop["outputs"]) != len(self.loop_algorithm.output_map):
+            self.errors.append(
+                "The number of outputs of the loop algorithm doesn't correspond"
+            )
+
+        for name in self.data["outputs"].keys():
+            if name not in self.algorithm.output_map.keys():
+                self.errors.append(
+                    "The output '%s' doesn't exist in the loop algorithm" % name
+                )
+
         # Check that the mapping in coherent
         if len(self.data["inputs"]) != len(self.algorithm.input_map):
             self.errors.append(
@@ -41,21 +41,21 @@ docker
 Execution utilities
 """

+import logging
 import os
 import shutil
-import logging
+from collections import namedtuple

 import requests
 import simplejson as json

+from beat.backend.python.data import getAllFilenames
 from beat.backend.python.execution import MessageHandler

 from .. import stats
+from .. import utils
 from .remote import RemoteExecutor

 logger = logging.getLogger(__name__)
@@ -146,6 +146,9 @@ class DockerExecutor(RemoteExecutor):
     """

+    CONTAINER_PREFIX_PATH = "/beat/prefix"
+    CONTAINER_CACHE_PATH = "/beat/cache"
+
     def __init__(
         self,
         host,
@@ -184,33 +187,79 @@ class DockerExecutor(RemoteExecutor):
         # Modify the paths to the databases in the dumped configuration files
         root_folder = os.path.join(databases_configuration_path, "prefix", "databases")

-        database_paths = {}
+        DatabaseInfo = namedtuple("DatabaseInfo", ["path", "environment"])
+
+        databases_infos = {}

-        for db_name in self.databases.keys():
+        for db_name, db_object, in self.databases.items():
             json_path = os.path.join(root_folder, db_name + ".json")

             with open(json_path, "r") as f:
                 db_data = json.load(f)

-            database_paths[db_name] = db_data["root_folder"]
-            db_data["root_folder"] = os.path.join("/databases", db_name)
+            system_path = db_data["root_folder"]
+            container_path = os.path.join("/databases", db_name)
+            db_data["root_folder"] = container_path

             with open(json_path, "w") as f:
                 json.dump(db_data, f, indent=4)

-        # Determine the docker image to use for the databases
-        try:
-            databases_environment = self.host.db2docker(database_paths.keys())
-        except Exception:
-            raise RuntimeError(
-                "No environment found for the databases `%s' "
-                "- available environments are %s"
-                % (
-                    ", ".join(database_paths.keys()),
-                    ", ".join(self.host.db_environments.keys()),
-                )
-            )
+            databases_infos[db_name] = DatabaseInfo(
+                system_path, utils.build_env_name(db_object.environment)
+            )

+        databases_environment = None
+
+        requesting_environments = {
+            name: info
+            for name, info in databases_infos.items()
+            if info.environment is not None
+        }
+
+        if requesting_environments:
+            if len(requesting_environments) != len(self.databases):
+                raise RuntimeError(
+                    "Selected databases ({}) are not all providing"
+                    " an environment.".format(list(self.databases.keys()))
+                )
+
+            requested_environments = {
+                info.environment
+                for info in requesting_environments.values()
+                if info.environment is not None
+            }
+
+            if len(requested_environments) > 1:
+                raise RuntimeError(
+                    "Selected databases ({}) are requesting different environments,"
+                    "only one is supported".format(list(requesting_environments.keys()))
+                )
+
+            # All databases are requesting the same environment
+            db_environment = next(iter(requested_environments))
+
+            try:
+                databases_environment = self.host.dbenv2docker(db_environment)
+            except Exception:
+                raise RuntimeError(
+                    "Environment {} not found - available environments are {}".format(
+                        db_environment, list(self.host.db_environments.keys())
+                    )
+                )
+
+        if not databases_environment:
+            # Determine the docker image to use for the databases
+            database_list = databases_infos.keys()
+
+            try:
+                databases_environment = self.host.db2docker(database_list)
+            except Exception: