Commit 4e46c864 authored by Philip ABBET's avatar Philip ABBET

Refactoring: all kind of Inputs and InputGroups are now in this package

parent 890da671
......@@ -43,6 +43,224 @@ class Input:
:py:class:`beat.backend.python.inputs.InputList`)
Parameters:
name (str): Name of the input
data_format (str): Data format accepted by the input
data_source (beat.core.platform.data.DataSource): Source of data to be used
by the input
Attributes:
group (beat.core.inputs.InputGroup): Group containing this input
name (str): Name of the input (algorithm-specific)
data (beat.core.baseformat.baseformat): The last block of data received on
the input
data_index (int): Index of the last block of data received on the input
(see the section *Inputs synchronization* of the User's Guide)
data_index_end (int): End index of the last block of data received on the
input (see the section *Inputs synchronization* of the User's Guide)
data_format (str): Data format accepted by the input
data_source (beat.core.data.DataSource): Source of data used by the output
nb_data_blocks_read (int): Number of data blocks read so far
"""
def __init__(self, name, data_format, data_source):
self.group = None
self.name = name
self.data = None
self.data_index = -1
self.data_index_end = -1
self.data_same_as_previous = False
self.data_format = data_format
self.data_source = data_source
self.nb_data_blocks_read = 0
def isDataUnitDone(self):
"""Indicates if the current data unit will change at the next iteration"""
return (self.data_index_end == self.group.data_index_end)
def hasMoreData(self):
"""Indicates if there is more data to process on the input"""
return self.data_source.hasMoreData()
def next(self):
"""Retrieves the next block of data"""
(self.data, self.data_index, self.data_index_end) = self.data_source.next()
self.data_same_as_previous = False
self.nb_data_blocks_read += 1
#----------------------------------------------------------
class InputGroup:
"""Represents a group of inputs synchronized together
A group implementing this interface is provided to the algorithms (see
:py:class:`beat.backend.python.inputs.InputList`).
See :py:class:`beat.core.inputs.Input`
Example:
.. code-block:: python
inputs = InputList()
print inputs['labels'].data_format
for index in range(0, len(inputs)):
print inputs[index].data_format
for input in inputs:
print input.data_format
for input in inputs[0:2]:
print input.data_format
Parameters:
channel (str): Name of the data channel of the group
synchronization_listener (beat.core.outputs.SynchronizationListener):
Synchronization listener to use
restricted_access (bool): Indicates if the algorithm can freely use the
inputs
Atttributes:
data_index (int): Index of the last block of data received on the inputs
(see the section *Inputs synchronization* of the User's Guide)
data_index_end (int): End index of the last block of data received on the
inputs (see the section *Inputs synchronization* of the User's Guide)
channel (str): Name of the data channel of the group
synchronization_listener (beat.core.outputs.SynchronizationListener):
Synchronization listener used
"""
def __init__(self, channel, synchronization_listener=None,
restricted_access=True):
self._inputs = []
self.data_index = -1
self.data_index_end = -1
self.channel = channel
self.synchronization_listener = synchronization_listener
self.restricted_access = restricted_access
def __getitem__(self, index):
if isinstance(index, six.string_types):
try:
return [x for x in self._inputs if x.name == index][0]
except:
pass
elif isinstance(index, int):
if index < len(self._inputs):
return self._inputs[index]
return None
def __iter__(self):
for k in self._inputs: yield k
def __len__(self):
return len(self._inputs)
def add(self, input):
"""Add an input to the group
Parameters:
input (beat.core.inputs.Input): The input to add
"""
input.group = self
self._inputs.append(input)
def hasMoreData(self):
"""Indicates if there is more data to process in the group"""
return bool([x for x in self._inputs if x.hasMoreData()])
def next(self):
"""Retrieve the next block of data on all the inputs"""
# Only for groups not managed by the platform
if self.restricted_access: raise RuntimeError('Not authorized')
# Only retrieve new data on the inputs where the current data expire first
lower_end_index = reduce(lambda x, y: min(x, y.data_index_end),
self._inputs[1:], self._inputs[0].data_index_end)
inputs_to_update = [x for x in self._inputs \
if x.data_index_end == lower_end_index]
inputs_up_to_date = [x for x in self._inputs if x not in inputs_to_update]
for input in inputs_to_update:
input.next()
input.data_same_as_previous = False
for input in inputs_up_to_date:
input.data_same_as_previous = True
# Compute the group's start and end indices
self.data_index = reduce(lambda x, y: max(x, y.data_index),
self._inputs[1:], self._inputs[0].data_index)
self.data_index_end = reduce(lambda x, y: min(x, y.data_index_end),
self._inputs[1:], self._inputs[0].data_index_end)
# Inform the synchronisation listener
if self.synchronization_listener is not None:
self.synchronization_listener.onIntervalChanged(self.data_index,
self.data_index_end)
#----------------------------------------------------------
class RemoteInput:
"""Allows to access the input of a processing block, via a socket.
The other end of the socket must be managed by a message handler (see
:py:class:`beat.backend.python.message_handler.MessageHandler`)
A list of those inputs must be provided to the algorithms (see
:py:class:`beat.backend.python.inputs.InputList`)
Parameters:
name (str): Name of the input
......@@ -127,8 +345,14 @@ class Input:
self.data.unpack(packed)
class InputGroup:
"""Represents a group of inputs synchronized together
#----------------------------------------------------------
class RemoteInputGroup:
"""Allows to access a group of inputs synchronized together, via a socket.
The other end of the socket must be managed by a message handler (see
:py:class:`beat.backend.python.message_handler.MessageHandler`)
A group implementing this interface is provided to the algorithms (see
:py:class:`beat.core.inputs.InputList`).
......@@ -164,7 +388,6 @@ class InputGroup:
"""
def __init__(self, channel, restricted_access, socket):
self._inputs = []
......@@ -253,6 +476,9 @@ class InputGroup:
self.comm_time += time.time() - _start
#----------------------------------------------------------
class InputList:
"""Represents the list of inputs of a processing block
......@@ -321,6 +547,7 @@ class InputList:
self._groups.append(group)
def __getitem__(self, index):
if isinstance(index, six.string_types):
try:
......@@ -336,21 +563,26 @@ class InputList:
return None
def __iter__(self):
for i in range(len(self)): yield self[i]
def __len__(self):
return reduce(lambda x, y: x + len(y), self._groups, 0)
def nbGroups(self):
return len(self._groups)
def groupOf(self, input_name):
try:
return [k for k in self._groups if k[input_name] is not None][0]
except:
return None
def hasMoreData(self):
"""Indicates if there is more data to process in any group"""
return bool([x for x in self._groups if x.hasMoreData()])
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment