#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


"""Data I/O classes and functions"""

import os
import re
import glob
import simplejson as json
import select
import time
import tempfile
import abc
from functools import reduce

import logging
logger = logging.getLogger(__name__)

import six
from .hash import hashFileContents
from .dataformat import DataFormat
from .algorithm import Algorithm


class DataSource(object):

  """Interface of all the Data Sources

  Data Sources are used to provide data to the inputs of an algorithm.
  """
  __metaclass__ = abc.ABCMeta

  @abc.abstractmethod
  def next(self, load=True):
    """Retrieves the next block of data

    Returns:

      A tuple (*data*, *start_index*, *end_index*)

    """

    pass

  @abc.abstractmethod
  def hasMoreData(self):
    """Indicates if there is more data to process on some of the inputs"""

    pass


class DataSink(object):

  """Interface of all the Data Sinks

  Data Sinks are used by the outputs of an algorithm to write/transmit data.
  """
  __metaclass__ = abc.ABCMeta

  @abc.abstractmethod
  def write(self, data, start_data_index, end_data_index):
    """Writes a block of data

    Parameters:

      data (beat.core.baseformat.baseformat): The block of data to write

      start_data_index (int): Start index of the written data

      end_data_index (int): End index of the written data

    """

    pass

  @abc.abstractmethod
  def isConnected(self):
    pass


class CachedDataSource(DataSource):
  """Data Source that load data from the Cache"""

  def __init__(self):
    self.filenames = None
    self.cur_file = None
    self.cur_file_index = None
    self.encoding = None  # must be 'binary' or 'json'
    self.prefix = None  # where to find dataformats
    self.dataformat = None  # the dataformat itself
    self.preloaded = False
    self.next_start_index = None
    self.next_end_index = None
    self.next_data_size = None
    self.force_start_index = None
    self.force_end_index = None
    self._cache_size = 10 * 1024 * 1024  # 10 megabytes
    self._cache = six.b('')
    self._nb_bytes_read = 0
    self._read_duration = 0
    self._unpack = True

  def _readHeader(self):
    """Read the header of the current file"""

    # Read file format
    encoding = self.cur_file.readline()[:-1]
    if not isinstance(encoding, str): encoding = encoding.decode('utf8')

    if encoding not in ('binary', 'json'):
      raise RuntimeError("valid formats for data reading are 'binary' "
                         "or 'json': the format `%s' is invalid" % (encoding,))
    self.encoding = encoding

    # Read data format
    dataformat_name = self.cur_file.readline()[:-1]
    if not isinstance(dataformat_name, str):
      dataformat_name = dataformat_name.decode('utf8')
    if dataformat_name.startswith('analysis:'):
      algo_name = dataformat_name.split(':')[1]
      algo = Algorithm(self.prefix, algo_name)
      if not algo.valid:
        raise RuntimeError("the dataformat `%s' is the result of an " \
                "algorithm which is not valid" % algo_name)
      self.dataformat = algo.result_dataformat()
    else:
      self.dataformat = DataFormat(self.prefix, dataformat_name)
    if not self.dataformat.valid:
      raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

    return True

  def setup(self, filename, prefix, force_start_index=None,
            force_end_index=None, unpack=True):
    """Configures the data source


    Parameters:

      filename (str): Name of the file to read the data from

      prefix (str, path): Path to the prefix where the dataformats are stored.

      force_start_index (int): The starting index (if not set or set to
        ``None``, the default, read data from the begin of file)

      force_end_index (int): The end index (if not set or set to ``None``, the
        default, reads the data until the end)

      unpack (bool): Indicates if the data must be unpacked or not


    Returns:

      ``True``, if successful, or ``False`` otherwise.

    """
    index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(\.checksum)?$')
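    # e.g. matches hypothetical cache file names such as `foo.0.99.data',
    # `foo.100.199.data.checksum' or `foo.0.99.index'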

    def file_start(f):
      """Returns the converted start indexes from a filename, otherwise 0"""

      r = index_re.match(f)
      if r: return int(r.group(1))
      return 0

    def trim_filename(l, start_index, end_index):
      """Function to trim out the useless file given a range of indices
      """

      res = []
      for f in l:
        r = index_re.match(f)
        if r:
          s = int(r.group(1))
          e = int(r.group(2))
          if (start_index is not None and e < start_index) or \
                  (end_index is not None and s > end_index):
            continue
        res.append(f)
      return res

    def check_consistency(data_filenames, basename, data_ext):
      """Perform some sanity check on the data/checksum files on disk:

        1. One-to-one mapping between data and checksum files
        2. Checksum comparison between hash(data) and checksum files
        3. Contiguous indices if they are present
      """

      # Check checksum of files
      checksum_filenames = sorted(glob.glob(basename + '*' + data_ext + '.checksum'), key=file_start)

      # Make sure that we have a perfect match between data files and checksum
      # files
      checksum_filenames_noext = [os.path.splitext(f)[0] for f in checksum_filenames]

      if data_filenames != checksum_filenames_noext:
        raise IOError("number of data files and checksum files for `%s' " \
            "does not match (%d != %d)" % (filename, len(data_filenames),
              len(checksum_filenames_noext)))

      # list of start/end indices to check that there are contiguous
      indices = []
      for f_data, f_chck in zip(data_filenames, checksum_filenames):

        with open(f_chck, 'rt') as checksum_file:
          expected_chksum = checksum_file.read().strip()
        current_chksum = hashFileContents(f_data)
        if expected_chksum != current_chksum:
          raise IOError("data file `%s' has a checksum (%s) that differs " \
              "from expected one (%s)" % (f_data, current_chksum,
                expected_chksum))

        r = index_re.match(f_data)
        if r: indices.append((int(r.group(1)), int(r.group(2))))

      indices = sorted(indices, key=lambda v: v[0])
      ok_indices = True

      if len(indices) > 0:
        ok_indices = (indices[0][0] == 0)

      if ok_indices and len(indices) > 1:
        ok_indices = all([indices[i + 1][0] - indices[i][1] == 1
                          for i in range(len(indices) - 1)])

      if not ok_indices:
        raise IOError("data file `%s' have missing indices." % f_data)

    self.prefix = prefix
    basename, data_ext = os.path.splitext(filename)
    data_filenames = sorted(glob.glob(basename + '*' + data_ext),
            key=file_start)

    # Check consistency of the data/checksum files
    check_consistency(data_filenames, basename, data_ext)

    # List files to process
    self.force_start_index = force_start_index
    self.force_end_index = force_end_index
    self.filenames = trim_filename(data_filenames, force_start_index,
                                   force_end_index)

    self._unpack = unpack

    # Read the first file to process
    self.cur_file_index = 0
    try:
      self.cur_file = open(self.filenames[self.cur_file_index], 'rb')
    except Exception as e:
      logger.warning("Could not setup `%s': %s" % (filename, e))
      return False

    # Reads the header of the current file
    self._readHeader()

    if force_start_index is not None:

      # Read chunks until force_start_index is reached
      while True:

        # Read indexes
        t1 = time.time()
        line = self.cur_file.readline()
        self._nb_bytes_read += len(line)
        t2 = time.time()
        self._read_duration += t2 - t1

        (self.next_start_index, self.next_end_index, self.next_data_size) = \
            [int(x) for x in line.split()]

        # Seek to the next chunk of data if start index is still too small
        if self.next_start_index < force_start_index:
          self.cur_file.seek(self.next_data_size, 1)

        # Otherwise, read the next chunk of data (binary or json)
        else:
          t1 = time.time()
          data = self.cur_file.read(self._cache_size - len(self._cache))
          t2 = time.time()

          self._nb_bytes_read += len(data)
          self._read_duration += t2 - t1
          self._cache += data

          self.preloaded = True
          break

    else:
      # Preload the data
      self._preload()

    return True

  def close(self):
    """Closes the data source"""

    if self.cur_file is not None:
      self.cur_file.close()

  def __del__(self):
    """Makes sure the files are close when the object is deleted"""

    self.close()

  def next(self):
    """Retrieve the next block of data

    Returns:

      A tuple (data, start_index, end_index)

    """
    if self.next_start_index is None:
      return (None, None, None)

    # Determine if the cache already contains all the data we need
    if len(self._cache) >= self.next_data_size:
      encoded_data = self._cache[:self.next_data_size]
      self._cache = self._cache[self.next_data_size:]
    else:
      t1 = time.time()
      data = self.cur_file.read(self.next_data_size - len(self._cache))
      t2 = time.time()

      self._nb_bytes_read += len(data)
      self._read_duration += t2 - t1

      encoded_data = self._cache + data
      self._cache = six.b('')

    if self._unpack:
      data = self.dataformat.type()
      data.unpack(encoded_data) #checks validity
    else:
      data = encoded_data

    result = (data, self.next_start_index, self.next_end_index)

    self._preload()

    return result

  def hasMoreData(self):
    """Indicates if there is more data to process on some of the inputs"""

    if not(self.preloaded):
      self._preload(blocking=True)

    if self.force_end_index is not None and \
            self.next_start_index is not None and \
            self.next_start_index > self.force_end_index:
      return False

    return (self.next_start_index is not None)

  def statistics(self):
    """Return the statistics about the number of bytes read from the cache"""
    return (self._nb_bytes_read, self._read_duration)

  def _preload(self, blocking=False):
    # Determine if the cache already contains all the data we need
    offset = self._cache.find(six.b('\n'))
    if offset == -1:

      # Try to read the next chunk of data
      while True:

        # Read in the current file
        t1 = time.time()

        if blocking:
          (readable, writable, errors) = select.select([self.cur_file], [], [])

        data = self.cur_file.read(self._cache_size - len(self._cache))

        t2 = time.time()

        self._nb_bytes_read += len(data)
        self._read_duration += t2 - t1
        self._cache += data

        # If not read from the current file
        if (len(data) == 0) or (self._cache.find(six.b('\n')) == -1):
          # Read the next one if possible
          if self.cur_file_index < len(self.filenames) - 1:

            if self.cur_file is not None:
              self.cur_file.close()

            self.cur_file_index += 1

            try:
              self.cur_file = open(self.filenames[self.cur_file_index], 'rb')
            except:
              return

            self._readHeader()

          # Otherwise, stop the parsing
          else:
            self.next_start_index = None
            self.next_end_index = None
            self.next_data_size = None
            self.preloaded = blocking
            return

        else:
          break

      offset = self._cache.find(six.b('\n'))

    # Extract the information about the next block of data
    line = self._cache[:offset]
    self._cache = self._cache[offset + 1:]

    (self.next_start_index, self.next_end_index, self.next_data_size) = \
            [int(x) for x in line.split()]

    self.preloaded = True
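
# A minimal usage sketch for CachedDataSource (not executed; the cache file
# path and the dataformat prefix below are hypothetical):
#
#   source = CachedDataSource()
#   if source.setup('/path/to/cache/file.data', '/path/to/prefix'):
#     while source.hasMoreData():
#       data, start, end = source.next()
#       ...  # `data' is an unpacked dataformat object covering [start, end]
#     source.close()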


class CachedDataSink(DataSink):

  """Data Sink that save data in the Cache

  The default behavior is to save the data in a binary format.
  """

  def __init__(self):

    self.filename = None
    self.process_id = None
    self.split_id = None
    self.max_size = None

    self._nb_bytes_written = 0
    self._write_duration = 0
    self._nb_bytes_written_split = 0

    self._new_file = False

    self._cur_filename = None
    self._cur_file = None
    self._cur_indexname = None
    self._cur_index = None

    self._cur_start_index = None
    self._cur_end_index = None
    self._filenames = []
    self._filenames_tmp = []

    self._tmp_ext = '.tmp'

    self.encoding = None
    self.dataformat = None

  def _curTmpFilenameWithSplit(self):

    filename, data_ext = os.path.splitext(self.filename)
    dirname = os.path.dirname(filename)
    basename = os.path.basename(filename)
    fd, tmp_file = tempfile.mkstemp(
        dir=dirname,
        prefix=basename+'.' + str(self.process_id)+'.'+ str(self.split_id)+'_',
        suffix=data_ext + self._tmp_ext,
        )
    os.close(fd)  # Preserve only the name
    os.unlink(tmp_file)
    return tmp_file

  def _curFilenameWithIndices(self):

    basename = os.path.basename(self.filename)
    basename, data_ext = os.path.splitext(basename)
    dirname = os.path.dirname(self.filename)
    return os.path.join(dirname, basename + '.' + str(self._cur_start_index) + '.' + str(self._cur_end_index) + data_ext)

  def _tmpIndexFilenameFromTmpFilename(self, tmp_filename):

    return os.path.splitext(os.path.splitext(tmp_filename)[0])[0] + '.index' + self._tmp_ext

  def _indexFilenameFromFilename(self, filename):
    return os.path.splitext(filename)[0] + '.index'

  def _openAndWriteHeader(self):
    """Write the header of the current file"""

    # Close current file if open
    self._close_current()

    # Open new file in writing mode
    self._cur_filename = self._curTmpFilenameWithSplit()
    self._cur_indexname = \
        self._tmpIndexFilenameFromTmpFilename(self._cur_filename)
    self._filenames_tmp.append(self._cur_filename)
    try:
      self._cur_file = open(self._cur_filename, 'wb')
      self._cur_index = open(self._cur_indexname, 'wt')
    except:
      return

    # Write dataformat
    self._cur_file.write(six.b('%s\n%s\n' % \
            (self.encoding, self.dataformat.name)))
    self._cur_file.flush()

    # Reset few flags
    self._cur_start_index = None
    self._cur_end_index = None
    self._new_file = False
    self._nb_bytes_written_split = 0
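
  # On-disk layout produced by _openAndWriteHeader() and write():
  #
  #   <encoding>\n                 -- 'binary' or 'json'
  #   <dataformat name>\n
  #   <start> <end> <size>\n       -- data indices and byte size of the block
  #   <size bytes of encoded data, including a trailing newline>
  #   ...                          -- the last two lines repeat for each block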

  def setup(self, filename, dataformat, encoding='binary', process_id=0,
            max_size=0):
    """Configures the data sink

    Parameters:

      filename (str): Name of the file to generate

      dataformat (beat.core.dataformat.DataFormat): The dataformat to be used
        inside this file. All objects stored inside this file will respect that
        format.

      encoding (str): String defining the encoding to be used for encoding the
        data. Only a few options are supported: ``binary`` (the default) or
        ``json`` (debugging purposes).

      process_id (int): Identifier of the process writing the data (only used
        to build unique temporary file names)

      max_size (int): Maximum number of bytes to write to a single data file;
        when exceeded, a new split file is started (``0`` means no limit)

    """

    if encoding not in ('binary', 'json'):
      raise RuntimeError("valid formats for data writing are 'binary' "
                         "or 'json': the format `%s' is invalid" % encoding)

    if dataformat.name == '__unnamed_dataformat__':
      raise RuntimeError("cannot record data using an unnamed data format")

    self.filename = filename
    self.process_id = process_id
    self.split_id = 0
    self.max_size = max_size

    self._nb_bytes_written = 0
    self._write_duration = 0
    self._new_file = True

    self._cur_filename = None
    self._cur_file = None
    self._cur_indexname = None
    self._cur_index = None
    self._cur_start_index = None
    self._cur_end_index = None

    self._filenames = []
    self._filenames_tmp = []

    self.dataformat = dataformat
    self.encoding = encoding

    return True

  def _close_current(self):
    """Closes the data sink
    """

    if self._cur_file is not None:
      self._cur_file.close()
      self._cur_index.close()

      # If file is empty, remove it
      if self._cur_start_index is None or self._cur_end_index is None:
        try:
          os.remove(self._cur_filename)
          os.remove(self._cur_indexname)
        except:
          return False
        self._filenames_tmp.pop()

      # Otherwise, append final filename to list
      else:
        self._filenames.append(self._curFilenameWithIndices())

      self._cur_filename = None
      self._cur_file = None
      self._cur_indexname = None
      self._cur_index = None

  def close(self):
    """Move the files to final location
    """

    self._close_current()
    assert len(self._filenames_tmp) == len(self._filenames)

    for i in range(len(self._filenames_tmp)):

      os.rename(self._filenames_tmp[i], self._filenames[i])
      tmp_indexname = \
          self._tmpIndexFilenameFromTmpFilename(self._filenames_tmp[i])
      final_indexname = self._indexFilenameFromFilename(self._filenames[i])
      os.rename(tmp_indexname, final_indexname)

      # creates the checksums for all data and indexes
      chksum_data = hashFileContents(self._filenames[i])
      with open(self._filenames[i] + '.checksum', 'wt') as f:
        f.write(chksum_data)
      chksum_index = hashFileContents(final_indexname)
      with open(final_indexname + '.checksum', 'wt') as f:
        f.write(chksum_index)

    self._cur_filename = None
    self._cur_file = None
    self._cur_indexname = None
    self._cur_index = None

    self._cur_start_index = None
    self._cur_end_index = None
    self._filenames = []
    self._filenames_tmp = []

  def reset(self):
    """Move the files to final location
    """

    self._close_current()
    assert len(self._filenames_tmp) == len(self._filenames)
    for i in range(len(self._filenames_tmp)):
      try:
        os.remove(self._filenames_tmp[i])
        tmp_indexname = \
            self._tmpIndexFilenameFromTmpFilename(self._filenames_tmp[i])
        os.remove(tmp_indexname)
      except:
        return False

    self._cur_filename = None
    self._cur_file = None
    self._cur_indexname = None
    self._cur_index = None

    self._cur_start_index = None
    self._cur_end_index = None
    self._filenames = []
    self._filenames_tmp = []

  def __del__(self):
    """Make sure the files are close and renamed when the object is deleted
    """

    self.close()

  def write(self, data, start_data_index, end_data_index):
    """Writes a block of data to the filesystem

    Parameters:

      data (beat.core.baseformat.baseformat): The block of data to write

      start_data_index (int): Start index of the written data

      end_data_index (int): End index of the written data

    """

    if isinstance(data, dict):
      # the user passed a dictionary - must convert
      data = self.dataformat.type(**data)
    else:
      # Checks that the input data conforms to the expected format
      if data.__class__._name != self.dataformat.name:
        raise TypeError("input data uses format `%s' while this sink "
                "expects `%s'" % (data.__class__._name, self.dataformat))

    # If the flag new_file is set, open new file and write header
    if self._new_file:
      self._openAndWriteHeader()

    if self._cur_file is None:
      raise RuntimeError("no destination file")

    # encoding happens here
    if self.encoding == 'binary':
      encoded_data = data.pack()
    else:
      from .utils import NumpyJSONEncoder
      encoded_data = json.dumps(data.as_dict(), indent=4, cls=NumpyJSONEncoder)

    # adds a newline at the end of the encoded data, for clarity
    encoded_data += six.b('\n')

    informations = six.b('%d %d %d\n' % (start_data_index,
        end_data_index, len(encoded_data)))

    t1 = time.time()

    self._cur_file.write(informations + encoded_data)
    self._cur_file.flush()

    indexes = '%d %d\n' % (start_data_index, end_data_index)
    self._cur_index.write(indexes)
    self._cur_index.flush()

    t2 = time.time()

    self._nb_bytes_written       += \
        len(informations) + len(encoded_data) + len(indexes)
    self._nb_bytes_written_split += \
        len(informations) + len(encoded_data) + len(indexes)
    self._write_duration += t2 - t1

    # Update start and end indices
    if self._cur_start_index is None:
      self._cur_start_index = start_data_index
    self._cur_end_index = end_data_index

    # If file size exceeds max, sets the flag to create a new file
    if self.max_size != 0 and self._nb_bytes_written_split >= self.max_size:
      self._new_file = True
      self.split_id += 1

  def statistics(self):
    """Return the statistics about the number of bytes written to the cache"""

    return (self._nb_bytes_written, self._write_duration)

  def isConnected(self):
    return (self.filename is not None)
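
# A minimal usage sketch for CachedDataSink (not executed; the file name, the
# `dataformat' object and its `value' field below are hypothetical):
#
#   sink = CachedDataSink()
#   if sink.setup('/path/to/cache/file.data', dataformat, encoding='binary'):
#     # dictionaries are converted with dataformat.type(**data)
#     sink.write({'value': 42}, start_data_index=0, end_data_index=0)
#     sink.close()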


class MemoryDataSource(DataSource):

  """Interface of all the Data Sources

  Data Sources are used to provides data to the inputs of an algorithm.
  """

  def __init__(self, done_callback, next_callback=None, index=None):

    self.data = []
    self._done_callback = done_callback
    self._next_callback = next_callback

  def add(self, data, start_data_index, end_data_index):
    self.data.append((data, start_data_index, end_data_index))

  def next(self):
    """Retrieves the next block of data

    :return: A tuple (*data*, *start_index*, *end_index*)
    """

    if (len(self.data) == 0) and (self._next_callback is not None):
      if not(self._done_callback()):
        self._next_callback()

    if len(self.data) == 0:
      return (None, None, None)

    return self.data.pop(0)

  def hasMoreData(self):

    if len(self.data) != 0:
      return True
    return not self._done_callback()

  def statistics(self):
    """Return the statistics about the number of bytes read from the cache"""

    return (0, 0)


class MemoryDataSink(DataSink):

  """Data Sink that directly transmit data to associated MemoryDataSource
  objects.
  """

  def __init__(self):
    self.data_sources = None

  def setup(self, data_sources):
    """Configure the data sink

    :param list data_sources: The MemoryDataSource objects to use
    """
    self.data_sources = data_sources

  def write(self, data, start_data_index, end_data_index):
    """Write a block of data

    Parameters:

      data (beat.core.baseformat.baseformat): The block of data to write

      start_data_index (int): Start index of the written data

      end_data_index (int): End index of the written data

    """
    for data_source in self.data_sources:
      data_source.add(data, start_data_index, end_data_index)

  def isConnected(self):
    return len(self.data_sources) > 0
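
# A minimal wiring sketch (not executed): a MemoryDataSink feeding a
# MemoryDataSource. `block_done' is a hypothetical callable returning True
# once no more data will be produced, and `data' a dataformat object:
#
#   source = MemoryDataSource(done_callback=block_done)
#   sink = MemoryDataSink()
#   sink.setup([source])
#   sink.write(data, 0, 0)            # data becomes available on `source'
#   data, start, end = source.next()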


def load_data_index(cache_prefix, hash_path):
  """Loads a cached-data index if it exists. Returns empty otherwise.

  Parameters:

    cache_prefix (str): The path to the root of the cache directory

    hash_path (str): The hashed path of the input you wish to load the indexes
      for, as it is returned by the utility function
      :py:func:`beat.core.hash.toPath`.


  Returns:

    A list, which will be empty if the index file is not present. Note that,
    given the current design, an empty list means an error condition.

  """

  name_no_extension = os.path.splitext(hash_path)[0]  # remove .data
  index_stem = os.path.join(cache_prefix, name_no_extension)
  index_glob = index_stem + '*.index'

  candidates = glob.glob(index_glob)

  assert candidates, "No index file matching the pattern `%s' found." % \
      index_glob

  retval = []
  end_index = 0
  for filename in candidates:
    with open(filename, 'rt') as f:
      data = [k.split() for k in f.readlines() if k.strip()]
      start = [int(k[0]) for k in data]
      end = int(data[-1][1])  # last index

    # Checks that the checksum file exists and matches the index file; the
    # assertion below fails otherwise, as these indices are considered
    # invalid.
    with open(filename + '.checksum', 'rt') as checksum_file:
      expected_chksum = checksum_file.read().strip()

    current_chksum = hashFileContents(filename)
    assert expected_chksum == current_chksum, "index file `%s' has a " \
        "checksum (%s) that differs from expected one (%s)" % \
        (filename, current_chksum, expected_chksum)

    # else, group indices
    retval.extend(start)
    if end > end_index:
      end_index = end

  return sorted(retval) + [end_index + 1]
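
# A minimal sketch of what load_data_index() consumes and returns (the paths
# below are hypothetical).  Given an index file whose lines are
# "<start> <end>" pairs, e.g.:
#
#   0 9
#   10 19
#   20 29
#
# the call returns the block start indices plus a final sentinel equal to the
# last end index + 1, i.e. [0, 10, 20, 30]:
#
#   indices = load_data_index('/path/to/cache', 'ab/cd/ef/0123.data')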


def _foundCommonIndices(lst):
  """Returns the list of common indices, given a list of list of indices
  """

  if lst == []:
    return lst
  lst_set = [set(k) for k in lst]
  common_indices = sorted(list(reduce(set.intersection, lst_set)))
  return common_indices
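
# For example, with hypothetical index lists:
#
#   _foundCommonIndices([[0, 2, 4, 6, 8], [0, 4, 8]])  ->  [0, 4, 8]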


def foundSplitRanges(lst, n_split):
  """Splits a list of lists of indices into n splits for parallelization
  purposes. """

  if [] in lst or lst == []:
    return []
  ci_lst = _foundCommonIndices(lst)
  res = []
  average_length = float(ci_lst[-1]) / n_split
  c = 0
  s = 0
  for i in range(1, len(ci_lst)):
    if (ci_lst[i] - ci_lst[s] >= average_length and c < n_split - 1) or \
            len(ci_lst) - i <= n_split - c:
      res.append((ci_lst[s], ci_lst[i] - 1))
      s = i
      c += 1
  return res
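
# For example, two synchronized inputs whose blocks start at the hypothetical
# indices below, split for 2 parallel workers:
#
#   foundSplitRanges([[0, 2, 4, 6, 8], [0, 4, 8]], 2)  ->  [(0, 3), (4, 7)]
#
# Each returned tuple is an inclusive (start, end) range of data indices; the
# trailing entry of each input list is the end-of-data sentinel (last index
# + 1), as produced by load_data_index().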