Commit 9642f24e authored by Philip ABBET

Refactoring: Integrate the LocalExecutor class from beat.cmdline

parent f0e07339
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
from .docker import DockerExecutor
from .local import LocalExecutor
...
@@ -40,15 +40,15 @@ logger = logging.getLogger(__name__)
 import simplejson
-from . import schema
-from . import database
-from . import algorithm
-from . import inputs
-from . import outputs
-from . import data
-from . import stats
-from . import agent
-from . import dock
+from .. import schema
+from .. import database
+from .. import algorithm
+from .. import inputs
+from .. import outputs
+from .. import data
+from .. import stats
+from .. import agent
+from .. import dock
 from beat.backend.python.helpers import convert_experiment_configuration_to_container
 from beat.backend.python.helpers import create_inputs_from_configuration
...
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.core module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software and appearing #
# in the file LICENSE.AGPL included in the packaging of this file. #
# The BEAT platform is distributed in the hope that it will be useful, but #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY #
# or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
'''Execution utilities'''
import os
import sys
import glob
import errno
import tempfile
import subprocess
import zmq.green as zmq
import time
import collections
import logging
logger = logging.getLogger(__name__)
import simplejson
from .. import schema
from .. import database
from .. import algorithm
from .. import inputs
from .. import outputs
from .. import data
from .. import stats
class LocalExecutor(object):
"""Executors runs the code given an execution block information, externally
Parameters:
prefix (str): Establishes the prefix of your installation.
data (dict, str): The piece of data representing the block to be executed.
It must validate against the schema defined for execution blocks. If a
string is passed, it is supposed to be a fully qualified absolute path to
a JSON file containing the block execution information.
cache (str, optional): If your cache is not located under
``<prefix>/cache``, then specify a full path here. It will be used
instead.
dataformat_cache (dict, optional): A dictionary mapping dataformat names to
loaded dataformats. This parameter is optional and, if passed, may
greatly speed-up database loading times as dataformats that are already
loaded may be re-used. If you use this parameter, you must guarantee that
the cache is refreshed as appropriate in case the underlying dataformats
change.
database_cache (dict, optional): A dictionary mapping database names to
loaded databases. This parameter is optional and, if passed, may
greatly speed-up database loading times as databases that are already
loaded may be re-used. If you use this parameter, you must guarantee that
the cache is refreshed as appropriate in case the underlying databases
change.
algorithm_cache (dict, optional): A dictionary mapping algorithm names to
loaded algorithms. This parameter is optional and, if passed, may
greatly speed up algorithm loading times as algorithms that are already
loaded may be re-used. If you use this parameter, you must guarantee that
the cache is refreshed as appropriate in case the underlying algorithms
change.
library_cache (dict, optional): A dictionary mapping library names to
loaded libraries. This parameter is optional and, if passed, may greatly
speed-up library loading times as libraries that are already loaded may
be re-used. If you use this parameter, you must guarantee that the cache
is refreshed as appropriate in case the underlying libraries change.
Attributes:
cache (str): The path to the cache currently being used
errors (list): A list containing errors found while loading this execution
block.
data (dict): The original data for this executor, as loaded by our JSON
decoder.
algorithm (beat.core.algorithm.Algorithm): An object representing the
algorithm to be run.
databases (dict): A dictionary in which keys are strings with database
names and values are :py:class:`database.Database`, representing the
databases required for running this block. The dictionary may be empty
in case all inputs are taken from the file cache.
views (dict): A dictionary in which the keys are tuples pointing to the
``(<database-name>, <protocol>, <set>)`` and the value is a setup view
for that particular combination of details. The dictionary may be empty
in case all inputs are taken from the file cache.
input_list (beat.core.inputs.InputList): A list of inputs that will be
served to the algorithm.
output_list (beat.core.outputs.OutputList): A list of outputs that the
algorithm will produce.
data_sources (list): A list with all data-sources created by our execution
loader.
data_sinks (list): A list with all data-sinks created by our execution
loader. These are useful for clean-up actions in case of problems.
custom_root_folders (dict): A dictionary where the keys are database
identifiers prefixed with ``database/`` (i.e. ``database/<db_name>/<version>``,
matching the lookup performed in ``_prepare_inputs``) and the values are
paths to the given database's files. These values override the root folder
found in the database's metadata.
"""
def __init__(self, prefix, data, cache=None, dataformat_cache=None,
database_cache=None, algorithm_cache=None, library_cache=None, custom_root_folders=None):
self.prefix = prefix
self.cache = cache or os.path.join(self.prefix, 'cache')
# check cache - halt if required
if not os.path.exists(self.cache):
raise IOError("Cache path `%s' does not exist" % self.cache)
# some attributes
self.algorithm = None
self.databases = {}
self.views = {}
self.input_list = None
self.output_list = None
self.data_sinks = []
self.data_sources = []
# runs validation if required
self.errors = []
self.data = data
if custom_root_folders is not None and not isinstance(custom_root_folders, collections.Mapping):
raise TypeError("The custom root folders must be in dictionary format")
self.custom_root_folders = custom_root_folders
# temporary caches, if the user has not set them, for performance
database_cache = database_cache if database_cache is not None else {}
dataformat_cache = dataformat_cache if dataformat_cache is not None else {}
algorithm_cache = algorithm_cache if algorithm_cache is not None else {}
library_cache = library_cache if library_cache is not None else {}
self._load(data, dataformat_cache, algorithm_cache, database_cache,
library_cache)
def _load(self, data, dataformat_cache, algorithm_cache, database_cache,
library_cache):
"""Loads the block execution information"""
# reset
self.data = None
self.errors = []
self.algorithm = None
self.databases = {}
self.views = {}
self.input_list = None
self.output_list = None
self.data_sinks = []
self.data_sources = []
self.db_address = None
if not isinstance(data, dict): # the user has passed the path to a JSON file
if not os.path.exists(data):
self.errors.append('File not found: %s' % data)
return
# this runs basic validation, including JSON loading if required
self.data, self.errors = schema.validate('execution', data)
if self.errors: return #don't proceed with the rest of validation
# at this point, the execution information is loaded, must now go on and
# load the algorithm code.
if self.data['algorithm'] in algorithm_cache: #reuse
self.algorithm = algorithm_cache[self.data['algorithm']]
else: #load it, use dataformat cache if possible
self.algorithm = algorithm.Algorithm(self.prefix,
self.data['algorithm'], dataformat_cache, library_cache)
algorithm_cache[self.algorithm.name] = self.algorithm
if not self.algorithm.valid:
self.errors += self.algorithm.errors
return #don't proceed if algorithm is bogus!
# load databases (if any is required)
for name, details in self.data['inputs'].items():
if 'database' in details:
if details['database'] not in self.databases:
if details['database'] in database_cache: #reuse
db = database_cache[details['database']]
else: #load it
db = database.Database(self.prefix, details['database'],
dataformat_cache)
database_cache[db.name] = db
self.databases[details['database']] = db
if not db.valid:
self.errors += db.errors
def __enter__(self):
"""Prepares inputs and outputs for the processing task
Raises:
IOError: in case something cannot be properly setup
"""
self._prepare_inputs()
self._prepare_outputs()
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Closes all sinks and disconnects inputs and outputs
"""
for sink in self.data_sinks:
# we save the output only if no error was raised
# n.b.: a system exit raises SystemExit, which is not an Exception subclass
if exc_type is None or not issubclass(exc_type, Exception): sink.close()
sink.reset()
self.input_list = None
self.output_list = None
self.data_sinks = []
self.data_sources = []
def _prepare_inputs(self):
"""Prepares all input required by the execution."""
self.input_list = inputs.InputList()
# This is used for parallelization purposes
start_index, end_index = self.data.get('range', (None, None))
db_views = {}
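# db_views maps a (database, protocol, set) tuple to its instantiated view so
# that several inputs reading from the same set share a single view below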
for name, details in self.data['inputs'].items():
if 'database' in details: #it is a dataset input
# create the remote input
db = self.databases[details['database']]
configName = "database/%s" % db.name
if self.custom_root_folders is not None and configName in self.custom_root_folders:
db.data['root_folder'] = self.custom_root_folders[configName]
dataformat_name = db.set(details['protocol'], details['set'])['outputs'][details['output']]
# Get the relevant data for the requested view
view_key = (details['database'], details['protocol'], details['set'])
if view_key not in db_views:
# create the view
v = db.view(view_key[1], view_key[2])
# setup
v.prepare_outputs()
v.setup()
db_views[view_key] = v
else:
v = db_views[view_key]
#v.obj.outputs = v.outputs
# Use the database view as an in-memory data source
v_data_source = data.MemoryDataSource(v.done, next_callback=v.next)
v_output = v.outputs[details['output']]
# Output the data from the view
v_output.data_sink.data_sources.append(v_data_source)
# Create a new local input
input = inputs.Input(name, db.dataformats[dataformat_name], v_data_source)
#input = inputs.RemoteInput(name, db.dataformats[dataformat_name], self.db_socket)
# Synchronization bits
group = self.input_list.group(details['channel'])
if group is None:
group = inputs.InputGroup(
details['channel'],
synchronization_listener=outputs.SynchronizationListener(),
restricted_access=(details['channel'] == self.data['channel'])
)
self.input_list.add(group)
group.add(input)
else:
data_source = data.CachedDataSource()
self.data_sources.append(data_source)
if details['channel'] == self.data['channel']: #synchronized
status = data_source.setup(
filename=os.path.join(self.cache, details['path'] + '.data'),
prefix=self.prefix,
force_start_index=start_index,
force_end_index=end_index,
)
else:
status = data_source.setup(
filename=os.path.join(self.cache, details['path'] + '.data'),
prefix=self.prefix,
)
if not status:
raise IOError("cannot load cache file `%s'" % details['path'])
input = inputs.Input(name, self.algorithm.input_map[name], data_source)
# Synchronization bits
group = self.input_list.group(details['channel'])
if group is None:
group = inputs.InputGroup(
details['channel'],
synchronization_listener=outputs.SynchronizationListener(),
restricted_access=(details['channel'] == self.data['channel'])
)
self.input_list.add(group)
group.add(input)
def _prepare_outputs(self):
"""Prepares all output required by the execution."""
self.output_list = outputs.OutputList()
# This is used for parallelization purposes
start_index, end_index = self.data.get('range', (None, None))
if 'outputs' in self.data: #it is a normal block (not analyzer)
for name, details in self.data['outputs'].items():
path = os.path.join(self.cache, details['path'] + '.data')
dirname = os.path.dirname(path)
# Make sure that the directory exists while taking care of race
# conditions. see: http://stackoverflow.com/questions/273192/check-if-a-directory-exists-and-create-it-if-necessary
try:
if (len(dirname) > 0):
os.makedirs(dirname)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
data_sink = data.CachedDataSink()
self.data_sinks.append(data_sink)
status = data_sink.setup(
filename=path,
dataformat=self.algorithm.dataformats[self.algorithm.output_map[name]],
encoding='binary',
max_size=0, #in bytes, for individual file chunks
)
if not status:
raise IOError("cannot create cache sink `%s'" % details['path'])
input_group = self.input_list.group(details['channel'])
if (input_group is None) or not hasattr(input_group, 'synchronization_listener'):
synchronization_listener = None
else:
synchronization_listener = input_group.synchronization_listener
self.output_list.add(outputs.Output(name, data_sink,
synchronization_listener=synchronization_listener,
force_start_index=start_index or 0)
)
else: #it is an analyzer
name = 'result'
details = self.data[name]
path = os.path.join(self.cache, details['path'] + '.data')
dirname = os.path.dirname(path)
# Make sure that the directory exists while taking care of race
# conditions. see: http://stackoverflow.com/questions/273192/check-if-a-directory-exists-and-create-it-if-necessary
try:
if (len(dirname) > 0):
os.makedirs(dirname)
except OSError as exception:
if exception.errno != errno.EEXIST:
raise
data_sink = data.CachedDataSink()
self.data_sinks.append(data_sink)
status = data_sink.setup(
filename=path,
dataformat=self.algorithm.result_dataformat(),
encoding='binary',
)
if not status:
raise IOError("cannot create cache sink `%s'" % details['path'])
self.output_list.add(outputs.Output(name, data_sink,
force_start_index=start_index or 0))
def process(self, virtual_memory_in_megabytes=0,
max_cpu_percent=0, timeout_in_minutes=0, daemon=0):
"""Executes the user algorithm code using an external program.
If ``executable`` is set, then execute the process using an external
program, else, uses the python application living by the side of this
installation (if one is found).
The execution interface follows the backend API as described in our
documentation.
We use green subprocesses this implementation. Each co-process is linked
to us via 2 uni-directional pipes which work as datain and dataout
end-points. The parent process (i.e. the current one) establishes the
connection to the child and then can pass/receive commands, data and logs.
Usage of the data pipes (datain, dataout) is **synchronous** - you send a
command and block for an answer. The co-process is normally controlled by
the current process, except for data requests, which are user-code driven.
The nature of our problem does not require an *asynchronous* implementation
which, in turn, would require a much more complex set of dependencies (on
asyncio or Twisted for example).
Parameters:
virtual_memory_in_megabytes (int, Optional): The amount of virtual memory
(in Megabytes) available for the job. If set to zero, no limit will be
applied.
max_cpu_percent (int, Optional): The maximum amount of CPU usage allowed
in a system. This number must be an integer number between 0 and
``100*number_of_cores`` in your system. For instance, if your system
has 2 cores, this number can go between 0 and 200. If it is <= 0, then
we don't track CPU usage.
timeout_in_minutes (int): The number of minutes to wait for the user
process to execute. After this amount of time, the user process is
killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will
be applied.
daemon (int): If this variable is set, then we don't really start the
user process, but just kick out 0MQ server, print the command-line and
sleep for that many seconds. You're supposed to start the client by
hand then and debug it.
Returns:
dict: A dictionary which is JSON formattable containing the summary of
this block execution.
"""
def _create_result(status, error_message=''):
return {
'status': status,
'statistics': {
'data': self.io_statistics
},
'stderr': '',
'stdout': '',
'system_error': '',
'user_error': error_message,
'timed_out': False
}
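# helper: trims the traceback so the error reported to the user starts at the
# user's own contribution (algorithm or database code) rather than at
# framework frames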
def _process_exception(exception, prefix, contribution_kind):
import traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
tb = traceback.extract_tb(exc_traceback)
contributions_prefix = os.path.join(prefix, contribution_kind) + os.sep
for first_line, line in enumerate(tb):
if line[0].startswith(contributions_prefix):
break
if first_line == len(tb):
first_line = 0
s = ''.join(traceback.format_list(tb[first_line:]))
s = s.replace(contributions_prefix, '').strip()
return "%s\n%s: %s" % (s, type(exception).__name__, exception)
if not self.valid:
raise RuntimeError("execution information is bogus:\n * %s" % \
'\n * '.join(self.errors))
self.runner = self.algorithm.runner()
retval = self.runner.setup(self.data['parameters'])
if not self.input_list or not self.output_list:
raise RuntimeError("I/O for execution block has not yet been set up")
using_output = self.output_list[0] if self.analysis else self.output_list
_start = time.time()
try:
main_group = self.input_list.main_group
while main_group.hasMoreData():
main_group.restricted_access = False
main_group.next()
main_group.restricted_access = True
try:
if not self.runner.process(self.input_list, using_output):
return _create_result(1, "The algorithm returned 'False'")
except Exception as e:
message = _process_exception(e, self.prefix, 'algorithms')
return _create_result(1, message)
except Exception as e:
message = _process_exception(e, self.prefix, 'databases')
return _create_result(1, message)
missing_data_outputs = [x for x in self.output_list if x.isDataMissing()]
proc_time = time.time() - _start
if missing_data_outputs:
raise RuntimeError("Missing data on the following output(s): %s" % \
', '.join([x.name for x in missing_data_outputs]))
# some local information
logger.debug("Total processing time was %.3f seconds" , proc_time)
return _create_result(0)
@property
def valid(self):
"""A boolean that indicates if this executor is valid or not"""
return not bool(self.errors)
@property
def analysis(self):
"""A boolean that indicates if the current block is an analysis block"""
return 'result' in self.data
@property
def outputs_exist(self):
"""Returns ``True`` if outputs this block is supposed to produce exists."""
if self.analysis:
path = os.path.join(self.cache, self.data['result']['path']) + '*'
if not glob.glob(path): return False
else:
for name, details in self.data['outputs'].items():
path = os.path.join(self.cache, details['path']) + '*'
if not glob.glob(path): return False
# if you get to this point all outputs already exist
return True
@property
def io_statistics(self):
"""Summarize current I/O statistics looking at data sources and sinks, inputs and outputs
Returns:
dict: A dictionary summarizing current I/O statistics
"""
return stats.io_statistics(self.data, self.input_list, self.output_list)
def __str__(self):
return simplejson.dumps(self.data, indent=4)
def write(self, path):
"""Writes contents to precise filesystem location"""
with open(path, 'wt') as f: f.write(str(self))
def dump_runner_configuration(self, directory):
"""Exports contents useful for a backend runner to run the algorithm"""
data = {
'algorithm': self.data['algorithm'],
'parameters': self.data['parameters'],
}
data['inputs'] = \
dict([(k, v['channel']) for k,v in self.data['inputs'].items()])
if 'outputs' in self.data:
data['outputs'] = \
dict([(k, v['channel']) for k,v in self.data['outputs'].items()])
else:
data['result'] = self.data['channel']
data['channel'] = self.data['channel']
with open(os.path.join(directory, 'configuration.json'), 'wb') as f:
simplejson.dump(data, f, indent=2)
tmp_prefix = os.path.join(directory, 'prefix')
if not os.path.exists(tmp_prefix): os.makedirs(tmp_prefix)
self.algorithm.export(tmp_prefix)
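# For illustration only (algorithm, input/output and channel names below are
# hypothetical), the ``configuration.json`` written above has a shape like:
#   {
#     "algorithm": "user/sum/1",
#     "parameters": {},
#     "inputs": {"a": "main", "b": "main"},
#     "outputs": {"sum": "main"},
#     "channel": "main"
#   }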
def dump_databases_provider_configuration(self, directory):
"""Exports contents useful for a backend runner to run the algorithm"""
with open(os.path.join(directory, 'configuration.json'), 'wb') as f:
simplejson.dump(self.data, f, indent=2)
tmp_prefix = os.path.join(directory, 'prefix')
if not os.path.exists(tmp_prefix): os.makedirs(tmp_prefix)
for db in self.databases.values():
db.export(tmp_prefix)
def kill(self):
"""Stops the user process by force - to be called from signal handlers"""
# the local executor runs the algorithm in-process and never sets up a
# separate agent; guard the attribute access so that calling this from a
# signal handler cannot raise an AttributeError
if getattr(self, 'agent', None) is not None:
self.agent.kill()
return True
return False
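For reference, here is a minimal sketch of how the refactored package can be consumed after this commit. The constructor arguments mirror the test changes below; the helper name, the ``caches`` tuple and the ``use_docker`` flag are illustrative and not part of the commit:

    from beat.core.dock import Host
    from beat.core.execution import DockerExecutor, LocalExecutor

    def make_executor(prefix, configuration, tmp_prefix, caches, use_docker=False):
        # caches = (dataformat_cache, database_cache, algorithm_cache)
        if use_docker:
            # the Docker executor needs a configured host to spawn containers
            host = Host()
            host.setup(raise_on_errors=False)
            return DockerExecutor(host, prefix, configuration, tmp_prefix, *caches)
        # the local executor runs the algorithm directly in this process
        return LocalExecutor(prefix, configuration, tmp_prefix, *caches)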
...
@@ -47,6 +47,7 @@ import unittest
 from ..experiment import Experiment
 from ..execution import DockerExecutor
+from ..execution import LocalExecutor
 from ..hash import hashFileContents
 from ..data import CachedDataSource
 from ..dock import Host
...
@@ -57,23 +58,6 @@ from .utils import cleanup
 class TestExecution(unittest.TestCase):
-
-    @classmethod
-    def setUpClass(cls):
-        cls.host = Host()
-        cls.host.setup(raise_on_errors=False)
-
-    @classmethod
-    def tearDownClass(cls):
-        cls.host.teardown()
-        cleanup()
-
-    def setUp(self):
-        super(TestExecution, self).setUp()
-        self.proxy_mode = True
-
     def check_output(self, prefix, path):
         '''Checks if a given output exists, together with its indexes and checksums
         '''
...
@@ -130,9 +114,8 @@ class TestExecution(unittest.TestCase):
         # can we execute it?
         results = []
         for key, value in scheduled.items():
-            executor = DockerExecutor(self.host, prefix, value['configuration'], tmp_prefix,
-                                      dataformat_cache, database_cache, algorithm_cache,
-                                      proxy_mode=self.proxy_mode)
+            executor = self.create_executor(prefix, value['configuration'], tmp_prefix,
+                                            dataformat_cache, database_cache, algorithm_cache)
             assert executor.valid, '\n * %s' % '\n * '.join(executor.errors)
             with executor:
...
@@ -241,9 +224,6 @@ class TestExecution(unittest.TestCase):
     def test_double_triangle_1(self):
         assert self.execute('user/user/double_triangle/1/double_triangle', [{'out_data': 42}]) is None
-    def test_cxx_double_1(self):
-        assert self.execute('user/user/double/1/cxx_double', [{'out_data': 42}]) is None
-
     def test_inputs_mix_1(self):
         assert self.execute('user/user/inputs_mix/1/test', [{'sum': 12272, 'nb': 10}]) is None
...
@@ -286,8 +266,54 @@ class TestExecution(unittest.TestCase):
-class TestExecutionNoProxy(TestExecution):
+class TestDockerExecution(TestExecution):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.host = Host()
+        cls.host.setup(raise_on_errors=False)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.host.teardown()
+        cleanup()
 
     def setUp(self):
-        super(TestExecutionNoProxy, self).setUp()
+        super(TestDockerExecution, self).setUp()
+        self.proxy_mode = True
+
+    def create_executor(self, prefix, configuration, tmp_prefix, dataformat_cache,
+                        database_cache, algorithm_cache):
+        return DockerExecutor(self.host, prefix, configuration, tmp_prefix,
+                              dataformat_cache, database_cache, algorithm_cache,
+                              proxy_mode=self.proxy_mode)
+
+    def test_cxx_double_1(self):
+        assert self.execute('user/user/double/1/cxx_double', [{'out_data': 42}]) is None
+
+
+class TestDockerExecutionNoProxy(TestDockerExecution):
+
+    def setUp(self):
+        super(TestDockerExecutionNoProxy, self).setUp()
         self.proxy_mode = False
+
+
+class TestLocalExecution(TestExecution):
+
+    def create_executor(self, prefix, configuration, tmp_prefix, dataformat_cache,
+                        database_cache, algorithm_cache):
+        return LocalExecutor(prefix, configuration, tmp_prefix,
+                             dataformat_cache, database_cache, algorithm_cache)
+
+
+# Don't run tests from the base class
+del TestExecution
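A note on the final hunk: deleting the ``TestExecution`` name from the module namespace keeps the test collector from discovering the now-abstract base class (it no longer defines ``create_executor``), so only the Docker and Local variants are run. With a nose-style runner, for example, a single variant could be targeted like this (illustrative command, assuming the usual ``beat.core.test`` layout):

    nosetests -v beat.core.test.test_execution:TestLocalExecution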