diff --git a/beat/backend/python/data.py b/beat/backend/python/data.py index 129276e22ac6b8460e01e910a628b1eca3670235..356189419c60584ebad571ec26fefc2305227692 100755 --- a/beat/backend/python/data.py +++ b/beat/backend/python/data.py @@ -37,6 +37,7 @@ import time import tempfile import abc from functools import reduce +from collections import namedtuple import logging logger = logging.getLogger(__name__) @@ -50,170 +51,178 @@ from .algorithm import Algorithm #---------------------------------------------------------- -class DataSource(object): +def mixDataIndices(list_of_data_indices): + """Given a collection of lists of data indices (belonging to separate + but synchronized files/inputs), returns the most granular list of + indices that span all the data - """Interface of all the Data Sources + For example, the mix of - Data Sources are used to provides data to the inputs of an algorithm. - """ - __metaclass__ = abc.ABCMeta + [(0, 2), (3, 4)] - @abc.abstractmethod - def next(self, load=True): - """Retrieves the next block of data + and - Returns: + [(0, 4)] - A tuple (*data*, *start_index*, *end_index*) + is: - """ + [(0, 2), (3, 4)] - pass - @abc.abstractmethod - def hasMoreData(self): - """Indicates if there is more data to process on some of the inputs""" + The mix of - pass + [(0, 2), (3, 4)] + and -#---------------------------------------------------------- + [(0, 1), (2, 3), (4, 4)] + is: -class DataSink(object): + [(0, 1), (2, 2), (3, 3), (4, 4)] - """Interface of all the Data Sinks - - Data Sinks are used by the outputs of an algorithm to write/transmit data. """ - __metaclass__ = abc.ABCMeta - @abc.abstractmethod - def write(self, data, start_data_index, end_data_index): - """Writes a block of data + start = max([ x[0][0] for x in list_of_data_indices ]) + end = min([ x[-1][1] for x in list_of_data_indices ]) - Parameters: + result = [] + current_start = start - data (beat.core.baseformat.baseformat): The block of data to write + for index in range(start, end + 1): + done = False - start_data_index (int): Start index of the written data + for l in list_of_data_indices: + for indices in l: + if indices[1] == index: + result.append( (current_start, index) ) + current_start = index + 1 + done = True + break - end_data_index (int): End index of the written data + if done: + break - """ + return result - pass - @abc.abstractmethod - def isConnected(self): - pass +#---------------------------------------------------------- -#---------------------------------------------------------- +def getAllFilenames(filename, start_index=None, end_index=None): + """Returns the names of all the files related to the given data file, + taking the provided start and end indices into account. -class StdoutDataSink(DataSink): + Parameters: - """Data Sink that prints informations about the written data on stdout + filename (str): Name of the data file (path/to/cache/<hash>.data) - Note: The written data is lost! 
Use ii for debugging purposes - """ + start_index (int): The starting index (if not set or set to + ``None``, the default, equivalent to ``0``) - def __init__(self): - super(StdoutDataSink, self).__init__() - self.dataformat = None - self.prefix = '' - self.display_data = True + end_index (int): The end index (if not set or set to ``None``, the + default, equivalent to ``the last existing data``) - def setup(self, dataformat, prefix=None, display_data=True): - self.dataformat = dataformat - self.display_data = display_data - if prefix is not None: - self.prefix = prefix - if self.prefix != '': - self.prefix += ' ' + Returns: - def write(self, data, start_data_index, end_data_index): - """Write a block of data + (data_filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames) - Parameters: + """ - data (beat.core.baseformat.baseformat) The block of data to write + index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$') - start_data_index (int): Start index of the written data + def file_start(f): + """Returns the converted start indexes from a filename, otherwise 0""" + r = index_re.match(f) + if r: + return int(r.group(1)) + return 0 - end_data_index (int): End index of the written data - """ + # Retrieve all the related files + basename, ext = os.path.splitext(filename) + filenames = sorted(glob.glob(basename + '*'), key=file_start) - if self.display_data: - print '%s(%d -> %d): %s' % (self.prefix, start_data_index, end_data_index, str(data)) - else: - print '%s(%d -> %d): <data>' % (self.prefix, start_data_index, end_data_index) + # (If necessary) Only keep files containing the desired indices + if (start_index is not None) or (end_index is not None): + filtered_filenames = [] + for f in filenames: + match = index_re.match(f) + if match: + start = int(match.group(1)) + end = int(match.group(2)) + if ((start_index is not None) and (end < start_index)) or \ + ((end_index is not None) and (start > end_index)): + continue + filtered_filenames.append(f) + filenames = filtered_filenames + # Separate the filenames in different lists + data_filenames = [ x for x in filenames if re.match(r'^.*\.(\d+)\.(\d+)\.data$', x) ] + indices_filenames = [ x for x in filenames if re.match(r'^.*\.(\d+)\.(\d+)\.index$', x) ] + data_checksum_filenames = [ x for x in filenames if re.match(r'^.*\.(\d+)\.(\d+)\.data.checksum$', x) ] + indices_checksum_filenames = [ x for x in filenames if re.match(r'^.*\.(\d+)\.(\d+)\.index.checksum$', x) ] - def isConnected(self): - return True + return (data_filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames) #---------------------------------------------------------- -class CachedDataSource(DataSource): - """Data Source that load data from the Cache""" +class CachedFileLoader(object): + """Utility class to load data from a file in the cache""" def __init__(self): - self.filenames = None - self.cur_file = None - self.cur_file_index = None - self.encoding = None # must be 'binary' or 'json' - self.prefix = None # where to find dataformats - self.dataformat = None # the dataformat itself - self.preloaded = False - self.next_start_index = None - self.next_end_index = None - self.next_data_size = None - self.force_start_index = None - self.force_end_index = None - self._cache_size = 10 * 1024 * 1024 # 10 megabytes - self._cache = six.b('') - self._nb_bytes_read = 0 - self._read_duration = 0 - self._unpack = True - - def _readHeader(self): - """Read the header of the current file""" + self.filenames = None + 
self.encoding = None # Must be 'binary' or 'json' + self.prefix = None + self.dataformat = None + self.infos = [] + self.current_file = None + self.current_file_index = None + self.unpack = True + self.read_duration = 0 + self.nb_bytes_read = 0 + + + def _readHeader(self, file): + """Read the header of the provided file""" # Read file format - encoding = self.cur_file.readline()[:-1] - if not isinstance(encoding, str): encoding = encoding.decode('utf8') + self.encoding = file.readline()[:-1] + if not isinstance(self.encoding, str): + self.encoding = self.encoding.decode('utf8') - if encoding not in ('binary', 'json'): + if self.encoding not in ('binary', 'json'): raise RuntimeError("valid formats for data reading are 'binary' " - "or 'json': the format `%s' is invalid" % (encoding,)) - self.encoding = encoding + "or 'json': the format `%s' is invalid" % self.encoding) # Read data format - dataformat_name = self.cur_file.readline()[:-1] - if not isinstance(dataformat_name, str): - dataformat_name = dataformat_name.decode('utf8') - if dataformat_name.startswith('analysis:'): - algo_name = dataformat_name.split(':')[1] - algo = Algorithm(self.prefix, algo_name) - if not algo.valid: - raise RuntimeError("the dataformat `%s' is the result of an " \ - "algorithm which is not valid" % algo_name) - self.dataformat = algo.result_dataformat() - else: - self.dataformat = DataFormat(self.prefix, dataformat_name) - if not self.dataformat.valid: - raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name) + dataformat_name = file.readline()[:-1] + + if self.dataformat is None: + if not isinstance(dataformat_name, str): + dataformat_name = dataformat_name.decode('utf8') + + if dataformat_name.startswith('analysis:'): + algo_name = dataformat_name.split(':')[1] + algo = Algorithm(self.prefix, algo_name) + if not algo.valid: + raise RuntimeError("the dataformat `%s' is the result of an " \ + "algorithm which is not valid" % algo_name) + self.dataformat = algo.result_dataformat() + else: + self.dataformat = DataFormat(self.prefix, dataformat_name) + + if not self.dataformat.valid: + raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name) return True - def setup(self, filename, prefix, force_start_index=None, - force_end_index=None, unpack=True): + + def setup(self, filename, prefix, start_index=None, end_index=None, unpack=True): """Configures the data source @@ -239,30 +248,7 @@ class CachedDataSource(DataSource): """ index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$') - def file_start(f): - """Returns the converted start indexes from a filename, otherwise 0""" - - r = index_re.match(f) - if r: return int(r.group(1)) - return 0 - - def trim_filename(l, start_index, end_index): - """Function to trim out the useless file given a range of indices - """ - - res = [] - for f in l: - r = index_re.match(f) - if r: - s = int(r.group(1)) - e = int(r.group(2)) - if (start_index is not None and e < start_index) or \ - (end_index is not None and s > end_index): - continue - res.append(f) - return res - - def check_consistency(data_filenames, basename, data_ext): + def check_consistency(data_filenames, checksum_filenames): """Perform some sanity check on the data/checksum files on disk: 1. One-to-one mapping between data and checksum files @@ -270,236 +256,370 @@ class CachedDataSource(DataSource): 3. 
Contiguous indices if they are present """ - # Check checksum of files - checksum_filenames = sorted(glob.glob(basename + '*' + data_ext + '.checksum'), key=file_start) - # Make sure that we have a perfect match between data files and checksum # files checksum_filenames_noext = [os.path.splitext(f)[0] for f in checksum_filenames] if data_filenames != checksum_filenames_noext: raise IOError("number of data files and checksum files for `%s' " \ - "does not match (%d != %d)" % (filename, len(data_filenames), - len(checksum_filenames_noext))) + "does not match (%d != %d)" % (filename, len(data_filenames), + len(checksum_filenames_noext))) # list of start/end indices to check that there are contiguous indices = [] for f_data, f_chck in zip(data_filenames, checksum_filenames): - expected_chksum = open(f_chck, 'rt').read().strip() current_chksum = hashFileContents(f_data) if expected_chksum != current_chksum: raise IOError("data file `%s' has a checksum (%s) that differs " \ - "from expected one (%s)" % (f_data, current_chksum, - expected_chksum)) + "from expected one (%s)" % (f_data, current_chksum, + expected_chksum)) r = index_re.match(f_data) - if r: indices.append((int(r.group(1)), int(r.group(2)))) + if r: + indices.append((int(r.group(1)), int(r.group(2)))) indices = sorted(indices, key=lambda v: v[0]) ok_indices = True - if len(indices) > 0: - ok_indices = (indices[0][0] == 0) - - if ok_indices and len(indices) > 1: - ok_indices = sum([indices[i + 1][0] - indices[i][1] == 1 - for i in range(len(indices) - 1)]) + if len(indices) > 1: + ok_indices = sum([ (indices[i + 1][0] - indices[i][1] == 1) + for i in range(len(indices) - 1) ]) if not ok_indices: raise IOError("data file `%s' have missing indices." % f_data) + self.prefix = prefix - basename, data_ext = os.path.splitext(filename) - data_filenames = sorted(glob.glob(basename + '*' + data_ext), - key=file_start) + self.unpack = unpack - # Check consistency of the data/checksum files - check_consistency(data_filenames, basename, data_ext) - # List files to process - self.force_start_index = force_start_index - self.force_end_index = force_end_index - self.filenames = trim_filename(data_filenames, force_start_index, - force_end_index) + # Retrieve the list of all needed files + (self.filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames) = \ + getAllFilenames(filename, start_index, end_index) - self._unpack = unpack + check_consistency(self.filenames, data_checksum_filenames) - # Read the first file to process - self.cur_file_index = 0 - try: - self.cur_file = open(self.filenames[self.cur_file_index], 'rb') - except Exception as e: - logger.warn("Could not setup `%s': %s" % (filename, e)) - return False - # Reads the header of the current file - self._readHeader() + # Load all the needed infos from all the files + FileInfos = namedtuple('FileInfos', ['file_index', 'start_index', 'end_index', 'offset', 'size']) - if force_start_index is not None: + for file_index, current_filename in enumerate(self.filenames): + try: + f = open(current_filename, 'rb') + except Exception as e: + logger.warn("Could not setup `%s': %s" % (filename, e)) + return False - # Read chunck until force_start_index is reached - while True: + # Reads the header of the current file + self._readHeader(f) + offset = f.tell() - # Read indexes - t1 = time.time() - line = self.cur_file.readline() - self._nb_bytes_read += len(line) - t2 = time.time() - self._read_duration += t2 - t1 + # Process each data unit from the file + while True: + line = 
f.readline() + if line == '': + break - (self.next_start_index, self.next_end_index, self.next_data_size) = \ - [int(x) for x in line.split()] + offset += len(line) - # Seek to the next chunck of data if start index is still too small - if self.next_start_index < force_start_index: - self.cur_file.seek(self.next_data_size, 1) + (start, end, data_size) = [ int(x) for x in line.split() ] - # Otherwise, read the next 'chunck' of data (binary or json) - else: - t1 = time.time() - data = self.cur_file.read(self._cache_size - len(self._cache)) - t2 = time.time() + if ((start_index is None) or (start >= start_index)) and \ + ((end_index is None) or (end <= end_index)): + self.infos.append(FileInfos(file_index=file_index, start_index=start, + end_index=end, offset=offset, size=data_size)) - self._nb_bytes_read += len(data) - self._read_duration += t2 - t1 - self._cache += data + f.seek(data_size, 1) + offset += data_size - self.preloaded = True - break - - else: - # Preload the data - self._preload() + f.close() return True + def close(self): - """Closes the data source""" + if self.current_file is not None: + self.current_file.close() - if self.cur_file is not None: - self.cur_file.close() def __del__(self): """Makes sure the files are close when the object is deleted""" - self.close() - def next(self): - """Retrieve the next block of data + + def __len__(self): + return len(self.infos) + + + def __iter__(self): + for i in range(0, len(self.infos)): + yield self[i] + + + def __getitem__(self, index): + """Retrieve a block of data Returns: A tuple (data, start_index, end_index) """ - if self.next_start_index is None: + + if (index < 0) or (index >= len(self.infos)): return (None, None, None) - # Determine if the cache already contains all the data we need - if len(self._cache) >= self.next_data_size: - encoded_data = self._cache[:self.next_data_size] - self._cache = self._cache[self.next_data_size:] - else: - t1 = time.time() - data = self.cur_file.read(self.next_data_size - len(self._cache)) - t2 = time.time() + infos = self.infos[index] + + if self.current_file_index != infos.file_index: + if self.current_file is not None: + self.current_file.close() + self.current_file = None + + try: + self.current_file = open(self.filenames[infos.file_index], 'rb') + self.current_file_index = infos.file_index + except Exception as e: + raise IOError("Could not read `%s': %s" % (self.filenames[infos.file_index], e)) + + self.current_file.seek(infos.offset, 0) - self._nb_bytes_read += len(data) - self._read_duration += t2 - t1 + t1 = time.time() + encoded_data = self.current_file.read(infos.size) + t2 = time.time() - encoded_data = self._cache + data - self._cache = six.b('') + self.read_duration += t2 - t1 + self.nb_bytes_read += infos.size - if self._unpack: + if self.unpack: data = self.dataformat.type() - data.unpack(encoded_data) #checks validity + data.unpack(encoded_data) else: data = encoded_data - result = (data, self.next_start_index, self.next_end_index) + return (data, infos.start_index, infos.end_index) - self._preload() - return result + def first_data_index(self): + return self.infos[0].start_index + + + def last_data_index(self): + return self.infos[-1].end_index + + + def data_indices(self): + return [ (x.start_index, x.end_index) for x in self.infos ] + + + def getAtDataIndex(self, data_index): + for index, infos in enumerate(self.infos): + if (infos.start_index <= data_index) and (data_index <= infos.end_index): + return self[index] + return (None, None, None) + + + def statistics(self): + 
"""Return the statistics about the number of bytes read from the files""" + return (self.nb_bytes_read, self.read_duration) + + +#---------------------------------------------------------- + + +class DataSource(object): + + """Interface of all the Data Sources + + Data Sources are used to provides data to the inputs of an algorithm. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def next(self, load=True): + """Retrieves the next block of data + + Returns: + + A tuple (*data*, *start_index*, *end_index*) + + """ + + pass + + @abc.abstractmethod def hasMoreData(self): """Indicates if there is more data to process on some of the inputs""" - if not(self.preloaded): - self._preload(blocking=True) + pass - if self.force_end_index is not None and \ - self.next_start_index is not None and \ - self.next_start_index > self.force_end_index: - return False - return (self.next_start_index is not None) +#---------------------------------------------------------- - def statistics(self): - """Return the statistics about the number of bytes read from the cache""" - return (self._nb_bytes_read, self._read_duration) - def _preload(self, blocking=False): - # Determine if the cache already contains all the data we need - offset = self._cache.find(six.b('\n')) - if offset == -1: +class DataSink(object): - # Try to read the next chunck of data - while True: + """Interface of all the Data Sinks + + Data Sinks are used by the outputs of an algorithm to write/transmit data. + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def write(self, data, start_data_index, end_data_index): + """Writes a block of data + + Parameters: + + data (beat.core.baseformat.baseformat): The block of data to write + + start_data_index (int): Start index of the written data + + end_data_index (int): End index of the written data - # Read in the current file - t1 = time.time() + """ - if blocking: - (readable, writable, errors) = select.select([self.cur_file], [], []) + pass - data = self.cur_file.read(self._cache_size - len(self._cache)) + @abc.abstractmethod + def isConnected(self): + pass - t2 = time.time() - self._nb_bytes_read += len(data) - self._read_duration += t2 - t1 - self._cache += data +#---------------------------------------------------------- - # If not read from the current file - if (len(data) == 0) or (self._cache.find(six.b('\n')) == -1): - # Read the next one if possible - if self.cur_file_index < len(self.filenames) - 1: - if self.cur_file is not None: - self.cur_file.close() +class StdoutDataSink(DataSink): - self.cur_file_index += 1 + """Data Sink that prints informations about the written data on stdout - try: - self.cur_file = open(self.filenames[self.cur_file_index], 'rb') - except: - return + Note: The written data is lost! 
Use ii for debugging purposes + """ - self._readHeader() + def __init__(self): + super(StdoutDataSink, self).__init__() + self.dataformat = None + self.prefix = '' + self.display_data = True - # Otherwise, stop the parsing - else: - self.next_start_index = None - self.next_end_index = None - self.next_data_size = None - self.preloaded = blocking - return + def setup(self, dataformat, prefix=None, display_data=True): + self.dataformat = dataformat + self.display_data = display_data - else: - break + if prefix is not None: + self.prefix = prefix + if self.prefix != '': + self.prefix += ' ' - offset = self._cache.find(six.b('\n')) + def write(self, data, start_data_index, end_data_index): + """Write a block of data - # Extract the informations about the next block of data - line = self._cache[:offset] - self._cache = self._cache[offset + 1:] + Parameters: - (self.next_start_index, self.next_end_index, self.next_data_size) = \ - [int(x) for x in line.split()] + data (beat.core.baseformat.baseformat) The block of data to write - self.preloaded = True + start_data_index (int): Start index of the written data + + end_data_index (int): End index of the written data + + """ + + if self.display_data: + print '%s(%d -> %d): %s' % (self.prefix, start_data_index, end_data_index, str(data)) + else: + print '%s(%d -> %d): <data>' % (self.prefix, start_data_index, end_data_index) + + + def isConnected(self): + return True + + +#---------------------------------------------------------- + + +class CachedDataSource(DataSource): + """Data Source that load data from the Cache""" + + def __init__(self): + self.cached_file = None + self.dataformat = None + self.next_data_index = 0 + + + def setup(self, filename, prefix, force_start_index=None, force_end_index=None, + unpack=True): + """Configures the data source + + + Parameters: + + filename (str): Name of the file to read the data from + + prefix (str, path): Path to the prefix where the dataformats are stored. + + force_start_index (int): The starting index (if not set or set to + ``None``, the default, read data from the begin of file) + + force_end_index (int): The end index (if not set or set to ``None``, the + default, reads the data until the end) + + unpack (bool): Indicates if the data must be unpacked or not + + + Returns: + + ``True``, if successful, or ``False`` otherwise. + + """ + + self.cached_file = CachedFileLoader() + if self.cached_file.setup(filename, prefix, start_index=force_start_index, + end_index=force_end_index, unpack=unpack): + self.dataformat = self.cached_file.dataformat + return True + + return False + + + def close(self): + """Closes the data source""" + if self.cached_file is not None: + self.cached_file.close() + self.cached_file = None + + + def __del__(self): + """Makes sure the files are close when the object is deleted""" + self.close() + + + def next(self): + """Retrieve the next block of data + + Returns: + + A tuple (data, start_index, end_index) + + """ + if self.next_data_index >= len(self.cached_file): + return (None, None, None) + + result = self.cached_file[self.next_data_index] + + self.next_data_index += 1 + + return result + + + def hasMoreData(self): + """Indicates if there is more data to process on some of the inputs""" + return (self.next_data_index < len(self.cached_file)) + + + def statistics(self): + """Return the statistics about the number of bytes read from the cache""" + return self.cached_file.statistics() #----------------------------------------------------------
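For reference, a minimal sketch of how the new mixDataIndices() helper behaves on the two examples from its docstring. It assumes the module is importable as beat.backend.python.data (matching the file path of this patch); the snippet is an illustration only, not part of the patch.

    from beat.backend.python.data import mixDataIndices

    # Two synchronized inputs with different block granularities: the result is
    # the finest partition whose block ends coincide with a block end in at
    # least one of the inputs.
    print(mixDataIndices([[(0, 2), (3, 4)], [(0, 4)]]))
    # [(0, 2), (3, 4)]

    print(mixDataIndices([[(0, 2), (3, 4)], [(0, 1), (2, 3), (4, 4)]]))
    # [(0, 1), (2, 2), (3, 3), (4, 4)]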
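Along the same lines, a hedged sketch of getAllFilenames(): given the canonical name of a cache entry, it returns the data, index and checksum files whose index range overlaps the requested interval. The cache path used below is hypothetical.

    from beat.backend.python.data import getAllFilenames

    # Hypothetical cache entry split into chunks such as
    #   path/to/cache/0123abcd.0.99.data, path/to/cache/0123abcd.100.199.data, ...
    data_files, index_files, data_cksums, index_cksums = \
        getAllFilenames('path/to/cache/0123abcd.data', start_index=50, end_index=150)

    # Only the chunks whose [start, end] range overlaps [50, 150] are kept,
    # already separated by kind (data / index / checksum).
    for f in data_files:
        print(f)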
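Finally, a usage sketch of the CachedFileLoader / CachedDataSource pair introduced by this patch: the loader provides random access to the blocks of a cache file, while the data source wraps it to preserve the sequential next()/hasMoreData() interface used by algorithm inputs. File paths and the dataformat prefix are hypothetical.

    from beat.backend.python.data import CachedFileLoader, CachedDataSource

    # Random access: each item is a (data, start_index, end_index) tuple
    loader = CachedFileLoader()
    if loader.setup('path/to/cache/0123abcd.data', 'path/to/prefix'):
        print(loader.data_indices())                  # e.g. [(0, 99), (100, 199), ...]
        data, start, end = loader[0]                  # first block
        data, start, end = loader.getAtDataIndex(42)  # block containing data index 42
        loader.close()

    # Sequential access, as the algorithm inputs consume it
    source = CachedDataSource()
    if source.setup('path/to/cache/0123abcd.data', 'path/to/prefix'):
        while source.hasMoreData():
            data, start, end = source.next()
        nb_bytes, seconds = source.statistics()
        source.close()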