Commit 24f787ef authored by Philip ABBET

Merge branch 'datasets' into 'master'

Refactoring and support of dataset providing in a container

See merge request !8
parents f0c66547 3a9bd3ae
@@ -28,6 +28,18 @@
This package contains the source code for a Python-based backend for the BEAT
platform.
It contains the minimum amount of code needed to run an algorithm or serve
data from a dataset. It is designed to be installed in a container.
The ``beat.core`` package extends the functionalities of this one (for
instance, it adds thorough validation of each user contribution, whereas
``beat.backend.python`` assumes that an invalid contribution will never
reach the container).
For this reason (and to keep ``beat.backend.python`` as small as possible),
all the unit tests are located in ``beat.core``.
Installation
------------
@@ -39,48 +51,11 @@ Really easy, with ``zc.buildout``::
These two commands should download and install all missing dependencies and
get you a fully operational test and development environment.
.. note::
If you are on the Idiap filesystem, you may use
``/idiap/project/beat/environments/staging/usr/bin/python`` to bootstrap this
package instead. It contains the same setup deployed at the final BEAT
machinery.
Documentation
-------------
To build the documentation, just do::
$ ./bin/sphinx-apidoc --separate -d 2 --output=doc/api beat
$ ./bin/sphinx-build doc sphinx
Testing
-------
After installation, it is possible to run our suite of unit tests. To do so,
use ``nose``::
$ ./bin/nosetests -sv
If you want to skip slow tests (at least those pulling data from our servers)
or those executing lengthy operations, just do::
$ ./bin/nosetests -sv -a '!slow'
To measure the test coverage, do the following::
$ ./bin/nosetests -sv --with-coverage --cover-package=beat.backend.python
To produce an HTML test coverage report, in the directory ``./htmlcov``, do
the following::
$ ./bin/nosetests -sv --with-coverage --cover-package=beat.backend.python --cover-html --cover-html-dir=htmlcov
Our documentation is also interspersed with test units. You can run them using
Sphinx::
$ ./bin/sphinx-build -b doctest doc sphinx
@@ -38,6 +38,34 @@ import simplejson
from . import dataformat
from . import library
from . import loader
from . import utils
class Storage(utils.CodeStorage):
"""Resolves paths for algorithms
Parameters:
prefix (str): Establishes the prefix of your installation.
name (str): The name of the algorithm object in the format
``<user>/<name>/<version>``.
language (str): The programming language of the algorithm code (optional).
"""
def __init__(self, prefix, name, language=None):
if name.count('/') != 2:
raise RuntimeError("invalid algorithm name: `%s'" % name)
self.username, self.name, self.version = name.split('/')
self.prefix = prefix
self.fullname = name
path = utils.hashed_or_simple(self.prefix, 'algorithms', name)
super(Storage, self).__init__(path, language)
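# A minimal usage sketch (not part of the diff) of the path resolution
# above; the prefix and algorithm name are hypothetical, and the exact
# on-disk layout depends on utils.hashed_or_simple():
#
#   storage = Storage('/path/to/prefix', 'user/my_algorithm/1', 'python')
#   storage.json.path   # path of the JSON declaration file
#   storage.code.path   # path of the Python code file
#   storage.exists()    # True if the declaration exists on disk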
class Runner(object):
@@ -160,6 +188,7 @@ class Runner(object):
return getattr(self.obj, key)
class Algorithm(object):
"""Algorithms represent runnable components within the platform.
@@ -222,6 +251,9 @@ class Algorithm(object):
groups (list): A list containing dictionaries with inputs and outputs
belonging to the same synchronization group.
errors (list): A list containing errors found while loading this
algorithm.
data (dict): The original data for this algorithm, as loaded by our JSON
decoder.
@@ -232,20 +264,34 @@ class Algorithm(object):
def __init__(self, prefix, name, dataformat_cache=None, library_cache=None):
self._name = None
self.storage = None
self.prefix = prefix
self.dataformats = {}
self.libraries = {}
self.groups = []
self.errors = []
dataformat_cache = dataformat_cache if dataformat_cache is not None else {}
library_cache = library_cache if library_cache is not None else {}
self._load(name, dataformat_cache, library_cache)
def _load(self, data, dataformat_cache, library_cache):
"""Loads the algorithm"""
self._name = data
self.storage = Storage(self.prefix, data)
json_path = self.storage.json.path
if not self.storage.exists():
self.errors.append('Algorithm declaration file not found: %s' % json_path)
return
with open(json_path, 'rb') as f:
self.data = simplejson.load(f)
self.code_path = self.storage.code.path
self.groups = self.data['groups']
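# A hedged usage sketch (not part of the diff): loading an algorithm via
# the new Storage-based resolution; prefix and name below are hypothetical:
#
#   algorithm = Algorithm('/path/to/prefix', 'user/my_algorithm/1')
#   if not algorithm.valid:
#       print(algorithm.errors)    # filled in by _load() on failure
#   else:
#       print(algorithm.language)  # from the JSON declaration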
@@ -375,6 +421,22 @@ class Algorithm(object):
library.Library(self.prefix, value, library_cache))
@property
def name(self):
"""Returns the name of this object
"""
return self._name or '__unnamed_algorithm__'
@name.setter
def name(self, value):
if self.data['language'] == 'unknown':
raise RuntimeError("algorithm has no programming language set")
self._name = value
self.storage = Storage(self.prefix, value, self.data['language'])
@property
def schema_version(self):
@@ -382,6 +444,20 @@ class Algorithm(object):
return self.data.get('schema_version', 1)
@property
def language(self):
"""Returns the current language set for the executable code"""
return self.data['language']
@language.setter
def language(self, value):
"""Sets the current executable code programming language"""
if self.storage:
self.storage.language = value
self.data['language'] = value
def clean_parameter(self, parameter, value):
"""Checks a given value against a declared parameter
@@ -410,8 +486,8 @@ class Algorithm(object):
ValueError: If the parameter cannot be safely cast into the algorithm's
type. Alternatively, a ``ValueError`` may also be raised if a range or
choice was specified and the value does not obey those settings
stipulated for the parameter
"""
@@ -437,35 +513,72 @@ class Algorithm(object):
return retval
@property
def valid(self):
"""A boolean that indicates if this algorithm is valid or not"""
return not bool(self.errors)
@property
def uses(self):
return self.data.get('uses')
@uses.setter
def uses(self, value):
self.data['uses'] = value
@property
def results(self):
return self.data.get('results')
@results.setter
def results(self, value):
self.data['results'] = value
@property
def parameters(self):
return self.data.get('parameters')
@parameters.setter
def parameters(self, value):
self.data['parameters'] = value
@property
def splittable(self):
return self.data.get('splittable', False)
@splittable.setter
def splittable(self, value):
self.data['splittable'] = value
def uses_dict(self):
"""Returns the usage dictionary for all dependent modules"""
if self.data['language'] == 'unknown':
raise RuntimeError("algorithm has no programming language set")
if not self._name:
raise RuntimeError("algorithm has no name")
retval = {}
if self.uses is not None:
for name, value in self.uses.items():
retval[name] = dict(
path=self.libraries[value].storage.code.path,
uses=self.libraries[value].uses_dict(),
)
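# Illustration (hypothetical): for an algorithm whose declaration maps the
# alias 'mylib' to a library, the returned dictionary is shaped like:
#
#   {
#       'mylib': {
#           'path': '/path/to/prefix/libraries/user/mylib/1.py',
#           'uses': {},  # the library's own dependencies, recursively
#       },
#   }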
@@ -489,11 +602,24 @@ class Algorithm(object):
before using the ``process`` method.
"""
if not self._name:
exc = exc or RuntimeError
raise exc("algorithm has no name")
if self.data['language'] == 'unknown':
exc = exc or RuntimeError
raise exc("algorithm has no programming language set")
if not self.valid:
message = "cannot load code for invalid algorithm (%s)" % (self.name,)
exc = exc or RuntimeError
raise exc(message)
# loads the module only once through the lifetime of the algorithm object
try:
self.__module = getattr(self, 'module',
loader.load_module(self.name.replace(os.sep, '_'),
self.storage.code.path, self.uses_dict()))
except Exception as e:
if exc is not None:
type, value, traceback = sys.exc_info()
@@ -504,6 +630,52 @@ class Algorithm(object):
return Runner(self.__module, klass, self, exc)
@property
def description(self):
"""The short description for this object"""
return self.data.get('description', None)
@description.setter
def description(self, value):
"""Sets the short description for this object"""
self.data['description'] = value
@property
def documentation(self):
"""The full-length description for this object"""
if not self._name:
raise RuntimeError("algorithm has no name")
if self.storage.doc.exists():
return self.storage.doc.load()
return None
@documentation.setter
def documentation(self, value):
"""Sets the full-length description for this object"""
if not self._name:
raise RuntimeError("algorithm has no name")
if hasattr(value, 'read'):
self.storage.doc.save(value.read())
else:
self.storage.doc.save(value)
def hash(self):
"""Returns the hexadecimal hash for the current algorithm"""
if not self._name:
raise RuntimeError("algorithm has no name")
return self.storage.hash()
def result_dataformat(self):
"""Generates, on-the-fly, the dataformat for the result readout"""
......
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
###############################################################################
# #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/ #
# Contact: beat.support@idiap.ch #
# #
# This file is part of the beat.backend.python module of the BEAT platform. #
# #
# Commercial License Usage #
# Licensees holding valid commercial BEAT licenses may use this file in #
# accordance with the terms contained in a written agreement between you #
# and Idiap. For further information contact tto@idiap.ch #
# #
# Alternatively, this file may be used under the terms of the GNU Affero #
# Public License version 3 as published by the Free Software Foundation #
# and appearing in the file LICENSE.AGPL included in the packaging of #
# this file. The BEAT platform is distributed in the hope that it will #
# be useful, but WITHOUT ANY WARRANTY; without even the implied warranty #
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. #
# #
# You should have received a copy of the GNU Affero Public License along #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/. #
# #
###############################################################################
"""Data I/O classes and functions"""
import os
import re
import glob
import simplejson as json
import select
import time
import tempfile
import abc
from functools import reduce
import logging
logger = logging.getLogger(__name__)
import six
from .hash import hashFileContents
from .dataformat import DataFormat
from .algorithm import Algorithm
class DataSource(object):
"""Interface of all the Data Sources
Data Sources are used to provide data to the inputs of an algorithm.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def next(self, load=True):
"""Retrieves the next block of data
Returns:
A tuple (*data*, *start_index*, *end_index*)
"""
pass
@abc.abstractmethod
def hasMoreData(self):
"""Indicates if there is more data to process on some of the inputs"""
pass
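# A minimal, hypothetical implementation of the DataSource interface (not
# part of this module), serving blocks from an in-memory list:
class ListDataSource(DataSource):

    def __init__(self, blocks):
        # blocks: list of (data, start_index, end_index) tuples
        self.blocks = list(blocks)

    def next(self, load=True):
        return self.blocks.pop(0)

    def hasMoreData(self):
        return bool(self.blocks)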
class DataSink(object):
"""Interface of all the Data Sinks
Data Sinks are used by the outputs of an algorithm to write/transmit data.
"""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def write(self, data, start_data_index, end_data_index):
"""Writes a block of data
Parameters:
data (beat.core.baseformat.baseformat): The block of data to write
start_data_index (int): Start index of the written data
end_data_index (int): End index of the written data
"""
pass
@abc.abstractmethod
def isConnected(self):
pass
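# A minimal, hypothetical DataSink counterpart (not part of this module),
# accumulating written blocks in memory:
class ListDataSink(DataSink):

    def __init__(self):
        self.blocks = []

    def write(self, data, start_data_index, end_data_index):
        self.blocks.append((data, start_data_index, end_data_index))

    def isConnected(self):
        return True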
class CachedDataSource(DataSource):
"""Data Source that load data from the Cache"""
def __init__(self):
self.filenames = None
self.cur_file = None
self.cur_file_index = None
self.encoding = None # must be 'binary' or 'json'
self.prefix = None # where to find dataformats
self.dataformat = None # the dataformat itself
self.preloaded = False
self.next_start_index = None
self.next_end_index = None
self.next_data_size = None
self.force_start_index = None
self.force_end_index = None
self._cache_size = 10 * 1024 * 1024 # 10 megabytes
self._cache = six.b('')
self._nb_bytes_read = 0
self._read_duration = 0
def _readHeader(self):
"""Read the header of the current file"""
# Read file format
encoding = self.cur_file.readline()[:-1]
if not isinstance(encoding, str): encoding = encoding.decode('utf8')
if encoding not in ('binary', 'json'):
raise RuntimeError("valid formats for data reading are 'binary' "
"or 'json': the format `%s' is invalid" % (encoding,))
self.encoding = encoding
# Read data format
dataformat_name = self.cur_file.readline()[:-1]
if not isinstance(dataformat_name, str):
dataformat_name = dataformat_name.decode('utf8')
if dataformat_name.startswith('analysis:'):
algo_name = dataformat_name.split(':')[1]
algo = Algorithm(self.prefix, algo_name)
if not algo.valid:
raise RuntimeError("the dataformat `%s' is the result of an " \
"algorithm which is not valid" % algo_name)
self.dataformat = algo.result_dataformat()
else:
self.dataformat = DataFormat(self.prefix, dataformat_name)
if not self.dataformat.valid:
raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)
return True
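# Sketch of the header parsed above: a cached data file starts with two
# text lines, the encoding ('binary' or 'json') and the dataformat name,
# followed by the encoded data blocks. The path and format name below are
# hypothetical:
#
#   with open('/path/to/cache/0123abc.0.9.data', 'rb') as f:
#       encoding = f.readline()[:-1].decode('utf8')    # e.g. 'binary'
#       dataformat = f.readline()[:-1].decode('utf8')  # e.g. 'user/my_format/1'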
def setup(self, filename, prefix, force_start_index=None,
force_end_index=None):
"""Configures the data source
Parameters:
filename (str): Name of the file to read the data from
prefix (str, path): Path to the prefix where the dataformats are stored.
force_start_index (int): The starting index (if not set, or set to
``None``, the default, data is read from the beginning of the file)
force_end_index (int): The end index (if not set, or set to ``None``, the
default, data is read until the end of the file)
Returns:
``True``, if successful, or ``False`` otherwise.
"""
index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$')
def file_start(f):
"""Returns the converted start indexes from a filename, otherwise 0"""
r = index_re.match(f)
if r: return int(r.group(1))
return 0
def trim_filename(l, start_index, end_index):
"""Function to trim out the useless file given a range of indices
"""
res = []
for f in l:
r = index_re.match(f)
if r:
s = int(r.group(1))
e = int(r.group(2))
if (start_index is not None and e < start_index) or \
(end_index is not None and s > end_index):
continue
res.append(f)
return res
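# Hypothetical illustration of the cache naming scheme matched by index_re,
# <basename>.<start-index>.<end-index>.(data|index)[.checksum]:
#
#   m = index_re.match('0123abc.0.9.data')
#   int(m.group(1)), int(m.group(2))  # (0, 9): file covers indices 0 to 9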
def check_consistency(data_filenames, basename, data_ext):
"""Perform some sanity check on the data/checksum files on disk:
1. One-to-one mapping between data and checksum files
2. Checksum comparison between hash(data) and checksum files
3. Contiguous indices if they are present
"""
# Check checksum of files
checksum_filenames = sorted(glob.glob(basename + '*' + data_ext + '.checksum'), key=file_start)
# Make sure that we have a perfect match between data files and checksum
# files
checksum_filenames_noext = [os.path.splitext(f)[0] for f in checksum_filenames]
if data_filenames != checksum_filenames_noext:
raise IOError("number of data files and checksum files for `%s' " \
"does not match (%d != %d)" % (filename, len(data_filenames),
len(checksum_filenames_noext)))
# list of start/end indices, to check that they are contiguous
indices = []
for f_data, f_chck in zip(data_filenames, checksum_filenames):
expected_chksum = open(f_chck, 'rt').read().strip()
current_chksum = hashFileContents(f_data)
if expected_chksum != current_chksum:
raise IOError("data file `%s' has a checksum (%s) that differs " \
"from expected one (%s)" % (f_data, current_chksum,
expected_chksum))
r = index_re.match(f_data)
if r: indices.append((int(r.group(1)), int(r.group(2))))
indices = sorted(indices, key=lambda v: v[0])
ok_indices = True
if len(indices) > 0:
ok_indices = (indices[0][0] == 0)
if ok_indices and len(indices) > 1:
ok_indices = all([indices[i + 1][0] - indices[i][1] == 1
for i in range(len(indices) - 1)])
if not ok_indices:
raise IOError("data file `%s' have missing indices." % f_data)
self.prefix = prefix
basename, data_ext = os.path.splitext(filename)
data_filenames = sorted(glob.glob(basename + '*' + data_ext),
key=file_start)