diff --git a/beat/core/agent.py b/beat/core/agent.py deleted file mode 100755 index 30bba1be928eb405ac05a6e827343f0ae2ef6921..0000000000000000000000000000000000000000 --- a/beat/core/agent.py +++ /dev/null @@ -1,424 +0,0 @@ -#!/usr/bin/env python -# vim: set fileencoding=utf-8 : - -############################################################################### -# # -# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # -# Contact: beat.support@idiap.ch # -# # -# This file is part of the beat.core module of the BEAT platform. # -# # -# Commercial License Usage # -# Licensees holding valid commercial BEAT licenses may use this file in # -# accordance with the terms contained in a written agreement between you # -# and Idiap. For further information contact tto@idiap.ch # -# # -# Alternatively, this file may be used under the terms of the GNU Affero # -# Public License version 3 as published by the Free Software and appearing # -# in the file LICENSE.AGPL included in the packaging of this file. # -# The BEAT platform is distributed in the hope that it will be useful, but # -# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # -# or FITNESS FOR A PARTICULAR PURPOSE. # -# # -# You should have received a copy of the GNU Affero Public License along # -# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # -# # -############################################################################### - - -import os -import shutil -import simplejson -import glob - -import logging -logger = logging.getLogger(__name__) - -import gevent -import zmq.green as zmq - -import requests -from gevent import monkey -monkey.patch_socket(dns=False) -monkey.patch_ssl() - -from . import utils -from . import dock -from . import baseformat - -from beat.backend.python.message_handler import MessageHandler - - -class Server(MessageHandler): - '''A 0MQ server for our communication with the user process''' - - def __init__(self, input_list, output_list, host_address): - - # Starts our 0MQ server - self.context = zmq.Context() - self.socket = self.context.socket(zmq.PAIR) - - self.address = 'tcp://' + host_address - port = self.socket.bind_to_random_port(self.address, min_port=50000) - self.address += ':%d' % port - logger.debug("zmq server bound to `%s'", self.address) - - super(Server, self).__init__(input_list, self.context, self.socket) - - self.output_list = output_list - - # implementations - self.callbacks.update(dict( - wrt = self.write, - idm = self.is_data_missing, - oic = self.output_is_connected, - )) - - - def destroy(self): - self.context.destroy() - - - def __str__(self): - return 'Server(%s)' % self.address - - - def _get_output_candidate(self, name): - - retval = self.output_list[name] - if retval is None: raise RuntimeError("Could not find output `%s'" % name) - return retval - - - def write(self, name, packed): - """Syntax: wrt output data""" - - logger.debug('recv: wrt %s <bin> (size=%d)', name, len(packed)) - - # Get output object - output_candidate = self._get_output_candidate(name) - if output_candidate is None: - raise RuntimeError("Could not find output `%s' to write to" % name) - - data = output_candidate.data_sink.dataformat.type() - data.unpack(packed) - output_candidate.write(data) - - logger.debug('send: ack') - self.socket.send('ack') - - - def is_data_missing(self, name): - """Syntax: idm output""" - - logger.debug('recv: idm %s', name) - - output_candidate = self._get_output_candidate(name) - what = 'tru' if output_candidate.isDataMissing() else 'fal' - logger.debug('send: %s', what) - self.socket.send(what) - - - def output_is_connected(self, name): - """Syntax: oic output""" - - logger.debug('recv: oic %s', name) - - output_candidate = self._get_output_candidate(name) - what = 'tru' if output_candidate.isConnected() else 'fal' - logger.debug('send: %s', what) - self.socket.send(what) - - - -class Agent(object): - '''Handles synchronous commands. - - We use the greenlets for this implementation. Objects of this class are in - charge of three separate tasks: - - 1. Handling the execution of the user process (in a docker container) - 3. Implementing a pipe-based API for I/O that the user process can query - - - Parameters: - - virtual_memory_in_megabytes (int, optional): The amount of virtual memory - (in Megabytes) available for the job. If set to zero, no limit will be - applied. - - max_cpu_percent (int): The maximum amount of CPU usage allowed in a system. - This number must be an integer number between 0 and - ``100*number_of_cores`` in your system. For instance, if your system has - 2 cores, this number can go between 0 and 200. If it is <= 0, then we - don't track CPU usage. - - ''' - - def __init__(self, virtual_memory_in_megabytes, max_cpu_percent): - - self.virtual_memory_in_megabytes = virtual_memory_in_megabytes - self.max_cpu_percent = max_cpu_percent - self.tempdir = None - self.db_tempdir = None - self.process = None - self.db_process = None - self.server = None - - - def __enter__(self): - '''Start of context manager''' - - logger.debug("Entering processing context...") - - # Creates a temporary directory for the user process - self.tempdir = utils.temporary_directory() - logger.debug("Created temporary directory `%s'", self.tempdir) - self.db_tempdir = utils.temporary_directory() - logger.debug("Created temporary directory `%s'", self.db_tempdir) - self.process = None - self.db_process = None - - return self - - def __exit__(self, exc_type, exc_value, traceback): - - if self.tempdir is not None and os.path.exists(self.tempdir): - shutil.rmtree(self.tempdir) - self.tempdir = None - - if self.db_tempdir is not None and os.path.exists(self.db_tempdir): - shutil.rmtree(self.db_tempdir) - self.db_tempdir = None - - self.process = None - self.db_process = None - logger.debug("Exiting processing context...") - - - def run(self, configuration, host, timeout_in_minutes=0, daemon=0, db_address=None): - """Runs the algorithm code - - - Parameters: - - configuration (object): A *valid*, preloaded - :py:class:`beat.core.execution.DockerExecutor` object. - - host (:py:class:Host): A configured docker host that will execute the - user process. If the host does not have access to the required - environment, an exception will be raised. - - timeout_in_minutes (int): The number of minutes to wait for the user - process to execute. After this amount of time, the user process is - killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will - be applied. - - daemon (int): If this variable is set, then we don't really start the - user process, but just kick out 0MQ server, print the command-line and - sleep for that many seconds. You're supposed to start the client by - hand then and debug it. - - """ - - # Recursively copies configuration data to <tempdir>/prefix - configuration.dump_runner_configuration(self.tempdir) - - if db_address is not None: - configuration.dump_databases_provider_configuration(self.db_tempdir) - - # Modify the paths to the databases in the dumped configuration files - root_folder = os.path.join(self.db_tempdir, 'prefix', 'databases') - - database_paths = {} - - if not configuration.data.has_key('datasets_root_path'): - for db_name in configuration.databases.keys(): - json_path = os.path.join(root_folder, db_name + '.json') - - with open(json_path, 'r') as f: - db_data = simplejson.load(f) - - database_paths[db_name] = db_data['root_folder'] - db_data['root_folder'] = os.path.join('/databases', db_name) - - with open(json_path, 'w') as f: - simplejson.dump(db_data, f, indent=4) - - # Server for our single client - self.server = Server(configuration.input_list, configuration.output_list, - host.ip) - - # Figures out the images to use - envkey = '%(name)s (%(version)s)' % configuration.data['environment'] - if envkey not in host: - raise RuntimeError("Environment `%s' is not available on docker " \ - "host `%s' - available environments are %s" % (envkey, host, - ", ".join(host.environments.keys()))) - - if db_address is not None: - try: - db_envkey = host.db2docker(database_paths.keys()) - except: - raise RuntimeError("No environment found for the databases `%s' " \ - "- available environments are %s" % ( - ", ".join(database_paths.keys()), - ", ".join(host.db_environments.keys()))) - - # Launches the process (0MQ client) - tmp_dir = os.path.join('/tmp', os.path.basename(self.tempdir)) - cmd = ['execute', self.server.address, tmp_dir] - if logger.getEffectiveLevel() <= logging.DEBUG: cmd.insert(1, '--debug') - - if daemon > 0: - image = host.env2docker(envkey) - logger.debug("Daemon mode: start the user process with the following " \ - "command: `docker run -ti %s %s'", image, ' '.join(cmd)) - cmd = ['sleep', str(daemon)] - logger.debug("Daemon mode: sleeping for %d seconds", daemon) - else: - if db_address is not None: - tmp_dir = os.path.join('/tmp', os.path.basename(self.db_tempdir)) - db_cmd = ['databases_provider', db_address, tmp_dir] - - volumes = {} - - if not configuration.data.has_key('datasets_root_path'): - for db_name, db_path in database_paths.items(): - volumes[db_path] = { - 'bind': os.path.join('/databases', db_name), - 'mode': 'ro', - } - else: - volumes[configuration.data['datasets_root_path']] = { - 'bind': configuration.data['datasets_root_path'], - 'mode': 'ro', - } - - # Note: we only support one databases image loaded at the same time - self.db_process = dock.Popen( - host, - db_envkey, - command=db_cmd, - tmp_archive=self.db_tempdir, - volumes=volumes - ) - - volumes = {} - if not configuration.proxy_mode: - volumes[configuration.cache] = { - 'bind': '/cache', - 'mode': 'rw', - } - - # for name, details in configuration.data['inputs'].items(): - # if 'database' in details: - # continue - # - # basename = os.path.join(configuration.cache, details['path']) - # filenames = glob.glob(basename + '*.data') - # filenames.extend(glob.glob(basename + '*.data.checksum')) - # filenames.extend(glob.glob(basename + '*.data.index')) - # filenames.extend(glob.glob(basename + '*.data.index.checksum')) - # - # for filename in filenames: - # volumes[filename] = { - # 'bind': os.path.join('/cache', filename.replace(configuration.cache + '/', '')), - # 'mode': 'ro', - # } - # - # if 'result' in configuration.data: - # outputs_config = { - # 'result': configuration.data['result'] - # } - # else: - # outputs_config = configuration.data['outputs'] - # - # for name, details in outputs_config.items(): - # basename = os.path.join(configuration.cache, details['path']) - # dirname = os.path.dirname(basename) - # - # volumes[dirname] = { - # 'bind': os.path.join('/cache', dirname.replace(configuration.cache + '/', '')), - # 'mode': 'rw', - # } - - self.process = dock.Popen( - host, - envkey, - command=cmd, - tmp_archive=self.tempdir, - virtual_memory_in_megabytes=self.virtual_memory_in_megabytes, - max_cpu_percent=self.max_cpu_percent, - volumes=volumes - ) - - # provide a tip on how to stop the test - if daemon > 0: - logger.debug("To stop the daemon, press CTRL-c or kill the user " \ - "process with `docker kill %s`", self.process.pid) - - # Serve asynchronously - self.server.set_process(self.process) - self.server.start() - - timed_out = False - - try: - timeout = (60*timeout_in_minutes) if timeout_in_minutes else None - status = self.process.wait(timeout) - - except requests.exceptions.ReadTimeout: - logger.warn("user process has timed out after %d minutes", - timeout_in_minutes) - self.process.kill() - status = self.process.wait() - - if self.db_process is not None: - self.db_process.kill() - self.db_process.wait() - - timed_out = True - - except KeyboardInterrupt: #developer pushed CTRL-C - logger.info("stopping user process on CTRL-C console request") - self.process.kill() - status = self.process.wait() - - if self.db_process is not None: - self.db_process.kill() - self.db_process.wait() - - finally: - self.server.stop.set() - - # Collects final information and returns to caller - process = self.process - self.process = None - retval = dict( - stdout = process.stdout, - stderr = process.stderr, - status = status, - timed_out = timed_out, - statistics = self.server.last_statistics, - system_error = self.server.system_error, - user_error = self.server.user_error, - ) - process.rm() - - if self.db_process is not None: - retval['stdout'] += '\n' + self.db_process.stdout - retval['stderr'] += '\n' + self.db_process.stderr - self.db_process.rm() - self.db_process = None - - self.server.destroy() - self.server = None - return retval - - - def kill(self): - """Stops the user process by force - to be called from signal handlers""" - - if self.server is not None: - self.server.kill() diff --git a/beat/core/dock.py b/beat/core/dock.py index fda2f2a6d51cfc12501c9f5819edb9e805f356bd..ca2245cf50ead548dd6d45fe216deb196a34f2af 100755 --- a/beat/core/dock.py +++ b/beat/core/dock.py @@ -292,6 +292,26 @@ class Host(object): return (environments, db_environments) + def create_archive(self, src): + c = six.moves.cStringIO() + + with tarfile.open(mode='w', fileobj=c) as tar: + tar.add(src, arcname=os.path.basename(src)) + + return c.getvalue() + + + def put_archive(self, container, archive, dest='/tmp', chmod=None): + # Place the tarball into the container + logger.debug("[docker] archive -> %s@%s", container['Id'][:12], dest) + self.client.put_archive(container, dest, archive) + + # (If necessary) Change permissions to access the path + if chmod is not None: + ex = self.client.exec_create(container, cmd=['chmod', '-R', chmod, dest]) + output = self.client.exec_start(ex) # waits until it is executed + + def put_path(self, container, src, dest='/tmp', chmod=None): """Puts a given src path into a destination folder @@ -322,24 +342,17 @@ class Host(object): """ # The docker API only accepts in-memory tar archives - c = six.moves.cStringIO() - with tarfile.open(mode='w', fileobj=c) as tar: - tar.add(src, arcname=os.path.basename(src)) - archive = c.getvalue() + archive = self.create_archive(src) # Place the tarball into the container - path = os.path.join(dest, os.path.basename(src)) - logger.debug("[docker] archive -> %s@%s", container['Id'][:12], dest) - self.client.put_archive(container, dest, archive) - - if chmod is not None: - # Change permissions to access the path - ex = self.client.exec_create(container, cmd=['chmod', '-R', chmod, dest]) - output = self.client.exec_start(ex) #waits until it is executed + self.put_archive(container, archive, dest='/tmp', + # dest=os.path.join(dest, os.path.basename(src)), + chmod=chmod + ) - def create_container(self, image, command, tmp_path=None, host_args=None, - **args): + def create_container(self, image, command, configuration_archive=None, + configuration_path=None, host_args=None, **args): """Prepares the docker container for running the user code @@ -350,8 +363,8 @@ class Host(object): command (list): A list of strings with the command to run inside the container. - tmp_path (str): A path with a file name or directory that will be - copied into the container (inside ``/tmp``), supposedly with + configuration_path (str): a path with a file name or directory that will + be copied into the container (inside ``/tmp``), supposedly with information that is used by the command. host_args (dict): A dictionary that will be transformed into a @@ -397,12 +410,14 @@ class Host(object): args['host_config'] = self.client.create_host_config(**config_args) logger.debug("[docker] create_container %s %s", image, ' '.join(command)) - container = self.client.create_container(image=image, command=command, - **args) + container = self.client.create_container(image=image, command=command, **args) self.containers.append(container) - if tmp_path is not None: - self.put_path(container, tmp_path) + if configuration_archive is not None: + self.put_archive(container, configuration_archive) + + if configuration_path is not None: + self.put_path(container, configuration_path) return container @@ -495,7 +510,7 @@ class Popen: ''' - def __init__(self, host, image, command, tmp_archive=None, + def __init__(self, host, image, command, configuration_archive=None, virtual_memory_in_megabytes=0, max_cpu_percent=0, **args): self.host = host @@ -546,7 +561,7 @@ class Popen: # creates the container self.container = self.host.create_container(image=image, - command=command, tmp_path=tmp_archive, + command=command, configuration_archive=configuration_archive, host_args=host_args, **args) # Starts the container diff --git a/beat/core/execution/docker.py b/beat/core/execution/docker.py index aef7c32e5688a84a1870a592b90865a9db5a1f39..549d9980b35461635dd5d9bd0da6025c3b43e4b6 100755 --- a/beat/core/execution/docker.py +++ b/beat/core/execution/docker.py @@ -29,14 +29,21 @@ '''Execution utilities''' import os -import sys +import requests +import simplejson import zmq.green as zmq import logging logger = logging.getLogger(__name__) +from gevent import monkey +monkey.patch_socket(dns=False) +monkey.patch_ssl() + from .. import stats -from .. import agent +from .. import message_handler +from .. import utils +from .. import dock from .base import BaseExecutor @@ -149,6 +156,7 @@ class DockerExecutor(BaseExecutor): self.db_socket = None self.db_address = None self.proxy_mode = proxy_mode + self.message_handler = None # Check if the execution environment supports proxy_mode=False (if necessary) if not self.proxy_mode: @@ -186,8 +194,6 @@ class DockerExecutor(BaseExecutor): super(DockerExecutor, self).__exit__(exc_type, exc_value, traceback) - self.agent = None - if self.context is not None: self.context.destroy() self.context = None @@ -223,7 +229,7 @@ class DockerExecutor(BaseExecutor): def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0, - timeout_in_minutes=0, daemon=0): + timeout_in_minutes=0): """Executes the user algorithm code using an external program. The execution interface follows the backend API as described in our @@ -259,11 +265,6 @@ class DockerExecutor(BaseExecutor): killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will be applied. - daemon (int): If this variable is set, then we don't really start the - user process, but just kick out 0MQ server, print the command-line and - sleep for that many seconds. You're supposed to start the client by - hand then and debug it. - Returns: dict: A dictionary which is JSON formattable containing the summary of @@ -275,22 +276,190 @@ class DockerExecutor(BaseExecutor): raise RuntimeError("execution information is bogus:\n * %s" % \ '\n * '.join(self.errors)) - with agent.Agent(virtual_memory_in_megabytes, max_cpu_percent) as runner: - self.agent = runner + # Creates an in-memory archive containing all the configuration and files + # needed by the processing container + configuration_path = utils.temporary_directory() + self.dump_runner_configuration(configuration_path) + processing_archive = self.host.create_archive(configuration_path) + + + # (If necessary) Creates an in-memory archive containing all the configuration + # and files needed by the databases container + if self.db_address is not None: + databases_configuration_path = utils.temporary_directory() + self.dump_databases_provider_configuration(databases_configuration_path) + + # Modify the paths to the databases in the dumped configuration files + root_folder = os.path.join(databases_configuration_path, 'prefix', 'databases') + + database_paths = {} + + if not self.data.has_key('datasets_root_path'): + for db_name in self.databases.keys(): + json_path = os.path.join(root_folder, db_name + '.json') + + with open(json_path, 'r') as f: + db_data = simplejson.load(f) + + database_paths[db_name] = db_data['root_folder'] + db_data['root_folder'] = os.path.join('/databases', db_name) + + with open(json_path, 'w') as f: + simplejson.dump(db_data, f, indent=4) + + databases_archive = self.host.create_archive(databases_configuration_path) + + + # Creates the message handler + self.message_handler = message_handler.ProxyMessageHandler( + self.input_list, self.output_list, self.host.ip) + + + # Determine the docker image to use for the processing + processing_environment = '%(name)s (%(version)s)' % self.data['environment'] + if processing_environment not in self.host: + raise RuntimeError("Environment `%s' is not available on docker " \ + "host `%s' - available environments are %s" % (processing_environment, + self.host, ", ".join(self.host.environments.keys()))) + + + # (If necessary) Instantiate the docker container that provide the databases + databases_container = None + + if self.db_address is not None: + + # Determine the docker image to use for the databases + try: + databases_environment = self.host.db2docker(database_paths.keys()) + except: + raise RuntimeError("No environment found for the databases `%s' " \ + "- available environments are %s" % ( + ", ".join(database_paths.keys()), + ", ".join(self.host.db_environments.keys()))) + + # Specify the volumes to mount inside the container + volumes = {} + + if not self.data.has_key('datasets_root_path'): + for db_name, db_path in database_paths.items(): + volumes[db_path] = { + 'bind': os.path.join('/databases', db_name), + 'mode': 'ro', + } + else: + volumes[self.data['datasets_root_path']] = { + 'bind': self.data['datasets_root_path'], + 'mode': 'ro', + } + + # Instantiate the container + # Note: we only support one databases image loaded at the same time + cmd = [ + 'databases_provider', + self.db_address, + os.path.join('/tmp', os.path.basename(databases_configuration_path)) + ] + + databases_container = dock.Popen( + self.host, + databases_environment, + command=cmd, + configuration_archive=databases_archive, + volumes=volumes + ) + + + # Specify the volumes to mount inside the algorithm container + volumes = {} + if not self.proxy_mode: + volumes[self.cache] = { + 'bind': '/cache', + 'mode': 'rw', + } + + + # Instantiate the algorithm container + cmd = [ + 'execute', + self.message_handler.address, + os.path.join('/tmp', os.path.basename(configuration_path)) + ] + + if logger.getEffectiveLevel() <= logging.DEBUG: + cmd.insert(1, '--debug') + + algorithm_container = dock.Popen( + self.host, + processing_environment, + command=cmd, + configuration_archive=processing_archive, + virtual_memory_in_megabytes=virtual_memory_in_megabytes, + max_cpu_percent=max_cpu_percent, + volumes=volumes + ) + + + # Process the messages until the container is done + self.message_handler.set_process(algorithm_container) + self.message_handler.start() + + timed_out = False + + try: + timeout = (60 * timeout_in_minutes) if timeout_in_minutes else None + status = algorithm_container.wait(timeout) + + except requests.exceptions.ReadTimeout: + logger.warn("user process has timed out after %d minutes", timeout_in_minutes) + algorithm_container.kill() + status = algorithm_container.wait() + + if databases_container is not None: + databases_container.kill() + databases_container.wait() + + timed_out = True + + except KeyboardInterrupt: # Developer pushed CTRL-C + logger.info("stopping user process on CTRL-C console request") + algorithm_container.kill() + status = algorithm_container.wait() + + if databases_container is not None: + databases_container.kill() + databases_container.wait() + + finally: + self.message_handler.stop.set() + + + # Collects final information and returns to caller + retval = dict( + stdout = algorithm_container.stdout, + stderr = algorithm_container.stderr, + status = status, + timed_out = timed_out, + statistics = self.message_handler.last_statistics, + system_error = self.message_handler.system_error, + user_error = self.message_handler.user_error, + ) + + if 'data' in retval['statistics']: + stats.update(retval['statistics']['data'], self.io_statistics) + else: + logger.warn("cannot find 'data' entry on returned stats, " \ + "therefore not appending I/O info either") + + algorithm_container.rm() - #synchronous call - always returns after a certain timeout - retval = runner.run(self, self.host, timeout_in_minutes=timeout_in_minutes, - daemon=daemon, db_address=self.db_address) + if databases_container is not None: + retval['stdout'] += '\n' + databases_container.stdout + retval['stderr'] += '\n' + databases_container.stderr + databases_container.rm() - #adds I/O statistics from the current executor, if its complete already - #otherwise, it means the running process went bananas, ignore it ;-) - if 'statistics' in retval: - if 'data' in retval['statistics']: - stats.update(retval['statistics']['data'], self.io_statistics) - else: - logger.warn("cannot find 'data' entry on returned stats, " \ - "therefore not appending I/O info either") + self.message_handler.destroy() + self.message_handler = None return retval @@ -298,8 +467,8 @@ class DockerExecutor(BaseExecutor): def kill(self): """Stops the user process by force - to be called from signal handlers""" - if self.agent is not None: - self.agent.kill() + if self.message_handler is not None: + self.message_handler.kill() return True return False diff --git a/beat/core/message_handler.py b/beat/core/message_handler.py new file mode 100755 index 0000000000000000000000000000000000000000..766d10ade931bdb923ed8dc1e655be012a7e335f --- /dev/null +++ b/beat/core/message_handler.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +import logging +logger = logging.getLogger(__name__) + +import gevent +import zmq.green as zmq + +from gevent import monkey +monkey.patch_socket(dns=False) +monkey.patch_ssl() + +from beat.backend.python.message_handler import MessageHandler + + +class ProxyMessageHandler(MessageHandler): + '''A 0MQ server for our communication with the user process + + Add support for output-related messages. + ''' + + def __init__(self, input_list, output_list, host_address): + + # Starts our 0MQ server + self.context = zmq.Context() + self.socket = self.context.socket(zmq.PAIR) + + self.address = 'tcp://' + host_address + port = self.socket.bind_to_random_port(self.address, min_port=50000) + self.address += ':%d' % port + logger.debug("zmq server bound to `%s'", self.address) + + super(ProxyMessageHandler, self).__init__(input_list, self.context, self.socket) + + self.output_list = output_list + + # implementations + self.callbacks.update(dict( + wrt = self.write, + idm = self.is_data_missing, + oic = self.output_is_connected, + )) + + + def destroy(self): + self.context.destroy() + + + def __str__(self): + return 'Server(%s)' % self.address + + + def _get_output_candidate(self, name): + + retval = self.output_list[name] + if retval is None: raise RuntimeError("Could not find output `%s'" % name) + return retval + + + def write(self, name, packed): + """Syntax: wrt output data""" + + logger.debug('recv: wrt %s <bin> (size=%d)', name, len(packed)) + + # Get output object + output_candidate = self._get_output_candidate(name) + if output_candidate is None: + raise RuntimeError("Could not find output `%s' to write to" % name) + + data = output_candidate.data_sink.dataformat.type() + data.unpack(packed) + output_candidate.write(data) + + logger.debug('send: ack') + self.socket.send('ack') + + + def is_data_missing(self, name): + """Syntax: idm output""" + + logger.debug('recv: idm %s', name) + + output_candidate = self._get_output_candidate(name) + what = 'tru' if output_candidate.isDataMissing() else 'fal' + logger.debug('send: %s', what) + self.socket.send(what) + + + def output_is_connected(self, name): + """Syntax: oic output""" + + logger.debug('recv: oic %s', name) + + output_candidate = self._get_output_candidate(name) + what = 'tru' if output_candidate.isConnected() else 'fal' + logger.debug('send: %s', what) + self.socket.send(what) + diff --git a/beat/core/test/test_docker.py b/beat/core/test/test_docker.py index 66aca256965d0abc5fa56469e2e764ae8f251141..a4b0eccda92bcc4a982a4ab9b3152c12d5ab9000 100644 --- a/beat/core/test/test_docker.py +++ b/beat/core/test/test_docker.py @@ -253,7 +253,7 @@ class AsyncTest(unittest.TestCase): with Popen(self.host, 'Python 2.7 (1.1.0)', ['python', tmp_name, str(processes)], max_cpu_percent=max_cpu_percent, - tmp_archive=program) as p: + configuration_path=program) as p: p.statistics() # start recording time.sleep(sleep_time) diff --git a/beat/core/test/test_execution.py b/beat/core/test/test_execution.py old mode 100755 new mode 100644