inputs.py 18.4 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


import time
import logging
logger = logging.getLogger(__name__)

from functools import reduce

import six
import zmq


class Input:
Philip ABBET's avatar
Philip ABBET committed
40
    """Represents the input of a processing block
André Anjos's avatar
André Anjos committed
41

Philip ABBET's avatar
Philip ABBET committed
42
43
    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)
André Anjos's avatar
André Anjos committed
44
45


Philip ABBET's avatar
Philip ABBET committed
46
    Parameters:
47

Philip ABBET's avatar
Philip ABBET committed
48
      name (str): Name of the input
49

Philip ABBET's avatar
Philip ABBET committed
50
      data_format (str): Data format accepted by the input
51

Philip ABBET's avatar
Philip ABBET committed
52
53
      data_source (beat.core.platform.data.DataSource): Source of data to be used
        by the input
54
55


Philip ABBET's avatar
Philip ABBET committed
56
    Attributes:
57

Philip ABBET's avatar
Philip ABBET committed
58
      group (beat.core.inputs.InputGroup): Group containing this input
59

Philip ABBET's avatar
Philip ABBET committed
60
      name (str): Name of the input (algorithm-specific)
61

Philip ABBET's avatar
Philip ABBET committed
62
63
      data (beat.core.baseformat.baseformat): The last block of data received on
        the input
64

Philip ABBET's avatar
Philip ABBET committed
65
66
      data_index (int): Index of the last block of data received on the input
        (see the section *Inputs synchronization* of the User's Guide)
67

Philip ABBET's avatar
Philip ABBET committed
68
69
      data_index_end (int): End index of the last block of data received on the
        input (see the section *Inputs synchronization* of the User's Guide)
70

Philip ABBET's avatar
Philip ABBET committed
71
      data_format (str): Data format accepted by the input
72

Philip ABBET's avatar
Philip ABBET committed
73
      data_source (beat.core.data.DataSource): Source of data used by the output
74

Philip ABBET's avatar
Philip ABBET committed
75
      nb_data_blocks_read (int): Number of data blocks read so far
76

Philip ABBET's avatar
Philip ABBET committed
77
    """
78

Philip ABBET's avatar
Philip ABBET committed
79
    def __init__(self, name, data_format, data_source):
80

Philip ABBET's avatar
Philip ABBET committed
81
82
83
84
85
86
87
88
89
        self.group               = None
        self.name                = str(name)
        self.data                = None
        self.data_index          = -1
        self.data_index_end      = -1
        self.data_same_as_previous = False
        self.data_format         = data_format
        self.data_source         = data_source
        self.nb_data_blocks_read = 0
90
91


Philip ABBET's avatar
Philip ABBET committed
92
93
    def isDataUnitDone(self):
        """Indicates if the current data unit will change at the next iteration"""
94

Philip ABBET's avatar
Philip ABBET committed
95
        return (self.data_index_end == self.group.data_index_end)
96
97


Philip ABBET's avatar
Philip ABBET committed
98
99
    def hasMoreData(self):
        """Indicates if there is more data to process on the input"""
100

Philip ABBET's avatar
Philip ABBET committed
101
        return self.data_source.hasMoreData()
102
103


Philip ABBET's avatar
Philip ABBET committed
104
105
    def next(self):
        """Retrieves the next block of data"""
106

Philip ABBET's avatar
Philip ABBET committed
107
        (self.data, self.data_index, self.data_index_end) = self.data_source.next()
108

Philip ABBET's avatar
Philip ABBET committed
109
110
111
112
113
114
        if self.data is None:
            message = "User algorithm asked for more data for channel " \
                      "`%s' on input `%s', but it is over (no more data). This " \
                      "normally indicates a programming error on the user " \
                      "side." % (self.group.channel, self.name)
            raise RuntimeError(message)
115

Philip ABBET's avatar
Philip ABBET committed
116
117
        self.data_same_as_previous = False
        self.nb_data_blocks_read += 1
118
119
120
121
122


#----------------------------------------------------------


123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class RemoteException(Exception):

    def __init__(self, kind, message):
        super(RemoteException, self).__init__()

        if kind == 'sys':
            self.system_error = message
            self.user_error = ''
        else:
            self.system_error = ''
            self.user_error = message


#----------------------------------------------------------


