Commit abee7883 authored by André Anjos's avatar André Anjos 💬

[agent] Adapt execution model to docker async

parent 0519387e
......@@ -83,6 +83,7 @@ class Server(gevent.Greenlet):
def set_process(self, process):
self.process = process
self.process.statistics() # initialize internal statistics
def __str__(self):
......@@ -324,22 +325,17 @@ class Server(gevent.Greenlet):
class Agent(object):
'''Handles asynchronous stdout/stderr readout and synchronous commands.
'''Handles synchronous commands.
We use the greenlets for this implementation. Objects of this class are in
charge of three separate tasks:
1. Handling the execution of the user process (as a separate process)
2. Making sure the user process does not consume more resources than it is
supposed to (uses the external application ``cpulimit``)
1. Handling the execution of the user process (in a docker container)
3. Implementing a pipe-based API for I/O that the user process can query
Parameters:
execute_path (str): The path to the ``execute`` script of the chosen
environment. This **must** be a valid path.
virtual_memory_in_megabytes (int, optional): The amount of virtual memory
(in Megabytes) available for the job. If set to zero, no limit will be
applied.
......@@ -350,23 +346,12 @@ class Agent(object):
2 cores, this number can go between 0 and 200. If it is <= 0, then we
don't track CPU usage.
cpulimit_path (str): If ``max_cpu_percent`` >0, then we use the program
indicated by this path to start a parallel cpulimit daemon that will
control the CPU utilisation. If this value is not set, we search your
current execution path and then the system for a ``cpulimit`` executable.
The first one found will be used. It is an error to specify
``max_cpu_percent > 0`` and not have a valid ``cpulimit`` executable
available on your system.
'''
def __init__(self, execute_path, virtual_memory_in_megabytes,
max_cpu_percent, cpulimit_path):
def __init__(self, virtual_memory_in_megabytes, max_cpu_percent):
self.execute_path = execute_path
self.virtual_memory_in_megabytes = virtual_memory_in_megabytes
self.max_cpu_percent = max_cpu_percent
self.cpulimit_path = cpulimit_path
self.tempdir = None
self.process = None
......@@ -393,7 +378,7 @@ class Agent(object):
logger.debug("Exiting processing context...")
def run(self, configuration, timeout_in_minutes=0, daemon=0):
def run(self, configuration, host, timeout_in_minutes=0, daemon=0):
"""Runs the algorithm code
......@@ -402,6 +387,10 @@ class Agent(object):
configuration (object): A *valid*, preloaded
:py:class:`beat.core.execution.Executor` object.
host (:py:class:`Host`): A configured docker host that will execute the
user process. If the host does not have access to the required
environment, an exception will be raised.
timeout_in_minutes (int): The number of minutes to wait for the user
process to execute. After this amount of time, the user process is
killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will
......@@ -420,27 +409,38 @@ class Agent(object):
# Server for our single client
server = Server(configuration.input_list, configuration.output_list)
# Figures out the image to use
envkey = '%(name)s (%(version)s)' % configuration.data['environment']
if envkey not in host:
raise RuntimeError("Environment `%s' is not available on docker " \
"host `%s' - available environments are %s" % (envkey, host,
", ".join(host.environments.keys())))
# Launches the process (0MQ client)
cmd = [self.execute_path, '%s' % server.address, self.tempdir]
tmp_dir = os.path.join('/tmp', os.path.basename(self.tempdir))
cmd = ['execute', server.address, tmp_dir]
if logger.getEffectiveLevel() <= logging.DEBUG: cmd.insert(1, '--debug')
if daemon > 0:
image = host.env2docker(envkey)
logger.debug("Daemon mode: start the user process with the following " \
"command: `%s'", ' '.join(cmd))
"command: `docker run -ti %s %s'", image, ' '.join(cmd))
cmd = ['sleep', str(daemon)]
logger.debug("Daemon mode: sleeping for %d seconds", daemon)
self.process = async.Popen(
cmd=cmd,
virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
max_cpu_percent=self.max_cpu_percent,
cpulimit_path=self.cpulimit_path
)
else:
self.process = async.Popen(
host,
envkey,
command=cmd,
tmp_archive=self.tempdir,
virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
max_cpu_percent=self.max_cpu_percent,
)
# provide a tip on how to stop the test
if daemon > 0:
logger.debug("To stop the daemon, press CTRL-c or kill the sleep " \
"process with `kill -9 %d`", self.process.pid)
logger.debug("To stop the daemon, press CTRL-c or kill the user " \
"process with `docker kill %s`", self.process.pid)
# Serve asynchronously
server.set_process(self.process)
......@@ -452,7 +452,7 @@ class Agent(object):
timeout = (60*timeout_in_minutes) if timeout_in_minutes else None
status = self.process.wait(timeout)
except async.TimeoutExpired:
except requests.exceptions.ReadTimeout:
logger.warn("user process has timed out after %d minutes",
timeout_in_minutes)
self.process.kill()
......@@ -467,21 +467,19 @@ class Agent(object):
finally:
server.stop.set()
# If status is negative, convert it to a positive value (group signal)
if status < 0: status *= -1
# Collects final information and returns to caller
process = self.process
self.process = None
return dict(
stdout = process.peek_stdout(),
stderr = process.peek_stderr(),
status = status,
timed_out = timed_out,
statistics = server.last_statistics,
system_error = server.system_error,
user_error = server.user_error,
)
stdout = process.stdout,
stderr = process.stderr,
status = status,
timed_out = timed_out,
statistics = server.last_statistics,
system_error = server.system_error,
user_error = server.user_error,
)
process.rm()
def kill(self):
......
......@@ -422,9 +422,8 @@ class Executor(object):
force_start_index=start_index or 0))
def process(self, execute_path=None, virtual_memory_in_megabytes=0,
max_cpu_percent=0, cpulimit_path=None, timeout_in_minutes=0,
daemon=0):
def process(self, host, virtual_memory_in_megabytes=0,
max_cpu_percent=0, timeout_in_minutes=0, daemon=0):
"""Executes the user algorithm code using an external program.
If ``executable`` is set, then execute the process using an external
......@@ -449,27 +448,20 @@ class Executor(object):
Parameters:
execute_path (str): The path to the ``execute`` script of the chosen
environment. If set to ``None``, we still may find an ``execute``
program at the current script install location and use it. Otherwise,
an exception is raised if the parameter ``cores`` is also set.
host (:py:class:`Host`): A configured docker host that will execute the
user process. If the host does not have access to the required
environment, an exception will be raised.
virtual_memory_in_megabytes (int): The amount of virtual memory (in
Megabytes) available for the job. If set to zero, no limit will be
virtual_memory_in_megabytes (int, Optional): The amount of virtual memory
(in Megabytes) available for the job. If set to zero, no limit will be
applied.
max_cpu_percent (int, optional): The maximum amount of CPU usage allowed
max_cpu_percent (int, Optional): The maximum amount of CPU usage allowed
in a system. This number must be an integer number between 0 and
``100*number_of_cores`` in your system. For instance, if your system
has 2 cores, this number can go between 0 and 200. If it is <= 0, then
we don't track CPU usage.
cpulimit_path (str): The path to the ``cpulimit`` executable to use for
controlling the number of cores the user process can use.
If set to ``None``, we still may find a ``cpulimit``
program at the current script install location and use it. Otherwise,
an exception is raised if the parameter ``cores`` is also set.
timeout_in_minutes (int): The number of minutes to wait for the user
process to execute. After this amount of time, the user process is
killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will
......@@ -492,21 +484,12 @@ class Executor(object):
raise RuntimeError("execution information is bogus:\n * %s" % \
'\n * '.join(self.errors))
# figures out which execution script to use
execute_local = os.path.join(os.path.dirname(sys.argv[0]), 'execute')
execute = execute_path or execute_local
if not os.path.exists(execute):
raise IOError("external execution agent `%s' does not exist" % execute)
if not os.access(execute, os.X_OK):
raise OSError("file `%s' is not executable" % execute)
with agent.Agent(execute, virtual_memory_in_megabytes,
max_cpu_percent, cpulimit_path) as runner:
with agent.Agent(virtual_memory_in_megabytes, max_cpu_percent) as runner:
self.agent = runner
#synchronous call - always returns after a certain timeout
retval = runner.run(self, timeout_in_minutes=timeout_in_minutes,
retval = runner.run(self, host, timeout_in_minutes=timeout_in_minutes,
daemon=daemon)
#adds I/O statistics from the current executor, if it's complete already
......
......@@ -49,6 +49,7 @@ from ..experiment import Experiment
from ..execution import Executor
from ..hash import hashFileContents
from ..data import CachedDataSource
from .. import async
from . import prefix, tmp_prefix
from .utils import slow, cleanup
......@@ -114,8 +115,8 @@ def execute(label, expected_result):
dataformat_cache, database_cache, algorithm_cache)
assert executor.valid, '\n * %s' % '\n * '.join(executor.errors)
with executor:
result = executor.process()
with executor, async.Host(use_machine='default') as host:
result = executor.process(host)
assert result
assert 'status' in result
assert 'stdout' in result
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment