#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################


"""
====
data
====

Data I/O classes and functions
"""

import os
import re
import glob
import simplejson as json
import time
import abc
import zmq
import logging
import six

from functools import reduce
from collections import namedtuple

from .hash import hashFileContents
from .dataformat import DataFormat
from .algorithm import Algorithm
from .exceptions import RemoteException

logger = logging.getLogger(__name__)


# ----------------------------------------------------------


def mixDataIndices(list_of_data_indices):
    """Given a collection of lists of data indices (belonging to separate
    but synchronized files/inputs), returns the most granular list of
    indices that span all the data
73

74
    For example, the mix of
75

76
      [(0, 2), (3, 4)]
77

78
    and
79

80
      [(0, 4)]
81

82
    is:
83

84
      [(0, 2), (3, 4)]
85
86


87
    The mix of
88

89
      [(0, 2), (3, 4)]
90

91
    and
92

93
      [(0, 1), (2, 3), (4, 4)]
94

95
    is:
96

97
      [(0, 1), (2, 2), (3, 3), (4, 4)]
98

Philip ABBET's avatar
Philip ABBET committed
99
    """
100

Samuel GAIST's avatar
Samuel GAIST committed
101
102
    start = max([x[0][0] for x in list_of_data_indices])
    end = min([x[-1][1] for x in list_of_data_indices])
103

104
105
    result = []
    current_start = start
106

107
108
    for index in range(start, end + 1):
        done = False
109

110
111
112
        for l in list_of_data_indices:
            for indices in l:
                if indices[1] == index:
Samuel GAIST's avatar
Samuel GAIST committed
113
                    result.append((current_start, index))
114
115
116
                    current_start = index + 1
                    done = True
                    break
117

118
119
            if done:
                break
120

121
    return result

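# Illustrative example (added for documentation; uses only the function above).
# Mixing the index ranges of two synchronized inputs yields the most granular
# list covering both, exactly as in the docstring:
#
#   >>> mixDataIndices([[(0, 2), (3, 4)], [(0, 4)]])
#   [(0, 2), (3, 4)]
#
#   >>> mixDataIndices([[(0, 2), (3, 4)], [(0, 1), (2, 3), (4, 4)]])
#   [(0, 1), (2, 2), (3, 3), (4, 4)]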

# ----------------------------------------------------------


def getAllFilenames(filename, start_index=None, end_index=None):
    """Returns the names of all the files related to the given data file,
    taking the provided start and end indices into account.
130
131


132
    Parameters:
133

134
        filename (str): Name of the data file (path/to/cache/<hash>.data)
135

136
137
        start_index (int): The starting index (if not set or set to
            ``None``, the default, equivalent to ``0``)
138

139
140
        end_index (int): The end index (if not set or set to ``None``, the
            default, equivalent to ``the last existing data``)
141
142


143
    Returns:
144

145
146
        (data_filenames, indices_filenames,
         data_checksum_filenames, indices_checksum_filenames)
147
    """
148

Samuel GAIST's avatar
Samuel GAIST committed
149
    index_re = re.compile(r"^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$")
150

151
152
153
154
155
156
    def file_start(f):
        """Returns the converted start indexes from a filename, otherwise 0"""
        r = index_re.match(f)
        if r:
            return int(r.group(1))
        return 0
157

158
159
    # Retrieve all the related files
    basename, ext = os.path.splitext(filename)
Samuel GAIST's avatar
Samuel GAIST committed
160
    filenames = sorted(glob.glob(basename + "*"), key=file_start)
161

162
163
164
165
166
167
168
169
    # (If necessary) Only keep files containing the desired indices
    if (start_index is not None) or (end_index is not None):
        filtered_filenames = []
        for f in filenames:
            match = index_re.match(f)
            if match:
                start = int(match.group(1))
                end = int(match.group(2))
Samuel GAIST's avatar
Samuel GAIST committed
170
171
172
                if ((start_index is not None) and (end < start_index)) or (
                    (end_index is not None) and (start > end_index)
                ):
173
174
175
                    continue
            filtered_filenames.append(f)
        filenames = filtered_filenames
176

177
    # Separate the filenames in different lists
Samuel GAIST's avatar
Samuel GAIST committed
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
    data_filenames = [x for x in filenames if re.match(r"^.*\.(\d+)\.(\d+)\.data$", x)]
    indices_filenames = [
        x for x in filenames if re.match(r"^.*\.(\d+)\.(\d+)\.index$", x)
    ]
    data_checksum_filenames = [
        x for x in filenames if re.match(r"^.*\.(\d+)\.(\d+)\.data.checksum$", x)
    ]
    indices_checksum_filenames = [
        x for x in filenames if re.match(r"^.*\.(\d+)\.(\d+)\.index.checksum$", x)
    ]

    return (
        data_filenames,
        indices_filenames,
        data_checksum_filenames,
        indices_checksum_filenames,
    )

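# Illustrative sketch (hypothetical cache path and hash): a cache entry split
# into chunks such as <hash>.0.99.data / <hash>.0.99.index and their
# ".checksum" companions can be narrowed to the chunks overlapping a window:
#
#   data, indices, data_ck, indices_ck = getAllFilenames(
#       "path/to/cache/0123abcdef.data", start_index=0, end_index=150
#   )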

# ----------------------------------------------------------


class DataSource(object):
    """Base class to load data from some source"""

    def __init__(self):
        self.infos = []
        self.read_duration = 0
        self.nb_bytes_read = 0
        self.ready = False

    def close(self):
        self.infos = []

    def __del__(self):
        """Makes sure all resources are released when the object is deleted"""
        self.close()

    def __len__(self):
        if not self.ready:
            self._prepare()

        return len(self.infos)

    def __iter__(self):
        if not self.ready:
            self._prepare()

        for i in range(0, len(self.infos)):
            yield self[i]

    def __getitem__(self, index):
        raise NotImplementedError

    def first_data_index(self):
        if not self.ready:
            self._prepare()

        return self.infos[0].start_index

    def last_data_index(self):
        if not self.ready:
            self._prepare()

        return self.infos[-1].end_index

    def data_indices(self):
        if not self.ready:
            self._prepare()

        return [(x.start_index, x.end_index) for x in self.infos]

    def getAtDataIndex(self, data_index):
        if not self.ready:
            self._prepare()

        for index, infos in enumerate(self.infos):
            if (infos.start_index <= data_index) and (data_index <= infos.end_index):
                return self[index]

        return (None, None, None)

    def statistics(self):
        """Returns the read statistics: (number of bytes read, read duration)"""
        return (self.nb_bytes_read, self.read_duration)

    def _prepare(self):
        self.ready = True


# ----------------------------------------------------------


class CachedDataSource(DataSource):
    """Utility class to load data from a file in the cache"""

    def __init__(self):
        super(CachedDataSource, self).__init__()

        self.filenames = None
        self.encoding = None  # Must be 'binary' or 'json'
        self.prefix = None
        self.dataformat = None
        self.current_file = None
        self.current_file_index = None
        self.unpack = True

    def _readHeader(self, file):
        """Read the header of the provided file"""

        # Read file format
        self.encoding = file.readline()[:-1]
        if not isinstance(self.encoding, str):
            self.encoding = self.encoding.decode("utf8")

        if self.encoding not in ("binary", "json"):
            raise RuntimeError(
                "valid formats for data reading are 'binary' "
                "or 'json': the format `%s' is invalid" % self.encoding
            )

        # Read data format
        dataformat_name = file.readline()[:-1]

        if self.dataformat is None:
            if not isinstance(dataformat_name, str):
                dataformat_name = dataformat_name.decode("utf8")

            if dataformat_name.startswith("analysis:"):
                algo_name = dataformat_name.split(":")[1]
                algo = Algorithm(self.prefix, algo_name)
                if not algo.valid:
                    raise RuntimeError(
                        "the dataformat `%s' is the result of an "
                        "algorithm which is not valid" % algo_name
                    )
                self.dataformat = algo.result_dataformat()
            else:
                self.dataformat = DataFormat(self.prefix, dataformat_name)

            if not self.dataformat.valid:
                raise RuntimeError(
                    "the dataformat `{}' is not valid\n{}".format(
                        dataformat_name, self.dataformat.errors
                    )
                )

        return True

    def setup(self, filename, prefix, start_index=None, end_index=None, unpack=True):
        """Configures the data source


        Parameters:

          filename (str): Name of the file to read the data from

          prefix (str): Establishes the prefix of your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, reads the data from the beginning of the file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """
        index_re = re.compile(r"^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$")

        def check_consistency(data_filenames, checksum_filenames):
            """Perform some sanity checks on the data/checksum files on disk:

              1. One-to-one mapping between data and checksum files
              2. Checksum comparison between hash(data) and checksum files
              3. Contiguous indices if they are present
            """

            # Make sure that we have a perfect match between data files and
            # checksum files
            checksum_filenames_noext = [
                os.path.splitext(f)[0] for f in checksum_filenames
            ]

            if data_filenames != checksum_filenames_noext:
                raise IOError(
                    "number of data files and checksum files for `%s' "
                    "does not match (%d != %d)"
                    % (filename, len(data_filenames), len(checksum_filenames_noext))
                )

            # List of start/end indices to check that they are contiguous
            indices = []
            for f_data, f_chck in zip(data_filenames, checksum_filenames):
                expected_chksum = open(f_chck, "rt").read().strip()
                current_chksum = hashFileContents(f_data)
                if expected_chksum != current_chksum:
                    raise IOError(
                        "data file `%s' has a checksum (%s) that differs "
                        "from expected one (%s)"
                        % (f_data, current_chksum, expected_chksum)
                    )

                r = index_re.match(f_data)
                if r:
                    indices.append((int(r.group(1)), int(r.group(2))))

            indices = sorted(indices, key=lambda v: v[0])
            ok_indices = True

            if len(indices) > 1:
                ok_indices = sum(
                    [
                        (indices[i + 1][0] - indices[i][1] == 1)
                        for i in range(len(indices) - 1)
                    ]
                )

            if not ok_indices:
                raise IOError("data file `%s' has missing indices." % f_data)

        self.prefix = prefix
        self.unpack = unpack

        # Retrieve the list of all needed files
        (
            self.filenames,
            indices_filenames,
            data_checksum_filenames,
            indices_checksum_filenames,
        ) = getAllFilenames(filename, start_index, end_index)

        if len(self.filenames) == 0:
            logger.warning("No files found for %s" % filename)
            return False

        check_consistency(self.filenames, data_checksum_filenames)

        # Load all the needed infos from all the files
        FileInfos = namedtuple(
            "FileInfos", ["file_index", "start_index", "end_index", "offset", "size"]
        )

        for file_index, current_filename in enumerate(self.filenames):
            try:
                f = open(current_filename, "rb")
            except Exception as e:
                logger.warning("Could not setup `%s': %s" % (filename, e))
                return False

            # Reads the header of the current file
            self._readHeader(f)
            offset = f.tell()

            # Process each data unit from the file
            while True:
                line = f.readline()
                if not line:
                    break

                offset += len(line)

                (start, end, data_size) = [int(x) for x in line.split()]

                if ((start_index is None) or (start >= start_index)) and (
                    (end_index is None) or (end <= end_index)
                ):
                    self.infos.append(
                        FileInfos(
                            file_index=file_index,
                            start_index=start,
                            end_index=end,
                            offset=offset,
                            size=data_size,
                        )
                    )

                f.seek(data_size, 1)
                offset += data_size

            f.close()

        return True

    def close(self):
        if self.current_file is not None:
            self.current_file.close()

        super(CachedDataSource, self).close()

    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        if self.current_file_index != infos.file_index:
            if self.current_file is not None:
                self.current_file.close()
                self.current_file = None

            try:
                self.current_file = open(self.filenames[infos.file_index], "rb")
                self.current_file_index = infos.file_index
            except Exception as e:
                raise IOError(
                    "Could not read `%s': %s" % (self.filenames[infos.file_index], e)
                )

        self.current_file.seek(infos.offset, 0)

        t1 = time.time()
        encoded_data = self.current_file.read(infos.size)
        t2 = time.time()

        self.read_duration += t2 - t1
        self.nb_bytes_read += infos.size

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(encoded_data)
        else:
            data = encoded_data

        return (data, infos.start_index, infos.end_index)

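# Illustrative usage sketch (hypothetical cache file and prefix): reading a
# cached stream back, block by block, with CachedDataSource.
#
#   source = CachedDataSource()
#   if source.setup("path/to/cache/0123abcdef.data", "/path/to/prefix"):
#       for data, start, end in source:
#           ...  # one data block covering indices [start, end]
#       print(source.statistics())  # (bytes read, seconds spent reading)
#       source.close()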

# ----------------------------------------------------------


class DatabaseOutputDataSource(DataSource):
    """Utility class to load data from an output of a database view"""

    def __init__(self):
        super(DatabaseOutputDataSource, self).__init__()

        self.prefix = None
        self.dataformat = None
        self.view = None
        self.output_name = None
        self.pack = True

    def setup(
        self,
        view,
        output_name,
        dataformat_name,
        prefix,
        start_index=None,
        end_index=None,
        pack=False,
    ):
        """Configures the data source
547
548


549
        Parameters:
550

551
          prefix (str): Establishes the prefix of your installation.
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567

          start_index (int): The starting index (if not set or set to
            ``None``, the default, read data from the begin of file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.prefix = prefix
        self.view = view
        self.output_name = output_name
        self.pack = pack

        self.dataformat = DataFormat(self.prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        # Load all the needed infos from the view output
        Infos = namedtuple("Infos", ["start_index", "end_index"])

        objects = self.view.objects()

        start = None
        end = None
        previous_value = None

        attribute = self.view.get_output_mapping(output_name)

        for index, obj in enumerate(objects):
            if start is None:
                start = index
                previous_value = getattr(obj, attribute)
            elif getattr(obj, attribute) != previous_value:
                end = index - 1
                previous_value = None

                if ((start_index is None) or (start >= start_index)) and (
                    (end_index is None) or (end <= end_index)
                ):
                    self.infos.append(Infos(start_index=start, end_index=end))

                start = index
                previous_value = getattr(obj, attribute)

        end = index

        if ((start_index is None) or (start >= start_index)) and (
            (end_index is None) or (end <= end_index)
        ):
            self.infos.append(Infos(start_index=start, end_index=end))

        return True

    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        t1 = time.time()
        data = self.view.get(self.output_name, infos.start_index)
        t2 = time.time()

        self.read_duration += t2 - t1

        if isinstance(data, dict):
            d = self.dataformat.type()
            d.from_dict(data, casting="safe", add_defaults=False)
            data = d

        if self.pack:
            data = data.pack()
            self.nb_bytes_read += len(data)

        return (data, infos.start_index, infos.end_index)

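# Illustrative sketch (hypothetical `view` object and dataformat name): any
# object exposing the interface used above (objects(), get_output_mapping()
# and get()) can back a DatabaseOutputDataSource.
#
#   source = DatabaseOutputDataSource()
#   if source.setup(view, "image", "some/dataformat/1", "/path/to/prefix"):
#       data, start, end = source[0]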

# ----------------------------------------------------------


class RemoteDataSource(DataSource):
    """Utility class to load data from a data source accessible via a socket"""

    def __init__(self):
        super(RemoteDataSource, self).__init__()

        self.socket = None
        self.input_name = None
        self.dataformat = None
        self.unpack = True

    def setup(self, socket, input_name, dataformat_name, prefix, unpack=True):
        """Configures the data source


        Parameters:

          socket (zmq.Socket): The socket to use to access the data.

          input_name (str): Name of the input corresponding to the data source.

          dataformat_name (str): Name of the data format.

          prefix (str): Establishes the prefix of your installation.

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.socket = socket
        self.input_name = input_name
        self.unpack = unpack

        self.dataformat = DataFormat(prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        return True

    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        logger.debug("send: (get) get %s %d", self.input_name, index)

        t1 = time.time()

        self.socket.send_string("get", zmq.SNDMORE)
        self.socket.send_string(self.input_name, zmq.SNDMORE)
        self.socket.send_string("%d" % index)

        answer = self.socket.recv().decode("utf-8")

        if answer == "err":
            self.read_duration += time.time() - t1
            kind = self.socket.recv()
            message = self.socket.recv()
            raise RemoteException(kind, message)

        start = int(answer)
        end = int(self.socket.recv())

        packed = self.socket.recv()

        t2 = time.time()

        logger.debug("recv: <bin> (size=%d), indexes=(%d, %d)", len(packed), start, end)

        self.nb_bytes_read += len(packed)

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(packed)
        else:
            data = packed

        self.read_duration += t2 - t1

        return (data, infos.start_index, infos.end_index)

    def _prepare(self):
        # Load the needed infos from the socket
        Infos = namedtuple("Infos", ["start_index", "end_index"])

        logger.debug("send: (ifo) infos %s", self.input_name)

        self.socket.send_string("ifo", zmq.SNDMORE)
        self.socket.send_string(self.input_name)

        answer = self.socket.recv().decode("utf-8")
        logger.debug("recv: %s", answer)

        if answer == "err":
            kind = self.socket.recv()
            message = self.socket.recv()
            raise RemoteException(kind, message)

        nb = int(answer)
        for i in range(nb):
            start = int(self.socket.recv())
            end = int(self.socket.recv())
            self.infos.append(Infos(start_index=start, end_index=end))

        self.ready = True

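# Illustrative sketch (hypothetical address; the socket type and the peer,
# which must speak the "ifo"/"get" protocol used above, are assumptions):
#
#   context = zmq.Context()
#   socket = context.socket(zmq.PAIR)
#   socket.connect("tcp://127.0.0.1:5555")
#
#   source = RemoteDataSource()
#   if source.setup(socket, "my_input", "some/dataformat/1", "/path/to/prefix"):
#       data, start, end = source[0]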

# ----------------------------------------------------------


class DataSink(object):

    """Interface of all the Data Sinks
783

784
785
    Data Sinks are used by the outputs of an algorithm to write/transmit data.
    """
Samuel GAIST's avatar
Samuel GAIST committed
786

787
    __metaclass__ = abc.ABCMeta
788

789
790
791
    @abc.abstractmethod
    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data
792

793
        Parameters:
794

795
          data (baseformat.baseformat): The block of data to write
796

797
          start_data_index (int): Start index of the written data
798

799
          end_data_index (int): End index of the written data
800

801
        """
802

803
        pass
804

805
806
    @abc.abstractmethod
    def isConnected(self):
807
        """Returns whether the data sink is connected"""
808

809
        pass
810

811
    def close(self):
812
813
        """Closes the data sink"""

814
815
816
        pass


# ----------------------------------------------------------


class StdoutDataSink(DataSink):

    """Data Sink that prints informations about the written data on stdout

824
    Note: The written data is lost! Use this class for debugging purposes
825
826
827
828
829
    """

    def __init__(self):
        super(StdoutDataSink, self).__init__()
        self.dataformat = None
Samuel GAIST's avatar
Samuel GAIST committed
830
        self.prefix = ""
831
832
833
834
835
836
837
838
        self.display_data = True

    def setup(self, dataformat, prefix=None, display_data=True):
        self.dataformat = dataformat
        self.display_data = display_data

        if prefix is not None:
            self.prefix = prefix
Samuel GAIST's avatar
Samuel GAIST committed
839
840
            if self.prefix != "":
                self.prefix += " "
841
842
843
844
845
846

    def write(self, data, start_data_index, end_data_index):
        """Write a block of data

        Parameters:

847
          data (baseformat.baseformat) The block of data to write
848
849
850
851
852
853

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """
854

855
        if self.display_data:
Samuel GAIST's avatar
Samuel GAIST committed
856
857
858
859
            print(
                "%s(%d -> %d): %s"
                % (self.prefix, start_data_index, end_data_index, str(data))
            )
860
        else:
Samuel GAIST's avatar
Samuel GAIST committed
861
862
863
            print(
                "%s(%d -> %d): <data>" % (self.prefix, start_data_index, end_data_index)
            )
864
865
866
867
868

    def isConnected(self):
        return True

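# Illustrative sketch (assumes `dataformat` is a valid DataFormat instance):
# StdoutDataSink simply prints whatever it receives, which makes it handy for
# debugging an algorithm's outputs.
#
#   sink = StdoutDataSink()
#   sink.setup(dataformat, prefix="debug")
#   sink.write(dataformat.type(), 0, 0)  # prints "debug (0 -> 0): ..."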

# ----------------------------------------------------------


class CachedDataSink(DataSink):

    """Data Sink that saves data in the cache

    The default behavior is to save the data in a binary format.
    """

    def __init__(self):
        self.filename = None
        self.encoding = None
        self.dataformat = None
        self.start_index = None
        self.end_index = None

        self.data_file = None
        self.index_file = None
        self.last_written_data_index = None

        self.nb_bytes_written = 0
        self.write_duration = 0

    def setup(self, filename, dataformat, start_index, end_index, encoding="binary"):
        """Configures the data sink

        Parameters:

          filename (str): Name of the file to generate

          dataformat (dataformat.DataFormat): The dataformat to be used
            inside this file. All objects stored inside this file will respect
            that format.

          start_index (int): Start index of the data to be written to this file

          end_index (int): End index of the data to be written to this file

          encoding (str): String defining the encoding to be used for encoding
            the data. Only a few options are supported: ``binary``
            (the default) or ``json`` (debugging purposes).

        """

        # Close current file if open
        self.close()

        if encoding not in ("binary", "json"):
            raise RuntimeError(
                "valid formats for data writing are 'binary' "
                "or 'json': the format `%s' is invalid" % encoding
            )

        if dataformat.name == "__unnamed_dataformat__":
            raise RuntimeError("cannot record data using an unnamed data format")

        filename, data_ext = os.path.splitext(filename)

        self.filename = "%s.%d.%d%s" % (filename, start_index, end_index, data_ext)
        self.encoding = encoding
        self.dataformat = dataformat
        self.start_index = start_index
        self.end_index = end_index

        self.nb_bytes_written = 0
        self.write_duration = 0
        self.last_written_data_index = None

        try:
            self.data_file = open(self.filename, "wb")
            self.index_file = open(self.filename.replace(".data", ".index"), "wt")
        except Exception:
            return False

        # Write the dataformat
        self.data_file.write(six.b("%s\n%s\n" % (self.encoding, self.dataformat.name)))
        self.data_file.flush()

        return True

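    # Illustrative usage sketch (hypothetical cache path, dataformat and data
    # blocks): a sink configured for indices [0, 99] must receive writes up to
    # its end index before close(), otherwise the partial files are deleted.
    #
    #   sink = CachedDataSink()
    #   if sink.setup("path/to/cache/0123abcdef.data", dataformat, 0, 99):
    #       sink.write(first_block, 0, 49)
    #       sink.write(second_block, 50, 99)
    #       sink.close()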
    def close(self):
        """Closes the data sink
        """

        if self.data_file is not None:
            self.data_file.close()
            self.index_file.close()

            # If file is not complete, delete it
            if (self.last_written_data_index is None) or (
                self.last_written_data_index < self.end_index
            ):
                try:
                    os.remove(self.filename)
                    os.remove(self.filename.replace(".data", ".index"))
                    return True
                except Exception:
                    return False

            # Creates the checksums for all data and indexes
            chksum_data = hashFileContents(self.filename)
            with open(self.filename + ".checksum", "wt") as f:
                f.write(chksum_data)

            index_filename = self.filename.replace(".data", ".index")
            chksum_index = hashFileContents(index_filename)
            with open(index_filename + ".checksum", "wt") as f:
                f.write(chksum_index)

            self.data_file = None
            self.index_file = None
            self.last_written_data_index = None

        return True

    def __del__(self):
        """Make sure the files are closed when the object is deleted
        """
        self.close()

    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data to the filesystem

        Parameters:

          data (baseformat.baseformat): The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """