inputs.py 19.8 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


import time
import logging
logger = logging.getLogger(__name__)

from functools import reduce

import six
import zmq


Philip ABBET's avatar
Philip ABBET committed
39
40
41
42
43
#----------------------------------------------------------


class BaseInput(object):
    """Base class for all the kinds of input of a processing block
André Anjos's avatar
André Anjos committed
44

Philip ABBET's avatar
Philip ABBET committed
45
46
    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)
André Anjos's avatar
André Anjos committed
47
48


Philip ABBET's avatar
Philip ABBET committed
49
    Parameters:
50

Philip ABBET's avatar
Philip ABBET committed
51
      name (str): Name of the input
52

Philip ABBET's avatar
Philip ABBET committed
53
      data_format (str): Data format accepted by the input
54
55


Philip ABBET's avatar
Philip ABBET committed
56
    Attributes:
57

Philip ABBET's avatar
Philip ABBET committed
58
      group (beat.core.inputs.InputGroup): Group containing this input
59

Philip ABBET's avatar
Philip ABBET committed
60
      name (str): Name of the input (algorithm-specific)
61

Philip ABBET's avatar
Philip ABBET committed
62
63
      data (beat.core.baseformat.baseformat): The last block of data received on
        the input
64

Philip ABBET's avatar
Philip ABBET committed
65
66
      data_index (int): Index of the last block of data received on the input
        (see the section *Inputs synchronization* of the User's Guide)
67

Philip ABBET's avatar
Philip ABBET committed
68
69
      data_index_end (int): End index of the last block of data received on the
        input (see the section *Inputs synchronization* of the User's Guide)
70

Philip ABBET's avatar
Philip ABBET committed
71
      data_format (str): Data format accepted by the input
72

Philip ABBET's avatar
Philip ABBET committed
73
74
      data_same_as_previous (bool): Indicates if the last block of data received
        was changed (see the section *Inputs synchronization* of the User's Guide)
75

Philip ABBET's avatar
Philip ABBET committed
76
      nb_data_blocks_read (int): Number of data blocks read so far
77

Philip ABBET's avatar
Philip ABBET committed
78
    """
79

Philip ABBET's avatar
Philip ABBET committed
80
    def __init__(self, name, data_format):
81

82
83
84
85
86
87
88
89
        self.group                 = None
        self.name                  = str(name)
        self.data                  = None
        self.data_index            = -1
        self.data_index_end        = -1
        self.data_same_as_previous = True
        self.data_format           = data_format
        self.nb_data_blocks_read   = 0
90
91


Philip ABBET's avatar
Philip ABBET committed
92
93
    def isDataUnitDone(self):
        """Indicates if the current data unit will change at the next iteration"""
94

95
        if (self.data_index_end >= 0) and (self.group.last_data_index == -1):
96
97
            return True

98
        return (self.data_index_end == self.group.last_data_index)
99
100


Philip ABBET's avatar
Philip ABBET committed
101
102
    def hasMoreData(self):
        """Indicates if there is more data to process on the input"""
103

Philip ABBET's avatar
Philip ABBET committed
104
        raise NotImplemented
105
106


107
108
109
110
111
112
113
    def hasDataChanged(self):
        """Indicates if the current data unit is different than the one at the
        previous iteration"""

        return not self.data_same_as_previous


Philip ABBET's avatar
Philip ABBET committed
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
    def next(self):
        """Retrieves the next block of data"""

        raise NotImplemented


#----------------------------------------------------------


class Input(BaseInput):
    """Represents an input of a processing block that receive data from a
    data source

    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)


    Parameters:

      name (str): Name of the input

      data_format (str): Data format accepted by the input

      data_source (beat.core.platform.data.DataSource): Source of data to be used
        by the input


    Attributes:

      data_source (beat.core.data.DataSource): Source of data used by the output

    """

    def __init__(self, name, data_format, data_source):
        super(Input, self).__init__(name, data_format)

        self.data_source = data_source


    def hasMoreData(self):
        """Indicates if there is more data to process on the input"""

        return self.data_source.hasMoreData()


Philip ABBET's avatar
Philip ABBET committed
159
160
    def next(self):
        """Retrieves the next block of data"""
161

162
163
164
        if self.group.restricted_access:
            raise RuntimeError('Not authorized')

Philip ABBET's avatar
Philip ABBET committed
165
        (self.data, self.data_index, self.data_index_end) = self.data_source.next()
166

Philip ABBET's avatar
Philip ABBET committed
167
168
169
170
171
172
        if self.data is None:
            message = "User algorithm asked for more data for channel " \
                      "`%s' on input `%s', but it is over (no more data). This " \
                      "normally indicates a programming error on the user " \
                      "side." % (self.group.channel, self.name)
            raise RuntimeError(message)
173

Philip ABBET's avatar
Philip ABBET committed
174
175
        self.data_same_as_previous = False
        self.nb_data_blocks_read += 1
176
177
178
179
180


#----------------------------------------------------------


181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class RemoteException(Exception):

    def __init__(self, kind, message):
        super(RemoteException, self).__init__()

        if kind == 'sys':
            self.system_error = message
            self.user_error = ''
        else:
            self.system_error = ''
            self.user_error = message


#----------------------------------------------------------


def process_error(socket):
    kind = socket.recv()
    message = socket.recv()
    raise RemoteException(kind, message)


#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
206
class RemoteInput(BaseInput):
Philip ABBET's avatar
Philip ABBET committed
207
    """Allows to access the input of a processing block, via a socket.
208

Philip ABBET's avatar
Philip ABBET committed
209
210
    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)
211

Philip ABBET's avatar
Philip ABBET committed
212
213
    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)
214
215


Philip ABBET's avatar
Philip ABBET committed
216
    Parameters:
André Anjos's avatar
André Anjos committed
217

Philip ABBET's avatar
Philip ABBET committed
218
      name (str): Name of the input
André Anjos's avatar
André Anjos committed
219

Philip ABBET's avatar
Philip ABBET committed
220
221
      data_format (object): An object with the preloaded data format for this
        input (see :py:class:`beat.backend.python.dataformat.DataFormat`).
André Anjos's avatar
André Anjos committed
222

Philip ABBET's avatar
Philip ABBET committed
223
      socket (object): A 0MQ socket for writing the data to the server process
André Anjos's avatar
André Anjos committed
224
225


Philip ABBET's avatar
Philip ABBET committed
226
    Attributes:
André Anjos's avatar
André Anjos committed
227

Philip ABBET's avatar
Philip ABBET committed
228
      group (beat.backend.python.inputs.InputGroup): Group containing this input
André Anjos's avatar
André Anjos committed
229

Philip ABBET's avatar
Philip ABBET committed
230
231
      data (beat.core.baseformat.baseformat): The last block of data received on
        the input
André Anjos's avatar
André Anjos committed
232

Philip ABBET's avatar
Philip ABBET committed
233
    """
André Anjos's avatar
André Anjos committed
234

Philip ABBET's avatar
Philip ABBET committed
235
    def __init__(self, name, data_format, socket, unpack=True):
Philip ABBET's avatar
Philip ABBET committed
236
        super(RemoteInput, self).__init__(name, data_format)
André Anjos's avatar
André Anjos committed
237

Philip ABBET's avatar
Philip ABBET committed
238
239
240
        self.socket    = socket
        self.comm_time = 0.0     # Total time spent on communication
        self._unpack   = unpack
André Anjos's avatar
André Anjos committed
241
242


Philip ABBET's avatar
Philip ABBET committed
243
244
    def hasMoreData(self):
        """Indicates if there is more data to process on the input"""
André Anjos's avatar
André Anjos committed
245

Philip ABBET's avatar
Philip ABBET committed
246
        logger.debug('send: (hmd) has-more-data %s %s', self.group.channel, self.name)
247

Philip ABBET's avatar
Philip ABBET committed
248
        _start = time.time()
249

Philip ABBET's avatar
Philip ABBET committed
250
251
252
        self.socket.send('hmd', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)
253

Philip ABBET's avatar
Philip ABBET committed
254
        answer = self.socket.recv()
255

Philip ABBET's avatar
Philip ABBET committed
256
257
        self.comm_time += time.time() - _start
        logger.debug('recv: %s', answer)
258

Philip ABBET's avatar
Philip ABBET committed
259
260
        if answer == 'err':
            process_error(self.socket)
261

Philip ABBET's avatar
Philip ABBET committed
262
        return answer == 'tru'
André Anjos's avatar
André Anjos committed
263
264


Philip ABBET's avatar
Philip ABBET committed
265
266
    def next(self):
        """Retrieves the next block of data"""
André Anjos's avatar
André Anjos committed
267

Philip ABBET's avatar
Philip ABBET committed
268
        logger.debug('send: (nxt) next %s %s', self.group.channel, self.name)
269

Philip ABBET's avatar
Philip ABBET committed
270
        _start = time.time()
271

Philip ABBET's avatar
Philip ABBET committed
272
273
274
        self.socket.send('nxt', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)
275

Philip ABBET's avatar
Philip ABBET committed
276
        answer = self.socket.recv()
277

Philip ABBET's avatar
Philip ABBET committed
278
279
280
        if answer == 'err':
            self.comm_time += time.time() - _start
            process_error(self.socket)
281

Philip ABBET's avatar
Philip ABBET committed
282
283
284
        self.data_index = int(answer)
        self.data_index_end = int(self.socket.recv())
        self.unpack(self.socket.recv())
285

Philip ABBET's avatar
Philip ABBET committed
286
287
        self.comm_time += time.time() - _start
        self.nb_data_blocks_read += 1
André Anjos's avatar
André Anjos committed
288

Philip ABBET's avatar
Philip ABBET committed
289
290
        self.data_same_as_previous = False

André Anjos's avatar
André Anjos committed
291

Philip ABBET's avatar
Philip ABBET committed
292
293
    def unpack(self, packed):
        """Receives data through socket"""
André Anjos's avatar
André Anjos committed
294

Philip ABBET's avatar
Philip ABBET committed
295
296
        logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed),
                     self.data_index, self.data_index_end)
297

Philip ABBET's avatar
Philip ABBET committed
298
299
300
301
302
        if self.unpack:
            self.data = self.data_format.type()
            self.data.unpack(packed)
        else:
            self.data = packed
André Anjos's avatar
André Anjos committed
303
304


305
306
307
#----------------------------------------------------------


308
class InputGroup:
Philip ABBET's avatar
Philip ABBET committed
309
    """Represents a group of inputs synchronized together
André Anjos's avatar
André Anjos committed
310

Philip ABBET's avatar
Philip ABBET committed
311
312
313
    The inputs can be either "local" ones (reading data from the cache) or
    "remote" ones (using a socket to communicate with a database view output
    located inside a docker container).
André Anjos's avatar
André Anjos committed
314

Philip ABBET's avatar
Philip ABBET committed
315
316
    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)
André Anjos's avatar
André Anjos committed
317

Philip ABBET's avatar
Philip ABBET committed
318
319
    A group implementing this interface is provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`).
André Anjos's avatar
André Anjos committed
320

Philip ABBET's avatar
Philip ABBET committed
321
    See :py:class:`beat.core.inputs.Input`
André Anjos's avatar
André Anjos committed
322

Philip ABBET's avatar
Philip ABBET committed
323
    Example:
André Anjos's avatar
André Anjos committed
324

Philip ABBET's avatar
Philip ABBET committed
325
      .. code-block:: python
326

Philip ABBET's avatar
Philip ABBET committed
327
         inputs = InputList()
André Anjos's avatar
André Anjos committed
328

Philip ABBET's avatar
Philip ABBET committed
329
         print inputs['labels'].data_format
330

Philip ABBET's avatar
Philip ABBET committed
331
332
         for index in range(0, len(inputs)):
             print inputs[index].data_format
333

Philip ABBET's avatar
Philip ABBET committed
334
335
         for input in inputs:
             print input.data_format
336

Philip ABBET's avatar
Philip ABBET committed
337
338
         for input in inputs[0:2]:
             print input.data_format
339
340


Philip ABBET's avatar
Philip ABBET committed
341
    Parameters:
André Anjos's avatar
André Anjos committed
342

Philip ABBET's avatar
Philip ABBET committed
343
      channel (str): Name of the data channel of the group
André Anjos's avatar
André Anjos committed
344

Philip ABBET's avatar
Philip ABBET committed
345
346
      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener to use
André Anjos's avatar
André Anjos committed
347

Philip ABBET's avatar
Philip ABBET committed
348
349
      restricted_access (bool): Indicates if the algorithm can freely use the
        inputs
André Anjos's avatar
André Anjos committed
350
351


Philip ABBET's avatar
Philip ABBET committed
352
    Attributes:
André Anjos's avatar
André Anjos committed
353

Philip ABBET's avatar
Philip ABBET committed
354
355
      data_index (int): Index of the last block of data received on the inputs
        (see the section *Inputs synchronization* of the User's Guide)
André Anjos's avatar
André Anjos committed
356

Philip ABBET's avatar
Philip ABBET committed
357
358
      data_index_end (int): End index of the last block of data received on the
        inputs (see the section *Inputs synchronization* of the User's Guide)
André Anjos's avatar
André Anjos committed
359

Philip ABBET's avatar
Philip ABBET committed
360
      channel (str): Name of the data channel of the group
André Anjos's avatar
André Anjos committed
361

Philip ABBET's avatar
Philip ABBET committed
362
363
      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener used
André Anjos's avatar
André Anjos committed
364
365

    """
366

Philip ABBET's avatar
Philip ABBET committed
367
368
369
370
    def __init__(self, channel, synchronization_listener=None,
            restricted_access=True):

        self._inputs                  = []
371
372
373
374
        self.data_index               = -1  # Lower index across all inputs
        self.data_index_end           = -1  # Bigger index across all inputs
        self.first_data_index         = -1  # Start index of the current data units
        self.last_data_index          = -1  # End index of the current data units
Philip ABBET's avatar
Philip ABBET committed
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
        self.channel                  = str(channel)
        self.synchronization_listener = synchronization_listener
        self.restricted_access        = restricted_access
        self.socket                   = None
        self.comm_time                = 0.


    def __getitem__(self, index):

        if isinstance(index, six.string_types):
            try:
                return [x for x in self._inputs if x.name == index][0]
            except:
                pass
        elif isinstance(index, int):
            if index < len(self._inputs):
                return self._inputs[index]
        return None
393
394


Philip ABBET's avatar
Philip ABBET committed
395
396
    def __iter__(self):
        for k in self._inputs: yield k
397

André Anjos's avatar
André Anjos committed
398

Philip ABBET's avatar
Philip ABBET committed
399
400
    def __len__(self):
        return len(self._inputs)
401
402


Philip ABBET's avatar
Philip ABBET committed
403
404
    def add(self, input):
        """Add an input to the group
405

Philip ABBET's avatar
Philip ABBET committed
406
        Parameters:
407

Philip ABBET's avatar
Philip ABBET committed
408
409
          input (beat.backend.python.inputs.Input or beat.backend.python.inputs.RemoteInput):
                  The input to add
410

Philip ABBET's avatar
Philip ABBET committed
411
        """
412

Philip ABBET's avatar
Philip ABBET committed
413
414
        if isinstance(input, RemoteInput) and (self.socket is None):
            self.socket = input.socket
415

Philip ABBET's avatar
Philip ABBET committed
416
417
        input.group = self
        self._inputs.append(input)
418

André Anjos's avatar
André Anjos committed
419

Philip ABBET's avatar
Philip ABBET committed
420
421
    def localInputs(self):
        for k in [ x for x in self._inputs if isinstance(x, Input) ]: yield k
André Anjos's avatar
André Anjos committed
422
423


Philip ABBET's avatar
Philip ABBET committed
424
425
    def remoteInputs(self):
        for k in [ x for x in self._inputs if isinstance(x, RemoteInput) ]: yield k
André Anjos's avatar
André Anjos committed
426

427

Philip ABBET's avatar
Philip ABBET committed
428
429
    def hasMoreData(self):
        """Indicates if there is more data to process in the group"""
430

Philip ABBET's avatar
Philip ABBET committed
431
432
433
434
        # First process the local inputs
        res = bool([x for x in self.localInputs() if x.hasMoreData()])
        if res:
            return True
435

Philip ABBET's avatar
Philip ABBET committed
436
437
438
        # Next process the remote inputs
        if self.socket is None:
            return False
439

Philip ABBET's avatar
Philip ABBET committed
440
        logger.debug('send: (hmd) has-more-data %s', self.channel)
441

Philip ABBET's avatar
Philip ABBET committed
442
        _start = time.time()
443

Philip ABBET's avatar
Philip ABBET committed
444
445
        self.socket.send('hmd', zmq.SNDMORE)
        self.socket.send(self.channel)
446

Philip ABBET's avatar
Philip ABBET committed
447
        answer = self.socket.recv()
448

