diff --git a/beat/core/execution.py b/beat/core/execution.py index b154a8f10ddcd683339405f385bcac35c4ea077a..0c0b9d453dcd20057aafd9e3a4d7985befb139d4 100755 --- a/beat/core/execution.py +++ b/beat/core/execution.py @@ -62,6 +62,10 @@ class Executor(object): Parameters: + host (:py:class:Host): A configured docker host that will execute the + user process. If the host does not have access to the required + environment, an exception will be raised. + prefix (str): Establishes the prefix of your installation. data (dict, str): The piece of data representing the block to be executed. @@ -138,10 +142,11 @@ class Executor(object): """ - def __init__(self, prefix, data, cache=None, dataformat_cache=None, + def __init__(self, host, prefix, data, cache=None, dataformat_cache=None, database_cache=None, algorithm_cache=None, library_cache=None, proxy_mode=True): + self.host = host self.prefix = prefix self.cache = cache or os.path.join(self.prefix, 'cache') self.proxy_mode = proxy_mode @@ -234,6 +239,16 @@ class Executor(object): if not db.valid: self.errors += db.errors + # Check if the execution environment supports proxy_mode=False (if necessary) + if not self.proxy_mode: + envkey = '%(name)s (%(version)s)' % self.data['environment'] + if envkey not in self.host: + raise RuntimeError("Environment `%s' is not available on docker " \ + "host `%s' - available environments are %s" % (envkey, self.host, + ", ".join(self.host.environments.keys()))) + + self.proxy_mode = 'direct_access' not in self.host.environments[envkey].get('capabilities', []) + def __enter__(self): """Prepares inputs and outputs for the processing task @@ -245,11 +260,10 @@ class Executor(object): """ if len(self.databases) > 0: - host = dock.Host() self.context = zmq.Context() self.db_socket = self.context.socket(zmq.PAIR) - self.db_address = 'tcp://' + host.ip - port = self.db_socket.bind_to_random_port(self.db_address) + self.db_address = 'tcp://' + self.host.ip + port = self.db_socket.bind_to_random_port(self.db_address, min_port=50000) self.db_address += ':%d' % port self._prepare_inputs() @@ -310,8 +324,8 @@ class Executor(object): ) - def process(self, host, virtual_memory_in_megabytes=0, - max_cpu_percent=0, timeout_in_minutes=0, daemon=0): + def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0, + timeout_in_minutes=0, daemon=0): """Executes the user algorithm code using an external program. If ``executable`` is set, then execute the process using an external @@ -336,10 +350,6 @@ class Executor(object): Parameters: - host (:py:class:Host): A configured docker host that will execute the - user process. If the host does not have access to the required - environment, an exception will be raised. - virtual_memory_in_megabytes (int, Optional): The amount of virtual memory (in Megabytes) available for the job. If set to zero, no limit will be applied. @@ -377,7 +387,7 @@ class Executor(object): self.agent = runner #synchronous call - always returns after a certain timeout - retval = runner.run(self, host, timeout_in_minutes=timeout_in_minutes, + retval = runner.run(self, self.host, timeout_in_minutes=timeout_in_minutes, daemon=daemon, db_address=self.db_address) #adds I/O statistics from the current executor, if its complete already diff --git a/beat/core/test/test_execution.py b/beat/core/test/test_execution.py old mode 100755 new mode 100644 index af2890a13c4e712db8ae67da45bb7b5179f25073..85ce14de837ff0e67f4b29495055c9d2b6f9099a --- a/beat/core/test/test_execution.py +++ b/beat/core/test/test_execution.py @@ -130,13 +130,13 @@ class TestExecution(unittest.TestCase): # can we execute it? results = [] for key, value in scheduled.items(): - executor = Executor(prefix, value['configuration'], tmp_prefix, + executor = Executor(self.host, prefix, value['configuration'], tmp_prefix, dataformat_cache, database_cache, algorithm_cache, proxy_mode=self.proxy_mode) assert executor.valid, '\n * %s' % '\n * '.join(executor.errors) with executor: - result = executor.process(self.host, timeout_in_minutes=3) + result = executor.process(timeout_in_minutes=3) assert result assert 'status' in result assert 'stdout' in result