Commit 86ed17a3 authored by Philip ABBET

Add support for the 'no proxy' mode for outputs in containers

parent 238e0c24
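This commit changes how an algorithm running inside a container writes its outputs. Previously the Executor always wrapped each output in a RemoteOutput, proxying every write through the ZeroMQ socket back to the host process. Output creation now goes through a new shared helper, create_outputs_from_configuration, which keeps that proxy mode (CacheAccess.REMOTE) and adds a 'no proxy' mode (CacheAccess.LOCAL) in which the container writes .data files directly into the cache.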
@@ -42,6 +42,7 @@
 from . import inputs
 from . import outputs
 from . import stats
 from .helpers import create_inputs_from_configuration
+from .helpers import create_outputs_from_configuration
 from .helpers import CacheAccess

@@ -111,24 +112,11 @@ class Executor(object):
             cache_access=cache_access, socket=self.socket
         )

-        # Loads outputs
-        if 'outputs' in self.data:
-            self.output_list = outputs.OutputList()
-            for name, channel in self.data['outputs'].items():
-                thisformat = self.algorithm.dataformats[self.algorithm.output_map[name]]
-                self.output_list.add(outputs.RemoteOutput(name, thisformat, self.socket))
-            logger.debug("Loaded output list with %d output(s)",
-                         len(self.output_list))
-
-        # Loads results if it is an analyzer
-        if 'result' in self.data:
-            self.output_list = outputs.OutputList()
-            name = 'result'
-            # Retrieve dataformats in the JSON of the algorithm
-            analysis_format = self.algorithm.result_dataformat()
-            analysis_format.name = 'analysis:' + self.algorithm.name
-            self.output_list.add(outputs.RemoteOutput(name, analysis_format, self.socket))
-            logger.debug("Loaded output list for analyzer (1 single output)")
+        # Loads algorithm outputs
+        (self.output_list, _) = create_outputs_from_configuration(
+            self.data, self.algorithm, self.prefix, '/cache', self.input_list,
+            cache_access=cache_access, socket=self.socket
+        )

     def setup(self):
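Net effect of the hunk above: the two hand-written branches (regular outputs and the analyzer's single 'result' output) collapse into one call to the new helper, shown next, which reproduces both behaviours.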
@@ -27,6 +27,7 @@
 import os
+import errno

 import logging
 logger = logging.getLogger(__name__)

@@ -147,3 +148,96 @@ def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
         group.add(input)

     return (input_list, data_sources)
+
+#----------------------------------------------------------
+
+
+def create_outputs_from_configuration(config, algorithm, prefix, cache_root, input_list,
+                                      cache_access=CacheAccess.NONE, socket=None):
+
+    data_sinks = []
+    output_list = outputs.OutputList()
+
+    # This is used for parallelization purposes
+    start_index, end_index = config.get('range', (None, None))
+
+    # If the algorithm is an analyser
+    if 'result' in config:
+        output_config = {
+            'result': config['result']
+        }
+    else:
+        output_config = config['outputs']
+
+    for name, details in output_config.items():
+
+        if 'result' in config:
+            dataformat_name = 'analysis:' + algorithm.name
+            dataformat = algorithm.result_dataformat()
+        else:
+            dataformat_name = algorithm.output_map[name]
+            dataformat = algorithm.dataformats[dataformat_name]
+
+        if cache_access == CacheAccess.LOCAL:
+            path = os.path.join(cache_root, details['path'] + '.data')
+            dirname = os.path.dirname(path)
+
+            # Make sure that the directory exists while taking care of race
+            # conditions. see: http://stackoverflow.com/questions/273192/check-if-a-directory-exists-and-create-it-if-necessary
+            try:
+                if (len(dirname) > 0):
+                    os.makedirs(dirname)
+            except OSError as exception:
+                if exception.errno != errno.EEXIST:
+                    raise
+
+            data_sink = data.CachedDataSink()
+            data_sinks.append(data_sink)
+
+            status = data_sink.setup(
+                filename=path,
+                dataformat=dataformat,
+                encoding='binary',
+                max_size=0,  # in bytes, for individual file chunks
+            )
+
+            if not status:
+                raise IOError("Cannot create cache sink '%s'" % details['path'])
+
+            synchronization_listener = None
+            if 'result' not in config:
+                input_group = input_list.group(details['channel'])
+                if (input_group is not None) and hasattr(input_group, 'synchronization_listener'):
+                    synchronization_listener = input_group.synchronization_listener
+
+            output_list.add(outputs.Output(name, data_sink,
+                synchronization_listener=synchronization_listener,
+                force_start_index=start_index or 0)
+            )
+
+            if 'result' not in config:
+                logger.debug("Output '%s' created: group='%s', dataformat='%s', filename='%s'" % \
+                             (name, details['channel'], dataformat_name, path))
+            else:
+                logger.debug("Output '%s' created: dataformat='%s', filename='%s'" % \
+                             (name, dataformat_name, path))
+
+        elif cache_access == CacheAccess.REMOTE:
+            if socket is None:
+                raise IOError("No socket provided for remote outputs")
+
+            output_list.add(outputs.RemoteOutput(name, dataformat, socket))
+
+            logger.debug("RemoteOutput '%s' created: group='%s', dataformat='%s'" % \
+                         (name, details['channel'], dataformat_name))
+
+        else:
+            continue
+
+    return (output_list, data_sinks)
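To make the branches above concrete, here is a hypothetical sketch of the two configuration shapes the helper distinguishes; every value below is invented for illustration, the real entries come from the platform's JSON configuration:

# Illustrative only: channel names and cache paths are made up.

# A regular algorithm declares named outputs, each with a synchronization
# channel and a cache-relative path (the helper appends the '.data' suffix):
regular_config = {
    'outputs': {
        'scores': {'channel': 'test', 'path': 'ab/cd/ef0123'},
    },
    'range': (0, 9999),  # optional slice, used for parallelization
}

# An analyzer declares a single 'result' entry instead of an 'outputs' map:
analyzer_config = {
    'result': {'path': 'ab/cd/ef4567'},
}

With cache_access=CacheAccess.LOCAL each entry becomes a CachedDataSink writing under cache_root; with CacheAccess.REMOTE each becomes a RemoteOutput bound to the supplied socket; any other value leaves the output list empty.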
@@ -129,6 +129,13 @@ class MessageHandler(gevent.Greenlet):
                         self.process.kill()
                     self.stop.set()
                     break
+                except RuntimeError as e:
+                    self.send_error(str(e), kind='usr')
+                    self.user_error = str(e)
+                    if self.process is not None:
+                        self.process.kill()
+                    self.stop.set()
+                    break
                 except:
                     import traceback
                     parser = lambda s: s if len(s)<20 else s[:20] + '...'
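The new except clause treats a RuntimeError raised while handling a message as a user error: it is reported with kind='usr', recorded, the user process is killed and the message loop stops. A minimal, self-contained sketch of that pattern, with stand-in names (the real MessageHandler is a gevent Greenlet and reports errors over a ZeroMQ socket):

import threading

class TinyHandler(object):
    """Stand-in for MessageHandler; models only the new error path."""

    def __init__(self):
        self.stop = threading.Event()
        self.user_error = ''
        self.process = None  # would hold the user process to kill

    def send_error(self, message, kind):
        # stand-in for the error reply the real handler sends on its socket
        print("error (%s): %s" % (kind, message))

    def handle(self, callback):
        try:
            callback()
        except RuntimeError as e:
            # a RuntimeError counts as a *user* error: report it, remember
            # it, kill the user process (if any) and stop the message loop
            self.send_error(str(e), kind='usr')
            self.user_error = str(e)
            if self.process is not None:
                self.process.kill()
            self.stop.set()

def failing():
    raise RuntimeError('bad data')

handler = TinyHandler()
handler.handle(failing)
assert handler.stop.is_set() and handler.user_error == 'bad data'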
@@ -74,9 +74,9 @@ def io_statistics(configuration, input_list=None, output_list=None):
         blocks_written += output.nb_data_blocks_written

         if 'result' in configuration:
-            hash = configuration['result']['hash']
+            hash = configuration['result']['path'].replace('/', '')
         else:
-            hash = configuration['outputs'][output.name]['hash']
+            hash = configuration['outputs'][output.name]['path'].replace('/', '')

         files.append(dict(
             hash=hash,
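The statistics no longer read a separate 'hash' field from the configuration: as the replacement suggests, a cache path appears to be the content hash split into directory components, so stripping the separators recovers it. A made-up example:

# Invented value; real cache paths are longer, hash-derived names
path = 'ab/cd/ef0123'
assert path.replace('/', '') == 'abcdef0123'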