...
 
Commits (175)
[flake8] [flake8]
max-line-length = 80 max-line-length = 88
select = B,C,E,F,W,T4,B9,B950 select = B,C,E,F,W,T4,B9,B950
ignore = E501, W503 ignore = E501, W503, E203
[settings]
multi_line_output=3
include_trailing_comma=true
force_grid_wrap=0
use_parentheses=true
ensure_newline_before_comments=true
line_length=88
force_single_line=true
# See https://pre-commit.com for more information # See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks # See https://pre-commit.com/hooks.html for more hooks
repos: repos:
- repo: https://github.com/ambv/black - repo: https://github.com/timothycrosley/isort
rev: stable rev: 5.0.4
hooks: hooks:
- id: black - id: isort
language_version: python3.6 files: .*.py
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py - repo: https://github.com/psf/black
rev: 19.10b0
hooks:
- id: black
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
- repo: https://github.com/pre-commit/pre-commit-hooks - repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.0.0 rev: v3.1.0
hooks: hooks:
- id: check-ast
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
- id: check-case-conflict
- id: trailing-whitespace - id: trailing-whitespace
- id: end-of-file-fixer - id: end-of-file-fixer
- id: debug-statements - id: debug-statements
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
- id: check-added-large-files - id: check-added-large-files
- id: check-docstring-first - id: check-docstring-first
- id: flake8
- id: check-yaml - id: check-yaml
exclude: conda/meta.yaml exclude: conda/meta.yaml
- repo: https://github.com/PyCQA/flake8
rev: 3.8.3
hooks:
- id: flake8
- repo: https://github.com/PyCQA/bandit - repo: https://github.com/PyCQA/bandit
rev: 'master' # Update me! rev: 1.6.2
hooks: hooks:
- id: bandit - id: bandit
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
......
...@@ -31,9 +31,9 @@ ...@@ -31,9 +31,9 @@
.. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. .. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
=========================================================== ==============================
Authors of the Biometrics Evaluation and Testing Platform Authors of the BEAT Platform
=========================================================== ==============================
Andre Anjos <andre.anjos@idiap.ch> Andre Anjos <andre.anjos@idiap.ch>
Flavio Tarsetti <flavio.tarsetti@idiap.ch> Flavio Tarsetti <flavio.tarsetti@idiap.ch>
......
...@@ -31,18 +31,14 @@ ...@@ -31,18 +31,14 @@
.. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. .. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
.. image:: https://img.shields.io/badge/docs-stable-yellow.svg .. image:: https://img.shields.io/badge/docs-available-orange.svg
:target: https://www.idiap.ch/software/beat/docs/beat/beat.core/stable/index.html
.. image:: https://img.shields.io/badge/docs-latest-orange.svg
:target: https://www.idiap.ch/software/beat/docs/beat/beat.core/master/index.html :target: https://www.idiap.ch/software/beat/docs/beat/beat.core/master/index.html
.. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/build.svg .. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/pipeline.svg
:target: https://gitlab.idiap.ch/beat/beat.core/commits/master :target: https://gitlab.idiap.ch/beat/beat.core/commits/master
.. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/coverage.svg .. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/coverage.svg
:target: https://gitlab.idiap.ch/beat/beat.core/commits/master :target: https://gitlab.idiap.ch/beat/beat.core/commits/master
.. image:: https://img.shields.io/badge/gitlab-project-0000c0.svg .. image:: https://img.shields.io/badge/gitlab-project-0000c0.svg
:target: https://gitlab.idiap.ch/beat/beat.core :target: https://gitlab.idiap.ch/beat/beat.core
.. image:: https://img.shields.io/pypi/v/beat.core.svg
:target: https://pypi.python.org/pypi/beat.core
========================== ==========================
......
...@@ -33,8 +33,8 @@ ...@@ -33,8 +33,8 @@
# # # #
################################################################################### ###################################################################################
# see https://docs.python.org/3/library/pkgutil.html # see https://docs.python.org/3/library/pkgutil.html
from pkgutil import extend_path from pkgutil import extend_path
__path__ = extend_path(__path__, __name__) __path__ = extend_path(__path__, __name__)
...@@ -45,22 +45,21 @@ Forward importing from :py:mod:`beat.backend.python.algorithm` ...@@ -45,22 +45,21 @@ Forward importing from :py:mod:`beat.backend.python.algorithm`
:py:class:`beat.backend.python.algorithm.Storage` :py:class:`beat.backend.python.algorithm.Storage`
:py:class:`beat.backend.python.algorithm.Runner` :py:class:`beat.backend.python.algorithm.Runner`
""" """
import os import os
import six
import numpy import numpy
import pkg_resources import pkg_resources
import simplejson as json import simplejson as json
import six
from beat.backend.python.algorithm import Algorithm as BackendAlgorithm
from beat.backend.python.algorithm import Runner # noqa
from beat.backend.python.algorithm import Storage
from . import dataformat from . import dataformat
from . import library from . import library
from . import schema
from . import prototypes from . import prototypes
from . import schema
from beat.backend.python.algorithm import Storage
from beat.backend.python.algorithm import Runner # noqa
from beat.backend.python.algorithm import Algorithm as BackendAlgorithm
def load_algorithm_prototype(prefix): def load_algorithm_prototype(prefix):
...@@ -182,6 +181,8 @@ class Algorithm(BackendAlgorithm): ...@@ -182,6 +181,8 @@ class Algorithm(BackendAlgorithm):
""" """
dataformat_klass = dataformat.DataFormat
def __init__(self, prefix, data, dataformat_cache=None, library_cache=None): def __init__(self, prefix, data, dataformat_cache=None, library_cache=None):
super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache) super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache)
...@@ -303,89 +304,52 @@ class Algorithm(BackendAlgorithm): ...@@ -303,89 +304,52 @@ class Algorithm(BackendAlgorithm):
"declaration: %s" % (self.name, ", ".join(all_output_names)) "declaration: %s" % (self.name, ", ".join(all_output_names))
) )
def _validate_format(self, type_name, group_name, entry_name, dataformat):
if dataformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for %s `%s' on algorithm `%s': %s"
% (
type_name,
group_name,
entry_name,
self.name,
"\n".join(dataformat.errors),
)
)
def _validate_dataformats(self, group, group_name, dataformat_cache):
for name, entry in group[group_name].items():
type_name = entry["type"]
thisformat = self._update_dataformat_cache(type_name, dataformat_cache)
self._validate_format(type_name, group_name, name, thisformat)
def _validate_required_dataformats(self, dataformat_cache): def _validate_required_dataformats(self, dataformat_cache):
"""Makes sure we can load all requested formats """Makes sure we can load all requested formats
""" """
for group in self.groups: for group in self.groups:
for name, input in group["inputs"].items(): for name, input_ in group["inputs"].items():
if input["type"] in self.dataformats: self._validate_dataformats(group, "inputs", dataformat_cache)
continue
if dataformat_cache and input["type"] in dataformat_cache: # reuse
thisformat = dataformat_cache[input["type"]]
else: # load it
thisformat = dataformat.DataFormat(self.prefix, input["type"])
if dataformat_cache is not None: # update it
dataformat_cache[input["type"]] = thisformat
self.dataformats[input["type"]] = thisformat
if thisformat.errors: if "outputs" in group:
self.errors.append( self._validate_dataformats(group, "outputs", dataformat_cache)
"found error validating data format `%s' "
"for input `%s' on algorithm `%s': %s"
% (input["type"], name, self.name, "\n".join(thisformat.errors))
)
if "outputs" not in group:
continue
for name, output in group["outputs"].items():
if output["type"] in self.dataformats:
continue
if dataformat_cache and output["type"] in dataformat_cache: # reuse
thisformat = dataformat_cache[output["type"]]
else: # load it
thisformat = dataformat.DataFormat(self.prefix, output["type"])
if dataformat_cache is not None: # update it
dataformat_cache[output["type"]] = thisformat
self.dataformats[output["type"]] = thisformat
if thisformat.errors: if "loop" in group:
self.errors.append( self._validate_dataformats(group, "loop", dataformat_cache)
"found error validating data format `%s' "
"for output `%s' on algorithm `%s': %s"
% (
output["type"],
name,
self.name,
"\n".join(thisformat.errors),
)
)
if self.results: if self.results:
for name, result in self.results.items(): for name, result in self.results.items():
result_type = result["type"]
if result["type"].find("/") != -1: # results can only contain base types and plots therefore, only
# process plots
if result["type"] in self.dataformats: if result_type.find("/") != -1:
continue thisformat = self._update_dataformat_cache(
result_type, dataformat_cache
if dataformat_cache and result["type"] in dataformat_cache: # reuse )
thisformat = dataformat_cache[result["type"]] self._validate_format(result_type, "result", name, thisformat)
else:
thisformat = dataformat.DataFormat(self.prefix, result["type"])
if dataformat_cache is not None: # update it
dataformat_cache[result["type"]] = thisformat
self.dataformats[result["type"]] = thisformat
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for result `%s' on algorithm `%s': %s"
% (
result["type"],
name,
self.name,
"\n".join(thisformat.errors),
)
)
def _convert_parameter_types(self): def _convert_parameter_types(self):
"""Converts types to numpy equivalents, checks defaults, ranges and """Converts types to numpy equivalents, checks defaults, ranges and
......
...@@ -49,15 +49,14 @@ Options: ...@@ -49,15 +49,14 @@ Options:
-V, --version Show version -V, --version Show version
-v, --verbose Increases the output verbosity level -v, --verbose Increases the output verbosity level
""" """
import os import os
import sys import sys
from docopt import docopt from docopt import docopt
from ..bcpapi.broker import BeatComputationBroker from ..bcpapi.broker import BeatComputationBroker
from ..version import __version__
from ..utils import setup_logging from ..utils import setup_logging
from ..version import __version__
def run(port=5555, verbose=1, callbacks=None): def run(port=5555, verbose=1, callbacks=None):
......
...@@ -41,46 +41,45 @@ Based on the Majordomo Protocol worker example of the ZMQ Guide. ...@@ -41,46 +41,45 @@ Based on the Majordomo Protocol worker example of the ZMQ Guide.
Usage: Usage:
%(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>] %(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>]
[--cache=<path>] [--docker] [--docker-network=<name>] [--cache=<path>] [--docker] [--docker-network=<name>]
[--port-range=<range>] <broker_address> [--port-range=<range>] [--cache-mount-point=<cache_mount_point>]
<broker_address>
%(prog)s (--help | -h) %(prog)s (--help | -h)
%(prog)s (--version | -V) %(prog)s (--version | -V)
Options: Options:
-h, --help Show this screen -h, --help Show this screen
-V, --version Show version -V, --version Show version
-v, --verbose Increases the output verbosity level -v, --verbose Increases the output verbosity level
-n <name>, --name=<name> The unique name of this worker on the database. -n <name>, --name=<name> The unique name of this worker on the database.
This is typically the assigned hostname of the node, This is typically the assigned hostname of the node,
but not necessarily [default: %(hostname)s] but not necessarily [default: %(hostname)s]
-p, --prefix=<path> Comma-separated list of the prefix(es) of your local data [default: .] -p, --prefix=<path> Comma-separated list of the prefix(es) of your local data [default: .]
-c, --cache=<path> Cache prefix, otherwise defaults to '<prefix>/cache' -c, --cache=<path> Cache prefix, otherwise defaults to '<prefix>/cache'
--docker-network=<name> Name of the docker network to use --docker-network=<name> Name of the docker network to use
--port-range=<range> Range of port usable for communication with containers --port-range=<range> Range of port usable for communication with containers
--cache-mount-point=<cache_mount_point> NFS mount point to use for cache setup
""" """
import os import os
import sys
import signal import signal
import sys
import tempfile import tempfile
import simplejson as json from socket import gethostname
import simplejson as json
import zmq import zmq
from socket import gethostname
from docopt import docopt from docopt import docopt
from ..bcpapi import BCP from ..bcpapi import BCP
from ..bcpapi.worker import BeatComputationWorker
from ..bcpapi.processor import BeatComputationProcessor
from ..bcpapi.execution import ExecutionProcess from ..bcpapi.execution import ExecutionProcess
from ..bcpapi.processor import BeatComputationProcessor
from ..bcpapi.worker import BeatComputationWorker
from ..dock import Host from ..dock import Host
from ..utils import find_free_port from ..utils import find_free_port
from ..utils import setup_logging from ..utils import setup_logging
from ..version import __version__ from ..version import __version__
logger = None logger = None
...@@ -107,6 +106,7 @@ def run( ...@@ -107,6 +106,7 @@ def run(
docker_network_name=None, docker_network_name=None,
docker_port_range=None, docker_port_range=None,
docker_images_cache=None, docker_images_cache=None,
docker_cache_mount_point=None,
): ):
"""Start the worker """Start the worker
...@@ -202,6 +202,8 @@ def run( ...@@ -202,6 +202,8 @@ def run(
data["network_name"] = docker_network_name data["network_name"] = docker_network_name
if docker_port_range: if docker_port_range:
data["port_range"] = docker_port_range data["port_range"] = docker_port_range
if docker_cache_mount_point:
data["cache_mount_point"] = docker_cache_mount_point
# Start the execution # Start the execution
logger.info("Running '%s' with job id #%s", data["algorithm"], job_id) logger.info("Running '%s' with job id #%s", data["algorithm"], job_id)
...@@ -248,6 +250,7 @@ def run( ...@@ -248,6 +250,7 @@ def run(
logger.info("The scheduler shut down, we will wait for it") logger.info("The scheduler shut down, we will wait for it")
worker.destroy() worker.destroy()
worker.send_to_broker(BCP.BCPW_DISCONNECT)
worker.destroy() worker.destroy()
# Cleanup # Cleanup
for execution_process in execution_processes: for execution_process in execution_processes:
...@@ -301,6 +304,8 @@ def main(argv=None): ...@@ -301,6 +304,8 @@ def main(argv=None):
docker_images_cache = None docker_images_cache = None
docker_network_name = None docker_network_name = None
docker_port_range = None docker_port_range = None
docker_cache_mount_point = None
if args["--docker"]: if args["--docker"]:
docker_images_cache = os.path.join( docker_images_cache = os.path.join(
tempfile.gettempdir(), "beat-docker-images.json" tempfile.gettempdir(), "beat-docker-images.json"
...@@ -319,6 +324,14 @@ def main(argv=None): ...@@ -319,6 +324,14 @@ def main(argv=None):
return 1 return 1
logger.info("Using port range %s", docker_port_range) logger.info("Using port range %s", docker_port_range)
docker_cache_mount_point = args.get("--cache-mount-point", None)
if docker_cache_mount_point:
if not docker_cache_mount_point.startswith("nfs://"):
raise RuntimeError(
"Invalid nfs mount point: {}".format(docker_cache_mount_point)
)
logger.info("Using volume cache mount point %s", docker_cache_mount_point)
return run( return run(
broker_address, broker_address,
service_name=args.get("--name"), service_name=args.get("--name"),
...@@ -329,6 +342,7 @@ def main(argv=None): ...@@ -329,6 +342,7 @@ def main(argv=None):
docker_network_name=docker_network_name, docker_network_name=docker_network_name,
docker_port_range=docker_port_range, docker_port_range=docker_port_range,
docker_images_cache=docker_images_cache, docker_images_cache=docker_images_cache,
docker_cache_mount_point=docker_cache_mount_point,
) )
......
...@@ -37,20 +37,16 @@ ...@@ -37,20 +37,16 @@
Inspired by the Majordomo Protocol Broker Inspired by the Majordomo Protocol Broker
""" """
import logging import logging
import time
import signal import signal
import time
from binascii import hexlify from binascii import hexlify
import zmq import zmq
# local
from . import BCP from . import BCP
from .zhelpers import dump from .zhelpers import dump
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -95,6 +91,7 @@ class BeatComputationBroker(object): ...@@ -95,6 +91,7 @@ class BeatComputationBroker(object):
def __init__(self, verbose=False): def __init__(self, verbose=False):
"""Initialize broker state.""" """Initialize broker state."""
self.verbose = verbose self.verbose = verbose
self.continue_ = True self.continue_ = True
self.services = {} self.services = {}
...@@ -163,14 +160,17 @@ class BeatComputationBroker(object): ...@@ -163,14 +160,17 @@ class BeatComputationBroker(object):
def process_client(self, sender, msg): def process_client(self, sender, msg):
"""Process a request coming from a client.""" """Process a request coming from a client."""
# Service name + body # Service name + body
assert len(msg) >= 2 # nosec assert len(msg) >= 2 # nosec
service = msg.pop(0) service = msg.pop(0)
# Set reply return address to client sender # Set reply return address to client sender
msg = [sender, b""] + msg msg = [sender, b""] + msg
self.dispatch(self.require_service(service), msg) self.dispatch(self.require_service(service), msg)
def process_worker(self, sender, msg): def process_worker(self, sender, msg):
"""Process message sent to us by a worker.""" """Process message sent to us by a worker."""
# At least, command # At least, command
assert len(msg) >= 1 # nosec assert len(msg) >= 1 # nosec
...@@ -222,12 +222,14 @@ class BeatComputationBroker(object): ...@@ -222,12 +222,14 @@ class BeatComputationBroker(object):
def delete_worker(self, worker, disconnect): def delete_worker(self, worker, disconnect):
"""Deletes worker from all data structures, and deletes worker.""" """Deletes worker from all data structures, and deletes worker."""
assert worker is not None # nosec assert worker is not None # nosec
if disconnect: if disconnect:
self.send_to_worker(worker, BCP.BCPW_DISCONNECT, None, None) self.send_to_worker(worker, BCP.BCPW_DISCONNECT, None, None)
if worker.service is not None: if worker.service is not None:
worker.service.waiting.remove(worker) if worker in worker.service.waiting:
worker.service.waiting.remove(worker)
on_disconnection = self.callbacks.get("on_disconnection", None) on_disconnection = self.callbacks.get("on_disconnection", None)
if on_disconnection: if on_disconnection:
...@@ -236,8 +238,12 @@ class BeatComputationBroker(object): ...@@ -236,8 +238,12 @@ class BeatComputationBroker(object):
if worker.identity in self.workers: if worker.identity in self.workers:
self.workers.pop(worker.identity) self.workers.pop(worker.identity)
if worker in self.waiting:
self.waiting.pop(self.waiting.index(worker))
def require_worker(self, address): def require_worker(self, address):
"""Finds the worker (creates if necessary).""" """Finds the worker (creates if necessary)."""
assert address is not None # nosec assert address is not None # nosec
identity = hexlify(address) identity = hexlify(address)
worker = self.workers.get(identity) worker = self.workers.get(identity)
...@@ -251,6 +257,7 @@ class BeatComputationBroker(object): ...@@ -251,6 +257,7 @@ class BeatComputationBroker(object):
def require_service(self, name): def require_service(self, name):
"""Locates the service (creates if necessary).""" """Locates the service (creates if necessary)."""
assert name is not None # nosec assert name is not None # nosec
service = self.services.get(name) service = self.services.get(name)
if service is None: if service is None:
...@@ -264,11 +271,13 @@ class BeatComputationBroker(object): ...@@ -264,11 +271,13 @@ class BeatComputationBroker(object):
We use a single socket for both clients and workers. We use a single socket for both clients and workers.
""" """
self.socket.bind(endpoint) self.socket.bind(endpoint)
logger.info("I: BCP broker/0.0.1 is active at %s", endpoint) logger.info("I: BCP broker/0.0.1 is active at %s", endpoint)
def send_heartbeats(self): def send_heartbeats(self):
"""Send heartbeats to idle workers if it's time""" """Send heartbeats to idle workers if it's time"""
if time.time() > self.heartbeat_at: if time.time() > self.heartbeat_at:
for worker in self.waiting: for worker in self.waiting:
self.send_to_worker(worker, BCP.BCPW_HEARTBEAT, None, None) self.send_to_worker(worker, BCP.BCPW_HEARTBEAT, None, None)
...@@ -276,16 +285,16 @@ class BeatComputationBroker(object): ...@@ -276,16 +285,16 @@ class BeatComputationBroker(object):
self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL
def purge_workers(self): def purge_workers(self):
"""Look for & kill expired workers. """Look for & kill expired workers."""
"""
for item in self.waiting: for item in self.waiting:
if item.expiry < time.time(): if item.expiry < time.time():
logger.info("I: deleting expired worker: %s", item.identity) logger.info("I: deleting expired worker: %s", item.identity)
self.delete_worker(item, False) self.delete_worker(item, False)
self.waiting.pop(self.waiting.index(item))
def worker_waiting(self, worker): def worker_waiting(self, worker):
"""This worker is now waiting for work.""" """This worker is now waiting for work."""
# Queue to broker and service waiting lists # Queue to broker and service waiting lists
if worker not in self.waiting: if worker not in self.waiting:
...@@ -298,10 +307,14 @@ class BeatComputationBroker(object): ...@@ -298,10 +307,14 @@ class BeatComputationBroker(object):
def dispatch(self, service, msg): def dispatch(self, service, msg):
"""Dispatch requests to waiting workers as possible""" """Dispatch requests to waiting workers as possible"""
assert service is not None # nosec assert service is not None # nosec
if msg is not None: # Queue message if any if msg is not None: # Queue message if any
service.requests.append(msg) service.requests.append(msg)
self.purge_workers() self.purge_workers()
while service.waiting and service.requests: while service.waiting and service.requests:
msg = service.requests.pop(0) msg = service.requests.pop(0)
worker = service.waiting.pop(0) worker = service.waiting.pop(0)
......
...@@ -85,7 +85,7 @@ class BeatComputationClient(object): ...@@ -85,7 +85,7 @@ class BeatComputationClient(object):
request = [b"", BCP.BCPC_CLIENT, service] + request request = [b"", BCP.BCPC_CLIENT, service] + request
if self.verbose: if self.verbose:
logger.warn("I: send request to '%s' service: ", service) logger.warning("I: send request to '%s' service: ", service)
dump(request) dump(request)
self.client.send_multipart(request) self.client.send_multipart(request)
......
...@@ -40,17 +40,16 @@ management ...@@ -40,17 +40,16 @@ management
Execution utilities Execution utilities
""" """
import logging import logging
import multiprocessing import multiprocessing
import signal import signal
import zmq
import simplejson as json import simplejson as json
import zmq
from ..dock import Host from ..dock import Host
from ..execution.local import LocalExecutor
from ..execution.docker import DockerExecutor from ..execution.docker import DockerExecutor
from ..execution.local import LocalExecutor
from . import BCP from . import BCP
......
...@@ -34,8 +34,8 @@ ...@@ -34,8 +34,8 @@
"""BEAT Computation worker""" """BEAT Computation worker"""
import logging import logging
import zmq import zmq
from .zhelpers import dump from .zhelpers import dump
......
...@@ -34,14 +34,12 @@ ...@@ -34,14 +34,12 @@
"""BEAT Computation worker""" """BEAT Computation worker"""
import logging import logging
import time import time
import zmq import zmq
# BEAT Computation protocol constants:
from . import BCP from . import BCP
from .zhelpers import dump from .zhelpers import dump
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -58,8 +56,8 @@ class BeatComputationWorker(object): ...@@ -58,8 +56,8 @@ class BeatComputationWorker(object):
service = service.encode("utf-8") service = service.encode("utf-8")
self.heartbeat_at = ( self.heartbeat_at = (
0 0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
) # When to send HEARTBEAT (relative to time.time(), so in seconds) )
self.liveness = 0 # How many attempts left self.liveness = 0 # How many attempts left
self.heartbeat = 2500 # Heartbeat delay, msecs self.heartbeat = 2500 # Heartbeat delay, msecs
self.reconnect = 2500 # Reconnect delay, msecs self.reconnect = 2500 # Reconnect delay, msecs
...@@ -210,7 +208,7 @@ class BeatComputationWorker(object): ...@@ -210,7 +208,7 @@ class BeatComputationWorker(object):
self.liveness -= 1 self.liveness -= 1
if self.liveness == 0: if self.liveness == 0:
if self.verbose: if self.verbose:
logger.warn("W: disconnected from broker - retrying…") logger.warning("W: disconnected from broker - retrying…")
try: try:
time.sleep(1e-3 * self.reconnect) time.sleep(1e-3 * self.reconnect)
except KeyboardInterrupt: except KeyboardInterrupt:
...@@ -222,7 +220,7 @@ class BeatComputationWorker(object): ...@@ -222,7 +220,7 @@ class BeatComputationWorker(object):
self.send_to_broker(BCP.BCPW_HEARTBEAT) self.send_to_broker(BCP.BCPW_HEARTBEAT)
self.heartbeat_at = time.time() + 1e-3 * self.heartbeat self.heartbeat_at = time.time() + 1e-3 * self.heartbeat
logger.warn("W: interrupt received, killing worker…") logger.warning("W: interrupt received, killing worker…")
return None return None
def destroy(self): def destroy(self):
......
...@@ -38,9 +38,9 @@ Helper module for common zmq task ...@@ -38,9 +38,9 @@ Helper module for common zmq task
Based on Majordomo protocol zhelpers. Based on Majordomo protocol zhelpers.
""" """
import binascii import binascii
import logging import logging
import zmq import zmq
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
......
...@@ -53,16 +53,15 @@ Forward importing from :py:mod:`beat.backend.python.data`: ...@@ -53,16 +53,15 @@ Forward importing from :py:mod:`beat.backend.python.data`:
:py:func:`beat.backend.python.data.load_data_index_db` :py:func:`beat.backend.python.data.load_data_index_db`
:py:func:`beat.backend.python.data.foundSplitRanges` :py:func:`beat.backend.python.data.foundSplitRanges`
""" """
from beat.backend.python.data import CachedDataSink # noqa
from beat.backend.python.data import mixDataIndices # noqa
from beat.backend.python.data import getAllFilenames # noqa
from beat.backend.python.data import DataSource # noqa
from beat.backend.python.data import CachedDataSource # noqa from beat.backend.python.data import CachedDataSource # noqa
from beat.backend.python.data import DatabaseOutputDataSource # noqa from beat.backend.python.data import DatabaseOutputDataSource # noqa
from beat.backend.python.data import RemoteDataSource # noqa
from beat.backend.python.data import DataSink # noqa from beat.backend.python.data import DataSink # noqa
from beat.backend.python.data import CachedDataSink # noqa from beat.backend.python.data import DataSource # noqa
from beat.backend.python.data import RemoteDataSource # noqa
from beat.backend.python.data import StdoutDataSink # noqa from beat.backend.python.data import StdoutDataSink # noqa
from beat.backend.python.data import foundSplitRanges # noqa
from beat.backend.python.data import getAllFilenames # noqa
from beat.backend.python.data import load_data_index # noqa from beat.backend.python.data import load_data_index # noqa
from beat.backend.python.data import load_data_index_db # noqa from beat.backend.python.data import load_data_index_db # noqa
from beat.backend.python.data import foundSplitRanges # noqa from beat.backend.python.data import mixDataIndices # noqa
...@@ -44,8 +44,6 @@ Forward importing from :py:mod:`beat.backend.python.data_loaders` ...@@ -44,8 +44,6 @@ Forward importing from :py:mod:`beat.backend.python.data_loaders`
:py:class:`beat.backend.python.data_loaders.DataLoader` :py:class:`beat.backend.python.data_loaders.DataLoader`
:py:class:`beat.backend.python.data_loaders.DataView` :py:class:`beat.backend.python.data_loaders.DataView`
""" """
from beat.backend.python.data_loaders import DataLoaderList # noqa
from beat.backend.python.data_loaders import DataLoader # noqa from beat.backend.python.data_loaders import DataLoader # noqa
from beat.backend.python.data_loaders import DataLoaderList # noqa
from beat.backend.python.data_loaders import DataView # noqa from beat.backend.python.data_loaders import DataView # noqa
...@@ -44,34 +44,43 @@ Validation of databases ...@@ -44,34 +44,43 @@ Validation of databases
Forward importing from :py:mod:`beat.backend.python.database`: Forward importing from :py:mod:`beat.backend.python.database`:
:py:class:`beat.backend.python.database.Storage` :py:class:`beat.backend.python.database.Storage`
""" """
import os import os
import six import six
from . import schema
from .dataformat import DataFormat
from . import prototypes
from beat.backend.python.database import Storage
from beat.backend.python.database import Database as BackendDatabase from beat.backend.python.database import Database as BackendDatabase
from beat.backend.python.database import Storage
from beat.backend.python.protocoltemplate import Storage as PTStorage from beat.backend.python.protocoltemplate import Storage as PTStorage
from . import prototypes
from . import schema
from .dataformat import DataFormat
from .protocoltemplate import ProtocolTemplate
def get_first_procotol_template(prefix): def get_first_procotol_template(prefix):
pt_root_folder = os.path.join(prefix, PTStorage.asset_folder) pt_root_folder = os.path.join(prefix, PTStorage.asset_folder)
pts_available = os.listdir(pt_root_folder) pts_available = sorted(os.listdir(pt_root_folder))
if not pts_available: if not pts_available:
raise RuntimeError("Invalid prefix content, no protocol template available") raise RuntimeError("Invalid prefix content, no protocol template available")
procotol_template_folder = pts_available[0] selected_protocol_template = None
protocol_template_versions = sorted( for procotol_template_folder in pts_available:
os.listdir(os.path.join(pt_root_folder, procotol_template_folder)) protocol_template_versions = sorted(
) os.listdir(os.path.join(pt_root_folder, procotol_template_folder))
version = protocol_template_versions[-1].split(".")[0] )
return "{}/{}".format(procotol_template_folder, version) version = protocol_template_versions[-1].split(".")[0]
protocol_template_name = "{}/{}".format(procotol_template_folder, version)
protocol_template = ProtocolTemplate(prefix, protocol_template_name)
if protocol_template.valid:
selected_protocol_template = protocol_template_name
break
if selected_protocol_template is None:
raise RuntimeError("No valid protocol template found")
return selected_protocol_template
class Database(BackendDatabase): class Database(BackendDatabase):
......
...@@ -44,17 +44,17 @@ Validation and parsing for dataformats ...@@ -44,17 +44,17 @@ Validation and parsing for dataformats
Forward importing from :py:mod:`beat.backend.python.dataformat`: Forward importing from :py:mod:`beat.backend.python.dataformat`:
:py:class:`beat.backend.python.dataformat.Storage` :py:class:`beat.backend.python.dataformat.Storage`
""" """
import copy import copy
import six import six
from . import schema from beat.backend.python.dataformat import DataFormat as BackendDataFormat
from beat.backend.python.dataformat import Storage # noqa
from . import prototypes from . import prototypes
from . import schema
from . import utils from . import utils
from beat.backend.python.dataformat import Storage # noqa
from beat.backend.python.dataformat import DataFormat as BackendDataFormat
class DataFormat(BackendDataFormat): class DataFormat(BackendDataFormat):
"""Data formats define the chunks of data that circulate between blocks. """Data formats define the chunks of data that circulate between blocks.
......
This diff is collapsed.
...@@ -41,10 +41,8 @@ environment ...@@ -41,10 +41,8 @@ environment
Helper functions related to environment management Helper functions related to environment management
""" """
import re
import logging import logging
import re
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
......
...@@ -41,21 +41,19 @@ base ...@@ -41,21 +41,19 @@ base
Execution utilities Execution utilities
""" """
import os
import glob
import collections import collections
import glob
import logging import logging
import simplejson as json import os
from .. import schema import simplejson as json
from .. import database
from .. import algorithm
from .. import stats
from beat.backend.python.helpers import convert_experiment_configuration_to_container from beat.backend.python.helpers import convert_experiment_configuration_to_container
from .. import algorithm
from .. import database
from .. import schema
from .. import stats
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -230,6 +228,17 @@ class BaseExecutor(object): ...@@ -230,6 +228,17 @@ class BaseExecutor(object):
"The input '%s' doesn't exist in the loop algorithm" % name "The input '%s' doesn't exist in the loop algorithm" % name
) )
if len(loop["outputs"]) != len(self.loop_algorithm.output_map):
self.errors.append(
"The number of outputs of the loop algorithm doesn't correspond"
)
for name in self.data["outputs"].keys():
if name not in self.algorithm.output_map.keys():
self.errors.append(
"The output '%s' doesn't exist in the loop algorithm" % name
)
# Check that the mapping is coherent # Check that the mapping is coherent
if len(self.data["inputs"]) != len(self.algorithm.input_map): if len(self.data["inputs"]) != len(self.algorithm.input_map):
self.errors.append( self.errors.append(
......
This diff is collapsed.
...@@ -41,20 +41,19 @@ local ...@@ -41,20 +41,19 @@ local
Execution utilities Execution utilities
""" """
import logging
import os import os
import shutil
import sys import sys
import tempfile import tempfile
import shutil
import zmq
import time import time
import logging
import zmq
from beat.backend.python.execution import AlgorithmExecutor from beat.backend.python.execution import AlgorithmExecutor
from beat.backend.python.execution import MessageHandler
from beat.backend.python.execution import LoopExecutor from beat.backend.python.execution import LoopExecutor
from beat.backend.python.execution import LoopMessageHandler from beat.backend.python.execution import LoopMessageHandler
from beat.backend.python.execution import MessageHandler
from .base import BaseExecutor from .base import BaseExecutor
...@@ -188,14 +187,24 @@ class LocalExecutor(BaseExecutor): ...@@ -188,14 +187,24 @@ class LocalExecutor(BaseExecutor):
self.zmq_context = None self.zmq_context = None
def __cleanup(self): def __cleanup(self, early=False):
if self.loop_executor: if self.loop_executor:
if early:
self.loop_socket.send_string("don")
self.loop_socket.recv() # ack
self.loop_executor.wait() self.loop_executor.wait()
self.loop_executor.close()
for handler in [self.message_handler, self.loop_message_handler]: for handler in [self.message_handler, self.loop_message_handler]:
if handler: if handler:
handler.kill() handler.kill()
handler.join() try:
handler.join()
except RuntimeError:
# The handler was not started
pass
handler.destroy() handler.destroy()
for socket in [self.executor_socket, self.loop_socket]: for socket in [self.executor_socket, self.loop_socket]:
...@@ -310,15 +319,35 @@ class LocalExecutor(BaseExecutor): ...@@ -310,15 +319,35 @@ class LocalExecutor(BaseExecutor):
cache_root=self.cache, cache_root=self.cache,
) )
retval = self.loop_executor.setup() try:
retval = self.loop_executor.setup()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
retval = False
else:
message = None
if not retval: if not retval:
self.__cleanup() self.__cleanup()
raise RuntimeError("Loop algorithm setup failed") error = "Loop algorithm {} setup failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
try:
prepared = self.loop_executor.prepare()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
prepared = False
else:
message = None
prepared = self.loop_executor.prepare()
if not prepared: if not prepared:
self.__cleanup() self.__cleanup()
raise RuntimeError("Loop algorithm prepare failed") error = "Loop algorithm {} prepare failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
self.loop_executor.process() self.loop_executor.process()
...@@ -330,28 +359,50 @@ class LocalExecutor(BaseExecutor): ...@@ -330,28 +359,50 @@ class LocalExecutor(BaseExecutor):
loop_socket=self.loop_socket, loop_socket=self.loop_socket,
) )
retval = self.executor.setup() try:
if not retval: status = self.executor.setup()
self.__cleanup() except Exception as e:
raise RuntimeError("Algorithm setup failed") message = _process_exception(e, self.prefix, "algorithms")
status = 0
else:
message = None
if not status:
self.__cleanup(early=True)
error = "Algorithm {} setup failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
try:
prepared = self.executor.prepare()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
prepared = 0
else:
message = None
prepared = self.executor.prepare()
if not prepared: if not prepared:
self.__cleanup() self.__cleanup(early=True)
raise RuntimeError("Algorithm prepare failed") error = "Algorithm {} prepare failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
_start = time.time() _start = time.time()
try: try:
processed = self.executor.process() processed = self.executor.process()
except Exception as e: except Exception as e:
message = _process_exception(e, self.prefix, "databases") message = _process_exception(e, self.prefix, "algorithms")
self.__cleanup() self.__cleanup()
return _create_result(1, message) return _create_result(1, message)
if not processed: if not processed:
self.__cleanup() self.__cleanup()
raise RuntimeError("Algorithm process failed") raise RuntimeError(
"Algorithm {} process failed".format(self.algorithm.name)
)
proc_time = time.time() - _start proc_time = time.time() - _start
......
...@@ -41,17 +41,10 @@ remote ...@@ -41,17 +41,10 @@ remote
Execution utilities Execution utilities
""" """
import logging
from .base import BaseExecutor
from beat.backend.python.helpers import create_inputs_from_configuration from beat.backend.python.helpers import create_inputs_from_configuration
from beat.backend.python.helpers import create_outputs_from_configuration from beat.backend.python.helpers import create_outputs_from_configuration
from .base import BaseExecutor
logger = logging.getLogger(__name__)
class RemoteExecutor(BaseExecutor): class RemoteExecutor(BaseExecutor):
......
...@@ -40,22 +40,19 @@ subprocess ...@@ -40,22 +40,19 @@ subprocess
Execution utilities Execution utilities
""" """
import logging
import os import os
import shutil import shutil
import logging
import sys
import subprocess as sp # nosec import subprocess as sp # nosec
import sys
import tempfile import tempfile
from beat.backend.python.execution import MessageHandler from beat.backend.python.execution import MessageHandler
from .. import stats from .. import stats
from .. import utils from .. import utils
from .remote import RemoteExecutor from .remote import RemoteExecutor
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -123,6 +120,12 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -123,6 +120,12 @@ class SubprocessExecutor(RemoteExecutor):
guarantee that the cache is refreshed as appropriate in case the guarantee that the cache is refreshed as appropriate in case the
underlying libraries change. underlying libraries change.
custom_root_folders (dict): A dictionary mapping database names to
their locations on disk
ip_address (str): IP address of the machine to connect to for the database
execution and message handlers.
python_path (str): Path to the python executable of the environment to use
for experiment execution.
Attributes: Attributes:
...@@ -172,8 +175,8 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -172,8 +175,8 @@ class SubprocessExecutor(RemoteExecutor):
library_cache=None, library_cache=None,
custom_root_folders=None, custom_root_folders=None,
ip_address="127.0.0.1", ip_address="127.0.0.1",
python_path=None,
): ):
super(SubprocessExecutor, self).__init__( super(SubprocessExecutor, self).__init__(
prefix, prefix,
data, data,
...@@ -186,14 +189,30 @@ class SubprocessExecutor(RemoteExecutor): ...@@ -186,14 +189,30 @@ class SubprocessExecutor(RemoteExecutor):
custom_root_folders=custom_root_folders,