#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################

'''Implementation of subprocess-based asynchronous running with greenlets
'''

import os
import tarfile

import logging
logger = logging.getLogger(__name__)

import six
import docker
import requests
import simplejson

from . import stats


class Host(object):
    '''An object of this class can connect to the docker host and resolve stuff

    Parameters:

      use_machine (str, Optional): The name of an existing docker-machine to
        (possibly start and) use as the docker host. If not given, connect
        directly to a docker daemon instead.

      host (str, Optional): Hostname/address of the docker daemon. Only used
        together with ``port`` (and when ``use_machine`` is not set); both are
        folded into a ``base_url`` keyword for the docker client.

      port (int, Optional): Port of the docker daemon. See ``host``.

      Any remaining keyword arguments are passed verbatim to
      :py:class:`docker.Client` when :py:meth:`setup` is called.

    '''

    def __init__(self, use_machine=None, **kwargs):

        self.machine = None  # normally, no need to use docker-machine
        self.use_machine = None
        self.started_machine = False  # only stop machines *we* started
        self.client = None
        self.containers = []  # containers created through us, for teardown()

        if use_machine is not None:
            # docker-machine support is optional; import lazily so direct
            # connections do not require the ``machine`` package
            import machine
            self.machine = machine.Machine()
            self.use_machine = use_machine
            if not self.machine.exists(self.use_machine):
                raise RuntimeError("no docker machine named `%s' found " \
                    " - perhaps you forgot to create it?" % self.use_machine)

        elif 'host' in kwargs and 'port' in kwargs:
            # translate host/port into the base_url the docker client expects
            host = kwargs.pop('host')
            port = kwargs.pop('port')
            kwargs['base_url'] = "http://%s:%s" % (host, port)

        self.kwargs = kwargs
        self.beat_environments = {}

    def __enter__(self):
        '''Context-manager entry: connects and discovers environments

        Allows the documented ``with host:`` usage (see :py:class:`Popen`).
        '''
        self.setup()
        return self

    def __exit__(self, *exc):
        '''Context-manager exit: disposes of containers and connections'''
        self.teardown()

    def setup(self):
        '''Connects to the docker host and discovers usable environments'''

        if self.use_machine is not None:
            # requires a bootstrapping machine to run
            if not self.machine.status(self.use_machine):
                logger.info("Starting docker machine `%s'...", self.use_machine)
                self.machine.start(self.use_machine)
                self.started_machine = True
            self.client = \
                docker.Client(**self.machine.config(machine=self.use_machine))
        else:
            self.client = docker.Client(**self.kwargs)

        self.beat_environments = self._discover_environments(raise_on_errors=True)

    def teardown(self):
        '''Removes all containers created through this object and disconnects'''

        for container in self.containers:
            self.rm(container)

        if self.use_machine is not None and self.started_machine:
            logger.info("Stopping docker machine `%s'...", self.use_machine)
            self.machine.stop(self.use_machine)
            self.started_machine = False

        self.client = None

    def _discover_environments(self, raise_on_errors=True):
        '''Returns a dictionary containing information about docker environments

        Parameters:

          raise_on_errors (bool, Optional): If we should raise an exception
            (``RuntimeError``) in case installed environments override each
            other and we can't know which to use.

        Raises:

          RuntimeError: if you set ``raise_on_errors`` and I found environments
            that override each other for their description keys (``name
            (version)``), which should be unique.

        '''

        def _describe(image):
            '''Tries to run the "describe" app on the image, collect results'''

            status, output = self.get_statusoutput(image, 'describe')
            if status == 0:
                try:
                    return simplejson.loads(output)
                except Exception as e:
                    logger.warning("Ignoring potential environment at `%s' since " \
                        "`describe' output cannot be parsed: %s", image, str(e))
            return {}

        retval = {}

        for image in self.client.images():
            # call the "describe" application on each existing image
            tag = image['RepoTags'][0] if image['RepoTags'] else None
            # docker >= 1.10 prefixes ids with the digest algorithm (e.g.
            # "sha256:..."); ``[-1]`` copes with both prefixed and bare ids
            short_id = image['Id'].split(':')[-1][:12]
            logger.debug("Checking image `%s' (%s)...", tag, short_id)
            description = _describe(image['Id'])
            if not description:
                continue

            key = description['name'] + ' (' + description['version'] + ')'

            if key in retval:
                # this check avoids we do a new environment and, by mistake,
                # override it with a previous version or the contrary
                if raise_on_errors:
                    raise RuntimeError("Environments at `%s' and `%s' have the " \
                        "same name (`%s'). Distinct environments must be " \
                        "uniquely named. Fix this and re-start." % \
                        (image['Id'], retval[key]['image'], key))
                else:
                    logger.warning("Overriding **existing** environment `%s' image " \
                        "with `%s' (it was `%s'). To avoid this warning make " \
                        "sure your docker images do not contain environments " \
                        "with the same names", key, retval[key]['image'],
                        image['Id'])

            retval[key] = description
            retval[key]['image'] = image['Id']
            retval[key]['tag'] = tag
            retval[key]['short_id'] = short_id
            logger.info("Registered `%s' -> `%s (%s)'", key, tag, short_id)

        logger.debug("Found %d environments", len(retval))
        return retval

    def create_container(self, image, command, tmp_archive=None, **args):
        """Prepares the docker container for running the user code

        Parameters:

          image (str): A unique identifier for the image to create. May also
            be the key of a discovered BEAT environment (``name (version)``),
            which is resolved to the image's tag or short id.

          command (list): A list of strings with the command to run inside the
            container (a plain string is also tolerated).

          tmp_archive (bytes): An archive to copy into the temporary directory
            of the container (``/tmp``), supposedly with information that is
            used by the command.

          args (dict): A list of extra arguments to pass to the underlying
            ``create_container()`` call from the docker Python API.

        Returns:

          docker.Container: The container, already preloaded and ready to run.
            The container is not started by this method.

        """

        if image in self.beat_environments:
            # replace by a real image name
            attrs = self.beat_environments[image]
            if attrs['tag'] is not None:
                image = attrs['tag']
            else:
                image = attrs['short_id']

        # ``command`` may legally be a single string (e.g. from
        # ``get_statusoutput``); only join genuine sequences for the log line
        cmdline = command if isinstance(command, six.string_types) \
            else ' '.join(command)
        logger.info("[docker] create_container %s %s", image, cmdline)
        container = self.client.create_container(image=image, command=command,
            **args)
        self.containers.append(container)

        if tmp_archive is not None:
            # Place the tarball into the container
            logger.info("[docker] archive -> %s@/tmp", container['Id'][:12])
            self.client.put_archive(container, '/tmp', tmp_archive)

        return container

    def start(self, container):
        '''Starts a given container'''

        logger.info("[docker] start %s", container['Id'][:12])
        self.client.start(container)

    def status(self, container):
        '''Checks the status of a given container

        Returns the docker state string (e.g. ``'running'``, ``'exited'``) or
        ``None`` if the container cannot be inspected (best-effort).
        '''

        try:
            logger.info("[docker] inspect %s", container['Id'][:12])
            z = self.client.inspect_container(container)
            return z['State']['Status']
        except Exception:
            return None

    def rm(self, container):
        '''Removes a given container. If it is not done, kill it first'''

        status = self.status(container)

        if status not in ('created', 'exited'):
            # still running (or unknown state): force-stop before removal
            logger.warning("Killing container `%s' which is on state `%s'",
                container['Id'], status)
            self.client.kill(container)
        else:
            logger.info("[docker] rm %s", container['Id'][:12])

        self.client.remove_container(container)
        self.containers.remove(container)

    def get_statusoutput(self, image, command, **kwargs):
        '''Runs a command and retrieves status and output

        Returns ``(status, output)``; on any failure to run the container the
        best-effort answer ``(1, None)`` is returned instead of raising. The
        container is always removed afterwards.
        '''

        container = self.create_container(image=image, command=command,
            **kwargs)

        try:
            self.start(container)
            status = self.client.wait(container)
            output = self.client.logs(container)
        except Exception:
            return 1, None
        finally:
            self.rm(container)

        return status, output


def make_inmemory_tarball(path):
    '''Creates an in-memory tarball of the contents of path

    Parameters:

      path (str): The path to a file or directory that will be tarred.

    Returns:

      bytes: A byte-string representing the packaged object(s).

    '''

    # tarfile writes *bytes*, so the in-memory file must be a binary buffer
    # (a text StringIO would raise a TypeError on Python 3)
    c = six.BytesIO()
    with tarfile.open(mode='w', fileobj=c) as tar:
        tar.add(path, arcname=os.path.basename(path))
    return c.getvalue()


class Popen:
    '''Process-like manager for asynchronously executing user code in a container

    The process will be run in the background, and its standard output and
    standard error will be read after it finishes, into a limited size
    circular buffer.

    Parameters:

      host (:py:class:`Host`): The docker host where to start the container.
        The host must be properly initialized, including starting the
        appropriate docker-machine, if that is the case. This normally implies
        including calls to this object inside a protected ``with host``
        section.

      image (str): A string uniquely identifying the image from which to start
        the container

      command (list): A list of strings with the command to run inside the
        container.

      tmp_archive (str, bytes, Optional): An archive to copy into the
        temporary directory of the container, supposedly with information that
        is used by the command. If a string is given, than it is considered as
        a path from which the archive is going to be created.

      virtual_memory_in_megabytes (int, Optional): The maximum amount of
        memory the user process can consume on the host. If not specified, a
        memory limit is not set.

      max_cpu_percent (float, Optional): The maximum amount of CPU the user
        process may consume on the host. The value ``100`` equals to using
        100% of a single core. If not specified, then a CPU limitation is not
        put in place.

    '''

    def __init__(self, host, image, command, tmp_archive=None,
        virtual_memory_in_megabytes=0, max_cpu_percent=0, **args):

        self.host = host

        # user code always runs unprivileged and detached, with a fixed port
        # exposed for communication
        args.update({
            'user': 'nobody',
            'ports': ['5555'],
            'tty': False,
            'detach': True,
            'stdin_open': False,
        })

        # creates the log configuration, limiting output size kept on the
        # image host
        # NOTE(review): ``type=''`` selects the daemon default log driver —
        # confirm this is intended rather than an explicit 'json-file'
        host_config = self.host.client.create_host_config(
            log_config=docker.utils.LogConfig(type='',
                config={'max-size': '1M', 'max-file': '1'}),
        )

        if isinstance(tmp_archive, six.string_types) and \
            os.path.exists(tmp_archive):
            # a path was given: package it on-the-fly
            tmp_archive = make_inmemory_tarball(tmp_archive)

        # creates the container
        self.container = self.host.create_container(image=image,
            command=command, tmp_archive=tmp_archive,
            host_config=host_config, **args)

        update_args = {}

        if max_cpu_percent:
            # 100000us is the default scheduler period; the quota is the
            # fraction of it the container may consume
            update_args['cpu_period'] = 100000
            update_args['cpu_quota'] = int(update_args['cpu_period'] * \
                (max_cpu_percent/100.))

        if virtual_memory_in_megabytes:
            # setting memswap == mem disables swap usage altogether
            update_args['mem_limit'] = str(virtual_memory_in_megabytes) + 'm'
            update_args['memswap_limit'] = update_args['mem_limit']

        self.host.client.update_container(self.container, **update_args)

        # Starts the container
        self.host.start(self.container)

        # Gets start point statistics
        self.previous_stats = None

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.rm()

    def wait(self, timeout=None):
        '''Reads stdout and stderr until the underlying processes finishes

        Implements a modified version of :py:meth:`subprocess.Popen.wait`, in
        which we read the stdout and stderr data into a circular buffer, keep
        only the last N bytes of each stream.

        This method will call :py:meth:`file.readline` on both stdout and
        stderr streams attached to the process. These methods are "green".
        They will yield once they are blocked.

        Parameters:

          timeout (float, Optional): A timeout in seconds to wait for the user
            process to finish. If a timeout value is not given, waits forever.

        Returns:

          int: Returns the status code of the process

        Raises:

          requests.exceptions.ReadTimeout: if the process times out

        '''
        return self.host.client.wait(self.container, timeout)

    @property
    def stdout(self):
        '''Returns the stdout'''
        return self.host.client.logs(self.container, stdout=True, stderr=False)

    @property
    def stderr(self):
        '''Returns the stderr'''
        return self.host.client.logs(self.container, stdout=False, stderr=True)

    def rm(self):
        '''Remove the container. If it is not stopped yet, kill it first'''
        self.host.rm(self.container)

    def status(self):
        '''Returns my own "docker" status'''
        return self.host.status(self.container)

    def kill(self):
        '''Stop container'''
        if self.status() == 'running':
            self.host.client.kill(self.container)

    def _stats(self):
        # one-shot (non-streaming), already-decoded statistics snapshot
        return self.host.client.stats(self.container, decode=True,
            stream=False)

    def statistics(self):
        '''If the process is still active, returns its usage statistics

        Statistics are pulled from the docker daemon's ``stats`` endpoint and
        summarized via :py:mod:`.stats`.

        Returns:

          stats (dict): A dictionary object with all collected statistics

        Raises:

          RuntimeError: In case the process is not active anymore.

        '''

        data = self._stats()

        # CPU usage is a counter: it needs the previous sample to be turned
        # into a rate; first call has no previous sample
        previous_cpu = self.previous_stats['cpu_stats'] \
            if self.previous_stats else None

        # merge statistics
        retval = dict(
            cpu=stats.cpu_statistics(previous_cpu, data['cpu_stats']),
            memory=stats.memory_statistics(data['memory_stats']),
        )

        self.previous_stats = data

        return retval