Commit df6542a3 authored by Philip ABBET's avatar Philip ABBET
Browse files

Merge branch 'datasets' into 'master'

Add support to serve databases from a docker container

See merge request !14
parents 1d47bffb 2612a203
Pipeline #8146 passed with stage
in 4 minutes and 34 seconds
......@@ -2,7 +2,7 @@ stages:
- build
variables:
PREFIX: /opt/beat.env.web-${CI_BUILD_REF_NAME}/usr
PREFIX: /opt/beat.env.web/usr
build:
stage: build
......
......@@ -63,6 +63,7 @@ In particular, this package controls memory and CPU utilisation of the
containers it launches. You must make sure to enable those functionalities on
your installation.
Docker Setup
============
......@@ -75,14 +76,16 @@ execute algorithms or experiments.
We use specific docker images to run user algorithms. Download the following
base images before you try to run tests or experiments on your computer::
$ docker pull beats/py27:system
$ docker pull debian:8.4
$ docker pull docker.idiap.ch/beat/beat.env.system.python:system
$ docker pull docker.idiap.ch/beat/beat.env.db.examples:1.0.0
Optionally, also download the following images to be able to re-run experiments
downloaded from the BEAT platform (not required for unit testing)::
$ docker pull beats/py27:0.0.4
$ docker pull beats/py27:0.1.0
$ docker pull docker.idiap.ch/beat/beat.env.python:0.0.4
$ docker pull docker.idiap.ch/beat/beat.env.python:0.1.0
$ docker pull docker.idiap.ch/beat/beat.env.cxx:1.0.1
$ docker pull docker.idiap.ch/beat/beat.env.db:1.0.0
Documentation
......@@ -139,7 +142,6 @@ sphinx::
Development
-----------
Indentation
===========
......@@ -148,8 +150,8 @@ example, to enforce compliance on a single file and edit it in place, do::
$ ./bin/autopep8 --indent-size=2 --in-place beat/core/utils.py
We normally use 2-space identattion. If ever, you can easily change the
identation to 4 spaces like this::
We normally use 2-space indentation. If ever, you can easily change the
indentation to 4 spaces like this::
$ ./bin/autopep8 --indent-size=4 --in-place beat/core/utils.py
......
......@@ -28,6 +28,7 @@
import os
import shutil
import simplejson
import logging
logger = logging.getLogger(__name__)
......@@ -44,21 +45,14 @@ from . import utils
from . import dock
from . import baseformat
from beat.backend.python.message_handler import MessageHandler
class Server(gevent.Greenlet):
class Server(MessageHandler):
'''A 0MQ server for our communication with the user process'''
def __init__(self, input_list, output_list, host_address):
super(Server, self).__init__()
# An event unblocking a graceful stop
self.stop = gevent.event.Event()
self.stop.clear()
self.must_kill = gevent.event.Event()
self.must_kill.clear()
# Starts our 0MQ server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PAIR)
......@@ -67,104 +61,23 @@ class Server(gevent.Greenlet):
port = self.socket.bind_to_random_port(self.address)
self.address += ':%d' % port
logger.debug("zmq server bound to `%s'", self.address)
self.poller = zmq.Poller()
self.poller.register(self.socket, zmq.POLLIN)
self.input_list = input_list
self.output_list = output_list
super(Server, self).__init__(input_list, self.context, self.socket)
self.system_error = ''
self.user_error = ''
self.last_statistics = {}
self.output_list = output_list
# implementations
self.callbacks = dict(
nxt = self.next,
hmd = self.has_more_data,
idd = self.is_dataunit_done,
wrt = self.write,
idm = self.is_data_missing,
oic = self.output_is_connected,
don = self.done,
err = self.error,
)
def set_process(self, process):
self.process = process
self.process.statistics() # initialize internal statistics
self.callbacks.update(dict(
wrt = self.write,
idm = self.is_data_missing,
oic = self.output_is_connected,
))
def __str__(self):
return 'Server(%s)' % self.address
def _run(self):
logger.debug("0MQ server thread started")
while not self.stop.is_set(): #keep on
if self.must_kill.is_set():
self.process.kill()
self.must_kill.clear()
timeout = 1000 #ms
socks = dict(self.poller.poll(timeout)) #yields to the next greenlet
if self.socket in socks and socks[self.socket] == zmq.POLLIN:
# incomming
more = True
parts = []
while more:
parts.append(self.socket.recv())
more = self.socket.getsockopt(zmq.RCVMORE)
command = parts[0]
logger.debug("recv: %s", command)
if command in self.callbacks:
try: #to handle command
self.callbacks[command](*parts[1:])
except:
import traceback
parser = lambda s: s if len(s)<20 else s[:20] + '...'
parsed_parts = ' '.join([parser(k) for k in parts])
message = "A problem occurred while performing command `%s' " \
"killing user process. Exception:\n %s" % \
(parsed_parts, traceback.format_exc())
logger.error(message, exc_info=True)
self.system_error = message
self.process.kill()
self.stop.set()
break
else:
message = "Command `%s' is not implemented - stopping user process" \
% command
logger.error(message)
self.system_error = message
self.process.kill()
self.stop.set()
break
self.socket.setsockopt(zmq.LINGER, 0)
self.socket.close()
self.context.term()
logger.debug("0MQ server thread stopped")
def _get_input_candidate(self, channel, name):
channel_group = self.input_list.group(channel)
retval = channel_group[name]
if retval is None:
raise RuntimeError("Could not find input `%s' at channel `%s'" % \
(name, channel))
return retval
def _get_output_candidate(self, name):
retval = self.output_list[name]
......@@ -172,90 +85,6 @@ class Server(gevent.Greenlet):
return retval
def next(self, channel, name=None):
"""Syntax: nxt channel [name] ..."""
if name is not None: #single input
logger.debug('recv: nxt %s %s', channel, name)
input_candidate = self._get_input_candidate(channel, name)
input_candidate.next()
if input_candidate.data is None: #error
message = "User algorithm asked for more data for channel " \
"`%s' on input `%s', but it is over (no more data). This " \
"normally indicates a programming error on the user " \
"side." % (channel, name)
self.user_error += message + '\n'
raise RuntimeError(message)
if isinstance(input_candidate.data, baseformat.baseformat):
packed = input_candidate.data.pack()
else:
packed = input_candidate.data
logger.debug('send: <bin> (size=%d)', len(packed))
self.socket.send(packed)
else: #whole group data
logger.debug('recv: nxt %s', channel)
channel_group = self.input_list.group(channel)
# Call next() on the group
channel_group.restricted_access = False
channel_group.next()
channel_group.restricted_access = True
# Loop over the inputs
inputs_to_go = len(channel_group)
self.socket.send(str(inputs_to_go), zmq.SNDMORE)
for inp in channel_group:
logger.debug('send: %s', inp.name)
self.socket.send(str(inp.name), zmq.SNDMORE)
if inp.data is None:
message = "User algorithm process asked for more data on channel " \
"`%s' (all inputs), but input `%s' has nothing. This " \
"normally indicates a programming error on the user " \
"side." % (channel, inp.name)
self.user_error += message + '\n'
raise RuntimeError(message)
elif isinstance(inp.data, baseformat.baseformat):
packed = inp.data.pack()
else:
packed = inp.data
logger.debug('send: <bin> (size=%d)', len(packed))
inputs_to_go -= 1
if inputs_to_go > 0:
self.socket.send(packed, zmq.SNDMORE)
else:
self.socket.send(packed)
def has_more_data(self, channel, name=None):
"""Syntax: hmd channel [name]"""
if name: #single input
logger.debug('recv: hmd %s %s', channel, name)
input_candidate = self._get_input_candidate(channel, name)
what = 'tru' if input_candidate.hasMoreData() else 'fal'
else: #for all channel names
logger.debug('recv: hmd %s', channel)
channel_group = self.input_list.group(channel)
what = 'tru' if channel_group.hasMoreData() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
def is_dataunit_done(self, channel, name):
"""Syntax: idd channel name"""
logger.debug('recv: idd %s %s', channel, name)
input_candidate = self._get_input_candidate(channel, name)
what = 'tru' if input_candidate.isDataUnitDone() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
def write(self, name, packed):
"""Syntax: wrt output data"""
......@@ -296,50 +125,6 @@ class Server(gevent.Greenlet):
self.socket.send(what)
def _collect_statistics(self):
logger.debug('collecting user process statistics...')
self.last_statistics = self.process.statistics()
def _acknowledge(self):
logger.debug('send: ack')
self.socket.send('ack')
logger.debug('setting stop condition for 0MQ server thread')
self.stop.set()
def done(self, wait_time):
"""Syntax: don"""
logger.debug('recv: don %s', wait_time)
self._collect_statistics()
# collect I/O stats from client
wait_time = float(wait_time)
self.last_statistics['data'] = dict(network=dict(wait_time=wait_time))
self._acknowledge()
def error(self, t, msg):
"""Syntax: err type message"""
logger.debug('recv: err %s <msg> (size=%d)', t, len(msg))
if t == 'usr': self.user_error = msg
else: self.system_error = msg
self._collect_statistics()
self.last_statistics['data'] = dict(network=dict(wait_time=0.))
self._acknowledge()
def kill(self):
self.must_kill.set()
class Agent(object):
'''Handles synchronous commands.
......@@ -370,7 +155,9 @@ class Agent(object):
self.virtual_memory_in_megabytes = virtual_memory_in_megabytes
self.max_cpu_percent = max_cpu_percent
self.tempdir = None
self.db_tempdir = None
self.process = None
self.db_process = None
self.server = None
......@@ -382,7 +169,10 @@ class Agent(object):
# Creates a temporary directory for the user process
self.tempdir = utils.temporary_directory()
logger.debug("Created temporary directory `%s'", self.tempdir)
self.db_tempdir = utils.temporary_directory()
logger.debug("Created temporary directory `%s'", self.db_tempdir)
self.process = None
self.db_process = None
return self
......@@ -392,11 +182,16 @@ class Agent(object):
shutil.rmtree(self.tempdir)
self.tempdir = None
if self.db_tempdir is not None and os.path.exists(self.db_tempdir):
shutil.rmtree(self.db_tempdir)
self.db_tempdir = None
self.process = None
self.db_process = None
logger.debug("Exiting processing context...")
def run(self, configuration, host, timeout_in_minutes=0, daemon=0):
def run(self, configuration, host, timeout_in_minutes=0, daemon=0, db_address=None):
"""Runs the algorithm code
......@@ -424,17 +219,46 @@ class Agent(object):
# Recursively copies configuration data to <tempdir>/prefix
configuration.dump_runner_configuration(self.tempdir)
if db_address is not None:
configuration.dump_databases_provider_configuration(self.db_tempdir)
# Modify the paths to the databases in the dumped configuration files
root_folder = os.path.join(self.db_tempdir, 'prefix', 'databases')
database_paths = {}
for db_name in configuration.databases.keys():
json_path = os.path.join(root_folder, db_name + '.json')
with open(json_path, 'r') as f:
db_data = simplejson.load(f)
database_paths[db_name] = db_data['root_folder']
db_data['root_folder'] = os.path.join('/databases', db_name)
with open(json_path, 'w') as f:
simplejson.dump(db_data, f, indent=4)
# Server for our single client
self.server = Server(configuration.input_list, configuration.output_list,
host.ip)
# Figures out the image to use
# Figures out the images to use
envkey = '%(name)s (%(version)s)' % configuration.data['environment']
if envkey not in host:
raise RuntimeError("Environment `%s' is not available on docker " \
"host `%s' - available environments are %s" % (envkey, host,
", ".join(host.environments.keys())))
if db_address is not None:
try:
db_envkey = host.db2docker(database_paths.keys())
except:
raise RuntimeError("No environment found for the databases `%s' " \
"- available environments are %s" % (
", ".join(database_paths.keys()),
", ".join(host.db_environments.keys())))
# Launches the process (0MQ client)
tmp_dir = os.path.join('/tmp', os.path.basename(self.tempdir))
cmd = ['execute', self.server.address, tmp_dir]
......@@ -447,14 +271,35 @@ class Agent(object):
cmd = ['sleep', str(daemon)]
logger.debug("Daemon mode: sleeping for %d seconds", daemon)
else:
if db_address is not None:
tmp_dir = os.path.join('/tmp', os.path.basename(self.db_tempdir))
db_cmd = ['databases_provider', db_address, tmp_dir]
volumes = {}
for db_name, db_path in database_paths.items():
volumes[db_path] = {
'bind': os.path.join('/databases', db_name),
'mode': 'ro',
}
# Note: we only support one databases image loaded at the same time
self.db_process = dock.Popen(
host,
db_envkey,
command=db_cmd,
tmp_archive=self.db_tempdir,
volumes=volumes
)
self.process = dock.Popen(
host,
envkey,
command=cmd,
tmp_archive=self.tempdir,
virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
max_cpu_percent=self.max_cpu_percent,
)
host,
envkey,
command=cmd,
tmp_archive=self.tempdir,
virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
max_cpu_percent=self.max_cpu_percent,
)
# provide a tip on how to stop the test
if daemon > 0:
......@@ -476,6 +321,11 @@ class Agent(object):
timeout_in_minutes)
self.process.kill()
status = self.process.wait()
if self.db_process is not None:
self.db_process.kill()
self.db_process.wait()
timed_out = True
except KeyboardInterrupt: #developer pushed CTRL-C
......@@ -483,6 +333,10 @@ class Agent(object):
self.process.kill()
status = self.process.wait()
if self.db_process is not None:
self.db_process.kill()
self.db_process.wait()
finally:
self.server.stop.set()
......@@ -499,6 +353,13 @@ class Agent(object):
user_error = self.server.user_error,
)
process.rm()
if self.db_process is not None:
retval['stdout'] += '\n' + self.db_process.stdout
retval['stderr'] += '\n' + self.db_process.stderr
self.db_process.rm()
self.db_process = None
self.server = None
return retval
......
......@@ -39,155 +39,15 @@ from . import dataformat
from . import library
from . import schema
from . import prototypes
from . import loader
from . import utils
class Storage(utils.CodeStorage):
"""Resolves paths for algorithms
from beat.backend.python.algorithm import Storage
from beat.backend.python.algorithm import Runner
from beat.backend.python.algorithm import Algorithm as BackendAlgorithm
Parameters:
prefix (str): Establishes the prefix of your installation.
name (str): The name of the algorithm object in the format
``<user>/<name>/<version>``.
"""
def __init__(self, prefix, name, language=None):
if name.count('/') != 2:
raise RuntimeError("invalid algorithm name: `%s'" % name)
self.username, self.name, self.version = name.split('/')
self.prefix = prefix
self.fullname = name
path = utils.hashed_or_simple(self.prefix, 'algorithms', name)
super(Storage, self).__init__(path, language)
class Runner(object):
'''A special loader class for algorithms, with specialized methods
Parameters:
module (module): The preloaded module containing the algorithm as
returned by :py:func:`beat.core.loader.load_module`.
obj_name (str): The name of the object within the module you're interested
on
exc (class): The class to use as base exception when translating the
exception from the user code. Read the documention of :py:func:`run`
for more details.
algorithm (object): The algorithm instance that is used for parameter
checking.
*args: Constructor parameters for the algorithm (normally none)
**kwargs: Constructor parameters for the algorithm (normally none)
'''
def __init__(self, module, obj_name, algorithm, exc=None, *args,
**kwargs):
try:
class_ = getattr(module, obj_name)
except Exception as e:
if exc is not None:
type, value, traceback = sys.exc_info()
six.reraise(exc, exc(value), traceback)
else:
raise #just re-raise the user exception
self.obj = loader.run(class_, '__new__', exc, *args, **kwargs)
self.name = module.__name__
self.algorithm = algorithm
self.exc = exc
# if the algorithm does not have a 'setup' method, it is ready by default
self.ready = not hasattr(self.obj, 'setup')
def _check_parameters(self, parameters):
"""Checks input parameters from the user and fill defaults"""
user_keys = set(parameters.keys())
algo_parameters = self.algorithm.parameters or {}
valid_keys = set(algo_parameters.keys())
# checks the user is not trying to set an undeclared parameter
if not user_keys.issubset(valid_keys):
err_keys = user_keys - valid_keys
message = "parameters `%s' are not declared for algorithm `%s' - " \
"valid parameters are `%s'" % (
','.join(err_keys),
self.name,
','.join(valid_keys),
)
exc = self.exc or KeyError
raise exc(message)
# checks all values set by the user are in range (if a range is set)
retval = dict() #dictionary with checked user parameters and defaults
for key, definition in algo_parameters.items():
if key in parameters:
try:
value = self.algorithm.clean_parameter(key, parameters[key])
except Exception as e:
message = "parameter `%s' cannot be safely cast to the declared " \
"type on algorithm `%s': %s" % (key, self.name, e)