#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


"""Data I/O classes and functions"""

import os
import re
import glob
import simplejson as json
import select
import time
import tempfile
import abc
import zmq
from functools import reduce
from collections import namedtuple

import logging
logger = logging.getLogger(__name__)

import six
from .hash import hashFileContents
from .dataformat import DataFormat
from .algorithm import Algorithm


#----------------------------------------------------------


class RemoteException(Exception):

    def __init__(self, kind, message):
        super(RemoteException, self).__init__()

        if kind == 'sys':
            self.system_error = message
            self.user_error = ''
        else:
            self.system_error = ''
            self.user_error = message

    def __str__(self):
        if self.system_error != '':
            return '(sys) ' + self.system_error
        else:
            return '(usr) ' + self.user_error


#----------------------------------------------------------


def mixDataIndices(list_of_data_indices):
    """Given a collection of lists of data indices (belonging to separate
    but synchronized files/inputs), returns the most granular list of
    indices that span all the data

    For example, the mix of

      [(0, 2), (3, 4)]

    and

      [(0, 4)]

    is:

      [(0, 2), (3, 4)]


    The mix of

      [(0, 2), (3, 4)]

    and

      [(0, 1), (2, 3), (4, 4)]

    is:

      [(0, 1), (2, 2), (3, 3), (4, 4)]

    """

    start = max([ x[0][0] for x in list_of_data_indices ])
    end = min([ x[-1][1] for x in list_of_data_indices ])

    result = []
    current_start = start

    for index in range(start, end + 1):
        done = False

        for l in list_of_data_indices:
            for indices in l:
                if indices[1] == index:
                    result.append( (current_start, index) )
                    current_start = index + 1
                    done = True
                    break

            if done:
                break

    return result
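
# Illustrative usage sketch for mixDataIndices(), mirroring the second example
# from the docstring above (the index lists are hypothetical):
#
#   mixDataIndices([[(0, 2), (3, 4)], [(0, 1), (2, 3), (4, 4)]])
#   # -> [(0, 1), (2, 2), (3, 3), (4, 4)]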


#----------------------------------------------------------


def getAllFilenames(filename, start_index=None, end_index=None):
    """Returns the names of all the files related to the given data file,
    taking the provided start and end indices into account.


    Parameters:

        filename (str): Name of the data file (path/to/cache/<hash>.data)

        start_index (int): The starting index (if not set or set to
            ``None``, the default, equivalent to ``0``)

        end_index (int): The end index (if not set or set to ``None``, the
            default, equivalent to ``the last existing data``)


    Returns:

        (data_filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames)

    """

    index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$')

    def file_start(f):
        """Returns the converted start indexes from a filename, otherwise 0"""
        r = index_re.match(f)
        if r:
            return int(r.group(1))
        return 0


    # Retrieve all the related files
    basename, ext = os.path.splitext(filename)
    filenames = sorted(glob.glob(basename + '*'), key=file_start)

    # (If necessary) Only keep files containing the desired indices
    if (start_index is not None) or (end_index is not None):
        filtered_filenames = []
        for f in filenames:
            match = index_re.match(f)
            if match:
                start = int(match.group(1))
                end = int(match.group(2))
                if ((start_index is not None) and (end < start_index)) or \
                   ((end_index is not None) and (start > end_index)):
                    continue
            filtered_filenames.append(f)
        filenames = filtered_filenames

    # Separate the filenames in different lists
    data_filenames = [ x for x in filenames if re.match(r'^.*\.(\d+)\.(\d+)\.data$', x) ]
    indices_filenames = [ x for x in filenames if re.match(r'^.*\.(\d+)\.(\d+)\.index$', x) ]
    data_checksum_filenames = [ x for x in filenames if re.match(r'^.*\.(\d+)\.(\d+)\.data.checksum$', x) ]
    indices_checksum_filenames = [ x for x in filenames if re.match(r'^.*\.(\d+)\.(\d+)\.index.checksum$', x) ]

    return (data_filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames)
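
# Illustrative sketch of getAllFilenames() (the cache path below is
# hypothetical). For an entry split into chunks such as '<hash>.0.99.data' and
# '<hash>.100.199.data', asking for indices 50 to 150 keeps only the chunks
# overlapping that range:
#
#   data, indices, data_chk, indices_chk = getAllFilenames(
#       'path/to/cache/<hash>.data', start_index=50, end_index=150)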


#----------------------------------------------------------


class DataSource(object):
    """Base class to load data from some source"""

    def __init__(self):
        self.infos         = []
        self.read_duration = 0
        self.nb_bytes_read = 0
        self.ready         = False


    def close(self):
        self.infos = []


    def __del__(self):
        """Makes sure all resources are released when the object is deleted"""
        self.close()


    def __len__(self):
        if not self.ready:
            self._prepare()

        return len(self.infos)


    def __iter__(self):
        if not self.ready:
            self._prepare()

        for i in range(0, len(self.infos)):
            yield self[i]


    def __getitem__(self, index):
        raise NotImplementedError


    def first_data_index(self):
        if not self.ready:
            self._prepare()

        return self.infos[0].start_index


    def last_data_index(self):
        if not self.ready:
            self._prepare()

        return self.infos[-1].end_index


    def data_indices(self):
        if not self.ready:
            self._prepare()

        return [ (x.start_index, x.end_index) for x in self.infos ]


    def getAtDataIndex(self, data_index):
        if not self.ready:
            self._prepare()

        for index, infos in enumerate(self.infos):
            if (infos.start_index <= data_index) and (data_index <= infos.end_index):
                return self[index]

        return (None, None, None)


    def statistics(self):
        """Return the statistics about the number of bytes read"""
        return (self.nb_bytes_read, self.read_duration)


    def _prepare(self):
        self.ready = True


#----------------------------------------------------------


class CachedDataSource(DataSource):
    """Utility class to load data from a file in the cache"""

    def __init__(self):
        super(CachedDataSource, self).__init__()

        self.filenames          = None
        self.encoding           = None  # Must be 'binary' or 'json'
        self.prefix             = None
        self.dataformat         = None
        self.current_file       = None
        self.current_file_index = None
        self.unpack             = True


    def _readHeader(self, file):
        """Read the header of the provided file"""

        # Read file format
        self.encoding = file.readline()[:-1]
        if not isinstance(self.encoding, str):
            self.encoding = self.encoding.decode('utf8')

        if self.encoding not in ('binary', 'json'):
            raise RuntimeError("valid formats for data reading are 'binary' "
                               "or 'json': the format `%s' is invalid" % self.encoding)

        # Read data format
        dataformat_name = file.readline()[:-1]

        if self.dataformat is None:
            if not isinstance(dataformat_name, str):
                dataformat_name = dataformat_name.decode('utf8')

            if dataformat_name.startswith('analysis:'):
                algo_name = dataformat_name.split(':')[1]
                algo = Algorithm(self.prefix, algo_name)
                if not algo.valid:
                    raise RuntimeError("the dataformat `%s' is the result of an " \
                                       "algorithm which is not valid" % algo_name)
                self.dataformat = algo.result_dataformat()
            else:
                self.dataformat = DataFormat(self.prefix, dataformat_name)

            if not self.dataformat.valid:
                raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        return True


    def setup(self, filename, prefix, start_index=None, end_index=None, unpack=True):
        """Configures the data source


        Parameters:

          filename (str): Name of the file to read the data from

          prefix (str): Establishes the prefix of your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, reads the data from the beginning of the file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """
        index_re = re.compile(r'^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$')

        def check_consistency(data_filenames, checksum_filenames):
            """Perform some sanity check on the data/checksum files on disk:

              1. One-to-one mapping between data and checksum files
              2. Checksum comparison between hash(data) and checksum files
              3. Contiguous indices if they are present
            """

            # Make sure that we have a perfect match between data files and
            # checksum files
            checksum_filenames_noext = [os.path.splitext(f)[0] for f in checksum_filenames]

            if data_filenames != checksum_filenames_noext:
                raise IOError("number of data files and checksum files for `%s' " \
                              "does not match (%d != %d)" % (filename, len(data_filenames),
                              len(checksum_filenames_noext)))

            # List of start/end indices, to check that they are contiguous
            indices = []
            for f_data, f_chck in zip(data_filenames, checksum_filenames):
                expected_chksum = open(f_chck, 'rt').read().strip()
                current_chksum = hashFileContents(f_data)
                if expected_chksum != current_chksum:
                    raise IOError("data file `%s' has a checksum (%s) that differs " \
                                  "from the expected one (%s)" % (f_data, current_chksum,
                                  expected_chksum))

                r = index_re.match(f_data)
                if r:
                    indices.append((int(r.group(1)), int(r.group(2))))

            indices = sorted(indices, key=lambda v: v[0])
            ok_indices = True

            if len(indices) > 1:
                ok_indices = all((indices[i + 1][0] - indices[i][1] == 1)
                                 for i in range(len(indices) - 1))

            if not ok_indices:
                raise IOError("data file `%s' has missing indices." % f_data)


        self.prefix = prefix
        self.unpack = unpack


        # Retrieve the list of all needed files
        (self.filenames, indices_filenames, data_checksum_filenames, indices_checksum_filenames) = \
            getAllFilenames(filename, start_index, end_index)

        if len(self.filenames) == 0:
            return False

        check_consistency(self.filenames, data_checksum_filenames)


        # Load all the needed infos from all the files
        FileInfos = namedtuple('FileInfos', ['file_index', 'start_index', 'end_index', 'offset', 'size'])

        for file_index, current_filename in enumerate(self.filenames):
            try:
                f = open(current_filename, 'rb')
            except Exception as e:
                logger.warning("Could not setup `%s': %s" % (filename, e))
                return False

            # Reads the header of the current file
            self._readHeader(f)
            offset = f.tell()

            # Process each data unit from the file
            while True:
                line = f.readline()
                if not line:
                    break

                offset += len(line)

                (start, end, data_size) = [ int(x) for x in line.split() ]

                if ((start_index is None) or (start >= start_index)) and \
                   ((end_index is None) or (end <= end_index)):
                    self.infos.append(FileInfos(file_index=file_index, start_index=start,
                                                end_index=end, offset=offset, size=data_size))

                f.seek(data_size, 1)
                offset += data_size

            f.close()

        return True


    def close(self):
        if self.current_file is not None:
            self.current_file.close()

        super(CachedDataSource, self).close()


    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        if self.current_file_index != infos.file_index:
            if self.current_file is not None:
                self.current_file.close()
                self.current_file = None

            try:
                self.current_file = open(self.filenames[infos.file_index], 'rb')
                self.current_file_index = infos.file_index
            except Exception as e:
                raise IOError("Could not read `%s': %s" % (self.filenames[infos.file_index], e))

        self.current_file.seek(infos.offset, 0)

        t1 = time.time()
        encoded_data = self.current_file.read(infos.size)
        t2 = time.time()

        self.read_duration += t2 - t1
        self.nb_bytes_read += infos.size

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(encoded_data)
        else:
            data = encoded_data

        return (data, infos.start_index, infos.end_index)
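
# Typical read loop (illustrative sketch; the cache file and prefix paths are
# hypothetical). setup() indexes the cached chunks, iteration then unpacks one
# block at a time:
#
#   source = CachedDataSource()
#   if source.setup('path/to/cache/<hash>.data', prefix='/path/to/prefix'):
#       for data, start, end in source:
#           print(start, end, data)
#       source.close()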


#----------------------------------------------------------


class DatabaseOutputDataSource(DataSource):
    """Utility class to load data from an output of a database view"""

    def __init__(self):
        super(DatabaseOutputDataSource, self).__init__()

        self.prefix        = None
        self.dataformat    = None
        self.view          = None
        self.output_name   = None
        self.pack          = True


    def setup(self, view, output_name, dataformat_name, prefix, start_index=None,
              end_index=None, pack=False):
        """Configures the data source


        Parameters:

          view: The database view providing the data.

          output_name (str): Name of the view output to read from.

          dataformat_name (str): Name of the data format of that output.

          prefix (str): Establishes the prefix of your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, reads the data from the beginning)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          pack (bool): Indicates if the data must be packed or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.prefix      = prefix
        self.view        = view
        self.output_name = output_name
        self.pack        = pack

        self.dataformat = DataFormat(self.prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)


        # Load all the needed infos from the view
        Infos = namedtuple('Infos', ['start_index', 'end_index'])

        objects = self.view.objects()

        start = None
        end = None
        previous_value = None

        for index, obj in enumerate(objects):
            if start is None:
                start = index
                previous_value = getattr(obj, output_name)
            elif getattr(obj, output_name) != previous_value:
                end = index - 1
                previous_value = None

                if ((start_index is None) or (start >= start_index)) and \
                   ((end_index is None) or (end <= end_index)):
                    self.infos.append(Infos(start_index=start, end_index=end))

                start = index
                previous_value = getattr(obj, output_name)

        end = index

        if ((start_index is None) or (start >= start_index)) and \
           ((end_index is None) or (end <= end_index)):
            self.infos.append(Infos(start_index=start, end_index=end))

        return True


    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        t1 = time.time()
        data = self.view.get(self.output_name, infos.start_index)
        t2 = time.time()

        self.read_duration += t2 - t1

        if isinstance(data, dict):
            d = self.dataformat.type()
            d.from_dict(data, casting='safe', add_defaults=False)
            data = d

        if self.pack:
            data = data.pack()
            self.nb_bytes_read += len(data)

        return (data, infos.start_index, infos.end_index)
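
# Illustrative sketch (the 'view' object and names are hypothetical): the
# source groups consecutive view objects sharing the same value of the chosen
# output into (start_index, end_index) blocks, then serves them on demand:
#
#   source = DatabaseOutputDataSource()
#   source.setup(view, 'image', 'some/dataformat/1', '/path/to/prefix')
#   data, start, end = source[0]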


#----------------------------------------------------------


class RemoteDataSource(DataSource):
    """Utility class to load data from a data source accessible via a socket"""

    def __init__(self):
        super(RemoteDataSource, self).__init__()

        self.socket     = None
        self.input_name = None
        self.dataformat = None
        self.unpack     = True


    def setup(self, socket, input_name, dataformat_name, prefix, unpack=True):
        """Configures the data source


        Parameters:

          socket (socket): The socket to use to access the data.

          input_name (str): Name of the input corresponding to the data source.

          dataformat_name (str): Name of the data format.

          prefix (str): Establishes the prefix of your installation.

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.socket     = socket
        self.input_name = input_name
        self.unpack     = unpack

        self.dataformat = DataFormat(prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        return True


    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        logger.debug('send: (get) get %s %d', self.input_name, index)

        t1 = time.time()

        self.socket.send_string('get', zmq.SNDMORE)
        self.socket.send_string(self.input_name, zmq.SNDMORE)
        self.socket.send_string('%d' % index)

        answer = self.socket.recv().decode('utf-8')

        if answer == 'err':
            self.read_duration += time.time() - t1
            kind = self.socket.recv().decode('utf-8')
            message = self.socket.recv().decode('utf-8')
            raise RemoteException(kind, message)

        start = int(answer)
        end = int(self.socket.recv())

        packed = self.socket.recv()

        t2 = time.time()

        logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed), start, end)

        self.nb_bytes_read += len(packed)

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(packed)
        else:
            data = packed

        self.read_duration += t2 - t1

        return (data, infos.start_index, infos.end_index)


    def _prepare(self):
        # Load the needed infos from the socket
        Infos = namedtuple('Infos', ['start_index', 'end_index'])

        logger.debug('send: (ifo) infos %s', self.input_name)

        self.socket.send_string('ifo', zmq.SNDMORE)
        self.socket.send_string(self.input_name)

        answer = self.socket.recv().decode('utf-8')
        logger.debug('recv: %s', answer)

        if answer == 'err':
            kind = self.socket.recv().decode('utf-8')
            message = self.socket.recv().decode('utf-8')
            raise RemoteException(kind, message)

        nb = int(answer)
        for i in range(nb):
            start = int(self.socket.recv())
            end = int(self.socket.recv())
            self.infos.append(Infos(start_index=start, end_index=end))

        self.ready = True
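
# Illustrative sketch (endpoint, socket type and names are hypothetical): the
# data source talks to a remote input over a zmq socket using the 'ifo'/'get'
# commands implemented above:
#
#   import zmq
#   context = zmq.Context()
#   socket = context.socket(zmq.PAIR)
#   socket.connect('tcp://127.0.0.1:5555')
#
#   source = RemoteDataSource()
#   source.setup(socket, 'my_input', 'some/dataformat/1', '/path/to/prefix')
#   data, start, end = source[0]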


#----------------------------------------------------------


class DataSink(object):

    """Interface of all the Data Sinks

    Data Sinks are used by the outputs of an algorithm to write/transmit data.
    """
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data

        Parameters:

          data (beat.core.baseformat.baseformat): The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        pass

    @abc.abstractmethod
    def isConnected(self):
        pass


    def close(self):
        pass


#----------------------------------------------------------


class StdoutDataSink(DataSink):

    """Data Sink that prints informations about the written data on stdout

    Note: The written data is lost! Use this class for debugging purposes
    """

    def __init__(self):
        super(StdoutDataSink, self).__init__()
        self.dataformat = None
        self.prefix = ''
        self.display_data = True

    def setup(self, dataformat, prefix=None, display_data=True):
        self.dataformat = dataformat
        self.display_data = display_data

        if prefix is not None:
            self.prefix = prefix
            if self.prefix != '':
                self.prefix += ' '

    def write(self, data, start_data_index, end_data_index):
        """Write a block of data

        Parameters:

          data (beat.core.baseformat.baseformat): The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        if self.display_data:
            print('%s(%d -> %d): %s' % (self.prefix, start_data_index, end_data_index, str(data)))
        else:
            print('%s(%d -> %d): <data>' % (self.prefix, start_data_index, end_data_index))


    def isConnected(self):
        return True
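
# Debugging sketch (the 'dataformat' object is assumed to be already loaded;
# the prefix string is arbitrary): everything written to this sink is simply
# echoed on stdout:
#
#   sink = StdoutDataSink()
#   sink.setup(dataformat, prefix='my_output')
#   sink.write({'value': 42}, 0, 0)   # prints "my_output (0 -> 0): ..."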


#----------------------------------------------------------


class CachedDataSink(DataSink):

    """Data Sink that saves data in the cache

    The default behavior is to save the data in a binary format.

    """

    def __init__(self):
        self.filename = None
        self.encoding = None
        self.dataformat = None
        self.start_index = None
        self.end_index = None

        self.data_file = None
        self.index_file = None
        self.last_written_data_index = None

        self.nb_bytes_written = 0
        self.write_duration = 0


    def setup(self, filename, dataformat, start_index, end_index, encoding='binary'):
        """Configures the data sink

        Parameters:

          filename (str): Name of the file to generate

          dataformat (beat.core.dataformat.DataFormat): The dataformat to be used
            inside this file. All objects stored inside this file will respect that
            format.

          start_index (int): The index of the first data unit to be written
            to this file

          end_index (int): The index of the last data unit to be written to
            this file

          encoding (str): String defining the encoding to be used for encoding the
            data. Only a few options are supported: ``binary`` (the default) or
            ``json`` (debugging purposes).

        """

        # Close current file if open
        self.close()

        if encoding not in ('binary', 'json'):
            raise RuntimeError("valid formats for data writing are 'binary' "
                               "or 'json': the format `%s' is invalid" % encoding)

        if dataformat.name == '__unnamed_dataformat__':
            raise RuntimeError("cannot record data using an unnamed data format")

        filename, data_ext = os.path.splitext(filename)

        self.filename = '%s.%d.%d%s' % (filename, start_index, end_index, data_ext)
        self.encoding = encoding
        self.dataformat = dataformat
        self.start_index = start_index
        self.end_index = end_index

        self.nb_bytes_written = 0
        self.write_duration = 0
        self.last_written_data_index = None

        try:
            self.data_file = open(self.filename, 'wb')
            self.index_file = open(self.filename.replace('.data', '.index'), 'wt')
        except Exception:
            return False

        # Write the dataformat
        self.data_file.write(six.b('%s\n%s\n' % (self.encoding, self.dataformat.name)))
        self.data_file.flush()

        return True


    def close(self):
        """Closes the data sink
        """

        if self.data_file is not None:
            self.data_file.close()
            self.index_file.close()

            # If file is not complete, delete it
            if (self.last_written_data_index is None) or \
               (self.last_written_data_index < self.end_index):
                try:
                    os.remove(self.filename)
                    os.remove(self.filename.replace('.data', '.index'))
                    return True
                except Exception:
                    return False

            # Creates the checksums for all data and indexes
            chksum_data = hashFileContents(self.filename)
            with open(self.filename + '.checksum', 'wt') as f:
                f.write(chksum_data)

            index_filename = self.filename.replace('.data', '.index')
            chksum_index = hashFileContents(index_filename)
            with open(index_filename + '.checksum', 'wt') as f:
                f.write(chksum_index)

            self.data_file = None
            self.index_file = None
            self.last_written_data_index = None

        return True


    def __del__(self):
        """Make sure the files are closed when the object is deleted
        """
        self.close()


    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data to the filesystem

        Parameters:

          data (beat.core.baseformat.baseformat): The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        # If the user passed a dictionary - convert it
        if isinstance(data, dict):
            data = self.dataformat.type(**data)
        else:
            # Checks that the input data conforms to the expected format
            if data.__class__._name != self.dataformat.name:
                raise TypeError("input data uses format `%s' while this sink "
                        "expects `%s'" % (data.__class__._name, self.dataformat.name))

        if self.data_file is None:
            raise RuntimeError("No destination file")

        # Encoding
        if self.encoding == 'binary':
            encoded_data = data.pack()
        else:
            from .utils import NumpyJSONEncoder
            encoded_data = json.dumps(data.as_dict(), indent=4, cls=NumpyJSONEncoder)
            if not isinstance(encoded_data, bytes):
                encoded_data = encoded_data.encode('utf-8')

        # Adds a new line at the end of the encoded data
        encoded_data += six.b('\n')

        informations = six.b('%d %d %d\n' % (start_data_index,
                             end_data_index, len(encoded_data)))

        t1 = time.time()

        self.data_file.write(informations + encoded_data)
        self.data_file.flush()

        indexes = '%d %d\n' % (start_data_index, end_data_index)
        self.index_file.write(indexes)
        self.index_file.flush()

        t2 = time.time()

        # Update the I/O statistics and remember the last index written
        self.nb_bytes_written += len(informations) + len(encoded_data)
        self.write_duration += t2 - t1
        self.last_written_data_index = end_data_index
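
# Write-side sketch (paths and the dataformat are hypothetical): the sink
# produces '<hash>.0.3.data' / '<hash>.0.3.index' plus their '.checksum'
# companions once every expected index has been written and close() is called:
#
#   sink = CachedDataSink()
#   if sink.setup('path/to/cache/<hash>.data', dataformat, start_index=0, end_index=3):
#       for i in range(4):
#           sink.write({'value': i}, i, i)
#       sink.close()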