inputs.py 22.8 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


import time
import logging
logger = logging.getLogger(__name__)

from functools import reduce

import six
import zmq

38
39
from .data import mixDataIndices

André Anjos's avatar
André Anjos committed
40

Philip ABBET's avatar
Philip ABBET committed
41
42
43
44
45
#----------------------------------------------------------


class BaseInput(object):
    """Base class for all the kinds of input of a processing block

    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)


    Parameters:

      name (str): Name of the input

      data_format (str): Data format accepted by the input


    Attributes:

      group (beat.backend.python.inputs.InputGroup): Group containing this input

      name (str): Name of the input (algorithm-specific)

      data (beat.core.baseformat.baseformat): The last block of data received on
        the input

      data_index (int): Index of the last block of data received on the input
        (see the section *Inputs synchronization* of the User's Guide)

      data_index_end (int): End index of the last block of data received on the
        input (see the section *Inputs synchronization* of the User's Guide)

      data_format (str): Data format accepted by the input

      data_same_as_previous (bool): Indicates if the last block of data received
        is the same as the one of the previous iteration (see the section
        *Inputs synchronization* of the User's Guide)

      nb_data_blocks_read (int): Number of data blocks read so far

    """

    def __init__(self, name, data_format):
        self.group                 = None
        self.name                  = str(name)
        self.data                  = None
        self.data_index            = -1
        self.data_index_end        = -1
        self.data_same_as_previous = True
        self.data_format           = data_format
        self.nb_data_blocks_read   = 0


    def isDataUnitDone(self):
        """Indicates if the current data unit will change at the next iteration

        Requires :py:attr:`group` to be set (reads its ``last_data_index``).
        """

        # The group has not computed its interval yet, but this input already
        # holds a block: consider the unit done
        if (self.data_index_end >= 0) and (self.group.last_data_index == -1):
            return True

        return (self.data_index_end == self.group.last_data_index)


    def hasMoreData(self):
        """Indicates if there is more data to process on the input

        Must be implemented by subclasses.

        Raises:

          NotImplementedError: always, in this base class
        """

        raise NotImplementedError()


    def hasDataChanged(self):
        """Indicates if the current data unit is different than the one at the
        previous iteration"""

        return not self.data_same_as_previous


    def next(self):
        """Retrieves the next block of data

        Must be implemented by subclasses.

        Raises:

          NotImplementedError: always, in this base class
        """

        raise NotImplementedError()


#----------------------------------------------------------


class Input(BaseInput):
    """Represents an input of a processing block that receives its data from
    a data source

    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)


    Parameters:

      name (str): Name of the input

      data_format (str): Data format accepted by the input

      data_source (beat.core.platform.data.DataSource): Source of data to be used
        by the input


    Attributes:

      data_source (beat.core.data.DataSource): Source of data used by the input

    """

    def __init__(self, name, data_format, data_source):
        super(Input, self).__init__(name, data_format)
        self.data_source = data_source


    def hasMoreData(self):
        """Tells whether the underlying data source still has data to deliver"""
        return self.data_source.hasMoreData()


    def next(self):
        """Advances the input to the next block of its data source

        Raises:

          RuntimeError: when the group forbids direct access by the algorithm,
            or when the data source has no more data
        """

        if self.group.restricted_access:
            raise RuntimeError('Not authorized')

        block = self.data_source.next()
        self.data, self.data_index, self.data_index_end = block

        if self.data is None:
            raise RuntimeError(
                "User algorithm asked for more data for channel `%s' on input "
                "`%s', but it is over (no more data). This normally indicates "
                "a programming error on the user side."
                % (self.group.channel, self.name))

        self.nb_data_blocks_read += 1
        self.data_same_as_previous = False


#----------------------------------------------------------


183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class RemoteException(Exception):
    """Error reported by the remote end of a socket

    Parameters:

      kind (str): ``'sys'`` for a system error; any other value denotes a
        user error

      message (str): Description of the error


    Attributes:

      system_error (str): The message for system errors, empty otherwise

      user_error (str): The message for user errors, empty otherwise

    """

    def __init__(self, kind, message):
        # Forward the message so that ``str(exc)`` and tracebacks show it
        super(RemoteException, self).__init__(message)

        if kind == 'sys':
            self.system_error = message
            self.user_error = ''
        else:
            self.system_error = ''
            self.user_error = message


#----------------------------------------------------------


def process_error(socket):
    """Reads an error report from ``socket`` and raises it.

    Two frames are read, in order: the error kind, then the error message.

    Raises:

      RemoteException: always, built from the two frames received
    """
    kind, message = socket.recv(), socket.recv()
    raise RemoteException(kind, message)


#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
208
class RemoteInput(BaseInput):
    """Allows to access the input of a processing block, via a socket.

    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)

    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)


    Parameters:

      name (str): Name of the input

      data_format (object): An object with the preloaded data format for this
        input (see :py:class:`beat.backend.python.dataformat.DataFormat`).

      socket (object): A 0MQ socket for writing the data to the server process

      unpack (bool): If ``True`` (the default), each received payload is
        deserialized into a new ``data_format.type()`` instance; if ``False``,
        the raw payload is stored as-is in :py:attr:`data`.


    Attributes:

      group (beat.backend.python.inputs.InputGroup): Group containing this input

      data (beat.core.baseformat.baseformat): The last block of data received on
        the input (the raw payload when ``unpack`` is ``False``)

      comm_time (float): Total time spent on communication through the socket

    """

    def __init__(self, name, data_format, socket, unpack=True):
        super(RemoteInput, self).__init__(name, data_format)

        self.socket         = socket
        self.comm_time      = 0.0     # Total time spent on communication
        self._unpack        = unpack
        self._has_more_data = None    # Cached answer, to avoid repetitive requests


    def hasMoreData(self):
        """Indicates if there is more data to process on the input

        The answer is cached until the next call to :py:meth:`next`.

        Raises:

          RemoteException: if the remote end answers with an error
        """

        if self._has_more_data is None:
            logger.debug('send: (hmd) has-more-data %s %s', self.group.channel, self.name)

            _start = time.time()

            self.socket.send('hmd', zmq.SNDMORE)
            self.socket.send(self.group.channel, zmq.SNDMORE)
            self.socket.send(self.name)

            answer = self.socket.recv()

            self.comm_time += time.time() - _start
            logger.debug('recv: %s', answer)

            if answer == 'err':
                process_error(self.socket)

            self._has_more_data = (answer == 'tru')

        return self._has_more_data


    def next(self):
        """Retrieves the next block of data

        Raises:

          RemoteException: if the remote end answers with an error
        """

        logger.debug('send: (nxt) next %s %s', self.group.channel, self.name)

        _start = time.time()

        self.socket.send('nxt', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)

        answer = self.socket.recv()

        if answer == 'err':
            self.comm_time += time.time() - _start
            process_error(self.socket)

        # Reply frames: start index, end index, then the packed data
        self.data_index = int(answer)
        self.data_index_end = int(self.socket.recv())
        self.unpack(self.socket.recv())

        self.comm_time += time.time() - _start
        self.nb_data_blocks_read += 1

        self.data_same_as_previous = False
        self._has_more_data = None    # The cached answer is now stale


    def unpack(self, packed):
        """Stores the payload ``packed`` received through the socket

        Depending on the ``unpack`` constructor argument, the payload is either
        deserialized into a new ``data_format.type()`` instance or kept as-is.
        """

        logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed),
                     self.data_index, self.data_index_end)

        # ``self.unpack`` is this (always truthy) method: the constructor flag
        # is stored in ``self._unpack``
        if self._unpack:
            self.data = self.data_format.type()
            self.data.unpack(packed)
        else:
            self.data = packed


312
313
314
#----------------------------------------------------------


315
class InputGroup(object):
    """Represents a group of inputs synchronized together

    The inputs can be either "local" ones (reading data from the cache) or
    "remote" ones (using a socket to communicate with a database view output
    located inside a docker container).

    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)

    A group implementing this interface is provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`).

    See :py:class:`beat.backend.python.inputs.Input`

    Example:

      .. code-block:: python

         group = InputGroup('main')
         group.add(input)

         print(group['labels'].data_format)

         for index in range(0, len(group)):
             print(group[index].data_format)

         for input in group:
             print(input.data_format)

         for input in group[0:2]:
             print(input.data_format)


    Parameters:

      channel (str): Name of the data channel of the group

      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener to use

      restricted_access (bool): Indicates if the algorithm can freely use the
        inputs


    Attributes:

      data_index (int): Index of the last block of data received on the inputs
        (see the section *Inputs synchronization* of the User's Guide)

      data_index_end (int): End index of the last block of data received on the
        inputs (see the section *Inputs synchronization* of the User's Guide)

      first_data_index (int): Start index of the current data units

      last_data_index (int): End index of the current data units

      channel (str): Name of the data channel of the group

      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener used

      socket (object): Socket of the first remote input added to the group
        (``None`` if the group has no remote input)

    """

    def __init__(self, channel, synchronization_listener=None,
            restricted_access=True):

        self._inputs                  = []
        self.data_index               = -1  # Lower index across all inputs
        self.data_index_end           = -1  # Bigger index across all inputs
        self.first_data_index         = -1  # Start index of the current data units
        self.last_data_index          = -1  # End index of the current data units
        self.channel                  = str(channel)
        self.synchronization_listener = synchronization_listener
        self.restricted_access        = restricted_access
        self.socket                   = None
        self.comm_time                = 0.


    def __getitem__(self, index):
        """Retrieves an input by name or by position, or a list by slice

        Returns ``None`` if no input has the given name, if the integer index
        is past the end of the group, or if ``index`` has another type.
        """

        if isinstance(index, slice):
            return self._inputs[index]
        elif isinstance(index, int):
            if index < len(self._inputs):
                return self._inputs[index]
        elif isinstance(index, six.string_types):
            return next((x for x in self._inputs if x.name == index), None)
        return None


    def __iter__(self):
        return iter(self._inputs)


    def __len__(self):
        return len(self._inputs)


    def add(self, input):
        """Add an input to the group

        The socket of the first remote input added becomes the group's socket.

        Parameters:

          input (beat.backend.python.inputs.Input or beat.backend.python.inputs.RemoteInput):
                  The input to add

        """

        if isinstance(input, RemoteInput) and (self.socket is None):
            self.socket = input.socket

        input.group = self
        self._inputs.append(input)


    def localInputs(self):
        """Iterates over the inputs of type :py:class:`Input`"""
        for x in self._inputs:
            if isinstance(x, Input):
                yield x


    def remoteInputs(self):
        """Iterates over the inputs of type :py:class:`RemoteInput`"""
        for x in self._inputs:
            if isinstance(x, RemoteInput):
                yield x


    def hasMoreData(self):
        """Indicates if there is more data to process in the group"""

        # First process the local inputs
        if any(x.hasMoreData() for x in self.localInputs()):
            return True

        # Next process the remote inputs (only possible once a socket is known)
        if self.socket is None:
            return False

        return any(x.hasMoreData() for x in self.remoteInputs())


    def next(self):
        """Retrieve the next block of data on all the inputs

        Only the inputs whose current block ends first (lowest
        ``data_index_end``) are advanced, local inputs before remote ones; the
        others are flagged as unchanged. The group's indices are then updated
        and the synchronization listener (if any) is notified.

        Raises:

          RuntimeError: if the group is managed by the platform
            (``restricted_access`` is ``True``)
        """

        # Only for groups not managed by the platform
        if self.restricted_access:
            raise RuntimeError('Not authorized')

        # Only retrieve new data on the inputs where the current data expire first
        lower_end_index = min(x.data_index_end for x in self._inputs)

        inputs_to_update = []
        inputs_up_to_date = []
        for input in self._inputs:
            if input.data_index_end == lower_end_index:
                inputs_to_update.append(input)
            else:
                inputs_up_to_date.append(input)

        for input in inputs_to_update:
            if isinstance(input, Input):
                input.next()

        for input in inputs_to_update:
            if isinstance(input, RemoteInput):
                input.next()

        for input in inputs_up_to_date:
            input.data_same_as_previous = True

        # Compute the group's start and end indices
        self.data_index = min(x.data_index for x in self._inputs)
        self.data_index_end = max(x.data_index_end for x in self._inputs)

        self.first_data_index = max(x.data_index for x in self._inputs)
        self.last_data_index = min(x.data_index_end for x in self._inputs)

        # Inform the synchronisation listener
        if self.synchronization_listener is not None:
            self.synchronization_listener.onIntervalChanged(self.first_data_index,
                                                            self.last_data_index)


495
496
497
#----------------------------------------------------------


498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
class DataView(object):
    """Restricted view over the data units of a data loader group

    Parameters:

      data_loader_group (DataLoaderGroup): Group whose inputs are viewed; its
        ``infos`` mapping must provide ``cached_file`` and ``data_indices``
        for each input

      data_indices (list): Sequence of ``(start, end)`` index pairs covered by
        the view; must not be empty

    """

    def __init__(self, data_loader_group, data_indices):
        self.infos             = {}
        self.data_indices      = data_indices
        self.nb_data_units     = len(data_indices)
        self.data_index        = data_indices[0][0]
        self.data_index_end    = data_indices[-1][1]

        for input_name, infos in data_loader_group.infos.items():
            # End indices of this input's data units that fall inside the view
            end_indices = sorted({
                indices[1] for indices in infos['data_indices']
                if self.data_index <= indices[1] <= self.data_index_end
            })

            input_data_indices = []
            current_start = self.data_index
            for end in end_indices:
                input_data_indices.append( (current_start, end) )
                current_start = end + 1

            # The last data unit of the input may extend past the view: clip it
            if (len(input_data_indices) == 0) or (input_data_indices[-1][1] != self.data_index_end):
                input_data_indices.append( (current_start, self.data_index_end) )

            self.infos[input_name] = dict(
                cached_file = infos['cached_file'],
                data_indices = input_data_indices,
                data = None,
                start_index = -1,
                end_index = -1,
            )


    def count(self, input_name=None):
        """Number of data units in the view, or of one input if ``input_name``
        is given (``None`` if that input is unknown)"""

        if input_name is None:
            return self.nb_data_units

        try:
            return len(self.infos[input_name]['data_indices'])
        except (KeyError, TypeError):
            return None


    def __getitem__(self, index):
        """Returns ``(data, start, end)`` for the data unit at ``index``

        ``data`` maps each input name to its block covering ``start``.
        ``(None, None, None)`` is returned for negative or out-of-range indices.
        """

        if index < 0:
            return (None, None, None)

        try:
            indices = self.data_indices[index]
        except (IndexError, TypeError):
            return (None, None, None)

        result = {}

        for input_name, infos in self.infos.items():
            # Reload only when the cached block does not contain the start index
            if (indices[0] < infos['start_index']) or (infos['end_index'] < indices[0]):
                (infos['data'], infos['start_index'], infos['end_index']) = \
                        infos['cached_file'].getAtDataIndex(indices[0])

            result[input_name] = infos['data']

        return (result, indices[0], indices[1])


#----------------------------------------------------------


class DataLoaderGroup(object):
    """Group of cached inputs of a data channel, loaded on demand

    Parameters:

      channel (str): Name of the data channel of the group


    Attributes:

      mixed_data_indices (list): ``(start, end)`` pairs computed by
        ``mixDataIndices`` from the data indices of all the inputs (``None``
        until an input is added)

      nb_data_units (int): Number of entries in ``mixed_data_indices``

      data_index (int): Lower index across all inputs

      data_index_end (int): Bigger index across all inputs

    """

    def __init__(self, channel):
        self.channel            = str(channel)
        self.infos              = {}
        self.mixed_data_indices = None
        self.nb_data_units      = 0
        self.data_index         = -1  # Lower index across all inputs
        self.data_index_end     = -1  # Bigger index across all inputs


    def add(self, input_name, cached_file):
        """Registers ``cached_file`` under ``input_name`` and recomputes the
        mixed data indices of the group"""

        self.infos[input_name] = dict(
            cached_file = cached_file,
            data_indices = cached_file.data_indices(),
            data = None,
            start_index = -1,
            end_index = -1,
        )

        self.mixed_data_indices = mixDataIndices([ x['data_indices'] for x in self.infos.values() ])
        self.nb_data_units = len(self.mixed_data_indices)
        self.data_index = self.mixed_data_indices[0][0]
        self.data_index_end = self.mixed_data_indices[-1][1]


    def count(self, input_name=None):
        """Number of mixed data units, or of one input's data units if
        ``input_name`` is given (0 if that input is unknown)"""

        if input_name is None:
            return self.nb_data_units

        try:
            return len(self.infos[input_name]['data_indices'])
        except (KeyError, TypeError):
            return 0


    def view(self, input_name, index):
        """Returns a :py:class:`DataView` restricted to the ``index``-th data
        unit of ``input_name``, or ``None`` if it does not exist"""

        if index < 0:
            return None

        try:
            indices = self.infos[input_name]['data_indices'][index]
        except (KeyError, IndexError, TypeError):
            return None

        limited_data_indices = [ x for x in self.mixed_data_indices
                                 if (indices[0] <= x[0]) and (x[1] <= indices[1]) ]

        return DataView(self, limited_data_indices)


    def __getitem__(self, index):
        """Returns ``(data, start, end)`` for the ``index``-th mixed data unit

        ``data`` maps each input name to its block covering ``start``.
        ``(None, None, None)`` is returned for negative or out-of-range indices,
        or when no input was added yet.
        """

        if index < 0:
            return (None, None, None)

        try:
            indices = self.mixed_data_indices[index]
        except (IndexError, TypeError):  # TypeError: no input added yet (None)
            return (None, None, None)

        result = {}

        for input_name, infos in self.infos.items():
            # Reload only when the cached block does not contain the start index
            if (indices[0] < infos['start_index']) or (infos['end_index'] < indices[0]):
                (infos['data'], infos['start_index'], infos['end_index']) = \
                        infos['cached_file'].getAtDataIndex(indices[0])

            result[input_name] = infos['data']

        return (result, indices[0], indices[1])


#----------------------------------------------------------


André Anjos's avatar
André Anjos committed
639
class InputList(object):
    """Represents the list of inputs of a processing block

    Inputs are organized by groups. The inputs inside a group are all
    synchronized together (see the section *Inputs synchronization* of the User's
    Guide).

    A list implementing this interface is provided to the algorithms

    One group of inputs is always considered as the **main** one, and is used to
    drive the algorithm. The usage of the other groups is left to the algorithm.

    See :py:class:`beat.backend.python.inputs.Input`
    See :py:class:`beat.backend.python.inputs.InputGroup`


    Example:

      .. code-block:: python

         inputs = InputList()
         ...

         # Retrieve an input by name
         input = inputs['labels']

         # Retrieve an input by index
         for index in range(0, len(inputs)):
             input = inputs[index]

         # Iteration over all inputs
         for input in inputs:
             ...

         # Iteration over some inputs
         for input in inputs[0:2]:
             ...

         # Retrieve the group an input belongs to, by input name
         group = inputs.groupOf('labels')

         # Retrieve the group an input belongs to
         input = inputs['labels']
         group = input.group


    Attributes:

      main_group (beat.backend.python.inputs.InputGroup): Main group (for
        data-driven algorithms): the first group added with
        ``restricted_access`` set

    """

    def __init__(self):
        self._groups = []
        self.main_group = None


    def add(self, group):
        """Add a group to the list

        The first group added with ``restricted_access`` set becomes the main
        group.

        :param beat.backend.python.inputs.InputGroup group: The group to add
        """
        if group.restricted_access and (self.main_group is None):
            self.main_group = group

        self._groups.append(group)


    def __getitem__(self, index):
        """Retrieves an input by name or by position (across all groups), or a
        list of inputs by slice

        Returns ``None`` if no input matches.
        """

        if isinstance(index, slice):
            return list(self)[index]

        elif isinstance(index, int):
            # Walk the groups, turning the global position into a local one
            for group in self._groups:
                if index < len(group):
                    return group[index]
                index -= len(group)

        elif isinstance(index, six.string_types):
            return next((k for k in (group[index] for group in self._groups)
                         if k is not None), None)

        return None


    def __iter__(self):
        for group in self._groups:
            for input in group:
                yield input


    def __len__(self):
        return sum(len(group) for group in self._groups)


    def nbGroups(self):
        """Number of groups in the list"""
        return len(self._groups)


    def groupOf(self, input_name):
        """Returns the group containing the input named ``input_name``, or
        ``None`` if there is none"""
        return next((group for group in self._groups
                     if group[input_name] is not None), None)


    def hasMoreData(self):
        """Indicates if there is more data to process in any group"""
        return any(group.hasMoreData() for group in self._groups)


    def group(self, name_or_index):
        """Retrieves a group by channel name (``None`` if unknown) or by
        position in the list (``IndexError`` if out of range)"""

        if isinstance(name_or_index, int):
            return self._groups[name_or_index]
        elif isinstance(name_or_index, six.string_types):
            return next((group for group in self._groups
                         if group.channel == name_or_index), None)
        return None