diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b34ee73ba0890b372e5ddc8b94ff59298396299c..d698e1c5ca99df18ba84f8feb3de1013fdc424f5 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -2,7 +2,7 @@ stages: - build variables: - PREFIX: /opt/beat.env.web-${CI_BUILD_REF_NAME}/usr + PREFIX: /opt/beat.env.web/usr build: stage: build diff --git a/README.rst b/README.rst index 03db5f212547a434fef3f9a5b543859698726533..66bdfa844ad291ef4d50a74a418e3fcccc02bf6b 100644 --- a/README.rst +++ b/README.rst @@ -63,6 +63,7 @@ In particular, this package controls memory and CPU utilisation of the containers it launches. You must make sure to enable those functionalities on your installation. + Docker Setup ============ @@ -75,14 +76,16 @@ execute algorithms or experiments. We use specific docker images to run user algorithms. Download the following base images before you try to run tests or experiments on your computer:: - $ docker pull beats/py27:system - $ docker pull debian:8.4 + $ docker pull docker.idiap.ch/beat/beat.env.system.python:system + $ docker pull docker.idiap.ch/beat/beat.env.db.examples:1.0.0 Optionally, also download the following images to be able to re-run experiments downloaded from the BEAT platform (not required for unit testing):: - $ docker pull beats/py27:0.0.4 - $ docker pull beats/py27:0.1.0 + $ docker pull docker.idiap.ch/beat/beat.env.python:0.0.4 + $ docker pull docker.idiap.ch/beat/beat.env.python:0.1.0 + $ docker pull docker.idiap.ch/beat/beat.env.cxx:1.0.1 + $ docker pull docker.idiap.ch/beat/beat.env.db:1.0.0 Documentation @@ -139,7 +142,6 @@ sphinx:: Development ----------- - Indentation =========== @@ -148,8 +150,8 @@ example, to enforce compliance on a single file and edit it in place, do:: $ ./bin/autopep8 --indent-size=2 --in-place beat/core/utils.py -We normally use 2-space identattion. If ever, you can easily change the -identation to 4 spaces like this:: +We normally use 2-space indentation. 
If ever, you can easily change the +indentation to 4 spaces like this:: $ ./bin/autopep8 --indent-size=4 --in-place beat/core/utils.py diff --git a/beat/core/agent.py b/beat/core/agent.py index ea50d0f2d47bfa6dc8289f6bd114f19096f7e7a9..6b2b2898c05912118dad2befabcf5e201c042453 100755 --- a/beat/core/agent.py +++ b/beat/core/agent.py @@ -28,6 +28,7 @@ import os import shutil +import simplejson import logging logger = logging.getLogger(__name__) @@ -44,21 +45,14 @@ from . import utils from . import dock from . import baseformat +from beat.backend.python.message_handler import MessageHandler -class Server(gevent.Greenlet): + +class Server(MessageHandler): '''A 0MQ server for our communication with the user process''' def __init__(self, input_list, output_list, host_address): - super(Server, self).__init__() - - # An event unblocking a graceful stop - self.stop = gevent.event.Event() - self.stop.clear() - - self.must_kill = gevent.event.Event() - self.must_kill.clear() - # Starts our 0MQ server self.context = zmq.Context() self.socket = self.context.socket(zmq.PAIR) @@ -67,104 +61,23 @@ class Server(gevent.Greenlet): port = self.socket.bind_to_random_port(self.address) self.address += ':%d' % port logger.debug("zmq server bound to `%s'", self.address) - self.poller = zmq.Poller() - self.poller.register(self.socket, zmq.POLLIN) - self.input_list = input_list - self.output_list = output_list + super(Server, self).__init__(input_list, self.context, self.socket) - self.system_error = '' - self.user_error = '' - self.last_statistics = {} + self.output_list = output_list # implementations - self.callbacks = dict( - nxt = self.next, - hmd = self.has_more_data, - idd = self.is_dataunit_done, - wrt = self.write, - idm = self.is_data_missing, - oic = self.output_is_connected, - don = self.done, - err = self.error, - ) - - - def set_process(self, process): - self.process = process - self.process.statistics() # initialize internal statistics + self.callbacks.update(dict( + wrt = 
self.write, + idm = self.is_data_missing, + oic = self.output_is_connected, + )) def __str__(self): return 'Server(%s)' % self.address - def _run(self): - - logger.debug("0MQ server thread started") - - while not self.stop.is_set(): #keep on - - if self.must_kill.is_set(): - self.process.kill() - self.must_kill.clear() - - timeout = 1000 #ms - socks = dict(self.poller.poll(timeout)) #yields to the next greenlet - - if self.socket in socks and socks[self.socket] == zmq.POLLIN: - - # incomming - more = True - parts = [] - while more: - parts.append(self.socket.recv()) - more = self.socket.getsockopt(zmq.RCVMORE) - command = parts[0] - - logger.debug("recv: %s", command) - - if command in self.callbacks: - try: #to handle command - self.callbacks[command](*parts[1:]) - except: - import traceback - parser = lambda s: s if len(s)<20 else s[:20] + '...' - parsed_parts = ' '.join([parser(k) for k in parts]) - message = "A problem occurred while performing command `%s' " \ - "killing user process. 
Exception:\n %s" % \ - (parsed_parts, traceback.format_exc()) - logger.error(message, exc_info=True) - self.system_error = message - self.process.kill() - self.stop.set() - break - - else: - message = "Command `%s' is not implemented - stopping user process" \ - % command - logger.error(message) - self.system_error = message - self.process.kill() - self.stop.set() - break - - self.socket.setsockopt(zmq.LINGER, 0) - self.socket.close() - self.context.term() - logger.debug("0MQ server thread stopped") - - - def _get_input_candidate(self, channel, name): - - channel_group = self.input_list.group(channel) - retval = channel_group[name] - if retval is None: - raise RuntimeError("Could not find input `%s' at channel `%s'" % \ - (name, channel)) - return retval - - def _get_output_candidate(self, name): retval = self.output_list[name] @@ -172,90 +85,6 @@ class Server(gevent.Greenlet): return retval - def next(self, channel, name=None): - """Syntax: nxt channel [name] ...""" - - if name is not None: #single input - logger.debug('recv: nxt %s %s', channel, name) - - input_candidate = self._get_input_candidate(channel, name) - input_candidate.next() - if input_candidate.data is None: #error - message = "User algorithm asked for more data for channel " \ - "`%s' on input `%s', but it is over (no more data). This " \ - "normally indicates a programming error on the user " \ - "side." 
% (channel, name) - self.user_error += message + '\n' - raise RuntimeError(message) - if isinstance(input_candidate.data, baseformat.baseformat): - packed = input_candidate.data.pack() - else: - packed = input_candidate.data - logger.debug('send: <bin> (size=%d)', len(packed)) - self.socket.send(packed) - - else: #whole group data - logger.debug('recv: nxt %s', channel) - - channel_group = self.input_list.group(channel) - - # Call next() on the group - channel_group.restricted_access = False - channel_group.next() - channel_group.restricted_access = True - - # Loop over the inputs - inputs_to_go = len(channel_group) - self.socket.send(str(inputs_to_go), zmq.SNDMORE) - for inp in channel_group: - logger.debug('send: %s', inp.name) - self.socket.send(str(inp.name), zmq.SNDMORE) - if inp.data is None: - message = "User algorithm process asked for more data on channel " \ - "`%s' (all inputs), but input `%s' has nothing. This " \ - "normally indicates a programming error on the user " \ - "side." 
% (channel, inp.name) - self.user_error += message + '\n' - raise RuntimeError(message) - elif isinstance(inp.data, baseformat.baseformat): - packed = inp.data.pack() - else: - packed = inp.data - logger.debug('send: <bin> (size=%d)', len(packed)) - inputs_to_go -= 1 - if inputs_to_go > 0: - self.socket.send(packed, zmq.SNDMORE) - else: - self.socket.send(packed) - - - def has_more_data(self, channel, name=None): - """Syntax: hmd channel [name]""" - - if name: #single input - logger.debug('recv: hmd %s %s', channel, name) - input_candidate = self._get_input_candidate(channel, name) - what = 'tru' if input_candidate.hasMoreData() else 'fal' - - else: #for all channel names - logger.debug('recv: hmd %s', channel) - channel_group = self.input_list.group(channel) - what = 'tru' if channel_group.hasMoreData() else 'fal' - - logger.debug('send: %s', what) - self.socket.send(what) - - - def is_dataunit_done(self, channel, name): - """Syntax: idd channel name""" - - logger.debug('recv: idd %s %s', channel, name) - input_candidate = self._get_input_candidate(channel, name) - what = 'tru' if input_candidate.isDataUnitDone() else 'fal' - logger.debug('send: %s', what) - self.socket.send(what) - - def write(self, name, packed): """Syntax: wrt output data""" @@ -296,50 +125,6 @@ class Server(gevent.Greenlet): self.socket.send(what) - def _collect_statistics(self): - - logger.debug('collecting user process statistics...') - self.last_statistics = self.process.statistics() - - - def _acknowledge(self): - - logger.debug('send: ack') - self.socket.send('ack') - logger.debug('setting stop condition for 0MQ server thread') - self.stop.set() - - - def done(self, wait_time): - """Syntax: don""" - - logger.debug('recv: don %s', wait_time) - - self._collect_statistics() - - # collect I/O stats from client - wait_time = float(wait_time) - self.last_statistics['data'] = dict(network=dict(wait_time=wait_time)) - - self._acknowledge() - - - def error(self, t, msg): - """Syntax: err type 
message""" - - logger.debug('recv: err %s <msg> (size=%d)', t, len(msg)) - if t == 'usr': self.user_error = msg - else: self.system_error = msg - - self._collect_statistics() - self.last_statistics['data'] = dict(network=dict(wait_time=0.)) - self._acknowledge() - - - def kill(self): - self.must_kill.set() - - class Agent(object): '''Handles synchronous commands. @@ -370,7 +155,9 @@ class Agent(object): self.virtual_memory_in_megabytes = virtual_memory_in_megabytes self.max_cpu_percent = max_cpu_percent self.tempdir = None + self.db_tempdir = None self.process = None + self.db_process = None self.server = None @@ -382,7 +169,10 @@ class Agent(object): # Creates a temporary directory for the user process self.tempdir = utils.temporary_directory() logger.debug("Created temporary directory `%s'", self.tempdir) + self.db_tempdir = utils.temporary_directory() + logger.debug("Created temporary directory `%s'", self.db_tempdir) self.process = None + self.db_process = None return self @@ -392,11 +182,16 @@ class Agent(object): shutil.rmtree(self.tempdir) self.tempdir = None + if self.db_tempdir is not None and os.path.exists(self.db_tempdir): + shutil.rmtree(self.db_tempdir) + self.db_tempdir = None + self.process = None + self.db_process = None logger.debug("Exiting processing context...") - def run(self, configuration, host, timeout_in_minutes=0, daemon=0): + def run(self, configuration, host, timeout_in_minutes=0, daemon=0, db_address=None): """Runs the algorithm code @@ -424,17 +219,46 @@ class Agent(object): # Recursively copies configuration data to <tempdir>/prefix configuration.dump_runner_configuration(self.tempdir) + if db_address is not None: + configuration.dump_databases_provider_configuration(self.db_tempdir) + + # Modify the paths to the databases in the dumped configuration files + root_folder = os.path.join(self.db_tempdir, 'prefix', 'databases') + + database_paths = {} + + for db_name in configuration.databases.keys(): + json_path = 
os.path.join(root_folder, db_name + '.json') + + with open(json_path, 'r') as f: + db_data = simplejson.load(f) + + database_paths[db_name] = db_data['root_folder'] + db_data['root_folder'] = os.path.join('/databases', db_name) + + with open(json_path, 'w') as f: + simplejson.dump(db_data, f, indent=4) + # Server for our single client self.server = Server(configuration.input_list, configuration.output_list, host.ip) - # Figures out the image to use + # Figures out the images to use envkey = '%(name)s (%(version)s)' % configuration.data['environment'] if envkey not in host: raise RuntimeError("Environment `%s' is not available on docker " \ "host `%s' - available environments are %s" % (envkey, host, ", ".join(host.environments.keys()))) + if db_address is not None: + try: + db_envkey = host.db2docker(database_paths.keys()) + except: + raise RuntimeError("No environment found for the databases `%s' " \ + "- available environments are %s" % ( + ", ".join(database_paths.keys()), + ", ".join(host.db_environments.keys()))) + # Launches the process (0MQ client) tmp_dir = os.path.join('/tmp', os.path.basename(self.tempdir)) cmd = ['execute', self.server.address, tmp_dir] @@ -447,14 +271,35 @@ class Agent(object): cmd = ['sleep', str(daemon)] logger.debug("Daemon mode: sleeping for %d seconds", daemon) else: + if db_address is not None: + tmp_dir = os.path.join('/tmp', os.path.basename(self.db_tempdir)) + db_cmd = ['databases_provider', db_address, tmp_dir] + + volumes = {} + + for db_name, db_path in database_paths.items(): + volumes[db_path] = { + 'bind': os.path.join('/databases', db_name), + 'mode': 'ro', + } + + # Note: we only support one databases image loaded at the same time + self.db_process = dock.Popen( + host, + db_envkey, + command=db_cmd, + tmp_archive=self.db_tempdir, + volumes=volumes + ) + self.process = dock.Popen( - host, - envkey, - command=cmd, - tmp_archive=self.tempdir, - virtual_memory_in_megabytes=self.virtual_memory_in_megabytes, - 
max_cpu_percent=self.max_cpu_percent, - ) + host, + envkey, + command=cmd, + tmp_archive=self.tempdir, + virtual_memory_in_megabytes=self.virtual_memory_in_megabytes, + max_cpu_percent=self.max_cpu_percent, + ) # provide a tip on how to stop the test if daemon > 0: @@ -476,6 +321,11 @@ class Agent(object): timeout_in_minutes) self.process.kill() status = self.process.wait() + + if self.db_process is not None: + self.db_process.kill() + self.db_process.wait() + timed_out = True except KeyboardInterrupt: #developer pushed CTRL-C @@ -483,6 +333,10 @@ class Agent(object): self.process.kill() status = self.process.wait() + if self.db_process is not None: + self.db_process.kill() + self.db_process.wait() + finally: self.server.stop.set() @@ -499,6 +353,13 @@ class Agent(object): user_error = self.server.user_error, ) process.rm() + + if self.db_process is not None: + retval['stdout'] += '\n' + self.db_process.stdout + retval['stderr'] += '\n' + self.db_process.stderr + self.db_process.rm() + self.db_process = None + self.server = None return retval diff --git a/beat/core/algorithm.py b/beat/core/algorithm.py index 38025ce3328ceb31d8603ecee221c2dc7573546f..5084641376718150a0c2c978b7dc7237a2b02f81 100755 --- a/beat/core/algorithm.py +++ b/beat/core/algorithm.py @@ -39,155 +39,15 @@ from . import dataformat from . import library from . import schema from . import prototypes -from . import loader from . import utils -class Storage(utils.CodeStorage): - """Resolves paths for algorithms +from beat.backend.python.algorithm import Storage +from beat.backend.python.algorithm import Runner +from beat.backend.python.algorithm import Algorithm as BackendAlgorithm - Parameters: - - prefix (str): Establishes the prefix of your installation. - - name (str): The name of the algorithm object in the format - ``<user>/<name>/<version>``. 
- - """ - - def __init__(self, prefix, name, language=None): - - if name.count('/') != 2: - raise RuntimeError("invalid algorithm name: `%s'" % name) - - self.username, self.name, self.version = name.split('/') - self.prefix = prefix - self.fullname = name - - path = utils.hashed_or_simple(self.prefix, 'algorithms', name) - super(Storage, self).__init__(path, language) - - -class Runner(object): - '''A special loader class for algorithms, with specialized methods - - Parameters: - - module (module): The preloaded module containing the algorithm as - returned by :py:func:`beat.core.loader.load_module`. - - obj_name (str): The name of the object within the module you're interested - on - - exc (class): The class to use as base exception when translating the - exception from the user code. Read the documention of :py:func:`run` - for more details. - - algorithm (object): The algorithm instance that is used for parameter - checking. - - *args: Constructor parameters for the algorithm (normally none) - - **kwargs: Constructor parameters for the algorithm (normally none) - - ''' - - - def __init__(self, module, obj_name, algorithm, exc=None, *args, - **kwargs): - - try: - class_ = getattr(module, obj_name) - except Exception as e: - if exc is not None: - type, value, traceback = sys.exc_info() - six.reraise(exc, exc(value), traceback) - else: - raise #just re-raise the user exception - - self.obj = loader.run(class_, '__new__', exc, *args, **kwargs) - self.name = module.__name__ - self.algorithm = algorithm - self.exc = exc - - # if the algorithm does not have a 'setup' method, it is ready by default - self.ready = not hasattr(self.obj, 'setup') - - - def _check_parameters(self, parameters): - """Checks input parameters from the user and fill defaults""" - - user_keys = set(parameters.keys()) - algo_parameters = self.algorithm.parameters or {} - valid_keys = set(algo_parameters.keys()) - - # checks the user is not trying to set an undeclared parameter - if not 
user_keys.issubset(valid_keys): - err_keys = user_keys - valid_keys - message = "parameters `%s' are not declared for algorithm `%s' - " \ - "valid parameters are `%s'" % ( - ','.join(err_keys), - self.name, - ','.join(valid_keys), - ) - exc = self.exc or KeyError - raise exc(message) - - # checks all values set by the user are in range (if a range is set) - - retval = dict() #dictionary with checked user parameters and defaults - - for key, definition in algo_parameters.items(): - - if key in parameters: - try: - value = self.algorithm.clean_parameter(key, parameters[key]) - except Exception as e: - message = "parameter `%s' cannot be safely cast to the declared " \ - "type on algorithm `%s': %s" % (key, self.name, e) - exc = self.exc or ValueError - raise exc(message) - - else: #user has not set a value, use the default - value = algo_parameters[key]['default'] - - # in the end, set the value on the dictionary to be returned - retval[key] = value - - return retval - - - def setup(self, parameters, *args, **kwargs): - '''Sets up the algorithm, only effective on the first call''' - if self.ready: return self.ready - completed_parameters = self._check_parameters(parameters) #may raise - kwargs['parameters'] = completed_parameters - - if hasattr(self.obj, 'setup'): - self.ready = loader.run(self.obj, 'setup', self.exc, *args, **kwargs) - return self.ready - else: - return True - - - def process(self, *args, **kwargs): - '''Runs through data''' - - if not self.ready: - message = "algorithm `%s' is not yet setup" % (self.name,) - exc = self.exc or RuntimeError - raise self.exc(message) - - return loader.run(self.obj, 'process', self.exc, *args, **kwargs) - - - def __getattr__(self, key): - '''Returns an attribute of the algorithm - only called at last resort''' - return getattr(self.obj, key) - - -class Algorithm(object): +class Algorithm(BackendAlgorithm): """Algorithms represent runnable components within the platform. 
This class can only parse the meta-parameters of the algorithm (i.e., input @@ -276,19 +136,8 @@ class Algorithm(object): """ def __init__(self, prefix, data, dataformat_cache=None, library_cache=None): + super(Algorithm, self).__init__(prefix, data, dataformat_cache, library_cache) - self._name = None - self.storage = None - self.prefix = prefix - self.dataformats = {} - self.libraries = {} - - self.groups = [] - - dataformat_cache = dataformat_cache if dataformat_cache is not None else {} - library_cache = library_cache if library_cache is not None else {} - - self._load(data, dataformat_cache, library_cache) def _load(self, data, dataformat_cache, library_cache): """Loads the algorithm""" @@ -556,269 +405,6 @@ class Algorithm(object): (library, self.libraries[library].language, self.language)) - @property - def name(self): - """Returns the name of this object - """ - return self._name or '__unnamed_algorithm__' - - - @name.setter - def name(self, value): - - if self.data['language'] == 'unknown': - raise RuntimeError("algorithm has no programming language set") - - self._name = value - self.storage = Storage(self.prefix, value, self.data['language']) - - - @property - def schema_version(self): - """Returns the schema version""" - return self.data.get('schema_version', 1) - - - @property - def language(self): - """Returns the current language set for the executable code""" - return self.data['language'] - - - @language.setter - def language(self, value): - """Sets the current executable code programming language""" - if self.storage: self.storage.language = value - self.data['language'] = value - - - def clean_parameter(self, parameter, value): - """Checks if a given value against a declared parameter - - This method checks if the provided user value can be safe-cast to the - parameter type as defined on its specification and that it conforms to any - parameter-imposed restrictions. 
- - - Parameters: - - parameter (str): The name of the parameter to check the value against - - value (object): An object that will be safe cast into the defined - parameter type. - - - Returns: - - The converted value, with an appropriate numpy type. - - - Raises: - - KeyError: If the parameter cannot be found on this algorithm's - declaration. - - ValueError: If the parameter cannot be safe cast into the algorithm's - type. Alternatively, a ``ValueError`` may also be raised if a range or - choice was specified and the value does not obbey those settings - estipulated for the parameter - - """ - - if (self.parameters is None) or (parameter not in self.parameters): - raise KeyError(parameter) - - retval = self.parameters[parameter]['type'].type(value) - - if 'choice' in self.parameters[parameter] and \ - retval not in self.parameters[parameter]['choice']: - raise ValueError("value for `%s' (%r) must be one of `[%s]'" % \ - (parameter, value, ', '.join(['%r' % k for k in \ - self.parameters[parameter]['choice']]))) - - if 'range' in self.parameters[parameter] and \ - (retval < self.parameters[parameter]['range'][0] or \ - retval > self.parameters[parameter]['range'][1]): - raise ValueError("value for `%s' (%r) must be picked within " \ - "interval `[%r, %r]'" % (parameter, value, - self.parameters[parameter]['range'][0], - self.parameters[parameter]['range'][1])) - - return retval - - @property - def valid(self): - """A boolean that indicates if this algorithm is valid or not""" - - return not bool(self.errors) - - - @property - def uses(self): - return self.data.get('uses') - - @uses.setter - def uses(self, value): - self.data['uses'] = value - return value - - - @property - def results(self): - return self.data.get('results') - - @results.setter - def results(self, value): - self.data['results'] = value - return value - - - @property - def parameters(self): - return self.data.get('parameters') - - @parameters.setter - def parameters(self, value): - 
self.data['parameters'] = value - return value - - - @property - def splittable(self): - return self.data.get('splittable', False) - - @splittable.setter - def splittable(self, value): - self.data['splittable'] = value - return value - - - def uses_dict(self): - """Returns the usage dictionary for all dependent modules""" - - if self.data['language'] == 'unknown': - raise RuntimeError("algorithm has no programming language set") - - if not self._name: - raise RuntimeError("algorithm has no name") - - retval = {} - - if self.uses is not None: - for name, value in self.uses.items(): - retval[name] = dict( - path=self.libraries[value].storage.code.path, - uses=self.libraries[value].uses_dict(), - ) - - return retval - - - def runner(self, klass='Algorithm', exc=None): - """Returns a runnable algorithm object. - - Parameters: - - klass (str): The name of the class to load the runnable algorithm from - - exc (class): If passed, must be a valid exception class that will be - used to report errors in the read-out of this algorithm's code. - - Returns: - - :py:class:`beat.core.algorithm.Runner`: An instance of the algorithm, - which will be constructed, but not setup. You **must** set it up - before using the ``process`` method. 
- """ - - if not self._name: - exc = exc or RuntimeError - raise exc("algorithm has no name") - - if self.data['language'] == 'unknown': - exc = exc or RuntimeError - raise exc("algorithm has no programming language set") - - if not self.valid: - message = "cannot load code for invalid algorithm (%s)" % (self.name,) - exc = exc or RuntimeError - raise exc(message) - - # loads the module only once through the lifetime of the algorithm object - try: - self.__module = getattr(self, 'module', - loader.load_module(self.name.replace(os.sep, '_'), - self.storage.code.path, self.uses_dict())) - except Exception as e: - if exc is not None: - type, value, traceback = sys.exc_info() - six.reraise(exc, exc(value), traceback) - else: - raise #just re-raise the user exception - - return Runner(self.__module, klass, self, exc) - - - @property - def description(self): - """The short description for this object""" - return self.data.get('description', None) - - @description.setter - def description(self, value): - """Sets the short description for this object""" - self.data['description'] = value - - - @property - def documentation(self): - """The full-length description for this object""" - - if not self._name: - raise RuntimeError("algorithm has no name") - - if self.storage.doc.exists(): - return self.storage.doc.load() - return None - - - @documentation.setter - def documentation(self, value): - """Sets the full-length description for this object""" - - if not self._name: - raise RuntimeError("algorithm has no name") - - if hasattr(value, 'read'): - self.storage.doc.save(value.read()) - else: - self.storage.doc.save(value) - - - def hash(self): - """Returns the hexadecimal hash for the current algorithm""" - - if not self._name: - raise RuntimeError("algorithm has no name") - - return self.storage.hash() - - - def result_dataformat(self): - """Generates, on-the-fly, the dataformat for the result readout""" - - if not self.results: - raise TypeError("algorithm `%s' is a block 
algorithm, not an analyzer" \ - % (self.name)) - - format = dataformat.DataFormat(self.prefix, - dict([(k, v['type']) for k,v in self.results.items()])) - - format.name = 'analysis:' + self.name - - return format - - def json_dumps(self, indent=4): """Dumps the JSON declaration of this object in a string diff --git a/beat/core/database.py b/beat/core/database.py index 550ed8cace765cc3e9e58c60c369e4b19238c381..9b52b5f3b0f8d86889269570a25ef1568067b6c3 100644 --- a/beat/core/database.py +++ b/beat/core/database.py @@ -43,162 +43,13 @@ from . import hash from . import utils from . import prototypes -class Storage(utils.CodeStorage): - """Resolves paths for databases - - Parameters: - - prefix (str): Establishes the prefix of your installation. - - name (str): The name of the database object in the format - ``<name>/<version>``. - - """ - - def __init__(self, prefix, name): - - if name.count('/') != 1: - raise RuntimeError("invalid database name: `%s'" % name) - - self.name, self.version = name.split('/') - self.fullname = name - - path = os.path.join(prefix, 'databases', name) - super(Storage, self).__init__(path, 'python') #views are coded in Python - - -class View(object): - '''A special loader class for database views, with specialized methods - - Parameters: - - db_name (str): The full name of the database object for this view - - module (module): The preloaded module containing the database views as - returned by :py:func:`beat.core.loader.load_module`. - - prefix (str, path): The prefix path for the current installation - - root_folder (str, path): The path pointing to the root folder of this - database - - exc (class): The class to use as base exception when translating the - exception from the user code. Read the documention of :py:func:`run` - for more details. - - *args: Constructor parameters for the database view. Normally, none. - - **kwargs: Constructor parameters for the database view. Normally, none. 
- - ''' - - - def __init__(self, module, definition, prefix, root_folder, exc=None, - *args, **kwargs): - - try: - class_ = getattr(module, definition['view']) - except Exception as e: - if exc is not None: - type, value, traceback = sys.exc_info() - six.reraise(exc, exc(value), traceback) - else: - raise #just re-raise the user exception - - self.obj = loader.run(class_, '__new__', exc, *args, **kwargs) - self.ready = False - self.prefix = prefix - self.root_folder = root_folder - self.definition = definition - self.exc = exc or RuntimeError - self.outputs = None - - - def prepare_outputs(self): - '''Prepares the outputs of the dataset''' - - from .outputs import Output, OutputList - from .data import MemoryDataSink - from .dataformat import DataFormat - - # create the stock outputs for this dataset, so data is dumped - # on a in-memory sink - self.outputs = OutputList() - for out_name, out_format in self.definition.get('outputs', {}).items(): - data_sink = MemoryDataSink() - data_sink.dataformat = DataFormat(self.prefix, out_format) - data_sink.setup([]) - self.outputs.add(Output(out_name, data_sink, dataset_output=True)) - - - def setup(self, *args, **kwargs): - '''Sets up the view''' - - kwargs.setdefault('root_folder', self.root_folder) - kwargs.setdefault('parameters', self.definition.get('parameters', {})) - - if 'outputs' not in kwargs: - kwargs['outputs'] = self.outputs - else: - self.outputs = kwargs['outputs'] #record outputs nevertheless - - self.ready = loader.run(self.obj, 'setup', self.exc, *args, **kwargs) - - if not self.ready: - raise self.exc("unknow setup failure") - - return self.ready - - - def input_group(self, name='default', exclude_outputs=[]): - '''A memory-source input group matching the outputs from the view''' - - if not self.ready: - raise self.exc("database view not yet setup") - - from .data import MemoryDataSource - from .outputs import SynchronizationListener - from .inputs import Input, InputGroup - - # Setup the inputs - 
synchronization_listener = SynchronizationListener() - input_group = InputGroup(name, - synchronization_listener=synchronization_listener, - restricted_access=False) - - for output in self.outputs: - if output.name in exclude_outputs: continue - data_source = MemoryDataSource(self.done, next_callback=self.next) - output.data_sink.data_sources.append(data_source) - input_group.add(Input(output.name, - output.data_sink.dataformat, data_source)) - - return input_group - - - def done(self, *args, **kwargs): - '''Checks if the view is done''' - - if not self.ready: - raise self.exc("database view not yet setup") - - return loader.run(self.obj, 'done', self.exc, *args, **kwargs) +from beat.backend.python.database import Storage +from beat.backend.python.database import View +from beat.backend.python.database import Database as BackendDatabase - def next(self, *args, **kwargs): - '''Runs through the next data chunk''' - if not self.ready: - raise self.exc("database view not yet setup") - return loader.run(self.obj, 'next', self.exc, *args, **kwargs) - - - def __getattr__(self, key): - '''Returns an attribute of the view - only called at last resort''' - return getattr(self.obj, key) - - -class Database(object): +class Database(BackendDatabase): """Databases define the start point of the dataflow in an experiment. 
@@ -240,20 +91,8 @@ class Database(object): """ def __init__(self, prefix, data, dataformat_cache=None): + super(Database, self).__init__(prefix, data, dataformat_cache) - self._name = None - self.storage = None - self.prefix = prefix - self.dataformats = {} # preloaded dataformats - - self.errors = [] - self.data = None - self.code = None - - # if the user has not provided a cache, still use one for performance - dataformat_cache = dataformat_cache if dataformat_cache is not None else {} - - self._load(data, dataformat_cache) def _load(self, data, dataformat_cache): """Loads the database""" @@ -353,111 +192,20 @@ class Database(object): "unsupported by this version" % (_set['view'],) ) + @property def name(self): """Returns the name of this object """ return self._name or '__unnamed_database__' + @name.setter def name(self, value): self._name = value self.storage = Storage(self.prefix, value) - @property - def schema_version(self): - """Returns the schema version""" - return self.data.get('schema_version', 1) - - - @property - def protocols(self): - """The declaration of all the protocols of the database""" - - data = self.data['protocols'] - return dict(zip([k['name'] for k in data], data)) - - def protocol(self, name): - """The declaration of a specific protocol in the database""" - - return self.protocols[name] - - @property - def protocol_names(self): - """Names of protocols declared for this database""" - - data = self.data['protocols'] - return [k['name'] for k in data] - - def sets(self, protocol): - """The declaration of a specific set in the database protocol""" - - data = self.protocol(protocol)['sets'] - return dict(zip([k['name'] for k in data], data)) - - def set(self, protocol, name): - """The declaration of all the protocols of the database""" - - return self.sets(protocol)[name] - - def set_names(self, protocol): - """The names of sets in a given protocol for this database""" - - data = self.protocol(protocol)['sets'] - return [k['name'] for k in 
data] - - @property - def valid(self): - return not bool(self.errors) - - - def view(self, protocol, name, exc=None): - """Returns the database view, given the protocol and the set name - - Parameters: - - protocol (str): The name of the protocol where to retrieve the view from - - name (str): The name of the set in the protocol where to retrieve the - view from - - exc (class): If passed, must be a valid exception class that will be - used to report errors in the read-out of this database's view. - - Returns: - - The database view, which will be constructed, but not setup. You - **must** set it up before using methods ``done`` or ``next``. - - """ - - if not self._name: - exc = exc or RuntimeError - raise exc("database has no name") - - if not self.valid: - message = "cannot load view for set `%s' of protocol `%s' " \ - "from invalid database (%s)" % (protocol, name, self.name) - if exc: raise exc(message) - raise RuntimeError(message) - - # loads the module only once through the lifetime of the database object - try: - if not hasattr(self, '_module'): - self._module = loader.load_module(self.name.replace(os.sep, '_'), - self.storage.code.path, {}) - except Exception as e: - if exc is not None: - type, value, traceback = sys.exc_info() - six.reraise(exc, exc(value), traceback) - else: - raise #just re-raise the user exception - - return View(self._module, self.set(protocol, name), self.prefix, - self.data['root_folder'], exc) - - def hash_output(self, protocol, set, output): """Creates a unique hash the represents the output from the dataset @@ -497,6 +245,7 @@ class Database(object): """The short description for this object""" return self.data.get('description', None) + @description.setter def description(self, value): """Sets the short description for this object""" diff --git a/beat/core/dataformat.py b/beat/core/dataformat.py index a926487ad83ef272180df68c95679c4fc5ef8cab..75f9656391b9968a9de0d546eabbe61e5aa24d88 100644 --- a/beat/core/dataformat.py +++ 
b/beat/core/dataformat.py @@ -29,11 +29,9 @@ """Validation and parsing for dataformats""" import os -import re import copy import six -import numpy import simplejson from . import schema @@ -41,37 +39,13 @@ from . import prototypes from . import utils from .baseformat import baseformat -class Storage(utils.Storage): - """Resolves paths for dataformats +from beat.backend.python.dataformat import Storage +from beat.backend.python.dataformat import DataFormat as BackendDataFormat - Parameters: - - prefix (str): Establishes the prefix of your installation. - - name (str): The name of the dataformat object in the format - ``<user>/<name>/<version>``. - - """ - - def __init__(self, prefix, name): - if name.count('/') != 2: - raise RuntimeError("invalid dataformat name: `%s'" % name) - self.username, self.name, self.version = name.split('/') - self.fullname = name - - path = utils.hashed_or_simple(prefix, 'dataformats', name) - super(Storage, self).__init__(path) - - - def hash(self): - """The 64-character hash of the database declaration JSON""" - return super(Storage, self).hash('#description') - - -class DataFormat(object): - """Data formats define the chunks of data that circulate at data formats. +class DataFormat(BackendDataFormat): + """Data formats define the chunks of data that circulate between blocks. 
Parameters: @@ -127,25 +101,8 @@ class DataFormat(object): """ def __init__(self, prefix, data, parent=None, dataformat_cache=None): + super(DataFormat, self).__init__(prefix, data, parent, dataformat_cache) - self._name = None - self.storage = None - self.resolved = None - self.prefix = prefix - self.errors = [] - self.data = None - self.resolved = None - self.referenced = {} - self.parent = parent - - # if the user has not provided a cache, still use one for performance - dataformat_cache = dataformat_cache if dataformat_cache is not None else {} - - try: - self._load(data, dataformat_cache) - finally: - if self._name is not None: #registers it into the cache, even if failed - dataformat_cache[self._name] = self def _load(self, data, dataformat_cache): """Loads the dataformat""" @@ -264,217 +221,13 @@ class DataFormat(object): # all references are resolved at this point and the final model is built # you can lookup the original data in ``self.data`` and the final model # in ``self.resolved``. - if self.errors: self.errors = utils.uniq(self.errors) - - - @property - def name(self): - """Returns the name of this object, either from the filename or composed - from the hierarchy it belongs. - """ - if self.parent and self._name is None: - return self.parent[0].name + '.' + self.parent[1] + '_type' - else: - return self._name or '__unnamed_dataformat__' - - - @property - def schema_version(self): - """Returns the schema version""" - return self.data.get('#schema_version', 1) - - - @name.setter - def name(self, value): - self._name = value - self.storage = Storage(self.prefix, value) - - - @property - def extends(self): - """If this dataformat extends another one, this is it, otherwise ``None`` - """ - return self.data.get('#extends') - - - @property - def type(self): - """Returns a new type that can create instances of this dataformat. - - The new returned type provides a basis to construct new objects which - represent the dataformat. 
It provides a simple JSON serializer and a - for-screen representation. - - Example: - - To create an object respecting the data format from a JSON descriptor, use - the following technique: - - .. code-block:: python - - ftype = dataformat(...).type - json = simplejson.loads(...) - newobj = ftype(**json) # instantiates the new object, checks format - - To dump the object into JSON, use the following technique: - - .. code-block:: python - - simplejson.dumps(newobj.as_dict(), indent=4) - - A string representation of the object uses the technique above to - pretty-print the object contents to the screen. - """ - - if self.resolved is None: - raise RuntimeError("Cannot prototype while not properly initialized") - - classname = re.sub(r'[-/]', '_', self.name) - if not isinstance(classname, str): classname = str(classname) - - def init(self, **kwargs): baseformat.__init__(self, **kwargs) - - attributes = dict( - __init__=init, - _name=self.name, - _format=self.resolved, - ) - - # create the converters for the class we're about to return - for k, v in self.resolved.items(): - - if isinstance(v, list): #it is an array - attributes[k] = copy.deepcopy(v) - if isinstance(v[-1], DataFormat): - attributes[k][-1] = v[-1].type - else: - if v[-1] in ('string', 'str'): - attributes[k][-1] = str - else: - attributes[k][-1] = numpy.dtype(v[-1]) - - elif isinstance(v, DataFormat): #it is another dataformat - attributes[k] = v.type - - else: #it is a simple type - if v in ('string', 'str'): - attributes[k] = str - else: - attributes[k] = numpy.dtype(v) - - return type( - classname, - (baseformat,), - attributes, - ) - - - @property - def valid(self): - return not bool(self.errors) - - - def validate(self, data): - """Validates a piece of data provided by the user - - In order to validate, the data object must be complete and safe-castable to - this dataformat. 
For any other validation operation that would require - special settings, use instead the :py:meth:`type` method to generate a - valid type and use either ``from_dict``, ``unpack`` or ``unpack_from`` - depending on your use-case. - - Parameters: - - data (dict, str, fd): This parameter represents the data to be validated. - It may be a dictionary with the JSON representation of a data blob or, - else, a binary blob (represented by either a string or a file - descriptor object) from which the data will be read. If problems occur, - an exception is raised. - - Returns: - - ``None``: Raises if an error occurs. - """ - - obj = self.type() - if isinstance(data, dict): - obj.from_dict(data, casting='safe', add_defaults=False) - elif isinstance(data, six.string_types): - obj.unpack(data) - else: - obj.unpack_from(data) - - - def isparent(self, other): - """Tells if the other object extends self (directly or indirectly). - - Parameters: - - other (DataFormat): another object to check - - - Returns: - - bool: ``True``, if ``other`` is a parent of ``self``. ``False`` - otherwise. 
- - """ - - if other.extends: - if self.name == other.extends: return True - else: return self.isparent(other.referenced[other.extends]) - - return False - - - @property - def description(self): - """The short description for this object""" - return self.data.get('#description', None) - - @description.setter - def description(self, value): - """Sets the short description for this object""" - self.data['#description'] = value - - - @property - def documentation(self): - """The full-length description for this object""" - - if not self._name: - raise RuntimeError("dataformat has no name") - - if self.storage.doc.exists(): - return self.storage.doc.load() - return None - - @documentation.setter - def documentation(self, value): - """Sets the full-length description for this object""" - - if not self._name: - raise RuntimeError("dataformat has no name") - - if hasattr(value, 'read'): - self.storage.doc.save(value.read()) - else: - self.storage.doc.save(value) - - - def hash(self): - """Returns the hexadecimal hash for its declaration""" - - if not self._name: - raise RuntimeError("dataformat has no name") - - return self.storage.hash() + if self.errors: + self.errors = utils.uniq(self.errors) def json_dumps(self, indent=4): """Dumps the JSON declaration of this object in a string - Parameters: indent (int): The number of indentation spaces at every indentation level diff --git a/beat/core/dbexecution.py b/beat/core/dbexecution.py new file mode 100644 index 0000000000000000000000000000000000000000..db630861c427d475a0a7397ba832f41a4fa32e88 --- /dev/null +++ b/beat/core/dbexecution.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module of the BEAT platform. 
# +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +'''Execution utilities''' + +import os +import sys +import glob +import errno +import tempfile +import subprocess + +import logging +logger = logging.getLogger(__name__) + +import simplejson + +from . import schema +from . import database +from . import algorithm +from . import inputs +from . import outputs +from . import data +from . import stats +from . import agent + + +class DBExecutor(object): + """Executor specialised in database views + + + Parameters: + + prefix (str): Establishes the prefix of your installation. + + data (dict, str): The piece of data representing the block to be executed. + It must validate against the schema defined for execution blocks. If a + string is passed, it is supposed to be a fully qualified absolute path to + a JSON file containing the block execution information. + + dataformat_cache (dict, optional): A dictionary mapping dataformat names to + loaded dataformats. This parameter is optional and, if passed, may + greatly speed-up database loading times as dataformats that are already + loaded may be re-used. 
If you use this parameter, you must guarantee that + the cache is refreshed as appropriate in case the underlying dataformats + change. + + database_cache (dict, optional): A dictionary mapping database names to + loaded databases. This parameter is optional and, if passed, may + greatly speed-up database loading times as databases that are already + loaded may be re-used. If you use this parameter, you must guarantee that + the cache is refreshed as appropriate in case the underlying databases + change. + + + Attributes: + + errors (list): A list containing errors found while loading this execution + block. + + data (dict): The original data for this executor, as loaded by our JSON + decoder. + + databases (dict): A dictionary in which keys are strings with database + names and values are :py:class:`database.Database`, representing the + databases required for running this block. The dictionary may be empty + in case all inputs are taken from the file cache. + + views (dict): A dictionary in which the keys are tuples pointing to the + ``(<database-name>, <protocol>, <set>)`` and the value is a setup view + for that particular combination of details. The dictionary may be empty + in case all inputs are taken from the file cache. + + input_list (beat.core.inputs.InputList): A list of inputs that will be + served to the algorithm. + + data_sources (list): A list with all data-sources created by our execution + loader. 
+ + """ + + def __init__(self, prefix, data, dataformat_cache=None, database_cache=None): + + self.prefix = prefix + + # some attributes + self.databases = {} + self.views = {} + self.input_list = None + self.data_sources = [] + self.handler = None + self.errors = [] + self.data = None + + # temporary caches, if the user has not set them, for performance + database_cache = database_cache if database_cache is not None else {} + self.dataformat_cache = dataformat_cache if dataformat_cache is not None else {} + + self._load(data, self.dataformat_cache, database_cache) + + + def _load(self, data, dataformat_cache, database_cache): + """Loads the block execution information""" + + # reset + self.data = None + self.errors = [] + self.databases = {} + self.views = {} + self.input_list = None + self.data_sources = [] + + if not isinstance(data, dict): #user has passed a file name + if not os.path.exists(data): + self.errors.append('File not found: %s' % data) + return + + # this runs basic validation, including JSON loading if required + self.data, self.errors = schema.validate('execution', data) + if self.errors: return #don't proceed with the rest of validation + + # load databases + for name, details in self.data['inputs'].items(): + if 'database' in details: + + if details['database'] not in self.databases: + + if details['database'] in database_cache: #reuse + db = database_cache[details['database']] + else: #load it + db = database.Database(self.prefix, details['database'], + dataformat_cache) + database_cache[db.name] = db + + self.databases[details['database']] = db + + if not db.valid: + self.errors += db.errors + continue + + if not db.valid: + # do not add errors again + continue + + # create and load the required views + key = (details['database'], details['protocol'], details['set']) + if key not in self.views: + view = self.databases[details['database']].view(details['protocol'], + details['set']) + + if details['channel'] == self.data['channel']: 
#synchronized + start_index, end_index = self.data.get('range', (None, None)) + else: + start_index, end_index = (None, None) + view.prepare_outputs() + self.views[key] = (view, start_index, end_index) + + + def __enter__(self): + """Prepares inputs and outputs for the processing task + + Raises: + + IOError: in case something cannot be properly setup + + """ + + self._prepare_inputs() + + # The setup() of a database view may call isConnected() on an input + # to set the index at the right location when parallelization is enabled. + # This is why setup() should be called after initialized the inputs. + for key, (view, start_index, end_index) in self.views.items(): + + if (start_index is None) and (end_index is None): + status = view.setup() + else: + status = view.setup(force_start_index=start_index, + force_end_index=end_index) + + if not status: + raise RuntimeError("Could not setup database view `%s'" % key) + + return self + + + def __exit__(self, exc_type, exc_value, traceback): + """Closes all sinks and disconnects inputs and outputs + """ + self.input_list = None + self.data_sources = [] + + + def _prepare_inputs(self): + """Prepares all input required by the execution.""" + + self.input_list = inputs.InputList() + + # This is used for parallelization purposes + start_index, end_index = self.data.get('range', (None, None)) + + for name, details in self.data['inputs'].items(): + + if 'database' in details: #it is a dataset input + + view_key = (details['database'], details['protocol'], details['set']) + view = self.views[view_key][0] + + data_source = data.MemoryDataSource(view.done, next_callback=view.next) + self.data_sources.append(data_source) + output = view.outputs[details['output']] + + # if it's a synchronized channel, makes the output start at the right + # index, otherwise, it gets lost + if start_index is not None and \ + details['channel'] == self.data['channel']: + output.last_written_data_index = start_index - 1 + 
output.data_sink.data_sources.append(data_source) + + # Synchronization bits + group = self.input_list.group(details['channel']) + if group is None: + group = inputs.InputGroup( + details['channel'], + synchronization_listener=outputs.SynchronizationListener(), + restricted_access=(details['channel'] == self.data['channel']) + ) + self.input_list.add(group) + + input_db = self.databases[details['database']] + input_dataformat_name = input_db.set(details['protocol'], details['set'])['outputs'][details['output']] + group.add(inputs.Input(name, self.dataformat_cache[input_dataformat_name], data_source)) + + + def process(self, zmq_context, zmq_socket): + + self.handler = agent.MessageHandler(self.input_list, zmq_context, zmq_socket) + self.handler.start() + + + @property + def valid(self): + """A boolean that indicates if this executor is valid or not""" + + return not bool(self.errors) + + + def wait(self): + self.handler.join() + self.handler = None + + + def __str__(self): + return simplejson.dumps(self.data, indent=4) diff --git a/beat/core/dock.py b/beat/core/dock.py index 37ecfd76ece247352e3c4440d1d0693303664be1..9a2706703de3c9e809b7801105eb8ea504987528 100755 --- a/beat/core/dock.py +++ b/beat/core/dock.py @@ -62,17 +62,18 @@ class Host(object): self.kwargs = kwargs self.environments = {} + self.db_environments = {} def setup(self, raise_on_errors=True): self.client = docker.Client(**self.kwargs) - self.environments = self._discover_environments(raise_on_errors) + (self.environments, self.db_environments) = self._discover_environments(raise_on_errors) def __contains__(self, key): - return key in self.environments + return (key in self.environments) or (key in self.db_environments) def __str__(self): @@ -84,13 +85,29 @@ class Host(object): attrs = self.environments[key] - if attrs['tag'] is not None: return attrs['tag'] + if attrs['tag'] is not None: + return attrs['tag'] + return attrs['short_id'] - def teardown(self): + def db2docker(self, db_names): + 
'''Returns a nice docker image name given a database name''' + + def _all_in(db_names, databases): + return len([ x for x in db_names if x in databases ]) == len(db_names) + + attrs = [ x for x in self.db_environments.values() if _all_in(db_names, x['databases']) ][0] - for container in self.containers: self.rm(container) + if attrs['tag'] is not None: + return attrs['tag'] + + return attrs['short_id'] + + + def teardown(self): + for container in self.containers: + self.rm(container) def __enter__(self): @@ -140,98 +157,118 @@ class Host(object): "`describe' output cannot be parsed: %s", image, str(e)) return {} - retval = {} - - for image in self.client.images(): - # call the "describe" application on each existing image with "beat" in - # its name - tag = image['RepoTags'][0] if image['RepoTags'] else None - if (tag is None) or (tag.find('beat') == -1): - continue - - id = image['Id'].split(':')[1][:12] - logger.debug("Checking image `%s' (%s)...", tag, id) - description = _describe(tag or id) - if not description: continue - - key = description['name'] + ' (' + description['version'] + ')' - - if key in retval: + def _must_replace(image_tag, environments, key): # this check avoids we do a new environment and, by mistake, override # it with a previous version or the contrary. if raise_on_errors: raise RuntimeError("Environments at `%s' and `%s' have the " \ "same name (`%s'). Distinct environments must be " \ "uniquely named. Fix this and re-start." 
% \ - (tag or id, retval[key]['image'], key)) - else: - new_version = None - previous_version = None + (image_tag, environments[key]['image'], key)) + + new_version = None + previous_version = None - parts = tag.split('/') + parts = image_tag.split('/') + if len(parts) > 1: + parts = parts[-1].split(':') if len(parts) > 1: - parts = parts[-1].split(':') - if len(parts) > 1: - new_version = parts[-1] + new_version = parts[-1] - parts = retval[key]['tag'].split('/') + parts = environments[key]['tag'].split('/') + if len(parts) > 1: + parts = parts[-1].split(':') if len(parts) > 1: - parts = parts[-1].split(':') - if len(parts) > 1: - previous_version = parts[-1] + previous_version = parts[-1] - replacement = False - keep = False + replacement = False + keep = False - if (new_version is not None) and (previous_version is not None): - if new_version == 'latest': - replacement = True - elif previous_version == 'latest': - keep = True - else: - try: - new_version = tuple([ int(x) for x in new_version.split('.') ]) + if (new_version is not None) and (previous_version is not None): + if new_version == 'latest': + replacement = True + elif previous_version == 'latest': + keep = True + else: + try: + new_version = tuple([ int(x) for x in new_version.split('.') ]) - try: - previous_version = tuple([ int(x) for x in previous_version.split('.') ]) + try: + previous_version = tuple([ int(x) for x in previous_version.split('.') ]) - if new_version > previous_version: - replacement = True - else: - keep = True - except: + if new_version > previous_version: replacement = True - + else: + keep = True except: - keep = True + replacement = True - elif new_version is not None: - replacement = True - elif previous_version is not None: - keep = True + except: + keep = True + + elif new_version is not None: + replacement = True + elif previous_version is not None: + keep = True + + if replacement: + logger.debug("Overriding **existing** environment `%s' in image `%s'", key, 
environments[key]['tag']) + elif keep: + logger.debug("Environment `%s' already existing in image `%s', we'll keep it", key, environments[key]['tag']) + return False + else: + logger.warn("Overriding **existing** environment `%s' image " \ + "with `%s'. To avoid this warning make " \ + "sure your docker images do not contain environments " \ + "with the same names", key, environments[key]['image']) + + return True + + + environments = {} + db_environments = {} + + for image in self.client.images(): + # call the "describe" application on each existing image with "beat" in + # its name + tag = image['RepoTags'][0] if image['RepoTags'] else None + if (tag is None) or (tag.find('beat') == -1): + continue + + id = image['Id'].split(':')[1][:12] + logger.debug("Checking image `%s' (%s)...", tag, id) + description = _describe(tag or id) + if not description: + continue + + key = description['name'] + ' (' + description['version'] + ')' + + if description.has_key('databases'): + if (key in db_environments) and not _must_replace(tag, db_environments, key): + continue + + db_environments[key] = description + db_environments[key]['image'] = image['Id'] + db_environments[key]['tag'] = tag + db_environments[key]['short_id'] = id + db_environments[key]['nickname'] = tag or id + else: + if (key in environments) and not _must_replace(tag, environments, key): + continue + + environments[key] = description + environments[key]['image'] = image['Id'] + environments[key]['tag'] = tag + environments[key]['short_id'] = id + environments[key]['nickname'] = tag or id - if replacement: - logger.debug("Overriding **existing** environment `%s' in image `%s'", key, retval[key]['tag']) - elif keep: - logger.debug("Environment `%s' already existing in image `%s', we'll keep it", key, retval[key]['tag']) - continue - else: - logger.warn("Overriding **existing** environment `%s' image " \ - "with `%s' (it was `%s). 
To avoid this warning make " \ - "sure your docker images do not contain environments " \ - "with the same names", key, retval[key]['image'], - image['Id']) - - retval[key] = description - retval[key]['image'] = image['Id'] - retval[key]['tag'] = tag - retval[key]['short_id'] = id - retval[key]['nickname'] = tag or id logger.info("Registered `%s' -> `%s (%s)'", key, tag, id) - logger.debug("Found %d environments", len(retval)) - return retval + logger.debug("Found %d environments and %d database environments", len(environments), + len(db_environments)) + + return (environments, db_environments) def put_path(self, container, src, dest='/tmp', chmod=None): diff --git a/beat/core/execution.py b/beat/core/execution.py index 9636f2b1ac578d7a01ae31140f5d3d59a63fd780..26690b8eb3013d62cd6265a91d7525f69e357f1b 100644 --- a/beat/core/execution.py +++ b/beat/core/execution.py @@ -34,6 +34,7 @@ import glob import errno import tempfile import subprocess +import zmq.green as zmq import logging logger = logging.getLogger(__name__) @@ -48,6 +49,7 @@ from . import outputs from . import data from . import stats from . import agent +from . 
import dock class Executor(object): @@ -181,6 +183,7 @@ class Executor(object): self.output_list = None self.data_sinks = [] self.data_sources = [] + self.db_address = None if not isinstance(data, dict): #user has passed a file pointer if not os.path.exists(data): @@ -221,24 +224,6 @@ class Executor(object): if not db.valid: self.errors += db.errors - continue - - if not db.valid: - # do not add errors again - continue - - # create and load the required views - key = (details['database'], details['protocol'], details['set']) - if key not in self.views: - view = self.databases[details['database']].view(details['protocol'], - details['set']) - - if details['channel'] == self.data['channel']: #synchronized - start_index, end_index = self.data.get('range', (None, None)) - else: - start_index, end_index = (None, None) - view.prepare_outputs() - self.views[key] = (view, start_index, end_index) def __enter__(self): @@ -250,23 +235,17 @@ class Executor(object): """ + if len(self.databases) > 0: + host = dock.Host() + self.context = zmq.Context() + self.db_socket = self.context.socket(zmq.PAIR) + self.db_address = 'tcp://' + host.ip + port = self.db_socket.bind_to_random_port(self.db_address) + self.db_address += ':%d' % port + self._prepare_inputs() self._prepare_outputs() - # The setup() of a database view may call isConnected() on an input - # to set the index at the right location when parallelization is enabled. - # This is why setup() should be called after initialized the inputs. 
- for key, (view, start_index, end_index) in self.views.items(): - - if (start_index is None) and (end_index is None): - status = view.setup() - else: - status = view.setup(force_start_index=start_index, - force_end_index=end_index) - - if not status: - raise RuntimeError("Could not setup database view `%s'" % key) - self.agent = None return self @@ -302,19 +281,23 @@ class Executor(object): if 'database' in details: #it is a dataset input - view_key = (details['database'], details['protocol'], details['set']) - view = self.views[view_key][0] + # create the remote input + db = self.databases[details['database']] - data_source = data.MemoryDataSource(view.done, next_callback=view.next) - self.data_sources.append(data_source) - output = view.outputs[details['output']] + dataformat_name = db.set(details['protocol'], details['set'])['outputs'][details['output']] + input = inputs.RemoteInput(name, db.dataformats[dataformat_name], self.db_socket) - # if it's a synchronized channel, makes the output start at the right - # index, otherwise, it gets lost - if start_index is not None and \ - details['channel'] == self.data['channel']: - output.last_written_data_index = start_index - 1 - output.data_sink.data_sources.append(data_source) + # Synchronization bits + group = self.input_list.group(details['channel']) + if group is None: + group = inputs.RemoteInputGroup( + details['channel'], + restricted_access=(details['channel'] == self.data['channel']), + socket=self.db_socket + ) + self.input_list.add(group) + + group.add(input) else: @@ -322,30 +305,33 @@ class Executor(object): self.data_sources.append(data_source) if details['channel'] == self.data['channel']: #synchronized status = data_source.setup( - filename=os.path.join(self.cache, details['path'] + '.data'), - prefix=self.prefix, - force_start_index=start_index, - force_end_index=end_index, + filename=os.path.join(self.cache, details['path'] + '.data'), + prefix=self.prefix, + force_start_index=start_index, + 
force_end_index=end_index, ) else: status = data_source.setup( - filename=os.path.join(self.cache, details['path'] + '.data'), - prefix=self.prefix, + filename=os.path.join(self.cache, details['path'] + '.data'), + prefix=self.prefix, ) if not status: raise IOError("cannot load cache file `%s'" % details['path']) - # Synchronization bits - group = self.input_list.group(details['channel']) - if group is None: - group = inputs.InputGroup( - details['channel'], - synchronization_listener=outputs.SynchronizationListener(), - restricted_access=(details['channel'] == self.data['channel']) - ) - self.input_list.add(group) - group.add(inputs.Input(name, self.algorithm.input_map[name], data_source)) + input = inputs.Input(name, self.algorithm.input_map[name], data_source) + + # Synchronization bits + group = self.input_list.group(details['channel']) + if group is None: + group = inputs.InputGroup( + details['channel'], + synchronization_listener=outputs.SynchronizationListener(), + restricted_access=(details['channel'] == self.data['channel']) + ) + self.input_list.add(group) + + group.add(input) def _prepare_outputs(self): @@ -383,7 +369,7 @@ class Executor(object): raise IOError("cannot create cache sink `%s'" % details['path']) input_group = self.input_list.group(details['channel']) - if input_group is None: + if (input_group is None) or not hasattr(input_group, 'synchronization_listener'): synchronization_listener = None else: synchronization_listener = input_group.synchronization_listener @@ -490,7 +476,7 @@ class Executor(object): #synchronous call - always returns after a certain timeout retval = runner.run(self, host, timeout_in_minutes=timeout_in_minutes, - daemon=daemon) + daemon=daemon, db_address=self.db_address) #adds I/O statistics from the current executor, if its complete already #otherwise, it means the running process went bananas, ignore it ;-) @@ -579,7 +565,7 @@ class Executor(object): data['channel'] = self.data['channel'] with 
open(os.path.join(directory, 'configuration.json'), 'wb') as f: - simplejson.dump(data, f, indent=2) + simplejson.dump(data, f, indent=2) tmp_prefix = os.path.join(directory, 'prefix') if not os.path.exists(tmp_prefix): os.makedirs(tmp_prefix) @@ -587,6 +573,19 @@ class Executor(object): self.algorithm.export(tmp_prefix) + def dump_databases_provider_configuration(self, directory): + """Exports contents useful for a backend runner to run the algorithm""" + + with open(os.path.join(directory, 'configuration.json'), 'wb') as f: + simplejson.dump(self.data, f, indent=2) + + tmp_prefix = os.path.join(directory, 'prefix') + if not os.path.exists(tmp_prefix): os.makedirs(tmp_prefix) + + for db in self.databases.values(): + db.export(tmp_prefix) + + def kill(self): """Stops the user process by force - to be called from signal handlers""" diff --git a/beat/core/hash.py b/beat/core/hash.py index 40b074f302334280d13c83aeaede030d88db587e..a7ae1d1b33e0ab3832063625fae7ae1f27697605 100644 --- a/beat/core/hash.py +++ b/beat/core/hash.py @@ -31,48 +31,20 @@ import os import six -import copy import hashlib import collections import simplejson - -def _sha256(s): - """A python2/3 replacement for :py:func:`haslib.sha256`""" - - try: - if isinstance(s, str): s = six.u(s) - return hashlib.sha256(s.encode('utf8')).hexdigest() - except: - return hashlib.sha256(s).hexdigest() +from beat.backend.python.hash import * +from beat.backend.python.hash import _sha256 +from beat.backend.python.hash import _stringify def _compact(text): return text.replace(' ', '').replace('\n', '') -def _stringify(dictionary): - names = sorted(dictionary.keys()) - - converted_dictionary = '{' - for name in names: - converted_dictionary += '"%s":%s,' % (name, str(dictionary[name])) - - if len(converted_dictionary) > 1: - converted_dictionary = converted_dictionary[:-1] - - converted_dictionary += '}' - - return converted_dictionary - - -def hash(dictionary_or_string): - if isinstance(dictionary_or_string, dict): - 
return _sha256(_stringify(dictionary_or_string)) - else: - return _sha256(dictionary_or_string) - def hashDatasetOutput(database_hash, protocol_name, set_name, output_name): s = _compact("""{ @@ -84,6 +56,7 @@ def hashDatasetOutput(database_hash, protocol_name, set_name, output_name): return hash(s) + def hashBlockOutput(block_name, algorithm_name, algorithm_hash, parameters, environment, input_hashes, output_name): # Note: 'block_name' and 'algorithm_name' aren't used to compute the hash, @@ -100,6 +73,7 @@ def hashBlockOutput(block_name, algorithm_name, algorithm_hash, return hash(s) + def hashAnalyzer(analyzer_name, algorithm_name, algorithm_hash, parameters, environment, input_hashes): # Note: 'analyzer_name' isn't used to compute the hash, but are useful when @@ -115,26 +89,11 @@ def hashAnalyzer(analyzer_name, algorithm_name, algorithm_hash, return hash(s) + def toPath(hash, suffix='.data'): return os.path.join(hash[0:2], hash[2:4], hash[4:6], hash[6:] + suffix) -def toUserPath(username): - hash = _sha256(username) - return os.path.join(hash[0:2], hash[2:4], username) - - -def hashJSON(contents, description): - """Hashes the pre-loaded JSON object using :py:func:`hashlib.sha256` - - Excludes description changes - """ - - if description in contents: - contents = copy.deepcopy(contents) #temporary copy - del contents[description] - contents = simplejson.dumps(contents, sort_keys=True) - return hashlib.sha256(contents).hexdigest() def hashJSONStr(contents, description): """Hashes the JSON string contents using :py:func:`hashlib.sha256` @@ -148,23 +107,3 @@ def hashJSONStr(contents, description): except simplejson.JSONDecodeError: # falls back to normal file content hashing return hash(contents) - -def hashJSONFile(path, description): - """Hashes the JSON file contents using :py:func:`hashlib.sha256` - - Excludes description changes - """ - - try: - with open(path, 'rb') as f: - return hashJSON(simplejson.load(f, - object_pairs_hook=collections.OrderedDict), 
description) #preserve order - except simplejson.JSONDecodeError: - # falls back to normal file content hashing - return hashFileContents(path) - -def hashFileContents(path): - """Hashes the file contents using :py:func:`hashlib.sha256`.""" - - with open(path, 'rb') as f: - return hashlib.sha256(f.read()).hexdigest() diff --git a/beat/core/inputs.py b/beat/core/inputs.py index 328650db0a9282b0d2b6b72c3407d404ee22e644..50b0d118c93c5834eb4dda0fe64a4f5d2bbb1791 100644 --- a/beat/core/inputs.py +++ b/beat/core/inputs.py @@ -26,210 +26,8 @@ ############################################################################### -from functools import reduce - -import six - from beat.backend.python.inputs import InputList - - -class Input: - """Represents the input of a processing block - - A list of those inputs must be provided to the algorithms (see - :py:class:`beat.backend.python.inputs.InputList`) - - - Parameters: - - name (str): Name of the input - - data_format (str): Data format accepted by the input - - data_source (beat.core.platform.data.DataSource): Source of data to be used - by the input - - - Attributes: - - group (beat.core.inputs.InputGroup): Group containing this input - - name (str): Name of the input (algorithm-specific) - - data (beat.core.baseformat.baseformat): The last block of data received on - the input - - data_index (int): Index of the last block of data received on the input - (see the section *Inputs synchronization* of the User's Guide) - - data_index_end (int): End index of the last block of data received on the - input (see the section *Inputs synchronization* of the User's Guide) - - data_format (str): Data format accepted by the input - - data_source (beat.core.data.DataSource): Source of data used by the output - - nb_data_blocks_read (int): Number of data blocks read so far - - """ - - def __init__(self, name, data_format, data_source): - - self.group = None - self.name = name - self.data = None - self.data_index = -1 - self.data_index_end 
= -1 - self.data_same_as_previous = False - self.data_format = data_format - self.data_source = data_source - self.nb_data_blocks_read = 0 - - def isDataUnitDone(self): - """Indicates if the current data unit will change at the next iteration""" - - return (self.data_index_end == self.group.data_index_end) - - def hasMoreData(self): - """Indicates if there is more data to process on the input""" - - return self.data_source.hasMoreData() - - def next(self): - """Retrieves the next block of data""" - - (self.data, self.data_index, self.data_index_end) = self.data_source.next() - self.data_same_as_previous = False - self.nb_data_blocks_read += 1 - - -class InputGroup: - """Represents a group of inputs synchronized together - - A group implementing this interface is provided to the algorithms (see - :py:class:`beat.backend.python.inputs.InputList`). - - See :py:class:`beat.core.inputs.Input` - - Example: - - .. code-block:: python - - inputs = InputList() - - print inputs['labels'].data_format - - for index in range(0, len(inputs)): - print inputs[index].data_format - - for input in inputs: - print input.data_format - - for input in inputs[0:2]: - print input.data_format - - - Parameters: - - channel (str): Name of the data channel of the group - - synchronization_listener (beat.core.outputs.SynchronizationListener): - Synchronization listener to use - - restricted_access (bool): Indicates if the algorithm can freely use the - inputs - - - Atttributes: - - data_index (int): Index of the last block of data received on the inputs - (see the section *Inputs synchronization* of the User's Guide) - - data_index_end (int): End index of the last block of data received on the - inputs (see the section *Inputs synchronization* of the User's Guide) - - channel (str): Name of the data channel of the group - - synchronization_listener (beat.core.outputs.SynchronizationListener): - Synchronization listener used - - """ - - def __init__(self, channel, synchronization_listener=None, 
- restricted_access=True): - - self._inputs = [] - self.data_index = -1 - self.data_index_end = -1 - self.channel = channel - self.synchronization_listener = synchronization_listener - self.restricted_access = restricted_access - - def __getitem__(self, index): - - if isinstance(index, six.string_types): - try: - return [x for x in self._inputs if x.name == index][0] - except: - pass - elif isinstance(index, int): - if index < len(self._inputs): - return self._inputs[index] - return None - - def __iter__(self): - - for k in self._inputs: yield k - - def __len__(self): - - return len(self._inputs) - - def add(self, input): - """Add an input to the group - - Parameters: - - input (beat.core.inputs.Input): The input to add - - """ - - input.group = self - self._inputs.append(input) - - def hasMoreData(self): - """Indicates if there is more data to process in the group""" - - return bool([x for x in self._inputs if x.hasMoreData()]) - - def next(self): - """Retrieve the next block of data on all the inputs""" - - # Only for groups not managed by the platform - if self.restricted_access: raise RuntimeError('Not authorized') - - # Only retrieve new data on the inputs where the current data expire first - lower_end_index = reduce(lambda x, y: min(x, y.data_index_end), - self._inputs[1:], self._inputs[0].data_index_end) - inputs_to_update = [x for x in self._inputs \ - if x.data_index_end == lower_end_index] - inputs_up_to_date = [x for x in self._inputs if x not in inputs_to_update] - - for input in inputs_to_update: - input.next() - input.data_same_as_previous = False - - for input in inputs_up_to_date: - input.data_same_as_previous = True - - - # Compute the group's start and end indices - self.data_index = reduce(lambda x, y: max(x, y.data_index), - self._inputs[1:], self._inputs[0].data_index) - self.data_index_end = reduce(lambda x, y: min(x, y.data_index_end), - self._inputs[1:], self._inputs[0].data_index_end) - - # Inform the synchronisation listener - if 
self.synchronization_listener is not None: - self.synchronization_listener.onIntervalChanged(self.data_index, - self.data_index_end) +from beat.backend.python.inputs import Input +from beat.backend.python.inputs import InputGroup +from beat.backend.python.inputs import RemoteInput +from beat.backend.python.inputs import RemoteInputGroup diff --git a/beat/core/library.py b/beat/core/library.py index cc6fdaeaedecc28d3396b81ebeaf0538fd3738cb..60296d78e97b2f9c685f718d8748074a9b1eab66 100644 --- a/beat/core/library.py +++ b/beat/core/library.py @@ -39,32 +39,12 @@ from . import prototypes from . import loader from . import utils -class Storage(utils.CodeStorage): - """Resolves paths for libraries +from beat.backend.python.library import Storage +from beat.backend.python.library import Library as BackendLibrary - Parameters: - - prefix (str): Establishes the prefix of your installation. - - name (str): The name of the library object in the format - ``<user>/<name>/<version>``. - - """ - def __init__(self, prefix, name, language=None): - if name.count('/') != 2: - raise RuntimeError("invalid library name: `%s'" % name) - - self.username, self.name, self.version = name.split('/') - self.prefix = prefix - self.fullname = name - - path = utils.hashed_or_simple(self.prefix, 'libraries', name) - super(Storage, self).__init__(path, language) - - -class Library(object): +class Library(BackendLibrary): """Librarys represent independent algorithm components within the platform. This class can only parse the meta-parameters of the library. 
The actual @@ -121,20 +101,8 @@ class Library(object): """ def __init__(self, prefix, data, library_cache=None): + super(Library, self).__init__(prefix, data, library_cache) - self._name = None - self.storage = None - self.prefix = prefix - - self.libraries = {} - - library_cache = library_cache if library_cache is not None else {} - - try: - self._load(data, library_cache) - finally: - if self._name is not None: #registers it into the cache, even if failed - library_cache[self._name] = self def _load(self, data, library_cache): """Loads the library""" @@ -251,142 +219,6 @@ class Library(object): (library, self.libraries[library].language, self.language)) - def uses_dict(self): - """Returns the usage dictionary for all dependent modules""" - - if self.data['language'] == 'unknown': - raise RuntimeError("library has no programming language set") - - if not self._name: - raise RuntimeError("library has no name") - - retval = {} - - if self.uses is not None: - - for name, value in self.uses.items(): - retval[name] = dict( - path=self.libraries[value].storage.code.path, - uses=self.libraries[value].uses_dict(), - ) - - return retval - - - def load(self): - """Loads the Python module for this library resolving all references - - Returns the loaded Python module. 
- """ - - if self.data['language'] == 'unknown': - raise RuntimeError("library has no programming language set") - - if not self._name: - raise RuntimeError("library has no name") - - return loader.load_module(self.name.replace(os.sep, '_'), - self.storage.code.path, self.uses_dict()) - - @property - def name(self): - """Returns the name of this object - """ - return self._name or '__unnamed_library__' - - - @property - def schema_version(self): - """Returns the schema version""" - return self.data.get('schema_version', 1) - - - @name.setter - def name(self, value): - - if self.data['language'] == 'unknown': - raise RuntimeError("library has no programming language set") - - self._name = value - self.storage = Storage(self.prefix, value, self.data['language']) - - - @property - def language(self): - """Returns the current language set for the library code""" - return self.data['language'] - - - @language.setter - def language(self, value): - """Sets the current executable code programming language""" - if self.storage: self.storage.language = value - self.data['language'] = value - self._check_language_consistence() - - - @property - def valid(self): - """A boolean that indicates if this library is valid or not""" - - return not bool(self.errors) - - - @property - def uses(self): - return self.data.get('uses') - - @uses.setter - def uses(self, value): - self.data['uses'] = value - return value - - - @property - def description(self): - """The short description for this object""" - return self.data.get('description', None) - - @description.setter - def description(self, value): - """Sets the short description for this object""" - self.data['description'] = value - - - @property - def documentation(self): - """The full-length description for this object""" - - if not self._name: - raise RuntimeError("library has no name") - - if self.storage.doc.exists(): - return self.storage.doc.load() - return None - - - @documentation.setter - def documentation(self, value): - 
"""Sets the full-length description for this object""" - - if not self._name: - raise RuntimeError("library has no name") - - if hasattr(value, 'read'): - self.storage.doc.save(value.read()) - else: - self.storage.doc.save(value) - - - def hash(self): - """Returns the hexadecimal hash for the current library""" - - if not self._name: - raise RuntimeError("library has no name") - - return self.storage.hash() - - def json_dumps(self, indent=4): """Dumps the JSON declaration of this object in a string diff --git a/beat/core/outputs.py b/beat/core/outputs.py index adfb4416b1750b373986dab568f860e889733418..3f4762edff0f3441330c4c2e5dbb51811ff776bb 100644 --- a/beat/core/outputs.py +++ b/beat/core/outputs.py @@ -26,130 +26,7 @@ ############################################################################### +from beat.backend.python.outputs import SynchronizationListener +from beat.backend.python.outputs import Output +from beat.backend.python.outputs import RemoteOutput from beat.backend.python.outputs import OutputList - - -class SynchronizationListener: - """A callback mechanism to keep Inputs and Outputs in groups and lists - synchronized together.""" - - def __init__(self): - self.data_index_start = -1 - self.data_index_end = -1 - - def onIntervalChanged(self, data_index_start, data_index_end): - self.data_index_start = data_index_start - self.data_index_end = data_index_end - - -class Output: - """Represents one output of a processing block - - A list of outputs implementing this interface is provided to the algorithms - (see :py:class:`beat.core.outputs.OutputList`). - - - Parameters: - - name (str): Name of the output - - data_sink (beat.core.data.DataSink): Sink of data to be used by the output, - pre-configured with the correct data format. 
- - - Attributes: - - name (str): Name of the output (algorithm-specific) - - data_sink (beat.core.data.DataSink): Sink of data used by the output - - last_written_data_index (int): Index of the last block of data written by - the output - - nb_data_blocks_written (int): Number of data blocks written so far - - - """ - - def __init__(self, name, data_sink, synchronization_listener=None, - dataset_output=False, force_start_index=0): - - self.name = name - self.data_sink = data_sink - self._synchronization_listener = synchronization_listener - self._dataset_output = dataset_output - self.last_written_data_index = force_start_index-1 - self.nb_data_blocks_written = 0 - - - def _createData(self): - """Retrieves an uninitialized block of data corresponding to the data - format of the output - - This method must be called to correctly create a new block of data - """ - - if hasattr(self.data_sink, 'dataformat'): - return self.data_sink.dataformat.type() - else: - raise RuntimeError("The currently used data sink is not bound to " \ - "a dataformat - you cannot create uninitialized data under " \ - "these circumstances") - - - def write(self, data, end_data_index=None): - """Write a block of data on the output - - Parameters: - - data (beat.core.baseformat.baseformat): The block of data to write, or - None (if the algorithm doesn't want to write any data) - - end_data_index (int): Last index of the written data (see the section - *Inputs synchronization* of the User's Guide). If not specified, the - *current end data index* of the Inputs List is used - - """ - - if self._dataset_output: - if end_data_index is None: - end_data_index = self.last_written_data_index + 1 - elif end_data_index < self.last_written_data_index + 1: - raise KeyError("Database wants to write an `end_data_index' (%d) " \ - "which is smaller than the last written index (%d) " \ - "+1 - this is a database bug - Fix it!" 
% \ - (end_data_index, self.last_written_data_index)) - - elif end_data_index is not None: - if (end_data_index < self.last_written_data_index + 1) or \ - ((self._synchronization_listener is not None) and \ - (end_data_index > self._synchronization_listener.data_index_end)): - raise KeyError("Algorithm logic error on write(): `end_data_index' " \ - "is not consistent with last written index") - - elif self._synchronization_listener is not None: - end_data_index = self._synchronization_listener.data_index_end - - else: - end_data_index = self.last_written_data_index + 1 - - # if the user passes a dictionary, converts to the proper baseformat type - if isinstance(data, dict): - d = self.data_sink.dataformat.type() - d.from_dict(data, casting='safe', add_defaults=False) - data = d - - self.data_sink.write(data, self.last_written_data_index + 1, end_data_index) - - self.last_written_data_index = end_data_index - self.nb_data_blocks_written += 1 - - def isDataMissing(self): - - return not(self._dataset_output) and \ - (self._synchronization_listener is not None) and \ - (self._synchronization_listener.data_index_end != self.last_written_data_index) - - def isConnected(self): - - return self.data_sink.isConnected() diff --git a/beat/core/scripts/__init__.py b/beat/core/scripts/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/beat/core/scripts/databases_provider.py b/beat/core/scripts/databases_provider.py new file mode 100644 index 0000000000000000000000000000000000000000..64a1a67d2e088c090e3e3f6ef6df0fe063d2e748 --- /dev/null +++ b/beat/core/scripts/databases_provider.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2017 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module 
of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +"""Executes some database views. (%(version)s) + +usage: + %(prog)s [--debug] <addr> <dir> + %(prog)s (--help) + %(prog)s (--version) + + +arguments: + <addr> Address of the server for I/O requests + <dir> Directory containing all configuration required to run the views + + +options: + -h, --help Shows this help message and exit + -V, --version Shows program's version number and exit + -d, --debug Runs executor in debugging mode + +""" + +import logging + +import os +import sys +import docopt + +import zmq + +from ..dbexecution import DBExecutor + + +class UserError(Exception): + + def __init__(self, value): + self.value = value + + def __str__(self): + return repr(self.value) + + +def send_error(logger, socket, tp, message): + """Sends a user (usr) or system (sys) error message to the infrastructure""" + + logger.debug('send: (err) error') + socket.send('err', zmq.SNDMORE) + socket.send(tp, zmq.SNDMORE) + logger.debug('send: """%s"""' % message.rstrip()) + socket.send(message) + + poller = zmq.Poller() + poller.register(socket, zmq.POLLIN) 
+ + this_try = 1 + max_tries = 5 + timeout = 1000 #ms + while this_try <= max_tries: + socks = dict(poller.poll(timeout)) #blocks here, for 5 seconds at most + if socket in socks and socks[socket] == zmq.POLLIN: + answer = socket.recv() #ack + logger.debug('recv: %s', answer) + break + logger.warn('(try %d) waited %d ms for "ack" from server', + this_try, timeout) + this_try += 1 + if this_try > max_tries: + logger.error('could not send error message to server') + logger.error('stopping 0MQ client anyway') + + +def main(): + package = __name__.rsplit('.', 2)[0] + version = package + ' v' + \ + __import__('pkg_resources').require(package)[0].version + prog = os.path.basename(sys.argv[0]) + + args = docopt.docopt(__doc__ % dict(prog=prog, version=version), + version=version) + + # Sets up the logging system + if args['--debug']: + logging.basicConfig(format='[remote|%(name)s] %(levelname)s: %(message)s', + level=logging.DEBUG) + else: + logging.basicConfig(format='[remote|%(name)s] %(levelname)s: %(message)s', + level=logging.WARNING) + + logger = logging.getLogger(__name__) + + # Creates the 0MQ socket for communication with BEAT + context = zmq.Context() + socket = context.socket(zmq.PAIR) + address = args['<addr>'] + socket.connect(address) + logger.debug("zmq client connected to `%s'", address) + + try: + + # Check the dir + if not os.path.exists(args['<dir>']): + raise IOError("Running directory `%s' not found" % args['<dir>']) + + # Sets up the execution + dataformat_cache = {} + database_cache = {} + + try: + dbexecutor = DBExecutor(os.path.join(args['<dir>'], 'prefix'), + os.path.join(args['<dir>'], 'configuration.json'), + dataformat_cache, database_cache) + except (MemoryError): + raise + except Exception as e: + import traceback + exc_type, exc_value, exc_traceback = sys.exc_info() + tb = traceback.extract_tb(exc_traceback) + s = ''.join(traceback.format_list(tb[4:])) #exclude this file + s = s.replace(args['<dir>'], '').strip() + raise UserError("%s%s: 
%s" % (s, type(e).__name__, e)) + + # Execute the code + try: + with dbexecutor: + dbexecutor.process(context, socket) + dbexecutor.wait() + except (MemoryError): + raise + except Exception as e: + import traceback + exc_type, exc_value, exc_traceback = sys.exc_info() + tb = traceback.extract_tb(exc_traceback) + s = ''.join(traceback.format_list(tb[4:])) + s = s.replace(args['<dir>'], '').strip() + raise UserError("%s%s: %s" % (s, type(e).__name__, e)) + + except UserError as e: + msg = str(e).decode('string_escape').strip("'") + send_error(logger, socket, 'usr', msg) + return 1 + + except MemoryError as e: + # Say something meaningful to the user + msg = "The user process for this block ran out of memory. We " \ + "suggest you optimise your code to reduce memory usage or, " \ + "if this is not an option, choose an appropriate processing " \ + "queue with enough memory." + send_error(logger, socket, 'usr', msg) + return 1 + + except Exception as e: + import traceback + send_error(logger, socket, 'sys', traceback.format_exc()) + return 1 + + finally: + socket.setsockopt(zmq.LINGER, 0) + socket.close() + context.term() + logger.debug("0MQ client finished") + + return 0 + + +if __name__ == '__main__': + sys.exit(main()) diff --git a/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json b/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json index 1f3750601973152ab3c47a5f3a68be67d83efad1..9f9c2c8b37047b90f97e16ec4fc050cc07ff6f85 100644 --- a/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json +++ b/beat/core/test/prefix/experiments/user/user/double/1/cxx_double.json @@ -17,8 +17,8 @@ "out_data": "out" }, "environment": { - "name": "cxx_environment", - "version": "1" + "name": "Cxx backend", + "version": "1.0.0" } }, "echo2": { @@ -30,8 +30,8 @@ "out_data": "out" }, "environment": { - "name": "cxx_environment", - "version": "1" + "name": "Cxx backend", + "version": "1.0.0" } } }, diff --git 
a/beat/core/test/test_dbexecution.py b/beat/core/test/test_dbexecution.py new file mode 100644 index 0000000000000000000000000000000000000000..fd107925d53a5e3aadd2db07ade8bceb1ebca40d --- /dev/null +++ b/beat/core/test/test_dbexecution.py @@ -0,0 +1,163 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. 
# +# # +###############################################################################
+
+# Tests for database execution (``DBExecutor`` reached through a 0MQ socket)
+
+import os
+import logging
+logger = logging.getLogger(__name__)
+
+# in case you want to see the printouts dynamically, set to ``True``
+if False:
+  logger = logging.getLogger() #root logger
+  logger.setLevel(logging.DEBUG)
+  ch = logging.StreamHandler()
+  ch.setLevel(logging.DEBUG)
+  ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s'))
+  logger.addHandler(ch)
+
+import unittest
+import zmq.green as zmq
+
+from ..dbexecution import DBExecutor
+from ..inputs import RemoteInput
+from ..inputs import RemoteInputGroup
+from ..database import Database
+
+from . import prefix
+
+
+# Block configuration for DBExecutor; HostSide reads its input 'protocol'/'set'
+CONFIGURATION = {
+  'queue': 'queue',
+  'inputs': {
+    'a': {
+      'set': 'double',
+      'protocol': 'double',
+      'database': 'integers_db/1',
+      'output': 'a',
+      'path': 'ec/89/e5/6e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
+      'endpoint': 'a',
+      'hash': 'ec89e56e161d2cb012ef6ac8acf59bf453a6328766f90dc9baba9eb14ea23c55',
+      'channel': 'integers'
+    },
+    'b': {
+      'set': 'double',
+      'protocol': 'double',
+      'database': 'integers_db/1',
+      'output': 'b',
+      'path': '6f/b6/66/68e68476cb24be80fc3cb99f6cc8daa822cd86fb8108ce7476bc261fb8',
+      'endpoint': 'b',
+      'hash': '6fb66668e68476cb24be80fc3cb99f6cc8daa822cd86fb8108ce7476bc261fb8',
+      'channel': 'integers'
+    }
+  },
+  'algorithm': 'user/sum/1',
+  'parameters': {},
+  'environment': {
+    'name': 'environment',
+    'version': '1'
+  },
+  'outputs': {
+    'sum': {
+      'path': '20/61/b6/2df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
+      'endpoint': 'sum',
+      'hash': '2061b62df3c3bedd5366f4a625c5d87ffbf5a26007c46c456e9abf21b46c6681',
+      'channel': 'integers'
+    }
+  },
+  'nb_slots': 1,
+  'channel': 'integers'
+}
+
+
+class HostSide(object):
+  '''Host end of the link: binds a 0MQ PAIR socket and builds inputs on it'''
+
+  def __init__(self, zmq_context):
+    '''Binds to a random local TCP port, then groups remote inputs a and b'''
+    # 0MQ server
+    self.socket = zmq_context.socket(zmq.PAIR)
+    self.address = 'tcp://127.0.0.1'
+    port = self.socket.bind_to_random_port(self.address)
+    self.address += ':%d' % port
+
+    database = Database(prefix, 'integers_db/1')
+
+    # Creation of the inputs
+    input_a_conf = CONFIGURATION['inputs']['a']
+    dataformat_name_a = database.set(input_a_conf['protocol'], input_a_conf['set'])['outputs']['a']
+    self.input_a = RemoteInput('a', database.dataformats[dataformat_name_a], self.socket)
+
+    input_b_conf = CONFIGURATION['inputs']['b']
+    dataformat_name_b = database.set(input_b_conf['protocol'], input_b_conf['set'])['outputs']['b']
+    self.input_b = RemoteInput('b', database.dataformats[dataformat_name_b], self.socket)
+
+    self.group = RemoteInputGroup('integers', False, self.socket)
+    self.group.add(self.input_a)
+    self.group.add(self.input_b)
+
+
+class ContainerSide(object):
+  '''Container end: connects to the host address and runs a DBExecutor'''
+
+  def __init__(self, zmq_context, address):
+    '''Checks the executor is valid, connects, then calls its process()'''
+    dataformat_cache = {}
+    database_cache = {}
+
+    self.dbexecutor = DBExecutor(prefix, CONFIGURATION,
+        dataformat_cache, database_cache)
+    assert self.dbexecutor.valid, '\n * %s' % '\n * '.join(self.dbexecutor.errors)
+
+    self.socket = zmq_context.socket(zmq.PAIR)
+    self.socket.connect(address)
+
+    with self.dbexecutor:
+      self.dbexecutor.process(zmq_context, self.socket)
+
+  def wait(self):
+    '''Calls through to the executor's wait()'''
+    self.dbexecutor.wait()
+
+
+class TestExecution(unittest.TestCase):
+  '''Runs the host and container sides together on one 0MQ context'''
+
+  def test_success(self):
+    # drain the host group, send 'don', then wait on the container side
+    context = zmq.Context()
+
+    host = HostSide(context)
+    container = ContainerSide(context, host.address)
+
+    while host.group.hasMoreData():
+      host.group.next()
+
+    host.socket.send('don')
+
+    container.wait()
diff --git a/beat/core/test/test_message_handler.py b/beat/core/test/test_message_handler.py new file mode 100644 index 0000000000000000000000000000000000000000..babeae4fcfeb95f75c0f112c737de3a9de565a86 --- /dev/null +++ b/beat/core/test/test_message_handler.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +# vim: set fileencoding=utf-8 : + +############################################################################### +# # +# Copyright (c) 2016 Idiap 
Research Institute, http://www.idiap.ch/ # +# Contact: beat.support@idiap.ch # +# # +# This file is part of the beat.core module of the BEAT platform. # +# # +# Commercial License Usage # +# Licensees holding valid commercial BEAT licenses may use this file in # +# accordance with the terms contained in a written agreement between you # +# and Idiap. For further information contact tto@idiap.ch # +# # +# Alternatively, this file may be used under the terms of the GNU Affero # +# Public License version 3 as published by the Free Software and appearing # +# in the file LICENSE.AGPL included in the packaging of this file. # +# The BEAT platform is distributed in the hope that it will be useful, but # +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY # +# or FITNESS FOR A PARTICULAR PURPOSE. # +# # +# You should have received a copy of the GNU Affero Public License along # +# with the BEAT platform. If not, see http://www.gnu.org/licenses/. # +# # +############################################################################### + + +# Tests for the message handler + +import os +import logging +logger = logging.getLogger(__name__) + +# in case you want to see the printouts dynamically, set to ``True`` +if False: + logger = logging.getLogger() #root logger + logger.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + ch.setFormatter(logging.Formatter('%(levelname)s: %(message)s')) + logger.addHandler(ch) + +import unittest +import zmq.green as zmq +import nose.tools + +from ..agent import MessageHandler +from ..dataformat import DataFormat +from ..inputs import RemoteInput +from ..inputs import RemoteInputGroup +from ..inputs import Input +from ..inputs import InputGroup +from ..inputs import InputList + +from .mocks import MockDataSource + +from . 
import prefix
+
+
+class TestMessageHandler(unittest.TestCase):
+  '''Exercises remote inputs against a MessageHandler fed by mock sources'''
+
+  def setUp(self):
+    '''Starts a handler over two mock inputs and connects remote inputs'''
+    dataformat = DataFormat(prefix, 'user/single_integer/1')
+
+    data_source_a = MockDataSource([
+        dataformat.type(value=10),
+        dataformat.type(value=20),
+      ],
+      [
+        (0, 0),
+        (1, 1),
+      ]
+    )
+
+    input_a = Input('a', 'user/single_integer/1', data_source_a)
+
+    data_source_b = MockDataSource([
+        dataformat.type(value=100),
+        dataformat.type(value=200),
+      ],
+      [
+        (0, 0),
+        (1, 1),
+      ]
+    )
+
+    input_b = Input('b', 'user/single_integer/1', data_source_b)
+
+    group = InputGroup('channel')
+    group.add(input_a)
+    group.add(input_b)
+    self.input_list = InputList()
+    self.input_list.add(group)
+
+    # server side: PAIR socket bound to a random local port for the handler
+    self.server_context = zmq.Context()
+    server_socket = self.server_context.socket(zmq.PAIR)
+    address = 'tcp://127.0.0.1'
+    port = server_socket.bind_to_random_port(address)
+    address += ':%d' % port
+
+    self.message_handler = MessageHandler(self.input_list, self.server_context, server_socket)
+
+    # client side: remote inputs built on a socket connected to the server
+    self.client_context = zmq.Context()
+    client_socket = self.client_context.socket(zmq.PAIR)
+    client_socket.connect(address)
+
+    self.remote_input_a = RemoteInput('a', dataformat, client_socket)
+    self.remote_input_b = RemoteInput('b', dataformat, client_socket)
+
+    self.remote_group = RemoteInputGroup('channel', False, client_socket)
+    self.remote_group.add(self.remote_input_a)
+    self.remote_group.add(self.remote_input_b)
+
+    self.remote_input_list = InputList()
+    self.remote_input_list.add(self.remote_group)
+
+    self.message_handler.start()
+
+  def test_input_has_more_data(self):
+    # a fresh remote input reports pending data
+    assert self.remote_input_a.hasMoreData()
+
+  def test_input_next(self):
+    # the first next() exposes the first value of mock source a (10)
+    self.remote_input_a.next()
+    nose.tools.eq_(self.remote_input_a.data.value, 10)
+
+  def test_input_full_cycle(self):
+    # input a yields 10 then 20, after which no data is left
+    assert self.remote_input_a.hasMoreData()
+    self.remote_input_a.next()
+    nose.tools.eq_(self.remote_input_a.data.value, 10)
+
+    assert self.remote_input_a.hasMoreData()
+    self.remote_input_a.next()
+    nose.tools.eq_(self.remote_input_a.data.value, 20)
+
+    assert not self.remote_input_a.hasMoreData()
+
+  def test_group_has_more_data(self):
+    # a fresh remote group reports pending data
+    assert self.remote_group.hasMoreData()
+
+  def test_group_next(self):
+    # one next() on the group advances both inputs (10 and 100)
+    self.remote_group.next()
+    nose.tools.eq_(self.remote_input_a.data.value, 10)
+    nose.tools.eq_(self.remote_input_b.data.value, 100)
+
+  def test_group_full_cycle(self):
+    # the group yields (10, 100) then (20, 200), then has no more data
+    assert self.remote_group.hasMoreData()
+    self.remote_group.next()
+    nose.tools.eq_(self.remote_input_a.data.value, 10)
+    nose.tools.eq_(self.remote_input_b.data.value, 100)
+
+    assert self.remote_group.hasMoreData()
+    self.remote_group.next()
+    nose.tools.eq_(self.remote_input_a.data.value, 20)
+    nose.tools.eq_(self.remote_input_b.data.value, 200)
+
+    assert not self.remote_group.hasMoreData()
diff --git a/beat/core/utils.py b/beat/core/utils.py index 0811e739aceb009bdd83f08f34dff39e7b124c02..b0de646f56d13d6d8f65dc46bf1757e12286343b 100644 --- a/beat/core/utils.py +++ b/beat/core/utils.py @@ -27,14 +27,12 @@ import os -import shutil import tempfile -import collections import numpy import simplejson -import six +from beat.backend.python.utils import * from . 
import hash @@ -45,60 +43,6 @@ def temporary_directory(prefix='beat_'): return tempfile.mkdtemp(prefix=prefix) -def hashed_or_simple(prefix, what, path, suffix='.json'): - """Returns a hashed path or simple path depending on where the resource is""" - - username, right_bit = path.split('/', 1) - hashed_prefix = hash.toUserPath(username) - candidate = os.path.join(prefix, what, hashed_prefix, right_bit) - if os.path.exists(candidate + suffix): return candidate - return os.path.join(prefix, what, path) - - -def safe_rmfile(f): - """Safely removes a file from the disk""" - - if os.path.exists(f): os.unlink(f) - - -def safe_rmdir(f): - """Safely removes the directory containg a given file from the disk""" - - d = os.path.dirname(f) - if not os.path.exists(d): return - if not os.listdir(d): os.rmdir(d) - - -def extension_for_language(language): - """Returns the preferred extension for a given programming language - - The set of languages supported must match those declared in our - ``common.json`` schema. - - Parameters: - - language (str) The language for which you'd like to get the extension for. - - - Returns: - - str: The extension for the given language, including a leading ``.`` (dot) - - - Raises: - - KeyError: If the language is not defined in our internal dictionary. 
- - """ - - return dict( - unknown = '', - cxx = '.so', - matlab = '.m', - python = '.py', - r = '.r', - )[language] - class NumpyJSONEncoder(simplejson.JSONEncoder): """Encodes numpy arrays and scalars @@ -118,59 +62,6 @@ class NumpyJSONEncoder(simplejson.JSONEncoder): return simplejson.JSONEncoder.default(self, obj) -class File(object): - """User helper to read and write file objects""" - - - def __init__(self, path, binary=False): - - self.path = path - self.binary = binary - - - def exists(self): - - return os.path.exists(self.path) - - - def load(self): - - mode = 'rb' if self.binary else 'rt' - with open(self.path, mode) as f: return f.read() - - - def try_load(self): - - if os.path.exists(self.path): - return self.load() - return None - - - def backup(self): - - if not os.path.exists(self.path): return #no point in backing-up - backup = self.path + '~' - if os.path.exists(backup): os.remove(backup) - shutil.copy(self.path, backup) - - - def save(self, contents): - - d = os.path.dirname(self.path) - if not os.path.exists(d): os.makedirs(d) - - if os.path.exists(self.path): self.backup() - - mode = 'wb' if self.binary else 'wt' - with open(self.path, mode) as f: f.write(contents) - - - def remove(self): - - safe_rmfile(self.path) - safe_rmfile(self.path + '~') #backup - safe_rmdir(self.path) #remove containing directory - def uniq(seq): '''Order preserving (very fast) uniq function for sequences''' @@ -183,129 +74,3 @@ def uniq(seq): result.append(item) return result - - -class Storage(object): - """Resolves paths for objects that provide only a description - - Parameters: - - prefix (str): Establishes the prefix of your installation. - - name (str): The name of the database object in the format - ``<name>/<version>``. 
- - """ - - def __init__(self, path): - - self.path = path - self.json = File(self.path + '.json') - self.doc = File(self.path + '.rst') - - def hash(self, description='description'): - """The 64-character hash of the database declaration JSON""" - return hash.hashJSONFile(self.json.path, description) - - def exists(self): - """If the database declaration file exists""" - return self.json.exists() - - def load(self): - """Loads the JSON declaration as a file""" - tp = collections.namedtuple('Storage', ['declaration', 'description']) - return tp(self.json.load(), self.doc.try_load()) - - def save(self, declaration, description=None): - """Saves the JSON declaration as files""" - if description: self.doc.save(description.encode('utf8')) - if not isinstance(declaration, six.string_types): - declaration = simplejson.dumps(declaration, indent=4) - self.json.save(declaration) - - def remove(self): - """Removes the object from the disk""" - self.json.remove() - self.doc.remove() - - -class CodeStorage(object): - """Resolves paths for objects that provide a description and code - - Parameters: - - prefix (str): Establishes the prefix of your installation. - - name (str): The name of the database object in the format - ``<name>/<version>``. 
- - language (str): One of the valdid programming languages - - """ - - def __init__(self, path, language=None): - - self.path = path - self.json = File(self.path + '.json') - self.doc = File(self.path + '.rst') - - self._language = language or self.__auto_discover_language() - self.code = File(self.path + \ - extension_for_language(self._language), binary=True) - - def __auto_discover_language(self, json=None): - """Discovers and sets the language from its own JSON descriptor""" - try: - text = json or self.json.load() - json = simplejson.loads(text) - return json['language'] - except IOError: - return 'unknown' - - @property - def language(self): - return self._language - - @language.setter - def language(self, value): - self._language = value - self.code = File(self.path + extension_for_language(self._language), - binary=True) - - def hash(self): - """The 64-character hash of the database declaration JSON""" - - if self.code.exists(): - return hash.hash(dict( - json=hash.hashJSONFile(self.json.path, 'description'), - code=hash.hashFileContents(self.code.path), - )) - else: - return hash.hash(dict( - json=hash.hashJSONFile(self.json.path, 'description'), - )) - - def exists(self): - """If the database declaration file exists""" - return self.json.exists() and self.code.exists() - - def load(self): - """Loads the JSON declaration as a file""" - tp = collections.namedtuple('CodeStorage', - ['declaration', 'code', 'description']) - return tp(self.json.load(), self.code.try_load(), self.doc.try_load()) - - def save(self, declaration, code=None, description=None): - """Saves the JSON declaration and the code as files""" - if description: self.doc.save(description.encode('utf8')) - self.json.save(declaration) - if code: - if self._language == 'unknown': - self.language = self.__auto_discover_language(declaration) - self.code.save(code) - - def remove(self): - """Removes the object from the disk""" - self.json.remove() - self.code.remove() - self.doc.remove() diff --git 
a/buildout.cfg b/buildout.cfg index 4b9f9a0d4de333df65f5b62e466540962b1947ca..7d28610c0362ef4bca05f1ded7067dd42b0bdb0b 100644 --- a/buildout.cfg +++ b/buildout.cfg @@ -19,7 +19,7 @@ recipe = bob.buildout:scripts recipe = collective.recipe.cmd cmds = cd beat/core/test/prefix/algorithms/user/ tar -cf cxx_integers_echo.tar cxx_integers_echo/ - docker run -dti --name build docker.idiap.ch/beat/beat.env.cxx_base:0.1.6 > /dev/null + docker run -dti --name build docker.idiap.ch/beat/beat.env.client:0.1.6 > /dev/null docker cp cxx_integers_echo.tar build:/tmp/cxx_integers_echo.tar docker exec build bash -c 'cd /tmp ; tar -xf /tmp/cxx_integers_echo.tar' docker exec build bash -c 'cd /tmp/cxx_integers_echo ; mkdir build ; cd build ; cmake .. ; make' diff --git a/setup.py b/setup.py index 911843979195c782a09874bfd73d3f59807f6b0b..e88232f43e6f2a5c6977d2c2bb0d30ad9c22da99 100644 --- a/setup.py +++ b/setup.py @@ -71,6 +71,12 @@ setup( install_requires=requires, + entry_points={ + 'console_scripts': [ + 'databases_provider = beat.core.scripts.databases_provider:main', + ], + }, + classifiers = [ 'Framework :: BEAT', 'Development Status :: 5 - Production/Stable',