diff --git a/beat/core/algorithm.py b/beat/core/algorithm.py index eba4cd57f39531fbf6bc06fd5f3dc504a31bb506..ef833c6469ee6ba30a88f4921ac9967c53ef5814 100644 --- a/beat/core/algorithm.py +++ b/beat/core/algorithm.py @@ -38,20 +38,45 @@ Forward importing from :py:mod:`beat.backend.python.algorithm` """ +import os import six +import json import numpy +import pkg_resources from . import dataformat from . import library from . import schema from . import prototypes -from . import utils from beat.backend.python.algorithm import Storage -from beat.backend.python.algorithm import Runner +from beat.backend.python.algorithm import Runner # noqa from beat.backend.python.algorithm import Algorithm as BackendAlgorithm +def load_algorithm_prototype(prefix): + algorithm_data = json.loads( + pkg_resources.resource_string(__name__, "prototypes/algorithm.json") + ) + ref_dataformat = "integers" + dataformat = None + + for root, dirs, _ in os.walk(prefix, "algorithm"): + if ref_dataformat in dirs: + dataformat_versions = sorted(os.listdir(os.path.join(root, ref_dataformat))) + version = dataformat_versions[-1].split(".")[0] + dataformat = "{}/{}/{}".format( + os.path.basename(root), ref_dataformat, version + ) + break + assert dataformat is not None, ( # nosec + "Reference data format %s not found" % ref_dataformat + ) + algorithm_data["groups"][0]["inputs"]["in_data"]["type"] = dataformat + algorithm_data["groups"][0]["outputs"]["out_data"]["type"] = dataformat + return algorithm_data + + class Algorithm(BackendAlgorithm): """Algorithms represent runnable components within the platform. @@ -144,7 +169,6 @@ class Algorithm(BackendAlgorithm): def __init__(self, prefix, data, dataformat_cache=None, library_cache=None): super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache) - def _load(self, data, dataformat_cache, library_cache): """Loads the algorithm""" @@ -167,64 +191,69 @@ class Algorithm(BackendAlgorithm): data, code = data # break down into two components - if isinstance(data, six.string_types): # user has passed a file pointer self._name = data self.storage = Storage(self.prefix, self._name) if not self.storage.json.exists(): - self.errors.append('Algorithm declaration file not found: %s' % data) + self.errors.append("Algorithm declaration file not found: %s" % data) return data = self.storage.json.path # loads data from JSON declaration - # At this point, `data' can be a dictionary or ``None`` if data is None: # loads the default declaration for an algorithm - self.data, self.errors = prototypes.load('algorithm') - assert not self.errors, "\n * %s" % "\n *".join(self.errors) + algorithm_data = load_algorithm_prototype(self.prefix) + self.data, self.errors = schema.validate("algorithm", algorithm_data) + assert not self.errors, "\n * %s" % "\n *".join(self.errors) # nosec else: # just assign it # this runs basic validation, including JSON loading if required - self.data, self.errors = schema.validate('algorithm', data) + self.data, self.errors = schema.validate("algorithm", data) - - if self.errors: return # don't proceed with the rest of validation + if self.errors: + return # don't proceed with the rest of validation if self.storage is not None: # loading from the disk, check code if not self.storage.code.exists(): - if self.data['language'] != 'cxx': - self.errors.append('Algorithm code not found: %s' % \ - self.storage.code.path) + if self.data["language"] != "cxx": + self.errors.append( + "Algorithm code not found: %s" % self.storage.code.path + ) return else: code = self.storage.code.load() - # At this point, `code' can be a string (or a binary blob) or ``None`` if code is None: # loads the default code for an algorithm - self.code = prototypes.binary_load('algorithm.py') - self.data['language'] = 'python' + self.code = prototypes.binary_load("algorithm.py") + self.data["language"] = "python" else: # just assign it - notice that in this case, no language is set self.code = code - - if self.errors: return # don't proceed with the rest of validation - + if self.errors: + return # don't proceed with the rest of validation # if no errors so far, make sense out of the declaration data - self.groups = self.data['groups'] + self.groups = self.data["groups"] # now we check for consistence self._check_endpoint_uniqueness() # create maps for easy access to data - self.input_map = dict([(k,v['type']) for g in self.groups \ - for k,v in g['inputs'].items()]) - self.output_map = dict([(k,v['type']) for g in self.groups \ - for k,v in g.get('outputs', {}).items()]) - self.loop_map = dict([(k,v['type']) for g in self.groups \ - for k,v in g.get('loop', {}).items()]) + self.input_map = dict( + [(k, v["type"]) for g in self.groups for k, v in g["inputs"].items()] + ) + self.output_map = dict( + [ + (k, v["type"]) + for g in self.groups + for k, v in g.get("outputs", {}).items() + ] + ) + self.loop_map = dict( + [(k, v["type"]) for g in self.groups for k, v in g.get("loop", {}).items()] + ) self._validate_required_dataformats(dataformat_cache) self._convert_parameter_types() @@ -233,26 +262,30 @@ class Algorithm(BackendAlgorithm): self._validate_required_libraries(library_cache) self._check_language_consistence() - def _check_endpoint_uniqueness(self): """Checks for name clashes accross input/output groups """ all_input_names = [] - for group in self.groups: all_input_names.extend(group['inputs'].keys()) + for group in self.groups: + all_input_names.extend(group["inputs"].keys()) if len(set(all_input_names)) != len(all_input_names): - self.errors.append("repeated input name in algorithm `%s' " \ - "declaration: %s" % (self.name, ', '.join(all_input_names))) + self.errors.append( + "repeated input name in algorithm `%s' " + "declaration: %s" % (self.name, ", ".join(all_input_names)) + ) # all outputs must have unique names all_output_names = [] for group in self.groups: - if 'outputs' not in group: continue - all_output_names.extend(group['outputs'].keys()) + if "outputs" not in group: + continue + all_output_names.extend(group["outputs"].keys()) if len(set(all_output_names)) != len(all_output_names): - self.errors.append("repeated output name in algorithm `%s' " \ - "declaration: %s" % (self.name, ', '.join(all_output_names))) - + self.errors.append( + "repeated output name in algorithm `%s' " + "declaration: %s" % (self.name, ", ".join(all_output_names)) + ) def _validate_required_dataformats(self, dataformat_cache): """Makes sure we can load all requested formats @@ -260,67 +293,83 @@ class Algorithm(BackendAlgorithm): for group in self.groups: - for name, input in group['inputs'].items(): - if input['type'] in self.dataformats: continue + for name, input in group["inputs"].items(): + if input["type"] in self.dataformats: + continue - if dataformat_cache and input['type'] in dataformat_cache: # reuse - thisformat = dataformat_cache[input['type']] + if dataformat_cache and input["type"] in dataformat_cache: # reuse + thisformat = dataformat_cache[input["type"]] else: # load it - thisformat = dataformat.DataFormat(self.prefix, input['type']) + thisformat = dataformat.DataFormat(self.prefix, input["type"]) if dataformat_cache is not None: # update it - dataformat_cache[input['type']] = thisformat + dataformat_cache[input["type"]] = thisformat - self.dataformats[input['type']] = thisformat + self.dataformats[input["type"]] = thisformat if thisformat.errors: - self.errors.append("found error validating data format `%s' " \ - "for input `%s' on algorithm `%s': %s" % \ - (input['type'], name, self.name, - '\n'.join(thisformat.errors))) + self.errors.append( + "found error validating data format `%s' " + "for input `%s' on algorithm `%s': %s" + % (input["type"], name, self.name, "\n".join(thisformat.errors)) + ) - if 'outputs' not in group: continue + if "outputs" not in group: + continue - for name, output in group['outputs'].items(): - if output['type'] in self.dataformats: continue + for name, output in group["outputs"].items(): + if output["type"] in self.dataformats: + continue - if dataformat_cache and output['type'] in dataformat_cache: # reuse - thisformat = dataformat_cache[output['type']] + if dataformat_cache and output["type"] in dataformat_cache: # reuse + thisformat = dataformat_cache[output["type"]] else: # load it - thisformat = dataformat.DataFormat(self.prefix, output['type']) + thisformat = dataformat.DataFormat(self.prefix, output["type"]) if dataformat_cache is not None: # update it - dataformat_cache[output['type']] = thisformat + dataformat_cache[output["type"]] = thisformat - self.dataformats[output['type']] = thisformat + self.dataformats[output["type"]] = thisformat if thisformat.errors: - self.errors.append("found error validating data format `%s' " \ - "for output `%s' on algorithm `%s': %s" % \ - (output['type'], name, self.name, - '\n'.join(thisformat.errors))) + self.errors.append( + "found error validating data format `%s' " + "for output `%s' on algorithm `%s': %s" + % ( + output["type"], + name, + self.name, + "\n".join(thisformat.errors), + ) + ) if self.results: for name, result in self.results.items(): - if result['type'].find('/') != -1: + if result["type"].find("/") != -1: - if result['type'] in self.dataformats: continue + if result["type"] in self.dataformats: + continue - if dataformat_cache and result['type'] in dataformat_cache: # reuse - thisformat = dataformat_cache[result['type']] + if dataformat_cache and result["type"] in dataformat_cache: # reuse + thisformat = dataformat_cache[result["type"]] else: - thisformat = dataformat.DataFormat(self.prefix, result['type']) + thisformat = dataformat.DataFormat(self.prefix, result["type"]) if dataformat_cache is not None: # update it - dataformat_cache[result['type']] = thisformat + dataformat_cache[result["type"]] = thisformat - self.dataformats[result['type']] = thisformat + self.dataformats[result["type"]] = thisformat if thisformat.errors: - self.errors.append("found error validating data format `%s' " \ - "for result `%s' on algorithm `%s': %s" % \ - (result['type'], name, self.name, - '\n'.join(thisformat.errors))) - + self.errors.append( + "found error validating data format `%s' " + "for result `%s' on algorithm `%s': %s" + % ( + result["type"], + name, + self.name, + "\n".join(thisformat.errors), + ) + ) def _convert_parameter_types(self): """Converts types to numpy equivalents, checks defaults, ranges and @@ -331,49 +380,75 @@ class Algorithm(BackendAlgorithm): try: return tp.type(value) except Exception as e: - self.errors.append("%s for parameter `%s' cannot be cast to type " \ - "`%s': %s" % (desc, name, tp.name, e)) + self.errors.append( + "%s for parameter `%s' cannot be cast to type " + "`%s': %s" % (desc, name, tp.name, e) + ) - if self.parameters is None: return + if self.parameters is None: + return for name, parameter in self.parameters.items(): - if parameter['type'] == 'string': - parameter['type'] = numpy.dtype('str') + if parameter["type"] == "string": + parameter["type"] = numpy.dtype("str") else: - parameter['type'] = numpy.dtype(parameter['type']) - - if 'range' in parameter: - parameter['range'][0] = _try_convert(name, parameter['type'], - parameter['range'][0], 'start of range') - parameter['range'][1] = _try_convert(name, parameter['type'], - parameter['range'][1], 'end of range') - if parameter['range'][0] >= parameter['range'][1]: - self.errors.append("range for parameter `%s' has a start greater " \ - "then the end value (%r >= %r)" % \ - (name, parameter['range'][0], parameter['range'][1])) - - if 'choice' in parameter: - for i, choice in enumerate(parameter['choice']): - parameter['choice'][i] = _try_convert(name, parameter['type'], - parameter['choice'][i], 'choice[%d]' % i) - - if 'default' in parameter: - parameter['default'] = _try_convert(name, parameter['type'], - parameter['default'], 'default') - - if 'range' in parameter: # check range - if parameter['default'] < parameter['range'][0] or \ - parameter['default'] > parameter['range'][1]: - self.errors.append("default for parameter `%s' (%r) is not " \ - "within parameter range [%r, %r]" % (name, parameter['default'], - parameter['range'][0], parameter['range'][1])) - - if 'choice' in parameter: # check choices - if parameter['default'] not in parameter['choice']: - self.errors.append("default for parameter `%s' (%r) is not " \ - "a valid choice `[%s]'" % (name, parameter['default'], - ', '.join(['%r' % k for k in parameter['choice']]))) - + parameter["type"] = numpy.dtype(parameter["type"]) + + if "range" in parameter: + parameter["range"][0] = _try_convert( + name, parameter["type"], parameter["range"][0], "start of range" + ) + parameter["range"][1] = _try_convert( + name, parameter["type"], parameter["range"][1], "end of range" + ) + if parameter["range"][0] >= parameter["range"][1]: + self.errors.append( + "range for parameter `%s' has a start greater " + "then the end value (%r >= %r)" + % (name, parameter["range"][0], parameter["range"][1]) + ) + + if "choice" in parameter: + for i, choice in enumerate(parameter["choice"]): + parameter["choice"][i] = _try_convert( + name, + parameter["type"], + parameter["choice"][i], + "choice[%d]" % i, + ) + + if "default" in parameter: + parameter["default"] = _try_convert( + name, parameter["type"], parameter["default"], "default" + ) + + if "range" in parameter: # check range + if ( + parameter["default"] < parameter["range"][0] + or parameter["default"] > parameter["range"][1] + ): + self.errors.append( + "default for parameter `%s' (%r) is not " + "within parameter range [%r, %r]" + % ( + name, + parameter["default"], + parameter["range"][0], + parameter["range"][1], + ) + ) + + if "choice" in parameter: # check choices + if parameter["default"] not in parameter["choice"]: + self.errors.append( + "default for parameter `%s' (%r) is not " + "a valid choice `[%s]'" + % ( + name, + parameter["default"], + ", ".join(["%r" % k for k in parameter["choice"]]), + ) + ) def _validate_required_libraries(self, library_cache): @@ -383,32 +458,44 @@ class Algorithm(BackendAlgorithm): for name, value in self.uses.items(): - self.libraries[value] = library_cache.setdefault(value, - library.Library(self.prefix, value, library_cache)) + self.libraries[value] = library_cache.setdefault( + value, library.Library(self.prefix, value, library_cache) + ) if not self.libraries[value].valid: - self.errors.append("referred library `%s' (%s) is not valid" % \ - (self.libraries[value].name, name)) - + self.errors.append( + "referred library `%s' (%s) is not valid" + % (self.libraries[value].name, name) + ) def _check_language_consistence(self): # all used libraries must be programmed with the same language - if self.language == 'unknown': return # bail out on unknown language + if self.language == "unknown": + return # bail out on unknown language if self.uses: - for name, library in self.uses.items(): + for name, library_name in self.uses.items(): - if library not in self.libraries: continue # invalid + if library_name not in self.libraries: + continue # invalid - if self.libraries[library].data is None: - self.errors.append("language for used library `%s' cannot be " \ - "inferred as the library was not properly loaded" % \ - (library,)) + if self.libraries[library_name].data is None: + self.errors.append( + "language for used library `%s' cannot be " + "inferred as the library was not properly loaded" + % (library_name,) + ) continue - if self.libraries[library].language != self.language: - self.errors.append("language for used library `%s' (`%s') " \ - "differs from current language for this algorithm (`%s')" % \ - (library, self.libraries[library].language, self.language)) + if self.libraries[library_name].language != self.language: + self.errors.append( + "language for used library `%s' (`%s') " + "differs from current language for this algorithm (`%s')" + % ( + library_name, + self.libraries[library_name].language, + self.language, + ) + ) diff --git a/beat/core/prototypes/algorithm.json b/beat/core/prototypes/algorithm.json index ecf2d472803ddd7c3ea80628f977c9c5e26ffe12..20c963d316d56b74f05faacedb5e48956e89c35e 100644 --- a/beat/core/prototypes/algorithm.json +++ b/beat/core/prototypes/algorithm.json @@ -8,8 +8,14 @@ { "name": "main", "inputs": { + "in_data": { + "type": "author/dataformat/1" + } }, "outputs": { + "out_data": { + "type": "author/dataformat/1" + } } } ], diff --git a/beat/core/schema/algorithm/common.json b/beat/core/schema/algorithm/common.json index 9c4a232ecb33482064c568aeb55922676f39c4e5..b2d23e91b118eb74bd286a5ec92a8887a35ab056 100644 --- a/beat/core/schema/algorithm/common.json +++ b/beat/core/schema/algorithm/common.json @@ -25,6 +25,7 @@ } }, "uniqueItems": true, + "minProperties": 1, "additionalProperties": false }, @@ -168,10 +169,11 @@ } }, "uniqueItems": true, + "minProperties": 1, "additionalProperties": false }, "schema_version": { "$ref": "../common/1.json#/definitions/version" }, "api_version": { "$ref": "../common/1.json#/definitions/version" } } -} \ No newline at end of file +}