Skip to content
Snippets Groups Projects
Commit 890da671 authored by Philip ABBET's avatar Philip ABBET
Browse files

Inputs: receive data indexes alongside data units

parent f0c66547
No related branches found
No related tags found
1 merge request!8Refactoring and support of dataset providing in a container
...@@ -68,8 +68,11 @@ class Input: ...@@ -68,8 +68,11 @@ class Input:
self.data_format = data_format self.data_format = data_format
self.socket = socket self.socket = socket
self.data = None self.data = None
self.data_index = -1
self.data_index_end = -1
self.group = None self.group = None
self.comm_time = 0. #total time spent on communication self.comm_time = 0. #total time spent on communication
self.nb_data_blocks_read = 0
def isDataUnitDone(self): def isDataUnitDone(self):
...@@ -108,15 +111,19 @@ class Input: ...@@ -108,15 +111,19 @@ class Input:
self.socket.send('nxt', zmq.SNDMORE) self.socket.send('nxt', zmq.SNDMORE)
self.socket.send(self.group.channel, zmq.SNDMORE) self.socket.send(self.group.channel, zmq.SNDMORE)
self.socket.send(self.name) self.socket.send(self.name)
self.data_index = int(self.socket.recv())
self.data_index_end = int(self.socket.recv())
self.unpack(self.socket.recv()) self.unpack(self.socket.recv())
self.comm_time += time.time() - _start self.comm_time += time.time() - _start
self.nb_data_blocks_read += 1
def unpack(self, packed): def unpack(self, packed):
"""Receives data through socket""" """Receives data through socket"""
self.data = self.data_format.type() self.data = self.data_format.type()
logger.debug('recv: <bin> (size=%d)', len(packed)) logger.debug('recv: <bin> (size=%d), indexes=(%d, %d)', len(packed),
self.data_index, self.data_index_end)
self.data.unpack(packed) self.data.unpack(packed)
...@@ -239,7 +246,10 @@ class InputGroup: ...@@ -239,7 +246,10 @@ class InputGroup:
"channel `%s' while performing `next' operation on this group " \ "channel `%s' while performing `next' operation on this group " \
"(current reading position is %d/%d)" % \ "(current reading position is %d/%d)" % \
(name, self.channel, k, n)) (name, self.channel, k, n))
inpt.data_index = int(parts.pop(0))
inpt.data_index_end = int(parts.pop(0))
inpt.unpack(parts.pop(0)) inpt.unpack(parts.pop(0))
inpt.nb_data_blocks_read += 1
self.comm_time += time.time() - _start self.comm_time += time.time() - _start
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment