Commit af10bdd0 authored by Philip ABBET

Add the 'SubprocessExecutor' class

parent 23ef306f
......@@ -28,3 +28,4 @@
from .docker import DockerExecutor
from .local import LocalExecutor
from .subprocess import SubprocessExecutor
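For orientation, every executor in this package is built the same way and driven as a context manager. The sketch below shows the intended call pattern; the SubprocessExecutor signature is inferred from its call sites in the test suite at the end of this diff (its own file diff is collapsed further down), and all paths are placeholders:

# Editor's sketch (hedged): signature inferred from the test suite below.
from beat.core.execution import SubprocessExecutor

prefix = '/path/to/prefix'             # placeholder installation prefix
configuration = '/path/to/block.json'  # placeholder execution block
cache = '/path/to/cache'               # placeholder cache directory

executor = SubprocessExecutor(prefix, configuration, cache, proxy_mode=True)

with executor:                         # prepares inputs and outputs
    result = executor.process(timeout_in_minutes=5)

# 'result' is a dict like the one assembled by DockerExecutor.process():
# status, stdout, stderr, timed_out, statistics, system_error, ...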
......@@ -39,17 +39,12 @@ logger = logging.getLogger(__name__)
from .. import stats
from .. import message_handler
from .. import utils
from .. import dock
from .base import BaseExecutor
from .remote import RemoteExecutor
from beat.backend.python.helpers import create_inputs_from_configuration
from beat.backend.python.helpers import create_outputs_from_configuration
from beat.backend.python.helpers import AccessMode
class DockerExecutor(BaseExecutor):
"""DockerExecutors runs the code given an execution block information, externally
class DockerExecutor(RemoteExecutor):
"""DockerExecutor runs the code given an execution block information, externally
Parameters:
......@@ -138,7 +133,7 @@ class DockerExecutor(BaseExecutor):
database_cache=None, algorithm_cache=None, library_cache=None,
custom_root_folders=None, proxy_mode=True):
super(DockerExecutor, self).__init__(prefix, data, cache=cache,
super(DockerExecutor, self).__init__(prefix, data, host.ip, cache=cache,
dataformat_cache=dataformat_cache,
database_cache=database_cache,
algorithm_cache=algorithm_cache,
......@@ -147,12 +142,6 @@ class DockerExecutor(BaseExecutor):
# Initialisations
self.host = host
self.agent = None
self.context = None
self.db_socket = None
self.db_address = None
self.proxy_mode = proxy_mode
self.message_handler = None
# Check if the execution environment supports proxy_mode=False (if necessary)
if not self.proxy_mode:
......@@ -165,65 +154,6 @@ class DockerExecutor(BaseExecutor):
self.proxy_mode = 'direct_access' not in self.host.processing_environments[envkey].get('capabilities', [])
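In other words, proxy mode is only switched off when the selected processing environment explicitly advertises the 'direct_access' capability. A self-contained illustration of that lookup, with the layout of processing_environments assumed from this single check:

# Editor's sketch (hedged): the dict layout is assumed from the check above.
processing_environments = {
    'env (1.0.0)': {'capabilities': ['direct_access']},
    'legacy (0.9.0)': {},          # advertises no capabilities
}

def needs_proxy(envkey):
    caps = processing_environments[envkey].get('capabilities', [])
    return 'direct_access' not in caps

assert needs_proxy('legacy (0.9.0)')    # stays in proxy mode
assert not needs_proxy('env (1.0.0)')   # direct cache access allowed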
def __enter__(self):
"""Prepares inputs and outputs for the processing task
Raises:
IOError: in case something cannot be properly setup
"""
if len(self.databases) > 0:
self.context = zmq.Context()
self.db_socket = self.context.socket(zmq.PAIR)
self.db_address = 'tcp://' + self.host.ip
port = self.db_socket.bind_to_random_port(self.db_address, min_port=50000)
self.db_address += ':%d' % port
return super(DockerExecutor, self).__enter__()
def __exit__(self, exc_type, exc_value, traceback):
"""Closes all sinks and disconnects inputs and outputs
"""
super(DockerExecutor, self).__exit__(exc_type, exc_value, traceback)
if self.context is not None:
self.context.destroy()
self.context = None
def _prepare_inputs(self):
"""Prepares all input required by the execution."""
if self.proxy_mode:
cache_access = AccessMode.LOCAL
else:
cache_access = AccessMode.NONE
(self.input_list, self.data_sources) = create_inputs_from_configuration(
self.data, self.algorithm, self.prefix, self.cache,
cache_access=cache_access, db_access=AccessMode.REMOTE, unpack=False,
socket=self.db_socket
)
def _prepare_outputs(self):
"""Prepares all output required by the execution."""
if self.proxy_mode:
cache_access = AccessMode.LOCAL
else:
cache_access = AccessMode.NONE
(self.output_list, self.data_sinks) = create_outputs_from_configuration(
self.data, self.algorithm, self.prefix, self.cache, self.input_list,
cache_access=cache_access
)
def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
timeout_in_minutes=0):
"""Executes the user algorithm code using an external program.
......@@ -331,11 +261,14 @@ class DockerExecutor(BaseExecutor):
# Creation of the container
# Note: we only support one databases image loaded at the same time
cmd = [
'databases_provider',
self.db_address,
os.path.join('/tmp', os.path.basename(databases_configuration_path))
'databases_provider',
self.db_address,
os.path.join('/tmp', os.path.basename(databases_configuration_path))
]
if logger.getEffectiveLevel() <= logging.DEBUG:
cmd.insert(1, '--debug')
databases_container = self.host.create_container(databases_environment, cmd)
databases_container.copy_path(databases_configuration_path, '/tmp')
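The same flag-insertion pattern recurs for the algorithm container below: '--debug' is spliced in right after the program name whenever the effective log level is DEBUG or lower. In isolation:

# Editor's sketch of the command-assembly pattern shared by both containers.
import logging

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

cmd = ['databases_provider', 'tcp://127.0.0.1:50000', '/tmp/config.json']
if logger.getEffectiveLevel() <= logging.DEBUG:
    cmd.insert(1, '--debug')   # lands between program name and arguments

# cmd == ['databases_provider', '--debug',
#         'tcp://127.0.0.1:50000', '/tmp/config.json']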
......@@ -359,9 +292,9 @@ class DockerExecutor(BaseExecutor):
# Command to execute
cmd = [
'execute',
self.message_handler.address,
os.path.join('/tmp', os.path.basename(configuration_path))
'execute',
self.message_handler.address,
os.path.join('/tmp', os.path.basename(configuration_path))
]
if logger.getEffectiveLevel() <= logging.DEBUG:
......@@ -427,9 +360,9 @@ class DockerExecutor(BaseExecutor):
logger.debug("Log of the container: " + container_log)
retval = dict(
status = status,
stdout = self.host.stdout(algorithm_container),
stderr = stderr,
status = status,
timed_out = timed_out,
statistics = self.host.statistics(algorithm_container),
system_error = self.message_handler.system_error,
......@@ -442,9 +375,16 @@ class DockerExecutor(BaseExecutor):
self.host.rm(algorithm_container)
if databases_container is not None:
retval['stdout'] += '\n' + self.host.stdout(databases_container)
db_container_log = self.host.stderr(databases_container)
if logger.getEffectiveLevel() <= logging.DEBUG:
logger.debug("Log of the database container: " + db_container_log)
if status != 0:
retval['stderr'] += '\n' + self.host.stderr(databases_container)
retval['stderr'] += '\n' + db_container_log
retval['stdout'] += '\n' + self.host.stdout(databases_container)
self.host.rm(databases_container)
self.db_socket.setsockopt(zmq.LINGER, 0)
self.db_socket.close()
......@@ -454,13 +394,3 @@ class DockerExecutor(BaseExecutor):
self.message_handler = None
return retval
def kill(self):
"""Stops the user process by force - to be called from signal handlers"""
if self.message_handler is not None:
self.message_handler.kill()
return True
return False
......@@ -58,7 +58,7 @@ from beat.backend.python.helpers import AccessMode
class LocalExecutor(BaseExecutor):
"""Executors runs the code given an execution block information, externally
"""LocalExecutor runs the code given an execution block information
Parameters:
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
'''Execution utilities'''
import zmq
import logging
logger = logging.getLogger(__name__)
from .base import BaseExecutor
from beat.backend.python.helpers import create_inputs_from_configuration
from beat.backend.python.helpers import create_outputs_from_configuration
from beat.backend.python.helpers import AccessMode
class RemoteExecutor(BaseExecutor):
"""Base class for Executors that communicate with a message handler
Parameters:
prefix (str): Establishes the prefix of your installation.
data (dict, str): The piece of data representing the block to be executed.
It must validate against the schema defined for execution blocks. If a
string is passed, it is supposed to be a fully qualified absolute path to
a JSON file containing the block execution information.
cache (str, optional): If your cache is not located under
``<prefix>/cache``, then specify a full path here. It will be used
instead.
dataformat_cache (dict, optional): A dictionary mapping dataformat names to
loaded dataformats. This parameter is optional and, if passed, may
greatly speed up database loading times as dataformats that are already
loaded may be re-used. If you use this parameter, you must guarantee that
the cache is refreshed as appropriate in case the underlying dataformats
change.
database_cache (dict, optional): A dictionary mapping database names to
loaded databases. This parameter is optional and, if passed, may
greatly speed up database loading times as databases that are already
loaded may be re-used. If you use this parameter, you must guarantee that
the cache is refreshed as appropriate in case the underlying databases
change.
algorithm_cache (dict, optional): A dictionary mapping algorithm names to
loaded algorithms. This parameter is optional and, if passed, may
greatly speed up algorithm loading times as algorithms that are already
loaded may be re-used. If you use this parameter, you must guarantee that
the cache is refreshed as appropriate in case the underlying algorithms
change.
library_cache (dict, optional): A dictionary mapping library names to
loaded libraries. This parameter is optional and, if passed, may greatly
speed up library loading times as libraries that are already loaded may
be re-used. If you use this parameter, you must guarantee that the cache
is refreshed as appropriate in case the underlying libraries change.
Attributes:
cache (str): The path to the cache currently being used
errors (list): A list containing errors found while loading this execution
block.
data (dict): The original data for this executor, as loaded by our JSON
decoder.
algorithm (beat.core.algorithm.Algorithm): An object representing the
algorithm to be run.
databases (dict): A dictionary in which keys are strings with database
names and values are :py:class:`database.Database`, representing the
databases required for running this block. The dictionary may be empty
in case all inputs are taken from the file cache.
views (dict): A dictionary in which the keys are tuples pointing to the
``(<database-name>, <protocol>, <set>)`` and the value is a setup view
for that particular combination of details. The dictionary may be empty
in case all inputs are taken from the file cache.
input_list (beat.core.inputs.InputList): A list of inputs that will be
served to the algorithm.
output_list (beat.core.outputs.OutputList): A list of outputs that the
algorithm will produce.
data_sources (list): A list with all data-sources created by our execution
loader.
data_sinks (list): A list with all data-sinks created by our execution
loader. These are useful for clean-up actions in case of problems.
"""
def __init__(self, prefix, data, ip_address, cache=None, dataformat_cache=None,
database_cache=None, algorithm_cache=None, library_cache=None,
custom_root_folders=None, proxy_mode=True):
super(RemoteExecutor, self).__init__(prefix, data, cache=cache,
dataformat_cache=dataformat_cache,
database_cache=database_cache,
algorithm_cache=algorithm_cache,
library_cache=library_cache,
custom_root_folders=custom_root_folders)
# Initialisations
self.ip_address = ip_address
self.context = None
self.db_socket = None
self.db_address = None
self.proxy_mode = proxy_mode
self.message_handler = None
def __enter__(self):
"""Prepares inputs and outputs for the processing task
Raises:
IOError: in case something cannot be properly setup
"""
if len(self.databases) > 0:
self.context = zmq.Context()
self.db_socket = self.context.socket(zmq.PAIR)
self.db_address = 'tcp://' + self.ip_address
port = self.db_socket.bind_to_random_port(self.db_address, min_port=50000)
self.db_address += ':%d' % port
return super(RemoteExecutor, self).__enter__()
def __exit__(self, exc_type, exc_value, traceback):
"""Closes all sinks and disconnects inputs and outputs
"""
super(RemoteExecutor, self).__exit__(exc_type, exc_value, traceback)
if self.context is not None:
self.context.destroy()
self.context = None
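The socket life cycle above is plain pyzmq: __enter__ binds a PAIR socket to a random high port on the executor's address, and __exit__ destroys the context. A standalone reproduction of the pattern, using the loopback address for illustration:

# Editor's sketch: pure pyzmq, no BEAT dependencies.
import zmq

context = zmq.Context()
socket = context.socket(zmq.PAIR)

address = 'tcp://127.0.0.1'
port = socket.bind_to_random_port(address, min_port=50000)
address += ':%d' % port            # e.g. 'tcp://127.0.0.1:50427'

# ... hand 'address' to the remote process and exchange messages ...

socket.setsockopt(zmq.LINGER, 0)   # as done during cleanup in process()
socket.close()
context.destroy()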
def _prepare_inputs(self):
"""Prepares all input required by the execution."""
if self.proxy_mode:
cache_access = AccessMode.LOCAL
else:
cache_access = AccessMode.NONE
(self.input_list, self.data_sources) = create_inputs_from_configuration(
self.data, self.algorithm, self.prefix, self.cache,
cache_access=cache_access, db_access=AccessMode.REMOTE, unpack=False,
socket=self.db_socket,
no_synchronisation_listeners=True
)
def _prepare_outputs(self):
"""Prepares all output required by the execution."""
if self.proxy_mode:
cache_access = AccessMode.LOCAL
else:
cache_access = AccessMode.NONE
(self.output_list, self.data_sinks) = create_outputs_from_configuration(
self.data, self.algorithm, self.prefix, self.cache, self.input_list,
cache_access=cache_access
)
def kill(self):
"""Stops the user process by force - to be called from signal handlers"""
if self.message_handler is not None:
self.message_handler.kill()
return True
return False
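A concrete backend only has to pass the address on which the database socket should be bound, exactly as the DockerExecutor change above forwards host.ip. A hypothetical minimal subclass for illustration (the real SubprocessExecutor presumably does something similar with a local address, but its diff is collapsed):

# Editor's sketch (hedged): illustrative only, not the shipped code.
class LoopbackExecutor(RemoteExecutor):
    """Executor binding its database socket on the loopback interface"""

    def __init__(self, prefix, data, cache=None, **kwargs):
        super(LoopbackExecutor, self).__init__(prefix, data, '127.0.0.1',
                                               cache=cache, **kwargs)

    def process(self, virtual_memory_in_megabytes=0, max_cpu_percent=0,
                timeout_in_minutes=0):
        raise NotImplementedError   # each backend supplies its own process()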
(Diff collapsed: the new subprocess.py file implementing the SubprocessExecutor class, imported in the execution package's __init__.py above.)
......@@ -39,6 +39,7 @@ import unittest
from ..experiment import Experiment
from ..execution import DockerExecutor
from ..execution import LocalExecutor
from ..execution import SubprocessExecutor
from ..hash import hashFileContents
from ..data import CachedDataSource
from ..dock import Host
......@@ -48,6 +49,9 @@ from .utils import cleanup
from .utils import slow
#----------------------------------------------------------
class TestExecution(unittest.TestCase):
def check_output(self, prefix, path):
......@@ -268,19 +272,23 @@ class TestExecution(unittest.TestCase):
assert self.execute('user/user/preprocessing/1/different_frequencies', [{'sum': 363, 'nb': 8}]) is None
# For benchmark purposes
# @slow
# def test_double_1_large(self):
# import time
# start = time.time()
# assert self.execute('user/user/double/1/large', [{'out_data': 49489830}]) is None
# print time.time() - start
# import time
# start = time.time()
# assert self.execute('user/user/double/1/large', [{'out_data': 49489830}]) is None
# print time.time() - start
# For benchmark purposes
# @slow
# def test_double_1_large2(self):
# import time
# start = time.time()
# assert self.execute('user/user/double/1/large2', [{'out_data': 21513820}]) is None
# print time.time() - start
# import time
# start = time.time()
# assert self.execute('user/user/double/1/large2', [{'out_data': 21513820}]) is None
# print time.time() - start
#----------------------------------------------------------
class TestDockerExecution(TestExecution):
......@@ -318,6 +326,7 @@ class TestDockerExecution(TestExecution):
assert self.execute('user/user/double/1/cxx_double', [{'out_data': 42}]) is None
#----------------------------------------------------------
class TestDockerExecutionNoProxy(TestDockerExecution):
......@@ -327,6 +336,8 @@ class TestDockerExecutionNoProxy(TestDockerExecution):
self.proxy_mode = False
#----------------------------------------------------------
class TestLocalExecution(TestExecution):
......@@ -336,6 +347,35 @@ class TestLocalExecution(TestExecution):
dataformat_cache, database_cache, algorithm_cache)
#----------------------------------------------------------
class TestSubprocessExecution(TestExecution):
def setUp(self):
super(TestSubprocessExecution, self).setUp()
self.proxy_mode = True
def create_executor(self, prefix, configuration, tmp_prefix, dataformat_cache,
database_cache, algorithm_cache):
return SubprocessExecutor(prefix, configuration, tmp_prefix,
dataformat_cache, database_cache, algorithm_cache,
proxy_mode=self.proxy_mode)
#----------------------------------------------------------
class TestSubprocessExecutionNoProxy(TestSubprocessExecution):
def setUp(self):
super(TestSubprocessExecutionNoProxy, self).setUp()
self.proxy_mode = False
#----------------------------------------------------------
# Don't run tests from the base class
del TestExecution
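The trailing del matters: unittest discovery collects every TestCase subclass in the module, so without it the abstract TestExecution would be instantiated and run on its own. The idiom in isolation:

# Editor's sketch of the discovery-suppression idiom used above.
import unittest

class Base(unittest.TestCase):
    flag = False
    def test_shared(self):
        self.assertTrue(self.flag)   # meant to run only in subclasses

class Concrete(Base):
    flag = True                      # inherits and passes test_shared

del Base   # remove the name so the loader cannot collect Base directly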