From 9642f24ebd966f97bfc15da0200570e1095745eb Mon Sep 17 00:00:00 2001 From: Philip ABBET <philip.abbet@idiap.ch> Date: Fri, 15 Sep 2017 10:10:54 +0200 Subject: [PATCH] Refactoring: Integrate the LocalExecutor class from beat.cmdline --- beat/core/execution/__init__.py | 30 + .../{execution.py => execution/docker.py} | 18 +- beat/core/execution/local.py | 667 ++++++++++++++++++ beat/core/test/test_execution.py | 76 +- 4 files changed, 757 insertions(+), 34 deletions(-) create mode 100755 beat/core/execution/__init__.py rename beat/core/{execution.py => execution/docker.py} (98%) create mode 100755 beat/core/execution/local.py mode change 100644 => 100755 beat/core/test/test_execution.py diff --git a/beat/core/execution/__init__.py b/beat/core/execution/__init__.py new file mode 100755 index 00000000..ae98da9c --- /dev/null +++ b/beat/core/execution/__init__.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +from .docker import DockerExecutor +from .local import LocalExecutor diff --git a/beat/core/execution.py b/beat/core/execution/docker.py similarity index 98% rename from beat/core/execution.py rename to beat/core/execution/docker.py index 064f490b..f3c4e392 100755 --- a/beat/core/execution.py +++ b/beat/core/execution/docker.py @@ -40,15 +40,15 @@ logger = logging.getLogger(__name__) import simplejson -from . import schema -from . import database -from . import algorithm -from . import inputs -from . import outputs -from . import data -from . import stats -from . import agent -from . import dock +from .. import schema +from .. import database +from .. import algorithm +from .. import inputs +from .. import outputs +from .. import data +from .. import stats +from .. import agent +from .. import dock from beat.backend.python.helpers import convert_experiment_configuration_to_container from beat.backend.python.helpers import create_inputs_from_configuration diff --git a/beat/core/execution/local.py b/beat/core/execution/local.py new file mode 100755 index 00000000..bd69eecd --- /dev/null +++ b/beat/core/execution/local.py @@ -0,0 +1,667 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +'''Execution utilities''' + +import os +import sys +import glob +import errno +import tempfile +import subprocess +import zmq.green as zmq +import time +import collections + +import logging +logger = logging.getLogger(__name__) + +import simplejson + +from .. import schema +from .. import database +from .. import algorithm +from .. import inputs +from .. import outputs +from .. import data +from .. import stats + + +class LocalExecutor(object): + """Executors runs the code given an execution block information, externally + + + Parameters: + + prefix (str): Establishes the prefix of your installation. + + data (dict, str): The piece of data representing the block to be executed. + It must validate against the schema defined for execution blocks. If a + string is passed, it is supposed to be a fully qualified absolute path to + a JSON file containing the block execution information. + + cache (str, optional): If your cache is not located under + ``<prefix>/cache``, then specify a full path here. It will be used + instead. + + dataformat_cache (dict, optional): A dictionary mapping dataformat names to + loaded dataformats. This parameter is optional and, if passed, may + greatly speed-up database loading times as dataformats that are already + loaded may be re-used. If you use this parameter, you must guarantee that + the cache is refreshed as appropriate in case the underlying dataformats + change. + + database_cache (dict, optional): A dictionary mapping database names to + loaded databases. This parameter is optional and, if passed, may + greatly speed-up database loading times as databases that are already + loaded may be re-used. If you use this parameter, you must guarantee that + the cache is refreshed as appropriate in case the underlying databases + change. + + algorithm_cache (dict, optional): A dictionary mapping algorithm names to + loaded algorithms. This parameter is optional and, if passed, may + greatly speed-up database loading times as algorithms that are already + loaded may be re-used. If you use this parameter, you must guarantee that + the cache is refreshed as appropriate in case the underlying algorithms + change. + + library_cache (dict, optional): A dictionary mapping library names to + loaded libraries. This parameter is optional and, if passed, may greatly + speed-up library loading times as libraries that are already loaded may + be re-used. If you use this parameter, you must guarantee that the cache + is refreshed as appropriate in case the underlying libraries change. + + + Attributes: + + cache (str): The path to the cache currently being used + + errors (list): A list containing errors found while loading this execution + block. + + data (dict): The original data for this executor, as loaded by our JSON + decoder. + + algorithm (beat.core.algorithm.Algorithm): An object representing the + algorithm to be run. + + databases (dict): A dictionary in which keys are strings with database + names and values are :py:class:`database.Database`, representing the + databases required for running this block. The dictionary may be empty + in case all inputs are taken from the file cache. + + views (dict): A dictionary in which the keys are tuples pointing to the + ``(<database-name>, <protocol>, <set>)`` and the value is a setup view + for that particular combination of details. The dictionary may be empty + in case all inputs are taken from the file cache. + + input_list (beat.core.inputs.InputList): A list of inputs that will be + served to the algorithm. + + output_list (beat.core.outputs.OutputList): A list of outputs that the + algorithm will produce. + + data_sources (list): A list with all data-sources created by our execution + loader. + + data_sinks (list): A list with all data-sinks created by our execution + loader. These are useful for clean-up actions in case of problems. + + custom_root_folders (dict): A dictionary where the keys are database + identifiers (`<db_name>/<version>`) and the values are paths to the + given database's files. These values will override the value found + in the database's metadata. + + """ + + def __init__(self, prefix, data, cache=None, dataformat_cache=None, + database_cache=None, algorithm_cache=None, library_cache=None, custom_root_folders=None): + + self.prefix = prefix + self.cache = cache or os.path.join(self.prefix, 'cache') + + # check cache - halt if required + if not os.path.exists(self.cache): + raise IOError("Cache path `%s' does not exist" % self.cache) + + # some attributes + self.algorithm = None + self.databases = {} + self.views = {} + self.input_list = None + self.output_list = None + self.data_sinks = [] + self.data_sources = [] + + # runs validation if required + self.errors = [] + self.data = data + + if custom_root_folders is not None and not isinstance(custom_root_folders, collections.Mapping): + raise TypeError("The custom root folders must be in dictionary format") + + self.custom_root_folders = custom_root_folders + + # temporary caches, if the user has not set them, for performance + database_cache = database_cache if database_cache is not None else {} + dataformat_cache = dataformat_cache if dataformat_cache is not None else {} + algorithm_cache = algorithm_cache if algorithm_cache is not None else {} + library_cache = library_cache if library_cache is not None else {} + + self._load(data, dataformat_cache, algorithm_cache, database_cache, + library_cache) + + + def _load(self, data, dataformat_cache, algorithm_cache, database_cache, + library_cache): + """Loads the block execution information""" + + # reset + self.data = None + self.errors = [] + self.algorithm = None + self.databases = {} + self.views = {} + self.input_list = None + self.output_list = None + self.data_sinks = [] + self.data_sources = [] + self.db_address = None + + if not isinstance(data, dict): #user has passed a file pointer + if not os.path.exists(data): + self.errors.append('File not found: %s' % data) + return + + # this runs basic validation, including JSON loading if required + self.data, self.errors = schema.validate('execution', data) + if self.errors: return #don't proceed with the rest of validation + + # at this point, the execution information is loaded, must now go on and + # load the algorithm code. + if self.data['algorithm'] in algorithm_cache: #reuse + self.algorithm = algorithm_cache[self.data['algorithm']] + else: #load it, use dataformat cache if possible + self.algorithm = algorithm.Algorithm(self.prefix, + self.data['algorithm'], dataformat_cache, library_cache) + algorithm_cache[self.algorithm.name] = self.algorithm + + if not self.algorithm.valid: + self.errors += self.algorithm.errors + return #don't proceed if algorithm is bogus! + + # load databases (if any is required) + for name, details in self.data['inputs'].items(): + if 'database' in details: + + if details['database'] not in self.databases: + + if details['database'] in database_cache: #reuse + db = database_cache[details['database']] + else: #load it + db = database.Database(self.prefix, details['database'], + dataformat_cache) + database_cache[db.name] = db + + self.databases[details['database']] = db + + if not db.valid: + self.errors += db.errors + + + def __enter__(self): + """Prepares inputs and outputs for the processing task + + Raises: + + IOError: in case something cannot be properly setup + + """ + + self._prepare_inputs() + self._prepare_outputs() + + return self + + + def __exit__(self, exc_type, exc_value, traceback): + """Closes all sinks and disconnects inputs and outputs + """ + + for sink in self.data_sinks: + # we save the output only if no valid error has been thrown + # n.b.: a system exit will raise SystemExit which is not an Exception + if not isinstance(exc_type, Exception): sink.close() + sink.reset() + + + self.input_list = None + self.output_list = None + self.data_sinks = [] + self.data_sources = [] + + + def _prepare_inputs(self): + """Prepares all input required by the execution.""" + + self.input_list = inputs.InputList() + + # This is used for parallelization purposes + start_index, end_index = self.data.get('range', (None, None)) + + db_views = {} + + for name, details in self.data['inputs'].items(): + + if 'database' in details: #it is a dataset input + # create the remote input + db = self.databases[details['database']] + configName = "database/%s" % db.name + if self.custom_root_folders is not None and configName in self.custom_root_folders: + db.data['root_folder'] = self.custom_root_folders[configName] + + dataformat_name = db.set(details['protocol'], details['set'])['outputs'][details['output']] + + # Get the relevant data for the requested view + view_key = (details['database'], details['protocol'], details['set']) + + if not db_views.has_key(view_key): + # create the view + v = db.view(view_key[1], view_key[2]) + # setup + v.prepare_outputs() + v.setup() + db_views[view_key] = v + else: + v = db_views[view_key] + + #v.obj.outputs = v.outputs + # Use the database view as an in-memory data source + v_data_source = data.MemoryDataSource(v.done, next_callback=v.next) + v_output = v.outputs[details['output']] + # Output the data from the view + v_output.data_sink.data_sources.append(v_data_source) + # Create a new local input + input = inputs.Input(name, db.dataformats[dataformat_name], v_data_source) + #input = inputs.RemoteInput(name, db.dataformats[dataformat_name], self.db_socket) + + # Synchronization bits + group = self.input_list.group(details['channel']) + if group is None: + group = inputs.InputGroup( + details['channel'], + synchronization_listener=outputs.SynchronizationListener(), + restricted_access=(details['channel'] == self.data['channel']) + ) + self.input_list.add(group) + + group.add(input) + + else: + + data_source = data.CachedDataSource() + self.data_sources.append(data_source) + if details['channel'] == self.data['channel']: #synchronized + status = data_source.setup( + filename=os.path.join(self.cache, details['path'] + '.data'), + prefix=self.prefix, + force_start_index=start_index, + force_end_index=end_index, + ) + else: + status = data_source.setup( + filename=os.path.join(self.cache, details['path'] + '.data'), + prefix=self.prefix, + ) + + if not status: + raise IOError("cannot load cache file `%s'" % details['path']) + + input = inputs.Input(name, self.algorithm.input_map[name], data_source) + + # Synchronization bits + group = self.input_list.group(details['channel']) + if group is None: + group = inputs.InputGroup( + details['channel'], + synchronization_listener=outputs.SynchronizationListener(), + restricted_access=(details['channel'] == self.data['channel']) + ) + self.input_list.add(group) + + group.add(input) + + + def _prepare_outputs(self): + """Prepares all output required by the execution.""" + + self.output_list = outputs.OutputList() + + # This is used for parallelization purposes + start_index, end_index = self.data.get('range', (None, None)) + + if 'outputs' in self.data: #it is a normal block (not analyzer) + + for name, details in self.data['outputs'].items(): + + path = os.path.join(self.cache, details['path'] + '.data') + dirname = os.path.dirname(path) + # Make sure that the directory exists while taking care of race + # conditions. see: http://stackoverflow.com/questions/273192/check-if-a-directory-exists-and-create-it-if-necessary + try: + if (len(dirname) > 0): + os.makedirs(dirname) + except OSError as exception: + if exception.errno != errno.EEXIST: + raise + + data_sink = data.CachedDataSink() + self.data_sinks.append(data_sink) + status = data_sink.setup( + filename=path, + dataformat=self.algorithm.dataformats[self.algorithm.output_map[name]], + encoding='binary', + max_size=0, #in bytes, for individual file chunks + ) + if not status: + raise IOError("cannot create cache sink `%s'" % details['path']) + + input_group = self.input_list.group(details['channel']) + if (input_group is None) or not hasattr(input_group, 'synchronization_listener'): + synchronization_listener = None + else: + synchronization_listener = input_group.synchronization_listener + + self.output_list.add(outputs.Output(name, data_sink, + synchronization_listener=synchronization_listener, + force_start_index=start_index or 0) + ) + + else: #it is an analyzer + + name = 'result' + details = self.data[name] + path = os.path.join(self.cache, details['path'] + '.data') + dirname = os.path.dirname(path) + # Make sure that the directory exists while taking care of race + # conditions. see: http://stackoverflow.com/questions/273192/check-if-a-directory-exists-and-create-it-if-necessary + try: + if (len(dirname) > 0): + os.makedirs(dirname) + except OSError as exception: + if exception.errno != errno.EEXIST: + raise + + data_sink = data.CachedDataSink() + self.data_sinks.append(data_sink) + status = data_sink.setup( + filename=path, + dataformat=self.algorithm.result_dataformat(), + encoding='binary', + ) + if not status: + raise IOError("cannot create cache sink `%s'" % details['path']) + + self.output_list.add(outputs.Output(name, data_sink, + force_start_index=start_index or 0)) + + + def process(self, virtual_memory_in_megabytes=0, + max_cpu_percent=0, timeout_in_minutes=0, daemon=0): + """Executes the user algorithm code using an external program. + + If ``executable`` is set, then execute the process using an external + program, else, uses the python application living by the side of this + installation (if one is found). + + The execution interface follows the backend API as described in our + documentation. + + We use green subprocesses this implementation. Each co-process is linked + to us via 2 uni-directional pipes which work as datain and dataout + end-points. The parent process (i.e. the current one) establishes the + connection to the child and then can pass/receive commands, data and logs. + + Usage of the data pipes (datain, dataout) is **synchronous** - you send a + command and block for an answer. The co-process is normally controlled by + the current process, except for data requests, which are user-code driven. + The nature of our problem does not require an *asynchronous* implementation + which, in turn, would require a much more complex set of dependencies (on + asyncio or Twisted for example). + + + Parameters: + + virtual_memory_in_megabytes (int, Optional): The amount of virtual memory + (in Megabytes) available for the job. If set to zero, no limit will be + applied. + + max_cpu_percent (int, Optional): The maximum amount of CPU usage allowed + in a system. This number must be an integer number between 0 and + ``100*number_of_cores`` in your system. For instance, if your system + has 2 cores, this number can go between 0 and 200. If it is <= 0, then + we don't track CPU usage. + + timeout_in_minutes (int): The number of minutes to wait for the user + process to execute. After this amount of time, the user process is + killed with :py:attr:`signal.SIGKILL`. If set to zero, no timeout will + be applied. + + daemon (int): If this variable is set, then we don't really start the + user process, but just kick out 0MQ server, print the command-line and + sleep for that many seconds. You're supposed to start the client by + hand then and debug it. + + + Returns: + + dict: A dictionary which is JSON formattable containing the summary of + this block execution. + + """ + + def _create_result(status, error_message=''): + return { + 'status': status, + 'statistics': { + 'data': self.io_statistics + }, + 'stderr': '', + 'stdout': '', + 'system_error': '', + 'user_error': error_message, + 'timed_out': False + } + + def _process_exception(exception, prefix, contribution_kind): + import traceback + exc_type, exc_value, exc_traceback = sys.exc_info() + tb = traceback.extract_tb(exc_traceback) + + contributions_prefix = os.path.join(prefix, contribution_kind) + os.sep + + for first_line, line in enumerate(tb): + if line[0].startswith(contributions_prefix): + break + + if first_line == len(tb): + first_line = 0 + + s = ''.join(traceback.format_list(tb[first_line:])) + s = s.replace(contributions_prefix, '').strip() + + return "%s\n%s: %s" % (s, type(exception).__name__, exception) + + + if not self.valid: + raise RuntimeError("execution information is bogus:\n * %s" % \ + '\n * '.join(self.errors)) + + self.runner = self.algorithm.runner() + retval = self.runner.setup(self.data['parameters']) + + if not self.input_list or not self.output_list: + raise RuntimeError("I/O for execution block has not yet been set up") + + using_output = self.output_list[0] if self.analysis else self.output_list + + _start = time.time() + + try: + main_group = self.input_list.main_group + while main_group.hasMoreData(): + main_group.restricted_access = False + main_group.next() + main_group.restricted_access = True + + try: + if not self.runner.process(self.input_list, using_output): + return _create_result(1, "The algorithm returned 'False'") + except Exception as e: + message = _process_exception(e, self.prefix, 'algorithms') + return _create_result(1, message) + + except Exception as e: + message = _process_exception(e, self.prefix, 'databases') + return _create_result(1, message) + + missing_data_outputs = [x for x in self.output_list if x.isDataMissing()] + + proc_time = time.time() - _start + + if missing_data_outputs: + raise RuntimeError("Missing data on the following output(s): %s" % \ + ', '.join([x.name for x in missing_data_outputs])) + + # some local information + logger.debug("Total processing time was %.3f seconds" , proc_time) + + return _create_result(0) + + + @property + def valid(self): + """A boolean that indicates if this executor is valid or not""" + + return not bool(self.errors) + + + @property + def analysis(self): + """A boolean that indicates if the current block is an analysis block""" + return 'result' in self.data + + + @property + def outputs_exist(self): + """Returns ``True`` if outputs this block is supposed to produce exists.""" + + if self.analysis: + path = os.path.join(self.cache, self.data['result']['path']) + '*' + if not glob.glob(path): return False + + else: + for name, details in self.data['outputs'].items(): + path = os.path.join(self.cache, details['path']) + '*' + if not glob.glob(path): return False + + # if you get to this point all outputs already exist + return True + + + @property + def io_statistics(self): + """Summarize current I/O statistics looking at data sources and sinks, inputs and outputs + + Returns: + + dict: A dictionary summarizing current I/O statistics + """ + + return stats.io_statistics(self.data, self.input_list, self.output_list) + + + def __str__(self): + + return simplejson.dumps(self.data, indent=4) + + + def write(self, path): + """Writes contents to precise filesystem location""" + + with open(path, 'wt') as f: f.write(str(self)) + + + def dump_runner_configuration(self, directory): + """Exports contents useful for a backend runner to run the algorithm""" + + data = { + 'algorithm': self.data['algorithm'], + 'parameters': self.data['parameters'], + } + + data['inputs'] = \ + dict([(k, v['channel']) for k,v in self.data['inputs'].items()]) + + if 'outputs' in self.data: + data['outputs'] = \ + dict([(k, v['channel']) for k,v in self.data['outputs'].items()]) + else: + data['result'] = self.data['channel'] + + data['channel'] = self.data['channel'] + + with open(os.path.join(directory, 'configuration.json'), 'wb') as f: + simplejson.dump(data, f, indent=2) + + tmp_prefix = os.path.join(directory, 'prefix') + if not os.path.exists(tmp_prefix): os.makedirs(tmp_prefix) + + self.algorithm.export(tmp_prefix) + + + def dump_databases_provider_configuration(self, directory): + """Exports contents useful for a backend runner to run the algorithm""" + + with open(os.path.join(directory, 'configuration.json'), 'wb') as f: + simplejson.dump(self.data, f, indent=2) + + tmp_prefix = os.path.join(directory, 'prefix') + if not os.path.exists(tmp_prefix): os.makedirs(tmp_prefix) + + for db in self.databases.values(): + db.export(tmp_prefix) + + + def kill(self): + """Stops the user process by force - to be called from signal handlers""" + + if self.agent is not None: + self.agent.kill() + return True + return False diff --git a/beat/core/test/test_execution.py b/beat/core/test/test_execution.py old mode 100644 new mode 100755 index b8ba1032..d4cb5f72 --- a/beat/core/test/test_execution.py +++ b/beat/core/test/test_execution.py @@ -47,6 +47,7 @@ import unittest from ..experiment import Experiment from ..execution import DockerExecutor +from ..execution import LocalExecutor from ..hash import hashFileContents from ..data import CachedDataSource from ..dock import Host @@ -57,23 +58,6 @@ from .utils import cleanup class TestExecution(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.host = Host() - cls.host.setup(raise_on_errors=False) - - - @classmethod - def tearDownClass(cls): - cls.host.teardown() - cleanup() - - - def setUp(self): - super(TestExecution, self).setUp() - self.proxy_mode = True - - def check_output(self, prefix, path): '''Checks if a given output exists, together with its indexes and checksums ''' @@ -130,9 +114,8 @@ class TestExecution(unittest.TestCase): # can we execute it? results = [] for key, value in scheduled.items(): - executor = DockerExecutor(self.host, prefix, value['configuration'], tmp_prefix, - dataformat_cache, database_cache, algorithm_cache, - proxy_mode=self.proxy_mode) + executor = self.create_executor(prefix, value['configuration'], tmp_prefix, + dataformat_cache, database_cache, algorithm_cache) assert executor.valid, '\n * %s' % '\n * '.join(executor.errors) with executor: @@ -241,9 +224,6 @@ class TestExecution(unittest.TestCase): def test_double_triangle_1(self): assert self.execute('user/user/double_triangle/1/double_triangle', [{'out_data': 42}]) is None - def test_cxx_double_1(self): - assert self.execute('user/user/double/1/cxx_double', [{'out_data': 42}]) is None - def test_inputs_mix_1(self): assert self.execute('user/user/inputs_mix/1/test', [{'sum': 12272, 'nb': 10}]) is None @@ -286,8 +266,54 @@ class TestExecution(unittest.TestCase): -class TestExecutionNoProxy(TestExecution): +class TestDockerExecution(TestExecution): + + @classmethod + def setUpClass(cls): + cls.host = Host() + cls.host.setup(raise_on_errors=False) + + + @classmethod + def tearDownClass(cls): + cls.host.teardown() + cleanup() + def setUp(self): - super(TestExecutionNoProxy, self).setUp() + super(TestDockerExecution, self).setUp() + self.proxy_mode = True + + + def create_executor(self, prefix, configuration, tmp_prefix, dataformat_cache, + database_cache, algorithm_cache): + return DockerExecutor(self.host, prefix, configuration, tmp_prefix, + dataformat_cache, database_cache, algorithm_cache, + proxy_mode=self.proxy_mode) + + + def test_cxx_double_1(self): + assert self.execute('user/user/double/1/cxx_double', [{'out_data': 42}]) is None + + + + +class TestDockerExecutionNoProxy(TestDockerExecution): + + def setUp(self): + super(TestDockerExecutionNoProxy, self).setUp() self.proxy_mode = False + + + +class TestLocalExecution(TestExecution): + + def create_executor(self, prefix, configuration, tmp_prefix, dataformat_cache, + database_cache, algorithm_cache): + return LocalExecutor(prefix, configuration, tmp_prefix, + dataformat_cache, database_cache, algorithm_cache) + + + +# Don't run tests from the base class +del TestExecution -- GitLab