Commit e656cce3 authored by Amir MOHAMMADI's avatar Amir MOHAMMADI

get initial interactive experiment running

parent fef80dd1
Pipeline #41166 failed with stage
in 2 minutes and 49 seconds
......@@ -127,6 +127,11 @@ def _clean_parameter(parameters, parameter, value):
return retval
class _GetKeyAsValue(dict):
def __getitem__(self, key):
return key
class Algorithm_:
"""docstring for Algorithm_"""
......@@ -179,6 +184,9 @@ class Algorithm_:
[(k, v["type"]) for g in self.groups for k, v in g.get("loop", {}).items()]
)
# populate dataformats
self.dataformats = _GetKeyAsValue()
def clean_parameter(self, parameter, value):
return _clean_parameter(self.parameters, parameter, value)
......@@ -273,7 +281,7 @@ class Algorithm_:
exc = exc or RuntimeError
raise exc(message)
return Runner.from_object(self.algorithm, self.algorithm.name, self, exc=exc)
return Runner.from_object(self.algorithm, self.name, self, exc=exc)
class Analyzer_(Algorithm_):
......@@ -288,23 +296,39 @@ class Analyzer_(Algorithm_):
api_version=2,
**kwargs,
):
# merge input_groups and results into input_output_groups
input_output_groups = list(input_groups)
input_output_groups[0]["outputs"] = results
# merge input_groups and results into groups
groups = list(input_groups)
groups[0]["outputs"] = results
super().__init__(
algorithm,
input_output_groups,
groups,
name=name,
description=description,
type=type,
api_version=api_version,
**kwargs,
)
self.results = results
@property
def is_analyzer(self):
return True
def result_dataformat(self):
"""Generates, on-the-fly, the dataformat for the result readout"""
if not self.results:
raise TypeError(
"algorithm `%s' is a block algorithm, not an analyzer" % (self.name)
)
df = dataformat.DataFormat_(
definition=dict([(k, v["type"]) for k, v in self.results.items()]),
name="analysis:" + self.name,
)
return df
# ----------------------------------------------------------
......
......@@ -70,6 +70,7 @@ class ViewRunner:
self.name = name
self.root_folder = root_folder
self.exc = exc
self.ready = False
def index(self, filename):
"""Index the content of the view"""
......
......@@ -181,6 +181,70 @@ class AlgorithmExecutor(object):
self.loop_channel = LoopChannel(self.loop_socket)
self.loop_channel.setup(self.algorithm, self.prefix)
@classmethod
def from_dict(
cls,
data,
socket,
db_socket=None,
loop_socket=None,
dataformat_cache=None,
database_cache=None,
library_cache=None,
cache_root="/cache",
):
self = cls.__new__(cls)
self.data = data
self.socket = socket
self.db_socket = db_socket
self.loop_socket = loop_socket
self.loop_channel = None
self._runner = None
self.prefix = None
# Temporary caches, if the user has not set them, for performance
database_cache = database_cache if database_cache is not None else {}
dataformat_cache = dataformat_cache if dataformat_cache is not None else {}
library_cache = library_cache if library_cache is not None else {}
# Load the algorithm
self.algorithm = self.data["algorithm"]
if db_socket:
db_access_mode = AccessMode.REMOTE
databases = None
else:
db_access_mode = AccessMode.LOCAL
databases = database_cache
(self.input_list, self.data_loaders) = create_inputs_from_configuration(
self.data,
self.algorithm,
prefix=self.prefix,
cache_root=cache_root,
cache_access=AccessMode.LOCAL,
db_access=db_access_mode,
socket=self.db_socket,
databases=databases,
)
# Loads algorithm outputs
(self.output_list, _) = create_outputs_from_configuration(
self.data,
self.algorithm,
prefix=self.prefix,
cache_root=cache_root,
input_list=self.input_list,
data_loaders=self.data_loaders,
loop_socket=self.loop_socket,
)
if self.loop_socket:
self.loop_channel = LoopChannel(self.loop_socket)
self.loop_channel.setup(self.algorithm, self.prefix)
return self
@property
def runner(self):
"""Returns the algorithm runner
......
......@@ -159,8 +159,9 @@ def create_inputs_from_configuration(
# This is used for parallelization purposes
start_index, end_index = config.get("range", (None, None))
def _create_local_input(details):
def _create_local_input(details, dataformat):
data_source = CachedDataSource()
data_source.dataformat = dataformat
filename = os.path.join(cache_root, details["path"] + ".data")
......@@ -197,12 +198,13 @@ def create_inputs_from_configuration(
return data_loader
def _create_data_source(details):
def _create_data_source(details, dataformat):
data_loader = _get_data_loader_for(details)
filename = os.path.join(cache_root, details["path"] + ".data")
data_source = CachedDataSource()
data_source.dataformat = dataformat
result = data_source.setup(
filename=filename,
prefix=prefix,
......@@ -224,6 +226,7 @@ def create_inputs_from_configuration(
for name, details in config["inputs"].items():
input = None
dataformat = algorithm.input_map[name]
if details.get("database", None) is not None:
if db_access == AccessMode.LOCAL:
......@@ -342,16 +345,16 @@ def create_inputs_from_configuration(
elif cache_access == AccessMode.LOCAL:
if algorithm.type == Algorithm.LEGACY:
input = _create_local_input(details)
input = _create_local_input(details, dataformat)
elif algorithm.is_sequential:
if details["channel"] == config["channel"]: # synchronized
input = _create_local_input(details)
input = _create_local_input(details, dataformat)
else:
_create_data_source(details)
_create_data_source(details, dataformat)
else: # Algorithm autonomous types
_create_data_source(details)
_create_data_source(details, dataformat)
else:
continue
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment