Commit 5eff3119 authored by Philip ABBET's avatar Philip ABBET

Refactoring of the inputs

parent 50e1a4ec
...@@ -36,8 +36,11 @@ import six ...@@ -36,8 +36,11 @@ import six
import zmq import zmq
class Input: #----------------------------------------------------------
"""Represents the input of a processing block
class BaseInput(object):
"""Base class for all the kinds of input of a processing block
A list of those inputs must be provided to the algorithms (see A list of those inputs must be provided to the algorithms (see
:py:class:`beat.backend.python.inputs.InputList`) :py:class:`beat.backend.python.inputs.InputList`)
...@@ -49,9 +52,6 @@ class Input: ...@@ -49,9 +52,6 @@ class Input:
data_format (str): Data format accepted by the input data_format (str): Data format accepted by the input
data_source (beat.core.platform.data.DataSource): Source of data to be used
by the input
Attributes: Attributes:
...@@ -70,13 +70,14 @@ class Input: ...@@ -70,13 +70,14 @@ class Input:
data_format (str): Data format accepted by the input data_format (str): Data format accepted by the input
data_source (beat.core.data.DataSource): Source of data used by the output data_same_as_previous (bool): Indicates if the last block of data received
was changed (see the section *Inputs synchronization* of the User's Guide)
nb_data_blocks_read (int): Number of data blocks read so far nb_data_blocks_read (int): Number of data blocks read so far
""" """
def __init__(self, name, data_format, data_source): def __init__(self, name, data_format):
self.group = None self.group = None
self.name = str(name) self.name = str(name)
...@@ -85,7 +86,6 @@ class Input: ...@@ -85,7 +86,6 @@ class Input:
self.data_index_end = -1 self.data_index_end = -1
self.data_same_as_previous = True self.data_same_as_previous = True
self.data_format = data_format self.data_format = data_format
self.data_source = data_source
self.nb_data_blocks_read = 0 self.nb_data_blocks_read = 0
...@@ -101,7 +101,7 @@ class Input: ...@@ -101,7 +101,7 @@ class Input:
def hasMoreData(self): def hasMoreData(self):
"""Indicates if there is more data to process on the input""" """Indicates if there is more data to process on the input"""
return self.data_source.hasMoreData() raise NotImplemented
def hasDataChanged(self): def hasDataChanged(self):
...@@ -111,6 +111,51 @@ class Input: ...@@ -111,6 +111,51 @@ class Input:
return not self.data_same_as_previous return not self.data_same_as_previous
def next(self):
"""Retrieves the next block of data"""
raise NotImplemented
#----------------------------------------------------------
class Input(BaseInput):
"""Represents an input of a processing block that receive data from a
data source
A list of those inputs must be provided to the algorithms (see
:py:class:`beat.backend.python.inputs.InputList`)
Parameters:
name (str): Name of the input
data_format (str): Data format accepted by the input
data_source (beat.core.platform.data.DataSource): Source of data to be used
by the input
Attributes:
data_source (beat.core.data.DataSource): Source of data used by the output
"""
def __init__(self, name, data_format, data_source):
super(Input, self).__init__(name, data_format)
self.data_source = data_source
def hasMoreData(self):
"""Indicates if there is more data to process on the input"""
return self.data_source.hasMoreData()
def next(self): def next(self):
"""Retrieves the next block of data""" """Retrieves the next block of data"""
...@@ -158,7 +203,7 @@ def process_error(socket): ...@@ -158,7 +203,7 @@ def process_error(socket):
#---------------------------------------------------------- #----------------------------------------------------------
class RemoteInput: class RemoteInput(BaseInput):
"""Allows to access the input of a processing block, via a socket. """Allows to access the input of a processing block, via a socket.
The other end of the socket must be managed by a message handler (see The other end of the socket must be managed by a message handler (see
...@@ -188,36 +233,11 @@ class RemoteInput: ...@@ -188,36 +233,11 @@ class RemoteInput:
""" """
def __init__(self, name, data_format, socket, unpack=True): def __init__(self, name, data_format, socket, unpack=True):
super(RemoteInput, self).__init__(name, data_format)
self.name = str(name) self.socket = socket
self.data_format = data_format self.comm_time = 0.0 # Total time spent on communication
self.socket = socket self._unpack = unpack
self.data = None
self.data_index = -1
self.data_index_end = -1
self.group = None
self.comm_time = 0. #total time spent on communication
self.nb_data_blocks_read = 0
self._unpack = unpack
def isDataUnitDone(self):
"""Indicates if the current data unit will change at the next iteration"""
logger.debug('send: (idd) is-dataunit-done %s', self.name)
_start = time.time()
self.socket.send('idd', zmq.SNDMORE)
self.socket.send(self.group.channel, zmq.SNDMORE)
self.socket.send(self.name)
answer = self.socket.recv()
self.comm_time += time.time() - _start
logger.debug('recv: %s', answer)
return answer == 'tru'
def hasMoreData(self): def hasMoreData(self):
...@@ -242,29 +262,6 @@ class RemoteInput: ...@@ -242,29 +262,6 @@ class RemoteInput:
return answer == 'tru' return answer == 'tru'
def hasDataChanged(self):
"""Indicates if the current data unit is different than the one at the
previous iteration"""
logger.debug('send: (hdc) has-data-changed %s %s', self.group.channel, self.name)
_start = time.time()
self.socket.send('hdc', zmq.SNDMORE)
self.socket.send(self.group.channel, zmq.SNDMORE)
self.socket.send(self.name)
answer = self.socket.recv()
self.comm_time += time.time() - _start
logger.debug('recv: %s', answer)
if answer == 'err':
process_error(self.socket)
return answer == 'tru'
def next(self): def next(self):
"""Retrieves the next block of data""" """Retrieves the next block of data"""
...@@ -289,6 +286,8 @@ class RemoteInput: ...@@ -289,6 +286,8 @@ class RemoteInput:
self.comm_time += time.time() - _start self.comm_time += time.time() - _start
self.nb_data_blocks_read += 1 self.nb_data_blocks_read += 1
self.data_same_as_previous = False
def unpack(self, packed): def unpack(self, packed):
"""Receives data through socket""" """Receives data through socket"""
......
...@@ -75,8 +75,6 @@ class MessageHandler(threading.Thread): ...@@ -75,8 +75,6 @@ class MessageHandler(threading.Thread):
self.callbacks = dict( self.callbacks = dict(
nxt = self.next, nxt = self.next,
hmd = self.has_more_data, hmd = self.has_more_data,
hdc = self.has_data_changed,
idd = self.is_dataunit_done,
don = self.done, don = self.done,
err = self.error, err = self.error,
) )
...@@ -252,26 +250,6 @@ class MessageHandler(threading.Thread): ...@@ -252,26 +250,6 @@ class MessageHandler(threading.Thread):
self.socket.send(what) self.socket.send(what)
def has_data_changed(self, channel, name):
"""Syntax: hdc channel name"""
logger.debug('recv: hdc %s %s', channel, name)
input_candidate = self._get_input_candidate(channel, name)
what = 'tru' if input_candidate.hasDataChanged() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
def is_dataunit_done(self, channel, name):
"""Syntax: idd channel name"""
logger.debug('recv: idd %s %s', channel, name)
input_candidate = self._get_input_candidate(channel, name)
what = 'tru' if input_candidate.isDataUnitDone() else 'fal'
logger.debug('send: %s', what)
self.socket.send(what)
def _acknowledge(self): def _acknowledge(self):
logger.debug('send: ack') logger.debug('send: ack')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment