Commit 5cc7c066 authored by Samuel GAIST

Merge branch 'docker_improvements' into 'master'

Docker improvements

See merge request !35
parents d6084917 23a16eed
Pipeline #24099 passed with stages in 24 minutes and 28 seconds
@@ -55,7 +55,7 @@ class Host(object):
 
     images_cache = {}
 
-    def __init__(self, images_cache=None, raise_on_errors=True):
+    def __init__(self, images_cache=None, raise_on_errors=True, discover=True):
 
         # Initialisations
         self.raise_on_errors = raise_on_errors
@@ -71,7 +71,8 @@ class Host(object):
                 Host.images_cache = simplejson.load(f)
 
         # Discover the environments
-        (self.processing_environments, self.db_environments) = self._discover_environments()
+        if discover:
+            (self.processing_environments, self.db_environments) = self._discover_environments()
 
         # (If necessary) Save the known information about the images
         if self.images_cache_filename is not None:
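For illustration, the new `discover` flag makes environment discovery optional when instantiating a `Host`. A minimal usage sketch (assuming the `beat.core.dock` module path):

    from beat.core.dock import Host

    # Skip scanning the local docker installation; in that case
    # processing_environments/db_environments are not populated.
    host = Host(raise_on_errors=False, discover=False)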
@@ -334,6 +335,18 @@ class Host(object):
             '-tid',
         ]
 
+        network = container.network
+        if network:
+            cmd.append(network)
+
+        user = container.user
+        if user:
+            cmd.append(user)
+
+        name = container.name
+        if name:
+            cmd.append(name)
+
         if container.image in Host.images_cache:
             image_infos = Host.images_cache[container.image]
             if ('capabilities' in image_infos) and ('gpu' in image_infos['capabilities']):
@@ -378,17 +391,11 @@ class Host(object):
             cmd.append('--cpu-quota=%d' % int(quota * period))
 
         # Mount the volumes
-        for k, v in container.volumes.items():
-            cmd.append('--volume=%s:%s:%s' % (k, v['bind'], v['mode']))
+        cmd.extend(container.volumes)
 
         # Expose the ports
-        for k, v in container.ports.items():
-            cmd.append('-p')
-
-            if isinstance(v, tuple):
-                cmd.append('%s:%d:%d' % (v[0], v[1], k))
-            else:
-                cmd.append('%d:%d' % (v[0], k))
+        cmd.extend(container.ports)
 
         cmd.append(container.image)
         cmd.extend(container.command)
@@ -539,6 +546,25 @@ class Host(object):
 
         return status, output
 
+    def get_ipaddress(self, container):
+        """Returns the IP address of the given container"""
+
+        cmd = [
+            'docker',
+            'inspect',
+            '--format',
+            '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}',
+            container.id
+        ]
+
+        (status, stdout, stderr) = self._exec(cmd)
+
+        if status != 0:
+            logger.error("Failed to retrieve the IP address of the container, reason:\n\n%s", stderr)
+            return None
+
+        return stdout.replace('\n', '')
+
     def _exec(self, command, timeout=None):
         process_stdout = tempfile.NamedTemporaryFile()
         process_stderr = tempfile.NamedTemporaryFile()
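For reference, the same lookup can be reproduced from a shell against a running container (illustrative container name):

    docker inspect --format '{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' beat_db_example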
@@ -589,13 +615,24 @@ class Container:
 
     :param str command: Command to execute in the container.
     """
 
    def __init__(self, image, command):
        self.image = image
        self.command = command
-        self.volumes = {}
-        self.ports = {}
+        self.network_name = None
+        self.uid = None
        self.id = None
+        self._volumes = {}
+        self._ports = {}
        self._stats = None
+        self._name = None
+
+    def set_name(self, name):
+        """Set the name to be used by the container in place of the
+        docker auto-generated one.
+        """
+        self._name = name
 
     def add_volume(self, path, mount_path, read_only=True):
@@ -609,7 +646,7 @@ class Container:
 
         :param boolean read_only: Whether the volume will be read only
         """
-        self.volumes[path] = {
+        self._volumes[path] = {
             'bind': mount_path,
             'mode': 'ro' if read_only else 'rw',
         }
@@ -631,13 +668,75 @@ class Container:
         else:
             value = [host_port]
 
-        self.ports[container_port] = value
+        self._ports[container_port] = value
 
     def reset_ports(self):
         """Empty the port bindings"""
-        self.ports = {}
+        self._ports = {}
+
+    @property
+    def name(self):
+        name = ''
+        if self._name:
+            name = '--name=%s' % self._name
+        return name
+
+    @property
+    def volumes(self):
+        """Returns the volumes of this container in a form suitable for
+        building the command used to start the container.
+        """
+        volumes = []
+        for k, v in self._volumes.items():
+            if k.startswith('nfs://'):
+                addr, src = k[6:].split(':')
+                volumes.append('--mount=type=volume,'
+                               'dst={dst},'
+                               'volume-driver=local,'
+                               'volume-opt=type=nfs,'
+                               'volume-opt=device=:{src},'
+                               'volume-opt=o=addr={addr}'.format(dst=v['bind'], src=src, addr=addr))
+            else:
+                if k.startswith('file://'):
+                    k = k[7:]  # len('file://') == 7; slicing at 6 would leave a spurious slash
+                volumes.append('--volume=%s:%s:%s' % (k, v['bind'], v['mode']))
+        return volumes
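As an illustration of the nfs:// handling above (hypothetical server and paths):

    container = Container('debian:8.4', ['sleep', '60'])
    container.add_volume('nfs://fileserver.example.com:/exports/databases', '/databases')
    print(container.volumes)
    # ['--mount=type=volume,dst=/databases,volume-driver=local,volume-opt=type=nfs,volume-opt=device=:/exports/databases,volume-opt=o=addr=fileserver.example.com']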
+
+    @property
+    def ports(self):
+        """Returns the ports of this container in a form suitable for
+        building the command used to start the container.
+        """
+        ports = []
+        for k, v in self._ports.items():
+            ports.append('-p')
+            if isinstance(v, tuple):
+                ports.append('%s:%d:%d' % (v[0], v[1], k))
+            else:
+                ports.append('%d:%d' % (v[0], k))
+        return ports
+
+    @property
+    def network(self):
+        network = ''
+        if self.network_name:
+            network = '--network=' + self.network_name
+        return network
+
+    @property
+    def user(self):
+        user = ''
+        if self.uid:
+            user = '--user={0}:{0}'.format(self.uid)
+        return user
+
     @property
@@ -650,15 +749,11 @@ class Container:
 
         """
 
         cmd = "docker run -ti --rm=true "
-        for k, v in self.volumes.items():
-            cmd += "--volume %s:%s:%s " % (k, v['bind'], v['mode'])
-        for k, v in self.ports.items():
-            if isinstance(v, tuple):
-                cmd += "-p %s:%d:%d " % (v[0], v[1], k)
-            else:
-                cmd += "-p %d:%d " % (v[0], k)
+        cmd += "%s " % self.network
+        cmd += "%s " % self.user
+        cmd += ' '.join(self.volumes) + ' '
+        cmd += ' '.join(self.ports) + ' '
+        cmd += "%s " % self.name
         cmd += "%s " % self.image
@@ -37,6 +37,7 @@ import os
 import requests
 import simplejson
 import zmq
+import docker
 
 import logging
 logger = logging.getLogger(__name__)
@@ -215,13 +216,22 @@ class DockerExecutor(RemoteExecutor):
 
         def _kill():
             self.host.kill(algorithm_container)
 
-        self.message_handler = message_handler.MessageHandler(self.host.ip,
+        address = self.host.ip
+        port_range = self.data.pop('port_range', None)
+        if port_range:
+            min_port, max_port = port_range.split(':')
+            port = utils.find_free_port_in_range(int(min_port), int(max_port))
+            address += ':{}'.format(port)
+
+        self.message_handler = message_handler.MessageHandler(address,
                                                               kill_callback=_kill)
 
         #----- (If necessary) Instantiate the docker container that provides the databases
 
         databases_container = None
+        datasets_uid = self.data.pop('datasets_uid', None)
+        network_name = self.data.pop('network_name', 'bridge')
 
         if len(self.databases) > 0:
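`utils.find_free_port_in_range` belongs to beat.core's utils module and its implementation is not part of this diff; a minimal sketch of such a helper, assuming it probes ports by binding, could look like:

    import socket

    def find_free_port_in_range(min_port, max_port):
        """Return the first port in [min_port, max_port] that can be bound."""
        for port in range(min_port, max_port + 1):
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                s.bind(('', port))
                return port
            except socket.error:
                continue
            finally:
                s.close()
        raise RuntimeError('no free port in range %d:%d' % (min_port, max_port))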
@@ -234,18 +244,17 @@ class DockerExecutor(RemoteExecutor):
 
             database_paths = {}
 
-            if 'datasets_root_path' not in self.data:
-                for db_name in self.databases.keys():
-                    json_path = os.path.join(root_folder, db_name + '.json')
+            for db_name in self.databases.keys():
+                json_path = os.path.join(root_folder, db_name + '.json')
 
-                    with open(json_path, 'r') as f:
-                        db_data = simplejson.load(f)
+                with open(json_path, 'r') as f:
+                    db_data = simplejson.load(f)
 
-                    database_paths[db_name] = db_data['root_folder']
-                    db_data['root_folder'] = os.path.join('/databases', db_name)
+                database_paths[db_name] = db_data['root_folder']
+                db_data['root_folder'] = os.path.join('/databases', db_name)
 
-                    with open(json_path, 'w') as f:
-                        simplejson.dump(db_data, f, indent=4)
+                with open(json_path, 'w') as f:
+                    simplejson.dump(db_data, f, indent=4)
 
             # Determine the docker image to use for the databases
             try:
@@ -270,19 +279,18 @@ class DockerExecutor(RemoteExecutor):
 
             if logger.getEffectiveLevel() <= logging.DEBUG:
                 cmd.insert(1, '--debug')
 
+            databases_container_name = "beat_db_%s" % utils.id_generator()
+
             databases_container = self.host.create_container(databases_environment, cmd)
+            databases_container.uid = datasets_uid
+            databases_container.network_name = network_name
+            databases_container.set_name(databases_container_name)
 
             # Specify the volumes to mount inside the container
             databases_container.add_volume(databases_configuration_path, '/beat/prefix')
             databases_container.add_volume(self.cache, '/beat/cache')
 
-            if 'datasets_root_path' not in self.data:
-                for db_name, db_path in database_paths.items():
-                    databases_container.add_volume(db_path, os.path.join('/databases', db_name))
-            else:
-                databases_container.add_volume(self.data['datasets_root_path'],
-                                               self.data['datasets_root_path'])
+            for db_name, db_path in database_paths.items():
+                databases_container.add_volume(db_path, os.path.join('/databases', db_name))
 
             # Start the container
             while True:
@@ -302,6 +310,8 @@ class DockerExecutor(RemoteExecutor):
             cmd = [x if not x.startswith('0.0.0.0:') else '0.0.0.0:%d' % database_port for x in cmd]
             databases_container.command = cmd
 
+            database_ip = self.host.get_ipaddress(databases_container)
+
 
         #----- Instantiate the algorithm container
@@ -318,13 +328,15 @@ class DockerExecutor(RemoteExecutor):
         ]
 
         if len(self.databases) > 0:
-            cmd.append('tcp://' + self.host.ip + ':%d' % database_port)
+            cmd.append('tcp://' + database_ip + ':%d' % database_port)
 
         if logger.getEffectiveLevel() <= logging.DEBUG:
             cmd.insert(1, '--debug')
 
         # Creation of the container
         algorithm_container = self.host.create_container(processing_environment, cmd)
+        algorithm_container.uid = datasets_uid
+        algorithm_container.network_name = network_name
 
         # Volumes
         algorithm_container.add_volume(configuration_path, '/beat/prefix')
@@ -284,7 +284,12 @@ class LocalExecutor(BaseExecutor):
 
         self.runner = self.algorithm.runner()
         retval = self.runner.setup(self.data['parameters'])
-        retval = self.runner.prepare(self.data_loaders)
+        if not retval:
+            raise RuntimeError("Algorithm setup failed")
+
+        prepared = self.runner.prepare(self.data_loaders)
+        if not prepared:
+            raise RuntimeError("Algorithm prepare failed")
 
         if not self.input_list or not self.output_list:
             raise RuntimeError("I/O for execution block has not yet been set up")
@@ -9,7 +9,7 @@
 
     "root_folder": {
         "type": "string",
-        "pattern": "^(/[^/]+)+$"
+        "pattern": "^((file://)?(/[^/]+)+|nfs://[a-z0-9._-]+:(/[^/]+)+)$"
     },
 
     "protocols": {
@@ -30,20 +30,23 @@
 
 Usage:
   %(prog)s [-v ... | --verbose ...] [--name=<name>] [--prefix=<path>]
-           [--cache=<path>] [--docker] <address>
+           [--cache=<path>] [--docker] [--docker-network=<name>]
+           [--port-range=<range>] <scheduler_address>
   %(prog)s (--help | -h)
   %(prog)s (--version | -V)
 
 
 Options:
   -h, --help                        Show this screen
   -V, --version                     Show version
   -v, --verbose                     Increases the output verbosity level
   -n <name>, --name=<name>          The unique name of this worker on the database.
                                     This is typically the assigned hostname of the node,
                                     but not necessarily [default: %(hostname)s]
   -p, --prefix=<path>               Comma-separated list of the prefix(es) of your local data [default: .]
   -c, --cache=<path>                Cache prefix, otherwise defaults to '<prefix>/cache'
+  --docker-network=<name>           Name of the docker network to use
+  --port-range=<range>              Range of ports usable for communication with the containers
 
 """
@@ -191,7 +194,6 @@ def connect_to_scheduler(address, name):
 
 def main(user_input=None):
-
     # Parse the command-line arguments
     if user_input is not None:
         arguments = user_input
@@ -273,14 +275,26 @@ def main(user_input=None):
 
     # (If necessary) Docker-related initialisations
     docker_images_cache = None
+    docker_network_name = None
+    docker_port_range = None
+
     if args['--docker']:
         docker_images_cache = os.path.join(tempfile.gettempdir(), 'beat-docker-images.json')
         logger.info("Using docker images cache: '%s'", docker_images_cache)
         host = Host(images_cache=docker_images_cache, raise_on_errors=False)
 
+        docker_network_name = args.get('--docker-network', None)
+        if docker_network_name:
+            logger.info("Using docker network: '%s'", docker_network_name)
+
+        docker_port_range = args.get('--port-range', None)
+        if docker_port_range:
+            if len(docker_port_range.split(':')) != 2:
+                logger.error("Invalid port range %s" % docker_port_range)
+                return 1
+
     # Establish a connection with the scheduler
-    (context, socket, poller) = connect_to_scheduler(args['<address>'], args['--name'])
+    (context, socket, poller) = connect_to_scheduler(args['<scheduler_address>'], args['--name'])
     if context is None:
         return 1
@@ -293,7 +307,7 @@ def main(user_input=None):
     while not stop:
 
         # If necessary, wait for the comeback of the scheduler
         if not scheduler_available:
-            (context, socket, poller) = connect_to_scheduler(args['<address>'], args['--name'])
+            (context, socket, poller) = connect_to_scheduler(args['<scheduler_address>'], args['--name'])
             if context is None:
                 break
             scheduler_available = True
@@ -369,6 +383,11 @@ def main(user_input=None):
             if command == WorkerController.EXECUTE:
                 job_id = parts[1]
                 data = simplejson.loads(parts[2])
+                if args['--docker']:
+                    if docker_network_name:
+                        data['network_name'] = docker_network_name
+                    if docker_port_range:
+                        data['port_range'] = docker_port_range
 
                 # Start the execution
                 logger.info("Running '%s' with job id #%s", data['algorithm'], job_id)
@@ -52,6 +52,9 @@ else:
     prefix = os.path.join(prefix_folder, 'prefix')
 
+DOCKER_NETWORK_TEST_ENABLED = os.environ.get('DOCKER_NETWORK_TEST_ENABLED', False) == 'True'
+network_name = os.environ.get('DOCKER_TEST_NETWORK', 'beat_core_test_network')
+network = None
 
 # Setup the logging system
 if False:
@@ -80,9 +83,29 @@ def setup_package():
     for path in prefixes:
         subprocess.check_call(['rsync', '-arz', path, prefix_folder])
 
+    if DOCKER_NETWORK_TEST_ENABLED:
+        import docker
+        global network  # rebind the module-level handle so teardown can remove it
+        client = docker.from_env()
+        try:
+            network = client.networks.get(network_name)
+        except docker.errors.NotFound:
+            subnet = os.environ.get('DOCKER_TEST_SUBNET', '193.169.0.0/24')
+            gateway = os.environ.get('DOCKER_TEST_GATEWAY', '193.169.0.254')
+            ipam_pool = docker.types.IPAMPool(subnet=subnet,
+                                              gateway=gateway)
+            ipam_config = docker.types.IPAMConfig(pool_configs=[ipam_pool])
+            network = client.networks.create(network_name,
+                                             driver="bridge",
+                                             ipam=ipam_config)
+
 
 def teardown_package():
     if os.path.exists(tmp_prefix):
         shutil.rmtree(tmp_prefix)
 
     shutil.rmtree(prefix_folder)
+
+    if DOCKER_NETWORK_TEST_ENABLED:
+        if network:
+            network.remove()
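For reference, an equivalent network can be created by hand with the docker CLI, using the same illustrative defaults as the test setup:

    docker network create --driver bridge --subnet=193.169.0.0/24 \
        --gateway=193.169.0.254 beat_core_test_network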
{
    "language": "python",
    "splittable": true,
    "api_version": 2,
    "groups": [
        {
            "name": "main",
            "inputs": {
                "in_data": {
                    "type": "user/single_integer/1"
                }
            },
            "outputs": {
                "out_data": {
                    "type": "user/single_integer/1"
                }
            }
        }
    ]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
class Algorithm:

    def prepare(self, data_loaders):
        # Always fails, to exercise the executors' prepare-failure handling
        return False

    def process(self, inputs, outputs):
        outputs['out_data'].write(inputs['in_data'].data)
        return True
{
    "language": "python",
    "splittable": true,
    "api_version": 2,
    "groups": [
        {
            "name": "main",
            "inputs": {
                "in_data": {
                    "type": "user/single_integer/1"
                }
            },
            "outputs": {
                "out_data": {
                    "type": "user/single_integer/1"
                }
            }
        }
    ]
}
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
class Algorithm:

    def prepare(self, data_loaders):
        return True

    def process(self, inputs, data_loaders, outputs):
        # api_version 2 signature: process also receives the data loaders
        outputs['out_data'].write(inputs['in_data'].data)
        return True
{
    "language": "python",
    "splittable": true,
    "api_version": 2,
    "groups": [
        {
            "name": "main",
            "inputs": {
                "in_data": {
                    "type": "user/single_integer/1"
                }
            },
            "outputs": {