Commit 7e987c53 authored by André Anjos

Merge branch '1.7.x' into 'master'

1.7.x

See merge request !46
parents 937d5d21 df93f8cd
Pipeline #25399 passed with stages in 43 minutes and 11 seconds
@@ -4,12 +4,12 @@
# Definition of global variables (all stages)
variables:
CONDA_ROOT: "${CI_PROJECT_DIR}/miniconda"
DOCKER_REGISTRY: docker.idiap.ch
# Definition of our build pipeline order
stages:
- build
- docker
- deploy
- pypi
@@ -23,6 +23,8 @@ stages:
- chmod 755 _ci/install.sh
- ./_ci/install.sh _ci master #installs ci support scripts
- ./_ci/before_build.sh
- docker info
- ./scripts/docker_pull.sh #pulls required test images
script:
- ./_ci/build.sh
after_script:
@@ -37,8 +39,7 @@ stages:
.build_linux_template: &linux_build_job
<<: *build_job
tags:
-    - docker
-  image: continuumio/conda-concourse-ci
+    - docker-build
artifacts:
expire_in: 1 week
paths:
@@ -63,23 +64,6 @@ stages:
key: "macosx-cache"
# Docker host based testing (must be run inside dind or docker-enabled host)
.docker_test_linux_template: &linux_docker_job
stage: docker
before_script:
# safe-keep artifacts, as before_build.sh will erase them...
- mv ${CONDA_ROOT}/conda-bld .
- ./_ci/install.sh _ci master #updates ci support scripts
- ./_ci/before_build.sh
- mv conda-bld ${CONDA_ROOT}
- ./scripts/before_test.sh
script:
- export BEAT_DOCKER_TESTS=true
- BOB_TEST_ONLY=true ./_ci/build.sh
after_script:
- ./_ci/after_build.sh
build_linux_36:
<<: *linux_build_job
variables:
@@ -100,17 +84,6 @@ build_macosx_36:
PYTHON_VERSION: "3.6"
# Docker host based testing
docker_linux_36:
<<: *linux_docker_job
variables:
PYTHON_VERSION: "3.6"
dependencies:
- build_linux_36
tags:
- docker-build
# Deploy targets
.deploy_template: &deploy_job
stage: deploy
@@ -223,6 +223,8 @@ class Algorithm(BackendAlgorithm):
for k,v in g['inputs'].items()])
self.output_map = dict([(k,v['type']) for g in self.groups \
for k,v in g.get('outputs', {}).items()])
self.loop_map = dict([(k,v['type']) for g in self.groups \
for k,v in g.get('loop', {}).items()])
self._validate_required_dataformats(dataformat_cache)
self._convert_parameter_types()
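The new `loop_map` gives loop endpoints the same name-to-type lookup that already exists for inputs and outputs. A minimal, self-contained sketch of the flattening pattern on a toy `groups` structure (the endpoint names and data formats below are invented for illustration):

```python
# Toy stand-in for the parsed algorithm declaration; values are invented.
groups = [
    {
        'inputs': {'image': {'type': 'system/array_2d_uint8/1'}},
        'outputs': {'score': {'type': 'system/float/1'}},
        'loop': {'request': {'type': 'system/float/1'},
                 'answer': {'type': 'system/boolean/1'}},
    },
]

# Same dict-flattening idiom as input_map/output_map/loop_map above.
loop_map = {k: v['type'] for g in groups
            for k, v in g.get('loop', {}).items()}

assert loop_map == {'request': 'system/float/1',
                    'answer': 'system/boolean/1'}
```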
@@ -182,7 +182,7 @@ class Database(BackendDatabase):
self.errors.append("found error validating data format `%s' " \
"for output `%s' on set `%s' of protocol `%s': %s" % \
(value, key, _set['name'], protocol['name'],
-                  str(dataformat.errors))
+                  "\n".join(dataformat.errors))
)
# all view names must be relative to the database root path
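The only change here is how the nested error list is rendered: `str()` on a list prints the Python repr, while `"\n".join()` yields one message per line. A quick runnable illustration with made-up validation errors:

```python
errors = ["field `width' not found", "field `height' has the wrong type"]

print(str(errors))
# ["field `width' not found", "field `height' has the wrong type"]

print("\n".join(errors))
# field `width' not found
# field `height' has the wrong type
```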
@@ -482,7 +482,7 @@ class Host(object):
client = docker.APIClient()
-        data = client.stats(container.id, decode=True, stream=False)
+        data = client.stats(container.id, stream=False)
# If CPU statistics can't be retrieved
if 'system_cpu_usage' not in data['cpu_stats']:
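`docker.APIClient.stats()` already returns a decoded dictionary when called with `stream=False`, so `decode=True` is unnecessary there (newer docker-py releases reject `decode=True` unless streaming). A hedged sketch of the call and the guard, assuming a running container id:

```python
import docker

client = docker.APIClient()
# One-shot statistics snapshot; the result is already a dict.
data = client.stats('some_container_id', stream=False)

# The first sample may lack CPU statistics on some platforms.
if 'system_cpu_usage' not in data['cpu_stats']:
    print('CPU statistics not available for this sample')
```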
@@ -145,6 +145,7 @@ class BaseExecutor(object):
self.prefix = prefix
self.cache = cache or os.path.join(self.prefix, 'cache')
self.algorithm = None
self.loop_algorithm = None
self.databases = {}
self.input_list = None
self.data_loaders = None
@@ -185,14 +186,34 @@ class BaseExecutor(object):
if self.data['algorithm'] in algorithm_cache:
self.algorithm = algorithm_cache[self.data['algorithm']]
else:
-            self.algorithm = algorithm.Algorithm(self.prefix, self.data['algorithm'],
-                                                 dataformat_cache, library_cache)
+            self.algorithm = algorithm.Algorithm(self.prefix,
+                                                 self.data['algorithm'],
+                                                 dataformat_cache,
+                                                 library_cache)
algorithm_cache[self.algorithm.name] = self.algorithm
if not self.algorithm.valid:
self.errors += self.algorithm.errors
return
if 'loop' in self.data:
loop = self.data['loop']
if loop['algorithm'] in algorithm_cache:
self.loop_algorithm = algorithm_cache[loop['algorithm']]
else:
self.loop_algorithm = algorithm.Algorithm(self.prefix,
loop['algorithm'],
dataformat_cache,
library_cache)
algorithm_cache[self.loop_algorithm.name] = self.loop_algorithm
if len(loop['inputs']) != len(self.loop_algorithm.input_map):
self.errors.append("The number of inputs of the loop algorithm doesn't correspond")
for name in self.data['inputs'].keys():
if name not in self.algorithm.input_map.keys():
self.errors.append("The input '%s' doesn't exist in the loop algorithm" % name)
# Check that the mapping is coherent
if len(self.data['inputs']) != len(self.algorithm.input_map):
self.errors.append("The number of inputs of the algorithm doesn't correspond")
@@ -209,31 +230,19 @@ class BaseExecutor(object):
if name not in self.algorithm.output_map.keys():
self.errors.append("The output '%s' doesn't exist in the algorithm" % name)
if 'loop' in self.data:
for name in ['request', 'answer']:
if name not in self.algorithm.loop_map.keys():
self.errors.append("The loop '%s' doesn't exist in the algorithm" % name)
if self.errors:
return
# Load the databases (if any is required)
-        for name, details in self.data['inputs'].items():
-            if 'database' in details:
-                if details['database'] not in self.databases:
-                    if details['database'] in database_cache:
-                        db = database_cache[details['database']]
-                    else:
-                        db = database.Database(self.prefix, details['database'],
-                                               dataformat_cache)
-                        name = "database/%s" % db.name
-                        if name in custom_root_folders:
-                            db.data['root_folder'] = custom_root_folders[name]
-                        database_cache[db.name] = db
-                    self.databases[db.name] = db
-                    if not db.valid:
-                        self.errors += db.errors
+        self._update_db_cache(self.data['inputs'], custom_root_folders,
+                              database_cache, dataformat_cache)
+
+        if 'loop' in self.data:
+            self._update_db_cache(self.data['loop']['inputs'], custom_root_folders,
+                                  database_cache, dataformat_cache)
def __enter__(self):
@@ -247,9 +256,6 @@ class BaseExecutor(object):
logger.info("Start the execution of '%s'", self.algorithm.name)
# self._prepare_inputs()
# self._prepare_outputs()
return self
@@ -269,6 +275,32 @@ class BaseExecutor(object):
self.data_sinks = []
def _update_db_cache(self, inputs, custom_root_folders, database_cache, dataformat_cache):
""" Update the database cache based on the input list given"""
for name, details in inputs.items():
if 'database' in details:
if details['database'] not in self.databases:
if details['database'] in database_cache:
db = database_cache[details['database']]
else:
db = database.Database(self.prefix, details['database'],
dataformat_cache)
name = "database/%s" % db.name
if name in custom_root_folders:
db.data['root_folder'] = custom_root_folders[name]
database_cache[db.name] = db
self.databases[db.name] = db
if not db.valid:
self.errors += db.errors
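One subtlety worth noting: database-location overrides are keyed by `database/<name>` in `custom_root_folders`, and the helper rewrites `root_folder` before caching. A self-contained sketch of that override logic with a stand-in class (everything here is illustrative):

```python
class FakeDatabase:
    """Stand-in for beat.core's Database, for illustration only."""
    def __init__(self, name, root_folder):
        self.name = name
        self.data = {'root_folder': root_folder}

custom_root_folders = {'database/atnt': '/my/local/atnt'}

db = FakeDatabase('atnt', '/default/location')
key = "database/%s" % db.name       # same key scheme as the helper above
if key in custom_root_folders:
    db.data['root_folder'] = custom_root_folders[key]

assert db.data['root_folder'] == '/my/local/atnt'
```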
def _prepare_inputs(self):
"""Prepares all input required by the execution."""
@@ -380,6 +412,9 @@ class BaseExecutor(object):
self.algorithm.export(tmp_prefix)
if self.loop_algorithm:
self.loop_algorithm.export(tmp_prefix)
def dump_databases_provider_configuration(self, directory):
"""Exports contents useful for a backend runner to run the algorithm"""
@@ -35,21 +35,22 @@ Execution utilities
import os
import shutil
+import logging

import requests
import simplejson
import zmq
import docker

-import logging
-logger = logging.getLogger(__name__)
+from beat.backend.python.execution import MessageHandler

from .. import stats
-from .. import message_handler
from .. import utils
from .remote import RemoteExecutor

+logger = logging.getLogger(__name__)
class DockerExecutor(RemoteExecutor):
"""DockerExecutor runs the code given an execution block information, externally
@@ -152,6 +153,91 @@ class DockerExecutor(RemoteExecutor):
self.host = host
def __create_db_container(self, datasets_uid, network_name):
# Configuration and needed files
databases_configuration_path = utils.temporary_directory()
self.dump_databases_provider_configuration(databases_configuration_path)
# Modify the paths to the databases in the dumped configuration files
root_folder = os.path.join(databases_configuration_path, 'prefix', 'databases')
database_paths = {}
for db_name in self.databases.keys():
json_path = os.path.join(root_folder, db_name + '.json')
with open(json_path, 'r') as f:
db_data = simplejson.load(f)
database_paths[db_name] = db_data['root_folder']
db_data['root_folder'] = os.path.join('/databases', db_name)
with open(json_path, 'w') as f:
simplejson.dump(db_data, f, indent=4)
# Determine the docker image to use for the databases
try:
databases_environment = self.host.db2docker(database_paths.keys())
except:
raise RuntimeError("No environment found for the databases `%s' " \
"- available environments are %s" % (
", ".join(database_paths.keys()),
", ".join(self.host.db_environments.keys())))
# Creation of the container
# Note: only one databases image is supported at a time
database_port = utils.find_free_port()
cmd = [
'databases_provider',
'0.0.0.0:%d' % database_port,
'/beat/prefix',
'/beat/cache'
]
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
databases_info_name = "beat_db_%s" % utils.id_generator()
databases_info = self.host.create_container(databases_environment, cmd)
databases_info.uid = datasets_uid
databases_info.network_name = network_name
databases_info.set_name(databases_info_name)
# Specify the volumes to mount inside the container
databases_info.add_volume(databases_configuration_path, '/beat/prefix')
databases_info.add_volume(self.cache, '/beat/cache')
for db_name, db_path in database_paths.items():
databases_info.add_volume(db_path, os.path.join('/databases', db_name))
# Start the container
while True:
try:
databases_info.add_port(database_port, database_port, host_address=self.host.ip)
self.host.start(databases_info)
break
except Exception as e:
if str(e).find('port is already allocated') < 0:
break
databases_info.reset_ports()
database_port = utils.find_free_port()
cmd = [x if not x.startswith('0.0.0.0:') else '0.0.0.0:%d' % database_port for x in cmd]
databases_info.command = cmd
database_ip = self.host.get_ipaddress(databases_info)
retval = dict(
configuration_path=databases_configuration_path,
container=databases_info,
address=database_ip,
port=database_port
)
return retval
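The returned dict is deliberately plain: the address and port feed the `tcp://` endpoints handed to other containers, while the container object and configuration path are kept around for teardown. A small helper sketch (the function name and values are invented) showing how such a dict translates into an endpoint string:

```python
def database_endpoint(info):
    """Build the tcp:// endpoint for a dict returned by __create_db_container."""
    return 'tcp://%s:%d' % (info['address'], info['port'])

# Example with made-up values:
print(database_endpoint({'address': '172.17.0.2', 'port': 51000}))
# tcp://172.17.0.2:51000
```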
def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
timeout_in_minutes=0):
"""Executes the user algorithm code using an external program.
@@ -224,102 +310,63 @@ class DockerExecutor(RemoteExecutor):
port = utils.find_free_port_in_range(int(min_port), int(max_port))
address += ':{}'.format(port)
-        self.message_handler = message_handler.MessageHandler(address,
-                                                              kill_callback=_kill)
+        self.message_handler = MessageHandler(address, kill_callback=_kill)
        #----- (If necessary) Instantiate the docker container that provides the databases

-        databases_container = None
-        databases_configuration_path = None

        datasets_uid = self.data.pop('datasets_uid', None)
        network_name = self.data.pop('network_name', 'bridge')
+        databases_infos = {}

        if len(self.databases) > 0:
+            databases_infos['db'] = self.__create_db_container(datasets_uid, network_name)

-            # Configuration and needed files
-            databases_configuration_path = utils.temporary_directory()
-            self.dump_databases_provider_configuration(databases_configuration_path)
-
-            # Modify the paths to the databases in the dumped configuration files
-            root_folder = os.path.join(databases_configuration_path, 'prefix', 'databases')
-
-            database_paths = {}
-
-            for db_name in self.databases.keys():
-                json_path = os.path.join(root_folder, db_name + '.json')
-
-                with open(json_path, 'r') as f:
-                    db_data = simplejson.load(f)
-
-                database_paths[db_name] = db_data['root_folder']
-                db_data['root_folder'] = os.path.join('/databases', db_name)
-
-                with open(json_path, 'w') as f:
-                    simplejson.dump(db_data, f, indent=4)
-
-            # Determine the docker image to use for the databases
-            try:
-                databases_environment = self.host.db2docker(database_paths.keys())
-            except:
-                raise RuntimeError("No environment found for the databases `%s' " \
-                                   "- available environments are %s" % (
-                                       ", ".join(database_paths.keys()),
-                                       ", ".join(self.host.db_environments.keys())))
-
-            # Creation of the container
-            # Note: we only support one databases image loaded at the same time
-            database_port = utils.find_free_port()
-
-            cmd = [
-                'databases_provider',
-                '0.0.0.0:%d' % database_port,
-                '/beat/prefix',
-                '/beat/cache'
-            ]
-
-            if logger.getEffectiveLevel() <= logging.DEBUG:
-                cmd.insert(1, '--debug')
-
-            databases_container_name = "beat_db_%s" % utils.id_generator()
-
-            databases_container = self.host.create_container(databases_environment, cmd)
-            databases_container.uid = datasets_uid
-            databases_container.network_name = network_name
-            databases_container.set_name(databases_container_name)
-
-            # Specify the volumes to mount inside the container
-            databases_container.add_volume(databases_configuration_path, '/beat/prefix')
-            databases_container.add_volume(self.cache, '/beat/cache')
-
-            for db_name, db_path in database_paths.items():
-                databases_container.add_volume(db_path, os.path.join('/databases', db_name))
-
-            # Start the container
-            while True:
-                try:
-                    databases_container.add_port(database_port, database_port, host_address=self.host.ip)
-                    self.host.start(databases_container)
-                    break
-                except Exception as e:
-                    if str(e).find('port is already allocated') < 0:
-                        break
-
-                    databases_container.reset_ports()
-                    database_port = utils.find_free_port()
-                    cmd = [x if not x.startswith('0.0.0.0:') else '0.0.0.0:%d' % database_port for x in cmd]
-                    databases_container.command = cmd
-
-            database_ip = self.host.get_ipaddress(databases_container)

        #----- Instantiate the algorithm container

        # Configuration and needed files
        configuration_path = utils.temporary_directory()
        self.dump_runner_configuration(configuration_path)

+        loop_algorithm_container = None
+        loop_algorithm_container_ip = None
+        loop_algorithm_container_port = None
+
+        if self.loop_algorithm is not None:
+            if len(self.databases) > 0:
+                databases_infos['loop_db'] = self.__create_db_container(datasets_uid, network_name)
+
+            loop_algorithm_container_port = utils.find_free_port()
+            cmd = [
+                'loop_execute',
+                '0.0.0.0:{}'.format(loop_algorithm_container_port),
+                '/beat/prefix',
+                '/beat/cache'
+            ]
+
+            if len(self.databases) > 0:
+                cmd.append('tcp://{}:{}'.format(databases_infos['loop_db']['address'],
+                                                databases_infos['loop_db']['port']))
+
+            if logger.getEffectiveLevel() <= logging.DEBUG:
+                cmd.insert(1, '--debug')
+
+            loop_algorithm_container = self.host.create_container(processing_environment, cmd)
+            loop_algorithm_container.uid = datasets_uid
+            loop_algorithm_container.network_name = network_name
+
+            # Volumes
+            loop_algorithm_container.add_volume(configuration_path, '/beat/prefix')
+            loop_algorithm_container.add_volume(self.cache, '/beat/cache', read_only=False)
+
+            # Start the container
+            self.host.start(loop_algorithm_container,
+                            virtual_memory_in_megabytes=virtual_memory_in_megabytes,
+                            max_cpu_percent=max_cpu_percent)
+            loop_algorithm_container_ip = self.host.get_ipaddress(loop_algorithm_container)
# Command to execute
cmd = [
@@ -330,7 +377,12 @@ class DockerExecutor(RemoteExecutor):
]
if len(self.databases) > 0:
-            cmd.append('tcp://' + database_ip + ':%d' % database_port)
+            cmd.append('tcp://%s:%d' % (databases_infos['db']['address'],
+                                        databases_infos['db']['port']))
+
+        if self.loop_algorithm is not None:
+            cmd.append('tcp://%s:%d' % (loop_algorithm_container_ip,
+                                        loop_algorithm_container_port))
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
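The algorithm container's command line now simply grows one `tcp://` endpoint per auxiliary container it must talk to. A runnable sketch of that assembly (the entry-point name, addresses, and ports are made up):

```python
cmd = ['execute', '0.0.0.0:5555', '/beat/prefix', '/beat/cache']

databases_infos = {'db': {'address': '172.17.0.2', 'port': 51000}}
loop_ip, loop_port = '172.17.0.3', 51001  # None when there is no loop algorithm

if databases_infos:
    cmd.append('tcp://%s:%d' % (databases_infos['db']['address'],
                                databases_infos['db']['port']))
if loop_ip is not None:
    cmd.append('tcp://%s:%d' % (loop_ip, loop_port))

print(cmd[-2:])  # the two endpoints appended above
```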
@@ -373,9 +425,11 @@ class DockerExecutor(RemoteExecutor):
status = self.host.wait(algorithm_container)
finally:
-            if databases_container is not None:
-                self.host.kill(databases_container)
-                self.host.wait(databases_container)
+            for name, databases_info in databases_infos.items():
+                logger.debug("Stopping database container " + name)
+                container = databases_info['container']
+                self.host.kill(container)
+                self.host.wait(container)
self.message_handler.stop.set()
self.message_handler.join()
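Teardown is now uniform: every auxiliary database container is killed, reaped, its log collected, and finally removed (the removal happens in the cleanup hunk below). A condensed sketch of that lifecycle using only the `Host` methods that appear in this diff (`host` and `databases_infos` are stand-ins):

```python
import logging

logger = logging.getLogger(__name__)

def teardown_database_containers(host, databases_infos):
    """Stop and remove every auxiliary database container."""
    for name, info in databases_infos.items():
        container = info['container']
        host.kill(container)        # stop the container
        host.wait(container)        # reap its exit status
        log = host.logs(container)  # collect output for diagnostics
        logger.debug("Log of the %s database container: %s", name, log)
        host.rm(container)          # release its resources
```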
@@ -409,25 +463,29 @@ class DockerExecutor(RemoteExecutor):
self.host.rm(algorithm_container)
-        if databases_container is not None:
-            db_container_log = self.host.logs(databases_container)
+        for name, databases_info in databases_infos.items():
+            container = databases_info['container']
+            db_container_log = self.host.logs(container)

            if logger.getEffectiveLevel() <= logging.DEBUG:
-                logger.debug("Log of the database container: " + db_container_log)
+                logger.debug("Log of the " + name + " database container: " + db_container_log)

            if status != 0:
                retval['stderr'] += '\n' + db_container_log
            else:
                retval['stdout'] += '\n' + db_container_log

-            self.host.rm(databases_container)
+            self.host.rm(container)
+
+        if loop_algorithm_container:
+            self.host.rm(loop_algorithm_container)

        self.message_handler.destroy()
        self.message_handler = None

        if not self.debug:
-            if databases_configuration_path:
-                shutil.rmtree(databases_configuration_path)
+            for _, databases_info in databases_infos.items():
+                shutil.rmtree(databases_info['configuration_path'])
            shutil.rmtree(configuration_path)
return retval
@@ -36,10 +36,8 @@ Execution utilities
import os
import sys
import glob
import errno
import tempfile
import subprocess
import shutil
import zmq
import time
@@ -48,20 +46,17 @@ logger = logging.getLogger(__name__)
import simplejson
from .. import schema
from .. import database
from .. import algorithm
from .. import inputs
from .. import outputs
from .. import data
from .. import stats
from .base import BaseExecutor
from beat.backend.python.helpers import create_inputs_from_configuration
from beat.backend.python.helpers import create_outputs_from_configuration
from beat.backend.python.helpers import AccessMode
from beat.backend.python.execution import AlgorithmExecutor
from beat.backend.python.execution import MessageHandler
from beat.backend.python.execution import LoopExecutor
from beat.backend.python.execution import LoopMessageHandler
class LocalExecutor(BaseExecutor):
"""LocalExecutor runs the code given an execution block information
@@ -167,41 +162,38 @@ class LocalExecutor(BaseExecutor):
library_cache=library_cache,
custom_root_folders=custom_root_folders)
self.working_dir = None
self.executor = None
self.message_handler = None
self.executor_socket = None
def __enter__(self):
"""Prepares inputs and outputs for the processing task
Raises:
IOError: in case something cannot be properly set up
"""