#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.core module of the BEAT platform.             #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################

'''Implementation of subprocess-based asynchronous running with greenlets
'''

import os
import socket
import tarfile
import logging

import six
import docker
import requests
import simplejson

from . import stats

logger = logging.getLogger(__name__)


class Host(object):
  '''An object of this class can connect to the docker host and resolve stuff'''

  # Cache of already-described images, shared by all Host instances and
  # optionally persisted to disk (see ``images_cache`` constructor option)
  images_cache = {}


  def __init__(self, **kwargs):

    self.client = None

    # Containers created through this host, so teardown() can clean them up
    self.containers = []

    # If both 'host' and 'port' are passed, convert them into the 'base_url'
    # keyword understood by the docker client
    if 'host' in kwargs and 'port' in kwargs:
      host = kwargs.pop('host')
      port = kwargs.pop('port')
      kwargs['base_url'] = "http://%s:%s" % (host, port)

    # Optional path of a JSON file used to persist the image description cache
    self.images_cache_filename = None
    if 'images_cache' in kwargs:
      self.images_cache_filename = kwargs.pop('images_cache')

    self.kwargs = kwargs
    self.environments = {}
    self.db_environments = {}


  def setup(self, raise_on_errors=True):
    '''Connects to the docker host and discovers available environments

    Parameters:

      raise_on_errors (bool, Optional): If a ``RuntimeError`` should be raised
        in case two installed environments carry the same name (see
        :py:meth:`_discover_environments`).

    '''

    self.kwargs.update(**docker.utils.kwargs_from_env(assert_hostname=False))
    self.client = docker.Client(**self.kwargs)

    # Preload the image description cache from disk, if configured
    if (self.images_cache_filename is not None) and \
        os.path.exists(self.images_cache_filename):
      with open(self.images_cache_filename, 'r') as f:
        Host.images_cache = simplejson.load(f)

    (self.environments, self.db_environments) = \
        self._discover_environments(raise_on_errors)

    # Persist the (possibly updated) cache back to disk
    if self.images_cache_filename is not None:
      with open(self.images_cache_filename, 'w') as f:
        simplejson.dump(Host.images_cache, f, indent=4)


  def __contains__(self, key):
    return (key in self.environments) or (key in self.db_environments)


  def __str__(self):
    return self.kwargs.get('base_url', 'Host')


  def env2docker(self, key):
    '''Returns a nice docker image name given a BEAT environment key'''

    attrs = self.environments[key]
    if attrs['tag'] is not None:
      return attrs['tag']
    return attrs['short_id']


  def db2docker(self, db_names):
    '''Returns a nice docker image name given a database name'''

    def _all_in(db_names, databases):
      return len([x for x in db_names if x in databases]) == len(db_names)

    # Picks the first database environment that provides all requested
    # databases (raises IndexError if none does)
    attrs = [x for x in self.db_environments.values()
             if _all_in(db_names, x['databases'])][0]

    if attrs['tag'] is not None:
      return attrs['tag']
    return attrs['short_id']


  def teardown(self):
    '''Removes all containers created through this host'''

    for container in self.containers:
      self.rm(container)


  def __enter__(self):
    self.setup()
    return self


  def __exit__(self, *exc):
    self.teardown()


  @property
  def ip(self):
    '''The IP address of the docker host'''

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
      # connecting to a UDP address doesn't send packets
      s.connect(('8.8.8.8', 1))
      return s.getsockname()[0]
    finally:
      s.close()  # avoid leaking the socket


  def _discover_environments(self, raise_on_errors=True):
    '''Returns a dictionary containing information about docker environments

    Parameters:

      raise_on_errors (bool, Optional): If we should raise an exception
        (``RuntimeError``) in case installed environments override each other
        and we can't know which to use.


    Raises:

      RuntimeError: if you set ``raise_on_errors`` and I found environments
        that override each other for their description keys (``name
        (version)``), which should be unique.

    '''

    def _describe(image):
      '''Tries to run the "describe" app on the image, collect results'''

      if image in Host.images_cache:
        return Host.images_cache[image]

      status, output = self.get_statusoutput(image, ['describe'])
      if status == 0:
        try:
          infos = simplejson.loads(output)
          Host.images_cache[image] = infos
          return infos
        except Exception as e:
          logger.warning("Ignoring potential environment at `%s' since " \
              "`describe' output cannot be parsed: %s", image, str(e))
      return {}

    def _must_replace(image_tag, environments, key):
      # this check avoids we do a new environment and, by mistake, override
      # it with a previous version or the contrary.
      if raise_on_errors:
        raise RuntimeError("Environments at `%s' and `%s' have the " \
            "same name (`%s'). Distinct environments must be " \
            "uniquely named. Fix this and re-start." % \
            (image_tag, environments[key]['image'], key))

      # Extract the version suffix (after the last ':') of both the candidate
      # and the already-registered image tags, when available
      new_version = None
      previous_version = None

      parts = image_tag.split('/')
      if len(parts) > 1:
        parts = parts[-1].split(':')
        if len(parts) > 1:
          new_version = parts[-1]

      parts = environments[key]['tag'].split('/')
      if len(parts) > 1:
        parts = parts[-1].split(':')
        if len(parts) > 1:
          previous_version = parts[-1]

      replacement = False
      keep = False

      if (new_version is not None) and (previous_version is not None):
        if new_version == 'latest':
          replacement = True
        elif previous_version == 'latest':
          keep = True
        else:
          # Compare dotted numeric versions; a non-numeric component on one
          # side decides in favor of the other side
          try:
            new_version = tuple([int(x) for x in new_version.split('.')])
            try:
              previous_version = tuple([int(x)
                                        for x in previous_version.split('.')])
              if new_version > previous_version:
                replacement = True
              else:
                keep = True
            except ValueError:  # previous version is not purely numeric
              replacement = True
          except ValueError:  # new version is not purely numeric
            keep = True
      elif new_version is not None:
        replacement = True
      elif previous_version is not None:
        keep = True

      if replacement:
        logger.debug("Overriding **existing** environment `%s' in image `%s'",
            key, environments[key]['tag'])
      elif keep:
        logger.debug("Environment `%s' already existing in image `%s', "
            "we'll keep it", key, environments[key]['tag'])
        return False
      else:
        logger.warning("Overriding **existing** environment `%s' image " \
            "with `%s'. To avoid this warning make " \
            "sure your docker images do not contain environments " \
            "with the same names", key, environments[key]['image'])
      return True

    environments = {}
    db_environments = {}

    for image in self.client.images():
      # call the "describe" application on each existing image with "beat" in
      # its name
      tag = image['RepoTags'][0] if image['RepoTags'] else None
      if (tag is None) or (tag.find('beat') == -1):
        continue

      # strip the "sha256:" prefix and keep a short, human-friendly id
      short_id = image['Id'].split(':')[1][:12]
      logger.debug("Checking image `%s' (%s)...", tag, short_id)

      description = _describe(tag or short_id)
      if not description:
        continue

      key = description['name'] + ' (' + description['version'] + ')'

      if 'databases' in description:
        if (key in db_environments) and \
            not _must_replace(tag, db_environments, key):
          continue
        db_environments[key] = description
        db_environments[key]['image'] = image['Id']
        db_environments[key]['tag'] = tag
        db_environments[key]['short_id'] = short_id
        db_environments[key]['nickname'] = tag or short_id
      else:
        if (key in environments) and \
            not _must_replace(tag, environments, key):
          continue
        environments[key] = description
        environments[key]['image'] = image['Id']
        environments[key]['tag'] = tag
        environments[key]['short_id'] = short_id
        environments[key]['nickname'] = tag or short_id
        logger.info("Registered `%s' -> `%s (%s)'", key, tag, short_id)

    logger.debug("Found %d environments and %d database environments",
        len(environments), len(db_environments))

    return (environments, db_environments)


  def put_path(self, container, src, dest='/tmp', chmod=None):
    """Puts a given src path into a destination folder

    This method will copy a given ``src`` path into a ``dest`` folder. It
    optionally changes the owner of the resulting path on the image so it is
    accessible by other users than root.


    Parameters:

      container (str, dict): The container where to copy the path.

      src (str): The path, on the host, of the archive to be copied over the
        container. The path may point to a file or directory on the host. Data
        will be copied recursively.

      dest (str, Optional): A path, on the container, where to copy the input
        data. By default, we put data into the temporary directory of the
        container.

      chmod (str, Optional): An optional string to set the mode of the
        resulting path (files and directory). A common reason for this sort of
        thing is to set directories to 755 but files to 644 with the following
        value ``u+rwX,go+rX,go-w``. This will be applied recursively to the
        resulting path on the container. If you set this value to ``None``,
        then the chmod command will not be run.

    """

    # The docker API only accepts in-memory tar archives; tar data is binary,
    # so use a bytes buffer (cStringIO would break under Python 3)
    c = six.BytesIO()
    with tarfile.open(mode='w', fileobj=c) as tar:
      tar.add(src, arcname=os.path.basename(src))
    archive = c.getvalue()

    # Place the tarball into the container
    logger.debug("[docker] archive -> %s@%s", container['Id'][:12], dest)
    self.client.put_archive(container, dest, archive)

    if chmod is not None:
      # Change permissions to access the path
      ex = self.client.exec_create(container,
          cmd=['chmod', '-R', chmod, dest])
      output = self.client.exec_start(ex)  # waits until it is executed


  def create_container(self, image, command, tmp_path=None, host_args=None,
      **args):
    """Prepares the docker container for running the user code


    Parameters:

      image (str): A unique identifier for the image to create

      command (list): A list of strings with the command to run inside the
        container.

      tmp_path (str): A path with a file name or directory that will be
        copied into the container (inside ``/tmp``), supposedly with
        information that is used by the command.

      host_args (dict): A dictionary that will be transformed into a
        ``HostConfig`` object that will be used for the creation of the
        container. It is the place too add/drop capabilities for the container
        or to set maximum resource usage (CPU, memory, etc). Keys accepted are
        not arbitrary. Check docker_py's manual for details.

      args (dict): A list of extra arguments to pass to the underlying
        ``create_container()`` call from the docker Python API.


    Returns:

      docker.Container: The container, already preloaded and ready to run. The
        container is not started by this method.

    """

    if image in self:  # replace by a real image name
      image = self.env2docker(image)

    config_args = dict(
        log_config=docker.utils.LogConfig(type='',
            config={'max-size': '1M', 'max-file': '1'}),
        )

    if host_args is not None:
      config_args.update(host_args)

    if 'volumes' in args:
      # docker-py expects the container-side paths in 'volumes' and the full
      # mapping in the host config 'binds'
      volumes = [v['bind'] for k, v in args['volumes'].items()]
      config_args['binds'] = args['volumes']
      args['volumes'] = volumes

    # see bug: https://github.com/docker/docker-py/issues/1195
    # fix on docker-engine 1.13.0 and superior
    # see changelog: https://github.com/docker/docker/blob/master/CHANGELOG.md
    args['network_disabled'] = False

    # for HostConfig (keys are not arbitrary) - see:
    # http://docker-py.readthedocs.io/en/latest/hostconfig/
    args['host_config'] = self.client.create_host_config(**config_args)

    logger.debug("[docker] create_container %s %s", image, ' '.join(command))
    container = self.client.create_container(image=image, command=command,
        **args)
    self.containers.append(container)

    if tmp_path is not None:
      self.put_path(container, tmp_path)

    return container


  def start(self, container):
    '''Starts a given container'''

    logger.debug("[docker] start %s", container['Id'][:12])
    self.client.start(container)


  def status(self, container):
    '''Checks the status of a given container

    Returns the docker state string (e.g. ``'running'``) or ``None`` if the
    container cannot be inspected.
    '''

    try:
      logger.debug("[docker] inspect %s", container['Id'][:12])
      z = self.client.inspect_container(container)
      return z['State']['Status']
    except Exception:
      return None


  def rm(self, container):
    '''Removes a given container. If it is not done, kill it first'''

    status = self.status(container)

    if status not in ('created', 'exited'):
      logger.warning("Killing container `%s' which is on state `%s'",
          container['Id'], status)
      self.client.kill(container)

    logger.debug("[docker] rm %s", container['Id'][:12])
    self.client.remove_container(container)
    self.containers.remove(container)


  def get_statusoutput(self, image, command, **kwargs):
    '''Runs a command and retrieves status and output

    Returns ``(status, output)``; on any failure to run the container,
    returns ``(1, None)``.
    '''

    container = None
    try:
      container = self.create_container(image=image, command=command,
          **kwargs)
      self.start(container)
      status = self.client.wait(container)
      output = self.client.logs(container)
    except Exception:
      return 1, None
    finally:
      if container is not None:
        self.rm(container)

    return status, output


class Popen:
  '''Process-like manager for asynchronously executing user code in a container

  The process will be run in the background, and its standard output and
  standard error will be read after it finishes, into a limited size circular
  buffer.


  Parameters:

    host (:py:class:`Host`): The docker host where to start the container.
      The host must be properly initialized, including starting the
      appropriate docker-machine, if that is the case (environment setup).

    image (str): A string uniquely identifying the image from which to start
      the container

    command (list): A list of strings with the command to run inside the
      container.

    path (str, Optional): An archive to copy into the temporary directory of
      the container, supposedly with information that is used by the command.
      If a string is given, than it is considered as a path from which the
      archive is going to be created.

    virtual_memory_in_megabytes (int, Optional): The maximum amount of memory
      the user process can consume on the host. If not specified, a memory
      limit is not set.

    max_cpu_percent (float, Optional): The maximum amount of CPU the user
      process may consume on the host. The value ``100`` equals to using 100%
      of a single core. If not specified, then a CPU limitation is not put in
      place.

  '''

  def __init__(self, host, image, command, tmp_archive=None,
      virtual_memory_in_megabytes=0, max_cpu_percent=0, **args):

    self.host = host

    args.update({
        'user': 'root',  # user `nobody' cannot access the tmp archive...
        'tty': False,
        'detach': True,
        'stdin_open': False,
        })

    host_args = dict()

    if virtual_memory_in_megabytes:
      # For this to work properly, memory swap limitation has to be enabled on
      # the kernel. This typically goes by setting "cgroup_enable=memory" as a
      # boot parameter to kernels which are compiled with this support.
      # More info: https://docs.docker.com/engine/installation/linux/ubuntulinux/#/enable-memory-and-swap-accounting
      host_args['mem_limit'] = str(virtual_memory_in_megabytes) + 'm'
      host_args['memswap_limit'] = host_args['mem_limit']
      logger.debug('Setting maximum memory to %dMB' % \
          (virtual_memory_in_megabytes,))

    if max_cpu_percent:
      # The period corresponds to the scheduling interval for the CFS in Linux
      # The quota corresponds to a fraction or a multiple of the period, the
      # container will get. A quota that is 2x the period gets the container up
      # to 200% cpu time (2 cores). If the quota is 0.5x the period, the
      # container gets up to 50% the cpu time. Each core represents 100%. A
      # system with 2 cores has 200% computing power.
      #
      # More info:
      # https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
      #
      # For this to work properly, CPU bandwidth provisioning for the Linux CFS
      # must be enabled on the kernel. More info on how to do it: http://www.blaess.fr/christophe/2012/01/07/linux-3-2-cfs-cpu-bandwidth-english-version/
      #
      # If your system is running on a virtual machine, having more cores
      # available to docker engine normally translates to more precise
      # scheduling.
      period = 100000  # microseconds
      quota = max_cpu_percent / 100.
      host_args['cpu_period'] = period
      host_args['cpu_quota'] = int(period * quota)
      logger.debug('Setting CPU quota to %d%%' % max_cpu_percent)

    # creates the container
    self.container = self.host.create_container(image=image, command=command,
        tmp_path=tmp_archive, host_args=host_args, **args)

    # Starts the container
    self.host.start(self.container)

    # Gets start point statistics
    self.previous_stats = None


  def __enter__(self):
    return self


  def __exit__(self, *exc):
    self.rm()


  @property
  def pid(self):
    # short docker id of the underlying container
    return self.container['Id'][:12]


  def wait(self, timeout=None):
    '''Reads stdout and stderr until the underlying processes finishes

    Implements a modified version of :py:meth:`subprocess.Popen.wait`, in
    which we read the stdout and stderr data into a circular buffer, keep only
    the last N bytes of each stream.

    This method will call :py:meth:`file.readline` on both stdout and stderr
    streams attached to the process. These methods are "green". They will
    yield once they are blocked.


    Parameters:

      timeout (float, Optional): A timeout in seconds to wait for the user
        process to finish. If a timeout value is not given, waits forever.


    Returns:

      int: Returns the status code of the process


    Raises:

      requests.exceptions.ReadTimeout: if the process times out

    '''

    return self.host.client.wait(self.container, timeout)


  @property
  def stdout(self):
    '''Returns the stdout'''

    return self.host.client.logs(self.container, stdout=True, stderr=False)


  @property
  def stderr(self):
    '''Returns the stderr'''

    return self.host.client.logs(self.container, stdout=False, stderr=True)


  def rm(self):
    '''Remove the container. If it is not stopped yet, kill it first'''

    self.host.rm(self.container)


  def status(self):
    '''Returns my own "docker" status'''

    return self.host.status(self.container)


  def kill(self):
    '''Stop container'''

    if self.status() == 'running':
      self.host.client.kill(self.container)


  def _stats(self):
    # one-shot (non-streaming) statistics snapshot, decoded from JSON
    return self.host.client.stats(self.container, decode=True, stream=False)


  def statistics(self):
    '''If the process is still active, returns usage statistics by ``pusutil``

    Returns:

      stats (dict): A dictionary object with all collected statistics


    Raises:

      RuntimeError: In case the process is not active anymore.

    '''

    data = self._stats()

    # If CPU statistics can't be retrieved, fall back to the previous snapshot
    # docker embeds in the same payload
    if 'system_cpu_usage' not in data['cpu_stats']:
      data['cpu_stats'] = dict(data['precpu_stats'])

    # If memory statistics can't be retrieved
    if len(data['memory_stats']) == 0:
      data['memory_stats'] = dict(limit=0, max_usage=0)

    previous_cpu = self.previous_stats['cpu_stats'] \
        if self.previous_stats else None

    # merge statistics
    retval = dict(
        cpu=stats.cpu_statistics(previous_cpu, data['cpu_stats']),
        memory=stats.memory_statistics(data['memory_stats']),
        )

    self.previous_stats = data

    return retval