data.py 35.1 KB
Newer Older
1 2 3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################
35 36


37 38 39 40 41 42 43
"""
====
data
====

Data I/O classes and functions
"""
44 45 46 47 48 49 50

import os
import re
import glob
import simplejson as json
import time
import abc
51
import zmq
52 53 54
import logging
import six

55
from functools import reduce
56
from collections import namedtuple
57 58 59 60

from .hash import hashFileContents
from .dataformat import DataFormat
from .algorithm import Algorithm
61
from .exceptions import RemoteException
62

63
logger = logging.getLogger(__name__)
64

65

66
# ----------------------------------------------------------
67 68


69 70 71 72
def mixDataIndices(list_of_data_indices):
    """Given a collection of lists of data indices (belonging to separate
    but synchronized files/inputs), returns the most granular list of
    indices that span all the data

    For example, the mix of

      [(0, 2), (3, 4)]

    and

      [(0, 4)]

    is:

      [(0, 2), (3, 4)]


    The mix of

      [(0, 2), (3, 4)]

    and

      [(0, 1), (2, 3), (4, 4)]

    is:

      [(0, 1), (2, 2), (3, 3), (4, 4)]

    """

    # Only the region covered by every input list is spanned
    first_index = max(indices[0][0] for indices in list_of_data_indices)
    last_index = min(indices[-1][1] for indices in list_of_data_indices)

    # Any end index appearing in any input closes a segment in the result
    segment_ends = {pair[1] for indices in list_of_data_indices for pair in indices}

    result = []
    segment_start = first_index

    for current in range(first_index, last_index + 1):
        if current in segment_ends:
            result.append((segment_start, current))
            segment_start = current + 1

    return result
122 123


124
# ----------------------------------------------------------
125 126


127 128 129
def getAllFilenames(filename, start_index=None, end_index=None):
    """Returns the names of all the files related to the given data file,
    taking the provided start and end indices into account.


    Parameters:

        filename (str): Name of the data file (path/to/cache/<hash>.data)

        start_index (int): The starting index (if not set or set to
            ``None``, the default, equivalent to ``0``)

        end_index (int): The end index (if not set or set to ``None``, the
            default, equivalent to ``the last existing data``)


    Returns:

        (data_filenames, indices_filenames,
         data_checksum_filenames, indices_checksum_filenames)
    """

    index_re = re.compile(r"^.*\.(\d+)\.(\d+)\.(data|index)(\.checksum)?$")

    def file_start(f):
        """Returns the converted start index from a filename, otherwise 0"""
        r = index_re.match(f)
        if r:
            return int(r.group(1))
        return 0

    # Retrieve all the related files, ordered by their start index
    basename, ext = os.path.splitext(filename)
    filenames = sorted(glob.glob(basename + ".*"), key=file_start)

    # (If necessary) Only keep files containing the desired indices
    if (start_index is not None) or (end_index is not None):
        filtered_filenames = []
        for f in filenames:
            match = index_re.match(f)
            if match:
                start = int(match.group(1))
                end = int(match.group(2))
                # Drop files entirely outside the requested [start, end] range
                if ((start_index is not None) and (end < start_index)) or (
                    (end_index is not None) and (start > end_index)
                ):
                    continue
            filtered_filenames.append(f)
        filenames = filtered_filenames

    # Separate the filenames in different lists.  Dots are escaped (the
    # original patterns used bare `.data.checksum', where `.' matched any
    # character), and patterns are compiled once instead of per filename.
    data_re = re.compile(r"^.*\.(\d+)\.(\d+)\.data$")
    index_file_re = re.compile(r"^.*\.(\d+)\.(\d+)\.index$")
    data_checksum_re = re.compile(r"^.*\.(\d+)\.(\d+)\.data\.checksum$")
    index_checksum_re = re.compile(r"^.*\.(\d+)\.(\d+)\.index\.checksum$")

    data_filenames = [x for x in filenames if data_re.match(x)]
    indices_filenames = [x for x in filenames if index_file_re.match(x)]
    data_checksum_filenames = [x for x in filenames if data_checksum_re.match(x)]
    indices_checksum_filenames = [x for x in filenames if index_checksum_re.match(x)]

    return (
        data_filenames,
        indices_filenames,
        data_checksum_filenames,
        indices_checksum_filenames,
    )
195 196


197
# ----------------------------------------------------------
198 199


200 201 202 203
class DataSource(object):
    """Base class to load data from some source"""

    def __init__(self):
        # Per-block descriptors; each entry must expose `start_index' and
        # `end_index' attributes (subclasses fill this in)
        self.infos = []
        # Read statistics, reported by statistics()
        self.read_duration = 0
        self.nb_bytes_read = 0
        # Lazily flipped to True once _prepare() has run
        self.ready = False

    def _ensure_ready(self):
        """Runs the (lazy) preparation step on first access"""
        if not self.ready:
            self._prepare()

    def close(self):
        """Releases the block descriptors"""
        self.infos = []

    def __del__(self):
        """Makes sure all resources are released when the object is deleted"""
        self.close()

    def __len__(self):
        self._ensure_ready()
        return len(self.infos)

    def __iter__(self):
        self._ensure_ready()
        for position in range(len(self.infos)):
            yield self[position]

    def __getitem__(self, index):
        # Subclasses must implement the actual data retrieval
        raise NotImplementedError

    def first_data_index(self):
        """Index of the first data unit available from this source"""
        self._ensure_ready()
        return self.infos[0].start_index

    def last_data_index(self):
        """Index of the last data unit available from this source"""
        self._ensure_ready()
        return self.infos[-1].end_index

    def data_indices(self):
        """List of (start_index, end_index) pairs, one per data unit"""
        self._ensure_ready()
        return [(entry.start_index, entry.end_index) for entry in self.infos]

    def getAtDataIndex(self, data_index):
        """Returns the data unit whose index range contains `data_index'"""
        self._ensure_ready()

        for position, entry in enumerate(self.infos):
            if entry.start_index <= data_index <= entry.end_index:
                return self[position]

        return (None, None, None)

    def statistics(self):
        """Return the statistics about the number of bytes read"""
        return (self.nb_bytes_read, self.read_duration)

    def _prepare(self):
        self.ready = True


268
# ----------------------------------------------------------
269 270 271


class CachedDataSource(DataSource):

    """Utility class to load data from a file in the cache

    Data is spread over chunk files named ``<hash>.<start>.<end>.data``
    with ``.checksum`` companions (see :func:`getAllFilenames`).  Each data
    file begins with a two-line header (encoding and data format name),
    followed by alternating index lines and raw payloads.
    """

    def __init__(self):
        super(CachedDataSource, self).__init__()

        self.filenames = None
        self.encoding = None  # Must be 'binary' or 'json'
        self.prefix = None
        self.dataformat = None
        self.current_file = None
        self.current_file_index = None
        self.unpack = True

    def _readHeader(self, file):
        """Read the header of the provided file

        The header is two lines: the encoding ('binary' or 'json') and the
        data format name.  Leaves the file positioned just past the header.
        """

        # Read file format (first header line); decode bytes so the
        # comparison below works the same on Python 2 and 3
        self.encoding = file.readline()[:-1]
        if not isinstance(self.encoding, str):
            self.encoding = self.encoding.decode("utf8")

        if self.encoding not in ("binary", "json"):
            raise RuntimeError(
                "valid formats for data reading are 'binary' "
                "or 'json': the format `%s' is invalid" % self.encoding
            )

        # Read data format (second header line); resolved only once — the
        # first file read fixes self.dataformat for all subsequent files
        dataformat_name = file.readline()[:-1]

        if self.dataformat is None:
            if not isinstance(dataformat_name, str):
                dataformat_name = dataformat_name.decode("utf8")

            # An 'analysis:<algo>' name means the format is defined by an
            # algorithm's result data format rather than a plain format
            if dataformat_name.startswith("analysis:"):
                algo_name = dataformat_name.split(":")[1]
                algo = Algorithm(self.prefix, algo_name)
                if not algo.valid:
                    raise RuntimeError(
                        "the dataformat `%s' is the result of an "
                        "algorithm which is not valid" % algo_name
                    )
                self.dataformat = algo.result_dataformat()
            else:
                self.dataformat = DataFormat(self.prefix, dataformat_name)

            if not self.dataformat.valid:
                raise RuntimeError(
                    "the dataformat `{}' is not valid\n{}".format(
                        dataformat_name, self.dataformat.errors
                    )
                )

        return True

    def setup(self, filename, prefix, start_index=None, end_index=None, unpack=True):

        """Configures the data source


        Parameters:

          filename (str): Name of the file to read the data from

          prefix (str): Establishes the prefix of your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, read data from the begin of file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """
        index_re = re.compile(r"^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$")

        def check_consistency(data_filenames, checksum_filenames):
            """Perform some sanity check on the data/checksum files on disk:

              1. One-to-one mapping between data and checksum files
              2. Checksum comparison between hash(data) and checksum files
              3. Contiguous indices if they are present
            """

            # Make sure that we have a perfect match between data files and
            # checksum files
            checksum_filenames_noext = [
                os.path.splitext(f)[0] for f in checksum_filenames
            ]

            if data_filenames != checksum_filenames_noext:
                raise IOError(
                    "number of data files and checksum files for `%s' "
                    "does not match (%d != %d)"
                    % (filename, len(data_filenames), len(checksum_filenames_noext))
                )

            # list of start/end indices to check that there are contiguous
            indices = []
            for f_data, f_chck in zip(data_filenames, checksum_filenames):
                expected_chksum = open(f_chck, "rt").read().strip()
                current_chksum = hashFileContents(f_data)
                if expected_chksum != current_chksum:
                    raise IOError(
                        "data file `%s' has a checksum (%s) that differs "
                        "from expected one (%s)"
                        % (f_data, current_chksum, expected_chksum)
                    )

                r = index_re.match(f_data)
                if r:
                    indices.append((int(r.group(1)), int(r.group(2))))

            indices = sorted(indices, key=lambda v: v[0])
            ok_indices = True

            if len(indices) > 1:
                # Each chunk must start exactly one index after the
                # previous chunk ends
                ok_indices = sum(
                    [
                        (indices[i + 1][0] - indices[i][1] == 1)
                        for i in range(len(indices) - 1)
                    ]
                )

            if not ok_indices:
                raise IOError(
                    "data file `%s|%s' has missing indices." % (f_data, f_chck)
                )

        self.prefix = prefix
        self.unpack = unpack

        # Retrieve the list of all needed files
        (
            self.filenames,
            indices_filenames,
            data_checksum_filenames,
            indices_checksum_filenames,
        ) = getAllFilenames(filename, start_index, end_index)

        if len(self.filenames) == 0:
            logger.warning("No files found for %s" % filename)
            return False

        check_consistency(self.filenames, data_checksum_filenames)

        # Load all the needed infos from all the files
        FileInfos = namedtuple(
            "FileInfos", ["file_index", "start_index", "end_index", "offset", "size"]
        )

        for file_index, current_filename in enumerate(self.filenames):
            try:
                f = open(current_filename, "rb")
            except Exception as e:
                logger.warning("Could not setup `%s': %s" % (filename, e))
                return False

            # Reads the header of the current file
            self._readHeader(f)
            offset = f.tell()

            # Process each data unit from the file
            while True:
                line = f.readline()
                if not line:
                    break

                offset += len(line)

                # Each index line holds: <start> <end> <payload size in bytes>
                (start, end, data_size) = [int(x) for x in line.split()]

                # Only keep the units that fall inside the requested range
                if ((start_index is None) or (start >= start_index)) and (
                    (end_index is None) or (end <= end_index)
                ):
                    self.infos.append(
                        FileInfos(
                            file_index=file_index,
                            start_index=start,
                            end_index=end,
                            offset=offset,
                            size=data_size,
                        )
                    )

                # Skip the payload to reach the next index line
                f.seek(data_size, 1)
                offset += data_size

            f.close()

        return True

    def close(self):
        if self.current_file is not None:
            self.current_file.close()

        super(CachedDataSource, self).close()

    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        # Only (re-)open a file when the requested unit lives in a
        # different chunk file than the currently opened one
        if self.current_file_index != infos.file_index:
            if self.current_file is not None:
                self.current_file.close()
                self.current_file = None

            try:
                self.current_file = open(self.filenames[infos.file_index], "rb")
            except Exception as e:
                raise IOError(
                    "Could not read `%s': %s" % (self.filenames[infos.file_index], e)
                )

            self.current_file_index = infos.file_index

        self.current_file.seek(infos.offset, 0)

        # Time the raw read; reported through statistics()
        t1 = time.time()
        encoded_data = self.current_file.read(infos.size)
        t2 = time.time()

        self.read_duration += t2 - t1
        self.nb_bytes_read += infos.size

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(encoded_data)
        else:
            data = encoded_data

        return (data, infos.start_index, infos.end_index)
522 523


524
# ----------------------------------------------------------
525 526


527 528
class DatabaseOutputDataSource(DataSource):
    """Utility class to load data from an output of a database view"""

    def __init__(self):
        super(DatabaseOutputDataSource, self).__init__()

        self.prefix = None
        self.dataformat = None
        self.view = None
        self.output_name = None
        self.pack = True

    def setup(
        self,
        view,
        output_name,
        dataformat_name,
        prefix,
        start_index=None,
        end_index=None,
        pack=False,
    ):
        """Configures the data source


        Parameters:

          view: Database view providing the objects (must implement
            ``objects()``, ``get_output_mapping()`` and ``get()``)

          output_name (str): Name of the view output to read from

          dataformat_name (str): Name of the data format of that output

          prefix (str): Establishes the prefix of your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, read data from the begin of file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          pack (bool): Indicates if the data returned by ``__getitem__``
            must be packed (serialized) or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.prefix = prefix
        self.view = view
        self.output_name = output_name
        self.pack = pack

        self.dataformat = DataFormat(self.prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        # Group the view objects into blocks: consecutive objects sharing
        # the same value of the mapped attribute belong to one
        # (start_index, end_index) block
        Infos = namedtuple("Infos", ["start_index", "end_index"])

        objects = self.view.objects()

        start = None
        end = None
        previous_value = None

        attribute = self.view.get_output_mapping(output_name)

        # NOTE(review): if `objects' is empty, `index' is never bound and
        # the `end = index' statement below raises a NameError — confirm
        # whether an empty view can reach this point
        for index, obj in enumerate(objects):
            if start is None:
                start = index
                previous_value = getattr(obj, attribute)
            elif getattr(obj, attribute) != previous_value:
                # Attribute value changed: close the current block
                end = index - 1
                previous_value = None

                if ((start_index is None) or (start >= start_index)) and (
                    (end_index is None) or (end <= end_index)
                ):
                    self.infos.append(Infos(start_index=start, end_index=end))

                start = index
                previous_value = getattr(obj, attribute)

        # Close the last (still open) block
        end = index

        if ((start_index is None) or (start >= start_index)) and (
            (end_index is None) or (end <= end_index)
        ):
            self.infos.append(Infos(start_index=start, end_index=end))

        return True

    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        # Time the view access; reported through statistics()
        t1 = time.time()
        data = self.view.get(self.output_name, infos.start_index)
        t2 = time.time()

        self.read_duration += t2 - t1

        # Plain dictionaries are converted into data format instances
        if isinstance(data, dict):
            d = self.dataformat.type()
            d.from_dict(data, casting="safe", add_defaults=False)
            data = d

        if self.pack:
            data = data.pack()
            self.nb_bytes_read += len(data)

        return (data, infos.start_index, infos.end_index)
650

651

652
# ----------------------------------------------------------
653 654


655 656 657 658 659 660
class RemoteDataSource(DataSource):
    """Utility class to load data from a data source accessible via a socket"""

    def __init__(self):
        super(RemoteDataSource, self).__init__()

        self.socket = None
        self.input_name = None
        self.dataformat = None
        self.unpack = True

    def setup(self, socket, input_name, dataformat_name, prefix, unpack=True):
        """Configures the data source


        Parameters:

          socket (zmq.Socket): The socket to use to access the data.

          input_name (str): Name of the input corresponding to the data source.

          dataformat_name (str): Name of the data format.

          prefix (str): Establishes the prefix of your installation.

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.socket = socket
        self.input_name = input_name
        self.unpack = unpack

        self.dataformat = DataFormat(prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        return True

    def __getitem__(self, index):
        """Retrieve a block of data

        Sends a `get' request over the socket and reads the reply: either
        "err" followed by a kind and message frame, or the start index,
        end index and packed data.

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        logger.debug("send: (get) get %s %d", self.input_name, index)

        t1 = time.time()

        # Multi-part request: "get", <input name>, <index>
        self.socket.send_string("get", zmq.SNDMORE)
        self.socket.send_string(self.input_name, zmq.SNDMORE)
        self.socket.send_string("%d" % index)

        answer = self.socket.recv().decode("utf-8")

        # An "err" reply carries two extra frames: kind and message
        if answer == "err":
            self.read_duration += time.time() - t1
            kind = self.socket.recv()
            message = self.socket.recv()
            raise RemoteException(kind, message)

        # Success: the first frame is the start index, then come the end
        # index and the packed data itself
        start = int(answer)
        end = int(self.socket.recv())

        packed = self.socket.recv()

        t2 = time.time()

        logger.debug("recv: <bin> (size=%d), indexes=(%d, %d)", len(packed), start, end)

        self.nb_bytes_read += len(packed)

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(packed)
        else:
            data = packed

        self.read_duration += t2 - t1

        return (data, infos.start_index, infos.end_index)

    def _prepare(self):
        # Load the needed infos from the socket: send an "ifo" request and
        # read the number of blocks, then one (start, end) pair per block
        Infos = namedtuple("Infos", ["start_index", "end_index"])

        logger.debug("send: (ifo) infos %s", self.input_name)

        self.socket.send_string("ifo", zmq.SNDMORE)
        self.socket.send_string(self.input_name)

        answer = self.socket.recv().decode("utf-8")
        logger.debug("recv: %s", answer)

        if answer == "err":
            kind = self.socket.recv()
            message = self.socket.recv()
            raise RemoteException(kind, message)

        nb = int(answer)
        for i in range(nb):
            start = int(self.socket.recv())
            end = int(self.socket.recv())
            self.infos.append(Infos(start_index=start, end_index=end))

        self.ready = True


780
# ----------------------------------------------------------
781 782


783 784 785
class DataSink(object):

    """Interface of all the Data Sinks

    Data Sinks are used by the outputs of an algorithm to write/transmit data.
    """

    # NOTE(review): `__metaclass__` is only honoured by Python 2; under
    # Python 3 this is a plain class attribute and the abstract methods are
    # not enforced — confirm whether ABC enforcement is still wanted here
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data

        Parameters:

          data (baseformat.baseformat): The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        pass

    @abc.abstractmethod
    def isConnected(self):
        """Returns whether the data sink is connected"""

        pass

    def close(self):
        """Closes the data sink"""

        pass


820
# ----------------------------------------------------------
821 822


823 824 825 826
class StdoutDataSink(DataSink):

    """Data Sink that prints information about the written data on stdout

    Note: The written data is lost! Use this class for debugging purposes
    """

    def __init__(self):
        super(StdoutDataSink, self).__init__()
        self.dataformat = None
        self.prefix = ""
        self.display_data = True

    def setup(self, dataformat, prefix=None, display_data=True):
        """Configures the sink

        Parameters:

          dataformat (dataformat.DataFormat): Format of the data to be written

          prefix (str): Optional text printed before each entry

          display_data (bool): Whether to print the data itself or a
            ``<data>`` placeholder

        """

        self.dataformat = dataformat
        self.display_data = display_data

        if prefix is not None:
            # A non-empty prefix gets a trailing space as separator
            self.prefix = prefix if prefix == "" else prefix + " "

    def write(self, data, start_data_index, end_data_index):
        """Write a block of data

        Parameters:

          data (baseformat.baseformat) The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        # Either show the data itself or an opaque placeholder
        payload = str(data) if self.display_data else "<data>"
        print(
            "%s(%d -> %d): %s"
            % (self.prefix, start_data_index, end_data_index, payload)
        )

    def isConnected(self):
        """This sink is always considered connected"""
        return True


872
# ----------------------------------------------------------
873 874


Philip ABBET's avatar
Philip ABBET committed
875
class CachedDataSink(DataSink):
876

Philip ABBET's avatar
Philip ABBET committed
877
    """Data Sink that save data in the Cache
878

Philip ABBET's avatar
Philip ABBET committed
879
    The default behavior is to save the data in a binary format.
880 881
    """

Philip ABBET's avatar
Philip ABBET committed
882 883 884 885
    def __init__(self):
        """Initializes an unconfigured sink; call :meth:`setup` before use"""

        # Target cache file description (filled in by setup())
        self.filename = None
        self.encoding = None
        self.dataformat = None
        self.start_index = None
        self.end_index = None

        # Open file handles and last index actually written
        self.data_file = None
        self.index_file = None
        self.last_written_data_index = None

        # Write statistics
        self.nb_bytes_written = 0
        self.write_duration = 0
Philip ABBET's avatar
Philip ABBET committed
895

Samuel GAIST's avatar
Samuel GAIST committed
896
    def setup(self, filename, dataformat, start_index, end_index, encoding="binary"):
        """Configures the data sink

        Parameters:

          filename (str): Name of the file to generate

          dataformat (dataformat.DataFormat): The dataformat to be used
            inside this file. All objects stored inside this file will respect
            that format.

          start_index (int): Index of the first data unit this file will hold

          end_index (int): Index of the last data unit this file will hold

          encoding (str): String defining the encoding to be used for encoding
            the data. Only a few options are supported: ``binary``
            (the default) or ``json`` (debugging purposes).

        Returns:

          bool: True if both cache files were opened, False on I/O error

        """

        # Close current file if open
        self.close()

        if encoding not in ("binary", "json"):
            # BUGFIX: the message previously interpolated the *builtin*
            # ``format`` function instead of the offending argument
            raise RuntimeError(
                "valid formats for data writing are 'binary' "
                "or 'json': the format `%s' is invalid" % encoding
            )

        if dataformat.name == "__unnamed_dataformat__":
            raise RuntimeError("cannot record data using an unnamed data format")

        filename, data_ext = os.path.splitext(filename)

        # Cache files are named <base>.<start>.<end><ext>
        self.filename = "%s.%d.%d%s" % (filename, start_index, end_index, data_ext)
        self.encoding = encoding
        self.dataformat = dataformat
        self.start_index = start_index
        self.end_index = end_index

        self.nb_bytes_written = 0
        self.write_duration = 0
        self.last_written_data_index = None

        try:
            self.data_file = open(self.filename, "wb")
        except Exception as e:
            logger.error("Failed to open data file {}: {}".format(self.filename, e))
            return False

        index_filename = self.filename.replace(".data", ".index")
        try:
            self.index_file = open(index_filename, "wt")
        except Exception as e:
            logger.error("Failed to open index file {}: {}".format(index_filename, e))
            # Don't leak the already-opened data file handle
            self.data_file.close()
            self.data_file = None
            return False

        # Write the dataformat header: encoding then dataformat name
        self.data_file.write(six.b("%s\n%s\n" % (self.encoding, self.dataformat.name)))
        self.data_file.flush()

        return True

956
    def close(self):
        """Closes the data sink

        If not all the expected data was written (last written index below
        ``end_index``), the incomplete cache files are removed instead of
        being checksummed.

        Returns:

          bool: True on success, False if an incomplete file could not be
            removed

        """

        if self.data_file is not None:
            self.data_file.close()
            self.index_file.close()

            data_filename = self.data_file.name
            index_filename = self.index_file.name

            # BUGFIX: forget the handles and the written index right away, so
            # a second close() (e.g. triggered by __del__ or a later setup())
            # does not re-process already closed/removed files
            self.data_file = None
            self.index_file = None
            last_written = self.last_written_data_index
            self.last_written_data_index = None

            # If file is not complete, delete it
            if (last_written is None) or (last_written < self.end_index):
                if last_written is None:
                    message = "No data written"
                else:
                    message = "Not enough data written: last written {} vs end {}".format(
                        last_written, self.end_index
                    )

                logger.warning("Removing cache files: {}".format(message))

                artifacts_removed = True

                for filename in [data_filename, index_filename]:
                    try:
                        os.remove(filename)
                    except Exception as e:
                        logger.warning("Failed to remove {}: {}".format(filename, e))
                        artifacts_removed = False
                return artifacts_removed

            # Creates the checksums for all data and indexes
            chksum_data = hashFileContents(data_filename)
            with open(data_filename + ".checksum", "wt") as f:
                f.write(chksum_data)

            chksum_index = hashFileContents(index_filename)
            with open(index_filename + ".checksum", "wt") as f:
                f.write(chksum_index)

        return True
1004

Philip ABBET's avatar
Philip ABBET committed
1005
    def __del__(self):
        """Ensure the cache files are closed when the object is garbage-collected"""
        self.close()
1009

Philip ABBET's avatar
Philip ABBET committed
1010 1011
    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data to the filesystem
1012

Philip ABBET's avatar
Philip ABBET committed
1013
        Parameters:
1014

1015
          data (baseformat.baseformat): The block of data to write
1016

Philip ABBET's avatar
Philip ABBET committed
1017
          start_data_index (int): Start index of the written data
1018

Philip ABBET's avatar
Philip ABBET committed
1019
          end_data_index (int): End index of the written data
1020

Philip ABBET's avatar
Philip ABBET committed
1021
        """
1022

1023
        # If the user passed a dictionary - convert it
Philip ABBET's avatar
Philip ABBET committed
1024 1025 1026 1027 1028
        if isinstance(data, dict):
            data = self.dataformat.type(**data)
        else:
            # Checks that the input data conforms to the expected format
            if data.__class__._name != self.dataformat.name:
Samuel GAIST's avatar
Samuel GAIST committed
1029 1030 1031 1032
                raise TypeError(
                    "input data uses format `%s' while this sink "
                    "expects `%s'" % (data.__class__._name, self.dataformat)
                )
1033

1034 1035
        if self.data_file is None:
            raise RuntimeError("No destination file")
1036

1037
        # Encoding
Samuel GAIST's avatar
Samuel GAIST committed
1038
        if self.encoding == "binary":
Philip ABBET's avatar
Philip ABBET committed
1039 1040 1041
            encoded_data = data