Philip ABBET's avatar
Philip ABBET committed
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
        self.comm_time += time.time() - _start
        logger.debug('recv: %s', answer)

        if answer == 'err':
            process_error(self.socket)

        return answer == 'tru'


    def next(self):
        """Retrieve the next block of data on all the inputs"""

        # Only for groups not managed by the platform
        if self.restricted_access:
            raise RuntimeError('Not authorized')

        # Only retrieve new data on the inputs where the current data expire first
        lower_end_index = reduce(lambda x, y: min(x, y.data_index_end),
                self._inputs[1:], self._inputs[0].data_index_end)
        inputs_to_update = [x for x in self._inputs \
                if x.data_index_end == lower_end_index]
        inputs_up_to_date = [x for x in self._inputs if x not in inputs_to_update]

        for input in [ x for x in inputs_to_update if isinstance(x, Input) ]:
            input.next()
            input.data_same_as_previous = False

        remote_inputs_to_update = list([ x for x in inputs_to_update if isinstance(x, RemoteInput) ])
        if len(remote_inputs_to_update) > 0:
            logger.debug('send: (nxt) next %s', self.channel)
            self.socket.send('nxt', zmq.SNDMORE)
            self.socket.send(self.channel)

            # read all incomming data
            _start = time.time()
            more = True
            parts = []
            while more:
                parts.append(self.socket.recv())
                if parts[-1] == 'err':
                    self.comm_time += time.time() - _start
                    process_error(self.socket)

                more = self.socket.getsockopt(zmq.RCVMORE)

            n = int(parts.pop(0))
            logger.debug('recv: %d (inputs)', n)
            for k in range(n):
                name = parts.pop(0)
                logger.debug('recv: %s (data follows)', name)
                inpt = self[name]
                if inpt is None:
                    raise RuntimeError("Could not find input `%s' at input group for " \
                        "channel `%s' while performing `next' operation on this group " \
                        "(current reading position is %d/%d)" % \
                        (name, self.channel, k, n))
                inpt.data_index = int(parts.pop(0))
                inpt.data_index_end = int(parts.pop(0))
                inpt.unpack(parts.pop(0))
                inpt.nb_data_blocks_read += 1
            self.comm_time += time.time() - _start

        for input in inputs_up_to_date:
            input.data_same_as_previous = True

        # Compute the group's start and end indices
515
        self.data_index = reduce(lambda x, y: min(x, y.data_index),
Philip ABBET's avatar
Philip ABBET committed
516
                self._inputs[1:], self._inputs[0].data_index)
517
518
519
520
521
522
        self.data_index_end = reduce(lambda x, y: max(x, y.data_index_end),
                self._inputs[1:], self._inputs[0].data_index_end)

        self.first_data_index = reduce(lambda x, y: max(x, y.data_index),
                self._inputs[1:], self._inputs[0].data_index)
        self.last_data_index = reduce(lambda x, y: min(x, y.data_index_end),
Philip ABBET's avatar
Philip ABBET committed
523
524
525
526
                self._inputs[1:], self._inputs[0].data_index_end)

        # Inform the synchronisation listener
        if self.synchronization_listener is not None:
527
528
            self.synchronization_listener.onIntervalChanged(self.first_data_index,
                                                            self.last_data_index)
André Anjos's avatar
André Anjos committed
529
530


531
532
533
#----------------------------------------------------------


André Anjos's avatar
André Anjos committed
534
class InputList:
Philip ABBET's avatar
Philip ABBET committed
535
    """Represents the list of inputs of a processing block
André Anjos's avatar
André Anjos committed
536

Philip ABBET's avatar
Philip ABBET committed
537
538
539
    Inputs are organized by groups. The inputs inside a group are all
    synchronized together (see the section *Inputs synchronization* of the User's
    Guide).
André Anjos's avatar
André Anjos committed
540

Philip ABBET's avatar
Philip ABBET committed
541
    A list implementing this interface is provided to the algorithms
André Anjos's avatar
André Anjos committed
542

Philip ABBET's avatar
Philip ABBET committed
543
544
    One group of inputs is always considered as the **main** one, and is used to
    drive the algorithm. The usage of the other groups is left to the algorithm.
André Anjos's avatar
André Anjos committed
545

Philip ABBET's avatar
Philip ABBET committed
546
547
    See :py:class:`beat.core.inputs.Input`
    See :py:class:`beat.core.inputs.InputGroup`
André Anjos's avatar
André Anjos committed
548
549


Philip ABBET's avatar
Philip ABBET committed
550
    Example:
André Anjos's avatar
André Anjos committed
551

Philip ABBET's avatar
Philip ABBET committed
552
      .. code-block:: python
André Anjos's avatar
André Anjos committed
553

Philip ABBET's avatar
Philip ABBET committed
554
555
         inputs = InputList()
         ...
André Anjos's avatar
André Anjos committed
556

Philip ABBET's avatar
Philip ABBET committed
557
558
         # Retrieve an input by name
         input = inputs['labels']
André Anjos's avatar
André Anjos committed
559

Philip ABBET's avatar
Philip ABBET committed
560
561
562
         # Retrieve an input by index
         for index in range(0, len(inputs)):
             input = inputs[index]
André Anjos's avatar
André Anjos committed
563

Philip ABBET's avatar
Philip ABBET committed
564
565
566
         # Iteration over all inputs
         for input in inputs:
             ...
André Anjos's avatar
André Anjos committed
567

Philip ABBET's avatar
Philip ABBET committed
568
569
570
         # Iteration over some inputs
         for input in inputs[0:2]:
             ...
André Anjos's avatar
André Anjos committed
571

Philip ABBET's avatar
Philip ABBET committed
572
573
         # Retrieve the group an input belongs to, by input name
         group = inputs.groupOf('label')
André Anjos's avatar
André Anjos committed
574

Philip ABBET's avatar
Philip ABBET committed
575
576
577
         # Retrieve the group an input belongs to
         input = inputs['labels']
         group = input.group
André Anjos's avatar
André Anjos committed
578
579


Philip ABBET's avatar
Philip ABBET committed
580
    Attributes:
André Anjos's avatar
André Anjos committed
581

Philip ABBET's avatar
Philip ABBET committed
582
583
      group (beat.core.inputs.InputGroup): Main group (for data-driven
        algorithms)
André Anjos's avatar
André Anjos committed
584

Philip ABBET's avatar
Philip ABBET committed
585
    """
André Anjos's avatar
André Anjos committed
586

Philip ABBET's avatar
Philip ABBET committed
587
588
589
    def __init__(self):
        self._groups = []
        self.main_group = None
André Anjos's avatar
André Anjos committed
590
591


Philip ABBET's avatar
Philip ABBET committed
592
593
    def add(self, group):
        """Add a group to the list
André Anjos's avatar
André Anjos committed
594

Philip ABBET's avatar
Philip ABBET committed
595
596
597
598
        :param beat.core.platform.inputs.InputGroup group: The group to add
        """
        if group.restricted_access and (self.main_group is None):
            self.main_group = group
André Anjos's avatar
André Anjos committed
599

Philip ABBET's avatar
Philip ABBET committed
600
        self._groups.append(group)
André Anjos's avatar
André Anjos committed
601

602

Philip ABBET's avatar
Philip ABBET committed
603
604
605
606
607
608
609
    def __getitem__(self, index):
        if isinstance(index, six.string_types):
            try:
                return [k for k in map(lambda x: x[index], self._groups) \
                        if k is not None][0]
            except:
                pass
André Anjos's avatar
André Anjos committed
610

Philip ABBET's avatar
Philip ABBET committed
611
612
613
614
        elif isinstance(index, int):
            for group in self._groups:
                if index < len(group): return group[index]
                index -= len(group)
André Anjos's avatar
André Anjos committed
615

Philip ABBET's avatar
Philip ABBET committed
616
        return None
André Anjos's avatar
André Anjos committed
617

618

Philip ABBET's avatar
Philip ABBET committed
619
620
    def __iter__(self):
        for i in range(len(self)): yield self[i]
André Anjos's avatar
André Anjos committed
621

622

Philip ABBET's avatar
Philip ABBET committed
623
624
    def __len__(self):
        return reduce(lambda x, y: x + len(y), self._groups, 0)
André Anjos's avatar
André Anjos committed
625

626

Philip ABBET's avatar
Philip ABBET committed
627
628
    def nbGroups(self):
        return len(self._groups)
André Anjos's avatar
André Anjos committed
629

630

Philip ABBET's avatar
Philip ABBET committed
631
632
633
634
635
    def groupOf(self, input_name):
        try:
            return [k for k in self._groups if k[input_name] is not None][0]
        except:
            return None
André Anjos's avatar
André Anjos committed
636

637

Philip ABBET's avatar
Philip ABBET committed
638
639
640
    def hasMoreData(self):
        """Indicates if there is more data to process in any group"""
        return bool([x for x in self._groups if x.hasMoreData()])
André Anjos's avatar
André Anjos committed
641
642


Philip ABBET's avatar
Philip ABBET committed
643
644
645
646
647
648
649
650
651
652
    def group(self, name_or_index):
        if isinstance(name_or_index, six.string_types):
            try:
                return [x for x in self._groups if x.channel == name_or_index][0]
            except:
                return None
        elif isinstance(name_or_index, int):
            return self._groups[name_or_index]
        else:
            return None