Commit b5fc0451 authored by Samuel GAIST's avatar Samuel GAIST
Browse files

[outputs] Add RemotelySyncedOutput

This will be used to let the loop know that a
write occurred in the loop_user block.
parent 7b3d31a6
......@@ -45,6 +45,7 @@ This module implements output related classes
import six
import logging
logger = logging.getLogger(__name__)
......@@ -101,16 +102,16 @@ class Output(object):
"""
def __init__(self, name, data_sink, synchronization_listener=None,
force_start_index=0):
def __init__(
self, name, data_sink, synchronization_listener=None, force_start_index=0
):
self.name = str(name)
self.last_written_data_index = force_start_index - 1
self.nb_data_blocks_written = 0
self.data_sink = data_sink
self.name = str(name)
self.last_written_data_index = force_start_index - 1
self.nb_data_blocks_written = 0
self.data_sink = data_sink
self._synchronization_listener = synchronization_listener
def _createData(self):
"""Retrieves an uninitialized block of data corresponding to the data
format of the output
......@@ -118,13 +119,14 @@ class Output(object):
This method must be called to correctly create a new block of data
"""
if hasattr(self.data_sink, 'dataformat'):
if hasattr(self.data_sink, "dataformat"):
return self.data_sink.dataformat.type()
else:
raise RuntimeError("The currently used data sink is not bound to " \
"a dataformat - you cannot create uninitialized data under " \
"these circumstances")
raise RuntimeError(
"The currently used data sink is not bound to "
"a dataformat - you cannot create uninitialized data under "
"these circumstances"
)
def write(self, data, end_data_index=None):
"""Write a block of data on the output
......@@ -145,7 +147,7 @@ class Output(object):
# if the user passes a dictionary, converts to the proper baseformat type
if isinstance(data, dict):
d = self.data_sink.dataformat.type()
d.from_dict(data, casting='safe', add_defaults=False)
d.from_dict(data, casting="safe", add_defaults=False)
data = d
self.data_sink.write(data, self.last_written_data_index + 1, end_data_index)
......@@ -153,27 +155,29 @@ class Output(object):
self.last_written_data_index = end_data_index
self.nb_data_blocks_written += 1
def isDataMissing(self):
"""Returns whether data are missing"""
return (self._synchronization_listener is not None) and \
(self._synchronization_listener.data_index_end != self.last_written_data_index)
return (self._synchronization_listener is not None) and (
self._synchronization_listener.data_index_end
!= self.last_written_data_index
)
def isConnected(self):
"""Returns whether the associated data sink is connected"""
return self.data_sink.isConnected()
def _compute_end_data_index(self, end_data_index):
if end_data_index is not None:
if (end_data_index < self.last_written_data_index + 1) or \
((self._synchronization_listener is not None) and \
(end_data_index > self._synchronization_listener.data_index_end)):
raise KeyError("Algorithm logic error on write(): `end_data_index' " \
"is not consistent with last written index")
if (end_data_index < self.last_written_data_index + 1) or (
(self._synchronization_listener is not None)
and (end_data_index > self._synchronization_listener.data_index_end)
):
raise KeyError(
"Algorithm logic error on write(): `end_data_index' "
"is not consistent with last written index"
)
elif self._synchronization_listener is not None:
end_data_index = self._synchronization_listener.data_index_end
......@@ -183,14 +187,35 @@ class Output(object):
return end_data_index
def close(self):
"""Closes the associated data sink"""
self.data_sink.close()
#----------------------------------------------------------
# ----------------------------------------------------------
class RemotelySyncedOutput(Output):
def __init__(
self,
name,
data_sink,
socket,
synchronization_listener=None,
force_start_index=0,
):
super(RemotelySyncedOutput, self).__init__(
name, data_sink, synchronization_listener, force_start_index
)
self.socket = socket
def write(self, data, end_data_index=None):
super(RemotelySyncedOutput, self).write(data, end_data_index)
self.socket.send_string("wrt")
# ----------------------------------------------------------
class OutputList:
......@@ -223,7 +248,6 @@ class OutputList:
def __init__(self):
self._outputs = []
def __getitem__(self, index):
if isinstance(index, six.string_types):
......@@ -232,18 +256,17 @@ class OutputList:
except IndexError:
pass
elif isinstance(index, int):
if index < len(self._outputs): return self._outputs[index]
if index < len(self._outputs):
return self._outputs[index]
return None
def __iter__(self):
for k in self._outputs: yield k
for k in self._outputs:
yield k
def __len__(self):
return len(self._outputs)
def add(self, output):
"""Adds an output to the list
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment