Commit fef80dd1 authored by Amir MOHAMMADI's avatar Amir MOHAMMADI

[prefix] remove prefix requirement till experiments

parent 74f87c14
Pipeline #41165 failed with stage
in 2 minutes and 50 seconds
......@@ -51,6 +51,7 @@ import simplejson as json
import six
from . import dataformat
from . import hash
from . import library
from . import loader
from . import utils
......@@ -142,24 +143,41 @@ class Algorithm_:
def __init__(
self,
algorithm,
input_output_groups,
groups,
parameters=None,
name=None,
description=None,
type="autonomous",
splittable=False,
api_version=2,
**kwargs
language="python",
**kwargs,
):
super().__init__(**kwargs)
self.algorithm = algorithm
self.input_output_groups = input_output_groups
self.groups = groups
self.parameters = parameters
self.name = name
self.description = description
self.type = type
self.splittable = splittable
self.api_version = api_version
self.language = language
self.valid = True
self.errors = []
self.input_map = dict(
[(k, v["type"]) for g in self.groups for k, v in g["inputs"].items()]
)
self.output_map = dict(
[
(k, v["type"])
for g in self.groups
for k, v in g.get("outputs", {}).items()
]
)
self.loop_map = dict(
[(k, v["type"]) for g in self.groups for k, v in g.get("loop", {}).items()]
)
def clean_parameter(self, parameter, value):
return _clean_parameter(self.parameters, parameter, value)
......@@ -168,6 +186,10 @@ class Algorithm_:
def is_analyzer(self):
return False
@property
def isAnalyzer(self):
return self.is_analyzer
@property
def is_autonomous(self):
""" Returns whether the algorithm is in the autonomous category"""
......@@ -193,6 +215,66 @@ class Algorithm_:
Algorithm_.AUTONOMOUS_LOOP_EVALUATOR,
]
def hash(self):
hash_stuff = dict(
groups=self.groups,
parameters=self.parameters,
api_version=self.api_version,
splittable=self.splittable,
type=self.type,
)
def normalize(v):
if isinstance(v, list):
return [normalize(v_) for v_ in v]
if isinstance(v, dict):
return {key: normalize(value) for key, value in v.items()}
# if it is another BEAT component
if hasattr(v, "hash"):
v = v.hash()
return v
for key, value in hash_stuff.items():
hash_stuff[key] = normalize(value)
return hash.hashJSON(hash_stuff, None)
def runner(self, klass="Algorithm", exc=None):
"""Returns a runnable algorithm object.
Parameters:
klass (str): Ignored
exc (:std:term:`class`): If passed, must be a valid exception class
that will be used to report errors in the read-out of this algorithm's code.
Returns:
:py:class:`Runner`: An instance of the algorithm,
which will be constructed, but not setup. You **must** set it up
before using the ``process`` method.
"""
if not self.name:
exc = exc or RuntimeError
raise exc("algorithm has no name")
if self.language == "unknown":
exc = exc or RuntimeError
raise exc("algorithm has no programming language set")
if not self.valid:
message = "cannot load code for invalid algorithm (%s)" % (self.name,)
exc = exc or RuntimeError
raise exc(message)
return Runner.from_object(self.algorithm, self.algorithm.name, self, exc=exc)
class Analyzer_(Algorithm_):
def __init__(
......@@ -204,17 +286,19 @@ class Analyzer_(Algorithm_):
description=None,
type="autonomous",
api_version=2,
**kwargs
**kwargs,
):
# merge input_groups and results into input_output_groups
input_output_groups = list(input_groups)
input_output_groups[0]["outputs"] = results
super().__init__(
algorithm,
input_groups,
input_output_groups,
name=name,
description=description,
type=type,
api_version=api_version,
**kwargs
**kwargs,
)
@property
......@@ -292,6 +376,16 @@ class Runner(object):
self.obj = loader.run(class_, "__new__", exc)
self.name = module.__name__
@classmethod
def from_object(cls, obj, name, algorithm, exc=None):
self = cls.__new__(cls)
self.obj = obj
self.name = name
self._init(algorithm, exc)
return self
def _init(self, algorithm, exc):
self.algorithm = algorithm
self.exc = exc
......
......@@ -565,6 +565,126 @@ class CachedDataSource(DataSource):
# ----------------------------------------------------------
class DatabaseOutputDataSource_(DataSource):
"""Utility class to load data from an output of a database view"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.dataformat = None
self.view = None
self.output_name = None
self.pack = True
def setup(
self,
view,
output_name,
dataformat,
start_index=None,
end_index=None,
pack=False,
):
"""Configures the data source
Parameters:
start_index (int): The starting index (if not set or set to
``None``, the default, read data from the begin of file)
end_index (int): The end index (if not set or set to ``None``, the
default, reads the data until the end)
unpack (bool): Indicates if the data must be unpacked or not
Returns:
``True``, if successful, or ``False`` otherwise.
"""
self.view = view
self.output_name = output_name
self.pack = pack
self.dataformat = dataformat
# Load all the needed infos from all the files
Infos = namedtuple("Infos", ["start_index", "end_index"])
objects = self.view.objects()
start = None
end = None
previous_value = None
attribute = self.view.get_output_mapping(output_name)
for index, obj in enumerate(objects):
if start is None:
start = index
previous_value = getattr(obj, attribute)
elif getattr(obj, attribute) != previous_value:
end = index - 1
previous_value = None
if ((start_index is None) or (start >= start_index)) and (
(end_index is None) or (end <= end_index)
):
self.infos.append(Infos(start_index=start, end_index=end))
start = index
previous_value = getattr(obj, attribute)
end = index
if ((start_index is None) or (start >= start_index)) and (
(end_index is None) or (end <= end_index)
):
self.infos.append(Infos(start_index=start, end_index=end))
return True
def __getitem__(self, index):
"""Retrieve a block of data
Returns:
A tuple (data, start_index, end_index)
"""
if not self.ready:
self._prepare()
if (index < 0) or (index >= len(self.infos)):
return (None, None, None)
infos = self.infos[index]
t1 = time.time()
data = self.view.get(self.output_name, infos.start_index)
t2 = time.time()
self.read_duration += t2 - t1
if isinstance(data, dict):
d = self.dataformat.type()
d.from_dict(data, casting="safe", add_defaults=False)
data = d
if self.pack:
data = data.pack()
self.nb_bytes_read += len(data)
return (data, infos.start_index, infos.end_index)
# ----------------------------------------------------------
class DatabaseOutputDataSource(DataSource):
"""Utility class to load data from an output of a database view"""
......
......@@ -59,6 +59,197 @@ from .exceptions import OutputError
from .outputs import OutputList
from .protocoltemplate import ProtocolTemplate
class ViewRunner:
def __init__(
self, module, definition, name=None, root_folder=None, exc=None, **kwargs
):
super().__init__(**kwargs)
self.obj = module
self.definition = definition
self.name = name
self.root_folder = root_folder
self.exc = exc
def index(self, filename):
"""Index the content of the view"""
parameters = self.definition.get("parameters", {})
objs = loader.run(self.obj, "index", self.exc, self.root_folder, parameters)
if not isinstance(objs, list):
raise self.exc("index() didn't return a list")
os.makedirs(os.path.dirname(filename), exist_ok=True)
with open(filename, "wb") as f:
data = json.dumps(objs, cls=utils.NumpyJSONEncoder)
f.write(data.encode("utf-8"))
def setup(self, filename, start_index=None, end_index=None, pack=True):
"""Sets up the view"""
if self.ready:
return
with open(filename, "rb") as f:
objs = json.loads(
f.read().decode("utf-8"),
object_pairs_hook=utils.error_on_duplicate_key_hook,
)
Entry = namedtuple("Entry", sorted(objs[0].keys()))
objs = [Entry(**x) for x in objs]
parameters = self.definition.get("parameters", {})
loader.run(
self.obj,
"setup",
self.exc,
self.root_folder,
parameters,
objs,
start_index=start_index,
end_index=end_index,
)
# Create data sources for the outputs
from .data import DatabaseOutputDataSource_
self.data_sources = {}
for output_name, output_format in self.definition.get("outputs", {}).items():
data_source = DatabaseOutputDataSource_()
data_source.setup(
self,
output_name,
output_format,
start_index=start_index,
end_index=end_index,
pack=pack,
)
self.data_sources[output_name] = data_source
self.ready = True
def get(self, output, index):
"""Returns the data of the provided output at the provided index"""
if not self.ready:
raise self.exc("Database view not yet setup")
return loader.run(self.obj, "get", self.exc, output, index)
def get_output_mapping(self, output):
return loader.run(self.obj, "get_output_mapping", self.exc, output)
def objects(self):
return self.obj.objs
def __getitem__(self, key):
return self.definition[key]
class Database_:
def __init__(
self,
protocols,
description=None,
name="__unnamed_database__",
root_folder=None,
documentation=None,
environment=None,
**kwargs
):
super().__init__(**kwargs)
self.protocols = protocols
self.description = description
self.name = name
self.root_folder = root_folder
self.documentation = documentation
self.environment = environment
self.errors = []
def hash(self):
"""Returns the hexadecimal hash for its declaration"""
definition = dict(protocols=self.protocols)
return hash.hashJSON(definition, None)
@property
def valid(self):
"""A boolean that indicates if this database is valid or not"""
return not bool(self.errors)
def protocol(self, name):
"""The declaration of a specific protocol in the database"""
return self.protocols[name]
@property
def protocol_names(self):
"""Names of protocols declared for this database"""
return self.protocols.keys()
def sets(self, protocol):
"""The declaration of a specific set in the database protocol"""
return self.protocol(protocol)["sets"]
def set(self, protocol, name):
"""The declaration of all the protocols of the database"""
return self.sets(protocol)[name]
def set_names(self, protocol):
"""The names of sets in a given protocol for this database"""
data = self.protocol(protocol)["sets"]
return data.keys()
def view_definition(self, protocol_name, set_name):
"""Returns the definition of a view
Parameters:
protocol_name (str): The name of the protocol where to retrieve the view
from
set_name (str): The name of the set in the protocol where to retrieve the
view from
"""
return self.set(protocol_name, set_name).definition
def view(self, protocol, name, exc=None):
"""Returns the database view, given the protocol and the set name
Parameters:
protocol (str): The name of the protocol where to retrieve the view
from
name (str): The name of the set in the protocol where to retrieve the
view from
exc (:std:term:`class`): If passed, must be a valid exception class
that will be used to report errors in the read-out of this
database's view.
Returns:
The database view, which will be constructed, but not setup. You
**must** set it up before using methods ``done`` or ``next``.
"""
return self.set(protocol, name)
# ----------------------------------------------------------
......
......@@ -49,6 +49,149 @@ import simplejson
from ..database import Database
class DBExecutor_:
"""Executor specialised in database views
Parameters:
data (dict, str): The piece of data representing the block to be
executed.
It must validate against the schema defined for execution blocks. If a
string is passed, it is supposed to be a fully qualified absolute path
to a JSON file containing the block execution information.
Attributes:
errors (list): A list containing errors found while loading this
execution block.
data (dict): The original data for this executor, as loaded by our JSON
decoder.
databases (dict): A dictionary in which keys are strings with database
names and values are :py:class:`.database.Database`, representing the
databases required for running this block. The dictionary may be empty
in case all inputs are taken from the file cache.
views (dict): A dictionary in which the keys are tuples pointing to the
``(<database-name>, <protocol>, <set>)`` and the value is a setup view
for that particular combination of details. The dictionary may be empty
in case all inputs are taken from the file cache.
input_list (inputs.InputList): A list of inputs that will be served to
the algorithm.
data_sources (list): A list with all data-sources created by our
execution loader.
"""
def __init__(
self, message_handler, cache_root, data,
):
# Initialisation
self.databases = {}
self.views = {}
self.errors = []
self.data = None
self.data_sources = {}
self.message_handler = message_handler
# Load the data
if not isinstance(data, dict): # User has passed a file name
if not os.path.exists(data):
self.errors.append("File not found: %s" % data)
return
with open(data) as f:
self.data = simplejson.load(f)
else:
self.data = data
# this runs basic validation, including JSON loading if required
# self.data, self.errors = schema.validate('execution', data)
# if self.errors: return #don't proceed with the rest of validation
# Load the databases
for name, details in self.data["inputs"].items():
if "database" not in details:
continue
db = details["database"]
# Load the database
if db.name not in self.databases:
self.databases[db.name] = db
if not db.valid:
self.errors += db.errors
if not db.valid:
continue
# Create and load the required views
key = (details["database"], details["protocol"], details["set"])
if key not in self.views:
view = db.view(details["protocol"], details["set"])
if details["channel"] == self.data["channel"]: # synchronized
start_index, end_index = self.data.get("range", (None, None))
else:
start_index, end_index = (None, None)
view.setup(
os.path.join(cache_root, details["path"]),
start_index=start_index,
end_index=end_index,
)
self.views[key] = view
# Create the data sources
for name, details in self.data["inputs"].items():
if "database" not in details:
continue
view_key = (details["database"], details["protocol"], details["set"])
view = self.views[view_key]
self.data_sources[name] = view.data_sources[details["output"]]
self.message_handler.set_data_sources(self.data_sources)
def process(self):
""" Starts the message handler"""
self.message_handler.start()
@property
def address(self):
""" Address of the message handler"""
return self.message_handler.address
@property
def valid(self):
"""A boolean that indicates if this executor is valid or not"""
return not bool(self.errors)
def wait(self):
"""Wait for the message handle to finish"""
try:
self.message_handler.join()
except RuntimeError:
# tried to join the handler before it has started.
pass
self.message_handler = None
def __str__(self):
return simplejson.dumps(self.data, indent=4)
class DBExecutor(object):
"""Executor specialised in database views
......@@ -120,7 +263,6 @@ class DBExecutor(object):
self.views = {}
self.errors = []
self.data = None
self.message_handler = None
self.data_sources = {}
self.message_handler = message_handler
......
......@@ -3,7 +3,6 @@ import unittest
import numpy as np
# from ..algorithm import Algorithm_
from ..dataformat import DataFormat_
coordinates = DataFormat_(
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment