Commit 6c8888a6 authored by Philip ABBET's avatar Philip ABBET
Browse files

Merge branch 'docker' into 'master'

Merge docker-related changes

See merge request !13
parents 7b6d921f dbc24234
Pipeline #6722 passed with stage
in 4 minutes and 3 seconds
py27-linux:
script:
- git clean -ffdx
- export TMPDIR=/var/tmp
- /idiap/project/beat/environments/staging/usr/bin/python bootstrap-buildout.py --setuptools-version=`/idiap/project/beat/environments/staging/usr/bin/python -c 'import setuptools; print(setuptools.__version__)'`
- ./bin/buildout
- ./bin/python --version
- unset TMPDIR
- export NOSE_WITH_COVERAGE=1
- export NOSE_COVER_PACKAGE=beat.core
- ./bin/nosetests -sv
- ./bin/sphinx-apidoc --separate -d 2 --output=doc/api beat
- ./bin/sphinx-build doc html
tags:
- lidiap2015
stages:
- build
variables:
PREFIX: /opt/beat.env.web-${CI_BUILD_REF_NAME}/usr
py27-macosx:
build:
stage: build
before_script:
- ${PREFIX}/bin/python --version
- docker info
script:
- git clean -ffdx
- /Users/buildbot/work/environments/beat/py27/bin/python bootstrap-buildout.py --setuptools-version=`/Users/buildbot/work/environments/beat/py27/bin/python -c 'import setuptools; print(setuptools.__version__)'`
- ${PREFIX}/bin/python bootstrap-buildout.py
- ./bin/buildout
- ./bin/python --version
- export NOSE_WITH_COVERAGE=1
- export NOSE_COVER_PACKAGE=beat.core
- ./bin/nosetests -sv
- ./bin/sphinx-apidoc --separate -d 2 --output=doc/api beat
- ./bin/sphinx-build doc html
- ./bin/python ${PREFIX}/bin/coverage run --source=${CI_PROJECT_NAME} ${PREFIX}/bin/nosetests -sv ${CI_PROJECT_NAME}
- ./bin/python ${PREFIX}/bin/coverage report
- ./bin/python ${PREFIX}/bin/sphinx-apidoc --separate -d 2 --output=doc/api ${CI_PROJECT_NAMESPACE}
- ./bin/python ${PREFIX}/bin/sphinx-build doc sphinx
tags:
- beat-macosx
- docker-build
......@@ -28,6 +28,7 @@
This package contains the source code for the core components of the BEAT
platform.
Installation
------------
......@@ -50,26 +51,38 @@ get you a fully operational test and development environment.
package instead. It contains the same setup deployed at the final BEAT
machinery.
Cpulimit
========
Make sure the program ``cpulimit`` is available on your system or by the side
of the python interpreter you bootstrapped as per instructions above. The BEAT
platform uses this program to control slot usage on the scheduling/worker
level::
Docker
======
This package depends on Docker_ and uses it to run user algorithms in a
container with the required software stack. You must install the Docker_ engine
and make sure the user running tests has access to it.
In particular, this package controls memory and CPU utilisation of the
containers it launches. You must make sure to enable those functionalities on
your installation.
Docker Setup
============
$ cpulimit -h
Make sure you have the ``docker`` command available on your system. For certain
operating systems, it is necessary to install ``docker`` via an external
virtual machine (a.k.a. the *docker machine*). Follow the instructions at `the
docker website <https://docs.docker.com/engine/installation/>` before trying to
execute algorithms or experiments.
If that is not the case, then you need to install it. Either install a package
that is native to your system (e.g. on Debian or Ubuntu platforms) or compile
the checked-out version available at ``src/cpulimit``::
We use specific docker images to run user algorithms. Download the following
base images before you try to run tests or experiments on your computer::
$ cd src/cpulimit;
$ make
$ ./src/cpulimit -h #to test it
$ cd ../../bin #go back to the root of beat.web and the into the `bin' dir
$ ln -s ../src/cpulimit/src/cpulimit
$ cd .. #go back to the root of beat.web
$ docker pull beats/py27:system
$ docker pull debian:8.4
Optionally, also download the following images to be able to re-run experiments
downloaded from the BEAT platform (not required for unit testing)::
$ docker pull beats/py27:0.0.4
$ docker pull beats/py27:0.1.0
Documentation
......@@ -130,9 +143,8 @@ Development
Indentation
===========
You can enforce `PEP8 <https://www.python.org/dev/peps/pep-0008/>` compliance
using the application ``autopep8``. For example, to enforce compliance on a
single file and edit it in place, do::
You can enforce PEP8_ compliance using the application ``autopep8``. For
example, to enforce compliance on a single file and edit it in place, do::
$ ./bin/autopep8 --indent-size=2 --in-place beat/core/utils.py
......@@ -156,3 +168,8 @@ in different ways using another command::
This will allow you to dump and print the profiling statistics as you may find
fit.
.. References go here
.. _pep8: https://www.python.org/dev/peps/pep-0008/
.. _docker: https://www.docker.com/
......@@ -32,19 +32,23 @@ import shutil
import logging
logger = logging.getLogger(__name__)
import psutil
import gevent
import zmq.green as zmq
import requests
from gevent import monkey
monkey.patch_socket(dns=False)
monkey.patch_ssl()
from . import utils
from . import async
from . import dock
from . import baseformat
class Server(gevent.Greenlet):
'''A 0MQ server for our communication with the user process'''
def __init__(self, input_list, output_list):
def __init__(self, input_list, output_list, host_address):
super(Server, self).__init__()
......@@ -55,7 +59,8 @@ class Server(gevent.Greenlet):
# Starts our 0MQ server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PAIR)
self.address = 'tcp://127.0.0.1'
self.address = 'tcp://' + host_address
port = self.socket.bind_to_random_port(self.address)
self.address += ':%d' % port
logger.debug("zmq server bound to `%s'", self.address)
......@@ -84,6 +89,7 @@ class Server(gevent.Greenlet):
def set_process(self, process):
self.process = process
self.process.statistics() # initialize internal statistics
def __str__(self):
......@@ -325,22 +331,17 @@ class Server(gevent.Greenlet):
class Agent(object):
'''Handles asynchronous stdout/stderr readout and synchronous commands.
'''Handles synchronous commands.
We use the greenlets for this implementation. Objects of this class are in
charge of three separate tasks:
1. Handling the execution of the user process (as a separate process)
2. Making sure the user process does not consume more resources than it is
supposed to (uses the external application ``cpulimit``)
1. Handling the execution of the user process (in a docker container)
3. Implementing a pipe-based API for I/O that the user process can query
Parameters:
execute_path (str): The path to the ``execute`` script of the chosen
environment. This **must** be a valid path.
virtual_memory_in_megabytes (int, optional): The amount of virtual memory
(in Megabytes) available for the job. If set to zero, no limit will be
applied.
......@@ -349,26 +350,14 @@ class Agent(object):
This number must be an integer number between 0 and
``100*number_of_cores`` in your system. For instance, if your system has
2 cores, this number can go between 0 and 200. If it is <= 0, then we
don't track CPU usage. Otherwise, we do, clipping your number at
``min(max_cpu_percent, 100*psutil.cpu_count())``.
cpulimit_path (str): If ``max_cpu_percent`` >0, then se use the program
indicated by this path to start a parallel cpulimit daemon that will
control the CPU utilisation. If this value is not set, we search your
current execution path and then the system for a ``cpulimit`` executable.
The first one found will be used. It is an error to specify
``max_cpu_percent > 0`` and not have a valid ``cpulimit`` executable
available on your system.
don't track CPU usage.
'''
def __init__(self, execute_path, virtual_memory_in_megabytes,
max_cpu_percent, cpulimit_path):
def __init__(self, virtual_memory_in_megabytes, max_cpu_percent):
self.execute_path = execute_path
self.virtual_memory_in_megabytes = virtual_memory_in_megabytes
self.max_cpu_percent = max_cpu_percent
self.cpulimit_path = cpulimit_path
self.tempdir = None
self.process = None
......@@ -395,7 +384,7 @@ class Agent(object):
logger.debug("Exiting processing context...")
def run(self, configuration, timeout_in_minutes=0, daemon=0):
def run(self, configuration, host, timeout_in_minutes=0, daemon=0):
"""Runs the algorithm code
......@@ -404,6 +393,10 @@ class Agent(object):
configuration (object): A *valid*, preloaded
:py:class:`beat.core.execution.Executor` object.
host (:py:class:Host): A configured docker host that will execute the
user process. If the host does not have access to the required
environment, an exception will be raised.
timeout_in_minutes (int): The number of minutes to wait for the user
process to execute. After this amount of time, the user process is
killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will
......@@ -420,29 +413,41 @@ class Agent(object):
configuration.dump_runner_configuration(self.tempdir)
# Server for our single client
server = Server(configuration.input_list, configuration.output_list)
server = Server(configuration.input_list, configuration.output_list,
host.ip)
# Figures out the image to use
envkey = '%(name)s (%(version)s)' % configuration.data['environment']
if envkey not in host:
raise RuntimeError("Environment `%s' is not available on docker " \
"host `%s' - available environments are %s" % (envkey, host,
", ".join(host.environments.keys())))
# Launches the process (0MQ client)
cmd = [self.execute_path, '%s' % server.address, self.tempdir]
tmp_dir = os.path.join('/tmp', os.path.basename(self.tempdir))
cmd = ['execute', server.address, tmp_dir]
if logger.getEffectiveLevel() <= logging.DEBUG: cmd.insert(1, '--debug')
if daemon > 0:
image = host.env2docker(envkey)
logger.debug("Daemon mode: start the user process with the following " \
"command: `%s'", ' '.join(cmd))
"command: `docker run -ti %s %s'", image, ' '.join(cmd))
cmd = ['sleep', str(daemon)]
logger.debug("Daemon mode: sleeping for %d seconds", daemon)
self.process = async.Popen(
cmd=cmd,
virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
max_cpu_percent=self.max_cpu_percent,
cpulimit_path=self.cpulimit_path
)
else:
self.process = dock.Popen(
host,
envkey,
command=cmd,
tmp_archive=self.tempdir,
virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
max_cpu_percent=self.max_cpu_percent,
)
# provide a tip on how to stop the test
if daemon > 0:
logger.debug("To stop the daemon, press CTRL-c or kill the sleep " \
"process with `kill -9 %d`", self.process.pid)
logger.debug("To stop the daemon, press CTRL-c or kill the user " \
"process with `docker kill %s`", self.process.pid)
# Serve asynchronously
server.set_process(self.process)
......@@ -454,7 +459,7 @@ class Agent(object):
timeout = (60*timeout_in_minutes) if timeout_in_minutes else None
status = self.process.wait(timeout)
except async.TimeoutExpired:
except requests.exceptions.ReadTimeout:
logger.warn("user process has timed out after %d minutes",
timeout_in_minutes)
self.process.kill()
......@@ -469,25 +474,24 @@ class Agent(object):
finally:
server.stop.set()
# If status is negative, convert it to a positive value (group signal)
if status < 0: status *= -1
# Collects final information and returns to caller
process = self.process
self.process = None
return dict(
stdout = process.peek_stdout(),
stderr = process.peek_stderr(),
status = status,
timed_out = timed_out,
statistics = server.last_statistics,
system_error = server.system_error,
user_error = server.user_error,
)
retval = dict(
stdout = process.stdout,
stderr = process.stderr,
status = status,
timed_out = timed_out,
statistics = server.last_statistics,
system_error = server.system_error,
user_error = server.user_error,
)
process.rm()
return retval
def kill(self):
"""Stops the user process by force - to be called from signal handlers"""
if self.process is not None and psutil.pid_exists(self.process.pid):
if self.process is not None:
self.process.kill()
......@@ -337,9 +337,10 @@ class Algorithm(object):
if self.storage is not None: #loading from the disk, check code
if not self.storage.code.exists():
self.errors.append('Algorithm code not found: %s' % \
self.storage.code.path)
return
if self.data['language'] != 'cxx':
self.errors.append('Algorithm code not found: %s' % \
self.storage.code.path)
return
else:
code = self.storage.code.load()
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
'''Implementation of subprocess-based asynchronous running with greenlets
'''
import os
import sys
import time
import collections
import distutils.spawn
import logging
logger = logging.getLogger(__name__)
import psutil
import gevent
import gevent.timeout
import gevent.subprocess
import pkg_resources
from . import stats
# Figures out the expected TimeoutExpired exception
import six
if six.PY2:
from gevent.timeout import Timeout as TimeoutExpired
else:
from subprocess import TimeoutExpired as TimeoutExpired
class _circbuffer(object):
'''A configurable circular buffer used for outputting stdout/sterr
You may used it like this::
b = circbuffer(1000) #1000 characters long
b.extend(str) # extend with string
b.data() # get current data contents
Parameters:
maxlen (int): The maximum size the buffer can reach, after which, it is
going to start to implement a FIFO strategy eliminating the oldest
content first.
name (str): A name to attach to my debug messages
'''
def __init__(self, maxlen, name):
self.buf = collections.deque(maxlen=maxlen)
self.name = name
def write(self, s):
'''Appends to the end of the buffer'''
logger.debug("[%s] write: `%s'", self.name, s.rstrip())
self.buf.extend(s)
def read(self, size=None):
'''Returns the current data stored on the buffer'''
if size: return ''.join(self.buf[:size])
return ''.join(self.buf)
def clear(self):
'''Destroyes all buffer contents'''
logger.debug('[%s] cleared', self.name)
return self.buf.clear()
def close(self):
'''Pretends closing'''
logger.debug('[%s] closed', self.name)
return self.buf.clear()
def __str__(self):
return '[%s] %d bytes' % (self.name, len(self.buf))
def _read_stream(stream, buf):
'''Reads stream, write on buffer, yields if blocked'''
try:
while not stream.closed:
l = stream.readline()
if not l: break
buf.write(l)
except RuntimeError:
# process was terminated abruptly
pass
def _sandbox_memory(cmd, virtual_memory_in_megabytes):
'''Returns the command-line for a memory-sandbox executable'''
if virtual_memory_in_megabytes > 0:
logger.info("Setting maximum virtual memory usage to %d megabyte(s)",
virtual_memory_in_megabytes)
sandboxer = pkg_resources.resource_filename(__name__, 'sandboxer.py')
prefix_command = [
sys.executable,
sandboxer,
'--virtual-memory-in-megabytes=%d' % \
virtual_memory_in_megabytes,
'--',
]
cmd = prefix_command + cmd
logger.debug("Command-line is now set to `%s'", " ".join(cmd))
return cmd
def resolve_cpulimit_path(exe):
'''Returns the path to cpulimit'''
FIXED_LOCATIONS = [
'/usr/local/bin/cpulimit',
'/opt/local/bin/cpulimit',
'/usr/bin/cpulimit',
]
default = os.path.join(
os.path.dirname(os.path.realpath(sys.argv[0])),
'cpulimit',
)
retval = exe or default
# See if we find it in parallel, installed with our interpreter
if not os.path.exists(retval):
cand = os.path.join(os.path.dirname(sys.executable), 'cpulimit')
if os.path.exists(cand): retval = cand
# Try to see if the PATH variable is set
if not os.path.exists(retval):
try:
retval = distutils.spawn.find_executable('cpulimit')
except KeyError: #missing PATH variable
retval = None
# Try fixed locations
if not retval:
for k in FIXED_LOCATIONS:
if os.path.exists(k):
retval = k
if not retval:
raise IOError("I cannot the find a `cpulimit' binary on your system or " \
"the value you provided is not valid (%s) or the symbolic link " \
"(%s) is not properly set" % (exe, default))
return retval
class Popen(gevent.subprocess.Popen):
'''Manager for an asynchronous process.
The process will be run in the background, and its standard output and
standard error will be read asynchronously, into a limited size circular
buffer. This implementation, despite using Greenlets, will be able to execute
the readout in parallel, since the stream ``read()`` operation yields the
next greenlet
Parameters:
cmd (list): A set of strings representing the command to run.
buflen (int, Optional): If set, determines the maximum buffer size for
stdout and stderr streams. If not set, defaults to 65500 (nearly 64kb).
virtual_memory_in_megabytes (int, Optional): The maximum amount of virtual
memory consumed by the process in megabytes.
max_cpu_percent (int, Optional): The maximum amount of CPU usage allowed in
a system. This number must be an integer number between 0 and
``100*number_of_cores`` in your system. For instance, if your system has
2 cores, this number can go between 0 and 200. If it is <= 0, then we
don't track CPU usage. Otherwise, we do, clipping your number at
``min(max_cpu_percent, 100*psutil.cpu_count())``.
cpulimit_path (str, Optional): If ``max_cpu_percent`` >0, then we use the
program indicated by this path to start a parallel cpulimit daemon that
will control the CPU utilisation. If this value is not set, we search
your current execution path and then the system for a ``cpulimit``
executable. The first one found will be used. It is an error to specify
``max_cpu_percent > 0`` and not have a valid ``cpulimit`` executable
available on your system.
Raises:
IOError: If ``max_cpu_percent > 0`` and we could not find a valid
``cpulimit`` executable on your system.
OSError: If ``cmd`` points to something that cannot be executed.
'''
def __init__(self, cmd, buflen=65500, virtual_memory_in_megabytes=0,
max_cpu_percent=0, cpulimit_path=None):
debug = logger.getEffectiveLevel() <= logging.DEBUG
name = os.path.basename(cmd[0])
self.__stdout = _circbuffer(buflen, name='%s:stdout' % name)
self.__stderr = _circbuffer(buflen, name='%s:stderr' % name)
# hooks-in memory usage containment
virtual_memory_in_megabytes = max(virtual_memory_in_megabytes, 0)
cmd = _sandbox_memory(cmd, virtual_memory_in_megabytes)
logger.debug("Running command `%s'" % ' '.join(cmd))
super(Popen, self).__init__(
cmd,
stdin=None,
stdout=gevent.subprocess.PIPE,
stderr=gevent.subprocess.PIPE,
bufsize=1 if debug else -1,
)
# if we need to use a cpu limitation
max_cpu_percent = max(0, max_cpu_percent)
max_cpu_percent = min(max_cpu_percent, 100*psutil.cpu_count())
if max_cpu_percent:
cpulimit_path = resolve_cpulimit_path(cpulimit_path)
logger.info("Setting maximum cpu-usage to %d%%", max_cpu_percent)
cpulimit_cmd = [
cpulimit_path,
'--limit=%d' % max_cpu_percent, #percentage of cpu allowed
'--include-children', #limit also the children processes
'--lazy', #exit if there is no target process, or if it dies
'--pid=%d' % self.pid,
]
logger.debug("Cpulimit command line set to `%s'", ' '.join(cpulimit_cmd))
# this is self managed (thanks to the --lazy flag), you don't need to
# worry about terminating this process by hand.
self.cpulimit_process = Popen(cmd=cpulimit_cmd)
else: