data.py 36.2 KB
Newer Older
1 2 3
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
###################################################################################
#                                                                                 #
# Copyright (c) 2019 Idiap Research Institute, http://www.idiap.ch/               #
# Contact: beat.support@idiap.ch                                                  #
#                                                                                 #
# Redistribution and use in source and binary forms, with or without              #
# modification, are permitted provided that the following conditions are met:     #
#                                                                                 #
# 1. Redistributions of source code must retain the above copyright notice, this  #
# list of conditions and the following disclaimer.                                #
#                                                                                 #
# 2. Redistributions in binary form must reproduce the above copyright notice,    #
# this list of conditions and the following disclaimer in the documentation       #
# and/or other materials provided with the distribution.                          #
#                                                                                 #
# 3. Neither the name of the copyright holder nor the names of its contributors   #
# may be used to endorse or promote products derived from this software without   #
# specific prior written permission.                                              #
#                                                                                 #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND #
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED   #
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE          #
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE    #
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL      #
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR      #
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER      #
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,   #
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE   #
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.            #
#                                                                                 #
###################################################################################
35 36


37 38 39 40 41 42 43
"""
====
data
====

Data I/O classes and functions
"""
44 45 46 47 48 49 50

import os
import re
import glob
import simplejson as json
import time
import abc
51
import zmq
52 53 54
import logging
import six

55
from functools import reduce
56
from collections import namedtuple
57 58 59 60

from .hash import hashFileContents
from .dataformat import DataFormat
from .algorithm import Algorithm
61
from .exceptions import RemoteException
62

63
logger = logging.getLogger(__name__)
64

65

66
# ----------------------------------------------------------
67 68


69 70 71 72
def mixDataIndices(list_of_data_indices):
    """Given a collection of lists of data indices (belonging to separate
    but synchronized files/inputs), returns the most granular list of
    indices that span all the data

    For example, the mix of

      [(0, 2), (3, 4)]

    and

      [(0, 4)]

    is:

      [(0, 2), (3, 4)]


    The mix of

      [(0, 2), (3, 4)]

    and

      [(0, 1), (2, 3), (4, 4)]

    is:

      [(0, 1), (2, 2), (3, 3), (4, 4)]


    Parameters:

        list_of_data_indices (list): One list of ``(start, end)`` pairs per
            synchronized input; each list must be sorted and non-empty

    Returns:

        list: The most granular list of ``(start, end)`` pairs covering the
        range common to all inputs
    """

    # Only the range covered by *every* input can be mixed
    start = max(indices[0][0] for indices in list_of_data_indices)
    end = min(indices[-1][1] for indices in list_of_data_indices)

    result = []
    current_start = start

    for index in range(start, end + 1):
        # A boundary occurs wherever any input closes a block at this index
        if any(
            block[1] == index
            for indices in list_of_data_indices
            for block in indices
        ):
            result.append((current_start, index))
            current_start = index + 1

    return result
122 123


124
# ----------------------------------------------------------
125 126


127 128 129
def getAllFilenames(filename, start_index=None, end_index=None):
    """Returns the names of all the files related to the given data file,
    taking the provided start and end indices into account.


    Parameters:

        filename (str): Name of the data file (path/to/cache/<hash>.data)

        start_index (int): The starting index (if not set or set to
            ``None``, the default, equivalent to ``0``)

        end_index (int): The end index (if not set or set to ``None``, the
            default, equivalent to ``the last existing data``)


    Returns:

        (data_filenames, indices_filenames,
         data_checksum_filenames, indices_checksum_filenames)
    """

    index_re = re.compile(r"^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$")

    def file_start(path):
        """Start index encoded in the filename, or 0 when absent"""
        match = index_re.match(path)
        return int(match.group(1)) if match else 0

    # Collect every file sharing the data file's basename, ordered by
    # the start index embedded in the name
    basename, _ = os.path.splitext(filename)
    filenames = sorted(glob.glob(basename + ".*"), key=file_start)

    # (If necessary) Only keep files overlapping the requested index range;
    # files whose names carry no indices are always kept
    if (start_index is not None) or (end_index is not None):
        kept = []
        for path in filenames:
            match = index_re.match(path)
            if match:
                first = int(match.group(1))
                last = int(match.group(2))
                ends_too_early = (start_index is not None) and (last < start_index)
                starts_too_late = (end_index is not None) and (first > end_index)
                if ends_too_early or starts_too_late:
                    continue
            kept.append(path)
        filenames = kept

    def of_category(pattern):
        """All retained files whose name matches the given pattern"""
        expr = re.compile(pattern)
        return [path for path in filenames if expr.match(path)]

    return (
        of_category(r"^.*\.(\d+)\.(\d+)\.data$"),
        of_category(r"^.*\.(\d+)\.(\d+)\.index$"),
        of_category(r"^.*\.(\d+)\.(\d+)\.data.checksum$"),
        of_category(r"^.*\.(\d+)\.(\d+)\.index.checksum$"),
    )
195 196


197
# ----------------------------------------------------------
198 199


200 201 202 203
class DataSource(object):
    """Base class to load data from some source"""

    def __init__(self):
        self.infos = []  # per-block bookkeeping entries (start/end indices)
        self.read_duration = 0  # cumulated time spent reading, in seconds
        self.nb_bytes_read = 0  # cumulated number of bytes read
        self.ready = False  # whether _prepare() has already run

    def close(self):
        """Releases the per-block information"""
        self.infos = []

    def reset(self):
        """Reset the state of the data source

        This shall only clear the current state, not require a new call
        to setup the source.
        """
        pass

    def __del__(self):
        """Makes sure all resources are released when the object is deleted"""
        self.close()

    def _ensure_ready(self):
        # Lazily run the (possibly expensive) preparation step exactly once
        if not self.ready:
            self._prepare()

    def __len__(self):
        self._ensure_ready()
        return len(self.infos)

    def __iter__(self):
        self._ensure_ready()
        for position in range(len(self.infos)):
            yield self[position]

    def __getitem__(self, index):
        # Concrete sources must implement the actual data retrieval
        raise NotImplementedError

    def first_data_index(self):
        """Index of the first data unit available from this source"""
        self._ensure_ready()
        return self.infos[0].start_index

    def last_data_index(self):
        """Index of the last data unit available from this source"""
        self._ensure_ready()
        return self.infos[-1].end_index

    def data_indices(self):
        """All (start, end) index pairs, in order"""
        self._ensure_ready()
        return [(entry.start_index, entry.end_index) for entry in self.infos]

    def getAtDataIndex(self, data_index):
        """Returns the block containing the given data index,
        or ``(None, None, None)`` when no block contains it
        """
        self._ensure_ready()
        for position, entry in enumerate(self.infos):
            if entry.start_index <= data_index <= entry.end_index:
                return self[position]

        return (None, None, None)

    def statistics(self):
        """Return the statistics about the number of bytes read"""
        return (self.nb_bytes_read, self.read_duration)

    def _prepare(self):
        self.ready = True


276
# ----------------------------------------------------------
277

278 279 280 281 282 283
# Helper describing the location of one data block inside the cache files
# (filled in by CachedDataSource.setup):
#   file_index  - position of the owning file in CachedDataSource.filenames
#   start_index - first data index contained in the block
#   end_index   - last data index contained in the block
#   offset      - byte offset of the block's payload inside the file
#   size        - payload size in bytes
# Required to be out of the CachedDataSource for pickling reasons
FileInfos = namedtuple(
    "FileInfos", ["file_index", "start_index", "end_index", "offset", "size"]
)

284 285

class CachedDataSource(DataSource):

    """Utility class to load data from a file in the cache

    Cache files start with a small text header (encoding line, dataformat
    line) followed by repeated entries of the form
    ``<start> <end> <size>\\n`` + ``<size>`` bytes of payload
    (see ``_readHeader`` and the scanning loop in ``setup``).
    """

    def __init__(self):
        super(CachedDataSource, self).__init__()

        self.filenames = None  # list of data files backing this source
        self.encoding = None  # Must be 'binary' or 'json'
        self.prefix = None  # installation prefix (used to resolve formats)
        self.dataformat = None  # DataFormat used to unpack the payloads
        self.current_file = None  # currently open file object, if any
        self.current_file_index = None  # index of current_file in filenames
        self.unpack = True  # whether __getitem__ decodes the raw payload

    def __getstate__(self):
        # do not pass open files when being pickled/copied
        state = self.__dict__.copy()

        if state["current_file"] is not None:
            del state["current_file"]
            # remember that a file was open so __setstate__ can reopen it
            state["__had_open_file__"] = True

        return state

    def __setstate__(self, state):
        # restore the state after being pickled/copied
        had_open_file_before_pickle = state.pop("__had_open_file__", False)

        self.__dict__.update(state)

        if had_open_file_before_pickle:
            # reopen the file that was open before pickling, using the
            # preserved current_file_index
            try:
                path = self.filenames[self.current_file_index]
                self.current_file = open(path, "rb")
            except Exception as e:
                raise IOError("Could not read `%s': %s" % (path, e))

    def _readHeader(self, file):
        """Read the header of the provided file

        Sets ``self.encoding`` and (on first call) ``self.dataformat``;
        leaves the file positioned right after the two header lines.
        """

        # Read file format (first header line); [:-1] strips the newline
        self.encoding = file.readline()[:-1]
        if not isinstance(self.encoding, str):
            # file is opened in binary mode, so decode on Python 3
            self.encoding = self.encoding.decode("utf8")

        if self.encoding not in ("binary", "json"):
            raise RuntimeError(
                "valid formats for data reading are 'binary' "
                "or 'json': the format `%s' is invalid" % self.encoding
            )

        # Read data format (second header line)
        dataformat_name = file.readline()[:-1]

        # Only resolve the dataformat once; subsequent files reuse it
        if self.dataformat is None:
            if not isinstance(dataformat_name, str):
                dataformat_name = dataformat_name.decode("utf8")

            if dataformat_name.startswith("analysis:"):
                # analyzer outputs: the format is defined by the algorithm
                algo_name = dataformat_name.split(":")[1]
                algo = Algorithm(self.prefix, algo_name)
                if not algo.valid:
                    raise RuntimeError(
                        "the dataformat `%s' is the result of an "
                        "algorithm which is not valid" % algo_name
                    )
                self.dataformat = algo.result_dataformat()
            else:
                self.dataformat = DataFormat(self.prefix, dataformat_name)

            if not self.dataformat.valid:
                raise RuntimeError(
                    "the dataformat `{}' is not valid\n{}".format(
                        dataformat_name, self.dataformat.errors
                    )
                )

        return True

    def setup(self, filename, prefix, start_index=None, end_index=None, unpack=True):

        """Configures the data source


        Parameters:

          filename (str): Name of the file to read the data from

          prefix (str): Establishes the prefix of your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, read data from the begin of file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """
        index_re = re.compile(r"^.*\.(\d+)\.(\d+)\.(data|index)(.checksum)?$")

        def check_consistency(data_filenames, checksum_filenames):
            """Perform some sanity check on the data/checksum files on disk:

              1. One-to-one mapping between data and checksum files
              2. Checksum comparison between hash(data) and checksum files
              3. Contiguous indices if they are present
            """

            # Make sure that we have a perfect match between data files and
            # checksum files
            checksum_filenames_noext = [
                os.path.splitext(f)[0] for f in checksum_filenames
            ]

            if data_filenames != checksum_filenames_noext:
                raise IOError(
                    "number of data files and checksum files for `%s' "
                    "does not match (%d != %d)"
                    % (filename, len(data_filenames), len(checksum_filenames_noext))
                )

            # list of start/end indices to check that there are contiguous
            indices = []
            for f_data, f_chck in zip(data_filenames, checksum_filenames):
                with open(f_chck, "rt") as f:
                    expected_chksum = f.read().strip()
                current_chksum = hashFileContents(f_data)
                if expected_chksum != current_chksum:
                    raise IOError(
                        "data file `%s' has a checksum (%s) that differs "
                        "from expected one (%s)"
                        % (f_data, current_chksum, expected_chksum)
                    )

                r = index_re.match(f_data)
                if r:
                    indices.append((int(r.group(1)), int(r.group(2))))

            indices = sorted(indices, key=lambda v: v[0])
            ok_indices = True

            if len(indices) > 1:
                # NOTE(review): sum(...) is truthy as soon as ANY adjacent
                # pair is contiguous; all(...) was likely intended so that
                # EVERY pair must be contiguous — a gap between two later
                # files currently goes undetected. TODO confirm and fix.
                ok_indices = sum(
                    [
                        (indices[i + 1][0] - indices[i][1] == 1)
                        for i in range(len(indices) - 1)
                    ]
                )

            if not ok_indices:
                raise IOError(
                    "data file `%s|%s' has missing indices." % (f_data, f_chck)
                )

        self.prefix = prefix
        self.unpack = unpack

        # Retrieve the list of all needed files
        (
            self.filenames,
            indices_filenames,
            data_checksum_filenames,
            indices_checksum_filenames,
        ) = getAllFilenames(filename, start_index, end_index)

        if len(self.filenames) == 0:
            logger.warning("No files found for %s" % filename)
            return False

        check_consistency(self.filenames, data_checksum_filenames)

        # Scan every file and record one FileInfos entry per data unit
        for file_index, current_filename in enumerate(self.filenames):
            try:
                f = open(current_filename, "rb")
            except Exception as e:
                logger.warning("Could not setup `%s': %s" % (filename, e))
                return False

            # Reads the header of the current file
            # NOTE(review): if _readHeader raises, `f` is never closed
            self._readHeader(f)
            offset = f.tell()

            # Process each data unit from the file
            while True:
                line = f.readline()
                if not line:
                    break

                offset += len(line)

                # each descriptor line is "<start> <end> <size>"
                (start, end, data_size) = [int(x) for x in line.split()]

                # keep only units fully inside the requested index range
                if ((start_index is None) or (start >= start_index)) and (
                    (end_index is None) or (end <= end_index)
                ):
                    self.infos.append(
                        FileInfos(
                            file_index=file_index,
                            start_index=start,
                            end_index=end,
                            offset=offset,
                            size=data_size,
                        )
                    )

                # skip the payload to reach the next descriptor line
                f.seek(data_size, 1)
                offset += data_size

            f.close()

        return True

    def close(self):
        self.reset()

        super(CachedDataSource, self).close()

    def reset(self):
        """Reset the current state"""

        if self.current_file is not None:
            self.current_file.close()
            self.current_file = None
        self.current_file_index = None

    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        # out-of-range indices yield an empty result, not an exception
        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        # (Re)open the backing file only when the block lives in a
        # different file than the one currently open
        if self.current_file_index != infos.file_index:
            if self.current_file is not None:
                self.current_file.close()
                self.current_file = None

            try:
                self.current_file = open(self.filenames[infos.file_index], "rb")
            except Exception as e:
                raise IOError(
                    "Could not read `%s': %s" % (self.filenames[infos.file_index], e)
                )

            self.current_file_index = infos.file_index

        self.current_file.seek(infos.offset, 0)

        # time the raw read for the statistics() report
        t1 = time.time()
        encoded_data = self.current_file.read(infos.size)
        t2 = time.time()

        self.read_duration += t2 - t1
        self.nb_bytes_read += infos.size

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(encoded_data)
        else:
            data = encoded_data

        return (data, infos.start_index, infos.end_index)
562 563


564
# ----------------------------------------------------------
565 566


567 568
class DatabaseOutputDataSource(DataSource):
    """Utility class to load data from an output of a database view"""

    def __init__(self):
        super(DatabaseOutputDataSource, self).__init__()

        self.prefix = None  # installation prefix (used to resolve formats)
        self.dataformat = None  # DataFormat describing the produced data
        self.view = None  # database view the data is read from
        self.output_name = None  # name of the view output to read
        self.pack = True  # whether __getitem__ returns packed bytes

    def setup(
        self,
        view,
        output_name,
        dataformat_name,
        prefix,
        start_index=None,
        end_index=None,
        pack=False,
    ):
        """Configures the data source


        Parameters:

          view: Database view to read the data from (must provide
            ``objects()``, ``get_output_mapping()`` and ``get()``)

          output_name (str): Name of the view output to read

          dataformat_name (str): Name of the data format of the output

          prefix (str): Establishes the prefix of your installation.

          start_index (int): The starting index (if not set or set to
            ``None``, the default, read data from the begin of file)

          end_index (int): The end index (if not set or set to ``None``, the
            default, reads the data until the end)

          pack (bool): Indicates if the data must be packed or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.prefix = prefix
        self.view = view
        self.output_name = output_name
        self.pack = pack

        self.dataformat = DataFormat(self.prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        # Load all the needed infos from all the files
        Infos = namedtuple("Infos", ["start_index", "end_index"])

        # NOTE(review): if objects() yields nothing, `index` below is never
        # bound and `end = index` raises NameError — TODO confirm whether an
        # empty view is possible here.
        objects = self.view.objects()

        start = None
        end = None
        previous_value = None

        # attribute whose value changes delimit consecutive data blocks
        attribute = self.view.get_output_mapping(output_name)

        # Group consecutive objects sharing the same attribute value into
        # (start, end) index blocks
        for index, obj in enumerate(objects):
            if start is None:
                start = index
                previous_value = getattr(obj, attribute)
            elif getattr(obj, attribute) != previous_value:
                # value changed: the previous block ended at index - 1
                end = index - 1
                previous_value = None

                # keep only blocks fully inside the requested index range
                if ((start_index is None) or (start >= start_index)) and (
                    (end_index is None) or (end <= end_index)
                ):
                    self.infos.append(Infos(start_index=start, end_index=end))

                start = index
                previous_value = getattr(obj, attribute)

        # close the last (still open) block
        end = index

        if ((start_index is None) or (start >= start_index)) and (
            (end_index is None) or (end <= end_index)
        ):
            self.infos.append(Infos(start_index=start, end_index=end))

        return True

    def __getitem__(self, index):
        """Retrieve a block of data

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        # out-of-range indices yield an empty result, not an exception
        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        # time the view access for the statistics() report
        t1 = time.time()
        data = self.view.get(self.output_name, infos.start_index)
        t2 = time.time()

        self.read_duration += t2 - t1

        # views may return plain dictionaries; convert them to the
        # declared dataformat type
        if isinstance(data, dict):
            d = self.dataformat.type()
            d.from_dict(data, casting="safe", add_defaults=False)
            data = d

        if self.pack:
            data = data.pack()
            # bytes are only counted when the data is actually serialized
            self.nb_bytes_read += len(data)

        return (data, infos.start_index, infos.end_index)
690

691

692
# ----------------------------------------------------------
693 694


695 696 697 698 699 700
class RemoteDataSource(DataSource):
    """Utility class to load data from a data source accessible via a socket"""

    def __init__(self):
        super(RemoteDataSource, self).__init__()

        self.socket = None  # ZMQ socket used for all requests
        self.input_name = None  # remote input this source reads from
        self.dataformat = None  # DataFormat used to unpack received payloads
        self.unpack = True  # whether __getitem__ decodes the raw payload

    def setup(self, socket, input_name, dataformat_name, prefix, unpack=True):
        """Configures the data source


        Parameters:

          socket (zmq.Socket): The socket to use to access the data.

          input_name (str): Name of the input corresponding to the data source.

          dataformat_name (str): Name of the data format.

          prefix (str): Establishes the prefix of your installation.

          unpack (bool): Indicates if the data must be unpacked or not


        Returns:

          ``True``, if successful, or ``False`` otherwise.

        """

        self.socket = socket
        self.input_name = input_name
        self.unpack = unpack

        self.dataformat = DataFormat(prefix, dataformat_name)

        if not self.dataformat.valid:
            raise RuntimeError("the dataformat `%s' is not valid" % dataformat_name)

        return True

    def __getitem__(self, index):
        """Retrieve a block of data

        Sends a multipart ``get <input_name> <index>`` request and expects
        either ``err <kind> <message>`` or ``<start> <end> <payload>`` back.

        Returns:

          A tuple (data, start_index, end_index)

        """

        if not self.ready:
            self._prepare()

        # out-of-range indices yield an empty result, not an exception
        if (index < 0) or (index >= len(self.infos)):
            return (None, None, None)

        infos = self.infos[index]

        logger.debug("send: (get) get %s %d", self.input_name, index)

        t1 = time.time()

        # request: "get" | input_name | index (three message parts)
        self.socket.send_string("get", zmq.SNDMORE)
        self.socket.send_string(self.input_name, zmq.SNDMORE)
        self.socket.send_string("%d" % index)

        answer = self.socket.recv().decode("utf-8")

        if answer == "err":
            self.read_duration += time.time() - t1
            # error reply: two more parts carrying kind and message
            kind = self.socket.recv()
            message = self.socket.recv()
            raise RemoteException(kind, message)

        # success reply: first part is the start index, then end, then data
        start = int(answer)
        end = int(self.socket.recv())

        packed = self.socket.recv()

        t2 = time.time()

        logger.debug("recv: <bin> (size=%d), indexes=(%d, %d)", len(packed), start, end)

        self.nb_bytes_read += len(packed)

        if self.unpack:
            data = self.dataformat.type()
            data.unpack(packed)
        else:
            data = packed

        self.read_duration += t2 - t1

        # the returned indices come from the locally cached infos; the
        # server-sent start/end are only used for logging above
        return (data, infos.start_index, infos.end_index)

    def _prepare(self):
        # Load the needed infos from the socket
        Infos = namedtuple("Infos", ["start_index", "end_index"])

        logger.debug("send: (ifo) infos %s", self.input_name)

        # request: "ifo" | input_name (two message parts)
        self.socket.send_string("ifo", zmq.SNDMORE)
        self.socket.send_string(self.input_name)

        answer = self.socket.recv().decode("utf-8")
        logger.debug("recv: %s", answer)

        if answer == "err":
            # error reply: two more parts carrying kind and message
            kind = self.socket.recv()
            message = self.socket.recv()
            raise RemoteException(kind, message)

        # success reply: the count, then one (start, end) pair per block
        nb = int(answer)
        for i in range(nb):
            start = int(self.socket.recv())
            end = int(self.socket.recv())
            self.infos.append(Infos(start_index=start, end_index=end))

        self.ready = True


820
# ----------------------------------------------------------
821 822


823 824 825
class DataSink(object, metaclass=abc.ABCMeta):

    """Interface of all the Data Sinks

    Data Sinks are used by the outputs of an algorithm to write/transmit data.

    Note: the metaclass is declared with the Python 3 ``metaclass=`` keyword;
    the legacy ``__metaclass__`` class attribute is silently ignored on
    Python 3 and would leave the abstract methods unenforced.
    """

    @abc.abstractmethod
    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data

        Parameters:

          data (baseformat.baseformat): The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        pass

    @abc.abstractmethod
    def isConnected(self):
        """Returns whether the data sink is connected"""

        pass

    def close(self):
        """Closes the data sink"""

        pass


860
# ----------------------------------------------------------
861 862


863 864 865 866
class StdoutDataSink(DataSink):

    """Data Sink that prints information about the written data on stdout

    Note: The written data is lost! Use this class for debugging purposes
    """

    def __init__(self):
        super(StdoutDataSink, self).__init__()
        self.dataformat = None
        self.prefix = ""
        self.display_data = True

    def setup(self, dataformat, prefix=None, display_data=True):
        """Configures the sink with a dataformat and an optional line prefix"""

        self.dataformat = dataformat
        self.display_data = display_data

        if prefix is not None:
            # A non-empty prefix is separated from the rest of the line by a
            # single space
            self.prefix = prefix if prefix == "" else prefix + " "

    def write(self, data, start_data_index, end_data_index):
        """Write a block of data

        Parameters:

          data (baseformat.baseformat) The block of data to write

          start_data_index (int): Start index of the written data

          end_data_index (int): End index of the written data

        """

        # Either show the full data content or a placeholder, depending on
        # the setup() configuration
        shown = str(data) if self.display_data else "<data>"
        print(
            "%s(%d -> %d): %s" % (self.prefix, start_data_index, end_data_index, shown)
        )

    def isConnected(self):
        """This sink is always considered connected"""
        return True


912
# ----------------------------------------------------------
913 914


Philip ABBET's avatar
Philip ABBET committed
915
class CachedDataSink(DataSink):
916

Philip ABBET's avatar
Philip ABBET committed
917
    """Data Sink that save data in the Cache
918

Philip ABBET's avatar
Philip ABBET committed
919
    The default behavior is to save the data in a binary format.
920 921
    """

Philip ABBET's avatar
Philip ABBET committed
922 923 924 925
    def __init__(self):
        # Target file and data description
        self.filename = None
        self.dataformat = None
        self.encoding = None

        # Range of data indexes this sink is expected to cover
        self.start_index = None
        self.end_index = None

        # Open file handles and write progress tracking
        self.data_file = None
        self.index_file = None
        self.last_written_data_index = None

        # Write statistics
        self.nb_bytes_written = 0
        self.write_duration = 0
Philip ABBET's avatar
Philip ABBET committed
935

Samuel GAIST's avatar
Samuel GAIST committed
936
    def setup(self, filename, dataformat, start_index, end_index, encoding="binary"):
        """Configures the data sink

        Parameters:

          filename (str): Name of the file to generate

          dataformat (dataformat.DataFormat): The dataformat to be used
            inside this file. All objects stored inside this file will respect
            that format.

          start_index (int): Start index of the data this file will contain

          end_index (int): End index of the data this file will contain

          encoding (str): String defining the encoding to be used for encoding
            the data. Only a few options are supported: ``binary``
            (the default) or ``json`` (debugging purposes).

        Returns:

          bool: True if both the data and index files could be opened

        Raises:

          RuntimeError: if the encoding is invalid or the dataformat is unnamed

        """

        # Close current file if open
        self.close()

        if encoding not in ("binary", "json"):
            # bug fix: the message previously interpolated the *builtin*
            # ``format`` instead of the ``encoding`` argument
            raise RuntimeError(
                "valid formats for data writing are 'binary' "
                "or 'json': the format `%s' is invalid" % encoding
            )

        if dataformat.name == "__unnamed_dataformat__":
            raise RuntimeError("cannot record data using an unnamed data format")

        # The indexes covered by the file are encoded in its name
        filename, data_ext = os.path.splitext(filename)

        self.filename = "%s.%d.%d%s" % (filename, start_index, end_index, data_ext)
        self.encoding = encoding
        self.dataformat = dataformat
        self.start_index = start_index
        self.end_index = end_index

        # Reset write statistics and progress
        self.nb_bytes_written = 0
        self.write_duration = 0
        self.last_written_data_index = None

        try:
            self.data_file = open(self.filename, "wb")
        except Exception as e:
            logger.error("Failed to open data file {}: {}".format(self.filename, e))
            return False

        index_filename = self.filename.replace(".data", ".index")
        try:
            self.index_file = open(index_filename, "wt")
        except Exception as e:
            logger.error("Failed to open index file {}: {}".format(index_filename, e))
            return False

        # Write the dataformat header (encoding and format name) first
        self.data_file.write(six.b("%s\n%s\n" % (self.encoding, self.dataformat.name)))
        self.data_file.flush()

        return True

996
    def close(self):
        """Closes the data sink

        Closes the underlying data and index files. If not all the expected
        data was written (i.e. the last written index did not reach
        ``end_index``), the partial cache files are removed instead of being
        finalized; otherwise ``.checksum`` side-car files are created for
        both of them.

        Returns:

          bool: True on success; when an incomplete cache is discarded, True
            only if both partial files could actually be removed.
        """

        if self.data_file is not None:
            self.data_file.close()
            self.index_file.close()

            # Keep the paths around: the handles are needed only for the names
            data_filename = self.data_file.name
            index_filename = self.index_file.name

            # If file is not complete, delete it
            if (self.last_written_data_index is None) or (
                self.last_written_data_index < self.end_index
            ):
                if self.last_written_data_index is None:
                    message = "No data written"
                else:
                    message = "No enough data written: last written {} vs end {}".format(
                        self.last_written_data_index, self.end_index
                    )

                logger.warning("Removing cache files: {}".format(message))

                # Best-effort removal: try both files even if one fails
                artifacts_removed = True

                for filename in [data_filename, index_filename]:
                    try:
                        os.remove(filename)
                    except Exception as e:
                        logger.warning("Failed to remove {}: {}".format(filename, e))
                        artifacts_removed = False
                # NOTE(review): data_file/index_file attributes are not reset
                # to None on this path, so a later close() re-enters this
                # branch — confirm this is intended
                return artifacts_removed

            # Creates the checksums for all data and indexes
            chksum_data = hashFileContents(data_filename)
            with open(data_filename + ".checksum", "wt") as f:
                f.write(chksum_data)

            chksum_index = hashFileContents(index_filename)
            with open(index_filename + ".checksum", "wt") as f:
                f.write(chksum_index)

            # Mark the sink as fully closed
            self.data_file = None
            self.index_file = None
            self.last_written_data_index = None

        return True
1044

Philip ABBET's avatar
Philip ABBET committed
1045
    def __del__(self):
        """Destructor: guarantees the cache files are closed on deletion"""
        self.close()
1049

Philip ABBET's avatar
Philip ABBET committed
1050 1051
    def write(self, data, start_data_index, end_data_index):
        """Writes a block of data to the filesystem
1052

Philip ABBET's avatar
Philip ABBET committed
1053
        Parameters:
1054

1055
          data (baseformat.baseformat): The block of data to write
1056

Philip ABBET's avatar
Philip ABBET committed
1057
          start_data_index (int): Start index of the written data
1058

Philip ABBET's avatar
Philip ABBET committed
1059
          end_data_index (int): End index of the written data
1060

Philip ABBET's avatar
Philip ABBET committed
1061
        """