data.py 31.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


"""Data I/O classes and functions"""

import os
import re
import glob
import simplejson as json
import select
import time
import tempfile
import abc
39
import zmq
40
from functools import reduce
41
from collections import namedtuple
42
43
44
45
46
47
48
49
50
51

import logging
logger = logging.getLogger(__name__)

import six
from .hash import hashFileContents
from .dataformat import DataFormat
from .algorithm import Algorithm


52
53
54
#----------------------------------------------------------


55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class RemoteException(Exception):
    """Exception carrying an error reported by a remote peer.

    The message is stored in ``system_error`` when ``kind`` is ``'sys'``,
    otherwise in ``user_error``; the unused slot is set to the empty string.
    """

    def __init__(self, kind, message):
        super(RemoteException, self).__init__()

        is_system = (kind == 'sys')
        self.system_error = message if is_system else ''
        self.user_error = '' if is_system else message


#----------------------------------------------------------


71
72
73
74
def mixDataIndices(list_of_data_indices):
    """Given a collection of lists of data indices (belonging to separate
    but synchronized files/inputs), returns the most granular list of
    indices that span all the data

    For example, the mix of

      [(0, 2), (3, 4)]

    and

      [(0, 4)]

    is:

      [(0, 2), (3, 4)]


    The mix of

      [(0, 2), (3, 4)]

    and

      [(0, 1), (2, 3), (4, 4)]

    is:

      [(0, 1), (2, 2), (3, 3), (4, 4)]

    """

    # The mixed list can only cover the range common to every input: it
    # begins at the latest first index and stops at the earliest last index
    first = max(indices[0][0] for indices in list_of_data_indices)
    last = min(indices[-1][1] for indices in list_of_data_indices)

    # Any index at which at least one input closes a block must also close
    # a block in the mixed output
    boundaries = set()
    for indices in list_of_data_indices:
        boundaries.update(end for _, end in indices)

    mixed = []
    block_start = first

    for candidate in range(first, last + 1):
        if candidate in boundaries:
            mixed.append((block_start, candidate))
            block_start = candidate + 1

    return mixed
124
125


126
#----------------------------------------------------------
127
128


129
130
131
def getAllFilenames(filename, start_index=None, end_index=None):
    """Returns the names of all the files related to the given data file,
    taking the provided start and end indices into account.


    Parameters:

        filename (str): Name of the data file (path/to/cache/<hash>.data)

        start_index (int): The starting index (if not set or set to
            ``None``, the default, equivalent to ``0``)

        end_index (int): The end index (if not set or set to ``None``, the
            default, equivalent to ``the last existing data``)


    Returns:

        (data_filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames)

    """

    # Fix: dots escaped so that e.g. a literal '.checksum' suffix is required
    # (the original unescaped '.' matched any character)
    index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(\.checksum)?$')

    def file_start(f):
        """Returns the converted start index from a filename, otherwise 0"""
        r = index_re.match(f)
        if r:
            return int(r.group(1))
        return 0

    # Retrieve all the related files, ordered by their start index
    basename, ext = os.path.splitext(filename)
    filenames = sorted(glob.glob(basename + '*'), key=file_start)

    # (If necessary) Only keep files containing the desired indices; files
    # whose names carry no index information are always kept
    if (start_index is not None) or (end_index is not None):
        filtered_filenames = []
        for f in filenames:
            match = index_re.match(f)
            if match:
                start = int(match.group(1))
                end = int(match.group(2))
                # Drop files entirely outside the requested window
                if ((start_index is not None) and (end < start_index)) or \
                   ((end_index is not None) and (start > end_index)):
                    continue
            filtered_filenames.append(f)
        filenames = filtered_filenames

    # Separate the filenames in different lists (patterns hoisted and
    # compiled once instead of once per filename)
    data_re = re.compile(r'^.*\.(\d+)\.(\d+)\.data$')
    index_file_re = re.compile(r'^.*\.(\d+)\.(\d+)\.index$')
    data_checksum_re = re.compile(r'^.*\.(\d+)\.(\d+)\.data\.checksum$')
    index_checksum_re = re.compile(r'^.*\.(\d+)\.(\d+)\.index\.checksum$')

    data_filenames = [x for x in filenames if data_re.match(x)]
    indices_filenames = [x for x in filenames if index_file_re.match(x)]
    data_checksum_filenames = [x for x in filenames if data_checksum_re.match(x)]
    indices_checksum_filenames = [x for x in filenames if index_checksum_re.match(x)]

    return (data_filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames)
186
187
188
189
190


#----------------------------------------------------------


191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
class DataSource(object):
    """Base class to load data from some source

    Concrete subclasses must implement ``__getitem__``; this base keeps the
    list of data-unit descriptors (``self.infos``) and I/O statistics.
    """

    def __init__(self):
        self.infos         = []  # per-data-unit descriptors (must expose start_index/end_index)
        self.read_duration = 0   # cumulated seconds spent reading
        self.nb_bytes_read = 0   # cumulated bytes read


    def close(self):
        """Releases the loaded descriptors"""
        self.infos = []


    def __del__(self):
        """Makes sure all resources are released when the object is deleted"""
        self.close()


    def __len__(self):
        return len(self.infos)


    def __iter__(self):
        for i in range(0, len(self.infos)):
            yield self[i]


    def __getitem__(self, index):
        # Fix: the original raised `NotImplemented()` -- `NotImplemented` is a
        # comparison sentinel, not an exception, and is not even callable
        raise NotImplementedError()


    def first_data_index(self):
        """Start index of the first data unit"""
        return self.infos[0].start_index


    def last_data_index(self):
        """End index of the last data unit"""
        return self.infos[-1].end_index


    def data_indices(self):
        """List of (start_index, end_index) tuples, one per data unit"""
        return [ (x.start_index, x.end_index) for x in self.infos ]


    def getAtDataIndex(self, data_index):
        """Returns the data unit containing ``data_index``, or a
        (None, None, None) tuple when no unit covers it"""
        for index, infos in enumerate(self.infos):
            if (infos.start_index <= data_index) and (data_index <= infos.end_index):
                return self[index]

        return (None, None, None)


    def statistics(self):
        """Return the statistics about the number of bytes read"""
        return (self.nb_bytes_read, self.read_duration)


#----------------------------------------------------------


class CachedDataSource(DataSource):
    """Utility class to load data from a file in the cache"""

    def __init__(self):
        super(CachedDataSource, self).__init__()

        self.filenames          = None  # data files to read from
        self.encoding           = None  # Must be 'binary' or 'json'
        self.prefix             = None
        self.dataformat         = None
        self.current_file       = None  # lazily opened in __getitem__
        self.current_file_index = None
        self.unpack             = True


    def _readHeader(self, file):
        """Read the header of the provided file

        The header is two text lines: the encoding ('binary' or 'json') and
        the name of the data format stored in the file.
        """

        # Read file format ([:-1] strips the trailing newline; decode since
        # the file is opened in binary mode)
        self.encoding = file.readline()[:-1]
        if not isinstance(self.encoding, str):
            self.encoding = self.encoding.decode('utf8')

        if self.encoding not in ('binary', 'json'):
            raise RuntimeError("valid formats for data reading are 'binary' "
                               "or 'json': the format `%s' is invalid" % self.encoding)

        # Read data format (resolved only once; subsequent files reuse the
        # dataformat already loaded)
        dataformat_name = file.readline()[:-1]

        if self.dataformat is None:
            if not isinstance(dataformat_name, str):
                dataformat_name = dataformat_name.decode('utf8')

            if dataformat_name.startswith('analysis:'):
                # Data produced by an analyzer: use the result format
                # declared by that algorithm
                algo_name = dataformat_name.split(':')[1]
                algo = Algorithm(self.prefix, algo_name)
                if not algo.valid:
                    raise RuntimeError("the dataformat `%s' is the result of an " \
                                       "algorithm which is not valid" % algo_name)
                self.dataformat = algo.result_dataformat()
            else:
                self.dataformat = DataFormat(self.prefix, dataformat_name)

            if not self.dataformat.valid:
                raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        return True


    def setup(self, filename, prefix, start_index=None, end_index=None, unpack=True):
        """Configures the data source


        Parameters:

          filename (str): Name of the file to read the data from

          prefix (beat.backend.python.utils.Prefix): Establishes the prefix of
            your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, read data from the begin of file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """
        index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$')

        def check_consistency(data_filenames, checksum_filenames):
            """Perform some sanity check on the data/checksum files on disk:

              1. One-to-one mapping between data and checksum files
              2. Checksum comparison between hash(data) and checksum files
              3. Contiguous indices if they are present
            """

            # Make sure that we have a perfect match between data files and
            # checksum files
            checksum_filenames_noext = [os.path.splitext(f)[0] for f in checksum_filenames]

            if data_filenames != checksum_filenames_noext:
                raise IOError("number of data files and checksum files for `%s' " \
                              "does not match (%d != %d)" % (filename, len(data_filenames),
                              len(checksum_filenames_noext)))

            # list of start/end indices to check that they are contiguous
            indices = []
            for f_data, f_chck in zip(data_filenames, checksum_filenames):
                # Fix: close the checksum file deterministically (the
                # original leaked the handle to the garbage collector)
                with open(f_chck, 'rt') as checksum_file:
                    expected_chksum = checksum_file.read().strip()
                current_chksum = hashFileContents(f_data)
                if expected_chksum != current_chksum:
                    raise IOError("data file `%s' has a checksum (%s) that differs " \
                                  "from expected one (%s)" % (f_data, current_chksum,
                                  expected_chksum))

                r = index_re.match(f_data)
                if r:
                    indices.append((int(r.group(1)), int(r.group(2))))

            indices = sorted(indices, key=lambda v: v[0])

            # Each file must start right after the previous one ends.
            # Fix: the original used sum(), which accepted gaps as long as
            # at least one pair of consecutive files was contiguous
            ok_indices = all(indices[i + 1][0] - indices[i][1] == 1
                             for i in range(len(indices) - 1))

            if not ok_indices:
                raise IOError("data file `%s' have missing indices." % f_data)


        self.prefix = prefix
        self.unpack = unpack

        # Retrieve the list of all needed files
        (self.filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames) = \
            getAllFilenames(filename, start_index, end_index)

        if len(self.filenames) == 0:
            return False

        check_consistency(self.filenames, data_checksum_filenames)

        # Load all the needed infos from all the files
        FileInfos = namedtuple('FileInfos', ['file_index', 'start_index', 'end_index', 'offset', 'size'])

        for file_index, current_filename in enumerate(self.filenames):
            try:
                f = open(current_filename, 'rb')
            except Exception as e:
                logger.warning("Could not setup `%s': %s" % (filename, e))
                return False

            # Reads the header of the current file
            self._readHeader(f)
            offset = f.tell()

            # Process each data unit from the file; each unit is declared by
            # an index line '<start> <end> <size>' followed by <size> bytes
            # of payload
            while True:
                line = f.readline()
                # Fix: the file is opened in binary mode, so EOF yields b'';
                # the original compared against the str '' and never matched
                # under Python 3
                if not line:
                    break

                offset += len(line)

                (start, end, data_size) = [ int(x) for x in line.split() ]

                # Only keep the data units inside the requested range
                if ((start_index is None) or (start >= start_index)) and \
                   ((end_index is None) or (end <= end_index)):
                    self.infos.append(FileInfos(file_index=file_index, start_index=start,
                                                end_index=end, offset=offset, size=data_size))

                # Skip the payload to reach the next index line
                f.seek(data_size, 1)
                offset += data_size

            f.close()

        return True


    def close(self):
        """Closes the currently opened data file, if any"""
        if self.current_file is not None:
            self.current_file.close()

        super(CachedDataSource, self).close()


    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        # Switch to the file containing this data unit, if needed
        if self.current_file_index != infos.file_index:
            if self.current_file is not None:
                self.current_file.close()
                self.current_file = None

            try:
                self.current_file = open(self.filenames[infos.file_index], 'rb')
                self.current_file_index = infos.file_index
            except Exception as e:
                raise IOError("Could not read `%s': %s" % (self.filenames[infos.file_index], e))

        self.current_file.seek(infos.offset, 0)

        # Read the encoded data, keeping track of I/O statistics
        t1 = time.time()
        encoded_data = self.current_file.read(infos.size)
        t2 = time.time()

        self.read_duration += t2 - t1
        self.nb_bytes_read += infos.size

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(encoded_data)
        else:
            data = encoded_data

        return (data, infos.start_index, infos.end_index)
468
469


470
#----------------------------------------------------------
471
472


473
474
class DatabaseOutputDataSource(DataSource):
    """Utility class to load data from an output of a database view"""

    def __init__(self):
        super(DatabaseOutputDataSource, self).__init__()

        self.prefix        = None
        self.dataformat    = None
        self.view          = None
        self.output_name   = None
        self.pack          = True


    def setup(self, view, output_name, dataformat_name, prefix, start_index=None,
              end_index=None, pack=False):
        """Configures the data source


        Parameters:

          view: The database view providing the objects to read from.

          output_name (str): Name of the view output (the attribute read on
            each object returned by the view).

          dataformat_name (str): Name of the data format of that output.

          prefix (beat.backend.python.utils.Prefix): Establishes the prefix of
            your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, read data from the begin)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          pack (bool): Indicates if the data must be packed or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.prefix      = prefix
        self.view        = view
        self.output_name = output_name
        self.pack        = pack

        self.dataformat = DataFormat(self.prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)


        # Group consecutive objects sharing the same output value into
        # blocks of [start, end] indices
        Infos = namedtuple('Infos', ['start_index', 'end_index'])

        objects = self.view.objects()

        start = None
        end = None
        previous_value = None

        for index, obj in enumerate(objects):
            if start is None:
                start = index
                previous_value = getattr(obj, output_name)
            elif getattr(obj, output_name) != previous_value:
                # Value changed: close the current block...
                end = index - 1
                previous_value = None

                if ((start_index is None) or (start >= start_index)) and \
                   ((end_index is None) or (end <= end_index)):
                    self.infos.append(Infos(start_index=start, end_index=end))

                # ...and open a new one
                start = index
                previous_value = getattr(obj, output_name)

        # Fix: the original unconditionally referenced the loop variable
        # below and raised a NameError when the view yielded no object at all
        if start is None:
            return False

        # Close the last (still open) block
        end = index

        if ((start_index is None) or (start >= start_index)) and \
           ((end_index is None) or (end <= end_index)):
            self.infos.append(Infos(start_index=start, end_index=end))

        return True


    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        # Fetch the value from the view, keeping track of the time spent
        t1 = time.time()
        data = self.view.get(self.output_name, infos.start_index)
        t2 = time.time()

        self.read_duration += t2 - t1

        # Plain dictionaries are converted to proper dataformat instances
        if isinstance(data, dict):
            d = self.dataformat.type()
            d.from_dict(data, casting='safe', add_defaults=False)
            data = d

        if self.pack:
            data = data.pack()
            self.nb_bytes_read += len(data)

        return (data, infos.start_index, infos.end_index)
585

586

587
588
589
#----------------------------------------------------------


590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
class RemoteDataSource(DataSource):
    """Utility class to load data from a data source accessible via a socket"""

    def __init__(self):
        super(RemoteDataSource, self).__init__()

        # Communication channel and protocol state
        self.socket     = None
        self.input_name = None
        self.dataformat = None
        self.unpack     = True


    def setup(self, socket, input_name, dataformat_name, prefix, unpack=True):
        """Configures the data source


        Parameters:

          socket (socket): The socket to use to access the data.

          input_name (str): Name of the input corresponding to the data source.

          dataformat_name (str): Name of the data format.

          prefix (beat.backend.python.utils.Prefix): Establishes the prefix of
            your installation.

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.socket     = socket
        self.input_name = input_name
        self.unpack     = unpack

        self.dataformat = DataFormat(prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)


        # Load the needed infos from the socket
        # Protocol: send 'ifo <input_name>', receive either 'err <kind>
        # <message>' or a count followed by (start, end) pairs
        # NOTE(review): the string comparisons/sends below assume str frames
        # (Python 2 zmq behavior); under Python 3, recv() returns bytes --
        # confirm the socket is wrapped accordingly
        Infos = namedtuple('Infos', ['start_index', 'end_index'])

        logger.debug('send: (ifo) infos %s', self.input_name)

        self.socket.send('ifo', zmq.SNDMORE)
        self.socket.send(self.input_name)

        answer = self.socket.recv()
        logger.debug('recv: %s', answer)

        if answer == 'err':
            # Remote reported a failure: next two frames are kind + message
            kind = self.socket.recv()
            message = self.socket.recv()
            raise RemoteException(kind, message)

        nb = int(answer)
        for i in range(nb):
            start = int(self.socket.recv())
            end = int(self.socket.recv())
            self.infos.append(Infos(start_index=start, end_index=end))

        return True


    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        logger.debug('send: (get) get %s %d', self.input_name, index)

        t1 = time.time()

        # Protocol: send 'get <input_name> <index>', receive either
        # 'err <kind> <message>' or '<start> <end> <packed payload>'
        self.socket.send('get', zmq.SNDMORE)
        self.socket.send(self.input_name, zmq.SNDMORE)
        self.socket.send('%d' % index)

        answer = self.socket.recv()

        if answer == 'err':
            # Account for the time spent before propagating the remote error
            self.read_duration += time.time() - t1
            kind = self.socket.recv()
            message = self.socket.recv()
            raise RemoteException(kind, message)

        start = int(answer)
        end = int(self.socket.recv())

        packed = self.socket.recv()

        t2 = time.time()

        logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed), start, end)

        self.nb_bytes_read += len(packed)

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(packed)
        else:
            data = packed

        self.read_duration += t2 - t1

        return (data, infos.start_index, infos.end_index)


#----------------------------------------------------------


716
717
718
class DataSink(object):

    """Interface of all the Data Sinks

    Data Sinks are used by the outputs of an algorithm to write/transmit data.
    """
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data

        Parameters:

          data (beat.core.baseformat.baseformat): The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        pass

    @abc.abstractmethod
    def isConnected(self):
        """Tells whether the sink is ready to accept data"""
        pass

    def close(self):
        """Releases any resource held by the sink (no-op by default)"""
        pass


749
#----------------------------------------------------------
750
751


752
753
754
755
class StdoutDataSink(DataSink):

    """Data Sink that prints informations about the written data on stdout

    Note: The written data is lost! Use this class for debugging purposes
    """

    def __init__(self):
        super(StdoutDataSink, self).__init__()
        self.dataformat = None
        self.prefix = ''
        self.display_data = True

    def setup(self, dataformat, prefix=None, display_data=True):
        """Configures the sink

        Parameters:

          dataformat: The data format of the written data (stored, unused here)

          prefix (str): Optional text printed before each record (a trailing
            space is appended when non-empty)

          display_data (bool): Whether to print the data itself or a
            placeholder

        """
        self.dataformat = dataformat
        self.display_data = display_data

        if prefix is not None:
            self.prefix = prefix
            if self.prefix != '':
                self.prefix += ' '

    def write(self, data, start_data_index, end_data_index):
        """Write a block of data

        Parameters:

          data (beat.core.baseformat.baseformat) The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        # Fix: use the print() function form -- the original 'print' statement
        # is a SyntaxError under Python 3; a single-argument print() call
        # behaves identically on Python 2 as well
        if self.display_data:
            print('%s(%d -> %d): %s' % (self.prefix, start_data_index, end_data_index, str(data)))
        else:
            print('%s(%d -> %d): <data>' % (self.prefix, start_data_index, end_data_index))


    def isConnected(self):
        return True


#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
800
class CachedDataSink(DataSink):

    """Data Sink that saves the data in the Cache

    The default behavior is to save the data in a binary format.
    """

    def __init__(self):
        self.filename = None
        self.encoding = None
        self.dataformat = None

        self.start_index = None
        self.end_index = None

        self.data_file = None
        self.index_file = None
        self.last_written_data_index = None

        self.nb_bytes_written = 0
        self.write_duration = 0


    def setup(self, filename, dataformat, start_index, end_index, encoding='binary'):
        """Configures the data sink

        Parameters:

          filename (str): Name of the file to generate

          dataformat (beat.core.dataformat.DataFormat): The dataformat to be used
            inside this file. All objects stored inside this file will respect that
            format.

          start_index (int): Index of the first data unit this file will hold

          end_index (int): Index of the last data unit this file will hold

          encoding (str): String defining the encoding to be used for encoding the
            data. Only a few options are supported: ``binary`` (the default) or
            ``json`` (debugging purposes).

        Returns:

          bool: True on success, False if the destination files could not be
          opened

        """

        # Close current file if open
        self.close()

        if encoding not in ('binary', 'json'):
            # NOTE: the original message interpolated the *builtin* ``format``
            # instead of the offending argument; use ``encoding`` here
            raise RuntimeError("valid formats for data writing are 'binary' "
                               "or 'json': the format `%s' is invalid" % encoding)

        if dataformat.name == '__unnamed_dataformat__':
            raise RuntimeError("cannot record data using an unnamed data format")

        filename, data_ext = os.path.splitext(filename)

        self.filename = '%s.%d.%d%s' % (filename, start_index, end_index, data_ext)
        self.encoding = encoding
        self.dataformat = dataformat
        self.start_index = start_index
        self.end_index = end_index

        self.nb_bytes_written = 0
        self.write_duration = 0
        self.last_written_data_index = None

        try:
            self.data_file = open(self.filename, 'wb')
            self.index_file = open(self.filename.replace('.data', '.index'), 'wt')
        except Exception:  # best-effort: report failure via the return value
            return False

        # Write the dataformat
        self.data_file.write(six.b('%s\n%s\n' % (self.encoding, self.dataformat.name)))
        self.data_file.flush()

        return True


    def close(self):
        """Closes the data sink

        If the expected last index was never written, the (incomplete) data
        and index files are removed instead of being checksummed.

        Returns:

          bool: True on success, False if the incomplete files could not be
          removed (None if the sink was not open)

        """

        if self.data_file is not None:
            self.data_file.close()
            self.index_file.close()

            # If file is not complete, delete it
            if (self.last_written_data_index is None) or \
               (self.last_written_data_index < self.end_index):
                try:
                    os.remove(self.filename)
                    os.remove(self.filename.replace('.data', '.index'))
                    # Reset the handles so a second close() (e.g. from
                    # __del__) does not retry the removal and report failure
                    self.data_file = None
                    self.index_file = None
                    return True
                except Exception:
                    return False

            # Creates the checksums for all data and indexes
            chksum_data = hashFileContents(self.filename)
            with open(self.filename + '.checksum', 'wt') as f:
                f.write(chksum_data)

            index_filename = self.filename.replace('.data', '.index')
            chksum_index = hashFileContents(index_filename)
            with open(index_filename + '.checksum', 'wt') as f:
                f.write(chksum_index)

            self.data_file = None
            self.index_file = None
            self.last_written_data_index = None

            return True


    def __del__(self):
        """Make sure the files are closed when the object is deleted
        """
        self.close()


    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data to the filesystem

        Parameters:

          data (beat.core.baseformat.baseformat): The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        # If the user passed a dictionary - convert it
        if isinstance(data, dict):
            data = self.dataformat.type(**data)
        else:
            # Checks that the input data conforms to the expected format
            if data.__class__._name != self.dataformat.name:
                raise TypeError("input data uses format `%s' while this sink "
                        "expects `%s'" % (data.__class__._name, self.dataformat))

        if self.data_file is None:
            raise RuntimeError("No destination file")

        # Encoding
        if self.encoding == 'binary':
            encoded_data = data.pack()
        else:
            from .utils import NumpyJSONEncoder
            encoded_data = json.dumps(data.as_dict(), indent=4, cls=NumpyJSONEncoder)

        # Adds a new line by the end of the encoded data
        encoded_data += six.b('\n')

        # Header line: start index, end index and payload size in bytes
        informations = six.b('%d %d %d\n' % (start_data_index,
                             end_data_index, len(encoded_data)))

        t1 = time.time()

        self.data_file.write(informations + encoded_data)
        self.data_file.flush()

        indexes = '%d %d\n' % (start_data_index, end_data_index)
        self.index_file.write(indexes)
        self.index_file.flush()

        t2 = time.time()

        self.nb_bytes_written += len(informations) + len(encoded_data) + len(indexes)
        self.write_duration += t2 - t1

        self.last_written_data_index = end_data_index


    def statistics(self):
        """Return the statistics about the number of bytes written to the cache

        Returns:

          tuple: (nb_bytes_written, write_duration)

        """
        return (self.nb_bytes_written, self.write_duration)


    def isConnected(self):
        """True once setup() has assigned a destination filename"""
        return (self.filename is not None)
977
978


979
980
981
#----------------------------------------------------------


982
def load_data_index(cache_root, hash_path):
Philip ABBET's avatar
Philip ABBET committed
983
    """Loads a cached-data index if it exists. Returns empty otherwise.
984

Philip ABBET's avatar
Philip ABBET committed
985
    Parameters:
986

987
      cache_root (str): The path to the root of the cache directory
988

Philip ABBET's avatar
Philip ABBET committed
989
990
991
      hash_path (str): The hashed path of the input you wish to load the indexes
        for, as it is returned by the utility function
        :py:func:`beat.core.hash.toPath`.
992
993


Philip ABBET's avatar
Philip ABBET committed
994
    Returns:
995

Philip ABBET's avatar
Philip ABBET committed
996
997
      A list, which will be empty if the index file is not present. Note that,
      given the current design, an empty list means an error condition.
998

Philip ABBET's avatar
Philip ABBET committed
999
    """
1000

For faster browsing, not all history is shown. View entire blame