Commit ae451cdd authored by André Anjos's avatar André Anjos 💬

[async] Better logging and transparent BEAT environment tag translation

parent 055a46d7
......@@ -72,6 +72,7 @@ class Host(object):
kwargs['base_url'] = "http://%s:%s" % (host, port)
self.kwargs = kwargs
self.beat_environments = {}
def setup(self):
......@@ -89,6 +90,8 @@ class Host(object):
else:
self.client = docker.Client(**self.kwargs)
self.beat_environments = self._discover_environments(raise_on_errors=True)
def teardown(self):
......@@ -102,6 +105,74 @@ class Host(object):
self.client = None
def _discover_environments(self, raise_on_errors=True):
'''Returns a dictionary containing information about docker environments
Parameters:
raise_on_errors (bool, Optional): If we should raise an exception
(``RuntimeError``) in case installed environments override each other
and we can't know which to use.
Raises:
RuntimeError: if you set ``raise_on_errors`` and I found environments
that override each other for their description keys (``<name>
(<version>)``), which should be unique.
'''
def _describe(image):
'''Tries to run the "describe" app on the image, collect results'''
status, output = self.get_statusoutput(image, 'describe')
if status == 0:
try:
return simplejson.loads(output)
except Exception as e:
logger.warn("Ignoring potential environment at `%s' since " \
"`describe' output cannot be parsed: %s", image, str(e))
return {}
retval = {}
for image in self.client.images():
# call the "describe" application on each existing image
tag = image['RepoTags'][0] if image['RepoTags'] else None
id = image['Id'].split(':')[1][:12]
logger.debug("Checking image `%s' (%s)...", tag, id)
description = _describe(image['Id'])
if not description: continue
key = description['name'] + ' (' + description['version'] + ')'
if key in retval:
# this check avoids we do a new environment and, by mistake, override
# it with a previous version or the contrary.
if raise_on_errors:
raise RuntimeError("Environments at `%s' and `%s' have the " \
"same name (`%s'). Distinct environments must be " \
"uniquely named. Fix this and re-start." % \
(envdir, retval[key]['image'], key))
else:
logger.warn("Overriding **existing** environment `%s' image " \
"with `%s' (it was `%s). To avoid this warning make " \
"sure your docker images do not contain environments " \
"with the same names", key, retval[key]['image'],
image['Id'])
retval[key] = description
retval[key]['image'] = image['Id']
retval[key]['tag'] = tag
retval[key]['short_id'] = id
logger.info("Registered `%s' -> `%s (%s)'", key, tag, id)
logger.debug("Found %d environments", len(retval))
return retval
def create_container(self, image, command, tmp_archive=None, **args):
"""Prepares the docker container for running the user code
......@@ -128,12 +199,19 @@ class Host(object):
"""
if image in self.beat_environments: #replace by a real image name
attrs = self.beat_environments[image]
if attrs['tag'] is not None: image = attrs['tag']
else: image = attrs['short_id']
logger.info("[docker] create_container %s %s", image, ' '.join(command))
container = self.client.create_container(image=image, command=command,
**args)
self.containers.append(container)
if tmp_archive is not None:
# Place the tarball into the container
logger.info("[docker] archive -> %s@/tmp", container['Id'][:12])
self.client.put_archive(container, '/tmp', tmp_archive)
return container
......@@ -142,6 +220,7 @@ class Host(object):
def start(self, container):
'''Starts a given container'''
logger.info("[docker] start %s", container['Id'][:12])
self.client.start(container)
......@@ -151,6 +230,7 @@ class Host(object):
'''
try:
logger.info("[docker] inspect %s", container['Id'][:12])
z = self.client.inspect_container(container)
return z['State']['Status']
except Exception as e:
......@@ -167,6 +247,7 @@ class Host(object):
container['Id'], status)
self.client.kill(container)
else:
logger.info("[docker] rm %s", container['Id'][:12])
self.client.remove_container(container)
self.containers.remove(container)
......@@ -177,91 +258,17 @@ class Host(object):
container = self.create_container(image=image, command=command, **kwargs)
try:
self.client.start(container)
self.start(container)
status = self.client.wait(container)
output = self.client.logs(container)
except Exception:
return 1, None
finally:
self.client.remove_container(container)
self.rm(container)
return status, output
def discover_environments(host, raise_on_errors=True):
'''Returns a dictionary containing information about docker environments
Parameters:
host (:py:class:`Host`): The docker host where to start the container.
The host must be properly initialized, including starting the
appropriate docker-machine, if that is the case. This normally implies
including calls to this object inside a protected ``with host``
section.
raise_on_errors (bool, Optional): If we should raise an exception
(``RuntimeError``) in case installed environments override each other and
we can't know which to use.
Raises:
RuntimeError: if you set ``raise_on_errors`` and I found environments that
override each other for their description keys (``<name> (<version>)``),
which should be unique.
'''
def _describe(image):
'''Tries to run the "describe" app on the image, collect results'''
status, output = host.get_statusoutput(image, 'describe')
if status == 0:
try:
return simplejson.loads(output)
except Exception as e:
logger.warn("Ignoring potential environment at `%s' since " \
"`describe' output cannot be parsed: %s", image, str(e))
return {}
retval = {}
for image in host.client.images():
# call the "describe" application on each existing image
tag = image['RepoTags'][0] if image['RepoTags'] else 'untagged'
id = image['Id'].split(':')[1][:12]
logger.debug("Checking image `%s' (%s)...", tag, id)
description = _describe(image['Id'])
if not description: continue
key = description['name'] + ' (' + description['version'] + ')'
if key in retval:
# this check avoids we do a new environment and, by mistake, override
# it with a previous version or the contrary.
if raise_on_errors:
raise RuntimeError("Environments at `%s' and `%s' have the " \
"same name (`%s'). Distinct environments must be " \
"uniquely named. Fix this and re-start." % \
(envdir, retval[key]['image'], key))
else:
logger.warn("Overriding **existing** environment `%s' image " \
"with `%s' (it was `%s). To avoid this warning make " \
"sure your docker images do not contain environments " \
"with the same names", key, retval[key]['image'],
image['Id'])
retval[key] = description
retval[key]['image'] = image['Id']
retval[key]['tag'] = tag
retval[key]['short_id'] = id
logger.info("Registered `%s' -> `%s (%s)'", key, tag, id)
logger.debug("Found %d environments", len(retval))
return retval
def make_inmemory_tarball(path):
'''Creates an in-memory tarball of the contents of path
......@@ -363,8 +370,6 @@ class Popen:
self.host.client.update_container(self.container, **update_args)
# Starts the container
logger.info("Starting equivalent of `docker run -ti %s \"%s\"'",
image, ' '.join(command))
self.host.start(self.container)
# Gets start point statistics
......
......@@ -121,7 +121,7 @@ class AsyncTest(unittest.TestCase):
def test_memory_limit(self):
with Popen(self.host, 'beats/py27:system', ['python', '-c', '; '.join([
with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
"print('Before')",
"import sys; sys.stdout.flush()",
"d = '0' * (10 * 1024 * 1024)",
......@@ -143,7 +143,7 @@ class AsyncTest(unittest.TestCase):
def test_memory_limit2(self):
with Popen(self.host, 'beats/py27:system', ['python', '-c', '; '.join([
with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
"print('Before')",
"import sys; sys.stdout.flush()",
"d = '0' * (10 * 1024 * 1024)",
......@@ -168,7 +168,7 @@ class AsyncTest(unittest.TestCase):
# disabled: we limit the maximum output to 1M internally
size = 2**16 #bytes
with Popen(self.host, 'beats/py27:system', ['python', '-c', '; '.join([
with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
"import sys",
"for i in range(%d): sys.stdout.write('%%d\n' %% i)" % size,
"sys.stdout.flush()",
......@@ -191,7 +191,7 @@ class AsyncTest(unittest.TestCase):
# disabled: we limit the maximum output to 1M internally
size = 2**16 #bytes
with Popen(self.host, 'beats/py27:system', ['python', '-c', '; '.join([
with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
"import sys",
"for i in range(%d): sys.stderr.write('%%d\n' %% i)" % size,
"sys.stderr.flush()",
......@@ -214,7 +214,7 @@ class AsyncTest(unittest.TestCase):
# disabled: we limit the maximum output to 1M internally
size = 2**16 #bytes
with Popen(self.host, 'beats/py27:system', ['python', '-c', '; '.join([
with Popen(self.host, 'environment (1)', ['python', '-c', '; '.join([
"import sys",
"for i in range(%d): sys.stdout.write('%%d\n' %% i)" % size,
"sys.stdout.flush()",
......@@ -244,7 +244,7 @@ class AsyncTest(unittest.TestCase):
program = pkg_resources.resource_filename(__name__, 'cpu_stress.py')
tmp_name = os.path.join('/tmp', os.path.basename(program))
with Popen(self.host, 'beats/py27:system', ['python', tmp_name,
with Popen(self.host, 'environment (1)', ['python', tmp_name,
str(processes)], max_cpu_percent=max_cpu_percent,
tmp_archive=program) as p:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment