Commit 055a46d7 authored by André Anjos's avatar André Anjos 💬

[docker] Introduced docker usage in async

parent 1319ebf9
Pipeline #2451 failed with stage
......@@ -32,7 +32,6 @@ import shutil
import logging
logger = logging.getLogger(__name__)
import psutil
import gevent
import zmq.green as zmq
......@@ -44,7 +43,7 @@ from . import baseformat
class Server(gevent.Greenlet):
'''A 0MQ server for our communication with the user process'''
def __init__(self, input_list, output_list):
def __init__(self, input_list, output_list, ip_address='127.0.0.1'):
super(Server, self).__init__()
......@@ -55,7 +54,7 @@ class Server(gevent.Greenlet):
# Starts our 0MQ server
self.context = zmq.Context()
self.socket = self.context.socket(zmq.PAIR)
self.address = 'tcp://127.0.0.1'
self.address = 'tcp://' + ip_address
port = self.socket.bind_to_random_port(self.address)
self.address += ':%d' % port
logger.debug("zmq server bound to `%s'", self.address)
......@@ -349,8 +348,7 @@ class Agent(object):
This number must be an integer number between 0 and
``100*number_of_cores`` in your system. For instance, if your system has
2 cores, this number can go between 0 and 200. If it is <= 0, then we
don't track CPU usage.
cpulimit_path (str): If ``max_cpu_percent`` >0, then se use the program
indicated by this path to start a parallel cpulimit daemon that will
......@@ -489,5 +487,5 @@ class Agent(object):
def kill(self):
"""Stops the user process by force - to be called from signal handlers"""
if self.process is not None and psutil.pid_exists(self.process.pid):
if self.process is not None:
self.process.kill()
This diff is collapsed.
......@@ -50,121 +50,6 @@ from . import stats
from . import agent
def parse_stdout(data):
    """Parses the standard output to separate the statistics of a job

    Lines prefixed with ``STATISTICS `` carry a JSON-encoded statistics
    payload; every other line belongs to the job's plain standard output.
    """

    if not isinstance(data, str):
        data = data.decode()  # python3 compatibility

    prefix = 'STATISTICS '
    statistics = None
    kept = []

    for line in data.split('\n'):
        if line.startswith(prefix):
            statistics = stats.Statistics(simplejson.loads(line[len(prefix):]))
            assert statistics.valid, '\n * %s' % '\n * '.join(statistics.errors)
        else:
            kept.append(line)

    # an empty (default) Statistics object is returned when none was found
    return '\n'.join(kept), statistics or stats.Statistics()
def parse_stderr(data):
    """Parses the standard error to separate error reports of a job

    Lines prefixed with ``USER_ERROR `` or ``SYSTEM_ERROR `` are collected
    into their own strings (multiple reports of the same kind are joined by a
    dashed separator); everything else is returned as plain standard error.

    Parameters:

      data (str or bytes): The raw standard error stream of the job.

    Returns:

      tuple: ``(stderr, user_error, system_error)``, all strings; the error
      reports have escape sequences unfolded and at most one enclosing pair
      of single-quotes removed.
    """

    def __strip_atmost_one(s, c):
        # removes at most one matching pair of `c' from both ends of `s'
        if len(s) > 1 and s[0] == s[-1] and s[0] == c: return s[1:-1]
        return s

    stderr = ''
    user_error = ''
    system_error = ''
    if not isinstance(data, str): data = data.decode() #python3 compatibility
    for line in data.split('\n'):
        # bug fix: match the prefix including its trailing space, consistently
        # with the SYSTEM_ERROR branch and with the slice length used below
        if line.startswith('USER_ERROR '):
            if user_error: user_error += 20*'-' + '\\n'
            user_error += line[len('USER_ERROR '):]
        elif line.startswith('SYSTEM_ERROR '):
            if system_error: system_error += 20*'-' + '\\n'
            system_error += line[len('SYSTEM_ERROR '):]
        else: stderr += line + '\n'
    stderr = stderr[:-1]

    user_error = user_error.replace('\\n', '\n')
    user_error = user_error.replace('\\\'', "\'")
    user_error = __strip_atmost_one(user_error, "'")
    system_error = system_error.replace('\\n', '\n')
    system_error = system_error.replace('\\\'', "\'")
    system_error = __strip_atmost_one(system_error, "'")

    return stderr, user_error, system_error
def discover_environments(envpath, raise_on_errors=True):
    """Returns a dictionary of environments by scanning a list of directories

    Parameters:

      envpath (list): Directories to scan for environment installations.

      raise_on_errors (bool): If ``True``, a name clash between two distinct
        environments raises :py:exc:`RuntimeError`; otherwise the later one
        overrides the earlier with a warning.

    Returns:

      dict: Maps ``"<name> (<version>)"`` to the environment's ``describe``
      output, augmented with ``execute`` and ``directory`` entries.
    """

    def is_exe(fpath):
        # a usable helper must both exist and carry the executable bit
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    environments = {}

    for path in envpath:
        for d in os.listdir(path):
            envdir = os.path.realpath(os.path.join(path, d))
            if not os.path.isdir(envdir): continue
            logger.debug("Analysing potential environment directory `%s'...", envdir)
            # a valid environment ships executable `bin/execute' and `bin/describe'
            execute = os.path.join(envdir, 'bin', 'execute')
            describe = os.path.join(envdir, 'bin', 'describe')
            if not (is_exe(execute) and is_exe(describe)):
                logger.debug("Ignoring directory `%s' either `bin/execute' or " \
                    "`bin/describe' not found or not executable" % envdir)
                continue
            try:
                # `describe' is expected to print a JSON description on stdout
                description = simplejson.loads(subprocess.check_output(describe))
            except Exception as e:
                logger.warn("Ignoring potential environment at `%s' since " \
                    "`describe' returned an error: %s", envdir, str(e))
                continue
            name = description['name'] + ' (' + description['version'] + ')'
            if name in environments:
                # if my own `execute' is already registered, give preference to it
                mine = os.path.dirname(os.path.dirname(sys.argv[0]))
                override = environments[name]['directory']
                if os.path.samefile(mine, override):
                    logger.info("Skipping override of **existing** environment `%s' " \
                        "with base directory at `%s' (it is our own) by `%s'", name,
                        override, envdir)
                    continue
                # this check avoids we do a new environment and, by mistake, override
                # it with a previous version or the contrary.
                if raise_on_errors:
                    raise RuntimeError("Environments at `%s' and `%s' have the " \
                        "same name (`%s'). Distinct environments must be uniquely " \
                        "named. Fix this and re-start." % \
                        (envdir, environments[name]['directory'], name))
                else:
                    logger.warn("Overriding **existing** environment `%s' base " \
                        "directory with `%s' (it was `%s). To avoid this warning " \
                        "make sure the root environment path `%s' does not " \
                        "contain environments with the same names", name, envdir,
                        environments[name]['directory'], os.pathsep.join(envpath))
            environments[name] = description
            environments[name]['execute'] = execute
            environments[name]['directory'] = envdir
            logger.info("Registered `%s' -> `%s'", name, execute)

    return environments
class Executor(object):
"""Executors runs the code given an execution block information, externally
......@@ -577,8 +462,7 @@ class Executor(object):
in a system. This number must be an integer number between 0 and
``100*number_of_cores`` in your system. For instance, if your system
has 2 cores, this number can go between 0 and 200. If it is <= 0, then
we don't track CPU usage.
cpulimit_path (str): The path to the ``cpulimit`` executable to use for
controlling the number of cores the user process can use.
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""A rather insecure sandboxing environment based on pure Python.
The resulting child process that is created using this sandboxer will have a
limited amount of memory to perform its task. It won't be able to spawn any
external processes. The total execution time is controlled by the parent
process.
DISCLAIMER: This environment works well under Linux, where child process
resources can be nicely controlled using the 'resource' module. If used in
environments such as OSX, check for the proper way to implement child process
restrictions.
"""
def sandbox():
    """Limits this process' resources, then exec()s the program given on the
    command line (see the module docstring for the overall contract)"""

    # local logger with its own stream handler, so verbosity can be tuned
    # independently of any global logging configuration
    import logging
    logger = logging.getLogger('sandboxer')
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    formatter = logging.Formatter('[%(levelname)s] %(name)s: %(message)s')
    ch.setFormatter(formatter)
    logger.addHandler(ch)

    from optparse import OptionParser
    parser = OptionParser(usage=__doc__)
    parser.add_option("-m", "--virtual-memory-in-megabytes", default=0,
        dest="vmem", metavar='MEMORY', type=float,
        help="Maximum virtual memory (zero disables it) [default: 0]")
    parser.add_option("-v", action="count", default=0,
        dest="verbosity", help="increase the script verbosity")
    options, args = parser.parse_args()

    # Before we start, adjusts verbosity level
    if options.verbosity <= 0:
        ch.setLevel(logging.WARNING)
    elif options.verbosity == 1:
        ch.setLevel(logging.INFO)
    else:
        ch.setLevel(logging.DEBUG)

    # does the real work
    import os
    import sys
    import resource
    import platform

    logger.info("Sandboxing `%s'...", ' '.join(args))

    vmem = int(1024*1024*float(options.vmem)) #in bytes
    if vmem:
        if platform.system() == 'Darwin':
            # NOTE(review): original indentation was lost in extraction; the
            # limit is assumed to still be set below on OSX (the message only
            # warns it may be ineffective there) -- TODO confirm
            logger.info("RLIMIT_AS does not seem to work on MacOSX")
        resource.setrlimit(resource.RLIMIT_AS, (vmem, vmem))
        logger.info("Set maximum memory to %d bytes (%g megabytes)", vmem, options.vmem)

    # forbid core dumps from the child
    resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
    logger.info("Setting core dump limit to 0")

    # replaces the current process image; does not return on success
    logger.info("Running process `%s'...", ' '.join(args))
    os.execv(args[0], args)


if __name__ == '__main__':
    sandbox()
......@@ -31,7 +31,6 @@
import os
import time
import copy
import psutil
import simplejson
......@@ -267,24 +266,7 @@ def io_statistics(data_sources, input_list, data_sinks, output_list, data, analy
)
def sum_tuples(t):
    """Helper for psutil: element-wise sum of a sequence of (named) tuples"""

    # transpose into columns, add each column up, and rebuild a tuple of the
    # same (named) type as the first entry
    summed = [sum(column) for column in zip(*t)]
    return t[0].__class__(*summed)
def merge_statistics(processes, entries):
    """Helper for psutil: sums each named statistic over several processes"""

    # each entry names a zero-argument method on the process objects; the
    # per-process results are added element-wise via sum_tuples()
    return dict(
        (entry, sum_tuples([getattr(proc, entry)() for proc in processes]))
        for entry in entries
    )
def cpu_statistics(start, end):
    """Summarizes CPU usage between two docker ``stats`` samples

    The removed psutil-based implementation was interleaved with this one in
    the extracted diff; this is the reconstructed docker version.

    Parameters:

      start (dict): The CPU statistics sample taken at the start of the
        measurement window, or ``None``, in which case usage is counted from
        zero. Expected keys: ``cpu_usage.total_usage`` and
        ``system_cpu_usage`` (both in nanoseconds).

      end (dict): The sample at the end of the window; same keys as ``start``
        plus an optional ``cpu_usage.percpu_usage`` list (one entry per
        processor, or ``None``).

    Returns:

      dict: With keys ``user``, ``system`` and ``total`` (seconds),
      ``percent`` and ``processors``.
    """

    if start is not None:
        # only count usage accumulated inside the measurement window
        user_cpu = end['cpu_usage']['total_usage'] - \
            start['cpu_usage']['total_usage']
        total_cpu = end['system_cpu_usage'] - start['system_cpu_usage']
    else:
        user_cpu = end['cpu_usage']['total_usage']
        total_cpu = end['system_cpu_usage']

    user_cpu /= 1000000000. #in seconds
    total_cpu /= 1000000000. #in seconds

    # if per-cpu figures are unavailable, assume a single processor
    processors = len(end['cpu_usage']['percpu_usage']) if \
        end['cpu_usage']['percpu_usage'] is not None else 1

    return {
        'user': user_cpu,
        'system': 0.,
        'total': total_cpu,
        # guard against a zero denominator on the very first sample
        'percent': 100.*processors*user_cpu/total_cpu if total_cpu else 0.,
        'processors': processors,
    }
def memory_statistics(data):
    """Summarizes memory usage from a docker memory-stats sample

    The removed psutil-based implementation was interleaved with this one in
    the extracted diff; this is the reconstructed docker version.

    Parameters:

      data (dict): Expected to contain ``limit`` and ``max_usage`` entries
        (presumably in bytes -- as reported by the docker stats API).

    Returns:

      dict: With keys ``rss``, ``limit`` and ``percent``.
    """

    limit = float(data['limit'])
    memory = float(data['max_usage'])

    return {
        'rss': memory,
        'limit': limit,
        # guard against an unset (zero) limit to avoid a division error
        'percent': 100.*memory/limit if limit else 0.,
    }
......@@ -35,10 +35,6 @@ Runs 2x CPU stress function
import sys
import multiprocessing
import os
import signal
import time
import psutil
def test(x):
try:
......@@ -47,17 +43,10 @@ def test(x):
return x * x
def kill_child_processes(signum, frame):
    """SIGTERM handler: kills every child of this process, then exits

    Parameters:

      signum (int): Number of the signal being handled (unused).

      frame: Current stack frame at interruption time (unused).
    """

    proc = psutil.Process()
    # bug fix: iterate over the *children* of the current process -- the
    # original iterated over the Process object itself (a TypeError), so no
    # child was ever killed and `proc' went unused
    for child in proc.children(recursive=True): child.kill()
    sys.exit()
def main():
    """Entry point: stresses the CPU with a pool of worker processes

    The number of workers is read from the first command-line argument; a
    SIGTERM handler tears the children down before exiting.
    """

    num_workers = int(sys.argv[1])
    worker_pool = multiprocessing.Pool(processes=num_workers)
    async_result = worker_pool.map_async(test, range(num_workers))
    signal.signal(signal.SIGTERM, kill_child_processes)
    worker_pool.close()
    worker_pool.join()
......
This diff is collapsed.
......@@ -35,7 +35,6 @@ requires = [
"jsonschema",
"numpy",
"pip",
"psutil",
"setuptools",
"simplejson",
"six",
......@@ -45,6 +44,8 @@ requires = [
"matplotlib>=1.4",
"gevent",
"pyzmq",
"docker-py",
"python-docker-machine",
]
# The only thing we do in this file is to call the setup() function with all
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment