...
 
Commits (160)
 [flake8]
 max-line-length = 80
 select = B,C,E,F,W,T4,B9,B950
-ignore = E501, W503
+ignore = E501, W503, E203
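Note: E203 ("whitespace before ':'") is newly ignored because black, which the next hunk shows running through pre-commit, formats extended slices with a space before the colon. A minimal example of black-formatted code that flake8 would otherwise reject:

    ham = list(range(10))
    sliced = ham[len(ham) // 2 :]  # flake8 E203 fires on the space before ':' unless ignored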
@@ -5,7 +5,6 @@ repos:
     rev: stable
     hooks:
       - id: black
-        language_version: python3.6
         exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
 - repo: https://github.com/pre-commit/pre-commit-hooks
   rev: v2.0.0
@@ -31,9 +31,9 @@
 .. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-===========================================================
-Authors of the Biometrics Evaluation and Testing Platform
-===========================================================
+==============================
+Authors of the BEAT Platform
+==============================
 Andre Anjos <andre.anjos@idiap.ch>
 Flavio Tarsetti <flavio.tarsetti@idiap.ch>
@@ -31,18 +31,14 @@
 .. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-.. image:: https://img.shields.io/badge/docs-stable-yellow.svg
-   :target: https://www.idiap.ch/software/beat/docs/beat/beat.core/stable/index.html
-.. image:: https://img.shields.io/badge/docs-latest-orange.svg
+.. image:: https://img.shields.io/badge/docs-available-orange.svg
    :target: https://www.idiap.ch/software/beat/docs/beat/beat.core/master/index.html
-.. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/build.svg
+.. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/pipeline.svg
    :target: https://gitlab.idiap.ch/beat/beat.core/commits/master
 .. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/coverage.svg
    :target: https://gitlab.idiap.ch/beat/beat.core/commits/master
 .. image:: https://img.shields.io/badge/gitlab-project-0000c0.svg
    :target: https://gitlab.idiap.ch/beat/beat.core
-.. image:: https://img.shields.io/pypi/v/beat.core.svg
-   :target: https://pypi.python.org/pypi/beat.core
 ==========================
@@ -182,6 +182,8 @@ class Algorithm(BackendAlgorithm):
     """

+    dataformat_klass = dataformat.DataFormat
+
     def __init__(self, prefix, data, dataformat_cache=None, library_cache=None):
         super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache)
@@ -303,89 +305,52 @@ class Algorithm(BackendAlgorithm):
                 "declaration: %s" % (self.name, ", ".join(all_output_names))
             )

+    def _validate_format(self, type_name, group_name, entry_name, dataformat):
+        if dataformat.errors:
+            self.errors.append(
+                "found error validating data format `%s' "
+                "for %s `%s' on algorithm `%s': %s"
+                % (
+                    type_name,
+                    group_name,
+                    entry_name,
+                    self.name,
+                    "\n".join(dataformat.errors),
+                )
+            )
+
+    def _validate_dataformats(self, group, group_name, dataformat_cache):
+        for name, entry in group[group_name].items():
+            type_name = entry["type"]
+            thisformat = self._update_dataformat_cache(type_name, dataformat_cache)
+            self._validate_format(type_name, group_name, name, thisformat)
+
     def _validate_required_dataformats(self, dataformat_cache):
         """Makes sure we can load all requested formats
         """

         for group in self.groups:
-            for name, input in group["inputs"].items():
-                if input["type"] in self.dataformats:
-                    continue
-                if dataformat_cache and input["type"] in dataformat_cache:  # reuse
-                    thisformat = dataformat_cache[input["type"]]
-                else:  # load it
-                    thisformat = dataformat.DataFormat(self.prefix, input["type"])
-                    if dataformat_cache is not None:  # update it
-                        dataformat_cache[input["type"]] = thisformat
-                self.dataformats[input["type"]] = thisformat
-                if thisformat.errors:
-                    self.errors.append(
-                        "found error validating data format `%s' "
-                        "for input `%s' on algorithm `%s': %s"
-                        % (input["type"], name, self.name, "\n".join(thisformat.errors))
-                    )
+            for name, input_ in group["inputs"].items():
+                self._validate_dataformats(group, "inputs", dataformat_cache)

-            if "outputs" not in group:
-                continue
+            if "outputs" in group:
+                self._validate_dataformats(group, "outputs", dataformat_cache)

-            for name, output in group["outputs"].items():
-                if output["type"] in self.dataformats:
-                    continue
-                if dataformat_cache and output["type"] in dataformat_cache:  # reuse
-                    thisformat = dataformat_cache[output["type"]]
-                else:  # load it
-                    thisformat = dataformat.DataFormat(self.prefix, output["type"])
-                    if dataformat_cache is not None:  # update it
-                        dataformat_cache[output["type"]] = thisformat
-                self.dataformats[output["type"]] = thisformat
-                if thisformat.errors:
-                    self.errors.append(
-                        "found error validating data format `%s' "
-                        "for output `%s' on algorithm `%s': %s"
-                        % (
-                            output["type"],
-                            name,
-                            self.name,
-                            "\n".join(thisformat.errors),
-                        )
-                    )
+            if "loop" in group:
+                self._validate_dataformats(group, "loop", dataformat_cache)

         if self.results:
             for name, result in self.results.items():
-                if result["type"].find("/") != -1:
-                    if result["type"] in self.dataformats:
-                        continue
-                    if dataformat_cache and result["type"] in dataformat_cache:  # reuse
-                        thisformat = dataformat_cache[result["type"]]
-                    else:
-                        thisformat = dataformat.DataFormat(self.prefix, result["type"])
-                        if dataformat_cache is not None:  # update it
-                            dataformat_cache[result["type"]] = thisformat
-                    self.dataformats[result["type"]] = thisformat
-                    if thisformat.errors:
-                        self.errors.append(
-                            "found error validating data format `%s' "
-                            "for result `%s' on algorithm `%s': %s"
-                            % (
-                                result["type"],
-                                name,
-                                self.name,
-                                "\n".join(thisformat.errors),
-                            )
-                        )
+                result_type = result["type"]
+                # results can only contain base types and plots therefore, only
+                # process plots
+                if result_type.find("/") != -1:
+                    thisformat = self._update_dataformat_cache(
+                        result_type, dataformat_cache
+                    )
+                    self._validate_format(result_type, "result", name, thisformat)

     def _convert_parameter_types(self):
         """Converts types to numpy equivalents, checks defaults, ranges and
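Note: the refactored validators call a shared `_update_dataformat_cache()` helper that is not part of this hunk. Judging by the load/reuse/update pattern deleted above and the `dataformat_klass` attribute added earlier, a sketch of what such a helper presumably looks like (an illustration, not the committed implementation):

    def _update_dataformat_cache(self, type_name, dataformat_cache):
        # load a data format once, reusing the caches where possible (sketch)
        if dataformat_cache and type_name in dataformat_cache:  # reuse
            thisformat = dataformat_cache[type_name]
        else:  # load it
            thisformat = self.dataformat_klass(self.prefix, type_name)
            if dataformat_cache is not None:  # update it
                dataformat_cache[type_name] = thisformat
        self.dataformats[type_name] = thisformat
        return thisformat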
@@ -41,7 +41,8 @@ Based on the Majordomo Protocol worker example of the ZMQ Guide.
 Usage:
   %(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>]
            [--cache=<path>] [--docker] [--docker-network=<name>]
-           [--port-range=<range>] <broker_address>
+           [--port-range=<range>] [--cache-mount-point=<cache_mount_point>]
+           <broker_address>
   %(prog)s (--help | -h)
   %(prog)s (--version | -V)

@@ -57,6 +58,7 @@ Options:
   -c, --cache=<path>  Cache prefix, otherwise defaults to '<prefix>/cache'
   --docker-network=<name>  Name of the docker network to use
   --port-range=<range>  Range of port usable for communication with containers
+  --cache-mount-point=<cache_mount_point>  NFS mount point to use for cache setup
 """

@@ -107,6 +109,7 @@ def run(
     docker_network_name=None,
     docker_port_range=None,
     docker_images_cache=None,
+    docker_cache_mount_point=None,
 ):
     """Start the worker

@@ -202,6 +205,8 @@ def run(
             data["network_name"] = docker_network_name
         if docker_port_range:
             data["port_range"] = docker_port_range
+        if docker_cache_mount_point:
+            data["cache_mount_point"] = docker_cache_mount_point

         # Start the execution
         logger.info("Running '%s' with job id #%s", data["algorithm"], job_id)

@@ -248,6 +253,7 @@ def run(
             logger.info("The scheduler shut down, we will wait for it")
             worker.destroy()
+    worker.send_to_broker(BCP.BCPW_DISCONNECT)
     worker.destroy()

     # Cleanup
     for execution_process in execution_processes:

@@ -301,6 +307,8 @@ def main(argv=None):
     docker_images_cache = None
     docker_network_name = None
     docker_port_range = None
+    docker_cache_mount_point = None
+
     if args["--docker"]:
         docker_images_cache = os.path.join(
             tempfile.gettempdir(), "beat-docker-images.json"

@@ -319,6 +327,14 @@ def main(argv=None):
                 return 1
             logger.info("Using port range %s", docker_port_range)

+        docker_cache_mount_point = args.get("--cache-mount-point", None)
+        if docker_cache_mount_point:
+            if not docker_cache_mount_point.startswith("nfs://"):
+                raise RuntimeError(
+                    "Invalid nfs mount point: {}".format(docker_cache_mount_point)
+                )
+            logger.info("Using volume cache mount point %s", docker_cache_mount_point)
+
     return run(
         broker_address,
         service_name=args.get("--name"),

@@ -329,6 +345,7 @@ def main(argv=None):
         docker_network_name=docker_network_name,
         docker_port_range=docker_port_range,
         docker_images_cache=docker_images_cache,
+        docker_cache_mount_point=docker_cache_mount_point,
     )
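Note: the new `--cache-mount-point` option only takes effect together with `--docker` and must use the `nfs://` scheme, otherwise `main()` raises `RuntimeError`. A hypothetical invocation (the program name and addresses below are placeholders, not taken from this diff):

    bcp_worker -vv --docker \
        --cache-mount-point=nfs://fileserver/volumes/beat_cache \
        tcp://scheduler.example.com:5555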
@@ -95,6 +95,7 @@ class BeatComputationBroker(object):
     def __init__(self, verbose=False):
         """Initialize broker state."""
+
         self.verbose = verbose
         self.continue_ = True
         self.services = {}

@@ -163,14 +164,17 @@ class BeatComputationBroker(object):
     def process_client(self, sender, msg):
         """Process a request coming from a client."""
+
         # Service name + body
         assert len(msg) >= 2  # nosec
+
         service = msg.pop(0)
         # Set reply return address to client sender
         msg = [sender, b""] + msg
         self.dispatch(self.require_service(service), msg)

     def process_worker(self, sender, msg):
         """Process message sent to us by a worker."""
+
         # At least, command
         assert len(msg) >= 1  # nosec

@@ -222,11 +226,13 @@ class BeatComputationBroker(object):
     def delete_worker(self, worker, disconnect):
         """Deletes worker from all data structures, and deletes worker."""
+
         assert worker is not None  # nosec

         if disconnect:
             self.send_to_worker(worker, BCP.BCPW_DISCONNECT, None, None)

         if worker.service is not None:
-            worker.service.waiting.remove(worker)
+            if worker in worker.service.waiting:
+                worker.service.waiting.remove(worker)

         on_disconnection = self.callbacks.get("on_disconnection", None)

@@ -236,8 +242,12 @@ class BeatComputationBroker(object):
         if worker.identity in self.workers:
             self.workers.pop(worker.identity)

+        if worker in self.waiting:
+            self.waiting.pop(self.waiting.index(worker))
+
     def require_worker(self, address):
         """Finds the worker (creates if necessary)."""
+
         assert address is not None  # nosec
         identity = hexlify(address)
         worker = self.workers.get(identity)

@@ -251,6 +261,7 @@ class BeatComputationBroker(object):
     def require_service(self, name):
         """Locates the service (creates if necessary)."""
+
         assert name is not None  # nosec
         service = self.services.get(name)
         if service is None:

@@ -264,11 +275,13 @@ class BeatComputationBroker(object):
         We use a single socket for both clients and workers.
         """
+
         self.socket.bind(endpoint)
         logger.info("I: BCP broker/0.0.1 is active at %s", endpoint)

     def send_heartbeats(self):
         """Send heartbeats to idle workers if it's time"""
+
         if time.time() > self.heartbeat_at:
             for worker in self.waiting:
                 self.send_to_worker(worker, BCP.BCPW_HEARTBEAT, None, None)

@@ -276,16 +289,16 @@ class BeatComputationBroker(object):
             self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL

     def purge_workers(self):
-        """Look for & kill expired workers.
-        """
+        """Look for & kill expired workers."""
+
         for item in self.waiting:
             if item.expiry < time.time():
                 logger.info("I: deleting expired worker: %s", item.identity)
                 self.delete_worker(item, False)
-                self.waiting.pop(self.waiting.index(item))

     def worker_waiting(self, worker):
         """This worker is now waiting for work."""
+
         # Queue to broker and service waiting lists
         if worker not in self.waiting:

@@ -298,10 +311,14 @@ class BeatComputationBroker(object):
     def dispatch(self, service, msg):
         """Dispatch requests to waiting workers as possible"""
+
         assert service is not None  # nosec
+
         if msg is not None:  # Queue message if any
             service.requests.append(msg)
+
         self.purge_workers()
+
         while service.waiting and service.requests:
             msg = service.requests.pop(0)
             worker = service.waiting.pop(0)
@@ -85,7 +85,7 @@ class BeatComputationClient(object):
         request = [b"", BCP.BCPC_CLIENT, service] + request

         if self.verbose:
-            logger.warn("I: send request to '%s' service: ", service)
+            logger.warning("I: send request to '%s' service: ", service)
             dump(request)
         self.client.send_multipart(request)
@@ -210,7 +210,7 @@ class BeatComputationWorker(object):
             self.liveness -= 1
             if self.liveness == 0:
                 if self.verbose:
-                    logger.warn("W: disconnected from broker - retrying…")
+                    logger.warning("W: disconnected from broker - retrying…")
                 try:
                     time.sleep(1e-3 * self.reconnect)
                 except KeyboardInterrupt:

@@ -222,7 +222,7 @@ class BeatComputationWorker(object):
                 self.send_to_broker(BCP.BCPW_HEARTBEAT)
                 self.heartbeat_at = time.time() + 1e-3 * self.heartbeat

-        logger.warn("W: interrupt received, killing worker…")
+        logger.warning("W: interrupt received, killing worker…")
         return None

     def destroy(self):
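Note: the `warn` → `warning` renames across these files track the standard library: `logging.Logger.warn` is a deprecated alias of `warning()` and emits a `DeprecationWarning` when warnings are enabled. The two calls are otherwise equivalent:

    import logging

    logger = logging.getLogger(__name__)
    logger.warn("old spelling")     # DeprecationWarning under "-W default"
    logger.warning("new spelling")  # identical behaviour, preferred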
@@ -52,6 +52,8 @@ import six
 from . import schema
 from .dataformat import DataFormat
+from .protocoltemplate import ProtocolTemplate
 from . import prototypes

 from beat.backend.python.database import Storage

@@ -61,17 +63,27 @@ from beat.backend.python.protocoltemplate import Storage as PTStorage
 def get_first_procotol_template(prefix):
     pt_root_folder = os.path.join(prefix, PTStorage.asset_folder)
-    pts_available = os.listdir(pt_root_folder)
+    pts_available = sorted(os.listdir(pt_root_folder))

     if not pts_available:
         raise RuntimeError("Invalid prefix content, no protocol template available")

-    procotol_template_folder = pts_available[0]
+    selected_protocol_template = None
+    for procotol_template_folder in pts_available:
         protocol_template_versions = sorted(
             os.listdir(os.path.join(pt_root_folder, procotol_template_folder))
         )
         version = protocol_template_versions[-1].split(".")[0]
-    return "{}/{}".format(procotol_template_folder, version)
+        protocol_template_name = "{}/{}".format(procotol_template_folder, version)
+        protocol_template = ProtocolTemplate(prefix, protocol_template_name)
+        if protocol_template.valid:
+            selected_protocol_template = protocol_template_name
+            break
+
+    if selected_protocol_template is None:
+        raise RuntimeError("No valid protocol template found")
+
+    return selected_protocol_template


 class Database(BackendDatabase):
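Note: `get_first_procotol_template()` (the misspelled identifier is in the source) scans protocol-template storage under the prefix. Assuming `PTStorage.asset_folder` resolves to a `protocoltemplates/` folder, a hypothetical layout illustrates the change:

    # <prefix>/protocoltemplates/double/1.json       -> candidate "double/1"
    # <prefix>/protocoltemplates/triangle/1.json
    # <prefix>/protocoltemplates/triangle/2.json     -> candidate "triangle/2"
    #
    # Before: the first directory entry was returned unconditionally.
    # After: candidates are visited in sorted order and the first name for which
    # ProtocolTemplate(prefix, name).valid holds is returned; if none validates,
    # a RuntimeError is raised.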
@@ -43,6 +43,7 @@ Docker helper classes
 """

+import ast
 import os
 import simplejson as json
 import socket

@@ -52,8 +53,12 @@ import docker
 import subprocess as sp  # nosec
 import logging

+from packaging import version
+
 from beat.core import stats

+from .utils import build_env_name
+
 logger = logging.getLogger(__name__)

@@ -85,7 +90,13 @@ class Host(object):
         (
             self.processing_environments,
             self.db_environments,
-        ) = self._discover_environments()
+        ) = self._discover_environments_using_labels()
+
+        if not self.db_environments and not self.processing_environments:
+            (
+                self.processing_environments,
+                self.db_environments,
+            ) = self._discover_environments_using_describe()

         # (If necessary) Save the known infos about the images
         if self.images_cache_filename is not None:

@@ -123,6 +134,12 @@ class Host(object):
         return attrs["image"]

+    def dbenv2docker(self, key):
+        """Returns a nice docker image name given a BEAT database environment key"""
+
+        attrs = self.db_environments[key]
+        return attrs["image"]
+
     def teardown(self):
         for container in self.containers:
             self.rm(container)

@@ -157,7 +174,7 @@ class Host(object):
         s.connect(("8.8.8.8", 1))  # connecting to a UDP address doesn't send packets
         return s.getsockname()[0]

-    def _discover_environments(self):
+    def _discover_environments_using_describe(self):
         """Returns a dictionary containing information about docker environments

         Raises:

@@ -184,14 +201,14 @@ class Host(object):
                     Host.images_cache[image] = infos
                     return infos
                 except Exception as e:
-                    logger.warn(
+                    logger.warning(
                         "Ignoring potential environment at `%s' since "
                         "`describe' output cannot be parsed: %s",
                         image,
                         str(e),
                     )
             else:
-                logger.warn(
+                logger.warning(
                     "Execution failed with status {}: \n"
                     "stdout: '{}'\n"
                     "stderr: '{}'".format(status, stdout, stderr)

@@ -270,7 +287,7 @@ class Host(object):
                     )
                     return False
                 else:
-                    logger.warn(
+                    logger.warning(
                         "Overriding **existing** environment '%s' image "
                         "with '%s'. To avoid this warning make "
                         "sure your docker images do not contain environments "

@@ -299,10 +316,12 @@ class Host(object):
         for image in images:
             # Call the "describe" application on each existing image
             description = _describe(image)
+
             if not description:
+                logger.debug("Description not found for", image)
                 continue

-            key = description["name"] + " (" + description["version"] + ")"
+            key = build_env_name(description)

             if "databases" in description:
                 if (key in db_environments) and not _must_replace(

@@ -331,6 +350,115 @@ class Host(object):
         return (environments, db_environments)

+    def _discover_environments_using_labels(self):
+        """Search BEAT runtime environments using docker labels"""
+
+        def _must_replace(key, image, environments):
+            environment = environments[key]
+            if environment["image"] not in image.tags:
+                logger.warning(
+                    "Different images providing the same environment: {} VS {}".format(
+                        environment["image"], image.tags
+                    )
+                )
+                if self.raise_on_errors:
+                    raise RuntimeError(
+                        "Environments at '%s' and '%s' have the "
+                        "same name ('%s'). Distinct environments must be "
+                        "uniquely named. Fix this and re-start."
+                        % (image.tags[0], environments[key]["image"], key)
+                    )
+                else:
+                    logger.debug("Keeping more recent")
+
+            current_version = "{}{}".format(
+                environment["version"], environment["revision"]
+            )
+            new_version = "{}{}".format(
+                image.labels["beat.env.version"], image.labels["beat.env.revision"]
+            )
+            current_version = version.parse(current_version)
+            new_version = version.parse(new_version)
+            return new_version > current_version
+
+        def _parse_image_info(image):
+            labels = image.labels
+            data = {
+                "image": image.tags[0],
+                "name": labels["beat.env.name"],
+                "version": labels["beat.env.version"],
+                "revision": labels["beat.env.revision"],
+            }
+
+            database_list = labels.get("beat.env.databases")
+            if database_list:
+                data["databases"] = ast.literal_eval(database_list)
+
+            capabilities = labels.get("beat.env.capabilities")
+            if capabilities:
+                data["capabilities"] = ast.literal_eval(capabilities)
+
+            return data
+
+        def _process_image_list(image_list):
+            environments = {}
+
+            for image in image_list:
+                if not len(image.tags):
+                    logger.warning("Untagged image, skipping")
+                    continue
+
+                image_info = _parse_image_info(image)
+                key = build_env_name(image_info)
+                image_name = image_info["image"]
+
+                if key in environments:
+                    if _must_replace(key, image, environments):
+                        environments[key] = image_info
+                        logger.info("Updated '%s' -> '%s'", key, image_name)
+                else:
+                    environments[key] = image_info
+                    Host.images_cache[image_name] = environments[key]
+                    logger.info("Registered '%s' -> '%s'", key, image_name)
+
+            return environments
+
+        client = docker.from_env()
+
+        try:
+            databases = client.images.list(
+                filters={"label": ["beat.env.type=database"]}
+            )
+        except Exception as e:
+            if self.raise_on_errors:
+                raise
+            else:
+                logger.error("Docker error: {}".format(e))
+                return {}, {}
+        else:
+            db_environments = _process_image_list(databases)
+
+        try:
+            executors = client.images.list(
+                filters={"label": ["beat.env.type=execution"]}
+            )
+        except Exception as e:
+            if self.raise_on_errors:
+                raise
+            else:
+                logger.error("Docker error: {}".format(e))
+                return {}, {}
+        else:
+            environments = _process_image_list(executors)
+
+        logger.debug(
+            "Found %d environments and %d database environments",
+            len(environments),
+            len(db_environments),
+        )
+
+        return environments, db_environments
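Note: label-based discovery expects BEAT images to advertise themselves through docker labels (`beat.env.type`, `beat.env.name`, `beat.env.version`, `beat.env.revision`, and optionally `beat.env.databases` / `beat.env.capabilities`). A minimal sketch of listing such images with docker-py, with illustrative label values:

    import docker

    client = docker.from_env()

    # matches images built with, e.g., LABEL beat.env.type="execution" \
    #     beat.env.name="python" beat.env.version="3.6" beat.env.revision="1"
    images = client.images.list(filters={"label": ["beat.env.type=execution"]})
    for image in images:
        print(image.tags[0], image.labels.get("beat.env.name"))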
     def create_container(self, image, command):
         if image in self:  # Replace by a real image name

@@ -360,7 +488,7 @@ class Host(object):
         limitation is not put in place.
         """

-        cmd = ["docker", "run", "-tid"]
+        cmd = ["docker", "run", "--tty", "--interactive", "--detach", "--read-only"]

         network = container.network
         if network:

@@ -387,7 +515,8 @@ class Host(object):
             if ("capabilities" in image_infos) and (
                 "gpu" in image_infos["capabilities"]
             ):
-                cmd.append("--runtime=nvidia")
+                if os.path.exists("/proc/driver/nvidia"):
+                    cmd.append("--gpus=all")

         if virtual_memory_in_megabytes:
             # For this to work properly, memory swap limitation has to be

@@ -541,7 +670,7 @@ class Host(object):
         status = self.status(container)

         if status not in ("created", "exited"):
-            logger.warn(
+            logger.warning(
                 "Killing container '%s' which is on state '%s'", container.id, status
             )
             self._exec(["docker", "container", "stop", container.id])
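Note: two container-start changes sit in this hunk: the `-tid` shorthand is spelled out and containers now get a read-only root filesystem, and the legacy `--runtime=nvidia` flag gives way to `--gpus=all` (available since Docker 19.03), requested only when `/proc/driver/nvidia` shows the NVIDIA driver is actually loaded. The generated command now starts along these lines (image and command are placeholders):

    docker run --tty --interactive --detach --read-only --gpus=all <image> <command>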
@@ -230,6 +230,17 @@ class BaseExecutor(object):
                         "The input '%s' doesn't exist in the loop algorithm" % name
                     )

+            if len(loop["outputs"]) != len(self.loop_algorithm.output_map):
+                self.errors.append(
+                    "The number of outputs of the loop algorithm doesn't correspond"
+                )
+
+            for name in self.data["outputs"].keys():
+                if name not in self.algorithm.output_map.keys():
+                    self.errors.append(
+                        "The output '%s' doesn't exist in the loop algorithm" % name
+                    )
+
         # Check that the mapping in coherent
         if len(self.data["inputs"]) != len(self.algorithm.input_map):
             self.errors.append(
@@ -188,14 +188,24 @@ class LocalExecutor(BaseExecutor):
         self.zmq_context = None

-    def __cleanup(self):
+    def __cleanup(self, early=False):
         if self.loop_executor:
+            if early:
+                self.loop_socket.send_string("don")
+                self.loop_socket.recv()  # ack
+
             self.loop_executor.wait()
+            self.loop_executor.close()

         for handler in [self.message_handler, self.loop_message_handler]:
             if handler:
                 handler.kill()
-                handler.join()
+                try:
+                    handler.join()
+                except RuntimeError:
+                    # The handler was not started
+                    pass
+
                 handler.destroy()

         for socket in [self.executor_socket, self.loop_socket]:

@@ -310,15 +320,35 @@ class LocalExecutor(BaseExecutor):
                 cache_root=self.cache,
             )

-            retval = self.loop_executor.setup()
+            try:
+                retval = self.loop_executor.setup()
+            except Exception as e:
+                message = _process_exception(e, self.prefix, "algorithms")
+                retval = False
+            else:
+                message = None
+
             if not retval:
                 self.__cleanup()
-                raise RuntimeError("Loop algorithm setup failed")
+                error = "Loop algorithm {} setup failed".format(self.algorithm.name)
+                if message:
+                    error += ": {}".format(message)
+                raise RuntimeError(error)

-            prepared = self.loop_executor.prepare()
+            try:
+                prepared = self.loop_executor.prepare()
+            except Exception as e:
+                message = _process_exception(e, self.prefix, "algorithms")
+                prepared = False
+            else:
+                message = None
+
             if not prepared:
                 self.__cleanup()
-                raise RuntimeError("Loop algorithm prepare failed")
+                error = "Loop algorithm {} prepare failed".format(self.algorithm.name)
+                if message:
+                    error += ": {}".format(message)
+                raise RuntimeError(error)

             self.loop_executor.process()

@@ -330,28 +360,50 @@ class LocalExecutor(BaseExecutor):
             loop_socket=self.loop_socket,
         )

-        retval = self.executor.setup()
-        if not retval:
-            self.__cleanup()
-            raise RuntimeError("Algorithm setup failed")
+        try:
+            status = self.executor.setup()
+        except Exception as e:
+            message = _process_exception(e, self.prefix, "algorithms")
+            status = 0
+        else:
+            message = None
+
+        if not status:
+            self.__cleanup(early=True)
+            error = "Algorithm {} setup failed".format(self.algorithm.name)
+            if message:
+                error += ": {}".format(message)
+            raise RuntimeError(error)

-        prepared = self.executor.prepare()
+        try:
+            prepared = self.executor.prepare()
+        except Exception as e:
+            message = _process_exception(e, self.prefix, "algorithms")
+            prepared = 0
+        else:
+            message = None
+
         if not prepared:
-            self.__cleanup()
-            raise RuntimeError("Algorithm prepare failed")
+            self.__cleanup(early=True)
+            error = "Algorithm {} prepare failed".format(self.algorithm.name)
+            if message:
+                error += ": {}".format(message)
+            raise RuntimeError(error)

         _start = time.time()

         try:
             processed = self.executor.process()
         except Exception as e:
-            message = _process_exception(e, self.prefix, "databases")
+            message = _process_exception(e, self.prefix, "algorithms")
             self.__cleanup()
             return _create_result(1, message)

         if not processed:
             self.__cleanup()
-            raise RuntimeError("Algorithm process failed")
+            raise RuntimeError(
+                "Algorithm {} process failed".format(self.algorithm.name)
+            )

         proc_time = time.time() - _start
@@ -42,18 +42,12 @@ remote
 Execution utilities
 """

-import logging
-
 from .base import BaseExecutor

 from beat.backend.python.helpers import create_inputs_from_configuration
 from beat.backend.python.helpers import create_outputs_from_configuration

-logger = logging.getLogger(__name__)
-

 class RemoteExecutor(BaseExecutor):
     """Base class for Executors that communicate with a message handler
@@ -123,6 +123,12 @@ class SubprocessExecutor(RemoteExecutor):
       guarantee that the cache is refreshed as appropriate in case the
       underlying libraries change.

+    custom_root_folders (dict): A dictionary mapping databases name and
+      their location on disk
+    ip_address (str): IP address of the machine to connect to for the database
+      execution and message handlers.
+    python_path (str): Path to the python executable of the environment to use
+      for experiment execution.

   Attributes:

@@ -172,8 +178,8 @@ class SubprocessExecutor(RemoteExecutor):
         library_cache=None,
         custom_root_folders=None,
         ip_address="127.0.0.1",
+        python_path=None,
     ):
-
         super(SubprocessExecutor, self).__init__(
             prefix,
             data,

@@ -186,14 +192,30 @@ class SubprocessExecutor(RemoteExecutor):
             custom_root_folders=custom_root_folders,
         )

-        # We need three apps to run this function: databases_provider and execute
-        self.EXECUTE_BIN = _which(os.path.join(os.path.dirname(sys.argv[0]), "execute"))
-        self.LOOP_EXECUTE_BIN = _which(
-            os.path.join(os.path.dirname(sys.argv[0]), "loop_execute")
-        )
-        self.DBPROVIDER_BIN = _which(
-            os.path.join(os.path.dirname(sys.argv[0]), "databases_provider")
-        )
+        if python_path is None:
+            base_path = os.path.dirname(sys.argv[0])
+
+            # We need three apps to run this function: databases_provider and execute
+            self.EXECUTE_BIN = _which(os.path.join(base_path, "execute"))
+            self.LOOP_EXECUTE_BIN = _which(os.path.join(base_path, "loop_execute"))
+            self.DBPROVIDER_BIN = _which(os.path.join(base_path, "databases_provider"))
+        else:
+            base_path = os.path.dirname(python_path)
+            self.EXECUTE_BIN = os.path.join(base_path, "execute")
+            self.LOOP_EXECUTE_BIN = os.path.join(base_path, "loop_execute")
+            self.DBPROVIDER_BIN = os.path.join(base_path, "databases_provider")
+
+        if any(
+            [
+                not os.path.exists(executable)
+                for executable in [
+                    self.EXECUTE_BIN,
+                    self.LOOP_EXECUTE_BIN,
+                    self.DBPROVIDER_BIN,
+                ]
+            ]
+        ):
+            raise RuntimeError("Invalid environment")

     def __create_db_process(self, configuration_name=None):
         databases_process = None

@@ -384,7 +406,9 @@ class SubprocessExecutor(RemoteExecutor):
             )

         if self.loop_algorithm is not None:
-            cmd.append("tcp://" + self.ip_address + (":%d" % loop_algorithm_port))
+            cmd.append(
+                "--loop=tcp://" + self.ip_address + (":%d" % loop_algorithm_port)
+            )

         if logger.getEffectiveLevel() <= logging.DEBUG:
             cmd.insert(1, "--debug")
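Note: with the new `python_path` argument, the three helper programs (`execute`, `loop_execute`, `databases_provider`) are resolved next to the given interpreter instead of next to `sys.argv[0]`, so an experiment can run in a different Python environment than the scheduler; missing binaries now fail fast with `RuntimeError("Invalid environment")`. A hypothetical construction (paths are placeholders; the other constructor arguments keep their usual values):

    executor = SubprocessExecutor(
        prefix="/opt/beat/prefix",
        data=configuration,  # the usual execution configuration dictionary
        python_path="/opt/envs/beat-py36/bin/python",
    )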
@@ -67,7 +67,9 @@ class Storage(utils.Storage):
     def __init__(self, prefix, name):
         if name.count("/") != 2:
-            raise RuntimeError(f"invalid plotterparameter name: {name}")
+            raise RuntimeError(
+                "invalid plotterparameter name: {name}".format(name=name)
+            )

         self.username, self.name, self.version = name.split("/")
         self.fullname = name

@@ -170,7 +172,9 @@ class Plotterparameter(object):
         self.storage = Storage(self.prefix, self._name)
         if not self.storage.json.exists():
             self.errors.append(
-                f"Plotterparameter declaration file not found: {data}"
+                "Plotterparameter declaration file not found: {data}".format(
+                    data=data
+                )
             )
             return

         data = self.storage.json.path  # loads data from JSON declaration

@@ -219,12 +223,16 @@ class Plotterparameter(object):
                 self.plotter.clean_parameter(key, val)
             except KeyError:
                 self.errors.append(
-                    f"'{key}' isn't a parameter for plotter {self.plotter.name}"
+                    "'{key}' isn't a parameter for plotter {name}".format(
+                        key=key, name=self.plotter.name
+                    )
                 )
                 return
             except ValueError:
                 self.errors.append(
-                    f"'{val}' is invalid for parameter {key} of plotter {self.plotter.name}"
+                    "'{val}' is invalid for parameter {key} of plotter {name}".format(
+                        val=val, key=key, name=self.plotter.name
+                    )
                 )
                 return
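Note: rewriting these f-strings as `str.format()` calls keeps the module importable on interpreters older than Python 3.6, where f-strings are a syntax error at compile time (even in branches that never run). The two forms are equivalent:

    name = "user/scatter/1"
    f"invalid plotterparameter name: {name}"                   # Python >= 3.6 only
    "invalid plotterparameter name: {name}".format(name=name)  # also older interpreters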
@@ -7,15 +7,15 @@ from beat.backend.python.database import View

 class FooView(View):
     def setup(
-        self,
-        root_folder,
-        outputs,
-        parameters,
-        force_start_index=None,
-        force_end_index=None,
+        self, root_folder, outputs, parameters, start_index=None, end_index=None,
     ):
         """Initializes the database"""

+        # DO NOT REMOVE: this is required to setup the view internal state
+        super().setup(root_folder, outputs, parameters, start_index, end_index)
+
+        # Add custom setup code here
         return True

     def index(self, root_folder, parameters):
@@ -55,7 +55,6 @@
     "language": { "$ref": "../common/1.json#/definitions/language" },
     "description": { "$ref": "../common/1.json#/definitions/description" },
     "groups": { "$ref": "common.json#/definitions/analyzer_groups" },
-    "parameters": { "$ref": "common.json#/definitions/parameters" },
     "results": { "$ref": "common.json#/definitions/results" },
     "uses": { "$ref": "../common/1.json#/definitions/uses" },
     "schema_version": { "$ref": "common.json#/definitions/schema_version" },
@@ -6,8 +6,8 @@
   "oneOf": [
     { "$ref": "2.json#/definitions/block" },
     { "$ref": "2.json#/definitions/analyzer" },
-    { "$ref": "#/definitions/loop_user" },
-    { "$ref": "#/definitions/loop" }
+    { "$ref": "#/definitions/loop_evaluator" },
+    { "$ref": "#/definitions/loop_processor" }
   ],

   "definitions": {

@@ -71,11 +71,13 @@
       "properties": {
         "name": { "type": "string" },
         "inputs": { "$ref": "common.json#/definitions/endpoints" },
+        "outputs": { "$ref": "common.json#/definitions/endpoints" },
         "loop": { "$ref": "#/definitions/loop_io_group" }
       },
       "required": [
         "inputs",
+        "outputs",
         "loop"
       ],

@@ -99,7 +101,7 @@
     },

-    "loop_user": {
+    "loop_processor": {
       "type": "object",

@@ -113,9 +115,8 @@
         "schema_version": { "$ref": "common.json#/definitions/schema_version" },
         "api_version": { "$ref": "common.json#/definitions/api_version" },
         "type": {
-          "$comment": "Change enum to const when tools allow v6 json schema",
           "type": "string",
-          "enum": ["loop_user"]
+          "enum": ["sequential_loop_processor", "autonomous_loop_processor"]
         }
       },

@@ -129,7 +130,7 @@
     },

-    "loop": {
+    "loop_evaluator": {
       "type": "object",

@@ -142,9 +143,8 @@
         "schema_version": { "$ref": "common.json#/definitions/schema_version" },
         "api_version": { "$ref": "common.json#/definitions/api_version" },
         "type": {
-          "$comment": "Change enum to const when tools allow v6 json schema",
           "type": "string",
-          "enum": ["loop"]
+          "enum": ["sequential_loop_evaluator", "autonomous_loop_evaluator"]
         }
       },
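Note: taken together, these schema changes require groups that carry a loop section to declare `outputs` next to `inputs` and `loop`, and split the old `loop_user`/`loop` algorithm types into processor/evaluator variants, each in a sequential and an autonomous flavour. A minimal declaration fragment consistent with the updated schema (format names are illustrative; the loop group body is elided):

    {
        "type": "sequential_loop_processor",
        "groups": [
            {
                "name": "main",
                "inputs":  { "in":  { "type": "user/format/1" } },
                "outputs": { "out": { "type": "user/format/1" } },
                "loop":    { }
            }
        ]
    }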
@@ -39,7 +39,8 @@
     },