def process_error(socket):
    kind = socket.recv()
    message = socket.recv()
    raise RemoteException(kind, message)


#----------------------------------------------------------


148
class RemoteInput:
Philip ABBET's avatar
Philip ABBET committed
149
    """Allows to access the input of a processing block, via a socket.
150

Philip ABBET's avatar
Philip ABBET committed
151
152
    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)
153

Philip ABBET's avatar
Philip ABBET committed
154
155
    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)
156
157


Philip ABBET's avatar
Philip ABBET committed
158
    Parameters:
André Anjos's avatar
André Anjos committed
159

Philip ABBET's avatar
Philip ABBET committed
160
      name (str): Name of the input
André Anjos's avatar
André Anjos committed
161

Philip ABBET's avatar
Philip ABBET committed
162
163
      data_format (object): An object with the preloaded data format for this
        input (see :py:class:`beat.backend.python.dataformat.DataFormat`).
André Anjos's avatar
André Anjos committed
164

Philip ABBET's avatar
Philip ABBET committed
165
      socket (object): A 0MQ socket for writing the data to the server process
André Anjos's avatar
André Anjos committed
166
167


Philip ABBET's avatar
Philip ABBET committed
168
    Attributes:
André Anjos's avatar
André Anjos committed
169

Philip ABBET's avatar
Philip ABBET committed
170
      group (beat.backend.python.inputs.InputGroup): Group containing this input
André Anjos's avatar
André Anjos committed
171

Philip ABBET's avatar
Philip ABBET committed
172
173
      data (beat.core.baseformat.baseformat): The last block of data received on
        the input
André Anjos's avatar
André Anjos committed
174

Philip ABBET's avatar
Philip ABBET committed
175
    """
André Anjos's avatar
André Anjos committed
176

Philip ABBET's avatar
Philip ABBET committed
177
    def __init__(self, name, data_format, socket, unpack=True):
André Anjos's avatar
André Anjos committed
178

Philip ABBET's avatar
Philip ABBET committed
179
180
181
182
183
184
185
186
187
188
        self.name = str(name)
        self.data_format = data_format
        self.socket = socket
        self.data = None
        self.data_index = -1
        self.data_index_end = -1
        self.group = None
        self.comm_time = 0. #total time spent on communication
        self.nb_data_blocks_read = 0
        self._unpack = unpack
André Anjos's avatar
André Anjos committed
189
190


Philip ABBET's avatar
Philip ABBET committed
191
192
    def isDataUnitDone(self):
        """Indicates if the current data unit will change at the next iteration"""
André Anjos's avatar
André Anjos committed
193

Philip ABBET's avatar
Philip ABBET committed
194
        logger.debug('send: (idd) is-dataunit-done %s', self.name)
195

Philip ABBET's avatar
Philip ABBET committed
196
        _start = time.time()
197

Philip ABBET's avatar
Philip ABBET committed
198
199
200
        self.socket.send('idd', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)
201

Philip ABBET's avatar
Philip ABBET committed
202
        answer = self.socket.recv()
203

Philip ABBET's avatar
Philip ABBET committed
204
205
        self.comm_time += time.time() - _start
        logger.debug('recv: %s', answer)
206

Philip ABBET's avatar
Philip ABBET committed
207
        return answer == 'tru'
André Anjos's avatar
André Anjos committed
208
209


Philip ABBET's avatar
Philip ABBET committed
210
211
    def hasMoreData(self):
        """Indicates if there is more data to process on the input"""
André Anjos's avatar
André Anjos committed
212

Philip ABBET's avatar
Philip ABBET committed
213
        logger.debug('send: (hmd) has-more-data %s %s', self.group.channel, self.name)
214

Philip ABBET's avatar
Philip ABBET committed
215
        _start = time.time()
216

Philip ABBET's avatar
Philip ABBET committed
217
218
219
        self.socket.send('hmd', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)
220

Philip ABBET's avatar
Philip ABBET committed
221
        answer = self.socket.recv()
222

Philip ABBET's avatar
Philip ABBET committed
223
224
        self.comm_time += time.time() - _start
        logger.debug('recv: %s', answer)
225

Philip ABBET's avatar
Philip ABBET committed
226
227
        if answer == 'err':
            process_error(self.socket)
228

Philip ABBET's avatar
Philip ABBET committed
229
        return answer == 'tru'
André Anjos's avatar
André Anjos committed
230
231


Philip ABBET's avatar
Philip ABBET committed
232
233
    def next(self):
        """Retrieves the next block of data"""
André Anjos's avatar
André Anjos committed
234

Philip ABBET's avatar
Philip ABBET committed
235
        logger.debug('send: (nxt) next %s %s', self.group.channel, self.name)
236

Philip ABBET's avatar
Philip ABBET committed
237
        _start = time.time()
238

Philip ABBET's avatar
Philip ABBET committed
239
240
241
        self.socket.send('nxt', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)
242

Philip ABBET's avatar
Philip ABBET committed
243
        answer = self.socket.recv()
244

Philip ABBET's avatar
Philip ABBET committed
245
246
247
        if answer == 'err':
            self.comm_time += time.time() - _start
            process_error(self.socket)
248

Philip ABBET's avatar
Philip ABBET committed
249
250
251
        self.data_index = int(answer)
        self.data_index_end = int(self.socket.recv())
        self.unpack(self.socket.recv())
252

Philip ABBET's avatar
Philip ABBET committed
253
254
        self.comm_time += time.time() - _start
        self.nb_data_blocks_read += 1
André Anjos's avatar
André Anjos committed
255
256


Philip ABBET's avatar
Philip ABBET committed
257
258
    def unpack(self, packed):
        """Receives data through socket"""
André Anjos's avatar
André Anjos committed
259

Philip ABBET's avatar
Philip ABBET committed
260
261
        logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed),
                     self.data_index, self.data_index_end)
262

Philip ABBET's avatar
Philip ABBET committed
263
264
265
266
267
        if self.unpack:
            self.data = self.data_format.type()
            self.data.unpack(packed)
        else:
            self.data = packed
André Anjos's avatar
André Anjos committed
268
269


270
271
272
#----------------------------------------------------------


273
class InputGroup:
Philip ABBET's avatar
Philip ABBET committed
274
    """Represents a group of inputs synchronized together
André Anjos's avatar
André Anjos committed
275

Philip ABBET's avatar
Philip ABBET committed
276
277
278
    The inputs can be either "local" ones (reading data from the cache) or
    "remote" ones (using a socket to communicate with a database view output
    located inside a docker container).
André Anjos's avatar
André Anjos committed
279

Philip ABBET's avatar
Philip ABBET committed
280
281
    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)
André Anjos's avatar
André Anjos committed
282

Philip ABBET's avatar
Philip ABBET committed
283
284
    A group implementing this interface is provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`).
André Anjos's avatar
André Anjos committed
285

Philip ABBET's avatar
Philip ABBET committed
286
    See :py:class:`beat.core.inputs.Input`
André Anjos's avatar
André Anjos committed
287

Philip ABBET's avatar
Philip ABBET committed
288
    Example:
André Anjos's avatar
André Anjos committed
289

Philip ABBET's avatar
Philip ABBET committed
290
      .. code-block:: python
291

Philip ABBET's avatar
Philip ABBET committed
292
         inputs = InputList()
André Anjos's avatar
André Anjos committed
293

Philip ABBET's avatar
Philip ABBET committed
294
         print inputs['labels'].data_format
295

Philip ABBET's avatar
Philip ABBET committed
296
297
         for index in range(0, len(inputs)):
             print inputs[index].data_format
298

Philip ABBET's avatar
Philip ABBET committed
299
300
         for input in inputs:
             print input.data_format
301

Philip ABBET's avatar
Philip ABBET committed
302
303
         for input in inputs[0:2]:
             print input.data_format
304
305


Philip ABBET's avatar
Philip ABBET committed
306
    Parameters:
André Anjos's avatar
André Anjos committed
307

Philip ABBET's avatar
Philip ABBET committed
308
      channel (str): Name of the data channel of the group
André Anjos's avatar
André Anjos committed
309

Philip ABBET's avatar
Philip ABBET committed
310
311
      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener to use
André Anjos's avatar
André Anjos committed
312

Philip ABBET's avatar
Philip ABBET committed
313
314
      restricted_access (bool): Indicates if the algorithm can freely use the
        inputs
André Anjos's avatar
André Anjos committed
315
316


Philip ABBET's avatar
Philip ABBET committed
317
    Attributes:
André Anjos's avatar
André Anjos committed
318

Philip ABBET's avatar
Philip ABBET committed
319
320
      data_index (int): Index of the last block of data received on the inputs
        (see the section *Inputs synchronization* of the User's Guide)
André Anjos's avatar
André Anjos committed
321

Philip ABBET's avatar
Philip ABBET committed
322
323
      data_index_end (int): End index of the last block of data received on the
        inputs (see the section *Inputs synchronization* of the User's Guide)
André Anjos's avatar
André Anjos committed
324

Philip ABBET's avatar
Philip ABBET committed
325
      channel (str): Name of the data channel of the group
André Anjos's avatar
André Anjos committed
326

Philip ABBET's avatar
Philip ABBET committed
327
328
      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener used
André Anjos's avatar
André Anjos committed
329
330

    """
331

Philip ABBET's avatar
Philip ABBET committed
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
    def __init__(self, channel, synchronization_listener=None,
            restricted_access=True):

        self._inputs                  = []
        self.data_index               = -1
        self.data_index_end           = -1
        self.channel                  = str(channel)
        self.synchronization_listener = synchronization_listener
        self.restricted_access        = restricted_access
        self.socket                   = None
        self.comm_time                = 0.


    def __getitem__(self, index):

        if isinstance(index, six.string_types):
            try:
                return [x for x in self._inputs if x.name == index][0]
            except:
                pass
        elif isinstance(index, int):
            if index < len(self._inputs):
                return self._inputs[index]
        return None
356
357


Philip ABBET's avatar
Philip ABBET committed
358
359
    def __iter__(self):
        for k in self._inputs: yield k
360

André Anjos's avatar
André Anjos committed
361

Philip ABBET's avatar
Philip ABBET committed
362
363
    def __len__(self):
        return len(self._inputs)
364
365


Philip ABBET's avatar
Philip ABBET committed
366
367
    def add(self, input):
        """Add an input to the group
368

Philip ABBET's avatar
Philip ABBET committed
369
        Parameters:
370

Philip ABBET's avatar
Philip ABBET committed
371
372
          input (beat.backend.python.inputs.Input or beat.backend.python.inputs.RemoteInput):
                  The input to add
373

Philip ABBET's avatar
Philip ABBET committed
374
        """
375

Philip ABBET's avatar
Philip ABBET committed
376
377
        if isinstance(input, RemoteInput) and (self.socket is None):
            self.socket = input.socket
378

Philip ABBET's avatar
Philip ABBET committed
379
380
        input.group = self
        self._inputs.append(input)
381

André Anjos's avatar
André Anjos committed
382

Philip ABBET's avatar
Philip ABBET committed
383
384
    def localInputs(self):
        for k in [ x for x in self._inputs if isinstance(x, Input) ]: yield k
André Anjos's avatar
André Anjos committed
385
386


Philip ABBET's avatar
Philip ABBET committed
387
388
    def remoteInputs(self):
        for k in [ x for x in self._inputs if isinstance(x, RemoteInput) ]: yield k
André Anjos's avatar
André Anjos committed
389

390

Philip ABBET's avatar
Philip ABBET committed
391
392
    def hasMoreData(self):
        """Indicates if there is more data to process in the group"""
393

Philip ABBET's avatar
Philip ABBET committed
394
395
396
397
        # First process the local inputs
        res = bool([x for x in self.localInputs() if x.hasMoreData()])
        if res:
            return True
398

Philip ABBET's avatar
Philip ABBET committed
399
400
401
        # Next process the remote inputs
        if self.socket is None:
            return False
402

Philip ABBET's avatar
Philip ABBET committed
403
        logger.debug('send: (hmd) has-more-data %s', self.channel)
404

Philip ABBET's avatar
Philip ABBET committed
405
        _start = time.time()
406

Philip ABBET's avatar
Philip ABBET committed
407
408
        self.socket.send('hmd', zmq.SNDMORE)
        self.socket.send(self.channel)
409

Philip ABBET's avatar
Philip ABBET committed
410
        answer = self.socket.recv()
411

Philip ABBET's avatar
Philip ABBET committed
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
        self.comm_time += time.time() - _start
        logger.debug('recv: %s', answer)

        if answer == 'err':
            process_error(self.socket)

        return answer == 'tru'


    def next(self):
        """Retrieve the next block of data on all the inputs"""

        # Only for groups not managed by the platform
        if self.restricted_access:
            raise RuntimeError('Not authorized')

        # Only retrieve new data on the inputs where the current data expire first
        lower_end_index = reduce(lambda x, y: min(x, y.data_index_end),
                self._inputs[1:], self._inputs[0].data_index_end)
        inputs_to_update = [x for x in self._inputs \
                if x.data_index_end == lower_end_index]
        inputs_up_to_date = [x for x in self._inputs if x not in inputs_to_update]

        for input in [ x for x in inputs_to_update if isinstance(x, Input) ]:
            input.next()
            input.data_same_as_previous = False

        remote_inputs_to_update = list([ x for x in inputs_to_update if isinstance(x, RemoteInput) ])
        if len(remote_inputs_to_update) > 0:
            logger.debug('send: (nxt) next %s', self.channel)
            self.socket.send('nxt', zmq.SNDMORE)
            self.socket.send(self.channel)

            # read all incomming data
            _start = time.time()
            more = True
            parts = []
            while more:
                parts.append(self.socket.recv())
                if parts[-1] == 'err':
                    self.comm_time += time.time() - _start
                    process_error(self.socket)

                more = self.socket.getsockopt(zmq.RCVMORE)

            n = int(parts.pop(0))
            logger.debug('recv: %d (inputs)', n)
            for k in range(n):
                name = parts.pop(0)
                logger.debug('recv: %s (data follows)', name)
                inpt = self[name]
                if inpt is None:
                    raise RuntimeError("Could not find input `%s' at input group for " \
                        "channel `%s' while performing `next' operation on this group " \
                        "(current reading position is %d/%d)" % \
                        (name, self.channel, k, n))
                inpt.data_index = int(parts.pop(0))
                inpt.data_index_end = int(parts.pop(0))
                inpt.unpack(parts.pop(0))
                inpt.nb_data_blocks_read += 1
            self.comm_time += time.time() - _start

        for input in inputs_up_to_date:
            input.data_same_as_previous = True

        # Compute the group's start and end indices
        self.data_index = reduce(lambda x, y: max(x, y.data_index),
                self._inputs[1:], self._inputs[0].data_index)
        self.data_index_end = reduce(lambda x, y: min(x, y.data_index_end),
                self._inputs[1:], self._inputs[0].data_index_end)

        # Inform the synchronisation listener
        if self.synchronization_listener is not None:
            self.synchronization_listener.onIntervalChanged(self.data_index,
                    self.data_index_end)
André Anjos's avatar
André Anjos committed
487
488


489
490
491
#----------------------------------------------------------


André Anjos's avatar
André Anjos committed
492
class InputList:
Philip ABBET's avatar
Philip ABBET committed
493
    """Represents the list of inputs of a processing block
André Anjos's avatar
André Anjos committed
494

Philip ABBET's avatar
Philip ABBET committed
495
496
497
    Inputs are organized by groups. The inputs inside a group are all
    synchronized together (see the section *Inputs synchronization* of the User's
    Guide).
André Anjos's avatar
André Anjos committed
498

Philip ABBET's avatar
Philip ABBET committed
499
    A list implementing this interface is provided to the algorithms
André Anjos's avatar
André Anjos committed
500

Philip ABBET's avatar
Philip ABBET committed
501
502
    One group of inputs is always considered as the **main** one, and is used to
    drive the algorithm. The usage of the other groups is left to the algorithm.
André Anjos's avatar
André Anjos committed
503

Philip ABBET's avatar
Philip ABBET committed
504
505
    See :py:class:`beat.core.inputs.Input`
    See :py:class:`beat.core.inputs.InputGroup`
André Anjos's avatar
André Anjos committed
506
507


Philip ABBET's avatar
Philip ABBET committed
508
    Example:
André Anjos's avatar
André Anjos committed
509

Philip ABBET's avatar
Philip ABBET committed
510
      .. code-block:: python
André Anjos's avatar
André Anjos committed
511

Philip ABBET's avatar
Philip ABBET committed
512
513
         inputs = InputList()
         ...
André Anjos's avatar
André Anjos committed
514

Philip ABBET's avatar
Philip ABBET committed
515
516
         # Retrieve an input by name
         input = inputs['labels']
André Anjos's avatar
André Anjos committed
517

Philip ABBET's avatar
Philip ABBET committed
518
519
520
         # Retrieve an input by index
         for index in range(0, len(inputs)):
             input = inputs[index]
André Anjos's avatar
André Anjos committed
521

Philip ABBET's avatar
Philip ABBET committed
522
523
524
         # Iteration over all inputs
         for input in inputs:
             ...
André Anjos's avatar
André Anjos committed
525

Philip ABBET's avatar
Philip ABBET committed
526
527
528
         # Iteration over some inputs
         for input in inputs[0:2]:
             ...
André Anjos's avatar
André Anjos committed
529

Philip ABBET's avatar
Philip ABBET committed
530
531
         # Retrieve the group an input belongs to, by input name
         group = inputs.groupOf('label')
André Anjos's avatar
André Anjos committed
532

Philip ABBET's avatar
Philip ABBET committed
533
534
535
         # Retrieve the group an input belongs to
         input = inputs['labels']
         group = input.group
André Anjos's avatar
André Anjos committed
536
537


Philip ABBET's avatar
Philip ABBET committed
538
    Attributes:
André Anjos's avatar
André Anjos committed
539

Philip ABBET's avatar
Philip ABBET committed
540
541
      group (beat.core.inputs.InputGroup): Main group (for data-driven
        algorithms)
André Anjos's avatar
André Anjos committed
542

Philip ABBET's avatar
Philip ABBET committed
543
    """
André Anjos's avatar
André Anjos committed
544

Philip ABBET's avatar
Philip ABBET committed
545
546
547
    def __init__(self):
        self._groups = []
        self.main_group = None
André Anjos's avatar
André Anjos committed
548
549


Philip ABBET's avatar
Philip ABBET committed
550
551
    def add(self, group):
        """Add a group to the list
André Anjos's avatar
André Anjos committed
552

Philip ABBET's avatar
Philip ABBET committed
553
554
555
556
        :param beat.core.platform.inputs.InputGroup group: The group to add
        """
        if group.restricted_access and (self.main_group is None):
            self.main_group = group
André Anjos's avatar
André Anjos committed
557

Philip ABBET's avatar
Philip ABBET committed
558
        self._groups.append(group)
André Anjos's avatar
André Anjos committed
559

560

Philip ABBET's avatar
Philip ABBET committed
561
562
563
564
565
566
567
    def __getitem__(self, index):
        if isinstance(index, six.string_types):
            try:
                return [k for k in map(lambda x: x[index], self._groups) \
                        if k is not None][0]
            except:
                pass
André Anjos's avatar
André Anjos committed
568

Philip ABBET's avatar
Philip ABBET committed
569
570
571
572
        elif isinstance(index, int):
            for group in self._groups:
                if index < len(group): return group[index]
                index -= len(group)
André Anjos's avatar
André Anjos committed
573

Philip ABBET's avatar
Philip ABBET committed
574
        return None
André Anjos's avatar
André Anjos committed
575

576

Philip ABBET's avatar
Philip ABBET committed
577
578
    def __iter__(self):
        for i in range(len(self)): yield self[i]
André Anjos's avatar
André Anjos committed
579

580

Philip ABBET's avatar
Philip ABBET committed
581
582
    def __len__(self):
        return reduce(lambda x, y: x + len(y), self._groups, 0)
André Anjos's avatar
André Anjos committed
583

584

Philip ABBET's avatar
Philip ABBET committed
585
586
    def nbGroups(self):
        return len(self._groups)
André Anjos's avatar
André Anjos committed
587

588

Philip ABBET's avatar
Philip ABBET committed
589
590
591
592
593
    def groupOf(self, input_name):
        try:
            return [k for k in self._groups if k[input_name] is not None][0]
        except:
            return None
André Anjos's avatar
André Anjos committed
594

595

Philip ABBET's avatar
Philip ABBET committed
596
597
598
    def hasMoreData(self):
        """Indicates if there is more data to process in any group"""
        return bool([x for x in self._groups if x.hasMoreData()])
André Anjos's avatar
André Anjos committed
599
600


Philip ABBET's avatar
Philip ABBET committed
601
602
603
604
605
606
607
608
609
610
    def group(self, name_or_index):
        if isinstance(name_or_index, six.string_types):
            try:
                return [x for x in self._groups if x.channel == name_or_index][0]
            except:
                return None
        elif isinstance(name_or_index, int):
            return self._groups[name_or_index]
        else:
            return None