data.py 33.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


29
30
31
32
33
34
35
"""
====
data
====

Data I/O classes and functions
"""
36
37
38
39
40
41
42

import os
import re
import glob
import simplejson as json
import time
import abc
43
import zmq
44
45
46
import logging
import six

47
from functools import reduce
48
from collections import namedtuple
49
50
51
52
53

from .hash import hashFileContents
from .dataformat import DataFormat
from .algorithm import Algorithm

54
logger = logging.getLogger(__name__)
55

56
# ----------------------------------------------------------
57
58


59
class RemoteException(Exception):
    """Error reported by a remote location.

    Depending on ``kind``, the message is stored either as a system error
    (``'sys'``) or as a user error (any other value).
    """

    def __init__(self, kind, message):
        super(RemoteException, self).__init__()

        is_system = (kind == 'sys')
        self.system_error = message if is_system else ''
        self.user_error = '' if is_system else message

    def __str__(self):
        # A non-empty system error takes precedence over the user error
        if self.system_error:
            return '(sys) {}'.format(self.system_error)
        return '(usr) {}'.format(self.user_error)
77

78

79
# ----------------------------------------------------------
80
81


82
83
84
85
def mixDataIndices(list_of_data_indices):
    """Given a collection of lists of data indices (belonging to separate
    but synchronized files/inputs), returns the most granular list of
    indices that span all the data

    For example, the mix of

      [(0, 2), (3, 4)]

    and

      [(0, 4)]

    is:

      [(0, 2), (3, 4)]


    The mix of

      [(0, 2), (3, 4)]

    and

      [(0, 1), (2, 3), (4, 4)]

    is:

      [(0, 1), (2, 2), (3, 3), (4, 4)]

    """

    # Only the range covered by every input can be mixed
    start = max(indices[0][0] for indices in list_of_data_indices)
    end = min(indices[-1][1] for indices in list_of_data_indices)

    # Every upper bound appearing in any input (and lying inside the common
    # range) closes one segment of the result
    boundaries = sorted(
        {upper for indices in list_of_data_indices
               for (_, upper) in indices
               if start <= upper <= end}
    )

    result = []
    segment_start = start

    for boundary in boundaries:
        result.append((segment_start, boundary))
        segment_start = boundary + 1

    return result
135
136


137
# ----------------------------------------------------------
138
139


140
141
142
def getAllFilenames(filename, start_index=None, end_index=None):
    """Returns the names of all the files related to the given data file,
    taking the provided start and end indices into account.


    Parameters:

        filename (str): Name of the data file (path/to/cache/<hash>.data)

        start_index (int): The starting index (if not set or set to
            ``None``, the default, equivalent to ``0``)

        end_index (int): The end index (if not set or set to ``None``, the
            default, equivalent to ``the last existing data``)


    Returns:

        (data_filenames, indices_filenames,
         data_checksum_filenames, indices_checksum_filenames)
    """

    # Fix: the dots before 'checksum' (and in the classification patterns
    # below) are now escaped; the previous patterns used a bare '.' which
    # also matched any other character
    index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(\.checksum)?$')

    def file_start(f):
        """Returns the converted start index from a filename, otherwise 0"""
        r = index_re.match(f)
        if r:
            return int(r.group(1))
        return 0

    # Retrieve all the related files, ordered by their start index
    basename, ext = os.path.splitext(filename)
    filenames = sorted(glob.glob(basename + '*'), key=file_start)

    # (If necessary) Only keep files containing the desired indices
    if (start_index is not None) or (end_index is not None):
        filtered_filenames = []
        for f in filenames:
            match = index_re.match(f)
            if match:
                start = int(match.group(1))
                end = int(match.group(2))
                # Discard files whose index range falls entirely outside the
                # requested [start_index, end_index] window
                if ((start_index is not None) and (end < start_index)) or \
                   ((end_index is not None) and (start > end_index)):
                    continue
            filtered_filenames.append(f)
        filenames = filtered_filenames

    # Separate the filenames in different lists (compiled once, reused on
    # every filename)
    data_re = re.compile(r'^.*\.(\d+)\.(\d+)\.data$')
    index_file_re = re.compile(r'^.*\.(\d+)\.(\d+)\.index$')
    data_checksum_re = re.compile(r'^.*\.(\d+)\.(\d+)\.data\.checksum$')
    index_checksum_re = re.compile(r'^.*\.(\d+)\.(\d+)\.index\.checksum$')

    data_filenames = [ x for x in filenames if data_re.match(x) ]
    indices_filenames = [ x for x in filenames if index_file_re.match(x) ]
    data_checksum_filenames = [ x for x in filenames if data_checksum_re.match(x) ]
    indices_checksum_filenames = [ x for x in filenames if index_checksum_re.match(x) ]

    return (data_filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames)
197
198


199
# ----------------------------------------------------------
200
201


202
203
204
205
206
207
208
class DataSource(object):
    """Base class to load data from some source.

    Subclasses must implement ``__getitem__`` and typically override
    ``_prepare()`` to fill ``self.infos`` with descriptors exposing at least
    the ``start_index`` and ``end_index`` attributes (e.g. namedtuples).
    """

    def __init__(self):
        self.infos         = []     # per-block descriptors, filled by subclasses
        self.read_duration = 0      # cumulated time spent reading, in seconds
        self.nb_bytes_read = 0      # cumulated number of bytes read
        self.ready         = False  # True once _prepare() has been executed


    def close(self):
        """Releases the block descriptors"""
        self.infos = []


    def __del__(self):
        """Makes sure all resources are released when the object is deleted"""
        self.close()


    def __len__(self):
        """Number of data blocks available"""
        if not self.ready:
            self._prepare()

        return len(self.infos)


    def __iter__(self):
        """Iterates over all the (data, start_index, end_index) tuples"""
        if not self.ready:
            self._prepare()

        for i in range(0, len(self.infos)):
            yield self[i]


    def __getitem__(self, index):
        """Retrieves a block of data; must be implemented by subclasses

        Fix: this previously did ``raise NotImplemented()``, which fails with
        a ``TypeError`` ('NotImplementedType' object is not callable) because
        ``NotImplemented`` is a constant, not an exception class.
        """
        raise NotImplementedError()


    def first_data_index(self):
        """Start index of the first data block"""
        if not self.ready:
            self._prepare()

        return self.infos[0].start_index


    def last_data_index(self):
        """End index of the last data block"""
        if not self.ready:
            self._prepare()

        return self.infos[-1].end_index


    def data_indices(self):
        """List of (start_index, end_index) tuples, one per data block"""
        if not self.ready:
            self._prepare()

        return [ (x.start_index, x.end_index) for x in self.infos ]


    def getAtDataIndex(self, data_index):
        """Retrieves the block of data containing the given data index, or
        ``(None, None, None)`` if no block contains it"""
        if not self.ready:
            self._prepare()

        for index, infos in enumerate(self.infos):
            if (infos.start_index <= data_index) and (data_index <= infos.end_index):
                return self[index]

        return (None, None, None)


    def statistics(self):
        """Return the statistics about the number of bytes read"""
        return (self.nb_bytes_read, self.read_duration)


    def _prepare(self):
        # Default implementation: nothing to load
        self.ready = True


281
# ----------------------------------------------------------
282
283
284


class CachedDataSource(DataSource):
    """Utility class to load data from a file in the cache"""

    def __init__(self):
        super(CachedDataSource, self).__init__()

        self.filenames          = None  # list of data files, one per index range
        self.encoding           = None  # Must be 'binary' or 'json'
        self.prefix             = None
        self.dataformat         = None
        self.current_file       = None  # currently open data file handle
        self.current_file_index = None  # index (in self.filenames) of that file
        self.unpack             = True


    def _readHeader(self, file):
        """Read the header of the provided file

        The header contains two lines: the encoding ('binary' or 'json') and
        the dataformat name (possibly 'analysis:<algorithm>' for analyzer
        results).
        """

        # Read file format ([:-1] strips the trailing newline)
        self.encoding = file.readline()[:-1]
        if not isinstance(self.encoding, str):
            self.encoding = self.encoding.decode('utf8')

        if self.encoding not in ('binary', 'json'):
            raise RuntimeError("valid formats for data reading are 'binary' "
                               "or 'json': the format `%s' is invalid" % self.encoding)

        # Read data format (only resolved once; subsequent files reuse it)
        dataformat_name = file.readline()[:-1]

        if self.dataformat is None:
            if not isinstance(dataformat_name, str):
                dataformat_name = dataformat_name.decode('utf8')

            if dataformat_name.startswith('analysis:'):
                # The data was produced by an analyzer: the dataformat is the
                # result dataformat of that algorithm
                algo_name = dataformat_name.split(':')[1]
                algo = Algorithm(self.prefix, algo_name)
                if not algo.valid:
                    raise RuntimeError("the dataformat `%s' is the result of an " \
                                       "algorithm which is not valid" % algo_name)
                self.dataformat = algo.result_dataformat()
            else:
                self.dataformat = DataFormat(self.prefix, dataformat_name)

            if not self.dataformat.valid:
                raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        return True


    def setup(self, filename, prefix, start_index=None, end_index=None, unpack=True):
        """Configures the data source


        Parameters:

          filename (str): Name of the file to read the data from

          prefix (str): Establishes the prefix of your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, read data from the begin of file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """
        index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$')

        def check_consistency(data_filenames, checksum_filenames):
            """Perform some sanity check on the data/checksum files on disk:

              1. One-to-one mapping between data and checksum files
              2. Checksum comparison between hash(data) and checksum files
              3. Contiguous indices if they are present
            """

            # Make sure that we have a perfect match between data files and
            # checksum files
            checksum_filenames_noext = [os.path.splitext(f)[0] for f in checksum_filenames]

            if data_filenames != checksum_filenames_noext:
                raise IOError("number of data files and checksum files for `%s' " \
                              "does not match (%d != %d)" % (filename, len(data_filenames),
                              len(checksum_filenames_noext)))

            # list of start/end indices to check that they are contiguous
            indices = []
            for f_data, f_chck in zip(data_filenames, checksum_filenames):
                # Fix: close the checksum file instead of leaking the handle
                with open(f_chck, 'rt') as checksum_file:
                    expected_chksum = checksum_file.read().strip()
                current_chksum = hashFileContents(f_data)
                if expected_chksum != current_chksum:
                    raise IOError("data file `%s' has a checksum (%s) that differs " \
                                  "from expected one (%s)" % (f_data, current_chksum,
                                  expected_chksum))

                r = index_re.match(f_data)
                if r:
                    indices.append((int(r.group(1)), int(r.group(2))))

            indices = sorted(indices, key=lambda v: v[0])
            ok_indices = True

            if len(indices) > 1:
                # Fix: previously used sum(), which reported success as soon
                # as at least ONE adjacent pair was contiguous; every pair
                # must be contiguous
                ok_indices = all((indices[i + 1][0] - indices[i][1] == 1)
                                 for i in range(len(indices) - 1))

            if not ok_indices:
                raise IOError("data file `%s' have missing indices." % f_data)


        self.prefix = prefix
        self.unpack = unpack

        # Retrieve the list of all needed files
        (self.filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames) = \
            getAllFilenames(filename, start_index, end_index)

        if len(self.filenames) == 0:
            return False

        check_consistency(self.filenames, data_checksum_filenames)

        # Load all the needed infos from all the files
        FileInfos = namedtuple('FileInfos', ['file_index', 'start_index', 'end_index', 'offset', 'size'])

        for file_index, current_filename in enumerate(self.filenames):
            try:
                f = open(current_filename, 'rb')
            except Exception as e:
                # Fix: logger.warn() is a deprecated alias of logger.warning()
                logger.warning("Could not setup `%s': %s" % (filename, e))
                return False

            # Reads the header of the current file
            self._readHeader(f)
            offset = f.tell()

            # Process each data unit from the file; each unit is described by
            # an ASCII line 'start end size' followed by 'size' bytes of data
            while True:
                line = f.readline()
                if not line:
                    break

                offset += len(line)

                (start, end, data_size) = [ int(x) for x in line.split() ]

                # Only keep the units inside the requested index window
                if ((start_index is None) or (start >= start_index)) and \
                   ((end_index is None) or (end <= end_index)):
                    self.infos.append(FileInfos(file_index=file_index, start_index=start,
                                                end_index=end, offset=offset, size=data_size))

                # Skip over the payload to reach the next descriptor line
                f.seek(data_size, 1)
                offset += data_size

            f.close()

        return True


    def close(self):
        """Closes the currently open data file, then the base class"""
        if self.current_file is not None:
            self.current_file.close()
            # Fix: reset the cached handle/index so a closed source does not
            # keep a stale file object around
            self.current_file = None
            self.current_file_index = None

        super(CachedDataSource, self).close()


    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        # Lazily (re-)open the file containing this block
        if self.current_file_index != infos.file_index:
            if self.current_file is not None:
                self.current_file.close()
                self.current_file = None

            try:
                self.current_file = open(self.filenames[infos.file_index], 'rb')
                self.current_file_index = infos.file_index
            except Exception as e:
                raise IOError("Could not read `%s': %s" % (self.filenames[infos.file_index], e))

        self.current_file.seek(infos.offset, 0)

        t1 = time.time()
        encoded_data = self.current_file.read(infos.size)
        t2 = time.time()

        self.read_duration += t2 - t1
        self.nb_bytes_read += infos.size

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(encoded_data)
        else:
            data = encoded_data

        return (data, infos.start_index, infos.end_index)
504
505


506
# ----------------------------------------------------------
507
508


509
510
class DatabaseOutputDataSource(DataSource):
    """Utility class to load data from an output of a database view"""

    def __init__(self):
        super(DatabaseOutputDataSource, self).__init__()

        self.prefix        = None
        self.dataformat    = None
        self.view          = None   # database view providing the objects
        self.output_name   = None   # attribute of the objects to read
        self.pack          = True


    def setup(self, view, output_name, dataformat_name, prefix, start_index=None,
              end_index=None, pack=False):
        """Configures the data source


        Parameters:

          view: Database view providing the objects to read

          output_name (str): Name of the view output (object attribute) to read

          dataformat_name (str): Name of the dataformat of the output

          prefix (str): Establishes the prefix of your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, read data from the begin of file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          pack (bool): Indicates if the data must be packed or not
            (note: the original docstring documented this parameter as
            ``unpack``, which does not exist here)


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.prefix      = prefix
        self.view        = view
        self.output_name = output_name
        self.pack        = pack

        self.dataformat = DataFormat(self.prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)


        # Group consecutive objects sharing the same output value into blocks
        # of (start_index, end_index)
        Infos = namedtuple('Infos', ['start_index', 'end_index'])

        objects = self.view.objects()

        start = None
        end = None
        previous_value = None
        index = None

        for index, obj in enumerate(objects):
            if start is None:
                start = index
                previous_value = getattr(obj, output_name)
            elif getattr(obj, output_name) != previous_value:
                # Value changed: close the current block
                end = index - 1
                previous_value = None

                if ((start_index is None) or (start >= start_index)) and \
                   ((end_index is None) or (end <= end_index)):
                    self.infos.append(Infos(start_index=start, end_index=end))

                start = index
                previous_value = getattr(obj, output_name)

        # Close the last block. Fix: guard against an empty view, which
        # previously raised a NameError on 'index' here
        if start is not None:
            end = index

            if ((start_index is None) or (start >= start_index)) and \
               ((end_index is None) or (end <= end_index)):
                self.infos.append(Infos(start_index=start, end_index=end))

        return True


    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        t1 = time.time()
        data = self.view.get(self.output_name, infos.start_index)
        t2 = time.time()

        self.read_duration += t2 - t1

        # Views may return plain dictionaries: convert them to the proper
        # dataformat type
        if isinstance(data, dict):
            d = self.dataformat.type()
            d.from_dict(data, casting='safe', add_defaults=False)
            data = d

        if self.pack:
            data = data.pack()
            self.nb_bytes_read += len(data)

        return (data, infos.start_index, infos.end_index)
623

624

625
# ----------------------------------------------------------
626
627


628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
class RemoteDataSource(DataSource):
    """Utility class to load data from a data source accessible via a socket"""

    def __init__(self):
        super(RemoteDataSource, self).__init__()

        self.socket     = None
        self.input_name = None
        self.dataformat = None
        self.unpack     = True


    def setup(self, socket, input_name, dataformat_name, prefix, unpack=True):
        """Configures the data source


        Parameters:

          socket (zmq.Socket): The socket to use to access the data.

          input_name (str): Name of the input corresponding to the data source.

          dataformat_name (str): Name of the data format.

          prefix (str): Establishes the prefix of your installation.

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.socket     = socket
        self.input_name = input_name
        self.unpack     = unpack

        self.dataformat = DataFormat(prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        return True


    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        logger.debug('send: (get) get %s %d', self.input_name, index)

        t1 = time.time()

        self.socket.send_string('get', zmq.SNDMORE)
        self.socket.send_string(self.input_name, zmq.SNDMORE)
        self.socket.send_string('%d' % index)

        answer = self.socket.recv().decode('utf-8')

        if answer == 'err':
            self.read_duration += time.time() - t1
            # Fix: decode the error frames; RemoteException compares 'kind'
            # against the str 'sys', which never matches raw bytes on Python 3
            kind = self.socket.recv().decode('utf-8')
            message = self.socket.recv().decode('utf-8')
            raise RemoteException(kind, message)

        # Success: frames are <start> <end> <packed data>
        start = int(answer)
        end = int(self.socket.recv())

        packed = self.socket.recv()

        t2 = time.time()

        logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed), start, end)

        self.nb_bytes_read += len(packed)

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(packed)
        else:
            data = packed

        self.read_duration += t2 - t1

        return (data, infos.start_index, infos.end_index)


    def _prepare(self):
        # Load the block infos (number of blocks and their index ranges)
        # from the socket
        Infos = namedtuple('Infos', ['start_index', 'end_index'])

        logger.debug('send: (ifo) infos %s', self.input_name)

        self.socket.send_string('ifo', zmq.SNDMORE)
        self.socket.send_string(self.input_name)

        # Fix: decode the answer like __getitem__ does; on Python 3 recv()
        # returns bytes, so the comparison with 'err' below never matched
        answer = self.socket.recv().decode('utf-8')
        logger.debug('recv: %s', answer)

        if answer == 'err':
            kind = self.socket.recv().decode('utf-8')
            message = self.socket.recv().decode('utf-8')
            raise RemoteException(kind, message)

        nb = int(answer)
        for i in range(nb):
            start = int(self.socket.recv())
            end = int(self.socket.recv())
            self.infos.append(Infos(start_index=start, end_index=end))

        self.ready = True


756
# ----------------------------------------------------------
757
758


759
760
761
class DataSink(object):

    """Interface of all the Data Sinks

    Data Sinks are used by the outputs of an algorithm to write/transmit data.
    """
    # NOTE(review): the '__metaclass__' attribute only has an effect on
    # Python 2; on Python 3 this class is NOT abstract and the
    # @abc.abstractmethod decorators below are not enforced — confirm which
    # Python versions must be supported before changing it
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data

        Parameters:

          data (baseformat.baseformat): The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        pass

    @abc.abstractmethod
    def isConnected(self):
        """Returns whether the data sink is connected"""

        pass

    def close(self):
        """Closes the data sink"""

        # Default implementation: nothing to release
        pass


795
# ----------------------------------------------------------
796
797


798
799
800
801
class StdoutDataSink(DataSink):

    """Data Sink that prints informations about the written data on stdout

    Note: The written data is lost! Use this class for debugging purposes
    """

    def __init__(self):
        super(StdoutDataSink, self).__init__()
        self.dataformat = None
        self.prefix = ''
        self.display_data = True

    def setup(self, dataformat, prefix=None, display_data=True):
        """Configures the sink: line prefix and whether the data is printed"""
        self.dataformat = dataformat
        self.display_data = display_data

        if prefix is not None:
            # A non-empty prefix is separated from the message by one space
            self.prefix = prefix if prefix == '' else prefix + ' '

    def write(self, data, start_data_index, end_data_index):
        """Write a block of data

        Parameters:

          data (baseformat.baseformat) The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """
        if self.display_data:
            message = '%s(%d -> %d): %s' % \
                (self.prefix, start_data_index, end_data_index, str(data))
        else:
            message = '%s(%d -> %d): <data>' % \
                (self.prefix, start_data_index, end_data_index)

        print(message)

    def isConnected(self):
        """This sink is always connected"""
        return True


845
# ----------------------------------------------------------
846
847


Philip ABBET's avatar
Philip ABBET committed
848
class CachedDataSink(DataSink):

    """Data Sink that saves data in the Cache

    The default behavior is to save the data in a binary format.
    """

    def __init__(self):
        # output location and serialization settings
        self.filename = None
        self.encoding = None
        self.dataformat = None

        # range of data indices this file is expected to cover
        self.start_index = None
        self.end_index = None

        # open file handles and progress tracking
        self.data_file = None
        self.index_file = None
        self.last_written_data_index = None

        # simple write statistics
        self.nb_bytes_written = 0
        self.write_duration = 0

    def setup(self, filename, dataformat, start_index, end_index, encoding='binary'):
Philip ABBET's avatar
Philip ABBET committed
871
872
873
874
875
876
        """Configures the data sink

        Parameters:

          filename (str): Name of the file to generate

877
          dataformat (dataformat.DataFormat): The dataformat to be used
878
879
            inside this file. All objects stored inside this file will respect
            that format.
Philip ABBET's avatar
Philip ABBET committed
880

881
882
883
          encoding (str): String defining the encoding to be used for encoding
            the data. Only a few options are supported: ``binary``
            (the default) or ``json`` (debugging purposes).
Philip ABBET's avatar
Philip ABBET committed
884
885
886

        """

887
888
889
        # Close current file if open
        self.close()

Philip ABBET's avatar
Philip ABBET committed
890
        if encoding not in ('binary', 'json'):
891
            raise RuntimeError("valid formats for data writing are 'binary' "
Philip ABBET's avatar
Philip ABBET committed
892
893
894
                               "or 'json': the format `%s' is invalid" % format)

        if dataformat.name == '__unnamed_dataformat__':
895
            raise RuntimeError("cannot record data using an unnamed data format")
Philip ABBET's avatar
Philip ABBET committed
896

897
        filename, data_ext = os.path.splitext(filename)
Philip ABBET's avatar
Philip ABBET committed
898

899
900
901
902
903
        self.filename = '%s.%d.%d%s' % (filename, start_index, end_index, data_ext)
        self.encoding = encoding
        self.dataformat = dataformat
        self.start_index = start_index
        self.end_index = end_index
Philip ABBET's avatar
Philip ABBET committed
904

905
906
907
        self.nb_bytes_written = 0
        self.write_duration = 0
        self.last_written_data_index = None
Philip ABBET's avatar
Philip ABBET committed
908

909
910
911
912
913
        try:
            self.data_file = open(self.filename, 'wb')
            self.index_file = open(self.filename.replace('.data', '.index'), 'wt')
        except:
            return False
Philip ABBET's avatar
Philip ABBET committed
914

915
916
917
        # Write the dataformat
        self.data_file.write(six.b('%s\n%s\n' % (self.encoding, self.dataformat.name)))
        self.data_file.flush()
Philip ABBET's avatar
Philip ABBET committed
918
919
920

        return True

921
922

    def close(self):
        """Closes the data sink

        Flushes and closes the underlying files. If the expected amount of
        data was not written, the (incomplete) data and index files are
        removed from the cache; otherwise, checksum files are generated for
        both of them.

        Returns:

          bool: ``True`` on success, ``False`` if an incomplete file could
            not be removed

        """

        if self.data_file is not None:
            self.data_file.close()
            self.index_file.close()

            data_filename = self.filename
            index_filename = self.filename.replace('.data', '.index')

            incomplete = (self.last_written_data_index is None) or \
                         (self.last_written_data_index < self.end_index)

            # Bug fix: forget the stale handles on *every* path; the original
            # only reset them after writing the checksums, so a second call to
            # close() after an incomplete write re-attempted the deletion of
            # already-removed files and returned False
            self.data_file = None
            self.index_file = None
            self.last_written_data_index = None

            # If the file is not complete, delete it
            if incomplete:
                try:
                    os.remove(data_filename)
                    os.remove(index_filename)
                    return True
                except OSError:  # narrowed from a bare except
                    return False

            # Creates the checksums for all data and indexes
            chksum_data = hashFileContents(data_filename)
            with open(data_filename + '.checksum', 'wt') as f:
                f.write(chksum_data)

            chksum_index = hashFileContents(index_filename)
            with open(index_filename + '.checksum', 'wt') as f:
                f.write(chksum_index)

        return True

    def __del__(self):
        """Make sure the files are closed when the object is deleted"""
        # delegates all the cleanup (closing, deletion of incomplete files,
        # checksum generation) to close()
        self.close()

    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data to the filesystem
965

Philip ABBET's avatar
Philip ABBET committed
966
        Parameters:
967

968
          data (baseformat.baseformat): The block of data to write
969

Philip ABBET's avatar
Philip ABBET committed
970
          start_data_index (int): Start index of the written data
971

Philip ABBET's avatar
Philip ABBET committed
972
          end_data_index (int): End index of the written data
973

Philip ABBET's avatar
Philip ABBET committed
974
        """
975

976
        # If the user passed a dictionary - convert it
Philip ABBET's avatar
Philip ABBET committed
977
978
979
980
981
982
983
        if isinstance(data, dict):
            data = self.dataformat.type(**data)
        else:
            # Checks that the input data conforms to the expected format
            if data.__class__._name != self.dataformat.name:
                raise TypeError("input data uses format `%s' while this sink "
                        "expects `%s'" % (data.__class__._name, self.dataformat))
984

985
986
        if self.data_file is None:
            raise RuntimeError("No destination file")
987

988
        # Encoding
Philip ABBET's avatar
Philip ABBET committed
989
990
991
992
993
        if self.encoding == 'binary':
            encoded_data = data.pack()
        else:
            from .utils import NumpyJSONEncoder
            encoded_data = json.dumps(data.as_dict(), indent=4, cls=NumpyJSONEncoder)
994

995
        # Adds a new line by the end of the encoded data
Philip ABBET's avatar
Philip ABBET committed
996
        encoded_data += six.b('\n')
997

Philip ABBET's avatar
Philip ABBET committed
998
        informations = six.b('%d %d %d\n' % (start_data_index,
999
                             end_data_index, len(encoded_data)))
1000

For faster browsing, not all history is shown. View entire blame