inputs.py 19.4 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


import time
import logging
logger = logging.getLogger(__name__)

from functools import reduce

import six
import zmq


class Input:
Philip ABBET's avatar
Philip ABBET committed
40
    """Represents the input of a processing block
André Anjos's avatar
André Anjos committed
41

Philip ABBET's avatar
Philip ABBET committed
42
43
    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)
André Anjos's avatar
André Anjos committed
44
45


Philip ABBET's avatar
Philip ABBET committed
46
    Parameters:
47

Philip ABBET's avatar
Philip ABBET committed
48
      name (str): Name of the input
49

Philip ABBET's avatar
Philip ABBET committed
50
      data_format (str): Data format accepted by the input
51

Philip ABBET's avatar
Philip ABBET committed
52
53
      data_source (beat.core.platform.data.DataSource): Source of data to be used
        by the input
54
55


Philip ABBET's avatar
Philip ABBET committed
56
    Attributes:
57

Philip ABBET's avatar
Philip ABBET committed
58
      group (beat.core.inputs.InputGroup): Group containing this input
59

Philip ABBET's avatar
Philip ABBET committed
60
      name (str): Name of the input (algorithm-specific)
61

Philip ABBET's avatar
Philip ABBET committed
62
63
      data (beat.core.baseformat.baseformat): The last block of data received on
        the input
64

Philip ABBET's avatar
Philip ABBET committed
65
66
      data_index (int): Index of the last block of data received on the input
        (see the section *Inputs synchronization* of the User's Guide)
67

Philip ABBET's avatar
Philip ABBET committed
68
69
      data_index_end (int): End index of the last block of data received on the
        input (see the section *Inputs synchronization* of the User's Guide)
70

Philip ABBET's avatar
Philip ABBET committed
71
      data_format (str): Data format accepted by the input
72

Philip ABBET's avatar
Philip ABBET committed
73
      data_source (beat.core.data.DataSource): Source of data used by the output
74

Philip ABBET's avatar
Philip ABBET committed
75
      nb_data_blocks_read (int): Number of data blocks read so far
76

Philip ABBET's avatar
Philip ABBET committed
77
    """
78

Philip ABBET's avatar
Philip ABBET committed
79
    def __init__(self, name, data_format, data_source):
80

81
82
83
84
85
86
87
88
89
        self.group                 = None
        self.name                  = str(name)
        self.data                  = None
        self.data_index            = -1
        self.data_index_end        = -1
        self.data_same_as_previous = True
        self.data_format           = data_format
        self.data_source           = data_source
        self.nb_data_blocks_read   = 0
90
91


Philip ABBET's avatar
Philip ABBET committed
92
93
    def isDataUnitDone(self):
        """Indicates if the current data unit will change at the next iteration"""
94

95
96
97
        if (self.data_index_end >= 0) and (self.group.data_index_end == -1):
            return True

Philip ABBET's avatar
Philip ABBET committed
98
        return (self.data_index_end == self.group.data_index_end)
99
100


Philip ABBET's avatar
Philip ABBET committed
101
102
    def hasMoreData(self):
        """Indicates if there is more data to process on the input"""
103

Philip ABBET's avatar
Philip ABBET committed
104
        return self.data_source.hasMoreData()
105
106


107
108
109
110
111
112
113
    def hasDataChanged(self):
        """Indicates if the current data unit is different than the one at the
        previous iteration"""

        return not self.data_same_as_previous


Philip ABBET's avatar
Philip ABBET committed
114
115
    def next(self):
        """Retrieves the next block of data"""
116

117
118
119
        if self.group.restricted_access:
            raise RuntimeError('Not authorized')

Philip ABBET's avatar
Philip ABBET committed
120
        (self.data, self.data_index, self.data_index_end) = self.data_source.next()
121

Philip ABBET's avatar
Philip ABBET committed
122
123
124
125
126
127
        if self.data is None:
            message = "User algorithm asked for more data for channel " \
                      "`%s' on input `%s', but it is over (no more data). This " \
                      "normally indicates a programming error on the user " \
                      "side." % (self.group.channel, self.name)
            raise RuntimeError(message)
128

Philip ABBET's avatar
Philip ABBET committed
129
130
        self.data_same_as_previous = False
        self.nb_data_blocks_read += 1
131
132
133
134
135


#----------------------------------------------------------


136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
class RemoteException(Exception):

    def __init__(self, kind, message):
        super(RemoteException, self).__init__()

        if kind == 'sys':
            self.system_error = message
            self.user_error = ''
        else:
            self.system_error = ''
            self.user_error = message


#----------------------------------------------------------


def process_error(socket):
    kind = socket.recv()
    message = socket.recv()
    raise RemoteException(kind, message)


#----------------------------------------------------------


161
class RemoteInput:
Philip ABBET's avatar
Philip ABBET committed
162
    """Allows to access the input of a processing block, via a socket.
163

Philip ABBET's avatar
Philip ABBET committed
164
165
    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)
166

Philip ABBET's avatar
Philip ABBET committed
167
168
    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)
169
170


Philip ABBET's avatar
Philip ABBET committed
171
    Parameters:
André Anjos's avatar
André Anjos committed
172

Philip ABBET's avatar
Philip ABBET committed
173
      name (str): Name of the input
André Anjos's avatar
André Anjos committed
174

Philip ABBET's avatar
Philip ABBET committed
175
176
      data_format (object): An object with the preloaded data format for this
        input (see :py:class:`beat.backend.python.dataformat.DataFormat`).
André Anjos's avatar
André Anjos committed
177

Philip ABBET's avatar
Philip ABBET committed
178
      socket (object): A 0MQ socket for writing the data to the server process
André Anjos's avatar
André Anjos committed
179
180


Philip ABBET's avatar
Philip ABBET committed
181
    Attributes:
André Anjos's avatar
André Anjos committed
182

Philip ABBET's avatar
Philip ABBET committed
183
      group (beat.backend.python.inputs.InputGroup): Group containing this input
André Anjos's avatar
André Anjos committed
184

Philip ABBET's avatar
Philip ABBET committed
185
186
      data (beat.core.baseformat.baseformat): The last block of data received on
        the input
André Anjos's avatar
André Anjos committed
187

Philip ABBET's avatar
Philip ABBET committed
188
    """
André Anjos's avatar
André Anjos committed
189

Philip ABBET's avatar
Philip ABBET committed
190
    def __init__(self, name, data_format, socket, unpack=True):
André Anjos's avatar
André Anjos committed
191

Philip ABBET's avatar
Philip ABBET committed
192
193
194
195
196
197
198
199
200
201
        self.name = str(name)
        self.data_format = data_format
        self.socket = socket
        self.data = None
        self.data_index = -1
        self.data_index_end = -1
        self.group = None
        self.comm_time = 0. #total time spent on communication
        self.nb_data_blocks_read = 0
        self._unpack = unpack
André Anjos's avatar
André Anjos committed
202
203


Philip ABBET's avatar
Philip ABBET committed
204
205
    def isDataUnitDone(self):
        """Indicates if the current data unit will change at the next iteration"""
André Anjos's avatar
André Anjos committed
206

Philip ABBET's avatar
Philip ABBET committed
207
        logger.debug('send: (idd) is-dataunit-done %s', self.name)
208

Philip ABBET's avatar
Philip ABBET committed
209
        _start = time.time()
210

Philip ABBET's avatar
Philip ABBET committed
211
212
213
        self.socket.send('idd', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)
214

Philip ABBET's avatar
Philip ABBET committed
215
        answer = self.socket.recv()
216

Philip ABBET's avatar
Philip ABBET committed
217
218
        self.comm_time += time.time() - _start
        logger.debug('recv: %s', answer)
219

Philip ABBET's avatar
Philip ABBET committed
220
        return answer == 'tru'
André Anjos's avatar
André Anjos committed
221
222


Philip ABBET's avatar
Philip ABBET committed
223
224
    def hasMoreData(self):
        """Indicates if there is more data to process on the input"""
André Anjos's avatar
André Anjos committed
225

Philip ABBET's avatar
Philip ABBET committed
226
        logger.debug('send: (hmd) has-more-data %s %s', self.group.channel, self.name)
227

Philip ABBET's avatar
Philip ABBET committed
228
        _start = time.time()
229

Philip ABBET's avatar
Philip ABBET committed
230
231
232
        self.socket.send('hmd', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)
233

Philip ABBET's avatar
Philip ABBET committed
234
        answer = self.socket.recv()
235

Philip ABBET's avatar
Philip ABBET committed
236
237
        self.comm_time += time.time() - _start
        logger.debug('recv: %s', answer)
238

Philip ABBET's avatar
Philip ABBET committed
239
240
        if answer == 'err':
            process_error(self.socket)
241

Philip ABBET's avatar
Philip ABBET committed
242
        return answer == 'tru'
André Anjos's avatar
André Anjos committed
243
244


245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
    def hasDataChanged(self):
        """Indicates if the current data unit is different than the one at the
        previous iteration"""

        logger.debug('send: (hdc) has-data-changed %s %s', self.group.channel, self.name)

        _start = time.time()

        self.socket.send('hdc', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)

        answer = self.socket.recv()

        self.comm_time += time.time() - _start
        logger.debug('recv: %s', answer)

        if answer == 'err':
            process_error(self.socket)

        return answer == 'tru'


Philip ABBET's avatar
Philip ABBET committed
268
269
    def next(self):
        """Retrieves the next block of data"""
André Anjos's avatar
André Anjos committed
270

Philip ABBET's avatar
Philip ABBET committed
271
        logger.debug('send: (nxt) next %s %s', self.group.channel, self.name)
272

Philip ABBET's avatar
Philip ABBET committed
273
        _start = time.time()
274

Philip ABBET's avatar
Philip ABBET committed
275
276
277
        self.socket.send('nxt', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)
278

Philip ABBET's avatar
Philip ABBET committed
279
        answer = self.socket.recv()
280

Philip ABBET's avatar
Philip ABBET committed
281
282
283
        if answer == 'err':
            self.comm_time += time.time() - _start
            process_error(self.socket)
284

Philip ABBET's avatar
Philip ABBET committed
285
286
287
        self.data_index = int(answer)
        self.data_index_end = int(self.socket.recv())
        self.unpack(self.socket.recv())
288

Philip ABBET's avatar
Philip ABBET committed
289
290
        self.comm_time += time.time() - _start
        self.nb_data_blocks_read += 1
André Anjos's avatar
André Anjos committed
291
292


Philip ABBET's avatar
Philip ABBET committed
293
294
    def unpack(self, packed):
        """Receives data through socket"""
André Anjos's avatar
André Anjos committed
295

Philip ABBET's avatar
Philip ABBET committed
296
297
        logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed),
                     self.data_index, self.data_index_end)
298

Philip ABBET's avatar
Philip ABBET committed
299
300
301
302
303
        if self.unpack:
            self.data = self.data_format.type()
            self.data.unpack(packed)
        else:
            self.data = packed
André Anjos's avatar
André Anjos committed
304
305


306
307
308
#----------------------------------------------------------


309
class InputGroup:
Philip ABBET's avatar
Philip ABBET committed
310
    """Represents a group of inputs synchronized together
André Anjos's avatar
André Anjos committed
311

Philip ABBET's avatar
Philip ABBET committed
312
313
314
    The inputs can be either "local" ones (reading data from the cache) or
    "remote" ones (using a socket to communicate with a database view output
    located inside a docker container).
André Anjos's avatar
André Anjos committed
315

Philip ABBET's avatar
Philip ABBET committed
316
317
    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)
André Anjos's avatar
André Anjos committed
318

Philip ABBET's avatar
Philip ABBET committed
319
320
    A group implementing this interface is provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`).
André Anjos's avatar
André Anjos committed
321

Philip ABBET's avatar
Philip ABBET committed
322
    See :py:class:`beat.core.inputs.Input`
André Anjos's avatar
André Anjos committed
323

Philip ABBET's avatar
Philip ABBET committed
324
    Example:
André Anjos's avatar
André Anjos committed
325

Philip ABBET's avatar
Philip ABBET committed
326
      .. code-block:: python
327

Philip ABBET's avatar
Philip ABBET committed
328
         inputs = InputList()
André Anjos's avatar
André Anjos committed
329

Philip ABBET's avatar
Philip ABBET committed
330
         print inputs['labels'].data_format
331

Philip ABBET's avatar
Philip ABBET committed
332
333
         for index in range(0, len(inputs)):
             print inputs[index].data_format
334

Philip ABBET's avatar
Philip ABBET committed
335
336
         for input in inputs:
             print input.data_format
337

Philip ABBET's avatar
Philip ABBET committed
338
339
         for input in inputs[0:2]:
             print input.data_format
340
341


Philip ABBET's avatar
Philip ABBET committed
342
    Parameters:
André Anjos's avatar
André Anjos committed
343

Philip ABBET's avatar
Philip ABBET committed
344
      channel (str): Name of the data channel of the group
André Anjos's avatar
André Anjos committed
345

Philip ABBET's avatar
Philip ABBET committed
346
347
      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener to use
André Anjos's avatar
André Anjos committed
348

Philip ABBET's avatar
Philip ABBET committed
349
350
      restricted_access (bool): Indicates if the algorithm can freely use the
        inputs
André Anjos's avatar
André Anjos committed
351
352


Philip ABBET's avatar
Philip ABBET committed
353
    Attributes:
André Anjos's avatar
André Anjos committed
354

Philip ABBET's avatar
Philip ABBET committed
355
356
      data_index (int): Index of the last block of data received on the inputs
        (see the section *Inputs synchronization* of the User's Guide)
André Anjos's avatar
André Anjos committed
357

Philip ABBET's avatar
Philip ABBET committed
358
359
      data_index_end (int): End index of the last block of data received on the
        inputs (see the section *Inputs synchronization* of the User's Guide)
André Anjos's avatar
André Anjos committed
360

Philip ABBET's avatar
Philip ABBET committed
361
      channel (str): Name of the data channel of the group
André Anjos's avatar
André Anjos committed
362

Philip ABBET's avatar
Philip ABBET committed
363
364
      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener used
André Anjos's avatar
André Anjos committed
365
366

    """
367

Philip ABBET's avatar
Philip ABBET committed
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
    def __init__(self, channel, synchronization_listener=None,
            restricted_access=True):

        self._inputs                  = []
        self.data_index               = -1
        self.data_index_end           = -1
        self.channel                  = str(channel)
        self.synchronization_listener = synchronization_listener
        self.restricted_access        = restricted_access
        self.socket                   = None
        self.comm_time                = 0.


    def __getitem__(self, index):

        if isinstance(index, six.string_types):
            try:
                return [x for x in self._inputs if x.name == index][0]
            except:
                pass
        elif isinstance(index, int):
            if index < len(self._inputs):
                return self._inputs[index]
        return None
392
393


Philip ABBET's avatar
Philip ABBET committed
394
395
    def __iter__(self):
        for k in self._inputs: yield k
396

André Anjos's avatar
André Anjos committed
397

Philip ABBET's avatar
Philip ABBET committed
398
399
    def __len__(self):
        return len(self._inputs)
400
401


Philip ABBET's avatar
Philip ABBET committed
402
403
    def add(self, input):
        """Add an input to the group
404

Philip ABBET's avatar
Philip ABBET committed
405
        Parameters:
406

Philip ABBET's avatar
Philip ABBET committed
407
408
          input (beat.backend.python.inputs.Input or beat.backend.python.inputs.RemoteInput):
                  The input to add
409

Philip ABBET's avatar
Philip ABBET committed
410
        """
411

Philip ABBET's avatar
Philip ABBET committed
412
413
        if isinstance(input, RemoteInput) and (self.socket is None):
            self.socket = input.socket
414

Philip ABBET's avatar
Philip ABBET committed
415
416
        input.group = self
        self._inputs.append(input)
417

André Anjos's avatar
André Anjos committed
418

Philip ABBET's avatar
Philip ABBET committed
419
420
    def localInputs(self):
        for k in [ x for x in self._inputs if isinstance(x, Input) ]: yield k
André Anjos's avatar
André Anjos committed
421
422


Philip ABBET's avatar
Philip ABBET committed
423
424
    def remoteInputs(self):
        for k in [ x for x in self._inputs if isinstance(x, RemoteInput) ]: yield k
André Anjos's avatar
André Anjos committed
425

426

Philip ABBET's avatar
Philip ABBET committed
427
428
    def hasMoreData(self):
        """Indicates if there is more data to process in the group"""
429

Philip ABBET's avatar
Philip ABBET committed
430
431
432
433
        # First process the local inputs
        res = bool([x for x in self.localInputs() if x.hasMoreData()])
        if res:
            return True
434

Philip ABBET's avatar
Philip ABBET committed
435
436
437
        # Next process the remote inputs
        if self.socket is None:
            return False
438

Philip ABBET's avatar
Philip ABBET committed
439
        logger.debug('send: (hmd) has-more-data %s', self.channel)
440

Philip ABBET's avatar
Philip ABBET committed
441
        _start = time.time()
442

Philip ABBET's avatar
Philip ABBET committed
443
444
        self.socket.send('hmd', zmq.SNDMORE)
        self.socket.send(self.channel)
445

Philip ABBET's avatar
Philip ABBET committed
446
        answer = self.socket.recv()
447

Philip ABBET's avatar
Philip ABBET committed
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
        self.comm_time += time.time() - _start
        logger.debug('recv: %s', answer)

        if answer == 'err':
            process_error(self.socket)

        return answer == 'tru'


    def next(self):
        """Retrieve the next block of data on all the inputs"""

        # Only for groups not managed by the platform
        if self.restricted_access:
            raise RuntimeError('Not authorized')

        # Only retrieve new data on the inputs where the current data expire first
        lower_end_index = reduce(lambda x, y: min(x, y.data_index_end),
                self._inputs[1:], self._inputs[0].data_index_end)
        inputs_to_update = [x for x in self._inputs \
                if x.data_index_end == lower_end_index]
        inputs_up_to_date = [x for x in self._inputs if x not in inputs_to_update]

        for input in [ x for x in inputs_to_update if isinstance(x, Input) ]:
            input.next()
            input.data_same_as_previous = False

        remote_inputs_to_update = list([ x for x in inputs_to_update if isinstance(x, RemoteInput) ])
        if len(remote_inputs_to_update) > 0:
            logger.debug('send: (nxt) next %s', self.channel)
            self.socket.send('nxt', zmq.SNDMORE)
            self.socket.send(self.channel)

            # read all incomming data
            _start = time.time()
            more = True
            parts = []
            while more:
                parts.append(self.socket.recv())
                if parts[-1] == 'err':
                    self.comm_time += time.time() - _start
                    process_error(self.socket)

                more = self.socket.getsockopt(zmq.RCVMORE)

            n = int(parts.pop(0))
            logger.debug('recv: %d (inputs)', n)
            for k in range(n):
                name = parts.pop(0)
                logger.debug('recv: %s (data follows)', name)
                inpt = self[name]
                if inpt is None:
                    raise RuntimeError("Could not find input `%s' at input group for " \
                        "channel `%s' while performing `next' operation on this group " \
                        "(current reading position is %d/%d)" % \
                        (name, self.channel, k, n))
                inpt.data_index = int(parts.pop(0))
                inpt.data_index_end = int(parts.pop(0))
                inpt.unpack(parts.pop(0))
                inpt.nb_data_blocks_read += 1
            self.comm_time += time.time() - _start

        for input in inputs_up_to_date:
            input.data_same_as_previous = True

        # Compute the group's start and end indices
        self.data_index = reduce(lambda x, y: max(x, y.data_index),
                self._inputs[1:], self._inputs[0].data_index)
        self.data_index_end = reduce(lambda x, y: min(x, y.data_index_end),
                self._inputs[1:], self._inputs[0].data_index_end)

        # Inform the synchronisation listener
        if self.synchronization_listener is not None:
            self.synchronization_listener.onIntervalChanged(self.data_index,
                    self.data_index_end)
André Anjos's avatar
André Anjos committed
523
524


525
526
527
#----------------------------------------------------------


André Anjos's avatar
André Anjos committed
528
class InputList:
Philip ABBET's avatar
Philip ABBET committed
529
    """Represents the list of inputs of a processing block
André Anjos's avatar
André Anjos committed
530

Philip ABBET's avatar
Philip ABBET committed
531
532
533
    Inputs are organized by groups. The inputs inside a group are all
    synchronized together (see the section *Inputs synchronization* of the User's
    Guide).
André Anjos's avatar
André Anjos committed
534

Philip ABBET's avatar
Philip ABBET committed
535
    A list implementing this interface is provided to the algorithms
André Anjos's avatar
André Anjos committed
536

Philip ABBET's avatar
Philip ABBET committed
537
538
    One group of inputs is always considered as the **main** one, and is used to
    drive the algorithm. The usage of the other groups is left to the algorithm.
André Anjos's avatar
André Anjos committed
539

Philip ABBET's avatar
Philip ABBET committed
540
541
    See :py:class:`beat.core.inputs.Input`
    See :py:class:`beat.core.inputs.InputGroup`
André Anjos's avatar
André Anjos committed
542
543


Philip ABBET's avatar
Philip ABBET committed
544
    Example:
André Anjos's avatar
André Anjos committed
545

Philip ABBET's avatar
Philip ABBET committed
546
      .. code-block:: python
André Anjos's avatar
André Anjos committed
547

Philip ABBET's avatar
Philip ABBET committed
548
549
         inputs = InputList()
         ...
André Anjos's avatar
André Anjos committed
550

Philip ABBET's avatar
Philip ABBET committed
551
552
         # Retrieve an input by name
         input = inputs['labels']
André Anjos's avatar
André Anjos committed
553

Philip ABBET's avatar
Philip ABBET committed
554
555
556
         # Retrieve an input by index
         for index in range(0, len(inputs)):
             input = inputs[index]
André Anjos's avatar
André Anjos committed
557

Philip ABBET's avatar
Philip ABBET committed
558
559
560
         # Iteration over all inputs
         for input in inputs:
             ...
André Anjos's avatar
André Anjos committed
561

Philip ABBET's avatar
Philip ABBET committed
562
563
564
         # Iteration over some inputs
         for input in inputs[0:2]:
             ...
André Anjos's avatar
André Anjos committed
565

Philip ABBET's avatar
Philip ABBET committed
566
567
         # Retrieve the group an input belongs to, by input name
         group = inputs.groupOf('label')
André Anjos's avatar
André Anjos committed
568

Philip ABBET's avatar
Philip ABBET committed
569
570
571
         # Retrieve the group an input belongs to
         input = inputs['labels']
         group = input.group
André Anjos's avatar
André Anjos committed
572
573


Philip ABBET's avatar
Philip ABBET committed
574
    Attributes:
André Anjos's avatar
André Anjos committed
575

Philip ABBET's avatar
Philip ABBET committed
576
577
      group (beat.core.inputs.InputGroup): Main group (for data-driven
        algorithms)
André Anjos's avatar
André Anjos committed
578

Philip ABBET's avatar
Philip ABBET committed
579
    """
André Anjos's avatar
André Anjos committed
580

Philip ABBET's avatar
Philip ABBET committed
581
582
583
    def __init__(self):
        self._groups = []
        self.main_group = None
André Anjos's avatar
André Anjos committed
584
585


Philip ABBET's avatar
Philip ABBET committed
586
587
    def add(self, group):
        """Add a group to the list
André Anjos's avatar
André Anjos committed
588

Philip ABBET's avatar
Philip ABBET committed
589
590
591
592
        :param beat.core.platform.inputs.InputGroup group: The group to add
        """
        if group.restricted_access and (self.main_group is None):
            self.main_group = group
André Anjos's avatar
André Anjos committed
593

Philip ABBET's avatar
Philip ABBET committed
594
        self._groups.append(group)
André Anjos's avatar
André Anjos committed
595

596

Philip ABBET's avatar
Philip ABBET committed
597
598
599
600
601
602
603
    def __getitem__(self, index):
        if isinstance(index, six.string_types):
            try:
                return [k for k in map(lambda x: x[index], self._groups) \
                        if k is not None][0]
            except:
                pass
André Anjos's avatar
André Anjos committed
604

Philip ABBET's avatar
Philip ABBET committed
605
606
607
608
        elif isinstance(index, int):
            for group in self._groups:
                if index < len(group): return group[index]
                index -= len(group)
André Anjos's avatar
André Anjos committed
609

Philip ABBET's avatar
Philip ABBET committed
610
        return None
André Anjos's avatar
André Anjos committed
611

612

Philip ABBET's avatar
Philip ABBET committed
613
614
    def __iter__(self):
        for i in range(len(self)): yield self[i]
André Anjos's avatar
André Anjos committed
615

616

Philip ABBET's avatar
Philip ABBET committed
617
618
    def __len__(self):
        return reduce(lambda x, y: x + len(y), self._groups, 0)
André Anjos's avatar
André Anjos committed
619

620

Philip ABBET's avatar
Philip ABBET committed
621
622
    def nbGroups(self):
        return len(self._groups)
André Anjos's avatar
André Anjos committed
623

624

Philip ABBET's avatar
Philip ABBET committed
625
626
627
628
629
    def groupOf(self, input_name):
        try:
            return [k for k in self._groups if k[input_name] is not None][0]
        except:
            return None
André Anjos's avatar
André Anjos committed
630

631

Philip ABBET's avatar
Philip ABBET committed
632
633
634
    def hasMoreData(self):
        """Indicates if there is more data to process in any group"""
        return bool([x for x in self._groups if x.hasMoreData()])
André Anjos's avatar
André Anjos committed
635
636


Philip ABBET's avatar
Philip ABBET committed
637
638
639
640
641
642
643
644
645
646
    def group(self, name_or_index):
        if isinstance(name_or_index, six.string_types):
            try:
                return [x for x in self._groups if x.channel == name_or_index][0]
            except:
                return None
        elif isinstance(name_or_index, int):
            return self._groups[name_or_index]
        else:
            return None