diff --git a/beat/core/agent.py b/beat/core/agent.py
old mode 100644
new mode 100755
index 623f494bdd79035020fb217f7f124869bd539f6c..0903b6fb6209bed6e683f576bbe2a84d38511959
--- a/beat/core/agent.py
+++ b/beat/core/agent.py
@@ -29,6 +29,7 @@
 import os
 import shutil
 import simplejson
+import glob
 
 import logging
 logger = logging.getLogger(__name__)
@@ -303,6 +304,24 @@ class Agent(object):
       volumes=volumes
       )
 
+    volumes = {}
+    if not configuration.proxy_mode:
+      for name, details in configuration.data['inputs'].items():
+        if 'database' in details:
+          continue
+
+        basename = os.path.join(configuration.cache, details['path'])
+        filenames = glob.glob(basename + '*.data')
+        filenames.extend(glob.glob(basename + '*.data.checksum'))
+        filenames.extend(glob.glob(basename + '*.data.index'))
+        filenames.extend(glob.glob(basename + '*.data.index.checksum'))
+
+        for filename in filenames:
+          volumes[filename] = {
+            'bind': os.path.join('/cache', filename.replace(configuration.cache + '/', '')),
+            'mode': 'ro',
+          }
+
     self.process = dock.Popen(
       host,
       envkey,
@@ -310,6 +329,7 @@
       tmp_archive=self.tempdir,
       virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
       max_cpu_percent=self.max_cpu_percent,
+      volumes=volumes
       )
 
     # provide a tip on how to stop the test
diff --git a/beat/core/execution.py b/beat/core/execution.py
old mode 100644
new mode 100755
index b4473e29161844d9f1c838bbb8836f76502cff38..a25653cff7fe1189481edbe200c3adff0f04a585
--- a/beat/core/execution.py
+++ b/beat/core/execution.py
@@ -51,6 +51,10 @@
 from . import stats
 from . import agent
 from . import dock
 
+from beat.backend.python.helpers import convert_experiment_configuration_to_container
+from beat.backend.python.helpers import create_inputs_from_configuration
+from beat.backend.python.helpers import CacheAccess
+
 class Executor(object):
   """Executors runs the code given an execution block information, externally
@@ -135,10 +139,12 @@
   """
 
   def __init__(self, prefix, data, cache=None, dataformat_cache=None,
-          database_cache=None, algorithm_cache=None, library_cache=None):
+          database_cache=None, algorithm_cache=None, library_cache=None,
+          proxy_mode=True):
 
     self.prefix = prefix
     self.cache = cache or os.path.join(self.prefix, 'cache')
+    self.proxy_mode = proxy_mode
 
     # check cache - halt if required
     if not os.path.exists(self.cache):
@@ -186,6 +192,7 @@
     self.data_sources = []
     self.context = None
     self.db_address = None
+    self.db_socket = None
 
     if not isinstance(data, dict): #user has passed a file pointer
       if not os.path.exists(data):
@@ -278,68 +285,15 @@
   def _prepare_inputs(self):
     """Prepares all input required by the execution."""
 
-    self.input_list = inputs.InputList()
-
-    # This is used for parallelization purposes
-    start_index, end_index = self.data.get('range', (None, None))
-
-    for name, details in self.data['inputs'].items():
-
-      if 'database' in details: #it is a dataset input
-
-        # create the remote input
-        db = self.databases[details['database']]
-
-        dataformat_name = db.set(details['protocol'], details['set'])['outputs'][details['output']]
-        input = inputs.RemoteInput(name, db.dataformats[dataformat_name], self.db_socket, unpack=False)
-
-        # Synchronization bits
-        group = self.input_list.group(details['channel'])
-        if group is None:
-          group = inputs.InputGroup(
-                details['channel'],
-                synchronization_listener=outputs.SynchronizationListener(),
-                restricted_access=(details['channel'] == self.data['channel'])
-              )
-          self.input_list.add(group)
-
-        group.add(input)
-
-      else:
-
-        data_source = data.CachedDataSource()
-        self.data_sources.append(data_source)
-        if details['channel'] == self.data['channel']: #synchronized
-          status = data_source.setup(
-                filename=os.path.join(self.cache, details['path'] + '.data'),
-                prefix=self.prefix,
-                force_start_index=start_index,
-                force_end_index=end_index,
-                unpack=False,
-              )
-        else:
-          status = data_source.setup(
-                filename=os.path.join(self.cache, details['path'] + '.data'),
-                prefix=self.prefix,
-                unpack=False,
-              )
-
-        if not status:
-          raise IOError("cannot load cache file `%s'" % details['path'])
-
-        input = inputs.Input(name, self.algorithm.input_map[name], data_source)
-
-        # Synchronization bits
-        group = self.input_list.group(details['channel'])
-        if group is None:
-          group = inputs.InputGroup(
-                details['channel'],
-                synchronization_listener=outputs.SynchronizationListener(),
-                restricted_access=(details['channel'] == self.data['channel'])
-              )
-          self.input_list.add(group)
+    if self.proxy_mode:
+      cache_access = CacheAccess.LOCAL
+    else:
+      cache_access = CacheAccess.NONE
 
-        group.add(input)
+    (self.input_list, self.data_sources) = create_inputs_from_configuration(
+        self.data, self.algorithm, self.prefix, self.cache,
+        cache_access=cache_access, unpack=False, socket=self.db_socket
+    )
 
 
   def _prepare_outputs(self):
@@ -490,7 +444,7 @@
       #otherwise, it means the running process went bananas, ignore it ;-)
       if 'statistics' in retval:
         if 'data' in retval['statistics']:
-          retval['statistics']['data'].update(self.io_statistics)
+          stats.update(retval['statistics']['data'], self.io_statistics)
         else:
           logger.warn("cannot find 'data' entry on returned stats, " \
                   "therefore not appending I/O info either")
@@ -530,16 +484,14 @@
   @property
   def io_statistics(self):
-    """Summarize current I/O statistics looking at data sources and sinks
+    """Summarize current I/O statistics looking at data sources and sinks, inputs and outputs
 
     Returns:
 
-      dict: A dictionary summarizing current I/O statistics, read from our
-      sinks, sources, inputs and outputs.
+      dict: A dictionary summarizing current I/O statistics
 
     """
 
-    is_analyzer = 'outputs' not in self.data
-    return stats.io_statistics(self.data_sources, self.input_list, self.data_sinks, self.output_list, self.data, is_analyzer)
+    return stats.io_statistics(self.data, self.input_list, self.output_list)
 
 
   def __str__(self):
@@ -556,21 +508,7 @@
   def dump_runner_configuration(self, directory):
     """Exports contents useful for a backend runner to run the algorithm"""
 
-    data = {
-        'algorithm': self.data['algorithm'],
-        'parameters': self.data['parameters'],
-    }
-
-    data['inputs'] = \
-        dict([(k, v['channel']) for k,v in self.data['inputs'].items()])
-
-    if 'outputs' in self.data:
-      data['outputs'] = \
-          dict([(k, v['channel']) for k,v in self.data['outputs'].items()])
-    else:
-      data['result'] = self.data['channel']
-
-    data['channel'] = self.data['channel']
+    data = convert_experiment_configuration_to_container(self.data, self.proxy_mode)
 
     with open(os.path.join(directory, 'configuration.json'), 'wb') as f:
       simplejson.dump(data, f, indent=2)
diff --git a/beat/core/stats.py b/beat/core/stats.py
old mode 100644
new mode 100755
index 021d006bf2f6b9cb3130c841b79cf552b3c3d7c6..ce32e07a6a88e1f782591a1dc289a3875bbc52f4
--- a/beat/core/stats.py
+++ b/beat/core/stats.py
@@ -37,6 +37,10 @@
 import simplejson
 
 from . import schema
 from . import prototypes
+from beat.backend.python.stats import io_statistics
+from beat.backend.python.stats import update
+
+
 class Statistics(object):
   """Statistics define resource usage for algorithmic code runs
@@ -212,58 +216,7 @@ class Statistics(object):
     with open(f, 'wt') as fobj:
       fobj.write(str(self))
 
-def io_statistics(data_sources, input_list, data_sinks, output_list, data, analyzer=False):
-  """Summarize current I/O statistics looking at data sources and sinks
-
-  Returns:
-
-    dict: A dictionary summarizing current I/O statistics, read from our
-    sinks, sources, inputs and outputs.
-  """
-
-  # data reading
-  bytes_read = 0
-  blocks_read = 0
-  read_time = 0.0
-
-  for source in data_sources:
-    size, duration = source.statistics()
-    bytes_read += size
-    read_time += duration
-
-  for inpt in input_list:
-    blocks_read += inpt.nb_data_blocks_read
-
-  # data writing
-  bytes_written = 0
-  blocks_written = 0
-  write_time = 0.0
-
-  for sink in data_sinks:
-    size, duration = sink.statistics()
-    bytes_written += size
-    write_time += duration
-
-  files = []
-  for outpt in output_list:
-    blocks_written += outpt.nb_data_blocks_written
-    #if self.analysis:
-    if analyzer: #'outputs' in self.data: #it is a normal block (not analyzer)
-      hash = data['result']['hash']
-    else:
-      hash = data['outputs'][outpt.name]['hash']
-    files.append(dict(
-        hash=hash,
-        size=float(outpt.data_sink.statistics()[0]),
-        blocks=outpt.nb_data_blocks_written,
-    ))
-
-  return dict(
-      volume = dict(read=bytes_read, write=bytes_written),
-      blocks = dict(read=blocks_read, write=blocks_written),
-      time = dict(read=read_time, write=write_time),
-      files = files,
-  )
+#----------------------------------------------------------
 
 
 def cpu_statistics(start, end):
@@ -312,6 +265,9 @@
   }
 
 
+#----------------------------------------------------------
+
+
 def memory_statistics(data):
   """Summarizes current memory usage
 
diff --git a/beat/core/test/test_execution.py b/beat/core/test/test_execution.py
old mode 100644
new mode 100755
index f74d3e4e4432c8e8c924740b20dde9a350d9fd78..ce7b6f9afd0d217efb652ed73d83ae83a6f51741
--- a/beat/core/test/test_execution.py
+++ b/beat/core/test/test_execution.py
@@ -69,6 +69,11 @@ class TestExecution(unittest.TestCase):
 
     cleanup()
 
+  def setUp(self):
+    super(TestExecution, self).setUp()
+    self.proxy_mode = True
+
+
   def check_output(self, prefix, path):
     '''Checks if a given output exists, together with its indexes and checksums
     '''
@@ -126,7 +131,8 @@
     results = []
     for key, value in scheduled.items():
       executor = Executor(prefix, value['configuration'], tmp_prefix,
-          dataformat_cache, database_cache, algorithm_cache)
+          dataformat_cache, database_cache, algorithm_cache,
+          proxy_mode=self.proxy_mode)
 
       assert executor.valid, '\n * %s' % '\n * '.join(executor.errors)
       with executor:
@@ -228,7 +234,7 @@
 
   def test_too_many_nexts(self):
     result = self.execute('user/user/triangle/1/too_many_nexts', [None])
     assert result
-    self.assertEqual(result['status'], 137)
+    self.assertTrue(result['status'] != 0)
     assert result['user_error']
     assert 'no more data' in result['user_error']
@@ -263,3 +269,11 @@
         'indices': '0 - 4\n5 - 9\n10 - 14\n'
       }
     ]) is None
+
+
+
+class TestExecutionNoProxy(TestExecution):
+
+  def setUp(self):
+    super(TestExecutionNoProxy, self).setUp()
+    self.proxy_mode = False
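
Note: the volume map built in beat/core/agent.py above is handed to dock.Popen so that, when proxy mode is off, every cached input file is bind-mounted read-only into the container under /cache. Below is a minimal standalone sketch of that mapping; the cache root and cache path are hypothetical placeholders (the real values come from configuration.cache and configuration.data['inputs']):

    import os
    import glob

    # Hypothetical values, for illustration only.
    cache = '/var/beat/cache'
    details = {'path': 'ab/cd/ef/0123456789abcdef'}

    # Collect the data chunks plus their checksum and index side files,
    # exactly as the patched agent does.
    basename = os.path.join(cache, details['path'])
    filenames = glob.glob(basename + '*.data')
    filenames.extend(glob.glob(basename + '*.data.checksum'))
    filenames.extend(glob.glob(basename + '*.data.index'))
    filenames.extend(glob.glob(basename + '*.data.index.checksum'))

    # Bind each file read-only under /cache, keeping its path relative
    # to the host cache root.
    volumes = {}
    for filename in filenames:
      volumes[filename] = {
        'bind': os.path.join('/cache', filename.replace(cache + '/', '')),
        'mode': 'ro',
      }

    # A resulting entry would look like (chunk name hypothetical):
    # '/var/beat/cache/ab/cd/ef/0123456789abcdef.0.99.data' ->
    #     {'bind': '/cache/ab/cd/ef/0123456789abcdef.0.99.data', 'mode': 'ro'}

Mounting the individual files rather than the whole cache directory keeps the container from seeing, or writing to, anything beyond the inputs of the block being executed.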