Commit adc087a9 authored by Philip ABBET's avatar Philip ABBET
Browse files

Check that an execution environment is capable of "proxy_mode=False" before setup

parent 0f897238
......@@ -62,6 +62,10 @@ class Executor(object):
Parameters:
host (:py:class:Host): A configured docker host that will execute the
user process. If the host does not have access to the required
environment, an exception will be raised.
prefix (str): Establishes the prefix of your installation.
data (dict, str): The piece of data representing the block to be executed.
......@@ -138,10 +142,11 @@ class Executor(object):
"""
def __init__(self, prefix, data, cache=None, dataformat_cache=None,
def __init__(self, host, prefix, data, cache=None, dataformat_cache=None,
database_cache=None, algorithm_cache=None, library_cache=None,
proxy_mode=True):
self.host = host
self.prefix = prefix
self.cache = cache or os.path.join(self.prefix, 'cache')
self.proxy_mode = proxy_mode
......@@ -234,6 +239,16 @@ class Executor(object):
if not db.valid:
self.errors += db.errors
# Check if the execution environment supports proxy_mode=False (if necessary)
if not self.proxy_mode:
envkey = '%(name)s (%(version)s)' % self.data['environment']
if envkey not in self.host:
raise RuntimeError("Environment `%s' is not available on docker " \
"host `%s' - available environments are %s" % (envkey, self.host,
", ".join(self.host.environments.keys())))
self.proxy_mode = 'direct_access' not in self.host.environments[envkey].get('capabilities', [])
def __enter__(self):
"""Prepares inputs and outputs for the processing task
......@@ -245,11 +260,10 @@ class Executor(object):
"""
if len(self.databases) > 0:
host = dock.Host()
self.context = zmq.Context()
self.db_socket = self.context.socket(zmq.PAIR)
self.db_address = 'tcp://' + host.ip
port = self.db_socket.bind_to_random_port(self.db_address)
self.db_address = 'tcp://' + self.host.ip
port = self.db_socket.bind_to_random_port(self.db_address, min_port=50000)
self.db_address += ':%d' % port
self._prepare_inputs()
......@@ -310,8 +324,8 @@ class Executor(object):
)
def process(self, host, virtual_memory_in_megabytes=0,
max_cpu_percent=0, timeout_in_minutes=0, daemon=0):
def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
timeout_in_minutes=0, daemon=0):
"""Executes the user algorithm code using an external program.
If ``executable`` is set, then execute the process using an external
......@@ -336,10 +350,6 @@ class Executor(object):
Parameters:
host (:py:class:Host): A configured docker host that will execute the
user process. If the host does not have access to the required
environment, an exception will be raised.
virtual_memory_in_megabytes (int, Optional): The amount of virtual memory
(in Megabytes) available for the job. If set to zero, no limit will be
applied.
......@@ -377,7 +387,7 @@ class Executor(object):
self.agent = runner
#synchronous call - always returns after a certain timeout
retval = runner.run(self, host, timeout_in_minutes=timeout_in_minutes,
retval = runner.run(self, self.host, timeout_in_minutes=timeout_in_minutes,
daemon=daemon, db_address=self.db_address)
#adds I/O statistics from the current executor, if its complete already
......
......@@ -130,13 +130,13 @@ class TestExecution(unittest.TestCase):
# can we execute it?
results = []
for key, value in scheduled.items():
executor = Executor(prefix, value['configuration'], tmp_prefix,
executor = Executor(self.host, prefix, value['configuration'], tmp_prefix,
dataformat_cache, database_cache, algorithm_cache,
proxy_mode=self.proxy_mode)
assert executor.valid, '\n * %s' % '\n * '.join(executor.errors)
with executor:
result = executor.process(self.host, timeout_in_minutes=3)
result = executor.process(timeout_in_minutes=3)
assert result
assert 'status' in result
assert 'stdout' in result
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment