Commit ac5c51d8 authored by Philip ABBET's avatar Philip ABBET

Refactoring of the outputs

parent b3e13776
......@@ -101,7 +101,7 @@ class BaseInput(object):
def hasMoreData(self):
"""Indicates if there is more data to process on the input"""
raise NotImplemented
raise NotImplemented()
def hasDataChanged(self):
......@@ -114,7 +114,7 @@ class BaseInput(object):
def next(self):
"""Retrieves the next block of data"""
raise NotImplemented
raise NotImplemented()
#----------------------------------------------------------
......
......@@ -93,7 +93,7 @@ class MessageHandler(threading.Thread):
break
timeout = 1000 #ms
socks = dict(self.poller.poll(timeout)) #yields to the next greenlet
socks = dict(self.poller.poll(timeout))
if self.socket in socks and socks[self.socket] == zmq.POLLIN:
......
......@@ -52,8 +52,8 @@ class SynchronizationListener:
#----------------------------------------------------------
class Output:
"""Represents one output of a processing block
class BaseOutput(object):
"""Base class for all the kinds of output of a processing block
A list of outputs implementing this interface is provided to the algorithms
(see :py:class:`beat.core.outputs.OutputList`).
......@@ -81,30 +81,14 @@ class Output:
"""
def __init__(self, name, data_sink, synchronization_listener=None,
dataset_output=False, force_start_index=0):
def __init__(self, name, synchronization_listener=None,
dataset_output=False, force_start_index=0):
self.name = str(name)
self.data_sink = data_sink
self.last_written_data_index = force_start_index - 1
self.nb_data_blocks_written = 0
self._synchronization_listener = synchronization_listener
self._dataset_output = dataset_output
self.last_written_data_index = force_start_index-1
self.nb_data_blocks_written = 0
def _createData(self):
"""Retrieves an uninitialized block of data corresponding to the data
format of the output
This method must be called to correctly create a new block of data
"""
if hasattr(self.data_sink, 'dataformat'):
return self.data_sink.dataformat.type()
else:
raise RuntimeError("The currently used data sink is not bound to " \
"a dataformat - you cannot create uninitialized data under " \
"these circumstances")
def write(self, data, end_data_index=None):
......@@ -121,6 +105,20 @@ class Output:
"""
raise NotImplemented()
def isDataMissing(self):
return not(self._dataset_output) and \
(self._synchronization_listener is not None) and \
(self._synchronization_listener.data_index_end != self.last_written_data_index)
def isConnected(self):
raise NotImplemented()
def _compute_end_data_index(self, end_data_index):
if self._dataset_output:
if end_data_index is None:
end_data_index = self.last_written_data_index + 1
......@@ -143,6 +141,82 @@ class Output:
else:
end_data_index = self.last_written_data_index + 1
return end_data_index
#----------------------------------------------------------
class Output(BaseOutput):
"""Represents one output of a processing block
A list of outputs implementing this interface is provided to the algorithms
(see :py:class:`beat.core.outputs.OutputList`).
Parameters:
name (str): Name of the output
data_sink (beat.core.data.DataSink): Sink of data to be used by the output,
pre-configured with the correct data format.
Attributes:
name (str): Name of the output (algorithm-specific)
data_sink (beat.core.data.DataSink): Sink of data used by the output
last_written_data_index (int): Index of the last block of data written by
the output
nb_data_blocks_written (int): Number of data blocks written so far
"""
def __init__(self, name, data_sink, synchronization_listener=None,
dataset_output=False, force_start_index=0):
super(Output, self).__init__(name, synchronization_listener=synchronization_listener,
dataset_output=dataset_output,
force_start_index=force_start_index)
self.data_sink = data_sink
def _createData(self):
"""Retrieves an uninitialized block of data corresponding to the data
format of the output
This method must be called to correctly create a new block of data
"""
if hasattr(self.data_sink, 'dataformat'):
return self.data_sink.dataformat.type()
else:
raise RuntimeError("The currently used data sink is not bound to " \
"a dataformat - you cannot create uninitialized data under " \
"these circumstances")
def write(self, data, end_data_index=None):
"""Write a block of data on the output
Parameters:
data (beat.core.baseformat.baseformat): The block of data to write, or
None (if the algorithm doesn't want to write any data)
end_data_index (int): Last index of the written data (see the section
*Inputs synchronization* of the User's Guide). If not specified, the
*current end data index* of the Inputs List is used
"""
end_data_index = self._compute_end_data_index(end_data_index)
# if the user passes a dictionary, converts to the proper baseformat type
if isinstance(data, dict):
d = self.data_sink.dataformat.type()
......@@ -155,12 +229,6 @@ class Output:
self.nb_data_blocks_written += 1
def isDataMissing(self):
return not(self._dataset_output) and \
(self._synchronization_listener is not None) and \
(self._synchronization_listener.data_index_end != self.last_written_data_index)
def isConnected(self):
return self.data_sink.isConnected()
......@@ -168,7 +236,7 @@ class Output:
#----------------------------------------------------------
class RemoteOutput:
class RemoteOutput(BaseOutput):
"""Represents one output of a processing block
A list of outputs implementing this interface is provided to the algorithms
......@@ -186,15 +254,20 @@ class RemoteOutput:
"""
def __init__(self, name, data_format, socket):
def __init__(self, name, data_format, socket, synchronization_listener=None,
dataset_output=False, force_start_index=0):
self.name = str(name)
self.data_format = data_format
self.socket = socket
self.comm_time = 0. #total time spent on communication
super(RemoteOutput, self).__init__(name, synchronization_listener=synchronization_listener,
dataset_output=dataset_output,
force_start_index=force_start_index)
self.data_format = data_format
self.socket = socket
self.comm_time = 0.0 # Total time spent on communication
self._is_connected = None
def write(self, data):
def write(self, data, end_data_index=None):
"""Write a block of data to the output socket
Parameters:
......@@ -203,6 +276,8 @@ class RemoteOutput:
write, or ``None`` (if the algorithm doesn't want to write any data)
"""
end_data_index = self._compute_end_data_index(end_data_index)
# if the user passes a dictionary, converts to the proper baseformat type
if isinstance(data, dict):
d = self.data_format.type()
......@@ -221,41 +296,43 @@ class RemoteOutput:
# packs the data (before starting sending the message)
logger.debug('send: (wrt) write %s', self.name)
logger.debug('send: (wrt) write %s (%d, %d)', self.name,
self.last_written_data_index + 1, end_data_index)
_start = time.time()
self.socket.send('wrt', zmq.SNDMORE)
self.socket.send(self.name, zmq.SNDMORE)
self.socket.send('%d' % end_data_index, zmq.SNDMORE)
self.comm_time += time.time() - _start
logger.debug('send: <bin> (size=%d)', len(packed))
_start = time.time()
self.socket.send(packed)
answer = self.socket.recv() #ack (sync point)
self.comm_time += time.time() - _start
logger.debug('recv: %s', answer)
self.last_written_data_index = end_data_index
def isDataMissing(self):
logger.debug('send: (idm) is-data-missing %s', self.name)
_start = time.time()
self.socket.send('idm', zmq.SNDMORE)
self.socket.send(self.name)
answer = self.socket.recv()
self.comm_time += time.time() - _start
logger.debug('recv: %s', answer)
return answer == 'tru'
def isConnected(self):
if self._is_connected is None:
logger.debug('send: (oic) output-is-connected %s', self.name)
def isConnected(self):
_start = time.time()
self.socket.send('oic', zmq.SNDMORE)
self.socket.send(self.name)
answer = self.socket.recv()
self.comm_time += time.time() - _start
logger.debug('send: (oic) output-is-connected %s', self.name)
_start = time.time()
self.socket.send('oic', zmq.SNDMORE)
self.socket.send(self.name)
answer = self.socket.recv()
self.comm_time += time.time() - _start
logger.debug('recv: %s', answer)
return answer == 'tru'
logger.debug('recv: %s', answer)
self._is_connected = (answer == 'tru')
return self._is_connected
#----------------------------------------------------------
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment