...
 
Commits (175)
[flake8]
max-line-length = 80
max-line-length = 88
select = B,C,E,F,W,T4,B9,B950
ignore = E501, W503
ignore = E501, W503, E203
[settings]
multi_line_output=3
include_trailing_comma=true
force_grid_wrap=0
use_parentheses=true
ensure_newline_before_comments=true
line_length=88
force_single_line=true
# See https://pre-commit.com for more information
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/ambv/black
rev: stable
- repo: https://github.com/timothycrosley/isort
rev: 5.0.4
hooks:
- id: black
language_version: python3.6
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
- id: isort
files: .*.py
- repo: https://github.com/psf/black
rev: 19.10b0
hooks:
- id: black
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v2.0.0
rev: v3.1.0
hooks:
- id: check-ast
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
- id: check-case-conflict
- id: trailing-whitespace
- id: end-of-file-fixer
- id: debug-statements
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
- id: check-added-large-files
- id: check-docstring-first
- id: flake8
- id: check-yaml
exclude: conda/meta.yaml
- repo: https://github.com/PyCQA/flake8
rev: 3.8.3
hooks:
- id: flake8
- repo: https://github.com/PyCQA/bandit
rev: 'master' # Update me!
rev: 1.6.2
hooks:
- id: bandit
exclude: beat/core/test/prefix/algorithms/errors/syntax_error/1.py|beat/core/test/prefix/databases/invalid/1.py
......
......@@ -31,9 +31,9 @@
.. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
===========================================================
Authors of the Biometrics Evaluation and Testing Platform
===========================================================
==============================
Authors of the BEAT Platform
==============================
Andre Anjos <andre.anjos@idiap.ch>
Flavio Tarsetti <flavio.tarsetti@idiap.ch>
......
......@@ -31,18 +31,14 @@
.. OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
.. image:: https://img.shields.io/badge/docs-stable-yellow.svg
:target: https://www.idiap.ch/software/beat/docs/beat/beat.core/stable/index.html
.. image:: https://img.shields.io/badge/docs-latest-orange.svg
.. image:: https://img.shields.io/badge/docs-available-orange.svg
:target: https://www.idiap.ch/software/beat/docs/beat/beat.core/master/index.html
.. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/build.svg
.. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/pipeline.svg
:target: https://gitlab.idiap.ch/beat/beat.core/commits/master
.. image:: https://gitlab.idiap.ch/beat/beat.core/badges/master/coverage.svg
:target: https://gitlab.idiap.ch/beat/beat.core/commits/master
.. image:: https://img.shields.io/badge/gitlab-project-0000c0.svg
:target: https://gitlab.idiap.ch/beat/beat.core
.. image:: https://img.shields.io/pypi/v/beat.core.svg
:target: https://pypi.python.org/pypi/beat.core
==========================
......
......@@ -33,8 +33,8 @@
# #
###################################################################################
# see https://docs.python.org/3/library/pkgutil.html
from pkgutil import extend_path
__path__ = extend_path(__path__, __name__)
......@@ -45,22 +45,21 @@ Forward importing from :py:mod:`beat.backend.python.algorithm`
:py:class:`beat.backend.python.algorithm.Storage`
:py:class:`beat.backend.python.algorithm.Runner`
"""
import os
import six
import numpy
import pkg_resources
import simplejson as json
import six
from beat.backend.python.algorithm import Algorithm as BackendAlgorithm
from beat.backend.python.algorithm import Runner # noqa
from beat.backend.python.algorithm import Storage
from . import dataformat
from . import library
from . import schema
from . import prototypes
from beat.backend.python.algorithm import Storage
from beat.backend.python.algorithm import Runner # noqa
from beat.backend.python.algorithm import Algorithm as BackendAlgorithm
from . import schema
def load_algorithm_prototype(prefix):
......@@ -182,6 +181,8 @@ class Algorithm(BackendAlgorithm):
"""
dataformat_klass = dataformat.DataFormat
def __init__(self, prefix, data, dataformat_cache=None, library_cache=None):
super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache)
......@@ -303,89 +304,52 @@ class Algorithm(BackendAlgorithm):
"declaration: %s" % (self.name, ", ".join(all_output_names))
)
def _validate_format(self, type_name, group_name, entry_name, dataformat):
if dataformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for %s `%s' on algorithm `%s': %s"
% (
type_name,
group_name,
entry_name,
self.name,
"\n".join(dataformat.errors),
)
)
def _validate_dataformats(self, group, group_name, dataformat_cache):
for name, entry in group[group_name].items():
type_name = entry["type"]
thisformat = self._update_dataformat_cache(type_name, dataformat_cache)
self._validate_format(type_name, group_name, name, thisformat)
def _validate_required_dataformats(self, dataformat_cache):
"""Makes sure we can load all requested formats
"""
for group in self.groups:
for name, input in group["inputs"].items():
if input["type"] in self.dataformats:
continue
if dataformat_cache and input["type"] in dataformat_cache: # reuse
thisformat = dataformat_cache[input["type"]]
else: # load it
thisformat = dataformat.DataFormat(self.prefix, input["type"])
if dataformat_cache is not None: # update it
dataformat_cache[input["type"]] = thisformat
self.dataformats[input["type"]] = thisformat
for name, input_ in group["inputs"].items():
self._validate_dataformats(group, "inputs", dataformat_cache)
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for input `%s' on algorithm `%s': %s"
% (input["type"], name, self.name, "\n".join(thisformat.errors))
)
if "outputs" not in group:
continue
for name, output in group["outputs"].items():
if output["type"] in self.dataformats:
continue
if dataformat_cache and output["type"] in dataformat_cache: # reuse
thisformat = dataformat_cache[output["type"]]
else: # load it
thisformat = dataformat.DataFormat(self.prefix, output["type"])
if dataformat_cache is not None: # update it
dataformat_cache[output["type"]] = thisformat
self.dataformats[output["type"]] = thisformat
if "outputs" in group:
self._validate_dataformats(group, "outputs", dataformat_cache)
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for output `%s' on algorithm `%s': %s"
% (
output["type"],
name,
self.name,
"\n".join(thisformat.errors),
)
)
if "loop" in group:
self._validate_dataformats(group, "loop", dataformat_cache)
if self.results:
for name, result in self.results.items():
if result["type"].find("/") != -1:
if result["type"] in self.dataformats:
continue
if dataformat_cache and result["type"] in dataformat_cache: # reuse
thisformat = dataformat_cache[result["type"]]
else:
thisformat = dataformat.DataFormat(self.prefix, result["type"])
if dataformat_cache is not None: # update it
dataformat_cache[result["type"]] = thisformat
self.dataformats[result["type"]] = thisformat
if thisformat.errors:
self.errors.append(
"found error validating data format `%s' "
"for result `%s' on algorithm `%s': %s"
% (
result["type"],
name,
self.name,
"\n".join(thisformat.errors),
)
)
result_type = result["type"]
# results can only contain base types and plots therefore, only
# process plots
if result_type.find("/") != -1:
thisformat = self._update_dataformat_cache(
result_type, dataformat_cache
)
self._validate_format(result_type, "result", name, thisformat)
def _convert_parameter_types(self):
"""Converts types to numpy equivalents, checks defaults, ranges and
......
......@@ -49,15 +49,14 @@ Options:
-V, --version Show version
-v, --verbose Increases the output verbosity level
"""
import os
import sys
from docopt import docopt
from ..bcpapi.broker import BeatComputationBroker
from ..version import __version__
from ..utils import setup_logging
from ..version import __version__
def run(port=5555, verbose=1, callbacks=None):
......
......@@ -41,46 +41,45 @@ Based on the Majordomo Protocol worker example of the ZMQ Guide.
Usage:
%(prog)s [-v ... | --verbose ...] [ --name=<name>] [--prefix=<path>]
[--cache=<path>] [--docker] [--docker-network=<name>]
[--port-range=<range>] <broker_address>
[--port-range=<range>] [--cache-mount-point=<cache_mount_point>]
<broker_address>
%(prog)s (--help | -h)
%(prog)s (--version | -V)
Options:
-h, --help Show this screen
-V, --version Show version
-v, --verbose Increases the output verbosity level
-n <name>, --name=<name> The unique name of this worker on the database.
This is typically the assigned hostname of the node,
but not necessarily [default: %(hostname)s]
-p, --prefix=<path> Comma-separated list of the prefix(es) of your local data [default: .]
-c, --cache=<path> Cache prefix, otherwise defaults to '<prefix>/cache'
--docker-network=<name> Name of the docker network to use
--port-range=<range> Range of port usable for communication with containers
-h, --help Show this screen
-V, --version Show version
-v, --verbose Increases the output verbosity level
-n <name>, --name=<name> The unique name of this worker on the database.
This is typically the assigned hostname of the node,
but not necessarily [default: %(hostname)s]
-p, --prefix=<path> Comma-separated list of the prefix(es) of your local data [default: .]
-c, --cache=<path> Cache prefix, otherwise defaults to '<prefix>/cache'
--docker-network=<name> Name of the docker network to use
--port-range=<range> Range of port usable for communication with containers
--cache-mount-point=<cache_mount_point> NFS mount point to use for cache setup
"""
import os
import sys
import signal
import sys
import tempfile
import simplejson as json
from socket import gethostname
import simplejson as json
import zmq
from socket import gethostname
from docopt import docopt
from ..bcpapi import BCP
from ..bcpapi.worker import BeatComputationWorker
from ..bcpapi.processor import BeatComputationProcessor
from ..bcpapi.execution import ExecutionProcess
from ..bcpapi.processor import BeatComputationProcessor
from ..bcpapi.worker import BeatComputationWorker
from ..dock import Host
from ..utils import find_free_port
from ..utils import setup_logging
from ..version import __version__
logger = None
......@@ -107,6 +106,7 @@ def run(
docker_network_name=None,
docker_port_range=None,
docker_images_cache=None,
docker_cache_mount_point=None,
):
"""Start the worker
......@@ -202,6 +202,8 @@ def run(
data["network_name"] = docker_network_name
if docker_port_range:
data["port_range"] = docker_port_range
if docker_cache_mount_point:
data["cache_mount_point"] = docker_cache_mount_point
# Start the execution
logger.info("Running '%s' with job id #%s", data["algorithm"], job_id)
......@@ -248,6 +250,7 @@ def run(
logger.info("The scheduler shut down, we will wait for it")
worker.destroy()
worker.send_to_broker(BCP.BCPW_DISCONNECT)
worker.destroy()
# Cleanup
for execution_process in execution_processes:
......@@ -301,6 +304,8 @@ def main(argv=None):
docker_images_cache = None
docker_network_name = None
docker_port_range = None
docker_cache_mount_point = None
if args["--docker"]:
docker_images_cache = os.path.join(
tempfile.gettempdir(), "beat-docker-images.json"
......@@ -319,6 +324,14 @@ def main(argv=None):
return 1
logger.info("Using port range %s", docker_port_range)
docker_cache_mount_point = args.get("--cache-mount-point", None)
if docker_cache_mount_point:
if not docker_cache_mount_point.startswith("nfs://"):
raise RuntimeError(
"Invalid nfs mount point: {}".format(docker_cache_mount_point)
)
logger.info("Using volume cache mount point %s", docker_cache_mount_point)
return run(
broker_address,
service_name=args.get("--name"),
......@@ -329,6 +342,7 @@ def main(argv=None):
docker_network_name=docker_network_name,
docker_port_range=docker_port_range,
docker_images_cache=docker_images_cache,
docker_cache_mount_point=docker_cache_mount_point,
)
......
......@@ -37,20 +37,16 @@
Inspired by the Majordomo Protocol Broker
"""
import logging
import time
import signal
import time
from binascii import hexlify
import zmq
# local
from . import BCP
from .zhelpers import dump
logger = logging.getLogger(__name__)
......@@ -95,6 +91,7 @@ class BeatComputationBroker(object):
def __init__(self, verbose=False):
"""Initialize broker state."""
self.verbose = verbose
self.continue_ = True
self.services = {}
......@@ -163,14 +160,17 @@ class BeatComputationBroker(object):
def process_client(self, sender, msg):
"""Process a request coming from a client."""
# Service name + body
assert len(msg) >= 2 # nosec
service = msg.pop(0)
# Set reply return address to client sender
msg = [sender, b""] + msg
self.dispatch(self.require_service(service), msg)
def process_worker(self, sender, msg):
"""Process message sent to us by a worker."""
# At least, command
assert len(msg) >= 1 # nosec
......@@ -222,12 +222,14 @@ class BeatComputationBroker(object):
def delete_worker(self, worker, disconnect):
"""Deletes worker from all data structures, and deletes worker."""
assert worker is not None # nosec
if disconnect:
self.send_to_worker(worker, BCP.BCPW_DISCONNECT, None, None)
if worker.service is not None:
worker.service.waiting.remove(worker)
if worker in worker.service.waiting:
worker.service.waiting.remove(worker)
on_disconnection = self.callbacks.get("on_disconnection", None)
if on_disconnection:
......@@ -236,8 +238,12 @@ class BeatComputationBroker(object):
if worker.identity in self.workers:
self.workers.pop(worker.identity)
if worker in self.waiting:
self.waiting.pop(self.waiting.index(worker))
def require_worker(self, address):
"""Finds the worker (creates if necessary)."""
assert address is not None # nosec
identity = hexlify(address)
worker = self.workers.get(identity)
......@@ -251,6 +257,7 @@ class BeatComputationBroker(object):
def require_service(self, name):
"""Locates the service (creates if necessary)."""
assert name is not None # nosec
service = self.services.get(name)
if service is None:
......@@ -264,11 +271,13 @@ class BeatComputationBroker(object):
We use a single socket for both clients and workers.
"""
self.socket.bind(endpoint)
logger.info("I: BCP broker/0.0.1 is active at %s", endpoint)
def send_heartbeats(self):
"""Send heartbeats to idle workers if it's time"""
if time.time() > self.heartbeat_at:
for worker in self.waiting:
self.send_to_worker(worker, BCP.BCPW_HEARTBEAT, None, None)
......@@ -276,16 +285,16 @@ class BeatComputationBroker(object):
self.heartbeat_at = time.time() + 1e-3 * self.HEARTBEAT_INTERVAL
def purge_workers(self):
"""Look for & kill expired workers.
"""
"""Look for & kill expired workers."""
for item in self.waiting:
if item.expiry < time.time():
logger.info("I: deleting expired worker: %s", item.identity)
self.delete_worker(item, False)
self.waiting.pop(self.waiting.index(item))
def worker_waiting(self, worker):
"""This worker is now waiting for work."""
# Queue to broker and service waiting lists
if worker not in self.waiting:
......@@ -298,10 +307,14 @@ class BeatComputationBroker(object):
def dispatch(self, service, msg):
"""Dispatch requests to waiting workers as possible"""
assert service is not None # nosec
if msg is not None: # Queue message if any
service.requests.append(msg)
self.purge_workers()
while service.waiting and service.requests:
msg = service.requests.pop(0)
worker = service.waiting.pop(0)
......
......@@ -85,7 +85,7 @@ class BeatComputationClient(object):
request = [b"", BCP.BCPC_CLIENT, service] + request
if self.verbose:
logger.warn("I: send request to '%s' service: ", service)
logger.warning("I: send request to '%s' service: ", service)
dump(request)
self.client.send_multipart(request)
......
......@@ -40,17 +40,16 @@ management
Execution utilities
"""
import logging
import multiprocessing
import signal
import zmq
import simplejson as json
import zmq
from ..dock import Host
from ..execution.local import LocalExecutor
from ..execution.docker import DockerExecutor
from ..execution.local import LocalExecutor
from . import BCP
......
......@@ -34,8 +34,8 @@
"""BEAT Computation worker"""
import logging
import zmq
from .zhelpers import dump
......
......@@ -34,14 +34,12 @@
"""BEAT Computation worker"""
import logging
import time
import zmq
# BEAT Computation protocol constants:
from . import BCP
from .zhelpers import dump
logger = logging.getLogger(__name__)
......@@ -58,8 +56,8 @@ class BeatComputationWorker(object):
service = service.encode("utf-8")
self.heartbeat_at = (
0
) # When to send HEARTBEAT (relative to time.time(), so in seconds)
0 # When to send HEARTBEAT (relative to time.time(), so in seconds)
)
self.liveness = 0 # How many attempts left
self.heartbeat = 2500 # Heartbeat delay, msecs
self.reconnect = 2500 # Reconnect delay, msecs
......@@ -210,7 +208,7 @@ class BeatComputationWorker(object):
self.liveness -= 1
if self.liveness == 0:
if self.verbose:
logger.warn("W: disconnected from broker - retrying…")
logger.warning("W: disconnected from broker - retrying…")
try:
time.sleep(1e-3 * self.reconnect)
except KeyboardInterrupt:
......@@ -222,7 +220,7 @@ class BeatComputationWorker(object):
self.send_to_broker(BCP.BCPW_HEARTBEAT)
self.heartbeat_at = time.time() + 1e-3 * self.heartbeat
logger.warn("W: interrupt received, killing worker…")
logger.warning("W: interrupt received, killing worker…")
return None
def destroy(self):
......
......@@ -38,9 +38,9 @@ Helper module for common zmq task
Based on Majordomo protocol zhelpers.
"""
import binascii
import logging
import zmq
logger = logging.getLogger(__name__)
......
......@@ -53,16 +53,15 @@ Forward importing from :py:mod:`beat.backend.python.data`:
:py:func:`beat.backend.python.data.load_data_index_db`
:py:func:`beat.backend.python.data.foundSplitRanges`
"""
from beat.backend.python.data import mixDataIndices # noqa
from beat.backend.python.data import getAllFilenames # noqa
from beat.backend.python.data import DataSource # noqa
from beat.backend.python.data import CachedDataSink # noqa
from beat.backend.python.data import CachedDataSource # noqa
from beat.backend.python.data import DatabaseOutputDataSource # noqa
from beat.backend.python.data import RemoteDataSource # noqa
from beat.backend.python.data import DataSink # noqa
from beat.backend.python.data import CachedDataSink # noqa
from beat.backend.python.data import DataSource # noqa
from beat.backend.python.data import RemoteDataSource # noqa
from beat.backend.python.data import StdoutDataSink # noqa
from beat.backend.python.data import foundSplitRanges # noqa
from beat.backend.python.data import getAllFilenames # noqa
from beat.backend.python.data import load_data_index # noqa
from beat.backend.python.data import load_data_index_db # noqa
from beat.backend.python.data import foundSplitRanges # noqa
from beat.backend.python.data import mixDataIndices # noqa
......@@ -44,8 +44,6 @@ Forward importing from :py:mod:`beat.backend.python.data_loaders`
:py:class:`beat.backend.python.data_loaders.DataLoader`
:py:class:`beat.backend.python.data_loaders.DataView`
"""
from beat.backend.python.data_loaders import DataLoaderList # noqa
from beat.backend.python.data_loaders import DataLoader # noqa
from beat.backend.python.data_loaders import DataLoaderList # noqa
from beat.backend.python.data_loaders import DataView # noqa
......@@ -44,34 +44,43 @@ Validation of databases
Forward importing from :py:mod:`beat.backend.python.database`:
:py:class:`beat.backend.python.database.Storage`
"""
import os
import six
from . import schema
from .dataformat import DataFormat
from . import prototypes
from beat.backend.python.database import Storage
from beat.backend.python.database import Database as BackendDatabase
from beat.backend.python.database import Storage
from beat.backend.python.protocoltemplate import Storage as PTStorage
from . import prototypes
from . import schema
from .dataformat import DataFormat
from .protocoltemplate import ProtocolTemplate
def get_first_procotol_template(prefix):
pt_root_folder = os.path.join(prefix, PTStorage.asset_folder)
pts_available = os.listdir(pt_root_folder)
pts_available = sorted(os.listdir(pt_root_folder))
if not pts_available:
raise RuntimeError("Invalid prefix content, no protocol template available")
procotol_template_folder = pts_available[0]
protocol_template_versions = sorted(
os.listdir(os.path.join(pt_root_folder, procotol_template_folder))
)
version = protocol_template_versions[-1].split(".")[0]
return "{}/{}".format(procotol_template_folder, version)
selected_protocol_template = None
for procotol_template_folder in pts_available:
protocol_template_versions = sorted(
os.listdir(os.path.join(pt_root_folder, procotol_template_folder))
)
version = protocol_template_versions[-1].split(".")[0]
protocol_template_name = "{}/{}".format(procotol_template_folder, version)
protocol_template = ProtocolTemplate(prefix, protocol_template_name)
if protocol_template.valid:
selected_protocol_template = protocol_template_name
break
if selected_protocol_template is None:
raise RuntimeError("No valid protocol template found")
return selected_protocol_template
class Database(BackendDatabase):
......
......@@ -44,17 +44,17 @@ Validation and parsing for dataformats
Forward importing from :py:mod:`beat.backend.python.dataformat`:
:py:class:`beat.backend.python.dataformat.Storage`
"""
import copy
import six
from . import schema
from beat.backend.python.dataformat import DataFormat as BackendDataFormat
from beat.backend.python.dataformat import Storage # noqa
from . import prototypes
from . import schema
from . import utils
from beat.backend.python.dataformat import Storage # noqa
from beat.backend.python.dataformat import DataFormat as BackendDataFormat
class DataFormat(BackendDataFormat):
"""Data formats define the chunks of data that circulate between blocks.
......
This diff is collapsed.
......@@ -41,10 +41,8 @@ environment
Helper functions related to environment management
"""
import re
import logging
import re
logger = logging.getLogger(__name__)
......
......@@ -41,21 +41,19 @@ base
Execution utilities
"""
import os
import glob
import collections
import glob
import logging
import simplejson as json
import os
from .. import schema
from .. import database
from .. import algorithm
from .. import stats
import simplejson as json
from beat.backend.python.helpers import convert_experiment_configuration_to_container
from .. import algorithm
from .. import database
from .. import schema
from .. import stats
logger = logging.getLogger(__name__)
......@@ -230,6 +228,17 @@ class BaseExecutor(object):
"The input '%s' doesn't exist in the loop algorithm" % name
)
if len(loop["outputs"]) != len(self.loop_algorithm.output_map):
self.errors.append(
"The number of outputs of the loop algorithm doesn't correspond"
)
for name in self.data["outputs"].keys():
if name not in self.algorithm.output_map.keys():
self.errors.append(
"The output '%s' doesn't exist in the loop algorithm" % name
)
# Check that the mapping in coherent
if len(self.data["inputs"]) != len(self.algorithm.input_map):
self.errors.append(
......
This diff is collapsed.
......@@ -41,20 +41,19 @@ local
Execution utilities
"""
import logging
import os
import shutil
import sys
import tempfile
import shutil
import zmq
import time
import logging
import zmq
from beat.backend.python.execution import AlgorithmExecutor
from beat.backend.python.execution import MessageHandler
from beat.backend.python.execution import LoopExecutor
from beat.backend.python.execution import LoopMessageHandler
from beat.backend.python.execution import MessageHandler
from .base import BaseExecutor
......@@ -188,14 +187,24 @@ class LocalExecutor(BaseExecutor):
self.zmq_context = None
def __cleanup(self):
def __cleanup(self, early=False):
if self.loop_executor:
if early:
self.loop_socket.send_string("don")
self.loop_socket.recv() # ack
self.loop_executor.wait()
self.loop_executor.close()
for handler in [self.message_handler, self.loop_message_handler]:
if handler:
handler.kill()
handler.join()
try:
handler.join()
except RuntimeError:
# The handler was not started
pass
handler.destroy()
for socket in [self.executor_socket, self.loop_socket]:
......@@ -310,15 +319,35 @@ class LocalExecutor(BaseExecutor):
cache_root=self.cache,
)
retval = self.loop_executor.setup()
try:
retval = self.loop_executor.setup()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
retval = False
else:
message = None
if not retval:
self.__cleanup()
raise RuntimeError("Loop algorithm setup failed")
error = "Loop algorithm {} setup failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
try:
prepared = self.loop_executor.prepare()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
prepared = False
else:
message = None
prepared = self.loop_executor.prepare()
if not prepared:
self.__cleanup()
raise RuntimeError("Loop algorithm prepare failed")
error = "Loop algorithm {} prepare failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
self.loop_executor.process()
......@@ -330,28 +359,50 @@ class LocalExecutor(BaseExecutor):
loop_socket=self.loop_socket,
)
retval = self.executor.setup()
if not retval:
self.__cleanup()
raise RuntimeError("Algorithm setup failed")
try:
status = self.executor.setup()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
status = 0
else:
message = None
if not status:
self.__cleanup(early=True)
error = "Algorithm {} setup failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
try:
prepared = self.executor.prepare()
except Exception as e:
message = _process_exception(e, self.prefix, "algorithms")
prepared = 0
else:
message = None
prepared = self.executor.prepare()
if not prepared:
self.__cleanup()
raise RuntimeError("Algorithm prepare failed")
self.__cleanup(early=True)
error = "Algorithm {} prepare failed".format(self.algorithm.name)
if message:
error += ": {}".format(message)
raise RuntimeError(error)
_start = time.time()
try:
processed = self.executor.process()
except Exception as e:
message = _process_exception(e, self.prefix, "databases")
message = _process_exception(e, self.prefix, "algorithms")
self.__cleanup()
return _create_result(1, message)
if not processed:
self.__cleanup()
raise RuntimeError("Algorithm process failed")
raise RuntimeError(
"Algorithm {} process failed".format(self.algorithm.name)
)
proc_time = time.time() - _start
......
......@@ -41,17 +41,10 @@ remote
Execution utilities
"""
import logging
from .base import BaseExecutor
from beat.backend.python.helpers import create_inputs_from_configuration
from beat.backend.python.helpers import create_outputs_from_configuration
logger = logging.getLogger(__name__)
from .base import BaseExecutor
class RemoteExecutor(BaseExecutor):
......
......@@ -40,22 +40,19 @@ subprocess
Execution utilities
"""
import logging
import os
import shutil
import logging
import sys
import subprocess as sp # nosec
import sys
import tempfile
from beat.backend.python.execution import MessageHandler
from .. import stats
from .. import utils
from .remote import RemoteExecutor
logger = logging.getLogger(__name__)
......@@ -123,6 +120,12 @@ class SubprocessExecutor(RemoteExecutor):
guarantee that the cache is refreshed as appropriate in case the
underlying libraries change.
custom_root_folders (dict): A dictionary mapping databases name and
their location on disk
ip_address (str): IP address of the machine to connect to for the database
execution and message handlers.
python_path (str): Path to the python executable of the environment to use
for experiment execution.
Attributes:
......@@ -172,8 +175,8 @@ class SubprocessExecutor(RemoteExecutor):
library_cache=None,
custom_root_folders=None,
ip_address="127.0.0.1",
python_path=None,
):
super(SubprocessExecutor, self).__init__(
prefix,
data,
......@@ -186,14 +189,30 @@ class SubprocessExecutor(RemoteExecutor):
custom_root_folders=custom_root_folders,
)
# We need three apps to run this function: databases_provider and execute
self.EXECUTE_BIN = _which(os.path.join(os.path.dirname(sys.argv[0]), "execute"))
self.LOOP_EXECUTE_BIN = _which(
os.path.join(os.path.dirname(sys.argv[0]), "loop_execute")
)
self.DBPROVIDER_BIN = _which(
os.path.join(os.path.dirname(sys.argv[0]), "databases_provider")
)
if python_path is None:
base_path = os.path.dirname(sys.argv[0])
# We need three apps to run this function: databases_provider and execute
self.EXECUTE_BIN = _which(os.path.join(base_path, "execute"))
self.LOOP_EXECUTE_BIN = _which(os.path.join(base_path, "loop_execute"))
self.DBPROVIDER_BIN = _which(os.path.join(base_path, "databases_provider"))
else:
base_path = os.path.dirname(python_path)
self.EXECUTE_BIN = os.path.join(base_path, "execute")
self.LOOP_EXECUTE_BIN = os.path.join(base_path, "loop_execute")
self.DBPROVIDER_BIN = os.path.join(base_path, "databases_provider")
if any(
[
not os.path.exists(executable)
for executable in [
self.EXECUTE_BIN,
self.LOOP_EXECUTE_BIN,
self.DBPROVIDER_BIN,
]
]
):
raise RuntimeError("Invalid environment")
def __create_db_process(self, configuration_name=None):
databases_process = None
......@@ -384,7 +403,9 @@ class SubprocessExecutor(RemoteExecutor):
)
if self.loop_algorithm is not None:
cmd.append("tcp://" + self.ip_address + (":%d" % loop_algorithm_port))
cmd.append(
"--loop=tcp://" + self.ip_address + (":%d" % loop_algorithm_port)
)
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, "--debug")
......
This diff is collapsed.
......@@ -43,16 +43,14 @@ Various functions for hashing platform contributions and others
Also forward importing from :py:mod:`beat.backend.python.hash`
"""
import collections
import simplejson as json
from beat.backend.python.hash import * # noqa
from beat.backend.python.hash import _compact
from beat.backend.python.hash import _sha256 # noqa
from beat.backend.python.hash import _stringify
from beat.backend.python.hash import _compact
# ----------------------------------------------------------
......
......@@ -45,8 +45,6 @@ Forward imported from :py:mod:`beat.backend.python.inputs`:
:py:class:`beat.backend.python.inputs.Input`
:py:class:`beat.backend.python.inputs.InputGroup`
"""
from beat.backend.python.inputs import InputList # noqa
from beat.backend.python.inputs import Input # noqa
from beat.backend.python.inputs import InputGroup # noqa
from beat.backend.python.inputs import InputList # noqa
......@@ -44,14 +44,13 @@ Validation for libraries
Forward imported from :py:mod:`beat.backend.python.library`:
:py:class:`beat.backend.python.library.Storage`
"""
import six
from . import schema
from . import prototypes
from beat.backend.python.library import Storage
from beat.backend.python.library import Library as BackendLibrary
from beat.backend.python.library import Storage
from . import prototypes
from . import schema
class Library(BackendLibrary):
......
......@@ -44,8 +44,6 @@ Forward imported from :py:mod:`beat.backend.python.outputs`:
:py:class:`beat.backend.python.outputs.Output`
:py:class:`beat.backend.python.outputs.OutputList`
"""
from beat.backend.python.outputs import SynchronizationListener # noqa
from beat.backend.python.outputs import Output # noqa
from beat.backend.python.outputs import OutputList # noqa
from beat.backend.python.outputs import SynchronizationListener # noqa
......@@ -41,18 +41,17 @@ plotter
Validation for plotters
"""
import os
import six
import sys
from . import dataformat
import six
from . import algorithm
from . import schema
from . import dataformat
from . import loader
from . import prototypes
from . import schema
from . import utils
from . import loader
# ----------------------------------------------------------
......
......@@ -41,13 +41,12 @@ plotterparameter
Validation for plotterparameters
"""
import simplejson as json
from . import schema
from . import plotter
from . import prototypes
from . import schema
from . import utils
from . import plotter
class Storage(utils.Storage):
......@@ -67,7 +66,9 @@ class Storage(utils.Storage):
def __init__(self, prefix, name):
if name.count("/") != 2:
raise RuntimeError(f"invalid plotterparameter name: {name}")
raise RuntimeError(
"invalid plotterparameter name: {name}".format(name=name)
)
self.username, self.name, self.version = name.split("/")
self.fullname = name
......@@ -170,7 +171,9 @@ class Plotterparameter(object):
self.storage = Storage(self.prefix, self._name)
if not self.storage.json.exists():
self.errors.append(
f"Plotterparameter declaration file not found: {data}"
"Plotterparameter declaration file not found: {data}".format(
data=data
)
)
return
data = self.storage.json.path # loads data from JSON declaration
......@@ -219,12 +222,16 @@ class Plotterparameter(object):
self.plotter.clean_parameter(key, val)
except KeyError:
self.errors.append(
f"'{key}' isn't a parameter for plotter {self.plotter.name}"
"'{key}' isn't a parameter for plotter {name}".format(
key=key, name=self.plotter.name
)
)
return