Commit 548211ce authored by Philip ABBET

Optimization: CachedDataSource and RemoteInput can now skip unpacking data

parent 94810515
...@@ -123,6 +123,7 @@ class CachedDataSource(DataSource): ...@@ -123,6 +123,7 @@ class CachedDataSource(DataSource):
self._cache = six.b('') self._cache = six.b('')
self._nb_bytes_read = 0 self._nb_bytes_read = 0
self._read_duration = 0 self._read_duration = 0
self._unpack = True
def _readHeader(self): def _readHeader(self):
"""Read the header of the current file""" """Read the header of the current file"""
...@@ -155,7 +156,7 @@ class CachedDataSource(DataSource): ...@@ -155,7 +156,7 @@ class CachedDataSource(DataSource):
return True return True
def setup(self, filename, prefix, force_start_index=None, def setup(self, filename, prefix, force_start_index=None,
force_end_index=None): force_end_index=None, unpack=True):
"""Configures the data source """Configures the data source
...@@ -171,6 +172,8 @@ class CachedDataSource(DataSource): ...@@ -171,6 +172,8 @@ class CachedDataSource(DataSource):
force_end_index (int): The end index (if not set or set to ``None``, the force_end_index (int): The end index (if not set or set to ``None``, the
default, reads the data until the end) default, reads the data until the end)
unpack (bool): Indicates if the data must be unpacked or not
Returns: Returns:
...@@ -263,6 +266,8 @@ class CachedDataSource(DataSource): ...@@ -263,6 +266,8 @@ class CachedDataSource(DataSource):
self.filenames = trim_filename(data_filenames, force_start_index, self.filenames = trim_filename(data_filenames, force_start_index,
force_end_index) force_end_index)
self._unpack = unpack
# Read the first file to process # Read the first file to process
self.cur_file_index = 0 self.cur_file_index = 0
try: try:
...@@ -349,8 +354,11 @@ class CachedDataSource(DataSource): ...@@ -349,8 +354,11 @@ class CachedDataSource(DataSource):
encoded_data = self._cache + data encoded_data = self._cache + data
self._cache = six.b('') self._cache = six.b('')
data = self.dataformat.type() if self._unpack:
data.unpack(encoded_data) #checks validity data = self.dataformat.type()
data.unpack(encoded_data) #checks validity
else:
data = encoded_data
result = (data, self.next_start_index, self.next_end_index) result = (data, self.next_start_index, self.next_end_index)
......
...@@ -166,7 +166,7 @@ class RemoteInput: ...@@ -166,7 +166,7 @@ class RemoteInput:
""" """
def __init__(self, name, data_format, socket): def __init__(self, name, data_format, socket, unpack=True):
self.name = str(name) self.name = str(name)
self.data_format = data_format self.data_format = data_format
...@@ -177,6 +177,7 @@ class RemoteInput: ...@@ -177,6 +177,7 @@ class RemoteInput:
self.group = None self.group = None
self.comm_time = 0. #total time spent on communication self.comm_time = 0. #total time spent on communication
self.nb_data_blocks_read = 0 self.nb_data_blocks_read = 0
self._unpack = unpack
def isDataUnitDone(self): def isDataUnitDone(self):
...@@ -248,10 +249,14 @@ class RemoteInput: ...@@ -248,10 +249,14 @@ class RemoteInput:
def unpack(self, packed):
    """Store a received data block on this input, decoding it if requested.

    When the ``_unpack`` flag of this input is true, a new object is created
    with ``self.data_format.type()`` and its ``unpack()`` method is called
    with ``packed``. Otherwise ``packed`` is stored untouched, which avoids
    the cost of decoding it.

    Parameters:

      packed: The encoded data block that was received. It must support
        ``len()`` (presumably a bytes string -- confirm against the caller).
    """

    logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed),
                 self.data_index, self.data_index_end)

    # The flag is stored as ``self._unpack``. ``self.unpack`` is this very
    # method, which is always truthy, so testing it would make the raw
    # pass-through branch unreachable.
    if self._unpack:
        self.data = self.data_format.type()
        self.data.unpack(packed)
    else:
        self.data = packed
#---------------------------------------------------------- #----------------------------------------------------------
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment