Commit 1066ed81 authored by Philip ABBET's avatar Philip ABBET

Improve beat.backend.python.helpers.create_inputs_from_configuration() to...

Improve beat.backend.python.helpers.create_inputs_from_configuration() to support local access to databases
parent 1921a49e
......@@ -43,7 +43,7 @@ from . import outputs
from . import stats
from .helpers import create_inputs_from_configuration
from .helpers import create_outputs_from_configuration
from .helpers import CacheAccess
from .helpers import AccessMode
class Executor(object):
......@@ -103,13 +103,14 @@ class Executor(object):
# Loads algorithm inputs
if self.data['proxy_mode']:
cache_access = CacheAccess.REMOTE
cache_access = AccessMode.REMOTE
else:
cache_access = CacheAccess.LOCAL
cache_access = AccessMode.LOCAL
(self.input_list, _) = create_inputs_from_configuration(
self.data, self.algorithm, self.prefix, '/cache',
cache_access=cache_access, socket=self.socket
cache_access=cache_access, db_access=AccessMode.REMOTE,
socket=self.socket
)
# Loads algorithm outputs
......
......@@ -65,17 +65,20 @@ def convert_experiment_configuration_to_container(config, proxy_mode):
#----------------------------------------------------------
class CacheAccess:
class AccessMode:
NONE = 0
LOCAL = 1
REMOTE = 2
def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
cache_access=CacheAccess.NONE, unpack=True,
socket=None):
cache_access=AccessMode.NONE,
db_access=AccessMode.NONE,
unpack=True, socket=None,
databases=None):
data_sources = []
views = {}
input_list = inputs.InputList()
# This is used for parallelization purposes
......@@ -84,16 +87,54 @@ def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
for name, details in config['inputs'].items():
if details.get('database', False):
if socket is None:
raise IOError("No socket provided for remote inputs")
if db_access == AccessMode.LOCAL:
if databases is None:
raise IOError("No databases provided")
input = inputs.RemoteInput(name, algorithm.dataformats[algorithm.input_map[name]],
socket, unpack=unpack)
# Retrieve the database
try:
db = databases[details['database']]
except:
raise IOError("Database '%s' not found" % details['database'])
logger.debug("RemoteInput '%s' created: group='%s', dataformat='%s', connected to a database" % \
(name, details['channel'], algorithm.input_map[name]))
# Create of retrieve the database view
channel = details['channel']
if not views.has_key(channel):
view = db.view(details['protocol'], details['set'])
view.prepare_outputs()
view.setup()
views[channel] = view
logger.debug("Database view '%s/%s/%s' created: group='%s'" % \
(details['database'], details['protocol'], details['set'],
channel))
else:
view = views[channel]
# Creation of the input
data_source = data.MemoryDataSource(view.done, next_callback=view.next)
output = view.outputs[details['output']]
output.data_sink.data_sources.append(data_source)
input = inputs.Input(name, algorithm.input_map[name], data_source)
logger.debug("Input '%s' created: group='%s', dataformat='%s', database-output='%s/%s/%s:%s'" % \
(name, channel, algorithm.input_map[name], details['database'],
details['protocol'], details['set'], details['output']))
elif db_access == AccessMode.REMOTE:
if socket is None:
raise IOError("No socket provided for remote inputs")
input = inputs.RemoteInput(name, algorithm.dataformats[algorithm.input_map[name]],
socket, unpack=unpack)
logger.debug("RemoteInput '%s' created: group='%s', dataformat='%s', connected to a database" % \
(name, details['channel'], algorithm.input_map[name]))
elif cache_access == CacheAccess.LOCAL:
elif cache_access == AccessMode.LOCAL:
data_source = data.CachedDataSource()
data_sources.append(data_source)
......@@ -105,13 +146,13 @@ def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
prefix=prefix,
force_start_index=start_index,
force_end_index=end_index,
unpack=unpack,
unpack=True,
)
else:
status = data_source.setup(
filename=filename,
prefix=prefix,
unpack=unpack,
unpack=True,
)
if not status:
......@@ -122,7 +163,7 @@ def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
logger.debug("Input '%s' created: group='%s', dataformat='%s', filename='%s'" % \
(name, details['channel'], algorithm.input_map[name], filename))
elif cache_access == CacheAccess.REMOTE:
elif cache_access == AccessMode.REMOTE:
if socket is None:
raise IOError("No socket provided for remote inputs")
......@@ -156,7 +197,7 @@ def create_inputs_from_configuration(config, algorithm, prefix, cache_root,
def create_outputs_from_configuration(config, algorithm, prefix, cache_root, input_list,
cache_access=CacheAccess.NONE, socket=None):
cache_access=AccessMode.NONE, socket=None):
data_sinks = []
output_list = outputs.OutputList()
......@@ -183,7 +224,7 @@ def create_outputs_from_configuration(config, algorithm, prefix, cache_root, inp
dataformat = algorithm.dataformats[dataformat_name]
if cache_access == CacheAccess.LOCAL:
if cache_access == AccessMode.LOCAL:
path = os.path.join(cache_root, details['path'] + '.data')
dirname = os.path.dirname(path)
......@@ -229,7 +270,7 @@ def create_outputs_from_configuration(config, algorithm, prefix, cache_root, inp
logger.debug("Output '%s' created: dataformat='%s', filename='%s'" % \
(name, dataformat_name, path))
elif cache_access == CacheAccess.REMOTE:
elif cache_access == AccessMode.REMOTE:
if socket is None:
raise IOError("No socket provided for remote outputs")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment