inputs.py 18.4 KB
Newer Older
André Anjos's avatar
André Anjos committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :

###############################################################################
#                                                                             #
# Copyright (c) 2016 Idiap Research Institute, http://www.idiap.ch/           #
# Contact: beat.support@idiap.ch                                              #
#                                                                             #
# This file is part of the beat.backend.python module of the BEAT platform.   #
#                                                                             #
# Commercial License Usage                                                    #
# Licensees holding valid commercial BEAT licenses may use this file in       #
# accordance with the terms contained in a written agreement between you      #
# and Idiap. For further information contact tto@idiap.ch                     #
#                                                                             #
# Alternatively, this file may be used under the terms of the GNU Affero      #
# Public License version 3 as published by the Free Software and appearing    #
# in the file LICENSE.AGPL included in the packaging of this file.            #
# The BEAT platform is distributed in the hope that it will be useful, but    #
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY  #
# or FITNESS FOR A PARTICULAR PURPOSE.                                        #
#                                                                             #
# You should have received a copy of the GNU Affero Public License along      #
# with the BEAT platform. If not, see http://www.gnu.org/licenses/.           #
#                                                                             #
###############################################################################


import time
import logging
logger = logging.getLogger(__name__)

from functools import reduce

import six
import zmq


Philip ABBET's avatar
Philip ABBET committed
39
40
41
42
43
#----------------------------------------------------------


class BaseInput(object):
    """Base class for all the kinds of input of a processing block

    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)


    Parameters:

      name (str): Name of the input

      data_format (str): Data format accepted by the input


    Attributes:

      group (beat.core.inputs.InputGroup): Group containing this input
        (``None`` until the input is added to a group)

      name (str): Name of the input (algorithm-specific)

      data (beat.core.baseformat.baseformat): The last block of data received on
        the input

      data_index (int): Index of the last block of data received on the input
        (see the section *Inputs synchronization* of the User's Guide)

      data_index_end (int): End index of the last block of data received on the
        input (see the section *Inputs synchronization* of the User's Guide)

      data_format (str): Data format accepted by the input

      data_same_as_previous (bool): ``True`` when the current block of data is
        the same as at the previous iteration (see the section *Inputs
        synchronization* of the User's Guide)

      nb_data_blocks_read (int): Number of data blocks read so far

    """

    def __init__(self, name, data_format):

        self.group                 = None
        self.name                  = str(name)
        self.data                  = None
        self.data_index            = -1
        self.data_index_end        = -1
        self.data_same_as_previous = True
        self.data_format           = data_format
        self.nb_data_blocks_read   = 0


    def isDataUnitDone(self):
        """Indicates if the current data unit will change at the next iteration"""

        # Data was already read on this input, but the group has not computed
        # its own interval yet (``last_data_index`` still at its initial -1)
        if (self.data_index_end >= 0) and (self.group.last_data_index == -1):
            return True

        return (self.data_index_end == self.group.last_data_index)


    def hasMoreData(self):
        """Indicates if there is more data to process on the input

        Must be overridden by the subclasses.

        Raises:

          NotImplementedError: always, in this base class

        """

        # ``NotImplemented`` is a comparison sentinel, not an exception class:
        # raising it results in a ``TypeError``
        raise NotImplementedError()


    def hasDataChanged(self):
        """Indicates if the current data unit is different than the one at the
        previous iteration"""

        return not self.data_same_as_previous


    def next(self):
        """Retrieves the next block of data

        Must be overridden by the subclasses.

        Raises:

          NotImplementedError: always, in this base class

        """

        raise NotImplementedError()


#----------------------------------------------------------


class Input(BaseInput):
    """Input of a processing block fed by a (local) data source

    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)


    Parameters:

      name (str): Name of the input

      data_format (str): Data format accepted by the input

      data_source (beat.core.platform.data.DataSource): Source of data to be used
        by the input


    Attributes:

      data_source (beat.core.data.DataSource): Source of data used by the input

    """

    def __init__(self, name, data_format, data_source):
        super(Input, self).__init__(name, data_format)

        self.data_source = data_source


    def hasMoreData(self):
        """Tells whether the data source can still deliver data"""

        more = self.data_source.hasMoreData()
        return more


    def next(self):
        """Fetches the next block of data from the data source

        Raises:

          RuntimeError: if the group of this input has restricted access, or
            if the data source did not return any data

        """

        if self.group.restricted_access:
            raise RuntimeError('Not authorized')

        block = self.data_source.next()
        self.data, self.data_index, self.data_index_end = block

        if self.data is None:
            template = "User algorithm asked for more data for channel " \
                       "`%s' on input `%s', but it is over (no more data). This " \
                       "normally indicates a programming error on the user " \
                       "side."
            raise RuntimeError(template % (self.group.channel, self.name))

        # A new block was just read: flag it and count it
        self.nb_data_blocks_read += 1
        self.data_same_as_previous = False
176
177
178
179
180


#----------------------------------------------------------


181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class RemoteException(Exception):
    """Error reported by the other end of the communication socket


    Parameters:

      kind (str or bytes): Category of the error, ``'sys'`` for a system
        error; any other value is handled as a user error

      message: Description of the error


    Attributes:

      system_error: ``message`` when ``kind`` is ``'sys'``, ``''`` otherwise

      user_error: ``message`` when ``kind`` is not ``'sys'``, ``''`` otherwise

    """

    def __init__(self, kind, message):
        # Forward the message so that str(exception) and tracebacks are not
        # empty
        super(RemoteException, self).__init__(message)

        # A value read straight from a socket may be a byte string, which
        # never compares equal to 'sys' under Python 3
        if isinstance(kind, bytes):
            kind = kind.decode('utf-8', 'replace')

        if kind == 'sys':
            self.system_error = message
            self.user_error = ''
        else:
            self.system_error = ''
            self.user_error = message


#----------------------------------------------------------


def process_error(socket):
    """Reads an error report from ``socket`` and raises it

    Two messages are read from the socket, in this order: the kind of the
    error, then its description.

    Raises:

      RemoteException: always, built from the two messages received

    """

    error_kind = socket.recv()
    error_text = socket.recv()

    raise RemoteException(error_kind, error_text)


#----------------------------------------------------------


Philip ABBET's avatar
Philip ABBET committed
206
class RemoteInput(BaseInput):
    """Allows to access the input of a processing block, via a socket.

    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)

    A list of those inputs must be provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`)

    NOTE(review): the commands sent and the answers compared in this class are
    native ``str`` literals. 0MQ sockets exchange byte strings, so this looks
    Python 2 specific - confirm before running under Python 3.


    Parameters:

      name (str): Name of the input

      data_format (object): An object with the preloaded data format for this
        input (see :py:class:`beat.backend.python.dataformat.DataFormat`).

      socket (object): A 0MQ socket for writing the data to the server process

      unpack (bool): If ``True``, each block received is decoded into a new
        ``data_format.type()`` object; otherwise the packed block is stored
        as-is in ``data``


    Attributes:

      group (beat.backend.python.inputs.InputGroup): Group containing this input

      data (beat.core.baseformat.baseformat): The last block of data received on
        the input

      socket (object): The socket given at construction

      comm_time (float): Total time (in seconds) spent on communication

    """

    def __init__(self, name, data_format, socket, unpack=True):
        super(RemoteInput, self).__init__(name, data_format)

        self.socket         = socket
        self.comm_time      = 0.0     # Total time spent on communication
        self._unpack        = unpack
        self._has_more_data = None    # To avoid repetitive requests


    def hasMoreData(self):
        """Indicates if there is more data to process on the input

        The answer of the other end is cached until the next call to
        :py:meth:`next`, to avoid repetitive requests.

        Raises:

          RemoteException: if the other end answers with an error

        """

        if self._has_more_data is None:
            logger.debug('send: (hmd) has-more-data %s %s', self.group.channel, self.name)

            _start = time.time()

            self.socket.send('hmd', zmq.SNDMORE)
            self.socket.send(self.group.channel, zmq.SNDMORE)
            self.socket.send(self.name)

            answer = self.socket.recv()

            self.comm_time += time.time() - _start
            logger.debug('recv: %s', answer)

            if answer == 'err':
                process_error(self.socket)

            self._has_more_data = (answer == 'tru')

        return self._has_more_data


    def next(self):
        """Retrieves the next block of data

        Raises:

          RemoteException: if the other end answers with an error

        """

        logger.debug('send: (nxt) next %s %s', self.group.channel, self.name)

        _start = time.time()

        self.socket.send('nxt', zmq.SNDMORE)
        self.socket.send(self.group.channel, zmq.SNDMORE)
        self.socket.send(self.name)

        answer = self.socket.recv()

        if answer == 'err':
            self.comm_time += time.time() - _start
            process_error(self.socket)

        # Regular answer: start index, end index, then the packed data
        self.data_index = int(answer)
        self.data_index_end = int(self.socket.recv())
        self.unpack(self.socket.recv())

        self.comm_time += time.time() - _start
        self.nb_data_blocks_read += 1

        self.data_same_as_previous = False
        self._has_more_data = None    # Must be asked again after a read


    def unpack(self, packed):
        """Stores a block of data received through the socket in ``data``

        Parameters:

          packed: The block of data, as received from the socket

        """

        logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed),
                     self.data_index, self.data_index_end)

        # Test the flag given at construction, not ``self.unpack`` (this very
        # method, which is always true)
        if self._unpack:
            self.data = self.data_format.type()
            self.data.unpack(packed)
        else:
            self.data = packed
André Anjos's avatar
André Anjos committed
308
309


310
311
312
#----------------------------------------------------------


313
class InputGroup(object):
    """Represents a group of inputs synchronized together

    The inputs can be either "local" ones (reading data from the cache) or
    "remote" ones (using a socket to communicate with a database view output
    located inside a docker container).

    The other end of the socket must be managed by a message handler (see
    :py:class:`beat.backend.python.message_handler.MessageHandler`)

    A group implementing this interface is provided to the algorithms (see
    :py:class:`beat.backend.python.inputs.InputList`).

    See :py:class:`beat.core.inputs.Input`

    Example:

      .. code-block:: python

         group = InputGroup('main')
         ...

         print(group['labels'].data_format)

         for index in range(0, len(group)):
             print(group[index].data_format)

         for input in group:
             print(input.data_format)

         for input in group[0:2]:
             print(input.data_format)


    Parameters:

      channel (str): Name of the data channel of the group

      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener to use

      restricted_access (bool): When ``True``, :py:meth:`next` refuses to run
        (the algorithm cannot freely use the inputs)


    Attributes:

      data_index (int): Index of the last block of data received on the inputs
        (see the section *Inputs synchronization* of the User's Guide)

      data_index_end (int): End index of the last block of data received on the
        inputs (see the section *Inputs synchronization* of the User's Guide)

      first_data_index (int): Start index of the current data units

      last_data_index (int): End index of the current data units

      channel (str): Name of the data channel of the group

      synchronization_listener (beat.core.outputs.SynchronizationListener):
        Synchronization listener used

      restricted_access (bool): Value given at construction

      socket (object): Socket of the first remote input added to the group,
        ``None`` as long as there is none

      comm_time (float): Initialised to 0.0 (not updated by this class)

    """

    def __init__(self, channel, synchronization_listener=None,
            restricted_access=True):

        self._inputs                  = []
        self.data_index               = -1  # Lower index across all inputs
        self.data_index_end           = -1  # Bigger index across all inputs
        self.first_data_index         = -1  # Start index of the current data units
        self.last_data_index          = -1  # End index of the current data units
        self.channel                  = str(channel)
        self.synchronization_listener = synchronization_listener
        self.restricted_access        = restricted_access
        self.socket                   = None
        self.comm_time                = 0.


    def __getitem__(self, index):
        """Retrieves input(s) by position, by slice or by name

        Returns ``None`` when no input matches (unknown name, position out of
        range or unsupported kind of index). A slice returns a (possibly
        empty) list of inputs.
        """

        if isinstance(index, slice):
            return self._inputs[index]

        if isinstance(index, int):
            try:
                return self._inputs[index]
            except IndexError:
                return None

        if isinstance(index, six.string_types):
            for candidate in self._inputs:
                if candidate.name == index:
                    return candidate

        return None


    def __iter__(self):
        for k in self._inputs:
            yield k


    def __len__(self):
        return len(self._inputs)


    def add(self, input):
        """Add an input to the group

        Parameters:

          input (beat.backend.python.inputs.Input or beat.backend.python.inputs.RemoteInput):
                  The input to add

        """

        # The socket of the first remote input becomes the one of the group
        if isinstance(input, RemoteInput) and (self.socket is None):
            self.socket = input.socket

        input.group = self
        self._inputs.append(input)


    def localInputs(self):
        """Iterates over the inputs of the group that are ``Input`` objects"""

        for k in [ x for x in self._inputs if isinstance(x, Input) ]:
            yield k


    def remoteInputs(self):
        """Iterates over the inputs of the group that are ``RemoteInput``
        objects"""

        for k in [ x for x in self._inputs if isinstance(x, RemoteInput) ]:
            yield k


    def hasMoreData(self):
        """Indicates if there is more data to process in the group"""

        # First process the local inputs (all of them are queried, hence the
        # list and not a generator)
        if any([ x.hasMoreData() for x in self.localInputs() ]):
            return True

        # Next process the remote inputs
        if self.socket is None:
            return False

        for x in self.remoteInputs():
            if x.hasMoreData():
                return True

        return False


    def next(self):
        """Retrieve the next block of data on all the inputs

        Raises:

          RuntimeError: if the access to the group is restricted

        """

        # Only for groups not managed by the platform
        if self.restricted_access:
            raise RuntimeError('Not authorized')

        # Only retrieve new data on the inputs where the current data expire
        # first. Both lists are built *before* any input is updated.
        lower_end_index = min(x.data_index_end for x in self._inputs)
        inputs_to_update = [x for x in self._inputs \
                if x.data_index_end == lower_end_index]
        inputs_up_to_date = [x for x in self._inputs \
                if x.data_index_end != lower_end_index]

        # Local inputs are updated first, then the remote ones
        for local_input in [ x for x in inputs_to_update if isinstance(x, Input) ]:
            local_input.next()

        for remote_input in [ x for x in inputs_to_update if isinstance(x, RemoteInput) ]:
            remote_input.next()

        for untouched in inputs_up_to_date:
            untouched.data_same_as_previous = True

        # Compute the group's start and end indices
        self.data_index = min(x.data_index for x in self._inputs)
        self.data_index_end = max(x.data_index_end for x in self._inputs)

        self.first_data_index = max(x.data_index for x in self._inputs)
        self.last_data_index = min(x.data_index_end for x in self._inputs)

        # Inform the synchronisation listener
        if self.synchronization_listener is not None:
            self.synchronization_listener.onIntervalChanged(self.first_data_index,
                                                            self.last_data_index)
André Anjos's avatar
André Anjos committed
491
492


493
494
495
#----------------------------------------------------------


André Anjos's avatar
André Anjos committed
496
class InputList(object):
    """Represents the list of inputs of a processing block

    Inputs are organized by groups. The inputs inside a group are all
    synchronized together (see the section *Inputs synchronization* of the User's
    Guide).

    A list implementing this interface is provided to the algorithms

    One group of inputs is always considered as the **main** one, and is used to
    drive the algorithm. The usage of the other groups is left to the algorithm.

    See :py:class:`beat.core.inputs.Input`
    See :py:class:`beat.core.inputs.InputGroup`


    Example:

      .. code-block:: python

         inputs = InputList()
         ...

         # Retrieve an input by name
         input = inputs['labels']

         # Retrieve an input by index
         for index in range(0, len(inputs)):
             input = inputs[index]

         # Iteration over all inputs
         for input in inputs:
             ...

         # Iteration over some inputs
         for input in inputs[0:2]:
             ...

         # Retrieve the group an input belongs to, by input name
         group = inputs.groupOf('label')

         # Retrieve the group an input belongs to
         input = inputs['labels']
         group = input.group


    Attributes:

      main_group (beat.core.inputs.InputGroup): Main group (for data-driven
        algorithms): the first group added with a restricted access, ``None``
        as long as there is none

    """

    def __init__(self):
        self._groups = []
        self.main_group = None


    def add(self, group):
        """Add a group to the list

        :param beat.core.platform.inputs.InputGroup group: The group to add
        """

        # The first group with a restricted access becomes the main one
        if group.restricted_access and (self.main_group is None):
            self.main_group = group

        self._groups.append(group)


    def __getitem__(self, index):
        """Retrieves input(s) by position, by slice or by name

        Positions run across all the groups, in the order they were added.
        Returns ``None`` when no input matches (unknown name, position out of
        range or unsupported kind of index). A slice returns a (possibly
        empty) list of inputs.
        """

        if isinstance(index, slice):
            return list(self)[index]

        if isinstance(index, int):
            for group in self._groups:
                if index < len(group):
                    return group[index]
                # Not in this group: make the position relative to the next one
                index -= len(group)
            return None

        if isinstance(index, six.string_types):
            for group in self._groups:
                found = group[index]
                if found is not None:
                    return found

        return None


    def __iter__(self):
        for group in self._groups:
            for k in group:
                yield k


    def __len__(self):
        return sum(len(group) for group in self._groups)


    def nbGroups(self):
        """Returns the number of groups in the list"""

        return len(self._groups)


    def groupOf(self, input_name):
        """Returns the first group containing an input named ``input_name``,
        ``None`` if there is none"""

        for group in self._groups:
            if group[input_name] is not None:
                return group

        return None


    def hasMoreData(self):
        """Indicates if there is more data to process in any group"""

        # All the groups are queried, hence the list and not a generator
        return any([ x.hasMoreData() for x in self._groups ])


    def group(self, name_or_index):
        """Retrieves a group by position or by channel name

        Returns ``None`` for an unknown channel name or an unsupported kind of
        argument.

        Raises:

          IndexError: if ``name_or_index`` is an integer out of range

        """

        if isinstance(name_or_index, int):
            return self._groups[name_or_index]

        if isinstance(name_or_index, six.string_types):
            for candidate in self._groups:
                if candidate.channel == name_or_index:
                    return candidate

        return None