Commit e86422f2 authored by Philip ABBET

Add support for the 'no proxy' mode (for inputs) in containers

parent fa7de4f1
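The new flag is consumed by the `Executor` constructor changed below. A minimal usage sketch; the `beat.core.execution` module path and the placeholder values are assumptions, only the `proxy_mode` keyword comes from this commit:

```python
# Hypothetical usage sketch, not part of this commit.
from beat.core.execution import Executor   # assumed module path

prefix = '/path/to/prefix'                  # placeholder BEAT prefix directory
configuration = 'user/user/single/1/single' # placeholder block configuration

executor = Executor(prefix, configuration, proxy_mode=False)
if executor.valid:
    print('executor configured for no-proxy (direct cache access) mode')
```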
......@@ -29,6 +29,7 @@
import os
import shutil
import simplejson
+import glob
import logging
logger = logging.getLogger(__name__)
......@@ -303,6 +304,24 @@ class Agent(object):
                volumes=volumes
            )

+        volumes = {}
+
+        if not configuration.proxy_mode:
+            for name, details in configuration.data['inputs'].items():
+                if 'database' in details:
+                    continue
+
+                basename = os.path.join(configuration.cache, details['path'])
+                filenames = glob.glob(basename + '*.data')
+                filenames.extend(glob.glob(basename + '*.data.checksum'))
+                filenames.extend(glob.glob(basename + '*.data.index'))
+                filenames.extend(glob.glob(basename + '*.data.index.checksum'))
+
+                for filename in filenames:
+                    volumes[filename] = {
+                        'bind': os.path.join('/cache', filename.replace(configuration.cache + '/', '')),
+                        'mode': 'ro',
+                    }
+
        self.process = dock.Popen(
            host,
            envkey,
......@@ -310,6 +329,7 @@ class Agent(object):
            tmp_archive=self.tempdir,
            virtual_memory_in_megabytes=self.virtual_memory_in_megabytes,
            max_cpu_percent=self.max_cpu_percent,
+            volumes=volumes
        )

        # provide a tip on how to stop the test
......
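The `volumes` dictionary built above uses the host-path to `{'bind': ..., 'mode': 'ro'}` mapping that docker-py accepts for bind mounts; each cache file (plus its checksum and index companions) is mounted read-only at the same relative location under `/cache`. A standalone sketch of the same mapping, with hypothetical paths (`os.path.relpath` is equivalent to the `replace()` call above):

```python
import glob
import os

cache = '/var/beat/cache'            # hypothetical cache root
path = 'user/block/output/0123abcd'  # hypothetical value of details['path']

volumes = {}
basename = os.path.join(cache, path)
for suffix in ('*.data', '*.data.checksum', '*.data.index', '*.data.index.checksum'):
    for filename in glob.glob(basename + suffix):
        volumes[filename] = {
            'bind': os.path.join('/cache', os.path.relpath(filename, cache)),
            'mode': 'ro',  # inputs are never written to, so mount read-only
        }
```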
......@@ -51,6 +51,10 @@ from . import stats
from . import agent
from . import dock

+from beat.backend.python.helpers import convert_experiment_configuration_to_container
+from beat.backend.python.helpers import create_inputs_from_configuration
+from beat.backend.python.helpers import CacheAccess


class Executor(object):
    """Executors runs the code given an execution block information, externally
......@@ -135,10 +139,12 @@ class Executor(object):
    """

    def __init__(self, prefix, data, cache=None, dataformat_cache=None,
-            database_cache=None, algorithm_cache=None, library_cache=None):
+            database_cache=None, algorithm_cache=None, library_cache=None,
+            proxy_mode=True):

        self.prefix = prefix
        self.cache = cache or os.path.join(self.prefix, 'cache')
+        self.proxy_mode = proxy_mode

        # check cache - halt if required
        if not os.path.exists(self.cache):
......@@ -186,6 +192,7 @@ class Executor(object):
        self.data_sources = []
        self.context = None
        self.db_address = None
+        self.db_socket = None

        if not isinstance(data, dict): #user has passed a file pointer
            if not os.path.exists(data):
......@@ -278,68 +285,15 @@ class Executor(object):
    def _prepare_inputs(self):
        """Prepares all input required by the execution."""

-        self.input_list = inputs.InputList()
-
-        # This is used for parallelization purposes
-        start_index, end_index = self.data.get('range', (None, None))
-
-        for name, details in self.data['inputs'].items():
-
-            if 'database' in details: #it is a dataset input
-
-                # create the remote input
-                db = self.databases[details['database']]
-                dataformat_name = db.set(details['protocol'], details['set'])['outputs'][details['output']]
-                input = inputs.RemoteInput(name, db.dataformats[dataformat_name], self.db_socket, unpack=False)
-
-                # Synchronization bits
-                group = self.input_list.group(details['channel'])
-                if group is None:
-                    group = inputs.InputGroup(
-                        details['channel'],
-                        synchronization_listener=outputs.SynchronizationListener(),
-                        restricted_access=(details['channel'] == self.data['channel'])
-                    )
-                    self.input_list.add(group)
-
-                group.add(input)
-
-            else:
-
-                data_source = data.CachedDataSource()
-                self.data_sources.append(data_source)
-
-                if details['channel'] == self.data['channel']: #synchronized
-                    status = data_source.setup(
-                        filename=os.path.join(self.cache, details['path'] + '.data'),
-                        prefix=self.prefix,
-                        force_start_index=start_index,
-                        force_end_index=end_index,
-                        unpack=False,
-                    )
-                else:
-                    status = data_source.setup(
-                        filename=os.path.join(self.cache, details['path'] + '.data'),
-                        prefix=self.prefix,
-                        unpack=False,
-                    )
-
-                if not status:
-                    raise IOError("cannot load cache file `%s'" % details['path'])
-
-                input = inputs.Input(name, self.algorithm.input_map[name], data_source)
-
-                # Synchronization bits
-                group = self.input_list.group(details['channel'])
-                if group is None:
-                    group = inputs.InputGroup(
-                        details['channel'],
-                        synchronization_listener=outputs.SynchronizationListener(),
-                        restricted_access=(details['channel'] == self.data['channel'])
-                    )
-                    self.input_list.add(group)
-
-                group.add(input)
+        if self.proxy_mode:
+            cache_access = CacheAccess.LOCAL
+        else:
+            cache_access = CacheAccess.NONE
+
+        (self.input_list, self.data_sources) = create_inputs_from_configuration(
+            self.data, self.algorithm, self.prefix, self.cache,
+            cache_access=cache_access, unpack=False, socket=self.db_socket
+        )
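The helper now encapsulates the two strategies the removed code implemented by hand. Restated as a standalone sketch (the signature is taken verbatim from the diff; the behavioural notes in the comments are inferences from this commit, with the mounting side handled in agent.py above):

```python
from beat.backend.python.helpers import CacheAccess, create_inputs_from_configuration

def prepare_inputs(data, algorithm, prefix, cache, db_socket, proxy_mode=True):
    # Proxy mode: the executor opens the cache files itself (LOCAL) and
    # serves their contents to the container over the proxy socket.
    # No-proxy mode: no local data sources are created (NONE); the container
    # reads the very same files bind-mounted read-only under /cache.
    cache_access = CacheAccess.LOCAL if proxy_mode else CacheAccess.NONE
    return create_inputs_from_configuration(
        data, algorithm, prefix, cache,
        cache_access=cache_access, unpack=False, socket=db_socket)
```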
    def _prepare_outputs(self):

......@@ -490,7 +444,7 @@ class Executor(object):
            #otherwise, it means the running process went bananas, ignore it ;-)

            if 'statistics' in retval:
                if 'data' in retval['statistics']:
-                    retval['statistics']['data'].update(self.io_statistics)
+                    stats.update(retval['statistics']['data'], self.io_statistics)
                else:
                    logger.warn("cannot find 'data' entry on returned stats, " \
                        "therefore not appending I/O info either")
......@@ -530,16 +484,14 @@ class Executor(object):
    @property
    def io_statistics(self):
-        """Summarize current I/O statistics looking at data sources and sinks
+        """Summarize current I/O statistics looking at data sources and sinks, inputs and outputs

        Returns:

-            dict: A dictionary summarizing current I/O statistics, read from our
-                sinks, sources, inputs and outputs.
+            dict: A dictionary summarizing current I/O statistics

        """

-        is_analyzer = 'outputs' not in self.data
-        return stats.io_statistics(self.data_sources, self.input_list, self.data_sinks, self.output_list, self.data, is_analyzer)
+        return stats.io_statistics(self.data, self.input_list, self.output_list)
    def __str__(self):

......@@ -556,21 +508,7 @@ class Executor(object):
    def dump_runner_configuration(self, directory):
        """Exports contents useful for a backend runner to run the algorithm"""

-        data = {
-            'algorithm': self.data['algorithm'],
-            'parameters': self.data['parameters'],
-        }
-
-        data['inputs'] = \
-            dict([(k, v['channel']) for k,v in self.data['inputs'].items()])
-
-        if 'outputs' in self.data:
-            data['outputs'] = \
-                dict([(k, v['channel']) for k,v in self.data['outputs'].items()])
-        else:
-            data['result'] = self.data['channel']
-
-        data['channel'] = self.data['channel']
+        data = convert_experiment_configuration_to_container(self.data, self.proxy_mode)

        with open(os.path.join(directory, 'configuration.json'), 'wb') as f:
            simplejson.dump(data, f, indent=2)
......
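The inlined code removed above documents what the helper has to produce. A sketch reconstructing that mapping; the real `convert_experiment_configuration_to_container` lives in beat.backend.python and may well differ, notably in how it encodes `proxy_mode` and whatever extra input details the no-proxy mode needs:

```python
def sketch_convert(data, proxy_mode):
    """Sketch inferred from the removed inline code; not the real helper."""
    config = {
        'algorithm': data['algorithm'],
        'parameters': data['parameters'],
        'channel': data['channel'],
        'proxy_mode': proxy_mode,  # assumption: the flag is forwarded as-is
        'inputs': dict([(k, v['channel']) for k, v in data['inputs'].items()]),
    }
    if 'outputs' in data:
        config['outputs'] = dict([(k, v['channel']) for k, v in data['outputs'].items()])
    else:  # analyzer blocks produce a single result instead of outputs
        config['result'] = data['channel']
    return config
```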
......@@ -37,6 +37,10 @@ import simplejson
from . import schema
from . import prototypes

+from beat.backend.python.stats import io_statistics
+from beat.backend.python.stats import update


class Statistics(object):
    """Statistics define resource usage for algorithmic code runs
......@@ -212,58 +216,7 @@ class Statistics(object):
        with open(f, 'wt') as fobj: fobj.write(str(self))


-def io_statistics(data_sources, input_list, data_sinks, output_list, data, analyzer=False):
-    """Summarize current I/O statistics looking at data sources and sinks
-
-    Returns:
-
-        dict: A dictionary summarizing current I/O statistics, read from our
-            sinks, sources, inputs and outputs.
-
-    """
-
-    # data reading
-    bytes_read = 0
-    blocks_read = 0
-    read_time = 0.0
-    for source in data_sources:
-        size, duration = source.statistics()
-        bytes_read += size
-        read_time += duration
-
-    for inpt in input_list:
-        blocks_read += inpt.nb_data_blocks_read
-
-    # data writing
-    bytes_written = 0
-    blocks_written = 0
-    write_time = 0.0
-    for sink in data_sinks:
-        size, duration = sink.statistics()
-        bytes_written += size
-        write_time += duration
-
-    files = []
-    for outpt in output_list:
-        blocks_written += outpt.nb_data_blocks_written
-
-        #if self.analysis:
-        if analyzer: #'outputs' in self.data: #it is a normal block (not analyzer)
-            hash = data['result']['hash']
-        else:
-            hash = data['outputs'][outpt.name]['hash']
-
-        files.append(dict(
-            hash=hash,
-            size=float(outpt.data_sink.statistics()[0]),
-            blocks=outpt.nb_data_blocks_written,
-        ))
-
-    return dict(
-        volume = dict(read=bytes_read, write=bytes_written),
-        blocks = dict(read=blocks_read, write=blocks_written),
-        time = dict(read=read_time, write=write_time),
-        files = files,
-    )

#----------------------------------------------------------


def cpu_statistics(start, end):

......@@ -312,6 +265,9 @@
    }


+#----------------------------------------------------------
+
+
def memory_statistics(data):
    """Summarizes current memory usage
......
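The removed function survives in beat.backend.python.stats, re-exported above, with the narrower signature the executor now calls (`stats.io_statistics(data, input_list, output_list)`). A condensed sketch of the block and file accounting it keeps, inferred from the removed code; byte and time volumes presumably now come from the inputs and outputs themselves:

```python
def sketch_io_statistics(data, input_list, output_list):
    """Condensed sketch of the moved helper, assuming Input/Output objects
    expose the same counters used by the removed code above."""
    blocks_read = sum(inpt.nb_data_blocks_read for inpt in input_list)
    blocks_written = 0
    files = []
    for outpt in output_list:
        blocks_written += outpt.nb_data_blocks_written
        if 'outputs' in data:  # normal block: one hash per output
            hash = data['outputs'][outpt.name]['hash']
        else:                  # analyzer block: a single result hash
            hash = data['result']['hash']
        files.append(dict(hash=hash,
                          size=float(outpt.data_sink.statistics()[0]),
                          blocks=outpt.nb_data_blocks_written))
    return dict(blocks=dict(read=blocks_read, write=blocks_written), files=files)
```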
......@@ -69,6 +69,11 @@ class TestExecution(unittest.TestCase):
        cleanup()

+    def setUp(self):
+        super(TestExecution, self).setUp()
+
+        self.proxy_mode = True
+

    def check_output(self, prefix, path):
        '''Checks if a given output exists, together with its indexes and checksums
        '''
......@@ -126,7 +131,8 @@ class TestExecution(unittest.TestCase):
        results = []

        for key, value in scheduled.items():
            executor = Executor(prefix, value['configuration'], tmp_prefix,
-                    dataformat_cache, database_cache, algorithm_cache)
+                    dataformat_cache, database_cache, algorithm_cache,
+                    proxy_mode=self.proxy_mode)

            assert executor.valid, '\n * %s' % '\n * '.join(executor.errors)

            with executor:
......@@ -228,7 +234,7 @@ class TestExecution(unittest.TestCase):
    def test_too_many_nexts(self):
        result = self.execute('user/user/triangle/1/too_many_nexts', [None])
        assert result
-        self.assertEqual(result['status'], 137)
+        self.assertTrue(result['status'] != 0)
        assert result['user_error']
        assert 'no more data' in result['user_error']
......@@ -263,3 +269,11 @@ class TestExecution(unittest.TestCase):
                'indices': '0 - 4\n5 - 9\n10 - 14\n'
            }
        ]) is None
+
+
+class TestExecutionNoProxy(TestExecution):
+
+    def setUp(self):
+        super(TestExecutionNoProxy, self).setUp()
+
+        self.proxy_mode = False
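Subclassing the test case is what makes the whole suite run twice, once per mode: unittest discovers the subclass and re-runs every inherited test with the overridden setUp. A minimal illustration of the pattern, independent of beat.core:

```python
# Hypothetical standalone example of the subclass-rerun pattern above.
import unittest

class Base(unittest.TestCase):
    def setUp(self):
        self.flag = True

    def test_flag_is_boolean(self):
        self.assertIn(self.flag, (True, False))

class NoFlag(Base):
    def setUp(self):
        super(NoFlag, self).setUp()
        self.flag = False

if __name__ == '__main__':
    unittest.main()  # runs test_flag_is_boolean twice, once per class
```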