diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index eef80b2317d913b94f4781d8ee398c86b4b9d91a..b34ee73ba0890b372e5ddc8b94ff59298396299c 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,29 +1,21 @@
-py27-linux:
-  script:
-    - git clean -ffdx
-    - export TMPDIR=/var/tmp
-    - /idiap/project/beat/environments/staging/usr/bin/python bootstrap-buildout.py --setuptools-version=`/idiap/project/beat/environments/staging/usr/bin/python -c 'import setuptools; print(setuptools.__version__)'`
-    - ./bin/buildout
-    - ./bin/python --version
-    - unset TMPDIR
-    - export NOSE_WITH_COVERAGE=1
-    - export NOSE_COVER_PACKAGE=beat.core
-    - ./bin/nosetests -sv
-    - ./bin/sphinx-apidoc --separate -d 2 --output=doc/api beat
-    - ./bin/sphinx-build doc html
-  tags:
-    - lidiap2015
+stages:
+  - build
+
+variables:
+  PREFIX: /opt/beat.env.web-${CI_BUILD_REF_NAME}/usr
 
-py27-macosx:
+build:
+  stage: build
+  before_script:
+    - ${PREFIX}/bin/python --version
+    - docker info
   script:
     - git clean -ffdx
-    - /Users/buildbot/work/environments/beat/py27/bin/python bootstrap-buildout.py --setuptools-version=`/Users/buildbot/work/environments/beat/py27/bin/python -c 'import setuptools; print(setuptools.__version__)'`
+    - ${PREFIX}/bin/python bootstrap-buildout.py
     - ./bin/buildout
-    - ./bin/python --version
-    - export NOSE_WITH_COVERAGE=1
-    - export NOSE_COVER_PACKAGE=beat.core
-    - ./bin/nosetests -sv
-    - ./bin/sphinx-apidoc --separate -d 2 --output=doc/api beat
-    - ./bin/sphinx-build doc html
+    - ./bin/python ${PREFIX}/bin/coverage run --source=${CI_PROJECT_NAME} ${PREFIX}/bin/nosetests -sv ${CI_PROJECT_NAME}
+    - ./bin/python ${PREFIX}/bin/coverage report
+    - ./bin/python ${PREFIX}/bin/sphinx-apidoc --separate -d 2 --output=doc/api ${CI_PROJECT_NAMESPACE}
+    - ./bin/python ${PREFIX}/bin/sphinx-build doc sphinx
   tags:
-    - beat-macosx
+    - docker-build
diff --git a/README.rst b/README.rst
index 3a947bcb0f84003b015072542153a0c3b049a39c..03db5f212547a434fef3f9a5b543859698726533 100644
--- a/README.rst
+++ b/README.rst
@@ -28,6 +28,7 @@ This package contains the source code for the core components of the BEAT
 platform.
 
+
 Installation
 ------------
 
@@ -50,26 +51,38 @@ get you a fully operational test and development environment.
    package instead. It contains the same setup deployed at the final BEAT
    machinery.
 
-Cpulimit
-========
-Make sure the program ``cpulimit`` is available on your system or by the side
-of the python interpreter you bootstrapped as per instructions above. The BEAT
-platform uses this program to control slot usage on the scheduling/worker
-level::
+Docker
+======
+
+This package depends on Docker_ and uses it to run user algorithms in a
+container with the required software stack. You must install the Docker_ engine
+and make sure the user running tests has access to it.
+
+In particular, this package controls memory and CPU utilisation of the
+containers it launches. You must make sure memory and CPU accounting (control
+groups) are enabled on your installation.
+
+Docker Setup
+============
 
-  $ cpulimit -h
+Make sure you have the ``docker`` command available on your system. For certain
+operating systems, it is necessary to install ``docker`` via an external
+virtual machine (a.k.a. the *docker machine*). Follow the instructions at `the
+docker website <https://docs.docker.com/engine/installation/>`_ before trying to
+execute algorithms or experiments.
 
-If that is not the case, then you need to install it. Either install a package
-that is native to your system (e.g. on Debian or Ubuntu platforms) or compile
-the checked-out version available at ``src/cpulimit``::
+We use specific docker images to run user algorithms. Download the following
+base images before you try to run tests or experiments on your computer::
 
-  $ cd src/cpulimit;
-  $ make
-  $ ./src/cpulimit -h #to test it
-  $ cd ../../bin #go back to the root of beat.web and the into the `bin' dir
-  $ ln -s ../src/cpulimit/src/cpulimit
-  $ cd .. #go back to the root of beat.web
+  $ docker pull beats/py27:system
+  $ docker pull debian:8.4
+
+Optionally, also download the following images to be able to re-run experiments
+downloaded from the BEAT platform (not required for unit testing)::
+
+  $ docker pull beats/py27:0.0.4
+  $ docker pull beats/py27:0.1.0
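As a quick sanity check that the engine is reachable and the images above are
visible, you can query the daemon from Python with the same docker-py API this
changeset relies on elsewhere (a sketch, assuming the 2016-era ``docker.Client``
entry point and a local engine socket)::

  import docker

  client = docker.Client()  # connects to the local docker engine
  tags = [t for image in client.images() for t in (image['RepoTags'] or [])]
  assert 'beats/py27:system' in tags  # one of the base images pulled above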
 
 
 Documentation
@@ -130,9 +143,8 @@ Development
 Indentation
 ===========
 
-You can enforce `PEP8 <https://www.python.org/dev/peps/pep-0008/>` compliance
-using the application ``autopep8``. For example, to enforce compliance on a
-single file and edit it in place, do::
+You can enforce PEP8_ compliance using the application ``autopep8``. For
+example, to enforce compliance on a single file and edit it in place, do::
 
   $ ./bin/autopep8 --indent-size=2 --in-place beat/core/utils.py
 
@@ -156,3 +168,8 @@ in different ways using another command::
 
 This will allow you to dump and print the profiling statistics as you may find
 fit.
+
+
+.. References go here
+.. _pep8: https://www.python.org/dev/peps/pep-0008/
+.. _docker: https://www.docker.com/
diff --git a/beat/core/agent.py b/beat/core/agent.py
old mode 100644
new mode 100755
index c227823ae85072913fc9857d9cd108518b2ade57..8b61307b46d61fd8f5449843e450bd930c4ab04d
--- a/beat/core/agent.py
+++ b/beat/core/agent.py
@@ -32,19 +32,23 @@ import shutil
 
 import logging
 logger = logging.getLogger(__name__)
 
-import psutil
 import gevent
 import zmq.green as zmq
+import requests
+
+from gevent import monkey
+monkey.patch_socket(dns=False)
+monkey.patch_ssl()
 
 from . import utils
-from . import async
+from . import dock
 from . import baseformat
 
 
 class Server(gevent.Greenlet):
   '''A 0MQ server for our communication with the user process'''
 
-  def __init__(self, input_list, output_list):
+  def __init__(self, input_list, output_list, host_address):
 
     super(Server, self).__init__()
 
@@ -55,7 +59,8 @@ class Server(gevent.Greenlet):
     # Starts our 0MQ server
     self.context = zmq.Context()
     self.socket = self.context.socket(zmq.PAIR)
-    self.address = 'tcp://127.0.0.1'
+
+    self.address = 'tcp://' + host_address
     port = self.socket.bind_to_random_port(self.address)
     self.address += ':%d' % port
     logger.debug("zmq server bound to `%s'", self.address)
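The change above makes the 0MQ ``PAIR`` server bind on the docker host's
address instead of the loopback, so a containerized client can reach it. For
reference, the client half of such a pairing looks roughly like this (a sketch;
the endpoint is hypothetical -- the real one is the ``server.address`` string
passed to the ``execute`` script on its command line)::

  import zmq

  context = zmq.Context()
  socket = context.socket(zmq.PAIR)
  socket.connect('tcp://192.168.100.10:45678')  # hypothetical server.address
  socket.send_string('hello')   # requests travel over the exclusive pair
  reply = socket.recv_string()  # and answers come back on the same socket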
@@ -84,6 +89,7 @@ class Server(gevent.Greenlet):
 
   def set_process(self, process):
     self.process = process
+    self.process.statistics() # initialize internal statistics
 
 
   def __str__(self):
@@ -325,22 +331,17 @@ class Server(gevent.Greenlet):
 
 
 class Agent(object):
-  '''Handles asynchronous stdout/stderr readout and synchronous commands.
+  '''Handles synchronous commands.
 
   We use the greenlets for this implementation. Objects of this class are in
-  charge of three separate tasks:
+  charge of two separate tasks:
 
-  1. Handling the execution of the user process (as a separate process)
-  2. Making sure the user process does not consume more resources than it is
-     supposed to (uses the external application ``cpulimit``)
-  3. Implementing a pipe-based API for I/O that the user process can query
+  1. Handling the execution of the user process (in a docker container)
+  2. Implementing a pipe-based API for I/O that the user process can query
 
 
   Parameters:
 
-    execute_path (str): The path to the ``execute`` script of the chosen
-      environment. This **must** be a valid path.
-
     virtual_memory_in_megabytes (int, optional): The amount of virtual memory
       (in Megabytes) available for the job. If set to zero, no limit will be
       applied.
 
@@ -349,26 +350,14 @@ class Agent(object):
       This number must be an integer number between 0 and
       ``100*number_of_cores`` in your system. For instance, if your system has
       2 cores, this number can go between 0 and 200. If it is <= 0, then we
-      don't track CPU usage. Otherwise, we do, clipping your number at
-      ``min(max_cpu_percent, 100*psutil.cpu_count())``.
-
-    cpulimit_path (str): If ``max_cpu_percent`` >0, then se use the program
-      indicated by this path to start a parallel cpulimit daemon that will
-      control the CPU utilisation. If this value is not set, we search your
-      current execution path and then the system for a ``cpulimit`` executable.
-      The first one found will be used. It is an error to specify
-      ``max_cpu_percent > 0`` and not have a valid ``cpulimit`` executable
-      available on your system.
+      don't track CPU usage.
 
   '''
 
-  def __init__(self, execute_path, virtual_memory_in_megabytes,
-      max_cpu_percent, cpulimit_path):
+  def __init__(self, virtual_memory_in_megabytes, max_cpu_percent):
 
-    self.execute_path = execute_path
     self.virtual_memory_in_megabytes = virtual_memory_in_megabytes
     self.max_cpu_percent = max_cpu_percent
-    self.cpulimit_path = cpulimit_path
     self.tempdir = None
     self.process = None
 
@@ -395,7 +384,7 @@ class Agent(object):
 
     logger.debug("Exiting processing context...")
 
-  def run(self, configuration, timeout_in_minutes=0, daemon=0):
+  def run(self, configuration, host, timeout_in_minutes=0, daemon=0):
     """Runs the algorithm code
 
@@ -404,6 +393,10 @@ class Agent(object):
       configuration (object): A *valid*, preloaded
         :py:class:`beat.core.execution.Executor` object.
 
+      host (:py:class:`Host`): A configured docker host that will execute the
+        user process. If the host does not have access to the required
+        environment, an exception will be raised.
+
       timeout_in_minutes (int): The number of minutes to wait for the user
         process to execute. After this amount of time, the user process is
         killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will
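The ``host`` parameter documented above is resolved to a concrete docker image
in the hunk that follows: environments are keyed as ``<name> (<version>)``, the
same format ``Host._discover_environments()`` registers. A sketch of that
lookup (the environment values are hypothetical)::

  environment = {'name': 'Python 2.7', 'version': '0.0.4'}  # hypothetical
  envkey = '%(name)s (%(version)s)' % environment  # -> 'Python 2.7 (0.0.4)'
  if envkey not in host:  # Host.__contains__ checks discovered environments
    raise RuntimeError("environment not available")
  image = host.env2docker(envkey)  # docker tag, or the short image id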
@@ -420,29 +413,41 @@ class Agent(object):
     configuration.dump_runner_configuration(self.tempdir)
 
     # Server for our single client
-    server = Server(configuration.input_list, configuration.output_list)
+    server = Server(configuration.input_list, configuration.output_list,
+        host.ip)
+
+    # Figures out the image to use
+    envkey = '%(name)s (%(version)s)' % configuration.data['environment']
+    if envkey not in host:
+      raise RuntimeError("Environment `%s' is not available on docker " \
+          "host `%s' - available environments are %s" % (envkey, host,
+          ", ".join(host.environments.keys())))
 
     # Launches the process (0MQ client)
-    cmd = [self.execute_path, '%s' % server.address, self.tempdir]
+    tmp_dir = os.path.join('/tmp', os.path.basename(self.tempdir))
+    cmd = ['execute', server.address, tmp_dir]
     if logger.getEffectiveLevel() <= logging.DEBUG: cmd.insert(1, '--debug')
 
     if daemon > 0:
+      image = host.env2docker(envkey)
       logger.debug("Daemon mode: start the user process with the following " \
-          "command: `%s'", ' '.join(cmd))
+          "command: `docker run -ti %s %s'", image, ' '.join(cmd))
       cmd = ['sleep', str(daemon)]
       logger.debug("Daemon mode: sleeping for %d seconds", daemon)
 
-    self.process = async.Popen(
-        cmd=cmd,
-        virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
-        max_cpu_percent=self.max_cpu_percent,
-        cpulimit_path=self.cpulimit_path
-        )
+    # the container starts even in daemon mode (running `sleep'), so the
+    # `docker kill' tip below has a valid container to point at
+    self.process = dock.Popen(
+        host,
+        envkey,
+        command=cmd,
+        tmp_archive=self.tempdir,
+        virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
+        max_cpu_percent=self.max_cpu_percent,
+        )
 
     # provide a tip on how to stop the test
     if daemon > 0:
-      logger.debug("To stop the daemon, press CTRL-c or kill the sleep " \
-          "process with `kill -9 %d`", self.process.pid)
+      logger.debug("To stop the daemon, press CTRL-c or kill the user " \
+          "process with `docker kill %s`", self.process.pid)
 
     # Serve asynchronously
     server.set_process(self.process)
@@ -454,7 +459,7 @@ class Agent(object):
       timeout = (60*timeout_in_minutes) if timeout_in_minutes else None
       status = self.process.wait(timeout)
 
-    except async.TimeoutExpired:
+    except requests.exceptions.ReadTimeout:
       logger.warn("user process has timed out after %d minutes",
           timeout_in_minutes)
       self.process.kill()
@@ -469,25 +474,24 @@ class Agent(object):
     finally:
       server.stop.set()
 
-    # If status is negative, convert it to a positive value (group signal)
-    if status < 0: status *= -1
-
     # Collects final information and returns to caller
     process = self.process
     self.process = None
-    return dict(
-        stdout = process.peek_stdout(),
-        stderr = process.peek_stderr(),
-        status = status,
-        timed_out = timed_out,
-        statistics = server.last_statistics,
-        system_error = server.system_error,
-        user_error = server.user_error,
-        )
+    retval = dict(
+        stdout = process.stdout,
+        stderr = process.stderr,
+        status = status,
+        timed_out = timed_out,
+        statistics = server.last_statistics,
+        system_error = server.system_error,
+        user_error = server.user_error,
+        )
+    process.rm()
+    return retval
 
 
   def kill(self):
     """Stops the user process by force - to be called from signal handlers"""
 
-    if self.process is not None and psutil.pid_exists(self.process.pid):
+    if self.process is not None:
       self.process.kill()
diff --git a/beat/core/algorithm.py b/beat/core/algorithm.py
old mode 100644
new mode 100755
index 91306f0993c1e6a5b9985aea29860071c1ff5402..38025ce3328ceb31d8603ecee221c2dc7573546f
--- a/beat/core/algorithm.py
+++ b/beat/core/algorithm.py
@@ -337,9 +337,10 @@ class Algorithm(object):
 
     if self.storage 
is not None: #loading from the disk, check code if not self.storage.code.exists(): - self.errors.append('Algorithm code not found: %s' % \ - self.storage.code.path) - return + if self.data['language'] != 'cxx': + self.errors.append('Algorithm code not found: %s' % \ + self.storage.code.path) + return else: code = self.storage.code.load() diff --git a/beat/core/async.py b/beat/core/async.py deleted file mode 100644 index ba5b689eb57e73d24a00d530809e3072e55d5963..0000000000000000000000000000000000000000 --- a/beat/core/async.py +++ /dev/null @@ -1,373 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : - -############################################################################### -# # -# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # -# Contact: beat.support@idiap.ch # -# # -# This file is part of the beat.core module of the BEAT platform. # -# # -# Commercial License Usage # -# Licensees holding valid commercial BEAT licenses may use this file in # -# accordance with the terms contained in a written agreement between you # -# and Idiap. For further information contact tto@idiap.ch # -# # -# Alternatively, this file may be used under the terms of the GNU Affero # -# Public License version 3 as published by the Free Software and appearing # -# in the file LICENSE.AGPL included in the packaging of this file. # -# The BEAT platform is distributed in the hope that it will be useful, but # -# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # -# or FITNESS FOR A PARTICULAR PURPOSE. # -# # -# You should have received a copy of the GNU Affero Public License along # -# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # -# # -############################################################################### - - -'''Implementation of subprocess-based asynchronous running with greenlets -''' - -import os -import sys -import time -import collections -import distutils.spawn - -import logging -logger = logging.getLogger(__name__) - -import psutil - -import gevent -import gevent.timeout -import gevent.subprocess -import pkg_resources - -from . import stats - - -# Figures out the expected TimeoutExpired exception -import six -if six.PY2: - from gevent.timeout import Timeout as TimeoutExpired -else: - from subprocess import TimeoutExpired as TimeoutExpired - -class _circbuffer(object): - '''A configurable circular buffer used for outputting stdout/sterr - - You may used it like this:: - - b = circbuffer(1000) #1000 characters long - b.extend(str) # extend with string - b.data() # get current data contents - - Parameters: - - maxlen (int): The maximum size the buffer can reach, after which, it is - going to start to implement a FIFO strategy eliminating the oldest - content first. 
- - name (str): A name to attach to my debug messages - - ''' - - def __init__(self, maxlen, name): - self.buf = collections.deque(maxlen=maxlen) - self.name = name - - def write(self, s): - '''Appends to the end of the buffer''' - logger.debug("[%s] write: `%s'", self.name, s.rstrip()) - self.buf.extend(s) - - def read(self, size=None): - '''Returns the current data stored on the buffer''' - if size: return ''.join(self.buf[:size]) - return ''.join(self.buf) - - def clear(self): - '''Destroyes all buffer contents''' - logger.debug('[%s] cleared', self.name) - return self.buf.clear() - - def close(self): - '''Pretends closing''' - logger.debug('[%s] closed', self.name) - return self.buf.clear() - - def __str__(self): - return '[%s] %d bytes' % (self.name, len(self.buf)) - - -def _read_stream(stream, buf): - '''Reads stream, write on buffer, yields if blocked''' - - try: - while not stream.closed: - l = stream.readline() - if not l: break - buf.write(l) - except RuntimeError: - # process was terminated abruptly - pass - - -def _sandbox_memory(cmd, virtual_memory_in_megabytes): - '''Returns the command-line for a memory-sandbox executable''' - - if virtual_memory_in_megabytes > 0: - logger.info("Setting maximum virtual memory usage to %d megabyte(s)", - virtual_memory_in_megabytes) - sandboxer = pkg_resources.resource_filename(__name__, 'sandboxer.py') - prefix_command = [ - sys.executable, - sandboxer, - '--virtual-memory-in-megabytes=%d' % \ - virtual_memory_in_megabytes, - '--', - ] - cmd = prefix_command + cmd - logger.debug("Command-line is now set to `%s'", " ".join(cmd)) - - return cmd - - -def resolve_cpulimit_path(exe): - '''Returns the path to cpulimit''' - - FIXED_LOCATIONS = [ - '/usr/local/bin/cpulimit', - '/opt/local/bin/cpulimit', - '/usr/bin/cpulimit', - ] - - default = os.path.join( - os.path.dirname(os.path.realpath(sys.argv[0])), - 'cpulimit', - ) - retval = exe or default - - # See if we find it in parallel, installed with our interpreter - if not os.path.exists(retval): - cand = os.path.join(os.path.dirname(sys.executable), 'cpulimit') - if os.path.exists(cand): retval = cand - - # Try to see if the PATH variable is set - if not os.path.exists(retval): - try: - retval = distutils.spawn.find_executable('cpulimit') - except KeyError: #missing PATH variable - retval = None - - # Try fixed locations - if not retval: - for k in FIXED_LOCATIONS: - if os.path.exists(k): - retval = k - - if not retval: - raise IOError("I cannot the find a `cpulimit' binary on your system or " \ - "the value you provided is not valid (%s) or the symbolic link " \ - "(%s) is not properly set" % (exe, default)) - - return retval - - -class Popen(gevent.subprocess.Popen): - '''Manager for an asynchronous process. - - The process will be run in the background, and its standard output and - standard error will be read asynchronously, into a limited size circular - buffer. This implementation, despite using Greenlets, will be able to execute - the readout in parallel, since the stream ``read()`` operation yields the - next greenlet - - Parameters: - - cmd (list): A set of strings representing the command to run. - - buflen (int, Optional): If set, determines the maximum buffer size for - stdout and stderr streams. If not set, defaults to 65500 (nearly 64kb). - - virtual_memory_in_megabytes (int, Optional): The maximum amount of virtual - memory consumed by the process in megabytes. - - max_cpu_percent (int, Optional): The maximum amount of CPU usage allowed in - a system. 
This number must be an integer number between 0 and - ``100*number_of_cores`` in your system. For instance, if your system has - 2 cores, this number can go between 0 and 200. If it is <= 0, then we - don't track CPU usage. Otherwise, we do, clipping your number at - ``min(max_cpu_percent, 100*psutil.cpu_count())``. - - cpulimit_path (str, Optional): If ``max_cpu_percent`` >0, then we use the - program indicated by this path to start a parallel cpulimit daemon that - will control the CPU utilisation. If this value is not set, we search - your current execution path and then the system for a ``cpulimit`` - executable. The first one found will be used. It is an error to specify - ``max_cpu_percent > 0`` and not have a valid ``cpulimit`` executable - available on your system. - - - Raises: - - IOError: If ``max_cpu_percent > 0`` and we could not find a valid - ``cpulimit`` executable on your system. - - OSError: If ``cmd`` points to something that cannot be executed. - - ''' - - def __init__(self, cmd, buflen=65500, virtual_memory_in_megabytes=0, - max_cpu_percent=0, cpulimit_path=None): - - debug = logger.getEffectiveLevel() <= logging.DEBUG - - name = os.path.basename(cmd[0]) - self.__stdout = _circbuffer(buflen, name='%s:stdout' % name) - self.__stderr = _circbuffer(buflen, name='%s:stderr' % name) - - # hooks-in memory usage containment - virtual_memory_in_megabytes = max(virtual_memory_in_megabytes, 0) - cmd = _sandbox_memory(cmd, virtual_memory_in_megabytes) - - logger.debug("Running command `%s'" % ' '.join(cmd)) - - super(Popen, self).__init__( - cmd, - stdin=None, - stdout=gevent.subprocess.PIPE, - stderr=gevent.subprocess.PIPE, - bufsize=1 if debug else -1, - ) - - # if we need to use a cpu limitation - max_cpu_percent = max(0, max_cpu_percent) - max_cpu_percent = min(max_cpu_percent, 100*psutil.cpu_count()) - if max_cpu_percent: - cpulimit_path = resolve_cpulimit_path(cpulimit_path) - logger.info("Setting maximum cpu-usage to %d%%", max_cpu_percent) - cpulimit_cmd = [ - cpulimit_path, - '--limit=%d' % max_cpu_percent, #percentage of cpu allowed - '--include-children', #limit also the children processes - '--lazy', #exit if there is no target process, or if it dies - '--pid=%d' % self.pid, - ] - logger.debug("Cpulimit command line set to `%s'", ' '.join(cpulimit_cmd)) - # this is self managed (thanks to the --lazy flag), you don't need to - # worry about terminating this process by hand. - self.cpulimit_process = Popen(cmd=cpulimit_cmd) - else: - self.cpulimit_process = None - - - def wait(self, timeout=None): - '''Reads stdout and stderr until the underlying processes finishes - - Implements a modified version of :py:meth:`subprocess.Popen.wait`, in which - we read the stdout and stderr data into a circular buffer, keep only the - last N bytes of each stream. - - This method will call :py:meth:`file.readline` on both stdout and stderr - streams attached to the process. These methods are "green". They will yield - once they are blocked. - - - Returns: - - int: Returns the status code of the process - - - Raises: - - gevent.timeout.Timeout: under Python 2, if the process times out - - subprocess.TimeoutExpired: under Python 3, if the process times out - - ''' - - gevent.spawn(_read_stream, self.stdout, self.__stdout) - gevent.spawn(_read_stream, self.stderr, self.__stderr) - - # in some test cases, timeout may actually expire before the announced - # number of seconds. To avoid errors, we check if we're really supposed to - # expire, otherwise we retry. 
- if timeout is not None: - timeout = float(timeout) - while timeout > 0.0: - start = time.time() - retval = super(Popen, self).wait(timeout) - if retval is not None: break - timeout -= (time.time() - start) - else: - retval = super(Popen, self).wait(timeout) - - if retval is None and timeout is not None: - raise TimeoutExpired(timeout) - - return retval - - - def peek_stdout(self): - '''Returns the last N bytes of stdout''' - - return self.__stdout.read() - - - def peek_stderr(self): - '''Returns the last N bytes of stderr''' - - return self.__stderr.read() - - - def kill(self): - '''Before killing myself, make sure to kill all children.''' - - p = psutil.Process(self.pid) - for child in p.children(): child.kill() - super(Popen, self).kill() - self.wait() #avoids zombies - if self.cpulimit_process: - # the cpulimit process is --lazy, so it should die automatically - # after the attached process is killed - self.cpulimit_process.wait() #avoids zombie process - - - def statistics(self): - '''If the process is still active, returns usage statistics by ``pusutil`` - - - Returns: - - stats (dict): A dictionary object with all collected statistics - - - Raises: - - RuntimeError: In case the process is not active anymore. - - ''' - - def sum_tuples(t): - retval = list(t[0]) - for k in range(len(retval)): retval[k] += sum([z[k] for z in t[1:]]) - return t[0].__class__(*retval) - - def merge_statistics(processes, entries): - retval = {} - for e in entries: - retval[e] = sum_tuples([getattr(p, e)() for p in processes]) - return retval - - if not self.pid: - raise RuntimeError("Process is dead") - - # merge statistics - retval = dict( - cpu=stats.cpu_statistics(self.pid), - memory=stats.memory_statistics(self.pid), - ) - return retval diff --git a/beat/core/dock.py b/beat/core/dock.py new file mode 100755 index 0000000000000000000000000000000000000000..37ecfd76ece247352e3c4440d1d0693303664be1 --- /dev/null +++ b/beat/core/dock.py @@ -0,0 +1,606 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +'''Implementation of subprocess-based asynchronous running with greenlets +''' + +import os +import tarfile + +import logging +logger = logging.getLogger(__name__) + +import six +import docker +import requests +import simplejson +import socket + +from . 
import stats + + +class Host(object): + '''An object of this class can connect to the docker host and resolve stuff''' + + + def __init__(self, **kwargs): + + self.client = None + self.containers = [] + + if 'host' in kwargs and 'port' in kwargs: + host = kwargs.get('host') + del kwargs['host'] + port = kwargs.get('port') + del kwargs['port'] + kwargs['base_url'] = "http://%s:%s" % (host, port) + + self.kwargs = kwargs + self.environments = {} + + + def setup(self, raise_on_errors=True): + + self.client = docker.Client(**self.kwargs) + + self.environments = self._discover_environments(raise_on_errors) + + + def __contains__(self, key): + return key in self.environments + + + def __str__(self): + return self.kwargs['base_url'] + + + def env2docker(self, key): + '''Returns a nice docker image name given a BEAT environment key''' + + attrs = self.environments[key] + + if attrs['tag'] is not None: return attrs['tag'] + return attrs['short_id'] + + + def teardown(self): + + for container in self.containers: self.rm(container) + + + def __enter__(self): + self.setup() + return self + + + def __exit__(self, *exc): + self.teardown() + + + @property + def ip(self): + '''The IP address of the docker host''' + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.connect(('8.8.8.8', 1)) # connecting to a UDP address doesn't send packets + return s.getsockname()[0] + + + def _discover_environments(self, raise_on_errors=True): + '''Returns a dictionary containing information about docker environments + + Parameters: + + raise_on_errors (bool, Optional): If we should raise an exception + (``RuntimeError``) in case installed environments override each other + and we can't know which to use. + + + Raises: + + RuntimeError: if you set ``raise_on_errors`` and I found environments + that override each other for their description keys (``<name> + (<version>)``), which should be unique. + + ''' + + def _describe(image): + '''Tries to run the "describe" app on the image, collect results''' + + status, output = self.get_statusoutput(image, ['describe']) + if status == 0: + try: + return simplejson.loads(output) + except Exception as e: + logger.warn("Ignoring potential environment at `%s' since " \ + "`describe' output cannot be parsed: %s", image, str(e)) + return {} + + retval = {} + + for image in self.client.images(): + # call the "describe" application on each existing image with "beat" in + # its name + tag = image['RepoTags'][0] if image['RepoTags'] else None + if (tag is None) or (tag.find('beat') == -1): + continue + + id = image['Id'].split(':')[1][:12] + logger.debug("Checking image `%s' (%s)...", tag, id) + description = _describe(tag or id) + if not description: continue + + key = description['name'] + ' (' + description['version'] + ')' + + if key in retval: + + # this check avoids we do a new environment and, by mistake, override + # it with a previous version or the contrary. + if raise_on_errors: + raise RuntimeError("Environments at `%s' and `%s' have the " \ + "same name (`%s'). Distinct environments must be " \ + "uniquely named. Fix this and re-start." 
% \
+              (tag or id, retval[key]['image'], key))
+        else:
+          new_version = None
+          previous_version = None
+
+          parts = tag.split('/')
+          if len(parts) > 1:
+            parts = parts[-1].split(':')
+            if len(parts) > 1:
+              new_version = parts[-1]
+
+          parts = retval[key]['tag'].split('/')
+          if len(parts) > 1:
+            parts = parts[-1].split(':')
+            if len(parts) > 1:
+              previous_version = parts[-1]
+
+          replacement = False
+          keep = False
+
+          if (new_version is not None) and (previous_version is not None):
+            if new_version == 'latest':
+              replacement = True
+            elif previous_version == 'latest':
+              keep = True
+            else:
+              try:
+                new_version = tuple([ int(x) for x in new_version.split('.') ])
+
+                try:
+                  previous_version = tuple([ int(x) for x in previous_version.split('.') ])
+
+                  if new_version > previous_version:
+                    replacement = True
+                  else:
+                    keep = True
+                except:
+                  replacement = True
+
+              except:
+                keep = True
+
+          elif new_version is not None:
+            replacement = True
+          elif previous_version is not None:
+            keep = True
+
+          if replacement:
+            logger.debug("Overriding **existing** environment `%s' in image `%s'", key, retval[key]['tag'])
+          elif keep:
+            logger.debug("Environment `%s' already existing in image `%s', we'll keep it", key, retval[key]['tag'])
+            continue
+          else:
+            logger.warn("Overriding **existing** environment `%s' image " \
+                "with `%s' (it was `%s'). To avoid this warning make " \
+                "sure your docker images do not contain environments " \
+                "with the same names", key, retval[key]['image'],
+                image['Id'])
+
+      retval[key] = description
+      retval[key]['image'] = image['Id']
+      retval[key]['tag'] = tag
+      retval[key]['short_id'] = id
+      retval[key]['nickname'] = tag or id
+      logger.info("Registered `%s' -> `%s (%s)'", key, tag, id)
+
+    logger.debug("Found %d environments", len(retval))
+    return retval
+
+
+  def put_path(self, container, src, dest='/tmp', chmod=None):
+    """Puts a given src path into a destination folder
+
+    This method will copy a given ``src`` path into a ``dest`` folder. It
+    optionally changes the permissions of the resulting path on the image so
+    it is accessible by other users than root.
+
+
+    Parameters:
+
+      container (str, dict): The container where to copy the path.
+
+      src (str): The path, on the host, of the archive to be copied over the
+        container. The path may point to a file or directory on the host. Data
+        will be copied recursively.
+
+      dest (str, Optional): A path, on the container, where to copy the input
+        data. By default, we put data into the temporary directory of the
+        container.
+
+      chmod (str, Optional): An optional string to set the mode of the
+        resulting path (files and directory). A common reason for this sort of
+        thing is to set directories to 755 but files to 644 with the following
+        value ``u+rwX,go+rX,go-w``. This will be applied recursively to the
+        resulting path on the container. If you set this value to ``None``,
+        then the chmod command will not be run.
+
+    """
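For illustration, a typical call (the container handle and job directory are
hypothetical) copies a prepared path into the container and then opens up its
permissions recursively::

  host.put_path(container, '/tmp/beat-job-1234',
      dest='/tmp', chmod='u+rwX,go+rX,go-w')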
+
+    # The docker API only accepts in-memory tar archives
+    c = six.moves.cStringIO()
+    with tarfile.open(mode='w', fileobj=c) as tar:
+      tar.add(src, arcname=os.path.basename(src))
+    archive = c.getvalue()
+
+    # Place the tarball into the container
+    path = os.path.join(dest, os.path.basename(src))
+    logger.debug("[docker] archive -> %s@%s", container['Id'][:12], dest)
+    self.client.put_archive(container, dest, archive)
+
+    if chmod is not None:
+      # Change permissions to access the path
+      ex = self.client.exec_create(container, cmd=['chmod', '-R', chmod, dest])
+      output = self.client.exec_start(ex) #waits until it is executed
+
+
+  def create_container(self, image, command, tmp_path=None, host_args=None,
+      **args):
+    """Prepares the docker container for running the user code
+
+
+    Parameters:
+
+      image (str): A unique identifier for the image to create
+
+      command (list): A list of strings with the command to run inside the
+        container.
+
+      tmp_path (str): A path with a file name or directory that will be
+        copied into the container (inside ``/tmp``), supposedly with
+        information that is used by the command.
+
+      host_args (dict): A dictionary that will be transformed into a
+        ``HostConfig`` object that will be used for the creation of the
+        container. It is the place to add/drop capabilities for the container
+        or to set maximum resource usage (CPU, memory, etc). Keys accepted are
+        not arbitrary. Check docker_py's manual for details.
+
+      args (dict): A dictionary of extra arguments to pass to the underlying
+        ``create_container()`` call from the docker Python API.
+
+
+    Returns:
+
+      docker.Container: The container, already preloaded and ready to run. The
+        container is not started by this method.
+
+    """
+
+    if image in self: #replace by a real image name
+      image = self.env2docker(image)
+
+    config_args = dict(
+        log_config = docker.utils.LogConfig(type='',
+          config={'max-size': '1M', 'max-file': '1'}),
+        )
+
+    if host_args is not None:
+      config_args.update(host_args)
+
+    # see bug: https://github.com/docker/docker-py/issues/1195
+    # fix on docker-engine 1.13.0 and superior
+    # see changelog: https://github.com/docker/docker/blob/master/CHANGELOG.md
+    args['network_disabled'] = False
+
+    # for HostConfig (keys are not arbitrary) - see:
+    # http://docker-py.readthedocs.io/en/latest/hostconfig/
+    args['host_config'] = self.client.create_host_config(**config_args)
+
+    logger.debug("[docker] create_container %s %s", image, ' '.join(command))
+    container = self.client.create_container(image=image, command=command,
+        **args)
+    self.containers.append(container)
+
+    if tmp_path is not None:
+      self.put_path(container, tmp_path)
+
+    return container
+
+
+  def start(self, container):
+    '''Starts a given container'''
+
+    logger.debug("[docker] start %s", container['Id'][:12])
+    self.client.start(container)
+
+
+  def status(self, container):
+    '''Checks the status of a given container'''
+
+    try:
+      logger.debug("[docker] inspect %s", container['Id'][:12])
+      z = self.client.inspect_container(container)
+      return z['State']['Status']
+    except Exception as e:
+      return None
+
+
+  def rm(self, container):
+    '''Removes a given container. If it is still running, kill it first'''
+
+    status = self.status(container)
+
+    if status not in ('created', 'exited'):
+      logger.warn("Killing container `%s' which is on state `%s'",
+          container['Id'], status)
+      self.client.kill(container)
+
+    logger.debug("[docker] rm %s", container['Id'][:12])
+    self.client.remove_container(container)
+    self.containers.remove(container)
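Because ``Host`` implements the context-manager protocol shown earlier
(``setup()`` on entry, ``teardown()`` on exit, removing any containers left
behind), one-off commands can be run against an image as follows (a sketch; the
image tag is one of those listed in the README)::

  with Host(base_url='unix://var/run/docker.sock') as host:
    # runs `describe' in the image, returning its exit code and log output
    status, output = host.get_statusoutput('beats/py27:system', ['describe'])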
+
+
+  def get_statusoutput(self, image, command, **kwargs):
+    '''Runs a command and retrieves status and output'''
+
+    container = None
+    try:
+      container = self.create_container(image=image, command=command, **kwargs)
+      self.start(container)
+      status = self.client.wait(container)
+      output = self.client.logs(container)
+    except Exception:
+      return 1, None
+    finally:
+      if container is not None: self.rm(container)
+
+    return status, output
+
+
+class Popen:
+  '''Process-like manager for asynchronously executing user code in a container
+
+  The process will be run in the background, and its standard output and
+  standard error will be read from the container logs after it finishes.
+
+
+  Parameters:
+
+    host (:py:class:`Host`): The docker host where to start the container.
+      The host must be properly initialized, including starting the
+      appropriate docker-machine, if that is the case (environment setup).
+
+    image (str): A string uniquely identifying the image from which to start
+      the container
+
+    command (list): A list of strings with the command to run inside the
+      container.
+
+    tmp_archive (str, Optional): An archive to copy into the temporary
+      directory of the container, supposedly with information that is used by
+      the command. If a string is given, then it is considered as a path from
+      which the archive is going to be created.
+
+    virtual_memory_in_megabytes (int, Optional): The maximum amount of memory
+      the user process can consume on the host. If not specified, a memory
+      limit is not set.
+
+    max_cpu_percent (float, Optional): The maximum amount of CPU the user
+      process may consume on the host. The value ``100`` equals to using 100%
+      of a single core. If not specified, then a CPU limitation is not put in
+      place.
+
+  '''
+
+  def __init__(self, host, image, command, tmp_archive=None,
+      virtual_memory_in_megabytes=0, max_cpu_percent=0, **args):
+
+    self.host = host
+
+    args.update({
+      'user': 'root', #user `nobody' cannot access the tmp archive...
+      'tty': False,
+      'detach': True,
+      'stdin_open': False,
+      })
+
+    host_args = dict()
+
+    if virtual_memory_in_megabytes:
+      # For this to work properly, memory swap limitation has to be enabled on
+      # the kernel. This typically goes by setting "cgroup_enable=memory" as a
+      # boot parameter to kernels which are compiled with this support.
+      # More info: https://docs.docker.com/engine/installation/linux/ubuntulinux/#/enable-memory-and-swap-accounting
+
+      host_args['mem_limit'] = str(virtual_memory_in_megabytes) + 'm'
+      host_args['memswap_limit'] = host_args['mem_limit']
+      logger.debug('Setting maximum memory to %dMB' % \
+          (virtual_memory_in_megabytes,))
+
+    if max_cpu_percent:
+      # The period corresponds to the scheduling interval for the CFS in Linux
+      # The quota corresponds to a fraction or a multiple of the period, the
+      # container will get. A quota that is 2x the period gets the container up
+      # to 200% cpu time (2 cores). If the quota is 0.5x the period, the
+      # container gets up to 50% the cpu time. Each core represents 100%. A
+      # system with 2 cores has 200% computing power.
+      #
+      # More info:
+      # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
+      #
+      # For this to work properly, CPU bandwidth provisioning for the Linux CFS
+      # must be enabled on the kernel. More info on how to do it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
+      #
+      # If your system is running on a virtual machine, having more cores
+      # available to docker engine normally translates to more precise
+      # scheduling.
+      period = 100000 #microseconds
+      quota = max_cpu_percent/100.
+      host_args['cpu_period'] = period
+      host_args['cpu_quota'] = int(period * quota)
+      logger.debug('Setting CPU quota to %d%%' % max_cpu_percent)
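Concretely, with the 100000 microsecond period above, a ``max_cpu_percent`` of
150 yields a quota of 1.5 periods, i.e. the container may consume at most one
and a half cores' worth of CPU time per scheduling interval::

  period = 100000                       # microseconds (CFS interval)
  quota = 150 / 100.                    # max_cpu_percent == 150
  assert int(period * quota) == 150000  # cpu_quota: up to 1.5 cores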
+ # + # More info: + # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt + # + # For this to work properly, CPU bandwidth provisioning for the Linux CFS + # must be enabled on the kernel. More info on how to do it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/ + # + # If your system is running on a virtual machine, having more cores + # available to docker engine normally translates to more precise + # scheduling. + period = 100000 #microseconds + + quota = max_cpu_percent/100. + host_args['cpu_period'] = period + host_args['cpu_quota'] = int(period * quota) + logger.debug('Setting CPU quota to %d%%' % max_cpu_percent) + + # creates the container + self.container = self.host.create_container(image=image, + command=command, tmp_path=tmp_archive, + host_args=host_args, **args) + + # Starts the container + self.host.start(self.container) + + # Gets start point statistics + self.previous_stats = None + + + def __enter__(self): + return self + + + def __exit__(self, *exc): + self.rm() + + + @property + def pid(self): + return self.container['Id'][:12] + + + def wait(self, timeout=None): + '''Reads stdout and stderr until the underlying processes finishes + + Implements a modified version of :py:meth:`subprocess.Popen.wait`, in which + we read the stdout and stderr data into a circular buffer, keep only the + last N bytes of each stream. + + This method will call :py:meth:`file.readline` on both stdout and stderr + streams attached to the process. These methods are "green". They will yield + once they are blocked. + + + Parameters: + + timeout (float, Optional): A timeout in seconds to wait for the user + process to finish. If a timeout value is not given, waits forever. + + + Returns: + + int: Returns the status code of the process + + + Raises: + + requests.exceptions.ReadTimeout: if the process times out + + ''' + + return self.host.client.wait(self.container, timeout) + + @property + def stdout(self): + '''Returns the stdout''' + + return self.host.client.logs(self.container, stdout=True, stderr=False) + + + @property + def stderr(self): + '''Returns the stderr''' + + return self.host.client.logs(self.container, stdout=False, stderr=True) + + + def rm(self): + '''Remove the container. If it is not stopped yet, kill it first''' + + self.host.rm(self.container) + + + def status(self): + '''Returns my own "docker" status''' + + return self.host.status(self.container) + + + def kill(self): + '''Stop container''' + + if self.status() == 'running': + self.host.client.kill(self.container) + + + def _stats(self): + + return self.host.client.stats(self.container, decode=True, stream=False) + + + def statistics(self): + '''If the process is still active, returns usage statistics by ``pusutil`` + + + Returns: + + stats (dict): A dictionary object with all collected statistics + + + Raises: + + RuntimeError: In case the process is not active anymore. + + ''' + + data = self._stats() + + previous_cpu = self.previous_stats['cpu_stats'] \ + if self.previous_stats else None + + # merge statistics + retval = dict( + cpu=stats.cpu_statistics(previous_cpu, data['cpu_stats']), + memory=stats.memory_statistics(data['memory_stats']), + ) + + self.previous_stats = data + + return retval diff --git a/beat/core/execution.py b/beat/core/execution.py index d18ebc02363c63cb2a6e420ae14298519ffcc589..9636f2b1ac578d7a01ae31140f5d3d59a63fd780 100644 --- a/beat/core/execution.py +++ b/beat/core/execution.py @@ -50,121 +50,6 @@ from . import stats from . 
import agent -def parse_stdout(data): - """Parses the standard output to separate the statistics of a job""" - - stdout = '' - statistics = None - - if not isinstance(data, str): data = data.decode() #python3 compatibility - - for line in data.split('\n'): - if line.startswith('STATISTICS '): - statistics = stats.Statistics(simplejson.loads(line[len('STATISTICS '):])) - assert statistics.valid, '\n * %s' % '\n * '.join(statistics.errors) - else: stdout += line + '\n' - stdout = stdout[:-1] - - return stdout, statistics or stats.Statistics() - -def parse_stderr(data): - """Parses the standard error to separate error reports of a job""" - - def __strip_atmost_one(s, c): - if len(s) > 1 and s[0] == s[-1] and s[0] == c: return s[1:-1] - return s - - stderr = '' - user_error = '' - system_error = '' - - if not isinstance(data, str): data = data.decode() #python3 compatibility - - for line in data.split('\n'): - - if line.startswith('USER_ERROR'): - if user_error: user_error += 20*'-' + '\\n' - user_error += line[len('USER_ERROR '):] - elif line.startswith('SYSTEM_ERROR '): - if system_error: system_error += 20*'-' + '\\n' - system_error += line[len('SYSTEM_ERROR '):] - else: stderr += line + '\n' - stderr = stderr[:-1] - - user_error = user_error.replace('\\n', '\n') - user_error = user_error.replace('\\\'', "\'") - user_error = __strip_atmost_one(user_error, "'") - system_error = system_error.replace('\\n', '\n') - system_error = system_error.replace('\\\'', "\'") - system_error = __strip_atmost_one(system_error, "'") - - return stderr, user_error, system_error - -def discover_environments(envpath, raise_on_errors=True): - """Returns a dictionary of environments by scanning a list of directories""" - - def is_exe(fpath): - return os.path.isfile(fpath) and os.access(fpath, os.X_OK) - - environments = {} - - for path in envpath: - - for d in os.listdir(path): - - envdir = os.path.realpath(os.path.join(path, d)) - if not os.path.isdir(envdir): continue - logger.debug("Analysing potential environment directory `%s'...", envdir) - execute = os.path.join(envdir, 'bin', 'execute') - describe = os.path.join(envdir, 'bin', 'describe') - - if not (is_exe(execute) and is_exe(describe)): - logger.debug("Ignoring directory `%s' either `bin/execute' or " \ - "`bin/describe' not found or not executable" % envdir) - continue - - try: - description = simplejson.loads(subprocess.check_output(describe)) - except Exception as e: - logger.warn("Ignoring potential environment at `%s' since " \ - "`describe' returned an error: %s", envdir, str(e)) - continue - - name = description['name'] + ' (' + description['version'] + ')' - - if name in environments: - - # if my own `execute' is already registered, give preference to it - mine = os.path.dirname(os.path.dirname(sys.argv[0])) - override = environments[name]['directory'] - if os.path.samefile(mine, override): - logger.info("Skipping override of **existing** environment `%s' " \ - "with base directory at `%s' (it is our own) by `%s'", name, - override, envdir) - continue - - # this check avoids we do a new environment and, by mistake, override - # it with a previous version or the contrary. - if raise_on_errors: - raise RuntimeError("Environments at `%s' and `%s' have the " \ - "same name (`%s'). Distinct environments must be uniquely " \ - "named. Fix this and re-start." % \ - (envdir, environments[name]['directory'], name)) - else: - logger.warn("Overriding **existing** environment `%s' base " \ - "directory with `%s' (it was `%s). 
To avoid this warning " \
-              "make sure the root environment path `%s' does not " \
-              "contain environments with the same names", name, envdir,
-              environments[name]['directory'], os.pathsep.join(envpath))
-
-    environments[name] = description
-    environments[name]['execute'] = execute
-    environments[name]['directory'] = envdir
-    logger.info("Registered `%s' -> `%s'", name, execute)
-
-  return environments
-
-
 class Executor(object):
   """Executors runs the code given an execution block information, externally
 
@@ -537,9 +422,8 @@ class Executor(object):
         force_start_index=start_index or 0))
 
 
-  def process(self, execute_path=None, virtual_memory_in_megabytes=0,
-      max_cpu_percent=0, cpulimit_path=None, timeout_in_minutes=0,
-      daemon=0):
+  def process(self, host, virtual_memory_in_megabytes=0,
+      max_cpu_percent=0, timeout_in_minutes=0, daemon=0):
     """Executes the user algorithm code using an external program.
 
     If ``executable`` is set, then execute the process using an external
@@ -564,27 +448,19 @@ class Executor(object):
 
     Parameters:
 
-      execute_path (str): The path to the ``execute`` script of the chosen
-        environment. If set to ``None``, we still may find an ``execute``
-        program at the current script install location and use it. Otherwise,
-        an exception is raised if the parameter ``cores`` is also set.
+      host (:py:class:`Host`): A configured docker host that will execute the
+        user process. If the host does not have access to the required
+        environment, an exception will be raised.
 
-      virtual_memory_in_megabytes (int): The amount of virtual memory (in
-        Megabytes) available for the job. If set to zero, no limit will be
+      virtual_memory_in_megabytes (int, Optional): The amount of virtual memory
+        (in Megabytes) available for the job. If set to zero, no limit will be
         applied.
 
-      max_cpu_percent (int, optional): The maximum amount of CPU usage allowed
+      max_cpu_percent (int, Optional): The maximum amount of CPU usage allowed
         in a system. This number must be an integer number between 0 and
         ``100*number_of_cores`` in your system. For instance, if your system
         has 2 cores, this number can go between 0 and 200. If it is <= 0, then
-        we don't track CPU usage. Otherwise, we do, clipping your number at
-        ``min(max_cpu_percent, 100*psutil.cpu_count())``.
-
-      cpulimit_path (str): The path to the ``cpulimit`` executable to use for
-        controlling the number of cores the user process can use.
-        environment. If set to ``None``, we still may find a ``cpulimit``
-        program at the current script install location and use it. Otherwise,
-        an exception is raised if the parameter ``cores`` is also set.
+        we don't track CPU usage.
 
       timeout_in_minutes (int): The number of minutes to wait for the user
         process to execute. 
After this amount of time, the user process is @@ -608,21 +484,12 @@ class Executor(object): raise RuntimeError("execution information is bogus:\n * %s" % \ '\n * '.join(self.errors)) - # figures out which execution script to use - execute_local = os.path.join(os.path.dirname(sys.argv[0]), 'execute') - execute = execute_path or execute_local - if not os.path.exists(execute): - raise IOError("external execution agent `%s' does not exist" % execute) - if not os.access(execute, os.X_OK): - raise OSError("file `%s' is not executable" % execute) - - with agent.Agent(execute, virtual_memory_in_megabytes, - max_cpu_percent, cpulimit_path) as runner: + with agent.Agent(virtual_memory_in_megabytes, max_cpu_percent) as runner: self.agent = runner #synchronous call - always returns after a certain timeout - retval = runner.run(self, timeout_in_minutes=timeout_in_minutes, + retval = runner.run(self, host, timeout_in_minutes=timeout_in_minutes, daemon=daemon) #adds I/O statistics from the current executor, if its complete already diff --git a/beat/core/hash.py b/beat/core/hash.py index 07a509a7791599e6c92b4ccdfc99e5da2a727ce8..40b074f302334280d13c83aeaede030d88db587e 100644 --- a/beat/core/hash.py +++ b/beat/core/hash.py @@ -41,8 +41,12 @@ import simplejson def _sha256(s): """A python2/3 replacement for :py:func:`haslib.sha256`""" - if isinstance(s, str): s = six.u(s) - return hashlib.sha256(s.encode('utf8')).hexdigest() + try: + if isinstance(s, str): s = six.u(s) + return hashlib.sha256(s.encode('utf8')).hexdigest() + except: + return hashlib.sha256(s).hexdigest() + def _compact(text): return text.replace(' ', '').replace('\n', '') diff --git a/beat/core/sandboxer.py b/beat/core/sandboxer.py deleted file mode 100755 index 84defe126ea576ce26592b79b0075dd287ef6a73..0000000000000000000000000000000000000000 --- a/beat/core/sandboxer.py +++ /dev/null @@ -1,92 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : - -############################################################################### -# # -# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # -# Contact: beat.support@idiap.ch # -# # -# This file is part of the beat.core module of the BEAT platform. # -# # -# Commercial License Usage # -# Licensees holding valid commercial BEAT licenses may use this file in # -# accordance with the terms contained in a written agreement between you # -# and Idiap. For further information contact tto@idiap.ch # -# # -# Alternatively, this file may be used under the terms of the GNU Affero # -# Public License version 3 as published by the Free Software and appearing # -# in the file LICENSE.AGPL included in the packaging of this file. # -# The BEAT platform is distributed in the hope that it will be useful, but # -# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # -# or FITNESS FOR A PARTICULAR PURPOSE. # -# # -# You should have received a copy of the GNU Affero Public License along # -# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # -# # -############################################################################### - - -"""A rather insecure sandboxing environment based on pure Python. - -The resulting child process that is created using this sandboxer will have a -limited amount of memory to perform its task. It won't be able to spawn any -external processes. The total execution time is controlled by the parent -process. 
- -DISCLAIMER: This environment works well under Linux, where child process -resources can be nicely controlled using the 'resource' module. If used in -environments such as OSX, check for the proper way to implement child process -restrictions. -""" - -def sandbox(): - - import logging - logger = logging.getLogger('sandboxer') - logger.setLevel(logging.DEBUG) - ch = logging.StreamHandler() - formatter = logging.Formatter('[%(levelname)s] %(name)s: %(message)s') - ch.setFormatter(formatter) - logger.addHandler(ch) - - from optparse import OptionParser - - parser = OptionParser(usage=__doc__) - parser.add_option("-m", "--virtual-memory-in-megabytes", default=0, - dest="vmem", metavar='MEMORY', type=float, - help="Maximum virtual memory (zero disables it) [default: 0]") - parser.add_option("-v", action="count", default=0, - dest="verbosity", help="increase the script verbosity") - options, args = parser.parse_args() - - # Before we start, adjusts verbosity level - if options.verbosity <= 0: - ch.setLevel(logging.WARNING) - elif options.verbosity == 1: - ch.setLevel(logging.INFO) - else: - ch.setLevel(logging.DEBUG) - - # does the real work - import os - import sys - import resource - import platform - - logger.info("Sandboxing `%s'...", ' '.join(args)) - - vmem = int(1024*1024*float(options.vmem)) #in bytes - if vmem: - if platform.system() == 'Darwin': - logger.info("RLIMIT_AS does not seem to work on MacOSX") - resource.setrlimit(resource.RLIMIT_AS, (vmem, vmem)) - logger.info("Set maximum memory to %d bytes (%g megabytes)", vmem, options.vmem) - - resource.setrlimit(resource.RLIMIT_CORE, (0, 0)) - logger.info("Setting core dump limit to 0") - - logger.info("Running process `%s'...", ' '.join(args)) - os.execv(args[0], args) - -if __name__ == '__main__': - sandbox() diff --git a/beat/core/schema/common/1.json b/beat/core/schema/common/1.json index a5377a9c9d171ffb672e36fb78fe6178fa9bf810..2accbc6bd5cdc60cab3677e8ce9a193a16ee24f7 100644 --- a/beat/core/schema/common/1.json +++ b/beat/core/schema/common/1.json @@ -49,7 +49,7 @@ "enum": [ "unknown", "python", - "binary", + "cxx", "matlab", "r" ] diff --git a/beat/core/stats.py b/beat/core/stats.py index 9d4ee671f393f45ceeae5d61a1f0a7e9ad53f034..1e056b2534bc71eb4e110ce57a83923e1c049001 100644 --- a/beat/core/stats.py +++ b/beat/core/stats.py @@ -31,7 +31,6 @@ import os import time import copy -import psutil import simplejson @@ -267,29 +266,13 @@ def io_statistics(data_sources, input_list, data_sinks, output_list, data, analy ) -def sum_tuples(t): - """Helper for psutil""" - - retval = list(t[0]) - for k in range(len(retval)): retval[k] += sum([z[k] for z in t[1:]]) - return t[0].__class__(*retval) - - -def merge_statistics(processes, entries): - """Helper for psutil""" - - retval = {} - for e in entries: - retval[e] = sum_tuples([getattr(p, e)() for p in processes]) - return retval - - -def cpu_statistics(process_id=None): +def cpu_statistics(start, end): """Summarizes current CPU usage This method should be used when the currently set algorithm is the only one executed through the whole process. It is done for collecting resource - statistics on separate processing environments. + statistics on separate processing environments. 
It follows the recipe in: + http://stackoverflow.com/questions/30271942/get-docker-container-cpu-usage-as-percentage Returns: @@ -297,28 +280,30 @@ def cpu_statistics(process_id=None): """ - if process_id is None or psutil.pid_exists(process_id): - proc = psutil.Process(process_id) - total_time = time.time() - proc.create_time() - family = [proc] + proc.children(recursive=True) - allstats = merge_statistics(family, ['cpu_times', 'num_ctx_switches']) - cpu_times = allstats['cpu_times'] - ctx_switches = allstats['num_ctx_switches'] - - return { - 'user': cpu_times.user, - 'system': cpu_times.system, - 'total': total_time, - 'context_switches': { - 'voluntary': ctx_switches.voluntary, - 'involuntary': ctx_switches.involuntary, - } - } + if start is not None: + user_cpu = end['cpu_usage']['total_usage'] - \ + start['cpu_usage']['total_usage'] + total_cpu = end['system_cpu_usage'] - start['system_cpu_usage'] + else: - return {} + user_cpu = end['cpu_usage']['total_usage'] + total_cpu = end['system_cpu_usage'] + + user_cpu /= 1000000000. #in seconds + total_cpu /= 1000000000. #in seconds + processors = len(end['cpu_usage']['percpu_usage']) if \ + end['cpu_usage']['percpu_usage'] is not None else 1 + return { + 'user': user_cpu, + 'system': 0., + 'total': total_cpu, + 'percent': 100.*processors*user_cpu/total_cpu if total_cpu else 0., + 'processors': processors, + } -def memory_statistics(process_id=None): + +def memory_statistics(data): """Summarizes current memory usage This method should be used when the currently set algorithm is the only one @@ -331,15 +316,11 @@ def memory_statistics(process_id=None): """ - if process_id is None or psutil.pid_exists(process_id): - proc = psutil.Process(process_id) - family = [proc] + proc.children(recursive=True) - allstats = merge_statistics(family, ['memory_info']) - memory = allstats['memory_info'] - - return { - 'rss': float(memory.rss), - } + limit = float(data['limit']) + memory = float(data['max_usage']) - else: - return {} + return { + 'rss': memory, + 'limit': limit, + 'percent': 100.*memory/limit if limit else 0., + } diff --git a/beat/core/test/cpu_stress.py b/beat/core/test/cpu_stress.py index 5def7f6d0ae3858d88937c0f37281e5355632c21..48883a2b600270d0b18d7d87851e00d8c98dc506 100644 --- a/beat/core/test/cpu_stress.py +++ b/beat/core/test/cpu_stress.py @@ -35,10 +35,6 @@ Runs 2x CPU stress function import sys import multiprocessing -import os -import signal -import time -import psutil def test(x): try: @@ -47,17 +43,10 @@ def test(x): return x * x -def kill_child_processes(signum, frame): - proc = psutil.Process() - for child in psutil.Process(): child.kill() - sys.exit() - - def main(): nworkers = int(sys.argv[1]) pool = multiprocessing.Pool(processes=nworkers) result = pool.map_async(test, range(nworkers)) - signal.signal(signal.SIGTERM, kill_child_processes) pool.close() pool.join() diff --git a/beat/core/test/prefix/algorithms/user/cxx_integers_echo/1.json b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/1.json new file mode 100644 index 0000000000000000000000000000000000000000..3bc1de14a26f35b3f6a1513ed967479a37b1d8ad --- /dev/null +++ b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/1.json @@ -0,0 +1,19 @@ +{ + "language": "cxx", + "splittable": true, + "groups": [ + { + "name": "main", + "inputs": { + "in_data": { + "type": "user/single_integer/1" + } + }, + "outputs": { + "out_data": { + "type": "user/single_integer/1" + } + } + } + ] +} diff --git a/beat/core/test/prefix/algorithms/user/cxx_integers_echo/CMakeLists.txt 
b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..b0bcdb1af5b64a4ea9cf63014baaa519c9c52e14 --- /dev/null +++ b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/CMakeLists.txt @@ -0,0 +1,56 @@ +cmake_minimum_required(VERSION 3.0) +project(BEAT_CORE_CXX_INTEGERS_ECHO) + + +set(BEAT_BACKEND_CXX_DIR "/usr/local/beat") + + +# CMake setup +include(CheckCXXCompilerFlag) +CHECK_CXX_COMPILER_FLAG("-std=c++11" COMPILER_SUPPORTS_CXX11) +CHECK_CXX_COMPILER_FLAG("-std=c++0x" COMPILER_SUPPORTS_CXX0X) +if (COMPILER_SUPPORTS_CXX11) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") +elseif (COMPILER_SUPPORTS_CXX0X) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x") +else() + message(STATUS "The compiler ${CMAKE_CXX_COMPILER} has no C++11 support. Please use a different C++ compiler.") +endif() + + +# Retrieve the dependencies +find_package(Boost REQUIRED) + + +# Setup the search paths +include_directories( + "${BEAT_BACKEND_CXX_DIR}/include" + "${Boost_INCLUDE_DIRS}" +) + +link_directories( + "${BEAT_BACKEND_CXX_DIR}/bin" +) + + +# List the source files +set(HEADERS "algorithm.h" + "beat_setup.h" + "user_single_integer_1.h" +) + +set(SRCS "algorithm.cpp" + "beat_setup.cpp" + "user_single_integer_1.cpp" +) + +# Create and link the library +add_library(cxx_integers_echo SHARED ${SRCS} ${HEADERS}) + +target_link_libraries(cxx_integers_echo beat_backend_cxx) +set_target_properties(cxx_integers_echo PROPERTIES + COMPILE_FLAGS "-fPIC" + OUTPUT_NAME "1" + PREFIX "" + LIBRARY_OUTPUT_DIRECTORY "${BEAT_CORE_CXX_INTEGERS_ECHO_SOURCE_DIR}" +) diff --git a/beat/core/test/prefix/algorithms/user/cxx_integers_echo/algorithm.cpp b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/algorithm.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f6466e538a703b39c90c08c6377e382467b1ee6c --- /dev/null +++ b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/algorithm.cpp @@ -0,0 +1,43 @@ +// NOTE: This file implements the algorithm declared in the file +// 'user/cxx_integers_echo/1.json' + +#include "algorithm.h" +#include "user_single_integer_1.h" + +using namespace beat::backend::cxx; + + +Algorithm::Algorithm() +{ +} + +//--------------------------------------------------------- + +Algorithm::~Algorithm() +{ +} + +//--------------------------------------------------------- + +bool Algorithm::setup(const json& parameters) +{ + return true; +} + +//--------------------------------------------------------- + +bool Algorithm::process(const InputList& inputs, const OutputList& outputs) +{ + auto data = inputs["in_data"]->data<user::single_integer_1>(); + + outputs["out_data"]->write(data); + + return true; +} + +//--------------------------------------------------------- + +IAlgorithm* create_algorithm() +{ + return new Algorithm(); +} diff --git a/beat/core/test/prefix/algorithms/user/cxx_integers_echo/algorithm.h b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/algorithm.h new file mode 100644 index 0000000000000000000000000000000000000000..9be32ffc58b01230e07c1cae2a68e84b55c1e197 --- /dev/null +++ b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/algorithm.h @@ -0,0 +1,27 @@ +// NOTE: This file implements the algorithm declared in the file +// 'user/cxx_integers_echo/1.json' + +#ifndef _BEAT_GENERATED_ALGORITHM_H_ +#define _BEAT_GENERATED_ALGORITHM_H_ + +#include <beat.backend.cxx/algorithm.h> + + +class Algorithm: public beat::backend::cxx::IAlgorithm +{ +public: + 
Algorithm(); + virtual ~Algorithm(); + + virtual bool setup(const json& parameters); + + virtual bool process(const beat::backend::cxx::InputList& inputs, + const beat::backend::cxx::OutputList& outputs); +}; + + +extern "C" { + beat::backend::cxx::IAlgorithm* create_algorithm(); +} + +#endif diff --git a/beat/core/test/prefix/algorithms/user/cxx_integers_echo/beat_setup.cpp b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/beat_setup.cpp new file mode 100644 index 0000000000000000000000000000000000000000..87349d52d76113bbff38da01ccdcfe961c76c1f6 --- /dev/null +++ b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/beat_setup.cpp @@ -0,0 +1,16 @@ +// NOTE: This file was automatically generated from the algorithm declaration file +// 'user/cxx_integers_echo/1.json' + +#include <beat.backend.cxx/dataformatfactory.h> +#include "beat_setup.h" +#include "user_single_integer_1.h" + +using namespace beat::backend::cxx; + + +void setup_beat_cxx_module() +{ + DataFormatFactory* factory = DataFormatFactory::getInstance(); + + factory->registerDataFormat("user/single_integer/1", &user::single_integer_1::create); +} diff --git a/beat/core/test/prefix/algorithms/user/cxx_integers_echo/beat_setup.h b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/beat_setup.h new file mode 100644 index 0000000000000000000000000000000000000000..28337bf58147429445043000b78703f8abf584f4 --- /dev/null +++ b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/beat_setup.h @@ -0,0 +1,11 @@ +// NOTE: This file was automatically generated from the algorithm declaration file +// 'user/cxx_integers_echo/1.json' + +#ifndef _BEAT_SETUP_H_ +#define _BEAT_SETUP_H_ + +extern "C" { + void setup_beat_cxx_module(); +} + +#endif diff --git a/beat/core/test/prefix/algorithms/user/cxx_integers_echo/user_single_integer_1.cpp b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/user_single_integer_1.cpp new file mode 100644 index 0000000000000000000000000000000000000000..cea28aa6b11645566e1989fe4cae3bd94ae0b130 --- /dev/null +++ b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/user_single_integer_1.cpp @@ -0,0 +1,40 @@ +// NOTE: This file was automatically generated from the dataformat declaration file +// 'user/single_integer/1.json' + +#include "user_single_integer_1.h" +#include <beat.backend.cxx/dataformatfactory.h> + +using namespace beat::backend::cxx; + + +user::single_integer_1::single_integer_1() +{ +} + +//--------------------------------------------------------- + +size_t user::single_integer_1::size() const +{ + return sizeof(int32_t); +} + +//--------------------------------------------------------- + +void user::single_integer_1::pack(uint8_t** buffer) const +{ + beat::backend::cxx::pack(value, buffer); +} + +//--------------------------------------------------------- + +void user::single_integer_1::unpack(uint8_t** buffer) +{ + value = beat::backend::cxx::unpack_scalar<int32_t>(buffer); +} + +//--------------------------------------------------------- + +Data* user::single_integer_1::create() +{ + return new user::single_integer_1(); +} diff --git a/beat/core/test/prefix/algorithms/user/cxx_integers_echo/user_single_integer_1.h b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/user_single_integer_1.h new file mode 100644 index 0000000000000000000000000000000000000000..57c53b0173bc414dc163bc8183b84921fb6765cb --- /dev/null +++ b/beat/core/test/prefix/algorithms/user/cxx_integers_echo/user_single_integer_1.h @@ -0,0 +1,28 @@ +// NOTE: This file was automatically generated from the 
dataformat declaration file +// 'user/single_integer/1.json' + +#ifndef _BEAT_GENERATED_user_single_integer_1_H_ +#define _BEAT_GENERATED_user_single_integer_1_H_ + +#include <beat.backend.cxx/data.h> + +namespace user { + +class single_integer_1: public beat::backend::cxx::Data +{ +public: + single_integer_1(); + + virtual size_t size() const; + virtual void pack(uint8_t** buffer) const; + virtual void unpack(uint8_t** buffer); + + static Data* create(); + +public: + int32_t value; +}; + +} + +#endif diff --git a/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json b/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json new file mode 100644 index 0000000000000000000000000000000000000000..1f3750601973152ab3c47a5f3a68be67d83efad1 --- /dev/null +++ b/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json @@ -0,0 +1,52 @@ +{ + "analyzers": { + "analysis": { + "algorithm": "user/integers_echo_analyzer/1", + "inputs": { + "in_data": "in" + } + } + }, + "blocks": { + "echo1": { + "algorithm": "user/cxx_integers_echo/1", + "inputs": { + "in_data": "in" + }, + "outputs": { + "out_data": "out" + }, + "environment": { + "name": "cxx_environment", + "version": "1" + } + }, + "echo2": { + "algorithm": "user/cxx_integers_echo/1", + "inputs": { + "in_data": "in" + }, + "outputs": { + "out_data": "out" + }, + "environment": { + "name": "cxx_environment", + "version": "1" + } + } + }, + "datasets": { + "set": { + "database": "simple/1", + "protocol": "protocol", + "set": "set" + } + }, + "globals": { + "queue": "queue", + "environment": { + "name": "environment", + "version": "1" + } + } +} diff --git a/beat/core/test/test_async.py b/beat/core/test/test_async.py deleted file mode 100644 index db1b8a1913b920d6815cd0a41d1a5ee092a7e49e..0000000000000000000000000000000000000000 --- a/beat/core/test/test_async.py +++ /dev/null @@ -1,221 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : - -############################################################################### -# # -# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # -# Contact: beat.support@idiap.ch # -# # -# This file is part of the beat.core module of the BEAT platform. # -# # -# Commercial License Usage # -# Licensees holding valid commercial BEAT licenses may use this file in # -# accordance with the terms contained in a written agreement between you # -# and Idiap. For further information contact tto@idiap.ch # -# # -# Alternatively, this file may be used under the terms of the GNU Affero # -# Public License version 3 as published by the Free Software and appearing # -# in the file LICENSE.AGPL included in the packaging of this file. # -# The BEAT platform is distributed in the hope that it will be useful, but # -# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # -# or FITNESS FOR A PARTICULAR PURPOSE. # -# # -# You should have received a copy of the GNU Affero Public License along # -# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
# -# # -############################################################################### - - -"""Asynchronous process I/O with the Subprocess module -""" - -import os -import sys -import time -import signal -import platform -import pkg_resources - -import nose -import nose.tools - -from ..async import Popen, resolve_cpulimit_path, TimeoutExpired -from .utils import slow - -# in case you want to see the printouts dynamically, set to ``True`` -if False: - import logging - logger = logging.getLogger() #root logger - logger.setLevel(logging.DEBUG) - ch = logging.StreamHandler() - ch.setLevel(logging.DEBUG) - ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) - logger.addHandler(ch) - - -@nose.tools.raises(OSError) -def test_non_existing(): - - p = Popen(["sdfsdfdsf329909092"]) - - -def test_echo_hello_world(): - - string = "hello, world" - p = Popen(["echo", string]) - status = p.wait() - nose.tools.eq_(status, 0) - time.sleep(0.5) - nose.tools.eq_(p.peek_stdout(), string + '\n') - nose.tools.eq_(p.peek_stderr(), '') - - -def test_timeout(): - - sleep_for = 100 # seconds - p = Popen(["sleep", str(sleep_for)]) - try: - retval = p.wait(timeout=0.5) - assert False, "timeout never occurred after %d seconds" % sleep_for - except TimeoutExpired as e: - p.kill() - status = p.wait() - nose.tools.eq_(status, -signal.SIGKILL) - nose.tools.eq_(p.peek_stdout(), '') - nose.tools.eq_(p.peek_stderr(), '') - - -def test_does_not_timeout(): - - sleep_for = 0.5 # seconds - p = Popen(["sleep", str(sleep_for)]) - status = p.wait(1) #should not timeout - nose.tools.eq_(status, 0) - nose.tools.eq_(p.peek_stdout(), '') - nose.tools.eq_(p.peek_stderr(), '') - - -def test_memory_limit(): - - if platform.system() == 'Darwin': - raise nose.SkipTest("Memory limit does not work on OSX") - - p = Popen([ - sys.executable, - "-c", - '; '.join([ - "d = (0,) * (10 * 1024 * 1024)", - "print('Allocated %d' % len(d))", - ]), - ], virtual_memory_in_megabytes=1) - status = p.wait() - nose.tools.eq_(status, -11) - nose.tools.eq_(p.peek_stdout(), '') - nose.tools.eq_(p.peek_stderr(), '') - - -def test_limit_stdout(): - - p = Popen([ - sys.executable, - "-c", - '; '.join([ - "import sys", - "sys.stdout.write(' '.join([str(k) for k in range(2**17)]))", - "sys.stdout.flush()", - ]), - ]) - status = p.wait() - nose.tools.eq_(status, 0) - data = p.peek_stdout() - nose.tools.eq_(len(data), 65500) - expected = str((2**17)-1) - assert data.endswith(expected) - nose.tools.eq_(p.peek_stderr(), '') - - -def test_limit_stderr(): - - p = Popen([ - sys.executable, - "-c", - '; '.join([ - "import sys", - "sys.stderr.write(' '.join([str(k) for k in range(2**17)]))", - "sys.stderr.flush()", - ]), - ]) - status = p.wait() - nose.tools.eq_(status, 0) - data = p.peek_stderr() - nose.tools.eq_(len(data), 65500) - expected = str((2**17)-1) - assert data.endswith(expected) - nose.tools.eq_(p.peek_stdout(), '') - - -def test_limit_both(): - - p = Popen([ - sys.executable, - '-c' , - '; '.join([ - "import sys", - "sys.stderr.write(' '.join([str(k) for k in range(2**17)]))", - "sys.stderr.flush()", - "sys.stdout.write(' '.join([str(k) for k in range(2**17)]))", - "sys.stdout.flush()", - ]), - ]) - status = p.wait() - nose.tools.eq_(status, 0) - - data = p.peek_stdout() - nose.tools.eq_(len(data), 65500) - expected = str((2**17)-1) - assert data.endswith(expected) - - data = p.peek_stderr() - nose.tools.eq_(len(data), 65500) - expected = str((2**17)-1) - assert data.endswith(expected) - - -def run_cpulimit(processes, max_cpu_percent, sleep_time): 
- - program = pkg_resources.resource_filename(__name__, 'cpu_stress.py') - p = Popen([ - sys.executable, - program, - str(processes), - ], max_cpu_percent=max_cpu_percent) - - try: - - time.sleep(sleep_time) - stats = p.statistics() - percent = (stats['cpu']['user'] + stats['cpu']['system']) / stats['cpu']['total'] - #print percent - assert percent < (1.2*max_cpu_percent), "%.2f%% much greater than the expected ceiling at %.2f%%!" % (percent, max_cpu_percent) - - finally: - - p.kill() - - # make sure nothing is there anymore - nose.tools.eq_(p.wait(), -signal.SIGKILL) - status = p.cpulimit_process.wait() - status_choices = (0, -13) #lazy mode, should just stop gracefully - assert status in status_choices, "status (%d) not in %s" % (status, status_choices) - - -@slow -def test_cpulimit_at_20percent(): - stats = run_cpulimit(1, 20, 3) - -@slow -def test_cpulimit_at_100percent(): - run_cpulimit(4, 100, 3) - -def test_cpulimit_search(): - assert os.path.exists(resolve_cpulimit_path(None)) diff --git a/beat/core/test/test_docker.py b/beat/core/test/test_docker.py new file mode 100755 index 0000000000000000000000000000000000000000..96237c12b753d95ca87e3841eb54434e779f6891 --- /dev/null +++ b/beat/core/test/test_docker.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
#
+#                                                                             #
+###############################################################################
+
+
+"""Tests for asynchronous process I/O with the docker backend
+"""
+
+import os
+import sys
+import time
+import unittest
+import pkg_resources
+
+import docker
+import requests
+
+from ..dock import Popen, Host
+
+# in case you want to see the printouts dynamically, set to ``True``
+if False:
+  import logging
+  logger = logging.getLogger() #root logger
+  logger.setLevel(logging.DEBUG)
+  ch = logging.StreamHandler()
+  ch.setLevel(logging.DEBUG)
+  ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
+  logger.addHandler(ch)
+
+
+class AsyncTest(unittest.TestCase):
+
+  @classmethod
+  def setUpClass(cls):
+    cls.host = Host()
+    cls.host.setup(raise_on_errors=False)
+
+
+  @classmethod
+  def tearDownClass(cls):
+    cls.host.teardown()
+
+
+  def test_echo(self):
+
+    string = "hello, world"
+    with Popen(self.host, 'debian:8.4', ["echo", string]) as p:
+      status = p.wait()
+      self.assertEqual(status, 0)
+      self.assertEqual(p.stdout, string + '\n')
+      self.assertEqual(p.stderr, '')
+
+    assert not self.host.containers #all containers are gone
+
+
+  def test_non_existing(self):
+
+    self.assertRaises(docker.errors.NotFound, Popen,
+        self.host, 'debian:8.4', ["sdfsdfdsf329909092"])
+
+    self.host.teardown()
+
+    assert not self.host.containers #all containers are gone
+
+
+  def test_timeout(self):
+
+    sleep_for = 100 # seconds
+    with Popen(self.host, 'debian:8.4', ["sleep", str(sleep_for)]) as p:
+
+      try:
+        retval = p.wait(timeout=0.5)
+        assert False, "timeout never occurred after %d seconds" % sleep_for
+      except requests.exceptions.ReadTimeout as e:
+        self.assertEqual(p.status(), 'running')
+
+      p.kill()
+      status = p.wait()
+      self.assertEqual(p.status(), 'exited')
+      self.assertEqual(status, 137)
+      self.assertEqual(p.stdout, '')
+      self.assertEqual(p.stderr, '')
+
+    assert not self.host.containers #all containers are gone
+
+
+  def test_does_not_timeout(self):
+
+    sleep_for = 0.5 # seconds
+    with Popen(self.host, 'debian:8.4', ["sleep", str(sleep_for)]) as p:
+      status = p.wait(5) #should not timeout
+      self.assertEqual(p.status(), 'exited')
+      self.assertEqual(status, 0)
+      self.assertEqual(p.stdout, '')
+      self.assertEqual(p.stderr, '')
+
+    assert not self.host.containers #all containers are gone
+
+
+  def test_memory_limit(self):
+
+    with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
+        "print('Before')",
+        "import sys; sys.stdout.flush()",
+        "d = '0' * (10 * 1024 * 1024)",
+        "import time; time.sleep(5)",
+        "print('After')",
+        ]),
+      ], name='memory_limit', virtual_memory_in_megabytes=4) as p:
+
+      time.sleep(2)
+      stats = p.statistics()
+      status = p.wait()
+      self.assertEqual(p.status(), 'exited')
+      self.assertEqual(status, 137)
+      self.assertEqual(p.stdout.strip(), 'Before')
+      self.assertEqual(p.stderr, '')
+
+    assert not self.host.containers #all containers are gone
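+
+
+  # NOTE: the memory figures checked in the next test come from the docker
+  # stats API: Popen.statistics() reports the container 'max_usage' against
+  # its configured 'limit' (see memory_statistics() in beat/core/stats.py),
+  # so a ~10MB allocation should register at 10-15% of the 100MB cap below.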
+  def test_memory_limit2(self):
+
+    with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
+        "print('Before')",
+        "import sys; sys.stdout.flush()",
+        "d = '0' * (10 * 1024 * 1024)",
+        "import time; time.sleep(5)",
+        "print('After')",
+        ]),
+      ], name='memory_limit2', virtual_memory_in_megabytes=100) as p:
+
+      time.sleep(2)
+      stats = p.statistics()
+      status = p.wait()
+      assert stats['memory']['percent'] > 10, 'Memory check failed, ' \
+          '%d%% <= 10%%' % stats['memory']['percent']
+      assert stats['memory']['percent'] < 15, 'Memory check failed, ' \
+          '%d%% >= 15%%' % stats['memory']['percent']
+      self.assertEqual(p.status(), 'exited')
+      self.assertEqual(status, 0)
+      self.assertEqual(p.stdout.strip(), 'Before\nAfter')
+      self.assertEqual(p.stderr, '')
+
+    assert not self.host.containers #all containers are gone
+
+
+  def notest_limit_stdout(self):
+
+    # disabled: we limit the maximum output to 1M internally
+
+    size = 2**16 #bytes
+    with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
+        "import sys",
+        "for i in range(%d): sys.stdout.write('%%d\n' %% i)" % size,
+        "sys.stdout.flush()",
+        ]),
+      ], buflen=size) as p:
+
+      status = p.wait()
+      self.assertEqual(status, 0)
+      data = p.stdout
+      self.assertEqual(len(data), size)
+      expected = str(size-1)
+      assert data.rstrip().endswith(expected)
+      self.assertEqual(p.stderr, '')
+
+    assert not self.host.containers #all containers are gone
+
+
+  def notest_limit_stderr(self):
+
+    # disabled: we limit the maximum output to 1M internally
+
+    size = 2**16 #bytes
+    with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
+        "import sys",
+        "for i in range(%d): sys.stderr.write('%%d\n' %% i)" % size,
+        "sys.stderr.flush()",
+        ]),
+      ], buflen=size) as p:
+
+      status = p.wait()
+      self.assertEqual(status, 0)
+      data = p.stderr
+      self.assertEqual(len(data), size)
+      expected = str(size-1)
+      assert data.rstrip().endswith(expected)
+      self.assertEqual(p.stdout, '')
+
+    assert not self.host.containers #all containers are gone
+
+
+  def notest_limit_both(self):
+
+    # disabled: we limit the maximum output to 1M internally
+
+    size = 2**16 #bytes
+    with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
+        "import sys",
+        "for i in range(%d): sys.stdout.write('%%d\n' %% i)" % size,
+        "sys.stdout.flush()",
+        "for i in range(%d): sys.stderr.write('%%d\n' %% i)" % size,
+        "sys.stderr.flush()",
+        ]),
+      ], buflen=size) as p:
+
+      status = p.wait()
+      self.assertEqual(status, 0)
+
+      data = p.stdout
+      self.assertEqual(len(data), size)
+      expected = str(size-1)
+      assert data.rstrip().endswith(expected)
+
+      data = p.stderr
+      self.assertEqual(len(data), size)
+      expected = str(size-1)
+      assert data.rstrip().endswith(expected)
+
+    assert not self.host.containers #all containers are gone
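+
+
+  # NOTE: the CPU figures returned by Popen.statistics() follow the docker
+  # stats recipe implemented in cpu_statistics() of beat/core/stats.py: user
+  # time is the delta in cpu_usage.total_usage between two samples, divided
+  # by the delta in system_cpu_usage and scaled by the number of processors.
+  # _run_cpulimit() therefore samples statistics() once to start recording
+  # and once more to read the effective percentage.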
+  def _run_cpulimit(self, processes, max_cpu_percent, sleep_time):
+
+    program = pkg_resources.resource_filename(__name__, 'cpu_stress.py')
+    tmp_name = os.path.join('/tmp', os.path.basename(program))
+
+    with Popen(self.host, 'environment (1)', ['python', tmp_name,
+        str(processes)], max_cpu_percent=max_cpu_percent,
+        tmp_archive=program) as p:
+
+      p.statistics() # start recording
+      time.sleep(sleep_time)
+      stats = p.statistics()
+      self.assertEqual(p.status(), 'running')
+      percent = stats['cpu']['percent']
+      assert percent < (1.1*max_cpu_percent), \
+          "%.2f%% is more than 10%% off the expected ceiling at %d%%!" \
+          % (percent, max_cpu_percent)
+
+      # make sure nothing is there anymore
+      p.kill()
+      self.assertEqual(p.wait(), 137)
+
+    assert not self.host.containers #all containers are gone
+
+
+  def test_cpulimit_at_20percent(self):
+    # runs 1 process that should consume at most 20% of the host CPU
+    self._run_cpulimit(1, 20, 3)
+
+
+  def test_cpulimit_at_100percent(self):
+    # runs 4 processes that, together, should consume at most 100% of the
+    # host CPU
+    self._run_cpulimit(4, 100, 3)
diff --git a/beat/core/test/test_execution.py b/beat/core/test/test_execution.py
index 0ee2e9561e4f1209dfb0d297c7372b9710592341..a660f99c0ee318bbf189dc1ceeabe4c97d42cd91 100644
--- a/beat/core/test/test_execution.py
+++ b/beat/core/test/test_execution.py
@@ -43,177 +43,173 @@ if False:
   logger.addHandler(ch)
 
 import numpy
-import nose.tools
+import unittest
 
 from ..experiment import Experiment
 from ..execution import Executor
 from ..hash import hashFileContents
 from ..data import CachedDataSource
+from ..dock import Host
 
 from . import prefix, tmp_prefix
-from .utils import slow, cleanup
-
-
-def check_output(prefix, path):
-  '''Checks if a given output exists, together with its indexes and checksums
-  '''
-
-  finalpath = os.path.join(prefix, path)
-  datafiles = glob.glob(finalpath + '*.data')
-  datachksums = glob.glob(finalpath + '*.data.checksum')
-  indexfiles = glob.glob(finalpath + '*.index')
-  indexchksums = glob.glob(finalpath + '*.index.checksum')
-  assert datafiles
-  nose.tools.eq_(len(datafiles), len(indexfiles))
-  for k in datafiles + indexfiles:
-    checksum_file = k + '.checksum'
-    assert checksum_file in datachksums + indexchksums
-    stored_checksum = None
-    with open(checksum_file, 'rt') as f: stored_checksum = f.read().strip()
-    current_checksum = hashFileContents(k)
-    nose.tools.eq_(current_checksum, stored_checksum)
-
-
-def load_result(executor):
-  '''Loads the result of an experiment, in a single go'''
-
-  f = CachedDataSource()
-  assert f.setup(os.path.join(executor.cache,
-    executor.data['result']['path'] + '.data'), executor.prefix)
-  data, start, end = f.next()
-  nose.tools.eq_(start, 0)
-  nose.tools.eq_(start, end)
-  f.close()
-  return data
-
-
-def execute(label, expected_result):
-  """Executes the full experiment, block after block, returning results. If an
-  error occurs, returns information about the err'd block. Otherwise, returns
-  ``None``.
-
-  This bit of code mimics the scheduler, but does everything on the local
-  machine. It borrows some code from the package ``beat.cmdline``.
-  """
-
-  dataformat_cache = {}
-  database_cache = {}
-  algorithm_cache = {}
-
-  experiment = Experiment(prefix, label,
-      dataformat_cache, database_cache, algorithm_cache)
-
-  assert experiment.valid, '\n * %s' % '\n * '.join(experiment.errors)
-
-  scheduled = experiment.setup()
-
-  # can we execute it?
- results = [] - for key, value in scheduled.items(): - executor = Executor(prefix, value['configuration'], tmp_prefix, - dataformat_cache, database_cache, algorithm_cache) - assert executor.valid, '\n * %s' % '\n * '.join(executor.errors) - - with executor: - result = executor.process() - assert result - assert 'status' in result - assert 'stdout' in result - assert 'stderr' in result - assert 'statistics' in result - assert 'timed_out' in result - assert 'system_error' in result - assert 'user_error' in result - if result['status'] != 0: - logger.warn("(eventual) system errors: %s", result['system_error']) - logger.warn("(eventual) user errors: %s", result['user_error']) - logger.warn("stdout: %s", result['stdout']) - logger.warn("stderr: %s", result['stderr']) - return result - if result['system_error']: - logger.warn("system errors: %s", result['system_error']) - return result - assert result['status'] == 0 - assert isinstance(result['statistics'], dict) - assert result['statistics'] - - if executor.analysis: - check_output(tmp_prefix, executor.data['result']['path']) - results.append(load_result(executor)) - else: - for name, details in executor.data['outputs'].items(): - check_output(tmp_prefix, details['path']) - - # compares all results - assert results - for k, result in enumerate(results): - expected = result.__class__() - expected.from_dict(expected_result[k], casting='unsafe') #defaults=False - assert result.isclose(expected), "%r is not close enough to %r" % (result.as_dict(), expected.as_dict()) - - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_integers_addition_1(): - assert execute('user/user/integers_addition/1/integers_addition', - [{'sum': 12272, 'nb': 10}]) is None - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_integers_addition_2(): - assert execute('user/user/integers_addition/2/integers_addition', - [{'sum': 18392, 'nb': 10}]) is None - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_single_1_single(): - assert execute('user/user/single/1/single', [{'out_data': 42}]) is None - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_single_1_add(): - assert execute('user/user/single/1/single_add', [{'out_data': 43}]) is None - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_single_1_add2(): - assert execute('user/user/single/1/single_add2', [{'out_data': 44}]) is None - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_single_1_error(): - result = execute('user/user/single/1/single_error', [None]) - assert result - nose.tools.eq_(result['status'], 1) - assert result['user_error'] - assert 'NameError' in result['user_error'] - nose.tools.eq_(result['system_error'], '') - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_single_1_large(): - assert execute('user/user/single/1/single_large', [{'out_data': 2.0}]) is None - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_double_1(): - assert execute('user/user/double/1/double', [{'out_data': 42}]) is None - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_triangle_1(): - assert execute('user/user/triangle/1/triangle', [{'out_data': 42}]) is None - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_too_many_nexts(): - result = execute('user/user/triangle/1/too_many_nexts', [None]) - assert result - nose.tools.eq_(result['status'], 9) - assert result['user_error'] - assert 'no more data' in result['user_error'] - -@slow -@nose.tools.with_setup(teardown=cleanup) -def test_double_triangle_1(): - assert 
execute('user/user/double_triangle/1/double_triangle', [{'out_data': 42}]) is None +from .utils import cleanup + + +class TestExecution(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.host = Host() + cls.host.setup(raise_on_errors=False) + + + @classmethod + def tearDownClass(cls): + cls.host.teardown() + cleanup() + + + def check_output(self, prefix, path): + '''Checks if a given output exists, together with its indexes and checksums + ''' + + finalpath = os.path.join(prefix, path) + datafiles = glob.glob(finalpath + '*.data') + datachksums = glob.glob(finalpath + '*.data.checksum') + indexfiles = glob.glob(finalpath + '*.index') + indexchksums = glob.glob(finalpath + '*.index.checksum') + assert datafiles + self.assertEqual(len(datafiles), len(indexfiles)) + for k in datafiles + indexfiles: + checksum_file = k + '.checksum' + assert checksum_file in datachksums + indexchksums + stored_checksum = None + with open(checksum_file, 'rt') as f: stored_checksum = f.read().strip() + current_checksum = hashFileContents(k) + self.assertEqual(current_checksum, stored_checksum) + + + def load_result(self, executor): + '''Loads the result of an experiment, in a single go''' + + f = CachedDataSource() + assert f.setup(os.path.join(executor.cache, + executor.data['result']['path'] + '.data'), executor.prefix) + data, start, end = f.next() + self.assertEqual(start, 0) + self.assertEqual(start, end) + f.close() + return data + + + def execute(self, label, expected_result): + """Executes the full experiment, block after block, returning results. If an + error occurs, returns information about the err'd block. Otherwise, returns + ``None``. + + This bit of code mimics the scheduler, but does everything on the local + machine. It borrows some code from the package ``beat.cmdline``. + """ + + dataformat_cache = {} + database_cache = {} + algorithm_cache = {} + + experiment = Experiment(prefix, label, + dataformat_cache, database_cache, algorithm_cache) + + assert experiment.valid, '\n * %s' % '\n * '.join(experiment.errors) + + scheduled = experiment.setup() + + # can we execute it? 
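+    # (each scheduled block below runs inside a docker container managed by
+    # self.host; executor.process() returns a dictionary with the exit
+    # status, stdout/stderr dumps, resource statistics and eventual
+    # user/system error messages, which the assertions below verify)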
+ results = [] + for key, value in scheduled.items(): + executor = Executor(prefix, value['configuration'], tmp_prefix, + dataformat_cache, database_cache, algorithm_cache) + assert executor.valid, '\n * %s' % '\n * '.join(executor.errors) + + with executor: + result = executor.process(self.host, timeout_in_minutes=1) + assert result + assert 'status' in result + assert 'stdout' in result + assert 'stderr' in result + assert 'statistics' in result + assert 'timed_out' in result + assert 'system_error' in result + assert 'user_error' in result + if result['status'] != 0: + logger.warn("(eventual) system errors: %s", result['system_error']) + logger.warn("(eventual) user errors: %s", result['user_error']) + logger.warn("stdout: %s", result['stdout']) + logger.warn("stderr: %s", result['stderr']) + return result + if result['system_error']: + logger.warn("system errors: %s", result['system_error']) + return result + assert result['status'] == 0 + assert isinstance(result['statistics'], dict) + assert result['statistics'] + + if executor.analysis: + self.check_output(tmp_prefix, executor.data['result']['path']) + results.append(self.load_result(executor)) + else: + for name, details in executor.data['outputs'].items(): + self.check_output(tmp_prefix, details['path']) + + # compares all results + assert results + for k, result in enumerate(results): + expected = result.__class__() + expected.from_dict(expected_result[k], casting='unsafe') #defaults=False + assert result.isclose(expected), "%r is not close enough to %r" % (result.as_dict(), expected.as_dict()) + + + def test_integers_addition_1(self): + assert self.execute('user/user/integers_addition/1/integers_addition', + [{'sum': 12272, 'nb': 10}]) is None + + def test_integers_addition_2(self): + assert self.execute('user/user/integers_addition/2/integers_addition', + [{'sum': 18392, 'nb': 10}]) is None + + def test_single_1_single(self): + assert self.execute('user/user/single/1/single', [{'out_data': 42}]) is None + + def test_single_1_add(self): + assert self.execute('user/user/single/1/single_add', [{'out_data': 43}]) is None + + def test_single_1_add2(self): + assert self.execute('user/user/single/1/single_add2', [{'out_data': 44}]) is None + + def test_single_1_error(self): + result = self.execute('user/user/single/1/single_error', [None]) + assert result + self.assertEqual(result['status'], 1) + assert result['user_error'] + assert 'NameError' in result['user_error'] + self.assertEqual(result['system_error'], '') + + def test_single_1_large(self): + assert self.execute('user/user/single/1/single_large', [{'out_data': 2.0}]) is None + + def test_double_1(self): + assert self.execute('user/user/double/1/double', [{'out_data': 42}]) is None + + def test_triangle_1(self): + assert self.execute('user/user/triangle/1/triangle', [{'out_data': 42}]) is None + + def test_too_many_nexts(self): + result = self.execute('user/user/triangle/1/too_many_nexts', [None]) + assert result + self.assertEqual(result['status'], 137) + assert result['user_error'] + assert 'no more data' in result['user_error'] + + def test_double_triangle_1(self): + assert self.execute('user/user/double_triangle/1/double_triangle', [{'out_data': 42}]) is None + + def test_cxx_double_1(self): + assert self.execute('user/user/double/1/cxx_double', [{'out_data': 42}]) is None diff --git a/beat/core/utils.py b/beat/core/utils.py index f80084198ea68706a41a4d059363d6a7cda38ed2..0811e739aceb009bdd83f08f34dff39e7b124c02 100644 --- a/beat/core/utils.py +++ b/beat/core/utils.py @@ 
-93,7 +93,7 @@ def extension_for_language(language): return dict( unknown = '', - binary = '.bin', + cxx = '.so', matlab = '.m', python = '.py', r = '.r', diff --git a/buildout.cfg b/buildout.cfg index 120926c43a12e0c0078befe5513e234bca4abc47..4b9f9a0d4de333df65f5b62e466540962b1947ca 100644 --- a/buildout.cfg +++ b/buildout.cfg @@ -1,5 +1,6 @@ [buildout] -parts = scripts cpulimit +index = https://pypi.org/simple +parts = scripts cxx_integers_echo_algorithm extensions = mr.developer auto-checkout = * develop = . @@ -7,19 +8,26 @@ newest = false eggs = beat.core beat.backend.python ipdb - coverage [sources] -beat.backend.python = git git@gitlab.idiap.ch:beat/beat.backend.python -cpulimit = git https://github.com/opsengine/cpulimit rev=v0.2 egg=false +beat.backend.python = git https://gitlab.idiap.ch/beat/beat.backend.python -[cpulimit] +[scripts] +recipe = bob.buildout:scripts + +[cxx_integers_echo_algorithm] recipe = collective.recipe.cmd -cmds = make -C src/cpulimit - cd bin && ln -sf ../src/cpulimit/src/cpulimit && cd .. -uninstall_cmds = rm -f bin/cpulimit +cmds = cd beat/core/test/prefix/algorithms/user/ + tar -cf cxx_integers_echo.tar cxx_integers_echo/ + docker run -dti --name build docker.idiap.ch/beat/beat.env.cxx_base:0.1.6 > /dev/null + docker cp cxx_integers_echo.tar build:/tmp/cxx_integers_echo.tar + docker exec build bash -c 'cd /tmp ; tar -xf /tmp/cxx_integers_echo.tar' + docker exec build bash -c 'cd /tmp/cxx_integers_echo ; mkdir build ; cd build ; cmake .. ; make' + docker cp build:/tmp/cxx_integers_echo/1.so cxx_integers_echo/. + docker stop build > /dev/null + docker rm build > /dev/null + rm cxx_integers_echo.tar + cd ../../../../../.. +uninstall_cmds = rm -f beat/core/test/prefix/algorithms/user/cxx_integers_echo/1.so on_install = true on_update = true - -[scripts] -recipe = bob.buildout:scripts diff --git a/setup.py b/setup.py index 1c87a1f77519e530c5701cdbd1096033deecfc65..010ee031ba71fb16500fecf6b50c78ff963c968a 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,6 @@ requires = [ "jsonschema", "numpy", "pip", - "psutil", "setuptools", "simplejson", "six", @@ -45,6 +44,7 @@ requires = [ "matplotlib>=1.4", "gevent", "pyzmq", + "docker-py", ] # The only thing we do in this file is to call the setup() function with all